usawa

Signed, immutable accounting.
Info | Log | Files | Refs | Submodules | LICENSE

commit 0abe35d8af9ee6a73fa49a2cbeba6f522fcf58bb
parent b0b965baafaf1268f70be67e8a3958fc64a92669
Author: Carlosokumu <carlosokumu254@gmail.com>
Date:   Thu, 19 Feb 2026 19:32:37 +0300

initialize storage module

Diffstat:
Adummy/usawa/storage/__init__.py | 0
Adummy/usawa/storage/entry_mapper.py | 152+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/storage/ledger_repository.py | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 249 insertions(+), 0 deletions(-)

diff --git a/dummy/usawa/storage/__init__.py b/dummy/usawa/storage/__init__.py diff --git a/dummy/usawa/storage/entry_mapper.py b/dummy/usawa/storage/entry_mapper.py @@ -0,0 +1,151 @@ +import logging +from datetime import datetime, date + +from usawa.asset import Asset +from usawa.entry import Entry, EntryPart +from usawa.unit import UnitIndex +from ..core.models import LedgerEntry +from usawa import Entry, EntryPart + +logg = logging.getLogger("storage.entry_mapper") + + +class EntryMapper: + """Maps between domain model (LedgerEntry) and storage model (Entry)""" + + @staticmethod + def to_entry(domain: LedgerEntry, ledger, unitindex=None): + """ + Convert LedgerEntry (domain) to Entry (storage) + + :param domain: Domain model entry + :type domain: LedgerEntry + :param ledger: The ledger object (for parent digest) + :type ledger: usawa.Ledger + :param unitindex: UnitIndex for validation + :type unitindex: UnitIndex + :return: Storage model entry + :rtype: Entry + """ + + + if domain.tx_date is None: + raise ValueError("Transaction date is required for storage") + + + tx_date = domain.tx_date + if isinstance(tx_date, date) and not isinstance(tx_date, datetime): + tx_date = datetime.combine(tx_date, datetime.min.time()) + + + parent = ledger.current() if ledger else None + ref = domain.transaction_ref if domain.transaction_ref else None + + entry = Entry( + serial= 1, + tx_date=tx_date, + parent=parent, + description=domain.description or "", + ref=ref, + unitindex = UnitIndex('BTC') + ) + + source_amount = domain.amount + source_part = EntryPart( + "BTC", + domain.source_type.lower(), + domain.source_path, + source_amount, + debit=True + ) + entry.add_part(source_part, debit=True) + + + dest_amount = -domain.amount + dest_part = EntryPart( + "BTC", + domain.dest_type.lower(), + domain.dest_path, + dest_amount, + debit=False + ) + entry.add_part(dest_part, debit=False) + + logg.debug(f"Mapped entry, serial: {entry.serial} parent: {entry.parent}") + return entry + + @staticmethod + def to_domain_entry(storage_entry) -> LedgerEntry: + """ + Convert Entry (storage) to LedgerEntry (domain) + + :param storage_entry: Storage model entry + :type storage_entry: Entry + :return: Domain model entry + :rtype: LedgerEntry + """ + + # Extract debit (source) information + source_unit = "" + source_type = "" + source_path = "" + amount = 0.0 + + if storage_entry.debit: + debit_part = storage_entry.debit[0] + source_unit = debit_part.unit + source_type = debit_part.category + source_path = debit_part.account + amount = abs(float(debit_part.value)) + + # Extract credit (destination) information + dest_unit = "" + dest_type = "" + dest_path = "" + + if storage_entry.credit: + credit_part = storage_entry.credit[0] + dest_unit = credit_part.unit + dest_type = credit_part.category + dest_path = credit_part.account + + # Convert parent digest to hex string if bytes + parent_digest = None + if storage_entry.parent: + if isinstance(storage_entry.parent, bytes): + parent_digest = storage_entry.parent.hex() + else: + parent_digest = str(storage_entry.parent) + + tx_date = storage_entry.dt + date_registered = storage_entry.dtreg + + + transaction_ref = str(storage_entry.ref) if storage_entry.ref else None + + + external_ref = None + description = storage_entry.description + + + domain = LedgerEntry( + external_reference=external_ref, + description=description, + amount=amount, + source_unit=source_unit, + source_type=source_type, + source_path=source_path, + dest_unit=dest_unit, + dest_type=dest_type, + dest_path=dest_path, + attachments=storage_entry.attachment.copy() if storage_entry.attachment else [], + serial=storage_entry.serial, + tx_date=tx_date, + date_registered=date_registered, + transaction_ref=transaction_ref, + parent_digest=parent_digest, + unit_index=storage_entry.uidx + ) + + logg.debug(f"Mapped storage entry #{storage_entry.serial} to domain LedgerEntry") + return domain +\ No newline at end of file diff --git a/dummy/usawa/storage/ledger_repository.py b/dummy/usawa/storage/ledger_repository.py @@ -0,0 +1,96 @@ +import logging +from typing import Optional, List +from usawa.service import UnixClient +from usawa.store import pfx_entry +from ..core.models import LedgerEntry +from .entry_mapper import EntryMapper + +logg = logging.getLogger("storage.ledger_repository") + + +class LedgerRepository: + """Repository that wraps LedgerStore and handles mapping""" + + def __init__(self, ledger_store,unitindex=None, unix_client: UnixClient = None, wallet=None): + """ + :param ledger_store: Instance of LedgerStore + :type ledger_store: usawa.LedgerStore + """ + self.store = ledger_store + self.unitindex = unitindex + self.client = unix_client + self.wallet = wallet + self.mapper = EntryMapper() + + def save(self, domain_entry: LedgerEntry) -> bool: + """Save a domain entry to storage""" + try: + entry = self.mapper.to_entry(domain_entry, ledger=self.store.ledger) + logg.debug(f"Mapped entry - Serial: {entry.serial}, Parent: {entry.parent.hex()}") + entry.sign(self.wallet) + logg.debug("Ledger current before add_entry: %s", self._get_parent_digest().hex()) + self.store.add_entry(entry, update_ledger=True) + # self.store.ledger.sign() + # self.store.load(acl=self.store.ledger.acl) + logg.debug("Parent digest after add_entry %s", self._get_parent_digest().hex()) + + return True + + except Exception: + logg.exception("Failed to save entry") + return False + + + + def get_by_serial(self, serial: int) -> Optional[LedgerEntry]: + """Get entry by serial number""" + try: + storage_entry = self.store.get_entry(serial) + domain_entry = self.mapper.to_domain_entry(storage_entry) + + return domain_entry + + except FileNotFoundError: + logg.warning(f"Entry #{serial} not found") + return None + except Exception as e: + logg.error(f"Failed to retrieve entry #{serial}: {e}") + return None + + + def get_next_serial(self) -> int: + """ + Get the next serial number for a new entry + + This calls ledger.next_serial() which: + 1. Increments ledger.serial + 2. Returns the new serial + + :return: Next serial number + :rtype: int + """ + next_serial = self.store.ledger.peek() + logg.debug(f"Next serial from ledger.next_serial(): {next_serial}") + return next_serial + + def _get_parent_digest(self) -> str: + """Get digest of previous ledger state""" + return self.store.ledger.current() + + def get_all(self) -> List[LedgerEntry]: + """Get all entries""" + try: + self.store.load() + domain_entries = [] + for storage_entry in self.store.ledger.entries: + domain_entry = self.mapper.to_domain_entry(storage_entry) + domain_entries.append(domain_entry) + return domain_entries + + except Exception as e: + logg.error(f"Failed to retrieve entries: {e}") + return [] + + def get_max_serial(self) -> int: + """Get highest serial number""" + return self.store.ledger.serial +\ No newline at end of file