Skip to content

Commit c2eb5d3

Browse files
committed
perf(query): improve SELECTs performance
Also, add `RowCursor::received_bytes()` and `RowCursor::decoded_bytes()`
1 parent 7b599ef commit c2eb5d3

17 files changed

+411
-454
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ url = "2.1.1"
9393
futures = "0.3.5"
9494
futures-channel = "0.3.30"
9595
static_assertions = "1.1"
96-
sealed = "0.5"
96+
sealed = "0.6"
9797
sha-1 = { version = "0.10", optional = true }
9898
serde_json = { version = "1.0.68", optional = true }
9999
lz4_flex = { version = "0.11.3", default-features = false, features = [

benches/select_numbers.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ struct Data {
99

1010
async fn bench(name: &str, compression: Compression) {
1111
let start = std::time::Instant::now();
12-
let sum = tokio::spawn(do_bench(compression)).await.unwrap();
12+
let (sum, dec_mbytes, rec_mbytes) = tokio::spawn(do_bench(compression)).await.unwrap();
1313
assert_eq!(sum, 124999999750000000);
1414
let elapsed = start.elapsed();
15-
println!("{name:>8} {elapsed:>7.3?}");
15+
let throughput = dec_mbytes / elapsed.as_secs_f64();
16+
println!("{name:>8} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB");
1617
}
1718

18-
async fn do_bench(compression: Compression) -> u64 {
19+
async fn do_bench(compression: Compression) -> (u64, f64, f64) {
1920
let client = Client::default()
2021
.with_compression(compression)
2122
.with_url("http://localhost:8123");
@@ -30,12 +31,16 @@ async fn do_bench(compression: Compression) -> u64 {
3031
sum += row.no;
3132
}
3233

33-
sum
34+
let dec_bytes = cursor.decoded_bytes();
35+
let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0;
36+
let recv_bytes = cursor.received_bytes();
37+
let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0;
38+
(sum, dec_mbytes, recv_mbytes)
3439
}
3540

3641
#[tokio::main]
3742
async fn main() {
38-
println!("compress elapsed");
43+
println!("compress elapsed throughput received");
3944
bench("none", Compression::None).await;
4045
#[cfg(feature = "lz4")]
4146
bench("lz4", Compression::Lz4).await;

src/buflist.rs

Lines changed: 0 additions & 167 deletions
This file was deleted.

src/bytes_ext.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use bytes::{Bytes, BytesMut};
2+
3+
#[derive(Default)]
4+
pub(crate) struct BytesExt {
5+
bytes: Bytes,
6+
cursor: usize,
7+
}
8+
9+
impl BytesExt {
10+
#[inline(always)]
11+
pub(crate) fn slice(&self) -> &[u8] {
12+
&self.bytes[self.cursor..]
13+
}
14+
15+
#[inline(always)]
16+
pub(crate) fn remaining(&self) -> usize {
17+
self.bytes.len() - self.cursor
18+
}
19+
20+
#[inline(always)]
21+
pub(crate) fn set_remaining(&mut self, n: usize) {
22+
// We can use `bytes.advance()` here, but it's slower.
23+
self.cursor = self.bytes.len() - n;
24+
}
25+
26+
#[cfg(any(test, feature = "lz4", feature = "watch"))]
27+
#[inline(always)]
28+
pub(crate) fn advance(&mut self, n: usize) {
29+
// We can use `bytes.advance()` here, but it's slower.
30+
self.cursor += n;
31+
}
32+
33+
#[inline(always)]
34+
pub(crate) fn extend(&mut self, chunk: Bytes) {
35+
if self.cursor == self.bytes.len() {
36+
// Most of the time, we read the next chunk after consuming the previous one.
37+
self.bytes = chunk;
38+
self.cursor = 0;
39+
} else {
40+
// Some bytes are left in the buffer, we need to merge them with the next chunk.
41+
self.extend_slow(chunk);
42+
}
43+
}
44+
45+
#[cold]
46+
#[inline(never)]
47+
fn extend_slow(&mut self, chunk: Bytes) {
48+
let total = self.remaining() + chunk.len();
49+
let mut new_bytes = BytesMut::with_capacity(total);
50+
let capacity = new_bytes.capacity();
51+
new_bytes.extend_from_slice(self.slice());
52+
new_bytes.extend_from_slice(&chunk);
53+
debug_assert_eq!(new_bytes.capacity(), capacity);
54+
self.bytes = new_bytes.freeze();
55+
self.cursor = 0;
56+
}
57+
}
58+
59+
#[test]
60+
fn it_works() {
61+
let mut bytes = BytesExt::default();
62+
assert!(bytes.slice().is_empty());
63+
assert_eq!(bytes.remaining(), 0);
64+
65+
bytes.extend(Bytes::from_static(b"hello"));
66+
assert_eq!(bytes.slice(), b"hello");
67+
assert_eq!(bytes.remaining(), 5);
68+
69+
bytes.advance(3);
70+
assert_eq!(bytes.slice(), b"lo");
71+
assert_eq!(bytes.remaining(), 2);
72+
73+
bytes.extend(Bytes::from_static(b"l"));
74+
assert_eq!(bytes.slice(), b"lol");
75+
assert_eq!(bytes.remaining(), 3);
76+
77+
bytes.set_remaining(1);
78+
assert_eq!(bytes.slice(), b"l");
79+
assert_eq!(bytes.remaining(), 1);
80+
}

0 commit comments

Comments
 (0)