diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 8e252b14f..c23eb6659 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
-use serde_json::Value;
 
 use crate::event::error::EventError;
 use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
@@ -39,7 +38,7 @@ use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
-use crate::utils::json::flatten::JsonFlattenError;
+use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
 
 use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
@@ -51,7 +50,7 @@ use super::users::filters::FiltersError;
 // creates if stream does not exist
 pub async fn ingest(
     req: HttpRequest,
-    Json(mut json): Json<Value>,
+    Json(json): Json<StrictValue>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -83,6 +82,8 @@ pub async fn ingest(
 
     let mut p_custom_fields = get_custom_fields_from_header(&req);
 
+    let mut json = json.into_inner();
+
     let fields = match &log_source {
         LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
             &mut json,
@@ -127,13 +128,13 @@ pub async fn ingest(
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let json: Value = serde_json::from_slice(&body)?;
+    let json: StrictValue = serde_json::from_slice(&body)?;
     let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
     let mut p_custom_fields = HashMap::new();
     p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
     p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
     // For internal streams, use old schema
-    format::json::Event::new(json)
+    format::json::Event::new(json.into_inner())
         .into_event(
             stream_name,
             size as u64,
@@ -155,7 +156,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
 // creates if stream does not exist
 pub async fn handle_otel_logs_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    Json(json): Json<StrictValue>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -205,7 +206,13 @@ pub async fn handle_otel_logs_ingestion(
 
     let p_custom_fields = get_custom_fields_from_header(&req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
+    flatten_and_push_logs(
+        json.into_inner(),
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -215,7 +222,7 @@ pub async fn handle_otel_logs_ingestion(
 // creates if stream does not exist
 pub async fn handle_otel_metrics_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    Json(json): Json<StrictValue>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -263,7 +270,13 @@ pub async fn handle_otel_metrics_ingestion(
 
     let p_custom_fields = get_custom_fields_from_header(&req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
+    flatten_and_push_logs(
+        json.into_inner(),
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -273,7 +286,7 @@ pub async fn handle_otel_metrics_ingestion(
 // creates if stream does not exist
 pub async fn handle_otel_traces_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    Json(json): Json<StrictValue>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -322,7 +335,13 @@ pub async fn handle_otel_traces_ingestion(
 
     let p_custom_fields = get_custom_fields_from_header(&req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
+    flatten_and_push_logs(
+        json.into_inner(),
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -333,7 +352,7 @@ pub async fn handle_otel_traces_ingestion(
 pub async fn post_event(
     req: HttpRequest,
     stream_name: Path<String>,
-    Json(mut json): Json<Value>,
+    Json(json): Json<StrictValue>,
 ) -> Result<HttpResponse, PostError> {
     let stream_name = stream_name.into_inner();
 
@@ -369,6 +388,7 @@ pub async fn post_event(
         .get(EXTRACT_LOG_KEY)
         .and_then(|h| h.to_str().ok());
     let mut p_custom_fields = get_custom_fields_from_header(&req);
+    let mut json = json.into_inner();
     match &log_source {
         LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
             return Err(PostError::OtelNotSupported)
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index cb1e2fb81..9de308c25 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -28,6 +28,7 @@ use crate::event::format::LogSource;
 use crate::metadata::SchemaVersion;
 
 pub mod flatten;
+pub mod strict;
 
 /// calls the function `flatten_json` which results Vec<Value> or Error
 /// in case when Vec<Value> is returned, converts the Vec<Value> to Value of Array
diff --git a/src/utils/json/strict.rs b/src/utils/json/strict.rs
new file mode 100644
index 000000000..27142de8d
--- /dev/null
+++ b/src/utils/json/strict.rs
@@ -0,0 +1,403 @@
+//! Strict JSON deserialization.
+//!
+//! [`StrictValue`] wraps [`serde_json::Value`] but, unlike it, fails to
+//! deserialize when an object contains the same key more than once.
+
+use serde::{
+    de::{Error as _, Visitor},
+    Deserialize, Serialize,
+};
+
+/// Intermediate tree built while deserializing.
+///
+/// Objects are stored as an ordered list of entries instead of a map so that
+/// duplicate keys are still visible when converting to [`serde_json::Value`].
+enum InterimValue {
+    Null,
+    Bool(bool),
+    I64(i64),
+    I128(i128),
+    U64(u64),
+    U128(u128),
+    F64(f64),
+    String(String),
+    Array(Vec<InterimValue>),
+    Object(Vec<(String, InterimValue)>),
+}
+
+impl TryFrom<InterimValue> for serde_json::Value {
+    type Error = &'static str;
+
+    /// Converts the interim tree into a [`serde_json::Value`].
+    ///
+    /// # Errors
+    ///
+    /// Fails when an object repeats a key, when a float cannot be represented
+    /// as a JSON number, or when a 128-bit integer is out of range.
+    fn try_from(value: InterimValue) -> Result<Self, Self::Error> {
+        Ok(match value {
+            InterimValue::Null => Self::Null,
+            InterimValue::Bool(b) => Self::Bool(b),
+            InterimValue::I64(i) => Self::Number(i.into()),
+            // `from_i128` yields `None` when the value is out of range; reject
+            // it instead of truncating with an `as` cast.
+            InterimValue::I128(i) => Self::Number(
+                serde_json::Number::from_i128(i).ok_or("integer out of range for JSON number")?,
+            ),
+            InterimValue::U64(u) => Self::Number(u.into()),
+            InterimValue::U128(u) => Self::Number(
+                serde_json::Number::from_u128(u).ok_or("integer out of range for JSON number")?,
+            ),
+            InterimValue::F64(f) => {
+                Self::Number(serde_json::Number::from_f64(f).ok_or("not a valid JSON number")?)
+            }
+            InterimValue::String(s) => Self::String(s),
+            InterimValue::Array(a) => Self::Array(
+                a.into_iter()
+                    .map(TryInto::try_into)
+                    .collect::<Result<Vec<_>, _>>()?,
+            ),
+            InterimValue::Object(o) => {
+                let mut map = serde_json::Map::with_capacity(o.len());
+                for (k, v) in o {
+                    // A single lookup both detects the duplicate and reserves the slot.
+                    match map.entry(k) {
+                        serde_json::map::Entry::Occupied(_) => {
+                            return Err("duplicate key in JSON");
+                        }
+                        serde_json::map::Entry::Vacant(slot) => {
+                            slot.insert(v.try_into()?);
+                        }
+                    }
+                }
+                Self::Object(map)
+            }
+        })
+    }
+}
+
+impl<'de> Deserialize<'de> for InterimValue {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct StrictValueVisitor;
+
+        impl<'de> Visitor<'de> for StrictValueVisitor {
+            type Value = InterimValue;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+                formatter.write_str("any valid JSON value")
+            }
+
+            fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::Bool(v))
+            }
+
+            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::I64(v))
+            }
+
+            fn visit_i128<E>(self, v: i128) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::I128(v))
+            }
+
+            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::U64(v))
+            }
+
+            fn visit_u128<E>(self, v: u128) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::U128(v))
+            }
+
+            fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::F64(v))
+            }
+
+            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::String(v.to_owned()))
+            }
+
+            fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::String(v))
+            }
+
+            fn visit_none<E>(self) -> Result<Self::Value, E> {
+                Ok(InterimValue::Null)
+            }
+
+            fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                Deserialize::deserialize(deserializer)
+            }
+
+            fn visit_unit<E>(self) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(InterimValue::Null)
+            }
+
+            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::SeqAccess<'de>,
+            {
+                let mut arr = Vec::new();
+
+                while let Some(v) = seq.next_element::<InterimValue>()? {
+                    arr.push(v);
+                }
+
+                Ok(InterimValue::Array(arr))
+            }
+
+            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::MapAccess<'de>,
+            {
+                // Keep every entry, duplicates included; `TryFrom` rejects them later.
+                let mut obj = Vec::new();
+
+                while let Some((key, value)) = map.next_entry::<String, InterimValue>()? {
+                    obj.push((key, value));
+                }
+
+                Ok(InterimValue::Object(obj))
+            }
+        }
+
+        deserializer.deserialize_any(StrictValueVisitor)
+    }
+}
+
+/// A [`serde_json::Value`] whose deserialization rejects duplicate object keys.
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+pub struct StrictValue {
+    inner: serde_json::Value,
+}
+
+impl StrictValue {
+    /// Consumes the wrapper and returns the validated [`serde_json::Value`].
+    pub fn into_inner(self) -> serde_json::Value {
+        self.inner
+    }
+}
+
+impl Serialize for StrictValue {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        self.inner.serialize(serializer)
+    }
+}
+
+impl<'de> Deserialize<'de> for StrictValue {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let interim: InterimValue = Deserialize::deserialize(deserializer)?;
+        Ok(Self {
+            inner: interim.try_into().map_err(D::Error::custom)?,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    #[test]
+    fn test_deserialize_string() {
+        let raw_json = r#""hello""#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!("hello"));
+    }
+
+    #[test]
+    fn test_deserialize_number() {
+        let raw_json = "123";
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!(123));
+    }
+
+    #[test]
+    fn test_deserialize_object() {
+        let raw_json = r#"{"key": "value"}"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!({"key": "value"}));
+    }
+
+    #[test]
+    fn test_deserialize_array() {
+        let raw_json = r#"[1, 2, 3]"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!([1, 2, 3]));
+    }
+
+    #[test]
+    fn test_deserialize_boolean() {
+        let raw_json = "true";
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!(true));
+    }
+
+    #[test]
+    fn test_deserialize_null() {
+        let raw_json = "null";
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        assert_eq!(val.into_inner(), json!(null));
+    }
+
+    #[test]
+    fn test_deserialize_nested_object() {
+        let raw_json = r#"{
+            "user": {
+                "id": 1,
+                "info": {
+                    "name": "Alice",
+                    "emails": ["alice@example.com", "alice@work.com"]
+                }
+            }
+        }"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        let expected = json!({
+            "user": {
+                "id": 1,
+                "info": {
+                    "name": "Alice",
+                    "emails": ["alice@example.com", "alice@work.com"]
+                }
+            }
+        });
+        assert_eq!(val.into_inner(), expected);
+    }
+
+    #[test]
+    fn test_deserialize_nested_array() {
+        let raw_json = r#"[[1, 2], [3, 4, [5, 6]], []]"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        let expected = json!([[1, 2], [3, 4, [5, 6]], []]);
+        assert_eq!(val.into_inner(), expected);
+    }
+
+    #[test]
+    fn test_deserialize_mixed_structure() {
+        let raw_json = r#"{
+            "status": "ok",
+            "data": [
+                {"id": 1, "value": null},
+                {"id": 2, "value": [true, false]},
+                {"id": 3, "value": {"nested": "yes"}}
+            ]
+        }"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        let expected = json!({
+            "status": "ok",
+            "data": [
+                {"id": 1, "value": null},
+                {"id": 2, "value": [true, false]},
+                {"id": 3, "value": {"nested": "yes"}}
+            ]
+        });
+        assert_eq!(val.into_inner(), expected);
+    }
+
+    #[test]
+    fn test_deserialize_deep_nesting() {
+        let raw_json = r#"
+        {
+            "a": {
+                "b": {
+                    "c": {
+                        "d": [1, {"e": "f"}]
+                    }
+                }
+            }
+        }"#;
+        let val: StrictValue = serde_json::from_str(raw_json).unwrap();
+        let expected = json!({
+            "a": {
+                "b": {
+                    "c": {
+                        "d": [1, {"e": "f"}]
+                    }
+                }
+            }
+        });
+        assert_eq!(val.into_inner(), expected);
+    }
+
+    #[test]
+    fn test_duplicate_keys_should_error() {
+        let raw_json = r#"
+        {
+            "key": "value1",
+            "key": "value2"
+        }
+        "#;
+
+        // Strict version should error
+        let custom_result: Result<StrictValue, _> = serde_json::from_str(raw_json);
+        assert!(
+            custom_result.is_err(),
+            "Expected strict Value to error on duplicate keys"
+        );
+
+        // serde_json::Value should succeed, keeping the last key
+        let serde_result: Result<serde_json::Value, _> = serde_json::from_str(raw_json);
+        assert!(
+            serde_result.is_ok(),
+            "Expected serde_json::Value to allow duplicate keys"
+        );
+
+        let deserialized = serde_result.unwrap();
+        assert_eq!(deserialized, json!({"key": "value2"}));
+    }
+
+    #[test]
+    fn test_out_of_range_integer_should_error() {
+        use serde::de::value::{Error, I128Deserializer, U128Deserializer};
+        use serde::de::IntoDeserializer;
+
+        // 128-bit values that fit in 64 bits are accepted unchanged.
+        let fits: I128Deserializer<Error> = i128::from(i64::MIN).into_deserializer();
+        let val = StrictValue::deserialize(fits).unwrap();
+        assert_eq!(val.into_inner(), json!(i64::MIN));
+
+        // Anything wider must be rejected rather than silently truncated.
+        let too_small: I128Deserializer<Error> = i128::MIN.into_deserializer();
+        assert!(StrictValue::deserialize(too_small).is_err());
+
+        let too_big: U128Deserializer<Error> = u128::MAX.into_deserializer();
+        assert!(StrictValue::deserialize(too_big).is_err());
+    }
+}