Skip to content

fix: instrumentation context runtime blocking#226

Open
fallintoplace wants to merge 1 commit into
rapidsai:mainfrom
fallintoplace:fix/instrumentation-context-runtime
Open

fix: instrumentation context runtime blocking#226
fallintoplace wants to merge 1 commit into
rapidsai:mainfrom
fallintoplace:fix/instrumentation-context-runtime

Conversation

@fallintoplace

@fallintoplace fallintoplace commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Summary

Make Context::try_new and Context teardown safe when called from inside an existing Tokio runtime.

Details

Context::try_new detected Handle::try_current() and then called handle.block_on(create_exporter(...)). Tokio does not allow blocking the current runtime from a runtime worker thread, so async callers could panic during context construction. Drop had the same shape while waiting for the forwarder and flushing the exporter.

This keeps the existing synchronous API and preserves the original runtime behavior: contexts reuse an existing Tokio runtime when one is present, and only create an owned runtime when no runtime exists. When construction or teardown happens from inside Tokio, the blocking Handle::block_on work runs on a helper thread instead of the async worker thread. Noop contexts still short-circuit without creating or using a runtime.

Regression coverage now exercises:

  • creating an exporter-backed context inside an existing Tokio runtime, without allocating an owned runtime
  • dropping an exporter-backed context inside an existing Tokio runtime

Validation

  • cargo test -p quent-instrumentation
  • cargo clippy -p quent-instrumentation --all-targets -- -D warnings
  • cargo test

Note: the broad cargo test run emitted existing C++ bridge build warnings from macOS ar -D, but completed successfully.

@johanpel johanpel assigned dhruv9vats and unassigned dhruv9vats Jun 15, 2026
@johanpel johanpel requested a review from dhruv9vats June 15, 2026 08:18
@dhruv9vats dhruv9vats added improvement Improves an existing functionality breaking Introduces a breaking change labels Jun 15, 2026
Comment on lines +215 to +220
let join_result = thread::spawn(move || {
let exporter = runtime
.block_on(create_exporter(kind, id))
.map_err(|e| e.to_string())?;
Ok::<_, String>((runtime, exporter))
})

@dhruv9vats dhruv9vats Jun 15, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @fallintoplace!
Can we not just spawn a new thread as here on an existing runtime if one exists instead of creating another runtime? Also, can we consider spawn_blocking here?

@fallintoplace fallintoplace changed the title Fix instrumentation context runtime blocking fix: instrumentation context runtime blocking Jun 15, 2026
@fallintoplace fallintoplace force-pushed the fix/instrumentation-context-runtime branch from 211de5d to e079b0c Compare June 15, 2026 16:38
rapids-bot Bot pushed a commit that referenced this pull request Jun 24, 2026
# Description

This PR is of mainly instrumentation infrastructure / support crates based on two issues:

