commit 7d689a76bb8b7b47f62baa7c439165ff8cddd62f
parent 0efb8c95fc75c0ebd44286ae9aac9f573a485fab
Author: Carlosokumu <carlosokumu254@gmail.com>
Date: Mon, 9 Mar 2026 12:31:49 +0300
break down entry_export function
Diffstat:
1 file changed, 19 insertions(+), 146 deletions(-)
diff --git a/dummy/usawa/storage/ledger_repository.py b/dummy/usawa/storage/ledger_repository.py
@@ -2,6 +2,14 @@ import hashlib
import logging
from typing import List
import hexathon
+from usawa.storage.xml_utils import (
+ _build_export_root,
+ _build_incoming_element,
+ _find_child,
+ _find_entry_by_serial,
+ _write_xml_to_file,
+ resolve_namespace,
+)
from usawa.asset import Asset
from usawa.crypto import ACL, DemoWallet
from usawa.error import VerifyError
@@ -79,6 +87,7 @@ class LedgerRepository:
if write:
logg.info("init store for write")
self.store = LedgerStore(self.valkey_store, ledger)
+
if self._wallet is None:
pk = self.store.get_key()
if pk is None:
@@ -87,6 +96,7 @@ class LedgerRepository:
else:
logg.info("init store for read")
self.store = LedgerStore(self.valkey_store, ledger)
+
if self._wallet is None:
try:
pk = self.store.get_key()
@@ -96,7 +106,9 @@ class LedgerRepository:
)
except FileNotFoundError:
- logg.warning("No private key found in store, generating new wallet")
+ logg.warning(
+ "No private key found in store, initializing a a default one"
+ )
privkey = bytes.fromhex(self.cfg.get("SIGS_DEFAULT_PRIVATE_KEY"))
self._wallet = DemoWallet(privatekey=privkey)
@@ -227,14 +239,6 @@ class LedgerRepository:
}
def export_all_entries_to_xml(self, output_path: str) -> tuple[bool, str]:
- """
- Export all ledger entries to XML file
-
- :param output_path: Path where XML file will be saved
- :type output_path: str
- :return: (success, error_message) tuple
- :rtype: tuple[bool, str]
- """
try:
ledger = self.store.ledger
@@ -268,158 +272,27 @@ class LedgerRepository:
return False, error_msg
def export_entry_to_xml(self, serial: int, output_path: str) -> tuple[bool, str]:
- """
- Export a single ledger entry to an XML file
- """
try:
- logg.debug(f"Requested export for entry serial: {serial}")
storage_entry = self.store.ledger.entries.get(serial)
if not storage_entry:
return False, f"Entry #{serial} not found"
xml_tree = self.store.ledger.to_tree()
- ns_uri = xml_tree.nsmap.get(None)
- if ns_uri is None:
- if "}" in xml_tree.tag:
- ns_uri = xml_tree.tag.split("}")[0].strip("{")
- else:
- # Final fallback
- ns_uri = "http://usawa.defalsify.org/"
-
- logg.debug(f"Using namespace: {ns_uri}")
- logg.debug(f"Root tag: {xml_tree.tag}")
- logg.debug(f"Namespace map: {xml_tree.nsmap}")
-
- all_entries = xml_tree.findall(".//{%s}entry" % ns_uri)
- logg.debug(f"Total entries found in XML: {len(all_entries)}")
-
- target_entry = None
-
- for entry_elem in all_entries:
- data_elem = _find_child(entry_elem, "data")
-
- if data_elem is None:
- logg.warning(f"Entry without <data> element")
- continue
-
- serial_elem = _find_child(data_elem, "serial")
-
- if serial_elem is None:
- logg.warning("Entry found with no serial element")
- continue
-
- entry_serial = int(serial_elem.text)
- logg.debug(f"Inspecting entry serial: {entry_serial}")
-
- if entry_serial == serial:
- target_entry = entry_elem
- logg.debug(f"Target entry found: {serial}")
- break
-
+ ns_uri = resolve_namespace(xml_tree)
+ target_entry = _find_entry_by_serial(xml_tree, ns_uri, serial)
if target_entry is None:
return False, f"Entry #{serial} not found in XML"
- root = ET.Element("{%s}ledger" % ns_uri, nsmap={None: ns_uri})
- root.set("version", xml_tree.get("version"))
-
- for tag in ["topic", "generated", "src", "units", "identity"]:
- elem = _find_child(xml_tree, tag)
- if elem is not None:
- logg.debug(f"Copying {tag} element")
- root.append(deepcopy(elem))
-
- data_elem = _find_child(target_entry, "data")
-
- if data_elem is None:
- return False, "Target entry has no <data> element"
-
- debit_elem = _find_child(data_elem, "debit")
- credit_elem = _find_child(data_elem, "credit")
-
- debit_val = 0
- credit_val = 0
-
- if debit_elem is not None:
- amount_elem = _find_child(debit_elem, "amount")
- if amount_elem is not None:
- debit_val = int(amount_elem.text)
-
- if credit_elem is not None:
- amount_elem = _find_child(credit_elem, "amount")
- if amount_elem is not None:
- credit_val = int(amount_elem.text)
-
- expense = -abs(debit_val)
- asset = credit_val
-
- incoming = ET.Element("{%s}incoming" % ns_uri)
- incoming.set("serial", "0")
-
- real = ET.SubElement(incoming, "{%s}real" % ns_uri)
- real.set("unit", "BTC")
-
- ET.SubElement(real, "{%s}income" % ns_uri).text = "0"
- ET.SubElement(real, "{%s}expense" % ns_uri).text = str(expense)
- ET.SubElement(real, "{%s}asset" % ns_uri).text = str(asset)
- ET.SubElement(real, "{%s}liability" % ns_uri).text = "0"
-
- orig_incoming = _find_child(xml_tree, "incoming")
- if orig_incoming is not None:
- digest = _find_child(orig_incoming, "digest")
- if digest is not None:
- incoming.append(deepcopy(digest))
-
- sig = _find_child(orig_incoming, "sig")
- if sig is not None:
- incoming.append(deepcopy(sig))
-
- root.append(incoming)
-
- logg.info(f"Appending entry #{serial} to new ledger")
- root.append(deepcopy(target_entry))
-
- final_entries = root.findall(".//{%s}entry" % ns_uri)
- logg.info(f"Final XML contains {len(final_entries)} entry(ies)")
-
- output_file = Path(output_path)
- output_file.parent.mkdir(parents=True, exist_ok=True)
-
- xml_string = ET.tostring(
- root,
- encoding="utf-8",
- xml_declaration=True,
- pretty_print=True,
- )
-
- with open(output_file, "wb") as f:
- f.write(xml_string)
+ incoming = _build_incoming_element(ns_uri, target_entry, xml_tree)
+ root = _build_export_root(xml_tree, ns_uri, target_entry, incoming)
+ _write_xml_to_file(root, output_path)
logg.info(f"Successfully exported entry #{serial} -> {output_path}")
-
return True, ""
- except PermissionError as e:
- logg.debug(f"Permission error while exporting entry: {e}")
+ except PermissionError:
return False, "Permission denied"
-
except IOError as e:
- logg.debug(f"I/O error: {e}")
return False, str(e)
-
except Exception as e:
- logg.debug("Unexpected error during entry export")
return False, str(e)
-
-
-def _get_local_name(element):
- """Extract local name from element tag (without namespace)"""
- tag = element.tag
- return tag.split("}")[-1] if "}" in tag else tag
-
-
-def _find_child(parent, local_name):
- """Find child element by local name"""
- for child in parent:
- if _get_local_name(child) == local_name:
- return child
- return None