blog:2022:0702_collecting_transactions_data

NVP Framework: more cleanup and collecting transactions high level data

In this session we continue with more cleanup and refactoring points on the NVP framework and also start collecting some “high level” data from the most space consuming transaction types on blockchain.

  • As part of my “financial management” I also now need to unwrap some WETH into ETH on the Ethereum blockchain, naturally I could go on uniswap to perform that with a simple transaction with metamask, but I thought it would be much more fun to do directly from a python command 😎, so here I am.
  • ⇒ I simply added the following command in the BlockchainManager:
            if cmd == "balance":
                chain_name = self.get_param("chain")
                chain: EVMBlockchain = self.get_component(f"{chain_name}_chain")
    
                account = self.get_param("account")
                if account is not None:
                    chain.set_account(account)
    
                token_name = self.get_param("token")
                token_name = token_name or chain.get_native_symbol()
                bal = chain.get_balance(token_name)
                logger.info("Current balance: %f %s", bal, token_name)
                return True
  • Which works as expected on BSC for both native and wrapped native tokens:
    $ nvp bchain balance -c bsc
    2022/06/27 06:58:27 [__main__] INFO: Current balance: 7.667770 BNB
    
    kenshin@Saturn ~
    $ nvp bchain balance -c bsc -t WBNB
    2022/06/27 06:58:37 [__main__] INFO: Current balance: 0.000000 WBNB
  • But unfortunately this doesn't work yet on Ethereum for the WETH token since I haven't built any token database yet on Ethereum, arrff.
  • Let's see if I can quickly start retrieving some tokens/pairs from uniswapv2 (and this will also give me an idea of how many pairs we have there)
  • My my my, for once, that part is almost too easy: running this in jupyter:
    from nvp.nvp_context import NVPContext
    import numpy as np
    from hexbytes import HexBytes
    import matplotlib.pyplot as plt
    
    if NVPContext.instance is None:
        NVPContext()
        
    ctx = NVPContext.get()
    chain = ctx.get_component('eth_chain')
    
    chain.handle("find_uniswap_routers")
  • And I get 58 routers addresses on the blockchain in a few seconds from the last 50k transactions 👍! :
    2022/06/27 07:05:55 [find_uniswap_routers] INFO: Searching for routers in 50835 transactions...
    2022/06/27 07:05:55 [find_uniswap_routers] INFO: Found 58 routers:
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D: 33948
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F: 10660
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x03f7724180AA6b939894B5Ca4314783B0b36b329: 4585
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x25553828F22bDD19a20e4F12F052903Cb474a335: 532
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x1C6cA5DEe97C8C368Ca559892CCce2454c8C35C7: 214
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0xCeB90E4C17d626BE0fACd78b79c9c87d7ca181b3: 191
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x1fdD76e18dD21046b7e7D54C8254Bf08B239e4D9: 104
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0xb5e9F6C58f548CeE53257304e287b23757eFFCA1: 69
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x0ACd03d3fC4A41d02E75ad6527AC4F7c90c4abb1: 68
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x4F71E29C3D5934A15308005B19Ca263061E99616: 56
    2022/06/27 07:05:55 [find_uniswap_routers] INFO:   - 0x0A3e1c20B5384eB97D2CCfF9a96bc91f0c77e7dB: 55
  • The first router is the Uniswap V2 router, so adding a config entry for that one:
        "UniSwap2": {
          "chain": "eth",
          "id": 13,
          "name": "UniSwap2",
          "router": "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D",
          "factory": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
          "flash_loan_supported": true,
          "swap_fee_points": 30,
          "default_slippage": 0.1,
          "swap_max_gas": 550000
        }
  • And now collecting the available pairs:
    $ nvp bchain update-pairs uniswap2
