Python "runtime"

Admin User, created 27 Apr. 2024
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import nova.store as store
from nova.store import (MASK_PRED_DYNAMIC, set_stage, pred_link,
Clause, is_provable, remove_clause, is_logical, is_cache,
set_partition, pred_destroy, clear, add_clause, Cache,
make_defined, add, pred_touch, snapshot_data,
Variable, Compound, is_compound,
Place, Quote, Skeleton, unquote_objects)
import nova.machine as machine
from nova.machine import (cont, deref,
make_indicator, lookup_pred, exec_body, exec_head,
defined_clauses, make_indicator_term,
Choice, launch, more, unbind, exec_unify, exec_build,
snap_cleanup, cut, check_nonvar, VOID_ARGS,
Goal, snap_setup, solve, make_error, VAR_MASK_STATE,
unify, Context, real_time, register_signal, invoke_interrupt,
launch_async, copy_term, melt_directive)
from nova.special import (check_atom, make_check,
check_integer, make_special, list_objects, check_var)
import os
import time
import sys
import importlib
import asyncio
import stat
import urllib.request
import urllib.error
import email.utils
# Buffer length (in characters) at which put_code() and put_atom()
# flush a sink.
MAX_BUF = 4096
# Boot base handed out by os_get_libpath/1; assigned via set_bootbase().
bootbase = ""
###
# Store the boot base in the module level variable bootbase,
# from where os_get_libpath/1 reads it.
#
# @param url The new boot base.
##
def set_bootbase(url):
    global bootbase
    bootbase = url
###################################################################
# current_output/1, current_error/1, set_output/1 and set_error/1 #
###################################################################
###
# A text output. Text is collected in buf and handed to the
# send callback when the buffer is flushed.
##
class Sink:
    def __init__(self):
        # Backend handle and callbacks; the defaults do nothing.
        self.data = NotImplemented
        self.send = lambda fd, buf: None
        self.notify = lambda fd: None
        self.release = lambda fd: None
        # Pending text and the last code written, initially -1.
        self.buf = ""
        self.last = -1
        self.indent = 0
###
# current_output(S): [ISO 8.11.2]
# The built-in succeeds in S with the current output
# of the engine.
##
def test_current_output(args):
    return exec_unify(args[0], store.engine.text_output)
###
# current_error(S):
# The built-in succeeds in S with the current error
# of the engine.
##
def test_current_error(args):
    return exec_unify(args[0], store.engine.text_error)
###
# set_output(S): [ISO 8.11.4]
# The built-in succeeds. As a side effect the current output
# of the engine is set to S. Throws a type error if S is not a sink.
##
def test_set_output(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    store.engine.text_output = sink
    return True
###
# set_error(S):
# The built-in succeeds. As a side effect the current error
# of the engine is set to S. Throws a type error if S is not a sink.
##
def test_set_error(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    store.engine.text_error = sink
    return True
###
# Assure that the object is a sink, otherwise throw
# type_error(writer, _).
#
# @param beta The object.
##
def check_sink(beta):
    if isinstance(beta, Sink):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["writer", culprit]))
###################################################################
# put_code/2, current_lastcode/2 and set_lastcode/2 #
###################################################################
###
# put_code(S, C): [ISO 8.12.3]
# The built-in succeeds. As a side effect, it adds
# the code point C to the stream S. Throws a domain error
# if C is outside of 0..0x10FFFF.
##
def test_put_code(args):
    stream = deref(exec_build(args[0]))
    check_sink(stream)
    code = deref(exec_build(args[1]))
    check_integer(code)
    if not 0 <= code <= 0x10FFFF:
        raise make_error(Compound("domain_error", ["code_point", code]))
    put_code(stream, code)
    return True
###
# Append a code point to the buffer of a sink and remember it
# as last code. The buffer is flushed once it reaches MAX_BUF.
#
# @param stream The sink.
# @param ch The code point.
##
def put_code(stream, ch):
    stream.buf = stream.buf + chr(ch)
    stream.last = ch
    if len(stream.buf) < MAX_BUF:
        return
    flush_buffer(stream)
###
# Hand a non-empty buffer of a sink to its send callback
# and then empty the buffer.
#
# @param stream The sink.
##
def flush_buffer(stream):
    pending = stream.buf
    if not pending:
        return
    stream.send(stream.data, pending)
    stream.buf = ""
