diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 8e182a92..90f20603 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "sort" + "time" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" @@ -119,26 +122,42 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE // Subscribing to EVM specific events and handle any disconnection errors // as well as context cancellations. func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { - eventsChan := make(chan models.BlockEvents) + // create the channel with a buffer size of 1, + // to avoid blocking on the two error cases below + eventsChan := make(chan models.BlockEvents, 1) _, err := r.client.GetBlockHeaderByHeight(ctx, height) if err != nil { err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err) eventsChan <- models.NewBlockEventsError(err) + close(eventsChan) return eventsChan } - // we always use heartbeat interval of 1 to have the least amount of delay from the access node - eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight( - ctx, - height, - blocksFilter(r.chain), - access.WithHeartbeatInterval(1), - ) - if err != nil { + var blockEventsStream <-chan flow.BlockEvents + var errChan <-chan error + + lastReceivedHeight := height + connect := func(height uint64) error { + var err error + + // we always use heartbeat interval of 1 to have the + // least amount of delay from the access node + blockEventsStream, errChan, err = r.client.SubscribeEventsByBlockHeight( + ctx, + height, + blocksFilter(r.chain), + access.WithHeartbeatInterval(1), + ) + + return err + } + + if err := connect(lastReceivedHeight); err != nil { eventsChan <- models.NewBlockEventsError( fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err), ) + close(eventsChan) return eventsChan } @@ -153,8 +172,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha r.logger.Info().Msg("event ingestion received done signal") return - case blockEvents, ok := <-eventStream: + case blockEvents, ok := <-blockEventsStream: if !ok { + // typically we receive an error in the errChan before the channels are closed var err error err = errs.ErrDisconnected if ctx.Err() != nil { @@ -183,10 +203,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha }, ) + lastReceivedHeight = blockEvents.Height + eventsChan <- evmEvents case err, ok := <-errChan: if !ok { + // typically we receive an error in the errChan before the channels are closed var err error err = errs.ErrDisconnected if ctx.Err() != nil { @@ -196,8 +219,29 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } - eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) - return + switch status.Code(err) { + case codes.NotFound: + // we can get not found when reconnecting after a disconnect/restart before the + // next block is finalized. just wait briefly and try again + time.Sleep(200 * time.Millisecond) + case codes.DeadlineExceeded, codes.Internal: + // these are sometimes returned when the stream is disconnected by a middleware or the server + default: + // skip reconnect on all other errors + eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) + return + } + + if err := connect(lastReceivedHeight + 1); err != nil { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to resubscribe for events on height: %d, with: %w", + lastReceivedHeight+1, + err, + ), + ) + return + } } } }()