Python "httplib"

Admin User, created Mar 06. 2024
         
###
# Modern Albufeira Prolog Interpreter
#
# Warranty & Liability
# To the extent permitted by applicable law and unless explicitly
# otherwise agreed upon, XLOG Technologies AG makes no warranties
# regarding the provided information. XLOG Technologies AG assumes
# no liability that any problems might be solved with the information
# provided by XLOG Technologies AG.
#
# Rights & License
# All industrial property rights regarding the information - copyright
# and patent rights in particular - are the sole property of XLOG
# Technologies AG. If the company was not the originator of some
# excerpts, XLOG Technologies AG has at least obtained the right to
# reproduce, change and translate the information.
#
# Reproduction is restricted to the whole unaltered document. Reproduction
# of the information is only allowed for non-commercial uses. Selling,
# giving away or letting of the execution of the library is prohibited.
# The library can be distributed as part of your applications and libraries
# for execution provided this comment remains unchanged.
#
# Restrictions
# Only to be distributed with programs that add significant and primary
# functionality to the library. Not to be distributed with additional
# software intended to replace any components of the library.
#
# Trademarks
# Jekejeke is a registered trademark of XLOG Technologies AG.
##
from dogelog import (exec_unify, exec_build, deref, check_atom,
check_clause, open_file_promise, open_write, make_error, Compound,
Sink, Source, map_http_result, file_close_promise, http_read_promise,
get_engine, Context, launch_async, get_ctx, MASK_SRC_AREAD,
check_integer, add, make_check, register_signal, map_stream_error)
from http.server import BaseHTTPRequestHandler, HTTPServer
import urllib.request
import urllib.error
import asyncio
import threading
#######################################################################
# HTTP Client #
#######################################################################
###
# os_open_promise_opts(P, L, S, Q):
# The predicate succeeds in Q with a promise for open input S
# on path P and option list L.
##
def test_os_open_promise_opts(args):
url = deref(exec_build(args[0]))
check_atom(url)
opts = deref(exec_build(args[1]))
stream = Source()
if not exec_unify(args[2], stream):
return False
buf = get_ctx()
if url.startswith("http:") or url.startswith("https:"):
prom = open_http_promise_opts(buf, stream, url, opts)
else:
prom = open_file_promise(buf, stream, url)
return exec_unify(args[3], lambda: prom)
async def open_http_promise_opts(buf, stream, url, opts):
if "method" in opts:
method = opts["method"]
elif "body" in opts:
method = "POST"
else:
method = "GET"
if "body" in opts:
data = opts["body"].encode("utf-8")
else:
data = None
if "headers" in opts:
headers = opts["headers"]
else:
headers = {}
request = urllib.request.Request(url,
data=data, headers=headers, method=method)
try:
response = await asyncio.to_thread(urllib.request.urlopen, request)
except urllib.error.HTTPError as err:
register_signal(buf, map_http_result(err.code, url))
return
except urllib.error.URLError as err:
register_signal(buf, Compound("resource_error", ["url_exception"]))
return
stream.data = response
stream.receive = http_read_promise
stream.release = file_close_promise
stream.flags |= MASK_SRC_AREAD
###
# os_open_sync_opts(P, M, L, S):
# The predicate succeeds. As a side effect the stream S is
# opened on the path P with the mode M and the option list L.
##
def test_os_open_sync_opts(args):
url = deref(exec_build(args[0]))
check_atom(url)
mode = deref(exec_build(args[1]))
check_atom(mode)
opts = deref(exec_build(args[2]))
if "read" == mode:
raise make_error(Compound("resource_error",
["not_implemented"]))
elif "write" == mode:
stream = open_write(url, "w")
elif "append" == mode:
stream = open_write(url, "a")
else:
raise make_error(Compound("domain_error",
["io_mode", mode]))
return exec_unify(args[3], stream)
#######################################################################
# HTTP Server #
#######################################################################
###
# http_server_new(S):
# The predicate succeeds in S with a new http server.
##
def test_http_server_new(args):
obj = HTTPServer(None, WebRequestHandler, bind_and_activate=False)
return exec_unify(args[0], obj)
class WebRequestHandler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
pass
def do_HEAD(self):
self.server.func(self, self)
def do_GET(self):
self.server.func(self, self)
def do_POST(self):
self.server.func(self, self)
def do_DELETE(self):
self.server.func(self, self)
def do_PUT(self):
self.server.func(self, self)
###
# sys_http_server bind(S, T, C):
# The predicate succeeds. As a side effect the compiled handler C
# is registered as listener to event T from server S.
##
def test_sys_http_server_on(args):
obj = deref(exec_build(args[0]))
type = deref(exec_build(args[1]))
check_atom(type)
clause = deref(exec_build(args[2]))
check_clause(clause)
buf = Context()
buf.engine.text_output = get_engine().text_output
buf.engine.text_error = get_engine().text_error
buf.engine.text_input = get_engine().text_input
loop = asyncio.get_running_loop()
if type == "request":
obj.func = lambda req, res: baby_come_back(launch_async(clause, buf, [req, res]), loop)
return True
def baby_come_back(coro, loop):
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
###
# http_server_listen(S, P):
# The predicate succeeds. As a side effect the server S
# starts listening on port P.
##
def test_http_server_listen(args):
obj = deref(exec_build(args[0]))
port = deref(exec_build(args[1]))
check_integer(port)
obj.server_address = ("", port)
obj.server_bind()
obj.server_activate()
thread = threading.Thread(target=blocking_forever, args=(obj,))
thread.start()
return True
def blocking_forever(obj):
obj.serve_forever()
#######################################################################
# HTTP Request #
#######################################################################
###
# http_current_method(S, P):
# The predicate succeeds in M with the method of the HTTP request S.
##
def test_http_current_method(args):
obj = deref(exec_build(args[0]))
return exec_unify(args[1], obj.command)
###
# http_current_path(S, P):
# The predicate succeeds in P with the path of the HTTP request S.
##
def test_http_current_path(args):
obj = deref(exec_build(args[0]))
return exec_unify(args[1], obj.path)
###
# http_input_promise(S, R, Q):
# The predicate succeeds in Q with a promise for a new
# text reader R on HTTP request S.
###
def test_http_input_promise(args):
obj = deref(exec_build(args[0]))
stream = Source()
if not exec_unify(args[1], stream):
return False
buf = get_ctx()
prom = http_input_promise(buf, stream, obj)
return exec_unify(args[2], lambda: prom)
async def http_input_promise(buf, stream, obj):
try:
text = await asyncio.to_thread(blocking_input, obj)
stream.buf = text
except IOError as err:
register_signal(buf, map_stream_error(err))
def blocking_input(obj):
clen = int(obj.headers.get('content-length'))
return obj.rfile.read(clen).decode("utf-8")
#######################################################################
# HTTP Response #
#######################################################################
###
# sys_http_write_head(S, C, H):
# The predicate succeeds. As a side effect it writes the status
# code C and the headers map H to the HTTP response S.
##
def test_sys_http_write_head(args):
obj = deref(exec_build(args[0]))
code = deref(exec_build(args[1]))
check_integer(code)
headers = deref(exec_build(args[2]))
obj.send_response(code)
for (key, value) in headers.items():
obj.send_header(key, value)
obj.end_headers()
return True
###
# http_output_new(S, W):
# The predicate succeeds in W with a new text writer for the HTTP response S.
##
def test_http_output_new(args):
obj = deref(exec_build(args[0]))
obj = obj.wfile
dst = Sink()
dst.data = obj
dst.send = http_send
dst.notify = http_notify
return exec_unify(args[1], dst)
def http_send(res, buf):
res.write(buf.encode("utf-8"))
def http_notify(res):
res.flush()
#######################################################################
# HTTP Lib Init #
#######################################################################
def main():
add("os_open_promise_opts", 4, make_check(test_os_open_promise_opts))
add("os_open_sync_opts", 4, make_check(test_os_open_sync_opts))
add("http_server_new", 1, make_check(test_http_server_new))
add("sys_http_server_on", 3, make_check(test_sys_http_server_on))
add("http_server_listen", 2, make_check(test_http_server_listen))
add("http_current_method", 2, make_check(test_http_current_method))
add("http_current_path", 2, make_check(test_http_current_path))
add("http_input_promise", 3, make_check(test_http_input_promise))
add("sys_http_write_head", 3, make_check(test_sys_http_write_head))
add("http_output_new", 2, make_check(test_http_output_new))