###
# current_lastcode(S, C):
# The built-in succeeds in C with the last code of the stream S.
# Throws a type error if S is not a sink.
##
def test_current_lastcode(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    return exec_unify(args[1], sink.last)
###
# set_lastcode(S, C):
# The built-in succeeds. As a side effect, the last code of
# the stream S is set to the integer C.
##
def test_set_lastcode(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    code = deref(exec_build(args[1]))
    check_integer(code)
    sink.last = code
    return True
################################################################
# put_atom/2 #
################################################################
###
# put_atom(S, A):
# The built-in succeeds. As a side effect, it adds
# the text of the atom A to the stream S.
##
def test_put_atom(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    atom = deref(exec_build(args[1]))
    check_atom(atom)
    put_atom(sink, atom)
    return True
###
# Append a text to the buffer of a sink and remember its last
# code. An empty text leaves the sink untouched. The buffer is
# flushed once it reaches MAX_BUF.
#
# @param stream The sink.
# @param text The text.
##
def put_atom(stream, text):
    if len(text) == 0:
        return
    stream.buf += text
    stream.last = last_code(text)
    if len(stream.buf) >= MAX_BUF:
        flush_buffer(stream)
###
# Retrieve the code point of the last character of a text.
#
# @param text The non-empty text.
# @return The last code point.
##
def last_code(text):
    return ord(text[-1])
################################################################
# current_input/1 and set_input/1 #
################################################################
# Source flag: set by os_get_code/2 after a carriage return (13), so
# that a directly following line feed (10) is not counted as a new line.
MASK_SRC_SKIP = 0x00000001
# Source flag: set on streams opened through os_open_promise/3.
MASK_SRC_AREAD = 0x00000002
###
# A text input. Characters are consumed from buf at position pos,
# the receive callback delivers further text.
##
class Source:
    def __init__(self):
        # Backend handle and callbacks; the defaults deliver
        # an empty text respectively do nothing.
        self.data = NotImplemented
        self.receive = lambda fd: ""
        self.release = lambda fd: None
        # Current text, read position, MASK_SRC_* flags and line count.
        self.buf = ""
        self.pos = 0
        self.flags = 0
        self.lineno = 0
###
# Throw the Prolog error resource_error(illegal_state).
##
def illegal_state():
    culprit = Compound("resource_error", ["illegal_state"])
    raise make_error(culprit)
###
# current_input(S): [ISO 8.11.1]
# The built-in succeeds in S with the current input
# of the engine.
##
def test_current_input(args):
    return exec_unify(args[0], store.engine.text_input)
###
# set_input(S): [ISO 8.11.3]
# The built-in succeeds. As a side effect it sets the current input
# of the engine to S. Throws a type error if S is not a source.
##
def test_set_input(args):
    source = deref(exec_build(args[0]))
    check_source(source)
    store.engine.text_input = source
    return True
###
# Assure that the object is a source, otherwise throw
# type_error(reader, _).
#
# @param beta The object.
##
def check_source(beta):
    if isinstance(beta, Source):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["reader", culprit]))
################################################################
# os_read_sync/1, os_get_code/2 and os_peek_code/2 #
################################################################
###
# os_read_sync(S):
# The predicate succeeds. As a side effect the stream buffer is
# replaced by what the receive callback returns and the
# position is reset to zero.
##
def test_os_read_sync(args):
    stream = deref(exec_build(args[0]))
    chunk = stream.receive(stream.data)
    stream.buf = chunk
    stream.pos = 0
    return True
###
# os_get_code(S, C):
# The predicate succeeds in C with the Unicode point from the stream buffer S.
# As a side effect the stream position is advanced. The predicate fails
# if the buffer is exhausted.
##
def test_os_get_code(args):
    stream = deref(exec_build(args[0]))
    at = stream.pos
    text = stream.buf
    if at >= len(text):
        return False
    code = ord(text[at])
    is_cr = code == 13
    skipping = (stream.flags & MASK_SRC_SKIP) != 0
    # A CR always counts as a line, a LF only if it does not
    # directly follow a CR.
    if is_cr or (code == 10 and not skipping):
        stream.lineno += 1
    if is_cr:
        stream.flags |= MASK_SRC_SKIP
    else:
        stream.flags &= ~MASK_SRC_SKIP
    stream.pos = at + 1
    return exec_unify(args[1], code)
###
# os_peek_code(S, C):
# The built-in succeeds in C with the Unicode point from the stream buffer S.
# The stream position is not advanced. The built-in fails if the
# buffer is exhausted.
##
def test_os_peek_code(args):
    stream = deref(exec_build(args[0]))
    at = stream.pos
    text = stream.buf
    if at >= len(text):
        return False
    return exec_unify(args[1], ord(text[at]))
################################################################
# os_open_promise/3 #
################################################################
###
# os_open_promise(P, S, Q):
# The predicate succeeds in Q with a promise for open input S
# on path P. Paths that start with "http:" or "https:" are opened
# as URLs, all other paths as files.
##
def test_os_open_promise(args):
    url = deref(exec_build(args[0]))
    check_atom(url)
    stream = Source()
    if not exec_unify(args[1], stream):
        return False
    ctx = machine.ctx
    if url.startswith(("http:", "https:")):
        opener = open_http_promise
    else:
        opener = open_file_promise
    prom = opener(ctx, stream, url)
    return exec_unify(args[2], lambda: prom)
###
# Open a URL in a worker thread and attach the response to
# the stream. A failure is registered as a signal on the context.
#
# @param buf The context.
# @param stream The source to initialize.
# @param url The URL.
##
async def open_http_promise(buf, stream, url):
    try:
        response = await asyncio.to_thread(urllib.request.urlopen, url)
    except urllib.error.HTTPError as err:
        # HTTPError is a subclass of URLError, so it is handled first.
        register_signal(buf, map_http_result(err.code, url))
    except urllib.error.URLError:
        register_signal(buf, Compound("resource_error", ["url_exception"]))
    else:
        stream.data = response
        stream.receive = http_read_promise
        stream.release = file_close_promise
        stream.flags |= MASK_SRC_AREAD
###
# Open a UTF-8 text file for reading in a worker thread and attach
# it to the stream. A failure is registered as a signal on the context.
#
# @param buf The context.
# @param stream The source to initialize.
# @param url The file path.
##
async def open_file_promise(buf, stream, url):
    try:
        handle = await asyncio.to_thread(open, url, encoding="utf8")
    except OSError as err:
        register_signal(buf, map_file_error(err, url))
    else:
        stream.data = handle
        stream.receive = file_read_promise
        stream.release = file_close_promise
        stream.flags |= MASK_SRC_AREAD
###
# Map an OS error to a Prolog error term.
#
# @param err The offending OS error.
# @param url The path that was accessed.
# @return The Prolog error term.
##
def map_file_error(err, url):
    # errno 2 is ENOENT, no such file or directory.
    if err.errno != 2:
        return Compound("resource_error", ["io_exception"])
    return Compound("existence_error", ["source_sink", url])
###################################################################
# os_stream_flags/2, os_read_promise/2 and os_close_promise/2 #
###################################################################
###
# os_stream_flags(S, F):
# The predicate succeeds in F with the flags of the stream S.
# A sink always has the flags 0.
##
def test_os_stream_flags(args):
    obj = deref(exec_build(args[0]))
    if not isinstance(obj, Source):
        check_sink(obj)
        return exec_unify(args[1], 0)
    return exec_unify(args[1], obj.flags)
###
# os_read_promise(S, P):
# The predicate succeeds in P with a read promise for an input S.
##
def test_os_read_promise(args):
    stream = deref(exec_build(args[0]))
    ctx = machine.ctx
    return exec_unify(args[1], lambda: stream.receive(ctx, stream))
###
# Read from the response of a stream in a worker thread and
# install the text as new buffer. An IO error is registered
# as a signal on the context.
#
# @param buf The context.
# @param stream The source.
##
async def http_read_promise(buf, stream):
    try:
        text = await asyncio.to_thread(blocking_chunk, stream.data)
    except IOError as err:
        register_signal(buf, map_stream_error(err))
    else:
        stream.buf = text
        stream.pos = 0
###
# Read the bytes of a response and decode them as UTF-8.
#
# @param data The response.
# @return The decoded text.
##
def blocking_chunk(data):
    raw = data.read()
    return raw.decode('utf-8')
###
# Read from the file of a stream in a worker thread and
# install the text as new buffer. An IO error is registered
# as a signal on the context.
#
# @param buf The context.
# @param stream The source.
##
async def file_read_promise(buf, stream):
    try:
        text = await asyncio.to_thread(blocking_read, stream.data)
    except IOError as err:
        register_signal(buf, map_stream_error(err))
    else:
        stream.buf = text
        stream.pos = 0
###
# Read from a file object without a size limit.
#
# @param data The file object.
# @return What the file object delivered.
##
def blocking_read(data):
    content = data.read()
    return content
###
# Map a stream error when in transit. The result is always
# resource_error(io_error), the error itself is not inspected.
#
# @param err The offending error.
# @return The Prolog error term.
##
def map_stream_error(err):
    culprit = Compound("resource_error", ["io_error"])
    return culprit
###
# os_close_promise(S, P):
# The predicate succeeds in P with a close promise for an input S.
##
def test_os_close_promise(args):
    stream = deref(exec_build(args[0]))
    ctx = machine.ctx
    return exec_unify(args[1], lambda: stream.release(ctx, stream))
###
# Close the handle of a stream in a worker thread. An IO error
# is registered as a signal on the context.
#
# @param buf The context.
# @param stream The source.
##
async def file_close_promise(buf, stream):
    handle = stream.data
    try:
        await asyncio.to_thread(blocking_close, handle)
    except IOError as exc:
        register_signal(buf, map_stream_error(exc))
###
# Close a file object.
#
# @param data The file object.
# @return What the close method returned.
##
def blocking_close(data):
    outcome = data.close()
    return outcome
################################################################
# os_open_sync/3 #
################################################################
###
# os_open_sync(P, M, S):
# The predicate succeeds. As a side effect the stream S is
# opened on the path P with the mode M. The modes write and
# append are supported, the mode read throws a resource error
# and any other mode a domain error.
##
def test_os_open_sync(args):
    url = deref(exec_build(args[0]))
    check_atom(url)
    mode = deref(exec_build(args[1]))
    check_atom(mode)
    if mode == "read":
        raise make_error(Compound("resource_error",
                                  ["not_implemented"]))
    # Prolog io mode to Python open() mode.
    flag = {"write": "w", "append": "a"}.get(mode)
    if flag is None:
        raise make_error(Compound("domain_error",
                                  ["io_mode", mode]))
    return exec_unify(args[2], open_write(url, flag))
###
# Open a UTF-8 text file for output without newline translation
# and wrap it into a sink.
#
# @param url The file path.
# @param mode The Python open mode, "w" or "a".
# @return The new sink.
##
def open_write(url, mode):
    try:
        handle = open(url, mode, encoding="utf8", newline='')
    except OSError as err:
        raise make_error(map_file_error(err, url))
    sink = Sink()
    sink.data = handle
    sink.send = file_write
    sink.release = file_close
    return sink
###
# Write a text to a file object. An IO error is rethrown
# as a Prolog error.
#
# @param data The file object.
# @param buf The text.
##
def file_write(data, buf):
    try:
        data.write(buf)
    except IOError as exc:
        raise make_error(map_stream_error(exc))
###
# Close a file object. An IO error is rethrown
# as a Prolog error.
#
# @param data The file object.
##
def file_close(data):
    try:
        data.close()
    except IOError as exc:
        raise make_error(map_stream_error(exc))
################################################################
# flush_output/1 and os_close_sync/1 #
################################################################
###
# flush_output(S): [ISO 8.11.7]
# The built-in succeeds. As a side effect, it flushes the stream buffer.
# Throws a type error if S is not a sink.
##
def test_flush_output(args):
    sink = deref(exec_build(args[0]))
    check_sink(sink)
    stream_flush(sink)
    return True
###
# Flush the buffer of a sink and then invoke its
# notify callback.
#
# @param stream The sink.
##
def stream_flush(stream):
    flush_buffer(stream)
    handle = stream.data
    stream.notify(handle)
###
# os_close_sync(S): see close/1 [ISO 8.11.6]
# The built-in succeeds. As a side effect, the stream S is closed.
# Throws a type error if S is neither a sink nor a source.
##
def test_os_close_sync(args):
    stream = deref(exec_build(args[0]))
    if not isinstance(stream, Sink):
        check_source(stream)
    stream_close(stream)
    return True
###
# Close a stream through its release callback. A sink
# has its buffer flushed beforehand.
#
# @param stream The sink or source.
##
def stream_close(stream):
    if isinstance(stream, Sink):
        flush_buffer(stream)
    stream.release(stream.data)
################################################################
# os_prop_promise/3 and set_file_property/2 #
################################################################
###
# os_prop_promise(F, M, Q):
# The predicate succeeds in Q with a promise for the
# properties M of the file F. Barks if path F doesn't exist
# or io exception while resolving. Paths that start with "http:"
# or "https:" are resolved as URLs, all other paths as files.
##
def test_os_prop_promise(args):
    url = deref(exec_build(args[0]))
    check_atom(url)
    res = {}
    if not exec_unify(args[1], res):
        return False
    ctx = machine.ctx
    if url.startswith(("http:", "https:")):
        resolver = prop_http_promise
    else:
        resolver = prop_file_promise
    prom = resolver(ctx, url, res)
    return exec_unify(args[2], lambda: prom)
###
# Resolve the properties of a URL by a HEAD request in a worker
# thread. On success the map receives the keys "last_modified"
# (milliseconds, or -1 if unknown), "real_path" and "type". A
# failure is registered as a signal on the context.
#
# @param buf The context.
# @param url The URL.
# @param res The map that receives the properties.
##
async def prop_http_promise(buf, url, res):
    request = urllib.request.Request(url, method="HEAD")
    try:
        response = await asyncio.to_thread(urllib.request.urlopen, request)
        await asyncio.to_thread(response.close)
    except urllib.error.HTTPError as err:
        # HTTPError is a subclass of URLError, so it is handled first.
        register_signal(buf, map_http_result(err.code, url))
        return
    except urllib.error.URLError:
        register_signal(buf, Compound("resource_error", ["url_exception"]))
        return
    val = response.headers.get("last-modified")
    # parsedate_tz() returns None for a header it cannot parse, and
    # mktime_tz(None) would raise a TypeError; treat such a header
    # like a missing one.
    parsed = email.utils.parsedate_tz(val) if val is not None else None
    if parsed is not None:
        mtime = int(email.utils.mktime_tz(parsed)*1000)
    else:
        mtime = -1
    res["last_modified"] = mtime
    res["real_path"] = response.url
    res["type"] = "regular"
###
# Map a HTTP status code to a Prolog error term.
#
# @param res The HTTP status code.
# @param url The URL that was accessed.
# @return The Prolog error term.
##
def map_http_result(res, url):
    if res == 403:  # Forbidden
        return Compound("permission_error", ["open", "source_sink", url])
    if res == 404:  # Not Found
        return Compound("existence_error", ["source_sink", url])
    if res == 405:  # Method Not Allowed
        return Compound("resource_error", ["illegal_method"])
    return Compound("resource_error", ["io_exception"])
###
# Resolve the properties of a file in a worker thread. On success
# the map receives the keys "last_modified" (milliseconds, rounded),
# "real_path" and "type" (regular, directory or other). An OS error
# is registered as a signal on the context.
#
# @param buf The context.
# @param url The file path.
# @param res The map that receives the properties.
##
async def prop_file_promise(buf, url, res):
    try:
        rpath = await asyncio.to_thread(os.path.realpath, url)
        stats = await asyncio.to_thread(os.lstat, rpath)
    except OSError as err:
        register_signal(buf, map_file_error(err, url))
        return
    if stat.S_ISREG(stats.st_mode):
        ftype = "regular"
    elif stat.S_ISDIR(stats.st_mode):
        ftype = "directory"
    else:
        ftype = "other"
    # Nanoseconds to milliseconds, rounding half up.
    res["last_modified"] = (stats.st_mtime_ns + 500000) // 1000000
    res["real_path"] = rpath
    res["type"] = ftype
###
# set_file_property(F, P):
# The predicate assigns the property P to the file F. Only the
# property last_modified(T) with a non-negative integer T in
# milliseconds is accepted, any other property throws a domain error.
##
def test_set_file_property(args):
    url = deref(exec_build(args[0]))
    check_atom(url)
    prop = deref(exec_build(args[1]))
    is_mtime = (is_compound(prop) and
                prop.functor == "last_modified" and
                len(prop.args) == 1)
    if not is_mtime:
        check_nonvar(prop)
        prop = copy_term(prop)
        raise make_error(Compound("domain_error", ["prolog_property", prop]))
    stamp = deref(prop.args[0])
    check_integer(stamp)
    if stamp < 0:
        raise make_error(Compound("domain_error",
                                  ["not_less_than_zero", stamp]))
    try:
        # The current modification time, in milliseconds, is passed
        # as the access time slot of os.utime().
        current = (os.lstat(url).st_mtime_ns + 500000) // 1000000
        os.utime(url, (current/1000, stamp/1000), follow_symlinks=False)
    except OSError as err:
        raise make_error(map_file_error(err, url))
    return True
################################################################
# ir_place_new/2 and ir_skeleton_new/3 #
################################################################
###
# ir_place_new(I, S):
# The predicate succeeds in S with a new quoted place for index I.
##
def test_ir_place_new(args):
    index = deref(exec_build(args[0]))
    check_integer(index)
    return exec_unify(args[1], Quote(Place(index)))
###
# ir_skeleton_new(F, L, S):
# The predicate succeeds in S with a new quoted skeleton for
# functor F and list L.
##
def test_ir_skeleton_new(args):
    functor = deref(exec_build(args[0]))
    items = list_objects(deref(exec_build(args[1])))
    unquote_objects(items)
    return exec_unify(args[2], Quote(Skeleton(functor, items)))
################################################################
# ir_is_site/1, ir_pred_site/2 and ir_site_name/2 #
################################################################
###
# ir_is_site(Q): internal only
# The built-in succeeds if Q is a cache.
##
def test_ir_is_site(args):
    return is_cache(deref(exec_build(args[0])))
###
# ir_pred_site(F, Q): internal only
# The built-in succeeds in Q with a new cache for the functor F.
##
def test_ir_pred_site(args):
    functor = deref(exec_build(args[0]))
    check_atom(functor)
    site = Cache(functor)
    return exec_unify(args[1], site)
###
# ir_site_name(Q, F): internal only
# The built-in succeeds in F with the functor of the cache Q.
# Throws a type error if Q is not a cache.
##
def test_ir_site_name(args):
    site = deref(exec_build(args[0]))
    check_cache(site)
    return exec_unify(args[1], site.name)
###
# Assure that the object is a cache, otherwise throw
# type_error(cache, _).
#
# @param beta The object.
##
def check_cache(beta):
    if is_cache(beta):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["cache", culprit]))
