usawa

Signed, immutable accounting.
Info | Log | Files | Refs | Submodules | LICENSE

commit 9e183498ea05a0327e62d8cf8dc93bce9ff22d20
parent 83e5c191a0e0324109d8e10cd54cc9a367eadc98
Author: lash <dev@holbrook.no>
Date:   Tue, 10 Mar 2026 07:05:09 -0600

Merge branch 'carlos/export_menu' into lash/account-list

Diffstat:
M.gitignore | 2++
Mdummy/requirements.txt | 1+
Mdummy/setup.cfg | 1+
Adummy/usawa/config.py | 50++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/core/__init__.py | 0
Adummy/usawa/core/chain_manager.py | 158+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/core/entry_service.py | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/core/models.py | 95+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mdummy/usawa/data/usawa.ini | 8++++++++
Adummy/usawa/gui/__init__.py | 42++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/controllers/entry_controller.py | 73+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/main_window.py | 130+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/models/__init__.py | 0
Adummy/usawa/gui/models/entry_item.py | 64++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/views/create_entry_view.py | 531+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/views/entry_details_view.py | 570+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/gui/views/entry_list_view.py | 642+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/runnable/gui.py | 11+++++++++++
Adummy/usawa/storage/__init__.py | 0
Adummy/usawa/storage/entry_mapper.py | 142+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/storage/ledger_repository.py | 298+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adummy/usawa/storage/xml_utils.py | 204+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
22 files changed, 3089 insertions(+), 0 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -3,3 +3,5 @@ __pycache__ *.egg-info .state _build +.venv +/dummy/build diff --git a/dummy/requirements.txt b/dummy/requirements.txt @@ -7,3 +7,4 @@ python-gnupg==0.4.9 rencode==1.0.8 wheepy[valkey]~=0.0.3 filemagic~=1.6 +PyGObject~=3.52.3 diff --git a/dummy/setup.cfg b/dummy/setup.cfg @@ -31,3 +31,4 @@ packages = [options.entry_points] console_scripts = usawad = usawa.runnable.server:main + usawa = usawa.runnable.gui:main diff --git a/dummy/usawa/config.py b/dummy/usawa/config.py @@ -0,0 +1,50 @@ +import os +import confini +from xdg.BaseDirectory import save_data_path +import logging + +__datadir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data") +logger = logging.getLogger(__name__) + + +def load(): + cfg = confini.Config(__datadir) + cfg.process() + + if "VALKEY_HOST" in cfg.store: + valkey_host = cfg.get("VALKEY_HOST") + else: + valkey_host = save_data_path("usawa") + cfg.add(valkey_host, "VALKEY_HOST") + + if "VALKEY_PORT" in cfg.store: + valkey_port = cfg.get("VALKEY_PORT") + else: + valkey_port = save_data_path("usawa") + cfg.add(valkey_port, "VALKEY_PORT") + + if "SERVER_SOCKET_FILE_PATH" in cfg.store: + socket_file_path = cfg.get("SERVER_SOCKET_FILE_PATH") + else: + socket_file_path = save_data_path("usawa") + cfg.add(socket_file_path, "SERVER_SOCKET_FILE_PATH") + + if "SIGS_DEFAULT_PUBLIC_KEY" in cfg.store: + default_public_key = cfg.get("SIGS_DEFAULT_PUBLIC_KEY") + else: + default_public_key = save_data_path("usawa") + cfg.add(default_public_key, "SIGS_DEFAULT_PUBLIC_KEY") + + if "SIGS_DEFAULT_PRIVATE_KEY" in cfg.store: + default_private_key = cfg.get("SIGS_DEFAULT_PRIVATE_KEY") + else: + default_private_key = save_data_path("usawa") + cfg.add(default_private_key, "SIGS_DEFAULT_PRIVATE_KEY") + + if "FS_RESOLVER_STORE_PATH" in cfg.store: + fs_resolver_store_path = cfg.get("FS_RESOLVER_STORE_PATH") + else: + fs_resolver_store_path = save_data_path("usawa") + cfg.add(fs_resolver_store_path, "FS_RESOLVER_STORE_PATH") + + return cfg diff --git a/dummy/usawa/core/__init__.py b/dummy/usawa/core/__init__.py diff --git a/dummy/usawa/core/chain_manager.py b/dummy/usawa/core/chain_manager.py @@ -0,0 +1,157 @@ +import os +import logging +from usawa import Ledger, load +from usawa.crypto import ACL, DemoWallet +from usawa.store import LedgerStore +from whee.valkey import ValkeyStore +import os +import shutil +import logging + +from usawa import Ledger, load + +logg = logging.getLogger("core.chain_manager") + + +def default_chain_dir(): + """Return the conventional XDG data directory for usawa ledger files. + + Defaults to ~/.local/share/usawa/ledger/ unless XDG_DATA_HOME is set. + Directory is created if it does not exist. + """ + xdg_data = os.environ.get( + 'XDG_DATA_HOME', + os.path.join(os.path.expanduser('~'), '.local', 'share'), + ) + path = os.path.join(xdg_data, 'usawa', 'ledger') + os.makedirs(path, exist_ok=True) + return path + + +class LedgerChainManager: + + def __init__(self, genesis_path=None): + """Initialise the chain manager. + + On first ever launch, supply genesis_path so it can be copied into + the conventional directory as 0.xml. On subsequent launches, omit + genesis_path — the manager will reconstruct the chain from disk. + + :param genesis_path: Path to the genesis XML file (optional after first run) + :type genesis_path: str or None + """ + self.basedir = default_chain_dir() + logg.debug('chain dir: {}'.format(self.basedir)) + + zero = os.path.join(self.basedir, '0.xml') + if not os.path.exists(zero): + if genesis_path is None: + raise FileNotFoundError( + 'no chain found in {} and no genesis_path provided'.format(self.basedir) + ) + genesis_path = os.path.realpath(genesis_path) + if not os.path.exists(genesis_path): + raise FileNotFoundError('genesis file not found: {}'.format(genesis_path)) + shutil.copy(genesis_path, zero) + logg.debug('copied genesis {} → {}'.format(genesis_path, zero)) + + self.chain = self._reconstruct_chain() + logg.debug('chain reconstructed with depth {}'.format(self.depth())) + + + def _reconstruct_chain(self): + """Rebuild the chain list by scanning sequential xml files on disk. + + Starts at 0.xml and stops at the first missing index. + """ + chain = [] + index = 0 + while True: + path = os.path.join(self.basedir, '{}.xml'.format(index)) + if not os.path.exists(path): + break + chain.append(path) + index += 1 + if not chain: + raise FileNotFoundError('no chain files found in: {}'.format(self.basedir)) + return chain + + + def current(self): + """Return the path of the most recent XML file in the chain.""" + return self.chain[-1] + + + def derive_next(self): + """Derive the next output file path based on the current chain length. + + genesis = index 0, first entry output = index 1, and so on. + """ + next_index = len(self.chain) + basename = '{}.xml'.format(next_index) + return os.path.join(self.basedir, basename) + + + def advance(self, written_path): + """Call this after successfully writing an entry output file. + + Verifies the file exists before appending to the chain. + """ + written_path = os.path.realpath(written_path) + if not os.path.exists(written_path): + raise FileNotFoundError( + 'written file not found, cannot advance chain: {}'.format(written_path) + ) + self.chain.append(written_path) + logg.debug('chain advanced to: {}'.format(written_path)) + + + def load_current(self): + """Load and return a fresh Ledger instance from the current chain tail.""" + ledger_path = self.current() + ledger_tree = load(ledger_path) + ledger = Ledger.from_tree(ledger_tree) + logg.debug('loaded ledger from: {}'.format(ledger_path)) + return ledger + + + def write_entry(self, entry, ledger, store, wallet): + """Sign and write an entry, then advance the chain. + + Encapsulates the full write sequence: + entry.sign → store.add_entry → ledger.truncate → ledger.sign → write file + """ + next_path = self.derive_next() + + db = ValkeyStore('') + + store = LedgerStore(db, ledger) + pk = store.get_key() + wallet = DemoWallet(privatekey=pk) + logg.debug("wallet pk: %s pubk: %s", wallet.privkey().hex(), wallet.pubkey().hex()) + ledger.set_wallet(wallet) + + + ledger.acl = ACL.from_wallet(wallet) + self.ledger = ledger + + entry.sign(wallet) + store.add_entry(entry, update_ledger=True) + ledger.truncate() + ledger.sign() + + with open(next_path, 'wb') as f: + f.write(ledger.to_string()) + logg.debug('entry written to: {}'.format(next_path)) + + self.advance(next_path) + return next_path + + + def depth(self): + """Return the number of files in the chain including genesis.""" + return len(self.chain) + + + def __repr__(self): + return 'LedgerChainManager(depth={}, current={})'.format(self.depth(), self.current()) +\ No newline at end of file diff --git a/dummy/usawa/core/entry_service.py b/dummy/usawa/core/entry_service.py @@ -0,0 +1,67 @@ +import logging +from datetime import datetime +import uuid + +from .models import LedgerEntry +from usawa.storage.ledger_repository import LedgerRepository + +logg = logging.getLogger("core.entry_service") + + +class EntryService: + """Business logic for ledger entries""" + + def __init__(self, repository: LedgerRepository): + self.repository = repository + + def save_entry(self, entry: LedgerEntry) -> tuple[bool, str]: + try: + entry.tx_date = datetime.now() + entry.date_registered = datetime.now() + entry.transaction_ref = self._generate_transaction_ref() + + is_valid, error_msg = entry.validate() + if not is_valid: + logg.error(f"Entry validation failed: {error_msg}") + return False, error_msg + + self.repository.save(entry) + + logg.info(f"Entry saved successfully") + return True, "" + + except FileExistsError as e: + error_msg = ( + "Some file information for this entry is already recorded in the ledger" + ) + return False, error_msg + + except ValueError as e: + error_msg = f"Invalid entry data: {str(e)}" + logg.error(f"Validation error: {e}") + return False, error_msg + + except IOError as e: + error_msg = f"File error: {str(e)}" + logg.error(f"File operation failed: {e}") + return False, error_msg + + except Exception as e: + error_msg = f"Failed to save entry: {str(e)}" + logg.error(f"Unexpected error: {e}", exc_info=True) + return False, error_msg + + def get_all_entries(self): + return self.repository.get_all_entries() + + def _generate_transaction_ref(self) -> str: + return str(uuid.uuid4()) + + def get_asset_bytes(self, digest: bytes) -> bytes: + return self.repository.get_asset_bytes(digest=digest) + + def export_all_entries_to_xml(self, output_path: str) -> tuple[bool, str]: + return self.repository.export_all_entries_to_xml(output_path=output_path) + + def export_entry_to_xml(self, serial: int, output_path: str) -> tuple[bool, str]: + return self.repository.export_entry_to_xml(serial, output_path) diff --git a/dummy/usawa/core/models.py b/dummy/usawa/core/models.py @@ -0,0 +1,95 @@ +from dataclasses import dataclass,field +from typing import Optional,List, Union +from datetime import datetime +from pathlib import Path + +@dataclass +class LedgerEntry: + """Ledger entry data model""" + + # Basic details + external_reference: Optional[str] = None + description: Optional[str] = None + + # Transaction details + amount: float = 0.0 + source_unit: str = "" + source_type: str = "" + source_path: str = "general" + dest_unit: str = "" + dest_type: str = "" + dest_path: str = "general" + + # Attachments + attachments: List[str] = field(default_factory=list) + + # Signers (public keys) + signer_pubkeys: List[str] = field(default_factory=list) + + serial: Optional[int] = None + tx_date: Optional[datetime] = None + tx_reference: Optional[str] = None + date_registered: Optional[datetime] = None + parent_digest: Optional[str] = None + unit_index: Optional[int] = None + + def validate(self) -> tuple[bool, str]: + """Validate entry data""" + if self.amount <= 0: + return False, "Amount must be greater than 0" + + if not self.source_unit or not self.dest_unit: + return False, "Unit/Currency is required for both source and destination" + + if not self.source_type or not self.dest_type: + return False, "Account type is required for both source and destination" + + if not self.source_path or not self.dest_path: + return False, "Account path is required for both source and destination" + + # Validate attachments + if self.attachments: + for filepath in self.attachments: + if not Path(filepath).exists(): + return False, f"Attachment file not found: {filepath}" + + return True, "" + + def __repr__(self): + return ( + f"LedgerEntry(" + f"external_reference={self.external_reference!r}, " + f"description={self.description!r}, " + f"serial={self.serial!r}, " + f"parent_digest={self.parent_digest}, " + f"amount={self.amount}, " + f"source_unit={self.source_unit}, " + f"tx_ref={self.tx_reference}, " + f"source_type={self.source_type}, " + f"dest_unit={self.dest_unit}, " + f"dest_type={self.dest_type})" + f"attachments={self.attachments!r})" + ) + + def add_attachment(self, filepath: Union[str, List[str]]): + """ + Add one or more attachment file paths + """ + if isinstance(filepath, str): + if filepath not in self.attachments: + self.attachments.append(filepath) + elif isinstance(filepath, list): + for path in filepath: + if path not in self.attachments: + self.attachments.append(path) + else: + raise TypeError(f"filepath must be str or List[str], got {type(filepath)}") + + def remove_attachment(self, filepath: str): + """Remove an attachment file path""" + if filepath in self.attachments: + self.attachments.remove(filepath) + + def get_attachment_count(self) -> int: + """Get number of attachments""" + return len(self.attachments) diff --git a/dummy/usawa/data/usawa.ini b/dummy/usawa/data/usawa.ini @@ -1,3 +1,10 @@ [valkey] host = port = +[server] +socket_file_path= +[sigs] +default_public_key= +default_private_key = +[fs_resolver] +store_path= +\ No newline at end of file diff --git a/dummy/usawa/gui/__init__.py b/dummy/usawa/gui/__init__.py @@ -0,0 +1,41 @@ +import logging +import signal +import gi + +gi.require_version('Gtk', '4.0') +gi.require_version('Adw', '1') +from gi.repository import Gtk, Adw,Gio +from .main_window import UsawaMainWindow +from usawa.config import load as load_config +logg = logging.getLogger("gui.app") + +class Usawa(Adw.Application): + def __init__(self, *args, **kwargs): + super().__init__(flags= Gio.ApplicationFlags.HANDLES_COMMAND_LINE,*args, **kwargs) + self.win = None + self.ledger_file = None + self.cfg = None + self.connect('activate', self.on_activate) + signal.signal(signal.SIGINT, self._handle_sigint) + + + def on_activate(self, app): + self.win = UsawaMainWindow(application=app,ledger_path=self.ledger_file) + self.win.present() + + + def _handle_sigint(self,*_): + logg.debug("shutdown") + self.quit() + + def do_command_line(self, cli): + args = cli.get_arguments() + + if len(args) > 1: + self.ledger_file = args[1] + + self.cfg = load_config() + for k in self.cfg.all(): + logg.debug("config {} => {}".format(k, self.cfg.get(k))) + self.activate() + return 0 +\ No newline at end of file diff --git a/dummy/usawa/gui/controllers/entry_controller.py b/dummy/usawa/gui/controllers/entry_controller.py @@ -0,0 +1,73 @@ +import logging +from typing import Optional + +from usawa.core.entry_service import EntryService +from ...core.models import LedgerEntry + +logg = logging.getLogger("gui.entry_controller") + + +class EntryController: + """Handles entry creation logic""" + + def __init__(self, entry_service: EntryService): + self.entry_service = entry_service + self._entry_created_listeners = [] + + def collect_entry_data(self, view) -> Optional[LedgerEntry]: + """Collect data from the view and create an entry""" + try: + entry = LedgerEntry( + external_reference=view.ref_entry.get_text().strip() or None, + description=view.desc_entry.get_text().strip() or None, + amount=float(view.amount_entry.get_text() or "0"), + source_unit="BTC", + source_type=view.get_source_type(), + source_path=view.source_path_entry.get_text().strip(), + dest_unit="BTC", + dest_type=view.get_dest_type(), + dest_path=view.dest_path_entry.get_text().strip(), + ) + + is_valid, error_msg = entry.validate() + if not is_valid: + logg.error(f"Validation failed: {error_msg}") + return None + return entry + + except ValueError as e: + logg.error(f"Failed to collect entry data: {e}") + return None + + def finalize_entry(self, entry: LedgerEntry) -> tuple[bool, str]: + logg.debug("Finalizing entry: %s", entry) + success, error_msg = self.entry_service.save_entry(entry) + + if success: + return True, "" + else: + return False, error_msg + + def get_all_entries(self): + return self.entry_service.get_all_entries() + + def add_entry_created_listener(self, callback): + self._entry_created_listeners.append(callback) + + def notify_entry_created(self): + for callback in self._entry_created_listeners: + callback() + + def next_serial(self, entries: list | None = None) -> int: + if entries is None: + entries = self.get_all_entries() + return len(entries) + 1 + + def get_asset_bytes(self, digest: bytes) -> bytes: + return self.entry_service.get_asset_bytes(digest=digest) + + def export_ledger(self, output_path: str) -> tuple[bool, str]: + return self.entry_service.export_all_entries_to_xml(output_path) + + def export_entry(self, serial: int, output_path: str) -> tuple[bool, str]: + return self.entry_service.export_entry_to_xml(serial, output_path) diff --git a/dummy/usawa/gui/main_window.py b/dummy/usawa/gui/main_window.py @@ -0,0 +1,130 @@ +import logging +from usawa.service import UnixClient +from usawa.core.entry_service import EntryService +from usawa.storage.ledger_repository import LedgerRepository +from gi.repository import Adw, Gtk, Gio + +from usawa.gui.controllers.entry_controller import EntryController +from usawa.gui.views.entry_list_view import EntryListView +from datetime import datetime +from whee.valkey import ValkeyStore + +logg = logging.getLogger("gui.mainwindow") + + +class UsawaMainWindow(Adw.ApplicationWindow): + + def __init__(self, application, ledger_path=None, **kwargs): + super().__init__(application=application, **kwargs) + + self.set_title("Usawa") + self.set_default_size(1000, 600) + + toolbar_view = Adw.ToolbarView() + self.set_content(toolbar_view) + + header = Adw.HeaderBar() + + menu_button = self._create_menu_button() + header.pack_end(menu_button) + + toolbar_view.add_top_bar(header) + + self.toast_overlay = Adw.ToastOverlay() + toolbar_view.set_content(self.toast_overlay) + + cfg = self.get_application().cfg + self.client = UnixClient(path=cfg.get("SERVER_SOCKET_FILE_PATH")) + self.valkey_store = ValkeyStore( + "", host=cfg.get("VALKEY_HOST"), port=cfg.get("VALKEY_PORT") + ) + + repository = LedgerRepository( + ledger_path=ledger_path, + unix_client=self.client, + valkey_store=self.valkey_store, + cfg=cfg, + ) + entry_service = EntryService(repository=repository) + self.entry_controller = EntryController(entry_service=entry_service) + self.entry_controller.add_entry_created_listener(self.refresh_entries) + + self.nav_view = Adw.NavigationView() + self.toast_overlay.set_child(self.nav_view) + + entry_list_page = self._create_entry_list_page() + self.nav_view.add(entry_list_page) + + self._setup_actions() + + def _create_menu_button(self): + menu = Gio.Menu() + menu.append("Export Ledger", "app.export-ledger") + + menu_button = Gtk.MenuButton() + menu_button.set_icon_name("open-menu-symbolic") + menu_button.set_menu_model(menu) + menu_button.set_tooltip_text("Main menu") + + return menu_button + + def _setup_actions(self): + export_action = Gio.SimpleAction.new("export-ledger", None) + export_action.connect("activate", lambda a, p: self._on_export_clicked()) + self.get_application().add_action(export_action) + + def _on_export_clicked(self): + file_dialog = Gtk.FileDialog() + file_dialog.set_title("Export Ledger to XML") + default_name = f"ledger_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml" + file_dialog.set_initial_name(default_name) + + filters = Gio.ListStore.new(Gtk.FileFilter) + + xml_filter = Gtk.FileFilter() + xml_filter.set_name("XML Files") + xml_filter.add_pattern("*.xml") + filters.append(xml_filter) + + file_dialog.set_filters(filters) + file_dialog.set_default_filter(xml_filter) + + file_dialog.save(parent=self, callback=self._on_export_file_selected) + + def _on_export_file_selected(self, dialog, result): + try: + file = dialog.save_finish(result) + if file: + file_path = file.get_path() + success, error_msg = self.entry_controller.export_ledger(file_path) + + if success: + self._show_success_toast(f"Exported to {file_path}") + else: + self._show_error_dialog("Export Failed", error_msg) + except Exception as e: + logg.debug(f"Export cancelled: {e}") + + def _show_success_toast(self, message): + toast = Adw.Toast.new(message) + toast.set_timeout(3) + self.toast_overlay.add_toast(toast) + + def _create_entry_list_page(self): + page = Adw.NavigationPage(title="Ledger Entries", tag="entry-list") + entries = [] + self.entry_list_view = EntryListView( + nav_view=self.nav_view, + entry_controller=self.entry_controller, + entries=entries, + refresh_callback=self.refresh_entries, + toast_overlay=self.toast_overlay, + ) + self.entry_list_view._load_entries() + page.set_child(self.entry_list_view) + + return page + + def refresh_entries(self): + logg.info("MainWindow refreshing entries") + self.entry_list_view._load_entries() diff --git a/dummy/usawa/gui/models/__init__.py b/dummy/usawa/gui/models/__init__.py diff --git a/dummy/usawa/gui/models/entry_item.py b/dummy/usawa/gui/models/entry_item.py @@ -0,0 +1,63 @@ +from gi.repository import GObject + + +class EntryItem(GObject.Object): + """Data model for a ledger entry""" + + serial = GObject.Property(type=int, default=0) + parent_digest = GObject.Property(type=str, default="") + tx_date = GObject.Property(type=str, default="") + tx_ref = GObject.Property(type=str, default="") + tx_date_ref = GObject.Property(type=str, default="") + description = GObject.Property(type=str, default="") + auth_state = GObject.Property(type=str, default="") + amount = GObject.Property(type=str, default="") + source_unit = GObject.Property(type=str, default="") + unit_index = GObject.Property(type=str, default="") + source_type = GObject.Property(type=str, default="") + source_path = GObject.Property(type=str, default="") + dest_unit = GObject.Property(type=str, default="") + dest_type = GObject.Property(type=str, default="") + dest_path = GObject.Property(type=str, default="") + attachments = GObject.Property(type=str, default="") + signers = GObject.Property(type=str, default="") + + def __init__(self, serial=0,parent_digest = "",tx_date="",tx_date_rg= "",tx_ref ="" ,description="", auth_state="unsigned", + amount = "",source_unit="", unit_index = "",source_type="", source_path="", + dest_unit="", dest_type="", dest_path="",attachments = "",attachments_raw= None,signers="", signers_raw=None): + super().__init__() + self.serial = serial + self.parent_digest = parent_digest + self.tx_date = tx_date + self.tx_ref = tx_ref + self.tx_date_rg = tx_date_rg + self.description = description + self.auth_state = auth_state + self.amount = amount + self.unit_index = unit_index + self.source_unit = source_unit + self.source_type = source_type + self.source_path = source_path + self.dest_unit = dest_unit + self.dest_type = dest_type + self.dest_path = dest_path + self.attachments = attachments + self.attachments_raw = attachments_raw or [] + self.signers = signers + self.signers_raw = signers_raw or [] + + + + def __repr__(self): + return ( + f"EntryItem(" + f"serial={self.serial}," + f"description={self.description}," + f"source_type={self.source_type}, " + f"destination_type={self.dest_type}, " + f"amount_source={self.source_unit}:{self.source_path}, " + f"amount_dest={self.dest_unit}:{self.dest_path}, " + f"attachments='{self.attachments}', " + f"attachments_count={len(self.attachments_raw) if hasattr(self, 'attachments_raw') else 0}" + f")" + ) +\ No newline at end of file diff --git a/dummy/usawa/gui/views/create_entry_view.py b/dummy/usawa/gui/views/create_entry_view.py @@ -0,0 +1,530 @@ +import logging +from gi.repository import Gtk, Adw,Gio,Pango +from pathlib import Path +import mimetypes + +logg = logging.getLogger("gui.create_entry_view") + + +def create_entry_page(nav_view, controller): + """Create a new entry page""" + page = Adw.NavigationPage( + title="Create New Entry", + tag="create-entry" + ) + + view = CreateEntryView(nav_view, controller) + page.set_child(view) + + return page + + +class CreateEntryView(Gtk.Box): + """Create entry view - UI ONLY""" + + def __init__(self, nav_view, controller): + super().__init__(orientation=Gtk.Orientation.VERTICAL, spacing=0) + + self.nav_view = nav_view + self.controller = controller + self.attachment_paths: list[str] = [] + + self._build_ui(nav_view) + + def _build_ui(self,nav_view): + """Build the UI""" + header = self._create_header(nav_view=nav_view) + self.append(header) + + scrolled = Gtk.ScrolledWindow() + scrolled.set_vexpand(True) + scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) + + content = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=16) + content.set_margin_top(16) + content.set_margin_bottom(16) + content.set_margin_start(16) + content.set_margin_end(16) + + # Sections + basic_section = self._create_basic_section() + content.append(basic_section) + + transaction_section = self._create_transaction_section() + content.append(transaction_section) + + attachments_section = self._create_attachments_section() + content.append(attachments_section) + + warning = self._create_warning() + content.append(warning) + + scrolled.set_child(content) + self.append(scrolled) + + action_bar = self._create_action_bar() + self.append(action_bar) + + + def _create_header(self,nav_view): + """Create the header with back button and serial number""" + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + header_box.set_margin_top(12) + header_box.set_margin_bottom(12) + header_box.set_margin_start(12) + header_box.set_margin_end(12) + + back_btn = Gtk.Button() + back_btn.set_icon_name("go-previous-symbolic") + back_btn.add_css_class("flat") + back_btn.connect("clicked", lambda b: nav_view.pop()) + header_box.append(back_btn) + + title = Gtk.Label(label="Create new Entry") + title.add_css_class("title-2") + header_box.append(title) + + serial_badge = Gtk.Label(label=f"Next Serial: #{self.controller.next_serial():04d}") + serial_badge.add_css_class("caption") + serial_badge.add_css_class("accent") + serial_badge.set_margin_start(8) + serial_badge.set_margin_end(8) + serial_badge.set_margin_top(4) + serial_badge.set_margin_bottom(4) + header_box.append(serial_badge) + return header_box + + def _create_basic_section(self): + """Create basic details section - UI ONLY""" + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + + header = Gtk.Label(label="BASIC DETAILS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + + grid = Gtk.Grid() + grid.set_column_spacing(12) + grid.set_row_spacing(12) + + ref_label = Gtk.Label(label="External reference (optional)") + ref_label.set_halign(Gtk.Align.START) + ref_label.add_css_class("dim-label") + grid.attach(ref_label, 0, 0, 2, 1) + + self.ref_entry = Gtk.Entry() + self.ref_entry.set_hexpand(True) + grid.attach(self.ref_entry, 0, 1, 2, 1) + + desc_label = Gtk.Label(label="Description (optional)") + desc_label.set_halign(Gtk.Align.START) + desc_label.add_css_class("dim-label") + grid.attach(desc_label, 0, 2, 2, 1) + + self.desc_entry = Gtk.Entry() + self.desc_entry.set_placeholder_text("Brief description of the entry") + self.desc_entry.set_hexpand(True) + grid.attach(self.desc_entry, 0, 3, 2, 1) + + section_box.append(grid) + return section_box + + def _create_transaction_section(self): + """Create transaction details section with per-side currencies""" + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + header = Gtk.Label(label="TRANSACTION DETAILS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + amount_label = Gtk.Label(label="Amount") + amount_label.set_halign(Gtk.Align.START) + amount_label.add_css_class("dim-label") + section_box.append(amount_label) + + self.amount_entry = Gtk.Entry() + self.amount_entry.set_placeholder_text("0.00") + section_box.append(self.amount_entry) + + ledger_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=16) + ledger_box.set_homogeneous(True) + + + source_card = self._create_ledger_side_card("Source", is_source=True) + ledger_box.append(source_card) + + + dest_card = self._create_ledger_side_card("Destination", is_source=False) + ledger_box.append(dest_card) + + section_box.append(ledger_box) + + return section_box + + + def _create_attachments_section(self): + """Create the attachments section""" + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + + header = Gtk.Label(label="ATTACHMENTS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + attachments_card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + attachments_card.add_css_class("card") + attachments_card.set_margin_top(8) + attachments_card.set_margin_bottom(8) + attachments_card.set_margin_start(8) + attachments_card.set_margin_end(8) + + + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + header_box.set_margin_top(8) + header_box.set_margin_start(8) + header_box.set_margin_end(8) + + attach_label = Gtk.Label(label="Add supporting Documents") + attach_label.set_halign(Gtk.Align.START) + attach_label.set_hexpand(True) + header_box.append(attach_label) + + add_btn = Gtk.Button() + add_btn.set_icon_name("list-add-symbolic") + add_btn.set_label("Add File") + add_btn.add_css_class("flat") + add_btn.connect("clicked", self.on_add_attachment) + header_box.append(add_btn) + + attachments_card.append(header_box) + + self.attachment_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + self.attachment_list.set_margin_start(8) + self.attachment_list.set_margin_end(8) + self.attachment_list.set_margin_bottom(8) + + attachments_card.append(self.attachment_list) + + section_box.append(attachments_card) + + return section_box + + + def _create_attachment_row(self, filename: str, file_path: str, metadata: str): + """Create a single attachment row""" + row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + row.add_css_class("card") + row.set_margin_top(4) + row.set_margin_bottom(4) + row.set_margin_start(4) + row.set_margin_end(4) + + row.file_path = file_path + + icon = self._get_icon_for_file(filename, metadata) + row.append(icon) + + info_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=2) + info_box.set_hexpand(True) + + name_label = Gtk.Label(label=filename) + name_label.set_halign(Gtk.Align.START) + name_label.add_css_class("caption") + name_label.set_ellipsize(Pango.EllipsizeMode.MIDDLE) + info_box.append(name_label) + + meta_label = Gtk.Label(label=metadata) + meta_label.set_halign(Gtk.Align.START) + meta_label.add_css_class("dim-label") + meta_label.add_css_class("caption") + info_box.append(meta_label) + + row.append(info_box) + + remove_btn = Gtk.Button(label="Remove") + remove_btn.add_css_class("destructive-action") + remove_btn.connect("clicked", lambda b: self._on_remove_attachment(row)) + row.append(remove_btn) + + return row + + + + def _create_ledger_side_card( self,title, is_source=True): + """Create a card for source or destination with its own unit""" + card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10) + card.add_css_class("card") + card.set_margin_top(8) + card.set_margin_bottom(8) + card.set_margin_start(8) + card.set_margin_end(8) + + header = Gtk.Label(label=title) + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + header.set_margin_top(8) + header.set_margin_start(8) + card.append(header) + + + fields = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + fields.set_margin_start(8) + fields.set_margin_end(8) + fields.set_margin_bottom(8) + + unit_label = Gtk.Label(label="Unit") + unit_label.set_halign(Gtk.Align.START) + unit_label.add_css_class("caption") + fields.append(unit_label) + + units = Gtk.StringList() + units.append("Bitcoin(BTC)") + + + unit_dropdown = Gtk.DropDown(model=units) + unit_dropdown.set_selected(0) + fields.append(unit_dropdown) + + type_label = Gtk.Label(label="Account type") + type_label.set_halign(Gtk.Align.START) + type_label.add_css_class("caption") + fields.append(type_label) + + types = Gtk.StringList() + if is_source: + types.append("Expense") + types.append("Asset") + types.append("Liability") + types.append("Income") + else: + types.append("Asset") + types.append("Expense") + types.append("Liability") + types.append("Income") + + type_dropdown = Gtk.DropDown(model=types) + fields.append(type_dropdown) + + + path_label = Gtk.Label(label="Account path") + path_label.set_halign(Gtk.Align.START) + path_label.add_css_class("caption") + fields.append(path_label) + + path_entry = Gtk.Entry() + path_entry.set_text("general") + fields.append(path_entry) + + card.append(fields) + + + if is_source: + self.source_unit_dropdown = unit_dropdown + self.source_type_dropdown = type_dropdown + self.source_path_entry = path_entry + else: + self.dest_unit_dropdown = unit_dropdown + self.dest_type_dropdown = type_dropdown + self.dest_path_entry = path_entry + + return card + + + def _create_warning(self): + """Create warning note""" + warning = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + warning.add_css_class("card") + warning.add_css_class("warning") + warning.set_margin_top(8) + warning.set_margin_bottom(8) + warning.set_margin_start(8) + warning.set_margin_end(8) + + icon = Gtk.Image.new_from_icon_name("dialog-warning-symbolic") + warning.append(icon) + + text = Gtk.Label() + text.set_markup("<b>Note:</b> Entries cannot be saved as drafts. You must either Finalize or Discard.") + text.set_wrap(True) + text.set_hexpand(True) + text.set_halign(Gtk.Align.START) + warning.append(text) + + return warning + + def _create_action_bar(self): + """Create action bar with buttons""" + action_bar = Gtk.ActionBar() + + discard_btn = Gtk.Button(label="Discard") + discard_btn.add_css_class("destructive-action") + discard_btn.connect("clicked", self._on_discard) + action_bar.pack_start(discard_btn) + + finalize_btn = Gtk.Button(label="Finalize Entry") + finalize_btn.add_css_class("suggested-action") + finalize_btn.connect("clicked", self._on_finalize) + action_bar.pack_end(finalize_btn) + + return action_bar + + def get_source_unit(self) -> str: + """Get selected source unit""" + selected_idx = self.source_unit_dropdown.get_selected() + model = self.source_unit_dropdown.get_model() + return model.get_string(selected_idx) + + def get_source_type(self) -> str: + """Get selected source type""" + selected_idx = self.source_type_dropdown.get_selected() + model = self.source_type_dropdown.get_model() + return model.get_string(selected_idx) + + def get_dest_unit(self) -> str: + """Get selected dest unit""" + selected_idx = self.dest_unit_dropdown.get_selected() + model = self.dest_unit_dropdown.get_model() + return model.get_string(selected_idx) + + def get_dest_type(self) -> str: + """Get selected dest type""" + selected_idx = self.dest_type_dropdown.get_selected() + model = self.dest_type_dropdown.get_model() + return model.get_string(selected_idx) + + def _on_discard(self, button): + """Handle discard button""" + self.nav_view.pop() + + + def on_add_attachment(self, button): + """Handle add attachment button - multiple files""" + logg.info("Add attachment clicked") + + file_dialog = Gtk.FileDialog() + file_dialog.set_title("Select Attachments") + + + filters = Gio.ListStore.new(Gtk.FileFilter) + all_filter = Gtk.FileFilter() + all_filter.set_name("All Files") + all_filter.add_pattern("*") + filters.append(all_filter) + file_dialog.set_filters(filters) + + file_dialog.open_multiple( + parent=self.get_root(), + callback=self._on_files_selected + ) + + def _on_files_selected(self, dialog, result): + """Handle multiple file selection""" + try: + files = dialog.open_multiple_finish(result) + + if files: + for i in range(files.get_n_items()): + file = files.get_item(i) + file_path = file.get_path() + logg.info(f"File selected: {file_path}") + self.attachment_paths.append(file_path) + self._add_attachment_to_list(file_path) + + except Exception as e: + logg.debug(f"File selection cancelled or failed: {e}") + + + def _get_icon_for_file(self, filename: str, metadata: str): + """Get appropriate icon for file type""" + icon_name = "text-x-generic-symbolic" + + if "pdf" in metadata.lower(): + icon_name = "application-pdf-symbolic" + elif "image" in metadata.lower(): + icon_name = "image-x-generic-symbolic" + elif "text" in metadata.lower(): + icon_name = "text-x-generic-symbolic" + elif "video" in metadata.lower(): + icon_name = "video-x-generic-symbolic" + elif "audio" in metadata.lower(): + icon_name = "audio-x-generic-symbolic" + + icon = Gtk.Image.new_from_icon_name(icon_name) + return icon + + def _add_attachment_to_list(self, file_path: str): + path = Path(file_path) + + filename = path.name + file_size = path.stat().st_size + mime_type, _ = mimetypes.guess_type(file_path) + + if mime_type is None: + mime_type = "application/octet-stream" + + size_str = self._format_file_size(file_size) + + attachment_row = self._create_attachment_row( + filename=filename, + file_path=file_path, + metadata=f"{mime_type} • {size_str}" + ) + + self.attachment_list.append(attachment_row) + logg.info(f"Added attachment: {filename} ({size_str})") + + def _format_file_size(self, size_bytes: int) -> str: + """Format file size in human-readable form""" + for unit in ['B', 'KB', 'MB', 'GB']: + if size_bytes < 1024.0: + return f"{size_bytes:.1f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.1f} TB" + + def _on_finalize(self, button): + entry = self.controller.collect_entry_data(self) + + if entry is None: + self._show_error_dialog( + "Invalid Input", + "Please check your entries and try again." + ) + return + + if self.attachment_paths: + entry.add_attachment(self.attachment_paths) + + if entry.attachments: + for attachment_path in entry.attachments: + if not Path(attachment_path).exists(): + self._show_error_dialog( + "Attachment Missing", + f"Attachment file not found: {Path(attachment_path).name}" + ) + return + + success, error_msg = self.controller.finalize_entry(entry) + + if success: + self.controller.notify_entry_created() + logg.info("Entry saved successfully, returning to list") + self.nav_view.pop() + else: + self._show_error_dialog("Save Failed", error_msg) + + + def _show_error_dialog(self, title, message): + dialog = Adw.MessageDialog( + transient_for=self.get_root(), + heading=title, + body=message + ) + dialog.add_response("ok", "OK") + dialog.set_response_appearance("ok", Adw.ResponseAppearance.DESTRUCTIVE) + dialog.connect("response", lambda d, response: d.close()) + + dialog.present() +\ No newline at end of file diff --git a/dummy/usawa/gui/views/entry_details_view.py b/dummy/usawa/gui/views/entry_details_view.py @@ -0,0 +1,570 @@ +import logging +import threading +from gi.repository import Gtk, Adw, Pango, Gdk, GdkPixbuf, GLib, Gio +import threading +import tempfile +import subprocess +import logging +from datetime import datetime + + +logg = logging.getLogger("gui.entry_details_view") + + +def create_entry_details_page( + entry, nav_view, entry_controller, toast_overlay, fetch_fn +): + """Create an entry details page for the navigation stack""" + page = Adw.NavigationPage(title="Entry Details", tag=f"entry-{entry.serial}") + view = EntryDetailsView(entry, nav_view, entry_controller, toast_overlay, fetch_fn) + page.set_child(view) + return page + + +class EntryDetailsView(Gtk.Box): + """Entry details view""" + + def __init__(self, entry, nav_view, entry_controller, toast_overlay, fetch_fn): + super().__init__(orientation=Gtk.Orientation.VERTICAL, spacing=0) + self.entry = entry + self.nav_view = nav_view + self.entry_controller = entry_controller + self.toast_overlay = toast_overlay + self.fetch_fn = fetch_fn + self._build_ui() + + def _build_ui(self): + self.append(self._create_header()) + + scrolled = Gtk.ScrolledWindow() + scrolled.set_vexpand(True) + scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) + + content = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=20) + content.set_margin_top(20) + content.set_margin_bottom(20) + content.set_margin_start(20) + content.set_margin_end(20) + + content.append(self._create_entry_details_section()) + content.append(self._create_transaction_section()) + content.append(self._create_attachments_section()) + + scrolled.set_child(content) + self.append(scrolled) + + def _create_header(self): + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + header_box.set_margin_top(12) + header_box.set_margin_bottom(12) + header_box.set_margin_start(12) + header_box.set_margin_end(12) + header_box.add_css_class("toolbar") + + back_btn = Gtk.Button() + back_btn.set_icon_name("go-previous-symbolic") + back_btn.add_css_class("flat") + back_btn.connect("clicked", lambda b: self.nav_view.pop()) + header_box.append(back_btn) + + spacer = Gtk.Box() + spacer.set_hexpand(True) + header_box.append(spacer) + + export_btn = Gtk.Button() + export_btn.set_label("Export") + export_btn.set_tooltip_text("Export this entry to XML") + export_btn.add_css_class("suggested-action") + export_btn.connect("clicked", self._on_export_clicked) + header_box.append(export_btn) + + return header_box + + def _create_entry_details_section(self): + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + header = Gtk.Label(label="ENTRY DETAILS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + grid = Gtk.Grid() + grid.set_column_spacing(16) + grid.set_row_spacing(12) + grid.set_column_homogeneous(True) + + _add_field_to_grid(grid, "Serial number", str(self.entry.serial), 0, 0) + _add_field_to_grid(grid, "Transaction reference(uuid)", self.entry.tx_ref, 0, 1) + _add_field_to_grid(grid, "Transaction date", self.entry.tx_date, 1, 0) + _add_field_to_grid( + grid, + "Date registered", + ( + self.entry.tx_date_rg.strftime("%Y-%m-%d %H:%M:%S") + if self.entry.tx_date_rg + else "" + ), + 1, + 1, + ) + + parent_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + parent_label = Gtk.Label(label="Parent Digest") + parent_label.set_halign(Gtk.Align.START) + parent_label.add_css_class("dim-label") + parent_label.add_css_class("caption") + parent_box.append(parent_label) + + parent_value = Gtk.Label(label=self.entry.parent_digest) + parent_value.set_halign(Gtk.Align.START) + parent_value.set_selectable(True) + parent_value.set_wrap(False) + parent_value.set_ellipsize(Pango.EllipsizeMode.END) + parent_value.set_max_width_chars(70) + parent_value.add_css_class("monospace") + parent_box.append(parent_value) + + grid.attach(parent_box, 0, 2, 2, 1) + signer_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + signer_label = Gtk.Label(label="Signers") + signer_label.set_halign(Gtk.Align.START) + signer_label.add_css_class("dim-label") + signer_label.add_css_class("caption") + signer_box.append(signer_label) + + signers = self.entry.signers_raw + + if len(signers) == 1: + pubkey = signers[0] + short_key = f"{pubkey[:8]}...{pubkey[-6:]}" + + signer_value = Gtk.Label(label=short_key) + signer_value.set_halign(Gtk.Align.START) + signer_value.set_selectable(True) + signer_value.set_tooltip_text(pubkey) + signer_value.add_css_class("monospace") + + signer_box.append(signer_value) + + elif len(signers) > 1: + expander = Gtk.Expander(label=f"{len(signers)} signers") + + key_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + + for pubkey in signers: + short_key = f"{pubkey[:8]}...{pubkey[-6:]}" + key_label = Gtk.Label(label=short_key) + key_label.set_halign(Gtk.Align.START) + key_label.set_selectable(True) + key_label.set_tooltip_text(pubkey) + key_label.add_css_class("monospace") + key_list.append(key_label) + + expander.set_child(key_list) + signer_box.append(expander) + else: + none_label = Gtk.Label(label="No signatures") + none_label.set_halign(Gtk.Align.START) + signer_box.append(none_label) + grid.attach(signer_box, 0, 3, 1, 1) + + auth_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + auth_label = Gtk.Label(label="Authentication State") + auth_label.set_halign(Gtk.Align.START) + auth_label.add_css_class("dim-label") + auth_label.add_css_class("caption") + auth_box.append(auth_label) + + auth_badge = _create_auth_badge(self.entry.auth_state) + auth_badge.set_halign(Gtk.Align.START) + auth_box.append(auth_badge) + grid.attach(auth_box, 1, 3, 1, 1) + + desc_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + desc_label = Gtk.Label(label="Description") + desc_label.set_halign(Gtk.Align.START) + desc_label.add_css_class("dim-label") + desc_label.add_css_class("caption") + desc_box.append(desc_label) + + desc_value = Gtk.Label(label=self.entry.description) + desc_value.set_halign(Gtk.Align.START) + desc_value.set_wrap(True) + desc_box.append(desc_value) + grid.attach(desc_box, 0, 4, 2, 1) + + section_box.append(grid) + return section_box + + def _create_transaction_section(self): + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + header = Gtk.Label(label="TRANSACTION DETAILS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + amount_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + amount_label = Gtk.Label(label="Amount") + amount_label.set_halign(Gtk.Align.START) + amount_label.add_css_class("dim-label") + amount_label.add_css_class("caption") + amount_box.append(amount_label) + + amount_value = Gtk.Label(label=self.entry.amount) + amount_value.set_halign(Gtk.Align.START) + amount_value.add_css_class("title-1") + amount_box.append(amount_value) + section_box.append(amount_box) + + ledger_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=16) + ledger_box.set_homogeneous(True) + + logg.debug("Created EntryItem in TX Section: %s", self.entry) + + ledger_box.append( + _create_ledger_card( + "Source", + self.entry.source_unit, + self.entry.source_type, + self.entry.source_path, + is_source=True, + ) + ) + ledger_box.append( + _create_ledger_card( + "Destination", + self.entry.dest_unit, + self.entry.dest_type, + self.entry.dest_path, + is_source=False, + ) + ) + + section_box.append(ledger_box) + return section_box + + def _create_attachments_section(self): + section_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) + + header = Gtk.Label(label="ATTACHMENTS") + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + section_box.append(header) + + attachments_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + + if self.entry.attachments_raw: + for asset in self.entry.attachments_raw: + attach_card = _create_attachment_card( + asset.slug or "Unnamed", + asset.mime or "unknown", + on_click=lambda f, a=asset: _open_attachment_viewer( + self.get_root(), a, fetch_fn=self.fetch_fn + ), + ) + attachments_box.append(attach_card) + + section_box.append(attachments_box) + return section_box + + def _on_export_clicked(self, button): + """Handle export button click""" + logg.info(f"Export entry #{self.entry.serial} clicked") + + file_dialog = Gtk.FileDialog() + file_dialog.set_title(f"Export Entry #{self.entry.serial}") + + default_name = f"entry_{self.entry.serial:05d}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml" + file_dialog.set_initial_name(default_name) + + filters = Gio.ListStore.new(Gtk.FileFilter) + + xml_filter = Gtk.FileFilter() + xml_filter.set_name("XML Files") + xml_filter.add_pattern("*.xml") + filters.append(xml_filter) + + all_filter = Gtk.FileFilter() + all_filter.set_name("All Files") + all_filter.add_pattern("*") + filters.append(all_filter) + + file_dialog.set_filters(filters) + file_dialog.set_default_filter(xml_filter) + + file_dialog.save(parent=self.get_root(), callback=self._on_export_file_selected) + + def _on_export_file_selected(self, dialog, result): + """Handle file selection for entry export""" + try: + file = dialog.save_finish(result) + + if file: + file_path = file.get_path() + logg.info(f"Exporting entry #{self.entry.serial} to: {file_path}") + + success, error_msg = self.entry_controller.export_entry( + self.entry.serial, file_path + ) + + if success: + self._show_success_toast( + f"Entry #{self.entry.serial} exported to {file_path}" + ) + else: + self._show_error_dialog("Export Failed", error_msg) + + except Exception as e: + logg.debug(f"Export cancelled or failed: {e}") + + def _show_success_toast(self, message): + """Show success toast notification""" + toast = Adw.Toast.new(message) + toast.set_timeout(3) + self.toast_overlay.add_toast(toast) + + +def _add_field_to_grid(grid, label_text, value_text, row, col): + """Helper to add a field to the grid""" + field_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4) + + label = Gtk.Label(label=label_text) + label.set_halign(Gtk.Align.START) + label.add_css_class("dim-label") + label.add_css_class("caption") + field_box.append(label) + + value = Gtk.Label(label=value_text) + value.set_halign(Gtk.Align.START) + value.add_css_class("monospace") + value.set_selectable(True) + field_box.append(value) + + grid.attach(field_box, col, row, 1, 1) + + +def _create_auth_badge(auth_state): + """Create an authentication state badge""" + badge = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + badge.set_margin_top(4) + badge.set_margin_bottom(4) + badge.set_margin_start(8) + badge.set_margin_end(8) + badge.add_css_class("badge") + + icon = Gtk.Label() + text = Gtk.Label() + text.add_css_class("caption") + + if auth_state == "trusted": + badge.add_css_class("auth-trusted") + icon.set_text("✓") + text.set_text("Trusted") + elif auth_state == "not_trusted": + badge.add_css_class("auth-not-trusted") + icon.set_text("⚠") + text.set_text("Not Trusted") + elif auth_state == "unknown": + badge.add_css_class("auth-unknown") + icon.set_text("?") + text.set_text("Unknown Key") + elif auth_state == "invalid": + badge.add_css_class("auth-invalid") + icon.set_text("✗") + text.set_text("Invalid") + else: + badge.add_css_class("auth-unsigned") + icon.set_text("○") + text.set_text("No Key") + + badge.append(icon) + badge.append(text) + return badge + + +def _create_ledger_card(title, unit, account_type, path, is_source=True): + """Create a source or destination card""" + card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + card.add_css_class("card") + card.set_margin_top(8) + card.set_margin_bottom(8) + card.set_margin_start(8) + card.set_margin_end(8) + + header = Gtk.Label(label=title) + header.set_halign(Gtk.Align.START) + header.add_css_class("heading") + header.set_margin_top(8) + header.set_margin_start(8) + card.append(header) + + fields_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + fields_box.set_margin_start(8) + fields_box.set_margin_end(8) + fields_box.set_margin_bottom(8) + + _add_label_value_pair(fields_box, "Account Unit:", unit) + _add_label_value_pair(fields_box, "Account Type:", account_type) + _add_label_value_pair(fields_box, "Account Path:", path) + + card.append(fields_box) + return card + + +def _add_label_value_pair(container, label_text, value_text): + """Add a label:value pair to a container""" + row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + + label = Gtk.Label(label=label_text) + label.set_halign(Gtk.Align.START) + label.add_css_class("dim-label") + row.append(label) + + value = Gtk.Label(label=value_text) + value.set_halign(Gtk.Align.START) + value.add_css_class("monospace") + value.set_hexpand(True) + row.append(value) + + container.append(row) + + +def _create_attachment_card(filename, metadata, on_click=None): + """Create an attachment card""" + card = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + card.add_css_class("card") + card.set_margin_top(8) + card.set_margin_bottom(8) + card.set_size_request(200, -1) + + if on_click: + gesture = Gtk.GestureClick.new() + gesture.connect("pressed", lambda gesture, n_press, x, y: on_click(filename)) + card.add_controller(gesture) + card.set_cursor(Gdk.Cursor.new_from_name("pointer")) + + header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + header_box.set_margin_top(12) + header_box.set_margin_start(12) + header_box.set_margin_end(12) + + icon = Gtk.Image.new_from_icon_name("text-x-generic-symbolic") + header_box.append(icon) + + name_label = Gtk.Label(label=filename) + name_label.set_halign(Gtk.Align.START) + name_label.add_css_class("caption") + name_label.set_ellipsize(Pango.EllipsizeMode.MIDDLE) + header_box.append(name_label) + + card.append(header_box) + + meta_label = Gtk.Label(label=metadata) + meta_label.set_halign(Gtk.Align.START) + meta_label.add_css_class("dim-label") + meta_label.add_css_class("caption") + meta_label.set_margin_start(12) + meta_label.set_margin_bottom(12) + card.append(meta_label) + + return card + + +def _on_attachment_fetched(parent_window, asset, bytes_data): + """Handle fetched attachment data and open appropriate viewer""" + if bytes_data is None: + _show_error_dialog( + parent_window, + "Failed to load attachment", + "Could not retrieve asset data for {}.".format(asset.slug), + ) + return + + mime = asset.mime or "" + if mime.startswith("image/"): + _show_image_viewer(parent_window, asset.slug, bytes_data) + elif mime == "application/pdf": + _show_pdf_viewer(parent_window, asset.slug, bytes_data) + elif mime.startswith("text/") or mime in ( + "application/json", + "application/xml", + ): + _show_text_viewer(parent_window, asset.slug, bytes_data) + else: + _show_unsupported_dialog(parent_window, asset.slug, mime) + + +def _fetch_attachment_data(parent_window, asset, fetch_fn): + """Fetch data and schedule callback on main thread""" + bytes_data = fetch_fn(asset.digest) + GLib.idle_add(_on_attachment_fetched, parent_window, asset, bytes_data) + + +def _open_attachment_viewer(parent_window, asset, fetch_fn): + """Fetch bytes in a background thread then open the appropriate viewer.""" + threading.Thread( + target=_fetch_attachment_data, + args=(parent_window, asset, fetch_fn), + daemon=True, + ).start() + + +def _show_image_viewer(parent, title, data): + dialog = Gtk.Window(title=title) + dialog.set_transient_for(parent) + dialog.set_modal(True) + dialog.set_default_size(800, 600) + + loader = GdkPixbuf.PixbufLoader() + loader.write(data) + loader.close() + pixbuf = loader.get_pixbuf() + + scroll = Gtk.ScrolledWindow() + image = Gtk.Picture.new_for_pixbuf(pixbuf) + image.set_content_fit(Gtk.ContentFit.CONTAIN) + scroll.set_child(image) + dialog.set_child(scroll) + dialog.present() + + +def _show_text_viewer(parent, title, data): + dialog = Gtk.Window(title=title) + dialog.set_transient_for(parent) + dialog.set_modal(True) + dialog.set_default_size(800, 600) + + scroll = Gtk.ScrolledWindow() + text_view = Gtk.TextView() + text_view.set_editable(False) + text_view.set_monospace(True) + text_view.get_buffer().set_text(data.decode("utf-8", errors="replace")) + scroll.set_child(text_view) + dialog.set_child(scroll) + dialog.present() + + +def _show_pdf_viewer(parent, title, data): + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f: + f.write(data) + tmp_path = f.name + subprocess.Popen(["evince", tmp_path]) + + +def _show_unsupported_dialog(parent, filename, mime): + dialog = Gtk.AlertDialog() + dialog.set_message(f"Cannot preview '{filename}'") + dialog.set_detail(f"No viewer available for type: {mime}") + dialog.show(parent) + + +def _show_error_dialog(self, title, message): + dialog = Adw.MessageDialog( + transient_for=self.get_root(), heading=title, body=message + ) + dialog.add_response("ok", "OK") + dialog.set_response_appearance("ok", Adw.ResponseAppearance.DESTRUCTIVE) + dialog.connect("response", lambda d, response: d.close()) + + dialog.present() diff --git a/dummy/usawa/gui/views/entry_list_view.py b/dummy/usawa/gui/views/entry_list_view.py @@ -0,0 +1,642 @@ +from datetime import datetime +import logging +from gi.repository import Gtk, Gio, Pango + +from usawa.gui.models.entry_item import EntryItem +from usawa.gui.views.create_entry_view import create_entry_page +from usawa.gui.views.entry_details_view import create_entry_details_page + +logg = logging.getLogger("gui.entrylist") + + +class EntryListView(Gtk.Box): + """The entry list view with filters, table, and FAB""" + + def __init__( + self, + nav_view, + entry_controller, + toast_overlay, + entries=None, + refresh_callback=None, + **kwargs, + ): + super().__init__(orientation=Gtk.Orientation.VERTICAL, spacing=0, **kwargs) + + self.nav_view = nav_view + self.entry_controller = entry_controller + self.entries = entries or [] + self.refresh_callback = refresh_callback + self.toast_overlay = toast_overlay + self.active_filter = None + + overlay = Gtk.Overlay() + self.append(overlay) + + content = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, + spacing=12, + ) + content.set_margin_top(12) + content.set_margin_bottom(12) + content.set_margin_start(12) + content.set_margin_end(12) + overlay.set_child(content) + + # Sort Section + sort_section = self._create_sort_section() + content.append(sort_section) + + # Filter Section + filter_section = self._create_filter_section() + content.append(filter_section) + + # Table Section + table_section = self._create_table_section() + content.append(table_section) + + # Floating Action Button + fab = Gtk.Button() + fab.set_icon_name("list-add-symbolic") + fab.add_css_class("circular") + fab.add_css_class("suggested-action") + fab.set_halign(Gtk.Align.END) + fab.set_valign(Gtk.Align.END) + fab.set_margin_end(24) + fab.set_margin_bottom(24) + overlay.add_overlay(fab) + fab.connect("clicked", self.on_fab_clicked) + + def _create_sort_section(self): + """Create the Sort by section with toggle buttons""" + sort_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + + sort_label = Gtk.Label(label="Sort by") + sort_label.set_halign(Gtk.Align.START) + sort_label.add_css_class("heading") + sort_box.append(sort_label) + + button_container = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + button_container.set_margin_top(4) + button_container.set_margin_bottom(4) + button_container.set_margin_start(8) + button_container.set_margin_end(8) + button_container.add_css_class("card") + + self.sort_serial_btn = Gtk.ToggleButton(label="Serial Number") + self.sort_serial_btn.set_active(True) + self.sort_serial_btn.connect("toggled", self.on_sort_changed, "serial") + button_container.append(self.sort_serial_btn) + + self.sort_datetime_btn = Gtk.ToggleButton(label="Transaction Datetime") + self.sort_datetime_btn.connect("toggled", self.on_sort_changed, "datetime") + button_container.append(self.sort_datetime_btn) + + self.sort_serial_btn.set_group(self.sort_datetime_btn) + + sort_box.append(button_container) + + return sort_box + + def _create_filter_section(self): + """Create the Filter section with account type, keyword, and date range""" + filter_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10) + + filter_label = Gtk.Label(label="Filter") + filter_label.set_halign(Gtk.Align.START) + filter_label.add_css_class("heading") + filter_box.append(filter_label) + + filter_container = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=16) + filter_container.set_margin_top(8) + filter_container.set_margin_bottom(8) + filter_container.set_margin_start(8) + filter_container.set_margin_end(8) + filter_container.add_css_class("card") + + account_type_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + account_type_box.set_hexpand(True) + account_type_box.set_margin_start(8) + account_type_label = Gtk.Label(label="Account type") + account_type_label.set_halign(Gtk.Align.START) + account_type_label.add_css_class("caption") + account_type_label.set_margin_top(4) + account_type_box.set_margin_bottom(4) + account_type_box.append(account_type_label) + + account_types = Gtk.StringList() + account_types.append("All types") + account_types.append("Asset") + account_types.append("Liability") + account_types.append("Income") + account_types.append("Expense") + + self.account_type_dropdown = Gtk.DropDown(model=account_types) + self.account_type_dropdown.set_selected(0) + self.account_type_dropdown.connect("notify::selected", self.on_filter_changed) + account_type_box.append(self.account_type_dropdown) + + filter_container.append(account_type_box) + + keyword_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + keyword_box.set_hexpand(True) + + keyword_label = Gtk.Label(label="Keyword or Account path") + keyword_label.set_halign(Gtk.Align.START) + keyword_label.add_css_class("caption") + keyword_label.set_margin_top(4) + keyword_box.set_margin_bottom(4) + keyword_box.append(keyword_label) + + self.keyword_entry = Gtk.Entry() + self.keyword_entry.set_placeholder_text("Search in description or path...") + self.keyword_entry.set_text("rent/apartment") + self.keyword_entry.connect("changed", self.on_filter_changed) + keyword_label.set_margin_top(4) + keyword_box.append(self.keyword_entry) + + filter_container.append(keyword_box) + + date_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) + date_box.set_hexpand(True) + + date_label = Gtk.Label(label="Date range") + date_label.set_halign(Gtk.Align.START) + date_label.add_css_class("caption") + date_label.set_margin_top(4) + date_box.set_margin_bottom(4) + date_box.append(date_label) + + date_range_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + + self.date_start_entry = Gtk.Entry() + self.date_start_entry.set_placeholder_text("MM-DD") + self.date_start_entry.set_text("02-10") + self.date_start_entry.set_max_width_chars(10) + self.date_start_entry.connect("changed", self.on_filter_changed) + date_range_box.append(self.date_start_entry) + + to_label = Gtk.Label(label="to") + to_label.add_css_class("dim-label") + date_range_box.append(to_label) + + self.date_end_entry = Gtk.Entry() + self.date_end_entry.set_placeholder_text("MM-DD") + self.date_end_entry.set_text("02-11") + self.date_end_entry.set_max_width_chars(10) + self.date_end_entry.connect("changed", self.on_filter_changed) + date_range_box.append(self.date_end_entry) + + calendar_btn = Gtk.Button() + calendar_btn.set_icon_name("x-office-calendar-symbolic") + calendar_btn.add_css_class("flat") + calendar_btn.connect("clicked", self.on_calendar_clicked) + date_range_box.append(calendar_btn) + + date_box.append(date_range_box) + + filter_container.append(date_box) + + filter_box.append(filter_container) + + return filter_box + + def on_filter_changed(self, widget, *args): + """Handle filter changes - track which filter was last updated""" + if widget == self.account_type_dropdown: + self.active_filter = "account_type" + elif widget == self.keyword_entry: + self.active_filter = "keyword" + elif widget in (self.date_start_entry, self.date_end_entry): + self.active_filter = "date" + + self.refresh_data() + + def on_calendar_clicked(self, button): + logg.info("Calendar button clicked - showing date range picker") + self._start_date = None + self._end_date = None + + dialog = Gtk.Dialog(transient_for=self.get_root(), modal=True) + dialog.set_title("Select Date Range") + dialog.set_default_size(400, 450) + dialog.add_button("Cancel", Gtk.ResponseType.CANCEL) + dialog.add_button("Apply", Gtk.ResponseType.OK) + + content = dialog.get_content_area() + content.set_spacing(16) + content.set_margin_top(12) + content.set_margin_bottom(12) + content.set_margin_start(12) + content.set_margin_end(12) + + main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=16) + content.append(main_box) + + instruction = Gtk.Label( + label="Click once for start date, click again for end date" + ) + instruction.add_css_class("dim-label") + instruction.set_wrap(True) + instruction.set_halign(Gtk.Align.START) + main_box.append(instruction) + + self._calendar = Gtk.Calendar() + self._calendar.set_margin_start(12) + self._calendar.set_margin_end(12) + self._calendar.set_margin_top(8) + self._calendar.set_margin_bottom(8) + main_box.append(self._calendar) + + selection_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=12) + selection_box.set_halign(Gtk.Align.CENTER) + main_box.append(selection_box) + + self._start_label = Gtk.Label() + self._start_label.set_markup("<b>Start:</b> --") + selection_box.append(self._start_label) + selection_box.append(Gtk.Label(label="→")) + self._end_label = Gtk.Label() + self._end_label.set_markup("<b>End:</b> --") + selection_box.append(self._end_label) + + self._calendar.connect("day-selected", self._on_calendar_day_selected) + dialog.connect("response", self._on_calendar_response) + dialog.present() + + def _on_calendar_day_selected(self, cal): + date = cal.get_date() + date_str = "{}-{:02d}-{:02d}".format( + date.get_year(), date.get_month(), date.get_day_of_month() + ) + if self._start_date is None or self._end_date is not None: + self._start_date = date_str + self._end_date = None + self._start_label.set_markup("<b>Start:</b> {}".format(date_str)) + self._end_label.set_markup("<b>End:</b> --") + self._calendar.mark_day(date.get_day_of_month()) + else: + self._end_date = date_str + self._end_label.set_markup("<b>End:</b> {}".format(date_str)) + self._calendar.mark_day(date.get_day_of_month()) + + def _on_calendar_response(self, dialog, response): + if response == Gtk.ResponseType.OK and self._start_date and self._end_date: + logg.info( + "Date range selected: {} to {}".format(self._start_date, self._end_date) + ) + start = datetime.strptime(self._start_date, "%Y-%m-%d").date() + end = datetime.strptime(self._end_date, "%Y-%m-%d").date() + filtered = [ + e + for e in self.entries + if e.tx_date and start <= e.tx_date.date() <= end + ] + self._reload_store(filtered) + self._calendar.unmark_day(self._calendar.get_date().get_day_of_month()) + dialog.destroy() + + def on_sort_changed(self, button, sort_type): + """Handle sort option changes""" + if button.get_active(): + logg.info(f"Sort changed to: {sort_type}") + self._sort_and_reload(sort_type) + + def on_fab_clicked(self, button): + logg.info("FAB clicked - opening create entry window") + + create_page = create_entry_page(self.nav_view, self.entry_controller) + self.nav_view.push(create_page) + + def refresh_data(self): + filtered = list(self.entries) + + if self.active_filter == "account_type": + account_type_idx = self.account_type_dropdown.get_selected() + account_type = self.account_type_dropdown.get_model().get_string( + account_type_idx + ) + if account_type != "All types": + filtered = [ + e + for e in filtered + if e.source_type.lower() == account_type.lower() + or e.dest_type.lower() == account_type.lower() + ] + + elif self.active_filter == "keyword": + keyword = self.keyword_entry.get_text().strip().lower() + if keyword: + filtered = [ + e + for e in filtered + if keyword in (e.description or "").lower() + or keyword in (e.source_path or "").lower() + or keyword in (e.dest_path or "").lower() + ] + + elif self.active_filter == "date": + date_start = self.date_start_entry.get_text().strip() + date_end = self.date_end_entry.get_text().strip() + logg.info("Date range selected: {} to {}".format(date_start, date_end)) + if date_start: + filtered = [e for e in filtered if str(e.tx_date)[5:10] >= date_start] + if date_end: + filtered = [e for e in filtered if str(e.tx_date)[5:10] <= date_end] + + sort_type = "serial" if self.sort_serial_btn.get_active() else "datetime" + if sort_type == "serial": + filtered = sorted(filtered, key=lambda e: e.serial) + else: + filtered = sorted( + filtered, key=lambda e: (e.tx_date, e.serial), reverse=True + ) + + self._reload_store(filtered) + + def on_create_window_closed(self, window): + logg.info("Create entry window closed") + if self.refresh_callback: + self.refresh_callback() + return False + + def _create_table_section(self): + """Create the entry list table with all columns""" + table_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) + + self.entry_store = Gio.ListStore.new(EntryItem) + selection_model = Gtk.SingleSelection.new(self.entry_store) + + column_view = Gtk.ColumnView(model=selection_model) + column_view.add_css_class("data-table") + column_view.set_show_row_separators(True) + column_view.set_show_column_separators(True) + + serial_factory = Gtk.SignalListItemFactory() + serial_factory.connect("setup", self._on_serial_setup) + serial_factory.connect("bind", self._on_serial_bind) + serial_col = Gtk.ColumnViewColumn(title="Serial No", factory=serial_factory) + serial_col.set_fixed_width(70) + column_view.append_column(serial_col) + + date_factory = Gtk.SignalListItemFactory() + date_factory.connect("setup", self._on_date_setup) + date_factory.connect("bind", self._on_date_bind) + date_col = Gtk.ColumnViewColumn(title="Transaction date", factory=date_factory) + date_col.set_fixed_width(100) + column_view.append_column(date_col) + + desc_factory = Gtk.SignalListItemFactory() + desc_factory.connect("setup", self._on_desc_setup) + desc_factory.connect("bind", self._on_desc_bind) + desc_col = Gtk.ColumnViewColumn(title="Description", factory=desc_factory) + desc_col.set_expand(True) + column_view.append_column(desc_col) + + auth_factory = Gtk.SignalListItemFactory() + auth_factory.connect("setup", self._on_auth_setup) + auth_factory.connect("bind", self._on_auth_bind) + auth_col = Gtk.ColumnViewColumn(title="Auth state", factory=auth_factory) + auth_col.set_fixed_width(120) + column_view.append_column(auth_col) + + source_factory = Gtk.SignalListItemFactory() + source_factory.connect("setup", self._on_source_setup) + source_factory.connect("bind", self._on_source_bind) + source_col = Gtk.ColumnViewColumn(title="Source", factory=source_factory) + source_col.set_expand(True) + column_view.append_column(source_col) + + dest_factory = Gtk.SignalListItemFactory() + dest_factory.connect("setup", self._on_dest_setup) + dest_factory.connect("bind", self._on_dest_bind) + dest_col = Gtk.ColumnViewColumn(title="Destination", factory=dest_factory) + dest_col.set_expand(True) + column_view.append_column(dest_col) + + action_factory = Gtk.SignalListItemFactory() + action_factory.connect("setup", self._on_action_setup) + action_factory.connect("bind", self._on_action_bind) + action_col = Gtk.ColumnViewColumn(title="Action", factory=action_factory) + action_col.set_fixed_width(80) + column_view.append_column(action_col) + + scrolled = Gtk.ScrolledWindow() + scrolled.set_vexpand(True) + scrolled.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) + scrolled.set_child(column_view) + + table_box.append(scrolled) + + return table_box + + def _on_serial_setup(self, factory, list_item): + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.add_css_class("monospace") + list_item.set_child(label) + + def _on_serial_bind(self, factory, list_item): + entry = list_item.get_item() + label = list_item.get_child() + label.set_text(str(entry.serial)) + + def _on_date_setup(self, factory, list_item): + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.add_css_class("monospace") + list_item.set_child(label) + + def _on_date_bind(self, factory, list_item): + entry = list_item.get_item() + label = list_item.get_child() + date_only = str(entry.tx_date)[:10] + label.set_label(date_only) + + def _on_desc_setup(self, factory, list_item): + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.set_ellipsize(Pango.EllipsizeMode.END) + list_item.set_child(label) + + def _on_desc_bind(self, factory, list_item): + entry = list_item.get_item() + label = list_item.get_child() + label.set_text(entry.description) + + def _on_auth_setup(self, factory, list_item): + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + box.set_halign(Gtk.Align.CENTER) + + badge = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=4) + badge.set_margin_top(4) + badge.set_margin_bottom(4) + badge.set_margin_start(8) + badge.set_margin_end(8) + badge.add_css_class("badge") + + icon = Gtk.Label() + badge.append(icon) + + text = Gtk.Label() + text.add_css_class("caption") + badge.append(text) + + box.append(badge) + list_item.set_child(box) + + def _on_auth_bind(self, factory, list_item): + """Bind auth state data with styling""" + entry = list_item.get_item() + box = list_item.get_child() + badge = box.get_first_child() + + badge.remove_css_class("auth-trusted") + badge.remove_css_class("auth-not-trusted") + badge.remove_css_class("auth-unknown") + badge.remove_css_class("auth-invalid") + badge.remove_css_class("auth-unsigned") + + icon_label = badge.get_first_child() + text_label = icon_label.get_next_sibling() + + auth_state = entry.auth_state + if auth_state == "trusted": + badge.add_css_class("auth-trusted") + icon_label.set_text("✓") + text_label.set_text("Trusted") + elif auth_state == "not_trusted": + badge.add_css_class("auth-not-trusted") + icon_label.set_text("⚠") + text_label.set_text("Not Trusted") + elif auth_state == "unknown": + badge.add_css_class("auth-unknown") + icon_label.set_text("?") + text_label.set_text("Unknown Key") + elif auth_state == "invalid": + badge.add_css_class("auth-invalid") + icon_label.set_text("✗") + text_label.set_text("Invalid") + else: + badge.add_css_class("auth-unsigned") + icon_label.set_text("○") + text_label.set_text("No Key") + + def _on_source_setup(self, factory, list_item): + """Setup source cell (compressed format)""" + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.add_css_class("monospace") + label.add_css_class("source-cell") + label.set_margin_start(8) + list_item.set_child(label) + + def _on_source_bind(self, factory, list_item): + """Bind source data in compressed format: [BTC] Expense.deposit/rent""" + entry = list_item.get_item() + label = list_item.get_child() + + # Format: [UNIT] Type.path + formatted = f"[{entry.source_unit}]\n{entry.source_type}.{entry.source_path}" + label.set_text(formatted) + + def _on_dest_setup(self, factory, list_item): + """Setup destination cell (compressed format)""" + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.add_css_class("monospace") + label.add_css_class("dest-cell") + label.set_margin_start(8) + list_item.set_child(label) + + def _on_dest_bind(self, factory, list_item): + """Bind destination data in compressed format""" + entry = list_item.get_item() + label = list_item.get_child() + + # Format: [UNIT] Type.path + formatted = f"[{entry.dest_unit}]\n{entry.dest_type}.{entry.dest_path}" + label.set_text(formatted) + + def _on_action_setup(self, factory, list_item): + """Setup action cell""" + button = Gtk.Button(label="View") + button.add_css_class("link") + button.add_css_class("accent") + button.set_halign(Gtk.Align.CENTER) + button.handler_id = None + list_item.set_child(button) + + def _on_action_bind(self, factory, list_item): + """Bind action button""" + entry = list_item.get_item() + button = list_item.get_child() + + if hasattr(button, "handler_id") and button.handler_id is not None: + button.disconnect(button.handler_id) + + button.handler_id = button.connect("clicked", self._on_view_entry, entry) + + def _on_view_entry(self, button, entry): + """Handle view button click""" + logg.info(f"View entry clicked: {entry.serial}") + details_page = create_entry_details_page( + entry, + self.nav_view, + self.entry_controller, + self.toast_overlay, + self.entry_controller.get_asset_bytes, + ) + self.nav_view.push(details_page) + + def format_attachments(self, assets): + if not assets: + return "" + + return ", ".join( + a.slug or a.uuid or (a.digest.hex()[:8] if a.digest else "unknown") + for a in assets + ) + + def _make_entry_item(self, entry) -> EntryItem: + signers_raw = entry.signer_pubkeys + signers_display = ", ".join([f"{k[:8]}...{k[-6:]}" for k in signers_raw]) + return EntryItem( + serial=entry.serial, + parent_digest=entry.parent_digest, + tx_date=entry.tx_date, + tx_ref=entry.tx_reference, + tx_date_rg=entry.date_registered, + description=entry.description, + auth_state="trusted", + amount=entry.amount, + source_path=entry.source_path, + source_type=entry.source_type, + source_unit=entry.source_unit, + unit_index=entry.unit_index, + signers=signers_display, + signers_raw=signers_raw, + dest_path=entry.dest_path, + dest_unit=entry.dest_unit, + dest_type=entry.dest_type, + attachments=self.format_attachments(entry.attachments), + attachments_raw=entry.attachments, + ) + + def _reload_store(self, entries): + self.entry_store.remove_all() + for entry in entries: + self.entry_store.append(self._make_entry_item(entry)) + + def _load_entries(self): + self.entries = self.entry_controller.get_all_entries() + self._sort_and_reload("serial") + + def _sort_and_reload(self, sort_type): + if sort_type == "serial": + sorted_entries = sorted(self.entries, key=lambda e: e.serial) + elif sort_type == "datetime": + sorted_entries = sorted( + self.entries, key=lambda e: (e.tx_date, e.serial), reverse=True + ) + self._reload_store(sorted_entries) diff --git a/dummy/usawa/runnable/gui.py b/dummy/usawa/runnable/gui.py @@ -0,0 +1,11 @@ +import logging +import sys + +from usawa.gui import Usawa + +logging.basicConfig(level=logging.DEBUG) + + +def main(): + app = Usawa(application_id='org.usawa.app') + app.run(sys.argv) diff --git a/dummy/usawa/storage/__init__.py b/dummy/usawa/storage/__init__.py diff --git a/dummy/usawa/storage/entry_mapper.py b/dummy/usawa/storage/entry_mapper.py @@ -0,0 +1,142 @@ +import logging +from datetime import datetime, date + +from usawa.entry import Entry, EntryPart +from usawa.unit import UnitIndex +from ..core.models import LedgerEntry +from usawa import Entry, EntryPart + +logg = logging.getLogger("storage.entry_mapper") + + +class EntryMapper: + """Maps between domain model (LedgerEntry) and storage model (Entry)""" + + @staticmethod + def to_entry(domain: LedgerEntry, ledger, unitindex=UnitIndex("BTC")): + """ + Convert LedgerEntry (domain) to Entry (storage) + + :param domain: Domain model entry + :type domain: LedgerEntry + :param ledger: The ledger object (for parent digest) + :type ledger: usawa.Ledger + :param unitindex: UnitIndex for validation + :type unitindex: UnitIndex + :return: Storage model entry + :rtype: Entry + """ + + if domain.tx_date is None: + raise ValueError("Transaction date is required for storage") + + tx_date = domain.tx_date + if isinstance(tx_date, date) and not isinstance(tx_date, datetime): + tx_date = datetime.combine(tx_date, datetime.min.time()) + + parent = ledger.current() if ledger else None + ref = domain.transaction_ref if domain.transaction_ref else None + + entry = Entry( + serial=ledger.serial, + tx_date=tx_date, + parent=parent, + description=domain.description or "", + ref=ref, + unitindex=unitindex, + ) + + source_amount = unitindex.from_floatstring( + unitindex.default_unit, str(domain.amount) + ) + dest_amount = -source_amount + + source_part = EntryPart( + unitindex.default_unit, + domain.source_type.lower(), + domain.source_path, + source_amount, + debit=True, + ) + entry.add_part(source_part, debit=True) + + dest_part = EntryPart( + unitindex.default_unit, + domain.dest_type.lower(), + domain.dest_path, + dest_amount, + debit=False, + ) + entry.add_part(dest_part, debit=False) + + return entry + + @staticmethod + def to_domain_entry(storage_entry) -> LedgerEntry: + """ + Convert Entry (storage) to LedgerEntry (domain) + """ + + source_unit = "" + source_type = "" + source_path = "" + amount = 0.0 + + dest_unit = "" + dest_type = "" + dest_path = "" + if storage_entry.debit: + debit_part = storage_entry.debit[0] + source_unit = debit_part.unit + source_type = debit_part.typ + source_path = debit_part.account + amount = abs(float(debit_part.amount)) + is_debit = debit_part.isdebit + else: + source_unit = source_type = source_path = "" + amount = 0.0 + is_debit = None + + if storage_entry.credit: + credit_part = storage_entry.credit[0] + dest_unit = credit_part.unit + dest_type = credit_part.typ + dest_path = credit_part.account + is_credit = credit_part.isdebit + else: + dest_unit = dest_type = dest_path = "" + is_credit = None + + parent_digest = parent_digest = storage_entry.parent.hex() + + tx_date = storage_entry.dt + date_registered = storage_entry.dtreg + + transaction_ref = str(storage_entry.ref) if storage_entry.ref else None + external_ref = None + + signer_pubkeys = list(storage_entry.sigs.keys()) + + domain = LedgerEntry( + external_reference=external_ref, + description=storage_entry.description, + amount=amount, + source_unit=source_unit, + source_type=source_type, + source_path=source_path, + dest_unit=dest_unit, + dest_type=dest_type, + dest_path=dest_path, + attachments=( + storage_entry.attachment.copy() if storage_entry.attachment else [] + ), + serial=storage_entry.serial, + tx_date=tx_date, + tx_reference=storage_entry.ref, + date_registered=date_registered, + signer_pubkeys=signer_pubkeys, + parent_digest=parent_digest, + unit_index=storage_entry.uidx, + ) + + return domain diff --git a/dummy/usawa/storage/ledger_repository.py b/dummy/usawa/storage/ledger_repository.py @@ -0,0 +1,298 @@ +import hashlib +import logging +from typing import List +import hexathon +from usawa.storage.xml_utils import ( + _build_export_root, + _build_incoming_element, + _find_child, + _find_entry_by_serial, + _write_xml_to_file, + resolve_namespace, +) +from usawa.asset import Asset +from usawa.crypto import ACL, DemoWallet +from usawa.error import VerifyError +from usawa.ledger import Ledger +from usawa.resolve.fs import FSResolver +from usawa.service import UnixClient +from usawa.store import LedgerStore +from ..core.models import LedgerEntry +from .entry_mapper import EntryMapper +from whee.valkey import ValkeyStore +from usawa import Ledger, DemoWallet, load +from pathlib import Path +import mimetypes +import lxml.etree as ET +from pathlib import Path +import lxml.etree as ET +from copy import deepcopy + + +logg = logging.getLogger("storage.ledger_repository") + + +def sha256_verify(k, v=None): + if isinstance(k, str): + k = bytes.fromhex(k) + + if len(k) != 32: + raise ValueError("expect 256 bit key") + + khx = hexathon.uniform(k.hex()) + + if v is not None: + h = hashlib.sha256() + h.update(v) + if k != h.digest(): + raise VerifyError(khx) + + return khx + + +class LedgerRepository: + """Repository that wraps LedgerStore and handles mapping""" + + def __init__( + self, + ledger_path=None, + unix_client: UnixClient = None, + valkey_store: ValkeyStore = None, + cfg=None, + ): + """ + Initialize the LedgerRepository. + + :param ledger_path: Path to the ledger definition file to import. + :type ledger_path: str | None + + :param unix_client: Unix socket client used to communicate with the storage service. + :type unix_client: usawa.UnixClient + + """ + self.valkey_store = valkey_store + self.unix_client = unix_client + self._wallet = None + self._store = None + self.ledger_path = ledger_path + self.cfg = cfg + self.resolver = FSResolver( + self.cfg.get("FS_RESOLVER_STORE_PATH"), verifier=sha256_verify + ) + + def _init_store(self, write=False) -> tuple[LedgerStore, Ledger, DemoWallet]: + ledger_tree = load(self.ledger_path) + ledger = Ledger.from_tree(ledger_tree) + + if write: + logg.info("init store for write") + self.store = LedgerStore(self.valkey_store, ledger) + + if self._wallet is None: + pk = self.store.get_key() + if pk is None: + raise ValueError("No private key found in store") + self._wallet = DemoWallet(privatekey=pk) + else: + logg.info("init store for read") + self.store = LedgerStore(self.valkey_store, ledger) + + if self._wallet is None: + try: + pk = self.store.get_key() + self._wallet = DemoWallet(privatekey=pk) + logg.info( + f"Loaded wallet, pubkey: {self._wallet.pubkey().hex()[:16]}..." + ) + + except FileNotFoundError: + logg.warning( + "No private key found in store, initializing a a default one" + ) + privkey = bytes.fromhex(self.cfg.get("SIGS_DEFAULT_PRIVATE_KEY")) + self._wallet = DemoWallet(privatekey=privkey) + + # Add key to store + try: + self.store.add_key(wallet=self._wallet) + logg.info("Stored new private key successfully") + except Exception as e: + logg.warning(f"Could not store new key: {e}") + + except Exception as e: + raise ValueError( + f"Could not retrieve or create private key: {e}" + ) from e + + logg.debug( + "wallet pk: %s pubk: %s", + self._wallet.privkey().hex(), + self._wallet.pubkey().hex(), + ) + ledger.set_wallet(self._wallet) + ledger.acl = ACL.from_wallet(self._wallet) + self.store.load(acl=ledger.acl) + return self.store, ledger, self._wallet + + def save(self, domain_entry: LedgerEntry) -> None: + """ + Save a domain entry to storage + + :param domain_entry: Entry to save + :type domain_entry: LedgerEntry + :raises ValueError: If validation fails + :raises FileExistsError: If attachment already exists in the store + :raises IOError: If file operations fail + :raises Exception: For other storage errors + """ + try: + store, ledger, wallet = self._init_store(write=True) + + entry = EntryMapper.to_entry(domain_entry, ledger=ledger) + entry.sign(wallet) + + logg.debug( + "Mapped entry - Serial: %s, Parent: %s, Attachments: %s", + entry.serial, + entry.parent.hex(), + entry.attachment, + ) + + for attachment in domain_entry.attachments: + try: + info = self.get_file_info(attachment) + asset = Asset.from_file( + attachment, + slug=info["slug"], + description=info["description"], + mimetype=info["mimetype"], + ) + store.add_asset(asset) + entry.attach(asset) + + with open(attachment, "rb") as f: + data = f.read() + self.resolver.put(asset.get_digest(binary=True), data) + + except FileNotFoundError as e: + raise IOError(f"Attachment file not found: {attachment}") from e + except PermissionError as e: + raise IOError(f"Cannot read attachment file: {attachment}") from e + + store.add_entry(entry, update_ledger=True) + + ledger.truncate() + ledger.sign() + logg.info(f"Successfully saved entry #{entry.serial}") + + except FileExistsError as e: + logg.debug(f"Entry fileinfo already exists: {e}") + raise + except ValueError as e: + logg.debug(f"Validation error: {e}") + raise + except IOError as e: + logg.debug(f"File operation failed: {e}") + raise + except Exception as e: + logg.debug(f"Failed to save entry: {e}", exc_info=True) + raise + + def get_all_entries(self) -> List[LedgerEntry]: + """Get all entries""" + try: + store, _, _ = self._init_store() + + return [ + EntryMapper.to_domain_entry(storage_entry) + for _, storage_entry in store.ledger.entries.items() + ] + except Exception as e: + logg.error(f"Failed to retrieve entries: {e}") + return [] + + def get_asset_bytes(self, digest: str): + logg.debug(f"Getting asset for digest: {digest}") + try: + return self.resolver.get(digest) + except Exception as e: + logg.exception("Failed to get asset for digest %s: %s", digest, e) + return None + + def get_file_info(self, file_path: str) -> dict: + path = Path(file_path) + + slug = path.stem + + mimetype, _ = mimetypes.guess_type(file_path) + + if mimetype: + kind = mimetype.split("/")[0] + description = f"{kind.capitalize()} file: {path.name}" + else: + description = f"File: {path.name}" + + return { + "slug": slug, + "description": description, + "mimetype": mimetype, + } + + def export_all_entries_to_xml(self, output_path: str) -> tuple[bool, str]: + try: + ledger = self.store.ledger + + tree = ledger.to_tree() + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + xml_string = ET.tostring( + tree, encoding="utf-8", xml_declaration=True, pretty_print=True + ) + + with open(output_file, "wb") as f: + f.write(xml_string) + + logg.info(f"Exported ledger to {output_path}") + return True, "" + + except PermissionError as e: + error_msg = "Permission denied. Cannot write to the specified location." + logg.debug(f"Permission error: {e}") + return False, error_msg + + except IOError as e: + error_msg = f"Failed to write file: {str(e)}" + logg.debug(f"I/O error: {e}") + return False, error_msg + + except Exception as e: + error_msg = f"Failed to export ledger: {str(e)}" + logg.debug("Unexpected error during export") + return False, error_msg + + def export_entry_to_xml(self, serial: int, output_path: str) -> tuple[bool, str]: + try: + storage_entry = self.store.ledger.entries.get(serial) + if not storage_entry: + return False, f"Entry #{serial} not found" + + xml_tree = self.store.ledger.to_tree() + ns_uri = resolve_namespace(xml_tree) + target_entry = _find_entry_by_serial(xml_tree, ns_uri, serial) + if target_entry is None: + return False, f"Entry #{serial} not found in XML" + + incoming = _build_incoming_element(ns_uri, target_entry, xml_tree) + root = _build_export_root(xml_tree, ns_uri, target_entry, incoming) + _write_xml_to_file(root, output_path) + + logg.info(f"Successfully exported entry #{serial} -> {output_path}") + return True, "" + + except PermissionError: + return False, "Permission denied" + except IOError as e: + return False, str(e) + except Exception as e: + return False, str(e) diff --git a/dummy/usawa/storage/xml_utils.py b/dummy/usawa/storage/xml_utils.py @@ -0,0 +1,204 @@ +import logging +from copy import deepcopy +from pathlib import Path + +from lxml import etree as ET + +logg = logging.getLogger(__name__) + +FALLBACK_NS = "http://usawa.defalsify.org/" + + +def _get_local_name(element): + """Extract local name from element tag (without namespace)""" + tag = element.tag + return tag.split("}")[-1] if "}" in tag else tag + + +def _find_child(parent, local_name): + """Find child element by local name""" + for child in parent: + if _get_local_name(child) == local_name: + return child + return None + + +def _get_local_name(element): + """Extract local name from element tag (without namespace)""" + tag = element.tag + return tag.split("}")[-1] if "}" in tag else tag + + +def _find_child(parent, local_name): + """Find child element by local name, ignoring namespace.""" + for child in parent: + if _get_local_name(child) == local_name: + return child + return None + + +def resolve_namespace(xml_tree) -> str: + """Resolve the namespace URI from an XML tree. + + Tries nsmap first, then falls back to parsing the root tag, + then falls back to the well-known usawa namespace. + + :param xml_tree: Root XML element. + :type xml_tree: lxml.etree.Element + :return: Namespace URI string. + :rtype: str + """ + ns_uri = xml_tree.nsmap.get(None) + if ns_uri is not None: + return ns_uri + if "}" in xml_tree.tag: + return xml_tree.tag.split("}")[0].strip("{") + logg.warning("Could not resolve namespace, using fallback: %s", FALLBACK_NS) + return FALLBACK_NS + + +def _find_entry_by_serial(xml_tree, ns_uri: str, serial: int): + """Find an entry element by its serial number. + + :param xml_tree: Root XML element to search within. + :type xml_tree: lxml.etree.Element + :param ns_uri: Namespace URI. + :type ns_uri: str + :param serial: Entry serial number to find. + :type serial: int + :return: Matching entry element, or None if not found. + :rtype: lxml.etree.Element or None + """ + all_entries = xml_tree.findall(".//{%s}entry" % ns_uri) + logg.debug("Searching %d entries for serial %d", len(all_entries), serial) + + for entry_elem in all_entries: + data_elem = _find_child(entry_elem, "data") + if data_elem is None: + logg.warning("Entry without <data> element, skipping") + continue + + serial_elem = _find_child(data_elem, "serial") + if serial_elem is None: + logg.warning("Entry without <serial> element, skipping") + continue + + if int(serial_elem.text) == serial: + logg.debug("Found target entry serial %d", serial) + return entry_elem + + return None + + +def _build_incoming_element(ns_uri: str, target_entry, orig_incoming=None): + """Build the <incoming> XML element from entry debit/credit values. + + :param ns_uri: Namespace URI. + :type ns_uri: str + :param target_entry: Entry element to read debit/credit amounts from. + :type target_entry: lxml.etree.Element + :param orig_incoming: Existing <incoming> element to copy digest/sig from. + :type orig_incoming: lxml.etree.Element or None + :return: Constructed <incoming> element. + :rtype: lxml.etree.Element + """ + data_elem = _find_child(target_entry, "data") + + debit_val = 0 + credit_val = 0 + + if data_elem is not None: + debit_elem = _find_child(data_elem, "debit") + credit_elem = _find_child(data_elem, "credit") + + if debit_elem is not None: + amount_elem = _find_child(debit_elem, "amount") + if amount_elem is not None: + debit_val = int(amount_elem.text) + + if credit_elem is not None: + amount_elem = _find_child(credit_elem, "amount") + if amount_elem is not None: + credit_val = int(amount_elem.text) + + expense = -abs(debit_val) + asset = credit_val + + incoming = ET.Element("{%s}incoming" % ns_uri) + incoming.set("serial", "0") + + real = ET.SubElement(incoming, "{%s}real" % ns_uri) + real.set("unit", "BTC") + + ET.SubElement(real, "{%s}income" % ns_uri).text = "0" + ET.SubElement(real, "{%s}expense" % ns_uri).text = str(expense) + ET.SubElement(real, "{%s}asset" % ns_uri).text = str(asset) + ET.SubElement(real, "{%s}liability" % ns_uri).text = "0" + + if orig_incoming is not None: + for tag in ["digest", "sig"]: + elem = _find_child(orig_incoming, tag) + if elem is not None: + incoming.append(deepcopy(elem)) + + return incoming + + +def _build_export_root(xml_tree, ns_uri: str, target_entry, incoming): + """Assemble the export root element with header metadata, incoming, and target entry. + + :param xml_tree: Source ledger XML tree to copy header elements from. + :type xml_tree: lxml.etree.Element + :param ns_uri: Namespace URI. + :type ns_uri: str + :param target_entry: The entry element to include in the export. + :type target_entry: lxml.etree.Element + :param incoming: The <incoming> element to include. + :type incoming: lxml.etree.Element + :return: Assembled root element. + :rtype: lxml.etree.Element + """ + root = ET.Element("{%s}ledger" % ns_uri, nsmap={None: ns_uri}) + root.set("version", xml_tree.get("version", "")) + + for tag in ["topic", "generated", "src", "units", "identity"]: + elem = _find_child(xml_tree, tag) + if elem is not None: + logg.debug("Copying header element: %s", tag) + root.append(deepcopy(elem)) + + root.append(incoming) + root.append(deepcopy(target_entry)) + + final_entries = root.findall(".//{%s}entry" % ns_uri) + logg.debug("Export root contains %d entry(ies)", len(final_entries)) + + return root + + +def _write_xml_to_file(root, output_path: str) -> None: + """Serialize an XML element tree and write it to a file. + + Creates parent directories if they do not exist. + + :param root: Root XML element to serialize. + :type root: lxml.etree.Element + :param output_path: Destination file path. + :type output_path: str + :raises PermissionError: If the file cannot be written. + :raises IOError: If a file system error occurs. + """ + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + + xml_string = ET.tostring( + root, + encoding="utf-8", + xml_declaration=True, + pretty_print=True, + ) + + with open(output_file, "wb") as f: + f.write(xml_string) + + logg.debug("Wrote XML to %s (%d bytes)", output_path, len(xml_string))