Skip to content

Commit d271372

Browse files
authored
Random Access Benchmark (#149)
1 parent 704d3f3 commit d271372

File tree

20 files changed

+346
-87
lines changed

20 files changed

+346
-87
lines changed

Cargo.lock

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ arrayref = "0.3.7"
3939
arrow = { version = "51.0.0", features = ["pyarrow"] }
4040
arrow-array = "51.0.0"
4141
arrow-buffer = "51.0.0"
42+
arrow-select = "51.0.0"
4243
arrow-schema = "51.0.0"
4344
bindgen = "0.69.4"
4445
criterion = { version = "0.5.1", features = ["html_reports"] }

bench-vortex/Cargo.toml

+5-3
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,21 @@ workspace = true
1616

1717
[dependencies]
1818
arrow-array = { workspace = true }
19+
arrow-select = { workspace = true }
20+
vortex-alp = { path = "../vortex-alp" }
1921
vortex-array = { path = "../vortex-array" }
2022
vortex-datetime = { path = "../vortex-datetime" }
21-
vortex-alp = { path = "../vortex-alp" }
2223
vortex-dict = { path = "../vortex-dict" }
24+
vortex-error = { path = "../vortex-error", features = ["parquet"] }
2325
vortex-fastlanes = { path = "../vortex-fastlanes" }
2426
vortex-ree = { path = "../vortex-ree" }
2527
vortex-roaring = { path = "../vortex-roaring" }
2628
vortex-schema = { path = "../vortex-schema" }
2729
vortex-zigzag = { path = "../vortex-zigzag" }
2830
itertools = { workspace = true }
29-
reqwest = { workspace = true }
30-
parquet = { workspace = true }
3131
log = { workspace = true }
32+
parquet = { workspace = true }
33+
reqwest = { workspace = true }
3234
simplelog = { workspace = true }
3335

3436
[dev-dependencies]

bench-vortex/benches/compress_benchmark.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use criterion::{black_box, criterion_group, criterion_main, Criterion};
22

33
use bench_vortex::compress_taxi_data;
4-
use bench_vortex::taxi_data::download_taxi_data;
4+
use bench_vortex::taxi_data::taxi_data_parquet;
55

66
fn vortex_compress(c: &mut Criterion) {
7-
download_taxi_data();
7+
taxi_data_parquet();
88
let mut group = c.benchmark_group("end to end");
99
group.sample_size(10);
1010
group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data())));

bench-vortex/benches/random_access.rs

+14-11
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
1-
use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data};
1+
use bench_vortex::reader::take_vortex;
2+
use bench_vortex::taxi_data::taxi_data_vortex_compressed;
23
use criterion::{black_box, criterion_group, criterion_main, Criterion};
3-
use itertools::Itertools;
4-
5-
use vortex::array::ENCODINGS;
64

75
fn random_access(c: &mut Criterion) {
8-
let taxi_spiral = write_taxi_data();
6+
let mut group = c.benchmark_group("random access");
7+
// group.sample_size(10);
8+
99
let indices = [10, 11, 12, 13, 100_000, 3_000_000];
10-
println!(
11-
"ENCODINGS {:?}",
12-
ENCODINGS.iter().map(|e| e.id()).collect_vec()
13-
);
14-
c.bench_function("random access", |b| {
15-
b.iter(|| black_box(take_taxi_data(&taxi_spiral, &indices)))
10+
11+
let taxi_vortex = taxi_data_vortex_compressed();
12+
group.bench_function("vortex", |b| {
13+
b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap()))
1614
});
15+
//
16+
// let taxi_parquet = taxi_data_parquet();
17+
// group.bench_function("arrow", |b| {
18+
// b.iter(|| black_box(take_parquet(&taxi_parquet, &indices)?))
19+
// });
1720
}
1821

1922
criterion_group!(benches, random_access);

bench-vortex/src/bin/compress.rs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use bench_vortex::reader::{compress_vortex, open_vortex};
2+
use bench_vortex::setup_logger;
3+
use bench_vortex::taxi_data::taxi_data_parquet;
4+
use log::LevelFilter;
5+
use std::fs::File;
6+
use std::os::unix::prelude::MetadataExt;
7+
use std::path::PathBuf;
8+
use vortex::array::Array;
9+
use vortex::formatter::display_tree;
10+
11+
pub fn main() {
12+
setup_logger(LevelFilter::Debug);
13+
14+
let path: PathBuf = "taxi_data.vortex".into();
15+
{
16+
let mut write = File::create(&path).unwrap();
17+
compress_vortex(&taxi_data_parquet(), &mut write).unwrap();
18+
}
19+
20+
let taxi_vortex = open_vortex(&path).unwrap();
21+
22+
let pq_size = taxi_data_parquet().metadata().unwrap().size();
23+
let vx_size = taxi_vortex.nbytes();
24+
25+
println!("{}\n\n", display_tree(taxi_vortex.as_ref()));
26+
println!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
27+
println!("Compression ratio: {}", vx_size as f32 / pq_size as f32);
28+
}

