Skip to content

Commit b4cbb39

Browse files
authored
perf(query): improve SELECTs performance (#169)
* Improve performance of `RowCursor::next()`. * Add `RowCursor::{decoded_bytes,received_bytes}` methods.
1 parent b45ff06 commit b4cbb39

22 files changed

+569
-490
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
# Changelog
22
All notable changes to this project will be documented in this file.
33

4-
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
<!-- next-header -->
88

99
## [Unreleased] - ReleaseDate
10+
### Added
11+
- query/cursor: add `RowCursor::{decoded_bytes,received_bytes}` methods ([#169]).
12+
13+
### Changed
14+
- query/cursor: improve performance of `RowCursor::next()` ([#169]).
15+
1016
### Fixed
1117
- mock: work with the advanced time via `tokio::time::advance()` ([#165]).
1218

1319
[#165]: https://github.com/ClickHouse/clickhouse-rs/pull/165
20+
[#169]: https://github.com/ClickHouse/clickhouse-rs/pull/169
1421

1522
## [0.13.0] - 2024-09-27
1623
### Added

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ url = "2.1.1"
9393
futures = "0.3.5"
9494
futures-channel = "0.3.30"
9595
static_assertions = "1.1"
96-
sealed = "0.5"
96+
sealed = "0.6"
9797
sha-1 = { version = "0.10", optional = true }
9898
serde_json = { version = "1.0.68", optional = true }
9999
lz4_flex = { version = "0.11.3", default-features = false, features = [

benches/README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Benchmarks
2+
3+
All cases are run with `cargo bench --bench <case>`.
4+
5+
## With a mocked server
6+
7+
These benchmarks are run against a mocked server, which is a simple HTTP server that always returns the same fixed response. This is useful for measuring the overhead of the client itself:
8+
* `select` checks throughput of `Client::query()`.
9+
* `insert` checks throughput of `Client::insert()` and `Client::inserter()` (if the `inserter` feature is enabled).
10+
11+
### How to collect perf data
12+
13+
The crate's code runs on the thread with the name `testee`:
14+
```bash
15+
cargo bench --bench <name> &
16+
perf record -p `ps -AT | grep testee | awk '{print $2}'` --call-graph dwarf,65528 --freq 5000 -g -- sleep 5
17+
perf script > perf.script
18+
```
19+
20+
Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefox.com).
21+
22+
## With a running ClickHouse server
23+
24+
These benchmarks are run against a real ClickHouse server, so one must be started first:
25+
```bash
26+
docker run -d -p 8123:8123 -p 9000:9000 --name ch clickhouse/clickhouse-server
27+
28+
cargo bench --bench <case>
29+
```
30+
31+
Cases:
32+
* `select_numbers` measures the time it takes to run a big SELECT query against the `system.numbers_mt` table.
33+
34+
### How to collect perf data
35+
36+
```bash
37+
cargo bench --bench <name> &
38+
perf record -p `ps -AT | grep <name> | awk '{print $2}'` --call-graph dwarf,65528 --freq 5000 -g -- sleep 5
39+
perf script > perf.script
40+
```
41+
42+
Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefox.com).

benches/common.rs

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
#![allow(dead_code)] // typical for common test/bench modules :(
22

3-
use std::{convert::Infallible, future::Future, net::SocketAddr, thread};
3+
use std::{
4+
convert::Infallible,
5+
future::Future,
6+
net::SocketAddr,
7+
pin::Pin,
8+
sync::atomic::{AtomicU32, Ordering},
9+
thread,
10+
time::Duration,
11+
};
412

513
use bytes::Bytes;
614
use futures::stream::StreamExt;
@@ -11,7 +19,13 @@ use hyper::{
1119
service, Request, Response,
1220
};
1321
use hyper_util::rt::{TokioIo, TokioTimer};
14-
use tokio::{net::TcpListener, runtime};
22+
use tokio::{
23+
net::TcpListener,
24+
runtime,
25+
sync::{mpsc, oneshot},
26+
};
27+
28+
use clickhouse::error::Result;
1529

1630
pub(crate) struct ServerHandle;
1731

@@ -38,14 +52,7 @@ where
3852
}
3953
};
4054

41-
thread::spawn(move || {
42-
runtime::Builder::new_current_thread()
43-
.enable_all()
44-
.build()
45-
.unwrap()
46-
.block_on(serving);
47-
});
48-
55+
run_on_st_runtime("server", serving);
4956
ServerHandle
5057
}
5158

@@ -57,3 +64,62 @@ pub(crate) async fn skip_incoming(request: Request<Incoming>) {
5764
result.unwrap();
5865
}
5966
}
67+
68+
pub(crate) struct RunnerHandle {
69+
tx: mpsc::UnboundedSender<Run>,
70+
}
71+
72+
struct Run {
73+
future: Pin<Box<dyn Future<Output = Result<Duration>> + Send>>,
74+
callback: oneshot::Sender<Result<Duration>>,
75+
}
76+
77+
impl RunnerHandle {
78+
pub(crate) fn run(
79+
&self,
80+
f: impl Future<Output = Result<Duration>> + Send + 'static,
81+
) -> Duration {
82+
let (tx, rx) = oneshot::channel();
83+
84+
self.tx
85+
.send(Run {
86+
future: Box::pin(f),
87+
callback: tx,
88+
})
89+
.unwrap();
90+
91+
rx.blocking_recv().unwrap().unwrap()
92+
}
93+
}
94+
95+
pub(crate) fn start_runner() -> RunnerHandle {
96+
let (tx, mut rx) = mpsc::unbounded_channel::<Run>();
97+
98+
run_on_st_runtime("testee", async move {
99+
while let Some(run) = rx.recv().await {
100+
let result = run.future.await;
101+
let _ = run.callback.send(result);
102+
}
103+
});
104+
105+
RunnerHandle { tx }
106+
}
107+
108+
fn run_on_st_runtime(name: &str, f: impl Future + Send + 'static) {
109+
let name = name.to_string();
110+
thread::Builder::new()
111+
.name(name.clone())
112+
.spawn(move || {
113+
let no = AtomicU32::new(0);
114+
runtime::Builder::new_current_thread()
115+
.enable_all()
116+
.thread_name_fn(move || {
117+
let no = no.fetch_add(1, Ordering::Relaxed);
118+
format!("{name}-{no}")
119+
})
120+
.build()
121+
.unwrap()
122+
.block_on(f);
123+
})
124+
.unwrap();
125+
}

benches/insert.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
use std::{future::Future, mem, time::Duration};
1+
use std::{
2+
future::Future,
3+
mem,
4+
time::{Duration, Instant},
5+
};
26

37
use bytes::Bytes;
48
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
59
use http_body_util::Empty;
610
use hyper::{body::Incoming, Request, Response};
711
use serde::Serialize;
8-
use tokio::{runtime::Runtime, time::Instant};
912

1013
use clickhouse::{error::Result, Client, Compression, Row};
1114

@@ -76,30 +79,29 @@ async fn run_inserter<const WITH_PERIOD: bool>(client: Client, iters: u64) -> Re
7679

7780
fn run<F>(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, u64) -> F)
7881
where
79-
F: Future<Output = Result<Duration>>,
82+
F: Future<Output = Result<Duration>> + Send + 'static,
8083
{
8184
let addr = format!("127.0.0.1:{port}").parse().unwrap();
8285
let _server = common::start_server(addr, serve);
86+
let runner = common::start_runner();
8387

8488
let mut group = c.benchmark_group(name);
8589
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
8690
group.bench_function("no compression", |b| {
8791
b.iter_custom(|iters| {
88-
let rt = Runtime::new().unwrap();
8992
let client = Client::default()
9093
.with_url(format!("http://{addr}"))
9194
.with_compression(Compression::None);
92-
rt.block_on((f)(client, iters)).unwrap()
95+
runner.run((f)(client, iters))
9396
})
9497
});
9598
#[cfg(feature = "lz4")]
9699
group.bench_function("lz4", |b| {
97100
b.iter_custom(|iters| {
98-
let rt = Runtime::new().unwrap();
99101
let client = Client::default()
100102
.with_url(format!("http://{addr}"))
101103
.with_compression(Compression::Lz4);
102-
rt.block_on((f)(client, iters)).unwrap()
104+
runner.run((f)(client, iters))
103105
})
104106
});
105107
group.finish();

benches/select.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{convert::Infallible, mem};
1+
use std::{
2+
convert::Infallible,
3+
mem,
4+
time::{Duration, Instant},
5+
};
26

37
use bytes::Bytes;
48
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
@@ -9,7 +13,6 @@ use hyper::{
913
Request, Response,
1014
};
1115
use serde::Deserialize;
12-
use tokio::{runtime::Runtime, time::Instant};
1316

1417
use clickhouse::{error::Result, Client, Compression, Row};
1518

@@ -47,6 +50,7 @@ fn select(c: &mut Criterion) {
4750
let addr = "127.0.0.1:6543".parse().unwrap();
4851
let chunk = prepare_chunk();
4952
let _server = common::start_server(addr, move |req| serve(req, chunk.clone()));
53+
let runner = common::start_runner();
5054

5155
#[allow(dead_code)]
5256
#[derive(Debug, Row, Deserialize)]
@@ -57,7 +61,8 @@ fn select(c: &mut Criterion) {
5761
d: u32,
5862
}
5963

60-
async fn run(client: Client, iters: u64) -> Result<()> {
64+
async fn run(client: Client, iters: u64) -> Result<Duration> {
65+
let start = Instant::now();
6166
let mut cursor = client
6267
.query("SELECT ?fields FROM some")
6368
.fetch::<SomeRow>()?;
@@ -66,32 +71,26 @@ fn select(c: &mut Criterion) {
6671
black_box(cursor.next().await?);
6772
}
6873

69-
Ok(())
74+
Ok(start.elapsed())
7075
}
7176

7277
let mut group = c.benchmark_group("select");
7378
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
7479
group.bench_function("no compression", |b| {
7580
b.iter_custom(|iters| {
76-
let rt = Runtime::new().unwrap();
7781
let client = Client::default()
7882
.with_url(format!("http://{addr}"))
7983
.with_compression(Compression::None);
80-
let start = Instant::now();
81-
rt.block_on(run(client, iters)).unwrap();
82-
start.elapsed()
84+
runner.run(run(client, iters))
8385
})
8486
});
8587
#[cfg(feature = "lz4")]
8688
group.bench_function("lz4", |b| {
8789
b.iter_custom(|iters| {
88-
let rt = Runtime::new().unwrap();
8990
let client = Client::default()
9091
.with_url(format!("http://{addr}"))
9192
.with_compression(Compression::Lz4);
92-
let start = Instant::now();
93-
rt.block_on(run(client, iters)).unwrap();
94-
start.elapsed()
93+
runner.run(run(client, iters))
9594
})
9695
});
9796
group.finish();

benches/select_numbers.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
use serde::Deserialize;
22

3-
use clickhouse::{error::Result, Client, Compression, Row};
3+
use clickhouse::{Client, Compression, Row};
44

55
#[derive(Row, Deserialize)]
66
struct Data {
77
no: u64,
88
}
99

10-
async fn bench() -> u64 {
10+
async fn bench(name: &str, compression: Compression) {
11+
let start = std::time::Instant::now();
12+
let (sum, dec_mbytes, rec_mbytes) = tokio::spawn(do_bench(compression)).await.unwrap();
13+
assert_eq!(sum, 124999999750000000);
14+
let elapsed = start.elapsed();
15+
let throughput = dec_mbytes / elapsed.as_secs_f64();
16+
println!("{name:>8} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB");
17+
}
18+
19+
async fn do_bench(compression: Compression) -> (u64, f64, f64) {
1120
let client = Client::default()
12-
.with_compression(Compression::None)
21+
.with_compression(compression)
1322
.with_url("http://localhost:8123");
1423

1524
let mut cursor = client
@@ -22,15 +31,17 @@ async fn bench() -> u64 {
2231
sum += row.no;
2332
}
2433

25-
sum
34+
let dec_bytes = cursor.decoded_bytes();
35+
let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0;
36+
let recv_bytes = cursor.received_bytes();
37+
let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0;
38+
(sum, dec_mbytes, recv_mbytes)
2639
}
2740

2841
#[tokio::main]
29-
async fn main() -> Result<()> {
30-
println!("Started");
31-
let start = std::time::Instant::now();
32-
let sum = tokio::spawn(bench()).await.unwrap();
33-
let elapsed = start.elapsed();
34-
println!("Done: elapsed={elapsed:?} sum={sum}");
35-
Ok(())
42+
async fn main() {
43+
println!("compress elapsed throughput received");
44+
bench("none", Compression::None).await;
45+
#[cfg(feature = "lz4")]
46+
bench("lz4", Compression::Lz4).await;
3647
}

0 commit comments

Comments
 (0)