Skip to content

Commit 799959b

Browse files
page size to 20mb, add data type for Date
1 parent 4087877 commit 799959b

File tree

4 files changed

+22
-2
lines changed

4 files changed

+22
-2
lines changed

src/event/format/json.rs

+8
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
217217
false
218218
}
219219
}
220+
DataType::Date32 => value.is_string() && is_parsable_as_date(value),
220221
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
221222
_ => {
222223
error!("Unsupported datatype {:?}, value {:?}", data_type, value);
@@ -231,3 +232,10 @@ pub fn is_parsable_as_number(value: &Value) -> bool {
231232
};
232233
s.parse::<i64>().is_ok()
233234
}
235+
236+
/// Reports whether `value` is a string that parses as a calendar date.
///
/// Returns `false` for any `Value` that is not a `Value::String`, and for
/// strings that `chrono::NaiveDate::parse_from_str` rejects under the
/// `"%Y-%m-%d"` format. Returns `true` only when that parse succeeds.
pub fn is_parsable_as_date(value: &Value) -> bool {
237+
let Value::String(s) = value else {
238+
return false;
239+
};
240+
chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok()
241+
}

src/event/format/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::{
2626
use anyhow::{anyhow, Error as AnyError};
2727
use arrow_array::RecordBatch;
2828
use arrow_schema::{DataType, Field, Schema, TimeUnit};
29-
use chrono::DateTime;
29+
use chrono::{DateTime, NaiveDate};
3030
use serde::{Deserialize, Serialize};
3131
use serde_json::Value;
3232

@@ -325,6 +325,16 @@ pub fn override_data_type(
325325
// Update the field's data type to Int64
326326
Field::new(field_name, DataType::Int64, true)
327327
}
328+
(SchemaVersion::V1, Some(Value::String(s)))
329+
if TIME_FIELD_NAME_PARTS
330+
.iter()
331+
.any(|part| field_name.to_lowercase().contains(part))
332+
&& field.data_type() == &DataType::Utf8
333+
&& NaiveDate::parse_from_str(s, "%Y-%m-%d").is_ok() =>
334+
{
335+
// Update the field's data type to Date32
336+
Field::new(field_name, DataType::Date32, true)
337+
}
328338

329339
// in V1 for new fields in json with inferred type number, cast as float64.
330340
(SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_integer() => {

src/static_schema.rs

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub fn convert_static_schema_to_arrow_schema(
102102
"double" | "float" => DataType::Float64,
103103
"boolean" => DataType::Boolean,
104104
"string" => DataType::Utf8,
105+
"date" => DataType::Date32,
105106
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
106107
"string_list" => {
107108
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))

src/storage/staging.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ pub fn parquet_writer_props(
325325
.set_column_encoding(
326326
ColumnPath::new(vec![time_partition_field]),
327327
Encoding::DELTA_BINARY_PACKED,
328-
);
328+
)
329+
.set_data_page_size_limit(20 * 1024 * 1024);
329330

330331
for (field, index) in custom_partition_fields {
331332
let field = ColumnPath::new(vec![field]);

0 commit comments

Comments
 (0)