diff --git a/lamden/db_config.py b/lamden/db_config.py deleted file mode 100644 index f7ead22bc..000000000 --- a/lamden/db_config.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - - -MASTER_DB = 0 - -DATA_DIR = os.getenv('DATADIR') - -if DATA_DIR is None: - DATA_DIR = '/usr/local/db/lamden' - - -MONGO_DIR = DATA_DIR + '/mongo' -MONGO_LOG_PATH = MONGO_DIR + '/logs/mongo.log' - - -def config_mongo_dir(): - try: - os.makedirs(os.path.dirname(MONGO_LOG_PATH), exist_ok=True) - with open(MONGO_LOG_PATH, 'w+') as f: - print('Mongo log file created at path {}'.format(MONGO_LOG_PATH)) - except Exception as e: - print("Error creating mongo log file at path {}. Error -- {}".format(MONGO_LOG_PATH, e)) diff --git a/lamden/network.py b/lamden/network.py index 298280ab2..66e60d3b7 100644 --- a/lamden/network.py +++ b/lamden/network.py @@ -180,3 +180,6 @@ def all_vks_found(self, vks): if self.peers.get(vk) is None: return False return True + +def discover_peer(vk, ip): + pass \ No newline at end of file diff --git a/lamden/nodes/base.py b/lamden/nodes/base.py index a2fa2fe2a..3d5ec09ad 100644 --- a/lamden/nodes/base.py +++ b/lamden/nodes/base.py @@ -245,7 +245,8 @@ async def catchup(self, mn_seed, mn_vk): wallet=self.wallet, ctx=self.ctx ) - self.process_new_block(block) + if not block.get('error'): + self.process_new_block(block) # Process any blocks that were made while we were catching up while len(self.new_block_processor.q) > 0: @@ -318,8 +319,6 @@ def update_state(self, block): client=self.client ) - #self.nonces.flush_pending() - self.log.info('Updating metadata.') self.current_height = storage.get_latest_block_height(self.driver) self.current_hash = storage.get_latest_block_hash(self.driver) @@ -335,15 +334,14 @@ def process_new_block(self, block): if self.store: encoded_block = encode(block) encoded_block = json.loads(encoded_block) - - self.blocks.store_block(encoded_block) - # create Event File self.event_writer.write_event(Event( topics=[NEW_BLOCK_EVENT], data=encoded_block )) + self.blocks.store_block(block) + # Prepare for the next block by flushing out driver and notification state # self.new_block_processor.clean() diff --git a/lamden/nodes/delegate/delegate.py b/lamden/nodes/delegate/delegate.py index 42c8a1a06..3d2fedab2 100644 --- a/lamden/nodes/delegate/delegate.py +++ b/lamden/nodes/delegate/delegate.py @@ -38,9 +38,11 @@ def __init__(self, client: ContractingClient, nonces: storage.NonceStorage, debu async def process_message(self, msg): self.log.info(f'Received work from {msg["sender"][:8]}') - # if msg['sender'] not in self.masters: - # self.log.error(f'TX Batch received from non-master {msg["sender"][:8]}') - # return + self.log.info(msg) + + if msg['sender'] not in self.masters: + self.log.error(f'TX Batch received from non-master {msg["sender"][:8]}') + return shim = { 'transactions': [], diff --git a/lamden/nodes/events.py b/lamden/nodes/events.py index a63c6bda0..4b0036bbd 100644 --- a/lamden/nodes/events.py +++ b/lamden/nodes/events.py @@ -20,6 +20,7 @@ from sanic import Sanic import json import argparse +from contracting.db.encoder import encode, decode EVENTS_HOME = pathlib.Path().home().joinpath('.lamden').joinpath('events') EXTENSION = '.e' @@ -51,8 +52,7 @@ def get_events(self) -> List[Event]: with open(file, 'r') as f: try: e = json.load(f) - event = Event(e['topics'], e['data']) - events.append(event) + events.append(Event(e['topics'], e['data'])) except: # TODO(nikita): proper handling print('failed to load event') diff --git a/lamden/nodes/masternode/masternode.py b/lamden/nodes/masternode/masternode.py index f9c7da2b7..10f730eff 100644 --- a/lamden/nodes/masternode/masternode.py +++ b/lamden/nodes/masternode/masternode.py @@ -43,7 +43,7 @@ def get_block(self, command): block = self.blocks.get_block(num) if block is None: - return None + return {"error": "block does not exist"} return block diff --git a/lamden/nodes/masternode/webserver.py b/lamden/nodes/masternode/webserver.py index c504168de..033cceb09 100644 --- a/lamden/nodes/masternode/webserver.py +++ b/lamden/nodes/masternode/webserver.py @@ -153,7 +153,7 @@ async def disconnect(): @self.sio.event async def event(data): for client in self.ws_clients: - await client.send(json.dumps(data)) + await client.send(encode(data)) def __register_app_listeners(self): @self.app.listener('after_server_start') @@ -180,7 +180,7 @@ async def ws_handler(self, request, ws): 'data': block } - await ws.send(json.dumps(eventData)) + await ws.send(encode(eventData)) async for message in ws: pass @@ -386,7 +386,7 @@ async def get_latest_block(self, request): num = storage.get_latest_block_height(self.driver) block = self.blocks.get_block(int(num)) - return response.json(block, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(block, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_latest_block_number(self, request): self.driver.clear_pending_state() @@ -417,7 +417,7 @@ async def get_block(self, request): return response.json({'error': 'Block not found.'}, status=400, headers={'Access-Control-Allow-Origin': '*'}) - return response.json(block, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(block, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_tx(self, request): _hash = request.args.get('hash') @@ -437,7 +437,7 @@ async def get_tx(self, request): return response.json({'error': 'Transaction not found.'}, status=400, headers={'Access-Control-Allow-Origin': '*'}) - return response.json(tx, dumps=NonceEncoder().encode, headers={'Access-Control-Allow-Origin': '*'}) + return response.json(tx, dumps=encode, headers={'Access-Control-Allow-Origin': '*'}) async def get_constitution(self, request): self.client.raw_driver.clear_pending_state() diff --git a/lamden/router.py b/lamden/router.py index c4b4cf4d9..d9289fc2e 100644 --- a/lamden/router.py +++ b/lamden/router.py @@ -277,3 +277,32 @@ async def secure_multicast(msg: dict, service, wallet: Wallet, peer_map: dict, c ) await asyncio.gather(*coroutines) + + +def build_secure_socket(wallet: Wallet, vk: str, ip: str, ctx: zmq.asyncio.Context, linger=500, cert_dir=DEFAULT_DIR): + socket = ctx.socket(zmq.DEALER) + socket.setsockopt(zmq.LINGER, linger) + socket.setsockopt(zmq.TCP_KEEPALIVE, 1) + + socket.curve_secretkey = wallet.curve_sk + socket.curve_publickey = wallet.curve_vk + + filename = str(cert_dir / f'{vk}.key') + if not os.path.exists(filename): + return None + + server_pub, _ = load_certificate(filename) + + socket.curve_serverkey = server_pub + + try: + socket.connect(ip) + except ZMQBaseError: + socket.close() + return None + + return socket + + +def discover_peer(): + pass \ No newline at end of file diff --git a/lamden/storage.py b/lamden/storage.py index a43471b40..94b16a258 100644 --- a/lamden/storage.py +++ b/lamden/storage.py @@ -1,106 +1,179 @@ from contracting.db.driver import ContractDriver -from pymongo import MongoClient, DESCENDING -from bson.decimal128 import Decimal128 -from bson.codec_options import TypeCodec, TypeEncoder, TypeDecoder -from bson.codec_options import TypeRegistry -from bson.codec_options import CodecOptions - -from decimal import Decimal - -import lamden from lamden.logger.base import get_logger from contracting.stdlib.bridge.decimal import ContractingDecimal +from contracting.db.driver import FSDriver + +from contracting.db.encoder import encode, decode, encode_kv + +import pathlib + +import json + +import os -BLOCK_HASH_KEY = '_current_block_hash' -BLOCK_NUM_HEIGHT = '_current_block_height' +import shutil + +BLOCK_HASH_KEY = '__latest_block.hash' +BLOCK_NUM_HEIGHT = '__latest_block.height' NONCE_KEY = '__n' PENDING_NONCE_KEY = '__pn' +STORAGE_HOME = pathlib.Path().home().joinpath('.lamden') + log = get_logger('STATE') -class DecimalEncoder(TypeEncoder): - python_type = Decimal # the Python type acted upon by this type codec +class BlockStorage: + def __init__(self, home=STORAGE_HOME): + self.home = home - def transform_python(self, value): - d = Decimal(str(value)) - return Decimal128(d) + self.blocks_dir = self.home.joinpath('blocks') + self.blocks_alias_dir = self.blocks_dir.joinpath('alias') + self.txs_dir = self.home.joinpath('txs') + self.build_directories() -class ContractingDecimalEncoder(TypeEncoder): - python_type = ContractingDecimal # the Python type acted upon by this type codec + def build_directories(self): + self.home.mkdir(exist_ok=True, parents=True) + self.blocks_dir.mkdir(exist_ok=True, parents=True) + self.blocks_alias_dir.mkdir(exist_ok=True, parents=True) + self.txs_dir.mkdir(exist_ok=True, parents=True) - def transform_python(self, value): - d = Decimal(str(value._d)) - return Decimal128(d) + def flush(self): + try: + shutil.rmtree(self.home) + self.build_directories() + except FileNotFoundError: + pass + def store_block(self, block): + if block.get('subblocks') is None: + return + + txs, hashes = self.cull_txs(block) + self.write_block(block) + self.write_txs(txs, hashes) -class DecimalDecoder(TypeDecoder): - bson_type = Decimal128 + @staticmethod + def cull_txs(block): + # Pops all transactions from the block and replaces them with the hash only for storage space + # Returns the data and hashes for storage in a different folder. Block is modified in place + txs = [] + hashes = [] + for subblock in block['subblocks']: + subblock_txs = [] + subblock_hashes = [] - def transform_bson(self, value): - return value.to_decimal() + for i in range(len(subblock['transactions'])): + tx = subblock['transactions'].pop(0) -# class ContractingDecimalCodec(TypeCodec): -# python_type = ContractingDecimal # the Python type acted upon by this type codec -# bson_type = Decimal128 # the BSON type acted upon by this type codec -# -# def transform_python(self, value): -# return Decimal128(value._d) -# -# def transform_bson(self, value): -# return value.to_decimal() + subblock_txs.append(tx) + subblock_hashes.append(tx['hash']) + subblock['transactions'] = subblock_hashes + try: + subblock['subblock'] = int(subblock['subblock']) + except: + pass -type_registry = TypeRegistry([DecimalDecoder(), DecimalEncoder(), ContractingDecimalEncoder()]) -codec_options = CodecOptions(type_registry=type_registry) + txs.extend(subblock_txs) + hashes.extend(subblock_hashes) + return txs, hashes -class NonceStorage: - def __init__(self, port=27027, db_name='lamden', nonce_collection='nonces', pending_collection='pending_nonces', config_path=lamden.__path__[0]): - self.config_path = config_path + def write_block(self, block): + num = block.get('number') - self.port = port + if type(num) == dict: + num = num.get('__fixed__') + block['number'] = num - self.client = MongoClient() - self.db = self.client.get_database(db_name) + name = str(num).zfill(64) - self.nonces = self.db.get_collection(nonce_collection, codec_options=codec_options) - self.pending_nonces = self.db.get_collection(pending_collection, codec_options=codec_options) + symlink_name = block.get('hash') - @staticmethod - def get_one(sender, processor, db): - v = db.find_one( - { - 'sender': sender, - 'processor': processor - } - ) + encoded_block = encode(block) + with open(self.blocks_dir.joinpath(name), 'w') as f: + f.write(encoded_block) + try: + os.symlink(self.blocks_dir.joinpath(name), self.blocks_alias_dir.joinpath(symlink_name)) + except FileExistsError: + pass + + def write_txs(self, txs, hashes): + for file, data in zip(hashes, txs): + with open(self.txs_dir.joinpath(file), 'w') as f: + encoded_tx = encode(data) + f.write(encoded_tx) + + def get_block(self, v=None, no_id=True): if v is None: return None - return v['value'] + try: + if isinstance(v, int): + f = open(self.blocks_dir.joinpath(str(v).zfill(64))) + else: + f = open(self.blocks_alias_dir.joinpath(v)) + except FileNotFoundError: + return None + + encoded_block = f.read() + + block = decode(encoded_block) + + f.close() + + self.fill_block(block) + + + + return block + + def fill_block(self, block): + for subblock in block['subblocks']: + txs = [] + for i in range(len(subblock['transactions'])): + tx = self.get_tx(subblock['transactions'][i]) + + txs.append(tx) + + subblock['transactions'] = txs + + def get_tx(self, h): + try: + f = open(self.txs_dir.joinpath(h)) + encoded_tx = f.read() + + tx = decode(encoded_tx) + + f.close() + except FileNotFoundError: + tx = None + + return tx + +class NonceStorage: + def __init__(self, nonce_collection=STORAGE_HOME.joinpath('nonces'), + pending_collection=STORAGE_HOME.joinpath('pending_nonces')): + self.nonces = FSDriver(root=nonce_collection) + self.pending_nonces = FSDriver(root=pending_collection) @staticmethod - def set_one(sender, processor, value, db): - db.update_one( - { - 'sender': sender, - 'processor': processor - }, - { - '$set': - { - 'value': value - } - }, upsert=True - ) + def get_one(sender, processor, db: FSDriver): + return db.get(f'__nonces.{processor}/{sender}') + @staticmethod + def set_one(sender, processor, value, db: FSDriver): + return db.set(f'__nonces.{processor}/{sender}', value) + + # Move this to transaction.py def get_nonce(self, sender, processor): return self.get_one(sender, processor, self.nonces) + # Move this to transaction.py def get_pending_nonce(self, sender, processor): return self.get_one(sender, processor, self.pending_nonces) @@ -110,6 +183,7 @@ def set_nonce(self, sender, processor, value): def set_pending_nonce(self, sender, processor, value): self.set_one(sender, processor, value, self.pending_nonces) + # Move this to webserver.py def get_latest_nonce(self, sender, processor): latest_nonce = self.get_pending_nonce(sender=sender, processor=processor) @@ -122,11 +196,11 @@ def get_latest_nonce(self, sender, processor): return latest_nonce def flush(self): - self.nonces.drop() - self.pending_nonces.drop() + self.nonces.flush() + self.pending_nonces.flush() def flush_pending(self): - self.pending_nonces.drop() + self.pending_nonces.flush() def get_latest_block_hash(driver: ContractDriver): @@ -135,11 +209,9 @@ def get_latest_block_hash(driver: ContractDriver): return '0' * 64 return latest_hash - def set_latest_block_hash(h, driver: ContractDriver): driver.driver.set(BLOCK_HASH_KEY, h) - def get_latest_block_height(driver: ContractDriver): h = driver.get(BLOCK_NUM_HEIGHT, mark=False) if h is None: @@ -150,13 +222,11 @@ def get_latest_block_height(driver: ContractDriver): return h - def set_latest_block_height(h, driver: ContractDriver): driver.driver.set(BLOCK_NUM_HEIGHT, h) - def update_state_with_transaction(tx, driver: ContractDriver, nonces: NonceStorage): - nonces_to_delete = [] + #nonces_to_delete = [] if tx['state'] is not None and len(tx['state']) > 0: for delta in tx['state']: @@ -169,11 +239,12 @@ def update_state_with_transaction(tx, driver: ContractDriver, nonces: NonceStora value=tx['transaction']['payload']['nonce'] + 1 ) - nonces_to_delete.append((tx['transaction']['payload']['sender'], tx['transaction']['payload']['processor'])) + #nonces_to_delete.append((tx['transaction']['payload']['sender'], tx['transaction']['payload']['processor'])) - for n in nonces_to_delete: - nonces.set_pending_nonce(*n, value=None) + #for n in nonces_to_delete: + # nonces.set_pending_nonce(*n, value=None) + nonces.flush_pending() def update_state_with_block(block, driver: ContractDriver, nonces: NonceStorage, set_hash_and_height=True): if block.get('subblocks') is not None: @@ -186,114 +257,3 @@ def update_state_with_block(block, driver: ContractDriver, nonces: NonceStorage, set_latest_block_hash(block['hash'], driver=driver) set_latest_block_height(block['number'], driver=driver) - -class BlockStorage: - BLOCK = 0 - TX = 1 - - def __init__(self, port=27027, config_path=lamden.__path__[0], db='lamden', blocks_collection='blocks', tx_collection='tx'): - # Setup configuration file to read constants - self.config_path = config_path - - self.port = port - - self.client = MongoClient() - self.db = self.client.get_database(db) - - self.blocks = self.db.get_collection(blocks_collection, codec_options=codec_options) - self.txs = self.db.get_collection(tx_collection, codec_options=codec_options) - - def q(self, v): - if isinstance(v, int): - return {'number': v} - return {'hash': v} - - def get_block(self, v=None, no_id=True): - if v is None: - return None - - q = self.q(v) - block = self.blocks.find_one(q) - - if block is not None and no_id: - block.pop('_id') - - return block - - def put(self, data, collection=BLOCK): - if collection == BlockStorage.BLOCK: - _id = self.blocks.insert_one(data) - log.debug(data) - del data['_id'] - elif collection == BlockStorage.TX: - _id = self.txs.insert_one(data) - del data['_id'] - else: - return False - - return _id is not None - - def get_last_n(self, n, collection=BLOCK): - if collection == BlockStorage.BLOCK: - c = self.blocks - else: - return None - - block_query = c.find({}, {'_id': False}).sort( - 'number', DESCENDING - ).limit(n) - - blocks = [block for block in block_query] - - if len(blocks) > 1: - first_block_num = blocks[0].get('number') - last_block_num = blocks[-1].get('number') - - assert first_block_num > last_block_num, "Blocks are not descending." - - return blocks - - def get_tx(self, h, no_id=True): - tx = self.txs.find_one({'hash': h}) - - if tx is not None and no_id: - tx.pop('_id') - - return tx - - def drop_collections(self): - self.blocks.drop() - self.txs.drop() - - def flush(self): - self.drop_collections() - - def store_block(self, block): - if block.get('number') is not None: - block['number'] = int(block['number']) - - self.put(block, BlockStorage.BLOCK) - self.store_txs(block) - - def store_txs(self, block): - if block.get('subblocks') is None: - return - - for subblock in block['subblocks']: - for tx in subblock['transactions']: - self.put(tx, BlockStorage.TX) - - def delete_tx(self, h): - self.txs.delete_one({'hash': h}) - - def delete_block(self, v): - block = self.get_block(v, no_id=False) - - if block is None: - return - - for subblock in block['subblocks']: - for tx in subblock['transactions']: - self.delete_tx(tx['hash']) - - self.blocks.delete_one({'_id': block['_id']}) diff --git a/lamden/utils/__init__.py b/lamden/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lamden/utils/legacy.py b/lamden/utils/legacy.py new file mode 100644 index 000000000..ec07b0ca2 --- /dev/null +++ b/lamden/utils/legacy.py @@ -0,0 +1,97 @@ +from contracting.db.driver import ContractDriver +from pymongo import MongoClient, DESCENDING + +from bson.decimal128 import Decimal128 +from bson.codec_options import TypeCodec, TypeEncoder, TypeDecoder +from bson.codec_options import TypeRegistry +from bson.codec_options import CodecOptions + +from decimal import Decimal + +import lamden +from lamden.logger.base import get_logger +from contracting.stdlib.bridge.decimal import ContractingDecimal + +BLOCK_HASH_KEY = '_current_block_hash' +BLOCK_NUM_HEIGHT = '_current_block_height' +NONCE_KEY = '__n' +PENDING_NONCE_KEY = '__pn' + +log = get_logger('STATE') + + +class DecimalEncoder(TypeEncoder): + python_type = Decimal # the Python type acted upon by this type codec + + def transform_python(self, value): + return Decimal128(value) + + +class ContractingDecimalEncoder(TypeEncoder): + python_type = ContractingDecimal # the Python type acted upon by this type codec + + def transform_python(self, value): + return Decimal128(value._d) + + +class DecimalDecoder(TypeDecoder): + bson_type = Decimal128 + + def transform_bson(self, value): + return int(value.to_decimal()) + + +type_registry = TypeRegistry([DecimalDecoder(), DecimalEncoder(), ContractingDecimalEncoder()]) +codec_options = CodecOptions(type_registry=type_registry) + + +def get_latest_block_hash(driver: ContractDriver): + latest_hash = driver.get(BLOCK_HASH_KEY, mark=False) + if latest_hash is None: + return '0' * 64 + return latest_hash + + +def set_latest_block_hash(h, driver: ContractDriver): + driver.driver.set(BLOCK_HASH_KEY, h) + + +def get_latest_block_height(driver: ContractDriver): + h = driver.get(BLOCK_NUM_HEIGHT, mark=False) + if h is None: + return 0 + + if type(h) == ContractingDecimal: + h = int(h._d) + + return h + + +class LegacyBlockStorage: + def __init__(self, port=27027, config_path=lamden.__path__[0], db='lamden', blocks_collection='blocks'): + # Setup configuration file to read constants + self.config_path = config_path + + self.port = port + + self.client = MongoClient() + self.db = self.client.get_database(db) + + self.blocks = self.db.get_collection(blocks_collection, codec_options=codec_options) + + def q(self, v): + if isinstance(v, int): + return {'number': v} + return {'hash': v} + + def get_block(self, v=None, no_id=True): + if v is None: + return None + + q = self.q(v) + block = self.blocks.find_one(q) + + if block is not None and no_id: + block.pop('_id') + + return block diff --git a/lamden/utils/migrate_mongo.py b/lamden/utils/migrate_mongo.py new file mode 100644 index 000000000..5b6513c69 --- /dev/null +++ b/lamden/utils/migrate_mongo.py @@ -0,0 +1,148 @@ +import legacy + +from lamden import storage, rewards +from lamden.contracts import sync +from contracting.db.driver import ContractDriver, encode, Driver, FSDriver +import lamden +import json +from contracting.client import ContractingClient +import gc +from lamden.logger.base import get_logger +import decimal + +import pathlib + +log = get_logger('MIGRATE') + +class MigrationNode: + def __init__(self, + constitution=pathlib.Path.home().joinpath('constitution.json'), + debug=True, store=True, seed=None, bypass_catchup=False, + genesis_path=lamden.contracts.__path__[0], + reward_manager=rewards.RewardManager(), + nonces=storage.NonceStorage()): + + self.new_blocks = storage.BlockStorage() + self.old_blocks = legacy.LegacyBlockStorage() + + self.blocks = self.new_blocks + + # Has the new FSDriver + self.new_driver = ContractDriver() + self.new_driver.driver = FSDriver() + + # Does not have the new FSDriver + self.old_driver = ContractDriver() + self.old_driver.driver = Driver() + + self.driver = self.new_driver + self.nonces = nonces + self.store = store + + self.seed = seed + + self.log = get_logger('Base') + self.log.propagate = debug + + self.genesis_path = genesis_path + + self.client = ContractingClient( + driver=self.driver, + submission_filename=genesis_path + '/submission.s.py' + ) + + self.constitution = constitution + + self.reward_manager = reward_manager + + self.bypass_catchup = bypass_catchup + + with open(constitution) as f: + self.constitution = json.load(f) + + self.seed_genesis_contracts() + + def seed_genesis_contracts(self): + self.log.info('Setting up genesis contracts.') + sync.setup_genesis_contracts( + initial_masternodes=self.constitution['masternodes'], + initial_delegates=self.constitution['delegates'], + client=self.client, + filename=self.genesis_path + '/genesis.json', + root=self.genesis_path + ) + + def catchup(self, current=0): + # Get the current latest block stored and the latest block of the network + log.info('Running migration.') + + latest = legacy.get_latest_block_height(self.old_driver) + + log.info(f'Current block: {current}, Latest available block: {latest}') + + # Increment current by one. Don't count the genesis block. + if current == 0: + current = 1 + + # Find the missing blocks process them + for i in range(current, latest+1): + block = self.old_blocks.get_block(v=i) + log.info(f'current: {i}, block: {block}') + + if block is not None: + self.process_new_block(block) + + def should_process(self, block): + try: + self.log.info(f'Processing block #{block.get("number")}') + except: + self.log.error('Malformed block :(') + return False + # Test if block failed immediately + if block == {'response': 'ok'}: + return False + + if block['hash'] == 'f' * 64: + self.log.error('Failed Block! Not storing.') + return False + + return True + + def update_state(self, block): + self.driver.clear_pending_state() + + # Check if the block is valid + if self.should_process(block): + self.log.info('Storing new block.') + # Commit the state changes and nonces to the database + storage.update_state_with_block( + block=block, + driver=self.driver, + nonces=self.nonces + ) + + self.log.info('Issuing rewards.') + # Calculate and issue the rewards for the governance nodes + self.reward_manager.issue_rewards( + block=block, + client=self.client + ) + + self.log.info('Updating metadata.') + + def process_new_block(self, block): + # Update the state and refresh the sockets so new nodes can join + self.update_state(block) + + # Store the block if it's a masternode + if self.store: + self.blocks.store_block(block) + + # Prepare for the next block by flushing out driver and notification state + self.driver.commit() + self.driver.clear_pending_state() + gc.collect() + +if __name__ == '__main__': + mn = MigrationNode() + mn.catchup() diff --git a/rsyncd.conf b/rsyncd.conf new file mode 100644 index 000000000..6d8c4acb1 --- /dev/null +++ b/rsyncd.conf @@ -0,0 +1,11 @@ +pid file = /var/run/rsyncd.pid +lock file = /var/run/rsync.lock +log file = /var/log/rsync.log + +[lamden] +path = /var/lib/lamden +read only = true +list = yes +timeout = 300 +uid = nobody +gid = nogroup diff --git a/tests/unit/test_class_webserver.py b/tests/unit/test_class_webserver.py index 7d36b6887..e14f7cc25 100644 --- a/tests/unit/test_class_webserver.py +++ b/tests/unit/test_class_webserver.py @@ -40,13 +40,12 @@ def setUp(self): ) self.ws.client.flush() - self.blocks.flush() + self.ws.blocks.flush() self.ws.driver.flush() - self.loop = asyncio.get_event_loop() def tearDown(self): self.ws.client.flush() - self.blocks.flush() + self.ws.blocks.flush() self.ws.driver.flush() def test_ping(self): @@ -290,21 +289,59 @@ def test_get_latest_block(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + self.ws.blocks.store_block(block) block2 = { 'hash': 'abb', 'number': 1000, - 'data': 'woop2' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] + } + + b2exp = { + 'hash': 'abb', + 'number': 1000, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block2) + storage.set_latest_block_height(1000, driver=self.ws.driver) + self.ws.blocks.store_block(block2) _, response = self.ws.app.test_client.get('/latest_block') - self.assertDictEqual(response.json, {'hash': 'abb', 'number': 1000, 'data': 'woop2'}) + print(response.json) + self.assertDictEqual(response.json, b2exp) def test_get_latest_block_num(self): storage.set_latest_block_height(1234, self.ws.driver) @@ -322,16 +359,42 @@ def test_get_latest_block_hash(self): def test_get_block_by_num_that_exists(self): block = { - 'hash': '1234', + 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + exp = { + 'hash': 'a', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] + } + + self.ws.blocks.store_block(block) _, response = self.ws.app.test_client.get('/blocks?num=1') - self.assertDictEqual(response.json, block) + self.assertDictEqual(response.json, exp) def test_get_block_by_num_that_doesnt_exist_returns_error(self): _, response = self.ws.app.test_client.get('/blocks?num=1000') @@ -342,17 +405,37 @@ def test_get_block_by_hash_that_exists(self): h = '1234' block = { - 'hash': h, - 'blockNum': 1, - 'data': 'woop' + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } - self.blocks.put(block) + self.ws.blocks.store_block(block) expected = { - 'hash': h, - 'blockNum': 1, - 'data': 'woop' + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, + ] } _, response = self.ws.app.test_client.get(f'/blocks?hash={h}') @@ -455,7 +538,8 @@ def test_fixed_objects_do_not_fail_signature(self): self.assertEqual(len(self.ws.queue), 1) def test_submit_transaction_error_if_queue_full(self): - self.ws.queue.extend(range(10_000)) + for i in range(10_000): + self.ws.queue.append(bytes(i)) tx = build_transaction( wallet=Wallet(), @@ -474,24 +558,33 @@ def test_submit_transaction_error_if_queue_full(self): self.assertDictEqual(response.json, {'error': 'Queue full. Resubmit shortly.'}) - self.ws.queue.clear() + self.ws.queue.flush() def test_get_tx_by_hash_if_it_exists(self): - b = '0' * 64 - - tx = { - 'hash': b, - 'some': 'data' + block = { + 'hash': '1234', + 'number': 1, + 'subblocks': [ + { + 'transactions': [ + { + 'hash': '123456', + 'foo': 'bar' + }, + + ] + }, + ] } expected = { - 'hash': b, - 'some': 'data' + 'hash': '123456', + 'foo': 'bar' } - self.blocks.put(tx, collection=self.ws.blocks.TX) + self.ws.blocks.store_block(block) - _, response = self.ws.app.test_client.get(f'/tx?hash={b}') + _, response = self.ws.app.test_client.get(f'/tx?hash=123456') self.assertDictEqual(response.json, expected) def test_malformed_tx_returns_error(self): diff --git a/tests/unit/test_storage.py b/tests/unit/test_storage.py index 220fb11d7..e4bd5e2ce 100644 --- a/tests/unit/test_storage.py +++ b/tests/unit/test_storage.py @@ -3,6 +3,8 @@ from unittest import TestCase from lamden.storage import BlockStorage +import json +import copy class TestNonce(TestCase): @@ -444,369 +446,335 @@ def test_update_state_with_block_sets_state_correctly(self): self.assertEqual(v5, 'else') -class TestMasterStorage(TestCase): +class TestStorage(TestCase): def setUp(self): - self.db = BlockStorage() + self.db = storage.BlockStorage() def tearDown(self): - self.db.drop_collections() - - def test_init(self): - self.assertTrue(self.db) - - def test_q_num(self): - q = self.db.q(1) - - self.assertEqual(q, {'number': 1}) + self.db.flush() - def test_q_hash(self): - q = self.db.q('1') - - self.assertEqual(q, {'hash': '1'}) - - def test_put_block(self): - block = { - 'hash': 'a', - 'number': 1, - 'data': 'woop' - } - - _id = self.db.put(block) - - self.assertTrue(_id) - - def test_get_block(self): + def test_cull_transaction_works_single_sb_and_tx(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] } - _id = self.db.put(block) - - self.assertTrue(_id) + tx = { + 'hash': 'XXX', + 'foo': 'bar' + } - got_block = self.db.get_block(1) + txs, hashes = self.db.cull_txs(block) + expected_txs = [tx] + expected_hashes = ['XXX'] - self.assertEqual(block, got_block) + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) - def test_get_block_hash(self): + def test_cull_transaction_works_single_sb_multi_txs(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] + } + ] } - _id = self.db.put(block) + expected_txs = [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] - self.assertTrue(_id) + expected_hashes = ['XXX', 'XXY', 'XXF'] - got_block = self.db.get_block('a') + txs, hashes = self.db.cull_txs(block) - self.assertEqual(block, got_block) + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) - def test_get_none_block(self): + def test_cull_transaction_works_multi_sb_multi_txs(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + } + ] + }, + { + 'transactions': [ + { + 'hash': 'YYY', + 'foo3': 'bar3' + }, + { + 'hash': 'YYX', + 'foo4': 'bar4' + }, + { + 'hash': 'YSX', + 'foo5': 'bar5' + } + ] + } + ] } - _id = self.db.put(block) - - self.assertTrue(_id) - - got_block = self.db.get_block('b') - - self.assertIsNone(got_block) - - def test_got_none_block_num(self): + expected_txs = [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + { + 'hash': 'XXY', + 'foo': 'bar2' + }, + { + 'hash': 'XXF', + 'foo2': 'bar' + }, + { + 'hash': 'YYY', + 'foo3': 'bar3' + }, + { + 'hash': 'YYX', + 'foo4': 'bar4' + }, + { + 'hash': 'YSX', + 'foo5': 'bar5' + } + ] + + expected_hashes = ['XXX', 'XXY', 'XXF', 'YYY', 'YYX', 'YSX'] + + txs, hashes = self.db.cull_txs(block) + + self.assertEqual(txs, expected_txs) + self.assertEqual(hashes, expected_hashes) + + def test_write_block_stores_block_by_num(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] } - _id = self.db.put(block) + self.db.write_block(block) - self.assertTrue(_id) + filename = ('0' * 63) + '1' - got_block = self.db.get_block(2) + with open(self.db.blocks_dir.joinpath(filename)) as f: + b = json.load(f) - self.assertIsNone(got_block) + self.assertEqual(block, b) - def test_drop_collections_block(self): + def test_write_block_stores_symlink_by_hash(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'subblocks': [ + { + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] + } + ] } - _id = self.db.put(block) - - self.assertTrue(_id) + self.db.write_block(block) - self.db.drop_collections() + with open(self.db.blocks_alias_dir.joinpath(block.get('hash'))) as f: + b = json.load(f) - got_block = self.db.get_block(1) + self.assertEqual(block, b) - self.assertIsNone(got_block) - - def test_put_other(self): - index = { + def test_write_txs_stores_transactions_by_hash_and_payload(self): + block = { 'hash': 'a', 'number': 1, - 'blockOwners': 'stu' - } - - _id = self.db.put(index, 999) - - self.assertFalse(_id) - - def test_get_last_n_blocks(self): - blocks = [] - - blocks.append({'hash': 'a', 'number': 1, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 2, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 3, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 4, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 5, 'data': 'woop'}) - - for block in blocks: - self.db.put(block) - - got_blocks = self.db.get_last_n(3, BlockStorage.BLOCK) - - nums = [b['number'] for b in got_blocks] - - self.assertEqual(nums, [5, 4, 3]) - - def test_get_last_n_index(self): - blocks = [] - - blocks.append({'hash': 'a', 'number': 1, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 2, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 3, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 4, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 5, 'data': 'woop'}) - - for block in blocks: - self.db.put(block, BlockStorage.BLOCK) - - got_blocks = self.db.get_last_n(3, BlockStorage.BLOCK) - - nums = [b['number'] for b in got_blocks] - - self.assertEqual(nums, [5, 4, 3]) - - def test_get_none_from_wrong_n_collection(self): - blocks = [] - - blocks.append({'hash': 'a', 'number': 1, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 2, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 3, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 4, 'data': 'woop'}) - blocks.append({'hash': 'a', 'number': 5, 'data': 'woop'}) - - for block in blocks: - self.db.put(block, BlockStorage.BLOCK) - - got_blocks = self.db.get_last_n(3, 5) - - self.assertIsNone(got_blocks) - - def test_store_and_get_tx(self): - tx = { - 'hash': 'something', - 'key': 'value' - } - - self.db.put(tx, BlockStorage.TX) - - tx_got = self.db.get_tx(h='something') - - self.assertDictEqual(tx, tx_got) - - def test_get_non_existant_tx_returns_none(self): - tx_got = self.db.get_tx(h='something') - - self.assertIsNone(tx_got) - - def test_store_txs_from_block_adds_all_txs(self): - tx_1 = { - 'hash': 'something1', - 'key': '1' - } - - tx_2 = { - 'hash': 'something2', - 'key': '2' - } - - tx_3 = { - 'hash': 'something3', - 'key': '3' - } - - block = { 'subblocks': [ { - 'transactions': [tx_1, tx_2, tx_3] + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + } + ] } ] } - self.db.store_txs(block) + txs, hashes = self.db.cull_txs(block) - got_1 = self.db.get_tx(h='something1') - got_2 = self.db.get_tx(h='something2') - got_3 = self.db.get_tx(h='something3') + self.db.write_txs(txs, hashes) - self.assertDictEqual(tx_1, got_1) - self.assertDictEqual(tx_2, got_2) - self.assertDictEqual(tx_3, got_3) + with open(self.db.txs_dir.joinpath('XXX')) as f: + t = json.load(f) - def test_store_block_stores_txs_and_block(self): - tx_1 = { - 'hash': 'something1', - 'key': '1' - } - - tx_2 = { - 'hash': 'something2', - 'key': '2' - } - - tx_3 = { - 'hash': 'something3', - 'key': '3' - } + self.assertEqual(txs[0], t) + def test_store_block_completes_loop(self): block = { - 'hash': 'hello', + 'hash': 'a', + 'number': 1, 'subblocks': [ { - 'transactions': [tx_1, tx_2, tx_3] - } + 'transactions': [ + { + 'hash': 'XXX', + 'foo': 'bar' + }, + + ] + }, ] } self.db.store_block(block) - got_1 = self.db.get_tx(h='something1') - got_2 = self.db.get_tx(h='something2') - got_3 = self.db.get_tx(h='something3') + with open(self.db.txs_dir.joinpath('XXX')) as f: + t = json.load(f) - self.assertDictEqual(tx_1, got_1) - self.assertDictEqual(tx_2, got_2) - self.assertDictEqual(tx_3, got_3) - - got_block = self.db.get_block('hello') - - self.assertDictEqual(block, got_block) - - def test_get_block_v_none_returns_none(self): - self.assertIsNone(self.db.get_block()) - - def test_delete_tx(self): - t = self.db.get_tx(h='something') - - self.assertIsNone(t) - - tx = { - 'hash': 'something', - 'key': 'value' + _t = { + 'hash': 'XXX', + 'foo': 'bar' } - self.db.put(tx, BlockStorage.TX) - - t = self.db.get_tx(h='something') + self.assertEqual(t, _t) - self.assertIsNotNone(t) + filename = ('0' * 63) + '1' + with open(self.db.blocks_dir.joinpath(filename)) as f: + b = json.load(f) - self.db.delete_tx(h='something') + self.assertEqual(b, block) - t = self.db.get_tx(h='something') + with open(self.db.blocks_alias_dir.joinpath('a')) as f: + bb = json.load(f) - self.assertIsNone(t) + self.assertEqual(bb, block) - def test_return_id_noid_false_block(self): + def test_get_block(self): block = { 'hash': 'a', 'number': 1, - 'data': 'woop' + 'data': 'woop', + 'subblocks':[] } - self.db.put(block) - - b = self.db.get_block('a', no_id=False) - - self.assertIsNotNone(b.get('_id')) + self.db.store_block(block) - b = self.db.get_block('a') + got_block = self.db.get_block(1) - self.assertIsNone(b.get('_id')) + self.assertEqual(block, got_block) - def test_return_id_noid_false_tx(self): - tx = { - 'hash': 'something', - 'key': 'value' + def test_get_block_hash(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop', + 'subblocks': [] } - self.db.put(tx, BlockStorage.TX) - - t = self.db.get_tx(h='something', no_id=False) - - self.assertIsNotNone(t.get('_id')) + self.db.store_block(block) - t = self.db.get_tx(h='something', no_id=True) + got_block = self.db.get_block('a') - self.assertIsNone(t.get('_id')) + self.assertEqual(block, got_block) - def test_delete_block_deletes_block(self): - tx_1 = { - 'hash': 'something1', - 'key': '1' + def test_get_none_block(self): + block = { + 'hash': 'a', + 'number': 1, + 'data': 'woop' } - tx_2 = { - 'hash': 'something2', - 'key': '2' - } + self.db.store_block(block) - tx_3 = { - 'hash': 'something3', - 'key': '3' - } + got_block = self.db.get_block('b') + self.assertIsNone(got_block) + + def test_got_none_block_num(self): block = { - 'hash': 'hello', - 'subblocks': [ - { - 'transactions': [tx_1, tx_2, tx_3] - } - ] + 'hash': 'a', + 'number': 1, + 'data': 'woop', + 'subblocks': [] } self.db.store_block(block) - got_1 = self.db.get_tx(h='something1') - got_2 = self.db.get_tx(h='something2') - got_3 = self.db.get_tx(h='something3') - - self.assertDictEqual(tx_1, got_1) - self.assertDictEqual(tx_2, got_2) - self.assertDictEqual(tx_3, got_3) - - got_block = self.db.get_block('hello') + got_block = self.db.get_block(2) - self.assertDictEqual(block, got_block) + self.assertIsNone(got_block) - self.db.delete_block(v='hello') + def test_get_non_existant_tx_returns_none(self): + tx_got = self.db.get_tx(h='something') - self.assertIsNone(self.db.get_block(v='hello')) + self.assertIsNone(tx_got) - def test_delete_block_deletes_txs(self): + def test_store_block_stores_txs_and_block(self): tx_1 = { 'hash': 'something1', 'key': '1' @@ -824,6 +792,7 @@ def test_delete_block_deletes_txs(self): block = { 'hash': 'hello', + 'number': 1, 'subblocks': [ { 'transactions': [tx_1, tx_2, tx_3] @@ -831,6 +800,8 @@ def test_delete_block_deletes_txs(self): ] } + expected = copy.deepcopy(block) + self.db.store_block(block) got_1 = self.db.get_tx(h='something1') @@ -843,14 +814,7 @@ def test_delete_block_deletes_txs(self): got_block = self.db.get_block('hello') - self.assertDictEqual(block, got_block) + self.assertDictEqual(expected, got_block) - self.db.delete_block(v='hello') - - got_1 = self.db.get_tx(h='something1') - got_2 = self.db.get_tx(h='something2') - got_3 = self.db.get_tx(h='something3') - - self.assertIsNone(got_1) - self.assertIsNone(got_2) - self.assertIsNone(got_3) + def test_get_block_v_none_returns_none(self): + self.assertIsNone(self.db.get_block())