usawa

Unnamed repository; edit this file 'description' to name the repository.
Info | Log | Files | Refs | Submodules | LICENSE

commit 7a8ee65dc5c2fddfa6a6ca7a0d4a919a9a9f61b2
parent fc2d125562a7315afc13043f2324886ae383f089
Author: lash <dev@holbrook.no>
Date:   Thu, 12 Feb 2026 14:22:15 +0000

Add documentation for services, longhelp for server executable

Diffstat:
Mdummy/doc/index.texi | 1+
Mdummy/tests/handler.py | 17++---------------
Mdummy/usawa/runnable/server.py | 36++++++++++++++++++++++++++++++------
Mdummy/usawa/service.py | 198+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
4 files changed, 211 insertions(+), 41 deletions(-)

diff --git a/dummy/doc/index.texi b/dummy/doc/index.texi @@ -29,5 +29,6 @@ Documentation released 2026 under CC-BY-SA @include store.texi @include resolver.texi @include assets.texi +@include server.texi @include internals.texi @include appendix.texi diff --git a/dummy/tests/handler.py b/dummy/tests/handler.py @@ -55,7 +55,7 @@ class TestHandler(unittest.TestCase): handler = create_handler() b = b'\x00\x00\x00' r = handler.scan(b) - self.assertEqual(r, 1) + self.assertEqual(r, -1) b = b'\x00' r = handler.scan(b) self.assertEqual(r, 0) @@ -70,20 +70,7 @@ class TestHandler(unittest.TestCase): b = b'\x00' r = handler.scan(b'') - self.assertEqual(r, 1) - with self.assertRaises(ValueError): - handler.scan(b'\x00') - - - def test_handler_twopass_pair(self): - handler = create_handler() - b = b'\x00\x00\x00\x00\x09\x00\x00' - r = handler.scan(b) - self.assertEqual(r, 0) - handler.harvest() - - r = handler.scan(b'') - self.assertEqual(r, 1) + self.assertEqual(r, -1) with self.assertRaises(ValueError): handler.scan(b'\x00') diff --git a/dummy/usawa/runnable/server.py b/dummy/usawa/runnable/server.py @@ -1,24 +1,41 @@ import logging import argparse +import signal #import threading from usawa import Ledger, Entry, EntryPart, DemoWallet, ACL, UnitIndex, load from usawa.context import Context -from usawa.service import Handler, SocketServer +from usawa.service import Handler, UnixServer from whee.valkey import ValkeyStore logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() - def parse(v): logg.debug('parsingĀ {}'.format(v.hex())) def main(): - argp = argparse.ArgumentParser() + argp = argparse.ArgumentParser( + prog='usawa socket server', + formatter_class=argparse.RawDescriptionHelpFormatter, + description="""Provides a store agnostic middleware layer served on a socket + +The server pairs with the usawa.service.UnixClient implementation, which can be used as the store argument for the usawa.store.LedgerStore. + +It uses valkey as backend for the store, and needs a valkey service to connect to. + +Loading ledger state is available from XML only. + +Public keys used to verify entries and ledger states are provided using the -k flag. The server will not allow unverified items. +""", + epilog="Currently only UNIX socket are supported", + ) argp.add_argument('-k', action='append', type=str, default=[], help='Add public key to list of valid signers') + argp.add_argument('-u', type=str, default='./usawa.socket', help='UNIX Socket file to listen on') + argp.add_argument('--valkey-host', dest='valkey_host', type=str, default='localhost', help='Valkey host') + argp.add_argument('--valkey-port', dest='valkey_port', type=int, default=6379, help='Valkey port') argp.add_argument('ledger_xml_file', type=str, help='load ledger metadata from XML file') arg = argp.parse_args() @@ -31,10 +48,17 @@ def main(): uidx = UnitIndex.from_tree(ledger_tree) ledger = Ledger.from_tree(ledger_tree, acl=acl) - db = ValkeyStore('') - srv = TCPServer(db, ledger, acl=acl) - #srv.start() + db = ValkeyStore('', host=arg.valkey_host, port=arg.valkey_port) + #srv = TCPServer(db, ledger, acl=acl) + srv = UnixServer(db, ledger, acl=acl, path=arg.u) + + def stop_server(sig, stack): + srv.stop() + signal.signal(signal.SIGINT, stop_server) + signal.signal(signal.SIGTERM, stop_server) + srv.start() + logg.debug('stopping') if __name__ == '__main__': main() diff --git a/dummy/usawa/service.py b/dummy/usawa/service.py @@ -1,5 +1,6 @@ import logging import socket +import os from whee import Interface from usawa.store import LedgerStore @@ -9,13 +10,14 @@ logg = logging.getLogger('handler') READ_SIZE = 2048 LISTEN_COUNT = 5 - +#CMD_LEN = 1 +#LEN_LEN = 3 class Handler: + """Buffers and parses instruction data from a client, and executes code corresponding to the command given. - cmd_len = 1 - len_len = 3 - + The instance may be reused. + """ def __init__(self): self.cmd = None self.buf = b'' @@ -26,31 +28,56 @@ class Handler: self.h = {} + """Register execution code for a command byte. + """ def register(self, k, fn): self.h[k] = fn + """Single processing pass on buffer to read and execute an instruction. + + The process consists of four parts: + + 1. Initialize the handler and read the command byte. + 2. Read the content length + 3. Read the contents + 4. Execute code on the contents as specified by the command byte. + + Once an instruction is successfully executed, the consecutive call will start over from the first step. + + :raises BufferError: Initialization attempted on an empty buffer. + :raises ValueError: Code for command byte does not exist + :return: -1 if further parsing is required, 0 for successful execution, any other positive value indicates an error. + :rtype: int + """ def scan(self, v): + if self.state == 4: + self.state = 0 self.buf += v if self.state == 0: - r = self.handle_cmd() - if r > 0: - return r - if self.handle_len() > 0: + self.handle_cmd() + r = self.handle_len() + if r > 0: return -1 - if self.handle_collect() > 0: + r = self.handle_collect() + if r > 0: return -1 - return self.handle() + return self.handle_exec() + """Initialize the handler for parsing a new instruction. + + :raises BufferError: If buffer is empty. + """ def handle_cmd(self): + if len(self.buf) == 0: + raise BufferError('empty buffer') self.cmd = self.buf[0] self.c = 1 self.l = -1 self.state = 1 self.r = None self.v = None - return self.handle_len() def __remainder(self): @@ -58,9 +85,20 @@ class Handler: return (len(v), v,) + """Parse the length of the command contents. + + If it returns 0, there are no bytes remaining to read and the contents can be parsed with the handle_collect() method. + + If it returns a positive value, handle_collect() should be called again once more data is available to complete the parsing. + + :return: 0 if complete, -1 if not in correct state, or bytes remaining to read. + :rtype: int + """ def handle_len(self): - if self.state != 1: + if self.state == 0: return -1 + if self.state != 1: + return 0 (l, v) = self.__remainder() if l == 0: return 0 @@ -73,6 +111,15 @@ class Handler: return 0 + """Read the instruction up to the full indicated length. + + If it returns 0, there are no bytes remaining to read and the code can be executed with the handle() method. + + If it returns a positive value, handle_collect() should be called again once more data is available to complete the parsing. + + :return: 0 if complete, -1 if not in correct state, or bytes remaining to read. + :rtype: int + """ def handle_collect(self): if self.state != 2: return -1 @@ -89,10 +136,21 @@ class Handler: return l - c - def handle(self): + """Execute the code corresponding to the command for a fully parsed instruction. + + The code for the command byte has to have been registered using the register() method. + + After successful execution, the result of the operation can be read using the harvest() method. + + :raises ValueError: No code registered for the command + :return: 0 on successful execution, any other value indicates error. + :rtype: int + """ + def handle_exec(self): self.state = 3 fn = self.h.get(self.cmd) if fn == None: + self.state = 4 raise ValueError() logg.debug('handling cmd {} arg 0x{} rest buffer 0x{}'.format(self.cmd, self.r.hex(), self.buf.hex())) r = fn(self.r) @@ -103,13 +161,38 @@ class Handler: return 0 + """Retrieve the completed result of the operation + """ def harvest(self): + if self.state < 3: + raise AttributeError('processing not complete') + if self.state == 4: + raise AttributeError('processing failed, no data available.') self.state = 0 return self.v class SocketServer: + """Store agnostic middleware for remote connections. + + Each instruction to the server is prefixed by a single-byte command identifier and a 3-byte big-endian length value specifying the total length of the contents to be sent to the backend. + + The get command has the byte value 0x00. The content is the literal key to retrieve from the store (e.g. the value of usawa.store.pfx_entry()) + + The put command has the byte value 0x01. The content is a key and value pair, each prefixed by their own 3-byte big-endian length value, specifying the total length of the key and value respectively. + + The server returns a result code and optionally a payload. The return value consists of a single byte result code, and a 3-byte big-endian length value specifying the total length of the return value. A length value of 0x000000 means the command returns and empty payload. A return value of 0x00 indicates success, any other value indicates failure. + The server wraps usawa.Handler, which handles parsing and buffering the client submissions, and sending to the right handler - put or get. + + :param db: The underlying store to provide remote access to. + :type db: whee.Interface + :param ledger: The ledger the server operates on. + :type ledger: usawa.Ledger + :param acl: ACL to use for verifications. + :type acl: usawa.ACL + :todo: Ensure ACL overrides existing ACL in ledger. + """ def __init__(self, db, ledger, acl=None): self.store = LedgerStore(db, ledger) self.acl = acl @@ -117,6 +200,10 @@ class SocketServer: self.running = True + """Shut down the socket and execution loop. + + This function is noop if called more than once. + """ def stop(self): if self.scks != None: self.scks.shutdown(socket.SHUT_RD) @@ -125,7 +212,12 @@ class SocketServer: self.running = False + """Start server listening loop. + + This method does not return until the server is stopped. + """ def start(self): + logg.info('starting server: ' + str(self)) self.scks.listen(LISTEN_COUNT) while self.running: logg.debug('waiting for connection') @@ -153,7 +245,11 @@ class SocketServer: sckc.close() break + + """Implements whee.Interface + Splits the content of the command to its individual key and value parts. + """ def put(self, b): l = int.from_bytes(b[:3], byteorder='big') k = b[3:l+3] @@ -169,6 +265,12 @@ class SocketServer: return b'\x00' + """Handles a client session. + + Does not return + + + """ def receive(self, sckc, address): c = 0 data = bytearray() @@ -193,43 +295,87 @@ class SocketServer: sckc.sendall(v) + def __str__(self): + return str(self.__class__.__name__) + '@' + self.path + + + class UnixServer(SocketServer): - timeout = 1 + timeout = 1 # Timeout for socket listen loop. + + """Implements usawa.service.SocketServer + + See SocketServer for definintion of remaining parameters. + + :param path: Path to socket file to bind to. + :type host: str + """ def __init__(self, db, ledger, acl=None, path='./usawa.socket'): super(UnixServer, self).__init__(db, ledger, acl=acl) + self.path = os.path.realpath(path) self.scks = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.scks.bind(path) + self.scks.bind(self.path) self.scks.settimeout(UnixServer.timeout) + def __del__(self): + logg.debug("removing socket file " + self.path) + os.remove(self.path) + + + class TCPServer(SocketServer): - - timeout = 1 + timeout = 1 # Timeout for socket listen loop. + + """Implements usawa.service.SocketServer + + See SocketServer for definintion of remaining parameters. + + :param host: Host to IP address to bind server to. + :type host: str + :param port: Port to bind server to. + :type port: int + """ def __init__(self, db, ledger, acl=None, host='', port=32327): super(TCPServer, self).__init__(db, ledger, acl=acl) self.scks = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.scks.bind((host, port,)) self.scks.settimeout(UnixServer.timeout) + self.path = host + ':' + str(port) class SocketClient(Interface): + """Base class for clients to usawa.service.SocketServer. + It can be used as the implementation argument for the usawa.LedgerStore. + + The implementing class should complete the connection within the class initializer. + + :todo: Implement the remaining needed methods needed by all usawa.LedgerStore methods. + """ def __init__(self): self.sck = None + + def __del__(self): + self.close() + + """Close the underlying connection. + Is noop if called more than once. + """ def close(self): logg.debug('request client close') if self.sck != None: self.sck.shutdown(socket.SHUT_RDWR) self.sck.close() - - """ + + """Implements whee.Interface. :todo: optimize length for key and value """ @@ -262,6 +408,8 @@ class SocketClient(Interface): b += r + """Implements whee.Interface. + """ def get(self, k): b = b'\x00' l = len(k) @@ -289,16 +437,26 @@ class SocketClient(Interface): class UnixClient(SocketClient): + """Implements usawa.service.SocketClient. + :param path: Path to socket file + :type path: str + """ def __init__(self, path='./usawa.socket'): super(UnixClient, self).__init__() self.sck = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sck.connect(path) - class TCPClient: + """Implements usawa.service.TCPClient. + + :param host: Hostname or IP address to bind server to. + :type host: str + :param port: Port to bind server to. + :type port: int + """ def __init__(self, host, port=32327): self.scks = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((host, port,))