Add reconnect logic to RPCEventSubscriber #856
        
Conversation
Walkthrough

Refactored RPCEventSubscriber.subscribe to add reconnect and retry logic, track the last received block height, distinguish gRPC error types, and resume subscriptions from the correct block after recoverable failures.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Subscriber
    participant EventService
    participant EventsChan
    Subscriber->>EventService: connect(startHeight)
    alt connect success
        loop receive events
            EventService-->>Subscriber: blockEvent(height)
            Subscriber->>Subscriber: lastReceivedHeight = height
            Subscriber->>EventsChan: send event (buffered)
        end
    else connect fails
        Subscriber->>EventsChan: send error & close
    end
    alt errChan/error received
        Subscriber->>Subscriber: inspect gRPC status
        alt NotFound
            Subscriber->>Subscriber: sleep 200ms, retry connect(lastReceivedHeight+1)
        else DeadlineExceeded/Internal
            Subscriber->>Subscriber: retry connect(lastReceivedHeight+1) immediately
        else Other
            Subscriber->>EventsChan: send disconnection error & terminate
        end
    end
```
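For orientation, here is a minimal Go sketch of the loop the diagram describes. The types, channel shapes, and the `subscribeWithReconnect`/`connect` names are illustrative assumptions rather than the PR's actual code; only the gRPC error-code handling mirrors what the review excerpts below quote.

```go
package ingestion

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// blockEvent is a simplified stand-in for the gateway's block event type.
type blockEvent struct {
	Height uint64
	Err    error
}

// subscribeWithReconnect sketches the flow in the diagram: it tracks the last
// received height and resumes from lastReceivedHeight+1 after recoverable
// gRPC errors. The connect callback is assumed to (re)open the stream and
// return fresh event and error channels.
func subscribeWithReconnect(
	ctx context.Context,
	startHeight uint64,
	connect func(height uint64) (<-chan blockEvent, <-chan error, error),
	out chan<- blockEvent,
) {
	defer close(out)

	lastReceivedHeight := startHeight
	eventCh, errCh, err := connect(lastReceivedHeight)
	if err != nil {
		out <- blockEvent{Err: fmt.Errorf("failed to subscribe at height %d: %w", startHeight, err)}
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-eventCh:
			lastReceivedHeight = ev.Height
			out <- ev
		case streamErr, ok := <-errCh:
			if !ok {
				// channel closed without an error: treat as a disconnect
				out <- blockEvent{Err: fmt.Errorf("disconnected: stream closed")}
				return
			}
			switch status.Code(streamErr) {
			case codes.NotFound:
				// the next block may not be finalized yet: wait briefly, then resume
				time.Sleep(200 * time.Millisecond)
			case codes.DeadlineExceeded, codes.Internal:
				// stream dropped by the server or a middleware: resume immediately
			default:
				out <- blockEvent{Err: fmt.Errorf("disconnected: %w", streamErr)}
				return
			}
			if eventCh, errCh, err = connect(lastReceivedHeight + 1); err != nil {
				out <- blockEvent{Err: fmt.Errorf("failed to resubscribe at height %d: %w", lastReceivedHeight+1, err)}
				return
			}
		}
	}
}
```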
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~15 minutes
Actionable comments posted: 2
🧹 Nitpick comments (2)
services/ingestion/event_subscriber.go (2)
223-223: Consider making the retry delay configurable.

The 200ms sleep duration is hardcoded. Different environments might benefit from different retry delays.
Consider adding a configurable retry delay to the RPCEventSubscriber struct:

```diff
 type RPCEventSubscriber struct {
 	logger          zerolog.Logger
 	client          *requester.CrossSporkClient
 	chain           flowGo.ChainID
 	keyLock         keystore.KeyLock
 	height          uint64
 	recovery        bool
 	recoveredEvents []flow.Event
+	retryDelay      time.Duration
 }
```

Then use it in the retry logic:

```diff
-	time.Sleep(200 * time.Millisecond)
+	time.Sleep(r.retryDelay)
```
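If this option is adopted, a sensible fallback would keep the current 200ms behavior for callers that don't override the delay. A self-contained sketch of that "configurable with a default" pattern follows; the names are illustrative, not the gateway's actual API:

```go
package main

import (
	"fmt"
	"time"
)

// subscriberConfig is a hypothetical holder for the retry delay.
type subscriberConfig struct {
	retryDelay time.Duration
}

// newSubscriberConfig falls back to the previously hardcoded 200ms
// when the caller does not supply a positive delay.
func newSubscriberConfig(retryDelay time.Duration) subscriberConfig {
	if retryDelay <= 0 {
		retryDelay = 200 * time.Millisecond
	}
	return subscriberConfig{retryDelay: retryDelay}
}

func main() {
	fmt.Println(newSubscriberConfig(0).retryDelay)                      // 200ms default
	fmt.Println(newSubscriberConfig(500 * time.Millisecond).retryDelay) // caller override
}
```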
232-241: Consider adding metrics and logging for reconnection attempts.

To improve observability in production, consider adding logging and metrics for reconnection attempts.
Add logging before reconnection:

```diff
+	r.logger.Info().
+		Uint64("last_height", lastReceivedHeight).
+		Str("error_code", status.Code(err).String()).
+		Msg("attempting to reconnect after error")
+
 	if err := connect(lastReceivedHeight + 1); err != nil {
```

Consider also tracking metrics for the following (a sketch of possible collectors follows this list):
- Total reconnection attempts
- Successful vs failed reconnections
- Time between disconnection and successful reconnection
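As referenced above, one possible shape for such metrics, assuming Prometheus-style instrumentation via github.com/prometheus/client_golang (the gateway's actual metrics layer may differ):

```go
package ingestion

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical collectors for the reconnect path; metric names are illustrative.
var (
	reconnectAttempts = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "event_subscriber_reconnect_attempts_total",
			Help: "Reconnection attempts, labeled by outcome.",
		},
		[]string{"outcome"}, // "success" or "failure"
	)
	reconnectDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Name: "event_subscriber_reconnect_duration_seconds",
			Help: "Time between disconnection and successful reconnection.",
		},
	)
)

func init() {
	prometheus.MustRegister(reconnectAttempts, reconnectDuration)
}
```

In the retry branch, the failure counter could be incremented before each connect attempt, with the success label and a duration observation recorded once connect returns nil.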
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/event_subscriber.go (5 hunks)
🧰 Additional context used
🧠 Learnings (1)
services/ingestion/event_subscriber.go (4)
Learnt from: peterargue
PR: #772
File: services/requester/keystore/key_store.go:50-62
Timestamp: 2025-03-07T01:35:09.751Z
Learning: In the flow-evm-gateway codebase, panics are acceptable in scenarios where immediate detection of critical bugs is desired during development and testing, particularly for invariant violations that should never occur in a correctly functioning system (e.g., when a key is available but locked in the keystore implementation).
Learnt from: peterargue
PR: #617
File: api/stream.go:62-67
Timestamp: 2024-10-18T19:26:37.579Z
Learning: In the flow-evm-gateway project, within the Go file api/stream.go, the prepareBlockResponse method includes the Bloom filter as the field LogsBloom in the returned Block struct.
Learnt from: peterargue
PR: #682
File: services/ingestion/event_subscriber.go:85-90
Timestamp: 2025-01-24T20:15:10.908Z
Learning: When calculating blocks to catch up in Flow, initialize the counter to 0 and only set it when the latest chain height is greater than the current height (latestOnChainHeight > currentHeight). There's no need to handle the else case as 0 blocks to catch up is the correct default when we're at the same height or ahead.
Learnt from: peterargue
PR: #615
File: bootstrap/bootstrap.go:167-197
Timestamp: 2024-10-17T18:04:04.165Z
Learning: In the flow-evm-gateway Go project, the validation ensuring that startHeight is less than or equal to endHeight is performed before the StartTraceDownloader method in bootstrap/bootstrap.go, so additional checks in this method are unnecessary.
🧬 Code Graph Analysis (1)
services/ingestion/event_subscriber.go (4)
models/events.go (2)
- BlockEvents (208-211)
- NewBlockEventsError (252-256)
models/errors/errors.go (1)
- ErrDisconnected (24-24)
services/ingestion/event_subscriber_test.go (4)
- Test_SubscribingWithRetryOnError (173-235)
- Test_SubscribingWithRetryOnErrorMultipleBlocks (241-303)
- Test_Subscribing (31-75)
- Test_SubscribingWithRetryOnErrorEmptyBlocks (309-370)
services/testutils/mock_client.go (1)
- SubscribeEventsByBlockHeight (36-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test
🔇 Additional comments (2)
services/ingestion/event_subscriber.go (2)
174-181: Well-implemented error handling for channel closures.

The code properly distinguishes between context cancellation and unexpected disconnections, providing clear error messages to consumers. Good use of the predefined ErrDisconnected error.

Also applies to: 209-217
138-151: Clean encapsulation of subscription logic.

The connect function provides good separation of concerns and enables easy reconnection with different starting heights. The comment about the heartbeat interval is helpful for understanding the configuration choice.
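The general pattern praised here, a closure that rebinds the stream channels so the same receive loop works after re-subscribing from any height, can be illustrated in isolation. This is a self-contained sketch with simplified types and a simulated `openStream`, not the gateway's actual code:

```go
package main

import "fmt"

// stream is a simplified stand-in for the SDK subscription result.
type stream struct {
	events <-chan uint64
	errs   <-chan error
}

// openStream simulates opening a subscription starting at the given height,
// delivering three block heights and then closing.
func openStream(height uint64) (stream, error) {
	ev := make(chan uint64, 3)
	for h := height; h < height+3; h++ {
		ev <- h
	}
	close(ev)
	return stream{events: ev, errs: make(chan error)}, nil
}

func main() {
	var s stream

	// connect rebinds s, so retrying only requires a new start height.
	connect := func(height uint64) error {
		var err error
		s, err = openStream(height)
		return err
	}

	if err := connect(10); err != nil {
		panic(err)
	}
	for h := range s.events {
		fmt.Println("received block", h)
	}

	// later, resume from lastReceivedHeight+1 using the same closure
	if err := connect(13); err != nil {
		panic(err)
	}
	for h := range s.events {
		fmt.Println("received block", h)
	}
}
```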
Force-pushed 3281bb5 to dfda5c3 (Compare)
```go
eventsChan <- models.NewBlockEventsError(
	fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
)
close(eventsChan)
```
Should this be closed in a defer after instantiation? It looks like this is the only code path that closes it.
I think we can't use a defer right after instantiation, because the channel is used inside a goroutine that is created a few lines below, like this:

```go
go func() {
	defer func() {
		close(eventsChan)
	}()
	...
}
```

So the channel closing is handled inside the goroutine, if execution reaches that piece of code.
Your question got me thinking though, and I realized that for the two error-handling cases we have, we don't properly propagate the error.
I have addressed that issue in 301fac7.
The issue was that we create the eventsChan, and there are two possible error cases that would actually block, since we attempt to write to the channel without anyone reading from it 😅.
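To make the discussion above concrete, here is a minimal, self-contained illustration of the pattern (types and names simplified; not the gateway's actual code):

```go
package main

import (
	"errors"
	"fmt"
)

// result is a simplified stand-in for models.BlockEvents.
type result struct {
	height uint64
	err    error
}

func subscribe(connect func() error) <-chan result {
	// Buffer of 1: the goroutine can report an early failure and exit
	// (closing the channel via defer) even before the caller starts reading.
	// With an unbuffered channel, the same send would block forever.
	eventsChan := make(chan result, 1)

	go func() {
		defer close(eventsChan) // the only place the channel is closed

		if err := connect(); err != nil {
			eventsChan <- result{err: err} // does not block thanks to the buffer
			return
		}
		// ... receive loop forwarding events into eventsChan ...
	}()

	return eventsChan
}

func main() {
	ch := subscribe(func() error { return errors.New("connect failed") })
	for r := range ch {
		fmt.Println("subscription error:", r.err)
	}
}
```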
Force-pushed 301fac7 to 29b5098 (Compare)
Actionable comments posted: 0
♻️ Duplicate comments (2)
services/ingestion/event_subscriber.go (2)
206-208: Consider updating lastReceivedHeight after successfully sending to eventsChan.

The current placement updates lastReceivedHeight before sending the event to eventsChan. If the send operation blocks indefinitely or the goroutine is interrupted, the height tracking could become inconsistent with what was actually delivered to consumers.

Move the height update after the send operation:

```diff
-	lastReceivedHeight = blockEvents.Height
-
 	eventsChan <- evmEvents
+
+	lastReceivedHeight = blockEvents.Height
```
222-233: Add retry limits and exponential backoff for robustness.

The current implementation could retry indefinitely without any backoff strategy, potentially overwhelming the server during extended outages. The immediate retry on DeadlineExceeded and Internal errors (without any delay) could cause excessive load on the server.

Consider implementing:
- A maximum retry count to prevent infinite reconnection attempts
- Exponential backoff with jitter to reduce server load during failures
- Circuit breaker pattern for prolonged outages
Example implementation:
```diff
 type RPCEventSubscriber struct {
 	logger          zerolog.Logger
 	client          *requester.CrossSporkClient
 	chain           flowGo.ChainID
 	keyLock         keystore.KeyLock
 	height          uint64
 	recovery        bool
 	recoveredEvents []flow.Event
+
+	// Retry configuration
+	maxRetries     int
+	baseRetryDelay time.Duration
 }
```

Then modify the retry logic:
```diff
+	retryCount := 0
+	const maxRetries = 10
+	baseDelay := 100 * time.Millisecond
+
 	case err, ok := <-errChan:
 		if !ok {
 			// typically we receive an error in the errChan before the channels are closed
 			var err error
 			err = errs.ErrDisconnected
 			if ctx.Err() != nil {
 				err = ctx.Err()
 			}
 			eventsChan <- models.NewBlockEventsError(err)
 			return
 		}
+
+		retryCount++
+		if retryCount > maxRetries {
+			eventsChan <- models.NewBlockEventsError(
+				fmt.Errorf("max reconnection attempts (%d) reached: %w", maxRetries, err),
+			)
+			return
+		}
+
+		// Calculate exponential backoff with jitter
+		delay := time.Duration(float64(baseDelay) * math.Pow(2, float64(retryCount-1)))
+		// Add jitter (±25%)
+		jitter := time.Duration(rand.Float64() * float64(delay) * 0.5)
+		if rand.Intn(2) == 0 {
+			delay = delay + jitter
+		} else {
+			delay = delay - jitter
+		}
+		// Cap maximum delay at 30 seconds
+		if delay > 30*time.Second {
+			delay = 30 * time.Second
+		}
+
 		switch status.Code(err) {
 		case codes.NotFound:
 			// we can get not found when reconnecting after a disconnect/restart before the
 			// next block is finalized. just wait briefly and try again
-			time.Sleep(200 * time.Millisecond)
+			time.Sleep(delay)
 		case codes.DeadlineExceeded, codes.Internal:
 			// these are sometimes returned when the stream is disconnected by a middleware or the server
+			time.Sleep(delay)
 		default:
 			// skip reconnect on all other errors
 			eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
 			return
 		}

 		if err := connect(lastReceivedHeight + 1); err != nil {
 			eventsChan <- models.NewBlockEventsError(
 				fmt.Errorf(
 					"failed to resubscribe for events on height: %d, with: %w",
 					lastReceivedHeight+1, err,
 				),
 			)
 			return
 		}
+
+		// Reset retry count on successful reconnection
+		retryCount = 0
```
🧹 Nitpick comments (2)
services/ingestion/event_subscriber.go (2)
176-184: Consider logging before returning on channel closure.

When the blockEventsStream channel closes unexpectedly, it would be helpful to log this event for debugging purposes before sending the error.

```diff
 case blockEvents, ok := <-blockEventsStream:
 	if !ok {
+		r.logger.Warn().Msg("block events stream closed unexpectedly")
 		// typically we receive an error in the errChan before the channels are closed
 		var err error
 		err = errs.ErrDisconnected
 		if ctx.Err() != nil {
 			err = ctx.Err()
 		}
 		eventsChan <- models.NewBlockEventsError(err)
 		return
 	}
```
210-220: Consider logging error channel closure.

Similar to the block events stream, logging when the error channel closes would aid in debugging connection issues.

```diff
 case err, ok := <-errChan:
 	if !ok {
+		r.logger.Warn().Msg("error channel closed unexpectedly")
 		// typically we receive an error in the errChan before the channels are closed
 		var err error
 		err = errs.ErrDisconnected
 		if ctx.Err() != nil {
 			err = ctx.Err()
 		}
 		eventsChan <- models.NewBlockEventsError(err)
 		return
 	}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- services/ingestion/event_subscriber.go (5 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T01:35:09.751Z
Learnt from: peterargue
PR: onflow/flow-evm-gateway#772
File: services/requester/keystore/key_store.go:50-62
Timestamp: 2025-03-07T01:35:09.751Z
Learning: In the flow-evm-gateway codebase, panics are acceptable in scenarios where immediate detection of critical bugs is desired during development and testing, particularly for invariant violations that should never occur in a correctly functioning system (e.g., when a key is available but locked in the keystore implementation).
Applied to files:
- services/ingestion/event_subscriber.go
🧬 Code Graph Analysis (1)
services/ingestion/event_subscriber.go (5)
models/events.go (2)
- BlockEvents (208-211)
- NewBlockEventsError (252-256)
models/errors/errors.go (1)
- ErrDisconnected (24-24)
services/ingestion/event_subscriber_test.go (4)
- Test_SubscribingWithRetryOnError (173-235)
- Test_SubscribingWithRetryOnErrorMultipleBlocks (241-303)
- Test_SubscribingWithRetryOnErrorEmptyBlocks (309-370)
- Test_Subscribing (31-75)
services/testutils/mock_client.go (1)
- SubscribeEventsByBlockHeight (36-43)
api/stream.go (1)
- s (55-71)
🔇 Additional comments (3)
services/ingestion/event_subscriber.go (3)
125-127: LGTM! Buffer size addition prevents blocking on error cases.

The change from an unbuffered channel to a buffered channel with size 1 is a good fix to prevent blocking when sending errors in lines 132 and 158 before closing the channel.
140-154: LGTM! Well-structured reconnect function encapsulation.

The connect function properly encapsulates the subscription logic, making reconnection attempts cleaner and more maintainable. The consistent use of heartbeat interval 1 for minimal delay is appropriate.
235-244: Reconnection height usage verified

I checked every occurrence of lastReceivedHeight and confirmed:

- It's initialized from the input height on startup.
- The initial connection uses connect(lastReceivedHeight).
- After processing each block, you update lastReceivedHeight = blockEvents.Height.
- On error, you reconnect with connect(lastReceivedHeight + 1) to avoid duplicates.

No other uses were found, and the logic is consistent throughout.
Closes: #707
Description
This is the same reconnect logic that is currently done on the soft-finality branch. See https://github.com/onflow/flow-evm-gateway/blob/mpeter/poc-index-finalized-block-results/services/ingestion/block_tracking_subscriber.go#L138-L280 .