From f10d606474ab2daef69df078e2845d5c57009f48 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 11:19:08 +0530 Subject: [PATCH 1/5] Revert "update: remove time partition stream creation (#1227)" This reverts commit 10aef7b1049027e2e4ee9c961c557a0d0ef925df. --- src/parseable/mod.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index b10a34bdd..3b3927aaa 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -557,17 +557,23 @@ impl Parseable { .await; } - if !time_partition.is_empty() || !time_partition_limit.is_empty() { - return Err(StreamError::Custom { - msg: "Creating stream with time partition is not supported anymore".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } + let time_partition_in_days = if !time_partition_limit.is_empty() { + Some(validate_time_partition_limit(&time_partition_limit)?) + } else { + None + }; if let Some(custom_partition) = &custom_partition { validate_custom_partition(custom_partition)?; } + if !time_partition.is_empty() && custom_partition.is_some() { + return Err(StreamError::Custom { + msg: "Cannot set both time partition and custom partition".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let schema = validate_static_schema( body, stream_name, @@ -579,7 +585,7 @@ impl Parseable { self.create_stream( stream_name.to_string(), &time_partition, - None, + time_partition_in_days, custom_partition.as_ref(), static_schema_flag, schema, From 96af702b5e0bf0f7a83058900416156cb52792fa Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 12:09:57 +0530 Subject: [PATCH 2/5] feat: re-enable time-partitioning --- src/event/format/json.rs | 2 +- src/event/mod.rs | 12 +++++++----- src/handlers/http/ingest.rs | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 36ffe0427..3cad79062 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -179,7 +179,7 @@ impl EventFormat for Event { origin_size, is_first_event, parsed_timestamp, - time_partition: None, + time_partitioned: time_partition.is_some(), custom_partition_values, stream_type, }) diff --git a/src/event/mod.rs b/src/event/mod.rs index b641643cb..1c8b608c6 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -31,7 +31,7 @@ use crate::{ storage::StreamType, LOCK_EXPECT, }; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, Utc}; use std::collections::HashMap; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; @@ -47,7 +47,7 @@ pub struct Event { pub origin_size: u64, pub is_first_event: bool, pub parsed_timestamp: NaiveDateTime, - pub time_partition: Option, + pub time_partitioned: bool, pub custom_partition_values: HashMap, pub stream_type: StreamType, } @@ -56,12 +56,14 @@ pub struct Event { impl Event { pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); - if self.time_partition.is_some() { - let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); - key.push_str(&parsed_timestamp_to_min); + if self.time_partitioned { + // For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file + let curr_timestamp = Utc::now().format("%Y%m%dT%H%M").to_string(); + key.push_str(&curr_timestamp); } if !self.custom_partition_values.is_empty() { + // For custom partitioned streams, concatenate values to filename, ensuring we write to different arrows files for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) { key.push_str(&format!("&{k}={v}")); } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b8e056cac..40512e1a9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -296,7 +296,7 @@ pub async fn push_logs_unchecked( origin_format: "json", origin_size: 0, parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, + time_partitioned: false, is_first_event: true, // NOTE: Maybe should be false custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, From dfeca752cfc1b3bc867b2727d2211bed598ddafb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 16:25:10 +0530 Subject: [PATCH 3/5] fix: flush timerange is dependent on time of acceptance only --- src/parseable/streams.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..f58537c1a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -144,10 +144,8 @@ impl Stream { // entry is not present thus we create it std::fs::create_dir_all(&self.data_path)?; - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); + let range = + TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY); let file_path = self.data_path.join(&filename); let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) .expect("File and RecordBatch both are checked"); From 89edfc8b97a12ec83e40b1ab89540f0769c34991 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 16:48:41 +0530 Subject: [PATCH 4/5] style: it's a check --- src/event/format/json.rs | 2 +- src/event/mod.rs | 4 ++-- src/handlers/http/ingest.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 3cad79062..f5026e432 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -179,7 +179,7 @@ impl EventFormat for Event { origin_size, is_first_event, parsed_timestamp, - time_partitioned: time_partition.is_some(), + is_time_partitioned: time_partition.is_some(), custom_partition_values, stream_type, }) diff --git a/src/event/mod.rs b/src/event/mod.rs index 1c8b608c6..7b7d5a060 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -47,7 +47,7 @@ pub struct Event { pub origin_size: u64, pub is_first_event: bool, pub parsed_timestamp: NaiveDateTime, - pub time_partitioned: bool, + pub is_time_partitioned: bool, pub custom_partition_values: HashMap, pub stream_type: StreamType, } @@ -56,7 +56,7 @@ pub struct Event { impl Event { pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); - if self.time_partitioned { + if self.is_time_partitioned { // For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file let curr_timestamp = Utc::now().format("%Y%m%dT%H%M").to_string(); key.push_str(&curr_timestamp); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 3fdc17ba8..665200a5c 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -296,7 +296,7 @@ pub async fn push_logs_unchecked( origin_format: "json", origin_size: 0, parsed_timestamp: Utc::now().naive_utc(), - time_partitioned: false, + is_time_partitioned: false, is_first_event: true, // NOTE: Maybe should be false custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, From a516cc07e489b3b622a9b7602e354f5a40825a9a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 16:55:04 +0530 Subject: [PATCH 5/5] doc: reasoning behind range creation Signed-off-by: Devdutt Shenoi --- src/parseable/streams.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f58537c1a..c34d290f7 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -144,6 +144,7 @@ impl Stream { // entry is not present thus we create it std::fs::create_dir_all(&self.data_path)?; + // Use current time for partitioning to ensure consistent partition boundaries let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY); let file_path = self.data_path.join(&filename);