diff --git a/proto/network.proto b/proto/network.proto index 90f1991e..df81e169 100644 --- a/proto/network.proto +++ b/proto/network.proto @@ -14,7 +14,9 @@ enum MessageType { // Warning: Do not change the numeric values of existing enum fields. They are part of the public API. // New values can be added, but existing values must never be renumbered or deleted. enum ResultCode { - RESULT_CODE_UNSPECIFIED = 0; + // 0-10: [SCOPE: BASE_TRANSPORT_AND_VALIDATOR_STATUS] + // Generic V1 processing outcome before deeper payload/proof-specific validation branches. + RESULT_CODE_UNSPECIFIED = 0; // We should create an v1Error for this case, sender disconnects from the validator, add optional information to update the node verion. RESULT_CODE_OK = 1; RESULT_CODE_INVALID_PAYLOAD = 2; RESULT_CODE_RATE_LIMITED = 3; @@ -25,6 +27,8 @@ enum ResultCode { RESULT_CODE_TX_ACCEPTED_PROOF_UNAVAILABLE = 8; RESULT_CODE_NODE_OVERLOADED = 9; RESULT_CODE_TX_ALREADY_PENDING = 10; + // 11-44: [SCOPE: SHARED_VALIDATORS_REQUEST_PAYLOAD] + // (incoming BROADCAST_TRANSACTION_REQUEST -> partial apply payload validation) - validator disconnects, sender rotates RESULT_CODE_OPERATION_TYPE_UNKNOWN = 11; RESULT_CODE_SCHEMA_VALIDATION_FAILED = 12; RESULT_CODE_REQUESTER_ADDRESS_INVALID = 13; @@ -59,9 +63,15 @@ enum ResultCode { RESULT_CODE_TRANSFER_SENDER_NOT_FOUND = 42; RESULT_CODE_TRANSFER_INSUFFICIENT_BALANCE = 43; RESULT_CODE_TRANSFER_RECIPIENT_BALANCE_OVERFLOW = 44; + + // 45-47: [SCOPE: REQUEST_DISPATCH_TXPOOL_TXCOMMIT_PIPELINE] + // (dispatch/enqueue/commit handling on validator) RESULT_CODE_TX_HASH_INVALID_FORMAT = 45; RESULT_CODE_INTERNAL_ENQUEUE_VALIDATION_FAILED = 46; - RESULT_CODE_TX_ACCEPTED_RECEIPT_MISSING = 47; + RESULT_CODE_TX_COMMITTED_RECEIPT_MISSING = 47; + + // 48-60: [SCOPE: VALIDATOR_RESPONSE_PROOF_AND_IDENTITY_VALIDATION] + // (incoming BROADCAST_TRANSACTION_RESPONSE validation on sender) RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_INVALID = 48; RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN = 49; RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED = 50; @@ -94,7 +104,7 @@ message BroadcastTransactionRequest { bytes nonce = 2; bytes signature = 3; } - +// TODO: before release change appendedAt to appended_at message BroadcastTransactionResponse { bytes nonce = 1; bytes signature = 2; @@ -102,7 +112,7 @@ message BroadcastTransactionResponse { uint64 appendedAt = 4; ResultCode result = 5; } - +// TODO delete ProofData message ProofData { bytes proof = 1; uint64 appendedAt = 2; diff --git a/rpc/rpc_services.js b/rpc/rpc_services.js index 96dca5d1..70b826e4 100644 --- a/rpc/rpc_services.js +++ b/rpc/rpc_services.js @@ -6,10 +6,23 @@ import { } from "../src/utils/normalizers.js"; import { get_confirmed_tx_info, get_unconfirmed_tx_info } from "../src/utils/cli.js"; import {OperationType} from "../src/utils/constants.js"; +import { sleep } from "../src/utils/helpers.js"; import b4a from "b4a"; import PartialTransactionValidator from "../src/core/network/protocols/shared/validators/PartialTransactionValidator.js"; import PartialTransferValidator from "../src/core/network/protocols/shared/validators/PartialTransferValidator.js"; +// This was added because V1 is not waiting for signed/unsigned state. So to include reverse compatibility +// we need to slow down v1 to legacy case. +const waitForUnconfirmedTx = async (state, txHash, config) => { + const startedAt = Date.now(); + while ((Date.now() - startedAt) < config.messageValidatorResponseTimeout) { + const payload = await state.get(txHash); + if (payload) return true; + await sleep(100); + } + return false; +}; + export async function getBalance(msbInstance, address, confirmed) { const state = msbInstance.state; const useUnconfirmed = confirmed === false; @@ -75,6 +88,11 @@ export async function broadcastTransaction(msbInstance, config, payload) { throw new Error("Failed to broadcast transaction after multiple attempts."); } + const isConfirmed = await waitForUnconfirmedTx(msbInstance.state, hash, config); + if (!isConfirmed) { + throw new Error("Failed to broadcast transaction after multiple attempts."); + } + const signedLength = msbInstance.state.getSignedLength(); const unsignedLength = msbInstance.state.getUnsignedLength(); diff --git a/src/config/env.js b/src/config/env.js index 5cf5809b..1615a62c 100644 --- a/src/config/env.js +++ b/src/config/env.js @@ -54,7 +54,7 @@ const configData = { enableValidatorObserver: true, enableWallet: true, maxValidators: 6, - maxRetries: 0, + maxRetries: 3, messageThreshold: 1000, messageValidatorRetryDelay: 1000, //How long to wait before retrying (ms) MESSAGE_VALIDATOR_RETRY_DELAY_MS messageValidatorResponseTimeout: 3 * 3 * 1000, //Overall timeout for sending a message (ms). This is 3 * maxRetries * messageValidatorRetryDelay; diff --git a/src/core/network/protocols/connectionPolicies.js b/src/core/network/protocols/connectionPolicies.js new file mode 100644 index 00000000..e457bf27 --- /dev/null +++ b/src/core/network/protocols/connectionPolicies.js @@ -0,0 +1,88 @@ +import { ResultCode } from '../../../utils/constants.js'; + +// TODO: Consider how to behave with ResultCode.UNSPECIFIED + +export const SENDER_ACTION = Object.freeze({ + UNDEFINED: 'UNDEFINED', + SUCCESS: 'SUCCESS', + ROTATE: 'ROTATE', + NO_ROTATE: 'NO_ROTATE', +}); + +const SUCCESS_CODES = new Set([ + ResultCode.OK, +]); + +const ROTATE_CODES = new Set([ + ResultCode.UNSPECIFIED, + ResultCode.INVALID_PAYLOAD, + ResultCode.RATE_LIMITED, + ResultCode.SIGNATURE_INVALID, + ResultCode.UNEXPECTED_ERROR, + ResultCode.TIMEOUT, + ResultCode.NODE_HAS_NO_WRITE_ACCESS, + ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE, + ResultCode.NODE_OVERLOADED, + ResultCode.OPERATION_TYPE_UNKNOWN, + ResultCode.SCHEMA_VALIDATION_FAILED, + ResultCode.REQUESTER_ADDRESS_INVALID, + ResultCode.REQUESTER_PUBLIC_KEY_INVALID, + ResultCode.TX_HASH_MISMATCH, + ResultCode.TX_SIGNATURE_INVALID, + ResultCode.TX_EXPIRED, + ResultCode.TX_ALREADY_EXISTS, + ResultCode.OPERATION_ALREADY_COMPLETED, + ResultCode.REQUESTER_NOT_FOUND, + ResultCode.INSUFFICIENT_FEE_BALANCE, + ResultCode.EXTERNAL_BOOTSTRAP_EQUALS_MSB_BOOTSTRAP, + ResultCode.SELF_VALIDATION_FORBIDDEN, + ResultCode.ROLE_NODE_ENTRY_NOT_FOUND, + ResultCode.ROLE_NODE_ALREADY_WRITER, + ResultCode.ROLE_NODE_NOT_WHITELISTED, + ResultCode.ROLE_NODE_NOT_WRITER, + ResultCode.ROLE_NODE_IS_INDEXER, + ResultCode.ROLE_ADMIN_ENTRY_MISSING, + ResultCode.ROLE_INVALID_RECOVERY_CASE, + ResultCode.ROLE_UNKNOWN_OPERATION, + ResultCode.ROLE_INVALID_WRITER_KEY, + ResultCode.ROLE_INSUFFICIENT_FEE_BALANCE, + ResultCode.MSB_BOOTSTRAP_MISMATCH, + ResultCode.EXTERNAL_BOOTSTRAP_NOT_DEPLOYED, + ResultCode.EXTERNAL_BOOTSTRAP_TX_MISSING, + ResultCode.EXTERNAL_BOOTSTRAP_MISMATCH, + ResultCode.BOOTSTRAP_ALREADY_EXISTS, + ResultCode.TRANSFER_RECIPIENT_ADDRESS_INVALID, + ResultCode.TRANSFER_RECIPIENT_PUBLIC_KEY_INVALID, + ResultCode.TRANSFER_AMOUNT_TOO_LARGE, + ResultCode.TRANSFER_SENDER_NOT_FOUND, + ResultCode.TRANSFER_INSUFFICIENT_BALANCE, + ResultCode.TRANSFER_RECIPIENT_BALANCE_OVERFLOW, + ResultCode.TX_HASH_INVALID_FORMAT, + ResultCode.INTERNAL_ENQUEUE_VALIDATION_FAILED, + ResultCode.TX_COMMITTED_RECEIPT_MISSING, + ResultCode.VALIDATOR_RESPONSE_TX_TYPE_INVALID, + ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN, + ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED, + ResultCode.VALIDATOR_RESPONSE_SCHEMA_INVALID, + ResultCode.PENDING_REQUEST_MISSING_TX_DATA, + ResultCode.PROOF_PAYLOAD_MISMATCH, + ResultCode.VALIDATOR_WRITER_KEY_NOT_REGISTERED, + ResultCode.VALIDATOR_ADDRESS_MISMATCH, + ResultCode.VALIDATOR_NODE_ENTRY_NOT_FOUND, + ResultCode.VALIDATOR_NODE_NOT_WRITER, + ResultCode.VALIDATOR_WRITER_KEY_MISMATCH, + ResultCode.VALIDATOR_TX_OBJECT_INVALID, + ResultCode.VALIDATOR_VA_MISSING, + ResultCode.TX_INVALID_PAYLOAD +]); + +const NO_ROTATE_CODES = new Set([ + ResultCode.TX_ALREADY_PENDING, +]); + +export function resultToValidatorAction(resultCode) { + if (SUCCESS_CODES.has(resultCode)) return SENDER_ACTION.SUCCESS; + if (ROTATE_CODES.has(resultCode)) return SENDER_ACTION.ROTATE; + if (NO_ROTATE_CODES.has(resultCode)) return SENDER_ACTION.NO_ROTATE; + return SENDER_ACTION.UNDEFINED; +} diff --git a/src/core/network/protocols/v1/NetworkMessageRouter.js b/src/core/network/protocols/v1/NetworkMessageRouter.js index 6c27d83e..f287106b 100644 --- a/src/core/network/protocols/v1/NetworkMessageRouter.js +++ b/src/core/network/protocols/v1/NetworkMessageRouter.js @@ -87,7 +87,6 @@ class NetworkMessageRouterV1 { #preValidate(incomingMessage) { return !(!incomingMessage || !b4a.isBuffer(incomingMessage) || incomingMessage.length === 0 || incomingMessage.length > V1_PROTOCOL_PAYLOAD_MAX_SIZE); - } #disconnect(connection, reason) { diff --git a/src/core/network/protocols/v1/V1ProtocolError.js b/src/core/network/protocols/v1/V1ProtocolError.js index 17b70d44..2f95be25 100644 --- a/src/core/network/protocols/v1/V1ProtocolError.js +++ b/src/core/network/protocols/v1/V1ProtocolError.js @@ -42,7 +42,7 @@ export class V1TxInvalidPayloadError extends V1ProtocolError { } export class V1SignatureInvalidError extends V1ProtocolError { - constructor(message = 'Signature invalid', endConnection = false) { + constructor(message = 'Signature invalid', endConnection = true) { super(ResultCode.SIGNATURE_INVALID, message, endConnection); } } @@ -60,7 +60,7 @@ export class V1UnexpectedError extends V1ProtocolError { } export class V1TimeoutError extends V1ProtocolError { - constructor(message = 'Request timed out', endConnection = true) { + constructor(message = 'Request timed out', endConnection = false) { super(ResultCode.TIMEOUT, message, endConnection); } } @@ -72,7 +72,7 @@ export class V1NodeHasNoWriteAccess extends V1ProtocolError { } export class V1TxAcceptedProofUnavailable extends V1ProtocolError { - constructor(message = 'Transaction accepted but proof is unavailable', endConnection = true, appendedAt = 0) { + constructor(message = 'Transaction accepted but proof is unavailable', endConnection = false, appendedAt = 0) { super(ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE, message, endConnection); this.appendedAt = Number.isSafeInteger(appendedAt) && appendedAt > 0 ? appendedAt : 0; } @@ -85,7 +85,7 @@ export class V1NodeOverloadedError extends V1ProtocolError { } export class V1TxAlreadyPendingError extends V1ProtocolError { - constructor(message = 'Transaction is already pending', endConnection = true) { + constructor(message = 'Transaction is already pending', endConnection = false) { super(ResultCode.TX_ALREADY_PENDING, message, endConnection); } } diff --git a/src/core/network/protocols/v1/handlers/V1BaseOperationHandler.js b/src/core/network/protocols/v1/handlers/V1BaseOperationHandler.js index d6f39cf5..1dbf2ade 100644 --- a/src/core/network/protocols/v1/handlers/V1BaseOperationHandler.js +++ b/src/core/network/protocols/v1/handlers/V1BaseOperationHandler.js @@ -38,7 +38,6 @@ class V1BaseOperationHandler { const protocolError = this.#toProtocolError(error); const rejected = this.#pendingRequestService.rejectPendingRequest(messageId, protocolError); if (!rejected) return; - if (shouldEndConnection(protocolError)) connection.end(); this.displayError(step, connection.remotePublicKey, error); } @@ -50,7 +49,7 @@ class V1BaseOperationHandler { if (error instanceof V1ProtocolError) { return error; } - return new V1UnexpectedError(error?.message ?? 'Unexpected error', false); + return new V1UnexpectedError(error?.message ?? 'Unexpected error'); } } diff --git a/src/core/network/protocols/v1/handlers/V1BroadcastTransactionOperationHandler.js b/src/core/network/protocols/v1/handlers/V1BroadcastTransactionOperationHandler.js index 33d8cb6b..1eafd5de 100644 --- a/src/core/network/protocols/v1/handlers/V1BroadcastTransactionOperationHandler.js +++ b/src/core/network/protocols/v1/handlers/V1BroadcastTransactionOperationHandler.js @@ -154,7 +154,7 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler { appendedAt ); } catch (error) { - throw new V1UnexpectedError(`Failed to build broadcast transaction response: ${error.message}`, true); + throw new V1UnexpectedError(`Failed to build broadcast transaction response: ${error.message}`); } } @@ -162,7 +162,7 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler { try { return unsafeDecodeApplyOperation(message); } catch (error) { - throw new V1UnexpectedError(`Failed to decode apply operation from message: ${error.message}`, true); + throw new V1UnexpectedError(`Failed to decode apply operation from message: ${error.message}`); } } @@ -217,7 +217,7 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler { throw new V1TxInvalidPayloadError('Decoded transaction type is missing.', false); } if (!this.#transactionCommitService) { - throw new V1UnexpectedError('TransactionCommitService is not configured.', true); + throw new V1UnexpectedError('TransactionCommitService is not configured.'); } const type = decodedTransaction.type; @@ -252,10 +252,10 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler { throw new V1ProtocolError(ResultCode.TX_HASH_INVALID_FORMAT, error.message, false); } if (error instanceof PendingCommitAlreadyExistsError) { - throw new V1TxAlreadyPendingError(error.message, false); + throw new V1TxAlreadyPendingError(error.message); } if (error instanceof PendingCommitBufferFullError) { - throw new V1NodeOverloadedError(error.message, false); + throw new V1NodeOverloadedError(error.message); } throw error; } @@ -287,10 +287,10 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler { throw new V1TxAcceptedProofUnavailable(error.message, false, error.appendedAt); } if (error instanceof TransactionPoolMissingCommitReceiptError) { - throw new V1ProtocolError(ResultCode.TX_ACCEPTED_RECEIPT_MISSING, error.message, false); + throw new V1ProtocolError(ResultCode.TX_COMMITTED_RECEIPT_MISSING, error.message, false); } if (error instanceof PendingCommitTimeoutError) { - throw new V1TimeoutError(error.message, false); + throw new V1TimeoutError(error.message); } throw error; } diff --git a/src/core/network/protocols/v1/handlers/V1LivenessOperationHandler.js b/src/core/network/protocols/v1/handlers/V1LivenessOperationHandler.js index 2e5173f4..bb233e84 100644 --- a/src/core/network/protocols/v1/handlers/V1LivenessOperationHandler.js +++ b/src/core/network/protocols/v1/handlers/V1LivenessOperationHandler.js @@ -2,7 +2,6 @@ import { networkMessageFactory } from "../../../../../messages/network/v1/networ import { NETWORK_CAPABILITIES, ResultCode } from "../../../../../utils/constants.js"; import V1LivenessRequest from "../validators/V1LivenessRequest.js"; import {getResultCode, shouldEndConnection, V1UnexpectedError} from "../V1ProtocolError.js"; -import { publicKeyToAddress, sleep } from "../../../../../utils/helpers.js"; import V1LivenessResponse from "../validators/V1LivenessResponse.js"; import V1BaseOperationHandler from "./V1BaseOperationHandler.js"; @@ -60,7 +59,7 @@ class V1LivenessOperationHandler extends V1BaseOperationHandler { message.id, connection, error, - "failed to process liveness response from sender" + "Failed to process liveness response from sender" ); } } @@ -73,7 +72,7 @@ class V1LivenessOperationHandler extends V1BaseOperationHandler { resultCode ); } catch (error) { - throw new V1UnexpectedError(`Failed to build liveness response: ${error.message}`, true); + throw new V1UnexpectedError(`Failed to build liveness response: ${error.message}`); } } diff --git a/src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js b/src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js index e81d8634..180cfb56 100644 --- a/src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js +++ b/src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js @@ -29,26 +29,27 @@ class V1BroadcastTransactionResponse extends V1BaseOperation { await this.validateSignature(payload, connection.remotePublicKey); const resultCode = payload.broadcast_transaction_response.result; // if result code is not OK, we can skip the rest of the validations. - if (resultCode !== ResultCode.OK) { - return true; + this.validateIfResultCodeIsValidatorInternalError(resultCode); + if (resultCode === ResultCode.OK) { + const proofResult = await this.verifyProofOfPublication(payload, stateInstance); + const { + validatorDecodedTx, + manifest + } = await this.assertProofPayloadMatchesRequestPayload(proofResult, pendingRequestServiceEntry); + this.validateDecodedCompletePayloadSchema(validatorDecodedTx); + const { + writerKeyFromManifest, + validatorAddressCorrelatedWithManifest + } = await this.validateWritingKey(validatorDecodedTx, manifest, stateInstance); + await this.validateValidatorCorrectness( + validatorDecodedTx, + connection.remotePublicKey, + writerKeyFromManifest, + validatorAddressCorrelatedWithManifest, + stateInstance, + ); } - const proofResult = await this.verifyProofOfPublication(payload, stateInstance); - const { - validatorDecodedTx, - manifest - } = await this.assertProofPayloadMatchesRequestPayload(proofResult, pendingRequestServiceEntry); - this.validateDecodedCompletePayloadSchema(validatorDecodedTx); - const { - writerKeyFromManifest, - validatorAddressCorrelatedWithManifest - } = await this.validateWritingKey(validatorDecodedTx, manifest, stateInstance); - await this.validateValidatorCorrectness( - validatorDecodedTx, - connection.remotePublicKey, - writerKeyFromManifest, - validatorAddressCorrelatedWithManifest, - stateInstance, - ); + return true; } @@ -213,6 +214,16 @@ class V1BroadcastTransactionResponse extends V1BaseOperation { ); } } + + validateIfResultCodeIsValidatorInternalError(resultCode) { + if (resultCode === ResultCode.TX_COMMITTED_RECEIPT_MISSING) { + throw new V1ProtocolError( + resultCode, + `Validator response indicates an error with result code ${resultCode}, which is an internal error code.`, + true + ) + } + } } const stripValidatorMetadata = (value) => { diff --git a/src/core/network/services/ConnectionManager.js b/src/core/network/services/ConnectionManager.js index b981400f..69077b07 100644 --- a/src/core/network/services/ConnectionManager.js +++ b/src/core/network/services/ConnectionManager.js @@ -1,6 +1,6 @@ import b4a from 'b4a' -import PeerWallet from "trac-wallet" -import { EventType, ResultCode } from '../../../utils/constants.js'; +import {EventType, ResultCode} from '../../../utils/constants.js'; +import {publicKeyToAddress} from "../../../utils/helpers.js"; /** * @typedef {import('hyperswarm').Connection} Connection @@ -67,7 +67,7 @@ class ConnectionManager { if (DEBUG) { // It is recommended to leave this if(DEBUG) statement here to avoid needlessly // calculating the address from the pubKey during production execution - targetAddress = this.#toAddress(publicKey) + targetAddress = publicKeyToAddress(publicKey, this.#config) } if (!this.exists(publicKey) || !this.connected(publicKey)) { @@ -113,7 +113,7 @@ class ConnectionManager { #stopHealthCheck(publicKeyHex) { let targetAddress = null; if (DEBUG) { - targetAddress = this.#toAddress(publicKeyHex) + targetAddress = publicKeyToAddress(publicKeyHex, this.#config) } if (!this.#healthCheckService) { @@ -141,70 +141,28 @@ class ConnectionManager { return entry ? entry.connection : undefined; } - /** - * Sends a message to a single randomly selected connected validator. - * Returns the public key (buffer) of the validator used, or throws - * if the specified validator is unavailable. - * @param {Object} message - The message to send to the validator - * @returns {String} - The public key of the validator used - */ - // send(message) { - // const connectedValidators = this.connectedValidators(); - - // if (connectedValidators.length === 0) { - // throw new Error('ConnectionManager: no connected validators available to send message'); - // } - - // const target = this.pickRandomValidator(connectedValidators); - // const entry = this.#validators.get(target); - // if (!entry || !entry.connection || !entry.connection.protocolSession?.has('legacy')) return null; - - // try { - // entry.connection.protocolSession.send(message); - // entry.sent = (entry.sent || 0) + 1; - // } catch (e) { - // // Swallow individual send errors. - // } - - // return target; - // } - /** * Sends a message through a specific validator without increasing sent messages count. - * @param {Object} message - The message to send to the validator + * @param {Object} message - The message to send to the validator. * @param {String | Buffer} publicKey - A validator public key hex string to be fetched from the pool. - * @returns {Boolean} True if the message was sent, false otherwise. + * @returns {Promise<*>} A promise returned by `validator.connection.protocolSession.send(message)`. + * @throws {ConnectionManagerError} If the validator is not connected. + * @throws {ConnectionManagerError} If the validator has no valid connection or protocol session. */ async sendSingleMessage(message, publicKey) { let publicKeyHex = this.#toHexString(publicKey); - if (!this.exists(publicKeyHex) || !this.connected(publicKeyHex)) return false; // Fail silently - + if (!this.connected(publicKeyHex)) { + throw new ConnectionManagerError( + `Cannot send message: validator ${publicKeyToAddress(publicKey, this.#config)} is not connected.` + ); + } const validator = this.#validators.get(publicKeyHex); - if (!validator || !validator.connection || !validator.connection.protocolSession) return false; - - let result = false; - await validator.connection.protocolSession.send(message) - .then( - () => { - result = true; - } - ) - .catch( - () => { - result = false; - } - ) - return result; - } - - /** - * Creates a blank entry for a validator in the pool without a connection. - * @param {String | Buffer} publicKey - The public key hex string of the validator to whitelist - */ - // TODO: Deprecated/Unused - remove if not needed - whiteList(publicKey) { - let publicKeyHex = this.#toHexString(publicKey); - this.#validators.set(publicKeyHex, { connection: null, sent: 0 }); + if (!validator || !validator.connection || !validator.connection.protocolSession) { + throw new ConnectionManagerError( + `Cannot send message: no valid connection found for validator ${publicKeyToAddress(publicKey, this.#config)}.` + ); + } + return validator.connection.protocolSession.send(message) } /** @@ -219,17 +177,17 @@ class ConnectionManager { debugLog(`addValidator: max connections reached.`); return false; } - debugLog(`addValidator: adding validator ${this.#toAddress(publicKeyHex)}`); + debugLog(`addValidator: adding validator ${publicKeyToAddress(publicKeyHex, this.#config)}`); if (!this.exists(publicKeyHex)) { - debugLog(`addValidator: appending validator ${this.#toAddress(publicKeyHex)}`); + debugLog(`addValidator: appending validator ${publicKeyToAddress(publicKeyHex, this.#config)}`); this.#append(publicKeyHex, connection); return true; } else if (!this.connected(publicKeyHex)) { - debugLog(`addValidator: updating validator ${this.#toAddress(publicKeyHex)}`); + debugLog(`addValidator: updating validator ${publicKeyToAddress(publicKeyHex, this.#config)}`); this.#update(publicKeyHex, connection); return true; } - debugLog(`addValidator: didn't add validator ${this.#toAddress(publicKeyHex)}`); + debugLog(`addValidator: didn't add validator ${publicKeyToAddress(publicKeyHex, this.#config)}`); return false; // TODO: Implement better success/failure reporting } @@ -238,7 +196,7 @@ class ConnectionManager { * @param {String | Buffer} publicKey - The public key hex string of the validator to remove */ remove(publicKey) { - debugLog(`remove: removing validator ${this.#toAddress(publicKey)}`); + debugLog(`remove: removing validator ${publicKeyToAddress(publicKey, this.#config)}`); const publicKeyHex = this.#toHexString(publicKey); this.#stopHealthCheck(publicKeyHex); if (this.exists(publicKeyHex)) { @@ -249,10 +207,11 @@ class ConnectionManager { entry.connection.end(); } catch (e) { // Ignore errors on connection end + debugLog("remove: failed to end connection: ", e.message); // TODO: Consider logging these errors here in verbose mode } } - debugLog(`remove: removing validator from map: ${this.#toAddress(publicKeyHex)}. Map size before removal: ${this.#validators.size}.`); + debugLog(`remove: removing validator from map: ${publicKeyToAddress(publicKeyHex, this.#config)}. Map size before removal: ${this.#validators.size}.`); this.#validators.delete(publicKeyHex); debugLog(`remove: validator removed successfully. Map size is now ${this.#validators.size}.`); } @@ -330,29 +289,12 @@ class ConnectionManager { prettyPrint() { console.log('Connection count: ', this.connectionCount()) console.log('Validator map keys count: ', this.#validators.size) - console.log('Validator map keys:\n', Array.from(this.#validators.entries()).map(([key, val]) => { + console.log('Validator map keys:\n', Array.from(this.#validators.entries()).map(([publicKey, val]) => { const protocols = val.connection?.protocolSession?.preferredProtocol || 'none'; - return `${this.#toAddress(key)}: ${protocols}`; + return `${publicKeyToAddress(publicKey, this.#config)}: ${protocols}`; }).join('\n')) } - // Note 1: This method shuffles the whole array (in practice, probably around 50 elements) - // just to fetch a small subset of it (most times, 1 element). - // There are more efficient ways to pick a small subset of validators. Consider optimizing. - // Note 2: This method is unused now, but will be kept here for future reference - // TODO: Deprecated/Unused - remove if not needed - pickRandomSubset(validators, maxTargets) { - const copy = validators.slice(); - const count = Math.min(maxTargets, copy.length); - - for (let i = copy.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [copy[i], copy[j]] = [copy[j], copy[i]]; - } - - return copy.slice(0, count); - } - /** * Picks a random validator from the given array of validator public keys. * @param {String[]} validatorPubKeys - An array of validator public key hex strings @@ -382,18 +324,18 @@ class ConnectionManager { * @param {Object} connection - The connection object */ #append(publicKey, connection) { - debugLog(`#append: appending validator ${this.#toAddress(publicKey)}`); + debugLog(`#append: appending validator ${publicKeyToAddress(publicKey, this.#config)}`); const publicKeyHex = this.#toHexString(publicKey); if (this.#validators.has(publicKeyHex)) { // This should never happen, but just in case, we log it - debugLog(`#append: tried to append existing validator: ${this.#toAddress(publicKey)}`); + debugLog(`#append: tried to append existing validator: ${publicKeyToAddress(publicKey, this.#config)}`); return; } - this.#validators.set(publicKeyHex, { connection, sent: 0 }); + this.#validators.set(publicKeyHex, {connection, sent: 0}); connection.on('close', () => { - debugLog(`#append: connection closing for validator ${this.#toAddress(publicKey)}`); + debugLog(`#append: connection closing for validator ${publicKeyToAddress(publicKey, this.#config)}`); this.remove(publicKeyHex); - debugLog(`#append: connection closed for validator ${this.#toAddress(publicKey)}`); + debugLog(`#append: connection closed for validator ${publicKeyToAddress(publicKey, this.#config)}`); }); } @@ -408,25 +350,24 @@ class ConnectionManager { // It would be preferable to keep them separated though, but we would need to review all usages to ensure correctness. // Also, we should remove the 'else' branch below if we decide to keep 'update' and 'append' separated. const publicKeyHex = this.#toHexString(publicKey); - debugLog(`#update: updating validator ${this.#toAddress(publicKey)}`); + debugLog(`#update: updating validator ${publicKeyToAddress(publicKey, this.#config)}`); if (this.#validators.has(publicKeyHex)) { this.#validators.get(publicKeyHex).connection = connection; } else { - this.#validators.set(publicKeyHex, { connection, sent: 0 }); + this.#validators.set(publicKeyHex, {connection, sent: 0}); } } - #toAddress(publicKey) { - const keyHex = b4a.isBuffer(publicKey) ? publicKey : b4a.from(publicKey, 'hex'); - return PeerWallet.encodeBech32m( - this.#config.addressPrefix, - keyHex - ); - } - #toHexString(publicKey) { return b4a.isBuffer(publicKey) ? publicKey.toString('hex') : publicKey; } } +export class ConnectionManagerError extends Error { + constructor(message) { + super(message); + this.name = this.constructor.name; + } +} + export default ConnectionManager; diff --git a/src/core/network/services/MessageOrchestrator.js b/src/core/network/services/MessageOrchestrator.js index eb7c37aa..7f7d3732 100644 --- a/src/core/network/services/MessageOrchestrator.js +++ b/src/core/network/services/MessageOrchestrator.js @@ -1,13 +1,13 @@ -import { generateUUID, sleep } from '../../../utils/helpers.js'; +import { generateUUID, publicKeyToAddress, sleep } from '../../../utils/helpers.js'; import { operationToPayload } from '../../../utils/applyOperations.js'; -import PeerWallet from "trac-wallet"; -import b4a from "b4a"; import { networkMessageFactory } from "../../../messages/network/v1/networkMessageFactory.js"; import { NETWORK_CAPABILITIES } from "../../../utils/constants.js"; import { unsafeEncodeApplyOperation } from "../../../utils/protobuf/operationHelpers.js"; import { normalizeMessageByOperationType } from "../../../utils/normalizers.js"; +import { resultToValidatorAction, SENDER_ACTION } from "../protocols/connectionPolicies.js"; +import { ConnectionManagerError } from './ConnectionManager.js'; /** * MessageOrchestrator coordinates message submission, retry, and validator management. * It works with ConnectionManager and ledger state to ensure reliable message delivery. @@ -32,16 +32,57 @@ class MessageOrchestrator { this.#wallet = wallet; } + /** + * Picks a validator for an outgoing message while avoiding requester self-validation. + * + * ValidatorObserverService already prevents connecting to the local node itself. + * This method handles a different case: for a given message, we avoid selecting + * a validator whose address equals `message.address` (requester), because + * validator-side checks reject that flow. + * + * @param {object} [message] Outgoing operation payload. + * @param {string} [message.address] Requester address (bech32m). + * @returns {string|null} Selected validator public key hex, or null when unavailable. + */ + #pickValidatorForMessage(message) { + const requesterAddress = message?.address; + if (!requesterAddress || typeof this.connectionManager.connectedValidators !== 'function') { + return this.connectionManager.pickRandomConnectedValidator(); + } + + const connected = this.connectionManager.connectedValidators(); + if (!Array.isArray(connected) || connected.length === 0) { + return null; + } + + const eligible = connected.filter((publicKey) => { + return publicKeyToAddress(publicKey, this.#config) !== requesterAddress; + }); + + const pool = eligible.length > 0 ? eligible : connected; + if (typeof this.connectionManager.pickRandomValidator === 'function') { + return this.connectionManager.pickRandomValidator(pool); + } + + const index = Math.floor(Math.random() * pool.length); + return pool[index] ?? null; + } + /** * Sends a message to a single randomly selected connected validator. * @param {object} message - The message object to be sent + * @param retries - The current retry count for this message * @returns {Promise} - true if successful, false otherwise */ + async send(message, retries = 0) { + if (retries > this.#config.maxRetries) { + console.warn(`MessageOrchestrator: Max retries reached for message ${JSON.stringify(message)}. Aborting send.`); + return false; + } - async send(message) { - const validatorPublicKey = this.connectionManager.pickRandomConnectedValidator(); + const validatorPublicKey = this.#pickValidatorForMessage(message); if (!validatorPublicKey) return false; - console.log("Sending message to validator:", PeerWallet.encodeBech32m(this.#config.addressPrefix, b4a.from(validatorPublicKey, 'hex'))); + console.log("Sending message to validator:", publicKeyToAddress(validatorPublicKey, this.#config)); /* NOTE: Since the retry logic for Legacy is handled here, and is very unique to the protocol, * it was decided to not change MessageOrchestrator send method in the refactor to make protocols transparent. @@ -55,9 +96,19 @@ class MessageOrchestrator { // TODO: After Legacy is deprecated, we don't need to check preferred protocol here. const validatorConnection = this.connectionManager.getConnection(validatorPublicKey); const preferredProtocol = validatorConnection.protocolSession.preferredProtocol; - + let success = false; if (preferredProtocol === validatorConnection.protocolSession.supportedProtocols.LEGACY) { - return this.#attemptSendMessageForLegacy(validatorPublicKey, message); + + try { + success = await this.#attemptSendMessageForLegacy(validatorPublicKey, message); + } catch (error) { + success = await this.send(message, retries + 1); + } + if (!success) { + // Remove validator and retry + this.connectionManager.remove(validatorPublicKey); + success = await this.send(message, retries + 1); + } } else if (preferredProtocol === validatorConnection.protocolSession.supportedProtocols.V1) { // TODO: This is probably better placed inside the V1 protocol definition. // Both protocols should receive a 'canonical' message and solve the encodings internally @@ -71,31 +122,65 @@ class MessageOrchestrator { NETWORK_CAPABILITIES ); - return this.connectionManager.sendSingleMessage(v1Message, validatorPublicKey); + await this.connectionManager.sendSingleMessage(v1Message, validatorPublicKey) + .then( + (resultCode) => { + console.log(resultCode); + // TODO: When we will deprecate the legacy protocol, we should refactor this scope, to propagate domain-error with result code. + const action = resultToValidatorAction(resultCode); + switch (action) { + case SENDER_ACTION.SUCCESS: + success = true; + //TODO: Create a function for action below, and replace it also in legacy flow. + this.incrementSentCount(validatorPublicKey); + if (this.shouldRemove(validatorPublicKey)) { + this.connectionManager.remove(validatorPublicKey); + } + break; + case SENDER_ACTION.ROTATE: + this.connectionManager.remove(validatorPublicKey); + break; + case SENDER_ACTION.NO_ROTATE: + // ignore + break; + default: + this.connectionManager.remove(validatorPublicKey); + console.warn( + `MessageOrchestrator: Unrecognized action from connectionPolicies: ${action}. + ResultCode was: ${resultCode}. Removing validator ${publicKeyToAddress(validatorPublicKey, this.#config)}` + ); + break; + } + } + ) + .catch( + async (err) => { + if (err instanceof ConnectionManagerError) { + success = await this.send(message, retries + 1); + console.warn(`MessageOrchestrator: Connection Error: ${err.message}`); + } else { + this.connectionManager.remove(validatorPublicKey); + success = await this.send(message, retries + 1); + } + } + ) + } - return false; + return success; } // TODO: Delete this function after legacy protocol is deprecated async #attemptSendMessageForLegacy(validatorPublicKey, message) { - const startTime = Date.now(); const deductedTxType = operationToPayload(message.type); - let attempts = 0; - while (attempts <= this.#config.maxRetries && Date.now() - startTime < this.#config.messageValidatorResponseTimeout) { - await this.connectionManager.sendSingleMessage(message, validatorPublicKey); - const appeared = await this.waitForUnsignedState(message[deductedTxType].tx, this.#config.messageValidatorRetryDelay); - if (appeared) { - this.incrementSentCount(validatorPublicKey); - if (this.shouldRemove(validatorPublicKey)) { - this.connectionManager.remove(validatorPublicKey); - } - return true; + await this.connectionManager.sendSingleMessage(message, validatorPublicKey); + const appeared = await this.waitForUnsignedState(message[deductedTxType].tx, this.#config.messageValidatorResponseTimeout); + if (appeared) { + this.incrementSentCount(validatorPublicKey); + if (this.shouldRemove(validatorPublicKey)) { + this.connectionManager.remove(validatorPublicKey); } - attempts++; + return true; } - - // If all retries fail, remove validator from pool - this.connectionManager.remove(validatorPublicKey); return false; } diff --git a/src/core/network/services/PendingRequestService.js b/src/core/network/services/PendingRequestService.js index c6085531..99f92b7a 100644 --- a/src/core/network/services/PendingRequestService.js +++ b/src/core/network/services/PendingRequestService.js @@ -93,7 +93,7 @@ class PendingRequestService { id, new V1TimeoutError( `Pending request with ID ${id} from peer ${peerPubKeyHex} timed out after ${entry.timeoutMs} ms.`, - false + true )); }, entry.timeoutMs); @@ -136,7 +136,7 @@ class PendingRequestService { if (!entry) return false; const err = error instanceof V1ProtocolError ? error - : new V1UnexpectedError(error?.message ?? 'Unexpected error', false); + : new V1UnexpectedError(error?.message ?? 'Unexpected error'); entry.reject(err); return true; } diff --git a/src/core/state/State.js b/src/core/state/State.js index 7cd17f2a..edfab3e7 100644 --- a/src/core/state/State.js +++ b/src/core/state/State.js @@ -246,7 +246,7 @@ class State extends ReadyResource { const appendedAt = new Date(); const snapshot = core.snapshot(); // consistent view while generating proofs. await snapshot.ready(); - + // TODO: check state if specific tx has been appened THEN generate a proof. try { const receipts = []; let failedProofs = 0; diff --git a/src/index.js b/src/index.js index aaa363b4..11a624c8 100644 --- a/src/index.js +++ b/src/index.js @@ -303,7 +303,7 @@ export class MainSettlementBus extends ReadyResource { const success = await this.broadcastPartialTransaction(adminRecoveryMessage); if (!success) { - throw new Error("Failed to broadcast transaction after multiple attempts."); + throw new Error("Failed to broadcast transaction. Try again later."); } console.info(`Transaction hash: ${adminRecoveryMessage.rao.tx}`); @@ -424,7 +424,7 @@ export class MainSettlementBus extends ReadyResource { const success = await this.broadcastPartialTransaction(assembledMessage); if (!success) { - throw new Error("Failed to broadcast transaction after multiple attempts."); + throw new Error("Failed to broadcast transaction. Try again later."); } console.info(`Transaction hash: ${assembledMessage.rao.tx}`); @@ -458,7 +458,7 @@ export class MainSettlementBus extends ReadyResource { const success = await this.broadcastPartialTransaction(assembledMessage); if (!success) { - throw new Error("Failed to broadcast transaction after multiple attempts."); + throw new Error("Failed to broadcast transaction. Try again later."); } console.info(`Transaction hash: ${assembledMessage.rao.tx}`); @@ -682,7 +682,7 @@ export class MainSettlementBus extends ReadyResource { const success = await this.broadcastPartialTransaction(payload); if (!success) { - throw new Error("Failed to broadcast transaction after multiple attempts."); + throw new Error("Failed to broadcast transaction. Try again later."); } console.info(`Transaction hash: ${payload.bdo.tx}`); diff --git a/src/utils/constants.js b/src/utils/constants.js index 93d43cb2..c513c5f1 100644 --- a/src/utils/constants.js +++ b/src/utils/constants.js @@ -88,7 +88,7 @@ export const ResultCode = Object.freeze({ TRANSFER_RECIPIENT_BALANCE_OVERFLOW: NetworkResultCode.RESULT_CODE_TRANSFER_RECIPIENT_BALANCE_OVERFLOW, TX_HASH_INVALID_FORMAT: NetworkResultCode.RESULT_CODE_TX_HASH_INVALID_FORMAT, INTERNAL_ENQUEUE_VALIDATION_FAILED: NetworkResultCode.RESULT_CODE_INTERNAL_ENQUEUE_VALIDATION_FAILED, - TX_ACCEPTED_RECEIPT_MISSING: NetworkResultCode.RESULT_CODE_TX_ACCEPTED_RECEIPT_MISSING, + TX_COMMITTED_RECEIPT_MISSING: NetworkResultCode.RESULT_CODE_TX_COMMITTED_RECEIPT_MISSING, VALIDATOR_RESPONSE_TX_TYPE_INVALID: NetworkResultCode.RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_INVALID, VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN: NetworkResultCode.RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN, VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED: NetworkResultCode.RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED, diff --git a/src/utils/protobuf/network.cjs b/src/utils/protobuf/network.cjs index deaa71e5..437e3e37 100644 --- a/src/utils/protobuf/network.cjs +++ b/src/utils/protobuf/network.cjs @@ -67,7 +67,7 @@ exports.ResultCode = { "RESULT_CODE_TRANSFER_RECIPIENT_BALANCE_OVERFLOW": 44, "RESULT_CODE_TX_HASH_INVALID_FORMAT": 45, "RESULT_CODE_INTERNAL_ENQUEUE_VALIDATION_FAILED": 46, - "RESULT_CODE_TX_ACCEPTED_RECEIPT_MISSING": 47, + "RESULT_CODE_TX_COMMITTED_RECEIPT_MISSING": 47, "RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_INVALID": 48, "RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN": 49, "RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED": 50, diff --git a/tests/acceptance/v1/account/account.test.mjs b/tests/acceptance/v1/account/account.test.mjs index ba90ba86..b459b036 100644 --- a/tests/acceptance/v1/account/account.test.mjs +++ b/tests/acceptance/v1/account/account.test.mjs @@ -108,11 +108,17 @@ export const registerAccountTests = (context) => { it("returns 500 on internal error", async () => { const originalGetNodeEntry = context.rpcMsb.state.getNodeEntry + const failingAddress = randomAddress(context.rpcMsb.config.addressPrefix) - context.rpcMsb.state.getNodeEntry = async () => { throw new Error("test") } + context.rpcMsb.state.getNodeEntry = async (address) => { + if (address === failingAddress) { + throw new Error("test") + } + return originalGetNodeEntry.call(context.rpcMsb.state, address) + } try { - const res = await request(context.server).get(`/v1/account/${context.wallet.address}`) + const res = await request(context.server).get(`/v1/account/${failingAddress}`) expect(res.statusCode).toBe(500) expect(res.body).toEqual({ error: 'An error occurred processing the request.' }) } finally { diff --git a/tests/acceptance/v1/tx-details/tx-details.test.mjs b/tests/acceptance/v1/tx-details/tx-details.test.mjs index 209287d8..11d2cc2f 100644 --- a/tests/acceptance/v1/tx-details/tx-details.test.mjs +++ b/tests/acceptance/v1/tx-details/tx-details.test.mjs @@ -1,5 +1,24 @@ import request from "supertest" import { buildRpcSelfTransferPayload, waitForConnection } from "../../../helpers/transactionPayloads.mjs" +import { sleep } from "../../../../src/utils/helpers.js" + +const TX_DETAILS_TIMEOUT_MS = 4000 +const TX_DETAILS_RETRY_INTERVAL_MS = 100 + +const waitForStatusCode = async (requestFactory, expectedStatusCode, timeoutMs = TX_DETAILS_TIMEOUT_MS) => { + const startedAt = Date.now() + let response = null + + while ((Date.now() - startedAt) < timeoutMs) { + response = await requestFactory() + if (response.statusCode === expectedStatusCode) { + return response + } + await sleep(TX_DETAILS_RETRY_INTERVAL_MS) + } + + return response +} export const registerTxDetailsTests = (context) => { describe("GET /v1/tx/details", () => { @@ -17,8 +36,11 @@ export const registerTxDetailsTests = (context) => { .send(JSON.stringify({ payload })) expect(broadcastRes.statusCode).toBe(200) - const resConfirmed = await request(context.server) - .get(`/v1/tx/details/${txHashHex}?confirmed=true`) + const resConfirmed = await waitForStatusCode( + () => request(context.server) + .get(`/v1/tx/details/${txHashHex}?confirmed=true`), + 200 + ) expect(resConfirmed.statusCode).toBe(200) expect(resConfirmed.body).toMatchObject({ @@ -27,8 +49,11 @@ export const registerTxDetailsTests = (context) => { fee: expect.any(String) }) - const resUnconfirmed = await request(context.server) - .get(`/v1/tx/details/${txHashHex}?confirmed=false`) + const resUnconfirmed = await waitForStatusCode( + () => request(context.server) + .get(`/v1/tx/details/${txHashHex}?confirmed=false`), + 200 + ) expect(resUnconfirmed.statusCode).toBe(200) expect(resUnconfirmed.body).toMatchObject({ @@ -57,8 +82,11 @@ export const registerTxDetailsTests = (context) => { .send(JSON.stringify({ payload })) expect(broadcastRes.statusCode).toBe(200) - const res = await request(context.server) - .get(`/v1/tx/details/${txHashHex}?confirmed=false`) + const res = await waitForStatusCode( + () => request(context.server) + .get(`/v1/tx/details/${txHashHex}?confirmed=false`), + 200 + ) expect(res.statusCode).toBe(200) expect(res.body).toMatchObject({ diff --git a/tests/acceptance/v1/tx/tx.test.mjs b/tests/acceptance/v1/tx/tx.test.mjs index 8b9b7cb8..f72409e6 100644 --- a/tests/acceptance/v1/tx/tx.test.mjs +++ b/tests/acceptance/v1/tx/tx.test.mjs @@ -1,5 +1,24 @@ import request from "supertest" import { buildRpcSelfTransferPayload, waitForConnection } from "../../../helpers/transactionPayloads.mjs" +import { sleep } from "../../../../src/utils/helpers.js" + +const TX_ENDPOINT_TIMEOUT_MS = 4000 +const TX_ENDPOINT_RETRY_INTERVAL_MS = 100 + +const waitForStatusCode = async (requestFactory, expectedStatusCode, timeoutMs = TX_ENDPOINT_TIMEOUT_MS) => { + const startedAt = Date.now() + let response = null + + while ((Date.now() - startedAt) < timeoutMs) { + response = await requestFactory() + if (response.statusCode === expectedStatusCode) { + return response + } + await sleep(TX_ENDPOINT_RETRY_INTERVAL_MS) + } + + return response +} export const registerTxTests = (context) => { describe("GET /v1/tx/:hash", () => { @@ -17,7 +36,10 @@ export const registerTxTests = (context) => { .send(JSON.stringify({ payload })) expect(broadcastRes.statusCode).toBe(200) - const res = await request(context.server).get(`/v1/tx/${txHashHex}`) + const res = await waitForStatusCode( + () => request(context.server).get(`/v1/tx/${txHashHex}`), + 200 + ) expect(res.statusCode).toBe(200) expect(res.body).toMatchObject({ txDetails: expect.any(Object) }) }) diff --git a/tests/unit/network/services/ConnectionManager.test.js b/tests/unit/network/services/ConnectionManager.test.js index fd0bcced..2bdefb95 100644 --- a/tests/unit/network/services/ConnectionManager.test.js +++ b/tests/unit/network/services/ConnectionManager.test.js @@ -2,7 +2,7 @@ import sinon from "sinon"; import { hook, test } from 'brittle' import { default as EventEmitter } from "bare-events" import { testKeyPair1, testKeyPair2, testKeyPair3, testKeyPair4, testKeyPair5, testKeyPair6, testKeyPair7, testKeyPair8 } from "../../../fixtures/apply.fixtures.js"; -import ConnectionManager from "../../../../src/core/network/services/ConnectionManager.js"; +import ConnectionManager, { ConnectionManagerError } from "../../../../src/core/network/services/ConnectionManager.js"; import { tick } from "../../../helpers/setupApplyTests.js"; import b4a from 'b4a' import { createConfig, ENV } from "../../../../src/config/env.js"; @@ -135,6 +135,55 @@ test('ConnectionManager', () => { }) }) + test('sendSingleMessage', async t => { + test('returns exact resultCode from protocolSession.send', async t => { + reset() + const data = createConnection(testKeyPair1.publicKey) + data.connection.protocolSession.send = sinon.stub().resolves(ResultCode.TIMEOUT) + const connectionManager = makeManager(6, [data]) + + const result = await connectionManager.sendSingleMessage({ payload: 1 }, testKeyPair1.publicKey) + + t.is(result, ResultCode.TIMEOUT, 'should return the exact result code from protocol session') + t.ok(data.connection.protocolSession.send.calledOnce, 'should invoke protocolSession.send') + }) + + test('throws ConnectionManagerError when validator is disconnected', async t => { + reset() + const connectionManager = makeManager() + + try { + await connectionManager.sendSingleMessage({ payload: 1 }, testKeyPair8.publicKey) + t.fail('expected sendSingleMessage to throw') + } catch (error) { + t.ok(error instanceof ConnectionManagerError, 'should throw ConnectionManagerError') + t.ok(error.message.includes('is not connected'), 'should include disconnected validator details') + } + }) + + test('throws ConnectionManagerError when protocolSession is missing', async t => { + reset() + const emitter = new EventEmitter() + emitter.connected = true + emitter.remotePublicKey = b4a.from(testKeyPair6.publicKey, 'hex') + emitter.end = sinon.stub() + const data = { + key: b4a.from(testKeyPair6.publicKey, 'hex'), + connection: emitter, + } + + const connectionManager = makeManager(6, [data]) + + try { + await connectionManager.sendSingleMessage({ payload: 1 }, testKeyPair6.publicKey) + t.fail('expected sendSingleMessage to throw') + } catch (error) { + t.ok(error instanceof ConnectionManagerError, 'should throw ConnectionManagerError') + t.ok(error.message.includes('no valid connection found'), 'should include protocol session details') + } + }) + }) + // Note: These tests were commented out because connectionManager.send is being deprecated. When it is completely removed, the tests should be deleted. // test('send', async t => { // // test('triggers send on messenger', async t => { @@ -302,4 +351,100 @@ test('ConnectionManager', () => { } }); }) + + test('edge branches', async t => { + test('pickRandomValidator returns null for empty array', async t => { + reset() + const connectionManager = makeManager() + t.is(connectionManager.pickRandomValidator([]), null) + }) + + test('pickRandomConnectedValidator returns null when pool is empty', async t => { + reset() + const connectionManager = makeManager(6, []) + t.is(connectionManager.pickRandomConnectedValidator(), null) + }) + + test('remove missing validator keeps state unchanged', async t => { + reset() + const connectionManager = makeManager() + const before = connectionManager.connectionCount() + connectionManager.remove(testKeyPair8.publicKey) + t.is(connectionManager.connectionCount(), before) + }) + + test('remove handles connection.end throwing and still deletes validator', async t => { + reset() + const data = createConnection(testKeyPair7.publicKey) + data.connection.end = sinon.stub().throws(new Error('end boom')) + const connectionManager = makeManager(6, [data]) + + t.ok(connectionManager.connected(data.key)) + connectionManager.remove(data.key) + t.absent(connectionManager.connected(data.key)) + }) + + test('sent counters handle missing validators safely', async t => { + reset() + const connectionManager = makeManager() + t.is(connectionManager.getSentCount(testKeyPair8.publicKey), 0) + connectionManager.incrementSentCount(testKeyPair8.publicKey) + t.is(connectionManager.getSentCount(testKeyPair8.publicKey), 0) + }) + + test('subscribeToHealthChecks validates service interface', async t => { + reset() + const connectionManager = makeManager() + + await t.exception( + () => connectionManager.subscribeToHealthChecks({ on() {} }), + /must implement on\/off/ + ) + }) + + test('health check removes validator when protocolSession is missing', async t => { + reset() + const emitter = new EventEmitter() + emitter.connected = true + emitter.remotePublicKey = b4a.from(testKeyPair6.publicKey, 'hex') + emitter.end = sinon.stub() + const data = { + key: b4a.from(testKeyPair6.publicKey, 'hex'), + connection: emitter + } + + const connectionManager = makeManager(6, [data]) + const healthCheckService = { + on: (_event, fn) => { healthCheckService.handler = fn; }, + off: () => {}, + has: sinon.stub().returns(true), + stop: sinon.stub(), + handler: null, + } + + connectionManager.subscribeToHealthChecks(healthCheckService) + await healthCheckService.handler(testKeyPair6.publicKey, 'hc-1') + + t.absent(connectionManager.connected(data.key)) + t.ok(healthCheckService.stop.called) + }) + + test('remove tolerates health check service errors', async t => { + reset() + const data = createConnection(testKeyPair5.publicKey) + const connectionManager = makeManager(6, [data]) + const healthCheckService = { + on: (_event, fn) => { healthCheckService.handler = fn; }, + off: () => {}, + has: sinon.stub().throws(new Error('has boom')), + stop: sinon.stub(), + handler: null, + } + connectionManager.subscribeToHealthChecks(healthCheckService) + + connectionManager.remove(data.key) + + t.absent(connectionManager.connected(data.key)) + }) + }) }) diff --git a/tests/unit/network/services/MessageOrchestrator.test.js b/tests/unit/network/services/MessageOrchestrator.test.js new file mode 100644 index 00000000..148e295c --- /dev/null +++ b/tests/unit/network/services/MessageOrchestrator.test.js @@ -0,0 +1,445 @@ +import { hook, test } from 'brittle'; +import sinon from 'sinon'; +import b4a from 'b4a'; + +import { createConfig, ENV } from '../../../../src/config/env.js'; +import MessageOrchestrator from '../../../../src/core/network/services/MessageOrchestrator.js'; +import { OperationType, ResultCode } from '../../../../src/utils/constants.js'; +import NetworkWalletFactory from '../../../../src/core/network/identity/NetworkWalletFactory.js'; +import { testKeyPair1, testKeyPair2 } from '../../../fixtures/apply.fixtures.js'; +import { publicKeyToAddress } from '../../../../src/utils/helpers.js'; +import { ConnectionManagerError } from '../../../../src/core/network/services/ConnectionManager.js'; +import { V1TimeoutError } from '../../../../src/core/network/protocols/v1/V1ProtocolError.js'; + +const VALIDATOR_KEY = testKeyPair2.publicKey; + +const createWallet = (config) => { + const keyPair = { + publicKey: b4a.from(testKeyPair1.publicKey, 'hex'), + secretKey: b4a.from(testKeyPair1.secretKey, 'hex'), + }; + return NetworkWalletFactory.provide({ + enableWallet: false, + keyPair, + networkPrefix: config.addressPrefix, + }); +}; + +const createTransferMessage = (config, wallet) => ({ + type: OperationType.TRANSFER, + address: wallet.address, + tro: { + tx: 'aa'.repeat(32), + txv: 'bb'.repeat(32), + in: 'cc'.repeat(32), + to: publicKeyToAddress(testKeyPair2.publicKey, config), + am: '00'.repeat(16), + is: 'dd'.repeat(64), + }, +}); + +const createConnectionManager = ({ + preferredProtocol = 'v1', + sendSingleMessage = sinon.stub().resolves(ResultCode.OK), + sentCount = 0, + connectedValidators = [VALIDATOR_KEY], +} = {}) => ({ + pickRandomConnectedValidator: sinon.stub().returns(VALIDATOR_KEY), + pickRandomValidator: sinon.stub().callsFake((validators) => validators[0] ?? null), + connectedValidators: sinon.stub().returns(connectedValidators), + getConnection: sinon.stub().returns({ + protocolSession: { + preferredProtocol, + supportedProtocols: { + LEGACY: 'legacy', + V1: 'v1', + } + } + }), + sendSingleMessage, + remove: sinon.stub(), + incrementSentCount: sinon.stub(), + getSentCount: sinon.stub().returns(sentCount), +}); + +hook('setup', () => { + sinon.stub(console, 'log'); + sinon.stub(console, 'warn'); +}); + +hook('teardown', () => { + sinon.restore(); +}); + +test('MessageOrchestrator.send returns false for unsupported protocol', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ preferredProtocol: 'unknown' }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message, 0); + + t.is(result, false); + t.is(connectionManager.sendSingleMessage.callCount, 0); +}); + +test('MessageOrchestrator.send V1 matrix: OK -> SUCCESS', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(ResultCode.OK), + sentCount: 0, + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(connectionManager.incrementSentCount.callCount, 1); + t.is(connectionManager.remove.callCount, 0); +}); + +test('MessageOrchestrator.send V1 matrix: TIMEOUT -> ROTATE', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(ResultCode.TIMEOUT), + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, false); + t.is(connectionManager.sendSingleMessage.callCount, 1); + t.is(connectionManager.remove.callCount, 1); + t.is(connectionManager.incrementSentCount.callCount, 0); +}); + +test('MessageOrchestrator.send V1 matrix: TX_ALREADY_PENDING -> NO_ROTATE', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(ResultCode.TX_ALREADY_PENDING), + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, false); + t.is(connectionManager.sendSingleMessage.callCount, 1); + t.is(connectionManager.remove.callCount, 0); + t.is(connectionManager.incrementSentCount.callCount, 0); +}); + +test('MessageOrchestrator.send V1 matrix: unknown code -> UNDEFINED', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(99999), + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, false); + t.is(connectionManager.sendSingleMessage.callCount, 1); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send removes validator when threshold reached on success', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(ResultCode.OK), + sentCount: config.messageThreshold, + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(connectionManager.incrementSentCount.callCount, 1); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send retries on ConnectionManagerError without removing validator', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub(); + sendSingleMessage.onFirstCall().rejects(new ConnectionManagerError('disconnected')); + sendSingleMessage.onSecondCall().resolves(ResultCode.OK); + + const connectionManager = createConnectionManager({ sendSingleMessage, sentCount: 0 }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 0); + t.is(connectionManager.incrementSentCount.callCount, 1); +}); + +test('MessageOrchestrator.send retries on generic catch error with remove + retry', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub(); + sendSingleMessage.onFirstCall().rejects(new Error('response validation failed')); + sendSingleMessage.onSecondCall().resolves(ResultCode.OK); + + const connectionManager = createConnectionManager({ sendSingleMessage, sentCount: 0 }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 1); + t.is(connectionManager.incrementSentCount.callCount, 1); +}); + +test('MessageOrchestrator.send max retries guard returns false immediately', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 1 }); + const connectionManager = createConnectionManager({ + sendSingleMessage: sinon.stub().resolves(ResultCode.OK), + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message, 2); + + t.is(result, false); + t.is(connectionManager.pickRandomConnectedValidator.callCount, 0); + t.is(connectionManager.sendSingleMessage.callCount, 0); +}); + +test('MessageOrchestrator.send timeout split: pending timeout rejection goes through catch and retries', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub(); + sendSingleMessage.onFirstCall().rejects(new V1TimeoutError('pending request timeout', false)); + sendSingleMessage.onSecondCall().resolves(ResultCode.OK); + + const connectionManager = createConnectionManager({ sendSingleMessage }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send timeout split: TIMEOUT result code stays in then path and does not retry', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub().resolves(ResultCode.TIMEOUT); + const connectionManager = createConnectionManager({ sendSingleMessage }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, false); + t.is(sendSingleMessage.callCount, 1); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send validation split: thrown validation error goes through catch', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub(); + sendSingleMessage.onFirstCall().rejects(new Error('validator response validation failed')); + sendSingleMessage.onSecondCall().resolves(ResultCode.OK); + + const connectionManager = createConnectionManager({ sendSingleMessage }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send validation split: non-OK result code stays in then and uses policy', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 2 }); + const sendSingleMessage = sinon.stub().resolves(ResultCode.SCHEMA_VALIDATION_FAILED); + const connectionManager = createConnectionManager({ sendSingleMessage }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, false); + t.is(sendSingleMessage.callCount, 1); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send legacy path succeeds and increments sent count', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 0 }); + const sendSingleMessage = sinon.stub().resolves(true); + const connectionManager = createConnectionManager({ + preferredProtocol: 'legacy', + sendSingleMessage, + sentCount: 0, + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + sinon.stub(orchestrator, 'waitForUnsignedState').resolves(true); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 1); + t.is(connectionManager.incrementSentCount.callCount, 1); + t.is(connectionManager.remove.callCount, 0); +}); + +test('MessageOrchestrator.send legacy path false result removes validator and retries', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 1 }); + const sendSingleMessage = sinon.stub().resolves(true); + const connectionManager = createConnectionManager({ + preferredProtocol: 'legacy', + sendSingleMessage, + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + const waitStub = sinon.stub(orchestrator, 'waitForUnsignedState'); + waitStub.onFirstCall().resolves(false); + waitStub.onSecondCall().resolves(true); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 1); +}); + +test('MessageOrchestrator.send legacy path catches send error and retries', async t => { + const config = createConfig(ENV.DEVELOPMENT, { maxRetries: 1 }); + const sendSingleMessage = sinon.stub(); + sendSingleMessage.onFirstCall().rejects(new Error('legacy send failed')); + sendSingleMessage.onSecondCall().resolves(true); + + const connectionManager = createConnectionManager({ + preferredProtocol: 'legacy', + sendSingleMessage, + }); + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const wallet = createWallet(config); + const message = createTransferMessage(config, wallet); + sinon.stub(orchestrator, 'waitForUnsignedState').resolves(true); + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 2); + t.is(connectionManager.remove.callCount, 0); +}); + +test('MessageOrchestrator.waitForUnsignedState returns true when state entry appears', async t => { + const clock = sinon.useFakeTimers({ now: 1 }); + try { + const config = createConfig(ENV.DEVELOPMENT, {}); + const state = { + get: sinon.stub() + .onFirstCall().resolves(null) + .onSecondCall().resolves({ tx: 'found' }), + }; + const orchestrator = new MessageOrchestrator(createConnectionManager(), state, config); + + const pending = orchestrator.waitForUnsignedState('tx-hash', 500); + await clock.tickAsync(450); + const result = await pending; + + t.is(result, true); + t.ok(state.get.callCount >= 2); + } finally { + clock.restore(); + } +}); + +test('MessageOrchestrator.waitForUnsignedState returns false on timeout', async t => { + const clock = sinon.useFakeTimers({ now: 1 }); + try { + const config = createConfig(ENV.DEVELOPMENT, {}); + const state = { get: sinon.stub().resolves(null) }; + const orchestrator = new MessageOrchestrator(createConnectionManager(), state, config); + + const pending = orchestrator.waitForUnsignedState('tx-hash', 400); + await clock.tickAsync(1000); + const result = await pending; + + t.is(result, false); + t.ok(state.get.callCount >= 1); + } finally { + clock.restore(); + } +}); + +test('MessageOrchestrator.send V1 avoids selecting validator with requester address when possible', async t => { + const config = createConfig(ENV.DEVELOPMENT, {}); + const requesterValidatorKey = testKeyPair1.publicKey; + const otherValidatorKey = testKeyPair2.publicKey; + const sendSingleMessage = sinon.stub().resolves(ResultCode.OK); + + const connectionManager = createConnectionManager({ + sendSingleMessage, + connectedValidators: [requesterValidatorKey, otherValidatorKey], + }); + connectionManager.getConnection = sinon.stub().returns({ + protocolSession: { + preferredProtocol: 'v1', + supportedProtocols: { + LEGACY: 'legacy', + V1: 'v1', + } + } + }); + + const orchestrator = new MessageOrchestrator(connectionManager, { get: async () => null }, config); + const requesterAddress = publicKeyToAddress(requesterValidatorKey, config); + const wallet = createWallet(config); + const message = { + ...createTransferMessage(config, wallet), + address: requesterAddress, + }; + + orchestrator.setWallet(wallet); + const result = await orchestrator.send(message); + + t.is(result, true); + t.is(sendSingleMessage.callCount, 1); + t.is(sendSingleMessage.firstCall.args[1], otherValidatorKey); +}); diff --git a/tests/unit/network/services/PendingRequestService.test.js b/tests/unit/network/services/PendingRequestService.test.js index 477e5cf1..803489d2 100644 --- a/tests/unit/network/services/PendingRequestService.test.js +++ b/tests/unit/network/services/PendingRequestService.test.js @@ -85,7 +85,7 @@ test('PendingRequestService rejects and removes pending request', async t => { } catch (error) { t.ok(error instanceof V1UnexpectedError); t.is(error.message, expectedError.message); - t.is(error.endConnection, false); + t.is(error.endConnection, true); } t.is(service.rejectPendingRequest('missing', new Error('missing')), false); @@ -382,7 +382,7 @@ test('PendingRequestService.rejectPendingRequest falls back to Unexpected error } catch (error) { t.ok(error instanceof V1UnexpectedError); t.is(error.message, 'Unexpected error'); - t.is(error.endConnection, false); + t.is(error.endConnection, true); } }); diff --git a/tests/unit/network/services/services.test.js b/tests/unit/network/services/services.test.js index f6fa48f0..b9069363 100644 --- a/tests/unit/network/services/services.test.js +++ b/tests/unit/network/services/services.test.js @@ -7,6 +7,7 @@ async function runTests() { await import('./ValidatorHealthCheckService.test.js'); await import('./TransactionRateLimiterService.test.js'); await import('./ConnectionManager.test.js'); + await import('./MessageOrchestrator.test.js'); await import('./PendingRequestService.test.js'); await import('./TransactionCommitService.test.js'); await import('./TransactionPoolService.test.js'); diff --git a/tests/unit/network/v1/V1BaseOperation.test.js b/tests/unit/network/v1/V1BaseOperation.test.js new file mode 100644 index 00000000..92ab6b9b --- /dev/null +++ b/tests/unit/network/v1/V1BaseOperation.test.js @@ -0,0 +1,356 @@ +import test from 'brittle'; +import b4a from 'b4a'; +import PeerWallet from 'trac-wallet'; + +import V1BaseOperation from '../../../../src/core/network/protocols/v1/validators/V1BaseOperation.js'; +import NetworkWalletFactory from '../../../../src/core/network/identity/NetworkWalletFactory.js'; +import NetworkMessageBuilder from '../../../../src/messages/network/v1/NetworkMessageBuilder.js'; +import { + V1InvalidPayloadError, + V1SignatureInvalidError, + V1UnexpectedError +} from '../../../../src/core/network/protocols/v1/V1ProtocolError.js'; +import { + NetworkOperationType, + ResultCode +} from '../../../../src/utils/constants.js'; +import { testKeyPair1, testKeyPair2 } from '../../../fixtures/apply.fixtures.js'; +import { config } from '../../../helpers/config.js'; +import { errorMessageIncludes } from '../../../helpers/regexHelper.js'; + +const createWallet = (fixture = testKeyPair1) => { + const keyPair = { + publicKey: b4a.from(fixture.publicKey, 'hex'), + secretKey: b4a.from(fixture.secretKey, 'hex'), + }; + + return NetworkWalletFactory.provide({ + enableWallet: false, + keyPair, + networkPrefix: config.addressPrefix, + }); +}; + +const buildSignedPayload = async (wallet, type, options = {}) => { + const builder = new NetworkMessageBuilder(wallet, config) + .setType(type) + .setId(`id-${type}-${Date.now()}-${Math.random()}`) + .setTimestamp() + .setCapabilities(['cap:a']); + + switch (type) { + case NetworkOperationType.LIVENESS_REQUEST: + break; + case NetworkOperationType.LIVENESS_RESPONSE: + builder.setResultCode(options.resultCode ?? ResultCode.OK); + break; + case NetworkOperationType.BROADCAST_TRANSACTION_REQUEST: + builder.setData(options.data ?? b4a.from('abcd', 'hex')); + break; + case NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE: + builder + .setResultCode(options.resultCode ?? ResultCode.OK) + .setProof(options.proof ?? b4a.from('deadbeef', 'hex')) + .setAppendedAt(options.appendedAt ?? Date.now()); + break; + default: + throw new Error(`Unsupported type in test helper: ${type}`); + } + + await builder.buildPayload(); + return builder.getResult(); +}; + +test('V1BaseOperation.validate throws "must be implemented" by default', async t => { + const operation = new V1BaseOperation(config); + + await t.exception( + async () => operation.validate({}, {}, {}), + errorMessageIncludes('must be implemented') + ); +}); + +test('V1BaseOperation.isPayloadSchemaValid handles missing/invalid type cases', async t => { + const operation = new V1BaseOperation(config); + + t.exception( + () => operation.isPayloadSchemaValid(null), + errorMessageIncludes('Payload or payload type is missing') + ); + + t.exception( + () => operation.isPayloadSchemaValid({ type: null }), + errorMessageIncludes('Payload or payload type is missing') + ); + + t.exception( + () => operation.isPayloadSchemaValid({ type: 1.5 }), + errorMessageIncludes('Operation type must be an integer') + ); + + t.exception( + () => operation.isPayloadSchemaValid({ type: 0 }), + errorMessageIncludes('Operation type is unspecified') + ); + + t.exception( + () => operation.isPayloadSchemaValid({ type: 9999 }), + errorMessageIncludes('Unknown operation type') + ); +}); + +test('V1BaseOperation.isPayloadSchemaValid accepts all supported message schemas', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(); + + const payloads = [ + await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_REQUEST), + await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_RESPONSE), + await buildSignedPayload(wallet, NetworkOperationType.BROADCAST_TRANSACTION_REQUEST), + await buildSignedPayload(wallet, NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE), + ]; + + for (const payload of payloads) { + operation.isPayloadSchemaValid(payload); + } + + t.pass(); +}); + +test('V1BaseOperation.validateSignature verifies valid signatures for all supported message types', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(); + + const payloads = [ + await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_REQUEST), + await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_RESPONSE), + await buildSignedPayload(wallet, NetworkOperationType.BROADCAST_TRANSACTION_REQUEST), + await buildSignedPayload(wallet, NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE), + ]; + + for (const payload of payloads) { + await operation.validateSignature(payload, wallet.publicKey); + } + + t.pass(); +}); + +test('V1BaseOperation.validateSignature throws V1SignatureInvalidError on wrong public key', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(testKeyPair1); + const otherWallet = createWallet(testKeyPair2); + + const payload = await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_REQUEST); + + await t.exception( + async () => operation.validateSignature(payload, otherWallet.publicKey), + errorMessageIncludes('signature verification failed') + ); +}); + +test('V1BaseOperation.validateSignature rethrows protocol-shaped build errors', async t => { + const operation = new V1BaseOperation(config); + + const payload = { + type: 0, + id: 'id', + timestamp: Date.now(), + capabilities: [], + }; + + try { + await operation.validateSignature(payload, b4a.alloc(32, 1)); + t.fail('expected validateSignature to throw'); + } catch (error) { + t.ok(error instanceof V1InvalidPayloadError); + t.is(error.resultCode, ResultCode.INVALID_PAYLOAD); + t.ok(error.message.includes('Operation type is unspecified')); + } +}); + +test('V1BaseOperation.validateSignature wraps non-protocol build errors as V1InvalidPayloadError', async t => { + const operation = new V1BaseOperation(config); + + const payload = { + type: NetworkOperationType.LIVENESS_REQUEST, + id: 'id', + timestamp: Date.now(), + capabilities: [], + }; + + try { + await operation.validateSignature(payload, b4a.alloc(32, 1)); + t.fail('expected validateSignature to throw'); + } catch (error) { + t.ok(error instanceof V1InvalidPayloadError); + t.is(error.resultCode, ResultCode.INVALID_PAYLOAD); + t.ok(error.message.includes('Failed to build signature message')); + } +}); + +test('V1BaseOperation.validateSignature throws V1InvalidPayloadError when hashing fails', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(); + const payload = await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_REQUEST); + + const originalBlake3 = PeerWallet.blake3; + PeerWallet.blake3 = async () => { + throw new Error('hash fail'); + }; + + t.teardown(() => { + PeerWallet.blake3 = originalBlake3; + }); + + try { + await operation.validateSignature(payload, wallet.publicKey); + t.fail('expected validateSignature to throw'); + } catch (error) { + t.ok(error instanceof V1InvalidPayloadError); + t.ok(error.message.includes('Failed to hash signature message')); + } +}); + +test('V1BaseOperation.validateSignature handles verify() throw as invalid signature', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(); + const payload = await buildSignedPayload(wallet, NetworkOperationType.LIVENESS_REQUEST); + + const originalVerify = PeerWallet.verify; + PeerWallet.verify = () => { + throw new Error('verify fail'); + }; + + t.teardown(() => { + PeerWallet.verify = originalVerify; + }); + + try { + await operation.validateSignature(payload, wallet.publicKey); + t.fail('expected validateSignature to throw'); + } catch (error) { + t.ok(error instanceof V1SignatureInvalidError); + t.is(error.resultCode, ResultCode.SIGNATURE_INVALID); + } +}); + +test('V1BaseOperation.validateSignature enforces BROADCAST_TRANSACTION_RESPONSE proof/appendedAt invariants', async t => { + const operation = new V1BaseOperation(config); + const wallet = createWallet(); + + const validBase = await buildSignedPayload(wallet, NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE); + + const cases = [ + { + mutate: payload => { + payload.broadcast_transaction_response.result = ResultCode.OK; + payload.broadcast_transaction_response.proof = b4a.alloc(0); + }, + match: 'Result code OK requires non-empty proof' + }, + { + mutate: payload => { + payload.broadcast_transaction_response.result = ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE; + payload.broadcast_transaction_response.proof = b4a.from('aa', 'hex'); + }, + match: 'TX_ACCEPTED_PROOF_UNAVAILABLE requires empty proof' + }, + { + mutate: payload => { + payload.broadcast_transaction_response.result = ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE; + payload.broadcast_transaction_response.proof = b4a.alloc(0); + payload.broadcast_transaction_response.appendedAt = 0; + }, + match: 'TX_ACCEPTED_PROOF_UNAVAILABLE requires appendedAt > 0' + }, + { + mutate: payload => { + payload.broadcast_transaction_response.result = ResultCode.TIMEOUT; + payload.broadcast_transaction_response.proof = b4a.from('aa', 'hex'); + payload.broadcast_transaction_response.appendedAt = 0; + }, + match: 'Non-OK result code requires empty proof' + }, + { + mutate: payload => { + payload.broadcast_transaction_response.result = ResultCode.TIMEOUT; + payload.broadcast_transaction_response.proof = b4a.alloc(0); + payload.broadcast_transaction_response.appendedAt = Date.now(); + }, + match: 'Non-OK result code requires appendedAt to be 0' + }, + ]; + + for (const scenario of cases) { + const payload = structuredClone(validBase); + scenario.mutate(payload); + + await t.exception( + async () => operation.validateSignature(payload, wallet.publicKey), + errorMessageIncludes(scenario.match) + ); + } +}); + +test('V1BaseOperation.validateSignature throws V1UnexpectedError for unknown operation type', async t => { + const operation = new V1BaseOperation(config); + + const payload = { + type: 9999, + id: 'id', + timestamp: Date.now(), + capabilities: [], + }; + + await t.exception( + async () => operation.validateSignature(payload, b4a.alloc(32, 1)), + errorMessageIncludes('Unknown operation type') + ); +}); + +test('V1BaseOperation.validatePeerCorrectness validates response sender identity', t => { + const operation = new V1BaseOperation(config); + const remotePublicKey = b4a.alloc(32, 9); + + operation.validatePeerCorrectness(remotePublicKey, { + requestedTo: b4a.toString(remotePublicKey, 'hex') + }); + + t.exception( + () => operation.validatePeerCorrectness(remotePublicKey, { requestedTo: 'ff' }), + errorMessageIncludes('Response sender mismatch') + ); +}); + +test('V1BaseOperation.validateResponseType supports expected mappings and rejects mismatches', t => { + const operation = new V1BaseOperation(config); + + operation.validateResponseType( + { type: NetworkOperationType.LIVENESS_RESPONSE }, + { id: '1', requestType: NetworkOperationType.LIVENESS_REQUEST } + ); + + operation.validateResponseType( + { type: NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE }, + { id: '2', requestType: NetworkOperationType.BROADCAST_TRANSACTION_REQUEST } + ); + + t.exception( + () => operation.validateResponseType( + { type: NetworkOperationType.BROADCAST_TRANSACTION_RESPONSE }, + { id: '3', requestType: NetworkOperationType.LIVENESS_REQUEST } + ), + errorMessageIncludes('Response type mismatch') + ); + + try { + operation.validateResponseType( + { type: NetworkOperationType.LIVENESS_RESPONSE }, + { id: '4', requestType: 9999 } + ); + t.fail('expected validateResponseType to throw'); + } catch (error) { + t.ok(error instanceof V1UnexpectedError); + t.ok(error.message.includes('Unsupported pending request type')); + } +}); diff --git a/tests/unit/network/v1/V1BroadcastTransactionRequest.test.js b/tests/unit/network/v1/V1BroadcastTransactionRequest.test.js new file mode 100644 index 00000000..2d98f48c --- /dev/null +++ b/tests/unit/network/v1/V1BroadcastTransactionRequest.test.js @@ -0,0 +1,56 @@ +import test from 'brittle'; +import b4a from 'b4a'; + +import V1BroadcastTransactionRequest from '../../../../src/core/network/protocols/v1/validators/V1BroadcastTransactionRequest.js'; +import { V1InvalidPayloadError } from '../../../../src/core/network/protocols/v1/V1ProtocolError.js'; +import { MAX_PARTIAL_TX_PAYLOAD_BYTE_SIZE } from '../../../../src/utils/constants.js'; +import { config } from '../../../helpers/config.js'; + +test('V1BroadcastTransactionRequest.validate runs schema, size and signature validation', async t => { + const { default: FreshV1BroadcastTransactionRequest } = await import( + `../../../../src/core/network/protocols/v1/validators/V1BroadcastTransactionRequest.js?fresh=${Date.now()}` + ); + const validator = new FreshV1BroadcastTransactionRequest(config); + const calls = []; + + validator.isPayloadSchemaValid = () => calls.push('schema'); + validator.isDataPropertySizeValid = () => calls.push('size'); + validator.validateSignature = async () => calls.push('signature'); + + const result = await validator.validate({}, b4a.alloc(32, 1)); + + t.is(result, true); + t.is(calls.length, 3); + t.is(calls[0], 'schema'); + t.is(calls[1], 'size'); + t.is(calls[2], 'signature'); +}); + +test('V1BroadcastTransactionRequest.isDataPropertySizeValid accepts max allowed payload size', t => { + const validator = new V1BroadcastTransactionRequest(config); + const payload = { + broadcast_transaction_request: { + data: b4a.alloc(MAX_PARTIAL_TX_PAYLOAD_BYTE_SIZE, 1) + } + }; + + validator.isDataPropertySizeValid(payload); + t.pass(); +}); + +test('V1BroadcastTransactionRequest.isDataPropertySizeValid throws for oversized payload', t => { + const validator = new V1BroadcastTransactionRequest(config); + const payload = { + broadcast_transaction_request: { + data: b4a.alloc(MAX_PARTIAL_TX_PAYLOAD_BYTE_SIZE + 1, 1) + } + }; + + try { + validator.isDataPropertySizeValid(payload); + t.fail('expected size validation to throw'); + } catch (error) { + t.ok(error instanceof V1InvalidPayloadError); + t.ok(error.message.includes('exceeds the maximum allowed byte size')); + } +}); diff --git a/tests/unit/network/v1/V1BroadcastTransactionResponse.test.js b/tests/unit/network/v1/V1BroadcastTransactionResponse.test.js index a08954b2..a1a75c65 100644 --- a/tests/unit/network/v1/V1BroadcastTransactionResponse.test.js +++ b/tests/unit/network/v1/V1BroadcastTransactionResponse.test.js @@ -1,8 +1,47 @@ import {test} from 'brittle'; import b4a from 'b4a'; +import Autobase from 'autobase'; +import Hypercore from 'hypercore'; -import {extractRequiredVaFromDecodedTx} from '../../../../src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js'; -import {ResultCode} from '../../../../src/utils/constants.js'; +import V1BroadcastTransactionResponse, {extractRequiredVaFromDecodedTx} from '../../../../src/core/network/protocols/v1/validators/V1BroadcastTransactionResponse.js'; +import {V1ProtocolError} from '../../../../src/core/network/protocols/v1/V1ProtocolError.js'; +import Check from '../../../../src/utils/check.js'; +import { + unsafeEncodeApplyOperation, + unsafeDecodeApplyOperation, +} from '../../../../src/utils/protobuf/operationHelpers.js'; +import {addressToBuffer} from '../../../../src/core/state/utils/address.js'; +import {publicKeyToAddress} from '../../../../src/utils/helpers.js'; +import {OperationType, ResultCode} from '../../../../src/utils/constants.js'; +import { config } from '../../../helpers/config.js'; +import protobufFixtures from '../../../fixtures/protobuf.fixtures.js'; +import { testKeyPair1 } from '../../../fixtures/apply.fixtures.js'; + +const remotePublicKey = b4a.from(testKeyPair1.publicKey, 'hex'); +const remoteAddressBuffer = addressToBuffer(publicKeyToAddress(remotePublicKey, config), config.addressPrefix); +const writerKey = b4a.alloc(32, 2); + +const overrideCheckMethods = (t, overrides) => { + const originals = {}; + for (const [name, impl] of Object.entries(overrides)) { + originals[name] = Check.prototype[name]; + Check.prototype[name] = impl; + } + + t.teardown(() => { + for (const [name, original] of Object.entries(originals)) { + Check.prototype[name] = original; + } + }); +}; + +const overrideFunction = (t, target, key, replacement) => { + const original = target[key]; + target[key] = replacement; + t.teardown(() => { + target[key] = original; + }); +}; test('extractRequiredVaFromDecodedTx throws VALIDATOR_TX_OBJECT_INVALID for non-object', t => { try { @@ -21,3 +60,451 @@ test('extractRequiredVaFromDecodedTx throws VALIDATOR_VA_MISSING when va is miss t.is(err.resultCode, ResultCode.VALIDATOR_VA_MISSING); } }); + +test('extractRequiredVaFromDecodedTx returns va buffer when present', t => { + const va = b4a.alloc(39, 1); + const extracted = extractRequiredVaFromDecodedTx({ + type: OperationType.TX, + txo: {va} + }); + + t.ok(b4a.equals(extracted, va)); +}); + +test('validate skips proof validation when result code is non-OK', async t => { + const validator = new V1BroadcastTransactionResponse(config); + let proofCalled = false; + + validator.isPayloadSchemaValid = () => true; + validator.validateResponseType = () => true; + validator.validatePeerCorrectness = () => true; + validator.validateSignature = async () => true; + validator.verifyProofOfPublication = async () => { + proofCalled = true; + return {}; + }; + + const payload = { + broadcast_transaction_response: { + result: ResultCode.TIMEOUT, + } + }; + + const result = await validator.validate( + payload, + { remotePublicKey: b4a.alloc(32, 1) }, + {}, + null + ); + + t.is(result, true); + t.absent(proofCalled); +}); + +test('validate runs proof validation pipeline when result code is OK', async t => { + const validator = new V1BroadcastTransactionResponse(config); + const calls = []; + + validator.isPayloadSchemaValid = () => calls.push('schema'); + validator.validateResponseType = () => calls.push('type'); + validator.validatePeerCorrectness = () => calls.push('peer'); + validator.validateSignature = async () => calls.push('signature'); + validator.verifyProofOfPublication = async () => { + calls.push('proof'); + return { proof: {}, manifest: {} }; + }; + validator.assertProofPayloadMatchesRequestPayload = async () => { + calls.push('assert-proof-payload'); + return { + validatorDecodedTx: { type: OperationType.TX, txo: { va: remoteAddressBuffer } }, + manifest: {} + }; + }; + validator.validateDecodedCompletePayloadSchema = () => calls.push('schema-complete'); + validator.validateWritingKey = async () => { + calls.push('writer-key'); + return { + writerKeyFromManifest: writerKey, + validatorAddressCorrelatedWithManifest: remoteAddressBuffer + }; + }; + validator.validateValidatorCorrectness = async () => calls.push('validator-correctness'); + + const result = await validator.validate( + { broadcast_transaction_response: { result: ResultCode.OK } }, + { remotePublicKey }, + { requestTxData: b4a.alloc(1), requestedTo: b4a.toString(remotePublicKey, 'hex') }, + {} + ); + + t.is(result, true); + t.alike(calls, [ + 'schema', + 'type', + 'peer', + 'signature', + 'proof', + 'assert-proof-payload', + 'schema-complete', + 'writer-key', + 'validator-correctness' + ]); +}); + +test('validate rejects TX_COMMITTED_RECEIPT_MISSING as validator internal error', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + validator.isPayloadSchemaValid = () => true; + validator.validateResponseType = () => true; + validator.validatePeerCorrectness = () => true; + validator.validateSignature = async () => true; + + const payload = { + broadcast_transaction_response: { + result: ResultCode.TX_COMMITTED_RECEIPT_MISSING, + } + }; + + try { + await validator.validate( + payload, + { remotePublicKey: b4a.alloc(32, 1) }, + {}, + null + ); + t.fail('expected validate to throw'); + } catch (error) { + t.ok(error instanceof V1ProtocolError); + t.is(error.resultCode, ResultCode.TX_COMMITTED_RECEIPT_MISSING); + t.is(error.endConnection, true); + } +}); + +test('validateDecodedCompletePayloadSchema throws for missing, unknown and unsupported types', t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + validator.validateDecodedCompletePayloadSchema({}); + t.fail('expected missing type to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_RESPONSE_TX_TYPE_INVALID); + } + + try { + validator.validateDecodedCompletePayloadSchema({ type: 9999 }); + t.fail('expected unknown type to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN); + } + + try { + validator.validateDecodedCompletePayloadSchema({ type: OperationType.ADD_ADMIN }); + t.fail('expected unsupported type to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED); + } +}); + +test('validateDecodedCompletePayloadSchema selects proper validators for supported operation types', t => { + const validator = new V1BroadcastTransactionResponse(config); + const called = []; + + overrideCheckMethods(t, { + validateRoleAccessOperation: () => { + called.push('role'); + return true; + }, + validateBootstrapDeploymentOperation: () => { + called.push('bootstrap'); + return true; + }, + validateTransactionOperation: () => { + called.push('tx'); + return true; + }, + validateTransferOperation: () => { + called.push('transfer'); + return true; + }, + }); + + validator.validateDecodedCompletePayloadSchema({ type: OperationType.ADD_WRITER }); + validator.validateDecodedCompletePayloadSchema({ type: OperationType.BOOTSTRAP_DEPLOYMENT }); + validator.validateDecodedCompletePayloadSchema({ type: OperationType.TX }); + validator.validateDecodedCompletePayloadSchema({ type: OperationType.TRANSFER }); + + t.alike(called, ['role', 'bootstrap', 'tx', 'transfer']); +}); + +test('validateDecodedCompletePayloadSchema throws VALIDATOR_RESPONSE_SCHEMA_INVALID when selected validator fails', t => { + const validator = new V1BroadcastTransactionResponse(config); + + overrideCheckMethods(t, { + validateTransactionOperation: () => false, + }); + + try { + validator.validateDecodedCompletePayloadSchema({ type: OperationType.TX }); + t.fail('expected schema invalid'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_RESPONSE_SCHEMA_INVALID); + } +}); + +test('verifyProofOfPublication delegates verification to state instance', async t => { + const validator = new V1BroadcastTransactionResponse(config); + const proof = b4a.from('deadbeef', 'hex'); + + const result = await validator.verifyProofOfPublication( + { + broadcast_transaction_response: { + proof, + } + }, + { + verifyProofOfPublication: receivedProof => { + t.ok(b4a.equals(receivedProof, proof)); + return { ok: true }; + } + } + ); + + t.alike(result, { ok: true }); +}); + +test('assertProofPayloadMatchesRequestPayload throws when pending request tx data is missing', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.assertProofPayloadMatchesRequestPayload( + { proof: { block: { value: b4a.alloc(1) }, manifest: {} } }, + { requestTxData: null } + ); + t.fail('expected missing tx data to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.PENDING_REQUEST_MISSING_TX_DATA); + } +}); + +test('assertProofPayloadMatchesRequestPayload throws PROOF_PAYLOAD_MISMATCH when decoded payloads differ', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + const requestTxData = unsafeEncodeApplyOperation(protobufFixtures.validPartialTransactionOperation); + const responseTxData = unsafeEncodeApplyOperation(protobufFixtures.validPartialTransferOperation); + + overrideFunction(t, Autobase, 'decodeValue', async () => responseTxData); + + try { + await validator.assertProofPayloadMatchesRequestPayload( + { + proof: { + block: { value: b4a.alloc(1) }, + manifest: {} + } + }, + { requestTxData } + ); + t.fail('expected payload mismatch'); + } catch (error) { + t.is(error.resultCode, ResultCode.PROOF_PAYLOAD_MISMATCH); + } +}); + +test('assertProofPayloadMatchesRequestPayload strips validator metadata before comparison', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + const requestPayload = structuredClone(protobufFixtures.validTransactionOperation); + requestPayload.txo.va = null; + requestPayload.txo.vn = null; + requestPayload.txo.vs = null; + const requestTxData = unsafeEncodeApplyOperation(requestPayload); + const responseTxData = unsafeEncodeApplyOperation(protobufFixtures.validTransactionOperation); + const manifest = { signers: [] }; + + overrideFunction(t, Autobase, 'decodeValue', async () => responseTxData); + + const result = await validator.assertProofPayloadMatchesRequestPayload( + { + proof: { + block: { value: b4a.alloc(1) }, + manifest, + } + }, + { requestTxData } + ); + + t.is(result.validatorDecodedTx.type, OperationType.TX); + t.alike(result.manifest, manifest); + + const decodedResponse = unsafeDecodeApplyOperation(responseTxData); + t.alike(result.validatorDecodedTx, decodedResponse); +}); + +test('validateWritingKey throws when writer key is not registered', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + overrideFunction(t, Hypercore, 'key', () => writerKey); + + try { + await validator.validateWritingKey( + {}, + {}, + { + getRegisteredWriterKey: async () => null + } + ); + t.fail('expected validateWritingKey to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_WRITER_KEY_NOT_REGISTERED); + } +}); + +test('validateWritingKey returns writer key and correlated validator address', async t => { + const validator = new V1BroadcastTransactionResponse(config); + const registeredAddress = b4a.alloc(39, 7); + + overrideFunction(t, Hypercore, 'key', () => writerKey); + + const result = await validator.validateWritingKey( + {}, + {}, + { + getRegisteredWriterKey: async writerKeyHex => { + t.is(writerKeyHex, b4a.toString(writerKey, 'hex')); + return registeredAddress; + } + } + ); + + t.ok(b4a.equals(result.writerKeyFromManifest, writerKey)); + t.ok(b4a.equals(result.validatorAddressCorrelatedWithManifest, registeredAddress)); +}); + +test('validateValidatorCorrectness throws VALIDATOR_ADDRESS_MISMATCH when tx va differs from connection-derived address', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.validateValidatorCorrectness( + { txo: { va: b4a.alloc(remoteAddressBuffer.length, 9) } }, + remotePublicKey, + writerKey, + remoteAddressBuffer, + { getNodeEntry: async () => null } + ); + t.fail('expected address mismatch'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_ADDRESS_MISMATCH); + } +}); + +test('validateValidatorCorrectness throws VALIDATOR_ADDRESS_MISMATCH when tx va differs from manifest-correlated address', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.validateValidatorCorrectness( + { txo: { va: remoteAddressBuffer } }, + remotePublicKey, + writerKey, + b4a.alloc(remoteAddressBuffer.length, 8), + { getNodeEntry: async () => null } + ); + t.fail('expected address mismatch'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_ADDRESS_MISMATCH); + } +}); + +test('validateValidatorCorrectness throws VALIDATOR_NODE_ENTRY_NOT_FOUND when state has no node entry', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.validateValidatorCorrectness( + { txo: { va: remoteAddressBuffer } }, + remotePublicKey, + writerKey, + remoteAddressBuffer, + { getNodeEntry: async () => null } + ); + t.fail('expected missing node entry'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_NODE_ENTRY_NOT_FOUND); + } +}); + +test('validateValidatorCorrectness throws VALIDATOR_NODE_NOT_WRITER when node is not a writer', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.validateValidatorCorrectness( + { txo: { va: remoteAddressBuffer } }, + remotePublicKey, + writerKey, + remoteAddressBuffer, + { + getNodeEntry: async () => ({ + isWriter: false, + wk: writerKey, + }) + } + ); + t.fail('expected node-not-writer'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_NODE_NOT_WRITER); + } +}); + +test('validateValidatorCorrectness throws VALIDATOR_WRITER_KEY_MISMATCH when state writer key differs', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + await validator.validateValidatorCorrectness( + { txo: { va: remoteAddressBuffer } }, + remotePublicKey, + writerKey, + remoteAddressBuffer, + { + getNodeEntry: async () => ({ + isWriter: true, + wk: b4a.alloc(32, 99), + }) + } + ); + t.fail('expected writer-key mismatch'); + } catch (error) { + t.is(error.resultCode, ResultCode.VALIDATOR_WRITER_KEY_MISMATCH); + } +}); + +test('validateValidatorCorrectness passes when validator address and writer key are consistent', async t => { + const validator = new V1BroadcastTransactionResponse(config); + + await validator.validateValidatorCorrectness( + { txo: { va: remoteAddressBuffer } }, + remotePublicKey, + writerKey, + remoteAddressBuffer, + { + getNodeEntry: async () => ({ + isWriter: true, + wk: writerKey, + }) + } + ); + + t.pass(); +}); + +test('validateIfResultCodeIsValidatorInternalError throws only for TX_COMMITTED_RECEIPT_MISSING', t => { + const validator = new V1BroadcastTransactionResponse(config); + + try { + validator.validateIfResultCodeIsValidatorInternalError(ResultCode.TX_COMMITTED_RECEIPT_MISSING); + t.fail('expected internal error result code to throw'); + } catch (error) { + t.is(error.resultCode, ResultCode.TX_COMMITTED_RECEIPT_MISSING); + t.is(error.endConnection, true); + } + + validator.validateIfResultCodeIsValidatorInternalError(ResultCode.OK); + t.pass(); +}); diff --git a/tests/unit/network/v1/V1LivenessRequest.test.js b/tests/unit/network/v1/V1LivenessRequest.test.js new file mode 100644 index 00000000..c43b25ab --- /dev/null +++ b/tests/unit/network/v1/V1LivenessRequest.test.js @@ -0,0 +1,32 @@ +import test from 'brittle'; +import b4a from 'b4a'; + +import V1LivenessRequest from '../../../../src/core/network/protocols/v1/validators/V1LivenessRequest.js'; +import { config } from '../../../helpers/config.js'; +import { errorMessageIncludes } from '../../../helpers/regexHelper.js'; + +test('V1LivenessRequest.validate runs schema and signature validation', async t => { + const validator = new V1LivenessRequest(config); + const calls = []; + + validator.isPayloadSchemaValid = () => calls.push('schema'); + validator.validateSignature = async () => calls.push('signature'); + + const result = await validator.validate({}, b4a.alloc(32, 1)); + + t.is(result, true); + t.alike(calls, ['schema', 'signature']); +}); + +test('V1LivenessRequest.validate propagates signature validation errors', async t => { + const validator = new V1LivenessRequest(config); + validator.isPayloadSchemaValid = () => true; + validator.validateSignature = async () => { + throw new Error('signature failed'); + }; + + await t.exception( + async () => validator.validate({}, b4a.alloc(32, 1)), + errorMessageIncludes('signature failed') + ); +}); diff --git a/tests/unit/network/v1/V1LivenessResponse.test.js b/tests/unit/network/v1/V1LivenessResponse.test.js new file mode 100644 index 00000000..1a3eca55 --- /dev/null +++ b/tests/unit/network/v1/V1LivenessResponse.test.js @@ -0,0 +1,45 @@ +import test from 'brittle'; +import b4a from 'b4a'; + +import V1LivenessResponse from '../../../../src/core/network/protocols/v1/validators/V1LivenessResponse.js'; +import { config } from '../../../helpers/config.js'; +import { errorMessageIncludes } from '../../../helpers/regexHelper.js'; + +test('V1LivenessResponse.validate runs all validation steps in order', async t => { + const validator = new V1LivenessResponse(config); + const calls = []; + + validator.isPayloadSchemaValid = () => calls.push('schema'); + validator.validateResponseType = () => calls.push('responseType'); + validator.validatePeerCorrectness = () => calls.push('peer'); + validator.validateSignature = async () => calls.push('signature'); + + const result = await validator.validate( + {}, + { remotePublicKey: b4a.alloc(32, 1) }, + { requestedTo: b4a.toString(b4a.alloc(32, 1), 'hex') } + ); + + t.is(result, true); + t.alike(calls, ['schema', 'responseType', 'peer', 'signature']); +}); + +test('V1LivenessResponse.validate propagates response-type errors', async t => { + const validator = new V1LivenessResponse(config); + + validator.isPayloadSchemaValid = () => true; + validator.validateResponseType = () => { + throw new Error('invalid response type'); + }; + validator.validatePeerCorrectness = () => true; + validator.validateSignature = async () => true; + + await t.exception( + async () => validator.validate( + {}, + { remotePublicKey: b4a.alloc(32, 1) }, + { requestedTo: b4a.toString(b4a.alloc(32, 1), 'hex') } + ), + errorMessageIncludes('invalid response type') + ); +}); diff --git a/tests/unit/network/v1/V1ResultCode.test.js b/tests/unit/network/v1/V1ResultCode.test.js index 44d890cf..52c328a1 100644 --- a/tests/unit/network/v1/V1ResultCode.test.js +++ b/tests/unit/network/v1/V1ResultCode.test.js @@ -2,25 +2,83 @@ import {test} from 'brittle'; import {ResultCode} from '../../../../src/utils/constants.js'; +const expectedResultCodeMap = Object.freeze({ + UNSPECIFIED: 0, + OK: 1, + INVALID_PAYLOAD: 2, + RATE_LIMITED: 3, + SIGNATURE_INVALID: 4, + UNEXPECTED_ERROR: 5, + TIMEOUT: 6, + NODE_HAS_NO_WRITE_ACCESS: 7, + TX_ACCEPTED_PROOF_UNAVAILABLE: 8, + NODE_OVERLOADED: 9, + TX_ALREADY_PENDING: 10, + OPERATION_TYPE_UNKNOWN: 11, + SCHEMA_VALIDATION_FAILED: 12, + REQUESTER_ADDRESS_INVALID: 13, + REQUESTER_PUBLIC_KEY_INVALID: 14, + TX_HASH_MISMATCH: 15, + TX_SIGNATURE_INVALID: 16, + TX_EXPIRED: 17, + TX_ALREADY_EXISTS: 18, + OPERATION_ALREADY_COMPLETED: 19, + REQUESTER_NOT_FOUND: 20, + INSUFFICIENT_FEE_BALANCE: 21, + EXTERNAL_BOOTSTRAP_EQUALS_MSB_BOOTSTRAP: 22, + SELF_VALIDATION_FORBIDDEN: 23, + ROLE_NODE_ENTRY_NOT_FOUND: 24, + ROLE_NODE_ALREADY_WRITER: 25, + ROLE_NODE_NOT_WHITELISTED: 26, + ROLE_NODE_NOT_WRITER: 27, + ROLE_NODE_IS_INDEXER: 28, + ROLE_ADMIN_ENTRY_MISSING: 29, + ROLE_INVALID_RECOVERY_CASE: 30, + ROLE_UNKNOWN_OPERATION: 31, + ROLE_INVALID_WRITER_KEY: 32, + ROLE_INSUFFICIENT_FEE_BALANCE: 33, + MSB_BOOTSTRAP_MISMATCH: 34, + EXTERNAL_BOOTSTRAP_NOT_DEPLOYED: 35, + EXTERNAL_BOOTSTRAP_TX_MISSING: 36, + EXTERNAL_BOOTSTRAP_MISMATCH: 37, + BOOTSTRAP_ALREADY_EXISTS: 38, + TRANSFER_RECIPIENT_ADDRESS_INVALID: 39, + TRANSFER_RECIPIENT_PUBLIC_KEY_INVALID: 40, + TRANSFER_AMOUNT_TOO_LARGE: 41, + TRANSFER_SENDER_NOT_FOUND: 42, + TRANSFER_INSUFFICIENT_BALANCE: 43, + TRANSFER_RECIPIENT_BALANCE_OVERFLOW: 44, + TX_HASH_INVALID_FORMAT: 45, + INTERNAL_ENQUEUE_VALIDATION_FAILED: 46, + TX_COMMITTED_RECEIPT_MISSING: 47, + VALIDATOR_RESPONSE_TX_TYPE_INVALID: 48, + VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN: 49, + VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED: 50, + VALIDATOR_RESPONSE_SCHEMA_INVALID: 51, + PENDING_REQUEST_MISSING_TX_DATA: 52, + PROOF_PAYLOAD_MISMATCH: 53, + VALIDATOR_WRITER_KEY_NOT_REGISTERED: 54, + VALIDATOR_ADDRESS_MISMATCH: 55, + VALIDATOR_NODE_ENTRY_NOT_FOUND: 56, + VALIDATOR_NODE_NOT_WRITER: 57, + VALIDATOR_WRITER_KEY_MISMATCH: 58, + VALIDATOR_TX_OBJECT_INVALID: 59, + VALIDATOR_VA_MISSING: 60, + TX_INVALID_PAYLOAD: 61 +}); + test('ResultCode values are unique', t => { const values = Object.values(ResultCode); t.is(new Set(values).size, values.length); }); test('ResultCode preserves existing numeric values (append-only)', t => { - t.is(ResultCode.UNSPECIFIED, 0); - t.is(ResultCode.OK, 1); - t.is(ResultCode.INVALID_PAYLOAD, 2); - t.is(ResultCode.RATE_LIMITED, 3); - t.is(ResultCode.SIGNATURE_INVALID, 4); - t.is(ResultCode.UNEXPECTED_ERROR, 5); - t.is(ResultCode.TIMEOUT, 6); - t.is(ResultCode.NODE_HAS_NO_WRITE_ACCESS, 7); - t.is(ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE, 8); - t.is(ResultCode.NODE_OVERLOADED, 9); - t.is(ResultCode.TX_ALREADY_PENDING, 10); - - t.is(ResultCode.OPERATION_TYPE_UNKNOWN, 11); + t.alike(ResultCode, expectedResultCodeMap); +}); - t.is(ResultCode.TX_INVALID_PAYLOAD, 61); +test('ResultCode does not expose deprecated alias', t => { + t.absent( + Object.prototype.hasOwnProperty.call(ResultCode, 'TX_ACCEPTED_RECEIPT_MISSING'), + 'deprecated alias should not be present' + ); }); diff --git a/tests/unit/network/v1/connectionPolicies.test.js b/tests/unit/network/v1/connectionPolicies.test.js new file mode 100644 index 00000000..ae318894 --- /dev/null +++ b/tests/unit/network/v1/connectionPolicies.test.js @@ -0,0 +1,49 @@ +import { test } from 'brittle'; + +import { resultToValidatorAction, SENDER_ACTION } from '../../../../src/core/network/protocols/connectionPolicies.js'; +import { ResultCode } from '../../../../src/utils/constants.js'; + +test('connectionPolicies maps OK to SUCCESS', t => { + t.is( + resultToValidatorAction(ResultCode.OK), + SENDER_ACTION.SUCCESS + ); +}); + +test('connectionPolicies maps TX_ALREADY_PENDING to NO_ROTATE', t => { + t.is( + resultToValidatorAction(ResultCode.TX_ALREADY_PENDING), + SENDER_ACTION.NO_ROTATE + ); +}); + +test('connectionPolicies maps TX_COMMITTED_RECEIPT_MISSING to ROTATE', t => { + t.is( + resultToValidatorAction(ResultCode.TX_COMMITTED_RECEIPT_MISSING), + SENDER_ACTION.ROTATE + ); +}); + +test('connectionPolicies maps unknown result code to UNDEFINED', t => { + t.is( + resultToValidatorAction(999999), + SENDER_ACTION.UNDEFINED + ); +}); + +test('connectionPolicies maps every ResultCode constant to a concrete sender action', t => { + const unassigned = []; + + for (const [name, code] of Object.entries(ResultCode)) { + const action = resultToValidatorAction(code); + if (action === SENDER_ACTION.UNDEFINED) { + unassigned.push(`${name}(${code})`); + } + } + + t.alike( + unassigned, + [], + `Unassigned ResultCode entries in policy: ${unassigned.join(', ')}` + ); +}); diff --git a/tests/unit/network/v1/handlers/V1BaseOperationHandler.test.js b/tests/unit/network/v1/handlers/V1BaseOperationHandler.test.js index 64d8d5cd..8730cd81 100644 --- a/tests/unit/network/v1/handlers/V1BaseOperationHandler.test.js +++ b/tests/unit/network/v1/handlers/V1BaseOperationHandler.test.js @@ -151,7 +151,7 @@ test('handlePendingResponseError: unknown native error -> maps to V1UnexpectedEr t.absent(ended, 'V1UnexpectedError should not close the connection by default'); }); -test('handlePendingResponseError: protocol error with endConnection=true -> closes connection', async (t) => { +test('handlePendingResponseError: protocol error with endConnection=true -> does not close connection directly', async (t) => { const pendingReq = new MockPendingReqService(); const handler = new V1BaseOperationHandler(null, pendingReq, mockConfig); @@ -178,7 +178,7 @@ test('handlePendingResponseError: protocol error with endConnection=true -> clos 'Should pass protocol error without wrapping' ); - t.ok(ended, 'Should call connection.end() when error dictates it'); + t.absent(ended, 'Connection closing is delegated to ConnectionManager'); }); test('handlePendingResponseError: delegates logging -> calls displayError', async (t) => { diff --git a/tests/unit/network/v1/handlers/V1BroadcastTransactionOperationHandler.test.js b/tests/unit/network/v1/handlers/V1BroadcastTransactionOperationHandler.test.js index 843d13bc..95e68bb8 100644 --- a/tests/unit/network/v1/handlers/V1BroadcastTransactionOperationHandler.test.js +++ b/tests/unit/network/v1/handlers/V1BroadcastTransactionOperationHandler.test.js @@ -18,6 +18,7 @@ import PartialBootstrapDeploymentValidator from '../../../../../src/core/network import PartialTransactionValidator from '../../../../../src/core/network/protocols/shared/validators/PartialTransactionValidator.js'; import PartialTransferValidator from '../../../../../src/core/network/protocols/shared/validators/PartialTransferValidator.js'; import { config as testConfig } from '../../../../helpers/config.js'; +import { errorMessageIncludes } from '../../../../helpers/regexHelper.js'; const VALID_ADDR = 'trac123z3gfpr2epjwww7ntm3m6ud2fhmq0tvts27p2f5mx3qkecsutlqfys769'; const VALID_TO_ADDR = 'trac1mqktwme8fvklrds4hlhfy6lhmsu9qgfn3c3kuhz7c5zwjt8rc3dqj9tx7h'; @@ -147,17 +148,17 @@ test('dispatchTransaction: missing/invalid type -> throws invalid payload error' await t.exception( async () => handler.dispatchTransaction(null), - /Decoded transaction type is missing/ + errorMessageIncludes('Decoded transaction type is missing') ); await t.exception( async () => handler.dispatchTransaction({ type: 0 }), - /Decoded transaction type is missing/ + errorMessageIncludes('Decoded transaction type is missing') ); await t.exception( async () => handler.dispatchTransaction({ type: 999 }), - /Unsupported transaction type/ + errorMessageIncludes('Unsupported transaction type') ); }); @@ -536,7 +537,7 @@ test('Unsupported role access subtype', async t => { rao: roleAccessPayload(), // unsupported operation type }), - /Unsupported transaction type/ + errorMessageIncludes('Unsupported transaction type') ); }); @@ -550,7 +551,7 @@ test('Role access switch default branch', async t => { address: VALID_ADDR, rao: roleAccessPayload() }), - /Unsupported transaction type/ + errorMessageIncludes('Unsupported transaction type') ); }); @@ -573,12 +574,15 @@ test('TransactionPoolMissingCommitReceiptError via receipt branch', async t => { txo: transactionPayload() }); + let capturedResultCode = null; await handler.handleRequest( - { id: b4a.alloc(32), broadcast_transaction_request: { data: b4a.alloc(1) } }, - mockConn() + { id: 'receipt-missing-id', broadcast_transaction_request: { data: b4a.alloc(1) } }, + mockConn(res => { + capturedResultCode = res.broadcast_transaction_response.result; + }) ); - t.pass(); + t.is(capturedResultCode, ResultCode.TX_COMMITTED_RECEIPT_MISSING); }); test('PendingCommitBufferFullError mapping branch', async t => { diff --git a/tests/unit/network/v1/v1.test.js b/tests/unit/network/v1/v1.test.js index 49f5825e..2e4aab0e 100644 --- a/tests/unit/network/v1/v1.test.js +++ b/tests/unit/network/v1/v1.test.js @@ -3,7 +3,12 @@ import { default as test } from 'brittle'; async function runTests() { test.pause(); await import('./v1.handlers.test.js'); + await import('./V1BaseOperation.test.js'); + await import('./V1LivenessRequest.test.js'); + await import('./V1LivenessResponse.test.js'); + await import('./V1BroadcastTransactionRequest.test.js'); await import('./V1ResultCode.test.js'); + await import('./connectionPolicies.test.js'); await import('./V1BroadcastTransactionResponse.test.js'); await import('./V1ValidationSchema.test.js'); await import('./V1BroadcastTransactionOperationHandler.test.js');