diff --git a/.gitignore b/.gitignore index 2e81a92..35d771e 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ bin/ ### .idea -StreamLet.iml \ No newline at end of file +StreamLet.iml +output \ No newline at end of file diff --git a/config.txt b/config.txt index b0fe795..e4e7b80 100644 --- a/config.txt +++ b/config.txt @@ -5,8 +5,11 @@ P2P=127.0.0.1:54582 P2P=127.0.0.1:54583 P2P=127.0.0.1:54584 +# Agreed time for every server to start protocol dd-MM-yyyy HH:mm:ss +start=04-12-2025 08:36:00 + # NORMAL and DEBUG mode for logs -logLevel=NORMAL +logLevel=DEBUG # SIMULATION: transactions are generated randomly by the servers # CLIENT: waits for client to send transactions diff --git a/makefile b/makefile index d938a36..04b63dc 100644 --- a/makefile +++ b/makefile @@ -15,3 +15,4 @@ compile: $(OUT_DIR) clean: rm -rf out + rm -rf output diff --git a/src/Streamlet.java b/src/Streamlet.java index e93c9cd..07e4747 100644 --- a/src/Streamlet.java +++ b/src/Streamlet.java @@ -1,4 +1,4 @@ -import StreamletApp.StreamletNode; +import app.StreamletNode; import utils.ConfigParser; import utils.communication.PeerInfo; import utils.logs.AppLogger; @@ -14,14 +14,15 @@ void main(String[] args) throws IOException, InterruptedException { ConfigParser.ConfigData configData = ConfigParser.parseConfig(); - List peerInfos = configData.peers; - AppLogger.updateLoggerLevel(configData.logLevel); + LocalDateTime start = configData.start(); + + List peerInfos = configData.peers(); + AppLogger.updateLoggerLevel(configData.logLevel()); PeerInfo localPeer = peerInfos.get(nodeId); List remotePeers = peerInfos.stream().filter(p -> p.id() != nodeId).toList(); AppLogger.logDebug(remotePeers.toString()); - AppLogger.logInfo("Waiting all peers to connect..."); - StreamletNode node = new StreamletNode(localPeer, remotePeers, 1, configData.isClientGeneratingTransactions, configData.servers.get(nodeId)); + StreamletNode node = new StreamletNode(localPeer, remotePeers, 1, start, configData.isClientGeneratingTransactions(), configData.servers().get(nodeId)); node.startProtocol(); } diff --git a/src/StreamletApp/BlockchainManager.java b/src/StreamletApp/BlockchainManager.java deleted file mode 100644 index e8a1aea..0000000 --- a/src/StreamletApp/BlockchainManager.java +++ /dev/null @@ -1,111 +0,0 @@ -package StreamletApp; - -import utils.application.Block; -import utils.application.BlockWithChain; -import utils.application.Transaction; -import utils.logs.AppLogger; - -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Gatherers; - -public class BlockchainManager { - public static final int FINALIZATION_MIN_SIZE = 3; - private static final int SHA1_LENGTH = 20; - private static final Block GENESIS_BLOCK = - new Block(new byte[SHA1_LENGTH], 0, 0, new Transaction[0]); - private final Set seenNotarizedChains = new HashSet<>(); - private final Set finalizedChains = new HashSet<>(); - private final HashMap pendingProposes = new HashMap<>(); - private LinkedList biggestNotarizedChain = new LinkedList<>(); - - public BlockchainManager() { - biggestNotarizedChain.add(GENESIS_BLOCK); - seenNotarizedChains.add(new ChainView(biggestNotarizedChain)); - } - - public void notarizeBlock(Block headerBlock) { - BlockWithChain proposal = pendingProposes.get(headerBlock); - if (proposal == null) { - return; - } - LinkedList chain = proposal.chain(); - chain.add(proposal.block()); - pendingProposes.remove(headerBlock); - seenNotarizedChains.add(new ChainView(chain)); - if (chain.getLast().length() > biggestNotarizedChain.getLast().length()) { - biggestNotarizedChain = chain; - } - AppLogger.logInfo("Block notarized: epoch " + headerBlock.epoch() + " length " + headerBlock.length()); - tryToFinalizeChain(chain); - } - - private void tryToFinalizeChain(LinkedList chain) { - int size = chain.size(); - if (size < FINALIZATION_MIN_SIZE) return; - boolean shouldChainBeFinalized = chain - .subList(chain.size() - FINALIZATION_MIN_SIZE, chain.size()) - .stream() - .map(Block::epoch) - .gather(Gatherers.windowSliding(2)) // zip xs $ tail xs - .map(window -> window.getLast() - window.getFirst()) - .allMatch(delta -> delta == 1); - if (shouldChainBeFinalized) { - finalizedChains.add(new ChainView(new LinkedList<>(biggestNotarizedChain.subList(0, size - 1)))); - } - } - - public boolean onPropose(BlockWithChain proposal) { - Block proposedBlock = proposal.block(); - LinkedList chain = proposal.chain(); - Block parentTip = chain.getLast(); - - if (!Arrays.equals(proposedBlock.parentHash(), parentTip.getSHA1())) return false; - - boolean isStrictlyLonger = seenNotarizedChains.stream() - .anyMatch(notarizedChain -> proposedBlock.length() > notarizedChain.blocks().getLast().length()); - if (!isStrictlyLonger) { - return false; - } - pendingProposes.put(proposedBlock, proposal); - return true; - } - - public void printBiggestFinalizedChain() { - final String GREEN = "\u001B[32m"; - final String RESET = "\u001B[0m"; - - String header = "=== LONGEST FINALIZED CHAIN ==="; - String border = "=".repeat(header.length()); - - LinkedList biggestFinalizedChain = finalizedChains.stream() - .max(Comparator.comparing(c -> c.blocks().getLast().length())) - .map(ChainView::blocks) - .orElse(new LinkedList<>()); - - String chainString = biggestFinalizedChain.stream() - .skip(1) - .map(block -> "%sBlock[%d-%d]%s".formatted(GREEN, block.epoch(), block.length(), RESET)) - .collect(Collectors.joining(" <- ", "%sGENESIS%s <- ".formatted(GREEN, RESET), "")); - - String output = String.format( - "%s%n%s%n%s%n%s%n%s", - border, - header, - border, - biggestFinalizedChain.isEmpty() ? "No Finalized Chain Yet" : chainString, - border - ); - - synchronized (AppLogger.class) { - for (String line : output.split("\n")) { - AppLogger.logInfo(line); - } - } - - } - - public LinkedList getBiggestNotarizedChain() { - return biggestNotarizedChain; - } -} diff --git a/src/StreamletApp/ChainView.java b/src/StreamletApp/ChainView.java deleted file mode 100644 index 0611251..0000000 --- a/src/StreamletApp/ChainView.java +++ /dev/null @@ -1,25 +0,0 @@ -package StreamletApp; - -import utils.application.Block; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.Objects; - -// this is just a wrapper to overwrite both hashcode and equals -record ChainView(LinkedList blocks) { - @Override - public int hashCode() { - return Objects.hash(blocks.stream().map(Block::getSHA1).toArray()); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ChainView(LinkedList blocks1))) return false; - if (blocks.size() != blocks1.size()) return false; - for (int i = 0; i < blocks.size(); i++) { - if (!Arrays.equals(blocks.get(i).getSHA1(), blocks1.get(i).getSHA1())) return false; - } - return true; - } -} \ No newline at end of file diff --git a/src/StreamletApp/StreamletNode.java b/src/StreamletApp/StreamletNode.java deleted file mode 100644 index 396e49c..0000000 --- a/src/StreamletApp/StreamletNode.java +++ /dev/null @@ -1,225 +0,0 @@ -package StreamletApp; - -import URB.URBNode; -import utils.application.*; -import utils.communication.Address; -import utils.communication.PeerInfo; -import utils.logs.AppLogger; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.net.ServerSocket; -import java.net.Socket; -import java.security.NoSuchAlgorithmException; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -record SeenProposal(int leader, int epoch) { -} - -public class StreamletNode { - public static final int BLOCK_CHAIN_PRINT_EPOCH_FREQUENCY = 5; - private static final int CONFUSION_START = 0; - private static final int CONFUSION_DURATION = 2; - private final int deltaInSeconds; - private final int numberOfDistinctNodes; - private final TransactionPoolSimulator transactionPoolSimulator; - private final Random random = new Random(1L); - private final AtomicInteger currentEpoch = new AtomicInteger(0); - - private final int localId; - private final URBNode urbNode; - private final BlockchainManager blockchainManager; - private final Map> votedBlocks = new HashMap<>(); - private final BlockingQueue derivableQueue = new LinkedBlockingQueue<>(1000); - private final Set seenProposals = new HashSet<>(); - private final ConcurrentLinkedQueue clientPendingTransactionsQueue = new ConcurrentLinkedQueue<>(); - - - private final ExecutorService executor = Executors.newCachedThreadPool(); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private final boolean isClientGeneratingTransactions; - private final Address myClientAddress; - - public StreamletNode(PeerInfo localPeerInfo, List remotePeersInfo, int deltaInSeconds, - boolean isClientGeneratingTransactions, Address myClientAddress) - throws IOException { - localId = localPeerInfo.id(); - numberOfDistinctNodes = 1 + remotePeersInfo.size(); - this.deltaInSeconds = deltaInSeconds; - transactionPoolSimulator = new TransactionPoolSimulator(numberOfDistinctNodes); - blockchainManager = new BlockchainManager(); - urbNode = new URBNode(localPeerInfo, remotePeersInfo, derivableQueue::add); - this.isClientGeneratingTransactions = isClientGeneratingTransactions; - this.myClientAddress = myClientAddress; - } - - public void startProtocol() throws InterruptedException { - launchThreads(); - urbNode.waitForAllPeersToConnect(); - - long epochDuration = 2L * deltaInSeconds; - scheduler.scheduleAtFixedRate(this::safeAdvanceEpoch, 0, epochDuration, TimeUnit.SECONDS); - } - - private void safeAdvanceEpoch() { - try { - advanceEpoch(); - } catch (Exception e) { - AppLogger.logError("Error advancing epoch: " + e.getMessage(), e); - } - } - - private void advanceEpoch() { - int epoch = currentEpoch.get(); - int currentLeaderId = calculateLeaderId(epoch); - AppLogger.logInfo("#### EPOCH = " + epoch + " LEADER= " + currentLeaderId + " ####"); - - if (localId == currentLeaderId) { - try { - if (!isClientGeneratingTransactions || !clientPendingTransactionsQueue.isEmpty()) { - AppLogger.logDebug("Node " + localId + " is leader: proposing new block"); - proposeNewBlock(epoch); - } - proposeNewBlock(epoch); - } catch (NoSuchAlgorithmException e) { - AppLogger.logError("Error proposing new block: " + e.getMessage(), e); - } - } - if (epoch != 0 && epoch % BLOCK_CHAIN_PRINT_EPOCH_FREQUENCY == 0) - blockchainManager.printBiggestFinalizedChain(); - currentEpoch.incrementAndGet(); - } - - private void launchThreads() { - executor.submit(() -> { - try { - urbNode.startURBNode(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - - executor.submit(this::consumeMessages); - if (isClientGeneratingTransactions) executor.submit(this::receiveClientTransactionsRequests); - } - - private void consumeMessages() { - final Queue bufferedMessages = new LinkedList<>(); - try { - while (true) { - Message message = derivableQueue.poll(100, TimeUnit.MILLISECONDS); - if (message == null) continue; - - if (inConfusionEpoch(currentEpoch.get())) { - bufferedMessages.add(message); - continue; - } - - while (!bufferedMessages.isEmpty()) { - handleMessageDelivery(bufferedMessages.poll()); - } - - handleMessageDelivery(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - AppLogger.logWarning("Message consumer thread interrupted"); - } - } - - private void proposeNewBlock(int epoch) throws NoSuchAlgorithmException { - LinkedList parentChain = blockchainManager.getBiggestNotarizedChain(); - Block parent = parentChain.getLast(); - Transaction[] transactions; - if (isClientGeneratingTransactions) { - transactions = new Transaction[clientPendingTransactionsQueue.size()]; - int i = 0; - while (!clientPendingTransactionsQueue.isEmpty()) { - transactions[i++] = clientPendingTransactionsQueue.poll(); - } - } else { - transactions = transactionPoolSimulator.generateTransactions(); - } - - Block newBlock = new Block(parent.getSHA1(), epoch, parent.length() + 1, transactions); - AppLogger.logDebug("Proposed block: " + newBlock + " with transactions: " + Arrays.toString(transactions)); - urbNode.broadcastFromLocal(new Message(MessageType.PROPOSE, new BlockWithChain(newBlock, parentChain), localId)); - } - - private void handleMessageDelivery(Message message) { - AppLogger.logDebug("Delivering message from " + message.sender() + ": " + message.type()); - switch (message.type()) { - case PROPOSE -> handlePropose(message); - case VOTE -> handleVote(message); - } - } - - private void handlePropose(Message message) { - BlockWithChain blockWithChain = (BlockWithChain) message.content(); - SeenProposal proposal = new SeenProposal(message.sender(), blockWithChain.block().epoch()); - - if (seenProposals.contains(proposal) - || !blockchainManager.onPropose(blockWithChain)) - return; - seenProposals.add(proposal); - - Block fullBlock = blockWithChain.block(); - Block blockHeader = new Block(fullBlock.parentHash(), fullBlock.epoch(), fullBlock.length(), new Transaction[0]); - urbNode.broadcastFromLocal(new Message(MessageType.VOTE, blockHeader, localId)); - AppLogger.logDebug("Voted for block from leader " + message.sender() + " epoch " + fullBlock.epoch()); - } - - - private void handleVote(Message message) { - Block block = (Block) message.content(); - votedBlocks.computeIfAbsent(block, _ -> new HashSet<>()).add(message.sender()); - - if (votedBlocks.get(block).size() > numberOfDistinctNodes / 2) { - blockchainManager.notarizeBlock(block); - } - } - - private boolean inConfusionEpoch(int epoch) { - return epoch >= CONFUSION_START && epoch <= CONFUSION_START + CONFUSION_DURATION - 1; - } - - - private int calculateLeaderId(int epoch) { - return inConfusionEpoch(epoch) ? epoch % numberOfDistinctNodes - : random.nextInt(numberOfDistinctNodes); - } - - private void receiveClientTransactionsRequests() { - try (ServerSocket serverSocket = new ServerSocket(myClientAddress.port())) { - AppLogger.logInfo("Transaction client server listening on port " + myClientAddress.port()); - while (true) { - Socket clientSocket = serverSocket.accept(); - executor.submit(() -> handleReceiveClientRequest(clientSocket)); - } - } catch (IOException e) { - AppLogger.logError("Error in transaction client server: " + e.getMessage(), e); - } - } - - private void handleReceiveClientRequest(Socket clientSocket) { - AppLogger.logDebug("Handling client " + clientSocket.getInetAddress() + " connection..."); - try (Socket s = clientSocket; - ObjectInputStream ois = new ObjectInputStream(s.getInputStream())) { - - while (true) { - try { - Transaction transaction = (Transaction) ois.readObject(); - AppLogger.logInfo("Received transaction from client " + s.getInetAddress() + ": " + transaction); - clientPendingTransactionsQueue.add(transaction); - } catch (ClassNotFoundException e) { - AppLogger.logError("Received unknown object from client " + s.getInetAddress(), e); - } - } - - } catch (IOException e) { - AppLogger.logInfo("Client " + clientSocket.getInetAddress() + " disconnected."); - } - } -} \ No newline at end of file diff --git a/src/StreamletClient.java b/src/StreamletClient.java index 2ef22b0..5f0baa6 100644 --- a/src/StreamletClient.java +++ b/src/StreamletClient.java @@ -20,10 +20,10 @@ void main() throws IOException { })); ConfigParser.ConfigData configData = ConfigParser.parseConfig(); - AppLogger.updateLoggerLevel(configData.logLevel); + AppLogger.updateLoggerLevel(configData.logLevel()); userInput = new BufferedReader(new InputStreamReader(System.in)); - serverAddressees = ConfigParser.parseConfig().servers; + serverAddressees = ConfigParser.parseConfig().servers(); printInfoGui(); while (running) { diff --git a/src/app/BlockNode.java b/src/app/BlockNode.java new file mode 100644 index 0000000..94e860d --- /dev/null +++ b/src/app/BlockNode.java @@ -0,0 +1,62 @@ +package app; + +import utils.application.Block; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class BlockNode implements Serializable { + private static final Pattern BLOCK_NODE_REGEX = Pattern.compile( + "BlockNode\\[(?true|false),(?.*)]" + ); + + private final Block block; + private Boolean finalized; + + public BlockNode(Block block, Boolean finalized) { + this.block = block; + this.finalized = finalized; + } + + public Block block() { + return block; + } + + public Boolean finalized() { + return finalized; + } + + public void finalizeBlock() { + finalized = true; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof BlockNode blockNode)) return false; + + return block.equals(blockNode.block); + } + + @Override + public int hashCode() { + return block.hashCode(); + } + + public String getPersistenceString() { + return "BlockNode[%s,%s]".formatted(finalized, block.getPersistenceString()); + } + + public static BlockNode fromPersistenceString(String persistenceString) { + Matcher matcher = BLOCK_NODE_REGEX.matcher(persistenceString); + if (!matcher.matches()) { + return null; + } + + boolean finalized = Boolean.parseBoolean(matcher.group("finalized")); + String blockString = matcher.group("block"); + Block block = Block.fromPersistenceString(blockString); + + return new BlockNode(block, finalized); + } +} diff --git a/src/app/BlockchainManager.java b/src/app/BlockchainManager.java new file mode 100644 index 0000000..b032efb --- /dev/null +++ b/src/app/BlockchainManager.java @@ -0,0 +1,388 @@ +package app; + +import utils.application.Block; +import utils.application.Hash; +import utils.application.Transaction; +import utils.logs.AppLogger; + +import java.nio.file.Path; +import java.util.*; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class BlockchainManager { + public static final int FINALIZATION_MIN_SIZE = 3; + public static final String LOG_FILE_NAME = "log.txt"; + public static final String BLOCK_CHAIN_FILE_NAME = "blockChain.txt"; + private static final int SHA1_LENGTH = 20; + private static final Block GENESIS_BLOCK = + new Block(new byte[SHA1_LENGTH], 0, 0, new Transaction[0]); + private final Hash genesisParentHash; + + private final Map blockNodesByHash = new HashMap<>(); + private final Map> blockchainByParentHash = new HashMap<>(); + private final Set recoveredBlocks = new HashSet<>(); + private final Set pendingProposals = new HashSet<>(); + + private final PersistenceFilesManager persistenceManager; + + private int mostRecentNotarizedEpoch; + + public BlockchainManager(Path outputPath) { + Path logFilePath = outputPath.resolve(LOG_FILE_NAME); + Path blockchainFilePath = outputPath.resolve(BLOCK_CHAIN_FILE_NAME); + persistenceManager = new PersistenceFilesManager(logFilePath, blockchainFilePath, outputPath); + mostRecentNotarizedEpoch = persistenceManager.initializeFromFile( + blockNodesByHash, blockchainByParentHash, recoveredBlocks, pendingProposals + ); + persistenceManager.getPendingOperations().forEach(this::processOperation); + + BlockNode genesisNode = new BlockNode(GENESIS_BLOCK, true); + genesisParentHash = new Hash(GENESIS_BLOCK.parentHash()); + Hash genesisHash = new Hash(GENESIS_BLOCK.getSHA1()); + + AppLogger.logWarning("RESTARTING AND THE GENESIS'S PARENT HAS THIS MANY CHILDREN..."); + if (blockchainByParentHash.get(genesisParentHash) != null) { + AppLogger.logWarning("" + blockchainByParentHash.get(genesisParentHash).size()); + AppLogger.logWarning("LENGTH BLOCK NODES BY HASH: " + blockNodesByHash.size()); + AppLogger.logWarning("LENGTH BLOCK CHAIN BY PARENT HASH: " + blockchainByParentHash.size()); + } else { + AppLogger.logWarning("GENESIS NOT INSERTED"); + } + + List genesisRoot = new LinkedList<>(); + genesisRoot.add(genesisHash); + blockchainByParentHash.putIfAbsent(genesisParentHash, genesisRoot); + blockchainByParentHash.putIfAbsent(genesisHash, new LinkedList<>()); + + blockNodesByHash.putIfAbsent(genesisHash, genesisNode); + } + + private void processOperation(Operation operation) { + switch (operation.getType()) { + case PROPOSE -> onPropose(operation.getBlock()); + case NOTARIZE -> notarizeBlock(operation.getBlock()); + case FINALIZE -> { + Hash hash = new Hash(operation.getBlock().getSHA1()); + BlockNode node = blockNodesByHash.get(hash); + if (node != null && !node.finalized()) { + finalizeChainUpstream(node); + } + } + } + } + + public void persistToFile() { + persistenceManager.persistToFile(getPersistenceString()); + } + + private String getPersistenceString() { + StringBuilder sb = new StringBuilder(); + + blockNodesByHash.forEach((hash, blockNode) -> { + sb.append("%s:%s".formatted(hash.getPersistenceString(), blockNode.getPersistenceString())); + sb.append("\n"); + }); + sb.append("\n\n"); + + blockchainByParentHash.forEach((hash, childHashes) -> { + String childrenStr = childHashes.stream() + .map(Hash::getPersistenceString) + .collect(Collectors.joining(",")); + + sb.append("%s:[%s]".formatted(hash.getPersistenceString(), childrenStr)); + sb.append("\n"); + }); + + sb.append("\n\n"); + sb.append("%s".formatted(recoveredBlocks.stream().map(BlockNode::getPersistenceString).collect(Collectors.joining("\n")))); + sb.append("\n\n"); + sb.append("%s".formatted(pendingProposals.stream().map(Block::getPersistenceString).collect(Collectors.joining("\n")))); + return sb.toString(); + } + + public List getBiggestNotarizedChain() { + AppLogger.logWarning("STARTING THE SEARCH..."); + return findBiggestChainMatching(genesisParentHash, _ -> true); + } + + public List getBiggestFinalizedChain() { + return findBiggestChainMatching(genesisParentHash, BlockNode::finalized); + } + + private List findBiggestChainMatching(Hash parentHash, Predicate predicate) { + List chain = new LinkedList<>(); + + AppLogger.logWarning("FINDING BIGGEST CHAIN ON EPOCH..."); + + if (!parentHash.equals(genesisParentHash)) { + BlockNode node = blockNodesByHash.get(parentHash); + if (node != null) { + AppLogger.logWarning(node.block().epoch().toString()); + chain.add(node.block()); + } + } + + List childrenHashes = blockchainByParentHash.get(parentHash); + if (childrenHashes == null) childrenHashes = new ArrayList<>(); + + for (Hash childHash : childrenHashes) { + BlockNode child = blockNodesByHash.get(childHash); + if (child != null) { + AppLogger.logWarning("\tCHILD"); + AppLogger.logWarning("\t" + child.getPersistenceString()); + AppLogger.logWarning("\tWITH HASH: " + Base64.getEncoder().encodeToString(child.block().getSHA1())); + } + } + + chain.addAll( + childrenHashes.stream() + .map(blockNodesByHash::get) + .filter(Objects::nonNull) + .filter(predicate) + .map(child -> findBiggestChainMatching(new Hash(child.block().getSHA1()), predicate)) + .max(Comparator.comparing(List::size)) + .orElseGet(LinkedList::new) + ); + return chain; + } + + public boolean onPropose(Block proposedBlock) { + Hash blockHash = new Hash(proposedBlock.getSHA1()); + + if (blockNodesByHash.containsKey(blockHash)) { + return false; + } + + boolean isLongerThanAnyChain = blockchainByParentHash.keySet().stream() + .filter(parentHash -> { + List children = blockchainByParentHash.get(parentHash); + return children == null || children.isEmpty(); + }) + .map(blockNodesByHash::get) + .filter(Objects::nonNull) + .anyMatch(blockNode -> proposedBlock.length() > blockNode.block().length()); + + if (!isLongerThanAnyChain) { + return false; + } + + pendingProposals.add(proposedBlock); + + BlockNode blockNode = new BlockNode(proposedBlock, false); + blockNodesByHash.put(blockHash, blockNode); + + persistenceManager.appendToLog(new Operation.Propose(proposedBlock)); + + return true; + } + + public void notarizeBlock(Block blockHeader) { + Block fullBlock = pendingProposals.stream() + .filter(blockHeader::equals) + .findFirst() + .orElse(null); + if (fullBlock == null) return; + + Hash parentHash = new Hash(fullBlock.parentHash()); + Hash blockHash = new Hash(fullBlock.getSHA1()); + BlockNode blockNode = blockNodesByHash.get(blockHash); + + blockchainByParentHash.computeIfAbsent(parentHash, _ -> new LinkedList<>()) + .add(blockHash); + blockchainByParentHash.computeIfAbsent(blockHash, _ -> new LinkedList<>()); + + if (blockHeader.epoch() > mostRecentNotarizedEpoch) { + mostRecentNotarizedEpoch = blockHeader.epoch(); + } + + pendingProposals.remove(blockHeader); + + persistenceManager.appendToLog(new Operation.Notarize(fullBlock)); + + AppLogger.logInfo("Block notarized: epoch " + blockHeader.epoch() + " length " + blockHeader.length()); + finalizeAndPropagate(blockNode); + } + + private void finalizeAndPropagate(BlockNode targetBlock) { + propagateFinalizedStatusDownstream(new Hash(targetBlock.block().getSHA1())); + finalizeByConsecutiveEpochBlocks(targetBlock); + } + + private void propagateFinalizedStatusDownstream(Hash parentHash) { + List childrenHashes = blockchainByParentHash.get(parentHash); + if (childrenHashes == null || childrenHashes.isEmpty()) return; + + for (Hash childHash : childrenHashes) { + BlockNode child = blockNodesByHash.get(childHash); + if (child == null) continue; + + if (child.finalized()) { + finalizeChainUpstream(child); + } + propagateFinalizedStatusDownstream(new Hash(child.block().getSHA1())); + } + } + + private void finalizeByConsecutiveEpochBlocks(BlockNode anchorBlock) { + List blocksBefore = collectPrecedingConsecutiveBlocks(anchorBlock); + List blocksAfter = collectFollowingConsecutiveBlocks(anchorBlock); + + List finalizationCandidate = new LinkedList<>(blocksBefore); + finalizationCandidate.add(anchorBlock); + finalizationCandidate.addAll(blocksAfter); + + if (finalizationCandidate.size() >= FINALIZATION_MIN_SIZE) { + BlockNode lastBlock = finalizationCandidate.getLast(); + finalizeChainUpstream(lastBlock); + verifyAndFinalizeBasedOnChildren(lastBlock); + } + } + + private void verifyAndFinalizeBasedOnChildren(BlockNode lastBlock) { + List childrenHashes = blockchainByParentHash.get(new Hash(lastBlock.block().getSHA1())); + + if (childrenHashes != null && !childrenHashes.isEmpty()) { + for (Hash childHash : childrenHashes) { + BlockNode child = blockNodesByHash.get(childHash); + if (child != null && !child.finalized()) { + finalizeByConsecutiveEpochBlocks(child); + } + } + } + } + + private List collectFollowingConsecutiveBlocks(BlockNode startBlock) { + List consecutiveBlocks = new LinkedList<>(); + BlockNode currentBlock = startBlock; + int currentEpoch = startBlock.block().epoch(); + + for (int i = 1; i < FINALIZATION_MIN_SIZE; i++) { + List childHashes = blockchainByParentHash.get(new Hash(currentBlock.block().getSHA1())); + if (childHashes == null) break; + + int targetEpoch = currentEpoch + 1; + Optional nextBlock = childHashes.stream() + .map(blockNodesByHash::get) + .filter(Objects::nonNull) + .filter(blockNode -> blockNode.block().epoch() == targetEpoch) + .findFirst(); + + if (nextBlock.isEmpty()) break; + + BlockNode child = nextBlock.get(); + currentEpoch++; + consecutiveBlocks.add(child); + currentBlock = child; + } + + return consecutiveBlocks; + } + + private List collectPrecedingConsecutiveBlocks(BlockNode startBlock) { + List consecutiveBlocks = new LinkedList<>(); + BlockNode currentBlock = startBlock; + int currentEpoch = startBlock.block().epoch(); + + for (int i = 1; i < FINALIZATION_MIN_SIZE; i++) { + BlockNode parentBlock = blockNodesByHash.get(new Hash(currentBlock.block().parentHash())); + + if (parentBlock == null + || parentBlock.block().epoch() != currentEpoch - 1 + || isGenesis(parentBlock)) { + break; + } + + currentEpoch--; + consecutiveBlocks.add(parentBlock); + currentBlock = parentBlock; + } + + return consecutiveBlocks; + } + + private void finalizeChainUpstream(BlockNode anchorBlock) { + for (BlockNode currentBlock = anchorBlock; + currentBlock != null && !isGenesis(currentBlock); + currentBlock = blockNodesByHash.get(new Hash(currentBlock.block().parentHash()))) { + if (currentBlock.finalized()) break; + currentBlock.finalizeBlock(); + + persistenceManager.appendToLog(new Operation.Finalize(currentBlock.block())); + } + } + + private boolean isGenesis(BlockNode block) { + return block.block().equals(GENESIS_BLOCK); + } + + public int getLastNotarizedEpoch() { + return mostRecentNotarizedEpoch; + } + + public List getBlocksInEpochRange(int fromEpoch, int toEpoch) { + return blockNodesByHash.values().stream() + .sorted(Comparator.comparing(block -> block.block().epoch())) + .dropWhile(block -> block.block().epoch() < fromEpoch) + .takeWhile(block -> block.block().epoch() < toEpoch) + .toList(); + } + + public void insertMissingBlocks(List missingBlocks) { + Set affectedBlocks = new HashSet<>(); + + for (BlockNode blockNode : missingBlocks) { + Hash parentHash = new Hash(blockNode.block().parentHash()); + Hash blockHash = new Hash(blockNode.block().getSHA1()); + + recoveredBlocks.add(blockNode); + blockNodesByHash.put(blockHash, blockNode); + + List siblings = blockchainByParentHash.computeIfAbsent(parentHash, _ -> new LinkedList<>()); + + if (!siblings.contains(blockHash)) { + siblings.add(blockHash); + } + + blockchainByParentHash.computeIfAbsent(blockHash, _ -> new LinkedList<>()); + + affectedBlocks.add(parentHash); + affectedBlocks.add(blockHash); + } + + affectedBlocks.stream() + .map(blockNodesByHash::get) + .filter(Objects::nonNull) + .forEach(this::finalizeAndPropagate); + } + + public void printBiggestFinalizedChain() { + final String GREEN = "\u001B[32m"; + final String RESET = "\u001B[0m"; + + String header = "=== LONGEST FINALIZED CHAIN ==="; + String border = "=".repeat(header.length()); + + List finalizedChain = getBiggestFinalizedChain(); + + String chainString = finalizedChain.stream() + .skip(1) + .map(block -> "%sBlock[%d-%d]%s".formatted(GREEN, block.epoch(), block.length(), RESET)) + .collect(Collectors.joining(" <- ", "%sGENESIS%s <- ".formatted(GREEN, RESET), "")); + + String output = String.format( + "%s%n%s%n%s%n%s%n%s", + border, + header, + border, + finalizedChain.size() == 1 ? "No Finalized Chain Yet" : chainString, + border + ); + + synchronized (AppLogger.class) { + for (String line : output.split("\n")) { + AppLogger.logInfo(line); + } + } + } +} \ No newline at end of file diff --git a/src/app/Operation.java b/src/app/Operation.java new file mode 100644 index 0000000..efb7a32 --- /dev/null +++ b/src/app/Operation.java @@ -0,0 +1,68 @@ +package app; + +import utils.application.Block; + +public interface Operation { + Type getType(); + + Block getBlock(); + + String getPersistenceString(); + + enum Type { + PROPOSE, + NOTARIZE, + FINALIZE + } + + record Propose(Block block) implements Operation { + @Override + public Type getType() { + return Type.PROPOSE; + } + + @Override + public Block getBlock() { + return block; + } + + @Override + public String getPersistenceString() { + return "PROPOSE:" + block.getPersistenceString(); + } + } + + record Notarize(Block block) implements Operation { + @Override + public Type getType() { + return Type.NOTARIZE; + } + + @Override + public Block getBlock() { + return block; + } + + @Override + public String getPersistenceString() { + return "NOTARIZE:" + block.getPersistenceString(); + } + } + + record Finalize(Block block) implements Operation { + @Override + public Type getType() { + return Type.FINALIZE; + } + + @Override + public Block getBlock() { + return block; + } + + @Override + public String getPersistenceString() { + return "FINALIZE:" + block.getPersistenceString(); + } + } +} diff --git a/src/app/PersistenceFilesManager.java b/src/app/PersistenceFilesManager.java new file mode 100644 index 0000000..282b1af --- /dev/null +++ b/src/app/PersistenceFilesManager.java @@ -0,0 +1,242 @@ +package app; + +import utils.application.Block; +import utils.application.Hash; +import utils.logs.AppLogger; + +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PersistenceFilesManager { + + private final Path logFilePath; + private final Path blockchainFilePath; + + public PersistenceFilesManager(Path logFilePath, Path blockchainFilePath, Path outputPath) { + this.logFilePath = logFilePath; + this.blockchainFilePath = blockchainFilePath; + + try { + createIfNotExistsOutputFiles(outputPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void createIfNotExistsOutputFiles(Path outputPath) throws IOException { + Files.createDirectories(outputPath); + try { + Files.createFile(logFilePath); + } catch (FileAlreadyExistsException ignored) { + } + try { + Files.createFile(blockchainFilePath); + } catch (FileAlreadyExistsException ignored) { + } + } + + public int initializeFromFile( + Map blockNodesByHash, + Map> blockchainByParentHash, + Set recoveredBlocks, + Set pendingProposals + ) { + + String content = ""; + + try { + content = Files.readString(blockchainFilePath); + } catch (IOException _) { + } + if (content.isBlank()) return -1; + + String[] sections = content.split("\n\n"); + + int mostRecentEpoch = -1; + + if (sections.length > 0) { + String[] blockNodeLines = sections[0].trim().split("\n"); + mostRecentEpoch = initializeBlockNodesByHash(blockNodesByHash, blockNodeLines); + } + + if (sections.length > 1) { + String[] chainLines = sections[1].trim().split("\n"); + initializeBlockChainByParentHash(blockchainByParentHash, chainLines); + } + + if (sections.length > 2) { + String[] recoveredLines = sections[2].trim().split("\n"); + initializeRecoveredBlocks(recoveredBlocks, recoveredLines); + } + + if (sections.length > 3) { + String[] pendingLines = sections[3].trim().split("\n"); + initializePendingProposals(pendingProposals, pendingLines); + } + return mostRecentEpoch; + } + + private int initializeBlockNodesByHash(Map blockNodesByHash, String[] blockNodeLines) { + int maxEpoch = -1; + + for (String line : blockNodeLines) { + if (line.isBlank()) continue; + + int colonIndex = line.indexOf(":"); + if (colonIndex == -1) continue; + + String hashStr = line.substring(0, colonIndex).trim(); + String blockNodeStr = line.substring(colonIndex + 1).trim(); + + AppLogger.logWarning("[PERSISTENCE] BLOCKNODE OF HASH " + hashStr); + + try { + Hash hash = Hash.fromPersistenceString(hashStr); + BlockNode blockNode = BlockNode.fromPersistenceString(blockNodeStr); + + if (blockNode != null) { + blockNodesByHash.put(hash, blockNode); + + if (blockNode.block().epoch() > maxEpoch) { + maxEpoch = blockNode.block().epoch(); + } + } + } catch (IllegalArgumentException e) { + AppLogger.logWarning("Failed to parse blockNode entry: " + line); + } + } + return maxEpoch; + } + + private void initializeBlockChainByParentHash(Map> blockchainByParentHash, String[] chainLines) { + for (String line : chainLines) { + if (line.isBlank()) continue; + + int colonIndex = line.indexOf(":"); + if (colonIndex == -1) continue; + + int startBracketIndex = line.indexOf("["); + int lastBracketIndex = line.lastIndexOf("]"); + if (startBracketIndex == -1 || lastBracketIndex == -1) continue; + + String hashStr = line.substring(0, colonIndex).trim(); + String childrenStr = line.substring(startBracketIndex + 1, lastBracketIndex); + + try { + Hash parentHash = Hash.fromPersistenceString(hashStr); + + List childrenHashes = new LinkedList<>(); + + if (!childrenStr.isBlank()) { + String[] childHashStrings = childrenStr.split(","); + + for (String childHashStr : childHashStrings) { + if (childHashStr.isBlank()) continue; + + try { + Hash childHash = Hash.fromPersistenceString(childHashStr.trim()); + childrenHashes.add(childHash); + AppLogger.logWarning("[PERSISTENCE] FOUND CHILD HASH FOR PARENT " + hashStr); + } catch (Exception e) { + AppLogger.logWarning("Could not parse child hash: " + childHashStr); + } + } + } + + blockchainByParentHash.put(parentHash, childrenHashes); + } catch (IllegalArgumentException e) { + AppLogger.logWarning("Failed to parse chain entry: " + line); + } + } + } + + private void initializeRecoveredBlocks(Set recoveredBlocks, String[] recoveredLines) { + for (String line : recoveredLines) { + if (line.isBlank()) continue; + + try { + BlockNode blockNode = BlockNode.fromPersistenceString(line.trim()); + + if (blockNode != null) { + recoveredBlocks.add(blockNode); + } + } catch (Exception e) { + AppLogger.logWarning("Failed to parse recovered block: " + line); + } + } + } + + private void initializePendingProposals(Set pendingProposals, String[] pendingLines) { + for (String line : pendingLines) { + if (line.isBlank()) continue; + + try { + Block block = Block.fromPersistenceString(line.trim()); + + if (block != null) { + pendingProposals.add(block); + } + } catch (Exception e) { + AppLogger.logWarning("Failed to parse pending proposal: " + line); + } + } + } + + public void persistToFile(String persistenceString) { + try { + Files.writeString(blockchainFilePath, persistenceString, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE); + Files.writeString(logFilePath, "", StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE); + } catch (IOException ignored) {} + } + + public List getPendingOperations() { + List operations = new LinkedList<>(); + List lines; + + try { + lines = Files.readAllLines(logFilePath); + } catch (IOException e) { + return operations; + } + + for (String line : lines) { + if (line.isBlank()) continue; + + int splitIndex = line.indexOf(":"); + if (splitIndex == -1) continue; + + String type = line.substring(0, splitIndex); + String blockData = line.substring(splitIndex + 1); + + try { + Block block = Block.fromPersistenceString(blockData); + if (block == null) continue; + + switch (type) { + case "PROPOSE" -> operations.add(new Operation.Propose(block)); + case "NOTARIZE" -> operations.add(new Operation.Notarize(block)); + case "FINALIZE" -> operations.add(new Operation.Finalize(block)); + } + } catch (Exception e) { + AppLogger.logWarning("Corrupt log entry skipped: " + line); + } + } + return operations; + } + + public void appendToLog(Operation operation) { + String logEntry = operation.getPersistenceString() + "\n"; + try { + Files.writeString(logFilePath, logEntry, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + } catch (IOException e) { + AppLogger.logWarning("Failed to append operation to log: " + e.getMessage()); + } + } +} \ No newline at end of file diff --git a/src/app/StreamletNode.java b/src/app/StreamletNode.java new file mode 100644 index 0000000..4b2924d --- /dev/null +++ b/src/app/StreamletNode.java @@ -0,0 +1,340 @@ + +package app; + +import urb.URBNode; +import utils.application.*; +import utils.communication.Address; +import utils.communication.PeerInfo; +import utils.logs.AppLogger; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.file.Path; +import java.security.NoSuchAlgorithmException; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +record SeenProposal(int leader, int epoch) { +} + +public class StreamletNode { + public static final int BLOCKCHAIN_PRINT_EPOCH_INTERVAL = 5; + private static final int CONFUSION_EPOCH_START = 0; + private static final int CONFUSION_EPOCH_DURATION = 2; + private static final int BLOCKCHAIN_PERSISTENCE_INTERVAL = 10; + private final int deltaInSeconds; + private final LocalDateTime protocolStartTime; + private final int numberOfNodes; + private final TransactionPoolSimulator transactionPoolSimulator; + private final AtomicInteger currentEpoch = new AtomicInteger(0); + private final Random epochLeaderRandomizer = new Random(1L); + + private final int localNodeId; + private final URBNode urbNode; + private final BlockchainManager blockchainManager; + private final Map> blockVotes = new HashMap<>(); + private final BlockingQueue deliveredMessagesQueue = new LinkedBlockingQueue<>(1000); + private final Set seenProposals = new HashSet<>(); + private final ConcurrentLinkedQueue pendingClientTransactions = new ConcurrentLinkedQueue<>(); + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private final ScheduledExecutorService epochScheduler = Executors.newSingleThreadScheduledExecutor(); + private final boolean isClientGeneratingTransactions; + private final Address clientServerAddress; + + private volatile boolean needsToRecover = false; + + public StreamletNode(PeerInfo localPeerInfo, List remotePeersInfo, int deltaInSeconds, + LocalDateTime protocolStartTime, boolean isClientGeneratingTransactions, Address clientServerAddress) + throws IOException { + localNodeId = localPeerInfo.id(); + numberOfNodes = 1 + remotePeersInfo.size(); + this.deltaInSeconds = deltaInSeconds; + this.protocolStartTime = protocolStartTime; + transactionPoolSimulator = new TransactionPoolSimulator(numberOfNodes); + blockchainManager = new BlockchainManager(Path.of("output", "node_%d".formatted(localNodeId))); + urbNode = new URBNode(localPeerInfo, remotePeersInfo, deliveredMessagesQueue::add); + this.isClientGeneratingTransactions = isClientGeneratingTransactions; + this.clientServerAddress = clientServerAddress; + } + + public void startProtocol() { + launchBackgroundThreads(); + + long epochDurationNanos = 2L * deltaInSeconds * 1_000_000_000L; + long delayUntilFirstEpochNanos = calculateDelayUntilFirstEpoch(); + epochScheduler.scheduleAtFixedRate( + this::safeAdvanceEpoch, delayUntilFirstEpochNanos, epochDurationNanos, TimeUnit.NANOSECONDS + ); + } + + private void launchBackgroundThreads() { + executor.submit(() -> { + try { + urbNode.startURBNode(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + executor.submit(this::processDeliveredMessages); + if (isClientGeneratingTransactions) { + executor.submit(this::acceptClientTransactions); + } + } + + private long calculateDelayUntilFirstEpoch() { + LocalDateTime now = LocalDateTime.now(); + if (now.isBefore(protocolStartTime)) { + return calculateDelayUntilProtocolStart(); + } + if (now.isAfter(protocolStartTime)) { + return recoverAndGetDelayToNextEpoch(); + } + return 0; + } + + private long calculateDelayUntilProtocolStart() { + AppLogger.logInfo("Waiting for protocol to start..."); + long delayNanos = ChronoUnit.NANOS.between(LocalDateTime.now(), protocolStartTime); + return Math.max(delayNanos, 0); + } + + private long recoverAndGetDelayToNextEpoch() { + AppLogger.logInfo("(Re)joining protocol that has already started..."); + needsToRecover = true; + + int epochDurationSeconds = 2 * this.deltaInSeconds; + long elapsedSeconds = ChronoUnit.SECONDS.between(protocolStartTime, LocalDateTime.now()); + + long completedEpochsSeconds = elapsedSeconds - (elapsedSeconds % epochDurationSeconds); + int completedEpochs = (int) (completedEpochsSeconds / epochDurationSeconds); + currentEpoch.compareAndSet(0, completedEpochs + 1); + + LocalDateTime nextEpochStartTime = protocolStartTime.plusSeconds(completedEpochsSeconds + epochDurationSeconds); + long delayToNextEpochNanos = ChronoUnit.NANOS.between(LocalDateTime.now(), nextEpochStartTime); + return Math.max(delayToNextEpochNanos, 0); + } + + private void requestMissingBlocksFromPeers(int fromEpoch, int toEpoch) { + MissingEpochRange missingEpochRange = new MissingEpochRange(fromEpoch, toEpoch); + Message joinRequest = new Message(MessageType.JOIN, missingEpochRange, localNodeId); + urbNode.broadcastFromLocal(joinRequest); + + synchronizeEpochWithPeers(toEpoch); + } + + private void synchronizeEpochWithPeers(int targetEpoch) { + for (int epoch = 0; epoch < targetEpoch; epoch++) { + determineEpochLeader(epoch); + } + } + + private void safeAdvanceEpoch() { + try { + advanceEpoch(); + } catch (Exception e) { + AppLogger.logError("Error advancing epoch: " + e.getMessage(), e); + } + } + + private void advanceEpoch() { + int epoch = currentEpoch.get(); + + if (needsToRecover) { + requestMissingBlocksFromPeers(blockchainManager.getLastNotarizedEpoch() + 1, epoch); + needsToRecover = false; + } + + if (epoch != 0 && epoch % BLOCKCHAIN_PRINT_EPOCH_INTERVAL == 0) { + synchronized (blockchainManager) { + blockchainManager.printBiggestFinalizedChain(); + } + } + + if (epoch != 0 && epoch % BLOCKCHAIN_PERSISTENCE_INTERVAL == 0) { + synchronized (blockchainManager) { + blockchainManager.persistToFile(); + } + } + int epochLeader = determineEpochLeader(epoch); + AppLogger.logInfo("#### EPOCH = " + epoch + " LEADER = " + epochLeader + " ####"); + + if (localNodeId == epochLeader) { + try { + if (!isClientGeneratingTransactions || !pendingClientTransactions.isEmpty()) { + synchronized (blockchainManager) { + AppLogger.logDebug("Node " + localNodeId + " is leader: proposing new block"); + proposeNewBlock(epoch); + } + } + } catch (NoSuchAlgorithmException e) { + AppLogger.logError("Error proposing new block: " + e.getMessage(), e); + } + } + + currentEpoch.incrementAndGet(); + } + + private void processDeliveredMessages() { + final Queue bufferedMessages = new LinkedList<>(); + try { + while (true) { + Message message = deliveredMessagesQueue.poll(100, TimeUnit.MILLISECONDS); + if (message == null) continue; + + if (isInConfusionPhase(currentEpoch.get())) { + bufferedMessages.add(message); + continue; + } + + while (!bufferedMessages.isEmpty()) { + processMessage(bufferedMessages.poll()); + } + + processMessage(message); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + AppLogger.logWarning("Message consumer thread interrupted"); + } + } + + private void proposeNewBlock(int epoch) throws NoSuchAlgorithmException { + Block parentBlock = blockchainManager.getBiggestNotarizedChain().getLast(); + Transaction[] transactions = collectBlockTransactions(); + + Block newBlock = new Block( + parentBlock.getSHA1(), + epoch, + parentBlock.length() + 1, + transactions + ); + AppLogger.logDebug("Proposed block: " + newBlock + " with transactions: " + Arrays.toString(transactions)); + urbNode.broadcastFromLocal(new Message(MessageType.PROPOSE, newBlock, localNodeId)); + } + + private Transaction[] collectBlockTransactions() { + if (isClientGeneratingTransactions) { + Transaction[] transactions = new Transaction[pendingClientTransactions.size()]; + int index = 0; + while (!pendingClientTransactions.isEmpty()) { + transactions[index++] = pendingClientTransactions.poll(); + } + return transactions; + } else { + return transactionPoolSimulator.generateTransactions(); + } + } + + private void processMessage(Message message) { + AppLogger.logDebug("Processing message from " + message.sender() + ": " + message.type()); + synchronized (blockchainManager) { + switch (message.type()) { + case JOIN -> handleJoinRequest(message); + case PROPOSE -> handleProposalMessage(message); + case VOTE -> handleVoteMessage(message); + case UPDATE -> handleUpdateMessage(message); + default -> {} + } + } + } + + private void handleProposalMessage(Message message) { + Block proposedBlock = (Block) message.content(); + SeenProposal proposal = new SeenProposal(message.sender(), proposedBlock.epoch()); + + if (seenProposals.contains(proposal) || !blockchainManager.onPropose(proposedBlock)) { + return; + } + seenProposals.add(proposal); + + Block blockHeader = new Block( + proposedBlock.parentHash(), + proposedBlock.epoch(), + proposedBlock.length(), + new Transaction[0] + ); + urbNode.broadcastFromLocal(new Message(MessageType.VOTE, blockHeader, localNodeId)); + AppLogger.logDebug("Voted for block from leader " + message.sender() + " epoch " + proposedBlock.epoch()); + } + + private void handleVoteMessage(Message message) { + Block block = (Block) message.content(); + blockVotes.computeIfAbsent(block, _ -> new HashSet<>()).add(message.sender()); + + int totalVotes = blockVotes.get(block).size(); + + if (totalVotes > numberOfNodes / 2) { + blockchainManager.notarizeBlock(block); + } + } + + private void handleJoinRequest(Message message) { + if (message.sender() == localNodeId) return; + + MissingEpochRange requestedRange = (MissingEpochRange) message.content(); + List missingBlocks = blockchainManager.getBlocksInEpochRange( + requestedRange.from(), + requestedRange.to() + ); + + Message catchUpResponse = new Message( + MessageType.UPDATE, + new CatchUp(message.sender(), missingBlocks, currentEpoch.get()), + localNodeId + ); + urbNode.broadcastFromLocal(catchUpResponse); + } + + private void handleUpdateMessage(Message message) { + CatchUp catchUp = (CatchUp) message.content(); + if (catchUp.slackerId() != localNodeId) return; + blockchainManager.insertMissingBlocks(catchUp.missingChain()); + } + + private boolean isInConfusionPhase(int epoch) { + return epoch >= CONFUSION_EPOCH_START && epoch < CONFUSION_EPOCH_START + CONFUSION_EPOCH_DURATION; + } + + private int determineEpochLeader(int epoch) { + return isInConfusionPhase(epoch) ? epoch % numberOfNodes : epochLeaderRandomizer.nextInt(numberOfNodes); + } + + private void acceptClientTransactions() { + try (ServerSocket serverSocket = new ServerSocket(clientServerAddress.port())) { + AppLogger.logInfo("Transaction server listening on port " + clientServerAddress.port()); + while (true) { + Socket clientSocket = serverSocket.accept(); + executor.submit(() -> handleClientConnection(clientSocket)); + } + } catch (IOException e) { + AppLogger.logError("Error in transaction server: " + e.getMessage(), e); + } + } + + private void handleClientConnection(Socket clientSocket) { + AppLogger.logDebug("Handling client connection from " + clientSocket.getInetAddress() + "..."); + try (Socket s = clientSocket; + ObjectInputStream ois = new ObjectInputStream(s.getInputStream())) { + + while (true) { + try { + Transaction transaction = (Transaction) ois.readObject(); + AppLogger.logInfo("Received transaction from client " + s.getInetAddress() + ": " + transaction); + pendingClientTransactions.add(transaction); + } catch (ClassNotFoundException e) { + AppLogger.logError("Received unknown object from client " + s.getInetAddress(), e); + } + } + + } catch (IOException e) { + AppLogger.logInfo("Client " + clientSocket.getInetAddress() + " disconnected."); + } + } +} \ No newline at end of file diff --git a/src/StreamletApp/TransactionPoolSimulator.java b/src/app/TransactionPoolSimulator.java similarity index 97% rename from src/StreamletApp/TransactionPoolSimulator.java rename to src/app/TransactionPoolSimulator.java index f7f90cf..5cceadd 100644 --- a/src/StreamletApp/TransactionPoolSimulator.java +++ b/src/app/TransactionPoolSimulator.java @@ -1,4 +1,4 @@ -package StreamletApp; +package app; import utils.application.Transaction; diff --git a/src/GroupCommunication/P2PNode.java b/src/groupcommunication/P2PNode.java similarity index 97% rename from src/GroupCommunication/P2PNode.java rename to src/groupcommunication/P2PNode.java index 13d5133..cf86045 100644 --- a/src/GroupCommunication/P2PNode.java +++ b/src/groupcommunication/P2PNode.java @@ -1,4 +1,4 @@ -package GroupCommunication; +package groupcommunication; import utils.application.Message; import utils.communication.KeyType; @@ -14,14 +14,16 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Function; import java.util.stream.Collectors; public class P2PNode implements Runnable, AutoCloseable { private static final long RETRY_DELAY_MS = 2500; private final PeerInfo localPeerInfo; - private final CountDownLatch allPeersConnectedLatch; private final Map peerInfoById; private final ConcurrentLinkedQueue outgoingMessageQueue = new ConcurrentLinkedQueue<>(); private final BlockingQueue incomingMessageQueue = new LinkedBlockingQueue<>(); @@ -35,7 +37,6 @@ public P2PNode(PeerInfo localPeerInfo, List remotePeersInfo) throws IO this.localPeerInfo = localPeerInfo; this.peerInfoById = remotePeersInfo.stream() .collect(Collectors.toMap(PeerInfo::id, Function.identity())); - this.allPeersConnectedLatch = new CountDownLatch(peerInfoById.size()); initializeServerSocket(); } @@ -201,7 +202,6 @@ private void handleConnectComplete(SelectionKey key) throws IOException { clientChannel.write(idBuffer); if (connectedPeers.add(remotePeer.id())) { - allPeersConnectedLatch.countDown(); peerConnectionBackoff.remove(remotePeer.id()); } @@ -241,7 +241,6 @@ private void handleIncomingConnection(SelectionKey key) throws IOException { peerConnections.put(remotePeerId, incomingChannel); if (connectedPeers.add(remotePeerId)) { - allPeersConnectedLatch.countDown(); peerConnectionBackoff.remove(remotePeerId); } @@ -297,10 +296,6 @@ public void enqueueIncomingMessage(Message message) { incomingMessageQueue.add(message); } - public void waitForAllPeersConnected() throws InterruptedException { - allPeersConnectedLatch.await(); - } - private void processOutgoingMessages() { MessageWithReceiver messageWithReceiver; while ((messageWithReceiver = outgoingMessageQueue.poll()) != null) { diff --git a/src/URB/URBCallback.java b/src/urb/URBCallback.java similarity index 89% rename from src/URB/URBCallback.java rename to src/urb/URBCallback.java index 80f5c06..0ed0f2b 100644 --- a/src/URB/URBCallback.java +++ b/src/urb/URBCallback.java @@ -1,4 +1,4 @@ -package URB; +package urb; import utils.application.Message; diff --git a/src/URB/URBNode.java b/src/urb/URBNode.java similarity index 91% rename from src/URB/URBNode.java rename to src/urb/URBNode.java index 12a3109..4faa7f1 100644 --- a/src/URB/URBNode.java +++ b/src/urb/URBNode.java @@ -1,6 +1,6 @@ -package URB; +package urb; -import GroupCommunication.P2PNode; +import groupcommunication.P2PNode; import utils.application.Message; import utils.application.MessageType; import utils.communication.MessageWithReceiver; @@ -33,12 +33,7 @@ public URBNode(PeerInfo localPeerInfo, this.callback = callback; } - public void waitForAllPeersToConnect() throws InterruptedException { - networkLayer.waitForAllPeersConnected(); - } - public void startURBNode() throws InterruptedException { - waitForAllPeersToConnect(); AppLogger.logInfo("P2PNode " + localPeerId + " is ready"); executor.submit(this::processIncomingMessages); } @@ -72,7 +67,7 @@ private void deliverMessage(Message message) { broadcastFromLocal(contentMessage); } } - case PROPOSE, VOTE -> { + case PROPOSE, VOTE, JOIN, UPDATE -> { Message echoMessage = new Message(MessageType.ECHO, message, localPeerId); broadcastToPeers(echoMessage); deliverToApplication(message); diff --git a/src/utils/ConfigParser.java b/src/utils/ConfigParser.java index e417427..3b23abe 100644 --- a/src/utils/ConfigParser.java +++ b/src/utils/ConfigParser.java @@ -1,12 +1,17 @@ + package utils; import utils.communication.Address; import utils.communication.PeerInfo; +import utils.logs.AppLogger; import utils.logs.LogLevel; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -15,50 +20,79 @@ import java.util.regex.Pattern; public class ConfigParser { - public final static String CONFIG_FILE = "config.txt"; + public static final String CONFIG_FILE = "config.txt"; + public static final DateTimeFormatter START_FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); + public static final LocalDateTime DEFAULT_START_DATE = + LocalDateTime.parse("01-01-2000 00:00:00", START_FORMAT); private static final Pattern P2P_PATTERN = Pattern.compile("^P2P\\s*=\\s*(.+)$", Pattern.CASE_INSENSITIVE); + private static final Pattern START_PATTERN = Pattern.compile("^start\\s*=\\s*(\\d{2}-\\d{2}-\\d{4} \\d{2}:\\d{2}:\\d{2})$", Pattern.CASE_INSENSITIVE); private static final Pattern SERVER_PATTERN = Pattern.compile("^server\\s*=\\s*(.+)$", Pattern.CASE_INSENSITIVE); private static final Pattern LOGLEVEL_PATTERN = Pattern.compile("^logLevel\\s*=\\s*(.+)$", Pattern.CASE_INSENSITIVE); private static final Pattern TRANSACTION_MODE_PATTERN = Pattern.compile("^transactionsMode\\s*=\\s*(.+)$", Pattern.CASE_INSENSITIVE); public static ConfigData parseConfig() throws IOException { - ConfigData configData = new ConfigData(); + List peers = new ArrayList<>(); + LocalDateTime start = DEFAULT_START_DATE; + Map servers = new HashMap<>(); + LogLevel logLevel = LogLevel.NORMAL; + boolean isClientGeneratingTransactions = false; + List lines = Files.readAllLines(Paths.get(CONFIG_FILE)); int peerIndex = 0; int serverIndex = 0; for (String line : lines) { line = line.trim(); - if (line.isEmpty() || line.startsWith("#")) continue; // skip empty lines or comments + if (line.isEmpty() || line.startsWith("#")) continue; Matcher p2pMatcher = P2P_PATTERN.matcher(line); + Matcher startMatcher = START_PATTERN.matcher(line); Matcher serverMatcher = SERVER_PATTERN.matcher(line); Matcher logLevelMatcher = LOGLEVEL_PATTERN.matcher(line); Matcher transactionMatcher = TRANSACTION_MODE_PATTERN.matcher(line); if (p2pMatcher.matches()) { - configData.peers.add(new PeerInfo(peerIndex++, Address.fromString(p2pMatcher.group(1).trim()))); + peers.add(new PeerInfo(peerIndex++, Address.fromString(p2pMatcher.group(1).trim()))); + } else if (startMatcher.matches()) { + start = parseToDate(startMatcher.group(1).trim()); } else if (serverMatcher.matches()) { - configData.servers.put(serverIndex++, Address.fromString(serverMatcher.group(1).trim())); + servers.put(serverIndex++, Address.fromString(serverMatcher.group(1).trim())); } else if (logLevelMatcher.matches()) { - try { - configData.logLevel = LogLevel.valueOf(logLevelMatcher.group(1).trim().toUpperCase()); - } catch (IllegalArgumentException ignored) { - configData.logLevel = LogLevel.NORMAL; - } + logLevel = parseLogLevel(logLevelMatcher.group(1).trim()); } else if (transactionMatcher.matches()) { - configData.isClientGeneratingTransactions = transactionMatcher.group(1).trim().equalsIgnoreCase("CLIENT"); + isClientGeneratingTransactions = transactionMatcher.group(1).trim().equalsIgnoreCase("CLIENT"); } } - return configData; + return new ConfigData(peers, start, servers, logLevel, isClientGeneratingTransactions); + } + + private static LocalDateTime parseToDate(String dateStr) { + try { + return LocalDateTime.parse(dateStr, START_FORMAT); + } catch (DateTimeParseException e) { + AppLogger.logWarning( + "Invalid format for protocol start date. Default start date was used. Should be: dd-MM-yyyy HH:mm:ss." + ); + return DEFAULT_START_DATE; + } + } + + private static LogLevel parseLogLevel(String levelStr) { + try { + return LogLevel.valueOf(levelStr.toUpperCase()); + } catch (IllegalArgumentException ignored) { + return LogLevel.NORMAL; + } } - public static class ConfigData { - public List peers = new ArrayList<>(); - public Map servers = new HashMap<>(); - public LogLevel logLevel = LogLevel.NORMAL; - public boolean isClientGeneratingTransactions = false; + public record ConfigData( + List peers, + LocalDateTime start, + Map servers, + LogLevel logLevel, + boolean isClientGeneratingTransactions + ) { } -} +} \ No newline at end of file diff --git a/src/utils/application/Block.java b/src/utils/application/Block.java index aa949e9..ec5b7c8 100644 --- a/src/utils/application/Block.java +++ b/src/utils/application/Block.java @@ -4,10 +4,18 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; +import java.util.Base64; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import utils.logs.AppLogger; public record Block(byte[] parentHash, Integer epoch, Integer length, Transaction[] transactions) implements Content { + private static final Pattern BLOCK_REGEX = Pattern.compile( + "Block\\{(?\\d+),(?\\d+),(?.*),\\&(?.*)&}" + ); public Block(byte[] parentHash, Integer epoch, Integer length, Transaction[] transactions) { this.parentHash = parentHash; @@ -30,7 +38,8 @@ public byte[] getSHA1() { for (Transaction transaction : transactions) { ByteBuffer transactionBuffer = ByteBuffer.allocate(24); transactionBuffer.putLong(transaction.id()); - transactionBuffer.putDouble(transaction.amount()); + double amount = Double.parseDouble(String.format(Locale.US, "%.2f", transaction.amount())); + transactionBuffer.putDouble(amount); transactionBuffer.putInt(transaction.sender()); transactionBuffer.putInt(transaction.receiver()); sha1.update(transactionBuffer.array()); @@ -71,4 +80,36 @@ public String toStringSummary() { epoch, length, partialParentHash, txSummary ); } + + public String getPersistenceString() { + AppLogger.logWarning("[BLOCK] I (" + epoch + ") have these many transactions..."); + AppLogger.logWarning("[BLOCK] " + transactions.length); + return "Block{%s,%s,%s,&%s&}".formatted( + epoch, + length, + Base64.getEncoder().encodeToString(parentHash), + Arrays.stream(transactions).map(Transaction::getPersistenceString).collect(Collectors.joining(",")) + ); + } + + public static Block fromPersistenceString(String persistenceString) { + Matcher matcher = BLOCK_REGEX.matcher(persistenceString); + if (!matcher.matches()) { + return null; + } + + int epoch = Integer.parseInt(matcher.group("epoch")); + int length = Integer.parseInt(matcher.group("length")); + byte[] parentHash = Base64.getDecoder().decode(matcher.group("parentHash")); + + String transactionsString = matcher.group("transactions"); + AppLogger.logWarning("[BLOCK] I (" + epoch + ") HAVE THESE TRANSACTIONS..."); + AppLogger.logWarning("[BLOCK] " + transactionsString); + Transaction[] transactions = transactionsString.isEmpty() ? new Transaction[0] : + Arrays.stream(transactionsString.split(",(?=Tx\\<)")) + .map(Transaction::fromPersistenceString) + .toArray(Transaction[]::new); + + return new Block(parentHash, epoch, length, transactions); + } } diff --git a/src/utils/application/BlockWithChain.java b/src/utils/application/BlockWithChain.java deleted file mode 100644 index 48e5ccf..0000000 --- a/src/utils/application/BlockWithChain.java +++ /dev/null @@ -1,6 +0,0 @@ -package utils.application; - -import java.util.LinkedList; - -public record BlockWithChain(Block block, LinkedList chain) implements Content { -} diff --git a/src/utils/application/CatchUp.java b/src/utils/application/CatchUp.java new file mode 100644 index 0000000..94a11a0 --- /dev/null +++ b/src/utils/application/CatchUp.java @@ -0,0 +1,27 @@ +package utils.application; + +import app.BlockNode; + +import java.util.LinkedList; +import java.util.List; + +public record CatchUp(Integer slackerId, List missingChain, Integer currentEpoch) implements Content { + + public CatchUp(Integer slackerId, List missingChain, Integer currentEpoch) { + this.slackerId = slackerId; + this.missingChain = new LinkedList<>(missingChain); + this.currentEpoch = currentEpoch; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CatchUp catchUp)) return false; + + return slackerId.equals(catchUp.slackerId) && currentEpoch.equals(catchUp.currentEpoch); + } + + @Override + public int hashCode() { + return 31 * slackerId.hashCode() + currentEpoch.hashCode(); + } +} diff --git a/src/utils/application/Hash.java b/src/utils/application/Hash.java new file mode 100644 index 0000000..f0111b5 --- /dev/null +++ b/src/utils/application/Hash.java @@ -0,0 +1,27 @@ +package utils.application; + +import java.util.Arrays; +import java.util.Base64; + +public record Hash(byte[] hash) { + + @Override + public boolean equals(Object o) { + if (!(o instanceof Hash(byte[] hash1))) return false; + + return Arrays.equals(this.hash, hash1); + } + + @Override + public int hashCode() { + return Arrays.hashCode(hash); + } + + public String getPersistenceString() { + return Base64.getEncoder().encodeToString(hash); + } + + public static Hash fromPersistenceString(String persistenceString) { + return new Hash(Base64.getDecoder().decode(persistenceString)); + } +} diff --git a/src/utils/application/MessageType.java b/src/utils/application/MessageType.java index 23545e0..a132614 100644 --- a/src/utils/application/MessageType.java +++ b/src/utils/application/MessageType.java @@ -1,6 +1,8 @@ package utils.application; public enum MessageType { + JOIN, + UPDATE, PROPOSE, VOTE, ECHO diff --git a/src/utils/application/MissingEpochRange.java b/src/utils/application/MissingEpochRange.java new file mode 100644 index 0000000..53abffe --- /dev/null +++ b/src/utils/application/MissingEpochRange.java @@ -0,0 +1,4 @@ +package utils.application; + +public record MissingEpochRange(Integer from, Integer to) implements Content { +} diff --git a/src/utils/application/Transaction.java b/src/utils/application/Transaction.java index ee32094..575276c 100644 --- a/src/utils/application/Transaction.java +++ b/src/utils/application/Transaction.java @@ -1,9 +1,30 @@ package utils.application; import java.io.Serializable; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public record Transaction(Long id, Double amount, Integer sender, Integer receiver) implements Serializable { + private static final Pattern TX_REGEX = Pattern.compile("Tx\\<(?\\d+),(?\\d+(.\\d+)?),(?\\d+),(?\\d+)>"); + public String toStringSummary() { return String.format("id=%d, %d→%d: %.2f", id, sender, receiver, amount); } + + public String getPersistenceString() { + return String.format(Locale.US, "Tx<%d,%.2f,%d,%d>", id, amount, sender, receiver); + } + + public static Transaction fromPersistenceString(String persistenceString) { + Matcher matcher = TX_REGEX.matcher(persistenceString); + if (!matcher.matches()) { + return null; + } + long id = Long.parseLong(matcher.group("id")); + double amount = Double.parseDouble(matcher.group("amount")); + int sender = Integer.parseInt(matcher.group("sender")); + int receiver = Integer.parseInt(matcher.group("receiver")); + return new Transaction(id, amount, sender, receiver); + } }