################################################################
# ir_clause_new/6 and ir_clause_add/4 #
################################################################
###
# ir_clause_new(S, H, B, R, D, C):
# The built-in succeeds in C with a Python object representing
# a clause with variable count S, head instructions H, body instructions B,
# cut variable index R and head index option D. The option D is
# either just(X) or anything else, which is taken as no option.
##
def test_ir_clause_new(args):
    size = deref(exec_build(args[0]))
    check_integer(size)
    head = list_objects(deref(exec_build(args[1])))
    body = list_objects(deref(exec_build(args[2])))
    cutvar = deref(exec_build(args[3]))
    check_integer(cutvar)
    option = deref(exec_build(args[4]))
    if (is_compound(option) and option.functor == "just"
            and len(option.args) == 1):
        option = deref(option.args[0])
    else:
        option = NotImplemented
    unquote_objects(head)
    unquote_objects(body)
    # The result is unified through exec_unify, like the result of
    # every other test_* built-in in this file whose inputs are
    # read through exec_build.
    return exec_unify(args[5], Clause(size, head, body, cutvar, option))
###
# ir_clause_add(F, A, C, O):
# The built-in succeeds. As a side effect the Python object clause C
# is added according to options O to the knowledge base for the predicate
# indicator F/A. If C is not a clause, but directly a Python object
# provable, the built-in definition of the indicator F/A is updated.
##
def test_ir_clause_add(args):
    functor = deref(exec_build(args[0]))
    check_atom(functor)
    arity = deref(exec_build(args[1]))
    check_integer(arity)
    clause = deref(exec_build(args[2]))
    options = deref(exec_build(args[3]))
    check_integer(options)
    add_clause(functor, arity, clause, options)
    return True
