Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions proto/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ enum MessageType {
// Warning: Do not change the numeric values of existing enum fields. They are part of the public API.
// New values can be added, but existing values must never be renumbered or deleted.
enum ResultCode {
RESULT_CODE_UNSPECIFIED = 0;
// 0-10: [SCOPE: BASE_TRANSPORT_AND_VALIDATOR_STATUS]
// Generic V1 processing outcome before deeper payload/proof-specific validation branches.
RESULT_CODE_UNSPECIFIED = 0; // We should create an v1Error for this case, sender disconnects from the validator, add optional information to update the node verion.
RESULT_CODE_OK = 1;
RESULT_CODE_INVALID_PAYLOAD = 2;
RESULT_CODE_RATE_LIMITED = 3;
Expand All @@ -25,6 +27,8 @@ enum ResultCode {
RESULT_CODE_TX_ACCEPTED_PROOF_UNAVAILABLE = 8;
RESULT_CODE_NODE_OVERLOADED = 9;
RESULT_CODE_TX_ALREADY_PENDING = 10;
// 11-44: [SCOPE: SHARED_VALIDATORS_REQUEST_PAYLOAD]
// (incoming BROADCAST_TRANSACTION_REQUEST -> partial apply payload validation) - validator disconnects, sender rotates
RESULT_CODE_OPERATION_TYPE_UNKNOWN = 11;
RESULT_CODE_SCHEMA_VALIDATION_FAILED = 12;
RESULT_CODE_REQUESTER_ADDRESS_INVALID = 13;
Expand Down Expand Up @@ -59,9 +63,15 @@ enum ResultCode {
RESULT_CODE_TRANSFER_SENDER_NOT_FOUND = 42;
RESULT_CODE_TRANSFER_INSUFFICIENT_BALANCE = 43;
RESULT_CODE_TRANSFER_RECIPIENT_BALANCE_OVERFLOW = 44;

// 45-47: [SCOPE: REQUEST_DISPATCH_TXPOOL_TXCOMMIT_PIPELINE]
// (dispatch/enqueue/commit handling on validator)
RESULT_CODE_TX_HASH_INVALID_FORMAT = 45;
RESULT_CODE_INTERNAL_ENQUEUE_VALIDATION_FAILED = 46;
RESULT_CODE_TX_ACCEPTED_RECEIPT_MISSING = 47;
RESULT_CODE_TX_COMMITTED_RECEIPT_MISSING = 47;

// 48-60: [SCOPE: VALIDATOR_RESPONSE_PROOF_AND_IDENTITY_VALIDATION]
// (incoming BROADCAST_TRANSACTION_RESPONSE validation on sender)
RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_INVALID = 48;
RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN = 49;
RESULT_CODE_VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED = 50;
Expand Down Expand Up @@ -94,15 +104,15 @@ message BroadcastTransactionRequest {
bytes nonce = 2;
bytes signature = 3;
}

// TODO: before release change appendedAt to appended_at
message BroadcastTransactionResponse {
bytes nonce = 1;
bytes signature = 2;
bytes proof = 3;
uint64 appendedAt = 4;
ResultCode result = 5;
}

// TODO delete ProofData
message ProofData {
bytes proof = 1;
uint64 appendedAt = 2;
Expand Down
18 changes: 18 additions & 0 deletions rpc/rpc_services.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,23 @@ import {
} from "../src/utils/normalizers.js";
import { get_confirmed_tx_info, get_unconfirmed_tx_info } from "../src/utils/cli.js";
import {OperationType} from "../src/utils/constants.js";
import { sleep } from "../src/utils/helpers.js";
import b4a from "b4a";
import PartialTransactionValidator from "../src/core/network/protocols/shared/validators/PartialTransactionValidator.js";
import PartialTransferValidator from "../src/core/network/protocols/shared/validators/PartialTransferValidator.js";

// This was added because V1 is not waiting for signed/unsigned state. So to include reverse compatibility
// we need to slow down v1 to legacy case.
const waitForUnconfirmedTx = async (state, txHash, config) => {
const startedAt = Date.now();
while ((Date.now() - startedAt) < config.messageValidatorResponseTimeout) {
const payload = await state.get(txHash);
if (payload) return true;
await sleep(100);
}
return false;
};

export async function getBalance(msbInstance, address, confirmed) {
const state = msbInstance.state;
const useUnconfirmed = confirmed === false;
Expand Down Expand Up @@ -75,6 +88,11 @@ export async function broadcastTransaction(msbInstance, config, payload) {
throw new Error("Failed to broadcast transaction after multiple attempts.");
}

const isConfirmed = await waitForUnconfirmedTx(msbInstance.state, hash, config);
if (!isConfirmed) {
throw new Error("Failed to broadcast transaction after multiple attempts.");
}

const signedLength = msbInstance.state.getSignedLength();
const unsignedLength = msbInstance.state.getUnsignedLength();

Expand Down
2 changes: 1 addition & 1 deletion src/config/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const configData = {
enableValidatorObserver: true,
enableWallet: true,
maxValidators: 6,
maxRetries: 0,
maxRetries: 3,
messageThreshold: 1000,
messageValidatorRetryDelay: 1000, //How long to wait before retrying (ms) MESSAGE_VALIDATOR_RETRY_DELAY_MS
messageValidatorResponseTimeout: 3 * 3 * 1000, //Overall timeout for sending a message (ms). This is 3 * maxRetries * messageValidatorRetryDelay;
Expand Down
88 changes: 88 additions & 0 deletions src/core/network/protocols/connectionPolicies.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { ResultCode } from '../../../utils/constants.js';

// TODO: Consider how to behave with ResultCode.UNSPECIFIED

export const SENDER_ACTION = Object.freeze({
UNDEFINED: 'UNDEFINED',
SUCCESS: 'SUCCESS',
ROTATE: 'ROTATE',
NO_ROTATE: 'NO_ROTATE',
});

const SUCCESS_CODES = new Set([
ResultCode.OK,
]);

const ROTATE_CODES = new Set([
ResultCode.UNSPECIFIED,
ResultCode.INVALID_PAYLOAD,
ResultCode.RATE_LIMITED,
ResultCode.SIGNATURE_INVALID,
ResultCode.UNEXPECTED_ERROR,
ResultCode.TIMEOUT,
ResultCode.NODE_HAS_NO_WRITE_ACCESS,
ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE,
ResultCode.NODE_OVERLOADED,
ResultCode.OPERATION_TYPE_UNKNOWN,
ResultCode.SCHEMA_VALIDATION_FAILED,
ResultCode.REQUESTER_ADDRESS_INVALID,
ResultCode.REQUESTER_PUBLIC_KEY_INVALID,
ResultCode.TX_HASH_MISMATCH,
ResultCode.TX_SIGNATURE_INVALID,
ResultCode.TX_EXPIRED,
ResultCode.TX_ALREADY_EXISTS,
ResultCode.OPERATION_ALREADY_COMPLETED,
ResultCode.REQUESTER_NOT_FOUND,
ResultCode.INSUFFICIENT_FEE_BALANCE,
ResultCode.EXTERNAL_BOOTSTRAP_EQUALS_MSB_BOOTSTRAP,
ResultCode.SELF_VALIDATION_FORBIDDEN,
ResultCode.ROLE_NODE_ENTRY_NOT_FOUND,
ResultCode.ROLE_NODE_ALREADY_WRITER,
ResultCode.ROLE_NODE_NOT_WHITELISTED,
ResultCode.ROLE_NODE_NOT_WRITER,
ResultCode.ROLE_NODE_IS_INDEXER,
ResultCode.ROLE_ADMIN_ENTRY_MISSING,
ResultCode.ROLE_INVALID_RECOVERY_CASE,
ResultCode.ROLE_UNKNOWN_OPERATION,
ResultCode.ROLE_INVALID_WRITER_KEY,
ResultCode.ROLE_INSUFFICIENT_FEE_BALANCE,
ResultCode.MSB_BOOTSTRAP_MISMATCH,
ResultCode.EXTERNAL_BOOTSTRAP_NOT_DEPLOYED,
ResultCode.EXTERNAL_BOOTSTRAP_TX_MISSING,
ResultCode.EXTERNAL_BOOTSTRAP_MISMATCH,
ResultCode.BOOTSTRAP_ALREADY_EXISTS,
ResultCode.TRANSFER_RECIPIENT_ADDRESS_INVALID,
ResultCode.TRANSFER_RECIPIENT_PUBLIC_KEY_INVALID,
ResultCode.TRANSFER_AMOUNT_TOO_LARGE,
ResultCode.TRANSFER_SENDER_NOT_FOUND,
ResultCode.TRANSFER_INSUFFICIENT_BALANCE,
ResultCode.TRANSFER_RECIPIENT_BALANCE_OVERFLOW,
ResultCode.TX_HASH_INVALID_FORMAT,
ResultCode.INTERNAL_ENQUEUE_VALIDATION_FAILED,
ResultCode.TX_COMMITTED_RECEIPT_MISSING,
ResultCode.VALIDATOR_RESPONSE_TX_TYPE_INVALID,
ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNKNOWN,
ResultCode.VALIDATOR_RESPONSE_TX_TYPE_UNSUPPORTED,
ResultCode.VALIDATOR_RESPONSE_SCHEMA_INVALID,
ResultCode.PENDING_REQUEST_MISSING_TX_DATA,
ResultCode.PROOF_PAYLOAD_MISMATCH,
ResultCode.VALIDATOR_WRITER_KEY_NOT_REGISTERED,
ResultCode.VALIDATOR_ADDRESS_MISMATCH,
ResultCode.VALIDATOR_NODE_ENTRY_NOT_FOUND,
ResultCode.VALIDATOR_NODE_NOT_WRITER,
ResultCode.VALIDATOR_WRITER_KEY_MISMATCH,
ResultCode.VALIDATOR_TX_OBJECT_INVALID,
ResultCode.VALIDATOR_VA_MISSING,
ResultCode.TX_INVALID_PAYLOAD
]);

const NO_ROTATE_CODES = new Set([
ResultCode.TX_ALREADY_PENDING,
]);

export function resultToValidatorAction(resultCode) {
if (SUCCESS_CODES.has(resultCode)) return SENDER_ACTION.SUCCESS;
if (ROTATE_CODES.has(resultCode)) return SENDER_ACTION.ROTATE;
if (NO_ROTATE_CODES.has(resultCode)) return SENDER_ACTION.NO_ROTATE;
return SENDER_ACTION.UNDEFINED;
}
1 change: 0 additions & 1 deletion src/core/network/protocols/v1/NetworkMessageRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class NetworkMessageRouterV1 {

#preValidate(incomingMessage) {
return !(!incomingMessage || !b4a.isBuffer(incomingMessage) || incomingMessage.length === 0 || incomingMessage.length > V1_PROTOCOL_PAYLOAD_MAX_SIZE);

}

#disconnect(connection, reason) {
Expand Down
8 changes: 4 additions & 4 deletions src/core/network/protocols/v1/V1ProtocolError.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class V1TxInvalidPayloadError extends V1ProtocolError {
}

export class V1SignatureInvalidError extends V1ProtocolError {
constructor(message = 'Signature invalid', endConnection = false) {
constructor(message = 'Signature invalid', endConnection = true) {
super(ResultCode.SIGNATURE_INVALID, message, endConnection);
}
}
Expand All @@ -60,7 +60,7 @@ export class V1UnexpectedError extends V1ProtocolError {
}

export class V1TimeoutError extends V1ProtocolError {
constructor(message = 'Request timed out', endConnection = true) {
constructor(message = 'Request timed out', endConnection = false) {
super(ResultCode.TIMEOUT, message, endConnection);
}
}
Expand All @@ -72,7 +72,7 @@ export class V1NodeHasNoWriteAccess extends V1ProtocolError {
}

export class V1TxAcceptedProofUnavailable extends V1ProtocolError {
constructor(message = 'Transaction accepted but proof is unavailable', endConnection = true, appendedAt = 0) {
constructor(message = 'Transaction accepted but proof is unavailable', endConnection = false, appendedAt = 0) {
super(ResultCode.TX_ACCEPTED_PROOF_UNAVAILABLE, message, endConnection);
this.appendedAt = Number.isSafeInteger(appendedAt) && appendedAt > 0 ? appendedAt : 0;
}
Expand All @@ -85,7 +85,7 @@ export class V1NodeOverloadedError extends V1ProtocolError {
}

export class V1TxAlreadyPendingError extends V1ProtocolError {
constructor(message = 'Transaction is already pending', endConnection = true) {
constructor(message = 'Transaction is already pending', endConnection = false) {
super(ResultCode.TX_ALREADY_PENDING, message, endConnection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class V1BaseOperationHandler {
const protocolError = this.#toProtocolError(error);
const rejected = this.#pendingRequestService.rejectPendingRequest(messageId, protocolError);
if (!rejected) return;
if (shouldEndConnection(protocolError)) connection.end();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if this line disappeared. Cant you just remove the flag from the Error? In general, that is also not great in terms of design. Exceptions (or things that throw) are done in a pub-sub design. That means that the emitter should work as broadcast and the observer (subscriber) should know how to handle those accordingly. If you pass a behavior flag, you are making it so the emitter know the behavior of the subscriber beforehand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could maybe check the result codes here and decide whether to close the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so the plan is to create a new module that defines the behavior based on V1ProtocolError, in a similar way to ResultCodePolicy.js It would be even better to rename that file to ResultCodesPolicies.js.

In that module, we would have:

  • a sender connection handling policy (Success, Rotate, NoRotate)
  • a validator connection handling policy (the ResultCodes that should cause us to close the validator connection).

It will have one big benefit - centralizing all policies into a single file.

@leonardotc @jusufsuljic @leonardostsouza I’m going to merge the current PR as is, and then create a followup issue to refactor it in this direction.

this.displayError(step, connection.remotePublicKey, error);
}

Expand All @@ -50,7 +49,7 @@ class V1BaseOperationHandler {
if (error instanceof V1ProtocolError) {
return error;
}
return new V1UnexpectedError(error?.message ?? 'Unexpected error', false);
return new V1UnexpectedError(error?.message ?? 'Unexpected error');
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler {
appendedAt
);
} catch (error) {
throw new V1UnexpectedError(`Failed to build broadcast transaction response: ${error.message}`, true);
throw new V1UnexpectedError(`Failed to build broadcast transaction response: ${error.message}`);
}
}

decodeApplyOperation(message) {
try {
return unsafeDecodeApplyOperation(message);
} catch (error) {
throw new V1UnexpectedError(`Failed to decode apply operation from message: ${error.message}`, true);
throw new V1UnexpectedError(`Failed to decode apply operation from message: ${error.message}`);
}
}

Expand Down Expand Up @@ -217,7 +217,7 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler {
throw new V1TxInvalidPayloadError('Decoded transaction type is missing.', false);
}
if (!this.#transactionCommitService) {
throw new V1UnexpectedError('TransactionCommitService is not configured.', true);
throw new V1UnexpectedError('TransactionCommitService is not configured.');
}

const type = decodedTransaction.type;
Expand Down Expand Up @@ -252,10 +252,10 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler {
throw new V1ProtocolError(ResultCode.TX_HASH_INVALID_FORMAT, error.message, false);
}
if (error instanceof PendingCommitAlreadyExistsError) {
throw new V1TxAlreadyPendingError(error.message, false);
throw new V1TxAlreadyPendingError(error.message);
}
if (error instanceof PendingCommitBufferFullError) {
throw new V1NodeOverloadedError(error.message, false);
throw new V1NodeOverloadedError(error.message);
}
throw error;
}
Expand Down Expand Up @@ -287,10 +287,10 @@ class V1BroadcastTransactionOperationHandler extends V1BaseOperationHandler {
throw new V1TxAcceptedProofUnavailable(error.message, false, error.appendedAt);
}
if (error instanceof TransactionPoolMissingCommitReceiptError) {
throw new V1ProtocolError(ResultCode.TX_ACCEPTED_RECEIPT_MISSING, error.message, false);
throw new V1ProtocolError(ResultCode.TX_COMMITTED_RECEIPT_MISSING, error.message, false);
}
if (error instanceof PendingCommitTimeoutError) {
throw new V1TimeoutError(error.message, false);
throw new V1TimeoutError(error.message);
}
throw error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { networkMessageFactory } from "../../../../../messages/network/v1/networ
import { NETWORK_CAPABILITIES, ResultCode } from "../../../../../utils/constants.js";
import V1LivenessRequest from "../validators/V1LivenessRequest.js";
import {getResultCode, shouldEndConnection, V1UnexpectedError} from "../V1ProtocolError.js";
import { publicKeyToAddress, sleep } from "../../../../../utils/helpers.js";
import V1LivenessResponse from "../validators/V1LivenessResponse.js";
import V1BaseOperationHandler from "./V1BaseOperationHandler.js";

Expand Down Expand Up @@ -60,7 +59,7 @@ class V1LivenessOperationHandler extends V1BaseOperationHandler {
message.id,
connection,
error,
"failed to process liveness response from sender"
"Failed to process liveness response from sender"
);
}
}
Expand All @@ -73,7 +72,7 @@ class V1LivenessOperationHandler extends V1BaseOperationHandler {
resultCode
);
} catch (error) {
throw new V1UnexpectedError(`Failed to build liveness response: ${error.message}`, true);
throw new V1UnexpectedError(`Failed to build liveness response: ${error.message}`);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,27 @@ class V1BroadcastTransactionResponse extends V1BaseOperation {
await this.validateSignature(payload, connection.remotePublicKey);
const resultCode = payload.broadcast_transaction_response.result;
// if result code is not OK, we can skip the rest of the validations.
if (resultCode !== ResultCode.OK) {
return true;
this.validateIfResultCodeIsValidatorInternalError(resultCode);
if (resultCode === ResultCode.OK) {
const proofResult = await this.verifyProofOfPublication(payload, stateInstance);
const {
validatorDecodedTx,
manifest
} = await this.assertProofPayloadMatchesRequestPayload(proofResult, pendingRequestServiceEntry);
this.validateDecodedCompletePayloadSchema(validatorDecodedTx);
const {
writerKeyFromManifest,
validatorAddressCorrelatedWithManifest
} = await this.validateWritingKey(validatorDecodedTx, manifest, stateInstance);
await this.validateValidatorCorrectness(
validatorDecodedTx,
connection.remotePublicKey,
writerKeyFromManifest,
validatorAddressCorrelatedWithManifest,
stateInstance,
);
}
const proofResult = await this.verifyProofOfPublication(payload, stateInstance);
const {
validatorDecodedTx,
manifest
} = await this.assertProofPayloadMatchesRequestPayload(proofResult, pendingRequestServiceEntry);
this.validateDecodedCompletePayloadSchema(validatorDecodedTx);
const {
writerKeyFromManifest,
validatorAddressCorrelatedWithManifest
} = await this.validateWritingKey(validatorDecodedTx, manifest, stateInstance);
await this.validateValidatorCorrectness(
validatorDecodedTx,
connection.remotePublicKey,
writerKeyFromManifest,
validatorAddressCorrelatedWithManifest,
stateInstance,
);

return true;
}

Expand Down Expand Up @@ -213,6 +214,16 @@ class V1BroadcastTransactionResponse extends V1BaseOperation {
);
}
}

validateIfResultCodeIsValidatorInternalError(resultCode) {
if (resultCode === ResultCode.TX_COMMITTED_RECEIPT_MISSING) {
throw new V1ProtocolError(
resultCode,
`Validator response indicates an error with result code ${resultCode}, which is an internal error code.`,
true
)
}
}
}

const stripValidatorMetadata = (value) => {
Expand Down
Loading
Loading