Python "special"

Admin User, erstellt 03. Mai 2024
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import asyncio
import nova.store as store
from nova.store import (Provable, add,
MASK_PRED_CHECK, MASK_PRED_ARITHMETIC, Variable,
is_variable, Compound, is_compound, MASK_PRED_SPECIAL)
import nova.machine as machine
from nova.machine import (make_error,
deref, cut, cont, bind, GC_MASK_ASYNC_MODE, VAR_MASK_STATE, mark2_term,
Choice, more, snap_setup, snap_cleanup, unify, check_callable,
is_integer, is_atom, is_number, is_float, solve, VOID_ARGS,
solve_signal, set_gc_flags, GC_MASK_ALLOW_YIELD, register_interrupt,
copy_term, check_nonvar, exec_build, exec_unify)
from nova.unicode import code_type, code_numeric
from collections import OrderedDict
import time
import locale
import sys
import importlib
# Largest arity accepted for a compound, equal to 2**31 - 1.
# Used as the limit in list_objects() and test_functor().
MAX_ARITY = 2147483647
################################################################
# Special Predicate #
################################################################
###
# Wrap a special into a built-in flagged MASK_PRED_SPECIAL.
#
# @param func The special.
# @return Provable The built-in carrying func.
##
def make_special(func):
    builtin = Provable(MASK_PRED_SPECIAL)
    builtin.func = func
    return builtin
###
# Wrap a check into a built-in flagged MASK_PRED_CHECK.
#
# @param func The check.
# @return Provable The built-in carrying func.
##
def make_check(func):
    builtin = Provable(MASK_PRED_CHECK)
    builtin.func = func
    return builtin
###
# Wrap an arithmetic into a built-in flagged MASK_PRED_ARITHMETIC.
#
# @param func The arithmetic.
# @return Provable The built-in carrying func.
##
def make_arithmetic(func):
    builtin = Provable(MASK_PRED_ARITHMETIC)
    builtin.func = func
    return builtin
################################################################
# fail/0, '$CUT'/1 and '$MARK'/1 #
################################################################
###
# fail: [ISO 7.8.2]
# The built-in fails.
##
def test_fail(args):
return False
###
# '$CUT'(R): internal only
# The built-in removes the choice points up to R and succeeds.
#
# @param args The argument list, args[0] denotes the choice point R.
# @return Always True.
##
def test_cut(args):
    barrier = deref(exec_build(args[0]))
    cut(barrier)
    return True
###
# '$MARK'(R): internal only
# The built-in unifies R with the top choice point (machine.redo).
#
# @param args The argument list, args[0] denotes R.
# @return The outcome of the unification.
##
def test_mark(args):
    top = machine.redo
    return exec_unify(args[0], top)
################################################################
# '$SEQ'/2 and '$ALT'/1 #
################################################################
###
# '$SEQ'(O, L): internal only
# If the Prolog term O has the form just(R), bind R to the top choice
# point. Otherwise, do nothing. The built-in then sequentially adds
# the goals L to the continuation.
#
# @param args The argument list, args[0] is O and args[1] is L.
# @return Always True.
##
def special_seq(args):
    solve_mark(deref(args[0]))
    solve_seq(deref(args[1]))
    return True
###
# If the Prolog term has the form just(R), bind R to the top choice
# point (machine.redo). Any other term is silently ignored.
#
# @param temp The Prolog term, already dereferenced by the caller.
##
def solve_mark(temp):
    if not is_compound(temp):
        return
    if temp.functor != 'just' or len(temp.args) != 1:
        return
    slot = deref(temp.args[0])
    bind(machine.redo, slot)
###
# Sequentially adds the literals L to the continuation. The literals
# are copied into a fresh list whose end is the rest of the current
# continuation (machine.call.args[1]). The list is unchecked, there
# is no validation that it ends in [].
#
# @param goal The literals.
##
def solve_seq(goal):
    # Gather the list elements front to back.
    literals = []
    while is_compound(goal) and goal.functor == "." and len(goal.args) == 2:
        literals.append(goal.args[0])
        goal = deref(goal.args[1])
    # Prepend them back to front onto the remaining continuation.
    res = machine.call.args[1]
    for literal in reversed(literals):
        res = Compound(".", [literal, res])
    cont(res)
###
# '$ALT'(L): internal only
# The built-in alternatively adds the variants L to the
# continuation and succeeds.
#
# @param args The argument list, args[0] denotes the variants L.
# @return The outcome of solve_alt() for a first call.
##
def special_alt(args):
    variants = deref(args[0])
    return solve_alt(variants, -1, None)
###
# Alternatively adds the variants to the continuation. The first
# variant is scheduled through solve_seq(). If further variants
# remain, a choice point that re-enters this function with the
# remaining variants is pushed before.
#
# @param goal The variants.
# @param at This argument is ignored.
# @param choice The choice point for reuse or null.
# @return True if a variant could be added, otherwise false.
##
def solve_alt(goal, at, choice):
    if is_compound(goal) and goal.functor == "." and len(goal.args) == 2:
        # Remember the trail position before the variant is processed.
        mark = machine.trail
        peek = deref(goal.args[0])
        # The first argument of the variant goes to solve_mark(),
        # the second argument is the goal list of the variant.
        temp = deref(peek.args[0])
        solve_mark(temp)
        peek = deref(peek.args[1])
        goal = deref(goal.args[1])
        # A choice point is only needed if another variant follows.
        if is_compound(goal) and goal.functor == "." and len(goal.args) == 2:
            if choice is None:
                choice = Choice(solve_alt, goal, -1, mark)
            else:
                choice.data = goal
            more(choice)
        solve_seq(peek)
        return True
    else:
        return False
################################################################
# sys_raise/1, sys_trap/3 #
################################################################
###
# sys_raise(E): internal only
# The predicate raises a Python Exception that carries a copy
# of the Prolog term E as its single argument.
#
# @param args The argument list, args[0] denotes E.
##
def test_sys_raise(args):
    ball = exec_build(args[0])
    raise Exception(copy_term(ball))
###
# sys_trap(G, E, F): internal only
# The built-in succeeds whenever G succeeds. If there was an
# exception that unifies with E, the built-in further succeeds
# whenever F succeeds.
#
# @param args The argument list, args[0] denotes the goal G.
# @return The outcome of solve_catch() for a first call.
##
def special_sys_trap(args):
    body = deref(args[0])
    snap = snap_setup()
    # Run G on its own continuation that ends in [].
    cont(Compound(".", [body, "[]"]))
    return solve_catch(snap, True, None)
