Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 150 additions & 27 deletions crates/instrumentation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use quent_events::Event;
use quent_exporter::{ExporterOptions, create_exporter};
use quent_exporter_types::Exporter;
use serde::Serialize;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
};
use tokio::{
runtime::{Handle, Runtime},
Expand All @@ -20,6 +23,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use uuid::Uuid;

type ExporterHandle<T> = Arc<dyn Exporter<T>>;

/// Wrapper around an optional channel sender. When the inner sender is `None`
/// (i.e. the noop exporter is selected), `send` is a no-op that avoids any
/// channel or event-forwarding overhead.
Expand Down Expand Up @@ -84,7 +89,7 @@ where
{
handle: Option<Handle>,
events_sender: EventSender<T>,
exporter: Option<Arc<dyn Exporter<T>>>,
exporter: Option<ExporterHandle<T>>,
cancellation_token: CancellationToken,
forwarder_handle: Option<JoinHandle<()>>,

Expand Down Expand Up @@ -125,25 +130,23 @@ where
debug!("using existing async runtime");
(None, handle)
} else {
debug!("spawning new async runtime");
if let Ok(runtime) = Runtime::new() {
let handle = runtime.handle().clone();
(Some(runtime), handle)
} else {
return Err("unable to spawn async runtime")?;
}
debug!("spawning async runtime");
let runtime =
Runtime::new().map_err(|e| format!("unable to spawn async runtime: {e}"))?;
let handle = runtime.handle().clone();
(Some(runtime), handle)
};

let (events_sender, mut events_receiver) = unbounded_channel();

debug!("constructing exporter");
let exporter: Arc<dyn Exporter<T>> = handle.block_on(create_exporter(kind, id))?;
let exporter = create_exporter_on_handle(&handle, kind, id)?;

let cancellation_token = CancellationToken::new();
let cloned_token = cancellation_token.clone();

let forwarder_handle = handle.spawn({
let exporter: Arc<dyn Exporter<T>> = Arc::clone(&exporter);
let exporter: ExporterHandle<T> = Arc::clone(&exporter);
async move {
loop {
tokio::select! {
Expand Down Expand Up @@ -201,35 +204,115 @@ where
fn drop(&mut self) {
self.cancellation_token.cancel();

if let Some(handle) = &self.handle {
// Wait for the forwarder to finish processing remaining events
if let Some(forwarder_handle) = self.forwarder_handle.take()
&& let Err(e) = handle.block_on(forwarder_handle)
{
warn!("forwarder task failed: {e}");
}
if let Some(handle) = self.handle.take() {
shutdown_runtime(
handle,
self.forwarder_handle.take(),
self.exporter.take(),
self._runtime.take(),
);
}
}
}

// Flush the exporter to ensure all events are sent
if let Some(exporter) = &self.exporter
&& let Err(e) = handle.block_on(exporter.force_flush())
{
warn!("failed to flush exporter: {e}");
}
fn create_exporter_on_handle<T>(
handle: &Handle,
kind: ExporterOptions,
id: Uuid,
) -> Result<ExporterHandle<T>, Box<dyn std::error::Error>>
where
T: Serialize + Send + 'static,
{
// `Handle::block_on` panics from a Tokio worker thread, so keep the sync
// constructor API by doing the blocking exporter setup on a helper thread.
if Handle::try_current().is_ok() {
let handle = handle.clone();
let join_result = thread::spawn(move || {
handle
.block_on(create_exporter(kind, id))
.map_err(|e| e.to_string())
})
Comment on lines +230 to +234

@dhruv9vats dhruv9vats Jun 15, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @fallintoplace!
Can we not just spawn a new thread as here on an existing runtime if one exists instead of creating another runtime? Also, can we consider spawn_blocking here?

.join();

return match join_result {
Ok(Ok(exporter)) => Ok(exporter),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err("exporter construction thread panicked".into()),
};
}

Ok(handle.block_on(create_exporter(kind, id))?)
}

fn shutdown_runtime<T>(
handle: Handle,
forwarder_handle: Option<JoinHandle<()>>,
exporter: Option<ExporterHandle<T>>,
runtime: Option<Runtime>,
) where
T: Serialize + Send + 'static,
{
// `Drop` is synchronous too; use the same helper-thread escape hatch when
// teardown happens from inside a Tokio runtime.
if Handle::try_current().is_ok() {
if thread::spawn(move || {
shutdown_runtime_on_current_thread(handle, forwarder_handle, exporter, runtime)
})
.join()
.is_err()
{
warn!("runtime shutdown thread panicked");
}
return;
}

shutdown_runtime_on_current_thread(handle, forwarder_handle, exporter, runtime);
}

fn shutdown_runtime_on_current_thread<T>(
handle: Handle,
forwarder_handle: Option<JoinHandle<()>>,
exporter: Option<ExporterHandle<T>>,
runtime: Option<Runtime>,
) where
T: Serialize + Send + 'static,
{
// Wait for the forwarder to finish processing remaining events.
if let Some(forwarder_handle) = forwarder_handle
&& let Err(e) = handle.block_on(forwarder_handle)
{
warn!("forwarder task failed: {e}");
}

// Flush the exporter to ensure all events are sent.
if let Some(exporter) = exporter {
if let Err(e) = handle.block_on(exporter.force_flush()) {
warn!("failed to flush exporter: {e}");
}
drop(exporter);
}

drop(runtime);
}

#[cfg(test)]
mod tests {
use super::*;
use quent_exporter::NdjsonExporterOptions;
use tempfile::TempDir;

#[derive(Debug, serde::Serialize)]
struct TestEvent;

fn ndjson_exporter(output: &TempDir) -> ExporterOptions {
ExporterOptions::Ndjson(NdjsonExporterOptions {
output_dir: output.path().to_path_buf(),
})
}

#[test]
fn noop_exporter() {
let ctx = Context::<TestEvent>::try_new(Uuid::now_v7(), None).unwrap();
assert!(ctx.handle.is_none());
assert!(ctx.exporter.is_none());
assert!(ctx.forwarder_handle.is_none());
assert!(ctx._runtime.is_none());
Expand All @@ -241,4 +324,44 @@ mod tests {
sender.send(Event::new_now(Uuid::now_v7(), TestEvent));
drop(ctx);
}

#[test]
fn exporter_context_can_be_created_inside_existing_runtime() {
let output = TempDir::new().unwrap();
let runtime = Runtime::new().unwrap();
let context_id = Uuid::now_v7();
let event_id = Uuid::now_v7();
let output_file = output.path().join(format!("{context_id}.ndjson"));

runtime.block_on(async {
let ctx =
Context::<TestEvent>::try_new(context_id, Some(ndjson_exporter(&output))).unwrap();
assert!(ctx._runtime.is_none());
ctx.events_sender().emit(event_id, TestEvent);
drop(ctx);
});

let contents = std::fs::read_to_string(output_file).unwrap();
assert!(contents.contains(&event_id.to_string()));
}

#[test]
fn exporter_context_can_be_dropped_inside_existing_runtime() {
let output = TempDir::new().unwrap();
let runtime = Runtime::new().unwrap();
let context_id = Uuid::now_v7();
let event_id = Uuid::now_v7();
let output_file = output.path().join(format!("{context_id}.ndjson"));
let ctx =
Context::<TestEvent>::try_new(context_id, Some(ndjson_exporter(&output))).unwrap();

assert!(ctx._runtime.is_some());
ctx.events_sender().emit(event_id, TestEvent);
runtime.block_on(async move {
drop(ctx);
});

let contents = std::fs::read_to_string(output_file).unwrap();
assert!(contents.contains(&event_id.to_string()));
}
}