Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 42 additions & 44 deletions crates/tycho-client/src/deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ impl WsDeltasClient {
uri,
auth_key: auth_key.map(|s| s.to_string()),
inner: Arc::new(Mutex::new(None)),
ws_buffer_size: 128,
subscription_buffer_size: 128,
ws_buffer_size: 256,
subscription_buffer_size: 256,
conn_notify: Arc::new(Notify::new()),
max_reconnects: 5,
retry_cooldown: Duration::from_millis(500),
Expand Down Expand Up @@ -487,29 +487,29 @@ impl WsDeltasClient {
/// This method acquires the lock for inner for a short period, then waits until the
/// connection is established if not already connected.
async fn ensure_connection(&self) -> Result<(), DeltasError> {
if self.dead.load(Ordering::SeqCst) {
return Err(DeltasError::NotConnected);
}
if self.is_connected().await {
return Ok(());
}
// Enable the future BEFORE re-checking is_connected to close the race window where
// the reconnect task calls notify_waiters() between is_connected() returning false and
// notified().await — without enable(), that notification would be lost and this call
// would block until the next reconnect.
let notified = self.conn_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if !self.is_connected().await {
notified.await;
}
if self.dead.load(Ordering::SeqCst) {
return Err(DeltasError::NotConnected);
}
if !self.is_connected().await {
return Err(DeltasError::NotConnected);
// Loop until either permanently dead or successfully connected. A single wait-and-check
// is not enough: the WS can reconnect briefly then drop again (e.g. server restart),
// which would fire conn_notify but leave is_connected() false. Looping retries
// automatically on that transient race.
loop {
if self.dead.load(Ordering::SeqCst) {
return Err(DeltasError::NotConnected);
}
if self.is_connected().await {
return Ok(());
}
// Enable the future BEFORE re-checking is_connected to close the race window where
// the reconnect task calls notify_waiters() between is_connected() returning false
// and notified().await — without enable(), that notification would be lost.
let notified = self.conn_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if !self.is_connected().await {
notified.await;
}
// Loop back: recheck dead and is_connected. If the WS dropped again between the
// notification and here, we wait for the next reconnect rather than failing.
}
Ok(())
}

/// Main message handling logic
Expand Down Expand Up @@ -1030,15 +1030,20 @@ impl DeltasClient for WsDeltasClient {
#[instrument(skip(self))]
async fn close(&self) -> Result<(), DeltasError> {
info!("Closing TychoWebsocketClient");
let mut guard = self.inner.lock().await;
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner
.cmd_tx
.send(())
.await
.map_err(|e| DeltasError::TransportError(e.to_string()))?;
{
let mut guard = self.inner.lock().await;
if let Some(inner) = guard.as_mut() {
inner
.cmd_tx
.send(())
.await
.map_err(|e| DeltasError::TransportError(e.to_string()))?;
}
}
// Mark dead and notify so any ensure_connection() callers blocked on conn_notify
// unblock immediately and return NotConnected rather than hanging forever.
self.dead.store(true, Ordering::SeqCst);
self.conn_notify.notify_waiters();
Ok(())
}
}
Expand Down Expand Up @@ -1122,17 +1127,10 @@ mod tests {
}

fn subscribe_with_compression(compression: bool) -> String {
serde_json::json!({
"method": "subscribe",
"extractor_id": {
"chain": "ethereum",
"name": "vm:ambient"
},
"include_state": true,
"compression": compression,
"partial_blocks": false
})
.to_string()
// Field order matches Command::Subscribe's serde tag + struct field declaration order.
format!(
r#"{{"method":"subscribe","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"include_state":true,"compression":{compression},"partial_blocks":false}}"#
)
}

fn subscription_confirmation() -> String {
Expand Down
Loading