From 93ee79e03c05f9c07f7bb82c31937f0814ac4885 Mon Sep 17 00:00:00 2001 From: "@TT" <1sand0s@users.noreply.github.com> Date: Mon, 9 Jun 2025 01:01:24 -0700 Subject: [PATCH 1/6] Merging changes from michael/sym-eval into aditya/sym_eval --- arrow-array/src/array/mod.rs | 10 ---------- arrow-array/src/record_batch.rs | 5 +---- arrow-select/src/concat.rs | 6 ++++-- arrow-select/src/filter.rs | 1 + 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 7222bd323ef1..d1fdefb19743 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -116,8 +116,6 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the underlying data of this array fn to_data(&self) -> ArrayData; - // fn to_symbolic_data(&self) -> SymbolicArrayData; - /// Returns the underlying data of this array /// /// Unlike [`Array::to_data`] this consumes self, allowing it avoid unnecessary clones @@ -368,10 +366,6 @@ impl Array for ArrayRef { self.as_ref().to_data() } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // self.as_ref().to_symbolic_data() - // } - fn into_data(self) -> ArrayData { self.to_data() } @@ -459,10 +453,6 @@ impl Array for &T { T::to_data(self) } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // T::to_symbolic_data(self) - // } - fn into_data(self) -> ArrayData { self.to_data() } diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 66877629c26c..33596f58b1b3 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -632,10 +632,7 @@ impl RecordBatch { .map(|column| column.slice(offset, length)) .collect(); - let constraints = self - .constraints - .as_ref() - .map(|c| c[offset..offset + length].to_vec()); + let constraints = self.constraints.clone(); Self { schema: self.schema.clone(), diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index c31ea3a897e3..53848370876a 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -270,14 +270,16 @@ pub fn concat_batches<'a>( input_batches: impl IntoIterator, ) -> Result { // When schema is empty, sum the number of the rows of all batches + let x = input_batches.into_iter().collect::>(); + dbg!(&x); if schema.fields().is_empty() { - let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum(); + let num_rows: usize = x.iter().cloned().map(RecordBatch::num_rows).sum(); let mut options = RecordBatchOptions::default(); options.row_count = Some(num_rows); return RecordBatch::try_new_with_options(schema.clone(), vec![], &options); } - let batches: Vec<&RecordBatch> = input_batches.into_iter().collect(); + let batches: Vec<&RecordBatch> = x; if batches.is_empty() { return Ok(RecordBatch::new_empty(schema.clone())); } diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index a25c3be0db35..577d370cffed 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -203,6 +203,7 @@ pub fn filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray, ) -> Result { + dbg!(&record_batch); let mut filter_builder = FilterBuilder::new(predicate); if record_batch.num_columns() > 1 { // Only optimize if filtering more than one column From a7740a753b3880b594c6be39c350ec5ea76946ea Mon Sep 17 00:00:00 2001 From: Aditya Thimmaiah <1sand0sardpi@gmail.com> Date: Thu, 12 Jun 2025 02:58:51 -0700 Subject: [PATCH 2/6] Removing debug prints (dbg) --- arrow-array/src/array/byte_array.rs | 1 - arrow-array/src/record_batch.rs | 1 - arrow-select/src/concat.rs | 18 ++++++++++++++---- arrow-select/src/filter.rs | 11 +++++++---- arrow/tests/shrink_to_fit.rs | 1 - 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index 8cd69e6a21d9..e7e6d27c12e0 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -530,7 +530,6 @@ impl Array for GenericByteArray { } fn to_symbolic_data(&self) -> super::SymbolicArrayData { - dbg!(&self.data_type, &self, &self.symbolic_data); self.symbolic_data .clone() .unwrap_or(self.make_symbolic_data()) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 33596f58b1b3..362645d3370b 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -287,7 +287,6 @@ impl RecordBatch { options: &RecordBatchOptions, constraints: Option>, ) -> Result { - // dbg!(&columns); // check that number of fields in schema match column length if schema.fields().len() != columns.len() { return Err(ArrowError::InvalidArgumentError(format!( diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 53848370876a..a69c5e9a6dac 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -255,8 +255,13 @@ fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result = arrays + .iter() + .flat_map(|a| a.to_symbolic_data()) + .collect::>(); + let array = array.with_symbolic_data(&symbolic_data); + Ok(array) } /// Concatenates `batches` together into a single [`RecordBatch`]. @@ -271,7 +276,6 @@ pub fn concat_batches<'a>( ) -> Result { // When schema is empty, sum the number of the rows of all batches let x = input_batches.into_iter().collect::>(); - dbg!(&x); if schema.fields().is_empty() { let num_rows: usize = x.iter().cloned().map(RecordBatch::num_rows).sum(); let mut options = RecordBatchOptions::default(); @@ -304,7 +308,13 @@ pub fn concat_batches<'a>( } } let constraints = if is_some { Some(constraints) } else { None }; - RecordBatch::try_new_with_options_and_constraints(schema.clone(), arrays, &options, constraints) + let res = RecordBatch::try_new_with_options_and_constraints( + schema.clone(), + arrays, + &options, + constraints, + ); + res } #[cfg(test)] diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 577d370cffed..b79c80085de5 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -181,7 +181,6 @@ pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result Result { - dbg!(&record_batch); let mut filter_builder = FilterBuilder::new(predicate); if record_batch.num_columns() > 1 { // Only optimize if filtering more than one column @@ -223,7 +221,6 @@ pub fn filter_record_batch( .iter() .enumerate() .map(|(i, expr)| { - dbg!(&predicate.value(i)); if predicate.value(i) { expr.clone() } else { @@ -233,7 +230,13 @@ pub fn filter_record_batch( .collect::>() }); - dbg!(&constraints); + let mut all_constraints = if let Some(constraints) = record_batch.constraints() { + constraints.to_vec() + } else { + vec![] + }; + all_constraints.extend(constraints.unwrap_or_default()); + let constraints = Some(all_constraints); RecordBatch::try_new_with_options_and_constraints( record_batch.schema(), diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs index 5d7c2cf98bc9..a9c1fe7bb61c 100644 --- a/arrow/tests/shrink_to_fit.rs +++ b/arrow/tests/shrink_to_fit.rs @@ -45,7 +45,6 @@ fn test_shrink_to_fit_after_concat() { memory_use(|| { let mut concatenated = concatenate(num_concats, list_array.clone()); concatenated.shrink_to_fit(); // This is what we're testing! - dbg!(concatenated.data_type()); concatenated }); let expected_len = num_concats * array_len; From c0dc1a4a084f59cff873f44975503119df0ccba6 Mon Sep 17 00:00:00 2001 From: Aditya Thimmaiah <1sand0sardpi@gmail.com> Date: Thu, 12 Jun 2025 03:00:52 -0700 Subject: [PATCH 3/6] Adding datatype to Variable for setting variable type in z3 correctly Exposing arrow datatype to z3 type converter in mod --- arrow-array/src/array/mod.rs | 2 + arrow-array/src/array/symbolic_expr.rs | 52 ++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index d1fdefb19743..ca862ec31b88 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -80,6 +80,8 @@ use crate::iterator::ArrayIter; mod symbolic_expr; pub use symbolic_expr::make_colref_symbolic_expr_array; +pub use symbolic_expr::arrow_type_to_var_type; +pub use symbolic_expr::VarType as SymbolicVarType; pub use symbolic_expr::Expr as SymbolicExpr; pub use symbolic_expr::Operator as SymbolicOperator; pub use symbolic_expr::ScalarValue as SymbolicScalarValue; diff --git a/arrow-array/src/array/symbolic_expr.rs b/arrow-array/src/array/symbolic_expr.rs index 6ddeba156dd5..31790a62769f 100644 --- a/arrow-array/src/array/symbolic_expr.rs +++ b/arrow-array/src/array/symbolic_expr.rs @@ -1,3 +1,5 @@ +use arrow_schema::DataType; + /// Literal values used in symbolic expressions #[derive(Debug, Clone, PartialEq)] pub enum ScalarValue { @@ -31,6 +33,27 @@ pub enum ScalarValue { Utf8(Option), } +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum VarType { + /// A boolean variable + Boolean, + /// A string variable + String, + /// A int variable + Int, +} + +pub fn arrow_type_to_var_type(arrow_type: DataType) -> VarType { + match arrow_type { + DataType::Boolean => VarType::Boolean, + DataType::Utf8 | DataType::LargeUtf8 => VarType::String, + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => VarType::Int, + _ => VarType::String, // Default to String for other types + } +} + /// Operators applied to symbolic expressions #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] pub enum Operator { @@ -60,6 +83,10 @@ pub enum Operator { And, /// Logical OR, like `||` Or, + /// Logical NOT, like `!` + Not, + /// In, like `in` + In, } /// A symbolic expression for a columnar array. @@ -79,8 +106,13 @@ pub enum Expr { column: usize, /// The position of the row row: usize, + /// The variable type + var_type: VarType, }, + /// A list of symbolic expressions + List(Vec), + /// A binary symbolic expression BinaryExpr { /// The left side of the binary expression @@ -108,8 +140,13 @@ impl Expr { } /// Create a symbolic variable - pub fn variable(table: String, column: usize, row: usize) -> Self { - Expr::Variable { table, column, row } + pub fn variable(table: String, column: usize, row: usize, var_type: VarType) -> Self { + Expr::Variable { + table, + column, + row, + var_type, + } } /// Create a binary symbolic expression @@ -138,13 +175,20 @@ impl Expr { } /// Create a symbolic expression array for a column reference using table name and column position -pub fn make_colref_symbolic_expr_array(table: String, column: usize, len: usize) -> Vec { +pub fn make_colref_symbolic_expr_array( + table: String, + column: usize, + len: usize, + row_offset: usize, + var_type: VarType, +) -> Vec { let mut res = Vec::with_capacity(len); for i in 0..len { res.push(Expr::Variable { table: table.clone(), column, - row: i, + row: i + row_offset, + var_type, }); } res From 0d8607c9b5e585585079f07607bf5c310c5369a9 Mon Sep 17 00:00:00 2001 From: Aditya Thimmaiah <1sand0sardpi@gmail.com> Date: Thu, 12 Jun 2025 03:02:25 -0700 Subject: [PATCH 4/6] Changing sym macro to pair elements from joining batches quadratically --- arrow-ord/src/cmp.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs index 0461e33d12a9..7292f2f03a65 100644 --- a/arrow-ord/src/cmp.rs +++ b/arrow-ord/src/cmp.rs @@ -73,13 +73,15 @@ macro_rules! sym { let mut sym_res = vec![]; for i in 0..syms1.len() { - let l = &syms1[i]; - let r = &syms2[i]; - sym_res.push(SymbolicExpr::binary( - l.clone(), - SymbolicOperator::$op, - r.clone(), - )); + for j in 0..syms2.len() { + let l = &syms1[i]; + let r = &syms2[j]; + sym_res.push(SymbolicExpr::binary( + l.clone(), + SymbolicOperator::$op, + r.clone(), + )); + } } Ok(expr.with_symbolic_data(&sym_res)) }}; From 54daa6ce4949e7bd97b3a237e7d4af8c6e00ae24 Mon Sep 17 00:00:00 2001 From: Aditya Thimmaiah <1sand0sardpi@gmail.com> Date: Thu, 12 Jun 2025 03:05:44 -0700 Subject: [PATCH 5/6] Adding symbolic data to primitive array constructed in module The module defines methods to construct primitive array from indexes of the input primitive array. We need to make sure that the symbolic data of the input primitive array is copied into the constructed primitive array --- arrow-select/src/take.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index 71a7c77a8f92..1b45cbc47105 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -92,10 +92,11 @@ pub fn take( take_impl($values, &indices) }}; } - downcast_integer! { + let result = downcast_integer! { indices.data_type() => (helper, values, indices, options), d => Err(ArrowError::InvalidArgumentError(format!("Take only supported for integers, got {d:?}"))) - } + }; + result } /// For each [ArrayRef] in the [`Vec`], take elements by index and create a new @@ -379,7 +380,8 @@ where { let values_buf = take_native(values.values(), indices); let nulls = take_nulls(values.nulls(), indices); - Ok(PrimitiveArray::new(values_buf, nulls).with_data_type(values.data_type().clone())) + let symbolic_data = values.to_symbolic_data(); + Ok(PrimitiveArray::new(values_buf, nulls).with_data_type(values.data_type().clone()).with_symbolic_data(&symbolic_data)) } #[inline(never)] From 86b37fc4c51389aba222d19da26a4da5e9fe346e Mon Sep 17 00:00:00 2001 From: Aditya Thimmaiah <1sand0sardpi@gmail.com> Date: Sun, 15 Jun 2025 21:36:58 -0500 Subject: [PATCH 6/6] Adding row symbolic data to record batch for symbolic evaluation of aggregations --- arrow-array/src/array/mod.rs | 1 + arrow-array/src/array/symbolic_expr.rs | 80 +++++++++++++++++++++++++- arrow-array/src/record_batch.rs | 59 ++++++++++++++++++- arrow-select/src/concat.rs | 8 ++- arrow-select/src/filter.rs | 26 ++++++++- 5 files changed, 166 insertions(+), 8 deletions(-) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index ca862ec31b88..b857a134943d 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -81,6 +81,7 @@ use crate::iterator::ArrayIter; mod symbolic_expr; pub use symbolic_expr::make_colref_symbolic_expr_array; pub use symbolic_expr::arrow_type_to_var_type; +pub use symbolic_expr::add_constraint_to_row; pub use symbolic_expr::VarType as SymbolicVarType; pub use symbolic_expr::Expr as SymbolicExpr; pub use symbolic_expr::Operator as SymbolicOperator; diff --git a/arrow-array/src/array/symbolic_expr.rs b/arrow-array/src/array/symbolic_expr.rs index 31790a62769f..88b47814b212 100644 --- a/arrow-array/src/array/symbolic_expr.rs +++ b/arrow-array/src/array/symbolic_expr.rs @@ -47,9 +47,16 @@ pub fn arrow_type_to_var_type(arrow_type: DataType) -> VarType { match arrow_type { DataType::Boolean => VarType::Boolean, DataType::Utf8 | DataType::LargeUtf8 => VarType::String, - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 | - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | - DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => VarType::Int, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => VarType::Int, _ => VarType::String, // Default to String for other types } } @@ -110,9 +117,27 @@ pub enum Expr { var_type: VarType, }, + /// A symbolic variable representing a row in a table + Row { + /// The name of the table + table: String, + /// The position of the row + row_id: usize, + /// The row constraint + constraint: Option>, + }, + /// A list of symbolic expressions List(Vec), + /// A count symbolic expression + Count { + /// The column to count + expr: Box, + /// Whether the count is distinct + is_distinct: bool, + }, + /// A binary symbolic expression BinaryExpr { /// The left side of the binary expression @@ -149,6 +174,32 @@ impl Expr { } } + /// Create a symbolic row + pub fn row(table: String, row_id: usize) -> Self { + Expr::Row { + table, + row_id, + constraint: None, + } + } + + /// Create a symbolic row with a constraint + pub fn row_with_constraint(table: String, row_id: usize, constraint: Vec) -> Self { + Expr::Row { + table, + row_id, + constraint: Some(constraint), + } + } + + /// Create a count symbolic expression + pub fn count(expr: Expr, is_distinct: bool) -> Self { + Expr::Count { + expr: Box::new(expr), + is_distinct, + } + } + /// Create a binary symbolic expression pub fn binary(left: Expr, op: Operator, right: Expr) -> Self { Expr::BinaryExpr { @@ -193,3 +244,26 @@ pub fn make_colref_symbolic_expr_array( } res } + +pub fn add_constraint_to_row(row: &mut Expr, constraint: Expr) { + if let Expr::Row { + constraint: Some(constraints), + .. + } = row + { + constraints.push(constraint); + } else if let Expr::Row { + table, + row_id, + constraint: None, + } = row + { + *row = Expr::Row { + table: table.clone(), + row_id: *row_id, + constraint: Some(vec![constraint]), + }; + } else { + panic!("Expr is not a symbolic row"); + } +} diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 362645d3370b..b2e79c240eaf 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -205,6 +205,7 @@ pub struct RecordBatch { /// /// This is stored separately from the columns to handle the case of no columns row_count: usize, + row_symbolic_data: Option>, constraints: Option>, } @@ -238,7 +239,7 @@ impl RecordBatch { /// ``` pub fn try_new(schema: SchemaRef, columns: Vec) -> Result { let options = RecordBatchOptions::new(); - Self::try_new_impl(schema, columns, &options, None) + Self::try_new_impl(schema, columns, &options, None, None) } /// Creates a `RecordBatch` from a schema and columns, with additional options, @@ -250,7 +251,7 @@ impl RecordBatch { columns: Vec, options: &RecordBatchOptions, ) -> Result { - Self::try_new_impl(schema, columns, options, None) + Self::try_new_impl(schema, columns, options, None, None) } /// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints. @@ -260,7 +261,48 @@ impl RecordBatch { options: &RecordBatchOptions, constraints: Option>, ) -> Result { - Self::try_new_impl(schema, columns, options, constraints) + Self::try_new_impl(schema, columns, options, None, constraints) + } + + /// Creates a `RecordBatch` from a schema and columns, with symbolic data. + pub fn try_new_with_symbolic_data( + schema: SchemaRef, + columns: Vec, + row_symbolic_data: Option>, + ) -> Result { + Self::try_new_impl( + schema, + columns, + &RecordBatchOptions::new(), + row_symbolic_data, + None, + ) + } + + /// Creates a `RecordBatch` from a schema and columns, with constraints. + pub fn try_new_with_constraints( + schema: SchemaRef, + columns: Vec, + constraints: Option>, + ) -> Result { + Self::try_new_impl( + schema, + columns, + &RecordBatchOptions::new(), + None, + constraints, + ) + } + + /// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints. + pub fn try_new_with_options_and_constraints_and_symbolic_data( + schema: SchemaRef, + columns: Vec, + options: &RecordBatchOptions, + row_symbolic_data: Option>, + constraints: Option>, + ) -> Result { + Self::try_new_impl(schema, columns, options, row_symbolic_data, constraints) } /// Creates a new empty [`RecordBatch`]. @@ -275,6 +317,7 @@ impl RecordBatch { schema, columns, row_count: 0, + row_symbolic_data: None, constraints: None, } } @@ -285,6 +328,7 @@ impl RecordBatch { schema: SchemaRef, columns: Vec, options: &RecordBatchOptions, + row_symbolic_data: Option>, constraints: Option>, ) -> Result { // check that number of fields in schema match column length @@ -350,6 +394,7 @@ impl RecordBatch { schema, columns, row_count, + row_symbolic_data, constraints, }) } @@ -370,6 +415,7 @@ impl RecordBatch { schema, columns: self.columns, row_count: self.row_count, + row_symbolic_data: self.row_symbolic_data, constraints: self.constraints, }) } @@ -389,6 +435,11 @@ impl RecordBatch { self.constraints.as_ref().map(|c| &c[..]) } + /// Returns the symbolic data of the record batch. + pub fn row_symbolic_data(&self) -> Option<&[crate::SymbolicExpr]> { + self.row_symbolic_data.as_ref().map(|c| &c[..]) + } + /// Projects the schema onto the specified columns pub fn project(&self, indices: &[usize]) -> Result { let projected_schema = self.schema.project(indices)?; @@ -637,6 +688,7 @@ impl RecordBatch { schema: self.schema.clone(), columns, row_count: length, + row_symbolic_data: self.row_symbolic_data.clone(), constraints, } } @@ -797,6 +849,7 @@ impl From for RecordBatch { schema: Arc::new(Schema::new(fields)), row_count, columns, + row_symbolic_data: None, constraints: None, } } diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index a69c5e9a6dac..1fec5b46cdbc 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -300,18 +300,24 @@ pub fn concat_batches<'a>( } let options = RecordBatchOptions::new(); let mut constraints = vec![]; + let mut row_symbolic_data = vec![]; let mut is_some = false; for batch in batches { if let Some(c) = batch.constraints().map(|c| c.to_vec()) { is_some = true; constraints.extend(c); } + if let Some(c) = batch.row_symbolic_data() { + row_symbolic_data.extend(c.to_vec()); + } } let constraints = if is_some { Some(constraints) } else { None }; - let res = RecordBatch::try_new_with_options_and_constraints( + let row_symbolic_data = if row_symbolic_data.is_empty() { None } else { Some(row_symbolic_data) }; + let res = RecordBatch::try_new_with_options_and_constraints_and_symbolic_data( schema.clone(), arrays, &options, + row_symbolic_data, constraints, ); res diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index b79c80085de5..a1b23b2cf29b 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -236,12 +236,36 @@ pub fn filter_record_batch( vec![] }; all_constraints.extend(constraints.unwrap_or_default()); + + //let mut row_symbolic_data = record_batch.row_symbolic_data().map(|c| c.to_vec()); + + // Add constraints to the row symbolic data from all_constraints + let mut row_symbolic_data = record_batch.row_symbolic_data().map(|c| { + let mut data = c.to_vec(); + // Add constraints to each row + data.iter_mut().for_each(|row| { + all_constraints.iter().for_each(|c| { + if let SymbolicExpr::Row { + table, + row_id, + constraint, + } = row + { + add_constraint_to_row(row, c.clone()); + } + }); + }); + data + }); + let constraints = Some(all_constraints); + dbg!(&row_symbolic_data); - RecordBatch::try_new_with_options_and_constraints( + RecordBatch::try_new_with_options_and_constraints_and_symbolic_data( record_batch.schema(), filtered_arrays, &options, + row_symbolic_data, constraints, ) }