feat: add extractor supervisor with fault tolerance and exponential backoff#1026
feat: add extractor supervisor with fault tolerance and exponential backoff#1026zizou0x wants to merge 7 commits into
Conversation
There was a problem hiding this comment.
Claude Code Review
This repository is configured for manual code reviews. Comment @claude review to trigger a review and subscribe this PR to future pushes, or @claude review once for a one-time review.
Tip: disable this comment in your organization's Code Review settings.
kayibal
left a comment
There was a problem hiding this comment.
🤖 The three-struct split (ExtractorFactory / ExtractorSupervisor / ExtractorRunner) is cleaner than the old builder, but it's worth asking whether ExtractorRunner pulls its weight now. Before this PR, Runner owned both the stream loop and control message handling. After the refactor, control messages moved to the supervisor, and Runner's only remaining job is driving the select! loop over the stream. All of its fields (ws_subscriptions, pending_deltas_tx, stop_rx) are owned by the supervisor and injected per restart — Runner holds no state of its own.
Not necessarily a blocker, but worth a conversation: is the stream loop complex enough to justify a dedicated struct, or would a private run_stream() method on the supervisor give the same isolation with one fewer abstraction layer?
…ackoff Introduces ExtractorSupervisor, which wraps ExtractorFactory and manages the full lifecycle of an extractor: building, running, restarting on failure with exponential backoff, clearing WS subscriptions, and signalling PendingDeltas to reset its buffer between runs. Key design changes: - Replace ExtractorBuilder (one-shot builder pattern) with ExtractorFactory, designed for repeated use across restarts. The factory is async at construction: one RPC call for ChainState, one populate() for ProtocolMemoryCache — both reused across all restarts via clone. - ExtractorRunner is now decoupled from control-message handling. It receives a oneshot stop signal and shared ws_subscriptions/pending_deltas_tx directly. Subscribe and stop are handled by the supervisor. - ExtractorSupervisor creates its own control channel internally and exposes handle() to give callers an ExtractorHandle. max_restarts is moved to ExtractorConfig as Option<u32> (None = restart forever, default). - ExtractorConfig, ProtocolTypeConfig, and DCIType move to supervisor.rs alongside ExtractorFactory. Internal fields are private; only initialized_accounts, initialized_accounts_block, and dci_plugin remain pub. - PendingDeltas.run() now accepts pre-built receivers and a reset channel rather than subscribing dynamically through extractors. On restart the supervisor sends the extractor name on the reset channel so PendingDeltas can clear its buffer. - CachedGateway gains new_instance() for creating fresh per-restart gateway instances with independent LRU cache and open_tx. - Chain::block_time() added to tycho-common for per-chain block time estimation. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Replace two unwrap() calls with ? so that S3 body read failures and file write failures surface as errors rather than panics in production. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
new() conventionally signals a cheap, synchronous, infallible constructor. This one makes an RPC call and populates a DB-backed cache, so create() better reflects what the caller should expect. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…extractor channel
Introduce DeltaCommand { Block(ExtractorMsg), ExtractorRestarted(String) }
sent over the same channel as block messages. PendingDeltas now handles
buffer resets in-band, which guarantees ExtractorRestarted is always
processed after the last block the runner emitted before stopping.
Removes the separate reset_tx: Sender<String> channel and its consumers
in ServicesBuilder and PendingDeltas::run.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ry.rs supervisor.rs pulled in aws_sdk_s3, prost, all DCI types, and the full Substreams construction stack purely because the factory lived there. Moving ExtractorFactory, ExtractorConfig, ProtocolTypeConfig, DCIType, ensure_spkg, and download_file_from_s3 into factory.rs keeps each file focused on one thing. supervisor.rs is now ~200 lines covering only restart lifecycle. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…loses When the supervisor drops WS subscriptions on extractor restart, the stream in WsActor silently ended and the default StreamHandler::finished() called ctx.stop(), closing the entire WebSocket connection for all subscriptions — including those to unaffected extractors. Add a private ExtractorEvent enum (Message | ChannelClosed) yielded by each subscription stream. The ChannelClosed variant triggers SubscriptionEnded to the client and cleans up that subscription's state. finished() closes the connection only if no subscriptions remain, preserving the connection when other extractors are still active. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
afc0fb0 to
a076ecf
Compare
| // Exponential backoff: 120s, 240s, 480s, 960s, 1920s, 3840s, 7680s, 14400s | ||
| // (capped at 4 hours). | ||
| let exp = restart_count.min(7); // 120 * 2^7 = 14400s = 4 hours, cap here to avoid overflow. | ||
| let backoff = std::time::Duration::from_secs(120 * 2u64.pow(exp)); |
There was a problem hiding this comment.
🤖 Backoff starts at 120s — first retry is 2 minutes
The first restart delay is 120 * 2^0 = 120s regardless of failure cause. A momentary network blip or DB connection drop will keep the extractor dark for 2 minutes before the first retry.
Typical supervisory backoffs start at 1–10s (with optional jitter) and cap at minutes/hours — not start there. Consider 5s, 10s, 20s, 40s, ..., capped at 3600s. The 4-hour cap is reasonable; it's the floor that's too high.
There was a problem hiding this comment.
I agree we should immediately restart on the first error. So I'd change this to be 1s or so initially.
We already depend on tokio-retry in this crate, we could reuse it instead of implementing our own exponential backoff:
| } | ||
| } | ||
|
|
||
| // TODO: rename variable here instead |
There was a problem hiding this comment.
🤖 Stale TODO + wrong variable name — clean up now
arced_message is not an Arc — it's just the message moved into a new binding. This rename has been deferred for a while and the surrounding function was significantly refactored in this PR, making it a natural moment to finish the job.
There was a problem hiding this comment.
It's actually an Arc here but indeed we should probably just do the renaming now.
| .await?; | ||
|
|
||
| // Create dedicated PendingDeltas channel for this extractor | ||
| let (pd_tx, pd_rx) = tokio::sync::mpsc::channel(256); |
There was a problem hiding this comment.
🤖 Magic number for PendingDeltas channel buffer
256 is a tunable that affects backpressure and memory usage under load. Embedding it as a literal makes it easy to overlook and inconsistent with the 128 used for the supervisor control channel. A named constant makes the intent visible and the value easy to adjust.
kayibal
left a comment
There was a problem hiding this comment.
I think removing websocket knowledge out of the supervisor should still happen before we merge this.
| ws_subscriptions: Arc<Mutex<SubscriptionsMap>>, | ||
| pending_deltas_tx: Option<Sender<DeltaCommand>>, |
There was a problem hiding this comment.
Wait so PendingDeltas, used to operate with the standard subscription mechnism and now we use something separate?
| stop_rx: oneshot::Receiver<()>, | ||
| /// Handle of the tokio runtime on which the extraction tasks will be run. | ||
| /// If 'None' the default runtime will be used. | ||
| /// If `None` the default runtime will be used. |
There was a problem hiding this comment.
Would be good to mention here in the comment that this refers to the tokio runtime.
| ws_subscriptions: Arc<Mutex<SubscriptionsMap>>, | ||
| /// Dedicated channel for PendingDeltasBuffer — survives restarts. | ||
| pending_deltas_tx: Option<Sender<DeltaCommand>>, | ||
| /// Oneshot stop signal from the supervisor. |
There was a problem hiding this comment.
I don't like that these are split now. I also dislike that to correctly model the old behaviour the pending_deltas_tx has to be optional. I think ideally DeltaCommand should be moved into SubscriptionMap. Then any other component that requires it independently if internal or external can subscribe to it via subscriptions.
| // Clear WS subscriptions — clients must reconnect after a restart. | ||
| // TODO: can we keep the ws connections alive and handle this on the client side? | ||
| { | ||
| let mut subs = self.ws_subscriptions.lock().await; |
There was a problem hiding this comment.
I really dislike that the Supervisor and the ExtractorRunner now "know" about websocket subscriptions and treat them differently than PendingDelta subscriptions, previously the only concept was to have subscribers. In my opinion this can be kept now that we introduced DeltaCommand type.
Atm you are forcefully clearing the websocket subscriptions, making the normal subscription service potentially unusable for any other internal future use case. I think the connection restart should be decided at the websocket layer not here. So services::ws should instead receive a DeltaCommand::ExtractoRestarted and act upon it as it thinks is best.
What
Adds
ExtractorSupervisorandExtractorFactoryto handle extractor restartswith exponential backoff, replacing the one-shot
ExtractorBuilderpattern.Why
Any extractor failure would cause the whole indexer process to panic. This
removes that coupling — each extractor is now isolated and restarts
independently on failure without affecting others or the RPC server.
Changes
New
ExtractorSupervisor(extractor/supervisor.rs)PendingDeltasto reset its buffer, wait with exponential backoff, rebuild.handle()so callers get anExtractorHandlewithout managing thecontrol channel directly.
New
ExtractorFactory(same file)ChainState, onepopulate()forProtocolMemoryCache. Both are reused across restarts (cache via cheap Arcclone; chain state is
Copy).build_runner()creates a freshProtocolExtractor, DCI plugin, andSubstreams stream per restart, with a fresh
CachedGatewayinstance forwrite isolation.
ExtractorRunnersimplifiedoneshotstop signal andshared
ws_subscriptions/pending_deltas_txdirectly.Config types moved
ExtractorConfig,ProtocolTypeConfig,DCITypemoved fromrunner.rsto
supervisor.rs. Internal fields are now private; onlyinitialized_accounts,initialized_accounts_block, anddci_pluginremain
pub.max_restarts: Option<u32>added toExtractorConfig(YAML-configurable,default
None= restart forever).PendingDeltasupdatedrun()now accepts pre-built receivers and a reset channel instead ofsubscribing through extractor handles dynamically. On restart the supervisor
sends the extractor name on the reset channel to clear its buffer.
Supporting changes
CachedGateway::new_instance()— fresh gateway with independent LRU cacheand
open_tx, sharing the same write channel and connection pool.Chain::block_time()— per-chain block time in seconds.ExtractionError::variant_name()— static label for Prometheus counters.