
Commit 2a5a365

gbrgr and vustef authored
feat(core): Add incremental scan for appends and positional deletes (#3)
* WIP, initial draft of incremental scan * . * . * cargo fmt * Implement unzipped stream * Remove printlns * Add API method for unzipped stream * . * Remove comment * Rename var * Add import * Measure time * Fix typo * Undo some changes * Change type name * Add comment header * Fail when encountering equality deletes * Add comments * Add some preliminary tests * Format * Remove playground * Add more tests * Clippy * . * . * Adapt tests * . * Add test * Add tests * Add tests * Format * Add test * Format * . * Rm newline * Rename trait function * Reuse schema * . * remove clone * Add test for adding file_path column * Make `from_snapshot` mandatory * Error out if incremental scan encounters neither Append nor Delete * . * Add materialized variant of add_file_path_column * . * Allow dead code * Some PR comments * . * More PR comments * . * Add comments * Avoid cloning * Add reference to PR * Some PR comments * . * format * Allow overwrite operation for now * Fix file_path column * Add overwrite test * Unwrap delete vector * . * Add assertion * Avoid cloning the mutex guard * Abort when encountering a deleted delete file * Adjust comment * Update crates/iceberg/src/arrow/reader.rs Co-authored-by: Vukasin Stefanovic <[email protected]> * Add check * Update crates/iceberg/src/scan/incremental/mod.rs --------- Co-authored-by: Vukasin Stefanovic <[email protected]>
1 parent bc2ea1b commit 2a5a365

File tree

16 files changed (+3456, -24 lines)


crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 34 additions & 1 deletion
@@ -35,11 +35,22 @@ enum EqDelState {
 }
 
 #[derive(Debug, Default)]
-struct DeleteFileFilterState {
+pub(crate) struct DeleteFileFilterState {
     delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
     equality_deletes: HashMap<String, EqDelState>,
 }
 
+impl DeleteFileFilterState {
+    pub fn delete_vectors(&self) -> &HashMap<String, Arc<Mutex<DeleteVector>>> {
+        &self.delete_vectors
+    }
+
+    /// Remove and return the delete vector for the given data file path.
+    pub fn remove_delete_vector(&mut self, path: &str) -> Option<Arc<Mutex<DeleteVector>>> {
+        self.delete_vectors.remove(path)
+    }
+}
+
 #[derive(Clone, Debug, Default)]
 pub(crate) struct DeleteFilter {
     state: Arc<RwLock<DeleteFileFilterState>>,
@@ -65,6 +76,28 @@ impl DeleteFilter {
             .and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
     }
 
+    pub(crate) fn with_read<F, G>(&self, f: F) -> Result<G>
+    where F: FnOnce(&DeleteFileFilterState) -> Result<G> {
+        let state = self.state.read().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to acquire read lock: {}", e),
+            )
+        })?;
+        f(&state)
+    }
+
+    pub(crate) fn with_write<F, G>(&self, f: F) -> Result<G>
+    where F: FnOnce(&mut DeleteFileFilterState) -> Result<G> {
+        let mut state = self.state.write().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to acquire write lock: {}", e),
+            )
+        })?;
+        f(&mut state)
+    }
+
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();

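The with_read/with_write helpers above wrap the poisoned-lock error handling behind closures. Below is a minimal sketch of how a crate-internal caller might use them; the function name drain_delete_vector and its call site are illustrative only (not part of this commit), and it assumes the crate's Result alias and the types from this file are in scope.

// Hypothetical crate-internal helper, shown only to illustrate the closure-based
// lock helpers added in this diff.
fn drain_delete_vector(filter: &DeleteFilter, data_file_path: &str) -> Result<()> {
    // Read access: the closure runs while the RwLock read guard is held.
    let tracked = filter.with_read(|state| Ok(state.delete_vectors().len()))?;
    println!("tracking {} delete vectors", tracked);

    // Write access: remove the delete vector for one data file and take ownership of it.
    if let Some(vector) = filter.with_write(|state| Ok(state.remove_delete_vector(data_file_path)))? {
        // The vector is shared behind Arc<Mutex<...>>; lock it before inspecting positions.
        let _positions = vector.lock().unwrap();
    }
    Ok(())
}
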
Lines changed: 271 additions & 0 deletions
@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::sync::Arc;

use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{
    ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_FIELD_ID_FILE_PATH, StreamsInto,
};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::scan::ArrowRecordBatchStream;
use crate::scan::incremental::{
    AppendedFileScanTask, IncrementalFileScanTask, IncrementalFileScanTaskStream,
};
use crate::{Error, ErrorKind, Result};

/// The type of incremental batch: appended data or deleted records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncrementalBatchType {
    /// Appended records.
    Append,
    /// Deleted records.
    Delete,
}

/// The stream of incremental Arrow `RecordBatch`es with batch type.
pub type CombinedIncrementalBatchRecordStream =
    Pin<Box<dyn Stream<Item = Result<(IncrementalBatchType, RecordBatch)>> + Send + 'static>>;

/// Stream type for obtaining a separate stream of appended and deleted record batches.
pub type UnzippedIncrementalBatchRecordStream = (ArrowRecordBatchStream, ArrowRecordBatchStream);

impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
    for IncrementalFileScanTaskStream
{
    /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns a
    /// stream of Arrow `RecordBatch`es containing the data from the files.
    fn stream(self, reader: ArrowReader) -> Result<CombinedIncrementalBatchRecordStream> {
        let (appends, deletes) =
            StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(self, reader)?;

        let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch)));
        let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch)));

        Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream)
    }
}

impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
    for IncrementalFileScanTaskStream
{
    /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns two
    /// separate streams of Arrow `RecordBatch`es containing appended data and deleted records.
    fn stream(self, reader: ArrowReader) -> Result<UnzippedIncrementalBatchRecordStream> {
        let (appends_tx, appends_rx) = channel(reader.concurrency_limit_data_files);
        let (deletes_tx, deletes_rx) = channel(reader.concurrency_limit_data_files);

        let batch_size = reader.batch_size;
        let concurrency_limit_data_files = reader.concurrency_limit_data_files;

        spawn(async move {
            let _ = self
                .try_for_each_concurrent(concurrency_limit_data_files, |task| {
                    let file_io = reader.file_io.clone();
                    let mut appends_tx = appends_tx.clone();
                    let mut deletes_tx = deletes_tx.clone();
                    async move {
                        match task {
                            IncrementalFileScanTask::Append(append_task) => {
                                spawn(async move {
                                    let record_batch_stream = process_incremental_append_task(
                                        append_task,
                                        batch_size,
                                        file_io,
                                    )
                                    .await;

                                    match record_batch_stream {
                                        Ok(mut stream) => {
                                            while let Some(batch) = stream.next().await {
                                                let result = appends_tx
                                                    .send(batch.map_err(|e| {
                                                        Error::new(
                                                            ErrorKind::Unexpected,
                                                            "failed to read appended record batch",
                                                        )
                                                        .with_source(e)
                                                    }))
                                                    .await;

                                                if result.is_err() {
                                                    break;
                                                }
                                            }
                                        }
                                        Err(e) => {
                                            let _ = appends_tx.send(Err(e)).await;
                                        }
                                    }
                                });
                            }
                            IncrementalFileScanTask::Delete(file_path, delete_vector) => {
                                spawn(async move {
                                    let record_batch_stream = process_incremental_delete_task(
                                        file_path,
                                        delete_vector,
                                        batch_size,
                                    );

                                    match record_batch_stream {
                                        Ok(mut stream) => {
                                            while let Some(batch) = stream.next().await {
                                                let result = deletes_tx
                                                    .send(batch.map_err(|e| {
                                                        Error::new(
                                                            ErrorKind::Unexpected,
                                                            "failed to read deleted record batch",
                                                        )
                                                        .with_source(e)
                                                    }))
                                                    .await;

                                                if result.is_err() {
                                                    break;
                                                }
                                            }
                                        }
                                        Err(e) => {
                                            let _ = deletes_tx.send(Err(e)).await;
                                        }
                                    }
                                });
                            }
                        };

                        Ok(())
                    }
                })
                .await;
        });

        Ok((
            Box::pin(appends_rx) as ArrowRecordBatchStream,
            Box::pin(deletes_rx) as ArrowRecordBatchStream,
        ))
    }
}

async fn process_incremental_append_task(
    task: AppendedFileScanTask,
    batch_size: Option<usize>,
    file_io: FileIO,
) -> Result<ArrowRecordBatchStream> {
    let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
        &task.data_file_path,
        file_io,
        true,
    )
    .await?;

    // Create a projection mask for the batch stream to select which columns in the
    // Parquet file that we want in the response
    let projection_mask = ArrowReader::get_arrow_projection_mask(
        &task.project_field_ids,
        &task.schema_ref(),
        record_batch_stream_builder.parquet_schema(),
        record_batch_stream_builder.schema(),
    )?;
    record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

    // RecordBatchTransformer performs any transformations required on the RecordBatches
    // that come back from the file, such as type promotion, default column insertion
    // and column re-ordering
    let mut record_batch_transformer =
        RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids);

    if let Some(batch_size) = batch_size {
        record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
    }

    // Apply positional deletes as row selections.
    let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
        Some(ArrowReader::build_deletes_row_selection(
            record_batch_stream_builder.metadata().row_groups(),
            &None,
            &positional_delete_indexes.lock().unwrap(),
        )?)
    } else {
        None
    };

    if let Some(row_selection) = row_selection {
        record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
    }

    // Build the batch stream and send all the RecordBatches that it generates
    // to the requester.
    let record_batch_stream = record_batch_stream_builder
        .build()?
        .map(move |batch| match batch {
            Ok(batch) => record_batch_transformer.process_record_batch(batch),
            Err(err) => Err(err.into()),
        });

    Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

fn process_incremental_delete_task(
    file_path: String,
    delete_vector: DeleteVector,
    batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "pos",
        DataType::UInt64,
        false,
    )]));

    let batch_size = batch_size.unwrap_or(1024);

    let treemap = delete_vector.inner;

    let stream = futures::stream::iter(treemap)
        .chunks(batch_size)
        .map(move |chunk| {
            let array = UInt64Array::from_iter(chunk);
            RecordBatch::try_new(
                Arc::clone(&schema), // Cheap Arc clone instead of full schema creation
                vec![Arc::new(array)],
            )
            .map_err(|_| {
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to create RecordBatch for DeleteVector",
                )
            })
            .and_then(|batch| {
                ArrowReader::add_file_path_column(
                    batch,
                    &file_path,
                    RESERVED_COL_NAME_FILE_PATH,
                    RESERVED_FIELD_ID_FILE_PATH,
                )
            })
        });

    Ok(Box::pin(stream) as ArrowRecordBatchStream)
}
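
As a rough orientation, the sketch below shows how a caller might drain the combined stream defined above. How the IncrementalFileScanTaskStream and the ArrowReader are obtained (for example from an incremental table scan) is assumed here and not shown in this file; the function name drain_incremental_batches is illustrative only.

// Hypothetical consumer of the combined incremental batch stream. Assumes the same
// crate-internal imports as the module above, including `futures::StreamExt`.
async fn drain_incremental_batches(
    tasks: IncrementalFileScanTaskStream,
    reader: ArrowReader,
) -> Result<()> {
    let mut batches: CombinedIncrementalBatchRecordStream =
        StreamsInto::<ArrowReader, CombinedIncrementalBatchRecordStream>::stream(tasks, reader)?;

    while let Some(next) = batches.next().await {
        match next? {
            (IncrementalBatchType::Append, batch) => {
                // Appended rows, projected and transformed to the task schema.
                println!("append batch: {} rows", batch.num_rows());
            }
            (IncrementalBatchType::Delete, batch) => {
                // Deleted positions: a `pos` column plus the reserved file-path column.
                println!("delete batch: {} rows", batch.num_rows());
            }
        }
    }
    Ok(())
}

The unzipped variant is the same call with UnzippedIncrementalBatchRecordStream as the target type; it returns the append and delete streams as two independent ArrowRecordBatchStream values instead of interleaving them.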

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,8 @@ pub mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
 mod value;
 
+mod incremental;
+pub use incremental::*;
 pub use reader::*;
 pub use value::*;
 /// Partition value calculator for computing partition values
