diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 36ffe0427..f5026e432 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -179,7 +179,7 @@ impl EventFormat for Event { origin_size, is_first_event, parsed_timestamp, - time_partition: None, + is_time_partitioned: time_partition.is_some(), custom_partition_values, stream_type, }) diff --git a/src/event/mod.rs b/src/event/mod.rs index b641643cb..7b7d5a060 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -31,7 +31,7 @@ use crate::{ storage::StreamType, LOCK_EXPECT, }; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, Utc}; use std::collections::HashMap; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; @@ -47,7 +47,7 @@ pub struct Event { pub origin_size: u64, pub is_first_event: bool, pub parsed_timestamp: NaiveDateTime, - pub time_partition: Option<String>, + pub is_time_partitioned: bool, pub custom_partition_values: HashMap<String, String>, pub stream_type: StreamType, } @@ -56,12 +56,14 @@ pub struct Event { impl Event { pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); - if self.time_partition.is_some() { - let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); - key.push_str(&parsed_timestamp_to_min); + if self.is_time_partitioned { + // For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file + let curr_timestamp = Utc::now().format("%Y%m%dT%H%M").to_string(); + key.push_str(&curr_timestamp); } if !self.custom_partition_values.is_empty() { + // For custom partitioned streams, concatenate values to filename, ensuring we write to different arrows files for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) { key.push_str(&format!("&{k}={v}")); } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index e0ab36889..665200a5c 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ 
-296,7 +296,7 @@ pub async fn push_logs_unchecked( origin_format: "json", origin_size: 0, parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, + is_time_partitioned: false, is_first_event: true, // NOTE: Maybe should be false custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b10a34bdd..3b3927aaa 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -557,17 +557,23 @@ impl Parseable { .await; } - if !time_partition.is_empty() || !time_partition_limit.is_empty() { - return Err(StreamError::Custom { - msg: "Creating stream with time partition is not supported anymore".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } + let time_partition_in_days = if !time_partition_limit.is_empty() { + Some(validate_time_partition_limit(&time_partition_limit)?) + } else { + None + }; if let Some(custom_partition) = &custom_partition { validate_custom_partition(custom_partition)?; } + if !time_partition.is_empty() && custom_partition.is_some() { + return Err(StreamError::Custom { + msg: "Cannot set both time partition and custom partition".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let schema = validate_static_schema( body, stream_name, @@ -579,7 +585,7 @@ impl Parseable { self.create_stream( stream_name.to_string(), &time_partition, - None, + time_partition_in_days, custom_partition.as_ref(), static_schema_flag, schema, diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..c34d290f7 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -144,10 +144,9 @@ impl Stream { // entry is not present thus we create it std::fs::create_dir_all(&self.data_path)?; - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); + // Use current time for partitioning to ensure consistent partition 
boundaries + let range = + TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY); let file_path = self.data_path.join(&filename); let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) .expect("File and RecordBatch both are checked");