Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
1ba2e3b
added missing components to state stream backend, added execution dat…
AndriiDiachuk Sep 30, 2025
05acbf1
Update cmd/access/node_builder/access_node_builder.go
AndriiDiachuk Oct 2, 2025
cdbb793
Update cmd/observer/node_builder/observer_builder.go
AndriiDiachuk Oct 2, 2025
fab2478
Update cmd/observer/node_builder/observer_builder.go
AndriiDiachuk Oct 2, 2025
20e6f6b
Update cmd/access/node_builder/access_node_builder.go
AndriiDiachuk Oct 2, 2025
0d88fb7
Linted
AndriiDiachuk Oct 2, 2025
c411bb3
Added check if GetIncludeExecutorMetadata is true for GetExecutionDat…
AndriiDiachuk Oct 2, 2025
eacc8e2
Added check for errNot found
AndriiDiachuk Oct 2, 2025
50cb0c0
Added BlockExecutionData to the Snapshot interface, changed implement…
AndriiDiachuk Oct 2, 2025
1152695
Merged conflicts
AndriiDiachuk Oct 6, 2025
af2c148
Added new interface for execution data reading, fixed corresponding p…
AndriiDiachuk Oct 7, 2025
5bb7569
Fixed executionData tests
AndriiDiachuk Oct 8, 2025
d4b4830
Fixed TestGetExecutionDataByBlockID in handler tests
AndriiDiachuk Oct 8, 2025
c4cd2ce
Added checks if events storage is not empty
AndriiDiachuk Oct 9, 2025
9bafca1
Added check to see if CI will fail
AndriiDiachuk Oct 10, 2025
ad1c42c
Added check for builder.events
AndriiDiachuk Oct 10, 2025
4ce817c
Moved events init above calling snapshot
AndriiDiachuk Oct 10, 2025
96b8a84
Initing events ion separate module
AndriiDiachuk Oct 10, 2025
3293f9c
Moved back events storage init
AndriiDiachuk Oct 10, 2025
424380f
Removed check if Events are non empty
AndriiDiachuk Oct 10, 2025
6dd50bc
Removed not nil checks foir new snapshot mock
AndriiDiachuk Oct 10, 2025
ee10ca1
Added again storage event init at the top of the build function
AndriiDiachuk Oct 10, 2025
6b23d30
Added check for notNil for events
AndriiDiachuk Oct 11, 2025
beb8d0c
Removed not nol check as one of the tests expects events to be nil
AndriiDiachuk Oct 11, 2025
c1da4d2
Added error handilng to the test
AndriiDiachuk Oct 14, 2025
dbc1972
Linted
AndriiDiachuk Oct 14, 2025
d0dd55e
Removed commented code
AndriiDiachuk Oct 14, 2025
6964e5e
Linted again
AndriiDiachuk Oct 14, 2025
78a76e8
Fixed comments regarding to errors in the GetExecutionDataByBlockID
AndriiDiachuk Oct 15, 2025
4bdd5df
Added additional checks in the tests
AndriiDiachuk Oct 15, 2025
742c0ce
Extended checks for handler test< added godoc
AndriiDiachuk Oct 15, 2025
1ee4ba4
Removed notNil check for ExecutionStateCache
AndriiDiachuk Oct 15, 2025
e730298
Removed other notNil check for executionStateCache
AndriiDiachuk Oct 15, 2025
257f514
Moved init of executionResultInfoProvider to the top of the Build fun…
AndriiDiachuk Oct 16, 2025
7ad32c5
Moved init of executionResultInfoProvider to the top of the Build fun…
AndriiDiachuk Oct 16, 2025
c8e7038
Added not nil checks for the execution result info provider
AndriiDiachuk Oct 16, 2025
2dba2af
Created separate function for result info provider
AndriiDiachuk Oct 16, 2025
616324c
Fixed mocked values, added error handling for missing error
AndriiDiachuk Oct 17, 2025
8a293ef
Removed separate config for the optimistic_sync package
AndriiDiachuk Oct 17, 2025
dc1e1a8
Fixed init of NewSnapshotMock in observer builder
AndriiDiachuk Oct 17, 2025
5adc47d
Added not nil check for events in NewSnapshotMock
AndriiDiachuk Oct 17, 2025
44fca74
Changed check for builder events
AndriiDiachuk Oct 17, 2025
b14cd1c
Added check for the collections storage
AndriiDiachuk Oct 17, 2025
a44b079
added check for nil for transactions and executionDataStoreCache
AndriiDiachuk Oct 17, 2025
b86afba
Moved lightTransactionResults and transactionResultErrorMessages init…
AndriiDiachuk Oct 17, 2025
beff0ad
Fixed notNil function
AndriiDiachuk Oct 20, 2025
bf29402
Changed impl of notNil
AndriiDiachuk Oct 20, 2025
7c4062a
Added function to check storages
AndriiDiachuk Oct 20, 2025
fc9ef3c
Added godoc, and refactored one test
AndriiDiachuk Oct 21, 2025
386498a
Added more test cases, added godoc woth expected errors
AndriiDiachuk Oct 27, 2025
47bbf72
Removed empty line
AndriiDiachuk Oct 27, 2025
29cbe4d
Merged both builders
AndriiDiachuk Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ packages:
github.com/onflow/flow-go/module/execution:
github.com/onflow/flow-go/module/executiondatasync/execution_data:
github.com/onflow/flow-go/module/executiondatasync/optimistic_sync:
config:
all: False
interfaces:
Core:
github.com/onflow/flow-go/module/executiondatasync/tracker:
github.com/onflow/flow-go/module/forest:
github.com/onflow/flow-go/module/mempool:
Expand Down
161 changes: 108 additions & 53 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,10 @@ type FlowAccessNodeBuilder struct {
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend
stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend
executionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider
executionStateCache optimistic_sync.ExecutionStateCache

ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
Expand Down Expand Up @@ -654,6 +656,39 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

return nil
}).
Module("execution state cache", func(node *cmd.NodeConfig) error {

// check if all the storages are initialized before passing them to Snapshot
if builder.events == nil {
return fmt.Errorf("events store not initialized: ensure 'events storage' module runs before 'execution state cache'")
}
if builder.collections == nil {
return fmt.Errorf("collections store not initialized: ensure 'collections storage' module runs before 'execution state cache'")
}
if builder.transactions == nil {
return fmt.Errorf("transactions store not initialized: ensure 'transactions storage' module runs before 'execution state cache'")
}
if builder.lightTransactionResults == nil {
return fmt.Errorf("lightTransactionResults store not initialized: ensure 'lightTransactionResults storage' module runs before 'execution state cache'")
}
if builder.transactionResultErrorMessages == nil {
return fmt.Errorf("transactionResultErrorMessages store not initialized: ensure 'transactionResultErrorMessages storage' module runs before 'execution state cache'")
}

// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
builder.collections,
builder.transactions,
builder.lightTransactionResults,
builder.transactionResultErrorMessages,
nil,
notNil(executionDataStoreCache),
)
builder.executionStateCache = execution_state.NewExecutionStateCacheMock(snapshot)

return nil
}).
Component("execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
opts := []network.BlobServiceOption{
blob.WithBitswapOptions(
Expand Down Expand Up @@ -858,10 +893,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
return nil
}).
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Note: using a DependableComponent here to ensure that the indexer does not block
// other components from starting while bootstrapping the register db since it may
Expand Down Expand Up @@ -1088,6 +1119,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.stateStreamConf.ClientSendBufferSize,
),
executionDataTracker,
notNil(builder.executionResultInfoProvider),
builder.executionStateCache, // might be nil
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
Expand Down Expand Up @@ -1729,10 +1762,76 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
})
}