################################################################
# ir_goal_new/3 and ir_goal_run/1 #
################################################################
###
# ir_goal_new(S, B, G):
# The built-in succeeds in G with a Python object representing
# a goal with variable count S, body instructions B.
##
def test_ir_goal_new(args):
    size = deref(exec_build(args[0]))
    check_integer(size)
    body = list_objects(deref(exec_build(args[1])))
    unquote_objects(body)
    return exec_unify(args[2], Goal(size, body))
###
# ir_goal_run(G):
# As a side effect, the built-in executes the Python object goal G,
# cuts away its choice points and undoes its bindings. If the goal fails,
# it throws a new error. If the goal throws an error, it re-throws this error.
# If the goal succeeds, the built-in succeeds.
##
def special_ir_goal_run(args):
    goal = deref(args[0])
    check_goal(goal)
    snap = snap_setup()
    directive = melt_directive(goal)
    cont(directive)
    return solve_run(snap, True, None)
###
# Call or resume a goal.
# Failure results in error, success results in cut.
#
# @param snap The surrounding choice point.
# @param found The call or redo flag.
# @param choice The choice point for reuse or None.
# @return True if goal succeeded, otherwise the value
#         that solve() returned.
##
def solve_run(snap, found, choice):
    try:
        found = solve(snap, found)
    except Exception as err:
        # Clean up the snapshot before the error travels on,
        # "from None" suppresses the exception chaining.
        snap_cleanup(snap)
        raise err from None
    if found is False:
        # A failing goal is reported as an error.
        raise make_error(Compound("syntax_error", ["directive_failed"]))
    if found is not True:
        # solve() returned neither True nor False; the value is
        # passed on to the caller below.
        # NOTE(review): presumably the goal got suspended here -- confirm
        # against machine.solve.
        if machine.redo is not snap:
            # Push a choice that names solve_run with the arguments
            # (snap, False), creating it on first use and otherwise
            # refreshing the mark, continuation and tail of the old one.
            if choice is None:
                choice = Choice(solve_run, snap, False, machine.trail)
            else:
                choice.mark = machine.trail
                choice.cont = machine.call
                choice.tail = machine.redo
            more(choice)
        else:
            cut(snap.tail)
        return found
    # The goal succeeded: clean up and continue after the built-in.
    snap_cleanup(snap)
    cont(machine.call.args[1])
    return True
