From e079b0c9b795abaefe18b6a703b9edbaa53214ea Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 15 Jun 2026 00:20:35 +0200 Subject: [PATCH] fix(instrumentation): avoid blocking current runtime --- crates/instrumentation/src/lib.rs | 177 +++++++++++++++++++++++++----- 1 file changed, 150 insertions(+), 27 deletions(-) diff --git a/crates/instrumentation/src/lib.rs b/crates/instrumentation/src/lib.rs index 70f47d4d..6170dd1c 100644 --- a/crates/instrumentation/src/lib.rs +++ b/crates/instrumentation/src/lib.rs @@ -7,9 +7,12 @@ use quent_events::Event; use quent_exporter::{ExporterOptions, create_exporter}; use quent_exporter_types::Exporter; use serde::Serialize; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread, }; use tokio::{ runtime::{Handle, Runtime}, @@ -20,6 +23,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, warn}; use uuid::Uuid; +type ExporterHandle = Arc>; + /// Wrapper around an optional channel sender. When the inner sender is `None` /// (i.e. the noop exporter is selected), `send` is a no-op that avoids any /// channel or event-forwarding overhead. @@ -84,7 +89,7 @@ where { handle: Option, events_sender: EventSender, - exporter: Option>>, + exporter: Option>, cancellation_token: CancellationToken, forwarder_handle: Option>, @@ -125,25 +130,23 @@ where debug!("using existing async runtime"); (None, handle) } else { - debug!("spawning new async runtime"); - if let Ok(runtime) = Runtime::new() { - let handle = runtime.handle().clone(); - (Some(runtime), handle) - } else { - return Err("unable to spawn async runtime")?; - } + debug!("spawning async runtime"); + let runtime = + Runtime::new().map_err(|e| format!("unable to spawn async runtime: {e}"))?; + let handle = runtime.handle().clone(); + (Some(runtime), handle) }; let (events_sender, mut events_receiver) = unbounded_channel(); debug!("constructing exporter"); - let exporter: Arc> = handle.block_on(create_exporter(kind, id))?; + let exporter = create_exporter_on_handle(&handle, kind, id)?; let cancellation_token = CancellationToken::new(); let cloned_token = cancellation_token.clone(); let forwarder_handle = handle.spawn({ - let exporter: Arc> = Arc::clone(&exporter); + let exporter: ExporterHandle = Arc::clone(&exporter); async move { loop { tokio::select! { @@ -201,35 +204,115 @@ where fn drop(&mut self) { self.cancellation_token.cancel(); - if let Some(handle) = &self.handle { - // Wait for the forwarder to finish processing remaining events - if let Some(forwarder_handle) = self.forwarder_handle.take() - && let Err(e) = handle.block_on(forwarder_handle) - { - warn!("forwarder task failed: {e}"); - } + if let Some(handle) = self.handle.take() { + shutdown_runtime( + handle, + self.forwarder_handle.take(), + self.exporter.take(), + self._runtime.take(), + ); + } + } +} - // Flush the exporter to ensure all events are sent - if let Some(exporter) = &self.exporter - && let Err(e) = handle.block_on(exporter.force_flush()) - { - warn!("failed to flush exporter: {e}"); - } +fn create_exporter_on_handle( + handle: &Handle, + kind: ExporterOptions, + id: Uuid, +) -> Result, Box> +where + T: Serialize + Send + 'static, +{ + // `Handle::block_on` panics from a Tokio worker thread, so keep the sync + // constructor API by doing the blocking exporter setup on a helper thread. + if Handle::try_current().is_ok() { + let handle = handle.clone(); + let join_result = thread::spawn(move || { + handle + .block_on(create_exporter(kind, id)) + .map_err(|e| e.to_string()) + }) + .join(); + + return match join_result { + Ok(Ok(exporter)) => Ok(exporter), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err("exporter construction thread panicked".into()), + }; + } + + Ok(handle.block_on(create_exporter(kind, id))?) +} + +fn shutdown_runtime( + handle: Handle, + forwarder_handle: Option>, + exporter: Option>, + runtime: Option, +) where + T: Serialize + Send + 'static, +{ + // `Drop` is synchronous too; use the same helper-thread escape hatch when + // teardown happens from inside a Tokio runtime. + if Handle::try_current().is_ok() { + if thread::spawn(move || { + shutdown_runtime_on_current_thread(handle, forwarder_handle, exporter, runtime) + }) + .join() + .is_err() + { + warn!("runtime shutdown thread panicked"); + } + return; + } + + shutdown_runtime_on_current_thread(handle, forwarder_handle, exporter, runtime); +} + +fn shutdown_runtime_on_current_thread( + handle: Handle, + forwarder_handle: Option>, + exporter: Option>, + runtime: Option, +) where + T: Serialize + Send + 'static, +{ + // Wait for the forwarder to finish processing remaining events. + if let Some(forwarder_handle) = forwarder_handle + && let Err(e) = handle.block_on(forwarder_handle) + { + warn!("forwarder task failed: {e}"); + } + + // Flush the exporter to ensure all events are sent. + if let Some(exporter) = exporter { + if let Err(e) = handle.block_on(exporter.force_flush()) { + warn!("failed to flush exporter: {e}"); } + drop(exporter); } + + drop(runtime); } #[cfg(test)] mod tests { use super::*; + use quent_exporter::NdjsonExporterOptions; + use tempfile::TempDir; #[derive(Debug, serde::Serialize)] struct TestEvent; + fn ndjson_exporter(output: &TempDir) -> ExporterOptions { + ExporterOptions::Ndjson(NdjsonExporterOptions { + output_dir: output.path().to_path_buf(), + }) + } + #[test] fn noop_exporter() { let ctx = Context::::try_new(Uuid::now_v7(), None).unwrap(); - assert!(ctx.handle.is_none()); assert!(ctx.exporter.is_none()); assert!(ctx.forwarder_handle.is_none()); assert!(ctx._runtime.is_none()); @@ -241,4 +324,44 @@ mod tests { sender.send(Event::new_now(Uuid::now_v7(), TestEvent)); drop(ctx); } + + #[test] + fn exporter_context_can_be_created_inside_existing_runtime() { + let output = TempDir::new().unwrap(); + let runtime = Runtime::new().unwrap(); + let context_id = Uuid::now_v7(); + let event_id = Uuid::now_v7(); + let output_file = output.path().join(format!("{context_id}.ndjson")); + + runtime.block_on(async { + let ctx = + Context::::try_new(context_id, Some(ndjson_exporter(&output))).unwrap(); + assert!(ctx._runtime.is_none()); + ctx.events_sender().emit(event_id, TestEvent); + drop(ctx); + }); + + let contents = std::fs::read_to_string(output_file).unwrap(); + assert!(contents.contains(&event_id.to_string())); + } + + #[test] + fn exporter_context_can_be_dropped_inside_existing_runtime() { + let output = TempDir::new().unwrap(); + let runtime = Runtime::new().unwrap(); + let context_id = Uuid::now_v7(); + let event_id = Uuid::now_v7(); + let output_file = output.path().join(format!("{context_id}.ndjson")); + let ctx = + Context::::try_new(context_id, Some(ndjson_exporter(&output))).unwrap(); + + assert!(ctx._runtime.is_some()); + ctx.events_sender().emit(event_id, TestEvent); + runtime.block_on(async move { + drop(ctx); + }); + + let contents = std::fs::read_to_string(output_file).unwrap(); + assert!(contents.contains(&event_id.to_string())); + } }