Skip to content

Commit 4009f42

Browse files
committed
Switch to tokio for multi threaded writing test
1 parent 2d42ca0 commit 4009f42

File tree

2 files changed

+37
-32
lines changed

2 files changed

+37
-32
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ impl PageWriter for ArrowPageWriter {
558558
}
559559

560560
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
561-
#[derive(Debug)]
561+
#[derive(Debug, Clone)]
562562
pub struct ArrowLeafColumn(ArrayLevels);
563563

564564
/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]

parquet/tests/encryption/encryption.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use parquet::arrow::arrow_reader::{
2828
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
2929
RowSelector,
3030
};
31-
use parquet::arrow::arrow_writer::{
32-
compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
33-
};
31+
use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowRowGroupWriterFactory};
3432
use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
3533
use parquet::data_type::{ByteArray, ByteArrayType};
3634
use parquet::encryption::decrypt::FileDecryptionProperties;
@@ -1165,41 +1163,48 @@ async fn test_multi_threaded_encrypted_writing() {
11651163

11661164
// Get column writers with encryptor from ArrowRowGroupWriter
11671165
let col_writers = arrow_row_group_writer.writers;
1166+
let num_columns = col_writers.len();
1167+
1168+
// Create a channel for each column writer to send ArrowLeafColumn data to
1169+
let mut col_writer_tasks = Vec::with_capacity(num_columns);
1170+
let mut col_array_channels = Vec::with_capacity(num_columns);
1171+
for mut writer in col_writers.into_iter() {
1172+
let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
1173+
col_array_channels.push(send_array);
1174+
let handle = tokio::spawn(async move {
1175+
while let Some(col) = receive_array.recv().await {
1176+
let _ = writer.write(&col);
1177+
}
1178+
writer.close()
1179+
});
1180+
col_writer_tasks.push(handle);
1181+
}
11681182

1169-
let mut workers: Vec<_> = col_writers
1170-
.into_iter()
1171-
.map(|mut col_writer| {
1172-
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1173-
let handle = std::thread::spawn(move || {
1174-
// receive Arrays to encode via the channel
1175-
for col in recv {
1176-
col_writer.write(&col)?;
1177-
}
1178-
// once the input is complete, close the writer
1179-
// to return the newly created ArrowColumnChunk
1180-
col_writer.close()
1181-
});
1182-
(handle, send)
1183-
})
1184-
.collect();
1185-
1186-
let mut worker_iter = workers.iter_mut();
1187-
for (arr, field) in to_write.iter().zip(&schema.fields) {
1188-
for leaves in compute_leaves(field, arr).unwrap() {
1189-
worker_iter.next().unwrap().1.send(leaves).unwrap();
1183+
// Send the ArrowLeafColumn data to the respective column writer channels
1184+
for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() {
1185+
for c in compute_leaves(field, array).iter().flat_map(|x| x) {
1186+
let _ = col_array_channels[channel_idx].send(c.clone()).await;
11901187
}
11911188
}
1189+
drop(col_array_channels);
11921190

1193-
// Wait for the workers to complete encoding, and append
1191+
// Wait for all column writers to finish writing
1192+
let mut finalized_rg = Vec::with_capacity(num_columns);
1193+
for task in col_writer_tasks.into_iter() {
1194+
finalized_rg.push(task.await);
1195+
}
1196+
1197+
// Wait for the workers to complete writing then append
11941198
// the resulting column chunks to the row group (and the file)
11951199
let mut row_group_writer = file_writer.next_row_group().unwrap();
1196-
1197-
for (handle, send) in workers {
1198-
drop(send); // Drop send side to signal termination
1199-
// wait for the worker to send the completed chunk
1200-
let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1201-
chunk.append_to_row_group(&mut row_group_writer).unwrap();
1200+
for chunk in finalized_rg {
1201+
chunk
1202+
.unwrap()
1203+
.unwrap()
1204+
.append_to_row_group(&mut row_group_writer)
1205+
.unwrap();
12021206
}
1207+
12031208
// Close the row group which writes to the underlying file
12041209
row_group_writer.close().unwrap();
12051210

0 commit comments

Comments
 (0)