From 3d1b0c839d303d66a7a1549b75c5a1e50e2f3cf5 Mon Sep 17 00:00:00 2001 From: Charles Dixon Date: Thu, 5 Mar 2026 09:54:15 +0000 Subject: [PATCH] RSCBC-255: Draft --- sdk/couchbase-core/src/memdx/client.rs | 65 ++++++++++++++++++++------ 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/sdk/couchbase-core/src/memdx/client.rs b/sdk/couchbase-core/src/memdx/client.rs index 4ec2e6f6..d3226838 100644 --- a/sdk/couchbase-core/src/memdx/client.rs +++ b/sdk/couchbase-core/src/memdx/client.rs @@ -389,8 +389,10 @@ impl Dispatcher for Client { let op_code = packet.op_code; // Create a guard that will remove the opaque entry from the map if the future is - // dropped before we successfully construct a ClientPendingOp (which takes over - // cleanup responsibility via its own Drop impl). + // dropped before we successfully buffer the packet into FramedWrite. Once the + // packet is buffered (after `feed` succeeds), it WILL be flushed to the server + // on a subsequent write, so we must NOT remove the opaque entry — the server + // will respond and the read loop needs the handler to be present. let mut opaque_guard = DispatchOpaqueGuard::new(opaque, self.opaque_map.clone()); trace!( @@ -401,28 +403,63 @@ impl Dispatcher for Client { ); let mut writer = self.writer.lock().await; - match writer.send(packet).await { + + // We split the write into feed + flush rather than using send() because + // send() is NOT cancellation-safe. send() internally does + // poll_ready -> start_send -> poll_flush. If the future is dropped after + // start_send (the packet is buffered) but before poll_flush completes, + // the packet bytes remain in the FramedWrite buffer and WILL be flushed + // on the next write. If we had removed the opaque entry (via the guard), + // the server's response would become an orphan. + // + // By splitting into feed + flush: + // - If cancelled during feed (before the packet is buffered): the guard + // fires and correctly removes the opaque entry. No data was buffered. + // - If cancelled during flush (packet already buffered): the guard is + // already disarmed, so the opaque entry stays. The packet will be + // flushed by the next writer and the read loop will handle the response. + + // feed() calls poll_ready (which may flush previous data) and then + // start_send (which encodes our packet into the write buffer). + match writer.feed(packet).await { Ok(_) => { - // Disarm the guard — the ClientPendingOp now owns cleanup responsibility. + // The packet is now in the FramedWrite buffer. Even if we are + // cancelled from here on, the data will be flushed by the next + // send/flush call. Disarm the guard so the opaque stays registered. opaque_guard.disarm(); - Ok(ClientPendingOp::new( - opaque, - self.opaque_map.clone(), - response_rx, - is_persistent, - )) } Err(e) => { debug!( - "{} failed to write packet {} {} {}", + "{} failed to buffer packet {} {} {}", self.client_id, opaque, op_code, e ); - // opaque_guard will remove the entry from the opaque map when dropped. - - Err(Error::new_dispatch_error(opaque, op_code, Box::new(e))) + return Err(Error::new_dispatch_error(opaque, op_code, Box::new(e))); } } + + // Flush the buffered data to the underlying socket. If we are cancelled + // here, the packet is already buffered and will be flushed by the next + // writer. The opaque entry remains in the map so the response is handled. + if let Err(e) = writer.flush().await { + debug!( + "{} failed to flush packet {} {} {}", + self.client_id, opaque, op_code, e + ); + // The packet is buffered but the flush failed. The opaque is still + // registered (guard was disarmed). Return an error but do NOT remove + // the opaque — the data may have been partially written and a + // response may still arrive. The read loop or connection close will + // eventually clean it up. + return Err(Error::new_dispatch_error(opaque, op_code, Box::new(e))); + } + + Ok(ClientPendingOp::new( + opaque, + self.opaque_map.clone(), + response_rx, + is_persistent, + )) } async fn close(&self) -> error::Result<()> {