Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed the opentelemetry source to collect HTTP headers for all event types and with or without `use_otlp_decoding` configuration option.

authors: ozanichkovsky
40 changes: 29 additions & 11 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,20 @@ pub(crate) fn build_warp_filter(
);
let metrics_filters = build_warp_metrics_filter(
acknowledgements,
log_namespace,
out.clone(),
bytes_received.clone(),
events_received.clone(),
headers.clone(),
metrics_deserializer,
);
let trace_filters = build_warp_trace_filter(
acknowledgements,
log_namespace,
out.clone(),
bytes_received,
events_received,
headers.clone(),
traces_deserializer,
);
log_filters
Expand Down Expand Up @@ -229,13 +233,13 @@ fn build_warp_log_filter(
if let Some(d) = deserializer.as_ref() {
parse_with_deserializer(d, decoded_body, log_namespace)
} else {
decode_log_body(decoded_body, log_namespace, &events_received).map(
|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
events
},
)
}
decode_log_body(decoded_body, log_namespace, &events_received)
}.map(
|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
events
},
)
})
};

Expand All @@ -248,12 +252,14 @@ fn build_warp_log_filter(
}
fn build_warp_metrics_filter(
acknowledgements: bool,
log_namespace: LogNamespace,
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -270,7 +276,12 @@ fn build_warp_metrics_filter(
parse_with_deserializer(d, decoded_body, LogNamespace::default())
} else {
decode_metrics_body(decoded_body, &events_received)
}
}.map(
|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
events
},
)
})
};

Expand All @@ -284,12 +295,14 @@ fn build_warp_metrics_filter(

fn build_warp_trace_filter(
acknowledgements: bool,
log_namespace: LogNamespace,
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -306,7 +319,12 @@ fn build_warp_trace_filter(
parse_with_deserializer(d, decoded_body, LogNamespace::default())
} else {
decode_trace_body(decoded_body, &events_received)
}
}.map(
|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
events
},
)
})
};

Expand Down
Loading