################################################################
# Dynamic Database #
################################################################
# Flags of kb_clause_ref/3.
# Selects the "modify" instead of the "access" permission error.
MASK_FIND_MODIFY = 0x00000001
# Requires that the predicate is dynamic.
MASK_FIND_DYNAMIC = 0x00000002
# Enumerates the clauses from last to first.
MASK_FIND_REVERSE = 0x00000004
###
# kb_clause_ref(H, F, C): internal only
# The built-in succeeds in C with the clause references
# for the head H and the flags F. The built-in fails if there
# is no predicate for H.
##
def special_kb_clause_ref(args):
    head = deref(args[0])
    flags = deref(args[1])
    check_integer(flags)
    peek = lookup_pred(head)
    if peek is NotImplemented:
        return False
    if (flags & MASK_FIND_DYNAMIC) != 0:
        # A predicate without the dynamic flag raises a permission error.
        if (peek.flags & MASK_PRED_DYNAMIC) == 0:
            make_error_find(head, flags)
    if not is_logical(peek.rope):
        return False
    # From here on head holds the argument list, not the term.
    if is_compound(head):
        head = head.args
    else:
        head = VOID_ARGS
    peek = defined_clauses(peek, head)
    # Enumerate a snapshot of the clause list.
    peek = snapshot_data(peek)
    if (flags & MASK_FIND_REVERSE) == 0:
        return solve2_ref(args, peek, 0, None)
    else:
        return solve2_ref_reverse(args, peek, len(peek), None)
###
# Throw the permission error of kb_clause_ref/3 for a predicate
# that is not dynamic. The flag MASK_FIND_MODIFY selects between
# a modify and an access error.
#
# @param head The clause head.
# @param flags The find flags.
##
def make_error_find(head, flags):
    if (flags & MASK_FIND_MODIFY) != 0:
        action, kind = "modify", "static_procedure"
    else:
        action, kind = "access", "private_procedure"
    raise make_error(Compound("permission_error",
                              [action, kind, make_indicator_term(head)]))
###
# Resume the forward clause search with the arguments
# of the current call.
#
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or None.
# @return True if search succeeds, otherwise false.
##
def solve_ref(rope, at, choice):
    pending = deref(machine.call.args[0])
    return solve2_ref(pending.args, rope, at, choice)
###
# Resume the backward clause search with the arguments
# of the current call.
#
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or None.
# @return True if search succeeds, otherwise false.
##
def solve_ref_reverse(rope, at, choice):
    pending = deref(machine.call.args[0])
    return solve2_ref_reverse(pending.args, rope, at, choice)
###
# Search a Prolog clause and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or None.
# @return True if search succeeds, otherwise false.
##
def solve2_ref(args, rope, at, choice):
    # Trail position to return to when a unification fails.
    mark = machine.trail
    while at < len(rope):
        clause = rope[at]
        at += 1
        if unify(args[2], clause):
            # Clauses remain after this one: push a choice, new or
            # reused, that carries the next index.
            if at < len(rope):
                if choice is None:
                    choice = Choice(solve_ref, rope, at, mark)
                else:
                    choice.at = at
                more(choice)
            cont(machine.call.args[1])
            return True
        unbind(mark)
    return False
###
# Search a Prolog clause backwards and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index, one past the next candidate.
# @param choice The choice point for reuse or None.
# @return True if search succeeds, otherwise false.
##
def solve2_ref_reverse(args, rope, at, choice):
    # Trail position to return to when a unification fails.
    mark = machine.trail
    while at > 0:
        at -= 1
        clause = rope[at]
        if unify(args[2], clause):
            # Clauses remain before this one: push a choice, new or
            # reused, that carries the current index.
            if at > 0:
                if choice is None:
                    choice = Choice(solve_ref_reverse, rope, at, mark)
                else:
                    choice.at = at
                more(choice)
            cont(machine.call.args[1])
            return True
        unbind(mark)
    return False
###
# kb_pred_touch(F, N, O): internal only
# The built-in succeeds. As a side effect the predicate
# indicator F/N with options O is touched.
##
def test_kb_pred_touch(args):
    functor = deref(exec_build(args[0]))
    check_atom(functor)
    arity = deref(exec_build(args[1]))
    check_integer(arity)
    options = deref(exec_build(args[2]))
    check_integer(options)
    pred_touch(functor, arity, options)
    return True
###
# kb_clause_remove(F, N, G, R): internal only
# The built-in succeeds if the clause R could be removed from
# the predicate indicator F/N and the flags G. Throws a type
# error if R is not a clause.
##
def test_kb_clause_remove(args):
    functor = deref(exec_build(args[0]))
    check_atom(functor)
    arity = deref(exec_build(args[1]))
    check_integer(arity)
    options = deref(exec_build(args[2]))
    check_integer(options)
    victim = deref(exec_build(args[3]))
    check_clause(victim)
    return remove_clause(functor, arity, victim, options)
###
# Assure that the object is a clause, otherwise throw
# type_error(clause, _).
#
# @param beta The object.
##
def check_clause(beta):
    if isinstance(beta, Clause):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["clause", culprit]))
###
# kb_pred_destroy(F, N): internal only
# The built-in succeeds. As a side effect the
# predicate indicator F/N is destroyed.
##
def test_kb_pred_destroy(args):
    name = deref(exec_build(args[0]))
    check_atom(name)
    count = deref(exec_build(args[1]))
    check_integer(count)
    pred_destroy(name, count)
    return True
################################################################
# Linked Provables #
################################################################
###
# kb_make_defined(L, P): internal only
# The built-in succeeds in P with an anonymous predicate for the clauses L.
##
def test_kb_make_defined(args):
    clauses = list_objects(deref(exec_build(args[0])))
    return exec_unify(args[1], make_defined(clauses))
###
# kb_is_link(Q): internal only
# The built-in succeeds if Q is a provable.
##
def test_kb_is_link(args):
    return is_provable(deref(exec_build(args[0])))