// buildStoragesData registers modules that initialize core read-only stores used by
// the access node (events, lightTransactionResults, and transactionResultErrorMessages).
func (builder *FlowAccessNodeBuilder) buildStoragesData() *FlowAccessNodeBuilder {
builder.Module("events storage", func(node *cmd.NodeConfig) error {
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
})
builder.Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
return nil
})
builder.Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
return nil
})

return builder
}

// buildExecutionResultInfoProvider registers a module that wires the
// optimistic_sync.ExecutionResultInfoProvider on the builder.
func (builder *FlowAccessNodeBuilder) buildExecutionResultInfoProvider() *FlowAccessNodeBuilder {
builder.Module("execution result info provider", func(node *cmd.NodeConfig) error {
backendConfig := builder.rpcConf.BackendConfig

preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors)
if err != nil {
return fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err)
}

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

operatorCriteria := optimistic_sync.Criteria{
AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount,
RequiredExecutors: requiredENIdentifiers,
}

builder.executionResultInfoProvider = execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
operatorCriteria,
)

return nil
})
return builder
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer

builder.buildStoragesData()
builder.buildExecutionResultInfoProvider()

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}
Expand Down Expand Up @@ -1939,10 +2038,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB)
return nil
}).
Module("reporter", func(node *cmd.NodeConfig) error {
builder.Reporter = index.NewReporter()
return nil
Expand Down Expand Up @@ -1974,13 +2069,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return nil
}).
Module("transaction result error messages storage", func(node *cmd.NodeConfig) error {
if builder.storeTxResultErrorMessages {
builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize)
}

return nil
}).
Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if !builder.versionControlEnabled {
noop := &module.NoopReadyDoneAware{}
Expand Down Expand Up @@ -2110,15 +2198,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN list: %w", err)
}

requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err)
}
operatorCriteria := optimistic_sync.Criteria{
AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount,
RequiredExecutors: requiredENIdentifiers,
}

builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
Expand All @@ -2137,30 +2216,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
notNil(builder.ExecNodeIdentitiesProvider),
)

execNodeSelector := execution_result.NewExecutionNodeSelector(
preferredENIdentifiers,
fixedENIdentifiers,
)

execResultInfoProvider := execution_result.NewExecutionResultInfoProvider(
node.Logger,
node.State,
node.Storage.Receipts,
execNodeSelector,
operatorCriteria,
)

// TODO: use real objects instead of mocks once they're implemented
snapshot := osyncsnapshot.NewSnapshotMock(
builder.events,
builder.collections,
builder.transactions,
builder.lightTransactionResults,
builder.transactionResultErrorMessages,
nil,
)
execStateCache := execution_state.NewExecutionStateCacheMock(snapshot)

builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC, // might be nil
Expand Down Expand Up @@ -2201,8 +2256,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
VersionControl: notNil(builder.VersionControl),
ExecNodeIdentitiesProvider: notNil(builder.ExecNodeIdentitiesProvider),
TxErrorMessageProvider: notNil(builder.txResultErrorMessageProvider),
ExecutionResultInfoProvider: execResultInfoProvider,
ExecutionStateCache: execStateCache,
ExecutionResultInfoProvider: notNil(builder.executionResultInfoProvider),
ExecutionStateCache: builder.executionStateCache, // might be nil
MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize,
ScheduledCallbacksEnabled: builder.scheduledCallbacksEnabled,
})
Expand Down Expand Up @@ -2534,7 +2589,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
// notNil ensures that the input is not nil and returns it
// the usage is to ensure the dependencies are initialized before initializing a module.
// for instance, the IngestionEngine depends on storage.Collections, which is initialized in a
// different function, so we need to ensure that the storage.Collections is initialized before
// different function, so we need to ensure that the storage.Collections were initialized before
// creating the IngestionEngine.
func notNil[T any](dep T) T {
if any(dep) == nil {
Expand Down
Loading