###
# Call, redo or resume a goal.
# If there is an exception that unifies with the second argument of
# the current call, the third argument is put on the continuation
# as the handler. Otherwise the exception is re-raised.
#
# @param snap The surrounding choice point.
# @param found The call or redo flag.
# @param choice The choice point for reuse or null.
# @return True if goal succeeds, otherwise false. Any other value
#         returned by solve() is passed through unchanged.
##
def solve_catch(snap, found, choice):
    if choice is not None:
        # Clear the fields of the reused choice point before re-entry.
        choice.mark = None
        choice.cont = "[]"
        choice.tail = None
    try:
        found = solve(snap, found)
    except Exception as err:
        snap_cleanup(snap)
        # machine.call.args[0] is the current goal, presumably the
        # sys_trap(G, E, F) term itself -- TODO confirm.
        goal = deref(machine.call.args[0])
        err = map_throwable(err)
        # No match with the pattern, propagate without chaining.
        if not unify(goal.args[1], err.args[0]):
            raise err from None
        # Schedule the handler in front of the rest of the continuation.
        goal = deref(goal.args[2])
        goal = Compound(".", [goal, machine.call.args[1]])
        cont(goal)
        return True
    if found is False:
        return False
    if machine.redo is not snap:
        # The top choice point is not the snapshot, so keep a choice
        # point that re-enters this function in redo mode.
        if choice is None:
            choice = Choice(solve_catch, snap, False, machine.trail)
        else:
            choice.mark = machine.trail
            choice.cont = machine.call
            choice.tail = machine.redo
        more(choice)
    else:
        cut(snap.tail)
    if found is True:
        # Continue with what followed the call in the snapshot.
        cont(snap.cont.args[1])
    return found
###
# Map a Python exception to a Prolog error. A RecursionError is
# replaced by system_error(stack_overflow), every other exception
# is returned unchanged.
#
# @param err The Python exception.
# @return The mapped exception.
##
def map_throwable(err):
    if not isinstance(err, RecursionError):
        return err
    return make_error(Compound("system_error", ["stack_overflow"]))
################################################################
# os_sleep_promise/2 and os_import_promise/3 #
################################################################
###
# os_sleep_promise(D, P):
# The predicate succeeds in P with a promise for a delay D. The
# promise is a parameterless callable creating the sleep coroutine.
#
# @param args The argument list, args[0] is D and args[1] is P.
# @return The outcome of the unification with P.
##
def test_os_sleep_promise(args):
    millis = deref(exec_build(args[0]))
    check_integer(millis)
    if millis < 0:
        culprit = Compound("domain_error", ["not_less_than_zero", millis])
        raise make_error(culprit)
    ctx = machine.ctx
    return exec_unify(args[1], lambda: sleep_promise(ctx, millis))
###
# Sleep for the given delay. While sleeping, the cancel method of
# the current task is registered as the interrupt handler of buf.
# A cancellation ends the sleep silently. Afterwards a do-nothing
# handler is registered.
#
# @param buf The context the interrupt handler is registered for.
# @param delay The delay, divided by 1000.0 to obtain seconds.
##
async def sleep_promise(buf, delay):
    canceller = asyncio.current_task().cancel
    register_interrupt(buf, canceller)
    try:
        await asyncio.sleep(delay/1000.0)
    except asyncio.CancelledError:
        # An interrupt simply ends the sleep early.
        pass
    finally:
        register_interrupt(buf, lambda: None)
###
# os_import_promise(F, M, Q):
# The predicate succeeds in Q with a promise for the import M
# of the file F. M is unified with a fresh dict that receives
# the module once the promise has completed.
#
# @param args The argument list, F, M and Q in this order.
# @return True if both unifications succeed, otherwise False.
##
def test_os_import_promise(args):
    url = deref(exec_build(args[0]))
    check_atom(url)
    holder = {}
    if exec_unify(args[1], holder):
        ctx = machine.ctx
        return exec_unify(args[2], lambda: import_promise(ctx, url, holder))
    return False
###
# Import the Python module behind an url in a worker thread
# and store it under the key "module".
#
# @param buf The context, not used here.
# @param url The url of the Python file.
# @param res The dict that receives the module.
##
async def import_promise(buf, url, res):
    res["module"] = await asyncio.to_thread(blocking_import, find_name(url))
###
# Import a Python module by name. The call blocks, it is meant
# to be run from a worker thread.
#
# @param name The Python module name.
# @return The imported module.
##
def blocking_import(name):
    module = importlib.import_module(name)
    return module
###
# Find a Python module name from an url. The url must end in ".py"
# and start with an entry found by find_path(). The entry, the
# following separator and the extension are cut off, and the
# remaining path separators are turned into periods.
#
# @param url The url
# @return The Python module name or None
##
def find_name(url):
    if not url.endswith(".py"):
        return None
    root = find_path(url)
    if root is None:
        return None
    # Skip the entry plus one separator, drop the trailing ".py".
    dotted = url[len(root)+1:-3]
    return dotted.replace("/", ".").replace("\\", ".")
###
# Find a filepath entry for an url. The entries of sys.path are
# tried in order, and the first entry that is a prefix of the url
# and is directly followed by a path separator ("/" or "\") wins.
#
# Empty entries are skipped. An empty string is a prefix of every
# url, so it would shadow the real entry for any url that starts
# with a separator and make find_name() derive a wrong module name.
# Entries that are not strings are skipped as well.
#
# @param url The url
# @return The filepath entry or None
##
def find_path(url):
    for path in sys.path:
        if not isinstance(path, str) or not path:
            continue
        if url.startswith(path) and url.startswith(("/", "\\"), len(path)):
            return path
    return None
###
# os_invoke_main(M):
# Invoke the main method of the module M.
#
# @param args The argument list, args[0] denotes the module M.
# @return Always True.
##
def test_os_invoke_main(args):
    target = deref(exec_build(args[0]))
    target.main()
    return True
################################################################
# '$YIELD'/1, shield/1 and unshield/1 #
################################################################
###
# '$YIELD'(R): internal only
# The built-in stops the interpreter loop with return value R.
# It raises system_error(illegal_yield) if the flag
# GC_MASK_ALLOW_YIELD is not set in machine.gc_flags.
#
# @param args The argument list, args[0] denotes R.
# @return The dereferenced R.
##
def special_yield(args):
    if not machine.gc_flags & GC_MASK_ALLOW_YIELD:
        raise make_error(Compound("system_error", ["illegal_yield"]))
    cont(machine.call.args[1])
    resume = Choice(solve_signal, None, None, machine.trail)
    more(resume)
    return deref(args[0])