bench-vortex/src/bin/serde.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
use bench_vortex::reader::take_vortex;
12
use bench_vortex::setup_logger;
2-
use bench_vortex::taxi_data::{take_taxi_data, write_taxi_data};
3+
use bench_vortex::taxi_data::taxi_data_vortex;
34
use log::LevelFilter;
45

56
pub fn main() {
67
setup_logger(LevelFilter::Debug);
7-
let taxi_spiral = write_taxi_data();
8-
let rows = take_taxi_data(&taxi_spiral, &[10, 11, 12, 13]); //, 100_000, 3_000_000]);
8+
let taxi_vortex = taxi_data_vortex();
9+
let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap();
910
println!("TAKE TAXI DATA: {:?}", rows);
1011
}

bench-vortex/src/lib.rs

+26-13
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
99
use parquet::arrow::ProjectionMask;
1010
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
1111

12+
use crate::taxi_data::taxi_data_parquet;
1213
use vortex::array::chunked::ChunkedArray;
1314
use vortex::array::downcast::DowncastArrayBuiltin;
1415
use vortex::array::IntoArray;
@@ -18,23 +19,35 @@ use vortex::compress::{CompressConfig, CompressCtx};
1819
use vortex::formatter::display_tree;
1920
use vortex_alp::ALPEncoding;
2021
use vortex_datetime::DateTimeEncoding;
21-
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};
22+
use vortex_dict::DictEncoding;
23+
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
2224
use vortex_ree::REEEncoding;
2325
use vortex_roaring::RoaringBoolEncoding;
2426
use vortex_schema::DType;
2527

28+
pub mod reader;
2629
pub mod taxi_data;
2730