Interestingly we only have about 77.9k pairs on uniswap V2 compared to the +1M pairs on PancakeSwap V2
  • But naturally, this is failing in an unexpected way now:
    2022/06/27 07:47:54 [nvh.crypto.blockchain.uniswap_base] INFO: 24/77972: Added pair on CEL/WETH
    2022/06/27 07:47:54 [nvh.crypto.blockchain.erc20_pair] INFO: Registering pair at 0xC2aDdA861F89bBB333c90c492cB837741916A225
    2022/06/27 07:47:55 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
    2022/06/27 07:47:57 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
    2022/06/27 07:47:59 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
    2022/06/27 07:48:01 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
    2022/06/27 07:48:03 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
    2022/06/27 07:48:05 [nvh.crypto.blockchain.evm_smart_contract] ERROR: Exception occured while calling 'name' (on 0x9f8F72aA9304c8B593d555F12eF6589cC3A579A2): Python int too large to convert to C ssize_t
  • Really wondering what this could be… 🤔 I'm supposed to retrieve a name, not an int value… Checking the contract.
  • Hmmm weird… the “name” is stored as a bytes32 value, not sure this is the standard for ERC20 tokens 🤔: checking my default ABI…
  • nope, in my default ABI file I rather expect name to be a string so that would explain why I cannot read that value correctly 😰.
  • Note: I could try calling the function “manually” by sending a transaction such as the following and then parse the result manually, but for now this is not worth it:
    web3.eth.sendTransaction({'to': contractAddress, 'from': yourAddress, 'data': "0x61455567"})
  • Anyway, will get back to the uniswap pairs later, for now just added a few of them to get the WETH token registered and now I can retrieve the balance I have:
    $ nvp bchain balance -c eth -t WETH
    2022/06/27 08:15:58 [__main__] INFO: Current balance: 2.842616 WETH
  • Next I need support to unwrap those native tokens, so here is the function to do that:
            if cmd == "unwrap":
                chain_name = self.get_param("chain")
                chain: EVMBlockchain = self.get_component(f"{chain_name}_chain")
    
                account = self.get_param("account")
                if account is not None:
                    chain.set_account(account)
    
                # We can only unwrap the native token:
                token = chain.get_wrapped_native_token()
    
                value = self.get_param("value")
    
                if value is None:
                    amount = token.get_balance(False)
                else:
                    amount = token.to_amount(value)
    
                if amount == 0:
                    logger.info("Nothing to unwrap.")
                    return True
    
                # Get the initial balance:
                bal0 = chain.get_native_balance()
                logger.info("Initial native balance: %f %s", bal0, chain.get_native_symbol())
                logger.info("Unwrapping %d units of %s", amount, token.symbol())
                token.withdraw(amount)
                bal1 = chain.get_native_balance()
                logger.info("Final native balance: %f %s", bal1, chain.get_native_symbol())
                return True
  • ⇒ And this works as expected 👍!
  • Refactoring more of the remaining static components from NervHome:
    • Navision: OK
    • BackupManager: OK
    • PasswordGenerator: OK
      • ⇒ To generate a password I will now use a command such as:
        $ nvp pwd gen-password -l en
  • While refactoring those dynamic components above I realized I didn't really like the repetition required when defining the arguments for a given parser, as this could take a lot of space. So I'm now trying to build a more convinient mechanism to build these.
  • ⇒ So I defined a new ParserContext class:
    class ParserContext(object):
        """Simple class used to setup an argparse parser conviniently"""
    
        def __init__(self, parser, pname):
            """Start with a parser to add arguments on it"""
            self.parser = parser
            self.pname = pname
            self.cur_state = None
    
        def __del__(self):
            """Destructor for this parser context"""
            if self.cur_state is not None:
                raise NVPCheckError(f"Parser context for {self.pname} was not closed properly.")
    
        def end(self):
            """Finish the current argument"""
            if self.cur_state is not None:
                args = self.cur_state['args']
                del self.cur_state['args']
                self.parser.add_argument(*args, **self.cur_state)
                self.cur_state = None
    
        def add_str(self, *args, **kwargs):
            """Add a string argument"""
            # finish previous arg if any:
            self.end()
    
            self.cur_state = {"args": args, "type": str}
            self.cur_state.update(kwargs)
            return self
    
        def add_int(self, *args, **kwargs):
            """Add an int argument"""
            # finish previous arg if any:
            self.end()
    
            self.cur_state = {"args": args, "type": int}
            self.cur_state.update(kwargs)
            return self
    
        def add_float(self, *args, **kwargs):
            """Add a float argument"""
            # finish previous arg if any:
            self.end()
    
            self.cur_state = {"args": args, "type": float}
            self.cur_state.update(kwargs)
            return self
    
        def add_flag(self, *args, **kwargs):
            """Add a flag argument"""
            # finish previous arg if any:
            self.end()
    
            self.cur_state = {"args": args, "action": "store_true"}
            self.cur_state.update(kwargs)
            return self
    
        def help(self, help_msg):
            """Add an help message."""
            self.cur_state["help"] = help_msg
    
        def __call__(self, **kwargs):
            """Add elements and finish the arg."""
            self.cur_state.update(kwargs)
            self.end()
  • And now using that to define the parsers one by one:
        comp = context.register_component("password_gen", PasswordGenerator(context))
    
        psr = context.build_parser("collect-words")
        psr.add_str("-l", "--language", dest="language", default="fr")("language to collect the words")
    
        psr = context.build_parser("download-books")
        psr.add_str("-l", "--language", dest="language", default="fr")("language to collect the words")
        psr.add_int("-n", "--num", dest="num_books", default=500)("Number of books to collect to get the words")
    
        psr = context.build_parser("gen-password")
        psr.add_str("-l", "--language", dest="language", default="fr")("Input language to use to collect the words")
        psr.add_int("-n", "--num", dest="num_words", default=5)("Number of words to collect")
        psr.add_int("--min", dest="min_len", default=3)("Minimum number of characters in the words")
        psr.add_int("--max", dest="max_len", default=8)("Maximum number of characters in the words")
        psr.add_int("--min-count", dest="min_count", default=10)("Minimum count for the words to take into account.")
        psr.add_flag("-p", "--no-space", dest="no_space")("Remove the space between the words")
  • It's definitely not perfect, but it's a bit shorter anyway, so I'll keep it and see how this goes.
  • Now I want to perform an additional quick task during this session: I should start parsing the data from the transactions when I collect them to fill dedicated tables in my database, and try to reduce the size of the raw transaction data (which I could mostly discard once the higher level values are extracted)
  • First checking again the transactions taking the most space:
    sql = "SELECT pg_size_pretty( pg_database_size('transactions_db') );"
    tdb.execute(sql).fetchone()
  • ⇒ Arrff.. I already have about 11GB of data in there: it's really hightime I try to reduce that.
  • Checking the number of transactions available in the bsc_tx_data:
    tdb.get_num_transactions()
    • ⇒ Current a bit more than 30 millions transactions.
  • Computing the signatures stats on the first 2 millions transactions:
    # Select other transactions:
    sql = "SELECT sig,registers FROM bsc_txdata LIMIT 2000000;"
    rows = tdb.execute(sql).fetchall()
    nrows = len(rows)
    print(f"Got {nrows} rows")
  • Then to compute the stats:
    import nvp.core.utils as utl
    
    bcount = 0
    
    size_stats={}
    
    for row in rows:
        sig = row[0]
        if sig is None:
            sig = 0
    
        if sig not in size_stats:
            size_stats[sig] = {
                'sig': hex(utl.to_uint32(sig)),
                'min_size':100000,
                'max_size':0,
                'mean_size': 0,
                'count': 0
            }
        
        desc = size_stats[sig]
        size = 0 if row[1] is None else len(row[1])
        desc['min_size'] = min(desc['min_size'], size)
        desc['max_size'] = max(desc['max_size'], size)
        desc['mean_size'] += size
        desc['count'] += 1
        
        bcount += size
    
    stats = list(size_stats.values())
    for desc in stats:
        desc["share"] = desc["mean_size"]
        desc["mean_size"] /= desc["count"]
    
    stats.sort(key=lambda item: item['share'], reverse=True)
    
    print(f"Total number of bytes: {bcount}")
    
    stats[:20]
  • And we find that the biggest share is for the signature '0x38ed1739' which is for the swapExactTokensForTokens() function, so let's handle that one first.
  • Prototype of that functions is as follow: swapExactTokensForTokens(uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline)
  • In that prototype, we care about the amount_in,amount_out, and then the first address on the path and the last address on the path, and then the to address, but that's all actually.
  • Also for the amount_in/amount_out values we can certainly store these with only a limited precision:
    • with an int32 storage we can store values up to +2147483647 which give us at lest 9 digits of precision already. Then with a smallint we get 2 bytes, and we can use those 2 bytes separetely to store the power of 10 that we should use to reconstruct the amount value…. or basically, I'm rebuilding the concept of a float value here, so I should probably just store the amounts as two floats 😅.
  • So the final table setup will be:
    '''CREATE TABLE IF NOT EXISTS %s (
        tx_id INTEGER PRIMARY KEY,
        amount_in REAL,
        amount_out REAL,
        token_in INTEGER,
        token_out INTEGER,
        to_id INTEGER
    );'''
  • And now I'm also parsing the data from the transactions just before storing the raw data into my giant table with this method:
        def parse_tx_data(self, descs):
            """Collect the available tx data"""
    
            # Check which of the transactions should be processed further here:
            swap_tokens_descs = []
    
            for desc in descs:
                sig = desc["sig"] or ""
                sig = "0x" + sig
                if sig in self.swap_tokens_sigs:
                    # collect the data for the swap tokens operation:
                    op_desc = self.sig_map.parse_swap_tokens(desc["registers"], sig, desc["tx_id"])
                    op_desc["tx_id"] = desc["tx_id"]
    
                    # Remove the registers:
                    desc["registers"] = None
    
                    swap_tokens_descs.append(op_desc)
    
            return {
                "swap_tokens": swap_tokens_descs,
            }
  • To activate that new system I should probably just discard all the existing data for the blocks/transactions, so while I'm at it, I'm also going to reduce the size of the other non-critical tx data fields:
    • ⇒ I removed operation_type from the base transaction table in chainDB, and updated the raw tx table in transactions_db to be:
      SQL_CREATE_TXDATA_TABLE = '''CREATE TABLE IF NOT EXISTS %s (
          tx_id INTEGER PRIMARY KEY,
          gas REAL,
          gas_price REAL,
          nonce INTEGER,
          value REAL,
          sig INTEGER,
          registers BYTEA
      );'''
  • And the end result still seems OK:
    2022/06/29 13:10:18 [collect_evm_blocks] INFO: 2.27%: Block 19111479, 289 txs
    2022/06/29 13:10:19 [nvh.crypto.blockchain.transactions_db] INFO: Inserting 28 row of swap tokens op
    2022/06/29 13:10:19 [collect_evm_blocks] INFO: 4.55%: Block 19111480, 126 txs
    2022/06/29 13:10:20 [nvh.crypto.blockchain.transactions_db] INFO: Inserting 17 row of swap tokens op
    2022/06/29 13:10:20 [collect_evm_blocks] INFO: 6.82%: Block 19111481, 297 txs
    2022/06/29 13:10:21 [nvh.crypto.blockchain.transactions_db] INFO: Inserting 20 row of swap tokens op
    2022/06/29 13:10:22 [collect_evm_blocks] INFO: 9.09%: Block 19111482, 121 txs
    2022/06/29 13:10:22 [nvh.crypto.blockchain.transactions_db] INFO: Inserting 15 row of swap tokens op
    2022/06/29 13:10:23 [collect_evm_blocks] INFO: 11.36%: Block 19111483, 142 txs
    2022/06/29 13:10:23 [nvh.crypto.blockchain.transactions_db] INFO: Inserting 18 row of swap tokens op
  • The next most significant type of transactions is the one with the 0xa9059cbb signature, which correspond to the function transfer(address recipient, uint256 amount)
  • Again, we only need to store the to_id as an integer and the amount as a float value for this type of operation.
  • Here is the simple table definition:
    SQL_CREATE_TRANSFER_OP_TABLE = CREATE TABLE IF NOT EXISTS %s (
        tx_id INTEGER PRIMARY KEY,
        to_id INTEGER,
        amount REAL
    ); 
  • And here is the method used to parse the content of those transactions so far:
        def parse_tx_data(self, descs):
            """Collect the available tx data"""
    
            # Check which of the transactions should be processed further here:
            swap_tokens_descs = []
            transfer_descs = []
    
            for desc in descs:
                sig = desc["sig"] or ""
                sig = "0x" + sig
                tid = desc["id"]
                txh = desc["hash"]
                # logger.info("tid: %d", tid)
                if sig in self.swap_tokens_sigs:
                    # collect the data for the swap tokens operation:
                    op_desc = self.sig_map.parse_swap_tokens_op(desc["registers"], sig, txh)
                    op_desc["tx_id"] = tid
    
                    # Remove the registers:
                    desc["registers"] = None
    
                    swap_tokens_descs.append(op_desc)
    
                elif sig in self.transfer_sigs:
                    # collect the data for the trasnfer operation:
                    op_desc = self.sig_map.parse_transfer_op(desc["registers"], sig, txh)
                    op_desc["tx_id"] = tid
    
                    # Remove the registers:
                    desc["registers"] = None
    
                    transfer_descs.append(op_desc)
    
            return {"swap_tokens": swap_tokens_descs, "transfer": transfer_descs}
  • ⇒ In the process I also removed the “transactions” table completely from the base chain database: anyway, to construct advanced stats on the transactions I would need to relate to the transaction id, on the different tables so it's really better if all those tables are in the same database I think.
  • And while I'm at it, let's do a third one already “0x7ff36ab5” which is for swapExactETHForTokens(uint256 amountOutMin, address[] path, address to, uint256 deadline)
  • So that's what I categorized in the EVMSigMap as a swap_native_for_tokens operation
  • For that operation we are only interested in the amount, token_out address, and to address so here is the table definition:
    SQL_CREATE_SWAP_NATIVE_OP_TABLE = ''' CREATE TABLE IF NOT EXISTS %s (
        tx_id INTEGER PRIMARY KEY,
        amount REAL,
        token_out INTEGER,
        to_id INTEGER
    );'''
  • And here is the main parsing function for that third type of transaction:
        def parse_swap_native_op(self, regdata, _sig=None, txh=None):
            """Parse a swap native operation input data"""
            # regdata should contain the data for:
            # (uint256,address[],address,uint256)
    
            regs = self.parse_register_data(regdata, txh)
    
            self.check(int(regs[1], 16) == 4 * 32, "Invalid value for regs[1]: %s (hash=%s)", regs[2], txh)
            # get the number of tokens:
            ntokens = int(regs[4], 16)
            self.check(len(regs) == 5 + ntokens, "Invalid number of tokens: %s (hash=%s)", regs, txh)
    
            # Extract the data:
            token_out = self.chain.to_checksum_address(regs[-1][-40:])
            to_id = self.chain.to_checksum_address(regs[2][-40:])
    
            return {
                "amount": float(int(regs[0], 16)),
                "token_out": self.chain.get_address_id(token_out),
                "to_id": self.chain.get_address_id(to_id),
            }
  • When collecting blocks I now sometimes (often!) get errors such as this one:
    File "/mnt/data1/dev/projects/NervProj/nvp/nvp_component.py", line 105, in handle
    return self.call_handler(f"{self.handlers_path}.{hname}", self, *args, **kwargs)
    File "/mnt/data1/dev/projects/NervProj/nvp/nvp_component.py", line 100, in call_handler
    return self.ctx.call_handler(hname, *args, **kwargs)
    File "/mnt/data1/dev/projects/NervProj/nvp/nvp_context.py", line 678, in call_handler
    return handler(*args, **kwargs)
    File "/mnt/data1/dev/projects/NervHome/nvh/crypto/blockchain/handlers/collect_evm_blocks.py", line 36, in handle
    process_block(cdb, tdb, bck, f"{idx*100.0/count:.2f}%: ")
    File "/mnt/data1/dev/projects/NervHome/nvh/crypto/blockchain/handlers/collect_evm_blocks.py", line 67, in process_block
    process_transactions(cdb, tdb, txs)
    File "/mnt/data1/dev/projects/NervHome/nvh/crypto/blockchain/handlers/collect_evm_blocks.py", line 126, in process_transactions
    tdb.insert_swap_tokens_ops(parsed["swap_tokens"])
    File "/mnt/data1/dev/projects/NervHome/nvh/crypto/blockchain/transactions_db.py", line 227, in insert_swap_tokens_ops
    self.execute(self.insert_swap_tokens_sql, rows, many=True, commit=True)
    File "/mnt/data1/dev/projects/NervHome/nvh/crypto/blockchain/transactions_db.py", line 113, in execute
    return self.sql_db.execute(*args, **kaargs)
    File "/mnt/data1/dev/projects/NervHome/nvh/core/postgresql_db.py", line 60, in execute
    c.executemany(code, data)
    psycopg2.errors.NumericValueOutOfRange: "356811923176490000000000000000000000000000000" is out of range for type real
  • This definitely makes sense of course as that value is is something like 35681192,317649 * 1e37 and the “real” type max value is something around 1e37.
  • ⇒ But actually I have an idea to solve this: I'm not using any negative values yet, so what i'm going to do is as follow:
    • When I find an integer value larger than 1e37, I divide the value by 1e37, and multiply by -1.0!
    • This way I could store even larger values in the negative range (and restore as needed when reading from the database of course)
  • So here is what this now looks like in the TransactionDB class(fixed in many other functions too obviously):
        def swap_tokens_row_from_desc(self, desc):
            """Convert a tx desc to a row"""
            keys = (
                "tx_id",
                "amount_in",
                "amount_out",
                "token_in",
                "token_out",
                "to_id",
            )
    
            row = [desc[key] for key in keys]
            # fix float range:
            if row[1] > 1e37:
                row[1] = -row[1] / 1e37
            if row[2] > 1e37:
                row[2] = -row[2] / 1e37
    
            return row
* => And this will conclude this dev session ! See you next time 😀!
  • blog/2022/0702_collecting_transactions_data.txt
  • Last modified: 2022/07/02 05:11
  • by 127.0.0.1