diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 49d4a084e7..8060c99841 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -121,6 +121,10 @@ impl LogProcessor for EnrichWithBaggageLogProcessor { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + Ok(()) + } } /// A custom span processor that enriches spans with baggage attributes. Baggage diff --git a/opentelemetry-appender-tracing/benches/log-attributes.rs b/opentelemetry-appender-tracing/benches/log-attributes.rs index 08a5ba1885..ca00dd6f61 100644 --- a/opentelemetry-appender-tracing/benches/log-attributes.rs +++ b/opentelemetry-appender-tracing/benches/log-attributes.rs @@ -43,6 +43,10 @@ impl LogProcessor for NoopProcessor { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } /// Creates a single benchmark for a specific number of attributes diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 90b15235e7..7054ed2e92 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -62,6 +62,10 @@ impl LogProcessor for NoopProcessor { ) -> bool { self.enabled } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } struct NoOpLogLayer { diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index f2402237c1..a0072ce67b 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -931,6 +931,10 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 65d0598216..cb50f64bf9 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -241,6 +241,10 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } fn create_test_log_data( diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 32fa12db65..51d187d367 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -13,6 +13,8 @@ The logs functionality now operates independently, while automatic correlation between logs and traces continues to work when the "trace" feature is explicitly enabled. +- **Fix**: Fix shutdown of `SimpleLogProcessor` and async `BatchLogProcessor`. +- Default implementation of `LogProcessor::shutdown_with_timeout()` will now warn to encourage users to implement proper shutdown. ## 0.30.0 @@ -35,7 +37,7 @@ also modified to suppress telemetry before invoking exporters. - **Feature**: Implemented and enabled cardinality capping for Metrics by default. [#2901](https://github.com/open-telemetry/opentelemetry-rust/pull/2901) - - The default cardinality limit is 2000 and can be customized using Views. + - The default cardinality limit is 2000 and can be customized using Views. - This feature was previously removed in version 0.28 due to the lack of configurability but has now been reintroduced with the ability to configure the limit. @@ -176,7 +178,7 @@ Released 2025-Mar-21 ``` After: - + ```rust async fn export(&self, batch: Vec) -> OTelSdkResult ``` diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index ed0603ee44..e70f14202a 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -38,7 +38,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index d14f3de320..6b5beccc23 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -73,7 +73,7 @@ impl LogProcessor for ExportingProcessorWithFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } @@ -104,7 +104,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index 313a94028d..ce0404447e 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } @@ -71,7 +71,7 @@ impl LogProcessor for CloningProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } @@ -117,7 +117,7 @@ impl LogProcessor for SendToChannelProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 3f7fa62975..232a3992ef 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -31,7 +31,7 @@ use crate::{logs::SdkLogRecord, Resource}; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; -use opentelemetry::InstrumentationScope; +use opentelemetry::{otel_warn, InstrumentationScope}; use std::fmt::Debug; use std::time::Duration; @@ -57,10 +57,21 @@ pub trait LogProcessor: Send + Sync + Debug { /// Shuts down the processor. /// After shutdown returns the log processor should stop processing any logs. /// It's up to the implementation on when to drop the LogProcessor. + /// + /// All implementors should implement this method. fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + // It would have been better to make this method required, but that ship + // sailed when the logs API was declared stable. + otel_warn!( + name: "LogProcessor.DefaultShutdownWithTimeout", + message = format!("LogProcessor::shutdown_with_timeout should be implemented by all LogProcessor types") + ); Ok(()) } /// Shuts down the processor with default timeout. + /// + /// Implementors typically do not need to change this method, and can just + /// implement `shutdown_with_timeout`. fn shutdown(&self) -> OTelSdkResult { self.shutdown_with_timeout(Duration::from_secs(5)) } @@ -140,6 +151,10 @@ pub(crate) mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } #[derive(Debug)] @@ -166,6 +181,10 @@ pub(crate) mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } #[test] diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index 953d936b7e..bd725bec9a 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -87,7 +87,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -628,6 +628,10 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } #[derive(Debug)] @@ -654,6 +658,10 @@ mod tests { fn force_flush(&self) -> OTelSdkResult { Ok(()) } + + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { + Ok(()) + } } #[test] fn test_log_data_modification_by_multiple_processors() { diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index e7de152807..0097c3b6ca 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -402,6 +402,10 @@ mod tests { *res = resource.clone(); self.exporter.set_resource(resource); } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + Ok(()) + } } impl TestProcessorForResource { fn new(exporter: TestExporterForResource) -> Self { diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0d44679109..c54f001f97 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -171,7 +171,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> crate::error::OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } @@ -277,7 +277,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult { Ok(()) } } diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index ec72ba46bc..5c8642221a 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -28,6 +28,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationS use std::fmt::Debug; use std::sync::atomic::AtomicBool; use std::sync::Mutex; +use std::time::Duration; /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately /// exports log records as they are emitted. Log records are exported synchronously @@ -116,11 +117,11 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&self) -> OTelSdkResult { + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(exporter) = self.exporter.lock() { - exporter.shutdown() + exporter.shutdown_with_timeout(timeout) } else { Err(OTelSdkError::InternalFailure( "SimpleLogProcessor mutex poison at shutdown".into(),