diff --git a/changelog.d/24307_opentelemetry_source_headers_collection.fix.md b/changelog.d/24307_opentelemetry_source_headers_collection.fix.md new file mode 100644 index 0000000000000..91642d26d062b --- /dev/null +++ b/changelog.d/24307_opentelemetry_source_headers_collection.fix.md @@ -0,0 +1,3 @@ +Fixed the opentelemetry source to collect HTTP headers for all event types and with or without `use_otlp_decoding` configuration option. + +authors: ozanichkovsky \ No newline at end of file diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 81f09f8470579..16b0e0e0fd9d5 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -229,13 +229,13 @@ fn build_warp_log_filter( if let Some(d) = deserializer.as_ref() { parse_with_deserializer(d, decoded_body, log_namespace) } else { - decode_log_body(decoded_body, log_namespace, &events_received).map( - |mut events| { - enrich_events(&mut events, &headers_cfg, &headers, log_namespace); - events - }, - ) - } + decode_log_body(decoded_body, log_namespace, &events_received) + }.map( + |mut events| { + enrich_events(&mut events, &headers_cfg, &headers, log_namespace); + events + }, + ) }) }; diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 76324210be13a..75c4f7e8b9edb 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1,7 +1,4 @@ -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::{net, sync::Arc, time::{SystemTime, UNIX_EPOCH}}; use chrono::{DateTime, TimeZone, Utc}; use futures::Stream; @@ -20,7 +17,7 @@ use vector_lib::{ }, }, common::v1::{ - AnyValue, InstrumentationScope, KeyValue, any_value, any_value::Value::StringValue, + AnyValue, InstrumentationScope, KeyValue, any_value::Value::StringValue, }, logs::v1::{LogRecord, ResourceLogs, ScopeLogs}, metrics::v1::{ @@ -33,7 +30,6 @@ use vector_lib::{ }, }; use vrl::value; -use warp::http::HeaderMap; use crate::{ SourceSender, @@ -1067,36 +1063,35 @@ async fn receive_summary_metric() { .await; } +fn get_source_config_with_headers(grpc_addr: net::SocketAddr, http_addr: net::SocketAddr, use_otlp_decoding: bool) -> OpentelemetryConfig { + OpentelemetryConfig { + grpc: GrpcConfig { + address: grpc_addr, + tls: Default::default(), + }, + http: HttpConfig { + address: http_addr, + tls: Default::default(), + keepalive: Default::default(), + headers: vec![ + "User-Agent".to_string(), + "X-*".to_string(), + "AbsentHeader".to_string(), + ], + }, + acknowledgements: Default::default(), + log_namespace: Default::default(), + use_otlp_decoding: use_otlp_decoding, + } +} + #[tokio::test] -async fn http_headers() { +async fn http_headers_logs_use_otlp_decoding_false() { assert_source_compliance(&SOURCE_TAGS, async { let (_guard_0, grpc_addr) = next_addr(); let (_guard_1, http_addr) = next_addr(); - let mut headers = HeaderMap::new(); - headers.insert("User-Agent", "test_client".parse().unwrap()); - headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap()); - headers.insert("X-Test-Header", "true".parse().unwrap()); - - let source = OpentelemetryConfig { - grpc: GrpcConfig { - address: grpc_addr, - tls: Default::default(), - }, - http: HttpConfig { - address: http_addr, - tls: Default::default(), - keepalive: Default::default(), - headers: vec![ - "User-Agent".to_string(), - "X-*".to_string(), - "AbsentHeader".to_string(), - ], - }, - acknowledgements: Default::default(), - log_namespace: Default::default(), - use_otlp_decoding: false, - }; + let source = get_source_config_with_headers(grpc_addr, http_addr, false); let schema_definitions = source .outputs(LogNamespace::Legacy) .remove(0) @@ -1122,7 +1117,7 @@ async fn http_headers() { severity_number: 9, severity_text: "info".into(), body: Some(AnyValue { - value: Some(any_value::Value::StringValue("log body".into())), + value: Some(StringValue("log body".into())), }), attributes: vec![], dropped_attributes_count: 0, @@ -1175,6 +1170,67 @@ async fn http_headers() { .await; } +#[tokio::test] +async fn http_headers_logs_use_otlp_decoding_true() { + assert_source_compliance(&SOURCE_TAGS, async { + let (_guard_0, grpc_addr) = next_addr(); + let (_guard_1, http_addr) = next_addr(); + + let source = get_source_config_with_headers(grpc_addr, http_addr, true); + + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); + let server = source + .build(SourceContext::new_test(sender, None)) + .await + .unwrap(); + tokio::spawn(server); + test_util::wait_for_tcp(http_addr).await; + + let client = reqwest::Client::new(); + let req = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: None, + scope_logs: vec![ScopeLogs { + scope: None, + log_records: vec![LogRecord { + time_unix_nano: 1, + observed_time_unix_nano: 2, + severity_number: 9, + severity_text: "info".into(), + body: Some(AnyValue { + value: Some(StringValue("log body".into())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 4, + // opentelemetry sdk will hex::decode the given trace_id and span_id + trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"), + span_id: str_into_hex_bytes("0b9e4bda2a55530d"), + }], + schema_url: "v1".into(), + }], + schema_url: "v1".into(), + }], + }; + let _res = client + .post(format!("http://{http_addr}/v1/logs")) + .header("Content-Type", "application/x-protobuf") + .header("User-Agent", "Test") + .body(req.encode_to_vec()) + .send() + .await + .expect("Failed to send log to Opentelemetry Collector."); + + let mut output = test_util::collect_ready(logs_output).await; + assert_eq!(output.len(), 1); + let actual_event = output.pop().unwrap(); + let log = actual_event.as_log(); + assert_eq!(log["AbsentHeader"], Value::Null); + assert_eq!(log["User-Agent"], "Test".into()); + }) + .await; +} + pub struct OTelTestEnv { pub grpc_addr: String, pub config: OpentelemetryConfig,