diff --git a/.mockery.yaml b/.mockery.yaml index a99b7c9e56d..bc58015f196 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -71,10 +71,6 @@ packages: github.com/onflow/flow-go/module/execution: github.com/onflow/flow-go/module/executiondatasync/execution_data: github.com/onflow/flow-go/module/executiondatasync/optimistic_sync: - config: - all: False - interfaces: - Core: github.com/onflow/flow-go/module/executiondatasync/tracker: github.com/onflow/flow-go/module/forest: github.com/onflow/flow-go/module/mempool: diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index a19746eae68..68850f4cd11 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -379,8 +379,10 @@ type FlowAccessNodeBuilder struct { unsecureGrpcServer *grpcserver.GrpcServer stateStreamGrpcServer *grpcserver.GrpcServer - stateStreamBackend *statestreambackend.StateStreamBackend - nodeBackend *backend.Backend + stateStreamBackend *statestreambackend.StateStreamBackend + nodeBackend *backend.Backend + executionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider + executionStateCache optimistic_sync.ExecutionStateCache ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore @@ -654,6 +656,39 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return nil }). + Module("execution state cache", func(node *cmd.NodeConfig) error { + + // check if all the storages are initialized before passing them to Snapshot + if builder.events == nil { + return fmt.Errorf("events store not initialized: ensure 'events storage' module runs before 'execution state cache'") + } + if builder.collections == nil { + return fmt.Errorf("collections store not initialized: ensure 'collections storage' module runs before 'execution state cache'") + } + if builder.transactions == nil { + return fmt.Errorf("transactions store not initialized: ensure 'transactions storage' module runs before 'execution state cache'") + } + if builder.lightTransactionResults == nil { + return fmt.Errorf("lightTransactionResults store not initialized: ensure 'lightTransactionResults storage' module runs before 'execution state cache'") + } + if builder.transactionResultErrorMessages == nil { + return fmt.Errorf("transactionResultErrorMessages store not initialized: ensure 'transactionResultErrorMessages storage' module runs before 'execution state cache'") + } + + // TODO: use real objects instead of mocks once they're implemented + snapshot := osyncsnapshot.NewSnapshotMock( + builder.events, + builder.collections, + builder.transactions, + builder.lightTransactionResults, + builder.transactionResultErrorMessages, + nil, + notNil(executionDataStoreCache), + ) + builder.executionStateCache = execution_state.NewExecutionStateCacheMock(snapshot) + + return nil + }). Component("execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { opts := []network.BlobServiceOption{ blob.WithBitswapOptions( @@ -858,10 +893,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess indexedBlockHeight = store.NewConsumerProgress(builder.ProtocolDB, module.ConsumeProgressExecutionDataIndexerBlockHeight) return nil }). - Module("transaction results storage", func(node *cmd.NodeConfig) error { - builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize) - return nil - }). DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { // Note: using a DependableComponent here to ensure that the indexer does not block // other components from starting while bootstrapping the register db since it may @@ -1088,6 +1119,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess builder.stateStreamConf.ClientSendBufferSize, ), executionDataTracker, + notNil(builder.executionResultInfoProvider), + builder.executionStateCache, // might be nil ) if err != nil { return nil, fmt.Errorf("could not create state stream backend: %w", err) @@ -1729,10 +1762,76 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() { }) } +// buildStoragesData registers modules that initialize core read-only stores used by +// the access node (events, lightTransactionResults, and transactionResultErrorMessages). +func (builder *FlowAccessNodeBuilder) buildStoragesData() *FlowAccessNodeBuilder { + builder.Module("events storage", func(node *cmd.NodeConfig) error { + builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB) + return nil + }) + builder.Module("transaction result error messages storage", func(node *cmd.NodeConfig) error { + builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize) + return nil + }) + builder.Module("transaction results storage", func(node *cmd.NodeConfig) error { + builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize) + return nil + }) + + return builder +} + +// buildExecutionResultInfoProvider registers a module that wires the +// optimistic_sync.ExecutionResultInfoProvider on the builder. +func (builder *FlowAccessNodeBuilder) buildExecutionResultInfoProvider() *FlowAccessNodeBuilder { + builder.Module("execution result info provider", func(node *cmd.NodeConfig) error { + backendConfig := builder.rpcConf.BackendConfig + + preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err) + } + + execNodeSelector := execution_result.NewExecutionNodeSelector( + preferredENIdentifiers, + fixedENIdentifiers, + ) + + operatorCriteria := optimistic_sync.Criteria{ + AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount, + RequiredExecutors: requiredENIdentifiers, + } + + builder.executionResultInfoProvider = execution_result.NewExecutionResultInfoProvider( + node.Logger, + node.State, + node.Storage.Receipts, + execNodeSelector, + operatorCriteria, + ) + + return nil + }) + return builder +} + func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { var processedFinalizedBlockHeight storage.ConsumerProgressInitializer var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer + builder.buildStoragesData() + builder.buildExecutionResultInfoProvider() + if builder.executionDataSyncEnabled { builder.BuildExecutionSyncComponents() } @@ -1939,10 +2038,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.RegistersAsyncStore = execution.NewRegistersAsyncStore() return nil }). - Module("events storage", func(node *cmd.NodeConfig) error { - builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB) - return nil - }). Module("reporter", func(node *cmd.NodeConfig) error { builder.Reporter = index.NewReporter() return nil @@ -1974,13 +2069,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil }). - Module("transaction result error messages storage", func(node *cmd.NodeConfig) error { - if builder.storeTxResultErrorMessages { - builder.transactionResultErrorMessages = store.NewTransactionResultErrorMessages(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize) - } - - return nil - }). Component("version control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { if !builder.versionControlEnabled { noop := &module.NoopReadyDoneAware{} @@ -2110,15 +2198,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN list: %w", err) } - requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err) - } - operatorCriteria := optimistic_sync.Criteria{ - AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount, - RequiredExecutors: requiredENIdentifiers, - } - builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider( node.Logger, node.State, @@ -2137,30 +2216,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { notNil(builder.ExecNodeIdentitiesProvider), ) - execNodeSelector := execution_result.NewExecutionNodeSelector( - preferredENIdentifiers, - fixedENIdentifiers, - ) - - execResultInfoProvider := execution_result.NewExecutionResultInfoProvider( - node.Logger, - node.State, - node.Storage.Receipts, - execNodeSelector, - operatorCriteria, - ) - - // TODO: use real objects instead of mocks once they're implemented - snapshot := osyncsnapshot.NewSnapshotMock( - builder.events, - builder.collections, - builder.transactions, - builder.lightTransactionResults, - builder.transactionResultErrorMessages, - nil, - ) - execStateCache := execution_state.NewExecutionStateCacheMock(snapshot) - builder.nodeBackend, err = backend.New(backend.Params{ State: node.State, CollectionRPC: builder.CollectionRPC, // might be nil @@ -2201,8 +2256,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { VersionControl: notNil(builder.VersionControl), ExecNodeIdentitiesProvider: notNil(builder.ExecNodeIdentitiesProvider), TxErrorMessageProvider: notNil(builder.txResultErrorMessageProvider), - ExecutionResultInfoProvider: execResultInfoProvider, - ExecutionStateCache: execStateCache, + ExecutionResultInfoProvider: notNil(builder.executionResultInfoProvider), + ExecutionStateCache: builder.executionStateCache, // might be nil MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize, ScheduledCallbacksEnabled: builder.scheduledCallbacksEnabled, }) @@ -2534,7 +2589,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri // notNil ensures that the input is not nil and returns it // the usage is to ensure the dependencies are initialized before initializing a module. // for instance, the IngestionEngine depends on storage.Collections, which is initialized in a -// different function, so we need to ensure that the storage.Collections is initialized before +// different function, so we need to ensure that the storage.Collections were initialized before // creating the IngestionEngine. func notNil[T any](dep T) T { if any(dep) == nil { diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 2633cc3b017..80eb929df78 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -331,7 +331,9 @@ type ObserverServiceBuilder struct { unsecureGrpcServer *grpcserver.GrpcServer stateStreamGrpcServer *grpcserver.GrpcServer - stateStreamBackend *statestreambackend.StateStreamBackend + stateStreamBackend *statestreambackend.StateStreamBackend + executionResultInfoProvider optimistic_sync.ExecutionResultInfoProvider + executionStateCache optimistic_sync.ExecutionStateCache } // deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters. @@ -1108,10 +1110,53 @@ func (builder *ObserverServiceBuilder) initObserverLocal() func(node *cmd.NodeCo } } +// buildExecutionResultInfoProvider registers a module that wires the +// optimistic_sync.ExecutionResultInfoProvider on the builder. +func (builder *ObserverServiceBuilder) buildExecutionResultInfoProvider() *ObserverServiceBuilder { + builder.Module("execution result info provider", func(node *cmd.NodeConfig) error { + backendConfig := builder.rpcConf.BackendConfig + + preferredENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := flow.IdentifierListFromHex(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors) + if err != nil { + return fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err) + } + operatorCriteria := optimistic_sync.Criteria{ + AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount, + RequiredExecutors: requiredENIdentifiers, + } + execNodeSelector := execution_result.NewExecutionNodeSelector( + preferredENIdentifiers, + fixedENIdentifiers, + ) + + builder.executionResultInfoProvider = execution_result.NewExecutionResultInfoProvider( + node.Logger, + node.State, + node.Storage.Receipts, + execNodeSelector, + operatorCriteria, + ) + + return nil + }) + return builder +} + // Build enqueues the sync engine and the follower engine for the observer. // Currently, the observer only runs the follower engine. func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { builder.BuildConsensusFollower() + builder.buildExecutionResultInfoProvider() if builder.executionDataSyncEnabled { builder.BuildExecutionSyncComponents() @@ -1222,6 +1267,25 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return nil }). + Module("events storage", func(node *cmd.NodeConfig) error { + builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB) + return nil + }). + Module("execution state cache", func(node *cmd.NodeConfig) error { + // TODO: use real objects instead of mocks once they're implemented + snapshot := osyncsnapshot.NewSnapshotMock( + builder.events, + builder.Storage.Collections, + builder.Storage.Transactions, + builder.lightTransactionResults, + nil, + nil, + executionDataStoreCache, + ) + builder.executionStateCache = execution_state.NewExecutionStateCacheMock(snapshot) + + return nil + }). Component("public execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { opts := []network.BlobServiceOption{ blob.WithBitswapOptions( @@ -1609,6 +1673,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS builder.stateStreamConf.ClientSendBufferSize, ), executionDataTracker, + builder.executionResultInfoProvider, + builder.executionStateCache, ) if err != nil { return nil, fmt.Errorf("could not create state stream backend: %w", err) @@ -1829,10 +1895,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.RegistersAsyncStore = execution.NewRegistersAsyncStore() return nil }) - builder.Module("events storage", func(node *cmd.NodeConfig) error { - builder.events = store.NewEvents(node.Metrics.Cache, node.ProtocolDB) - return nil - }) builder.Module("reporter", func(node *cmd.NodeConfig) error { builder.Reporter = index.NewReporter() return nil @@ -1988,39 +2050,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { fixedENIdentifiers, ) - requiredENIdentifiers, err := flow.IdentifierListFromHex(builder.executionResultRequiredExecutors) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for required EN list: %w", err) - } - operatorCriteria := optimistic_sync.Criteria{ - AgreeingExecutorsCount: builder.executionResultAgreeingExecutorsCount, - RequiredExecutors: requiredENIdentifiers, - } - - execNodeSelector := execution_result.NewExecutionNodeSelector( - preferredENIdentifiers, - fixedENIdentifiers, - ) - - execResultInfoProvider := execution_result.NewExecutionResultInfoProvider( - node.Logger, - node.State, - node.Storage.Receipts, - execNodeSelector, - operatorCriteria, - ) - - // TODO: use real objects instead of mocks once they're implemented - snapshot := osyncsnapshot.NewSnapshotMock( - builder.events, - nil, - nil, - builder.lightTransactionResults, - nil, - nil, - ) - execStateCache := execution_state.NewExecutionStateCacheMock(snapshot) - backendParams := backend.Params{ State: node.State, Blocks: node.Storage.Blocks, @@ -2052,8 +2081,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { VersionControl: builder.VersionControl, ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, MaxScriptAndArgumentSize: config.BackendConfig.AccessConfig.MaxRequestMsgSize, - ExecutionResultInfoProvider: execResultInfoProvider, - ExecutionStateCache: execStateCache, + ExecutionResultInfoProvider: builder.executionResultInfoProvider, + ExecutionStateCache: builder.executionStateCache, } if builder.localServiceAPIEnabled { diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go index 73c03152182..f9e96c6f46f 100644 --- a/engine/access/integration_unsecure_grpc_server_test.go +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -295,6 +295,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { state_stream.DefaultRegisterIDsRequestLimit, subscriptionHandler, suite.executionDataTracker, + suite.executionResultInfoProvider, + suite.executionStateCache, ) assert.NoError(suite.T(), err) diff --git a/engine/access/state_stream/backend/backend.go b/engine/access/state_stream/backend/backend.go index 4e99013ebd2..b32692af6e0 100644 --- a/engine/access/state_stream/backend/backend.go +++ b/engine/access/state_stream/backend/backend.go @@ -18,6 +18,7 @@ import ( "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" ) @@ -95,6 +96,8 @@ func New( registerIDsRequestLimit int, subscriptionHandler *subscription.SubscriptionHandler, executionDataTracker tracker.ExecutionDataTracker, + executionResultProvider optimistic_sync.ExecutionResultInfoProvider, + executionStateCache optimistic_sync.ExecutionStateCache, ) (*StateStreamBackend, error) { logger := log.With().Str("module", "state_stream_api").Logger() @@ -113,11 +116,13 @@ func New( } b.ExecutionDataBackend = ExecutionDataBackend{ - log: logger, - headers: headers, - subscriptionHandler: subscriptionHandler, - getExecutionData: b.getExecutionData, - executionDataTracker: executionDataTracker, + log: logger, + headers: headers, + subscriptionHandler: subscriptionHandler, + getExecutionData: b.getExecutionData, + executionDataTracker: executionDataTracker, + executionResultProvider: executionResultProvider, + executionStateCache: executionStateCache, } eventsProvider := EventsProvider{ diff --git a/engine/access/state_stream/backend/backend_executiondata.go b/engine/access/state_stream/backend/backend_executiondata.go index 954640f3cb9..7c9d2fd49df 100644 --- a/engine/access/state_stream/backend/backend_executiondata.go +++ b/engine/access/state_stream/backend/backend_executiondata.go @@ -7,23 +7,26 @@ import ( "time" "github.com/rs/zerolog" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rpc/backend/common" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/access/subscription/tracker" - "github.com/onflow/flow-go/engine/common/rpc" + accessmodel "github.com/onflow/flow-go/model/access" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" "github.com/onflow/flow-go/storage" ) +// ExecutionDataResponse bundles the execution data returned for a single block. type ExecutionDataResponse struct { Height uint64 ExecutionData *execution_data.BlockExecutionData BlockTimestamp time.Time } +// ExecutionDataBackend exposes read-only access to execution data. type ExecutionDataBackend struct { log zerolog.Logger headers storage.Headers @@ -32,26 +35,64 @@ type ExecutionDataBackend struct { subscriptionHandler *subscription.SubscriptionHandler executionDataTracker tracker.ExecutionDataTracker + + executionResultProvider optimistic_sync.ExecutionResultInfoProvider + executionStateCache optimistic_sync.ExecutionStateCache } -func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error) { - header, err := b.headers.ByBlockID(blockID) +// GetExecutionDataByBlockID retrieves execution data for a specific block by its block ID. +// +// CAUTION: this layer SIMPLIFIES the ERROR HANDLING convention +// - All errors returned are guaranteed to be benign. The node can continue normal operations after such errors. +// - To prevent delivering incorrect results to clients in case of an error, all other return values should be discarded. +// +// Expected errors: +// - [access.DataNotFoundError]: when data required to process the request is not available. +func (b *ExecutionDataBackend) GetExecutionDataByBlockID( + ctx context.Context, + blockID flow.Identifier, + criteria optimistic_sync.Criteria, +) (*execution_data.BlockExecutionData, *accessmodel.ExecutorMetadata, error) { + execResultInfo, err := b.executionResultProvider.ExecutionResultInfo(blockID, criteria) if err != nil { - return nil, fmt.Errorf("could not get block header for %s: %w", blockID, err) + err = fmt.Errorf("failed to get execution result info for block: %w", err) + switch { + case errors.Is(err, storage.ErrNotFound): + return nil, nil, access.NewDataNotFoundError("execution data", err) + case common.IsInsufficientExecutionReceipts(err): + return nil, nil, access.NewDataNotFoundError("execution data", err) + default: + return nil, nil, access.RequireNoError(ctx, err) + } } - executionData, err := b.getExecutionData(ctx, header.Height) + executionResultID := execResultInfo.ExecutionResultID + snapshot, err := b.executionStateCache.Snapshot(executionResultID) + if err != nil { + err = access.RequireErrorIs(ctx, err, storage.ErrNotFound) + err = fmt.Errorf("failed to find snapshot by execution result ID %s: %w", executionResultID.String(), err) + return nil, nil, access.NewDataNotFoundError("snapshot", err) + } + executionData, err := snapshot.BlockExecutionData().ByBlockID(ctx, blockID) if err != nil { // need custom not found handler due to blob not found error - if errors.Is(err, storage.ErrNotFound) || execution_data.IsBlobNotFoundError(err) || errors.Is(err, subscription.ErrBlockNotReady) { - return nil, status.Errorf(codes.NotFound, "could not find execution data: %v", err) + if errors.Is(err, storage.ErrNotFound) || + execution_data.IsBlobNotFoundError(err) { + err = fmt.Errorf("could not find execution data for block %s: %w", blockID, err) + return nil, nil, access.NewDataNotFoundError("execution data", err) } - return nil, rpc.ConvertError(err, "could not get execution data", codes.Internal) + // any other error is unexpected exception and indicates there is a bug or inconsistent state. + return nil, nil, access.RequireNoError(ctx, fmt.Errorf("unexpected error getting execution data: %w", err)) + } + + metadata := &accessmodel.ExecutorMetadata{ + ExecutionResultID: executionResultID, + ExecutorIDs: execResultInfo.ExecutionNodes.NodeIDs(), } - return executionData.BlockExecutionData, nil + return executionData.BlockExecutionData, metadata, nil } // SubscribeExecutionData is deprecated and will be removed in future versions. diff --git a/engine/access/state_stream/backend/backend_executiondata_test.go b/engine/access/state_stream/backend/backend_executiondata_test.go index f7587affa6d..d5f204ee0ee 100644 --- a/engine/access/state_stream/backend/backend_executiondata_test.go +++ b/engine/access/state_stream/backend/backend_executiondata_test.go @@ -16,17 +16,22 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/access/index" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/access/subscription/tracker" trackermock "github.com/onflow/flow-go/engine/access/subscription/tracker/mock" + accessmodel "github.com/onflow/flow-go/model/access" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/blobs" "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" + osyncmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/herocache" "github.com/onflow/flow-go/module/metrics" protocolmock "github.com/onflow/flow-go/state/protocol/mock" @@ -68,6 +73,11 @@ type BackendExecutionDataSuite struct { backend *StateStreamBackend executionDataTrackerReal tracker.ExecutionDataTracker + executionResultProvider *osyncmock.ExecutionResultProvider + executionStateCache *osyncmock.ExecutionStateCache + executionDataSnapshot *osyncmock.Snapshot + criteria optimistic_sync.Criteria + blocks []*flow.Block blockEvents map[flow.Identifier][]flow.Event execDataMap map[flow.Identifier]*execution_data.BlockExecutionDataEntity @@ -169,6 +179,11 @@ func (s *BackendExecutionDataSuite) SetupTestSuite(blockCount int) { s.resultMap = make(map[flow.Identifier]*flow.ExecutionResult, blockCount) s.blocks = make([]*flow.Block, 0, blockCount) + s.executionDataSnapshot = osyncmock.NewSnapshot(s.T()) + s.executionResultProvider = osyncmock.NewExecutionResultProvider(s.T()) + s.executionStateCache = osyncmock.NewExecutionStateCache(s.T()) + s.criteria = optimistic_sync.Criteria{} + // generate blockCount consecutive blocks with associated seal, result and execution data s.rootBlock = unittest.BlockFixture() s.blockMap[s.rootBlock.Height] = s.rootBlock @@ -261,6 +276,8 @@ func (s *BackendExecutionDataSuite) SetupBackend(useEventsIndex bool) { subscription.DefaultSendBufferSize, ), s.executionDataTracker, + s.executionResultProvider, + s.executionStateCache, ) require.NoError(s.T(), err) @@ -335,24 +352,189 @@ func (s *BackendExecutionDataSuite) TestGetExecutionDataByBlockID() { // notify backend block is available s.highestBlockHeader = block.ToHeader() + executionNodes := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) + var err error + reader := osyncmock.NewBlockExecutionDataReader(s.T()) s.Run("happy path TestGetExecutionDataByBlockID success", func() { result.ExecutionDataID, err = s.eds.Add(ctx, execData.BlockExecutionData) require.NoError(s.T(), err) - res, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID()) + metadata := &accessmodel.ExecutorMetadata{ + ExecutionResultID: result.ID(), + ExecutorIDs: executionNodes.NodeIDs(), + } + + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(&optimistic_sync.ExecutionResultInfo{ + ExecutionResultID: result.ID(), + ExecutionNodes: executionNodes.ToSkeleton(), + }, nil). + Once() + + s.executionStateCache. + On("Snapshot", result.ID()). + Return(s.executionDataSnapshot, nil). + Once() + + s.executionDataSnapshot. + On("BlockExecutionData"). + Return(reader). + Once() + + reader. + On("ByBlockID", mock.Anything, block.ID()). + Return(execData, nil). + Once() + + res, resMetadata, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID(), s.criteria) + assert.NotNil(s.T(), resMetadata) + assert.Equal(s.T(), metadata, resMetadata) assert.Equal(s.T(), execData.BlockExecutionData, res) assert.NoError(s.T(), err) }) s.execDataHeroCache.Clear() + s.Run("execution result info returns data not found", func() { + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(nil, storage.ErrNotFound). + Once() + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID(), s.criteria) + assert.Nil(s.T(), execDataRes) + assert.Nil(s.T(), metadata) + require.Error(s.T(), err) + require.True(s.T(), access.IsDataNotFoundError(err)) + }) + + s.Run("execution result info returns unexpected error", func() { + expectedErr := fmt.Errorf("failed to get execution result info for block: %w", storage.ErrDataMismatch) + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(nil, storage.ErrDataMismatch). + Once() + + ctxSignaler := irrecoverable.NewMockSignalerContextExpectError(s.T(), ctx, expectedErr) + ctxIrr := irrecoverable.WithSignalerContext(ctx, ctxSignaler) + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctxIrr, block.ID(), s.criteria) + assert.Nil(s.T(), execDataRes) + assert.Nil(s.T(), metadata) + assert.Error(s.T(), err) + }) + + s.Run("snapshot returns data not found", func() { + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(&optimistic_sync.ExecutionResultInfo{ + ExecutionResultID: result.ID(), + ExecutionNodes: executionNodes.ToSkeleton(), + }, nil). + Once() + + s.executionStateCache. + On("Snapshot", result.ID()). + Return(nil, storage.ErrNotFound). + Once() + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID(), s.criteria) + assert.Nil(s.T(), execDataRes) + assert.Nil(s.T(), metadata) + require.Error(s.T(), err) + require.True(s.T(), access.IsDataNotFoundError(err)) + }) + + s.Run("snapshot returns unexpected error", func() { + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(&optimistic_sync.ExecutionResultInfo{ + ExecutionResultID: result.ID(), + ExecutionNodes: executionNodes.ToSkeleton(), + }, nil). + Once() + + expectedError := fmt.Errorf("unexpected error") + s.executionStateCache. + On("Snapshot", result.ID()). + Return(nil, expectedError). + Once() + + ctxSignaler := irrecoverable.NewMockSignalerContextExpectError(s.T(), ctx, expectedError) + ctxIrr := irrecoverable.WithSignalerContext(ctx, ctxSignaler) + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctxIrr, block.ID(), s.criteria) + assert.Nil(s.T(), execDataRes) + assert.Nil(s.T(), metadata) + assert.Error(s.T(), err) + }) + s.Run("missing exec data for TestGetExecutionDataByBlockID failure", func() { result.ExecutionDataID = unittest.IdentifierFixture() - execDataRes, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID()) + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(&optimistic_sync.ExecutionResultInfo{ + ExecutionResultID: result.ID(), + ExecutionNodes: executionNodes.ToSkeleton(), + }, nil). + Once() + + s.executionStateCache. + On("Snapshot", result.ID()). + Return(s.executionDataSnapshot, nil). + Once() + + s.executionDataSnapshot. + On("BlockExecutionData"). + Return(reader). + Once() + + reader. + On("ByBlockID", mock.Anything, block.ID()). + Return(nil, storage.ErrNotFound). + Once() + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctx, block.ID(), s.criteria) + assert.Nil(s.T(), execDataRes) + assert.Nil(s.T(), metadata) + s.Require().True(access.IsDataNotFoundError(err)) + }) + + s.Run("unexpected error from ByBlockID", func() { + s.executionResultProvider. + On("ExecutionResultInfo", block.ID(), mock.Anything). + Return(&optimistic_sync.ExecutionResultInfo{ + ExecutionResultID: result.ID(), + ExecutionNodes: executionNodes.ToSkeleton(), + }, nil). + Once() + + s.executionStateCache. + On("Snapshot", result.ID()). + Return(s.executionDataSnapshot, nil). + Once() + + s.executionDataSnapshot. + On("BlockExecutionData"). + Return(reader). + Once() + + reader. + On("ByBlockID", mock.Anything, block.ID()). + Return(nil, storage.ErrDataMismatch). + Once() + + expectedError := fmt.Errorf("unexpected error getting execution data: %w", storage.ErrDataMismatch) + ctxSignaler := irrecoverable.NewMockSignalerContextExpectError(s.T(), ctx, expectedError) + ctxIrr := irrecoverable.WithSignalerContext(ctx, ctxSignaler) + + execDataRes, metadata, err := s.backend.GetExecutionDataByBlockID(ctxIrr, block.ID(), s.criteria) assert.Nil(s.T(), execDataRes) - assert.Equal(s.T(), codes.NotFound, status.Code(err)) + assert.Nil(s.T(), metadata) + assert.Error(s.T(), err) }) } diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index ea9cded1bed..8cb97d696a1 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -49,15 +49,23 @@ func NewHandler(api state_stream.API, chain flow.Chain, config Config) *Handler return h } +// GetExecutionDataByBlockID handles request to fetch execution data for a +// specific block. +// +// Expected errors during normal operation: +// - codes.InvalidArgument - if invalid block ID provided. +// - codes.Internal - if failed to get execution data the execution node or failed to convert execution data event payloads to JSON. func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *executiondata.GetExecutionDataByBlockIDRequest) (*executiondata.GetExecutionDataByBlockIDResponse, error) { blockID, err := convert.BlockID(request.GetBlockId()) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "could not convert block ID: %v", err) } - execData, err := h.api.GetExecutionDataByBlockID(ctx, blockID) + query := request.GetExecutionStateQuery() + + execData, executorMetadata, err := h.api.GetExecutionDataByBlockID(ctx, blockID, convert.NewCriteria(query)) if err != nil { - return nil, rpc.ConvertError(err, "could no get execution data", codes.Internal) + return nil, rpc.ErrorToStatus(err) } message, err := convert.BlockExecutionDataToMessage(execData) @@ -70,7 +78,15 @@ func (h *Handler) GetExecutionDataByBlockID(ctx context.Context, request *execut return nil, status.Errorf(codes.Internal, "could not convert execution data event payloads to JSON: %v", err) } - return &executiondata.GetExecutionDataByBlockIDResponse{BlockExecutionData: message}, nil + response := &executiondata.GetExecutionDataByBlockIDResponse{ + BlockExecutionData: message, + } + + if query.GetIncludeExecutorMetadata() { + response.ExecutorMetadata = convert.ExecutorMetadataToMessage(executorMetadata) + } + + return response, nil } // SubscribeExecutionData is deprecated and will be removed in a future version. diff --git a/engine/access/state_stream/backend/handler_test.go b/engine/access/state_stream/backend/handler_test.go index affe2167dfe..6f29a9a8aec 100644 --- a/engine/access/state_stream/backend/handler_test.go +++ b/engine/access/state_stream/backend/handler_test.go @@ -26,6 +26,7 @@ import ( ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/common/rpc/convert" + accessmodel "github.com/onflow/flow-go/model/access" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/utils/unittest" @@ -224,17 +225,27 @@ func TestGetExecutionDataByBlockID(t *testing.T) { ) blockID := result.BlockID + metadata := &accessmodel.ExecutorMetadata{ + ExecutionResultID: unittest.IdentifierFixture(), + ExecutorIDs: unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)).NodeIDs(), + } + api := ssmock.NewAPI(t) - api.On("GetExecutionDataByBlockID", mock.Anything, blockID).Return(result, nil) + api.On("GetExecutionDataByBlockID", mock.Anything, blockID, mock.Anything).Return(result, metadata, nil).Once() h := NewHandler(api, flow.Localnet.Chain(), makeConfig(1)) response, err := h.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{ BlockId: blockID[:], EventEncodingVersion: test.eventVersion, + ExecutionStateQuery: &entities.ExecutionStateQuery{ + IncludeExecutorMetadata: true, + }, }) require.NoError(t, err) require.NotNil(t, response) + require.NotNil(t, response.ExecutorMetadata) + require.Equal(t, convert.ExecutorMetadataToMessage(metadata), response.ExecutorMetadata) blockExecutionData := response.GetBlockExecutionData() require.Equal(t, blockID[:], blockExecutionData.GetBlockId()) diff --git a/engine/access/state_stream/mock/api.go b/engine/access/state_stream/mock/api.go index ba8bd9e7544..1f07962cdca 100644 --- a/engine/access/state_stream/mock/api.go +++ b/engine/access/state_stream/mock/api.go @@ -5,11 +5,16 @@ package mock import ( context "context" - flow "github.com/onflow/flow-go/model/flow" + access "github.com/onflow/flow-go/model/access" + execution_data "github.com/onflow/flow-go/module/executiondatasync/execution_data" + flow "github.com/onflow/flow-go/model/flow" + mock "github.com/stretchr/testify/mock" + optimistic_sync "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" + state_stream "github.com/onflow/flow-go/engine/access/state_stream" subscription "github.com/onflow/flow-go/engine/access/subscription" @@ -20,34 +25,43 @@ type API struct { mock.Mock } -// GetExecutionDataByBlockID provides a mock function with given fields: ctx, blockID -func (_m *API) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error) { - ret := _m.Called(ctx, blockID) +// GetExecutionDataByBlockID provides a mock function with given fields: ctx, blockID, criteria +func (_m *API) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier, criteria optimistic_sync.Criteria) (*execution_data.BlockExecutionData, *access.ExecutorMetadata, error) { + ret := _m.Called(ctx, blockID, criteria) if len(ret) == 0 { panic("no return value specified for GetExecutionDataByBlockID") } var r0 *execution_data.BlockExecutionData - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) (*execution_data.BlockExecutionData, error)); ok { - return rf(ctx, blockID) + var r1 *access.ExecutorMetadata + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, optimistic_sync.Criteria) (*execution_data.BlockExecutionData, *access.ExecutorMetadata, error)); ok { + return rf(ctx, blockID, criteria) } - if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) *execution_data.BlockExecutionData); ok { - r0 = rf(ctx, blockID) + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, optimistic_sync.Criteria) *execution_data.BlockExecutionData); ok { + r0 = rf(ctx, blockID, criteria) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*execution_data.BlockExecutionData) } } - if rf, ok := ret.Get(1).(func(context.Context, flow.Identifier) error); ok { - r1 = rf(ctx, blockID) + if rf, ok := ret.Get(1).(func(context.Context, flow.Identifier, optimistic_sync.Criteria) *access.ExecutorMetadata); ok { + r1 = rf(ctx, blockID, criteria) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).(*access.ExecutorMetadata) + } } - return r0, r1 + if rf, ok := ret.Get(2).(func(context.Context, flow.Identifier, optimistic_sync.Criteria) error); ok { + r2 = rf(ctx, blockID, criteria) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } // GetRegisterValues provides a mock function with given fields: registerIDs, height diff --git a/engine/access/state_stream/state_stream.go b/engine/access/state_stream/state_stream.go index 862c0c0e3b1..7beafb8c679 100644 --- a/engine/access/state_stream/state_stream.go +++ b/engine/access/state_stream/state_stream.go @@ -4,8 +4,10 @@ import ( "context" "github.com/onflow/flow-go/engine/access/subscription" + accessmodel "github.com/onflow/flow-go/model/access" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" ) const ( @@ -16,7 +18,14 @@ const ( // API represents an interface that defines methods for interacting with a blockchain's execution data and events. type API interface { // GetExecutionDataByBlockID retrieves execution data for a specific block by its block ID. - GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionData, error) + // + // CAUTION: this layer SIMPLIFIES the ERROR HANDLING convention + // - All errors returned are guaranteed to be benign. The node can continue normal operations after such errors. + // - To prevent delivering incorrect results to clients in case of an error, all other return values should be discarded. + // + // Expected errors: + // - [access.DataNotFoundError]: when data required to process the request is not available. + GetExecutionDataByBlockID(ctx context.Context, blockID flow.Identifier, criteria optimistic_sync.Criteria) (*execution_data.BlockExecutionData, *accessmodel.ExecutorMetadata, error) // SubscribeExecutionData is deprecated and will be removed in future versions. // Use SubscribeExecutionDataFromStartBlockID, SubscribeExecutionDataFromStartBlockHeight or SubscribeExecutionDataFromLatest. // diff --git a/module/executiondatasync/optimistic_sync/block_execution_data_reader.go b/module/executiondatasync/optimistic_sync/block_execution_data_reader.go new file mode 100644 index 00000000000..f9290e30664 --- /dev/null +++ b/module/executiondatasync/optimistic_sync/block_execution_data_reader.go @@ -0,0 +1,20 @@ +package optimistic_sync + +import ( + "context" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" +) + +// BlockExecutionDataReader provides access to per-block execution data sourced from a particular snapshot. +type BlockExecutionDataReader interface { + // ByBlockID returns the execution data for the given block ID. + // + // Expected errors during normal operation: + // - [storage.ErrNotFound]: if a seal or execution result is not available for the block + // - [BlobNotFoundError]: if some CID in the blob tree could not be found from the blobstore + // - [MalformedDataError]: if some level of the blob tree cannot be properly deserialized + // - [BlobSizeLimitExceededError]: if some blob in the blob tree exceeds the maximum allowed size + ByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionDataEntity, error) +} diff --git a/module/executiondatasync/optimistic_sync/mock/block_execution_data_reader.go b/module/executiondatasync/optimistic_sync/mock/block_execution_data_reader.go new file mode 100644 index 00000000000..244a4af123a --- /dev/null +++ b/module/executiondatasync/optimistic_sync/mock/block_execution_data_reader.go @@ -0,0 +1,61 @@ +// Code generated by mockery. DO NOT EDIT. + +package mock + +import ( + context "context" + + flow "github.com/onflow/flow-go/model/flow" + execution_data "github.com/onflow/flow-go/module/executiondatasync/execution_data" + + mock "github.com/stretchr/testify/mock" +) + +// BlockExecutionDataReader is an autogenerated mock type for the BlockExecutionDataReader type +type BlockExecutionDataReader struct { + mock.Mock +} + +// ByBlockID provides a mock function with given fields: ctx, blockID +func (_m *BlockExecutionDataReader) ByBlockID(ctx context.Context, blockID flow.Identifier) (*execution_data.BlockExecutionDataEntity, error) { + ret := _m.Called(ctx, blockID) + + if len(ret) == 0 { + panic("no return value specified for ByBlockID") + } + + var r0 *execution_data.BlockExecutionDataEntity + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) (*execution_data.BlockExecutionDataEntity, error)); ok { + return rf(ctx, blockID) + } + if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier) *execution_data.BlockExecutionDataEntity); ok { + r0 = rf(ctx, blockID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution_data.BlockExecutionDataEntity) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, flow.Identifier) error); ok { + r1 = rf(ctx, blockID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewBlockExecutionDataReader creates a new instance of BlockExecutionDataReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBlockExecutionDataReader(t interface { + mock.TestingT + Cleanup(func()) +}) *BlockExecutionDataReader { + mock := &BlockExecutionDataReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/module/executiondatasync/optimistic_sync/mock/execution_result_info_provider.go b/module/executiondatasync/optimistic_sync/mock/execution_result_info_provider.go index d7f60d88797..b86cffb5b11 100644 --- a/module/executiondatasync/optimistic_sync/mock/execution_result_info_provider.go +++ b/module/executiondatasync/optimistic_sync/mock/execution_result_info_provider.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mock diff --git a/module/executiondatasync/optimistic_sync/mock/execution_state_cache.go b/module/executiondatasync/optimistic_sync/mock/execution_state_cache.go index a88e4305011..506fc783454 100644 --- a/module/executiondatasync/optimistic_sync/mock/execution_state_cache.go +++ b/module/executiondatasync/optimistic_sync/mock/execution_state_cache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mock diff --git a/module/executiondatasync/optimistic_sync/mock/snapshot.go b/module/executiondatasync/optimistic_sync/mock/snapshot.go index d6842d8f187..ff3c29ce74b 100644 --- a/module/executiondatasync/optimistic_sync/mock/snapshot.go +++ b/module/executiondatasync/optimistic_sync/mock/snapshot.go @@ -1,11 +1,11 @@ -// Code generated by mockery v2.53.3. DO NOT EDIT. +// Code generated by mockery. DO NOT EDIT. package mock import ( - mock "github.com/stretchr/testify/mock" - + optimistic_sync "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" ) // Snapshot is an autogenerated mock type for the Snapshot type @@ -13,6 +13,26 @@ type Snapshot struct { mock.Mock } +// BlockExecutionData provides a mock function with no fields +func (_m *Snapshot) BlockExecutionData() optimistic_sync.BlockExecutionDataReader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for BlockExecutionData") + } + + var r0 optimistic_sync.BlockExecutionDataReader + if rf, ok := ret.Get(0).(func() optimistic_sync.BlockExecutionDataReader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(optimistic_sync.BlockExecutionDataReader) + } + } + + return r0 +} + // Collections provides a mock function with no fields func (_m *Snapshot) Collections() storage.CollectionsReader { ret := _m.Called() diff --git a/module/executiondatasync/optimistic_sync/snapshot.go b/module/executiondatasync/optimistic_sync/snapshot.go index 3b3dd371f29..84abbc915ba 100644 --- a/module/executiondatasync/optimistic_sync/snapshot.go +++ b/module/executiondatasync/optimistic_sync/snapshot.go @@ -23,4 +23,7 @@ type Snapshot interface { // Registers returns a reader for querying register data. Registers() storage.RegisterIndexReader + + // BlockExecutionData returns a reader for querying execution data. + BlockExecutionData() BlockExecutionDataReader } diff --git a/module/executiondatasync/optimistic_sync/snapshot/snapshot.go b/module/executiondatasync/optimistic_sync/snapshot/snapshot.go index 66ddd985af1..2c30c3db423 100644 --- a/module/executiondatasync/optimistic_sync/snapshot/snapshot.go +++ b/module/executiondatasync/optimistic_sync/snapshot/snapshot.go @@ -12,6 +12,7 @@ type Mock struct { lightTransactionResults storage.LightTransactionResultsReader transactionResultErrorMessages storage.TransactionResultErrorMessagesReader registers storage.RegisterIndexReader + executionData optimistic_sync.BlockExecutionDataReader } var _ optimistic_sync.Snapshot = (*Mock)(nil) @@ -23,6 +24,7 @@ func NewSnapshotMock( lightTransactionResults storage.LightTransactionResultsReader, transactionResultErrorMessages storage.TransactionResultErrorMessagesReader, registers storage.RegisterIndexReader, + executionData optimistic_sync.BlockExecutionDataReader, ) *Mock { return &Mock{ events: events, @@ -31,6 +33,7 @@ func NewSnapshotMock( lightTransactionResults: lightTransactionResults, transactionResultErrorMessages: transactionResultErrorMessages, registers: registers, + executionData: executionData, } } @@ -57,3 +60,5 @@ func (s *Mock) TransactionResultErrorMessages() storage.TransactionResultErrorMe func (s *Mock) Registers() storage.RegisterIndexReader { return s.registers } + +func (s *Mock) BlockExecutionData() optimistic_sync.BlockExecutionDataReader { return s.executionData }