diff --git a/lamden/cli/start.py b/lamden/cli/start.py index 4cbdc37b2..011ac6822 100644 --- a/lamden/cli/start.py +++ b/lamden/cli/start.py @@ -29,6 +29,8 @@ def start_mongo(): print('Starting MongoDB...') time.sleep(3) +def cfg_and_start_rsync_daemon(): + os.system('cp rsyncd.conf /etc/ && rsync --daemon > /dev/null 2>&1') def print_ascii_art(): print(''' @@ -130,6 +132,7 @@ def start_node(args): if args.node_type == 'masternode': # Start mongo start_mongo() + cfg_and_start_rsync_daemon() n = Masternode( wallet=wallet, @@ -186,6 +189,7 @@ def join_network(args): if args.node_type == 'masternode': # Start mongo start_mongo() + cfg_and_start_rsync_daemon() n = Masternode( wallet=wallet, diff --git a/lamden/nodes/base.py b/lamden/nodes/base.py index a2fa2fe2a..9fbb5260b 100644 --- a/lamden/nodes/base.py +++ b/lamden/nodes/base.py @@ -1,4 +1,5 @@ -from lamden import storage, network, router, authentication, rewards, upgrade +from lamden import network, router, authentication, rewards, upgrade +from lamden import storage from lamden.crypto import canonical from lamden.crypto.wallet import Wallet from lamden.contracts import sync @@ -245,7 +246,8 @@ async def catchup(self, mn_seed, mn_vk): wallet=self.wallet, ctx=self.ctx ) - self.process_new_block(block) + if not block.get('error'): + self.process_new_block(block) # Process any blocks that were made while we were catching up while len(self.new_block_processor.q) > 0: @@ -318,8 +320,6 @@ def update_state(self, block): client=self.client ) - #self.nonces.flush_pending() - self.log.info('Updating metadata.') self.current_height = storage.get_latest_block_height(self.driver) self.current_hash = storage.get_latest_block_hash(self.driver) @@ -335,15 +335,14 @@ def process_new_block(self, block): if self.store: encoded_block = encode(block) encoded_block = json.loads(encoded_block) - - self.blocks.store_block(encoded_block) - # create Event File self.event_writer.write_event(Event( topics=[NEW_BLOCK_EVENT], data=encoded_block )) + self.blocks.store_block(block) + # Prepare for the next block by flushing out driver and notification state # self.new_block_processor.clean() diff --git a/lamden/nodes/delegate/delegate.py b/lamden/nodes/delegate/delegate.py index 42c8a1a06..3d2fedab2 100644 --- a/lamden/nodes/delegate/delegate.py +++ b/lamden/nodes/delegate/delegate.py @@ -38,9 +38,11 @@ def __init__(self, client: ContractingClient, nonces: storage.NonceStorage, debu async def process_message(self, msg): self.log.info(f'Received work from {msg["sender"][:8]}') - # if msg['sender'] not in self.masters: - # self.log.error(f'TX Batch received from non-master {msg["sender"][:8]}') - # return + self.log.info(msg) + + if msg['sender'] not in self.masters: + self.log.error(f'TX Batch received from non-master {msg["sender"][:8]}') + return shim = { 'transactions': [], diff --git a/lamden/nodes/events.py b/lamden/nodes/events.py index a63c6bda0..68942c6b5 100644 --- a/lamden/nodes/events.py +++ b/lamden/nodes/events.py @@ -51,8 +51,7 @@ def get_events(self) -> List[Event]: with open(file, 'r') as f: try: e = json.load(f) - event = Event(e['topics'], e['data']) - events.append(event) + events.append(Event(e['topics'], e['data'])) except: # TODO(nikita): proper handling print('failed to load event') diff --git a/lamden/nodes/masternode/masternode.py b/lamden/nodes/masternode/masternode.py index eb3706ac7..981ec3b11 100644 --- a/lamden/nodes/masternode/masternode.py +++ b/lamden/nodes/masternode/masternode.py @@ -43,7 +43,7 @@ def get_block(self, command): block = self.blocks.get_block(num) if block is None: - return None + return {"error": "block does not exist"} return block diff --git a/lamden/nodes/masternode/webserver.py b/lamden/nodes/masternode/webserver.py index eb3b0ffc3..9042a0d51 100644 --- a/lamden/nodes/masternode/webserver.py +++ b/lamden/nodes/masternode/webserver.py @@ -153,7 +153,7 @@ async def disconnect(): @self.sio.event async def event(data): for client in self.ws_clients: - await client.send(json.dumps(data)) + await client.send(encode(data)) def __register_app_listeners(self): @self.app.listener('after_server_start') @@ -180,7 +180,7 @@ async def ws_handler(self, request, ws): 'data': block } - await ws.send(json.dumps(eventData)) + await ws.send(encode(eventData)) async for message in ws: pass @@ -393,7 +393,7 @@ async def get_latest_block(self, request): num = storage.get_latest_block_height(self.driver) block = self.blocks.get_block(int(num)) - return response.json(block, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(block, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_latest_block_number(self, request): self.driver.clear_pending_state() @@ -424,7 +424,7 @@ async def get_block(self, request): return response.json({'error': 'Block not found.'}, status=400, headers={'Access-Control-Allow-Origin': '*'}) - return response.json(block, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(block, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_tx(self, request): _hash = request.args.get('hash') @@ -444,7 +444,7 @@ async def get_tx(self, request): return response.json({'error': 'Transaction not found.'}, status=400, headers={'Access-Control-Allow-Origin': '*'}) - return response.json(tx, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(tx, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_constitution(self, request): self.client.raw_driver.clear_pending_state() diff --git a/lamden/storage.py b/lamden/storage.py index a43471b40..29fbc004f 100644 --- a/lamden/storage.py +++ b/lamden/storage.py @@ -1,106 +1,166 @@ from contracting.db.driver import ContractDriver -from pymongo import MongoClient, DESCENDING - -from bson.decimal128 import Decimal128 -from bson.codec_options import TypeCodec, TypeEncoder, TypeDecoder -from bson.codec_options import TypeRegistry -from bson.codec_options import CodecOptions - -from decimal import Decimal - -import lamden -from lamden.logger.base import get_logger +from contracting.db.driver import FSDriver +from contracting.db.encoder import encode, decode, encode_kv from contracting.stdlib.bridge.decimal import ContractingDecimal +from lamden.logger.base import get_logger +import json +import os +import pathlib +import shutil -BLOCK_HASH_KEY = '_current_block_hash' -BLOCK_NUM_HEIGHT = '_current_block_height' +BLOCK_HASH_KEY = '__latest_block.hash' +BLOCK_NUM_HEIGHT = '__latest_block.height' NONCE_KEY = '__n' PENDING_NONCE_KEY = '__pn' +STORAGE_HOME = pathlib.Path().home().joinpath('.lamden') + log = get_logger('STATE') +class BlockStorage: + def __init__(self, home=STORAGE_HOME): + self.home = home + + self.blocks_dir = self.home.joinpath('blocks') + self.blocks_alias_dir = self.blocks_dir.joinpath('alias') + self.txs_dir = self.blocks_dir.joinpath('txs') -class DecimalEncoder(TypeEncoder): - python_type = Decimal # the Python type acted upon by this type codec + self.build_directories() - def transform_python(self, value): - d = Decimal(str(value)) - return Decimal128(d) + def build_directories(self): + self.home.mkdir(exist_ok=True, parents=True) + self.blocks_dir.mkdir(exist_ok=True, parents=True) + self.blocks_alias_dir.mkdir(exist_ok=True, parents=True) + self.txs_dir.mkdir(exist_ok=True, parents=True) + def flush(self): + try: + shutil.rmtree(self.home) + self.build_directories() + except FileNotFoundError: + pass + + def store_block(self, block): + if block.get('subblocks') is None: + return -class ContractingDecimalEncoder(TypeEncoder): - python_type = ContractingDecimal # the Python type acted upon by this type codec + txs, hashes = self.cull_txs(block) + self.write_block(block) + self.write_txs(txs, hashes) - def transform_python(self, value): - d = Decimal(str(value._d)) - return Decimal128(d) + @staticmethod + def cull_txs(block): + # Pops all transactions from the block and replaces them with the hash only for storage space + # Returns the data and hashes for storage in a different folder. Block is modified in place + txs = [] + hashes = [] + for subblock in block['subblocks']: + subblock_txs = [] + subblock_hashes = [] + for i in range(len(subblock['transactions'])): + tx = subblock['transactions'].pop(0) -class DecimalDecoder(TypeDecoder): - bson_type = Decimal128 + subblock_txs.append(tx) + subblock_hashes.append(tx['hash']) - def transform_bson(self, value): - return value.to_decimal() + subblock['transactions'] = subblock_hashes + try: + subblock['subblock'] = int(subblock['subblock']) + except: + pass -# class ContractingDecimalCodec(TypeCodec): -# python_type = ContractingDecimal # the Python type acted upon by this type codec -# bson_type = Decimal128 # the BSON type acted upon by this type codec -# -# def transform_python(self, value): -# return Decimal128(value._d) -# -# def transform_bson(self, value): -# return value.to_decimal() + txs.extend(subblock_txs) + hashes.extend(subblock_hashes) + return txs, hashes -type_registry = TypeRegistry([DecimalDecoder(), DecimalEncoder(), ContractingDecimalEncoder()]) -codec_options = CodecOptions(type_registry=type_registry) + def write_block(self, block): + num = block.get('number') + if type(num) == dict: + num = num.get('__fixed__') + block['number'] = num -class NonceStorage: - def __init__(self, port=27027, db_name='lamden', nonce_collection='nonces', pending_collection='pending_nonces', config_path=lamden.__path__[0]): - self.config_path = config_path + name = str(num).zfill(64) - self.port = port + symlink_name = block.get('hash') - self.client = MongoClient() - self.db = self.client.get_database(db_name) + encoded_block = encode(block) + with open(self.blocks_dir.joinpath(name), 'w') as f: + f.write(encoded_block) - self.nonces = self.db.get_collection(nonce_collection, codec_options=codec_options) - self.pending_nonces = self.db.get_collection(pending_collection, codec_options=codec_options) + try: + os.symlink(self.blocks_dir.joinpath(name), self.blocks_alias_dir.joinpath(symlink_name)) + except FileExistsError: + pass - @staticmethod - def get_one(sender, processor, db): - v = db.find_one( - { - 'sender': sender, - 'processor': processor - } - ) + def write_txs(self, txs, hashes): + for file, data in zip(hashes, txs): + with open(self.txs_dir.joinpath(file), 'w') as f: + encoded_tx = encode(data) + f.write(encoded_tx) + def get_block(self, v=None, no_id=True): if v is None: return None - return v['value'] + try: + if isinstance(v, int): + f = open(self.blocks_dir.joinpath(str(v).zfill(64))) + else: + f = open(self.blocks_alias_dir.joinpath(v)) + except FileNotFoundError: + return None + + encoded_block = f.read() + block = decode(encoded_block) + f.close() + self.fill_block(block) + + return block + + def fill_block(self, block): + for subblock in block['subblocks']: + txs = [] + for i in range(len(subblock['transactions'])): + tx = self.get_tx(subblock['transactions'][i]) + txs.append(tx) + + subblock['transactions'] = txs + + def get_tx(self, h): + try: + f = open(self.txs_dir.joinpath(h)) + encoded_tx = f.read() + + tx = decode(encoded_tx) + + f.close() + except FileNotFoundError: + tx = None + + return tx + +class NonceStorage: + def __init__(self, nonce_collection=STORAGE_HOME.joinpath('nonces'), + pending_collection=STORAGE_HOME.joinpath('pending_nonces')): + self.nonces = FSDriver(root=nonce_collection) + self.pending_nonces = FSDriver(root=pending_collection) @staticmethod - def set_one(sender, processor, value, db): - db.update_one( - { - 'sender': sender, - 'processor': processor - }, - { - '$set': - { - 'value': value - } - }, upsert=True - ) + def get_one(sender, processor, db: FSDriver): + return db.get(f'__nonces.{processor}/{sender}') + @staticmethod + def set_one(sender, processor, value, db: FSDriver): + return db.set(f'__nonces.{processor}/{sender}', value) + + # Move this to transaction.py def get_nonce(self, sender, processor): return self.get_one(sender, processor, self.nonces) + # Move this to transaction.py def get_pending_nonce(self, sender, processor): return self.get_one(sender, processor, self.pending_nonces) @@ -110,6 +170,7 @@ def set_nonce(self, sender, processor, value): def set_pending_nonce(self, sender, processor, value): self.set_one(sender, processor, value, self.pending_nonces) + # Move this to webserver.py def get_latest_nonce(self, sender, processor): latest_nonce = self.get_pending_nonce(sender=sender, processor=processor) @@ -122,26 +183,23 @@ def get_latest_nonce(self, sender, processor): return latest_nonce def flush(self): - self.nonces.drop() - self.pending_nonces.drop() + self.nonces.flush() + self.pending_nonces.flush() def flush_pending(self): - self.pending_nonces.drop() - + self.pending_nonces.flush() def get_latest_block_hash(driver: ContractDriver): - latest_hash = driver.get(BLOCK_HASH_KEY, mark=False) + latest_hash = driver.get(BLOCK_HASH_KEY) if latest_hash is None: return '0' * 64 return latest_hash - def set_latest_block_hash(h, driver: ContractDriver): driver.driver.set(BLOCK_HASH_KEY, h) - def get_latest_block_height(driver: ContractDriver): - h = driver.get(BLOCK_NUM_HEIGHT, mark=False) + h = driver.get(BLOCK_NUM_HEIGHT) if h is None: return 0 @@ -150,14 +208,10 @@ def get_latest_block_height(driver: ContractDriver): return h - def set_latest_block_height(h, driver: ContractDriver): driver.driver.set(BLOCK_NUM_HEIGHT, h) - def update_state_with_transaction(tx, driver: ContractDriver, nonces: NonceStorage): - nonces_to_delete = [] - if tx['state'] is not None and len(tx['state']) > 0: for delta in tx['state']: driver.driver.set(delta['key'], delta['value']) @@ -169,11 +223,7 @@ def update_state_with_transaction(tx, driver: ContractDriver, nonces: NonceStora value=tx['transaction']['payload']['nonce'] + 1 ) - nonces_to_delete.append((tx['transaction']['payload']['sender'], tx['transaction']['payload']['processor'])) - - for n in nonces_to_delete: - nonces.set_pending_nonce(*n, value=None) - + nonces.flush_pending() def update_state_with_block(block, driver: ContractDriver, nonces: NonceStorage, set_hash_and_height=True): if block.get('subblocks') is not None: @@ -186,114 +236,3 @@ def update_state_with_block(block, driver: ContractDriver, nonces: NonceStorage, set_latest_block_hash(block['hash'], driver=driver) set_latest_block_height(block['number'], driver=driver) - -class BlockStorage: - BLOCK = 0 - TX = 1 - - def __init__(self, port=27027, config_path=lamden.__path__[0], db='lamden', blocks_collection='blocks', tx_collection='tx'): - # Setup configuration file to read constants - self.config_path = config_path - - self.port = port - - self.client = MongoClient() - self.db = self.client.get_database(db) - - self.blocks = self.db.get_collection(blocks_collection, codec_options=codec_options) - self.txs = self.db.get_collection(tx_collection, codec_options=codec_options) - - def q(self, v): - if isinstance(v, int): - return {'number': v} - return {'hash': v} - - def get_block(self, v=None, no_id=True): - if v is None: - return None - - q = self.q(v) - block = self.blocks.find_one(q) - - if block is not None and no_id: - block.pop('_id') - - return block - - def put(self, data, collection=BLOCK): - if collection == BlockStorage.BLOCK: - _id = self.blocks.insert_one(data) - log.debug(data) - del data['_id'] - elif collection == BlockStorage.TX: - _id = self.txs.insert_one(data) - del data['_id'] - else: - return False - - return _id is not None - - def get_last_n(self, n, collection=BLOCK): - if collection == BlockStorage.BLOCK: - c = self.blocks - else: - return None - - block_query = c.find({}, {'_id': False}).sort( - 'number', DESCENDING - ).limit(n) - - blocks = [block for block in block_query] - - if len(blocks) > 1: - first_block_num = blocks[0].get('number') - last_block_num = blocks[-1].get('number') - - assert first_block_num > last_block_num, "Blocks are not descending." - - return blocks - - def get_tx(self, h, no_id=True): - tx = self.txs.find_one({'hash': h}) - - if tx is not None and no_id: - tx.pop('_id') - - return tx - - def drop_collections(self): - self.blocks.drop() - self.txs.drop() - - def flush(self): - self.drop_collections() - - def store_block(self, block): - if block.get('number') is not None: - block['number'] = int(block['number']) - - self.put(block, BlockStorage.BLOCK) - self.store_txs(block) - - def store_txs(self, block): - if block.get('subblocks') is None: - return - - for subblock in block['subblocks']: - for tx in subblock['transactions']: - self.put(tx, BlockStorage.TX) - - def delete_tx(self, h): - self.txs.delete_one({'hash': h}) - - def delete_block(self, v): - block = self.get_block(v, no_id=False) - - if block is None: - return - - for subblock in block['subblocks']: - for tx in subblock['transactions']: - self.delete_tx(tx['hash']) - - self.blocks.delete_one({'_id': block['_id']}) diff --git a/lamden/utils/__init__.py b/lamden/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lamden/utils/legacy.py b/lamden/utils/legacy.py new file mode 100644 index 000000000..6fd6b3d8d --- /dev/null +++ b/lamden/utils/legacy.py @@ -0,0 +1,299 @@ +from contracting.db.driver import ContractDriver +from pymongo import MongoClient, DESCENDING + +from bson.decimal128 import Decimal128 +from bson.codec_options import TypeCodec, TypeEncoder, TypeDecoder +from bson.codec_options import TypeRegistry +from bson.codec_options import CodecOptions + +from decimal import Decimal + +import lamden +from lamden.logger.base import get_logger +from contracting.stdlib.bridge.decimal import ContractingDecimal + +BLOCK_HASH_KEY = '_current_block_hash' +BLOCK_NUM_HEIGHT = '_current_block_height' +NONCE_KEY = '__n' +PENDING_NONCE_KEY = '__pn' + +log = get_logger('STATE') + + +class DecimalEncoder(TypeEncoder): + python_type = Decimal # the Python type acted upon by this type codec + + def transform_python(self, value): + d = Decimal(str(value)) + return Decimal128(d) + + +class ContractingDecimalEncoder(TypeEncoder): + python_type = ContractingDecimal # the Python type acted upon by this type codec + + def transform_python(self, value): + d = Decimal(str(value._d)) + return Decimal128(d) + + +class DecimalDecoder(TypeDecoder): + bson_type = Decimal128 + + def transform_bson(self, value): + return value.to_decimal() + +# class ContractingDecimalCodec(TypeCodec): +# python_type = ContractingDecimal # the Python type acted upon by this type codec +# bson_type = Decimal128 # the BSON type acted upon by this type codec +# +# def transform_python(self, value): +# return Decimal128(value._d) +# +# def transform_bson(self, value): +# return value.to_decimal() + + +type_registry = TypeRegistry([DecimalDecoder(), DecimalEncoder(), ContractingDecimalEncoder()]) +codec_options = CodecOptions(type_registry=type_registry) + + +class NonceStorage: + def __init__(self, port=27027, db_name='lamden', nonce_collection='nonces', pending_collection='pending_nonces', config_path=lamden.__path__[0]): + self.config_path = config_path + + self.port = port + + self.client = MongoClient() + self.db = self.client.get_database(db_name) + + self.nonces = self.db.get_collection(nonce_collection, codec_options=codec_options) + self.pending_nonces = self.db.get_collection(pending_collection, codec_options=codec_options) + + @staticmethod + def get_one(sender, processor, db): + v = db.find_one( + { + 'sender': sender, + 'processor': processor + } + ) + + if v is None: + return None + + return v['value'] + + @staticmethod + def set_one(sender, processor, value, db): + db.update_one( + { + 'sender': sender, + 'processor': processor + }, + { + '$set': + { + 'value': value + } + }, upsert=True + ) + + def get_nonce(self, sender, processor): + return self.get_one(sender, processor, self.nonces) + + def get_pending_nonce(self, sender, processor): + return self.get_one(sender, processor, self.pending_nonces) + + def set_nonce(self, sender, processor, value): + self.set_one(sender, processor, value, self.nonces) + + def set_pending_nonce(self, sender, processor, value): + self.set_one(sender, processor, value, self.pending_nonces) + + def get_latest_nonce(self, sender, processor): + latest_nonce = self.get_pending_nonce(sender=sender, processor=processor) + + if latest_nonce is None: + latest_nonce = self.get_nonce(sender=sender, processor=processor) + + if latest_nonce is None: + latest_nonce = 0 + + return latest_nonce + + def flush(self): + self.nonces.drop() + self.pending_nonces.drop() + + def flush_pending(self): + self.pending_nonces.drop() + + +def get_latest_block_hash(driver: ContractDriver): + latest_hash = driver.get(BLOCK_HASH_KEY) + if latest_hash is None: + return '0' * 64 + return latest_hash + + +def set_latest_block_hash(h, driver: ContractDriver): + driver.driver.set(BLOCK_HASH_KEY, h) + + +def get_latest_block_height(driver: ContractDriver): + h = driver.get(BLOCK_NUM_HEIGHT) + if h is None: + return 0 + + if type(h) == ContractingDecimal: + h = int(h._d) + + return h + + +def set_latest_block_height(h, driver: ContractDriver): + driver.driver.set(BLOCK_NUM_HEIGHT, h) + + +def update_state_with_transaction(tx, driver: ContractDriver, nonces: NonceStorage): + nonces_to_delete = [] + + if tx['state'] is not None and len(tx['state']) > 0: + for delta in tx['state']: + driver.driver.set(delta['key'], delta['value']) + # log.debug(f"{delta['key']} -> {delta['value']}") + + nonces.set_nonce( + sender=tx['transaction']['payload']['sender'], + processor=tx['transaction']['payload']['processor'], + value=tx['transaction']['payload']['nonce'] + 1 + ) + + nonces_to_delete.append((tx['transaction']['payload']['sender'], tx['transaction']['payload']['processor'])) + + for n in nonces_to_delete: + nonces.set_pending_nonce(*n, value=None) + + +def update_state_with_block(block, driver: ContractDriver, nonces: NonceStorage, set_hash_and_height=True): + if block.get('subblocks') is not None: + for sb in block['subblocks']: + for tx in sb['transactions']: + update_state_with_transaction(tx, driver, nonces) + + # Update our block hash and block num + if set_hash_and_height: + set_latest_block_hash(block['hash'], driver=driver) + set_latest_block_height(block['number'], driver=driver) + + +class BlockStorage: + BLOCK = 0 + TX = 1 + + def __init__(self, port=27027, config_path=lamden.__path__[0], db='lamden', blocks_collection='blocks', tx_collection='tx'): + # Setup configuration file to read constants + self.config_path = config_path + + self.port = port + + self.client = MongoClient() + self.db = self.client.get_database(db) + + self.blocks = self.db.get_collection(blocks_collection, codec_options=codec_options) + self.txs = self.db.get_collection(tx_collection, codec_options=codec_options) + + def q(self, v): + if isinstance(v, int): + return {'number': v} + return {'hash': v} + + def get_block(self, v=None, no_id=True): + if v is None: + return None + + q = self.q(v) + block = self.blocks.find_one(q) + + if block is not None and no_id: + block.pop('_id') + + return block + + def put(self, data, collection=BLOCK): + if collection == BlockStorage.BLOCK: + _id = self.blocks.insert_one(data) + log.debug(data) + del data['_id'] + elif collection == BlockStorage.TX: + _id = self.txs.insert_one(data) + del data['_id'] + else: + return False + + return _id is not None + + def get_last_n(self, n, collection=BLOCK): + if collection == BlockStorage.BLOCK: + c = self.blocks + else: + return None + + block_query = c.find({}, {'_id': False}).sort( + 'number', DESCENDING + ).limit(n) + + blocks = [block for block in block_query] + + if len(blocks) > 1: + first_block_num = blocks[0].get('number') + last_block_num = blocks[-1].get('number') + + assert first_block_num > last_block_num, "Blocks are not descending." + + return blocks + + def get_tx(self, h, no_id=True): + tx = self.txs.find_one({'hash': h}) + + if tx is not None and no_id: + tx.pop('_id') + + return tx + + def drop_collections(self): + self.blocks.drop() + self.txs.drop() + + def flush(self): + self.drop_collections() + + def store_block(self, block): + if block.get('number') is not None: + block['number'] = int(block['number']) + + self.put(block, BlockStorage.BLOCK) + self.store_txs(block) + + def store_txs(self, block): + if block.get('subblocks') is None: + return + + for subblock in block['subblocks']: + for tx in subblock['transactions']: + self.put(tx, BlockStorage.TX) + + def delete_tx(self, h): + self.txs.delete_one({'hash': h}) + + def delete_block(self, v): + block = self.get_block(v, no_id=False) + + if block is None: + return + + for subblock in block['subblocks']: + for tx in subblock['transactions']: + self.delete_tx(tx['hash']) + + self.blocks.delete_one({'_id': block['_id']}) diff --git a/lamden/utils/migrate_mongo.py b/lamden/utils/migrate_mongo.py new file mode 100644 index 000000000..3f5e96180 --- /dev/null +++ b/lamden/utils/migrate_mongo.py @@ -0,0 +1,140 @@ +from contracting.client import ContractingClient +from contracting.db.driver import ContractDriver, Driver, FSDriver +from lamden import storage, rewards +from lamden.contracts import sync +from lamden.logger.base import get_logger +import decimal +import gc +import json +import lamden +import legacy + +import pathlib + +log = get_logger('MIGRATE') + +class MigrationNode: + def __init__(self, + constitution=pathlib.Path.home().joinpath('constitution.json'), + debug=True, store=True, bypass_catchup=False, + genesis_path=lamden.contracts.__path__[0], + reward_manager=rewards.RewardManager(), + nonces=storage.NonceStorage()): + + self.new_blocks = storage.BlockStorage() + self.old_blocks = legacy.BlockStorage() + + self.new_driver = ContractDriver() + self.new_driver.driver = FSDriver() + + self.old_driver = ContractDriver() + self.old_driver.driver = Driver() + + self.new_nonces = nonces + self.store = store + + self.log = get_logger('MigrationNode') + self.log.propagate = debug + + self.genesis_path = genesis_path + + self.client = ContractingClient( + driver=self.new_driver, + submission_filename=genesis_path + '/submission.s.py' + ) + + self.constitution = constitution + + self.reward_manager = reward_manager + + self.bypass_catchup = bypass_catchup + + with open(constitution) as f: + self.constitution = json.load(f) + + self.seed_genesis_contracts() + + def seed_genesis_contracts(self): + self.log.info('Setting up genesis contracts.') + sync.setup_genesis_contracts( + initial_masternodes=self.constitution['masternodes'], + initial_delegates=self.constitution['delegates'], + client=self.client, + filename=self.genesis_path + '/genesis.json', + root=self.genesis_path + ) + + def should_process(self, block): + try: + self.log.info(f'Processing block #{block.get("number")}') + except: + self.log.error('Malformed block :(') + return False + # Test if block failed immediately + if block == {'response': 'ok'}: + return False + + if block['hash'] == 'f' * 64: + self.log.error('Failed Block! Not storing.') + return False + + return True + + def update_state(self, block): + self.new_driver.clear_pending_state() + + # Check if the block is valid + if self.should_process(block): + self.log.info('Storing new block.') + # Commit the state changes and nonces to the database + storage.update_state_with_block( + block=block, + driver=self.new_driver, + nonces=self.new_nonces + ) + + self.log.info('Issuing rewards.') + # Calculate and issue the rewards for the governance nodes + self.reward_manager.issue_rewards( + block=block, + client=self.client + ) + + self.log.info('Updating metadata.') + + def process_new_block(self, block): + # Update the state and refresh the sockets so new nodes can join + self.update_state(block) + + # Store the block if it's a masternode + if self.store: + self.new_blocks.store_block(block) + + # Prepare for the next block by flushing out driver and notification state + self.new_driver.commit() + self.new_driver.clear_pending_state() + gc.collect() + + def catchup(self): + # Get the current latest block stored and the latest block of the network + log.info('Running migration.') + # Find the missing blocks process them + while True: + current = storage.get_latest_block_height(self.new_driver) + latest = legacy.get_latest_block_height(self.old_driver) + log.info(f'Current block: {current}, Latest available block: {latest}') + # Increment current by one. Don't count the genesis block. + current = 1 if current == 0 else current + + if current == latest: + break + + for i in range(current, latest+1): + block = self.old_blocks.get_block(v=i) + log.info(f'Migrating block: #{i}') + if block is not None: + self.process_new_block(block) + +if __name__ == '__main__': + mn = MigrationNode() + mn.catchup() diff --git a/rsyncd.conf b/rsyncd.conf new file mode 100644 index 000000000..857fbff65 --- /dev/null +++ b/rsyncd.conf @@ -0,0 +1,35 @@ +pid file = /var/run/rsyncd.pid +lock file = /var/run/rsync.lock +log file = /var/log/rsync.log + +[blocks] +path = /root/.lamden/blocks +read only = true +list = yes +timeout = 300 +uid = nobody +gid = nogroup + +[state] +path = /root/.lamden/state +read only = true +list = yes +timeout = 300 +uid = nobody +gid = nogroup + +[nonces] +path = /root/.lamden/nonces +read only = true +list = yes +timeout = 300 +uid = nobody +gid = nogroup + +[pending_nonces] +path = /root/.lamden/pending_nonces +read only = true +list = yes +timeout = 300 +uid = nobody +gid = nogroup diff --git a/tests/inprog/test_orchestrate_rewards_voting.py b/tests/inprog/test_orchestrate_rewards_voting.py index 9450b1643..e346cf1a5 100644 --- a/tests/inprog/test_orchestrate_rewards_voting.py +++ b/tests/inprog/test_orchestrate_rewards_voting.py @@ -1239,4 +1239,4 @@ async def test(): loop.run_until_complete(test()) def test_1_by_2_votes_new_masternode_that_never_joins_can_vote_them_out(self): - pass \ No newline at end of file + pass diff --git a/tests/integration/test_base_node.py b/tests/integration/test_base_node.py index f2491767e..a67e86902 100644 --- a/tests/integration/test_base_node.py +++ b/tests/integration/test_base_node.py @@ -1,6 +1,7 @@ from lamden.nodes.masternode import masternode from lamden.nodes import base -from lamden import router, storage, network, authentication +from lamden import router, network, authentication +from lamden import storage from lamden.crypto.wallet import Wallet from lamden.crypto import canonical from contracting.db.driver import InMemDriver, ContractDriver diff --git a/tests/unit/test_block_service.py b/tests/unit/test_block_service.py index da32f6b9d..a8e51774d 100644 --- a/tests/unit/test_block_service.py +++ b/tests/unit/test_block_service.py @@ -1,6 +1,7 @@ from lamden.nodes.masternode import masternode from lamden.nodes import base -from lamden import router, storage, authentication +from lamden import router, authentication +from lamden import storage from contracting.db.driver import ContractDriver from contracting.client import ContractingClient import zmq.asyncio diff --git a/tests/unit/test_class_webserver.py b/tests/unit/test_class_webserver.py index 7d36b6887..fffcd63a7 100644 --- a/tests/unit/test_class_webserver.py +++ b/tests/unit/test_class_webserver.py @@ -3,7 +3,6 @@ # from lamden.webserver.webserver import WebServer from lamden.nodes.masternode.webserver import WebServer # from lamden.webserver.readers import AsyncBlockReader -from lamden.storage import BlockStorage from lamden.crypto.wallet import Wallet from contracting.client import ContractingClient from contracting.db.driver import ContractDriver, decode, encode @@ -40,13 +39,12 @@ def setUp(self): ) self.ws.client.flush() - self.blocks.flush() + self.ws.blocks.flush() self.ws.driver.flush() - self.loop = asyncio.get_event_loop() def tearDown(self): self.ws.client.flush() - self.blocks.flush() + self.ws.blocks.flush() self.ws.driver.flush() def test_ping(self): @@ -290,21 +288,59 @@ def test_get_latest_block(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + self.ws.blocks.store_block(block) block2 = { 'hash': 'abb', 'number': 1000, - 'data': 'woop2' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] + } + + b2exp = { + 'hash': 'abb', + 'number': 1000, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block2) + storage.set_latest_block_height(1000, driver=self.ws.driver) + self.ws.blocks.store_block(block2) _, response = self.ws.app.test_client.get('/latest_block') - self.assertDictEqual(response.json, {'hash': 'abb', 'number': 1000, 'data': 'woop2'}) + print(response.json) + self.assertDictEqual(response.json, b2exp) def test_get_latest_block_num(self): storage.set_latest_block_height(1234, self.ws.driver) @@ -322,16 +358,42 @@ def test_get_latest_block_hash(self): def test_get_block_by_num_that_exists(self): block = { - 'hash': '1234', + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] + } + + exp = { + 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + self.ws.blocks.store_block(block) _, response = self.ws.app.test_client.get('/blocks?num=1') - self.assertDictEqual(response.json, block) + self.assertDictEqual(response.json, exp) def test_get_block_by_num_that_doesnt_exist_returns_error(self): _, response = self.ws.app.test_client.get('/blocks?num=1000') @@ -342,17 +404,37 @@ def test_get_block_by_hash_that_exists(self): h = '1234' block = { - 'hash': h, - 'blockNum': 1, - 'data': 'woop' + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + self.ws.blocks.store_block(block) expected = { - 'hash': h, - 'blockNum': 1, - 'data': 'woop' + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } _, response = self.ws.app.test_client.get(f'/blocks?hash={h}') @@ -455,7 +537,8 @@ def test_fixed_objects_do_not_fail_signature(self): self.assertEqual(len(self.ws.queue), 1) def test_submit_transaction_error_if_queue_full(self): - self.ws.queue.extend(range(10_000)) + for i in range(10_000): + self.ws.queue.append(bytes(i)) tx = build_transaction( wallet=Wallet(), @@ -474,24 +557,33 @@ def test_submit_transaction_error_if_queue_full(self): self.assertDictEqual(response.json, {'error': 'Queue full. Resubmit shortly.'}) - self.ws.queue.clear() + self.ws.queue.flush() def test_get_tx_by_hash_if_it_exists(self): - b = '0' * 64 - - tx = { - 'hash': b, - 'some': 'data' + block = { + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': '123456', + 'foo': 'bar' + }, + + ] + }, + ] } expected = { - 'hash': b, - 'some': 'data' + 'hash': '123456', + 'foo': 'bar' } - self.blocks.put(tx, collection=self.ws.blocks.TX) + self.ws.blocks.store_block(block) - _, response = self.ws.app.test_client.get(f'/tx?hash={b}') + _, response = self.ws.app.test_client.get(f'/tx?hash=123456') self.assertDictEqual(response.json, expected) def test_malformed_tx_returns_error(self): diff --git a/tests/unit/test_storage.py b/tests/unit/test_legacy_storage.py similarity index 86% rename from tests/unit/test_storage.py rename to tests/unit/test_legacy_storage.py index 220fb11d7..829154ee8 100644 --- a/tests/unit/test_storage.py +++ b/tests/unit/test_legacy_storage.py @@ -1,13 +1,10 @@ -from lamden import storage from contracting.db.driver import ContractDriver +from lamden.utils import legacy from unittest import TestCase -from lamden.storage import BlockStorage - - class TestNonce(TestCase): def setUp(self): - self.nonces = storage.NonceStorage() + self.nonces = legacy.NonceStorage() self.nonces.flush() def tearDown(self): @@ -103,21 +100,21 @@ def tearDown(self): self.driver.flush() def test_get_latest_block_hash_0s_if_none(self): - h = storage.get_latest_block_hash(self.driver) + h = legacy.get_latest_block_hash(self.driver) self.assertEqual(h, '0' * 64) def test_get_latest_block_hash_correct_after_set(self): - storage.set_latest_block_hash('a' * 64, self.driver) - h = storage.get_latest_block_hash(self.driver) + legacy.set_latest_block_hash('a' * 64, self.driver) + h = legacy.get_latest_block_hash(self.driver) self.assertEqual(h, 'a' * 64) def test_get_latest_block_height_0_if_none(self): - h = storage.get_latest_block_height(self.driver) + h = legacy.get_latest_block_height(self.driver) self.assertEqual(h, 0) def test_get_latest_block_height_correct_after_set(self): - storage.set_latest_block_height(123, self.driver) - h = storage.get_latest_block_height(self.driver) + legacy.set_latest_block_height(123, self.driver) + h = legacy.get_latest_block_height(self.driver) self.assertEqual(h, 123) @@ -193,7 +190,7 @@ def test_get_latest_block_height_correct_after_set(self): class TestUpdatingState(TestCase): def setUp(self): self.driver = ContractDriver() - self.nonces = storage.NonceStorage() + self.nonces = legacy.NonceStorage() self.nonces.flush() self.driver.flush() self.driver.clear_pending_state() @@ -204,20 +201,20 @@ def tearDown(self): self.driver.clear_pending_state() def test_state_updated_to_correct_values_in_tx(self): - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') self.assertIsNone(v1) self.assertIsNone(v2) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_1, driver=self.driver, nonces=self.nonces ) - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') self.assertEqual(v1, 'there') self.assertEqual(v2, 'jeff') @@ -226,7 +223,7 @@ def test_nonces_set_to_tx_value(self): n = self.nonces.get_latest_nonce(sender='abc', processor='def') self.assertEqual(n, 0) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_1, driver=self.driver, nonces=self.nonces @@ -246,7 +243,7 @@ def test_nonces_deleted_after_all_updates(self): self.assertEqual(n, 122) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_1, driver=self.driver, nonces=self.nonces @@ -275,19 +272,19 @@ def test_multiple_txs_deletes_multiple_nonces(self): n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') self.assertEqual(n, 4) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_1, driver=self.driver, nonces=self.nonces ) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_2, driver=self.driver, nonces=self.nonces ) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_3, driver=self.driver, nonces=self.nonces @@ -306,13 +303,13 @@ def test_multiple_txs_deletes_multiple_nonces(self): self.assertEqual(n, 43) def test_multiple_tx_state_updates_correctly(self): - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') - v3 = self.driver.get('name2', mark=False) + v3 = self.driver.get('name2') - v4 = self.driver.get('another', mark=False) - v5 = self.driver.get('something', mark=False) + v4 = self.driver.get('another') + v5 = self.driver.get('something') self.assertIsNone(v1) self.assertIsNone(v2) @@ -320,31 +317,31 @@ def test_multiple_tx_state_updates_correctly(self): self.assertIsNone(v4) self.assertIsNone(v5) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_1, driver=self.driver, nonces=self.nonces ) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_2, driver=self.driver, nonces=self.nonces ) - storage.update_state_with_transaction( + legacy.update_state_with_transaction( tx=tx_3, driver=self.driver, nonces=self.nonces ) - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') - v3 = self.driver.get('name2', mark=False) + v3 = self.driver.get('name2') - v4 = self.driver.get('another', mark=False) - v5 = self.driver.get('something', mark=False) + v4 = self.driver.get('another') + v5 = self.driver.get('something') self.assertEqual(v1, 'there2') self.assertEqual(v2, 'jeff') @@ -353,20 +350,20 @@ def test_multiple_tx_state_updates_correctly(self): self.assertEqual(v5, 'else') def test_update_with_block_sets_hash_and_height(self): - _hash = storage.get_latest_block_hash(self.driver) - num = storage.get_latest_block_height(self.driver) + _hash = legacy.get_latest_block_hash(self.driver) + num = legacy.get_latest_block_height(self.driver) self.assertEqual(_hash, '0' * 64) self.assertEqual(num, 0) - storage.update_state_with_block( + legacy.update_state_with_block( block=block, driver=self.driver, nonces=self.nonces ) - _hash = storage.get_latest_block_hash(self.driver) - num = storage.get_latest_block_height(self.driver) + _hash = legacy.get_latest_block_hash(self.driver) + num = legacy.get_latest_block_height(self.driver) self.assertEqual(_hash, 'f' * 64) self.assertEqual(num, 555) @@ -390,7 +387,7 @@ def test_update_with_block_sets_nonces_correctly(self): n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') self.assertEqual(n, 4) - storage.update_state_with_block( + legacy.update_state_with_block( block=block, driver=self.driver, nonces=self.nonces @@ -409,13 +406,13 @@ def test_update_with_block_sets_nonces_correctly(self): self.assertEqual(n, 43) def test_update_state_with_block_sets_state_correctly(self): - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') - v3 = self.driver.get('name2', mark=False) + v3 = self.driver.get('name2') - v4 = self.driver.get('another', mark=False) - v5 = self.driver.get('something', mark=False) + v4 = self.driver.get('another') + v5 = self.driver.get('something') self.assertIsNone(v1) self.assertIsNone(v2) @@ -423,19 +420,19 @@ def test_update_state_with_block_sets_state_correctly(self): self.assertIsNone(v4) self.assertIsNone(v5) - storage.update_state_with_block( + legacy.update_state_with_block( block=block, driver=self.driver, nonces=self.nonces ) - v1 = self.driver.get('hello', mark=False) - v2 = self.driver.get('name', mark=False) + v1 = self.driver.get('hello') + v2 = self.driver.get('name') - v3 = self.driver.get('name2', mark=False) + v3 = self.driver.get('name2') - v4 = self.driver.get('another', mark=False) - v5 = self.driver.get('something', mark=False) + v4 = self.driver.get('another') + v5 = self.driver.get('something') self.assertEqual(v1, 'there2') self.assertEqual(v2, 'jeff') @@ -446,7 +443,7 @@ def test_update_state_with_block_sets_state_correctly(self): class TestMasterStorage(TestCase): def setUp(self): - self.db = BlockStorage() + self.db = legacy.BlockStorage() def tearDown(self): self.db.drop_collections() @@ -575,7 +572,7 @@ def test_get_last_n_blocks(self): for block in blocks: self.db.put(block) - got_blocks = self.db.get_last_n(3, BlockStorage.BLOCK) + got_blocks = self.db.get_last_n(3, legacy.BlockStorage.BLOCK) nums = [b['number'] for b in got_blocks] @@ -591,9 +588,9 @@ def test_get_last_n_index(self): blocks.append({'hash': 'a', 'number': 5, 'data': 'woop'}) for block in blocks: - self.db.put(block, BlockStorage.BLOCK) + self.db.put(block, legacy.BlockStorage.BLOCK) - got_blocks = self.db.get_last_n(3, BlockStorage.BLOCK) + got_blocks = self.db.get_last_n(3, legacy.BlockStorage.BLOCK) nums = [b['number'] for b in got_blocks] @@ -609,7 +606,7 @@ def test_get_none_from_wrong_n_collection(self): blocks.append({'hash': 'a', 'number': 5, 'data': 'woop'}) for block in blocks: - self.db.put(block, BlockStorage.BLOCK) + self.db.put(block, legacy.BlockStorage.BLOCK) got_blocks = self.db.get_last_n(3, 5) @@ -621,7 +618,7 @@ def test_store_and_get_tx(self): 'key': 'value' } - self.db.put(tx, BlockStorage.TX) + self.db.put(tx, legacy.BlockStorage.TX) tx_got = self.db.get_tx(h='something') @@ -718,7 +715,7 @@ def test_delete_tx(self): 'key': 'value' } - self.db.put(tx, BlockStorage.TX) + self.db.put(tx, legacy.BlockStorage.TX) t = self.db.get_tx(h='something') @@ -753,7 +750,7 @@ def test_return_id_noid_false_tx(self): 'key': 'value' } - self.db.put(tx, BlockStorage.TX) + self.db.put(tx, legacy.BlockStorage.TX) t = self.db.get_tx(h='something', no_id=False) diff --git a/tests/unit/test_misc.py b/tests/unit/test_misc.py index 0f7a2c6a1..7676c05db 100644 --- a/tests/unit/test_misc.py +++ b/tests/unit/test_misc.py @@ -20,4 +20,4 @@ def test_storing_large_int(self): b = json.dumps(block) _b = json.loads(b, parse_int=decimal.Decimal) - self.client.store_block(_b) \ No newline at end of file + self.client.store_block(_b) diff --git a/tests/unit/test_new_storage.py b/tests/unit/test_new_storage.py new file mode 100644 index 000000000..0ed8e05b4 --- /dev/null +++ b/tests/unit/test_new_storage.py @@ -0,0 +1,818 @@ +from contracting.db.driver import ContractDriver +from lamden import storage +from lamden.storage import BlockStorage +from unittest import TestCase +import copy +import json + +class TestNonce(TestCase): + def setUp(self): + self.nonces = storage.NonceStorage() + self.nonces.flush() + + def tearDown(self): + self.nonces.flush() + + def test_get_nonce_none_if_not_set_first(self): + n = self.nonces.get_nonce( + sender='test', + processor='test2' + ) + + self.assertIsNone(n) + + def test_get_pending_nonce_none_if_not_set_first(self): + n = self.nonces.get_pending_nonce( + sender='test', + processor='test2' + ) + + self.assertIsNone(n) + + def test_set_then_get_nonce_returns_set_nonce(self): + self.nonces.set_nonce( + sender='test', + processor='test2', + value=2 + ) + + n = self.nonces.get_nonce( + sender='test', + processor='test2' + ) + + self.assertEqual(n, 2) + + def test_set_then_get_pending_nonce_returns_set_pending_nonce(self): + self.nonces.set_pending_nonce( + sender='test', + processor='test2', + value=2 + ) + + n = self.nonces.get_pending_nonce( + sender='test', + processor='test2' + ) + + self.assertEqual(n, 2) + + def test_get_latest_nonce_zero_if_none_set(self): + n = self.nonces.get_latest_nonce( + sender='test', + processor='test2' + ) + + self.assertEqual(n, 0) + + def test_get_latest_nonce_returns_pending_nonce_if_not_none(self): + self.nonces.set_pending_nonce( + sender='test', + processor='test2', + value=2 + ) + + n = self.nonces.get_latest_nonce( + sender='test', + processor='test2' + ) + + self.assertEqual(n, 2) + + def test_get_latest_nonce_nonce_if_pending_nonce_is_none(self): + self.nonces.set_nonce( + sender='test', + processor='test2', + value=2 + ) + + n = self.nonces.get_latest_nonce( + sender='test', + processor='test2' + ) + + self.assertEqual(n, 2) + + +class TestStorage(TestCase): + def setUp(self): + self.driver = ContractDriver() + self.driver.flush() + + def tearDown(self): + self.driver.flush() + + def test_get_latest_block_hash_0s_if_none(self): + h = storage.get_latest_block_hash(self.driver) + self.assertEqual(h, '0' * 64) + + def test_get_latest_block_hash_correct_after_set(self): + storage.set_latest_block_hash('a' * 64, self.driver) + h = storage.get_latest_block_hash(self.driver) + self.assertEqual(h, 'a' * 64) + + def test_get_latest_block_height_0_if_none(self): + h = storage.get_latest_block_height(self.driver) + self.assertEqual(h, 0) + + def test_get_latest_block_height_correct_after_set(self): + storage.set_latest_block_height(123, self.driver) + h = storage.get_latest_block_height(self.driver) + self.assertEqual(h, 123) + + +tx_1 = { + 'transaction': { + 'payload': { + 'sender': 'abc', + 'processor': 'def', + 'nonce': 123, + } + }, + 'state': [ + { + 'key': 'hello', 'value': 'there' + }, + { + 'key': 'name', 'value': 'jeff' + } + ] +} + +tx_2 = { + 'transaction': { + 'payload': { + 'sender': 'abc', + 'processor': 'def', + 'nonce': 124, + } + }, + 'state': [ + { + 'key': 'hello', 'value': 'there2' + }, + { + 'key': 'name2', 'value': 'jeff2' + } + ] +} + +tx_3 = { + 'transaction': { + 'payload': { + 'sender': 'xxx', + 'processor': 'yyy', + 'nonce': 42, + } + }, + 'state': [ + { + 'key': 'another', 'value': 'value' + }, + { + 'key': 'something', 'value': 'else' + } + ] +} + + +block = { + 'hash': 'f' * 64, + 'number': 555, + 'subblocks': [ + { + 'transactions': [tx_1, tx_2] + }, + { + 'transactions': [tx_3] + } + ] +} + + +class TestUpdatingState(TestCase): + def setUp(self): + self.driver = ContractDriver() + self.nonces = storage.NonceStorage() + self.nonces.flush() + self.driver.flush() + self.driver.clear_pending_state() + + def tearDown(self): + self.nonces.flush() + self.driver.flush() + self.driver.clear_pending_state() + + def test_state_updated_to_correct_values_in_tx(self): + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + self.assertIsNone(v1) + self.assertIsNone(v2) + + storage.update_state_with_transaction( + tx=tx_1, + driver=self.driver, + nonces=self.nonces + ) + + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + self.assertEqual(v1, 'there') + self.assertEqual(v2, 'jeff') + + def test_nonces_set_to_tx_value(self): + n = self.nonces.get_latest_nonce(sender='abc', processor='def') + self.assertEqual(n, 0) + + storage.update_state_with_transaction( + tx=tx_1, + driver=self.driver, + nonces=self.nonces + ) + + n = self.nonces.get_latest_nonce(sender='abc', processor='def') + self.assertEqual(n, 124) + + def test_nonces_deleted_after_all_updates(self): + self.nonces.set_pending_nonce( + sender='abc', + processor='def', + value=122 + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + + self.assertEqual(n, 122) + + storage.update_state_with_transaction( + tx=tx_1, + driver=self.driver, + nonces=self.nonces + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + + self.assertEqual(n, None) + + def test_multiple_txs_deletes_multiple_nonces(self): + self.nonces.set_pending_nonce( + sender='abc', + processor='def', + value=122 + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + self.assertEqual(n, 122) + + self.nonces.set_pending_nonce( + sender='xxx', + processor='yyy', + value=4 + ) + + n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, 4) + + storage.update_state_with_transaction( + tx=tx_1, + driver=self.driver, + nonces=self.nonces + ) + + storage.update_state_with_transaction( + tx=tx_2, + driver=self.driver, + nonces=self.nonces + ) + + storage.update_state_with_transaction( + tx=tx_3, + driver=self.driver, + nonces=self.nonces + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + self.assertEqual(n, None) + + n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, None) + + n = self.nonces.get_latest_nonce(sender='abc', processor='def') + self.assertEqual(n, 125) + + n = self.nonces.get_latest_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, 43) + + def test_multiple_tx_state_updates_correctly(self): + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + v3 = self.driver.get('name2') + + v4 = self.driver.get('another') + v5 = self.driver.get('something') + + self.assertIsNone(v1) + self.assertIsNone(v2) + self.assertIsNone(v3) + self.assertIsNone(v4) + self.assertIsNone(v5) + + storage.update_state_with_transaction( + tx=tx_1, + driver=self.driver, + nonces=self.nonces + ) + + storage.update_state_with_transaction( + tx=tx_2, + driver=self.driver, + nonces=self.nonces + ) + + storage.update_state_with_transaction( + tx=tx_3, + driver=self.driver, + nonces=self.nonces + ) + + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + v3 = self.driver.get('name2') + + v4 = self.driver.get('another') + v5 = self.driver.get('something') + + self.assertEqual(v1, 'there2') + self.assertEqual(v2, 'jeff') + self.assertEqual(v3, 'jeff2') + self.assertEqual(v4, 'value') + self.assertEqual(v5, 'else') + + def test_update_with_block_sets_hash_and_height(self): + _hash = storage.get_latest_block_hash(self.driver) + num = storage.get_latest_block_height(self.driver) + + self.assertEqual(_hash, '0' * 64) + self.assertEqual(num, 0) + + storage.update_state_with_block( + block=block, + driver=self.driver, + nonces=self.nonces + ) + + _hash = storage.get_latest_block_hash(self.driver) + num = storage.get_latest_block_height(self.driver) + + self.assertEqual(_hash, 'f' * 64) + self.assertEqual(num, 555) + + def test_update_with_block_sets_nonces_correctly(self): + self.nonces.set_pending_nonce( + sender='abc', + processor='def', + value=122 + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + self.assertEqual(n, 122) + + self.nonces.set_pending_nonce( + sender='xxx', + processor='yyy', + value=4 + ) + + n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, 4) + + storage.update_state_with_block( + block=block, + driver=self.driver, + nonces=self.nonces + ) + + n = self.nonces.get_pending_nonce(sender='abc', processor='def') + self.assertEqual(n, None) + + n = self.nonces.get_pending_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, None) + + n = self.nonces.get_latest_nonce(sender='abc', processor='def') + self.assertEqual(n, 125) + + n = self.nonces.get_latest_nonce(sender='xxx', processor='yyy') + self.assertEqual(n, 43) + + def test_update_state_with_block_sets_state_correctly(self): + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + v3 = self.driver.get('name2') + + v4 = self.driver.get('another') + v5 = self.driver.get('something') + + self.assertIsNone(v1) + self.assertIsNone(v2) + self.assertIsNone(v3) + self.assertIsNone(v4) + self.assertIsNone(v5) + + storage.update_state_with_block( + block=block, + driver=self.driver, + nonces=self.nonces + ) + + v1 = self.driver.get('hello') + v2 = self.driver.get('name') + + v3 = self.driver.get('name2') + + v4 = self.driver.get('another') + v5 = self.driver.get('something') + + self.assertEqual(v1, 'there2') + self.assertEqual(v2, 'jeff') + self.assertEqual(v3, 'jeff2') + self.assertEqual(v4, 'value') + self.assertEqual(v5, 'else') + + +class TestStorage(TestCase): + def setUp(self): + self.db = storage.BlockStorage() + + def tearDown(self): + self.db.flush() + + def test_cull_transaction_works_single_sb_and_tx(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] + } + + tx = { + 'hash': 'XXX', + 'foo': 'bar' + } + + txs, hashes = self.db.cull_txs(block) + expected_txs = [tx] + expected_hashes = ['XXX'] + + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) + + def test_cull_transaction_works_single_sb_multi_txs(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] + } + ] + } + + expected_txs = [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] + + expected_hashes = ['XXX', 'XXY', 'XXF'] + + txs, hashes = self.db.cull_txs(block) + + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) + + def test_cull_transaction_works_multi_sb_multi_txs(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] + }, + { + 'transactions': [ + { + 'hash': 'YYY', + 'foo3': 'bar3' + }, + { + 'hash': 'YYX', + 'foo4': 'bar4' + }, + { + 'hash': 'YSX', + 'foo5': 'bar5' + } + ] + } + ] + } + + expected_txs = [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + }, + { + 'hash': 'YYY', + 'foo3': 'bar3' + }, + { + 'hash': 'YYX', + 'foo4': 'bar4' + }, + { + 'hash': 'YSX', + 'foo5': 'bar5' + } + ] + + expected_hashes = ['XXX', 'XXY', 'XXF', 'YYY', 'YYX', 'YSX'] + + txs, hashes = self.db.cull_txs(block) + + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) + + def test_write_block_stores_block_by_num(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] + } + + self.db.write_block(block) + + filename = ('0' * 63) + '1' + + with open(self.db.blocks_dir.joinpath(filename)) as f: + b = json.load(f) + + self.assertEqual(block, b) + + def test_write_block_stores_symlink_by_hash(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] + } + + self.db.write_block(block) + + with open(self.db.blocks_alias_dir.joinpath(block.get('hash'))) as f: + b = json.load(f) + + self.assertEqual(block, b) + + def test_write_txs_stores_transactions_by_hash_and_payload(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] + } + + txs, hashes = self.db.cull_txs(block) + + self.db.write_txs(txs, hashes) + + with open(self.db.txs_dir.joinpath('XXX')) as f: + t = json.load(f) + + self.assertEqual(txs[0], t) + + def test_store_block_completes_loop(self): + block = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] + } + + self.db.store_block(block) + + with open(self.db.txs_dir.joinpath('XXX')) as f: + t = json.load(f) + + _t = { + 'hash': 'XXX', + 'foo': 'bar' + } + + self.assertEqual(t, _t) + + filename = ('0' * 63) + '1' + with open(self.db.blocks_dir.joinpath(filename)) as f: + b = json.load(f) + + self.assertEqual(b, block) + + with open(self.db.blocks_alias_dir.joinpath('a')) as f: + bb = json.load(f) + + self.assertEqual(bb, block) + + def test_get_block(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop', + 'subblocks':[] + } + + self.db.store_block(block) + + got_block = self.db.get_block(1) + + self.assertEqual(block, got_block) + + def test_get_block_hash(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop', + 'subblocks': [] + } + + self.db.store_block(block) + + got_block = self.db.get_block('a') + + self.assertEqual(block, got_block) + + def test_get_none_block(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop' + } + + self.db.store_block(block) + + got_block = self.db.get_block('b') + + self.assertIsNone(got_block) + + def test_got_none_block_num(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop', + 'subblocks': [] + } + + self.db.store_block(block) + + got_block = self.db.get_block(2) + + self.assertIsNone(got_block) + + def test_get_non_existant_tx_returns_none(self): + tx_got = self.db.get_tx(h='something') + + self.assertIsNone(tx_got) + + def test_store_block_stores_txs_and_block(self): + tx_1 = { + 'hash': 'something1', + 'key': '1' + } + + tx_2 = { + 'hash': 'something2', + 'key': '2' + } + + tx_3 = { + 'hash': 'something3', + 'key': '3' + } + + block = { + 'hash': 'hello', + 'number': 1, + 'subblocks': [ + { + 'transactions': [tx_1, tx_2, tx_3] + } + ] + } + + expected = copy.deepcopy(block) + + self.db.store_block(block) + + got_1 = self.db.get_tx(h='something1') + got_2 = self.db.get_tx(h='something2') + got_3 = self.db.get_tx(h='something3') + + self.assertDictEqual(tx_1, got_1) + self.assertDictEqual(tx_2, got_2) + self.assertDictEqual(tx_3, got_3) + + got_block = self.db.get_block('hello') + + self.assertDictEqual(expected, got_block) + + def test_get_block_v_none_returns_none(self): + self.assertIsNone(self.db.get_block())