Skip to content

Commit 3947c14

Browse files
authored
feat!(insert): RowBinaryWithNamesAndTypes (#244)
1 parent 2e57c26 commit 3947c14

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3043
-2176
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ criterion = "0.6"
158158
serde = { version = "1.0.106", features = ["derive"] }
159159
tokio = { version = "1.0.1", features = ["full", "test-util"] }
160160
hyper = { version = "1.1", features = ["server"] }
161+
indexmap = { version = "2.10.0", features = ["serde"] }
162+
linked-hash-map = { version = "0.5.6", features = ["serde_impl"] }
163+
fxhash = { version = "0.2.1" }
161164
serde_bytes = "0.11.4"
162165
serde_json = "1"
163166
serde_repr = "0.1.7"

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ struct MyRow {
137137
name: String,
138138
}
139139
140-
let mut insert = client.insert("some")?;
140+
let mut insert = client.insert::<MyRow>("some").await?;
141141
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
142142
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
143143
insert.end().await?;
@@ -158,14 +158,14 @@ insert.end().await?;
158158
Requires the `inserter` feature.
159159

160160
```rust,ignore
161-
let mut inserter = client.inserter("some")?
161+
let mut inserter = client.inserter::<MyRow>("some")?
162162
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
163163
.with_max_bytes(50_000_000)
164164
.with_max_rows(750_000)
165165
.with_period(Some(Duration::from_secs(15)));
166166
167-
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
168-
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
167+
inserter.write(&MyRow { no: 0, name: "foo".into() }).await?;
168+
inserter.write(&MyRow { no: 1, name: "bar".into() }).await?;
169169
let stats = inserter.commit().await?;
170170
if stats.rows > 0 {
171171
println!(

benches/mocked_insert.rs

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,57 @@
11
use bytes::Bytes;
2+
use clickhouse::{error::Result, Client, Compression, Row};
3+
use clickhouse_types::{Column, DataTypeNode};
24
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
3-
use http_body_util::Empty;
5+
use futures_util::stream;
6+
use http_body_util::StreamBody;
7+
use hyper::body::{Body, Frame};
48
use hyper::{body::Incoming, Request, Response};
59
use serde::Serialize;
10+
use std::convert::Infallible;
611
use std::hint::black_box;
7-
use std::net::SocketAddr;
12+
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
813
use std::{
914
future::Future,
1015
mem,
1116
time::{Duration, Instant},
1217
};
1318

14-
use clickhouse::{error::Result, Client, Compression, Row};
15-
1619
mod common;
1720

18-
async fn serve(request: Request<Incoming>) -> Response<Empty<Bytes>> {
21+
async fn serve(
22+
request: Request<Incoming>,
23+
compression: Compression,
24+
with_validation: bool,
25+
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
1926
common::skip_incoming(request).await;
20-
Response::new(Empty::new())
27+
28+
let bytes = if with_validation {
29+
let schema = vec![
30+
Column::new("a".to_string(), DataTypeNode::UInt64),
31+
Column::new("b".to_string(), DataTypeNode::Int64),
32+
Column::new("c".to_string(), DataTypeNode::Int32),
33+
Column::new("d".to_string(), DataTypeNode::UInt32),
34+
Column::new("e".to_string(), DataTypeNode::UInt64),
35+
Column::new("f".to_string(), DataTypeNode::UInt32),
36+
Column::new("g".to_string(), DataTypeNode::UInt64),
37+
Column::new("h".to_string(), DataTypeNode::Int64),
38+
];
39+
40+
let mut buffer = Vec::new();
41+
clickhouse_types::put_rbwnat_columns_header(&schema, &mut buffer).unwrap();
42+
43+
match compression {
44+
Compression::None => Bytes::from(buffer),
45+
#[cfg(feature = "lz4")]
46+
Compression::Lz4 => clickhouse::_priv::lz4_compress(&buffer).unwrap(),
47+
_ => unreachable!(),
48+
}
49+
} else {
50+
Bytes::new()
51+
};
52+
53+
let stream = StreamBody::new(stream::once(async { Ok(Frame::data(bytes)) }));
54+
Response::new(stream)
2155
}
2256

2357
#[derive(Row, Serialize)]
@@ -47,11 +81,18 @@ impl SomeRow {
4781
}
4882
}
4983

50-
async fn run_insert(client: Client, addr: SocketAddr, iters: u64) -> Result<Duration> {
51-
let _server = common::start_server(addr, serve).await;
84+
const ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6524));
85+
86+
async fn run_insert(
87+
client: Client,
88+
iters: u64,
89+
compression: Compression,
90+
validation: bool,
91+
) -> Result<Duration> {
92+
let _server = common::start_server(ADDR, move |req| serve(req, compression, validation)).await;
5293

5394
let start = Instant::now();
54-
let mut insert = client.insert::<SomeRow>("table")?;
95+
let mut insert = client.insert::<SomeRow>("table").await?;
5596

5697
for _ in 0..iters {
5798
insert.write(&SomeRow::sample()).await?;
@@ -64,65 +105,68 @@ async fn run_insert(client: Client, addr: SocketAddr, iters: u64) -> Result<Dura
64105
#[cfg(feature = "inserter")]
65106
async fn run_inserter<const WITH_PERIOD: bool>(
66107
client: Client,
67-
addr: SocketAddr,
68108
iters: u64,
109+
compression: Compression,
110+
validation: bool,
69111
) -> Result<Duration> {
70-
let _server = common::start_server(addr, serve).await;
112+
let _server = common::start_server(ADDR, move |req| serve(req, compression, validation)).await;
71113

72114
let start = Instant::now();
73-
let mut inserter = client.inserter::<SomeRow>("table")?.with_max_rows(iters);
115+
let mut inserter = client.inserter::<SomeRow>("table").with_max_rows(iters);
74116

75117
if WITH_PERIOD {
76118
// Just to measure overhead, not to actually use it.
77119
inserter = inserter.with_period(Some(Duration::from_secs(1000)));
78120
}
79121

80122
for _ in 0..iters {
81-
inserter.write(&SomeRow::sample())?;
123+
inserter.write(&SomeRow::sample()).await?;
82124
inserter.commit().await?;
83125
}
84126

85127
inserter.end().await?;
86128
Ok(start.elapsed())
87129
}
88130

89-
fn run<F>(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, SocketAddr, u64) -> F)
131+
fn run<F>(c: &mut Criterion, name: &str, f: impl Fn(Client, u64, Compression, bool) -> F)
90132
where
91133
F: Future<Output = Result<Duration>> + Send + 'static,
92134
{
93-
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
94135
let runner = common::start_runner();
95-
96136
let mut group = c.benchmark_group(name);
97137
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
98-
group.bench_function("uncompressed", |b| {
99-
b.iter_custom(|iters| {
100-
let client = Client::default()
101-
.with_url(format!("http://{addr}"))
102-
.with_compression(Compression::None);
103-
runner.run((f)(client, addr, iters))
104-
})
105-
});
106-
#[cfg(feature = "lz4")]
107-
group.bench_function("lz4", |b| {
108-
b.iter_custom(|iters| {
109-
let client = Client::default()
110-
.with_url(format!("http://{addr}"))
111-
.with_compression(Compression::Lz4);
112-
runner.run((f)(client, addr, iters))
113-
})
114-
});
138+
for validation in [true, false] {
139+
#[allow(clippy::single_element_loop)]
140+
for compression in [
141+
Compression::None,
142+
#[cfg(feature = "lz4")]
143+
Compression::Lz4,
144+
] {
145+
group.bench_function(
146+
format!("validation={validation}/compression={compression:?}"),
147+
|b| {
148+
b.iter_custom(|iters| {
149+
let client = Client::default()
150+
.with_url(format!("http://{ADDR}"))
151+
.with_compression(compression)
152+
.with_validation(validation);
153+
runner.run((f)(client, iters, compression, validation))
154+
})
155+
},
156+
);
157+
}
158+
}
115159
group.finish();
116160
}
117161

118162
fn insert(c: &mut Criterion) {
119-
run(c, "insert", 6543, run_insert);
163+
run(c, "insert", run_insert);
120164
}
121165

122166
#[cfg(feature = "inserter")]
123167
fn inserter(c: &mut Criterion) {
124-
run(c, "inserter", 6544, run_inserter::<false>);
125-
run(c, "inserter-period", 6545, run_inserter::<true>);
168+
run(c, "inserter", run_inserter::<false>);
169+
run(c, "inserter-period", run_inserter::<true>);
126170
}
127171

128172
#[cfg(not(feature = "inserter"))]

benches/mocked_select.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ mod common;
2121
async fn serve(
2222
request: Request<Incoming>,
2323
compression: Compression,
24-
use_rbwnat: bool,
24+
with_validation: bool,
2525
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
2626
common::skip_incoming(request).await;
2727

28-
let maybe_schema = if use_rbwnat {
28+
let maybe_schema = if with_validation {
2929
let schema = vec![
3030
Column::new("a".to_string(), DataTypeNode::UInt64),
3131
Column::new("b".to_string(), DataTypeNode::Int64),
@@ -79,8 +79,8 @@ fn prepare_chunk() -> Bytes {
7979
const ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6523));
8080

8181
fn select(c: &mut Criterion) {
82-
async fn start_server(compression: Compression, use_rbwnat: bool) -> common::ServerHandle {
83-
common::start_server(ADDR, move |req| serve(req, compression, use_rbwnat)).await
82+
async fn start_server(compression: Compression, with_validation: bool) -> common::ServerHandle {
83+
common::start_server(ADDR, move |req| serve(req, compression, with_validation)).await
8484
}
8585

8686
let runner = common::start_runner();

benches/select_market_data.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,21 @@ async fn prepare_data() {
4141
client
4242
.query(
4343
r#"
44-
CREATE TABLE IF NOT EXISTS l2_book_log
45-
(
46-
`instrument_id` UInt32 CODEC(T64, Default),
47-
`received_time` DateTime64(9) CODEC(DoubleDelta, Default),
48-
`exchange_time` Nullable(DateTime64(9)) CODEC(DoubleDelta, Default),
49-
`sequence_no` Nullable(UInt64) CODEC(DoubleDelta, Default),
50-
`trace_id` UInt64 CODEC(DoubleDelta, Default),
51-
`side` Enum8('Bid' = 0, 'Ask' = 1),
52-
`price` Float64,
53-
`amount` Float64,
54-
`is_eot` Bool
55-
)
56-
ENGINE = MergeTree
57-
PRIMARY KEY (instrument_id, received_time)
58-
"#,
44+
CREATE TABLE IF NOT EXISTS l2_book_log
45+
(
46+
`instrument_id` UInt32 CODEC(T64, Default),
47+
`received_time` DateTime64(9) CODEC(DoubleDelta, Default),
48+
`exchange_time` Nullable(DateTime64(9)) CODEC(DoubleDelta, Default),
49+
`sequence_no` Nullable(UInt64) CODEC(DoubleDelta, Default),
50+
`trace_id` UInt64 CODEC(DoubleDelta, Default),
51+
`side` Enum8('Bid' = 0, 'Ask' = 1),
52+
`price` Float64,
53+
`amount` Float64,
54+
`is_eot` Bool
55+
)
56+
ENGINE = MergeTree
57+
PRIMARY KEY (instrument_id, received_time)
58+
"#,
5959
)
6060
.execute()
6161
.await
@@ -71,7 +71,7 @@ async fn prepare_data() {
7171
return;
7272
}
7373

74-
let mut insert = client.insert::<L2Update>("l2_book_log").unwrap();
74+
let mut insert = client.insert::<L2Update>("l2_book_log").await.unwrap();
7575

7676
for i in 0..10_000_000 {
7777
insert

examples/async_insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async fn main() -> Result<()> {
4040
.execute()
4141
.await?;
4242

43-
let mut insert = client.insert::<Event>(table_name)?;
43+
let mut insert = client.insert::<Event>(table_name).await?;
4444
insert
4545
.write(&Event {
4646
timestamp: now(),

examples/clickhouse_cloud.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn main() -> clickhouse::error::Result<()> {
4242
.execute()
4343
.await?;
4444

45-
let mut insert = client.insert::<Data>(table_name)?;
45+
let mut insert = client.insert::<Data>(table_name).await?;
4646
insert
4747
.write(&Data {
4848
id: 42,

examples/data_types_derive_containers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async fn main() -> Result<()> {
3737
.execute()
3838
.await?;
3939

40-
let mut insert = client.insert::<Row>(table_name)?;
40+
let mut insert = client.insert::<Row>(table_name).await?;
4141
insert.write(&Row::new()).await?;
4242
insert.end().await?;
4343

examples/data_types_derive_simple.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ async fn main() -> Result<()> {
8080
.execute()
8181
.await?;
8282

83-
let mut insert = client.insert::<Row>(table_name)?;
83+
let mut insert = client.insert::<Row>(table_name).await?;
8484
insert.write(&Row::new()).await?;
8585
insert.end().await?;
8686

examples/data_types_new_json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async fn main() -> Result<()> {
4949
.to_string(),
5050
};
5151

52-
let mut insert = client.insert::<Row>(table_name)?;
52+
let mut insert = client.insert::<Row>(table_name).await?;
5353
insert.write(&row).await?;
5454
insert.end().await?;
5555

0 commit comments

Comments
 (0)