Python "store"

Admin User, created Apr 27. 2024
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
MASK_PRED_DYNAMIC = 0x00000001
MASK_PRED_SPECIAL = 0x00000002
MASK_PRED_CHECK = 0x00000004
MASK_PRED_ARITHMETIC = 0x00000008
MASK_TOUCH_DYNAMIC = 0x00000001
MASK_ADD_BOTTOM = 0x00000001
MASK_ADD_DYNAMIC = 0x00000002
MASK_REMOVE_BOTTOM = 0x00000001
kb = {}
stage = -1
partition = "system"
###
# Set the clause and predicate current stage.
#
# @param num The current stage.
##
def set_stage(num):
global stage
stage = num
###
# Set the clause current partition.
#
# @param str The current partition.
##
def set_partition(path):
engine.partition = path
################################################################
# Engine #
################################################################
###
# Create a slow state engine.
#
# @constructor The engine.
##
class Engine:
def __init__(self):
self.signal = NotImplemented
self.interrupt = lambda: None
self.text_output = NotImplemented
self.text_error = NotImplemented
self.text_input = NotImplemented
self.low = 0
self.high = 0
self.serno = 0
self.backtrail = None
self.backcount = 0
self.partition = "system" if stage == -1 else "user"
engine = Engine()
def set_engine(ptr):
global engine
engine = ptr
################################################################
# Variable & Compound #
################################################################
###
# Create a Prolog variable.
#
# @constructor The new variable.
##
class Variable:
def __init__(self):
self.instantiated = NotImplemented
if engine.low < engine.high:
self.flags = engine.low
engine.low += 1
else:
self.flags = engine.serno
engine.serno += 1
self.tail = None
###
# Check whether an object is a variable.
#
# @param obj The object.
# @return True if the object is a variable, otherwise false.
##
def is_variable(obj):
return isinstance(obj, Variable)
###
# Create a compound.
#
# @param functor The functor.
# @param args The arguments.
# @constructor The new compound.
##
class Compound:
def __init__(self, functor, args):
self.functor = functor
self.args = args
###
# Check whether an object is a compound.
#
# @param obj The object.
# @return True if the object is a compound, otherwise false.
##
def is_compound(obj):
return isinstance(obj, Compound)
################################################################
# Slot & Functor #
################################################################
###
# Create a Albufeira Code place.
##
class Place:
def __init__(self, index):
self.index = index
###
# Check whether an object is a place.
#
# @param obj The object.
# @return boolean True if the object is a place, otherwise false.
##
def is_place(obj):
return isinstance(obj, Place)
###
# Create a Albufeira Code skeleton.
#
# @param functor The functor.
# @param args The arguments.
##
class Skeleton:
def __init__(self, functor, args):
self.functor = functor
self.args = args
###
# Check whether an object is a skeleton.
#
# @param obj The object.
# @return boolean True if the object is a skeleton, otherwise false.
##
def is_skeleton(obj):
return isinstance(obj, Skeleton)
################################################################
# Quote #
################################################################
###
# Create a Albufeira Code quote.
##
class Quote:
def __init__(self, obj):
self.obj = obj
###
# Check whether an object is a quote.
#
# @param obj The object.
# @return boolean True if the object is a quote, otherwise false.
##
def is_quote(obj):
return isinstance(obj, Quote)
def unquote_objects(body):
i = 0
while i < len(body):
alpha = body[i]
if is_quote(alpha):
body[i] = alpha.obj
i += 1
################################################################
# Clauses Lifecycle #
################################################################
###
# Create a clause.
#
# @param size The number of variables.
# @param head The unify Albufeira code.
# @param body The build Albufeira code.
# @param cutvar The choice point variable index or -1.
# @param idxval The index value.
# @constructor The new clause.
##
class Clause:
def __init__(self, size, head, body, cutvar, idxval):
self.size = size
self.head = head
self.body = body
self.cutvar = cutvar
self.idxval = idxval
self.creator = stage
self.remover = NotImplemented
self.shard = ""
###
# CHeck whether the object is a clause.
#
# @param obj The object.
# @return True if the object is a clause, otherwise false.
##
def is_clause(obj):
return isinstance(obj, Clause)
###
# Create a logical.
#
# @param cache The cache of non-deleted clauses.
# @param count The counter of non-deleted clauses.
# @param data The clause list.
# @constructor The new logical.
##
class Logical:
def __init__(self, cache, count, data):
self.cache = cache
self.count = count
self.data = data
###
# Check whether an object is a logical.
#
# @param obj The object.
# @return True if the object is a logical, otherwise false.
##
def is_logical(obj):
return isinstance(obj, Logical)
################################################################
# Knowledgebase #
################################################################
###
# Create a provable.
#
# @param rope The rope.
# @param nonguard The non guard.
# @param idxmap The index map.
# @constructor The new provable.
##
class Provable:
def __init__(self, flags):
self.flags = flags
self.rope = NotImplemented
self.nonguard = NotImplemented
self.idxmap = NotImplemented
self.func = NotImplemented
self.creator = stage
self.remover = NotImplemented
self.overlay = NotImplemented
###
# Check whether an object is a provable.
#
# @param obj The object.
# @return True if the object is a provable, otherwise false.
##
def is_provable(obj):
return isinstance(obj, Provable)
###
# Create a cache node.
#
# @param name The functor.
# @constructor The new cache node.
##
class Cache:
def __init__(self, name):
self.link = NotImplemented
self.name = name
###
# Check whether an object is a cache.
#
# @param obj The object.
# @return True if the object is a cache, otherwise false.
##
def is_cache(obj):
return isinstance(obj, Cache)
################################################################
# Snapshot Data #
################################################################
###
# Make snapshot of a logical.
#
# @param rope The logical.
# @return The clause list snapshot.
##
def snapshot_data(rope):
res = rope.cache
if res is None:
res = [NotImplemented] * rope.count
rope_copy(res, rope)
rope.cache = res
return res
def rope_copy(res, rope):
data = rope.data
j = 0
i = 0
while i < len(data):
clause = data[i]
if clause.remover is NotImplemented:
res[j] = clause
j += 1
i += 1
################################################################
# Linked Provables #
################################################################
###
# Retrieve a provable from monomorphic cache.
#
# @param cache The cache.
# @param arity The arity.
# @return The provable or NotImplemented
##
def ensure_link(cache, arity):
peek = cache.link
if peek is NotImplemented or peek.remover is not NotImplemented:
peek = pred_link(cache.name, arity)
cache.link = peek
return peek
###
# Retrieve a provable by predicate indicator.
#
# @param functor The functor.
# @param arity The arity.
# @return The provable or NotImplemented
##
def pred_link(functor, arity):
temp = kb.get(functor, NotImplemented)
if temp is NotImplemented:
return NotImplemented
peek = (temp[arity] if arity < len(temp) else NotImplemented)
if peek is NotImplemented or peek.remover is not NotImplemented:
return NotImplemented
return peek
###
# The function returns an anonymous predicate for the given clauses.
#
# @param rope The clauses.
# @return The predicate.
##
def make_defined(rope):
peek = Provable(0)
peek.rope = new_rope()
peek.nonguard = new_rope()
peek.idxmap = {}
i = 0
while i < len(rope):
add_peek(peek, rope[i], MASK_ADD_BOTTOM)
i += 1
return peek
def new_rope():
return Logical(None, 0, [])
################################################################
# Dynamic Predicates #
################################################################
# Avoid cyclic import member error
from nova.machine import (make_error, make_indicator, VOID_ARGS)
###
# Create a new dynamic predicate entry.
#
# @param functor The functor.
# @param arity The arity.
# @param flags The flags.
##
def pred_touch(functor, arity, flags):
temp = kb.get(functor, NotImplemented)
if temp is NotImplemented:
temp = []
kb[functor] = temp
peek = (temp[arity] if arity < len(temp) else NotImplemented)
if peek is NotImplemented or peek.remover is not NotImplemented:
res = make_defined(VOID_ARGS)
if peek is not NotImplemented:
res.overlay = peek
if (flags & MASK_TOUCH_DYNAMIC) != 0:
res.flags |= MASK_PRED_DYNAMIC
while arity >= len(temp):
temp.append(NotImplemented)
temp[arity] = res
else:
if not is_logical(peek.rope):
raise make_error(Compound("permission_error",
["modify", "static_procedure",
make_indicator(functor, arity)]))
if (flags & MASK_TOUCH_DYNAMIC) != 0:
if (peek.flags & MASK_PRED_DYNAMIC) == 0:
raise make_error(Compound("permission_error",
["modify", "static_procedure",
make_indicator(functor, arity)]))
################################################################
# Destroy Provable #
################################################################
###
# Remove a predicate from the knowledge base.
#
# @param functor The functor.
# @param arity The arity.
##
def pred_destroy(functor, arity):
temp = kb.get(functor, NotImplemented)
if temp is NotImplemented:
return
peek = (temp[arity] if arity < len(temp) else NotImplemented)
if peek is NotImplemented or peek.remover is not NotImplemented:
return
if peek.creator == stage:
peek = clear_pop(peek)
temp[arity] = peek
if peek is NotImplemented and trim_arities(temp):
del kb[functor]
else:
peek.remover = stage
###
# Clear and pop a provable.
#
# @param peek The provable.
# @return The parent provable.
##
def clear_pop(peek):
peek.remover = stage
peek.rope = None
peek.idxmap = None
peek.nonguard = None
back = peek
peek = back.overlay
back.overlay = NotImplemented
return peek
###
# Trim the arities array.
#
# @param peek The arities array.
# @return boolean True if empty, otherwise fale.
##
def trim_arities(peek):
pos = len(peek)
while pos > 0 and peek[pos-1] is NotImplemented:
pos -= 1
if pos == 0:
return True
if pos != len(peek):
del peek[pos:]
return False
################################################################
# Clauses Transactions #
################################################################
###
# Rollback the clauses.
##
def clear():
for functor in list(kb):
temp = kb[functor]
i = 0
while i < len(temp):
peek = temp[i]
if peek is NotImplemented:
i += 1
continue
if peek.creator == stage:
peek = clear_pop(peek)
temp[i] = peek
if peek is NotImplemented:
i += 1
continue
if peek.remover == stage:
peek.remover = NotImplemented
if is_logical(peek.rope):
rollback_peek(peek)
i += 1
if trim_arities(temp):
del kb[functor]
###
# Rollback the clauses of a predicate.
#
# @param peek The predicate.
##
def rollback_peek(peek):
if not has_action(peek.rope):
return
for (key, value) in list(peek.idxmap.items()):
if has_action(value):
rollback_rope(value, False)
if len(value.data) == 0:
del peek.idxmap[key]
if has_action(peek.nonguard):
rollback_rope(peek.nonguard, False)
rollback_rope(peek.rope, True)
###
# Check whether a logical has some action.
#
# @param rope The logical.
# @return True if the logical has some action, false otherwise.
##
def has_action(rope):
rope = rope.data
i = 0
while i < len(rope):
clause = rope[i]
if clause.creator == stage:
return True
if clause.remover == stage:
return True
i += 1
return False
###
# Rollback clauses from a logical.
#
# @param rope The logical.
# @param update The update flag.
##
def rollback_rope(rope, update):
data = rope.data
j = 0
i = 0
while i < len(data):
clause = data[i]
if clause.creator == stage:
rope.count -= 1
else:
if clause.remover == stage:
if update:
clause.remover = NotImplemented
rope.count += 1
data[j] = clause
j += 1
i += 1
del data[j:]
rope.cache = None
################################################################
# Clause Addition #
################################################################
###
# Enhance the knowledge base by a claus or predicate.
#
# @param functor The functor.
# @param arity The arity.
# @param clause_or_pred The clause or predicate.
##
def add(functor, arity, clause_or_pred):
add_clause(functor, arity, clause_or_pred, MASK_ADD_BOTTOM)
def add_clause(functor, arity, clause_or_pred, flags):
temp = kb.get(functor, NotImplemented)
if temp is NotImplemented:
temp = []
kb[functor] = temp
peek = (temp[arity] if arity < len(temp) else NotImplemented)
if is_clause(clause_or_pred):
if peek is NotImplemented or peek.remover is not NotImplemented:
res = make_defined(VOID_ARGS)
if peek is not NotImplemented:
res.overlay = peek
if (flags & MASK_ADD_DYNAMIC) != 0:
res.flags |= MASK_PRED_DYNAMIC
while arity >= len(temp):
temp.append(NotImplemented)
temp[arity] = res
peek = res
else:
if not is_logical(peek.rope):
raise make_error(Compound("permission_error",
["modify", "static_procedure",
make_indicator(functor, arity)]))
if (flags & MASK_ADD_DYNAMIC) != 0:
if (peek.flags & MASK_PRED_DYNAMIC) == 0:
raise make_error(Compound("permission_error",
["modify", "static_procedure",
make_indicator(functor, arity)]))
if (flags & MASK_ADD_DYNAMIC) == 0:
clause_or_pred.shard = engine.partition
add_peek(peek, clause_or_pred, flags)
else:
if peek is NotImplemented or peek.remover is not NotImplemented:
if peek is not NotImplemented:
clause_or_pred.overlay = peek
while arity >= len(temp):
temp.append(NotImplemented)
temp[arity] = clause_or_pred
else:
raise make_error(Compound("permission_error",
["coerce", "procedure", make_indicator(functor, arity)]))
def add_peek(peek, clause, flags):
add_rope(peek.rope, clause, flags)
idxval = clause.idxval
if idxval is NotImplemented:
for (key, value) in peek.idxmap.items():
add_rope(value, clause, flags)
add_rope(peek.nonguard, clause, flags)
else:
value = peek.idxmap.get(idxval, NotImplemented)
if value is NotImplemented:
value = clone_rope(peek.nonguard)
peek.idxmap[idxval] = value
add_rope(value, clause, flags)
def add_rope(rope, clause, flags):
if (flags & MASK_ADD_BOTTOM) != 0:
rope.data.append(clause)
else:
rope.data.insert(0, clause)
rope.count += 1
rope.cache = None
def clone_rope(rope):
return Logical(rope.cache, rope.count, list(rope.data))
################################################################
# Clause Removal #
################################################################
###
# Remove a clause from the knowledge base.
#
# @param functor The functor.
# @param arity The arity.
# @param clause The clause.
# @param flags The flags.
# @return True if the clause was removed, false otherwise.
##
def remove_clause(functor, arity, clause, flags):
if not is_clause(clause):
raise make_error(Compound("permission_error",
["coerce", "procedure", make_indicator(functor, arity)]))
temp = kb.get(functor, NotImplemented)
if temp is NotImplemented:
return False
peek = (temp[arity] if arity < len(temp) else NotImplemented)
if peek is NotImplemented or peek.remover is not NotImplemented:
return False
if not is_logical(peek.rope):
raise make_error(Compound("permission_error",
["modify", "static_procedure", make_indicator(functor, arity)]))
return remove_peek(peek, clause, flags)
def remove_peek(peek, clause, flags):
if clause.remover is not NotImplemented:
return False
if not remove_rope(peek.rope, clause, flags):
return False
idxval = clause.idxval
if idxval is NotImplemented:
for (key, value) in list(peek.idxmap.items()):
remove_rope(value, clause, flags)
if len(value.data) == 0:
del peek.idxmap[key]
remove_rope(peek.nonguard, clause, flags)
else:
value = peek.idxmap.get(idxval)
remove_rope(value, clause, flags)
if len(value.data) == 0:
del peek.idxmap[idxval]
clause.remover = stage
return True
def remove_rope(rope, clause, flags):
data = rope.data
if (flags & MASK_REMOVE_BOTTOM) == 0:
index = rope_index(data, clause)
else:
index = rope_last_index(data, clause)
if index == -1:
return False
if clause.creator == stage:
del data[index]
rope.count -= 1
rope.cache = None
return True
def rope_index(rope, clause):
i = 0
while i < len(rope):
if rope[i] is clause:
return i
i += 1
return -1
def rope_last_index(rope, clause):
i = len(rope)
while i > 0:
i -= 1
if rope[i] is clause:
return i
return -1