###
# shield(G):
# The predicate succeeds whenever the goal G succeeds.
# The goal is executed without auto-yield.
#
# @param args The argument list, args[0] denotes the goal G.
# @return The outcome of solve_shield() for a first call.
##
def special_shield(args):
    body = deref(args[0])
    snap = snap_setup()
    # Run G on its own continuation that ends in [].
    cont(Compound(".", [body, "[]"]))
    return solve_shield(snap, True, None)
###
# Call, redo or resume a goal with the GC_MASK_ASYNC_MODE bit cleared.
# The previous state of the bit is restored when the goal returns.
# If there is an exception, the bit is restored, the snapshot is
# cleaned up and the exception is re-raised.
#
# @param snap The surrounding choice point.
# @param found The call or redo flag.
# @param choice The choice point for reuse or null.
# @return True if goal succeeds, otherwise false. Any other value
#         returned by solve() is passed through unchanged.
##
def solve_shield(snap, found, choice):
    if choice is not None:
        # Clear the fields of the reused choice point before re-entry.
        choice.mark = None
        choice.cont = "[]"
        choice.tail = None
    # Save the async mode bit and clear it while the goal runs.
    back = machine.gc_flags & GC_MASK_ASYNC_MODE
    set_gc_flags(machine.gc_flags & ~GC_MASK_ASYNC_MODE)
    try:
        found = solve(snap, found)
    except Exception as err:
        # Put the saved bit back before the exception leaves.
        set_gc_flags((machine.gc_flags & ~GC_MASK_ASYNC_MODE) | back)
        snap_cleanup(snap)
        raise err from None
    # Put the saved bit back on normal return.
    set_gc_flags((machine.gc_flags & ~GC_MASK_ASYNC_MODE) | back)
    if found is False:
        return False
    if machine.redo is not snap:
        # The top choice point is not the snapshot, so keep a choice
        # point that re-enters this function in redo mode.
        if choice is None:
            choice = Choice(solve_shield, snap, False, machine.trail)
        else:
            choice.mark = machine.trail
            choice.cont = machine.call
            choice.tail = machine.redo
        more(choice)
    else:
        cut(snap.tail)
    if found is True:
        # Continue with what followed the call in the snapshot.
        cont(snap.cont.args[1])
    return found
###
# unshield(G):
# The predicate succeeds whenever the goal G succeeds.
# The goal is executed with auto-yield.
#
# @param args The argument list, args[0] denotes the goal G.
# @return The outcome of solve_unshield() for a first call.
##
def special_unshield(args):
    body = deref(args[0])
    snap = snap_setup()
    # Run G on its own continuation that ends in [].
    cont(Compound(".", [body, "[]"]))
    return solve_unshield(snap, True, None)
###
# Call, redo or resume a goal with the GC_MASK_ASYNC_MODE bit set.
# The previous state of the bit is restored when the goal returns.
# If there is an exception, the bit is restored, the snapshot is
# cleaned up and the exception is re-raised.
#
# @param snap The surrounding choice point.
# @param found The call or redo flag.
# @param choice The choice point for reuse or null.
# @return True if goal succeeds, otherwise false. Any other value
#         returned by solve() is passed through unchanged.
##
def solve_unshield(snap, found, choice):
    if choice is not None:
        # Clear the fields of the reused choice point before re-entry.
        choice.mark = None
        choice.cont = "[]"
        choice.tail = None
    # Save the async mode bit and set it while the goal runs.
    back = machine.gc_flags & GC_MASK_ASYNC_MODE
    set_gc_flags(machine.gc_flags | GC_MASK_ASYNC_MODE)
    try:
        found = solve(snap, found)
    except Exception as err:
        # Put the saved bit back before the exception leaves.
        set_gc_flags((machine.gc_flags & ~GC_MASK_ASYNC_MODE) | back)
        snap_cleanup(snap)
        raise err from None
    # Put the saved bit back on normal return.
    set_gc_flags((machine.gc_flags & ~GC_MASK_ASYNC_MODE) | back)
    if found is False:
        return False
    if machine.redo is not snap:
        # The top choice point is not the snapshot, so keep a choice
        # point that re-enters this function in redo mode.
        if choice is None:
            choice = Choice(solve_unshield, snap, False, machine.trail)
        else:
            choice.mark = machine.trail
            choice.cont = machine.call
            choice.tail = machine.redo
        more(choice)
    else:
        cut(snap.tail)
    if found is True:
        # Continue with what followed the call in the snapshot.
        cont(snap.cont.args[1])
    return found
################################################################
# Type Assertions #
################################################################
###
# Assure that the object is an atom. Otherwise check_nonvar() is
# applied first, and then type_error(atom, Copy) is raised.
#
# @param beta The object.
##
def check_atom(beta):
    if is_atom(beta):
        return
    check_nonvar(beta)
    raise make_error(Compound("type_error", ["atom", copy_term(beta)]))
###
# Assure that the object is a number. Otherwise check_nonvar() is
# applied first, and then type_error(number, Copy) is raised.
#
# @param beta The object.
##
def check_number(beta):
    if is_number(beta):
        return
    check_nonvar(beta)
    raise make_error(Compound("type_error", ["number", copy_term(beta)]))
###
# Assure that the object is an integer. Otherwise check_nonvar() is
# applied first, and then type_error(integer, Copy) is raised.
#
# @param beta The object.
##
def check_integer(beta):
    if is_integer(beta):
        return
    check_nonvar(beta)
    raise make_error(Compound("type_error", ["integer", copy_term(beta)]))
###
# Assure that the object is atomic, i.e. neither a variable nor
# a compound. Otherwise check_nonvar() is applied first, and then
# type_error(atomic, Copy) is raised.
#
# @param beta The object.
##
def check_atomic(beta):
    if not (is_variable(beta) or is_compound(beta)):
        return
    check_nonvar(beta)
    raise make_error(Compound("type_error", ["atomic", copy_term(beta)]))
###
# Assure that the object is nil. A remaining list cell raises
# representation_error(int). Anything else goes through
# check_nonvar() and then raises type_error(list, Copy).
#
# @param beta The object.
##
def check_nil(beta):
    if beta != "[]":
        is_cons = (is_compound(beta) and beta.functor == "."
                   and len(beta.args) == 2)
        if not is_cons:
            check_nonvar(beta)
            raise make_error(Compound("type_error", ["list", copy_term(beta)]))
        raise make_error(Compound("representation_error", ["int"]))