###
# kb_pred_link(F, N, Q): internal only
# The built-in succeeds in Q with the provable of the
# predicate indicator F/N. Otherwise if no such provable
# exists the built-in fails.
##
def test_kb_pred_link(args):
    functor = deref(exec_build(args[0]))
    check_atom(functor)
    arity = deref(exec_build(args[1]))
    check_integer(arity)
    provable = pred_link(functor, arity)
    if provable is not NotImplemented:
        return exec_unify(args[2], provable)
    return False
###
# kb_link_flags(Q, F): internal only
# The built-in succeeds in F with the flags of the provable Q.
# Throws a type error if Q is not a provable.
##
def test_kb_link_flags(args):
    provable = deref(exec_build(args[0]))
    check_provable(provable)
    return exec_unify(args[1], provable.flags)
###
# Assure that the object is a provable, otherwise throw
# type_error(provable, _).
#
# @param beta The object.
##
def check_provable(beta):
    if is_provable(beta):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["provable", culprit]))
################################################################
# Meta Data #
################################################################
###
# kb_pred_list(L): internal only
# The built-in succeeds in L with the current predicate indicators.
##
def test_kb_pred_list(args):
    return exec_unify(args[0], kb_pred_list())
###
# Collect the predicate indicators of the knowledge base in a
# Prolog list. Slots that are NotImplemented and predicates
# that have a remover are skipped.
#
# @return The Prolog list of predicate indicators.
##
def kb_pred_list():
    first = None
    last = None
    for functor in store.kb:
        preds = store.kb[functor]
        # The position inside the per-functor list is the arity.
        for arity, pred in enumerate(preds):
            if pred is NotImplemented or pred.remover is not NotImplemented:
                continue
            cell = Compound(".", [make_indicator(functor, arity),
                                  NotImplemented])
            if last is None:
                first = cell
            else:
                last.args[1] = cell
            last = cell
    if last is None:
        return "[]"
    # Terminate the open ended list.
    last.args[1] = "[]"
    return first
###
# kb_clause_creator(C, S):
# The built-in succeeds in S with the creator of the clause C.
# Throws a type error if C is not a clause.
##
def test_kb_clause_creator(args):
    ref = deref(exec_build(args[0]))
    check_clause(ref)
    return exec_unify(args[1], ref.creator)
###
# kb_clause_shard(C, S):
# The built-in succeeds in S with the shard of the clause C.
# Throws a type error if C is not a clause.
##
def test_kb_clause_shard(args):
    ref = deref(exec_build(args[0]))
    check_clause(ref)
    return exec_unify(args[1], ref.shard)
###
# kb_clause_head(C, H): internal only
# The built-in succeeds in H with the head of the clause C.
##
def special_kb_clause_head(args):
    clause = deref(args[0])
    check_clause(clause)
    head = deref(args[1])
    # A clause without variables needs no display.
    display = [NotImplemented] * clause.size if clause.size != 0 else None
    if is_compound(head) and not exec_head(clause.head, display, head.args):
        return False
    cont(machine.call.args[1])
    return True
###
# kb_clause_data(C, H, O, L): internal only
# The built-in succeeds in H, O and L with the head,
# cut var and body of the clause C. The cut var O is either
# just(V) or nothing.
##
def special_kb_clause_data(args):
    clause = deref(args[0])
    check_clause(clause)
    head = deref(args[1])
    # A clause without variables needs no display.
    display = [NotImplemented] * clause.size if clause.size != 0 else None
    if is_compound(head) and not exec_head(clause.head, display, head.args):
        return False
    cutvar = clause.cutvar
    if cutvar == -1:
        option = "nothing"
    else:
        fresh = Variable()
        display[cutvar] = fresh
        option = Compound("just", [fresh])
    if not unify(args[2], option):
        return False
    body = exec_body(clause.body, display)
    if not unify(args[3], body):
        return False
    cont(machine.call.args[1])
    return True
################################################################
# dg_date_now/1 and dg_real_time/1 #
################################################################
###
# dg_date_now(W): internal only
# The built-in succeeds in W with the wall clock time
# in milliseconds.
##
def test_dg_date_now(args):
    millis = int(round(time.time() * 1000))
    return exec_unify(args[0], millis)
###
# dg_real_time(W): internal only
# The built-in succeeds in W with the real time.
##
def test_dg_real_time(args):
    now = real_time()
    return exec_unify(args[0], now)
####################################################################
# dg_gc_time/1, dg_call_count/1 and dg_gc_flags/1 #
####################################################################
###
# dg_gc_time(W): internal only
# The built-in succeeds in W with the garbage collection time.
##
def test_dg_gc_time(args):
    spent = machine.gc_time
    return exec_unify(args[0], spent)
###
# dg_call_count(W): internal only
# The built-in succeeds in W with the call count.
##
def test_dg_call_count(args):
    count = machine.gc_enter
    return exec_unify(args[0], count)
###
# dg_gc_flags(W): internal only
# The built-in succeeds in W with the garbage collector flags.
##
def test_dg_gc_flags(args):
    flags = machine.gc_flags
    return exec_unify(args[0], flags)
####################################################################
# dg_var_serno/2 #
####################################################################
###
# dg_var_serno(V, S): internal only
# The built-in succeeds in S with the serno of the variable V,
# which is the flags of V without the VAR_MASK_STATE bits.
##
def test_dg_var_serno(args):
    var = deref(exec_build(args[0]))
    check_var(var)
    serno = var.flags & ~VAR_MASK_STATE
    return exec_unify(args[1], serno)
################################################################
# dg_clear_stage/0, dg_get_stage/1 and dg_set_stage/1 #
################################################################
###
# dg_clear_stage: internal only
# The built-in succeeds. As a side effect it clears the current stage
# by calling clear() of the store.
##
def test_dg_clear_stage(args):
    clear()
    return True
###
# dg_get_stage(D): internal only
# The built-in succeeds in D with the current stage.
##
def test_dg_get_stage(args):
    current = store.stage
    return exec_unify(args[0], current)
###
# dg_set_stage(D): internal only
# The built-in succeeds. As a side effect it changes the current stage
# to the integer D.
##
def test_dg_set_stage(args):
    stage = deref(exec_build(args[0]))
    check_integer(stage)
    set_stage(stage)
    return True
################################################################
# dg_get_partition/1 and dg_set_partition/1 #
################################################################
###
# dg_get_partition(D): internal only
# The built-in succeeds in D with the current partition.
##
def test_dg_get_partition(args):
    """Unify args[0] with the partition stored on store.engine.

    Args:
        args: argument list; args[0] receives the partition.

    Returns:
        Whatever exec_unify returns for args[0] and the partition.
    """
    target = args[0]
    return exec_unify(target, store.engine.partition)
###
# dg_set_partition(D): internal only
# The built-in succeeds. As a side effect it changes the current partition.
##
def test_dg_set_partition(args):
    """Change the current partition to the atom given in args[0].

    The argument is built and dereferenced, validated with check_atom
    and then handed to set_partition.

    Args:
        args: argument list; args[0] is the new partition.

    Returns:
        Always True once set_partition has been called.
    """
    partition = deref(exec_build(args[0]))
    check_atom(partition)
    set_partition(partition)
    return True
################################################################
# os_stream_list/2, os_get_workdir/1 and os_set_workdir/1 #
################################################################
###
# os_stream_list(S, L): internal only
# The built-in succeeds in L with the properties of the stream S.
##
def test_os_stream_list(args):
    """Unify args[1] with the property list of the stream in args[0].

    Args:
        args: argument list; args[0] is the stream, args[1] the result.

    Returns:
        Whatever exec_unify returns for args[1] and the list computed
        by os_stream_list.
    """
    handle = deref(exec_build(args[0]))
    properties = os_stream_list(handle)
    return exec_unify(args[1], properties)
def os_stream_list(stream):
    """Build the property list of a stream as a Prolog list term.

    Args:
        stream: a dereferenced stream object.

    Returns:
        The atom "[]" when stream is a Sink. Otherwise, after
        check_source has accepted the stream, a one-element list
        '.'(line_no(N), []) where N is stream.lineno.
    """
    # Sinks expose no properties here: the list is empty.
    if isinstance(stream, Sink):
        return "[]"
    check_source(stream)
    line_info = Compound("line_no", [stream.lineno])
    return Compound(".", [line_info, "[]"])
###
# os_get_workdir(D): internal only
# The built-in succeeds in D with the working directory.
##
def test_os_get_workdir(args):
    """Unify args[0] with the working directory, ending in a separator.

    Args:
        args: argument list; args[0] receives the directory.

    Returns:
        Whatever exec_unify returns for args[0] and the directory string.
    """
    # Joining with an empty component appends exactly one trailing
    # separator. Plain concatenation doubled it when the working
    # directory is a filesystem root ("/" became "//").
    url = os.path.join(os.getcwd(), "")
    return exec_unify(args[0], url)
###
# os_set_workdir(D): internal only
# The built-in succeeds. As a side effect it changes the working directory to D.
##
def test_os_set_workdir(args):
    """Change the process working directory to the atom in args[0].

    Args:
        args: argument list; args[0] is the new directory.

    Returns:
        True when os.chdir succeeded.

    Raises:
        The error built by make_error(map_file_error(...)) when os.chdir
        fails with an OSError.
    """
    path = deref(exec_build(args[0]))
    check_atom(path)
    try:
        os.chdir(path)
    except OSError as exc:
        # Translate the host error into an error of the Prolog system.
        raise make_error(map_file_error(exc, path))
    else:
        return True
################################################################
# os_get_libpath/1 and os_get_natext/1 #
################################################################
###
# os_get_libpath(D): internal only
# The built-in succeeds in D with the library path.
##
def test_os_get_libpath(args):
    """Unify args[0] with the library path held in bootbase.

    Args:
        args: argument list; args[0] receives the path.

    Returns:
        Whatever exec_unify returns for args[0] and bootbase.
    """
    target = args[0]
    return exec_unify(target, bootbase)
###
# os_get_natext(D): internal only
# The built-in succeeds in D with the native extension.
##
def test_os_get_natext(args):
    """Unify args[0] with the native extension of this host, ".py".

    Args:
        args: argument list; args[0] receives the extension.

    Returns:
        Whatever exec_unify returns for args[0] and the extension.
    """
    extension = ".py"
    return exec_unify(args[0], extension)
################################################################
# os_call_later/3 and os_call_cancel/1 #
################################################################
###
# os_call_later(N, D, T): internal only
# The predicate succeeds in T with a new timer. As a side
# effect it schedules the compiled goal N to be executed
# after D milliseconds.
##
def test_os_call_later(args):
    """Schedule a compiled goal on the running asyncio loop.

    The goal in args[0] is validated with check_goal and the delay in
    args[1] with check_integer. The delay is divided by 1000.0 before it
    is passed to loop.call_later, i.e. it is given in milliseconds. The
    context that is current at scheduling time (machine.ctx) is captured
    and passed to launch when the timer fires.

    Args:
        args: argument list; args[0] is the goal, args[1] the delay and
            args[2] receives the timer handle.

    Returns:
        Whatever exec_unify returns for args[2] and the handle returned
        by loop.call_later.
    """
    goal = deref(exec_build(args[0]))
    check_goal(goal)
    millis = deref(exec_build(args[1]))
    check_integer(millis)
    context = machine.ctx

    def fire():
        return launch(goal, context, VOID_ARGS)

    timer = asyncio.get_running_loop().call_later(millis / 1000.0, fire)
    return exec_unify(args[2], timer)
def check_goal(beta):
    """Assure that beta is an instance of Goal.

    Args:
        beta: the dereferenced term to validate.

    Raises:
        The error built by make_error from type_error(goal, Copy), where
        Copy is a copy_term copy of beta, when beta is not a Goal.
        check_nonvar runs first, so it presumably reports an unbound
        argument itself; confirm in nova.machine.
    """
    if isinstance(beta, Goal):
        return
    check_nonvar(beta)
    culprit = copy_term(beta)
    raise make_error(Compound("type_error", ["goal", culprit]))
###
# os_call_cancel(T): internal only
# The predicate succeeds. As a side effect it cancels the timer T.
##
def test_os_call_cancel(args):
    """Cancel the timer given in args[0] by calling its cancel() method.

    Args:
        args: argument list; args[0] is the timer handle.
            NOTE(review): the handle is not type checked before
            cancel() is called.

    Returns:
        Always True once cancel() has returned.
    """
    timer = deref(exec_build(args[0]))
    timer.cancel()
    return True
################################################################
# os_task_current/1, os_task_abort/2 and os_task_create/2 #
################################################################
###
# os_task_current(E): internal only
# The predicate succeeds in E with the current task.
##
def test_os_task_current(args):
    """Unify args[0] with the current task, i.e. machine.ctx.

    Args:
        args: argument list; args[0] receives the task.

    Returns:
        Whatever exec_unify returns for args[0] and machine.ctx.
    """
    target = args[0]
    return exec_unify(target, machine.ctx)
###
# os_task_abort(E, M): internal only
# The predicate succeeds. As a side effect the task E gets
# the message M signalled.
##
def test_os_task_abort(args):
    """Signal a message to a task and interrupt it.

    The message in args[1] is built and copied with copy_term, then
    registered on the task via register_signal, after which
    invoke_interrupt is called for the task.

    Args:
        args: argument list; args[0] is the task, args[1] the message.

    Returns:
        Always True once both calls have returned.
    """
    task = deref(exec_build(args[0]))
    message = copy_term(exec_build(args[1]))
    register_signal(task, message)
    invoke_interrupt(task)
    return True
