commit 49fd18bd69752957e1ac55c9f87b86af980ca55f
parent 696dda6f989f88917a2aa330a1f72829092a1213
Author: lash <dev@holbrook.no>
Date: Wed, 11 Feb 2026 16:50:18 +0000
Implement socket client proxy for store get and put
Diffstat:
4 files changed, 139 insertions(+), 25 deletions(-)
diff --git a/dummy/tests/handler.py b/dummy/tests/handler.py
@@ -18,7 +18,7 @@ testdir = os.path.realpath(os.path.dirname(__file__))
def zero_handler(v):
logg.debug('zero handler arg 0x{}'.format(v.hex()))
- return 0
+ return b'\x00'
def create_handler():
diff --git a/dummy/tests/server.py b/dummy/tests/server.py
@@ -13,8 +13,8 @@ import lxml.etree
from whee.mem import MemStore
from usawa import DemoWallet, Ledger, Entry, EntryPart, ACL, UnitIndex
-from usawa.store import LedgerStore
-from usawa.service import Handler, UnixServer
+from usawa.store import LedgerStore, pfx_entry
+from usawa.service import Handler, UnixServer, UnixClient
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@@ -24,7 +24,7 @@ testdir = os.path.realpath(os.path.dirname(__file__))
def zero_handler(v):
logg.debug('zero handler arg 0x{}'.format(v.hex()))
- return 0
+ return b'\x00'
def create_handler():
@@ -38,11 +38,12 @@ class TestSocket(unittest.TestCase):
def setUp(self):
self.uidx = UnitIndex('FOO')
self.uidx.add('USD')
- self.store = MemStore()
+ self.db = MemStore()
self.wallet = DemoWallet()
self.acl = ACL.from_wallet(self.wallet)
- self.ledger = Ledger(self.uidx, acl=self.acl)
+ self.ledger = Ledger(self.uidx, acl=self.acl, wallet=self.wallet)
self.workdir = tempfile.mkdtemp()
+ self.store = LedgerStore(self.db, self.ledger)
def tearDown(self):
@@ -59,6 +60,33 @@ class TestSocket(unittest.TestCase):
srv = UnixServer(self.store, self.ledger, path=srv_path)
th = threading.Thread(target=self.serve, args=(srv,))
th.start()
+ client = UnixClient(path=srv_path)
+ #client.get(b'\x00\x01\x02')
+ client.close()
+ srv.stop()
+ th.join()
+
+
+ def test_socket_entry(self):
+ s = 'FOO'
+ x = EntryPart(s, 'income', 'foo', 1337, debit=True)
+ y = EntryPart(s, 'asset', 'foo', 1337)
+ entry = Entry(self.ledger.peek(), datetime.datetime.now(), parent=self.ledger.current())
+ entry.add_part(x, debit=True)
+ entry.add_part(y)
+ entry.sign(self.wallet)
+ self.store.add_entry(entry, update_ledger=True)
+
+ s = str(uuid.uuid4())
+ srv_path = os.path.join(self.workdir, s)
+ srv = UnixServer(self.db, self.ledger, path=srv_path)
+ th = threading.Thread(target=self.serve, args=(srv,))
+ th.start()
+ client = UnixClient(path=srv_path)
+
+ b = pfx_entry(self.ledger, entry)
+ client.get(b)
+ client.close()
srv.stop()
th.join()
diff --git a/dummy/usawa/error.py b/dummy/usawa/error.py
@@ -8,3 +8,7 @@ class VerifyError(Exception):
class ValidateError(Exception):
pass
+
+
+class SocketError(Exception):
+ pass
diff --git a/dummy/usawa/service.py b/dummy/usawa/service.py
@@ -1,7 +1,9 @@
import logging
import socket
+from whee import Interface
from usawa.store import LedgerStore
+from usawa.error import SocketError
logg = logging.getLogger('handler')
@@ -35,9 +37,9 @@ class Handler:
if r > 0:
return r
if self.handle_len() > 0:
- return False
+ return -1
if self.handle_collect() > 0:
- return False
+ return -1
return self.handle()
@@ -47,6 +49,7 @@ class Handler:
self.l = -1
self.state = 1
self.r = None
+ self.v = None
return self.handle_len()
@@ -89,12 +92,22 @@ class Handler:
def handle(self):
- self.state = 0
+ self.state = 3
fn = self.h.get(self.cmd)
if fn == None:
raise ValueError()
logg.debug('handling cmd {} arg 0x{} rest buffer 0x{}'.format(self.cmd, self.r.hex(), self.buf.hex()))
- return fn(self.r)
+ r = fn(self.r)
+ self.v = b'\x00'
+ l = len(r)
+ self.v += l.to_bytes(3, byteorder='big')
+ self.v += r
+ return 0
+
+
+ def harvest(self):
+ self.state = 0
+ return self.v
class SocketServer:
@@ -125,51 +138,120 @@ class SocketServer:
break
try:
(sckc, address) = self.scks.accept()
+ except TimeoutError:
+ logg.debug('timeout')
+ continue
except OSError:
logg.warning('Socket accept aborted. Bailing.')
break
- logg.info('connect: {}'.format(address))
- #th = threading.Thread(target=self.receive, args=(sckc, address))
- #th.start()
- self.receive(sckc, address)
+ if sckc != None:
+ logg.info('connect: {}'.format(address))
+ #th = threading.Thread(target=self.receive, args=(sckc, address))
+ #th.start()
+ try:
+ self.receive(sckc, address)
+ except ConnectionResetError:
+ logg.warning('connection reset')
+ sckc.close()
+ break
- def default_handler(self, v):
- return 0
-
-
def receive(self, sckc, address):
c = 0
data = bytearray()
handler = Handler()
- handler.register(0, self.default_handler)
+ handler.register(0, self.store.get)
+ handler.register(1, self.store.put)
while True:
+ r = -1
b = sckc.recv(READ_SIZE)
if len(b) == 0:
logg.info('connection broken: {}'.format(address))
break
- for v in b:
- if v == 0x0a:
- logg.debug('command boundary reached')
- parse(bytes(data))
- data.append(v)
- logg.debug('read {}: {}'.format(len(b), b.hex()))
+ try:
+ r = handler.scan(b)
+ except Exception as e:
+ logg.warning('socket cmd fail: ' + str(type(e)))
+ if r == -1:
+ sckc.sendall(b'\x02')
+ break
+ v = handler.harvest()
+ sckc.sendall(v)
class UnixServer(SocketServer):
+ timeout = 1
+
def __init__(self, db, ledger, acl=None, path='./usawa.socket'):
super(UnixServer, self).__init__(db, ledger, acl=acl)
self.scks = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.scks.bind(path)
+ self.scks.settimeout(UnixServer.timeout)
class TCPServer(SocketServer):
+ timeout = 1
+
def __init__(self, db, ledger, acl=None, host='', port=32327):
super(TCPServer, self).__init__(db, ledger, acl=acl)
self.scks = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.scks.bind((host, port,))
+ self.scks.settimeout(UnixServer.timeout)
+
+
+
+class SocketClient(Interface):
+
+ def __init__(self):
+ self.sck = None
+
+
+ def close(self):
+ logg.debug('request client close')
+ if self.sck != None:
+ self.sck.shutdown(socket.SHUT_RDWR)
+ self.sck.close()
+
+
+class UnixClient(SocketClient):
+
+ def __init__(self, path='./usawa.socket'):
+ super(UnixClient, self).__init__()
+ self.sck = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.sck.connect(path)
+
+
+ """
+
+ :todo: optimize length for key and value
+ """
+ def put(self, k, v):
+ b = b'\x00'
+ l = len(k) + len(v)
+ b += l.to_bytes(3, byteorder='big')
+ l = len(k)
+ b += l.to_bytes(3, byteorder='big')
+ b += k
+ l = len(v)
+ b += l.to_bytes(3, byteorder='big')
+ b += v
+ self.sck.sendall(b)
+ r = self.sck.recv(1)
+ if r != b'\x00':
+ raise SocketError()
+
+
+ def get(self, k):
+ b = b'\x00'
+ l = len(k)
+ b += l.to_bytes(3, byteorder='big')
+ b += k
+ self.sck.sendall(b)
+ r = self.sck.recv(1)
+ if r != b'\x00':
+ raise SocketError()
class TCPClient: