Skip to content

Commit 96465c3

Browse files
committed
RSCBC-32: Reduce allocations on KV hot paths
Changes:
- Get result values use Bytes instead of Vec<u8>, eliminating a full document-body memcpy on every get/get_and_lock/get_and_touch/get_meta by propagating zero-copy Bytes from the codec through to the SDK layer.
- RetryRequest.retry_reasons is now Option<HashSet>, allocating only on the first retry attempt instead of on every operation.
- Compressor.compress returns Cow<[u8]> instead of borrowing from self, removing heap state from StdCompressor entirely.
- MutationToken bucket_name uses Arc<str> instead of String, replacing a per-mutation String::clone with an atomic ref-count bump.
- Add unit tests for the compression manager.
1 parent cf5ae22 commit 96465c3

File tree

9 files changed

+200
-70
lines changed

9 files changed

+200
-70
lines changed

sdk/couchbase-core/src/compressionmanager.rs

Lines changed: 134 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,24 @@
1616
*
1717
*/
1818

19+
use snap::raw::Encoder;
20+
use std::borrow::Cow;
1921
use std::fmt::Debug;
2022
use std::marker::PhantomData;
2123

22-
use snap::raw::Encoder;
23-
2424
use crate::error;
2525
use crate::error::ErrorKind;
2626
use crate::memdx::datatype::DataTypeFlag;
2727
use crate::options::agent::{CompressionConfig, CompressionMode};
2828

2929
pub(crate) trait Compressor: Send + Sync + Debug {
3030
fn new(compression_config: &CompressionConfig) -> Self;
31-
// This is a bit of a weird signature, but it allows us to avoid allocations when no compression occurs.
3231
fn compress<'a>(
33-
&'a mut self,
32+
&mut self,
3433
connection_supports_snappy: bool,
3534
datatype: DataTypeFlag,
3635
input: &'a [u8],
37-
) -> error::Result<(&'a [u8], u8)>;
36+
) -> error::Result<(Cow<'a, [u8]>, u8)>;
3837
}
3938

4039
#[derive(Debug)]
@@ -64,12 +63,8 @@ pub(crate) struct StdCompressor {
6463
compression_enabled: bool,
6564
compression_min_size: usize,
6665
compression_min_ratio: f64,
67-
68-
compressed_value: Vec<u8>,
6966
}
7067

71-
impl StdCompressor {}
72-
7368
impl Compressor for StdCompressor {
7469
fn new(compression_config: &CompressionConfig) -> Self {
7570
let (compression_enabled, compression_min_size, compression_min_ratio) =
@@ -85,33 +80,31 @@ impl Compressor for StdCompressor {
8580
compression_enabled,
8681
compression_min_size,
8782
compression_min_ratio,
88-
89-
compressed_value: Vec::new(),
9083
}
9184
}
9285

9386
fn compress<'a>(
94-
&'a mut self,
87+
&mut self,
9588
connection_supports_snappy: bool,
9689
datatype: DataTypeFlag,
9790
input: &'a [u8],
98-
) -> error::Result<(&'a [u8], u8)> {
91+
) -> error::Result<(Cow<'a, [u8]>, u8)> {
9992
if !connection_supports_snappy || !self.compression_enabled {
100-
return Ok((input, u8::from(datatype)));
93+
return Ok((Cow::Borrowed(input), u8::from(datatype)));
10194
}
10295

10396
let datatype = u8::from(datatype);
10497

10598
// If the packet is already compressed then we don't want to compress it again.
10699
if datatype & u8::from(DataTypeFlag::Compressed) != 0 {
107-
return Ok((input, datatype));
100+
return Ok((Cow::Borrowed(input), datatype));
108101
}
109102

110103
let packet_size = input.len();
111104

112105
// Only compress values that are large enough to be worthwhile.
113106
if packet_size <= self.compression_min_size {
114-
return Ok((input, datatype));
107+
return Ok((Cow::Borrowed(input), datatype));
115108
}
116109

117110
let mut encoder = Encoder::new();
@@ -121,14 +114,135 @@ impl Compressor for StdCompressor {
121114

122115
// Only return the compressed value if the ratio of compressed:original is small enough.
123116
if compressed_value.len() as f64 / packet_size as f64 > self.compression_min_ratio {
124-
return Ok((input, datatype));
117+
return Ok((Cow::Borrowed(input), datatype));
125118
}
126119

127-
self.compressed_value = compressed_value;
128-
129120
Ok((
130-
&self.compressed_value,
121+
Cow::Owned(compressed_value),
131122
datatype | u8::from(DataTypeFlag::Compressed),
132123
))
133124
}
134125
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::*;
130+
use std::borrow::Cow;
131+
132+
fn enabled_config(min_size: usize, min_ratio: f64) -> CompressionConfig {
133+
CompressionConfig::new(CompressionMode::Enabled {
134+
min_size,
135+
min_ratio,
136+
})
137+
}
138+
139+
fn disabled_config() -> CompressionConfig {
140+
CompressionConfig::new(CompressionMode::Disabled)
141+
}
142+
143+
#[test]
144+
fn disabled_compression_returns_input_unchanged() {
145+
let mut compressor = StdCompressor::new(&disabled_config());
146+
let input = b"hello world";
147+
148+
let (output, dt) = compressor
149+
.compress(true, DataTypeFlag::Json, input)
150+
.unwrap();
151+
152+
assert!(matches!(output, Cow::Borrowed(_)));
153+
assert_eq!(&*output, input.as_slice());
154+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
155+
}
156+
157+
#[test]
158+
fn connection_without_snappy_returns_input_unchanged() {
159+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
160+
let input = b"hello world";
161+
162+
let (output, dt) = compressor
163+
.compress(false, DataTypeFlag::Json, input)
164+
.unwrap();
165+
166+
assert!(matches!(output, Cow::Borrowed(_)));
167+
assert_eq!(&*output, input.as_slice());
168+
}
169+
170+
#[test]
171+
fn already_compressed_returns_input_unchanged() {
172+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
173+
let input = b"already compressed data";
174+
175+
let (output, dt) = compressor
176+
.compress(true, DataTypeFlag::Compressed, input)
177+
.unwrap();
178+
179+
assert!(matches!(output, Cow::Borrowed(_)));
180+
assert_eq!(&*output, input.as_slice());
181+
assert_eq!(dt, u8::from(DataTypeFlag::Compressed));
182+
}
183+
184+
#[test]
185+
fn input_below_min_size_returns_input_unchanged() {
186+
let mut compressor = StdCompressor::new(&enabled_config(1024, 1.0));
187+
let input = b"small";
188+
189+
let (output, dt) = compressor
190+
.compress(true, DataTypeFlag::Json, input)
191+
.unwrap();
192+
193+
assert!(matches!(output, Cow::Borrowed(_)));
194+
assert_eq!(&*output, input.as_slice());
195+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
196+
}
197+
198+
#[test]
199+
fn compressible_input_returns_owned_with_compressed_flag() {
200+
let mut compressor = StdCompressor::new(&enabled_config(0, 1.0));
201+
// Highly compressible: repeated bytes.
202+
let input = vec![b'a'; 4096];
203+
204+
let (output, dt) = compressor
205+
.compress(true, DataTypeFlag::Json, &input)
206+
.unwrap();
207+
208+
assert!(matches!(output, Cow::Owned(_)));
209+
assert!(output.len() < input.len());
210+
assert_eq!(
211+
dt,
212+
u8::from(DataTypeFlag::Json) | u8::from(DataTypeFlag::Compressed)
213+
);
214+
215+
// Verify it round-trips through snappy.
216+
let decompressed = snap::raw::Decoder::new().decompress_vec(&output).unwrap();
217+
assert_eq!(decompressed, input);
218+
}
219+
220+
#[test]
221+
fn poor_ratio_returns_input_unchanged() {
222+
// Set a very aggressive ratio that compressed output can't beat.
223+
let mut compressor = StdCompressor::new(&enabled_config(0, 0.01));
224+
let input = vec![b'a'; 256];
225+
226+
let (output, dt) = compressor
227+
.compress(true, DataTypeFlag::Json, &input)
228+
.unwrap();
229+
230+
assert!(matches!(output, Cow::Borrowed(_)));
231+
assert_eq!(&*output, input.as_slice());
232+
assert_eq!(dt, u8::from(DataTypeFlag::Json));
233+
}
234+
235+
#[test]
236+
fn compression_manager_creates_compressor() {
237+
let manager = CompressionManager::<StdCompressor>::new(enabled_config(0, 1.0));
238+
let mut compressor = manager.compressor();
239+
let input = vec![b'x'; 4096];
240+
241+
let (output, _dt) = compressor
242+
.compress(true, DataTypeFlag::None, &input)
243+
.unwrap();
244+
245+
assert!(matches!(output, Cow::Owned(_)));
246+
assert!(output.len() < input.len());
247+
}
248+
}

sdk/couchbase-core/src/crudcomponent.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl<
130130
key: opts.key,
131131
vbucket_id,
132132
flags: opts.flags,
133-
value,
133+
value: &value,
134134
datatype,
135135
expiry: opts.expiry,
136136
preserve_expiry: opts.preserve_expiry,
@@ -195,7 +195,7 @@ impl<
195195
Error::new_contextual_memdx_error(e)
196196
})
197197
.map_ok(|resp| GetResult {
198-
value: resp.value.to_vec(),
198+
value: resp.value,
199199
datatype: resp.datatype,
200200
cas: resp.cas,
201201
flags: resp.flags,
@@ -233,7 +233,7 @@ impl<
233233
Error::new_contextual_memdx_error(e)
234234
})
235235
.map_ok(|resp| GetMetaResult {
236-
value: resp.value.to_vec(),
236+
value: resp.value,
237237
datatype: resp.datatype,
238238
server_duration: resp.server_duration,
239239
expiry: resp.expiry,
@@ -323,7 +323,7 @@ impl<
323323
Error::new_contextual_memdx_error(e)
324324
})
325325
.map_ok(|resp| GetAndLockResult {
326-
value: resp.value.to_vec(),
326+
value: resp.value,
327327
datatype: resp.datatype,
328328
cas: resp.cas,
329329
flags: resp.flags,
@@ -362,7 +362,7 @@ impl<
362362
Error::new_contextual_memdx_error(e)
363363
})
364364
.map_ok(|resp| GetAndTouchResult {
365-
value: resp.value.to_vec(),
365+
value: resp.value,
366366
datatype: resp.datatype,
367367
cas: resp.cas,
368368
flags: resp.flags,
@@ -469,7 +469,7 @@ impl<
469469
key: opts.key,
470470
vbucket_id,
471471
flags: opts.flags,
472-
value,
472+
value: &value,
473473
datatype,
474474
expiry: opts.expiry,
475475
on_behalf_of: None,
@@ -531,7 +531,7 @@ impl<
531531
key: opts.key,
532532
vbucket_id,
533533
flags: opts.flags,
534-
value,
534+
value: &value,
535535
datatype,
536536
expiry: opts.expiry,
537537
preserve_expiry: opts.preserve_expiry,
@@ -594,7 +594,7 @@ impl<
594594
collection_id,
595595
key: opts.key,
596596
vbucket_id,
597-
value,
597+
value: &value,
598598
datatype,
599599
cas: opts.cas,
600600
on_behalf_of: None,
@@ -655,7 +655,7 @@ impl<
655655
collection_id,
656656
key: opts.key,
657657
vbucket_id,
658-
value,
658+
value: &value,
659659
datatype,
660660
cas: opts.cas,
661661
on_behalf_of: None,

sdk/couchbase-core/src/results/kv.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*
1717
*/
1818

19+
use bytes::Bytes;
20+
1921
use crate::error;
2022
use crate::mutationtoken::MutationToken;
2123
use std::time::Duration;
2224

2325
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
2426
pub struct GetResult {
25-
pub value: Vec<u8>,
27+
pub value: Bytes,
2628
pub flags: u32,
2729
pub datatype: u8,
2830
pub cas: u64,
@@ -32,7 +34,7 @@ pub struct GetResult {
3234
pub struct GetMetaResult {
3335
pub cas: u64,
3436
pub flags: u32,
35-
pub value: Vec<u8>,
37+
pub value: Bytes,
3638
pub datatype: u8,
3739
pub server_duration: Option<Duration>,
3840
pub expiry: u32,
@@ -54,15 +56,15 @@ pub struct DeleteResult {
5456

5557
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
5658
pub struct GetAndLockResult {
57-
pub value: Vec<u8>,
59+
pub value: Bytes,
5860
pub flags: u32,
5961
pub datatype: u8,
6062
pub cas: u64,
6163
}
6264

6365
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
6466
pub struct GetAndTouchResult {
65-
pub value: Vec<u8>,
67+
pub value: Bytes,
6668
pub flags: u32,
6769
pub datatype: u8,
6870
pub cas: u64,

0 commit comments

Comments
 (0)