Skip to content

Commit b8a3f53

Browse files
committed
detect datetime column on ingestion
1 parent b3f4090 commit b8a3f53

File tree

3 files changed

+55
-12
lines changed

3 files changed

+55
-12
lines changed

server/src/event/format.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub trait EventFormat: Sized {
102102
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
103103
return Err(anyhow!("Schema mismatch"));
104104
}
105-
new_schema = update_field_type_in_schema(new_schema, time_partition);
105+
new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
106106
let rb = Self::decode(data, new_schema.clone())?;
107107
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
108108
let metadata_arr =
@@ -147,19 +147,56 @@ pub trait EventFormat: Sized {
147147
}
148148
}
149149

150+
pub fn get_existing_fields(
151+
inferred_schema: Arc<Schema>,
152+
existing_schema: Option<&HashMap<String, Arc<Field>>>,
153+
) -> Vec<Arc<Field>> {
154+
let mut existing_fields = Vec::new();
155+
156+
for field in inferred_schema.fields.iter() {
157+
if existing_schema.map_or(false, |schema| schema.contains_key(field.name())) {
158+
existing_fields.push(field.clone());
159+
}
160+
}
161+
162+
existing_fields
163+
}
164+
150165
pub fn update_field_type_in_schema(
151-
schema: Arc<Schema>,
166+
inferred_schema: Arc<Schema>,
167+
existing_schema: Option<&HashMap<String, Arc<Field>>>,
152168
time_partition: Option<String>,
169+
log_records: Option<&Vec<Value>>,
153170
) -> Arc<Schema> {
171+
let mut updated_schema = inferred_schema.clone();
172+
173+
if let Some(existing_schema) = existing_schema {
174+
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
175+
let existing_field_names: Vec<String> = existing_fields
176+
.iter()
177+
.map(|field| field.name().clone())
178+
.collect();
179+
180+
if let Some(log_records) = log_records {
181+
for log_record in log_records {
182+
updated_schema = Arc::new(update_data_type_to_datetime(
183+
(*updated_schema).clone(),
184+
log_record.clone(),
185+
existing_field_names.clone(),
186+
));
187+
}
188+
}
189+
}
190+
154191
if time_partition.is_none() {
155-
return schema;
192+
return updated_schema;
156193
}
157-
let field_name = time_partition.unwrap();
158-
let new_schema: Vec<Field> = schema
194+
let time_partition_field_name = time_partition.unwrap();
195+
let new_schema: Vec<Field> = updated_schema
159196
.fields()
160197
.iter()
161198
.map(|field| {
162-
if *field.name() == field_name {
199+
if *field.name() == time_partition_field_name {
163200
if field.data_type() == &DataType::Utf8 {
164201
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
165202
Field::new(field.name().clone(), new_data_type, true)
@@ -174,12 +211,16 @@ pub fn update_field_type_in_schema(
174211
Arc::new(Schema::new(new_schema))
175212
}
176213

177-
pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
214+
pub fn update_data_type_to_datetime(
215+
schema: Schema,
216+
value: Value,
217+
ignore_field_names: Vec<String>,
218+
) -> Schema {
178219
let new_schema: Vec<Field> = schema
179220
.fields()
180221
.iter()
181222
.map(|field| {
182-
if field.data_type() == &DataType::Utf8 {
223+
if field.data_type() == &DataType::Utf8 && !ignore_field_names.contains(field.name()) {
183224
if let Value::Object(map) = &value {
184225
if let Some(Value::String(s)) = map.get(field.name()) {
185226
if DateTime::parse_from_rfc3339(s).is_ok() {

server/src/event/format/json.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ impl EventFormat for Event {
7171
Ok(mut infer_schema) => {
7272
let new_infer_schema = super::super::format::update_field_type_in_schema(
7373
Arc::new(infer_schema),
74+
Some(&stream_schema),
7475
time_partition,
76+
Some(&value_arr),
7577
);
7678
infer_schema = Schema::new(new_infer_schema.fields().clone());
7779
if let Err(err) = Schema::try_merge(vec![

server/src/handlers/http/logstream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn list(_: HttpRequest) -> impl Responder {
9393

9494
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
9595
let body_val: Value = serde_json::from_slice(&body)?;
96-
let value_arr: Vec<Value> = match body_val {
96+
let log_records: Vec<Value> = match body_val {
9797
Value::Array(arr) => arr,
9898
value @ Value::Object(_) => vec![value],
9999
_ => {
@@ -104,9 +104,9 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
104104
}
105105
};
106106

107-
let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
108-
for value in value_arr {
109-
schema = update_data_type_to_datetime(schema, value);
107+
let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
108+
for log_record in log_records {
109+
schema = update_data_type_to_datetime(schema, log_record, Vec::new());
110110
}
111111
Ok((web::Json(schema), StatusCode::OK))
112112
}

0 commit comments

Comments
 (0)