diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 10025ed98..d1c92fb51 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -62,7 +62,7 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
-        let custom_partition = stream.get_custom_partition();
+        let custom_partition = stream.get_custom_partitions();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 903ab2752..b45b7c986 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -145,18 +145,13 @@ impl EventFormat for Event {
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
-        custom_partitions: Option<&String>,
+        custom_partitions: &[String],
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
     ) -> Result<super::Event, anyhow::Error> {
-        let custom_partition_values = match custom_partitions.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                extract_custom_partition_values(&self.json, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
+        let custom_partition_values =
+            extract_custom_partition_values(&self.json, custom_partitions);
 
         let parsed_timestamp = match time_partition {
             Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
@@ -188,7 +183,7 @@ impl EventFormat for Event {
 /// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
 pub fn extract_custom_partition_values(
     json: &Value,
-    custom_partition_list: &[&str],
+    custom_partition_list: &[String],
 ) -> HashMap<String, String> {
     let mut custom_partition_values: HashMap<String, String> = HashMap::new();
     for custom_partition_field in custom_partition_list {
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 58c35fc79..9ab8c83ca 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -219,7 +219,7 @@ pub trait EventFormat: Sized {
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
-        custom_partitions: Option<&String>,
+        custom_partitions: &[String],
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 1e5e6d048..16d705eec 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -98,7 +98,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         size as u64,
         &schema,
         false,
-        None,
+        &[],
         None,
         SchemaVersion::V0,
         StreamType::Internal,
@@ -546,7 +546,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V0,
             &crate::event::format::LogSource::default()
         )
@@ -749,7 +749,7 @@ mod tests {
            json,
            None,
            None,
-            None,
+            &[],
            SchemaVersion::V0,
            &crate::event::format::LogSource::default(),
        )
@@ -832,7 +832,7 @@ mod tests {
            json,
            None,
            None,
-            None,
+            &[],
            SchemaVersion::V1,
            &crate::event::format::LogSource::default(),
        )
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index a3a2096ae..0593dabb2 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -16,34 +16,39 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
-use crate::event::format::override_data_type;
-use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
-use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::parseable::{StreamNotFound, PARSEABLE};
-use crate::rbac::role::Action;
-use crate::rbac::Users;
-use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::retention::Retention;
-use crate::storage::{StreamInfo, StreamType};
-use crate::utils::actix::extract_session_key_from_req;
-use crate::{stats, validator, LOCK_EXPECT};
-
-use actix_web::http::StatusCode;
-use actix_web::web::{Json, Path};
-use actix_web::{web, HttpRequest, Responder};
+use std::{fs, sync::Arc};
+
+use actix_web::{
+    web::{Json, Path},
+    HttpRequest, Responder,
+};
 use arrow_json::reader::infer_json_schema_from_iterator;
 use bytes::Bytes;
 use chrono::Utc;
+use error::StreamError;
+use http::StatusCode;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
 use tracing::warn;
 
+use crate::{
+    event::format::override_data_type,
+    hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION},
+    metadata::SchemaVersion,
+    metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
+    parseable::{StreamNotFound, PARSEABLE},
+    rbac::{role::Action, Users},
+    stats::{self, event_labels_date, storage_size_labels_date, Stats},
+    storage::{retention::Retention, StreamInfo, StreamType},
+    utils::actix::extract_session_key_from_req,
+    validator, LOCK_EXPECT,
+};
+
+use super::{
+    cluster::utils::{IngestionStats, QueriedStats, StorageStats},
+    query::update_schema_when_distributed,
+};
+
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
@@ -98,7 +103,7 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
         .map(|name| json!({"name": name}))
         .collect_vec();
 
-    Ok(web::Json(res))
+    Ok(Json(res))
 }
 
 pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
@@ -117,7 +122,7 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
-    Ok((web::Json(schema), StatusCode::OK))
+    Ok((Json(schema), StatusCode::OK))
 }
 
@@ -132,7 +137,7 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
             let schema = stream.get_schema();
-            Ok((web::Json(schema), StatusCode::OK))
+            Ok((Json(schema), StatusCode::OK))
         }
         Err(err) => Err(StreamError::Custom {
             msg: err.to_string(),
@@ -168,7 +173,7 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
-    Ok((web::Json(retention), StatusCode::OK))
+    Ok((Json(retention), StatusCode::OK))
 }
 
@@ -333,12 +338,12 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
         time_partition: stream_meta.time_partition.clone(),
         time_partition_limit: stream_meta.time_partition_limit.clone(),
-        custom_partition: stream_meta.custom_partition.clone(),
+        custom_partitions: stream_meta.custom_partitions.clone(),
         static_schema_flag: stream_meta.static_schema_flag,
     };
 
-    Ok((web::Json(stream_info), StatusCode::OK))
+    Ok((Json(stream_info), StatusCode::OK))
 }
 
@@ -575,6 +583,7 @@
             StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
             StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
             StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            StreamError::HeaderParsing(_) => StatusCode::BAD_REQUEST,
         }
     }
 
@@ -596,7 +605,7 @@ mod tests {
     #[actix_web::test]
     async fn header_without_log_source() {
         let req = TestRequest::default().to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Json);
     }
 
@@ -605,19 +614,19 @@ mod tests {
         let mut req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "pmeta"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
 
         req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "otel-logs"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
 
         req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "kinesis"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
     }
 
@@ -626,7 +635,7 @@ mod tests {
         let req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "teststream"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Json);
     }
 }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 84d5ae117..b6435634c 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -82,20 +82,18 @@ async fn push_logs(
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition = stream.get_time_partition();
-    let time_partition_limit = PARSEABLE
-        .get_stream(stream_name)?
-        .get_time_partition_limit();
+    let time_partition_limit = stream.get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
-    let custom_partition = stream.get_custom_partition();
+    let custom_partitions = stream.get_custom_partitions();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
 
-    let data = if time_partition.is_some() || custom_partition.is_some() {
+    let data = if time_partition.is_some() || !custom_partitions.is_empty() {
         convert_array_to_object(
             json,
             time_partition.as_ref(),
             time_partition_limit,
-            custom_partition.as_ref(),
+            &custom_partitions,
             schema_version,
             log_source,
         )?
@@ -104,7 +102,7 @@ async fn push_logs(
             json,
             None,
             None,
-            None,
+            &[],
             schema_version,
             log_source,
         )?)?]
@@ -119,7 +117,7 @@ async fn push_logs(
             origin_size,
             &schema,
             static_schema_flag,
-            custom_partition.as_ref(),
+            &custom_partitions,
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index 15c25da2b..841c79c54 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -16,6 +16,8 @@
  *
  */
 
+use std::num::NonZeroU32;
+
 use actix_web::http::header::HeaderMap;
 
 use crate::{
@@ -27,31 +29,57 @@ use crate::{
     storage::StreamType,
 };
 
+/// Name of a field that appears within a data stream
+pub type FieldName = String;
+
+/// Name of the field used as a custom partition
+pub type CustomPartition = String;
+
+#[derive(Debug, thiserror::Error)]
+pub enum HeaderParseError {
+    #[error("Maximum 3 custom partition keys are supported")]
+    TooManyPartitions,
+    #[error("Missing 'd' suffix for duration value")]
+    UnsupportedUnit,
+    #[error("Could not convert duration to an unsigned number")]
+    ZeroOrNegative,
+}
+
 #[derive(Debug, Default)]
 pub struct PutStreamHeaders {
-    pub time_partition: String,
-    pub time_partition_limit: String,
-    pub custom_partition: Option<String>,
+    pub time_partition: Option<String>,
+    pub time_partition_limit: Option<NonZeroU32>,
+    pub custom_partitions: Vec<CustomPartition>,
     pub static_schema_flag: bool,
     pub update_stream_flag: bool,
     pub stream_type: StreamType,
     pub log_source: LogSource,
 }
 
-impl From<&HeaderMap> for PutStreamHeaders {
-    fn from(headers: &HeaderMap) -> Self {
-        PutStreamHeaders {
-            time_partition: headers
-                .get(TIME_PARTITION_KEY)
-                .map_or("", |v| v.to_str().unwrap())
-                .to_string(),
-            time_partition_limit: headers
-                .get(TIME_PARTITION_LIMIT_KEY)
-                .map_or("", |v| v.to_str().unwrap())
-                .to_string(),
-            custom_partition: headers
-                .get(CUSTOM_PARTITION_KEY)
-                .map(|v| v.to_str().unwrap().to_string()),
+impl TryFrom<&HeaderMap> for PutStreamHeaders {
+    type Error = HeaderParseError;
+
+    fn try_from(headers: &HeaderMap) -> Result<Self, Self::Error> {
+        let time_partition = headers
+            .get(TIME_PARTITION_KEY)
+            .map(|v| v.to_str().unwrap().to_owned());
+        let time_partition_limit = match headers
+            .get(TIME_PARTITION_LIMIT_KEY)
+            .map(|v| v.to_str().unwrap())
+        {
+            Some(limit) => Some(parse_time_partition_limit(limit)?),
+            None => None,
+        };
+        let custom_partition = headers
+            .get(CUSTOM_PARTITION_KEY)
+            .map(|v| v.to_str().unwrap())
+            .unwrap_or_default();
+        let custom_partitions = parse_custom_partition(custom_partition)?;
+
+        let headers = PutStreamHeaders {
+            time_partition,
+            time_partition_limit,
+            custom_partitions,
             static_schema_flag: headers
                 .get(STATIC_SCHEMA_FLAG)
                 .is_some_and(|v| v.to_str().unwrap() == "true"),
@@ -65,6 +93,36 @@ impl From<&HeaderMap> for PutStreamHeaders {
             log_source: headers
                 .get(LOG_SOURCE_KEY)
                 .map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
-        }
+        };
+
+        Ok(headers)
     }
 }
+
+pub fn parse_custom_partition(
+    custom_partition: &str,
+) -> Result<Vec<CustomPartition>, HeaderParseError> {
+    let custom_partition_list = custom_partition
+        .split(',')
+        .map(String::from)
+        .collect::<Vec<CustomPartition>>();
+    if custom_partition_list.len() > 3 {
+        return Err(HeaderParseError::TooManyPartitions);
+    }
+
+    Ok(custom_partition_list)
+}
+
+pub fn parse_time_partition_limit(
+    time_partition_limit: &str,
+) -> Result<NonZeroU32, HeaderParseError> {
+    if !time_partition_limit.ends_with('d') {
+        return Err(HeaderParseError::UnsupportedUnit);
+    }
+    let days = &time_partition_limit[0..time_partition_limit.len() - 1];
+    let Ok(days) = days.parse::<NonZeroU32>() else {
+        return Err(HeaderParseError::ZeroOrNegative);
+    };
+
+    Ok(days)
+}
diff --git a/src/metadata.rs b/src/metadata.rs
index 79cec19e5..5ee158c31 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -83,7 +83,7 @@ pub struct LogStreamMetadata {
     pub first_event_at: Option<String>,
     pub time_partition: Option<String>,
     pub time_partition_limit: Option<NonZeroU32>,
-    pub custom_partition: Option<String>,
+    pub custom_partitions: Vec<String>,
     pub static_schema_flag: bool,
     pub hot_tier_enabled: bool,
     pub stream_type: StreamType,
@@ -94,9 +94,9 @@ impl LogStreamMetadata {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         created_at: String,
-        time_partition: String,
+        time_partition: Option<String>,
         time_partition_limit: Option<NonZeroU32>,
-        custom_partition: Option<String>,
+        custom_partitions: Vec<String>,
         static_schema_flag: bool,
         static_schema: HashMap<String, Arc<Field>>,
         stream_type: StreamType,
@@ -109,13 +109,9 @@ impl LogStreamMetadata {
             } else {
                 created_at
             },
-            time_partition: if time_partition.is_empty() {
-                None
-            } else {
-                Some(time_partition)
-            },
+            time_partition,
             time_partition_limit,
-            custom_partition,
+            custom_partitions,
             static_schema_flag,
             schema: if static_schema.is_empty() {
                 HashMap::new()
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index 20c159a63..5024f9191 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -309,7 +309,7 @@ async fn setup_logstream_metadata(
         stats,
         time_partition,
         time_partition_limit,
-        custom_partition,
+        custom_partitions,
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
@@ -342,7 +342,7 @@ async fn setup_logstream_metadata(
         first_event_at,
         time_partition,
         time_partition_limit: time_partition_limit.and_then(|limit| limit.parse().ok()),
-        custom_partition,
+        custom_partitions,
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index f8ffa1ca9..055d1a74f 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -322,11 +322,11 @@ impl Parseable {
             .collect();
 
         let created_at = stream_metadata.created_at;
-        let time_partition = stream_metadata.time_partition.unwrap_or_default();
+        let time_partition = stream_metadata.time_partition;
         let time_partition_limit = stream_metadata
             .time_partition_limit
             .and_then(|limit| limit.parse().ok());
-        let custom_partition = stream_metadata.custom_partition;
+        let custom_partition = stream_metadata.custom_partitions;
         let static_schema_flag = stream_metadata.static_schema_flag;
         let stream_type = stream_metadata.stream_type;
         let schema_version = stream_metadata.schema_version;
@@ -408,9 +408,9 @@ impl Parseable {
         self.create_stream(
             stream_name.to_string(),
-            "",
             None,
             None,
+            &[],
             false,
             Arc::new(Schema::empty()),
             stream_type,
@@ -481,12 +481,12 @@ impl Parseable {
         let PutStreamHeaders {
             time_partition,
             time_partition_limit,
-            custom_partition,
+            custom_partitions,
             static_schema_flag,
             update_stream_flag,
             stream_type,
             log_source,
-        } = headers.into();
+        } = headers.try_into()?;
 
         let stream_in_memory_dont_update =
             self.streams.contains(stream_name) && !update_stream_flag;
@@ -511,36 +511,32 @@ impl Parseable {
                     stream_name,
                     &time_partition,
                     static_schema_flag,
-                    &time_partition_limit,
-                    custom_partition.as_ref(),
+                    time_partition_limit,
+                    &custom_partitions,
                 )
                 .await;
         }
 
-        if !time_partition.is_empty() || !time_partition_limit.is_empty() {
+        if time_partition.is_some() || time_partition_limit.is_some() {
            return Err(StreamError::Custom {
                msg: "Creating stream with time partition is not supported anymore".to_string(),
                status: StatusCode::BAD_REQUEST,
            });
        }
 
-        if let Some(custom_partition) = &custom_partition {
-            validate_custom_partition(custom_partition)?;
-        }
-
         let schema = validate_static_schema(
             body,
             stream_name,
             &time_partition,
-            custom_partition.as_ref(),
+            &custom_partitions,
             static_schema_flag,
         )?;
 
         let log_source_entry = LogSourceEntry::new(log_source, HashSet::new());
 
         self.create_stream(
             stream_name.to_string(),
-            &time_partition,
-            None,
-            custom_partition.as_ref(),
+            time_partition,
+            time_partition_limit,
+            &custom_partitions,
             static_schema_flag,
             schema,
             stream_type,
@@ -555,15 +551,15 @@ impl Parseable {
         &self,
         headers: &HeaderMap,
         stream_name: &str,
-        time_partition: &str,
+        time_partition: &Option<String>,
         static_schema_flag: bool,
-        time_partition_limit: &str,
-        custom_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
+        custom_partitions: &[String],
     ) -> Result<HeaderMap, StreamError> {
         if !self.streams.contains(stream_name) {
             return Err(StreamNotFound(stream_name.to_string()).into());
         }
-        if !time_partition.is_empty() {
+        if time_partition.is_some() {
             return Err(StreamError::Custom {
                 msg: "Altering the time partition of an existing stream is restricted.".to_string(),
                 status: StatusCode::BAD_REQUEST,
             });
@@ -575,8 +571,7 @@ impl Parseable {
                 status: StatusCode::BAD_REQUEST,
             });
         }
-        if !time_partition_limit.is_empty() {
-            let time_partition_days = validate_time_partition_limit(time_partition_limit)?;
+        if let Some(time_partition_days) = time_partition_limit {
             self.update_time_partition_limit_in_stream(
                 stream_name.to_string(),
                 time_partition_days,
@@ -584,7 +579,7 @@ impl Parseable {
             .await?;
             return Ok(headers.clone());
         }
-        self.validate_and_update_custom_partition(stream_name, custom_partition)
+        self.update_custom_partition_in_stream(stream_name.to_string(), custom_partitions)
             .await?;
 
         Ok(headers.clone())
@@ -594,9 +589,9 @@ impl Parseable {
     pub async fn create_stream(
         &self,
         stream_name: String,
-        time_partition: &str,
+        time_partition: Option<String>,
         time_partition_limit: Option<NonZeroU32>,
-        custom_partition: Option<&String>,
+        custom_partitions: &[String],
         static_schema_flag: bool,
         schema: Arc<Schema>,
         stream_type: StreamType,
@@ -613,9 +608,9 @@ impl Parseable {
                     created_at: Utc::now().to_rfc3339(),
                     permissions: vec![Permisssion::new(PARSEABLE.options.username.clone())],
                     stream_type,
-                    time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
+                    time_partition: time_partition.clone(),
                     time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
-                    custom_partition: custom_partition.cloned(),
+                    custom_partitions: custom_partitions.to_vec(),
                     static_schema_flag,
                     schema_version: SchemaVersion::V1, // NOTE: Newly created streams are all V1
                     owner: Owner {
@@ -643,9 +638,9 @@ impl Parseable {
         let metadata = LogStreamMetadata::new(
             created_at,
-            time_partition.to_owned(),
+            time_partition,
             time_partition_limit,
-            custom_partition.cloned(),
+            custom_partitions.to_vec(),
             static_schema_flag,
             static_schema,
             stream_type,
@@ -668,28 +663,6 @@ impl Parseable {
         Ok(())
     }
 
-    async fn validate_and_update_custom_partition(
-        &self,
-        stream_name: &str,
-        custom_partition: Option<&String>,
-    ) -> Result<(), StreamError> {
-        let stream = self.get_stream(stream_name).expect(STREAM_EXISTS);
-        if stream.get_time_partition().is_some() {
-            return Err(StreamError::Custom {
-                msg: "Cannot set both time partition and custom partition".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
-        if let Some(custom_partition) = custom_partition {
-            validate_custom_partition(custom_partition)?;
-        }
-
-        self.update_custom_partition_in_stream(stream_name.to_string(), custom_partition)
-            .await?;
-
-        Ok(())
-    }
-
     pub async fn update_time_partition_limit_in_stream(
         &self,
         stream_name: String,
@@ -718,7 +691,7 @@ impl Parseable {
     pub async fn update_custom_partition_in_stream(
         &self,
         stream_name: String,
-        custom_partition: Option<&String>,
+        custom_partitions: &[String],
     ) -> Result<(), CreateStreamError> {
         let stream = self.get_stream(&stream_name).expect(STREAM_EXISTS);
         let static_schema_flag = stream.get_static_schema_flag();
@@ -726,46 +699,39 @@ impl Parseable {
         if static_schema_flag {
             let schema = stream.get_schema();
 
-            if let Some(custom_partition) = custom_partition {
-                let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-                for partition in custom_partition_list.iter() {
-                    if !schema
-                        .fields()
-                        .iter()
-                        .any(|field| field.name() == partition)
-                    {
-                        return Err(CreateStreamError::Custom {
+            for partition in custom_partitions.iter() {
+                if !schema
+                    .fields()
+                    .iter()
+                    .any(|field| field.name() == partition)
+                {
+                    return Err(CreateStreamError::Custom {
                         msg: format!("custom partition field {partition} does not exist in the schema for the stream {stream_name}"),
                         status: StatusCode::BAD_REQUEST,
                     });
-                    }
-                }
-
-                for partition in custom_partition_list {
-                    if time_partition
-                        .as_ref()
-                        .is_some_and(|time| time == partition)
-                    {
-                        return Err(CreateStreamError::Custom {
-                            msg: format!(
-                                "time partition {} cannot be set as custom partition",
-                                partition
-                            ),
-                            status: StatusCode::BAD_REQUEST,
-                        });
-                    }
+                } else if time_partition
+                    .as_ref()
+                    .is_some_and(|time| time == partition)
+                {
+                    return Err(CreateStreamError::Custom {
+                        msg: format!(
+                            "time partition {} cannot be set as custom partition",
+                            partition
+                        ),
+                        status: StatusCode::BAD_REQUEST,
+                    });
                 }
             }
         }
 
         let storage = self.storage.get_object_store();
         if let Err(err) = storage
-            .update_custom_partition_in_stream(&stream_name, custom_partition)
+            .update_custom_partitions_in_stream(&stream_name, custom_partitions)
             .await
         {
             return Err(CreateStreamError::Storage { stream_name, err });
         }
-        stream.set_custom_partition(custom_partition);
+        stream.set_custom_partitions(custom_partitions.to_vec());
 
         Ok(())
     }
@@ -849,8 +815,8 @@ impl Parseable {
 pub fn validate_static_schema(
     body: &Bytes,
     stream_name: &str,
-    time_partition: &str,
-    custom_partition: Option<&String>,
+    time_partition: &Option<String>,
+    custom_partition: &[String],
     static_schema_flag: bool,
 ) -> Result<Arc<Schema>, CreateStreamError> {
     if !static_schema_flag {
@@ -876,34 +842,3 @@
 
     Ok(parsed_schema)
 }
-
-pub fn validate_time_partition_limit(
-    time_partition_limit: &str,
-) -> Result<NonZeroU32, CreateStreamError> {
-    if !time_partition_limit.ends_with('d') {
-        return Err(CreateStreamError::Custom {
-            msg: "Missing 'd' suffix for duration value".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-    let days = &time_partition_limit[0..time_partition_limit.len() - 1];
-    let Ok(days) = days.parse::<NonZeroU32>() else {
-        return Err(CreateStreamError::Custom {
-            msg: "Could not convert duration to an unsigned number".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    };
-
-    Ok(days)
-}
-
-pub fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> {
-    let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-    if custom_partition_list.len() > 1 {
-        return Err(CreateStreamError::Custom {
-            msg: "Maximum 1 custom partition key is supported".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-    Ok(())
-}
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 784658c1f..2a8850bc7 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -308,14 +308,14 @@ impl Stream {
         );
 
         let time_partition = self.get_time_partition();
-        let custom_partition = self.get_custom_partition();
+        let custom_partitions = self.get_custom_partitions();
 
         // read arrow files on disk
         // convert them to parquet
         let schema = self
             .convert_disk_files_to_parquet(
                 time_partition.as_ref(),
-                custom_partition.as_ref(),
+                custom_partitions,
                 shutdown_signal,
             )
             .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
@@ -373,7 +373,7 @@ impl Stream {
         &self,
         merged_schema: &Schema,
         time_partition: Option<&String>,
-        custom_partition: Option<&String>,
+        custom_partitions: &Vec<String>,
     ) -> WriterProperties {
         // Determine time partition field
         let time_partition_field = time_partition.map_or(DEFAULT_TIMESTAMP_KEY, |tp| tp.as_str());
@@ -397,18 +397,16 @@ impl Stream {
         }];
 
         // Describe custom partition column encodings and sorting
-        if let Some(custom_partition) = custom_partition {
-            for partition in custom_partition.split(',') {
-                if let Ok(idx) = merged_schema.index_of(partition) {
-                    let column_path = ColumnPath::new(vec![partition.to_string()]);
-                    props = props.set_column_encoding(column_path, Encoding::DELTA_BYTE_ARRAY);
-
-                    sorting_column_vec.push(SortingColumn {
-                        column_idx: idx as i32,
-                        descending: true,
-                        nulls_first: true,
-                    });
-                }
+        for partition in custom_partitions {
+            if let Ok(idx) = merged_schema.index_of(partition) {
+                let column_path = ColumnPath::new(vec![partition.to_string()]);
+                props = props.set_column_encoding(column_path, Encoding::DELTA_BYTE_ARRAY);
+
+                sorting_column_vec.push(SortingColumn {
+                    column_idx: idx as i32,
+                    descending: true,
+                    nulls_first: true,
+                });
             }
         }
 
@@ -422,7 +420,7 @@ impl Stream {
     pub fn convert_disk_files_to_parquet(
         &self,
         time_partition: Option<&String>,
-        custom_partition: Option<&String>,
+        custom_partitions: Vec<String>,
         shutdown_signal: bool,
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
@@ -468,7 +466,8 @@ impl Stream {
             }
 
             let merged_schema = record_reader.merged_schema();
-            let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
+            let props =
+                self.parquet_writer_props(&merged_schema, time_partition, &custom_partitions);
             schemas.push(merged_schema.clone());
             let schema = Arc::new(merged_schema);
             let mut part_path = parquet_path.to_owned();
@@ -566,11 +565,11 @@ impl Stream {
             .time_partition_limit
     }
 
-    pub fn get_custom_partition(&self) -> Option<String> {
+    pub fn get_custom_partitions(&self) -> Vec<String> {
         self.metadata
             .read()
             .expect(LOCK_EXPECT)
-            .custom_partition
+            .custom_partitions
             .clone()
     }
 
@@ -652,8 +651,8 @@ impl Stream {
             .time_partition_limit = Some(time_partition_limit);
     }
 
-    pub fn set_custom_partition(&self, custom_partition: Option<&String>) {
-        self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned();
+    pub fn set_custom_partitions(&self, custom_partitions: Vec<String>) {
+        self.metadata.write().expect(LOCK_EXPECT).custom_partitions = custom_partitions;
     }
 
     pub fn set_hot_tier(&self, enable: bool) {
@@ -994,7 +993,7 @@ mod tests {
             LogStreamMetadata::default(),
             None,
         )
-        .convert_disk_files_to_parquet(None, None, false)?;
+        .convert_disk_files_to_parquet(None, vec![], false)?;
         assert!(result.is_none());
         // Verify metrics were set to 0
         let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get();
@@ -1073,7 +1072,7 @@ mod tests {
         // Start with a fresh staging
         let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None);
         let result = staging
-            .convert_disk_files_to_parquet(None, None, true)
+            .convert_disk_files_to_parquet(None, vec![], true)
             .unwrap();
 
         assert!(result.is_some());
@@ -1122,7 +1121,7 @@ mod tests {
         // Start with a fresh staging
         let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None);
         let result = staging
-            .convert_disk_files_to_parquet(None, None, true)
+            .convert_disk_files_to_parquet(None, vec![], true)
             .unwrap();
 
         assert!(result.is_some());
@@ -1176,7 +1175,7 @@ mod tests {
         // Start with a fresh staging
         let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None);
         let result = staging
-            .convert_disk_files_to_parquet(None, None, false)
+            .convert_disk_files_to_parquet(None, vec![], false)
             .unwrap();
 
         assert!(result.is_some());
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index e37db20e4..72875f08f 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -186,7 +186,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, PrismL
         time_partition: stream_meta.time_partition.clone(),
         time_partition_limit: stream_meta.time_partition_limit.clone(),
-        custom_partition: stream_meta.custom_partition.clone(),
+        custom_partitions: stream_meta.custom_partitions.clone(),
         static_schema_flag: stream_meta.static_schema_flag,
diff --git a/src/static_schema.rs b/src/static_schema.rs
--- a/src/static_schema.rs
+++ b/src/static_schema.rs
@@ -49,8 +49,8 @@
 pub fn convert_static_schema_to_arrow_schema(
     static_schema: StaticSchema,
-    time_partition: &str,
-    custom_partition: Option<&String>,
+    time_partition: &Option<String>,
+    custom_partitions: &[String],
 ) -> Result<Arc<Schema>, StaticSchemaError> {
     let mut parsed_schema = ParsedSchema {
         fields: Vec::new(),
     };
@@ -69,34 +69,22 @@ pub fn convert_static_schema_to_arrow_schema(
     let mut time_partition_exists = false;
 
-    if let Some(custom_partition) = custom_partition {
-        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-        let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());
-
-        for partition in &custom_partition_list {
-            if static_schema
-                .fields
-                .iter()
-                .any(|field| &field.name == partition)
-            {
-                custom_partition_exists.insert(partition.to_string(), true);
-            }
-        }
-
-        for partition in &custom_partition_list {
-            if !custom_partition_exists.contains_key(*partition) {
-                return Err(StaticSchemaError::MissingCustomPartition(
-                    partition.to_string(),
-                ));
-            }
+    for partition in custom_partitions {
+        if !static_schema
+            .fields
+            .iter()
+            .any(|field| &field.name == partition)
+        {
+            return Err(StaticSchemaError::MissingCustomPartition(
+                partition.to_owned(),
+            ));
         }
     }
 
     let mut existing_field_names: HashSet<String> = HashSet::new();
-
     for mut field in static_schema.fields {
         validate_field_names(&field.name, &mut existing_field_names)?;
-        if !time_partition.is_empty() && field.name == time_partition {
+        if time_partition.as_ref().is_some_and(|p| p == &field.name) {
             time_partition_exists = true;
             field.data_type = "datetime".to_string();
         }
@@ -135,11 +123,14 @@ pub fn convert_static_schema_to_arrow_schema(
         parsed_schema.fields.push(parsed_field);
     }
-    if !time_partition.is_empty() && !time_partition_exists {
-        return Err(StaticSchemaError::MissingTimePartition(
-            time_partition.to_string(),
-        ));
+    if let Some(time_partition) = time_partition {
+        if !time_partition_exists {
+            return Err(StaticSchemaError::MissingTimePartition(
+                time_partition.to_owned(),
+            ));
+        }
     }
+
     add_parseable_fields_to_static_schema(parsed_schema)
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6a272c8e4..b980d3760 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -28,7 +28,10 @@ use crate::{
     option::StandaloneWithDistributed,
     parseable::StreamNotFound,
     stats::FullStats,
-    utils::json::{deserialize_string_as_true, serialize_bool_as_true},
+    utils::json::{
+        deserialize_custom_partitions, deserialize_string_as_true, serialize_bool_as_true,
+        serialize_custom_partitions,
+    },
 };
 
 use chrono::Utc;
@@ -103,8 +106,11 @@ pub struct ObjectStoreFormat {
     pub time_partition: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub time_partition_limit: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub custom_partition: Option<String>,
+    #[serde(
+        deserialize_with = "deserialize_custom_partitions",
+        serialize_with = "serialize_custom_partitions"
+    )]
+    pub custom_partitions: Vec<String>,
     #[serde(
         default, // sets to false if not configured
         deserialize_with = "deserialize_string_as_true",
@@ -131,8 +137,12 @@ pub struct StreamInfo {
     pub time_partition: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub time_partition_limit: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub custom_partition: Option<String>,
+    #[serde(
+        deserialize_with = "deserialize_custom_partitions",
+        serialize_with = "serialize_custom_partitions",
+        rename = "custom_partition"
+    )]
+    pub custom_partitions: Vec<String>,
     #[serde(
         default, // sets to false if not configured
         deserialize_with = "deserialize_string_as_true",
@@ -216,7 +226,7 @@ impl Default for ObjectStoreFormat {
             retention: None,
             time_partition: None,
             time_partition_limit: None,
-            custom_partition: None,
+            custom_partitions: vec![],
             static_schema_flag: false,
             hot_tier_enabled: false,
             log_source: vec![LogSourceEntry::default()],
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 680d20f3e..97b3481d0 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -256,13 +256,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(())
     }
 
-    async fn update_custom_partition_in_stream(
+    async fn update_custom_partitions_in_stream(
         &self,
         stream_name: &str,
-        custom_partition: Option<&String>,
+        custom_partitions: &[String],
     ) -> Result<(), ObjectStorageError> {
+        if custom_partitions.is_empty() {
+            return Ok(());
+        }
         let mut format = self.get_object_store_format(stream_name).await?;
-        format.custom_partition = custom_partition.cloned();
+        format.custom_partitions = custom_partitions.to_vec();
         let format_json = to_bytes(&format);
         self.put_object(&stream_json_path(stream_name), format_json)
             .await?;
@@ -805,7 +808,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             info!("Starting object_store_sync for stream- {stream_name}");
 
             let stream = PARSEABLE.get_or_create_stream(&stream_name);
-            let custom_partition = stream.get_custom_partition();
+            let custom_partitions = stream.get_custom_partitions();
             for path in stream.parquet_files() {
                 let filename = path
                     .file_name()
@@ -827,13 +830,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                     .add(compressed_size as i64);
 
                 let mut file_suffix = str::replacen(filename, ".", "/", 3);
-                let custom_partition_clone = custom_partition.clone();
-                if custom_partition_clone.is_some() {
-                    let custom_partition_fields = custom_partition_clone.unwrap();
-                    let custom_partition_list =
-                        custom_partition_fields.split(',').collect::<Vec<&str>>();
-                    file_suffix =
-                        str::replacen(filename, ".", "/", 3 + custom_partition_list.len());
+                if !custom_partitions.is_empty() {
+                    file_suffix = str::replacen(filename, ".", "/", 3 + custom_partitions.len());
                 }
 
                 let stream_relative_path = format!("{stream_name}/{file_suffix}");
diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs
index 06f1e7201..b68214e4c 100644
--- a/src/utils/json/flatten.rs
+++ b/src/utils/json/flatten.rs
@@ -58,14 +58,14 @@ pub fn flatten(
     separator: &str,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
-    custom_partition: Option<&String>,
+    custom_partitions: &[String],
     validation_required: bool,
 ) -> Result<(), JsonFlattenError> {
     match nested_value {
         Value::Object(nested_dict) => {
             if validation_required {
                 validate_time_partition(nested_dict, time_partition, time_partition_limit)?;
-                validate_custom_partition(nested_dict, custom_partition)?;
+                validate_custom_partition(nested_dict, custom_partitions)?;
             }
             let mut map = Map::new();
             flatten_object(&mut map, None, nested_dict, separator)?;
@@ -79,7 +79,7 @@ pub fn flatten(
                     separator,
                     time_partition,
                     time_partition_limit,
-                    custom_partition,
+                    custom_partitions,
                     validation_required,
                 )?;
             }
@@ -94,14 +94,9 @@ pub fn flatten(
 // not null, empty, an object , an array, or contain a `.` when serialized
 pub fn validate_custom_partition(
     value: &Map<String, Value>,
-    custom_partition: Option<&String>,
+    custom_partitions: &[String],
 ) -> Result<(), JsonFlattenError> {
-    let Some(custom_partition) = custom_partition else {
-        return Ok(());
-    };
-    let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-
-    for field in custom_partition_list {
+    for field in custom_partitions {
         let trimmed_field = field.trim();
         let Some(field_value) = value.get(trimmed_field) else {
             return Err(JsonFlattenError::FieldNotPartOfLog(
@@ -355,7 +350,7 @@ mod tests {
     fn flatten_single_key_string() {
         let mut obj = json!({"key": "value"});
         let expected = obj.clone();
-        flatten(&mut obj, "_", None, None, None, false).unwrap();
+        flatten(&mut obj, "_", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -363,7 +358,7 @@ mod tests {
     fn flatten_single_key_int() {
         let mut obj = json!({"key": 1});
         let expected = obj.clone();
-        flatten(&mut obj, "_", None, None, None, false).unwrap();
+        flatten(&mut obj, "_", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -371,7 +366,7 @@ mod tests {
     fn flatten_multiple_key_value() {
         let mut obj = json!({"key1": 1, "key2": "value2"});
         let expected = obj.clone();
-        flatten(&mut obj, "_", None, None, None, false).unwrap();
+        flatten(&mut obj, "_", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -379,7 +374,7 @@ mod tests {
     fn flatten_nested_single_key_value() {
         let mut obj = json!({"key": "value", "nested_key": {"key":"value"}});
         let expected = json!({"key": "value", "nested_key.key": "value"});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -388,7 +383,7 @@ mod tests {
        let mut obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}});
        let expected =
            json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
        assert_eq!(obj, expected);
    }
 
@@ -396,7 +391,7 @@ mod tests {
     fn nested_key_value_with_array() {
         let mut obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}});
         let expected = json!({"key": "value", "nested_key.key1": [1,2,3]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -404,7 +399,7 @@ mod tests {
     fn nested_obj_array() {
         let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]});
         let expected = json!({"key.a": ["value0", "value1"]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -412,7 +407,7 @@ mod tests {
     fn nested_obj_array_nulls() {
         let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]});
         let expected = json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -420,7 +415,7 @@ mod tests {
     fn nested_obj_array_nulls_reversed() {
         let mut obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]});
         let expected = json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -428,7 +423,7 @@ mod tests {
     fn nested_obj_array_nested_obj() {
         let mut obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]});
         let expected = json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
@@ -436,14 +431,14 @@ mod tests {
     fn nested_obj_array_nested_obj_array() {
         let mut obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]});
         let expected = json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]});
-        flatten(&mut obj, ".", None, None, None, false).unwrap();
+        flatten(&mut obj, ".", None, None, &[], false).unwrap();
         assert_eq!(obj, expected);
     }
 
     #[test]
     fn flatten_mixed_object() {
         let mut obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]});
-        assert!(flatten(&mut obj, ".", None, None, None, false).is_err());
+        assert!(flatten(&mut obj, ".", None, None, &[], false).is_err());
     }
 
     #[test]
@@ -536,22 +531,22 @@
         let mut value = json!({
             "a": 1,
         });
-        assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok());
+        assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok());
 
         let mut value = json!({
             "a": true,
         });
-        assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok());
+        assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok());
 
         let mut value = json!({
             "a": "yes",
         });
-        assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok());
+        assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok());
 
         let mut value = json!({
             "a": -1,
         });
-        assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok());
+        assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok());
     }
 
     #[test]
@@ -560,7 +555,7 @@
             "a": null,
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldEmptyOrNull(_)
         );
 
@@ -568,7 +563,7 @@
             "a": "",
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldEmptyOrNull(_)
         );
 
@@ -576,7 +571,7 @@
             "a": {"b": 1},
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldIsObject(_)
         );
 
@@ -584,7 +579,7 @@
             "a": ["b", "c"],
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldIsArray(_)
         );
 
@@ -592,7 +587,7 @@
             "a": "b.c",
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldContainsPeriod(_)
         );
 
@@ -600,7 +595,7 @@
             "a": 1.0,
         });
         matches!(
-            flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(),
+            flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(),
             JsonFlattenError::FieldContainsPeriod(_)
         );
     }
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index efa9cb2e2..2ec25254a 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -25,6 +25,9 @@ use serde_json;
 use serde_json::Value;
 
 use crate::event::format::LogSource;
+use crate::handlers::http::modal::utils::logstream_utils::{
+    parse_custom_partition, CustomPartition,
+};
 use crate::metadata::SchemaVersion;
 
 pub mod flatten;
@@ -36,7 +39,7 @@ pub fn flatten_json_body(
     body: Value,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
-    custom_partition: Option<&String>,
+    custom_partitions: &[String],
     schema_version: SchemaVersion,
     validation_required: bool,
     log_source: &LogSource,
@@ -58,7 +61,7 @@ pub fn flatten_json_body(
         "_",
         time_partition,
         time_partition_limit,
-        custom_partition,
+        custom_partitions,
         validation_required,
     )?;
     Ok(nested_value)
@@ -68,7 +71,7 @@ pub fn convert_array_to_object(
     body: Value,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
-    custom_partition: Option<&String>,
+    custom_partitions: &[String],
     schema_version: SchemaVersion,
     log_source: &LogSource,
 ) -> Result<Vec<Value>, anyhow::Error> {
@@ -76,7 +79,7 @@ pub fn convert_array_to_object(
         body,
         time_partition,
         time_partition_limit,
-        custom_partition,
+        custom_partitions,
         schema_version,
         true,
         log_source,
@@ -142,6 +145,54 @@ where
     }
 }
 
+struct PartitionsFromStr;
+
+impl Visitor<'_> for PartitionsFromStr {
+    type Value = Vec<CustomPartition>;
+
+    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        formatter.write_str("a comma-separated list of custom partitions (e.g., \"a,b,c\")")
+    }
+
+    fn visit_borrowed_str(self, v: &'_ str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        self.visit_str(v)
+    }
+
+    fn visit_str(self, s: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        parse_custom_partition(s)
+            .map_err(|e| E::custom(format!("Expected list: \"a, b, c\", got: {s}; error: {e}",)))
+    }
+}
+
+/// Used to convert "a,b,c" to ["a", "b", "c"], to support backward compatibility.
+pub fn deserialize_custom_partitions<'de, D>(
+    deserializer: D,
+) -> Result<Vec<CustomPartition>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    deserializer.deserialize_str(PartitionsFromStr)
+}
+
+/// Used to convert ["a", "b", "c"] to "a,b,c" for backward compatibility.
+pub fn serialize_custom_partitions<S>(value: &[String], serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+{
+    if value.is_empty() {
+        // Skip serializing this field
+        serializer.serialize_none()
+    } else {
+        serializer.serialize_str(&value.join(","))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::event::format::LogSource;
@@ -159,7 +210,7 @@ mod tests {
             value,
             None,
             None,
-            None,
+            &[],
             crate::metadata::SchemaVersion::V1,
             false,
             &LogSource::default()
         )
@@ -178,7 +229,7 @@ mod tests {
             value,
             None,
             None,
-            None,
+            &[],
             crate::metadata::SchemaVersion::V1,
             false,
             &LogSource::default()
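
Reviewer note on the serde helpers in `src/utils/json/mod.rs`: the point of `deserialize_custom_partitions`/`serialize_custom_partitions` is that the on-disk `stream.json` keeps the old singular, comma-separated `custom_partition` field while the in-memory type becomes `Vec<String>`, so no storage migration is needed. Below is a minimal, self-contained sketch of that round-trip. The `Format` struct and the `*_csv` helper names are illustrative stand-ins, not the crate's actual items; only the `#[serde(...)]` wiring mirrors the diff, and this sketch omits the 3-key limit that `parse_custom_partition` enforces.

```rust
use serde::{Deserialize, Deserializer, Serialize, Serializer};

// "a,b,c" -> ["a", "b", "c"]
fn deserialize_csv<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<String>, D::Error> {
    let s = String::deserialize(d)?;
    Ok(s.split(',').map(String::from).collect())
}

// ["a", "b", "c"] -> "a,b,c", so older readers still see the CSV shape
fn serialize_csv<S: Serializer>(v: &[String], s: S) -> Result<S::Ok, S::Error> {
    s.serialize_str(&v.join(","))
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Format {
    #[serde(
        deserialize_with = "deserialize_csv",
        serialize_with = "serialize_csv",
        rename = "custom_partition"
    )]
    custom_partitions: Vec<String>,
}

fn main() {
    // A pre-existing stream.json keeps the old comma-separated form...
    let f: Format = serde_json::from_str(r#"{"custom_partition":"a,b,c"}"#).unwrap();
    assert_eq!(f.custom_partitions, vec!["a", "b", "c"]);

    // ...and serializing writes the same shape back out.
    assert_eq!(
        serde_json::to_string(&f).unwrap(),
        r#"{"custom_partition":"a,b,c"}"#
    );
    println!("round-trip OK: {f:?}");
}
```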