# Admin User, created Apr 17. 2025
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
from nova.core import (exec_unify, exec_build, deref, check_atom,
check_clause, make_error, Sink, Source, map_stream_error,
Context, task_async, get_ctx, get_encoding,
check_integer, add, make_check, register_signal)
from http.server import BaseHTTPRequestHandler, HTTPServer
import asyncio
import threading
import sys
#######################################################################
# HTTP Server #
#######################################################################
###
# os_http_server_new(S):
# The predicate succeeds in S with a new http server.
##
def test_os_http_server_new(args):
    """Create a fresh, still unbound HTTP server and unify it with args[0].

    The server is constructed with no address and with
    bind_and_activate=False, so no socket is bound or listening yet.
    Returns the result of the unification.
    """
    server = WebServer(None, WebRequestHandler, bind_and_activate=False)
    return exec_unify(args[0], server)
class WebServer(HTTPServer):
    """HTTPServer variant that stays silent when a connection is aborted."""

    # Falsy value, so the stock server_bind() does not enable address reuse.
    allow_reuse_address = 0

    def handle_error(self, request, client_address):
        """Report a request error unless it is a ConnectionAbortedError.

        The exception currently being handled is inspected through
        sys.exception(); anything other than an aborted connection is
        passed on to the default HTTPServer error reporting.
        """
        if not isinstance(sys.exception(), ConnectionAbortedError):
            super().handle_error(request, client_address)
class WebRequestHandler(BaseHTTPRequestHandler):
    """Request handler that hands supported HTTP methods to server.func."""

    protocol_version = 'HTTP/1.1'

    def log_message(self, fmt, *args):
        """Drop the log line; nothing is written anywhere."""
        return None

    def _forward(self):
        """Invoke the server's callback with this handler passed twice.

        The same object is given as both arguments; presumably the
        callback treats the first as the request and the second as the
        response (to be confirmed against the registered handler).
        """
        self.server.func(self, self)

    # HEAD, GET, POST, DELETE and PUT all share the same forwarding logic.
    do_HEAD = _forward
    do_GET = _forward
    do_POST = _forward
    do_DELETE = _forward
    do_PUT = _forward
###
# os_http_server_on(S, T, C):
# The predicate succeeds. As a side effect the compiled handler C
# is registered as listener to event T from server S.
##
def test_os_http_server_on(args):
    """Register a compiled handler for an event type on a server.

    args[0] is the server, args[1] the event type (checked with
    check_atom) and args[2] the handler (checked with check_clause).
    Only the "request" event installs anything: the server's `func`
    attribute is set to a callable that runs the handler on the event
    loop that was running when this function was called, and waits for
    it to finish. Any other event type is accepted and ignored.

    Always returns True. asyncio.get_running_loop() raises RuntimeError
    when no event loop is running in the calling thread.
    """
    server = deref(exec_build(args[0]))
    event = deref(exec_build(args[1]))
    check_atom(event)
    handler = deref(exec_build(args[2]))
    check_clause(handler)
    # Captured up front so that the callback can hop back onto this
    # loop later, whichever thread it is invoked from.
    loop = asyncio.get_running_loop()
    if event != "request":
        return True
    ctx = Context()

    def on_request(*paras):
        return baby_come_back(task_async(handler, ctx, paras), loop)

    server.func = on_request
    return True
def baby_come_back(coro, loop):
    """Schedule coro on loop from another thread and wait for its result.

    Blocks the calling thread until the coroutine has finished, then
    returns its result; an exception raised by the coroutine is
    re-raised here by the future's result() call.
    """
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
###
# os_http_listen_sync(S, P):
# The predicate succeeds. As a side effect the server S
# starts listening on port P.
##
def test_os_http_listen_sync(args):
    """Bind the server in args[0] to the port in args[1] and start serving.

    The port must pass check_integer. The server address is set to
    ("", port), the socket is bound and activated, and the accept loop
    is then started on a new thread. An IOError during bind/activate is
    translated with map_stream_error and raised through make_error.

    Always returns True when no error is raised.
    """
    server = deref(exec_build(args[0]))
    port = deref(exec_build(args[1]))
    check_integer(port)
    server.server_address = ("", port)
    try:
        server.server_bind()
        server.server_activate()
    except IOError as err:
        raise make_error(map_stream_error(err))
    # serve_forever() blocks, so it gets a thread of its own.
    worker = threading.Thread(target=blocking_forever, args=(server,))
    worker.start()
    return True