1. Have one exporter per observer instead of one exporter per context (see #229)
2. Make the context id unrelated to the id of the root entity (e.g. the engine id in the query engine domain model). 

2 allows every context to write events without the need to synchronize on the root id or to consider the root entity as something special versus other entities. While this would be ideally propagated back to the modeling API, #191 will remove that in its current form entirely anyway, so it is left as is for now so upstream model definitions experience no breaking changes twice in a short period of time.

I initially tried to deal with 1 and 2 separately, but since this touches a lot of the same code across many layers it seemed best to tackle these in one pass.

The consequences are as follows. In this description, only {App}Context and {Entity}Observer/Handle are user-facing, plain Context and Observer/Handle mentions are not user facing, but are the model-agonistic backing structures for their generated counterparts.

- There is no longer a need for an "umbrella" event type representing all events of an application event model. This is for now still generated by the model macros because the analyzer side still relies on it. Removing that is intentionally not done in this PR to reduce the scope.
- The quent_instrumentation::Context (not user facing) is now mainly responsible for ensuring there is an async runtime that observers can work with, it no longer owns the exporter, and to provide a bridge from sync <-> async code.
- A generated {App}Context now concurrently writes the model provenance sidecar file, constructs (+connects) exporters and observers, and blocks until all of that is ready.
- Each Observer now manages its own Exporter (which lives in an async forwarder task that it cancels on drop)
- Each {Entity}Observer deals out {Entity}Handles for entity instances which hold an Arc to the inner Observer, which ensures everything is kept alive as long as a handle exists.

This means as long as there are any live handles, even when all Observers or Context is dropped, events can be emitted. Also, constructing or dropping Context / Observer / Handle can be done from both synchronous and asynchronous code as long as that code is not using a `current-thread` runtime (also raised in #226). This PR adds tests for all flavors of runtime environments for the Context to live in.

- Filesystem exporters now write in the following directory tree:

```
  - <context uuid>
    - <entity_name>
      - <uuid>.<extension>  (using a uuid here is just a non-enforced convention 
                             that mainly batching no-append file format exporters 
                             will benefit from as they can just generate a new uuid 
                             for each batch of events as file name).
    - <another entity name>
      - <uuid>.<extension>
      - <uuid>.<extension>
      - ...
    - model.qmi        (sidecar file) 
```

- For the query engine domain, since a (distributed) engine's events could now be spread out across multiple contexts / directories, listing engines is done by first scanning all engine events across the entire data source folder to obtain all engine ids. Then, all worker events are scanned to figure out which workers were spawned on behalf of those engines. For now it is assumed that these entities are directly tied to all event-producing processes of the engines, such that their instrumentation contexts together capture all events for an entire engine model. This is a rather brittle assumption that should not be relied on in follow-up work. I intend to address this with a generated importing stack that leverages the quent-ref-tree constraint after #191 is done, plus some sort of indexing service hinted at in #40.

## Related Issues

Closes #229



## For reference: example of macro generated code by 🤖:

### Generated event type + stream name
```rust
pub enum FileStatsEvent { Checksum(Checksum), Decompressed(Decompressed) }

impl quent_model::EntityEvent for FileStatsEvent {
    const NAME: &'static str = "file_stats";   // exporter subdir / wire tag / ingest key
}
```

### `{App}Context` (the user entry point), the relevant slice
```rust
pub struct AppContext {
    file_stats: FileStatsObserver,     // ... one field per entity
    _inner: quent_model::Context,
}

impl AppContext {
    pub fn try_new(exporter: Option<…ExporterOptions>) -> Result<Self, …> {
        let inner = quent_model::Context::try_new(exporter)?;   // sync: resolve runtime only
        Self::assemble(inner)
    }

    // The single sync→async bridge: sidecar + every observer built concurrently,
    // blocked once.
    fn assemble(inner: quent_model::Context) -> Result<Self, …> {
        let (file_stats, …) = inner.block_on(async {
            let (_sidecar, file_stats, …) = quent_model::tokio::try_join!(
                async { inner.write_sidecar(<App as ModelSource>::model_info()).await; Ok(()) },
                inner.observer::<FileStatsEvent>(),     // ... one per entity
            )?;
            Ok((file_stats, …))
        })?;
        Ok(Self { file_stats: FileStatsObserver::new(file_stats), …, _inner: inner })
    }

    pub fn file_stats_observer(&self) -> FileStatsObserver { self.file_stats.clone() }   // cheap Arc clone
}
```

### `{Entity}Observer` facade → `{Entity}Handle`
```rust
pub struct FileStatsObserver { inner: Arc<quent_model::Observer<FileStatsEvent>> }

impl FileStatsObserver {
    pub fn new(observer: quent_model::Observer<FileStatsEvent>) -> Self { Self { inner: Arc::new(observer) } }
    pub fn send(&self, event: quent_model::Event<FileStatsEvent>) { self.inner.send(event); }  // pre-built (collector path)
    pub fn create(&self, id: Uuid) -> FileStatsHandle {
        FileStatsHandle { id, inner: self.inner.clone() }   // handle co-owns the observer
    }
}

pub struct FileStatsHandle { id: Uuid, inner: Arc<quent_model::Observer<FileStatsEvent>> }

impl FileStatsHandle {
    pub fn uuid(&self) -> Uuid { self.id }
    pub fn checksum(&self, event: Checksum) {
        self.inner.emit(self.id, FileStatsEvent::Checksum(event));   // Observer::emit -> EventSender -> forwarder
    }
    pub fn decompressed(&self, event: Decompressed) {
        self.inner.emit(self.id, FileStatsEvent::Decompressed(event));
    }
}
```

Emit path: `AppContext.file_stats_observer().create(id).checksum(..)` → `Observer::emit` → mpsc → forwarder task → exporter.

### Read/route side (collector + analyzer), same entity
```rust
// CollectorSink::ingest (server replays a received stream)
if entity == <FileStatsEvent as EntityEvent>::NAME {
    let e: Event<FileStatsEvent> = ciborium::from_reader(event)?;
    self.file_stats.send(e);                 // into the (already built) observer
    return Ok(());
}

// {Model}::import_events (analyzer reconstruction), per entity:
let path = dir.join(<FileStatsEvent as EntityEvent>::NAME);   // <ctx>/file_stats
if path.is_dir() {
    let importer = create_importer::<FileStatsEvent>(&FileSystem { format, path })?;
    streams.push(importer.map(|e| Event::new(e.id, e.timestamp, AppEvent::from(e.data))));
}
```

`AppEvent` (the umbrella enum) + its `From<{Entity}Event>` impls are still generated solely for this analyzer reconstruction; nothing on the capture path uses them.

Authors:
  - Johan Peltenburg (https://github.com/johanpel)

Approvers:
  - Matthijs Brobbel (https://github.com/mbrobbel)
  - Dhruv Vats (https://github.com/dhruv9vats)

URL: #241
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

breaking Introduces a breaking change improvement Improves an existing functionality

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants