Python "theatre"

Admin User, created Apr 18. 2025
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import asyncio
import math
from nova.store import (add,
Compound, is_variable)
import nova.machine as machine
from nova.machine import (deref, unify, cont, register_signal,
invoke_interrupt, launch, VOID_ARGS, launch_async,
exec_unify, exec_build, make_error, Context,
copy_term, task_async, callback)
from nova.special import (check_atom, make_special, check_integer,
make_check, set_to_list, check_number, narrow_float)
from nova.runtime import (check_goal)
################################################################
# Main API #
################################################################
###
# Post a message to the Prolog interpreter.
#
# @param message The message.
##
def post(message):
register_signal("main", message)
invoke_interrupt("main")
###
# Run a Prolog term once. The goal is run with auto-yield
# disabled and promises are not accepted.
#
# @param goal The Prolog term.
# @return True, false or yield.
##
def perform(goal):
goal = Compound(".", [goal, "[]"])
return launch(goal, "main", VOID_ARGS)
###
# Run a Prolog term once. The goal is run with auto-yield
# enabled and promises are accepted.
#
# @param goal The Prolog term.
# @return True or false.
##
async def perform_async(goal):
goal = Compound(".", [goal, "[]"])
return await launch_async(goal, "main", VOID_ARGS)
################################################################
# Register API #
################################################################
###
# The flag indicates a non-void dispatch foreign function.
##
FFI_FUNC = 0x00000100
###
# Add a Python function to the knowledge base. The given
# function is invoked with rather slow arguments copying and
# spreading. Also the function is wrapped and registered as a
# special, therefore not eligible to neck optimization.
#
# @param functor The functor.
# @param arity The arity.
# @param func The Python function.
# @param flags The flags.
##
def register(functor, arity, func, flags):
if (flags & FFI_FUNC) != 0:
res = make_special(lambda args: invoke_func(func, args))
else:
res = make_special(lambda args: invoke(func, args))
add(functor, arity, res)
def invoke(func, args):
temp = [NotImplemented] * len(args)
i = 0
while i < len(temp):
temp[i] = deref(args[i])
i += 1
func(*temp)
cont(machine.call.args[1])
return True
def invoke_func(func, args):
temp = [NotImplemented] * (len(args) - 1)
i = 0
while i < len(temp):
temp[i] = deref(args[i])
i += 1
res = func(*temp)
if not unify(args[len(args) - 1], res):
return False
cont(machine.call.args[1])
return True
################################################################
# ir_object_new/1 and ir_object_current/3 #
################################################################
###
# ir_object_new(O):
# The predicate succeeds in O with a new Python object.
##
def test_ir_object_new(args):
res = {}
return exec_unify(args[0], res)
###
# ir_object_current(O, K, V):
# The predicate succeeds in V with the value of the key K
# in the Python object O.
##
def test_ir_object_current(args):
obj = deref(exec_build(args[0]))
key = deref(exec_build(args[1]))
check_atom(key)
try:
if isinstance(obj, dict):
res = obj.get(key, NotImplemented)
else:
res = getattr(obj, key, NotImplemented)
except AttributeError:
res = NotImplemented
if res is NotImplemented:
return False
return exec_unify(args[2], res)
################################################################
# ir_object_set/3 and ir_object_keys/2 #
################################################################
###
# ir_object_set(O, K, V):
# The predicate sets the value of the key K in the
# Python object O to V.
##
def test_ir_object_set(args):
obj = deref(exec_build(args[0]))
key = deref(exec_build(args[1]))
check_atom(key)
value = deref(exec_build(args[2]))
try:
if isinstance(obj, dict):
obj[key] = value
else:
setattr(obj, key, value)
except AttributeError:
raise make_error(Compound("permission_error",
["field", "modify", key]))
return True
###
# ir_object_keys(O, L):
# The predicate succeeds in L with the keys of
# the Python object O
##
def test_ir_object_keys(args):
obj = deref(exec_build(args[0]))
res = obj.keys()
res = set_to_list(res)
return exec_unify(args[1], res)
################################################################
# ir_float_value/2 #
################################################################
###
# ir_float_value(P, H):
# If P is instantiated, then the predicate succeeds in H with the
# Python float of P, otherwise in P with the Prolog float of H.
##
def test_ir_float_value(args):
alpha = deref(exec_build(args[0]))
if is_variable(alpha):
beta = deref(exec_build(args[1]))
check_number(beta)
res = norm_float(narrow_float(beta))
return unify(alpha, res)
else:
check_number(alpha)
res = narrow_float(alpha)
return exec_unify(args[1], res)
def norm_float(alpha):
if math.isnan(alpha):
raise make_error(Compound("evaluation_error", ["undefined"]))
if not math.isfinite(alpha):
raise make_error(Compound("evaluation_error", ["float_overflow"]))
if alpha == 0.0:
return 0.0
return alpha
################################################################
# os_call_later/3 and os_call_cancel/1 #
################################################################
###
# os_call_later(N, D, T): internal only
# The predicate succeeds in T with a new timer. As a side
# effect it schedules the compiled goal N to be executed
# after D milliseconds.
##
def test_os_call_later(args):
goal = deref(exec_build(args[0]))
check_goal(goal)
delay = deref(exec_build(args[1]))
check_integer(delay)
buf = machine.ctx
loop = asyncio.get_running_loop()
return exec_unify(args[2], loop.call_later(delay/1000.0,
lambda: callback(goal, buf, VOID_ARGS)))
###
# os_call_cancel(T): internal only
# The predicate succeeds. As a side effect it cancels the timer T.
##
def test_os_call_cancel(args):
res = deref(exec_build(args[0]))
res.cancel()
return True
################################################################
# os_task_current/1, os_task_abort/2 and os_task_create/2 #
################################################################
###
# os_task_current(E): internal only
# The predicate succeeds in E with the current task.
##
def test_os_task_current(args):
return exec_unify(args[0], machine.ctx)
###
# os_task_abort(E, M): internal only
# The predicate succeeds. As a side effect the task E gets
# the message M signalled.
##
def test_os_task_abort(args):
buf = deref(exec_build(args[0]))
msg = exec_build(args[1])
msg = copy_term(msg)
register_signal(buf, msg)
invoke_interrupt(buf)
return True
###
# os_task_create(N, E): internal only
# The predicate succeeds in E with a new task for the compiled
# goal N. The task is scheduled to executed immediately.
##
def test_os_task_create(args):
goal = deref(exec_build(args[0]))
check_goal(goal)
buf = Context()
loop = asyncio.get_running_loop()
loop.create_task(task_async(goal, buf, VOID_ARGS))
return exec_unify(args[1], buf)
################################################################
# Theatre Init #
################################################################
# object specials
add("ir_object_new", 1, make_check(test_ir_object_new))
add("ir_object_current", 3, make_check(test_ir_object_current))
add("ir_object_set", 3, make_check(test_ir_object_set))
add("ir_object_keys", 2, make_check(test_ir_object_keys))
# scalar specials
add("ir_float_value", 2, make_check(test_ir_float_value))
# system specials, coroutines, internal only
add("os_call_later", 3, make_check(test_os_call_later))
add("os_call_cancel", 1, make_check(test_os_call_cancel))
add("os_task_current", 1, make_check(test_os_task_current))
add("os_task_abort", 2, make_check(test_os_task_abort))
add("os_task_create", 2, make_check(test_os_task_create))