From 7dc6fdcdb0b1e63c86c6e360edb86c523ed6782d Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Mon, 8 Jun 2026 11:28:26 +0200 Subject: [PATCH 1/5] fix(tycho-client): double default WS buffer sizes to 256 Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-client/src/deltas.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/tycho-client/src/deltas.rs b/crates/tycho-client/src/deltas.rs index 771ec55a2b..f955f053b5 100644 --- a/crates/tycho-client/src/deltas.rs +++ b/crates/tycho-client/src/deltas.rs @@ -415,8 +415,8 @@ impl WsDeltasClient { uri, auth_key: auth_key.map(|s| s.to_string()), inner: Arc::new(Mutex::new(None)), - ws_buffer_size: 128, - subscription_buffer_size: 128, + ws_buffer_size: 256, + subscription_buffer_size: 256, conn_notify: Arc::new(Notify::new()), max_reconnects: 5, retry_cooldown: Duration::from_millis(500), From b1089bb73c870419b7a43ed772bba5ff514caae3 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Mon, 8 Jun 2026 11:28:39 +0200 Subject: [PATCH 2/5] fix(tycho-client): loop in ensure_connection on transient WS reconnect A single wait-and-check was not enough: if the WS reconnected briefly then dropped again, conn_notify fired but is_connected() returned false. The old code returned Err(NotConnected) there, which propagated as ConnectionClosed to synchronizers and terminated the stream. Now loops until dead=true or stably connected. Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-client/src/deltas.rs | 44 +++++++++++++++---------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/crates/tycho-client/src/deltas.rs b/crates/tycho-client/src/deltas.rs index f955f053b5..6c636ec015 100644 --- a/crates/tycho-client/src/deltas.rs +++ b/crates/tycho-client/src/deltas.rs @@ -487,29 +487,29 @@ impl WsDeltasClient { /// This method acquires the lock for inner for a short period, then waits until the /// connection is established if not already connected. async fn ensure_connection(&self) -> Result<(), DeltasError> { - if self.dead.load(Ordering::SeqCst) { - return Err(DeltasError::NotConnected); - } - if self.is_connected().await { - return Ok(()); - } - // Enable the future BEFORE re-checking is_connected to close the race window where - // the reconnect task calls notify_waiters() between is_connected() returning false and - // notified().await — without enable(), that notification would be lost and this call - // would block until the next reconnect. - let notified = self.conn_notify.notified(); - tokio::pin!(notified); - notified.as_mut().enable(); - if !self.is_connected().await { - notified.await; - } - if self.dead.load(Ordering::SeqCst) { - return Err(DeltasError::NotConnected); - } - if !self.is_connected().await { - return Err(DeltasError::NotConnected); + // Loop until either permanently dead or successfully connected. A single wait-and-check + // is not enough: the WS can reconnect briefly then drop again (e.g. server restart), + // which would fire conn_notify but leave is_connected() false. Looping retries + // automatically on that transient race. + loop { + if self.dead.load(Ordering::SeqCst) { + return Err(DeltasError::NotConnected); + } + if self.is_connected().await { + return Ok(()); + } + // Enable the future BEFORE re-checking is_connected to close the race window where + // the reconnect task calls notify_waiters() between is_connected() returning false + // and notified().await — without enable(), that notification would be lost. + let notified = self.conn_notify.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + if !self.is_connected().await { + notified.await; + } + // Loop back: recheck dead and is_connected. If the WS dropped again between the + // notification and here, we wait for the next reconnect rather than failing. } - Ok(()) } /// Main message handling logic From 168b58ea05c0379039055eaf2a12fffdbffd2d4b Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Mon, 8 Jun 2026 11:28:53 +0200 Subject: [PATCH 3/5] fix(tycho-client): unblock ensure_connection callers on close close() failed with NotConnected when inner was None (WS between reconnects), leaving synchronizers hung in ensure_connection(). Now always sets dead=true and notifies conn_notify so blocked callers return immediately. Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-client/src/deltas.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/tycho-client/src/deltas.rs b/crates/tycho-client/src/deltas.rs index 6c636ec015..3d4d48fa43 100644 --- a/crates/tycho-client/src/deltas.rs +++ b/crates/tycho-client/src/deltas.rs @@ -1030,15 +1030,20 @@ impl DeltasClient for WsDeltasClient { #[instrument(skip(self))] async fn close(&self) -> Result<(), DeltasError> { info!("Closing TychoWebsocketClient"); - let mut guard = self.inner.lock().await; - let inner = guard - .as_mut() - .ok_or_else(|| DeltasError::NotConnected)?; - inner - .cmd_tx - .send(()) - .await - .map_err(|e| DeltasError::TransportError(e.to_string()))?; + { + let mut guard = self.inner.lock().await; + if let Some(inner) = guard.as_mut() { + inner + .cmd_tx + .send(()) + .await + .map_err(|e| DeltasError::TransportError(e.to_string()))?; + } + } + // Mark dead and notify so any ensure_connection() callers blocked on conn_notify + // unblock immediately and return NotConnected rather than hanging forever. + self.dead.store(true, Ordering::SeqCst); + self.conn_notify.notify_waiters(); Ok(()) } } From 9b0cc763827903517ef8534590e5962d4bab1b43 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Mon, 8 Jun 2026 11:28:59 +0200 Subject: [PATCH 4/5] test(tycho-client): fix subscribe test helper JSON key ordering subscribe_with_compression built a json!() literal with insertion-order keys, but Command::Subscribe serializes alphabetically. All 13 subscription tests failed with an assertion mismatch in the mock server. Now serializes Command::Subscribe directly. Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-client/src/deltas.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/tycho-client/src/deltas.rs b/crates/tycho-client/src/deltas.rs index 3d4d48fa43..3a59999a74 100644 --- a/crates/tycho-client/src/deltas.rs +++ b/crates/tycho-client/src/deltas.rs @@ -1127,17 +1127,13 @@ mod tests { } fn subscribe_with_compression(compression: bool) -> String { - serde_json::json!({ - "method": "subscribe", - "extractor_id": { - "chain": "ethereum", - "name": "vm:ambient" - }, - "include_state": true, - "compression": compression, - "partial_blocks": false + serde_json::to_string(&Command::Subscribe { + extractor_id: dto::ExtractorIdentity::new(dto::Chain::Ethereum, "vm:ambient"), + include_state: true, + compression, + partial_blocks: false, }) - .to_string() + .unwrap() } fn subscription_confirmation() -> String { From e8d973fdfe671d48b8f1b36212c1d0233765a0ae Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Tue, 9 Jun 2026 14:27:16 +0200 Subject: [PATCH 5/5] test(tycho-client): replace unwrap with format!() in subscribe helper Co-Authored-By: Claude Sonnet 4.6 --- crates/tycho-client/src/deltas.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/tycho-client/src/deltas.rs b/crates/tycho-client/src/deltas.rs index 3a59999a74..820c82b596 100644 --- a/crates/tycho-client/src/deltas.rs +++ b/crates/tycho-client/src/deltas.rs @@ -1127,13 +1127,10 @@ mod tests { } fn subscribe_with_compression(compression: bool) -> String { - serde_json::to_string(&Command::Subscribe { - extractor_id: dto::ExtractorIdentity::new(dto::Chain::Ethereum, "vm:ambient"), - include_state: true, - compression, - partial_blocks: false, - }) - .unwrap() + // Field order matches Command::Subscribe's serde tag + struct field declaration order. + format!( + r#"{{"method":"subscribe","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"include_state":true,"compression":{compression},"partial_blocks":false}}"# + ) } fn subscription_confirmation() -> String {