def blocking_forever(obj):
    """Thread target: run the server's serve_forever() loop on obj."""
    obj.serve_forever()
###
# os_http_close_sync(S):
# The predicate succeeds. As a side effect the server S is closed.
##
def test_os_http_close_sync(args):
    """Stop and close the server in args[0].

    Calls shutdown() first and server_close() afterwards, then
    returns True.
    NOTE(review): socketserver documents that shutdown() must be called
    while serve_forever() runs in another thread, otherwise it
    deadlocks; confirm callers only close servers that are listening.
    """
    server = deref(exec_build(args[0]))
    server.shutdown()
    server.server_close()
    return True
#######################################################################
# HTTP Request #
#######################################################################
###
# http_current_method(S, M):
# The predicate succeeds in M with the method of the HTTP request S.
##
def test_http_current_method(args):
    """Unify args[1] with the `command` attribute of the request in args[0].

    Returns the result of the unification.
    """
    request = deref(exec_build(args[0]))
    method = request.command
    return exec_unify(args[1], method)
###
# http_current_path(S, P):
# The predicate succeeds in P with the path of the HTTP request S.
##
def test_http_current_path(args):
    """Unify args[1] with the `path` attribute of the request in args[0].

    Returns the result of the unification.
    """
    request = deref(exec_build(args[0]))
    path = request.path
    return exec_unify(args[1], path)
###
# os_http_current_headers(S, M):
# The predicate succeeds in M with the headers of the HTTP request S.
##
def test_os_http_current_headers(args):
    """Unify args[1] with a dict of the headers of the request in args[0].

    Every header name is lower-cased; the value is whatever indexing
    the request's `headers` object with the original name yields.
    Returns the result of the unification.
    """
    request = deref(exec_build(args[0]))
    headers = request.headers
    lowered = {name.lower(): headers[name] for name in headers}
    return exec_unify(args[1], lowered)
###
# os_http_input_promise_opts(S, L, R, Q):
# The predicate succeeds in Q with a promise for a new
# text reader R on HTTP request S and option list L.
##
def test_os_http_input_promise_opts(args):
    """Create a text reader for a request body plus a promise that fills it.

    args[0] is the request and args[1] the option list. A new Source is
    unified with args[2]; if that fails the predicate fails right away.
    Otherwise the coroutine that loads the body is created and args[3]
    is unified with a zero-argument callable returning that coroutine.
    """
    request = deref(exec_build(args[0]))
    opts = deref(exec_build(args[1]))
    reader = Source()
    if exec_unify(args[2], reader):
        ctx = get_ctx()
        pending = http_input_promise_opts(ctx, reader, request, opts)
        return exec_unify(args[3], lambda: pending)
    return False
async def http_input_promise_opts(buf, stream, obj, opts):
    """Read the request body off the event loop and store it in stream.

    The encoding is taken from opts via get_encoding. The blocking read
    runs in a worker thread through asyncio.to_thread; on success the
    decoded text becomes stream.buf and stream.pos is reset to 0. An
    IOError is not raised but mapped with map_stream_error and handed
    to register_signal together with buf.
    """
    try:
        encoding = get_encoding(opts)
        stream.buf = await asyncio.to_thread(blocking_input, encoding, obj)
        stream.pos = 0
    except IOError as err:
        register_signal(buf, map_stream_error(err))
def blocking_input(enc, obj):
    """Read and decode the body of an HTTP request (blocking).

    enc is the name of the text encoding and obj the request handler,
    of which `headers` and `rfile` are used. Exactly as many bytes as
    announced by the content-length header are read and decoded.

    A request without a content-length header is treated as having an
    empty body and yields "" instead of failing with a TypeError from
    int(None). A negative length is clamped to 0, because read() with a
    negative size would read until end of stream. A non-numeric header
    value still raises ValueError.
    NOTE(review): bodies sent with chunked transfer encoding are not
    decoded here; they are read as empty.
    """
    announced = obj.headers.get('content-length')
    clen = 0 if announced is None else max(int(announced), 0)
    return obj.rfile.read(clen).decode(enc)
