Python "index"

Admin User, created Mar 06. 2024
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
import os
import signal
import sys
import asyncio
import nova.store as store
from nova.store import (set_stage, add, set_partition, Compound)
import nova.machine as machine
from nova.machine import (deref, unify, cont, VOID_ARGS, make_error,
launch, register_signal, invoke_interrupt, launch_async)
from nova.special import (check_atom, make_special)
import nova.runtime as runtime
from nova.runtime import (stream_flush, set_bootbase,
Sink, Source, map_stream_error, MASK_SRC_AREAD)
import nova.eval as eval
import bootload
import streams
################################################################
# Main API #
################################################################
###
# Initializes the Prolog system.
##
def init():
set_stage(0)
set_partition("user")
###
# Include the given text into the current stage.
#
# @param text The text.
##
async def consult_async(text):
check_atom(text)
back_input = store.engine.text_input
src = Source()
src.buf = text
store.engine.text_input = src
try:
await perform_async(Compound("ensure_loaded", ["user"]))
finally:
stream_flush(store.engine.text_input)
stream_flush(store.engine.text_error)
store.engine.text_input = back_input
###
# Query the input stream into the current stage.
##
async def toplevel_async():
await perform_async("sys_launch")
###
# Post a message to the Prolog interpreter.
#
# @param message The message.
##
def post(message):
register_signal("main", message)
invoke_interrupt("main")
###
# Writes the Prolog term to the error stream.
#
# @param term The Prolog term.
##
def show(term):
perform(Compound("sys_print_error", [term]))
################################################################
# Foreign Function #
################################################################
###
# The flag indicates a non-void dispatch foreign function.
##
FFI_FUNC = 0x00000100
###
# Add a Python function to the knowledge base. The
# registration is staged, so clear() will remove.
#
# @param functor The functor.
# @param arity The arity.
# @param func The Python function.
# @param flags The flags.
##
def register(functor, arity, func, flags):
if (flags & FFI_FUNC) != 0:
res = make_special(lambda args: invoke_func(func, args))
else:
res = make_special(lambda args: invoke(func, args))
add(functor, arity, res)
def invoke(func, args):
temp = [NotImplemented] * len(args)
i = 0
while i < len(temp):
temp[i] = deref(args[i])
i += 1
func(*temp)
cont(machine.call.args[1])
return True
def invoke_func(func, args):
temp = [NotImplemented] * (len(args) - 1)
i = 0
while i < len(temp):
temp[i] = deref(args[i])
i += 1
temp = func(*temp)
if not unify(args[len(args) - 1], temp):
return False
cont(machine.call.args[1])
return True
###
# Run a Prolog term once. The goal is run with auto-yield
# disabled and promises are not accepted.
#
# @param goal The Prolog term.
# @return True, false or yield.
##
def perform(goal):
goal = Compound(".", [goal, "[]"])
return launch(goal, "main", VOID_ARGS)
###
# Run a Prolog term once. The goal is run with auto-yield
# enabled and promises are accepted.
#
# @param goal The Prolog term.
# @return True or false.
##
async def perform_async(goal):
goal = Compound(".", [goal, "[]"])
return await launch_async(goal, "main", VOID_ARGS)
################################################################
# Plain Console #
################################################################
def plain_out(data, buf):
try:
data.write(buf)
except IOError as err:
raise make_error(map_stream_error(err))
def plain_err(data, buf):
try:
data.write(buf)
except IOError as err:
raise make_error(map_stream_error(err))
def console_flush(data):
try:
data.flush()
except IOError as err:
raise make_error(map_stream_error(err))
async def console_promise(buf, stream):
try:
res = await asyncio.to_thread(blocking_readline, stream.data)
stream.buf = res
stream.pos = 0
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_readline(data):
return data.readline()
################################################################
# Color Console #
################################################################
ANSI_RESET = "\x1B[0m"
ANSI_INPUT = "\x1B[92m" # Intense Green
ANSI_ERROR = "\x1B[91m" # Intense Red
def ansi_out(data, buf):
try:
data.write(ANSI_RESET)
data.write(buf)
data.write(ANSI_INPUT)
except IOError as err:
raise make_error(map_stream_error(err))
def ansi_err(data, buf):
try:
data.write(ANSI_ERROR)
data.write(buf)
data.write(ANSI_INPUT)
except IOError as err:
raise make_error(map_stream_error(err))
################################################################
# When Main #
################################################################
def ctrlc_abort(signum, frame):
post(Compound("system_error", ["user_abort"]))
###
# Simply colored async process standard input/output.
##
async def dogelog_async():
init()
os.system("")
signal.signal(signal.SIGINT, ctrlc_abort)
if len(sys.argv) > 1:
await toplevel_async()
else:
sys.stdout.write(ANSI_INPUT)
sys.stdout.flush()
dst = Sink()
dst.data = sys.stdout
dst.send = ansi_out
dst.notify = console_flush
store.engine.text_output = dst
dst = Sink()
dst.data = sys.stderr
dst.send = ansi_err
dst.notify = console_flush
store.engine.text_error = dst
await toplevel_async()
sys.stdout.write(ANSI_RESET)
sys.stdout.flush()
set_bootbase(__file__)
dst = Sink()
dst.data = sys.stdout
dst.send = plain_out
dst.notify = console_flush
store.engine.text_output = dst
dst = Sink()
dst.data = sys.stderr
dst.send = plain_err
dst.notify = console_flush
store.engine.text_error = dst
src = Source()
src.data = sys.stdin
src.receive = console_promise
src.flags |= MASK_SRC_AREAD
store.engine.text_input = src
if __name__ == "__main__":
asyncio.run(dogelog_async())