###
# Assure that the object is a symbol, i.e. neither a variable,
# a compound nor a number. Otherwise check_nonvar() is applied
# first, and then type_error(symbol, Copy) is raised.
#
# @param beta The object.
##
def check_symbol(beta):
    if not (is_variable(beta) or is_compound(beta) or is_number(beta)):
        return
    check_nonvar(beta)
    raise make_error(Compound("type_error", ["symbol", copy_term(beta)]))
################################################################
# =/2 and copy_term/2 #
################################################################
###
# S = T: [ISO 8.2.1]
# The built-in succeeds when the Prolog terms S and T unify,
# otherwise the built-in fails.
#
# @param args The argument list, args[0] is S and args[1] is T.
# @return The outcome of the unification.
##
def test_unify(args):
    left = exec_build(args[0])
    return exec_unify(args[1], left)
###
# copy_term(S, T): [ISO 8.5.4]
# The built-in succeeds in T with a copy of S.
#
# @param args The argument list, args[0] is S and args[1] is T.
# @return The outcome of the unification with T.
##
def test_copy_term(args):
    clone = copy_term(exec_build(args[0]))
    return exec_unify(args[1], clone)
################################################################
# =../2 and functor/3 #
################################################################
###
# T =.. [F|L]: [ISO 8.5.3]
# If T is a variable, the built-in succeeds in T with the Prolog term
# from the functor F and arguments L. Otherwise the built-in succeeds
# in F and L with the functor and arguments of the Prolog term T.
#
# @param args The argument list, args[0] is T and args[1] is [F|L].
# @return The outcome of the unification.
##
def test_univ(args):
    term = deref(exec_build(args[0]))
    if not is_variable(term):
        return exec_unify(args[1], special_univ_unpack(term))
    spec = deref(exec_build(args[1]))
    return unify(term, special_univ_pack(spec))
###
# Build a Prolog term from a list [F|L]. With an empty L the
# atomic F itself is the result, otherwise a compound with the
# symbol F as functor and the elements of L as arguments.
#
# @param beta The list, already dereferenced by the caller.
# @return The Prolog term.
##
def special_univ_pack(beta):
    if not (is_compound(beta) and
            "." == beta.functor and
            len(beta.args) == 2):
        check_nonvar(beta)
        raise make_error(Compound("type_error", ["list", copy_term(beta)]))
    actuals = list_objects(deref(beta.args[1]))
    head = deref(beta.args[0])
    if len(actuals) == 0:
        check_atomic(head)
        return head
    check_symbol(head)
    return Compound(head, actuals)
###
# Convert a Prolog list into a Python list of its dereferenced
# elements. A first pass counts at most MAX_ARITY cells and lets
# check_nil() validate what follows. A second pass collects the
# elements.
#
# @param obj The Prolog list.
# @return The elements, or VOID_ARGS for an empty list.
##
def list_objects(obj):
    def is_cons(term):
        return (is_compound(term) and
                "." == term.functor and
                len(term.args) == 2)

    count = 0
    cell = obj
    while is_cons(cell) and count < MAX_ARITY:
        count += 1
        cell = deref(cell.args[1])
    check_nil(cell)
    if count == 0:
        return VOID_ARGS
    items = []
    cell = obj
    while is_cons(cell):
        items.append(deref(cell.args[0]))
        cell = deref(cell.args[1])
    return items
###
# Turn a Prolog term into the list [F|L] of its functor and its
# arguments. A term that is not a compound gives the one element
# list with the term itself.
#
# @param alpha The Prolog term.
# @return The Prolog list.
##
def special_univ_unpack(alpha):
    if not is_compound(alpha):
        return Compound(".", [alpha, "[]"])
    # Chain the arguments back to front, then put the functor in front.
    tail = "[]"
    for item in reversed(alpha.args):
        tail = Compound(".", [item, tail])
    return Compound(".", [alpha.functor, tail])
###
# functor(T, F, A): [ISO 8.5.1]
# If T is a variable, the built-in succeeds in T with a new Prolog term
# from the functor F and the arity A. Otherwise the built-in succeeds
# in F and A with the functor and the arity of the Prolog term T.
#
# @param args The argument list, T, F and A in this order.
# @return The outcome of the unification.
##
def test_functor(args):
    term = deref(exec_build(args[0]))
    if not is_variable(term):
        # Decompose, a term that is not a compound has arity zero.
        if is_compound(term):
            name, count = term.functor, len(term.args)
        else:
            name, count = term, 0
        if not exec_unify(args[1], name):
            return False
        return exec_unify(args[2], count)
    # Construct a term with fresh variables as arguments.
    name = deref(exec_build(args[1]))
    count = deref(exec_build(args[2]))
    check_integer(count)
    if count < 0:
        raise make_error(Compound("domain_error",
                                  ["not_less_than_zero", count]))
    if count > MAX_ARITY:
        raise make_error(Compound("representation_error",
                                  ["max_arity"]))
    if count == 0:
        check_atomic(name)
        built = name
    else:
        check_symbol(name)
        built = Compound(name, [Variable() for _ in range(count)])
    return unify(term, built)
################################################################
# arg/3 and change_arg/3 #
################################################################
###
# arg(K, X, Y): [ISO 8.5.2]
# The predicate succeeds in Y with the K-th argument of X.
# It fails if K is not between 1 and the arity of X.
#
# @param args The argument list, K, X and Y in this order.
# @return The outcome of the unification with Y, or False.
##
def test_arg(args):
    index = deref(exec_build(args[0]))
    check_integer(index)
    term = deref(exec_build(args[1]))
    check_callable(term)
    arity = len(term.args) if is_compound(term) else 0
    if not 1 <= index <= arity:
        return False
    return exec_unify(args[2], term.args[index - 1])
###
# change_arg(K, X, Y):
# The predicate succeeds. As a side-effect the K-th argument of X
# is set to Y. The K-th argument is not dereferenced, it has to be
# a variable. The predicate fails if K is not between 1 and the
# arity of X.
#
# @param args The argument list, K, X and Y in this order.
# @return True if the argument was changed, otherwise False.
##
def test_change_arg(args):
    index = deref(exec_build(args[0]))
    check_integer(index)
    term = deref(exec_build(args[1]))
    check_callable(term)
    value = deref(exec_build(args[2]))
    arity = len(term.args) if is_compound(term) else 0
    if not 1 <= index <= arity:
        return False
    slot = term.args[index - 1]
    check_var(slot)
    link(value, slot)
    return True
###
# Assure that the object is a variable. Otherwise
# type_error(var, Copy) is raised.
#
# @param beta The object.
##
def check_var(beta):
    if is_variable(beta):
        return
    raise make_error(Compound("type_error", ["var", copy_term(beta)]))
###
# Set the value of a variable without going through the trail.
# The variable must have no tail, otherwise type_error(conductor,
# Copy) is raised. The new value is handed to mark2_term() when the
# state bits of the variable equal those of machine.gc_flags.
#
# @param source The new value.
# @param term The variable.
##
def link(source, term):
    if term.tail is not None:
        raise make_error(Compound("type_error",
                                  ["conductor", copy_term(term)]))
    term.instantiated = source
    var_state = term.flags & VAR_MASK_STATE
    if var_state == (machine.gc_flags & VAR_MASK_STATE):
        mark2_term(source)
#######################################################################
# ir_object_new/1, ir_object_current/3 and ir_object_set/3 #
#######################################################################
###
# ir_object_new(O):
# The predicate succeeds in O with a new Python object,
# which is an empty dict.
#
# @param args The argument list, args[0] denotes O.
# @return The outcome of the unification with O.
##
def test_ir_object_new(args):
    return exec_unify(args[0], {})
###
# ir_object_current(O, K, V):
# The predicate succeeds in V with the value of the key K
# in the Python object O. It fails if the key is missing.
#
# @param args The argument list, O, K and V in this order.
# @return The outcome of the unification with V, or False.
##
def test_ir_object_current(args):
    holder = deref(exec_build(args[0]))
    name = deref(exec_build(args[1]))
    check_atom(name)
    try:
        value = holder[name]
    except KeyError:
        return False
    else:
        return exec_unify(args[2], value)
###
# ir_object_set(O, K, V):
# The predicate sets the value of the key K in the
# Python object O to V.
#
# @param args The argument list, O, K and V in this order.
# @return Always True.
##
def test_ir_object_set(args):
    holder = deref(exec_build(args[0]))
    name = deref(exec_build(args[1]))
    check_atom(name)
    holder[name] = deref(exec_build(args[2]))
    return True
###
# ir_object_keys(O, L):
# The predicate succeeds in L with the keys of
# the Python object O.
#
# @param args The argument list, args[0] is O and args[1] is L.
# @return The outcome of the unification with L.
##
def test_ir_object_keys(args):
    holder = deref(exec_build(args[0]))
    names = set_to_list(holder.keys(), "[]")
    return exec_unify(args[1], names)
################################################################
# ground/1 and nonground/2 #
################################################################
###
# ground(T): [TC2 8.3.10]
# The built-in succeeds if T is ground.
#
# @param args The argument list, args[0] denotes T.
# @return True if no variable was found in T, otherwise False.
##
def test_ground(args):
    witness = first_variable(exec_build(args[0]))
    return witness is NotImplemented
###
# nonground(T, V):
# The built-in succeeds if T is non-ground and V is the first variable.
#
# @param args The argument list, args[0] is T and args[1] is V.
# @return The outcome of the unification with V, or False.
##
def test_nonground(args):
    witness = first_variable(exec_build(args[0]))
    if witness is NotImplemented:
        return False
    return exec_unify(args[1], witness)
###
# Find the first variable of a Prolog term in a left to right
# traversal. The last argument of a compound is handled by the
# loop, all other arguments by recursion.
#
# @param alpha The Prolog term.
# @return The first variable or NotImplemented.
##
def first_variable(alpha):
    while True:
        alpha = deref(alpha)
        if is_variable(alpha):
            return alpha
        if not is_compound(alpha):
            return NotImplemented
        children = alpha.args
        last = len(children) - 1
        for k in range(last):
            found = first_variable(children[k])
            if found is not NotImplemented:
                return found
        alpha = children[max(last, 0)]
################################################################
# term_variables/3 and term_singletons/2 #
################################################################
###
# term_variables(T, L, E): [TC2 8.5.5]
# The built-in succeeds in L with a list with end E and
# with the variables of T as its list elements.
#
# @param args The argument list, T, L and E in this order.
# @return The outcome of the unification with L.
##
def test_term_variables(args):
    term = exec_build(args[0])
    target = exec_build(args[1])
    end = deref(exec_build(args[2]))
    found = term_variables(term)
    return unify(target, set_to_list(found, end))
###
# Collect the variables of a Prolog term in the order of their
# first occurrence. The last argument of a compound is handled
# by the loop, all other arguments by recursion.
#
# @param alpha The Prolog term.
# @return The ordered dict keyed by the variables, or None
#         if the term contains no variable.
##
def term_variables(alpha):
    seen = OrderedDict()

    def walk(node):
        while True:
            node = deref(node)
            if is_variable(node):
                seen[node] = None
                return
            if not is_compound(node):
                return
            children = node.args
            last = len(children) - 1
            for k in range(last):
                walk(children[k])
            node = children[max(last, 0)]

    walk(alpha)
    return seen if len(seen) > 0 else None
###
# term_singletons(T, L):
# The built-in succeeds in L with the singleton variables of T.
#
# @param args The argument list, args[0] is T and args[1] is L.
# @return The outcome of the unification with L.
##
def test_term_singletons(args):
    singles = term_singletons(exec_build(args[0]))
    return exec_unify(args[1], set_to_list(singles, "[]"))
###
# Collect the singleton variables of a Prolog term, i.e. the
# variables that occur exactly once. The last argument of a
# compound is handled by the loop, all other arguments by
# recursion.
#
# @param alpha The Prolog term.
# @return The ordered dict keyed by the singleton variables,
#         or None if the term contains no variable at all.
##
def term_singletons(alpha):
    res = None    # every variable seen so far
    anon = None   # the variables seen exactly once so far

    def term_singletons2(alpha2):
        nonlocal res
        nonlocal anon
        while True:
            alpha2 = deref(alpha2)
            if is_variable(alpha2):
                if res is None:
                    res = OrderedDict()
                    anon = OrderedDict()
                if alpha2 in res:
                    # From the third occurrence on the variable is
                    # already gone, pop() with a default tolerates
                    # that where del would raise a KeyError.
                    anon.pop(alpha2, None)
                else:
                    res[alpha2] = None
                    anon[alpha2] = None
                break
            elif is_compound(alpha2):
                alpha2 = alpha2.args
                i = 0
                while i < len(alpha2) - 1:
                    term_singletons2(alpha2[i])
                    i += 1
                alpha2 = alpha2[i]
            else:
                break

    term_singletons2(alpha)
    return anon
###
# Convert the keys of a native dict, or the elements of any other
# iterable, into a Prolog list. None counts as empty.
#
# @param temp The native set or None.
# @param end The list end.
# @return The Prolog list.
##
def set_to_list(temp, end):
    if temp is None:
        return end
    res = end
    # Chain the cells back to front so that each gets its final tail.
    for key in reversed(list(temp)):
        res = Compound(".", [key, res])
    return res
