Python "runtime"

Admin User, created Apr 17. 2025
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import nova.store as store
from nova.store import (MASK_PRED_DYNAMIC, set_stage, pred_link,
Clause, is_provable, remove_clause, is_logical,
set_partition, pred_destroy, clear, add_clause, Cache,
make_defined, add, pred_touch, snapshot_data,
Variable, Compound, is_compound,
Place, Quote, Skeleton, unquote_objects)
import nova.machine as machine
from nova.machine import (cont, deref, gc_major,
make_indicator, lookup_pred, exec_body, exec_head,
defined_clauses, make_indicator_term,
Choice, launch, more, unbind, exec_unify, exec_build,
snap_cleanup, check_nonvar, VOID_ARGS,
Goal, snap_setup, solve, make_error, VAR_MASK_STATE,
unify, real_time, register_signal, teardown,
copy_term, melt_directive)
from nova.special import (check_atom, make_check,
check_integer, make_special, list_objects, check_var,
sys_time_parse)
import os
import time
import sys
import platform
import asyncio
import stat
import gc
import urllib.request
import urllib.error
import errno
import http.client
HTTP_TIME = "%a, %d %b %Y %H:%M:%S GMT"
MAX_BUF = 4096
bootbase = ""
###
# Set the boot base.
#
# @param url The boot base.
##
def set_bootbase(url):
global bootbase
bootbase = url
###################################################################
# current_output/1 and current_error/1 #
###################################################################
###
# Create a text output.
##
class Sink:
def __init__(self):
self.buf = ""
self.send = lambda fd, buf: fd
self.last = -1
self.notify = lambda fd: None
self.release = lambda fd: None
self.data = NotImplemented
self.indent = 0
###
# current_output(S): [ISO 8.11.2]
# The built-in succeeds in S with the current output.
##
def test_current_output(args):
alpha = store.engine.text_output
return exec_unify(args[0], alpha)
###
# current_error(S):
# The built-in succeeds in S with the current error.
##
def test_current_error(args):
alpha = store.engine.text_error
return exec_unify(args[0], alpha)
###################################################################
# set_output/1 and set_error/1 #
###################################################################
###
# set_output(S): [ISO 8.11.4]
# The built-in succeeds. As a side effect the current output is set to S.
##
def test_set_output(args):
obj = deref(exec_build(args[0]))
check_sink(obj)
store.engine.text_output = obj
return True
###
# set_error(S):
# The built-in succeeds. As a side effect the current error is set to S.
##
def test_set_error(args):
obj = deref(exec_build(args[0]))
check_sink(obj)
store.engine.text_error = obj
return True
###
# Assure that the object is a sink.
#
# @param beta The object.
##
def check_sink(beta):
if not isinstance(beta, Sink):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["writer", beta]))
################################################################
# put_atom/2 and dg_var_serno/2 #
################################################################
###
# put_atom(S, A):
# The built-in succeeds. As a side effect, it adds
# the atom to the stream S.
##
def test_put_atom(args):
stream = deref(exec_build(args[0]))
check_sink(stream)
text = deref(exec_build(args[1]))
check_atom(text)
put_atom(stream, text)
return True
def put_atom(stream, text):
if len(text) > 0:
stream.last = last_code(text)
if stream.buf is not None:
stream.buf += text
if len(stream.buf) >= MAX_BUF:
flush_buffer(stream)
else:
stream.data = stream.send(stream.data, text)
def last_code(text):
return ord(text[len(text)-1])
def flush_buffer(stream):
if stream.buf is not None and len(stream.buf) > 0:
text = stream.buf
stream.buf = ""
stream.data = stream.send(stream.data, text)
###
# dg_var_serno(V, S): internal only
# The built-in succeeds in S with the serno of the variable V.
##
def test_dg_var_serno(args):
obj = deref(exec_build(args[0]))
check_var(obj)
return exec_unify(args[1], obj.flags & ~VAR_MASK_STATE)
################################################################
# current_input/1 and set_input/1 #
################################################################
MASK_SRC_SKIP = 0x00000001
MASK_SRC_AREAD = 0x00000002
###
# Create a text input.
##
class Source:
def __init__(self):
self.buf = ""
self.pos = 0
self.receive = lambda fd: ""
self.flags = 0
self.lineno = 0
self.release = lambda fd: None
self.data = NotImplemented
def illegal_state():
raise make_error(Compound("resource_error", ["illegal_state"]))
###
# current_input(S): [ISO 8.11.1]
# The built-in succeeds in S with the current input.
##
def test_current_input(args):
alpha = store.engine.text_input
return exec_unify(args[0], alpha)
###
# set_input(S): [ISO 8.11.3]
# The built-in succeeds. As a side effect it sets the current input to S.
##
def test_set_input(args):
obj = deref(exec_build(args[0]))
check_source(obj)
store.engine.text_input = obj
return True
###
# Assure that the object is a source.
#
# @param beta The object.
##
def check_source(beta):
if not isinstance(beta, Source):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["reader", beta]))
################################################################
# os_read_sync/1, os_get_code/2 and os_peek_code/2 #
################################################################
###
# os_read_sync(S):
# The predicate succeeds. As a side effect the stream buffer is read.
##
def test_os_read_sync(args):
stream = deref(exec_build(args[0]))
stream.buf = stream.receive(stream.data)
stream.pos = 0
return True
###
# os_get_code(S, C):
# The predicate succeeds in C with the Unicode point from the stream buffer S.
# As a side effect the stream position is advanced.
##
def test_os_get_code(args):
stream = deref(exec_build(args[0]))
pos = stream.pos
buf = stream.buf
if pos < len(buf):
ch = ord(buf[pos])
pos += 1
if ch == 13 or (ch == 10 and (stream.flags & MASK_SRC_SKIP) == 0):
stream.lineno += 1
if ch == 13:
stream.flags |= MASK_SRC_SKIP
else:
stream.flags &= ~MASK_SRC_SKIP
stream.pos = pos
return exec_unify(args[1], ch)
else:
return False
###
# os_peek_code(S, C):
# The built-in succeeds in C with the Unicode point from the stream buffer S.
##
def test_os_peek_code(args):
stream = deref(exec_build(args[0]))
pos = stream.pos
buf = stream.buf
if pos < len(buf):
ch = ord(buf[pos])
return exec_unify(args[1], ch)
else:
return False
################################################################
# os_open_promise_opts/4 #
################################################################
###
# os_open_promise_opts(P, L, S, Q):
# The predicate succeeds in Q with a promise for open input S
# on path P and option list L.
##
def test_os_open_promise_opts(args):
url = deref(exec_build(args[0]))
check_atom(url)
opts = deref(exec_build(args[1]))
stream = Source()
if not exec_unify(args[2], stream):
return False
buf = machine.ctx
if url.startswith("http:") or url.startswith("https:"):
prom = open_http_promise_opts(buf, stream, url, opts)
else:
prom = open_file_promise_opts(buf, stream, url, opts)
return exec_unify(args[3], lambda: prom)
async def open_http_promise_opts(buf, stream, url, opts):
if opts is not None and "method" in opts:
method = opts["method"]
elif opts is not None and "body" in opts:
method = "POST"
else:
method = "GET"
if opts is not None and "body" in opts:
opts2 = opts["body"]
enc = get_encoding(opts2)
body = opts2["text"].encode(enc)
else:
body = None
if opts is not None and "headers" in opts:
headers = opts["headers"]
else:
headers = {}
request = urllib.request.Request(url,
data=body, headers=headers, method=method)
try:
response = await asyncio.to_thread(urllib.request.urlopen, request)
except http.client.RemoteDisconnected as err:
register_signal(buf, map_stream_error(err))
return
except urllib.error.HTTPError as err:
register_signal(buf, map_http_result(err, url))
return
except urllib.error.URLError as err:
register_signal(buf, map_file_error(err, url))
return
if opts is not None:
opts["uri"] = response.url
opts["status"] = response.status
res = {}
map = response.headers
for key in map:
res[key.lower()] = map[key]
opts["fields"] = res
enc = get_encoding(opts)
stream.data = response
stream.receive = lambda b, s: http_read_promise(enc, b, s)
stream.release = file_close_promise
stream.flags |= MASK_SRC_AREAD
def get_encoding(opts):
if opts is not None and "encoding" in opts:
return opts["encoding"]
else:
return "utf8"
async def open_file_promise_opts(buf, stream, url, opts):
enc = get_encoding(opts)
try:
file = await asyncio.to_thread(open, url, encoding=enc, newline='')
except OSError as err:
register_signal(buf, map_file_error(err, url))
return
stream.data = file
stream.receive = file_read_promise
stream.release = file_close_promise
stream.flags |= MASK_SRC_AREAD
def map_file_error(err, url):
if hasattr(err, 'reason'):
code = err.reason.errno
if code == 11001:
return Compound("resource_error", ["unknown_host"])
else:
return Compound("resource_error", ["connect_failed"])
else:
code = err.errno
if code == errno.ENOENT or errno.EACCES:
return Compound("existence_error", ["source_sink", url])
else:
return Compound("resource_error", ["remote_error"])
###################################################################
# os_stream_flags/2, os_read_promise/2 and os_close_promise/2 #
###################################################################
###
# os_stream_flags(S, F):
# The predicate succeeds in F with the flags of the stream S.
##
def test_os_stream_flags(args):
obj = deref(exec_build(args[0]))
if isinstance(obj, Source):
flags = obj.flags
else:
check_sink(obj)
flags = 0
return exec_unify(args[1], flags)
###
# os_read_promise(S, P):
# The predicate suceeds in P with a read promise for a input S.
##
def test_os_read_promise(args):
stream = deref(exec_build(args[0]))
buf = machine.ctx
return exec_unify(args[1], lambda: stream.receive(buf, stream))
async def http_read_promise(enc, buf, stream):
try:
res = await asyncio.to_thread(blocking_chunk, enc, stream.data)
stream.buf = res
stream.pos = 0
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_chunk(enc, data):
return data.read().decode(enc)
async def file_read_promise(buf, stream):
try:
res = await asyncio.to_thread(blocking_read, stream.data)
stream.buf = res
stream.pos = 0
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_read(data):
if data.isatty():
return data.readline(8192)
else:
return data.read(8192)
###
# Map a stream error when in transit.
#
# @param err The offending error.
# @return The Prolog error term.
##
def map_stream_error(err):
code = err.errno
if code == errno.ETIMEDOUT:
return Compound("resource_error", ["socket_timeout"])
elif code == errno.EADDRINUSE:
return Compound("resource_error", ["port_error"])
elif code == errno.ECONNRESET:
return Compound("resource_error", ["remote_error"])
else:
return Compound("resource_error", ["io_exception"])
###
# os_close_promise(S, P):
# The predicate suceeds in P with a read promise for a input S.
##
def test_os_close_promise(args):
stream = deref(exec_build(args[0]))
buf = machine.ctx
return exec_unify(args[1], lambda: stream.release(buf, stream))
async def file_close_promise(buf, stream):
try:
await asyncio.to_thread(blocking_close, stream.data)
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_close(data):
return data.close()
################################################################
# os_open_sync_opts/4 #
################################################################
###
# os_open_sync_opts(P, M, L, S):
# The predicate succeeds. As a side effect the stream S is
# opened on the path P with the mode M and the option list L.
##
def test_os_open_sync_opts(args):
url = deref(exec_build(args[0]))
check_atom(url)
mode = deref(exec_build(args[1]))
check_atom(mode)
opts = deref(exec_build(args[2]))
if "read" == mode:
raise make_error(Compound("resource_error",
["not_implemented"]))
elif "write" == mode:
stream = open_write(url, "w", opts)
elif "append" == mode:
stream = open_write(url, "a", opts)
else:
raise make_error(Compound("domain_error",
["io_mode", mode]))
return exec_unify(args[3], stream)
def open_write(url, mode, opts):
enc = get_encoding(opts)
try:
file = open(url, mode, encoding=enc, newline='')
except OSError as err:
raise make_error(map_file_error(err, url))
dst = Sink()
dst.data = file
dst.send = file_write
dst.release = file_close
return dst
def file_write(data, buf):
try:
data.write(buf)
return data
except IOError as err:
raise make_error(map_stream_error(err))
def file_close(data):
try:
data.close()
except IOError as err:
raise make_error(map_stream_error(err))
################################################################
# flush_output/1 and os_close_sync/1 #
################################################################
###
# flush_output(S): [ISO 8.11.7]
# The built-in succeeds. As a side effect, it flushes the stream buffer.
##
def test_flush_output(args):
stream = deref(exec_build(args[0]))
check_sink(stream)
stream_flush(stream)
return True
def stream_flush(stream):
flush_buffer(stream)
stream.notify(stream.data)
###
# close(S): [ISO 8.11.6]
# The built-in succeeds. As a side effect, the stream S is closed.
##
def test_os_close_sync(args):
stream = deref(exec_build(args[0]))
if isinstance(stream, Sink):
pass
else:
check_source(stream)
stream_close(stream)
return True
def stream_close(stream):
if isinstance(stream, Sink):
flush_buffer(stream)
stream.release(stream.data)
else:
stream.release(stream.data)
################################################################
# os_prop_promise/3 and set_file_property/2 #
################################################################
###
# os_prop_promise(F, M, Q):
# The predicate succeeds in Q with with a promise for the
# properties M of the file F. Barks if path F doesn't exist
# or io exception while resolving.
##
def test_os_prop_promise(args):
url = deref(exec_build(args[0]))
check_atom(url)
res = {}
if not exec_unify(args[1], res):
return False
buf = machine.ctx
if url.startswith("http:") or url.startswith("https:"):
prom = prop_http_promise(buf, url, res)
else:
prom = prop_file_promise(buf, url, res)
return exec_unify(args[2], lambda: prom)
async def prop_http_promise(buf, url, res):
request = urllib.request.Request(url, method="HEAD")
try:
response = await asyncio.to_thread(urllib.request.urlopen, request)
await asyncio.to_thread(response.close)
except http.client.RemoteDisconnected as err:
register_signal(buf, map_stream_error(err))
return
except urllib.error.HTTPError as err:
register_signal(buf, map_http_result(err, url))
return
except urllib.error.URLError as err:
register_signal(buf, Compound("resource_error", ["url_exception"]))
return
val = response.headers.get("last-modified")
mtime = sys_time_parse(val, HTTP_TIME, 1) \
if val is not None else -1
res["last_modified"] = mtime
res["absolute_path"] = response.url
res["type"] = "regular"
def map_http_result(err, url):
code = err.code
if code == 403: # Forbidden
return Compound("permission_error",
["open", "source_sink", url])
elif code == 404: # Not Found
return Compound("existence_error",
["source_sink", url])
elif code == 405: # Method Not Allowed
return Compound("resource_error",
["illegal_method"])
elif code == 500: # Internal Server Error
return Compound("resource_error",
["internal_error"])
elif code == 503: # Service Unavailable
return Compound("resource_error",
["service_unavailable"])
else:
return Compound("resource_error",
["io_exception"])
async def prop_file_promise(buf, url, res):
try:
stats = await asyncio.to_thread(os.stat, url)
ftype = "regular" if stat.S_ISREG(stats.st_mode) else \
("directory" if stat.S_ISDIR(stats.st_mode) else "other")
res["last_modified"] = stats.st_mtime_ns // 1000000
res["absolute_path"] = os.path.abspath(url)
res["type"] = ftype
except OSError as err:
register_signal(buf, map_file_error(err, url))
return
###
# set_file_property(F, P):
# The predicate assigns the property P to the file F.
##
def test_set_file_property(args):
url = deref(exec_build(args[0]))
check_atom(url)
if url.startswith("http:") or url.startswith("https:"):
raise make_error(Compound("permission_error", ["access", "source_sink", url]))
prop = deref(exec_build(args[1]))
if (is_compound(prop) and
prop.functor == "last_modified"
and len(prop.args) == 1):
val2 = deref(prop.args[0])
check_integer(val2)
if val2 < 0:
raise make_error(Compound("domain_error",
["not_less_than_zero", val2]))
try:
os.utime(url, (val2/1000, val2/1000))
except OSError as err:
raise make_error(map_file_error(err, url))
else:
check_nonvar(prop)
prop = copy_term(prop)
raise make_error(Compound("domain_error", ["prolog_property", prop]))
return True
################################################################
# ir_place_new/2, ir_skeleton_new/3 and ir_pred_site/2 #
################################################################
###
# ir_place_new(I, S):
# The predicate succeeds in S with a new place for index I.
##
def test_ir_place_new(args):
alpha = deref(exec_build(args[0]))
check_integer(alpha)
alpha = Place(alpha)
return exec_unify(args[1], Quote(alpha))
###
# ir_skeleton_new(F, L, S):
# The predicate succeeds in S with a new skeleton for functor F and list L.
##
def test_ir_skeleton_new(args):
alpha = deref(exec_build(args[0]))
beta = deref(exec_build(args[1]))
beta = list_objects(beta)
unquote_objects(beta)
alpha = Skeleton(alpha, beta)
return exec_unify(args[2], Quote(alpha))
###
# ir_pred_site(F, Q): internal only
# The built-in succeeds in Q with the cache for the functor F.
##
def test_ir_pred_site(args):
name = deref(exec_build(args[0]))
check_atom(name)
return exec_unify(args[1], Cache(name))
################################################################
# ir_clause_new/5 and ir_clause_add/4 #
################################################################
###
# ir_clause_new(S, H, B, R, D, C):
# The built-in succeeds in C with a Java object representing
# a clause with variable count S, head instructions H, body instructions B,
# cut variable index R and head index option D.
##
def test_ir_clause_new(args):
alpha = deref(exec_build(args[0]))
check_integer(alpha)
beta = deref(exec_build(args[1]))
beta = list_objects(beta)
gamma = deref(exec_build(args[2]))
gamma = list_objects(gamma)
delta = deref(exec_build(args[3]))
check_integer(delta)
mue = deref(exec_build(args[4]))
if is_compound(mue) and mue.functor == "just" and len(mue.args) == 1:
mue = deref(mue.args[0])
else:
mue = NotImplemented
unquote_objects(beta)
unquote_objects(gamma)
return unify(args[5], Clause(alpha, beta, gamma, delta, mue))
###
# ir_clause_add(F, A, C, O):
# The built-in succeeds. As a side effect the Python object clause C
# is added according to options O to the knowledge base for the predicate
# indicator F/A. If C is not a clause, but directly a Python object
# provable, the built-in definition of the indicator F/A is updated.
##
def test_ir_clause_add(args):
functor = deref(exec_build(args[0]))
check_atom(functor)
arity = deref(exec_build(args[1]))
check_integer(arity)
gamma = deref(exec_build(args[2]))
flags = deref(exec_build(args[3]))
check_integer(flags)
add_clause(functor, arity, gamma, flags)
return True
################################################################
# ir_goal_new/4 and ir_goal_run/1 #
################################################################
###
# ir_goal_new(S, B, R, G):
# The built-in succeeds in G with a Python object representing
# a goal with variable count S, body instructions B and the
# cut variable index R.
##
def test_ir_goal_new(args):
alpha = deref(exec_build(args[0]))
check_integer(alpha)
beta = deref(exec_build(args[1]))
beta = list_objects(beta)
gamma = deref(exec_build(args[2]))
check_integer(gamma)
unquote_objects(beta)
return exec_unify(args[3], Goal(alpha, beta, gamma))
###
# ir_goal_run(G):
# As a side effect, the built-in executes the Python object goal G,
# cuts away its choice points and undoes its bindings. If the goal fails,
# it throws a new error. If the goal throws an error, it re-throws this error.
# If the goal succeeds, the built-in succeeds.
##
def special_ir_goal_run(args):
goal = deref(args[0])
check_goal(goal)
snap = snap_setup()
cont(melt_directive(goal))
return solve_run(snap, True, None)
def check_goal(beta):
if not isinstance(beta, Goal):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["goal", beta]))
###
# Call or resume a goal.
# Failure results in error, success results in cut.
#
# @param snap The surrounding choice point.
# @param found The call or redo flag.
# @param choice The choice point for reuse or null.
# @return True if goal succeeded, otherwise false.
##
def solve_run(snap, found, choice):
try:
found = solve(snap, found)
except Exception as err:
snap_cleanup(snap)
raise err from None
if found is False:
raise make_error(Compound("syntax_error", ["directive_failed"]))
if found is not True:
if machine.redo is not snap:
if choice is None:
choice = Choice(solve_run, snap, False, machine.trail)
else:
choice.mark = machine.trail
choice.cont = machine.call
choice.tail = machine.redo
more(choice)
else:
more(snap.tail)
return found
snap_cleanup(snap)
cont(machine.call.args[1])
return True
################################################################
# kb_clause_ref/3 and kb_pred_touch/3 #
################################################################
MASK_FIND_MODIFY = 0x00000001
MASK_FIND_DYNAMIC = 0x00000002
MASK_FIND_REVERSE = 0x00000004
###
# kb_clause_ref(H, F, C): internal only
# The built-in succeeds in C with the clause references
# for the head H and the flags F.
##
def special_kb_clause_ref(args):
head = deref(args[0])
flags = deref(args[1])
check_integer(flags)
peek = lookup_pred(head)
if peek is NotImplemented:
return False
if (flags & MASK_FIND_DYNAMIC) != 0:
if (peek.flags & MASK_PRED_DYNAMIC) == 0:
make_error_find(head, flags)
if not is_logical(peek.rope):
return False
if is_compound(head):
head = head.args
else:
head = VOID_ARGS
peek = defined_clauses(peek, head)
peek = snapshot_data(peek)
if (flags & MASK_FIND_REVERSE) == 0:
return solve2_ref(args, peek, 0, None)
else:
return solve2_ref_reverse(args, peek, len(peek), None)
def make_error_find(head, flags):
if (flags & MASK_FIND_MODIFY) != 0:
raise make_error(Compound("permission_error",
["modify", "static_procedure", make_indicator_term(head)]))
else:
raise make_error(Compound("permission_error",
["access", "private_procedure", make_indicator_term(head)]))
def solve_ref(rope, at, choice):
goal = deref(machine.call.args[0])
return solve2_ref(goal.args, rope, at, choice)
def solve_ref_reverse(rope, at, choice):
goal = deref(machine.call.args[0])
return solve2_ref_reverse(goal.args, rope, at, choice)
###
# Search a Prolog clause and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or null.
# @return True if search succeeds, otherwise false.
##
def solve2_ref(args, rope, at, choice):
mark = machine.trail
while at < len(rope):
clause = rope[at]
at += 1
if unify(args[2], clause):
if at < len(rope):
if choice is None:
choice = Choice(solve_ref, rope, at, mark)
else:
choice.at = at
more(choice)
cont(machine.call.args[1])
return True
unbind(mark)
return False
###
# Search a Prolog clause backwards and return it.
#
# @param args The current arguments.
# @param rope The clause list.
# @param at The clause index.
# @param choice The choice point for reuse or null.
# @return True if search succeeds, otherwise false.
##
def solve2_ref_reverse(args, rope, at, choice):
mark = machine.trail
while at > 0:
at -= 1
clause = rope[at]
if unify(args[2], clause):
if at > 0:
if choice is None:
choice = Choice(solve_ref_reverse, rope, at, mark)
else:
choice.at = at
more(choice)
cont(machine.call.args[1])
return True
unbind(mark)
return False
###
# kb_pred_touch(F, N, O): internal only
# The built-in succeeds. As a side effect the predicate
# indicator F/N with options O is touched.
##
def test_kb_pred_touch(args):
functor = deref(exec_build(args[0]))
check_atom(functor)
arity = deref(exec_build(args[1]))
check_integer(arity)
flags = deref(exec_build(args[2]))
check_integer(flags)
pred_touch(functor, arity, flags)
return True
################################################################
# kb_clause_remove/4 and kb_pred_destroy/2 #
################################################################
###
# kb_clause_remove(F, N, G, R): internal only
# The built-in succeeds if the clause or predicate R could
# be removed from the predicate indicator F/N and the flags G.
##
def test_kb_clause_remove(args):
functor = deref(exec_build(args[0]))
check_atom(functor)
arity = deref(exec_build(args[1]))
check_integer(arity)
flags = deref(exec_build(args[2]))
check_integer(flags)
clause = deref(exec_build(args[3]))
check_clause(clause)
return remove_clause(functor, arity, clause, flags)
def check_clause(beta):
if not isinstance(beta, Clause):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error", ["clause", beta]))
###
# kb_pred_destroy(F, N): internal only
# The built-in succeeds. As a side effect the
# predicate indicator F/N is destroyed.
##
def test_kb_pred_destroy(args):
functor = deref(exec_build(args[0]))
check_atom(functor)
arity = deref(exec_build(args[1]))
check_integer(arity)
pred_destroy(functor, arity)
return True
################################################################
# kb_make_defined/2, kb_pred_link/3 and kb_link_flags/2 #
################################################################
###
# kb_make_defined(L, P): internal only
# The built-in succeeds in P with an annonymous predicate for the clauses L.
##
def test_kb_make_defined(args):
alpha = deref(exec_build(args[0]))
alpha = list_objects(alpha)
return exec_unify(args[1], make_defined(alpha))
###
# kb_pred_link(F, N, Q): internal only
# The built-in succeeds in Q with the provable of the
# predicate indicator F/A. Otherwise if no such provable
# exists the built-in fails.
##
def test_kb_pred_link(args):
functor = deref(exec_build(args[0]))
check_atom(functor)
arity = deref(exec_build(args[1]))
check_integer(arity)
peek = pred_link(functor, arity)
if peek is NotImplemented:
return False
return exec_unify(args[2], peek)
###
# kb_link_flags(Q, F): internal only
# The built-in succeeds in F with the flags of the provable Q.
##
def test_kb_link_flags(args):
peek = deref(exec_build(args[0]))
check_provable(peek)
return exec_unify(args[1], peek.flags)
###
# Assure that the object is a provable.
#
# @param beta The object.
##
def check_provable(beta):
if not is_provable(beta):
check_nonvar(beta)
beta = copy_term(beta)
raise make_error(Compound("type_error",
["provable", beta]))
################################################################
# kb_pred_list/1, kb_clause_creator/2 and kb_clause_shard/2 #
################################################################
###
# kb_pred_list(L): internal only
# The built-in succeeds in L with the current predicate indicators.
##
def test_kb_pred_list(args):
res = kb_pred_list()
return exec_unify(args[0], res)
def kb_pred_list():
back = None
res = None
for functor in store.kb:
temp = store.kb[functor]
i = 0
while i < len(temp):
peek = temp[i]
if peek is NotImplemented or peek.remover is not NotImplemented:
i += 1
continue
peek = Compound(".", [make_indicator(functor, i), NotImplemented])
if back is None:
res = peek
else:
back.args[1] = peek
back = peek
i += 1
if back is None:
res = "[]"
else:
back.args[1] = "[]"
return res
###
# kb_clause_creator(C, S):
# The built-in succeeds in S with the creator of the clause C.
##
def test_kb_clause_creator(args):
clause = deref(exec_build(args[0]))
check_clause(clause)
return exec_unify(args[1], clause.creator)
###
# kb_clause_shard(C, S):
# The built-in succeeds in S with the shard of the clause C.
##
def test_kb_clause_shard(args):
clause = deref(exec_build(args[0]))
check_clause(clause)
return exec_unify(args[1], clause.shard)
################################################################
# kb_clause_head/2 and kb_clause_data/4 #
################################################################
###
# kb_clause_head(C, H): internal only
# The built-in succeeds in H with the head of the clause C.
##
def special_kb_clause_head(args):
clause = deref(args[0])
check_clause(clause)
head = deref(args[1])
if clause.size != 0:
display = [NotImplemented] * clause.size
else:
display = None
if (is_compound(head) and
not exec_head(clause.head, display, head.args)):
return False
cont(machine.call.args[1])
return True
###
# kb_clause_data(C, H, O, L): internal only
# The built-in succeeds in H, O and L with the head,
# cut var and body of the clause C.
##
def special_kb_clause_data(args):
clause = deref(args[0])
check_clause(clause)
head = deref(args[1])
if clause.size != 0:
display = [NotImplemented] * clause.size
else:
display = None
if (is_compound(head) and
not exec_head(clause.head, display, head.args)):
return False
peek = clause.cutvar
if peek != -1:
temp = Variable()
display[peek] = temp
temp = Compound("just", [temp])
else:
temp = "nothing"
if not unify(args[2], temp):
return False
temp = exec_body(clause.body, display)
if not unify(args[3], temp):
return False
cont(machine.call.args[1])
return True
####################################################################
# sys_stat_map/1, dg_gc_flags/1 and garbage_collect/0 #
####################################################################
###
# sys_stat_map(M): internal only
# The built-in succeeds in W with the statistics map.
##
def test_sys_stat_map(args):
res = {}
res["wall"] = int(round(time.time()*1000))
res["time"] = real_time()
res["calls"] = machine.gc_enter
res["gctime"] = machine.gc_time
res["used"] = 0
return exec_unify(args[0], res)
###
# dg_gc_flags(W): internal only
# The built-in succeeds in W with the garbage collector flags.
##
def test_dg_gc_flags(args):
return exec_unify(args[0], machine.gc_flags)
###
# garbage_collect: internal only
# The built-in succeeds in attempting a garbage colection.
##
def test_garbage_collect(args):
gc_major()
gc.collect()
return True
####################################################################
# sys_flag_map/1 #
####################################################################
###
# sys_flag_map(M): internal only
# The built-in succeeds in W with a Prolog flag map.
##
def test_sys_flag_map(args):
res = {}
res["host_info"] = platform.python_implementation()+", Python "+platform.python_version()
res["mach_info"] = platform.processor()+", "+platform.platform()
return exec_unify(args[0], res)
################################################################
# dg_clear_stage/0, dg_get_stage/1 and dg_set_stage/1 #
################################################################
###
# dg_clear_stage: internal only
# The built-in succeeds. As a side effect it clears the current stage.
##
def test_dg_clear_stage(args):
teardown()
clear()
return True
###
# dg_get_stage(D): internal only
# The built-in succeeds in D with the current stage.
##
def test_dg_get_stage(args):
return exec_unify(args[0], store.stage)
###
# dg_set_stage(D): internal only
# The built-in succeeds. As a side effect it changes the current stage.
##
def test_dg_set_stage(args):
value = deref(exec_build(args[0]))
check_integer(value)
set_stage(value)
return True
################################################################
# func_dg_get_partition/1 and dg_set_partition/1 #
################################################################
###
# dg_get_partition(D): internal only
# The built-in succeeds in D with the current partition.
##
def test_dg_get_partition(args):
return exec_unify(args[0], store.engine.partition)
###
# dg_set_partition(D): internal only
# The built-in succeeds. As a side effect it changes the current partition.
##
def test_dg_set_partition(args):
value = deref(exec_build(args[0]))
check_atom(value)
set_partition(value)
return True
################################################################
# os_get_workdir/1 and os_set_workdir/1 #
################################################################
###
# os_get_workdir(D): internal only
# The built-in succeeds in D with the working directory.
##
def test_os_get_workdir(args):
url = os.getcwd()+os.path.sep
return exec_unify(args[0], url)
###
# os_set_workdir(D): internal only
# The built-in succeeds. As a side effect it changes the working directory to D.
##
def test_os_set_workdir(args):
url = deref(exec_build(args[0]))
check_atom(url)
try:
os.chdir(url)
except OSError as err:
raise make_error(map_file_error(err, url))
return True
################################################################
# os_get_libpath/1 and os_get_natext/1 #
################################################################
###
# os_get_libpath(D): internal only
# The built-in succeeds in D with the library path.
##
def test_os_get_libpath(args):
return exec_unify(args[0], bootbase)
###
# os_get_natext(D): internal only
# The built-in succeeds in D with the native extension.
##
def test_os_get_natext(args):
return exec_unify(args[0], ".py")
################################################################
# Runtime Init #
################################################################
# stream specials, output
add("current_output", 1, make_check(test_current_output))
add("current_error", 1, make_check(test_current_error))
add("set_output", 1, make_check(test_set_output))
add("set_error", 1, make_check(test_set_error))
add("put_atom", 2, make_check(test_put_atom))
add("dg_var_serno", 2, make_check(test_dg_var_serno))
# stream specials, input
add("current_input", 1, make_check(test_current_input))
add("set_input", 1, make_check(test_set_input))
add("os_read_sync", 1, make_check(test_os_read_sync))
add("os_get_code", 2, make_check(test_os_get_code))
add("os_peek_code", 2, make_check(test_os_peek_code))
add("os_open_promise_opts", 4, make_check(test_os_open_promise_opts))
add("os_stream_flags", 2, make_check(test_os_stream_flags))
add("os_read_promise", 2, make_check(test_os_read_promise))
add("os_close_promise", 2, make_check(test_os_close_promise))
# stream specials, files
add("os_open_sync_opts", 4, make_check(test_os_open_sync_opts))
add("flush_output", 1, make_check(test_flush_output))
add("os_close_sync", 1, make_check(test_os_close_sync))
add("os_prop_promise", 3, make_check(test_os_prop_promise))
add("set_file_property", 2, make_check(test_set_file_property))
# intermediate representation, Albufeira code
add("ir_place_new", 2, make_check(test_ir_place_new))
add("ir_skeleton_new", 3, make_check(test_ir_skeleton_new))
add("ir_pred_site", 2, make_check(test_ir_pred_site))
# intermediate representation specials, consult text, internal only
add("ir_clause_new", 6, make_check(test_ir_clause_new))
add("ir_clause_add", 4, make_check(test_ir_clause_add))
add("ir_goal_new", 4, make_check(test_ir_goal_new))
add("ir_goal_run", 1, make_special(special_ir_goal_run))
# knowledge base specials, dynamic database, internal only
add("kb_clause_ref", 3, make_special(special_kb_clause_ref))
add("kb_pred_touch", 3, make_check(test_kb_pred_touch))
add("kb_clause_remove", 4, make_check(test_kb_clause_remove))
add("kb_pred_destroy", 2, make_check(test_kb_pred_destroy))
# knowledge base specials, linked provables, internal only
add("kb_make_defined", 2, make_check(test_kb_make_defined))
add("kb_pred_link", 3, make_check(test_kb_pred_link))
add("kb_link_flags", 2, make_check(test_kb_link_flags))
# knowledge base specials, meta data, internal only
add("kb_pred_list", 1, make_check(test_kb_pred_list))
add("kb_clause_creator", 2, make_check(test_kb_clause_creator))
add("kb_clause_shard", 2, make_check(test_kb_clause_shard))
add("kb_clause_head", 2, make_special(special_kb_clause_head))
add("kb_clause_data", 4, make_special(special_kb_clause_data))
# system specials, statistics, internal only
add("sys_stat_map", 1, make_check(test_sys_stat_map))
add("dg_gc_flags", 1, make_check(test_dg_gc_flags))
add("garbage_collect", 0, make_check(test_garbage_collect))
add("sys_flag_map", 1, make_check(test_sys_flag_map))
# system specials, staging, internal only
add("dg_clear_stage", 0, make_check(test_dg_clear_stage))
add("dg_get_stage", 1, make_check(test_dg_get_stage))
add("dg_set_stage", 1, make_check(test_dg_set_stage))
add("dg_get_partition", 1, make_check(test_dg_get_partition))
add("dg_set_partition", 1, make_check(test_dg_set_partition))
# system specials, operating, internal only
add("os_get_workdir", 1, make_check(test_os_get_workdir))
add("os_set_workdir", 1, make_check(test_os_set_workdir))
add("os_get_libpath", 1, make_check(test_os_get_libpath))
add("os_get_natext", 1, make_check(test_os_get_natext))