Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions sdk/couchbase-core/src/memdx/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,10 @@ impl Dispatcher for Client {
let op_code = packet.op_code;

// Create a guard that will remove the opaque entry from the map if the future is
// dropped before we successfully construct a ClientPendingOp (which takes over
// cleanup responsibility via its own Drop impl).
// dropped before we successfully buffer the packet into FramedWrite. Once the
// packet is buffered (after `feed` succeeds), it WILL be flushed to the server
// on a subsequent write, so we must NOT remove the opaque entry — the server
// will respond and the read loop needs the handler to be present.
let mut opaque_guard = DispatchOpaqueGuard::new(opaque, self.opaque_map.clone());

trace!(
Expand All @@ -401,28 +403,63 @@ impl Dispatcher for Client {
);

let mut writer = self.writer.lock().await;
match writer.send(packet).await {

// We split the write into feed + flush rather than using send() because
// send() is NOT cancellation-safe. send() internally does
// poll_ready -> start_send -> poll_flush. If the future is dropped after
// start_send (the packet is buffered) but before poll_flush completes,
// the packet bytes remain in the FramedWrite buffer and WILL be flushed
// on the next write. If we had removed the opaque entry (via the guard),
// the server's response would become an orphan.
//
// By splitting into feed + flush:
// - If cancelled during feed (before the packet is buffered): the guard
// fires and correctly removes the opaque entry. No data was buffered.
// - If cancelled during flush (packet already buffered): the guard is
// already disarmed, so the opaque entry stays. The packet will be
// flushed by the next writer and the read loop will handle the response.

// feed() calls poll_ready (which may flush previous data) and then
// start_send (which encodes our packet into the write buffer).
match writer.feed(packet).await {
Ok(_) => {
// Disarm the guard — the ClientPendingOp now owns cleanup responsibility.
// The packet is now in the FramedWrite buffer. Even if we are
// cancelled from here on, the data will be flushed by the next
// send/flush call. Disarm the guard so the opaque stays registered.
opaque_guard.disarm();
Ok(ClientPendingOp::new(
opaque,
self.opaque_map.clone(),
response_rx,
is_persistent,
))
}
Err(e) => {
debug!(
"{} failed to write packet {} {} {}",
"{} failed to buffer packet {} {} {}",
self.client_id, opaque, op_code, e
);

// opaque_guard will remove the entry from the opaque map when dropped.

Err(Error::new_dispatch_error(opaque, op_code, Box::new(e)))
return Err(Error::new_dispatch_error(opaque, op_code, Box::new(e)));
}
}

// Flush the buffered data to the underlying socket. If we are cancelled
// here, the packet is already buffered and will be flushed by the next
// writer. The opaque entry remains in the map so the response is handled.
if let Err(e) = writer.flush().await {
debug!(
"{} failed to flush packet {} {} {}",
self.client_id, opaque, op_code, e
);
// The packet is buffered but the flush failed. The opaque is still
// registered (guard was disarmed). Return an error but do NOT remove
// the opaque — the data may have been partially written and a
// response may still arrive. The read loop or connection close will
// eventually clean it up.
return Err(Error::new_dispatch_error(opaque, op_code, Box::new(e)));
}

Ok(ClientPendingOp::new(
opaque,
self.opaque_map.clone(),
response_rx,
is_persistent,
))
}

async fn close(&self) -> error::Result<()> {
Expand Down