###
# os_task_create(N, E): internal only
# The predicate succeeds in E with a new task for the compiled
# goal N. The task is scheduled to be executed immediately.
##
def test_os_task_create(args):
    """Create a new task for a compiled goal and unify it with args[1].

    A fresh Context is created. Its engine gets the text_output,
    text_error and text_input streams of store.engine. The goal is then
    submitted to the running asyncio loop via launch_async.

    Args:
        args: argument list; args[0] is the goal, args[1] receives the
            new task.

    Returns:
        Whatever exec_unify returns for args[1] and the new Context.
    """
    goal = deref(exec_build(args[0]))
    check_goal(goal)
    task = Context()
    # The new task shares the standard streams of the current engine.
    parent = store.engine
    child = task.engine
    child.text_output = parent.text_output
    child.text_error = parent.text_error
    child.text_input = parent.text_input
    asyncio.get_running_loop().create_task(
        launch_async(goal, task, VOID_ARGS))
    return exec_unify(args[1], task)
################################################################
# os_host_info/1 #
################################################################
###
# os_host_info(I): internal only
# The predicate succeeds in I with the host programming language info.
##
def test_os_host_info(args):
    """Unify args[0] with python(Major, Minor, Micro) of the interpreter.

    Args:
        args: argument list; args[0] receives the info term.

    Returns:
        Whatever exec_unify returns for args[0] and the compound built
        from the first three fields of sys.version_info.
    """
    # A slice replaces the hand-written index loop that copied the first
    # three fields into a pre-sized placeholder list.
    version = list(sys.version_info[:3])
    value = Compound("python", version)
    return exec_unify(args[0], value)
################################################################
# Runtime Init #
################################################################
# Registration of the built-ins of this module. Every statement calls
# add() (imported from nova.store) with the predicate name, its arity and
# a wrapped handler; presumably this makes the predicate callable from
# Prolog, confirm against nova.store. Handlers named test_* are wrapped
# with make_check, handlers named special_* with make_special.
# stream specials, output
add("current_output", 1, make_check(test_current_output))
add("current_error", 1, make_check(test_current_error))
add("set_output", 1, make_check(test_set_output))
add("set_error", 1, make_check(test_set_error))
add("put_code", 2, make_check(test_put_code))
add("current_lastcode", 2, make_check(test_current_lastcode))
add("set_lastcode", 2, make_check(test_set_lastcode))
add("put_atom", 2, make_check(test_put_atom))
# stream specials, input
add("current_input", 1, make_check(test_current_input))
add("set_input", 1, make_check(test_set_input))
add("os_read_sync", 1, make_check(test_os_read_sync))
add("os_get_code", 2, make_check(test_os_get_code))
add("os_peek_code", 2, make_check(test_os_peek_code))
add("os_open_promise", 3, make_check(test_os_open_promise))
add("os_stream_flags", 2, make_check(test_os_stream_flags))
add("os_read_promise", 2, make_check(test_os_read_promise))
add("os_close_promise", 2, make_check(test_os_close_promise))
# stream specials, files
add("os_open_sync", 3, make_check(test_os_open_sync))
add("flush_output", 1, make_check(test_flush_output))
add("os_close_sync", 1, make_check(test_os_close_sync))
add("os_prop_promise", 3, make_check(test_os_prop_promise))
add("set_file_property", 2, make_check(test_set_file_property))
# intermediate representation, Albufeira code
add("ir_place_new", 2, make_check(test_ir_place_new))
add("ir_skeleton_new", 3, make_check(test_ir_skeleton_new))
add("ir_is_site", 1, make_check(test_ir_is_site))
add("ir_pred_site", 2, make_check(test_ir_pred_site))
add("ir_site_name", 2, make_check(test_ir_site_name))
# intermediate representation specials, consult text, internal only
add("ir_clause_new", 6, make_check(test_ir_clause_new))
add("ir_clause_add", 4, make_check(test_ir_clause_add))
add("ir_goal_new", 3, make_check(test_ir_goal_new))
add("ir_goal_run", 1, make_special(special_ir_goal_run))
# knowledge base specials, dynamic database, internal only
add("kb_clause_ref", 3, make_special(special_kb_clause_ref))
add("kb_pred_touch", 3, make_check(test_kb_pred_touch))
add("kb_clause_remove", 4, make_check(test_kb_clause_remove))
add("kb_pred_destroy", 2, make_check(test_kb_pred_destroy))
# knowledge base specials, linked provables, internal only
add("kb_make_defined", 2, make_check(test_kb_make_defined))
add("kb_is_link", 1, make_check(test_kb_is_link))
add("kb_pred_link", 3, make_check(test_kb_pred_link))
add("kb_link_flags", 2, make_check(test_kb_link_flags))
# knowledge base specials, meta data, internal only
add("kb_pred_list", 1, make_check(test_kb_pred_list))
add("kb_clause_creator", 2, make_check(test_kb_clause_creator))
add("kb_clause_shard", 2, make_check(test_kb_clause_shard))
add("kb_clause_head", 2, make_special(special_kb_clause_head))
add("kb_clause_data", 4, make_special(special_kb_clause_data))
# system specials, statistics, internal only
add("dg_date_now", 1, make_check(test_dg_date_now))
add("dg_real_time", 1, make_check(test_dg_real_time))
add("dg_gc_time", 1, make_check(test_dg_gc_time))
add("dg_call_count", 1, make_check(test_dg_call_count))
add("dg_gc_flags", 1, make_check(test_dg_gc_flags))
add("dg_var_serno", 2, make_check(test_dg_var_serno))
# system specials, staging, internal only
add("dg_clear_stage", 0, make_check(test_dg_clear_stage))
add("dg_get_stage", 1, make_check(test_dg_get_stage))
add("dg_set_stage", 1, make_check(test_dg_set_stage))
add("dg_get_partition", 1, make_check(test_dg_get_partition))
add("dg_set_partition", 1, make_check(test_dg_set_partition))
# system specials, operating, internal only
add("os_stream_list", 2, make_check(test_os_stream_list))
add("os_get_workdir", 1, make_check(test_os_get_workdir))
add("os_set_workdir", 1, make_check(test_os_set_workdir))
add("os_get_libpath", 1, make_check(test_os_get_libpath))
add("os_get_natext", 1, make_check(test_os_get_natext))
# system specials, coroutines, internal only
add("os_call_later", 3, make_check(test_os_call_later))
add("os_call_cancel", 1, make_check(test_os_call_cancel))
add("os_task_current", 1, make_check(test_os_task_current))
add("os_task_abort", 2, make_check(test_os_task_abort))
add("os_task_create", 2, make_check(test_os_task_create))
# system specials, natlib, internal only
add("os_host_info", 1, make_check(test_os_host_info))