Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 88 additions & 65 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,48 +147,50 @@ import (
// For a node running as a standalone process, the config fields will be populated from the command line params,
// while for a node running as a library, the config fields are expected to be initialized by the caller.
type AccessNodeConfig struct {
supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines
pingEnabled bool
nodeInfoFile string
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf statestreambackend.Config
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
publicNetworkExecutionDataEnabled bool
executionDataDBMode string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
executionDataPruningInterval time.Duration
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
scriptExecutorConfig query.QueryConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
checkPayerBalanceMode string
versionControlEnabled bool
storeTxResultErrorMessages bool
stopControlEnabled bool
registerDBPruneThreshold uint64
scheduledCallbacksEnabled bool
supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines
pingEnabled bool
nodeInfoFile string
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf statestreambackend.Config
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
logTxTimeToSealed bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
publicNetworkExecutionDataEnabled bool
executionDataDBMode string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
executionDataPruningInterval time.Duration
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
scriptExecutorConfig query.QueryConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
checkPayerBalanceMode string
versionControlEnabled bool
storeTxResultErrorMessages bool
stopControlEnabled bool
registerDBPruneThreshold uint64
scheduledCallbacksEnabled bool
executionResultAgreeingExecutorsCount uint
executionResultRequiredExecutors []string
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -280,25 +282,27 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
RetryDelay: edrequester.DefaultRetryDelay,
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
executionDataPrunerHeightRangeTarget: 0,
executionDataPrunerThreshold: pruner.DefaultThreshold,
executionDataPruningInterval: pruner.DefaultPruningInterval,
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"),
checkpointFile: cmd.NotSet,
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
checkPayerBalanceMode: txvalidator.Disabled.String(),
versionControlEnabled: true,
storeTxResultErrorMessages: false,
stopControlEnabled: false,
registerDBPruneThreshold: 0,
scheduledCallbacksEnabled: fvm.DefaultScheduledCallbacksEnabled,
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
executionDataPrunerHeightRangeTarget: 0,
executionDataPrunerThreshold: pruner.DefaultThreshold,
executionDataPruningInterval: pruner.DefaultPruningInterval,
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"),
checkpointFile: cmd.NotSet,
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
checkPayerBalanceMode: txvalidator.Disabled.String(),
versionControlEnabled: true,
storeTxResultErrorMessages: false,
stopControlEnabled: false,
registerDBPruneThreshold: 0,
scheduledCallbacksEnabled: fvm.DefaultScheduledCallbacksEnabled,
executionResultAgreeingExecutorsCount: optimistic_sync.DefaultCriteria.AgreeingExecutorsCount,
executionResultRequiredExecutors: optimistic_sync.DefaultCriteria.RequiredExecutors.Strings(),
}
}

Expand Down Expand Up @@ -1436,6 +1440,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"store-tx-result-error-messages",
defaultConfig.storeTxResultErrorMessages,
"whether to enable storing transaction error messages into the db")
flags.UintVar(&builder.executionResultAgreeingExecutorsCount,
"execution-result-agreeing-executors-count",
defaultConfig.executionResultAgreeingExecutorsCount,
"minimum number of execution receipts with the same result required for execution result queries")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a validation to make sure the provided number is greater than or equal to 1? That should go in the ValidateFlags function

flags.StringSliceVar(&builder.executionResultRequiredExecutors,
"execution-result-required-executors",
defaultConfig.executionResultRequiredExecutors,
"comma separated list of execution node IDs, one of which must have produced the execution result")
// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1603,6 +1615,9 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
if builder.rpcConf.BackendConfig.ExecutionConfig.MaxResponseMsgSize <= 0 {
return errors.New("rpc-max-execution-response-message-size must be greater than 0")
}
if builder.executionResultAgreeingExecutorsCount <= 0 {
return errors.New("execution-result-agreeing-executors-count must be greater than 0")
}

return nil
})
Expand Down Expand Up @@ -2087,12 +2102,21 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN list: %w", err)
}

fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN list: %w", err)
}

requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err)
}
operatorCriteria := optimistic_sync.Criteria{
AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount,
RequiredExecutors: requiredENIdentifiers,
}

builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider(
Expand Down Expand Up @@ -2123,7 +2147,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.State,
node.Storage.Receipts,
execNodeSelector,
optimistic_sync.DefaultCriteria,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DefaultCriteria is also passed to the backend params as the OperatorCriteria field. I don't think we need that field anymore. can you remove it from the params struct?

operatorCriteria,
)

// TODO: use real objects instead of mocks once they're implemented
Expand Down Expand Up @@ -2179,7 +2203,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
TxErrorMessageProvider: notNil(builder.txResultErrorMessageProvider),
ExecutionResultInfoProvider: execResultInfoProvider,
ExecutionStateCache: execStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize,
ScheduledCallbacksEnabled: builder.scheduledCallbacksEnabled,
})
Expand Down
1 change: 0 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize,
ExecutionResultInfoProvider: execResultInfoProvider,
ExecutionStateCache: execStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
}

if builder.localServiceAPIEnabled {
Expand Down
6 changes: 0 additions & 6 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/mempool/stdmap"
Expand Down Expand Up @@ -191,7 +190,6 @@ func (suite *Suite) RunTest(
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: commonrpc.DefaultAccessMaxRequestSize,
})
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -364,7 +362,6 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: commonrpc.DefaultAccessMaxRequestSize,
})
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -715,7 +712,6 @@ func (suite *Suite) TestGetSealedTransaction() {
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: commonrpc.DefaultAccessMaxRequestSize,
})
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -936,7 +932,6 @@ func (suite *Suite) TestGetTransactionResult() {
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: commonrpc.DefaultAccessMaxRequestSize,
})
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -1194,7 +1189,6 @@ func (suite *Suite) TestExecuteScript() {
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
MaxScriptAndArgumentSize: commonrpc.DefaultAccessMaxRequestSize,
})
require.NoError(suite.T(), err)
Expand Down
2 changes: 0 additions & 2 deletions engine/access/handle_irrecoverable_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
Expand Down Expand Up @@ -170,7 +169,6 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
})
suite.Require().NoError(err)

Expand Down
2 changes: 0 additions & 2 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
Expand Down Expand Up @@ -215,7 +214,6 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
})
require.NoError(suite.T(), err)

Expand Down
2 changes: 0 additions & 2 deletions engine/access/rest_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
Expand Down Expand Up @@ -196,7 +195,6 @@ func (suite *RestAPITestSuite) SetupTest() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
})
require.NoError(suite.T(), err)

Expand Down
1 change: 0 additions & 1 deletion engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ type Params struct {

ExecutionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider
ExecutionStateCache optimistic_sync.ExecutionStateCache
OperatorCriteria optimistic_sync.Criteria
ScheduledCallbacksEnabled bool
}

Expand Down
2 changes: 0 additions & 2 deletions engine/access/rpc/backend/backend_stream_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/access/subscription/tracker"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/metrics"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand Down Expand Up @@ -175,7 +174,6 @@ func (s *BackendBlocksSuite) backendParams(broadcaster *engine.Broadcaster) Para
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: s.executionResultInfoProvider,
ExecutionStateCache: s.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
}
}

Expand Down
2 changes: 0 additions & 2 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -2107,7 +2106,6 @@ func (suite *Suite) defaultBackendParams() Params {
),
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
}
}

Expand Down
2 changes: 0 additions & 2 deletions engine/access/rpc/rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
Expand Down Expand Up @@ -192,7 +191,6 @@ func (suite *RateLimitTestSuite) SetupTest() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
})
suite.Require().NoError(err)

Expand Down
2 changes: 0 additions & 2 deletions engine/access/secure_grpcr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync"
osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
Expand Down Expand Up @@ -172,7 +171,6 @@ func (suite *SecureGRPCTestSuite) SetupTest() {
TxResultQueryMode: query_mode.IndexQueryModeExecutionNodesOnly,
ExecutionResultInfoProvider: suite.executionResultInfoProvider,
ExecutionStateCache: suite.executionStateCache,
OperatorCriteria: optimistic_sync.DefaultCriteria,
})
suite.Require().NoError(err)

Expand Down
Loading