################################################################
# reference/1, \=/2 and acyclic_term/1 #
################################################################
###
# reference(A):
# The built-in succeeds if A is a Prolog reference, i.e. neither a
# variable, a compound, an atom nor a number. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return True or False.
##
def test_reference(args):
    term = deref(exec_build(args[0]))
    return not (is_variable(term) or is_compound(term) or
                is_atom(term) or is_number(term))
###
# acyclic_term(T): [TC2 8.3.11]
# The predicate succeeds when the Prolog term T is an acyclic term,
# i.e. contains no cycles.
#
# @param args The argument list, args[0] denotes T.
# @return True or False.
##
def test_acyclic(args):
    return is_acyclic(exec_build(args[0]))
###
# Check whether the given term is acyclic.
# Tail recursive solution: the last argument of a compound is
# handled by the loop, all other arguments by recursion. The
# instantiated variables on the current path are kept in a set,
# meeting one of them again means that there is a cycle.
#
# @param alpha The term.
# @return True if the term is acyclic, otherwise false.
##
def is_acyclic(alpha):
    # Instantiated variables on the current path, created lazily.
    visited = None

    def is_acyclic2(alpha2):
        nonlocal visited
        # Variables added to visited by this invocation, created lazily.
        undo = None
        while True:
            if is_variable(alpha2):
                if alpha2.instantiated is not NotImplemented:
                    if visited is None:
                        visited = set()
                    elif alpha2 in visited:
                        # The variable is reached through itself.
                        return False
                    visited.add(alpha2)
                    if undo is None:
                        undo = []
                    undo.append(alpha2)
                    # Follow the binding by hand instead of deref().
                    alpha2 = alpha2.instantiated
                else:
                    break
            elif is_compound(alpha2):
                alpha2 = alpha2.args
                i = 0
                while i < len(alpha2)-1:
                    if not is_acyclic2(alpha2[i]):
                        return False
                    i += 1
                # Continue with the last argument in this loop.
                alpha2 = alpha2[i]
            else:
                break
        # Leaving the path, take back what this invocation has added,
        # so that shared subterms are not mistaken for cycles.
        if undo is not None:
            i = len(undo)-1
            while i >= 0:
                visited.discard(undo[i])
                i -= 1
        return True

    return is_acyclic2(alpha)
################################################################
# callable/1, var/1, nonvar/1, compound/1, atomic/1 and atom/1 #
################################################################
###
# callable(C): [TC2 8.3.9]
# The built-in succeeds if C is neither a Prolog variable nor
# a Prolog number. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes C.
# @return True or False.
##
def test_callable(args):
    term = deref(exec_build(args[0]))
    return not (is_variable(term) or is_number(term))
###
# var(V): [ISO 8.3.1]
# The built-in succeeds if V is a Prolog variable. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes V.
# @return The outcome of is_variable().
##
def test_var(args):
    return is_variable(deref(exec_build(args[0])))
###
# nonvar(V): [ISO 8.3.7]
# The built-in succeeds if V is not a Prolog variable. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes V.
# @return True or False.
##
def test_nonvar(args):
    term = deref(exec_build(args[0]))
    if is_variable(term):
        return False
    return True
###
# compound(C): [ISO 8.3.6]
# The built-in succeeds if C is a Prolog compound. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes C.
# @return The outcome of is_compound().
##
def test_compound(args):
    return is_compound(deref(exec_build(args[0])))
###
# atomic(A): [ISO 8.3.5]
# The built-in succeeds if A is neither a Prolog compound nor
# a Prolog variable. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return True or False.
##
def test_atomic(args):
    term = deref(exec_build(args[0]))
    return not (is_compound(term) or is_variable(term))
###
# atom(A): [ISO 8.3.2]
# The built-in succeeds if A is a Prolog atom. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return The outcome of is_atom().
##
def test_atom(args):
    return is_atom(deref(exec_build(args[0])))
################################################################
# number/1, integer/1, float/1 #
################################################################
###
# number(A): [ISO 8.3.8]
# The built-in succeeds if A is a Prolog number. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return The outcome of is_number().
##
def test_number(args):
    return is_number(deref(exec_build(args[0])))
###
# integer(A): [ISO 8.3.3]
# The built-in succeeds if A is a Prolog integer. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return The outcome of is_integer().
##
def test_integer(args):
    return is_integer(deref(exec_build(args[0])))
###
# float(A): [ISO 8.3.4]
# The built-in succeeds if A is a Prolog float. Otherwise, it fails.
#
# @param args The argument list, args[0] denotes A.
# @return The outcome of is_float().
##
def test_float(args):
    return is_float(deref(exec_build(args[0])))
################################################################
# Number Utility #
################################################################
###
# Convert a Prolog integer into a float. Anything that is not an
# integer is returned unchanged. An integer too large for a float
# raises evaluation_error(float_overflow).
#
# @param alpha The Prolog number.
# @return The float, or the unchanged argument.
##
def narrow_float(alpha):
    if not is_integer(alpha):
        return alpha
    try:
        return float(alpha)
    except OverflowError:
        raise make_error(Compound("evaluation_error", ["float_overflow"]))
################################################################
# code_type/2 and code_numeric/2 #
################################################################
###
# code_type(C, T):
# The predicate succeeds in T with the Unicode general category of C.
# For a C outside of 0 to 0x10FFFF the predicate succeeds in T with 0.
#
# @param args The argument list, args[0] is C and args[1] is T.
# @return The outcome of the unification with T.
##
def test_code_type(args):
    code = deref(exec_build(args[0]))
    check_integer(code)
    # 0 stands for UNASSIGNED
    category = code_type(code) if 0 <= code <= 0x10FFFF else 0
    return exec_unify(args[1], category)
###
# code_numeric(C, V):
# The predicate succeeds in V with the Unicode numeric value of C,
# in case it is integer and between 0 and 35. Otherwise, the predicate
# succeeds in V with -1. A C outside of 0 to 0x10FFFF also gives -1.
#
# @param args The argument list, args[0] is C and args[1] is V.
# @return The outcome of the unification with V.
##
def test_code_numeric(args):
    code = deref(exec_build(args[0]))
    check_integer(code)
    # -1 stands for UNASSIGNED
    value = code_numeric(code) if 0 <= code <= 0x10FFFF else -1
    return exec_unify(args[1], value)
