usawa

Signed, immutable accounting.
Info | Log | Files | Refs | Submodules | LICENSE

resolver.py (3531B)


      1 import unittest
      2 import logging
      3 import tempfile
      4 import hashlib
      5 import shutil
      6 import os
      7 import datetime
      8 
      9 import lxml.etree
     10 
     11 from usawa import Ledger, UnitIndex, Entry, EntryPart, DemoWallet, ACL
     12 from usawa.resolve.fs import FSResolver
     13 from usawa.error import VerifyError
     14 
     15 
     16 logging.basicConfig(level=logging.DEBUG)
     17 logg = logging.getLogger()
     18 
     19 testdir = os.path.realpath(os.path.dirname(__file__))
     20 
     21 hash_of_foo = 'f7fbba6e0636f890e56fbbf3283e524c6fa3204ae298382d624741d0dc6638326e282c41be5e4254d8820772c5518a2c5a8c0c7f7eda19594a7eb539453e1ed7'
     22 hash_of_bar = 'd82c4eb5261cb9c8aa9855edd67d1bd10482f41529858d925094d173fa662aa91ff39bc5b188615273484021dfb16fd8284cf684ccf0fc795be3aa2fc1e6c181'
     23 
     24 class TestResolver(unittest.TestCase):
     25     
     26     def setUp(self):
     27         self.parent = b'\x00' * 64
     28         self.path = tempfile.mkdtemp()
     29         self.backend = FSResolver(self.path) 
     30         self.dtreg = datetime.datetime.now()
     31 
     32 
     33     def tearDown(self):
     34         shutil.rmtree(self.path)
     35 
     36 
     37     def test_resolve_putget(self):
     38         h = hashlib.sha512()
     39         v = os.urandom(1337)
     40         h.update(v)
     41         k = h.digest()
     42         self.backend.put(k, v)
     43         r = self.backend.get(k)
     44         self.assertEqual(r, v)
     45 
     46         k_wrong = os.urandom(32)
     47         with self.assertRaises(ValueError):
     48             r = self.backend.get(k_wrong)
     49 
     50     
     51     def test_resolve_get_evil(self):
     52         fp = os.path.join(self.path, hash_of_foo)
     53         f = open(fp, 'wb')
     54         f.write(b'bar')
     55         f.close()
     56         with self.assertRaises(VerifyError):
     57             self.backend.get(hash_of_foo)
     58 
     59 
     60     def test_resolve_lookup(self):
     61         uidx = UnitIndex('FOO')
     62         wallet = DemoWallet()
     63         ledger = Ledger(uidx, wallet=wallet, topic=bytes.fromhex(hash_of_foo))
     64         dst = EntryPart('FOO', 'asset', 'foo', 1337)
     65         src = EntryPart('FOO', 'income', 'foo', 1337, debit=True)
     66         entry = Entry(1, datetime.datetime.strptime('2025-11-11', '%Y-%m-%d'), parent=self.parent, tx_datereg=self.dtreg)
     67         entry.add_part(dst)
     68         entry.add_part(src, debit=True)
     69         entry.sign(wallet)
     70         first_entry_key = self.backend.put_entry(entry, 'sha512')
     71         ledger.add_entry(entry)
     72 
     73         dst = EntryPart('FOO', 'expense', 'bar̈́', 42, debit=True)
     74         src = EntryPart('FOO', 'liability', 'bar', 42)
     75         entry = Entry(ledger.peek(), datetime.datetime.now(), parent=ledger.current())
     76         entry.add_part(dst)
     77         entry.add_part(src, debit=True)
     78         entry.sign(wallet)
     79         ledger.add_entry(entry)
     80         last_entry_key = self.backend.put_entry(entry, 'sha512')
     81         ledger.sign()
     82 
     83         ledger.truncate(lookup='sha512')
     84         logg.debug('after trunc {}'.format(ledger.lookup))
     85 
     86         acl = ACL.from_wallet(wallet)
     87         s = ledger.to_string(lookup='sha512')
     88         logg.debug('ledgerstring {}'.format(s))
     89         ledger = ledger.from_string(s, acl=acl)
     90 
     91         self.backend.restore_ledger(ledger)
     92 
     93 #        tree = ledger.to_tree(lookup='sha512')
     94 #        s = lxml.etree.tostring(tree)
     95 #        ledger = Ledger(uidx, wallet=wallet, topic=bytes.fromhex(hash_of_foo))
     96 #
     97 #        k = self.backend.get(first_entry_key)
     98 #        first_entry = Entry.from_string(k, uidx)
     99 #        ledger.add_entry(first_entry)
    100 #
    101 #        k = self.backend.get(last_entry_key)
    102 #        last_entry = Entry.from_string(k, uidx)
    103 #        ledger.add_entry(last_entry)
    104 #        
    105 #        tree = ledger.to_tree(lookup='sha512')
    106 #        s_orig = lxml.etree.tostring(tree)
    107 
    108 
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()