refactor: memory/cpu efficient handling of stream configurations #1191

Draft: wants to merge 6 commits into base: main
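At its core, this PR replaces the per-stream comma-separated `custom_partition` string (`Option<String>`) with a pre-parsed list, so the hot ingest path no longer re-splits the string for every event; the empty list now stands in for "not configured". The change threads through the Kafka connector, the JSON event format, the HTTP ingest handlers, and the logstream handlers below. A minimal sketch of the before/after accessor shape (illustrative only: `StreamConfig` stands in for Parseable's `Stream`, and the real `get_custom_partitions` may return an owned `Vec<String>` rather than a borrowed slice):

```rust
// Illustrative only: `StreamConfig` stands in for Parseable's `Stream` type.
struct StreamConfig {
    // Was `Option<String>`, holding e.g. Some("status,level".to_owned()).
    custom_partitions: Vec<String>,
}

impl StreamConfig {
    // After the refactor: the list is parsed once when the stream is
    // configured, so hot paths can borrow it without re-splitting.
    fn get_custom_partitions(&self) -> &[String] {
        &self.custom_partitions
    }
}

fn main() {
    let stream = StreamConfig {
        custom_partitions: vec!["status".to_owned(), "level".to_owned()],
    };
    // The empty slice now plays the role `None` used to play.
    assert!(!stream.get_custom_partitions().is_empty());
}
```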
2 changes: 1 addition & 1 deletion src/connectors/kafka/processor.rs
@@ -62,7 +62,7 @@ impl ParseableSinkProcessor
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
-        let custom_partition = stream.get_custom_partition();
+        let custom_partition = stream.get_custom_partitions();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
13 changes: 4 additions & 9 deletions src/event/format/json.rs
@@ -145,18 +145,13 @@ impl EventFormat for Event {
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
-        custom_partitions: Option<&String>,
+        custom_partitions: &[String],
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
     ) -> Result<super::Event, anyhow::Error> {
-        let custom_partition_values = match custom_partitions.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                extract_custom_partition_values(&self.json, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
+        let custom_partition_values =
+            extract_custom_partition_values(&self.json, custom_partitions);
 
         let parsed_timestamp = match time_partition {
             Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
@@ -188,7 +183,7 @@ impl EventFormat for Event {
 /// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
 pub fn extract_custom_partition_values(
     json: &Value,
-    custom_partition_list: &[&str],
+    custom_partition_list: &[String],
 ) -> HashMap<String, String> {
     let mut custom_partition_values: HashMap<String, String> = HashMap::new();
     for custom_partition_field in custom_partition_list {
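For orientation, here is a minimal standalone sketch consistent with the doc comment above (a sketch only; the real function in json.rs may treat missing fields or non-scalar values differently):

```rust
use std::collections::HashMap;

use serde_json::{json, Value};

/// Sketch of the documented behavior: look up each configured partition
/// field in a flat JSON object and stringify its value.
fn extract_custom_partition_values(
    json: &Value,
    custom_partition_list: &[String],
) -> HashMap<String, String> {
    let mut values = HashMap::new();
    for field in custom_partition_list {
        if let Some(value) = json.get(field) {
            // Strings are inserted without surrounding quotes; other
            // scalars fall back to their JSON text form.
            let rendered = match value {
                Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            values.insert(field.clone(), rendered);
        }
    }
    values
}

fn main() {
    let record = json!({"status": 400, "msg": "Hello, World!"});
    let partitions = vec!["status".to_owned()];
    let extracted = extract_custom_partition_values(&record, &partitions);
    assert_eq!(extracted["status"], "400");
}
```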
2 changes: 1 addition & 1 deletion src/event/format/mod.rs
@@ -219,7 +219,7 @@ pub trait EventFormat: Sized {
         origin_size: u64,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
-        custom_partitions: Option<&String>,
+        custom_partitions: &[String],
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
8 changes: 4 additions & 4 deletions src/handlers/http/ingest.rs
@@ -98,7 +98,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         size as u64,
         &schema,
         false,
-        None,
+        &[],
         None,
         SchemaVersion::V0,
         StreamType::Internal,
@@ -546,7 +546,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V0,
             &crate::event::format::LogSource::default()
         )
@@ -749,7 +749,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V0,
             &crate::event::format::LogSource::default(),
         )
@@ -832,7 +832,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V1,
             &crate::event::format::LogSource::default(),
         )
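The `None` to `&[]` swaps above follow from the trait change in src/event/format/mod.rs: "no custom partitions" is now the empty slice rather than `Option::None`. A tiny, hypothetical illustration of why that removes a code path at call sites (not Parseable code):

```rust
// Hypothetical helper, not Parseable code: with `&[String]` there is one
// code path, and "no custom partitions" is just the empty slice.
fn describe_partitions(custom_partitions: &[String]) -> String {
    if custom_partitions.is_empty() {
        "no custom partitions".to_owned()
    } else {
        custom_partitions.join(", ")
    }
}

fn main() {
    assert_eq!(describe_partitions(&[]), "no custom partitions");
    let configured = vec!["status".to_owned(), "level".to_owned()];
    assert_eq!(describe_partitions(&configured), "status, level");
}
```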
79 changes: 44 additions & 35 deletions src/handlers/http/logstream.rs
@@ -16,34 +16,39 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
-use crate::event::format::override_data_type;
-use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
-use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::parseable::{StreamNotFound, PARSEABLE};
-use crate::rbac::role::Action;
-use crate::rbac::Users;
-use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::retention::Retention;
-use crate::storage::{StreamInfo, StreamType};
-use crate::utils::actix::extract_session_key_from_req;
-use crate::{stats, validator, LOCK_EXPECT};
-
-use actix_web::http::StatusCode;
-use actix_web::web::{Json, Path};
-use actix_web::{web, HttpRequest, Responder};
+use std::{fs, sync::Arc};
+
+use actix_web::{
+    web::{Json, Path},
+    HttpRequest, Responder,
+};
 use arrow_json::reader::infer_json_schema_from_iterator;
 use bytes::Bytes;
 use chrono::Utc;
+use error::StreamError;
+use http::StatusCode;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
 use tracing::warn;
+
+use crate::{
+    event::format::override_data_type,
+    hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION},
+    metadata::SchemaVersion,
+    metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
+    parseable::{StreamNotFound, PARSEABLE},
+    rbac::{role::Action, Users},
+    stats::{self, event_labels_date, storage_size_labels_date, Stats},
+    storage::{retention::Retention, StreamInfo, StreamType},
+    utils::actix::extract_session_key_from_req,
+    validator, LOCK_EXPECT,
+};
+
+use super::{
+    cluster::utils::{IngestionStats, QueriedStats, StorageStats},
+    query::update_schema_when_distributed,
+};
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
@@ -98,7 +103,7 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
         .map(|name| json!({"name": name}))
         .collect_vec();
 
-    Ok(web::Json(res))
+    Ok(Json(res))
 }
 
 pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
@@ -117,7 +122,7 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
     for log_record in log_records {
         schema = override_data_type(schema, log_record, SchemaVersion::V1);
     }
-    Ok((web::Json(schema), StatusCode::OK))
+    Ok((Json(schema), StatusCode::OK))
 }
 
 pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -132,7 +137,7 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     match update_schema_when_distributed(&vec![stream_name.clone()]).await {
         Ok(_) => {
             let schema = stream.get_schema();
-            Ok((web::Json(schema), StatusCode::OK))
+            Ok((Json(schema), StatusCode::OK))
         }
         Err(err) => Err(StreamError::Custom {
             msg: err.to_string(),
@@ -168,7 +173,7 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
         .get_stream(&stream_name)?
         .get_retention()
         .unwrap_or_default();
-    Ok((web::Json(retention), StatusCode::OK))
+    Ok((Json(retention), StatusCode::OK))
 }
 
 pub async fn put_retention(
@@ -250,7 +255,7 @@ pub async fn get_stats(
         if !date_value.is_empty() {
             let stats = get_stats_date(&stream_name, date_value).await?;
             let stats = serde_json::to_value(stats)?;
-            return Ok((web::Json(stats), StatusCode::OK));
+            return Ok((Json(stats), StatusCode::OK));
         }
     }
 
@@ -281,7 +286,7 @@
 
     let stats = serde_json::to_value(stats)?;
 
-    Ok((web::Json(stats), StatusCode::OK))
+    Ok((Json(stats), StatusCode::OK))
 }
 pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -333,12 +338,12 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
         time_partition_limit: stream_meta
             .time_partition_limit
             .map(|limit| limit.to_string()),
-        custom_partition: stream_meta.custom_partition.clone(),
+        custom_partitions: stream_meta.custom_partitions.clone(),
         static_schema_flag: stream_meta.static_schema_flag,
         log_source: stream_meta.log_source.clone(),
     };
 
-    Ok((web::Json(stream_info), StatusCode::OK))
+    Ok((Json(stream_info), StatusCode::OK))
 }
 
 pub async fn put_stream_hot_tier(
@@ -405,7 +410,7 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     };
     let meta = hot_tier_manager.get_hot_tier(&stream_name).await?;
 
-    Ok((web::Json(meta), StatusCode::OK))
+    Ok((Json(meta), StatusCode::OK))
 }
 
 pub async fn delete_stream_hot_tier(
@@ -455,6 +460,7 @@ pub mod error {
     use http::StatusCode;
 
     use crate::{
+        handlers::http::modal::utils::logstream_utils::HeaderParseError,
         hottier::HotTierError,
         parseable::StreamNotFound,
         storage::ObjectStorageError,
@@ -524,6 +530,8 @@ pub mod error {
         HotTierValidation(#[from] HotTierValidationError),
         #[error("{0}")]
         HotTierError(#[from] HotTierError),
+        #[error("Error when parsing headers: {0}")]
+        HeaderParsing(#[from] HeaderParseError),
     }
 
     impl actix_web::ResponseError for StreamError {
@@ -559,6 +567,7 @@
                 StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
                 StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
                 StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+                StreamError::HeaderParsing(_) => StatusCode::BAD_REQUEST,
             }
         }
 
@@ -596,7 +605,7 @@ mod tests {
     #[actix_web::test]
     async fn header_without_log_source() {
         let req = TestRequest::default().to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Json);
     }
 
@@ -605,19 +614,19 @@ mod tests {
         let mut req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "pmeta"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
 
         req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "otel-logs"))
            .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
 
         req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "kinesis"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
     }
 
@@ -626,7 +635,7 @@ mod tests {
         let req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "teststream"))
             .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
         assert_eq!(log_source, crate::event::format::LogSource::Json);
     }
 }
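These tests now unwrap a fallible conversion, implying the former `From<&HeaderMap>` impl for `PutStreamHeaders` became `TryFrom` with `HeaderParseError` as its error type, the same error wired into `StreamError::HeaderParsing` above. A compilable sketch of that shape, using simplified stand-ins for the real types in logstream_utils.rs (the actual struct and error carry more fields and variants, and the exact failure conditions are assumptions):

```rust
use http::HeaderMap;

// Simplified stand-ins; the real definitions live in
// src/handlers/http/modal/utils/logstream_utils.rs.
#[derive(Debug, Default, PartialEq)]
enum LogSource {
    #[default]
    Json,
    Kinesis,
    OtelLogs,
    Pmeta,
}

#[derive(Debug)]
struct HeaderParseError(String);

struct PutStreamHeaders {
    log_source: LogSource,
}

impl TryFrom<&HeaderMap> for PutStreamHeaders {
    type Error = HeaderParseError;

    fn try_from(headers: &HeaderMap) -> Result<Self, Self::Error> {
        let log_source = match headers.get("x-p-log-source") {
            // A header value that is not valid UTF-8 is the assumed failure case.
            Some(v) => match v.to_str().map_err(|e| HeaderParseError(e.to_string()))? {
                "kinesis" => LogSource::Kinesis,
                "otel-logs" => LogSource::OtelLogs,
                "pmeta" => LogSource::Pmeta,
                // Unknown values fall back to JSON, matching the tests above.
                _ => LogSource::Json,
            },
            None => LogSource::default(),
        };
        Ok(PutStreamHeaders { log_source })
    }
}

fn main() {
    let mut headers = HeaderMap::new();
    headers.insert("x-p-log-source", "kinesis".try_into().unwrap());
    let parsed: PutStreamHeaders = (&headers).try_into().unwrap();
    assert_eq!(parsed.log_source, LogSource::Kinesis);
}
```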
14 changes: 6 additions & 8 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -82,20 +82,18 @@ async fn push_logs(
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition = stream.get_time_partition();
-    let time_partition_limit = PARSEABLE
-        .get_stream(stream_name)?
-        .get_time_partition_limit();
+    let time_partition_limit = stream.get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
-    let custom_partition = stream.get_custom_partition();
+    let custom_partitions = stream.get_custom_partitions();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
 
-    let data = if time_partition.is_some() || custom_partition.is_some() {
+    let data = if time_partition.is_some() || !custom_partitions.is_empty() {
         convert_array_to_object(
             json,
             time_partition.as_ref(),
             time_partition_limit,
-            custom_partition.as_ref(),
+            &custom_partitions,
             schema_version,
             log_source,
         )?
@@ -104,7 +102,7 @@ async fn push_logs(
             json,
             None,
             None,
-            None,
+            &[],
             schema_version,
             log_source,
         )?)?]
@@ -119,7 +117,7 @@ async fn push_logs(
         origin_size,
         &schema,
         static_schema_flag,
-        custom_partition.as_ref(),
+        &custom_partitions,
         time_partition.as_ref(),
         schema_version,
         StreamType::UserDefined,
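This hunk also reuses the `stream` handle fetched at the top of `push_logs` rather than calling `PARSEABLE.get_stream` a second time. As a toy illustration of the per-event work the pre-split list avoids on this hot path (made-up values, not Parseable code):

```rust
fn main() {
    // Before: the stored comma-separated string was re-split for every event.
    let stored = Some("status,level".to_owned());
    for _event in 0..3 {
        let per_event: Vec<&str> = stored
            .as_deref()
            .map(|s| s.split(',').collect())
            .unwrap_or_default(); // a fresh Vec allocation per event
        assert_eq!(per_event.len(), 2);
    }

    // After: split once when the stream configuration is loaded,
    // then borrow the same list for every event.
    let custom_partitions: Vec<String> =
        "status,level".split(',').map(str::to_owned).collect();
    for _event in 0..3 {
        let per_event: &[String] = &custom_partitions; // no allocation
        assert_eq!(per_event.len(), 2);
    }
}
```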