Skip to content

feat: re-enable time-partitioning #1267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl EventFormat for Event {
origin_size,
is_first_event,
parsed_timestamp,
time_partition: None,
is_time_partitioned: time_partition.is_some(),
custom_partition_values,
stream_type,
})
Expand Down
12 changes: 7 additions & 5 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
storage::StreamType,
LOCK_EXPECT,
};
use chrono::NaiveDateTime;
use chrono::{NaiveDateTime, Utc};
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
Expand All @@ -47,7 +47,7 @@ pub struct Event {
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
pub is_time_partitioned: bool,
pub custom_partition_values: HashMap<String, String>,
pub stream_type: StreamType,
}
Expand All @@ -56,12 +56,14 @@ pub struct Event {
impl Event {
pub fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
key.push_str(&parsed_timestamp_to_min);
if self.is_time_partitioned {
// For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file
let curr_timestamp = Utc::now().format("%Y%m%dT%H%M").to_string();
key.push_str(&curr_timestamp);
}

if !self.custom_partition_values.is_empty() {
// For custom partitioned streams, concatenate values to filename, ensuring we write to different arrows files
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
key.push_str(&format!("&{k}={v}"));
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ pub async fn push_logs_unchecked(
origin_format: "json",
origin_size: 0,
parsed_timestamp: Utc::now().naive_utc(),
time_partition: None,
is_time_partitioned: false,
is_first_event: true, // NOTE: Maybe should be false
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
stream_type: StreamType::UserDefined,
Expand Down
20 changes: 13 additions & 7 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,17 +557,23 @@ impl Parseable {
.await;
}

if !time_partition.is_empty() || !time_partition_limit.is_empty() {
return Err(StreamError::Custom {
msg: "Creating stream with time partition is not supported anymore".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let time_partition_in_days = if !time_partition_limit.is_empty() {
Some(validate_time_partition_limit(&time_partition_limit)?)
} else {
None
};

if let Some(custom_partition) = &custom_partition {
validate_custom_partition(custom_partition)?;
}

if !time_partition.is_empty() && custom_partition.is_some() {
return Err(StreamError::Custom {
msg: "Cannot set both time partition and custom partition".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let schema = validate_static_schema(
body,
stream_name,
Expand All @@ -579,7 +585,7 @@ impl Parseable {
self.create_stream(
stream_name.to_string(),
&time_partition,
None,
time_partition_in_days,
custom_partition.as_ref(),
static_schema_flag,
schema,
Expand Down
7 changes: 3 additions & 4 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ impl Stream {
// entry is not present thus we create it
std::fs::create_dir_all(&self.data_path)?;

let range = TimeRange::granularity_range(
parsed_timestamp.and_local_timezone(Utc).unwrap(),
OBJECT_STORE_DATA_GRANULARITY,
);
// Use current time for partitioning to ensure consistent partition boundaries
let range =
TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
let file_path = self.data_path.join(&filename);
let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
.expect("File and RecordBatch both are checked");
Expand Down
Loading