Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion arrow-array/src/array/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,6 @@ impl<T: ByteArrayType> Array for GenericByteArray<T> {
}

fn to_symbolic_data(&self) -> super::SymbolicArrayData {
dbg!(&self.data_type, &self, &self.symbolic_data);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how did we end up with dbg lines in the first place? For other repos we have a protection, should we lint here as well?

self.symbolic_data
.clone()
.unwrap_or(self.make_symbolic_data())
Expand Down
13 changes: 3 additions & 10 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ use crate::iterator::ArrayIter;

mod symbolic_expr;
pub use symbolic_expr::make_colref_symbolic_expr_array;
pub use symbolic_expr::arrow_type_to_var_type;
pub use symbolic_expr::add_constraint_to_row;
pub use symbolic_expr::VarType as SymbolicVarType;
pub use symbolic_expr::Expr as SymbolicExpr;
pub use symbolic_expr::Operator as SymbolicOperator;
pub use symbolic_expr::ScalarValue as SymbolicScalarValue;
Expand Down Expand Up @@ -116,8 +119,6 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// Returns the underlying data of this array
fn to_data(&self) -> ArrayData;

// fn to_symbolic_data(&self) -> SymbolicArrayData;

/// Returns the underlying data of this array
///
/// Unlike [`Array::to_data`] this consumes self, allowing it avoid unnecessary clones
Expand Down Expand Up @@ -368,10 +369,6 @@ impl Array for ArrayRef {
self.as_ref().to_data()
}

// fn to_symbolic_data(&self) -> SymbolicArrayData {
// self.as_ref().to_symbolic_data()
// }

fn into_data(self) -> ArrayData {
self.to_data()
}
Expand Down Expand Up @@ -459,10 +456,6 @@ impl<T: Array> Array for &T {
T::to_data(self)
}

// fn to_symbolic_data(&self) -> SymbolicArrayData {
// T::to_symbolic_data(self)
// }

fn into_data(self) -> ArrayData {
self.to_data()
}
Expand Down
126 changes: 122 additions & 4 deletions arrow-array/src/array/symbolic_expr.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use arrow_schema::DataType;

/// Literal values used in symbolic expressions
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarValue {
Expand Down Expand Up @@ -31,6 +33,34 @@ pub enum ScalarValue {
Utf8(Option<String>),
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum VarType {
/// A boolean variable
Boolean,
/// A string variable
String,
/// A int variable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: An int

Int,
}

pub fn arrow_type_to_var_type(arrow_type: DataType) -> VarType {
match arrow_type {
DataType::Boolean => VarType::Boolean,
DataType::Utf8 | DataType::LargeUtf8 => VarType::String,
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _) => VarType::Int,
_ => VarType::String, // Default to String for other types
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are those other types and why is string ok? Is it safer to return an error in this case

}
}

/// Operators applied to symbolic expressions
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Operator {
Expand Down Expand Up @@ -60,6 +90,10 @@ pub enum Operator {
And,
/// Logical OR, like `||`
Or,
/// Logical NOT, like `!`
Not,
/// In, like `in`
In,
}

/// A symbolic expression for a columnar array.
Expand All @@ -79,6 +113,29 @@ pub enum Expr {
column: usize,
/// The position of the row
row: usize,
/// The variable type
var_type: VarType,
},

/// A symbolic variable representing a row in a table
Row {
/// The name of the table
table: String,
/// The position of the row
row_id: usize,
/// The row constraint
constraint: Option<Vec<Expr>>,
},

/// A list of symbolic expressions
List(Vec<Expr>),

/// A count symbolic expression
Count {
/// The column to count
expr: Box<Expr>,
/// Whether the count is distinct
is_distinct: bool,
},

/// A binary symbolic expression
Expand Down Expand Up @@ -108,8 +165,39 @@ impl Expr {
}

/// Create a symbolic variable
pub fn variable(table: String, column: usize, row: usize) -> Self {
Expr::Variable { table, column, row }
pub fn variable(table: String, column: usize, row: usize, var_type: VarType) -> Self {
Expr::Variable {
table,
column,
row,
var_type,
}
}

/// Create a symbolic row
pub fn row(table: String, row_id: usize) -> Self {
Expr::Row {
table,
row_id,
constraint: None,
}
}

/// Create a symbolic row with a constraint
pub fn row_with_constraint(table: String, row_id: usize, constraint: Vec<Expr>) -> Self {
Expr::Row {
table,
row_id,
constraint: Some(constraint),
}
}

/// Create a count symbolic expression
pub fn count(expr: Expr, is_distinct: bool) -> Self {
Expr::Count {
expr: Box::new(expr),
is_distinct,
}
}

/// Create a binary symbolic expression
Expand Down Expand Up @@ -138,14 +226,44 @@ impl Expr {
}

/// Create a symbolic expression array for a column reference using table name and column position
pub fn make_colref_symbolic_expr_array(table: String, column: usize, len: usize) -> Vec<Expr> {
pub fn make_colref_symbolic_expr_array(
table: String,
column: usize,
len: usize,
row_offset: usize,
var_type: VarType,
) -> Vec<Expr> {
let mut res = Vec::with_capacity(len);
for i in 0..len {
res.push(Expr::Variable {
table: table.clone(),
column,
row: i,
row: i + row_offset,
var_type,
});
}
res
}

pub fn add_constraint_to_row(row: &mut Expr, constraint: Expr) {
if let Expr::Row {
constraint: Some(constraints),
..
} = row
{
constraints.push(constraint);
} else if let Expr::Row {
table,
row_id,
constraint: None,
} = row
{
*row = Expr::Row {
table: table.clone(),
row_id: *row_id,
constraint: Some(vec![constraint]),
};
} else {
panic!("Expr is not a symbolic row");
}
}
65 changes: 57 additions & 8 deletions arrow-array/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ pub struct RecordBatch {
///
/// This is stored separately from the columns to handle the case of no columns
row_count: usize,
row_symbolic_data: Option<Vec<crate::SymbolicExpr>>,
constraints: Option<Vec<crate::SymbolicExpr>>,
}

Expand Down Expand Up @@ -238,7 +239,7 @@ impl RecordBatch {
/// ```
pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self, ArrowError> {
let options = RecordBatchOptions::new();
Self::try_new_impl(schema, columns, &options, None)
Self::try_new_impl(schema, columns, &options, None, None)
}

/// Creates a `RecordBatch` from a schema and columns, with additional options,
Expand All @@ -250,7 +251,7 @@ impl RecordBatch {
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
) -> Result<Self, ArrowError> {
Self::try_new_impl(schema, columns, options, None)
Self::try_new_impl(schema, columns, options, None, None)
}

/// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints.
Expand All @@ -260,7 +261,48 @@ impl RecordBatch {
options: &RecordBatchOptions,
constraints: Option<Vec<crate::SymbolicExpr>>,
) -> Result<Self, ArrowError> {
Self::try_new_impl(schema, columns, options, constraints)
Self::try_new_impl(schema, columns, options, None, constraints)
}

/// Creates a `RecordBatch` from a schema and columns, with symbolic data.
pub fn try_new_with_symbolic_data(
schema: SchemaRef,
columns: Vec<ArrayRef>,
row_symbolic_data: Option<Vec<crate::SymbolicExpr>>,
) -> Result<Self, ArrowError> {
Self::try_new_impl(
schema,
columns,
&RecordBatchOptions::new(),
row_symbolic_data,
None,
)
}

/// Creates a `RecordBatch` from a schema and columns, with constraints.
pub fn try_new_with_constraints(
schema: SchemaRef,
columns: Vec<ArrayRef>,
constraints: Option<Vec<crate::SymbolicExpr>>,
) -> Result<Self, ArrowError> {
Self::try_new_impl(
schema,
columns,
&RecordBatchOptions::new(),
None,
constraints,
)
}

/// Creates a `RecordBatch` from a schema and columns, with additional options and symbolic constraints.
pub fn try_new_with_options_and_constraints_and_symbolic_data(
schema: SchemaRef,
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
row_symbolic_data: Option<Vec<crate::SymbolicExpr>>,
constraints: Option<Vec<crate::SymbolicExpr>>,
) -> Result<Self, ArrowError> {
Self::try_new_impl(schema, columns, options, row_symbolic_data, constraints)
}

/// Creates a new empty [`RecordBatch`].
Expand All @@ -275,6 +317,7 @@ impl RecordBatch {
schema,
columns,
row_count: 0,
row_symbolic_data: None,
constraints: None,
}
}
Expand All @@ -285,9 +328,9 @@ impl RecordBatch {
schema: SchemaRef,
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
row_symbolic_data: Option<Vec<crate::SymbolicExpr>>,
constraints: Option<Vec<crate::SymbolicExpr>>,
) -> Result<Self, ArrowError> {
// dbg!(&columns);
// check that number of fields in schema match column length
if schema.fields().len() != columns.len() {
return Err(ArrowError::InvalidArgumentError(format!(
Expand Down Expand Up @@ -351,6 +394,7 @@ impl RecordBatch {
schema,
columns,
row_count,
row_symbolic_data,
constraints,
})
}
Expand All @@ -371,6 +415,7 @@ impl RecordBatch {
schema,
columns: self.columns,
row_count: self.row_count,
row_symbolic_data: self.row_symbolic_data,
constraints: self.constraints,
})
}
Expand All @@ -390,6 +435,11 @@ impl RecordBatch {
self.constraints.as_ref().map(|c| &c[..])
}

/// Returns the symbolic data of the record batch.
pub fn row_symbolic_data(&self) -> Option<&[crate::SymbolicExpr]> {
self.row_symbolic_data.as_ref().map(|c| &c[..])
}

/// Projects the schema onto the specified columns
pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
let projected_schema = self.schema.project(indices)?;
Expand Down Expand Up @@ -632,15 +682,13 @@ impl RecordBatch {
.map(|column| column.slice(offset, length))
.collect();

let constraints = self
.constraints
.as_ref()
.map(|c| c[offset..offset + length].to_vec());
let constraints = self.constraints.clone();

Self {
schema: self.schema.clone(),
columns,
row_count: length,
row_symbolic_data: self.row_symbolic_data.clone(),
constraints,
}
}
Expand Down Expand Up @@ -801,6 +849,7 @@ impl From<StructArray> for RecordBatch {
schema: Arc::new(Schema::new(fields)),
row_count,
columns,
row_symbolic_data: None,
constraints: None,
}
}
Expand Down
16 changes: 9 additions & 7 deletions arrow-ord/src/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ macro_rules! sym {

let mut sym_res = vec![];
for i in 0..syms1.len() {
let l = &syms1[i];
let r = &syms2[i];
sym_res.push(SymbolicExpr::binary(
l.clone(),
SymbolicOperator::$op,
r.clone(),
));
for j in 0..syms2.len() {
let l = &syms1[i];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not outside the nested loop?

let r = &syms2[j];
sym_res.push(SymbolicExpr::binary(
l.clone(),
SymbolicOperator::$op,
r.clone(),
));
}
}
Ok(expr.with_symbolic_data(&sym_res))
}};
Expand Down
Loading