#######################################################################
# HTTP Response #
#######################################################################
###
# os_http_write_head(S, C, H):
# The predicate succeeds. As a side effect it writes the status
# code C and the headers map H to the HTTP response S.
##
def test_os_http_write_head(args):
    """Write the status line and headers of the response in args[0].

    args[1] is the status code (must pass check_integer) and args[2]
    the headers map. A 'transfer-encoding: chunked' header is always
    sent first, followed by every entry of the map, and then the header
    section is ended. An IOError is translated with map_stream_error
    and raised through make_error.

    Always returns True when no error is raised.
    """
    response = deref(exec_build(args[0]))
    status = deref(exec_build(args[1]))
    check_integer(status)
    extra = deref(exec_build(args[2]))
    try:
        response.send_response(status)
        response.send_header('transfer-encoding', 'chunked')
        for name, value in extra.items():
            response.send_header(name, value)
        response.end_headers()
    except IOError as err:
        raise make_error(map_stream_error(err))
    return True
###
# os_http_output_opts(S, L, W):
# The predicate succeeds in W with a new text writer for
# the HTTP response S and the option list L.
##
def test_os_http_output_opts(args):
    """Create a text writer on the response in args[0] and unify it with args[2].

    args[1] is the option list from which the encoding is taken. The
    new Sink gets the response's `wfile` as its data and the chunk
    writer, flush and end-of-body functions as its send, notify and
    release hooks. Returns the result of the unification.
    """
    response = deref(exec_build(args[0]))
    opts = deref(exec_build(args[1]))
    writer = Sink()
    encoding = get_encoding(opts)

    def send_chunk(data, buf):
        # Bind the encoding chosen for this writer.
        return http_send(data, buf, encoding)

    writer.data = response.wfile
    writer.send = send_chunk
    writer.notify = http_notify
    writer.release = http_release
    return exec_unify(args[2], writer)
def http_send(data, buf, enc):
    """Write text as one chunk of a chunked HTTP response body.

    data is the binary output stream, buf the text to send and enc the
    name of the encoding. The text is encoded and framed as
    <hex length>CRLF<bytes>CRLF, with the length counting encoded
    bytes, and emitted in a single write() call.

    Returns data in every case. Previously the empty-text path returned
    None while the normal path returned data, which would lose the
    stream for a caller that stores the return value.
    """
    if not buf:
        # A zero-length chunk marks the end of a chunked body, so empty
        # text must not be written at all.
        return data
    payload = buf.encode(enc)
    data.write(b"%X\r\n%b\r\n" % (len(payload), payload))
    return data
def http_notify(res):
    """Flush the output stream res."""
    res.flush()
def http_release(res):
    """Finish the body by writing the zero-length chunk and final CRLF."""
    res.write(b"0\r\n\r\n")
#######################################################################
# HTTP Lib Init #
#######################################################################
def main():
    """Register every HTTP built-in of this module.

    Each entry is wrapped with make_check and passed to add() together
    with its predicate name and arity, in the order listed.
    """
    builtins = (
        ("os_http_server_new", 1, test_os_http_server_new),
        ("os_http_server_on", 3, test_os_http_server_on),
        ("os_http_listen_sync", 2, test_os_http_listen_sync),
        ("os_http_close_sync", 1, test_os_http_close_sync),
        ("http_current_method", 2, test_http_current_method),
        ("http_current_path", 2, test_http_current_path),
        ("os_http_current_headers", 2, test_os_http_current_headers),
        ("os_http_input_promise_opts", 4, test_os_http_input_promise_opts),
        ("os_http_write_head", 3, test_os_http_write_head),
        ("os_http_output_opts", 3, test_os_http_output_opts),
    )
    for name, arity, func in builtins:
        add(name, arity, make_check(func))