################################################################
# atom_integer/3 #
################################################################
###
# atom_integer(A, R, N):
# If A is a variable, then the built-in succeeds in A with the
# atom for the Prolog integer N in radix R. Otherwise the
# built-in succeeds in N with the Prolog number from the
# atom A in radix R. The radix R must be between 2 and 36.
#
# @param args The argument list, A, R and N in this order.
# @return The outcome of the unification.
##
def test_atom_integer(args):
    text = deref(exec_build(args[0]))
    radix = deref(exec_build(args[1]))
    check_integer(radix)
    if not 2 <= radix <= 36:
        raise make_error(Compound("domain_error", ["radix", radix]))
    if not is_variable(text):
        check_atom(text)
        return exec_unify(args[2], atom_integer_decode(text, radix))
    num = deref(exec_build(args[2]))
    check_integer(num)
    if num < 0:
        # Encode the magnitude and put the sign in front.
        digits = "-" + atom_integer_encode(- num, radix)
    else:
        digits = atom_integer_encode(num, radix)
    return unify(text, digits)
###
# Encode a Prolog integer to a string. Radix 10 uses str(). Other
# radixes encode small numbers directly, and large numbers in
# chunks of a fixed number of digits, where every chunk except
# the most significant one is zero padded.
#
# @param num The Prolog integer.
# @param radix The radix.
# @return The string.
##
def atom_integer_encode(num, radix):
    if radix == 10:
        return str(num)
    if num.bit_length() < 32:
        return str_integer(num, radix)
    # Number of digits per chunk and the matching power of the radix.
    step = 32 // (radix - 1).bit_length()
    base = radix ** step
    num, low = divmod(num, base)
    chunks = [str_integer(low, radix)]
    while num != 0:
        num, low = divmod(num, base)
        chunks.append(str_integer(low, radix))
    # The chunks were gathered least significant first.
    head = chunks.pop()
    return head + "".join(c.rjust(step, "0") for c in reversed(chunks))
###
# Encode an integer digit by digit in the given radix, the digits
# come from chr_digit(). Zero gives a single digit.
#
# @param num The integer.
# @param radix The radix.
# @return The string.
##
def str_integer(num, radix):
    num, digit = divmod(num, radix)
    digits = [chr_digit(digit)]
    while num != 0:
        num, digit = divmod(num, radix)
        digits.append(chr_digit(digit))
    # The digits were gathered least significant first.
    digits.reverse()
    return "".join(digits)
###
# Map a digit value to its character, values below 10 start
# at "0" (48) and the other values start at "a" (97 = 10 + 87).
#
# @param num The digit value.
# @return The character.
##
def chr_digit(num):
    offset = 48 if num < 10 else 87
    return chr(num + offset)
###
# Decode a Prolog integer from a string. The string first passes
# through ascii_replace(), a failing conversion then raises
# syntax_error(illegal_number).
#
# @param text The string
# @param radix The radix.
# @return The Prolog integer.
##
def atom_integer_decode(text, radix):
    digits = ascii_replace(text, radix, False)
    try:
        value = int(digits, radix)
    except Exception:
        raise make_error(Compound("syntax_error", ["illegal_number"]))
    return value
################################################################
# atom_number/2 #
################################################################
###
# atom_number(A, N):
# If A is a variable, then the built-in succeeds in A with the
# atom for the Prolog number N. Otherwise the built-in succeeds in N
# with the Prolog number from the atom A.
#
# @param args The argument list, args[0] is A and args[1] is N.
# @return The outcome of the unification.
##
def test_atom_number(args):
    text = deref(exec_build(args[0]))
    if not is_variable(text):
        check_atom(text)
        return exec_unify(args[1], atom_number_decode(text))
    num = deref(exec_build(args[1]))
    check_number(num)
    return unify(text, atom_number_encode(num))
###
# Encode a Prolog number to a string. An integer uses str(), any
# other number is formatted with 17 significant digits and then
# passed through shape_number().
#
# @param num The Prolog number.
# @return The string.
##
def atom_number_encode(num):
    return str(num) if is_integer(num) else shape_number("%.17g" % num)
###
# Shape the number string so that the mantissa always has a period
# and the exponent marker is upper case. The string is split at
# the first "e", the parts are shaped by shape_number_mantissa()
# and shape_number_exponent().
#
# @param res The ascii number string.
# @return The shaped number string.
##
def shape_number(res):
    mantissa, marker, exponent = res.partition("e")
    if marker == "":
        return shape_number_mantissa(res)
    return shape_number_mantissa(mantissa) + \
        "E" + shape_number_exponent(exponent)
###
# Append ".0" to a mantissa that has no period.
#
# @param res The mantissa string.
# @return The mantissa string with a period.
##
def shape_number_mantissa(res):
    return res if "." in res else res + ".0"
###
# Shape the exponent. A leading plus sign is dropped, and then
# a single leading zero is dropped as well, provided at least
# one digit remains. A minus sign is kept.
#
# @param res The exponent string.
# @return The shaped exponent string.
##
def shape_number_exponent(res):
    if res[:1] == "+":
        res = res[1:]
    if len(res) > 1 and res[0] == "0":
        return res[1:]
    if len(res) > 2 and res[:2] == "-0":
        return "-" + res[2:]
    return res
###
# Decode a Prolog number from a string. The text is first
# validated and normalized by ascii_replace() for radix 10 with
# the float flag switched on. A text that contains a period or
# an exponent marker is converted to a float, every other text
# to an integer. Unlike the ISO core standard, a number without
# a period but with an exponent is accepted as float.
#
# @param text The string.
# @return The Prolog number.
# @throws syntax_error(illegal_number) If the conversion fails.
##
def atom_number_decode(text):
    text = ascii_replace(text, 10, True)
    looks_float = any(mark in text for mark in (".", "e", "E"))
    convert = float if looks_float else int
    try:
        return convert(text)
    except Exception:
        raise make_error(Compound("syntax_error", ["illegal_number"]))
################################################################
# atom_reference/2 #
################################################################
###
# atom_reference(A, R):
# If A is a variable, the built-in succeeds in A with the atom
# 'True', 'False' or 'None' when R is the corresponding Python
# singleton, and with the atom 'Reference' for anything else.
# Otherwise A must be one of the atoms 'True', 'False' or 'None'
# and the built-in succeeds in R with the corresponding singleton.
##
def test_atom_reference(args):
    singletons = ((True, "True"), (False, "False"), (None, "None"))
    text = deref(exec_build(args[0]))
    if not is_variable(text):
        check_atom(text)
        for value, name in singletons:
            if text == name:
                return exec_unify(args[1], value)
        # NOTE(review): the error term reads illegal_number although
        # no number is involved here - confirm this is intended.
        raise make_error(Compound("syntax_error", ["illegal_number"]))
    obj = deref(exec_build(args[1]))
    for value, name in singletons:
        # identity test, so that 1 and 0 are not taken for booleans
        if obj is value:
            return unify(text, name)
    return unify(text, "Reference")
