Skip to content

Commit

Permalink
logic to overriding known timestamp fields which were inferred as str…
Browse files Browse the repository at this point in the history
…ing fields
  • Loading branch information
balaji-jr committed Oct 27, 2024
1 parent b8a3f53 commit 777bbae
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions server/src/event/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,48 @@ pub fn get_existing_fields(
existing_fields
}

pub fn get_existing_timestamp_fields(
existing_schema: &HashMap<String, Arc<Field>>,
) -> Vec<Arc<Field>> {
let mut timestamp_fields = Vec::new();

for field in existing_schema.values() {
if let DataType::Timestamp(TimeUnit::Millisecond, None) = field.data_type() {
timestamp_fields.push(field.clone());
}
}

timestamp_fields
}

pub fn override_timestamp_fields(
inferred_schema: Arc<Schema>,
existing_timestamp_fields: &[Arc<Field>],
) -> Arc<Schema> {
let timestamp_field_names: Vec<&str> = existing_timestamp_fields
.iter()
.map(|field| field.name().as_str())
.collect();

let updated_fields: Vec<Arc<Field>> = inferred_schema
.fields()
.iter()
.map(|field| {
if timestamp_field_names.contains(&field.name().as_str()) {
Arc::new(Field::new(
field.name(),
DataType::Timestamp(TimeUnit::Millisecond, None),
field.is_nullable(),
))
} else {
field.clone()
}
})
.collect();

Arc::new(Schema::new(updated_fields))
}

pub fn update_field_type_in_schema(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
Expand All @@ -172,6 +214,9 @@ pub fn update_field_type_in_schema(

if let Some(existing_schema) = existing_schema {
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
// overriding known timestamp fields which were inferred as string fields
updated_schema = override_timestamp_fields(updated_schema, &existing_timestamp_fields);
let existing_field_names: Vec<String> = existing_fields
.iter()
.map(|field| field.name().clone())
Expand Down

0 comments on commit 777bbae

Please sign in to comment.