28-
pub fn idempotent(name: &str, f: impl FnOnce(&mut File)) -> PathBuf {
31+
pub fn idempotent<T, E>(
32+
name: &str,
33+
f: impl FnOnce(&mut File) -> Result<T, E>,
34+
) -> Result<PathBuf, E> {
35+
let path = data_path(name);
36+
if !path.exists() {
37+
let mut file = File::create(&path).unwrap();
38+
f(&mut file)?;
39+
}
40+
Ok(path.to_path_buf())
41+
}
42+
43+
pub fn data_path(name: &str) -> PathBuf {
2944
let path = Path::new(env!("CARGO_MANIFEST_DIR"))
3045
.join("data")
3146
.join(name);
32-
if !path.exists() {
47+
if !path.parent().unwrap().exists() {
3348
create_dir_all(path.parent().unwrap()).unwrap();
34-
let mut file = File::create(&path).unwrap();
35-
f(&mut file);
3649
}
37-
path.to_path_buf()
50+
path
3851
}
3952

4053
pub fn setup_logger(level: LevelFilter) {
@@ -51,11 +64,11 @@ pub fn enumerate_arrays() -> Vec<EncodingRef> {
5164
println!("FOUND {:?}", ENCODINGS.iter().map(|e| e.id()).collect_vec());
5265
vec![
5366
&ALPEncoding,
54-
//&DictEncoding,
67+
&DictEncoding,
5568
&BitPackedEncoding,
5669
&FoREncoding,
5770
&DateTimeEncoding,
58-
&DeltaEncoding,
71+
// &DeltaEncoding, Blows up the search space too much.
5972
&REEEncoding,
6073
&RoaringBoolEncoding,
6174
// RoaringIntEncoding,
@@ -71,7 +84,7 @@ pub fn compress_ctx() -> CompressCtx {
7184
}
7285

7386
pub fn compress_taxi_data() -> ArrayRef {
74-
let file = File::open(taxi_data::download_taxi_data()).unwrap();
87+
let file = File::open(taxi_data_parquet()).unwrap();
7588
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
7689
let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]);
7790
let _no_datetime_mask = ProjectionMask::roots(
@@ -141,7 +154,7 @@ mod test {
141154
use vortex::encode::FromArrowArray;
142155
use vortex::serde::{ReadCtx, WriteCtx};
143156

144-
use crate::taxi_data::download_taxi_data;
157+
use crate::taxi_data::taxi_data_parquet;
145158
use crate::{compress_ctx, compress_taxi_data, setup_logger};
146159

147160
#[ignore]
@@ -154,7 +167,7 @@ mod test {
154167
#[ignore]
155168
#[test]
156169
fn round_trip_serde() {
157-
let file = File::open(download_taxi_data()).unwrap();
170+
let file = File::open(taxi_data_parquet()).unwrap();
158171
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
159172
let reader = builder.with_limit(1).build().unwrap();
160173

@@ -176,7 +189,7 @@ mod test {
176189
#[ignore]
177190
#[test]
178191
fn round_trip_arrow() {
179-
let file = File::open(download_taxi_data()).unwrap();
192+
let file = File::open(taxi_data_parquet()).unwrap();
180193
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
181194
let reader = builder.with_limit(1).build().unwrap();
182195

@@ -194,7 +207,7 @@ mod test {
194207
#[ignore]
195208
#[test]
196209
fn round_trip_arrow_compressed() {
197-
let file = File::open(download_taxi_data()).unwrap();
210+
let file = File::open(taxi_data_parquet()).unwrap();
198211
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
199212
let reader = builder.with_limit(1).build().unwrap();
200213

bench-vortex/src/reader.rs

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use crate::compress_ctx;
2+
use arrow_array::types::Int64Type;
3+
use arrow_array::{
4+
ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray, RecordBatch,
5+
RecordBatchReader,
6+
};
7+
use arrow_select::concat::concat_batches;
8+
use arrow_select::take::take_record_batch;
9+
use itertools::Itertools;
10+
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11+
use std::collections::HashMap;
12+
use std::fs::File;
13+
use std::io::Write;
14+
use std::path::Path;
15+
use std::sync::Arc;
16+
use vortex::array::chunked::ChunkedArray;
17+
use vortex::array::primitive::PrimitiveArray;
18+
use vortex::array::{ArrayRef, IntoArray};
19+
use vortex::arrow::FromArrowType;
20+
use vortex::compute::flatten::flatten;
21+
use vortex::compute::take::take;
22+
use vortex::ptype::PType;
23+
use vortex::serde::{ReadCtx, WriteCtx};
24+
use vortex_error::VortexResult;
25+
use vortex_schema::DType;
26+
27+
pub fn open_vortex(path: &Path) -> VortexResult<ArrayRef> {
28+
let mut file = File::open(path)?;
29+
let dummy_dtype: DType = PType::U8.into();
30+
let mut read_ctx = ReadCtx::new(&dummy_dtype, &mut file);
31+
let dtype = read_ctx.dtype()?;
32+
read_ctx.with_schema(&dtype).read()
33+
}
34+
35+
pub fn compress_vortex<W: Write>(parquet_path: &Path, write: &mut W) -> VortexResult<()> {
36+
let taxi_pq = File::open(parquet_path)?;
37+
let builder = ParquetRecordBatchReaderBuilder::try_new(taxi_pq)?;
38+
39+
// FIXME(ngates): #157 the compressor should handle batch size.
40+
let reader = builder.with_batch_size(65_536).build()?;
41+
42+
let dtype = DType::from_arrow(reader.schema());
43+
let ctx = compress_ctx();
44+
45+
let chunks = reader
46+
.map(|batch_result| batch_result.unwrap())
47+
.map(|record_batch| {
48+
let vortex_array = record_batch.into_array();
49+
ctx.compress(&vortex_array, None).unwrap()
50+
})
51+
.collect_vec();
52+
let chunked = ChunkedArray::new(chunks, dtype.clone());
53+
54+
let mut write_ctx = WriteCtx::new(write);
55+
write_ctx.dtype(&dtype).unwrap();
56+
write_ctx.write(&chunked).unwrap();
57+
Ok(())
58+
}
59+
60+
pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<ArrayRef> {
61+
let array = open_vortex(path)?;
62+
let taken = take(&array, &PrimitiveArray::from(indices.to_vec()))?;
63+
// For equivalence.... we flatten to make sure we're not cheating too much.
64+
flatten(&taken).map(|x| x.into_array())
65+
}
66+
67+
pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
68+
let file = File::open(path)?;
69+
70+
// TODO(ngates): enable read_page_index
71+
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
72+
73+
// We figure out which row groups we need to read and a selection filter for each of them.
74+
let mut row_groups = HashMap::new();
75+
let mut row_group_offsets = vec![0];
76+
row_group_offsets.extend(
77+
builder
78+
.metadata()
79+
.row_groups()
80+
.iter()
81+
.map(|rg| rg.num_rows())
82+
.scan(0i64, |acc, x| {
83+
*acc += x;
84+
Some(*acc)
85+
}),
86+
);
87+
88+
for idx in indices {
89+
let row_group_idx = row_group_offsets
90+
.binary_search(&(*idx as i64))
91+
.unwrap_or_else(|e| e - 1);
92+
row_groups
93+
.entry(row_group_idx)
94+
.or_insert_with(Vec::new)
95+
.push((*idx as i64) - row_group_offsets[row_group_idx]);
96+
}
97+
let row_group_indices = row_groups
98+
.keys()
99+
.sorted()
100+
.map(|i| row_groups.get(i).unwrap().clone())
101+
.collect_vec();
102+
103+
let reader = builder
104+
.with_row_groups(row_groups.keys().copied().collect_vec())
105+
// FIXME(ngates): our indices code assumes the batch size == the row group sizes
106+
.with_batch_size(10_000_000)
107+
.build()
108+
.unwrap();
109+
110+
let schema = reader.schema();
111+
112+
let batches = reader
113+
.into_iter()
114+
.enumerate()
115+
.map(|(idx, batch)| {
116+
let batch = batch.unwrap();
117+
let indices = ArrowPrimitiveArray::<Int64Type>::from(row_group_indices[idx].clone());
118+
let indices_array: ArrowArrayRef = Arc::new(indices);
119+
take_record_batch(&batch, &indices_array).unwrap()
120+
})
121+
.collect_vec();
122+
123+
Ok(concat_batches(&schema, &batches)?)
124+
}

0 commit comments

Comments
 (0)