From 7898a21114b3afc9b8ca4ff864778b0404497f74 Mon Sep 17 00:00:00 2001 From: Michael Levin Date: Fri, 6 Jun 2025 01:08:16 -0700 Subject: [PATCH 1/2] Symbolic evaluation v1 --- arrow-array/src/array/boolean_array.rs | 56 ++++++- arrow-array/src/array/byte_array.rs | 56 ++++++- arrow-array/src/array/byte_view_array.rs | 8 + arrow-array/src/array/dictionary_array.rs | 16 ++ .../src/array/fixed_size_binary_array.rs | 8 + .../src/array/fixed_size_list_array.rs | 8 + arrow-array/src/array/list_array.rs | 8 + arrow-array/src/array/list_view_array.rs | 8 + arrow-array/src/array/map_array.rs | 8 + arrow-array/src/array/mod.rs | 41 +++++ arrow-array/src/array/null_array.rs | 8 + arrow-array/src/array/primitive_array.rs | 81 +++++++++- arrow-array/src/array/run_array.rs | 16 ++ arrow-array/src/array/struct_array.rs | 8 + arrow-array/src/array/symbolic_expr.rs | 151 ++++++++++++++++++ arrow-array/src/array/union_array.rs | 8 + arrow-array/src/record_batch.rs | 32 +++- arrow-ord/src/cmp.rs | 43 ++++- arrow-select/src/concat.rs | 12 +- arrow-select/src/filter.rs | 24 ++- 20 files changed, 580 insertions(+), 20 deletions(-) create mode 100644 arrow-array/src/array/symbolic_expr.rs diff --git a/arrow-array/src/array/boolean_array.rs b/arrow-array/src/array/boolean_array.rs index 9c2d4af8c454..6e3f619d3a95 100644 --- a/arrow-array/src/array/boolean_array.rs +++ b/arrow-array/src/array/boolean_array.rs @@ -25,6 +25,8 @@ use arrow_schema::DataType; use std::any::Any; use std::sync::Arc; +use super::SymbolicArrayData; + /// An array of [boolean values](https://arrow.apache.org/docs/format/Columnar.html#fixed-size-primitive-layout) /// /// # Example: From a Vec @@ -68,6 +70,7 @@ use std::sync::Arc; pub struct BooleanArray { values: BooleanBuffer, nulls: Option, + symbolic_data: Option>, } impl std::fmt::Debug for BooleanArray { @@ -76,7 +79,11 @@ impl std::fmt::Debug for BooleanArray { print_long_array(self, f, |array, index, f| { std::fmt::Debug::fmt(&array.value(index), f) })?; - write!(f, "]") + write!(f, "]")?; + if let Some(symbolic_data) = &self.symbolic_data { + write!(f, ",\n{:?}", symbolic_data)?; + } + Ok(()) } } @@ -90,7 +97,11 @@ impl BooleanArray { if let Some(n) = nulls.as_ref() { assert_eq!(values.len(), n.len()); } - Self { values, nulls } + Self { + values, + nulls, + symbolic_data: None, + } } /// Create a new [`BooleanArray`] with length `len` consisting only of nulls @@ -98,6 +109,7 @@ impl BooleanArray { Self { values: BooleanBuffer::new_unset(len), nulls: Some(NullBuffer::new_null(len)), + symbolic_data: None, } } @@ -128,6 +140,15 @@ impl BooleanArray { BooleanBuffer::new(Buffer::from(value), 0, value.len() * 8).into() } + /// Clone a new [`PrimitiveArray`] with the supplied symbolic data + pub fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> Self { + Self { + values: self.values.clone(), + nulls: self.nulls.clone(), + symbolic_data: Some(symbolic_data.to_vec()), + } + } + /// Returns the length of this array. pub fn len(&self) -> usize { self.values.len() @@ -143,6 +164,10 @@ impl BooleanArray { Self { values: self.values.slice(offset, length), nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + symbolic_data: self + .symbolic_data + .as_ref() + .map(|s| s[offset..offset + length].to_vec()), } } @@ -277,6 +302,21 @@ impl BooleanArray { pub fn into_parts(self) -> (BooleanBuffer, Option) { (self.values, self.nulls) } + + /// Convert the array to symbolic data + pub fn make_symbolic_data(&self) -> super::SymbolicArrayData { + let mut symbolic_data = vec![]; + for i in 0..self.len() { + let lit = super::SymbolicScalarValue::Boolean(Some(self.value(i))); + symbolic_data.push(super::SymbolicExpr::Literal(lit)); + } + symbolic_data + } + + /// Return array's symbolic data if it exists, otherwise return None + pub fn to_maybe_symbolic_data(&self) -> Option> { + self.symbolic_data.clone() + } } impl Array for BooleanArray { @@ -338,6 +378,16 @@ impl Array for BooleanArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() + self.get_buffer_memory_size() } + + fn to_symbolic_data(&self) -> SymbolicArrayData { + self.symbolic_data + .clone() + .unwrap_or(self.make_symbolic_data()) + } + + fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + Arc::new(self.with_symbolic_data(symbolic_data)) + } } impl ArrayAccessor for &BooleanArray { @@ -397,6 +447,7 @@ impl From for BooleanArray { Self { values, nulls: data.nulls().cloned(), + symbolic_data: None, } } } @@ -471,6 +522,7 @@ impl From for BooleanArray { Self { values, nulls: None, + symbolic_data: None, } } } diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index f2b22507081d..8cd69e6a21d9 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -89,6 +89,7 @@ pub struct GenericByteArray { value_offsets: OffsetBuffer, value_data: Buffer, nulls: Option, + symbolic_data: Option, } impl Clone for GenericByteArray { @@ -98,6 +99,7 @@ impl Clone for GenericByteArray { value_offsets: self.value_offsets.clone(), value_data: self.value_data.clone(), nulls: self.nulls.clone(), + symbolic_data: self.symbolic_data.clone(), } } } @@ -151,6 +153,7 @@ impl GenericByteArray { value_offsets: offsets, value_data: values, nulls, + symbolic_data: None, }) } @@ -169,6 +172,7 @@ impl GenericByteArray { value_offsets: offsets, value_data: values, nulls, + symbolic_data: None, } } @@ -179,6 +183,7 @@ impl GenericByteArray { value_offsets: OffsetBuffer::new_zeroed(len), value_data: MutableBuffer::new(0).into(), nulls: Some(NullBuffer::new_null(len)), + symbolic_data: None, } } @@ -187,6 +192,17 @@ impl GenericByteArray { Scalar::new(Self::from_iter_values(std::iter::once(value))) } + /// Clone a new [`GenericByteArray`] with the supplied symbolic data + pub fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> Self { + Self { + data_type: self.data_type.clone(), + value_offsets: self.value_offsets.clone(), + value_data: self.value_data.clone(), + nulls: self.nulls.clone(), + symbolic_data: Some(symbolic_data.to_vec()), + } + } + /// Creates a [`GenericByteArray`] based on an iterator of values without nulls pub fn from_iter_values(iter: I) -> Self where @@ -218,6 +234,7 @@ impl GenericByteArray { value_data: values.into(), value_offsets, nulls: None, + symbolic_data: None, } } @@ -327,6 +344,10 @@ impl GenericByteArray { value_offsets: self.value_offsets.slice(offset, length), value_data: self.value_data.clone(), nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + symbolic_data: self + .symbolic_data + .as_ref() + .map(|s| s[offset..offset + length].to_vec()), } } @@ -412,6 +433,23 @@ impl GenericByteArray { } } } + + /// Convert the array to symbolic data + pub fn make_symbolic_data(&self) -> super::SymbolicArrayData { + let mut symbolic_data = vec![]; + for i in 0..self.len() { + let lit = convert_to_symbolic_scalar_value::(self.value(i)); + symbolic_data.push(super::SymbolicExpr::Literal(lit)); + } + symbolic_data + } +} + +fn convert_to_symbolic_scalar_value( + value: &T::Native, +) -> super::SymbolicScalarValue { + // TODO: are we handling this correctly for all byte types? + super::SymbolicScalarValue::Utf8(Some(format!("{:?}", value))) } impl std::fmt::Debug for GenericByteArray { @@ -420,7 +458,11 @@ impl std::fmt::Debug for GenericByteArray { print_long_array(self, f, |array, index, f| { std::fmt::Debug::fmt(&array.value(index), f) })?; - write!(f, "]") + write!(f, "]")?; + if let Some(symbolic_data) = &self.symbolic_data { + write!(f, ",\n{:?}", symbolic_data)?; + } + Ok(()) } } @@ -486,6 +528,17 @@ impl Array for GenericByteArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() + self.get_buffer_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + dbg!(&self.data_type, &self, &self.symbolic_data); + self.symbolic_data + .clone() + .unwrap_or(self.make_symbolic_data()) + } + + fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + Arc::new(self.with_symbolic_data(symbolic_data)) + } } impl<'a, T: ByteArrayType> ArrayAccessor for &'a GenericByteArray { @@ -526,6 +579,7 @@ impl From for GenericByteArray { value_data, data_type: T::DATA_TYPE, nulls: data.nulls().cloned(), + symbolic_data: None, } } } diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index 9d2d396a5266..8282e6d0247c 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -609,6 +609,14 @@ impl Array for GenericByteViewArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() + self.get_buffer_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl<'a, T: ByteViewType + ?Sized> ArrayAccessor for &'a GenericByteViewArray { diff --git a/arrow-array/src/array/dictionary_array.rs b/arrow-array/src/array/dictionary_array.rs index f852b57fb65e..f0f13c41c733 100644 --- a/arrow-array/src/array/dictionary_array.rs +++ b/arrow-array/src/array/dictionary_array.rs @@ -787,6 +787,14 @@ impl Array for DictionaryArray { + self.keys.get_buffer_memory_size() + self.values.get_array_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl std::fmt::Debug for DictionaryArray { @@ -906,6 +914,14 @@ impl Array for TypedDictionaryArray<'_, K, V fn get_array_memory_size(&self) -> usize { self.dictionary.get_array_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl IntoIterator for TypedDictionaryArray<'_, K, V> diff --git a/arrow-array/src/array/fixed_size_binary_array.rs b/arrow-array/src/array/fixed_size_binary_array.rs index 576b8012491b..044030205bca 100644 --- a/arrow-array/src/array/fixed_size_binary_array.rs +++ b/arrow-array/src/array/fixed_size_binary_array.rs @@ -634,6 +634,14 @@ impl Array for FixedSizeBinaryArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() + self.get_buffer_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl<'a> ArrayAccessor for &'a FixedSizeBinaryArray { diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index 44be442c9f85..2475aed143d4 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -436,6 +436,14 @@ impl Array for FixedSizeListArray { } size } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl ArrayAccessor for FixedSizeListArray { diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs index 009a7b7a5075..b2a2d5876ed3 100644 --- a/arrow-array/src/array/list_array.rs +++ b/arrow-array/src/array/list_array.rs @@ -576,6 +576,14 @@ impl Array for GenericListArray { } size } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl ArrayAccessor for &GenericListArray { diff --git a/arrow-array/src/array/list_view_array.rs b/arrow-array/src/array/list_view_array.rs index 6118607bcbbf..c7b7224f48b8 100644 --- a/arrow-array/src/array/list_view_array.rs +++ b/arrow-array/src/array/list_view_array.rs @@ -432,6 +432,14 @@ impl Array for GenericListViewArray { } size } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl std::fmt::Debug for GenericListViewArray { diff --git a/arrow-array/src/array/map_array.rs b/arrow-array/src/array/map_array.rs index 18a7c491aa16..68d30f44a931 100644 --- a/arrow-array/src/array/map_array.rs +++ b/arrow-array/src/array/map_array.rs @@ -410,6 +410,14 @@ impl Array for MapArray { } size } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl ArrayAccessor for &MapArray { diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index e41a3a1d719a..7222bd323ef1 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -78,6 +78,15 @@ pub use list_view_array::*; use crate::iterator::ArrayIter; +mod symbolic_expr; +pub use symbolic_expr::make_colref_symbolic_expr_array; +pub use symbolic_expr::Expr as SymbolicExpr; +pub use symbolic_expr::Operator as SymbolicOperator; +pub use symbolic_expr::ScalarValue as SymbolicScalarValue; + +/// A symbolic array data is a vector of symbolic expressions. +pub type SymbolicArrayData = Vec; + /// An array in the [arrow columnar format](https://arrow.apache.org/docs/format/Columnar.html) pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the array as [`Any`] so that it can be @@ -107,6 +116,8 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the underlying data of this array fn to_data(&self) -> ArrayData; + // fn to_symbolic_data(&self) -> SymbolicArrayData; + /// Returns the underlying data of this array /// /// Unlike [`Array::to_data`] this consumes self, allowing it avoid unnecessary clones @@ -336,6 +347,12 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// This value will always be greater than returned by `get_buffer_memory_size()` and /// includes the overhead of the data structures that contain the pointers to the various buffers. fn get_array_memory_size(&self) -> usize; + + /// Returns the symbolic data of this array + fn to_symbolic_data(&self) -> SymbolicArrayData; + + /// Clone a new [`Array`] with the supplied symbolic data + fn with_symbolic_data(&self, symbolic_data: &[SymbolicExpr]) -> ArrayRef; } /// A reference-counted reference to a generic `Array` @@ -351,6 +368,10 @@ impl Array for ArrayRef { self.as_ref().to_data() } + // fn to_symbolic_data(&self) -> SymbolicArrayData { + // self.as_ref().to_symbolic_data() + // } + fn into_data(self) -> ArrayData { self.to_data() } @@ -419,6 +440,14 @@ impl Array for ArrayRef { fn get_array_memory_size(&self) -> usize { self.as_ref().get_array_memory_size() } + + fn to_symbolic_data(&self) -> SymbolicArrayData { + self.as_ref().to_symbolic_data() + } + + fn with_symbolic_data(&self, symbolic_data: &[SymbolicExpr]) -> ArrayRef { + self.as_ref().with_symbolic_data(symbolic_data) + } } impl Array for &T { @@ -430,6 +459,10 @@ impl Array for &T { T::to_data(self) } + // fn to_symbolic_data(&self) -> SymbolicArrayData { + // T::to_symbolic_data(self) + // } + fn into_data(self) -> ArrayData { self.to_data() } @@ -489,6 +522,14 @@ impl Array for &T { fn get_array_memory_size(&self) -> usize { T::get_array_memory_size(self) } + + fn to_symbolic_data(&self) -> SymbolicArrayData { + T::to_symbolic_data(self) + } + + fn with_symbolic_data(&self, symbolic_data: &[SymbolicExpr]) -> ArrayRef { + T::with_symbolic_data(self, symbolic_data) + } } /// A generic trait for accessing the values of an [`Array`] diff --git a/arrow-array/src/array/null_array.rs b/arrow-array/src/array/null_array.rs index 9a7a5ebe17fe..567a0eb906bf 100644 --- a/arrow-array/src/array/null_array.rs +++ b/arrow-array/src/array/null_array.rs @@ -132,6 +132,14 @@ impl Array for NullArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl From for NullArray { diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 57aa23bf9040..5c3739957118 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -23,7 +23,7 @@ use crate::temporal_conversions::{ }; use crate::timezone::Tz; use crate::trusted_len::trusted_len_unzip; -use crate::types::*; +use crate::{types::*, SymbolicScalarValue}; use crate::{Array, ArrayAccessor, ArrayRef, Scalar}; use arrow_buffer::{i256, ArrowNativeType, Buffer, NullBuffer, ScalarBuffer}; use arrow_data::bit_iterator::try_for_each_valid_idx; @@ -351,7 +351,7 @@ pub type Time64MicrosecondArray = PrimitiveArray; /// hold values such as `00:02:00.123456789` pub type Time64NanosecondArray = PrimitiveArray; -/// A [`PrimitiveArray`] of “calendar” intervals in whole months +/// A [`PrimitiveArray`] of "calendar" intervals in whole months /// /// See [`IntervalYearMonthType`] for details on representation and caveats. /// @@ -366,7 +366,7 @@ pub type Time64NanosecondArray = PrimitiveArray; /// ``` pub type IntervalYearMonthArray = PrimitiveArray; -/// A [`PrimitiveArray`] of “calendar” intervals in days and milliseconds +/// A [`PrimitiveArray`] of "calendar" intervals in days and milliseconds /// /// See [`IntervalDayTime`] for details on representation and caveats. /// @@ -382,7 +382,7 @@ pub type IntervalYearMonthArray = PrimitiveArray; /// ``` pub type IntervalDayTimeArray = PrimitiveArray; -/// A [`PrimitiveArray`] of “calendar” intervals in months, days, and nanoseconds. +/// A [`PrimitiveArray`] of "calendar" intervals in months, days, and nanoseconds. /// /// See [`IntervalMonthDayNano`] for details on representation and caveats. /// @@ -530,6 +530,7 @@ pub struct PrimitiveArray { /// Values data values: ScalarBuffer, nulls: Option, + symbolic_data: Option, } impl Clone for PrimitiveArray { @@ -538,6 +539,7 @@ impl Clone for PrimitiveArray { data_type: self.data_type.clone(), values: self.values.clone(), nulls: self.nulls.clone(), + symbolic_data: self.symbolic_data.clone(), } } } @@ -574,6 +576,7 @@ impl PrimitiveArray { data_type: T::DATA_TYPE, values: vec![T::Native::usize_as(0); length].into(), nulls: Some(NullBuffer::new_null(length)), + symbolic_data: None, } } @@ -601,6 +604,7 @@ impl PrimitiveArray { data_type: T::DATA_TYPE, values, nulls, + symbolic_data: None, }) } @@ -610,9 +614,20 @@ impl PrimitiveArray { data_type: T::DATA_TYPE, values: vec![value].into(), nulls: None, + symbolic_data: None, }) } + /// Clone a new [`PrimitiveArray`] with the supplied symbolic data + pub fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> Self { + Self { + data_type: self.data_type.clone(), + values: self.values.clone(), + nulls: self.nulls.clone(), + symbolic_data: Some(symbolic_data.to_vec()), + } + } + /// Deconstruct this array into its constituent parts pub fn into_parts(self) -> (DataType, ScalarBuffer, Option) { (self.data_type, self.values, self.nulls) @@ -710,6 +725,7 @@ impl PrimitiveArray { data_type: T::DATA_TYPE, values: ScalarBuffer::new(val_buf, 0, len), nulls: None, + symbolic_data: None, } } @@ -724,6 +740,7 @@ impl PrimitiveArray { data_type: T::DATA_TYPE, values: ScalarBuffer::new(val_buf, 0, len), nulls, + symbolic_data: None, } } @@ -760,6 +777,10 @@ impl PrimitiveArray { data_type: self.data_type.clone(), values: self.values.slice(offset, length), nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + symbolic_data: self + .symbolic_data + .as_ref() + .map(|s| s[offset..offset + length].to_vec()), } } @@ -1110,6 +1131,41 @@ impl PrimitiveArray { } } } + + /// Convert the array to symbolic data + pub fn make_symbolic_data(&self) -> super::SymbolicArrayData { + let mut symbolic_data = vec![]; + for i in 0..self.len() { + let lit = convert_to_symbolic_scalar_value::(self.value(i)); + symbolic_data.push(super::SymbolicExpr::Literal(lit)); + } + symbolic_data + } +} + +fn convert_to_symbolic_scalar_value( + value: T::Native, +) -> SymbolicScalarValue { + match T::DATA_TYPE { + DataType::Boolean => SymbolicScalarValue::Boolean(Some(value.as_usize() != 0)), + DataType::Int8 => SymbolicScalarValue::Int8(Some(value.as_usize() as i8)), + DataType::Int16 => SymbolicScalarValue::Int16(Some(value.as_usize() as i16)), + DataType::Int32 => SymbolicScalarValue::Int32(Some(value.as_usize() as i32)), + DataType::Int64 => SymbolicScalarValue::Int64(Some(value.as_usize() as i64)), + DataType::UInt8 => SymbolicScalarValue::UInt8(Some(value.as_usize() as u8)), + DataType::UInt16 => SymbolicScalarValue::UInt16(Some(value.as_usize() as u16)), + DataType::UInt32 => SymbolicScalarValue::UInt32(Some(value.as_usize() as u32)), + DataType::UInt64 => SymbolicScalarValue::UInt64(Some(value.as_usize() as u64)), + DataType::Float32 => SymbolicScalarValue::Float32(Some(value.as_usize() as f32)), + DataType::Float64 => SymbolicScalarValue::Float64(Some(value.as_usize() as f64)), + DataType::Decimal128(precision, scale) => { + SymbolicScalarValue::Decimal128(Some(value.as_usize() as i128), precision, scale) + } + _ => panic!( + "Unsupported primitive type for symbolic expressions: {:?}", + T::DATA_TYPE + ), + } } impl From> for ArrayData { @@ -1182,6 +1238,16 @@ impl Array for PrimitiveArray { fn get_array_memory_size(&self) -> usize { std::mem::size_of::() + self.get_buffer_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + self.symbolic_data + .clone() + .unwrap_or(self.make_symbolic_data()) + } + + fn with_symbolic_data(&self, symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + Arc::new(self.with_symbolic_data(symbolic_data)) + } } impl ArrayAccessor for &PrimitiveArray { @@ -1298,7 +1364,11 @@ impl std::fmt::Debug for PrimitiveArray { } _ => std::fmt::Debug::fmt(&array.value(index), f), })?; - write!(f, "]") + write!(f, "]")?; + if let Some(symbolic_data) = &self.symbolic_data { + write!(f, ",\n{:?}", symbolic_data)?; + } + Ok(()) } } @@ -1529,6 +1599,7 @@ impl From for PrimitiveArray { data_type: data.data_type().clone(), values, nulls: data.nulls().cloned(), + symbolic_data: None, } } } diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index b340bf9a9065..d7054c9f1f72 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -389,6 +389,14 @@ impl Array for RunArray { + self.run_ends.inner().inner().capacity() + self.values.get_array_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl std::fmt::Debug for RunArray { @@ -616,6 +624,14 @@ impl Array for TypedRunArray<'_, R, V> { fn get_array_memory_size(&self) -> usize { self.run_array.get_array_memory_size() } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } // Array accessor converts the index of logical array to the index of the physical array diff --git a/arrow-array/src/array/struct_array.rs b/arrow-array/src/array/struct_array.rs index de6d9c699d22..e28afae2091e 100644 --- a/arrow-array/src/array/struct_array.rs +++ b/arrow-array/src/array/struct_array.rs @@ -400,6 +400,14 @@ impl Array for StructArray { } size } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl From> for StructArray { diff --git a/arrow-array/src/array/symbolic_expr.rs b/arrow-array/src/array/symbolic_expr.rs new file mode 100644 index 000000000000..6ddeba156dd5 --- /dev/null +++ b/arrow-array/src/array/symbolic_expr.rs @@ -0,0 +1,151 @@ +/// Literal values used in symbolic expressions +#[derive(Debug, Clone, PartialEq)] +pub enum ScalarValue { + /// represents `DataType::Null` (castable to/from any other type) + Null, + /// true or false value + Boolean(Option), + /// 32bit float + Float32(Option), + /// 64bit float + Float64(Option), + /// 128bit decimal, using the i128 to represent the decimal, precision scale + Decimal128(Option, u8, i8), + /// signed 8bit int + Int8(Option), + /// signed 16bit int + Int16(Option), + /// signed 32bit int + Int32(Option), + /// signed 64bit int + Int64(Option), + /// unsigned 8bit int + UInt8(Option), + /// unsigned 16bit int + UInt16(Option), + /// unsigned 32bit int + UInt32(Option), + /// unsigned 64bit int + UInt64(Option), + /// utf-8 encoded string. + Utf8(Option), +} + +/// Operators applied to symbolic expressions +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub enum Operator { + /// Expressions are equal + Equal, + /// Expressions are not equal + NotEqual, + /// Left side is smaller than right side + Less, + /// Left side is smaller or equal to right side + LessEqual, + /// Left side is greater than right side + Greater, + /// Left side is greater or equal to right side + GreaterEqual, + /// Addition + Plus, + /// Subtraction + Minus, + /// Multiplication operator, like `*` + Multiply, + /// Division operator, like `/` + Divide, + /// Remainder operator, like `%` + Modulo, + /// Logical AND, like `&&` + And, + /// Logical OR, like `||` + Or, +} + +/// A symbolic expression for a columnar array. +/// +/// This is used to represent the symbolic data of an array. +/// Used to represent the symbolic data of an array. +#[derive(Debug, Clone, PartialEq)] +pub enum Expr { + /// A literal symbolic expression + Literal(ScalarValue), + + /// A symbolic variable representing a cell in a table + Variable { + /// The name of the table + table: String, + /// The position of the column + column: usize, + /// The position of the row + row: usize, + }, + + /// A binary symbolic expression + BinaryExpr { + /// The left side of the binary expression + left: Box, + /// The operator of the binary expression + op: Operator, + /// The right side of the binary expression + right: Box, + }, + + /// A negation symbolic expression + Not(Box), + + /// A null check symbolic expression + IsNull(Box), + + /// A not null check symbolic expression + IsNotNull(Box), +} + +impl Expr { + /// Create a literal symbolic expression + pub fn literal(val: ScalarValue) -> Self { + Expr::Literal(val) + } + + /// Create a symbolic variable + pub fn variable(table: String, column: usize, row: usize) -> Self { + Expr::Variable { table, column, row } + } + + /// Create a binary symbolic expression + pub fn binary(left: Expr, op: Operator, right: Expr) -> Self { + Expr::BinaryExpr { + left: Box::new(left), + op, + right: Box::new(right), + } + } + + /// Create a negation symbolic expression + pub fn not(expr: Expr) -> Self { + Expr::Not(Box::new(expr)) + } + + /// Create a null check symbolic expression + pub fn is_null(expr: Expr) -> Self { + Expr::IsNull(Box::new(expr)) + } + + /// Create a not null check symbolic expression + pub fn is_not_null(expr: Expr) -> Self { + Expr::IsNotNull(Box::new(expr)) + } +} + +/// Create a symbolic expression array for a column reference using table name and column position +pub fn make_colref_symbolic_expr_array(table: String, column: usize, len: usize) -> Vec { + let mut res = Vec::with_capacity(len); + for i in 0..len { + res.push(Expr::Variable { + table: table.clone(), + column, + row: i, + }); + } + res +} diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index b442395b4978..30035db241f8 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -924,6 +924,14 @@ impl Array for UnionArray { .sum::() + sum } + + fn to_symbolic_data(&self) -> super::SymbolicArrayData { + todo!() + } + + fn with_symbolic_data(&self, _symbolic_data: &[super::SymbolicExpr]) -> ArrayRef { + todo!() + } } impl std::fmt::Debug for UnionArray { diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index a6c2aee7cbc6..66877629c26c 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -205,6 +205,7 @@ pub struct RecordBatch { /// /// This is stored separately from the columns to handle the case of no columns row_count: usize, + constraints: Option>, } impl RecordBatch { @@ -237,7 +238,7 @@ impl RecordBatch { /// ``` pub fn try_new(schema: SchemaRef, columns: Vec) -> Result { let options = RecordBatchOptions::new(); - Self::try_new_impl(schema, columns, &options) + Self::try_new_impl(schema, columns, &options, None) } /// Creates a `RecordBatch` from a schema and columns, with additional options, @@ -249,7 +250,17 @@ impl RecordBatch { columns: Vec, options: &RecordBatchOptions, ) -> Result { - Self::try_new_impl(schema, columns, options) + Self::try_new_impl(schema, columns, options, None) + } + + /// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints. + pub fn try_new_with_options_and_constraints( + schema: SchemaRef, + columns: Vec, + options: &RecordBatchOptions, + constraints: Option>, + ) -> Result { + Self::try_new_impl(schema, columns, options, constraints) } /// Creates a new empty [`RecordBatch`]. @@ -264,6 +275,7 @@ impl RecordBatch { schema, columns, row_count: 0, + constraints: None, } } @@ -273,7 +285,9 @@ impl RecordBatch { schema: SchemaRef, columns: Vec, options: &RecordBatchOptions, + constraints: Option>, ) -> Result { + // dbg!(&columns); // check that number of fields in schema match column length if schema.fields().len() != columns.len() { return Err(ArrowError::InvalidArgumentError(format!( @@ -337,6 +351,7 @@ impl RecordBatch { schema, columns, row_count, + constraints, }) } @@ -356,6 +371,7 @@ impl RecordBatch { schema, columns: self.columns, row_count: self.row_count, + constraints: self.constraints, }) } @@ -369,6 +385,11 @@ impl RecordBatch { &self.schema } + /// Returns the constraints of the record batch. + pub fn constraints(&self) -> Option<&[crate::SymbolicExpr]> { + self.constraints.as_ref().map(|c| &c[..]) + } + /// Projects the schema onto the specified columns pub fn project(&self, indices: &[usize]) -> Result { let projected_schema = self.schema.project(indices)?; @@ -611,10 +632,16 @@ impl RecordBatch { .map(|column| column.slice(offset, length)) .collect(); + let constraints = self + .constraints + .as_ref() + .map(|c| c[offset..offset + length].to_vec()); + Self { schema: self.schema.clone(), columns, row_count: length, + constraints, } } @@ -774,6 +801,7 @@ impl From for RecordBatch { schema: Arc::new(Schema::new(fields)), row_count, columns, + constraints: None, } } } diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs index 2727ff996150..0461e33d12a9 100644 --- a/arrow-ord/src/cmp.rs +++ b/arrow-ord/src/cmp.rs @@ -29,6 +29,7 @@ use arrow_array::{ downcast_primitive_array, AnyDictionaryArray, Array, ArrowNativeTypeOp, BooleanArray, Datum, FixedSizeBinaryArray, GenericByteArray, GenericByteViewArray, }; +use arrow_array::{SymbolicExpr, SymbolicOperator}; use arrow_buffer::bit_util::ceil; use arrow_buffer::{BooleanBuffer, MutableBuffer, NullBuffer}; use arrow_schema::ArrowError; @@ -62,6 +63,28 @@ impl std::fmt::Display for Op { } } +macro_rules! sym { + ($op:ident, $lhs:expr, $rhs:expr) => {{ + let lhs = $lhs.get().0; + let rhs = $rhs.get().0; + let expr = compare_op(Op::$op, $lhs, $rhs)?; + let syms1 = lhs.to_symbolic_data(); + let syms2 = rhs.to_symbolic_data(); + + let mut sym_res = vec![]; + for i in 0..syms1.len() { + let l = &syms1[i]; + let r = &syms2[i]; + sym_res.push(SymbolicExpr::binary( + l.clone(), + SymbolicOperator::$op, + r.clone(), + )); + } + Ok(expr.with_symbolic_data(&sym_res)) + }}; +} + /// Perform `left == right` operation on two [`Datum`]. /// /// Comparing null values on either side will yield a null in the corresponding @@ -76,7 +99,7 @@ impl std::fmt::Display for Op { /// Nested types, such as lists, are not supported as the null semantics are not well-defined. /// For comparisons involving nested types see [`crate::ord::make_comparator`] pub fn eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - compare_op(Op::Equal, lhs, rhs) + sym!(Equal, lhs, rhs) } /// Perform `left != right` operation on two [`Datum`]. @@ -93,7 +116,7 @@ pub fn eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result /// Nested types, such as lists, are not supported as the null semantics are not well-defined. /// For comparisons involving nested types see [`crate::ord::make_comparator`] pub fn neq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - compare_op(Op::NotEqual, lhs, rhs) + sym!(NotEqual, lhs, rhs) } /// Perform `left < right` operation on two [`Datum`]. @@ -110,7 +133,7 @@ pub fn neq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result /// Nested types, such as lists, are not supported as the null semantics are not well-defined. /// For comparisons involving nested types see [`crate::ord::make_comparator`] pub fn lt(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - compare_op(Op::Less, lhs, rhs) + sym!(Less, lhs, rhs) } /// Perform `left <= right` operation on two [`Datum`]. @@ -127,7 +150,7 @@ pub fn lt(lhs: &dyn Datum, rhs: &dyn Datum) -> Result /// Nested types, such as lists, are not supported as the null semantics are not well-defined. /// For comparisons involving nested types see [`crate::ord::make_comparator`] pub fn lt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - compare_op(Op::LessEqual, lhs, rhs) + sym!(LessEqual, lhs, rhs) } /// Perform `left > right` operation on two [`Datum`]. @@ -144,7 +167,7 @@ pub fn lt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result Result { - compare_op(Op::Greater, lhs, rhs) + sym!(Greater, lhs, rhs) } /// Perform `left >= right` operation on two [`Datum`]. @@ -161,7 +184,7 @@ pub fn gt(lhs: &dyn Datum, rhs: &dyn Datum) -> Result /// Nested types, such as lists, are not supported as the null semantics are not well-defined. /// For comparisons involving nested types see [`crate::ord::make_comparator`] pub fn gt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - compare_op(Op::GreaterEqual, lhs, rhs) + sym!(GreaterEqual, lhs, rhs) } /// Perform `left IS DISTINCT FROM right` operation on two [`Datum`] @@ -179,7 +202,9 @@ pub fn gt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result Result { - compare_op(Op::Distinct, lhs, rhs) + let _res = compare_op(Op::Distinct, lhs, rhs); + unimplemented!() + // res } /// Perform `left IS NOT DISTINCT FROM right` operation on two [`Datum`] @@ -197,7 +222,9 @@ pub fn distinct(lhs: &dyn Datum, rhs: &dyn Datum) -> Result Result { - compare_op(Op::NotDistinct, lhs, rhs) + let _res = compare_op(Op::NotDistinct, lhs, rhs); + unimplemented!() + // res } /// Perform `op` on the provided `Datum` diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 1f453466dc9b..c31ea3a897e3 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -292,7 +292,17 @@ pub fn concat_batches<'a>( )?; arrays.push(array); } - RecordBatch::try_new(schema.clone(), arrays) + let options = RecordBatchOptions::new(); + let mut constraints = vec![]; + let mut is_some = false; + for batch in batches { + if let Some(c) = batch.constraints().map(|c| c.to_vec()) { + is_some = true; + constraints.extend(c); + } + } + let constraints = if is_some { Some(constraints) } else { None }; + RecordBatch::try_new_with_options_and_constraints(schema.clone(), arrays, &options, constraints) } #[cfg(test)] diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index c91732848653..a25c3be0db35 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -217,7 +217,29 @@ pub fn filter_record_batch( .map(|a| filter_array(a, &filter)) .collect::, _>>()?; let options = RecordBatchOptions::default().with_row_count(Some(filter.count())); - RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) + let constraints = predicate.to_maybe_symbolic_data().map(|exprs| { + exprs + .iter() + .enumerate() + .map(|(i, expr)| { + dbg!(&predicate.value(i)); + if predicate.value(i) { + expr.clone() + } else { + SymbolicExpr::not(expr.clone()) + } + }) + .collect::>() + }); + + dbg!(&constraints); + + RecordBatch::try_new_with_options_and_constraints( + record_batch.schema(), + filtered_arrays, + &options, + constraints, + ) } /// A builder to construct [`FilterPredicate`] From a95cd7089be21a765bac2cfc839cd412b1ec4238 Mon Sep 17 00:00:00 2001 From: Michael Levin Date: Fri, 6 Jun 2025 15:08:43 -0700 Subject: [PATCH 2/2] Fixing bugs and initial support for UNION ALL for inputs --- arrow-array/src/array/mod.rs | 10 ---------- arrow-array/src/record_batch.rs | 5 +---- arrow-select/src/concat.rs | 6 ++++-- arrow-select/src/filter.rs | 1 + 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 7222bd323ef1..d1fdefb19743 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -116,8 +116,6 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the underlying data of this array fn to_data(&self) -> ArrayData; - // fn to_symbolic_data(&self) -> SymbolicArrayData; - /// Returns the underlying data of this array /// /// Unlike [`Array::to_data`] this consumes self, allowing it avoid unnecessary clones @@ -368,10 +366,6 @@ impl Array for ArrayRef { self.as_ref().to_data() } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // self.as_ref().to_symbolic_data() - // } - fn into_data(self) -> ArrayData { self.to_data() } @@ -459,10 +453,6 @@ impl Array for &T { T::to_data(self) } - // fn to_symbolic_data(&self) -> SymbolicArrayData { - // T::to_symbolic_data(self) - // } - fn into_data(self) -> ArrayData { self.to_data() } diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 66877629c26c..33596f58b1b3 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -632,10 +632,7 @@ impl RecordBatch { .map(|column| column.slice(offset, length)) .collect(); - let constraints = self - .constraints - .as_ref() - .map(|c| c[offset..offset + length].to_vec()); + let constraints = self.constraints.clone(); Self { schema: self.schema.clone(), diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index c31ea3a897e3..53848370876a 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -270,14 +270,16 @@ pub fn concat_batches<'a>( input_batches: impl IntoIterator, ) -> Result { // When schema is empty, sum the number of the rows of all batches + let x = input_batches.into_iter().collect::>(); + dbg!(&x); if schema.fields().is_empty() { - let num_rows: usize = input_batches.into_iter().map(RecordBatch::num_rows).sum(); + let num_rows: usize = x.iter().cloned().map(RecordBatch::num_rows).sum(); let mut options = RecordBatchOptions::default(); options.row_count = Some(num_rows); return RecordBatch::try_new_with_options(schema.clone(), vec![], &options); } - let batches: Vec<&RecordBatch> = input_batches.into_iter().collect(); + let batches: Vec<&RecordBatch> = x; if batches.is_empty() { return Ok(RecordBatch::new_empty(schema.clone())); } diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index a25c3be0db35..577d370cffed 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -203,6 +203,7 @@ pub fn filter_record_batch( record_batch: &RecordBatch, predicate: &BooleanArray, ) -> Result { + dbg!(&record_batch); let mut filter_builder = FilterBuilder::new(predicate); if record_batch.num_columns() > 1 { // Only optimize if filtering more than one column