From ebf946cba45d00ec46173d5c974d24bd3d5c94a1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Mar 2025 11:52:26 +0530 Subject: [PATCH 1/2] refactor: move kinesis to src --- src/handlers/http/mod.rs | 1 - src/handlers/http/modal/utils/ingest_utils.rs | 10 +++------- src/{handlers/http => }/kinesis.rs | 0 src/lib.rs | 1 + 4 files changed, 4 insertions(+), 8 deletions(-) rename src/{handlers/http => }/kinesis.rs (100%) diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index aa2e0a02d..e91f00acd 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -34,7 +34,6 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; -mod kinesis; pub mod llm; pub mod logstream; pub mod middleware; diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index cb932a5c3..50a413949 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -31,13 +31,8 @@ use crate::{ format::{json, EventFormat, LogSource}, FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY, }, - handlers::{ - http::{ - ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, - }, - LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, - }, + handlers::{http::ingest::PostError, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}, + kinesis::{flatten_kinesis_logs, Message}, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::PARSEABLE, storage::StreamType, @@ -143,6 +138,7 @@ async fn push_logs( )? .process()?; } + Ok(()) } diff --git a/src/handlers/http/kinesis.rs b/src/kinesis.rs similarity index 100% rename from src/handlers/http/kinesis.rs rename to src/kinesis.rs diff --git a/src/lib.rs b/src/lib.rs index 27809aab0..48c8953a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod enterprise; pub mod event; pub mod handlers; pub mod hottier; +mod kinesis; mod livetail; mod metadata; pub mod metrics; From 5b5f8024b9d11c48d279fc50379fabafdb1303d7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Mar 2025 12:31:51 +0530 Subject: [PATCH 2/2] test: `flatten_kinesis_logs` --- src/kinesis.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/src/kinesis.rs b/src/kinesis.rs index e2f245f73..09f0b5447 100644 --- a/src/kinesis.rs +++ b/src/kinesis.rs @@ -83,3 +83,115 @@ pub fn flatten_kinesis_logs(message: Message) -> Vec { vec_kinesis_json } + +#[cfg(test)] +mod tests { + use serde_json::{json, Value}; + + use super::{flatten_kinesis_logs, Message}; + + #[test] + fn flatten_kinesis_logs_decodes_base64_data() { + let message: Message = serde_json::from_value(json!( { + "requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(), + "timestamp": 1705026780451_i64, + "records": [ + { + "data": "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9".to_string(), + }, + { + "data": "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9".to_string(), + }, + ], + })).unwrap(); + + let result = flatten_kinesis_logs(message); + assert_eq!(result.len(), 2); + + let Value::Object(map) = &result[0] else { + panic!("Expected first result to be a JSON object"); + }; + assert_eq!(map.get("CHANGE").unwrap().as_f64().unwrap(), -0.45); + assert_eq!(map.get("PRICE").unwrap().as_f64().unwrap(), 5.36); + assert_eq!(map.get("TICKER_SYMBOL").unwrap().as_str().unwrap(), "DEG"); + assert_eq!(map.get("SECTOR").unwrap().as_str().unwrap(), "ENERGY"); + assert_eq!( + map.get("requestId").unwrap().as_str().unwrap(), + "9b848d8a-2d89-474b-b073-04b8e5232210" + ); + assert_eq!( + map.get("timestamp").unwrap().as_str().unwrap(), + "1705026780451" + ); + + let Value::Object(map) = &result[1] else { + panic!("Expected second result to be a JSON object"); + }; + assert_eq!(map.get("CHANGE").unwrap().as_f64().unwrap(), 3.16); + assert_eq!(map.get("PRICE").unwrap().as_f64().unwrap(), 73.76); + assert_eq!(map.get("TICKER_SYMBOL").unwrap(), "WMT"); + assert_eq!(map.get("SECTOR").unwrap(), "RETAIL"); + assert_eq!( + map.get("requestId").unwrap().as_str().unwrap(), + "9b848d8a-2d89-474b-b073-04b8e5232210" + ); + assert_eq!( + map.get("timestamp").unwrap().as_str().unwrap(), + "1705026780451" + ); + } + + #[test] + fn flatten_kinesis_logs_adds_request_id_and_timestamp() { + let message: Message = serde_json::from_value(json!( { + "requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(), + "timestamp": 1705026780451_i64, + "records": [ + { + "data": "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9".to_string(), + }, + { + "data": "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9".to_string(), + }, + ], + })).unwrap(); + + let result = flatten_kinesis_logs(message); + assert_eq!(result.len(), 2); + + let event = result[0].as_object().unwrap(); + assert_eq!( + event.get("requestId").unwrap().as_str().unwrap(), + "9b848d8a-2d89-474b-b073-04b8e5232210" + ); + assert_eq!( + event.get("timestamp").unwrap().as_str().unwrap(), + "1705026780451" + ); + + let event = result[1].as_object().unwrap(); + assert_eq!( + event.get("requestId").unwrap().as_str().unwrap(), + "9b848d8a-2d89-474b-b073-04b8e5232210" + ); + assert_eq!( + event.get("timestamp").unwrap().as_str().unwrap(), + "1705026780451" + ); + } + + #[test] + #[should_panic(expected = "InvalidByte(7, 95)")] + fn malformed_json_after_base64_decoding() { + let message: Message = serde_json::from_value(json!({ + "requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(), + "timestamp": 1705026780451_i64, + "records": [ { + "data": "invalid_base64_data".to_string(), + }], + })) + .unwrap(); + + flatten_kinesis_logs(message); + } +}