Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"sort"
"time"

"github.com/onflow/cadence/common"
"github.com/onflow/flow-go/fvm/evm/events"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
Expand Down Expand Up @@ -119,26 +122,42 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)
// create the channel with a buffer size of 1,
// to avoid blocking on the two error cases below
eventsChan := make(chan models.BlockEvents, 1)

_, err := r.client.GetBlockHeaderByHeight(ctx, height)
if err != nil {
err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err)
eventsChan <- models.NewBlockEventsError(err)
close(eventsChan)
return eventsChan
}

// we always use heartbeat interval of 1 to have the least amount of delay from the access node
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
ctx,
height,
blocksFilter(r.chain),
access.WithHeartbeatInterval(1),
)
if err != nil {
var blockEventsStream <-chan flow.BlockEvents
var errChan <-chan error

lastReceivedHeight := height
connect := func(height uint64) error {
var err error

// we always use heartbeat interval of 1 to have the
// least amount of delay from the access node
blockEventsStream, errChan, err = r.client.SubscribeEventsByBlockHeight(
ctx,
height,
blocksFilter(r.chain),
access.WithHeartbeatInterval(1),
)

return err
}

if err := connect(lastReceivedHeight); err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
)
close(eventsChan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be closed in a defer after instantiation? looks like this is the only code path that closes it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we can't use a defer after instantiation. Because the channel is used inside a go-routine, which is created a few lines below. Like this:

go func() {
	defer func() {
		close(eventsChan)
	}()
	...
}

So the channel closing is handled inside the go-routine, if it reaches that certain piece of code.

Your question got me thinking though, and I realized that for the two error-handling cases we have, we don't properly propagate the error.

I have addressed that issue in 301fac7 .

The issue was that we create the eventsChan, and there's two possible error cases, which will actually block, since we attempt to write to it, without anyone reading from it 😅 .

return eventsChan
}

Expand All @@ -153,8 +172,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
r.logger.Info().Msg("event ingestion received done signal")
return

case blockEvents, ok := <-eventStream:
case blockEvents, ok := <-blockEventsStream:
if !ok {
// typically we receive an error in the errChan before the channels are closed
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
Expand Down Expand Up @@ -183,10 +203,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
},
)

lastReceivedHeight = blockEvents.Height

eventsChan <- evmEvents

case err, ok := <-errChan:
if !ok {
// typically we receive an error in the errChan before the channels are closed
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
Expand All @@ -196,8 +219,29 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
return
}

eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
return
switch status.Code(err) {
case codes.NotFound:
// we can get not found when reconnecting after a disconnect/restart before the
// next block is finalized. just wait briefly and try again
time.Sleep(200 * time.Millisecond)
case codes.DeadlineExceeded, codes.Internal:
// these are sometimes returned when the stream is disconnected by a middleware or the server
default:
// skip reconnect on all other errors
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
return
}

if err := connect(lastReceivedHeight + 1); err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf(
"failed to resubscribe for events on height: %d, with: %w",
lastReceivedHeight+1,
err,
),
)
return
}
}
}
}()
Expand Down