
Commit 0df008d

removed p_timestamp from primary filter, used Utf8View instead of Utf8
1 parent: 799959b
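
This commit makes two independent changes: string columns are now typed as Arrow's `DataType::Utf8View` (backed by `StringViewArray`) rather than `DataType::Utf8`, and the requirement that every query carry a time bound on the primary (`p_timestamp`) filter is lifted. As a rough illustration of the type change (a sketch, not code from this commit): a `Utf8` array keeps all values in one contiguous buffer addressed by `i32` offsets, while a `Utf8View` array stores a fixed 16-byte view per value, inlining strings of up to 12 bytes and pointing longer ones into shared buffers, which tends to speed up comparisons and filter/take kernels.

```rust
use arrow_array::{StringArray, StringViewArray};

fn main() {
    let values = vec![Some("short"), None, Some("a much longer string value")];

    // Utf8: contiguous value buffer + i32 offsets.
    let classic = StringArray::from(values.clone());

    // Utf8View: 16-byte views; "short" is inlined in the view itself,
    // the long value is a (length, prefix, buffer, offset) reference.
    let view: StringViewArray = values.into_iter().collect();

    assert_eq!(classic.value(0), view.value(0));
    assert_eq!(classic.value(2), view.value(2));
}
```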

6 files changed (+66 −26 lines)

src/event/format/json.rs (+54 −15)
```diff
@@ -20,7 +20,7 @@
 #![allow(deprecated)]
 
 use anyhow::anyhow;
-use arrow_array::RecordBatch;
+use arrow_array::{RecordBatch, StringArray, StringViewArray};
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
 use arrow_schema::{DataType, Field, Fields, Schema};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
@@ -105,22 +105,61 @@ impl EventFormat for Event {
         Ok((value_arr, schema, is_first))
     }
 
-    // Convert the Data type (defined above) to arrow record batch
-    fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
-        let array_capacity = round_upto_multiple_of_64(data.len());
-        let mut reader = ReaderBuilder::new(schema)
-            .with_batch_size(array_capacity)
-            .with_coerce_primitive(false)
-            .build_decoder()?;
-
-        reader.serialize(&data)?;
-        match reader.flush() {
-            Ok(Some(recordbatch)) => Ok(recordbatch),
-            Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
-            Ok(None) => unreachable!("all records are added to one rb"),
+
+    fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
+        // First create a schema with Utf8 instead of Utf8View
+        let temp_schema = Schema::new(
+            schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    if matches!(field.data_type(), DataType::Utf8View) {
+                        Arc::new(Field::new(field.name(), DataType::Utf8, field.is_nullable()))
+                    } else {
+                        field.clone()
+                    }
+                })
+                .collect::<Vec<_>>(),
+        );
+
+        let array_capacity = round_upto_multiple_of_64(data.len());
+        let mut reader = ReaderBuilder::new(Arc::new(temp_schema))
+            .with_batch_size(array_capacity)
+            .with_coerce_primitive(false)
+            .with_strict_mode(false)
+            .build_decoder()?;
+
+        reader.serialize(&data)?;
+
+        match reader.flush() {
+            Ok(Some(temp_batch)) => {
+                // Convert Utf8 arrays to Utf8View arrays where needed
+                let new_columns: Vec<Arc<dyn arrow_array::Array>> = temp_batch
+                    .columns()
+                    .iter()
+                    .zip(schema.fields())
+                    .map(|(col, field)| {
+                        if matches!(field.data_type(), DataType::Utf8View) {
+                            let string_array = col
+                                .as_any()
+                                .downcast_ref::<StringArray>()
+                                .expect("Expected StringArray");
+                            Arc::new(StringViewArray::from(
+                                string_array.iter().map(|s| s.map(|s| s.to_string())).collect::<Vec<_>>()
+                            ))
+                        } else {
+                            col.clone()
+                        }
+                    })
+                    .collect();
+
+                Ok(RecordBatch::try_new(schema, new_columns)?)
             }
+            Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
+            Ok(None) => unreachable!("all records are added to one rb"),
         }
     }
+}
 
 // Returns arrow schema with the fields that are present in the request body
 // This schema is an input to convert the request body to arrow record batch
@@ -179,7 +218,7 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
         DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
         // All numbers can be cast as Float64 from schema version v1
         DataType::Int64 => value.is_i64() || is_parsable_as_number(value),
-        DataType::Utf8 => value.is_string(),
+        DataType::Utf8View => value.is_string(),
         DataType::List(field) => {
             let data_type = field.data_type();
             if let Value::Array(arr) = value {
```
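
Context for the `decode` rewrite above: at this arrow version, arrow-json's `Decoder` evidently could not deserialize directly into `Utf8View` columns (hence the workaround), so the method decodes against a temporary all-`Utf8` schema and then rewrites the affected columns to `StringViewArray`. A standalone sketch of that rewrite step (the helper name `utf8_to_utf8view` is illustrative, not from the commit):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, StringArray, StringViewArray};

// Rebuild a Utf8 column as a Utf8View column, copying each value once.
fn utf8_to_utf8view(col: &ArrayRef) -> ArrayRef {
    let strings = col
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("Expected StringArray");
    Arc::new(StringViewArray::from(
        strings
            .iter()
            .map(|s| s.map(str::to_string))
            .collect::<Vec<Option<String>>>(),
    ))
}

fn main() {
    let col: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), None, Some("b")]));
    let converted = utf8_to_utf8view(&col);
    assert_eq!(converted.len(), 3);
}
```

The per-value `to_string()` allocates an owned copy of every string; depending on the arrow version in use, the `cast` kernel with a `Utf8View` target type may be able to do this conversion more cheaply.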

src/event/format/mod.rs (+3 −3)
```diff
@@ -298,7 +298,7 @@ pub fn override_data_type(
             if TIME_FIELD_NAME_PARTS
                 .iter()
                 .any(|part| field_name.to_lowercase().contains(part))
-                && field.data_type() == &DataType::Utf8
+                && field.data_type() == &DataType::Utf8View
                 && (DateTime::parse_from_rfc3339(s).is_ok()
                     || DateTime::parse_from_rfc2822(s).is_ok()) =>
             {
@@ -319,7 +319,7 @@
                 .unwrap()
                 .data_type()
                 == &DataType::Int64
-                && field.data_type() == &DataType::Utf8
+                && field.data_type() == &DataType::Utf8View
                 && s.parse::<i64>().is_ok()) =>
             {
                 // Update the field's data type to Float64
@@ -329,7 +329,7 @@
             if TIME_FIELD_NAME_PARTS
                 .iter()
                 .any(|part| field_name.to_lowercase().contains(part))
-                && field.data_type() == &DataType::Utf8
+                && field.data_type() == &DataType::Utf8View
                 && NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok() =>
             {
                 // Update the field's data type to Timestamp
```
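
The three guard updates above are load-bearing: since string fields are now inferred as `Utf8View`, a guard that still compared against `DataType::Utf8` would simply never match, and the timestamp/number overrides would silently stop firing. A contrived reduction of that pitfall (names illustrative, not from the commit):

```rust
use arrow_schema::DataType;

// A field only qualifies for coercion when the type check matches the
// type the schema actually produces for strings.
fn qualifies_for_override(field_type: &DataType, raw: &str) -> bool {
    field_type == &DataType::Utf8View && raw.parse::<i64>().is_ok()
}

fn main() {
    // Post-commit schemas carry Utf8View, so the updated guard fires:
    assert!(qualifies_for_override(&DataType::Utf8View, "42"));
    // The old check against plain Utf8 would have skipped this field.
    assert!(!qualifies_for_override(&DataType::Utf8, "42"));
}
```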

src/query/mod.rs (+3 −3)
```diff
@@ -494,7 +494,7 @@ fn transform(
 ) -> Transformed<LogicalPlan> {
     plan.transform(&|plan| match plan {
         LogicalPlan::TableScan(table) => {
-            let mut new_filters = vec![];
+            let new_filters = vec![];
             if !table_contains_any_time_filters(&table, time_partition) {
                 let mut _start_time_filter: Expr;
                 let mut _end_time_filter: Expr;
@@ -529,8 +529,8 @@ fn transform(
                 }
             }
 
-                new_filters.push(_start_time_filter);
-                new_filters.push(_end_time_filter);
+                // new_filters.push(_start_time_filter);
+                // new_filters.push(_end_time_filter);
             }
             let new_filter = new_filters.into_iter().reduce(and);
             if let Some(new_filter) = new_filter {
```
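
With the two pushes commented out, `new_filters` stays empty, so the trailing `reduce(and)` yields `None` and no synthetic time filter is attached to the `TableScan`. A minimal sketch of that folding behavior using DataFusion's expression helpers (the column name is taken from the commit message, not this file):

```rust
use datafusion::logical_expr::{and, col, lit, Expr};

fn main() {
    // After this commit: nothing is pushed, reduce returns None,
    // so no Filter node is injected over the scan.
    let new_filters: Vec<Expr> = vec![];
    assert!(new_filters.into_iter().reduce(and).is_none());

    // Before: the start/end bounds folded into one conjunction.
    let filters = vec![
        col("p_timestamp").gt_eq(lit("2024-01-01T00:00:00Z")),
        col("p_timestamp").lt(lit("2024-01-02T00:00:00Z")),
    ];
    assert!(filters.into_iter().reduce(and).is_some());
}
```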

src/query/stream_schema_provider.rs (+3 −3)
```diff
@@ -435,9 +435,9 @@ impl TableProvider for StandardTableProvider {
             .map_err(|err| DataFusionError::Plan(err.to_string()))?;
         let time_partition = object_store_format.time_partition;
         let mut time_filters = extract_primary_filter(filters, &time_partition);
-        if time_filters.is_empty() {
-            return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
-        }
+        // if time_filters.is_empty() {
+        //     return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
+        // }
 
         if include_now(filters, &time_partition) {
             if let Some(records) =
```
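
This mirrors the `transform` change above: with the guard commented out, a scan that arrives without any time filter is now planned normally instead of being rejected with a plan error, so queries may legitimately be unbounded on the time range.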

src/static_schema.rs (+1 −1)
```diff
@@ -101,7 +101,7 @@ pub fn convert_static_schema_to_arrow_schema(
         "int" => DataType::Int64,
         "double" | "float" => DataType::Float64,
         "boolean" => DataType::Boolean,
-        "string" => DataType::Utf8,
+        "string" => DataType::Utf8View,
         "date" => DataType::Date32,
         "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
         "string_list" => {
```

src/storage/staging.rs (+2 −1)
```diff
@@ -326,7 +326,8 @@ pub fn parquet_writer_props(
             ColumnPath::new(vec![time_partition_field]),
             Encoding::DELTA_BINARY_PACKED,
         )
-        .set_data_page_size_limit(20 * 1024 * 1024);
+        .set_data_page_size_limit(20 * 1024 * 1024)
+        .set_data_page_row_count_limit(100000);
 
     for (field, index) in custom_partition_fields {
         let field = ColumnPath::new(vec![field]);
```
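
The added `set_data_page_row_count_limit` caps each data page at 100,000 rows on top of the existing ~20 MiB size cap; whichever limit is reached first closes the page, and smaller pages make page-level pruning finer-grained at the cost of more page headers. A self-contained sketch of the builder calls (the column name is illustrative, taken from the commit message; the real code passes `time_partition_field`):

```rust
use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() {
    let props = WriterProperties::builder()
        .set_column_encoding(
            ColumnPath::new(vec!["p_timestamp".to_string()]),
            Encoding::DELTA_BINARY_PACKED,
        )
        // Close a page at ~20 MiB of encoded data...
        .set_data_page_size_limit(20 * 1024 * 1024)
        // ...or at 100k rows, whichever comes first.
        .set_data_page_row_count_limit(100_000)
        .build();

    assert_eq!(props.data_page_row_count_limit(), 100_000);
}
```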
