From 718c8674820100c41b8e9cdfd518ba75d151de45 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 18 Mar 2026 16:14:33 -0400 Subject: [PATCH] Disconnect all cdc logic --- parquet/src/arrow/arrow_writer/levels.rs | 196 ----------------------- parquet/src/arrow/arrow_writer/mod.rs | 68 +------- parquet/src/column/chunker/cdc.rs | 2 +- parquet/src/column/chunker/mod.rs | 2 - parquet/src/schema/types.rs | 157 +----------------- 5 files changed, 5 insertions(+), 420 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index d1da24872c49..0ff2137d907e 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -40,7 +40,6 @@ //! //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding) -use crate::column::chunker::CdcChunk; use crate::errors::{ParquetError, Result}; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, OffsetSizeTrait}; @@ -802,58 +801,11 @@ impl ArrayLevels { pub fn non_null_indices(&self) -> &[usize] { &self.non_null_indices } - - /// Create a sliced view of this `ArrayLevels` for a CDC chunk. - /// - /// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy), - /// while `array` is sliced without copying. - pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self { - let level_offset = chunk.level_offset; - let num_levels = chunk.num_levels; - let value_offset = chunk.value_offset; - let num_values = chunk.num_values; - let def_levels = self - .def_levels - .as_ref() - .map(|levels| levels[level_offset..level_offset + num_levels].to_vec()); - let rep_levels = self - .rep_levels - .as_ref() - .map(|levels| levels[level_offset..level_offset + num_levels].to_vec()); - - // Filter non_null_indices to [value_offset, value_offset + num_values) - // and shift by -value_offset. Use binary search since the slice is sorted. - let value_end = value_offset + num_values; - let start = self - .non_null_indices - .partition_point(|&idx| idx < value_offset); - let end = self - .non_null_indices - .partition_point(|&idx| idx < value_end); - let non_null_indices: Vec = self.non_null_indices[start..end] - .iter() - .map(|&idx| idx - value_offset) - .collect(); - - let array = self.array.slice(value_offset, num_values); - let logical_nulls = array.logical_nulls(); - - Self { - def_levels, - rep_levels, - non_null_indices, - max_def_level: self.max_def_level, - max_rep_level: self.max_rep_level, - array, - logical_nulls, - } - } } #[cfg(test)] mod tests { use super::*; - use crate::column::chunker::CdcChunk; use arrow_array::builder::*; use arrow_array::types::Int32Type; @@ -2144,152 +2096,4 @@ mod tests { let v = Arc::new(array) as ArrayRef; LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap() } - - #[test] - fn test_slice_for_chunk_flat() { - // Case 1: required field (max_def_level=0, no def/rep levels stored). - // Array has 6 values; all are non-null so non_null_indices covers every position. - // The chunk selects value_offset=2, num_values=3 → the sub-array [3, 4, 5]. - // Since there are no levels, num_levels=0 and level_offset are irrelevant. - // non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2 → [0,1,2]. - let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])); - let logical_nulls = array.logical_nulls(); - let levels = ArrayLevels { - def_levels: None, - rep_levels: None, - non_null_indices: vec![0, 1, 2, 3, 4, 5], - max_def_level: 0, - max_rep_level: 0, - array, - logical_nulls, - }; - let sliced = levels.slice_for_chunk(&CdcChunk { - level_offset: 0, - num_levels: 0, - value_offset: 2, - num_values: 3, - }); - assert!(sliced.def_levels.is_none()); - assert!(sliced.rep_levels.is_none()); - assert_eq!(sliced.non_null_indices, vec![0, 1, 2]); - assert_eq!(sliced.array.len(), 3); - - // Case 2: optional field (max_def_level=1, def levels present, no rep levels). - // Array: [Some(1), None, Some(3), None, Some(5), Some(6)] - // def_levels: [1, 0, 1, 0, 1, 1] (1=non-null, 0=null) - // non_null_indices: [0, 2, 4, 5] (array positions of the four non-null values) - // - // The chunk selects level_offset=1, num_levels=3, value_offset=1, num_values=3: - // - def_levels[1..4] = [0, 1, 0] → null, non-null, null - // - sub-array slice(1, 3) = [None, Some(3), None] - // - non_null_indices filtered to [value_offset=1, value_end=4): only index 2 qualifies, - // shifted by -1 → [1] (position of Some(3) within the sliced sub-array) - let array: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(3), - None, - Some(5), - Some(6), - ])); - let logical_nulls = array.logical_nulls(); - let levels = ArrayLevels { - def_levels: Some(vec![1, 0, 1, 0, 1, 1]), - rep_levels: None, - non_null_indices: vec![0, 2, 4, 5], - max_def_level: 1, - max_rep_level: 0, - array, - logical_nulls, - }; - let sliced = levels.slice_for_chunk(&CdcChunk { - level_offset: 1, - num_levels: 3, - value_offset: 1, - num_values: 3, - }); - assert_eq!(sliced.def_levels, Some(vec![0, 1, 0])); - assert!(sliced.rep_levels.is_none()); - assert_eq!(sliced.non_null_indices, vec![1]); - assert_eq!(sliced.array.len(), 3); - } - - #[test] - fn test_slice_for_chunk_nested() { - // [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1] - // Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5 - let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); - let logical_nulls = array.logical_nulls(); - let levels = ArrayLevels { - def_levels: Some(vec![2, 2, 2, 2, 2]), - rep_levels: Some(vec![0, 1, 0, 0, 1]), - non_null_indices: vec![0, 1, 2, 3, 4], - max_def_level: 2, - max_rep_level: 1, - array, - logical_nulls, - }; - let sliced = levels.slice_for_chunk(&CdcChunk { - level_offset: 2, - num_levels: 3, - value_offset: 2, - num_values: 3, - }); - assert_eq!(sliced.def_levels, Some(vec![2, 2, 2])); - assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1])); - // [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2] - assert_eq!(sliced.non_null_indices, vec![0, 1, 2]); - assert_eq!(sliced.array.len(), 3); - } - - #[test] - fn test_slice_for_chunk_non_null_indices_boundary() { - // [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / exclusive upper bounds - let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); - let logical_nulls = array.logical_nulls(); - let levels = ArrayLevels { - def_levels: Some(vec![1, 0, 1]), - rep_levels: None, - non_null_indices: vec![0, 2], - max_def_level: 1, - max_rep_level: 0, - array, - logical_nulls, - }; - assert_eq!( - levels - .slice_for_chunk(&CdcChunk { - level_offset: 0, - num_levels: 1, - value_offset: 0, - num_values: 1 - }) - .non_null_indices, - vec![0] - ); - // idx 2 in range [1,3), shifted -1 → 1 - assert_eq!( - levels - .slice_for_chunk(&CdcChunk { - level_offset: 1, - num_levels: 2, - value_offset: 1, - num_values: 2 - }) - .non_null_indices, - vec![1] - ); - // idx 2 excluded from [1,2) - assert_eq!( - levels - .slice_for_chunk(&CdcChunk { - level_offset: 1, - num_levels: 1, - value_offset: 1, - num_values: 1 - }) - .non_null_indices, - Vec::::new() - ); - } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2ef71d5745a2..5514e8e1d3c1 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -17,8 +17,6 @@ //! Contains writer which writes arrow data into parquet data. -use crate::column::chunker::ContentDefinedChunker; - use bytes::Bytes; use std::io::{Read, Write}; use std::iter::Peekable; @@ -194,9 +192,6 @@ pub struct ArrowWriter { /// The maximum size in bytes for a row group, or None for unlimited max_row_group_bytes: Option, - - /// CDC chunkers persisted across row groups (one per leaf column). - cdc_chunkers: Option>, } impl std::fmt::Debug for ArrowWriter { @@ -266,18 +261,6 @@ impl ArrowWriter { let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone()); - let cdc_chunkers = props_ptr - .content_defined_chunking() - .map(|opts| { - file_writer - .schema_descr() - .columns() - .iter() - .map(|desc| ContentDefinedChunker::new(desc, opts)) - .collect::>>() - }) - .transpose()?; - Ok(Self { writer: file_writer, in_progress: None, @@ -285,7 +268,6 @@ impl ArrowWriter { row_group_writer_factory, max_row_group_row_count, max_row_group_bytes, - cdc_chunkers, }) } @@ -401,10 +383,7 @@ impl ArrowWriter { } } - match self.cdc_chunkers.as_mut() { - Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?, - None => in_progress.write(batch)?, - } + in_progress.write(batch)?; let should_flush = self .max_row_group_row_count @@ -893,32 +872,6 @@ impl ArrowColumnWriter { self.write_internal(&col.0) } - /// Write with content-defined chunking, inserting page flushes at chunk boundaries. - fn write_with_chunker( - &mut self, - col: &ArrowLeafColumn, - chunker: &mut ContentDefinedChunker, - ) -> Result<()> { - let levels = &col.0; - let chunks = - chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?; - - let num_chunks = chunks.len(); - for (i, chunk) in chunks.iter().enumerate() { - let chunk_levels = levels.slice_for_chunk(chunk); - self.write_internal(&chunk_levels)?; - - // Add a page break after each chunk except the last - if i + 1 < num_chunks { - match &mut self.writer { - ArrowColumnWriterImpl::Column(c) => c.add_data_page()?, - ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?, - } - } - } - Ok(()) - } - fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> { match &mut self.writer { ArrowColumnWriterImpl::Column(c) => { @@ -1015,25 +968,6 @@ impl ArrowRowGroupWriter { Ok(()) } - fn write_with_chunkers( - &mut self, - batch: &RecordBatch, - chunkers: &mut [ContentDefinedChunker], - ) -> Result<()> { - self.buffered_rows += batch.num_rows(); - let mut writers = self.writers.iter_mut(); - let mut chunkers = chunkers.iter_mut(); - for (field, column) in self.schema.fields().iter().zip(batch.columns()) { - for leaf in compute_leaves(field.as_ref(), column)? { - writers - .next() - .unwrap() - .write_with_chunker(&leaf, chunkers.next().unwrap())?; - } - } - Ok(()) - } - /// Returns the estimated total encoded bytes for this row group fn get_estimated_total_bytes(&self) -> usize { self.writers diff --git a/parquet/src/column/chunker/cdc.rs b/parquet/src/column/chunker/cdc.rs index a1fef9e31995..30fb4f741035 100644 --- a/parquet/src/column/chunker/cdc.rs +++ b/parquet/src/column/chunker/cdc.rs @@ -127,7 +127,7 @@ impl ContentDefinedChunker { Ok(Self { max_def_level: desc.max_def_level(), max_rep_level: desc.max_rep_level(), - repeated_ancestor_def_level: desc.repeated_ancestor_def_level(), + repeated_ancestor_def_level: 0, min_chunk_size: options.min_chunk_size as i64, max_chunk_size: options.max_chunk_size as i64, rolling_hash_mask, diff --git a/parquet/src/column/chunker/mod.rs b/parquet/src/column/chunker/mod.rs index c4caf18af66b..630df9fa0fd9 100644 --- a/parquet/src/column/chunker/mod.rs +++ b/parquet/src/column/chunker/mod.rs @@ -24,8 +24,6 @@ mod cdc; mod cdc_generated; -pub(crate) use cdc::ContentDefinedChunker; - /// A chunk of data with level and value offsets for record-shredded nested data. #[derive(Debug, Clone, Copy)] pub(crate) struct CdcChunk { diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 2925557e7b86..85f3ed48972c 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -853,9 +853,6 @@ pub struct ColumnDescriptor { /// The maximum repetition level for this column max_rep_level: i16, - /// The definition level at the nearest REPEATED ancestor, or 0 if none. - repeated_ancestor_def_level: i16, - /// The path of this column. For instance, "a.b.c.d". path: ColumnPath, } @@ -875,22 +872,11 @@ impl ColumnDescriptor { max_def_level: i16, max_rep_level: i16, path: ColumnPath, - ) -> Self { - Self::new_with_repeated_ancestor(primitive_type, max_def_level, max_rep_level, path, 0) - } - - pub(crate) fn new_with_repeated_ancestor( - primitive_type: TypePtr, - max_def_level: i16, - max_rep_level: i16, - path: ColumnPath, - repeated_ancestor_def_level: i16, ) -> Self { Self { primitive_type, max_def_level, max_rep_level, - repeated_ancestor_def_level, path, } } @@ -907,12 +893,6 @@ impl ColumnDescriptor { self.max_rep_level } - /// Returns the definition level at the nearest REPEATED ancestor, or 0 if none. - #[inline] - pub fn repeated_ancestor_def_level(&self) -> i16 { - self.repeated_ancestor_def_level - } - /// Returns [`ColumnPath`] for this column. pub fn path(&self) -> &ColumnPath { &self.path @@ -1089,16 +1069,7 @@ impl SchemaDescriptor { let mut path = Vec::with_capacity(INIT_SCHEMA_DEPTH); for (root_idx, f) in tp.get_fields().iter().enumerate() { path.clear(); - build_tree( - f, - root_idx, - 0, - 0, - 0, - &mut leaves, - &mut leaf_to_base, - &mut path, - ); + build_tree(f, root_idx, 0, 0, &mut leaves, &mut leaf_to_base, &mut path); } Self { @@ -1220,13 +1191,11 @@ fn count_leaves(tp: &TypePtr, n_leaves: &mut usize) { } } -#[allow(clippy::too_many_arguments)] fn build_tree<'a>( tp: &'a TypePtr, root_idx: usize, mut max_rep_level: i16, mut max_def_level: i16, - mut repeated_ancestor_def_level: i16, leaves: &mut Vec, leaf_to_base: &mut Vec, path_so_far: &mut Vec<&'a str>, @@ -1241,7 +1210,6 @@ fn build_tree<'a>( Repetition::REPEATED => { max_def_level += 1; max_rep_level += 1; - repeated_ancestor_def_level = max_def_level; } _ => {} } @@ -1250,14 +1218,12 @@ fn build_tree<'a>( Type::PrimitiveType { .. } => { let mut path: Vec = vec![]; path.extend(path_so_far.iter().copied().map(String::from)); - let desc = ColumnDescriptor::new_with_repeated_ancestor( + leaves.push(Arc::new(ColumnDescriptor::new( tp.clone(), max_def_level, max_rep_level, ColumnPath::new(path), - repeated_ancestor_def_level, - ); - leaves.push(Arc::new(desc)); + ))); leaf_to_base.push(root_idx); } Type::GroupType { fields, .. } => { @@ -1267,7 +1233,6 @@ fn build_tree<'a>( root_idx, max_rep_level, max_def_level, - repeated_ancestor_def_level, leaves, leaf_to_base, path_so_far, @@ -1976,122 +1941,6 @@ mod tests { assert_eq!(descr.column(3).max_rep_level(), 1); } - #[test] - fn test_schema_build_tree_repeated_ancestor_def_level() { - // Flat columns: no REPEATED ancestor → repeated_ancestor_def_level = 0 - let message_type = " - message m { - REQUIRED INT32 a; - OPTIONAL INT32 b; - OPTIONAL group s { - OPTIONAL INT32 x; - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 0); // a - assert_eq!(descr.column(1).repeated_ancestor_def_level(), 0); // b - assert_eq!(descr.column(2).repeated_ancestor_def_level(), 0); // s.x - - // Standard list: OPTIONAL outer, REPEATED group, OPTIONAL element - // repeated_ancestor_def_level is the def_level at the REPEATED group (= 2) - let message_type = " - message m { - OPTIONAL group c (LIST) { - REPEATED group list { - OPTIONAL INT32 element; - } - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - // c(optional)=1, list(repeated)=2, element(optional)=3 - assert_eq!(descr.column(0).max_def_level(), 3); - assert_eq!(descr.column(0).max_rep_level(), 1); - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 2); - - // Required list: REQUIRED outer, REPEATED group, REQUIRED element - // No OPTIONAL nodes between REPEATED and leaf, so repeated_ancestor_def_level == max_def_level - let message_type = " - message m { - REQUIRED group c (LIST) { - REPEATED group list { - REQUIRED INT32 element; - } - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - // list(repeated)=1, element(required)=1 - assert_eq!(descr.column(0).max_def_level(), 1); - assert_eq!(descr.column(0).max_rep_level(), 1); - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 1); - - // Nested lists: innermost REPEATED wins - let message_type = " - message m { - OPTIONAL group outer (LIST) { - REPEATED group list { - OPTIONAL group inner (LIST) { - REPEATED group list2 { - OPTIONAL INT32 element; - } - } - } - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - // outer(opt)=1, list(rep)=2, inner(opt)=3, list2(rep)=4, element(opt)=5 - assert_eq!(descr.column(0).max_def_level(), 5); - assert_eq!(descr.column(0).max_rep_level(), 2); - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 4); - - // Struct inside list: all sibling leaves share the same repeated_ancestor_def_level - let message_type = " - message m { - OPTIONAL group bag (LIST) { - REPEATED group list { - REQUIRED group item { - OPTIONAL INT32 x; - REQUIRED INT32 y; - } - } - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - // bag(opt)=1, list(rep)=2, item(req)=2, x(opt)=3 - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 2); // bag.list.item.x - // bag(opt)=1, list(rep)=2, item(req)=2, y(req)=2 - assert_eq!(descr.column(1).repeated_ancestor_def_level(), 2); // bag.list.item.y - - // Map type: key (required) and value (optional) under the same REPEATED group - let message_type = " - message m { - OPTIONAL group my_map (MAP) { - REPEATED group key_value { - REQUIRED BYTE_ARRAY key (UTF8); - OPTIONAL INT32 value; - } - } - } - "; - let schema = parse_message_type(message_type).expect("should parse schema"); - let descr = SchemaDescriptor::new(Arc::new(schema)); - // my_map(opt)=1, key_value(rep)=2, key(req)=2 - assert_eq!(descr.column(0).max_def_level(), 2); - assert_eq!(descr.column(0).repeated_ancestor_def_level(), 2); // key: max_def == repeated_ancestor - // my_map(opt)=1, key_value(rep)=2, value(opt)=3 - assert_eq!(descr.column(1).max_def_level(), 3); - assert_eq!(descr.column(1).repeated_ancestor_def_level(), 2); // value: max_def > repeated_ancestor - } - #[test] #[should_panic(expected = "Cannot call get_physical_type() on a non-primitive type")] fn test_get_physical_type_panic() {