################################################################
# Number Checkers #
################################################################
###
# Validate a number string and convert non-ASCII digits into
# ASCII digits, using capital letters for digit values from 10
# upwards. Besides digits below the radix, a sign is accepted
# at the start, and with the float flag also a period, an
# exponent marker, and a sign right after an exponent marker.
#
# @param text The string.
# @param radix The radix.
# @param expo The float flag.
# @return The new string, or the given string if nothing was replaced.
# @throws syntax_error(illegal_number) If a character is not accepted.
##
def ascii_replace(text, radix, expo):
    parts = []
    start = 0  # begin of the not yet copied tail of text
    for idx, char in enumerate(text):
        code = ord(char)
        # presumably negative when the code point has no digit value
        digit = code_numeric(code)
        if 0 <= digit < radix:
            if code > 127:  # not ASCII, replace it
                base = 48 if digit < 10 else 55  # '0' respectively 'A'-10
                parts.append(text[start:idx])
                parts.append(chr(digit + base))
                start = idx + 1
            continue
        if code in (43, 45):  # '+' or '-'
            if idx == 0 or (expo and text[idx - 1] in ("e", "E")):
                continue
        if expo and code in (46, 101, 69):  # '.', 'e' or 'E'
            continue
        raise make_error(Compound("syntax_error", ["illegal_number"]))
    if start == 0:
        # start only moves on a replacement, so nothing was replaced
        return text
    parts.append(text[start:])
    return "".join(parts)
################################################################
# Locale Specials #
################################################################
###
# sys_get_locale(L):
# The built-in succeeds in L with the language code reported by
# locale.getlocale() of the python environment, or with the
# empty atom if none is reported.
##
def test_sys_get_locale(args):
    name = locale.getlocale()[0]
    if name is None:
        name = ""
    return exec_unify(args[0], name)
###
# sys_time_atom(F, T, A):
# F must be an atom holding the pattern. If T is a variable, then
# A must be an atom, it is parsed by the pattern F and the built-in
# succeeds in T with the resulting local time in milliseconds.
# Otherwise T must be an integer, and the built-in succeeds in A
# with the millisecond time T formatted as local time by the
# pattern F.
##
def test_sys_time_atom(args):
    pattern = deref(exec_build(args[0]))
    check_atom(pattern)
    millis = deref(exec_build(args[1]))
    if not is_variable(millis):
        # format direction: T is given
        check_integer(millis)
        stamp = time.localtime(millis/1000)
        return exec_unify(args[2], time.strftime(pattern, stamp))
    # parse direction: T is unbound
    atom = deref(exec_build(args[2]))
    check_atom(atom)
    stamp = time.strptime(atom, pattern)
    return unify(millis, int(time.mktime(stamp)*1000))
###
# sys_get_args(L):
# The built-in succeeds in L with the Prolog list of the command
# line arguments of the python environment. The first entry of
# sys.argv is left out.
##
def test_sys_get_args(args):
    result = "[]"
    # build the list back to front, so that the order is preserved
    for arg in reversed(sys.argv[1:]):
        result = Compound(".", [arg, result])
    return exec_unify(args[0], result)
################################################################
# Special Init #
################################################################
# Module level registration of the built-ins of this file. Each
# add() call passes the predicate name, its arity and a handler:
# the test_* functions are wrapped by make_check(), the special_*
# functions are wrapped by make_special().
# NOTE(review): add() is imported from nova.store, presumably it
# installs the handler in the knowledge base - confirm there.
# Albufeira compiler, deterministic
add("fail", 0, make_check(test_fail))
add("$CUT", 1, make_check(test_cut))
add("$MARK", 1, make_check(test_mark))
add("$SEQ", 2, make_special(special_seq))
add("$ALT", 1, make_special(special_alt))
add("sys_raise", 1, make_check(test_sys_raise))
add("sys_trap", 3, make_special(special_sys_trap))
# Albufeira compiler, for call/1
add("os_sleep_promise", 2, make_check(test_os_sleep_promise))
add("os_import_promise", 3, make_check(test_os_import_promise))
add("os_invoke_main", 1, make_check(test_os_invoke_main))
add("$YIELD", 1, make_special(special_yield))
add("shield", 1, make_special(special_shield))
add("unshield", 1, make_special(special_unshield))
# term specials
add("=", 2, make_check(test_unify))
add("copy_term", 2, make_check(test_copy_term))
add("=..", 2, make_check(test_univ))
add("functor", 3, make_check(test_functor))
add("arg", 3, make_check(test_arg))
add("change_arg", 3, make_check(test_change_arg))
# object specials
add("ir_object_new", 1, make_check(test_ir_object_new))
add("ir_object_current", 3, make_check(test_ir_object_current))
add("ir_object_set", 3, make_check(test_ir_object_set))
add("ir_object_keys", 2, make_check(test_ir_object_keys))
# variable specials
add("ground", 1, make_check(test_ground))
add("nonground", 2, make_check(test_nonground))
add("term_variables", 3, make_check(test_term_variables))
add("term_singletons", 2, make_check(test_term_singletons))
add("reference", 1, make_check(test_reference))
add("acyclic_term", 1, make_check(test_acyclic))
# type specials
add("callable", 1, make_check(test_callable))
add("var", 1, make_check(test_var))
add("nonvar", 1, make_check(test_nonvar))
add("compound", 1, make_check(test_compound))
add("atomic", 1, make_check(test_atomic))
add("atom", 1, make_check(test_atom))
add("number", 1, make_check(test_number))
add("integer", 1, make_check(test_integer))
add("float", 1, make_check(test_float))
# atom specials: character classification and conversion between
# atoms and integers, numbers or references
add("code_type", 2, make_check(test_code_type))
add("code_numeric", 2, make_check(test_code_numeric))
add("atom_integer", 3, make_check(test_atom_integer))
add("atom_number", 2, make_check(test_atom_number))
add("atom_reference", 2, make_check(test_atom_reference))
# locale specials: locale name, time formatting and parsing,
# command line arguments
add("sys_get_locale", 1, make_check(test_sys_get_locale))
add("sys_time_atom", 3, make_check(test_sys_time_atom))
add("sys_get_args", 1, make_check(test_sys_get_args))