diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index 8cd69e6a21d9..e7e6d27c12e0 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -530,7 +530,6 @@ impl Array for GenericByteArray { } fn to_symbolic_data(&self) -> super::SymbolicArrayData { - dbg!(&self.data_type, &self, &self.symbolic_data); self.symbolic_data .clone() .unwrap_or(self.make_symbolic_data()) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 7222bd323ef1..b857a134943d 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -80,6 +80,9 @@ use crate::iterator::ArrayIter; mod symbolic_expr; pub use symbolic_expr::make_colref_symbolic_expr_array; +pub use symbolic_expr::arrow_type_to_var_type; +pub use symbolic_expr::add_constraint_to_row; +pub use symbolic_expr::VarType as SymbolicVarType; pub use symbolic_expr::Expr as SymbolicExpr; pub use symbolic_expr::Operator as SymbolicOperator; pub use symbolic_expr::ScalarValue as SymbolicScalarValue; @@ -116,8 +119,6 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the underlying data of this array fn to_data(&self) -> ArrayData; - // fn to_symbolic_data(&self) -> SymbolicArrayData; - /// Returns the underlying data of this array /// /// Unlike [`Array::to_data`] this consumes self, allowing it avoid unnecessary clones @@ -368,10 +369,6 @@ impl Array for ArrayRef { self.as_ref().to_data() } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // self.as_ref().to_symbolic_data() - // } - fn into_data(self) -> ArrayData { self.to_data() } @@ -459,10 +456,6 @@ impl Array for &T { T::to_data(self) } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // T::to_symbolic_data(self) - // } - fn into_data(self) -> ArrayData { self.to_data() } diff --git a/arrow-array/src/array/symbolic_expr.rs b/arrow-array/src/array/symbolic_expr.rs index 6ddeba156dd5..88b47814b212 100644 --- a/arrow-array/src/array/symbolic_expr.rs +++ b/arrow-array/src/array/symbolic_expr.rs @@ -1,3 +1,5 @@ +use arrow_schema::DataType; + /// Literal values used in symbolic expressions #[derive(Debug, Clone, PartialEq)] pub enum ScalarValue { @@ -31,6 +33,34 @@ pub enum ScalarValue { Utf8(Option), } +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum VarType { + /// A boolean variable + Boolean, + /// A string variable + String, + /// A int variable + Int, +} + +pub fn arrow_type_to_var_type(arrow_type: DataType) -> VarType { + match arrow_type { + DataType::Boolean => VarType::Boolean, + DataType::Utf8 | DataType::LargeUtf8 => VarType::String, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => VarType::Int, + _ => VarType::String, // Default to String for other types + } +} + /// Operators applied to symbolic expressions #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] pub enum Operator { @@ -60,6 +90,10 @@ pub enum Operator { And, /// Logical OR, like `||` Or, + /// Logical NOT, like `!` + Not, + /// In, like `in` + In, } /// A symbolic expression for a columnar array. @@ -79,6 +113,29 @@ pub enum Expr { column: usize, /// The position of the row row: usize, + /// The variable type + var_type: VarType, + }, + + /// A symbolic variable representing a row in a table + Row { + /// The name of the table + table: String, + /// The position of the row + row_id: usize, + /// The row constraint + constraint: Option>, + }, + + /// A list of symbolic expressions + List(Vec), + + /// A count symbolic expression + Count { + /// The column to count + expr: Box, + /// Whether the count is distinct + is_distinct: bool, }, /// A binary symbolic expression @@ -108,8 +165,39 @@ impl Expr { } /// Create a symbolic variable - pub fn variable(table: String, column: usize, row: usize) -> Self { - Expr::Variable { table, column, row } + pub fn variable(table: String, column: usize, row: usize, var_type: VarType) -> Self { + Expr::Variable { + table, + column, + row, + var_type, + } + } + + /// Create a symbolic row + pub fn row(table: String, row_id: usize) -> Self { + Expr::Row { + table, + row_id, + constraint: None, + } + } + + /// Create a symbolic row with a constraint + pub fn row_with_constraint(table: String, row_id: usize, constraint: Vec) -> Self { + Expr::Row { + table, + row_id, + constraint: Some(constraint), + } + } + + /// Create a count symbolic expression + pub fn count(expr: Expr, is_distinct: bool) -> Self { + Expr::Count { + expr: Box::new(expr), + is_distinct, + } } /// Create a binary symbolic expression @@ -138,14 +226,44 @@ impl Expr { } /// Create a symbolic expression array for a column reference using table name and column position -pub fn make_colref_symbolic_expr_array(table: String, column: usize, len: usize) -> Vec { +pub fn make_colref_symbolic_expr_array( + table: String, + column: usize, + len: usize, + row_offset: usize, + var_type: VarType, +) -> Vec { let mut res = Vec::with_capacity(len); for i in 0..len { res.push(Expr::Variable { table: table.clone(), column, - row: i, + row: i + row_offset, + var_type, }); } res } + +pub fn add_constraint_to_row(row: &mut Expr, constraint: Expr) { + if let Expr::Row { + constraint: Some(constraints), + .. + } = row + { + constraints.push(constraint); + } else if let Expr::Row { + table, + row_id, + constraint: None, + } = row + { + *row = Expr::Row { + table: table.clone(), + row_id: *row_id, + constraint: Some(vec![constraint]), + }; + } else { + panic!("Expr is not a symbolic row"); + } +} diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 66877629c26c..b2e79c240eaf 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -205,6 +205,7 @@ pub struct RecordBatch { /// /// This is stored separately from the columns to handle the case of no columns row_count: usize, + row_symbolic_data: Option>, constraints: Option>, } @@ -238,7 +239,7 @@ impl RecordBatch { /// ``` pub fn try_new(schema: SchemaRef, columns: Vec) -> Result { let options = RecordBatchOptions::new(); - Self::try_new_impl(schema, columns, &options, None) + Self::try_new_impl(schema, columns, &options, None, None) } /// Creates a `RecordBatch` from a schema and columns, with additional options, @@ -250,7 +251,7 @@ impl RecordBatch { columns: Vec, options: &RecordBatchOptions, ) -> Result { - Self::try_new_impl(schema, columns, options, None) + Self::try_new_impl(schema, columns, options, None, None) } /// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints. @@ -260,7 +261,48 @@ impl RecordBatch { options: &RecordBatchOptions, constraints: Option>, ) -> Result { - Self::try_new_impl(schema, columns, options, constraints) + Self::try_new_impl(schema, columns, options, None, constraints) + } + + /// Creates a `RecordBatch` from a schema and columns, with symbolic data. + pub fn try_new_with_symbolic_data( + schema: SchemaRef, + columns: Vec, + row_symbolic_data: Option>, + ) -> Result { + Self::try_new_impl( + schema, + columns, + &RecordBatchOptions::new(), + row_symbolic_data, + None, + ) + } + + /// Creates a `RecordBatch` from a schema and columns, with constraints. + pub fn try_new_with_constraints( + schema: SchemaRef, + columns: Vec, + constraints: Option>, + ) -> Result { + Self::try_new_impl( + schema, + columns, + &RecordBatchOptions::new(), + None, + constraints, + ) + } + + /// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints. + pub fn try_new_with_options_and_constraints_and_symbolic_data( + schema: SchemaRef, + columns: Vec, + options: &RecordBatchOptions, + row_symbolic_data: Option>, + constraints: Option>, + ) -> Result { + Self::try_new_impl(schema, columns, options, row_symbolic_data, constraints) } /// Creates a new empty [`RecordBatch`]. @@ -275,6 +317,7 @@ impl RecordBatch { schema, columns, row_count: 0, + row_symbolic_data: None, constraints: None, } } @@ -285,9 +328,9 @@ impl RecordBatch { schema: SchemaRef, columns: Vec, options: &RecordBatchOptions, + row_symbolic_data: Option>, constraints: Option>, ) -> Result { - // dbg!(&columns); // check that number of fields in schema match column length if schema.fields().len() != columns.len() { return Err(ArrowError::InvalidArgumentError(format!( @@ -351,6 +394,7 @@ impl RecordBatch { schema, columns, row_count, + row_symbolic_data, constraints, }) } @@ -371,6 +415,7 @@ impl RecordBatch { schema, columns: self.columns, row_count: self.row_count, + row_symbolic_data: self.row_symbolic_data, constraints: self.constraints, }) } @@ -390,6 +435,11 @@ impl RecordBatch { self.constraints.as_ref().map(|c| &c[..]) } + /// Returns the symbolic data of the record batch. + pub fn row_symbolic_data(&self) -> Option<&[crate::SymbolicExpr]> { + self.row_symbolic_data.as_ref().map(|c| &c[..]) + } + /// Projects the schema onto the specified columns pub fn project(&self, indices: &[usize]) -> Result { let projected_schema = self.schema.project(indices)?; @@ -632,15 +682,13 @@ impl RecordBatch { .map(|column| column.slice(offset, length)) .collect(); - let constraints = self - .constraints - .as_ref() - .map(|c| c[offset..offset + length].to_vec()); + let constraints = self.constraints.clone(); Self { schema: self.schema.clone(), columns, row_count: length, + row_symbolic_data: self.row_symbolic_data.clone(), constraints, } } @@ -801,6 +849,7 @@ impl From for RecordBatch { schema: Arc::new(Schema::new(fields)), row_count, columns, + row_symbolic_data: None, constraints: None, } } diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs index 0461e33d12a9..7292f2f03a65 100644 --- a/arrow-ord/src/cmp.rs +++ b/arrow-ord/src/cmp.rs @@ -73,13 +73,15 @@ macro_rules! sym { let mut sym_res = vec![]; for i in 0..syms1.len() { - let l = &syms1[i]; - let r = &syms2[i]; - sym_res.push(SymbolicExpr::binary( - l.clone(), - SymbolicOperator::$op, - r.clone(), - )); + for j in 0..syms2.len() { + let l = &syms1[i]; + let r = &syms2[j]; + sym_res.push(SymbolicExpr::binary( + l.clone(), + SymbolicOperator::$op, + r.clone(), + )); + } } Ok(expr.with_symbolic_data(&sym_res)) }}; diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index c31ea3a897e3..1fec5b46cdbc 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -255,8 +255,13 @@ fn concat_fallback(arrays: &[&dyn Array], capacity: Capacities) -> Result = arrays + .iter() + .flat_map(|a| a.to_symbolic_data()) + .collect::>(); + let array = array.with_symbolic_data(&symbolic_data); + Ok(array) } /// Concatenates `batches` together into a single [`RecordBatch`]. @@ -270,14 +275,15 @@ pub fn concat_batches<'a>( input_batches: impl IntoIterator, ) -> Result { // When schema is empty, sum the number of the rows of all batches + let x = input_batches.into_iter().collect::>(); if schema.fields().is_empty() { - let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum(); + let num_rows: usize = x.iter().cloned().map(RecordBatch::num_rows).sum(); let mut options = RecordBatchOptions::default(); options.row_count = Some(num_rows); return RecordBatch::try_new_with_options(schema.clone(), vec![], &options); } - let batches: Vec<&RecordBatch> = input_batches.into_iter().collect(); + let batches: Vec<&RecordBatch> = x; if batches.is_empty() { return Ok(RecordBatch::new_empty(schema.clone())); } @@ -294,15 +300,27 @@ pub fn concat_batches<'a>( } let options = RecordBatchOptions::new(); let mut constraints = vec![]; + let mut row_symbolic_data = vec![]; let mut is_some = false; for batch in batches { if let Some(c) = batch.constraints().map(|c| c.to_vec()) { is_some = true; constraints.extend(c); } + if let Some(c) = batch.row_symbolic_data() { + row_symbolic_data.extend(c.to_vec()); + } } let constraints = if is_some { Some(constraints) } else { None }; - RecordBatch::try_new_with_options_and_constraints(schema.clone(), arrays, &options, constraints) + let row_symbolic_data = if row_symbolic_data.is_empty() { None } else { Some(row_symbolic_data) }; + let res = RecordBatch::try_new_with_options_and_constraints_and_symbolic_data( + schema.clone(), + arrays, + &options, + row_symbolic_data, + constraints, + ); + res } #[cfg(test)] diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index a25c3be0db35..a1b23b2cf29b 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -181,7 +181,6 @@ pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result>() }); - dbg!(&constraints); + let mut all_constraints = if let Some(constraints) = record_batch.constraints() { + constraints.to_vec() + } else { + vec![] + }; + all_constraints.extend(constraints.unwrap_or_default()); + + //let mut row_symbolic_data = record_batch.row_symbolic_data().map(|c| c.to_vec()); + + // Add constraints to the row symbolic data from all_constraints + let mut row_symbolic_data = record_batch.row_symbolic_data().map(|c| { + let mut data = c.to_vec(); + // Add constraints to each row + data.iter_mut().for_each(|row| { + all_constraints.iter().for_each(|c| { + if let SymbolicExpr::Row { + table, + row_id, + constraint, + } = row + { + add_constraint_to_row(row, c.clone()); + } + }); + }); + data + }); + + let constraints = Some(all_constraints); + dbg!(&row_symbolic_data); - RecordBatch::try_new_with_options_and_constraints( + RecordBatch::try_new_with_options_and_constraints_and_symbolic_data( record_batch.schema(), filtered_arrays, &options, + row_symbolic_data, constraints, ) } diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index 71a7c77a8f92..1b45cbc47105 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -92,10 +92,11 @@ pub fn take( take_impl($values, &indices) }}; } - downcast_integer! { + let result = downcast_integer! { indices.data_type() => (helper, values, indices, options), d => Err(ArrowError::InvalidArgumentError(format!("Take only supported for integers, got {d:?}"))) - } + }; + result } /// For each [ArrayRef] in the [`Vec`], take elements by index and create a new @@ -379,7 +380,8 @@ where { let values_buf = take_native(values.values(), indices); let nulls = take_nulls(values.nulls(), indices); - Ok(PrimitiveArray::new(values_buf, nulls).with_data_type(values.data_type().clone())) + let symbolic_data = values.to_symbolic_data(); + Ok(PrimitiveArray::new(values_buf, nulls).with_data_type(values.data_type().clone()).with_symbolic_data(&symbolic_data)) } #[inline(never)] diff --git a/arrow/tests/shrink_to_fit.rs b/arrow/tests/shrink_to_fit.rs index 5d7c2cf98bc9..a9c1fe7bb61c 100644 --- a/arrow/tests/shrink_to_fit.rs +++ b/arrow/tests/shrink_to_fit.rs @@ -45,7 +45,6 @@ fn test_shrink_to_fit_after_concat() { memory_use(|| { let mut concatenated = concatenate(num_concats, list_array.clone()); concatenated.shrink_to_fit(); // This is what we're testing! - dbg!(concatenated.data_type()); concatenated }); let expected_len = num_concats * array_len;