
Message Bus v2 #1712

Closed
twitu opened this issue Jun 14, 2024 · 5 comments · Fixed by #1786
Assignees
Labels
enhancement New feature or request RFC A request for comment rust Relating to the Rust core

Comments

@twitu
Collaborator

twitu commented Jun 14, 2024

MessageBus is a core component of the Nautilus engine that allows other components to talk to each other by passing messages to handlers (#1711) that consume them. It allows components to be loosely and dynamically coupled. A unique characteristic of the Nautilus MessageBus is that it is meant to connect components running on the same thread, which makes it a synchronous and deterministic message bus.

It supports 3 communication patterns:

  • Send a message to a component
  • Request a response from a component
  • Publish a message to multiple components

Send

  • register_endpoint - registers a handler for an endpoint
  • send - sends a message to an endpoint

Request/Response

  • register_endpoint - registers a handler for an endpoint
  • request - sends a message to an endpoint with a request ID and a response handler. The output of the endpoint is passed to the response handler.

Publish

  • subscribe - subscribes a handler to all topics that match a given pattern
  • publish - publishes a message to a topic, passing the message to all subscribed handlers in order
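The three patterns above can be sketched against a toy single-threaded bus. This is a minimal illustration only, not the actual MessageBus API: `MiniBus`, string messages, and closure handlers are all stand-ins chosen to keep the example self-contained.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Handlers take a message and may return a response (used by `request`).
type Handler = Rc<dyn Fn(&str) -> Option<String>>;

#[derive(Default)]
struct MiniBus {
    endpoints: HashMap<String, Handler>,
    topics: HashMap<String, Vec<Handler>>,
}

impl MiniBus {
    fn register_endpoint(&mut self, name: &str, h: Handler) {
        self.endpoints.insert(name.to_string(), h);
    }
    // Send: fire-and-forget delivery to a single endpoint.
    fn send(&self, endpoint: &str, msg: &str) {
        if let Some(h) = self.endpoints.get(endpoint) {
            h(msg);
        }
    }
    // Request: the endpoint's output is passed to the response handler.
    fn request(&self, endpoint: &str, msg: &str, respond: impl Fn(String)) {
        if let Some(h) = self.endpoints.get(endpoint) {
            if let Some(resp) = h(msg) {
                respond(resp);
            }
        }
    }
    // Subscribe: wildcard pattern matching omitted; exact topic names only.
    fn subscribe(&mut self, topic: &str, h: Handler) {
        self.topics.entry(topic.to_string()).or_default().push(h);
    }
    // Publish: every subscribed handler sees the message, in subscription order.
    fn publish(&self, topic: &str, msg: &str) {
        if let Some(hs) = self.topics.get(topic) {
            for h in hs {
                h(msg);
            }
        }
    }
}

fn main() {
    let mut bus = MiniBus::default();

    // Send
    bus.register_endpoint("echo", Rc::new(|m: &str| Some(m.to_uppercase())));
    bus.send("echo", "hello");

    // Request/Response
    let reply = Rc::new(RefCell::new(String::new()));
    let r = reply.clone();
    bus.request("echo", "ping", move |resp| *r.borrow_mut() = resp);
    assert_eq!(&*reply.borrow(), "PING");

    // Publish: both subscribers see the message, in subscription order.
    let log = Rc::new(RefCell::new(Vec::new()));
    for tag in ["a", "b"] {
        let l = log.clone();
        bus.subscribe("quotes", Rc::new(move |m: &str| {
            l.borrow_mut().push(format!("{tag}:{m}"));
            None
        }));
    }
    bus.publish("quotes", "tick");
    assert_eq!(*log.borrow(), vec!["a:tick".to_string(), "b:tick".to_string()]);
}
```

Note how determinism falls out of the design: handlers run synchronously on the caller's thread, in registration order, with no queueing between them.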

The current message bus implementation has most of the data structures needed to support the above patterns. However, it needs a separate component to queue messages and execute the handlers on them. In this sense, the structure below is the context, and a runner actually executes the tasks. The runner can be a custom implementation, but there might be a way to get the tokio runtime to do this for us by restricting it to run on one thread, essentially making it synchronous. Moreover, since the message bus context needs to be accessible by many different components, it might be ergonomic to make it a global variable.

pub struct MessageBus {
    ...
    /// mapping from topic to the corresponding handler
    /// a topic can be a string with wildcards
    /// * '?' - any character
    /// * '*' - any number of any characters
    subscriptions: IndexMap<Subscription, Vec<Ustr>>,
    /// maps a pattern to all the handlers registered for it
    /// this is updated whenever a new subscription is created.
    patterns: IndexMap<Ustr, Vec<Subscription>>,
    /// handles a message or a request destined for a specific endpoint.
    endpoints: IndexMap<Ustr, MessageHandler>,
    /// Relates a request to a response:
    /// a request maps its ID to a handler so that a response
    /// with the same ID can later be handled.
    correlation_index: IndexMap<UUID4, MessageHandler>,
}
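The wildcard semantics described in the doc comments above (`?` matches any single character, `*` matches any run of characters) can be sketched with a simple recursive matcher; the real implementation may well differ, and the topic names below are made up:

```rust
/// Returns true if `topic` matches `pattern`, where `?` matches any single
/// character and `*` matches any (possibly empty) run of characters.
fn matches(pattern: &str, topic: &str) -> bool {
    fn go(p: &[char], t: &[char]) -> bool {
        match (p.first(), t.first()) {
            // Both exhausted: match.
            (None, None) => true,
            // `*` either consumes nothing, or consumes one topic char and stays.
            (Some('*'), _) => go(&p[1..], t) || (!t.is_empty() && go(p, &t[1..])),
            // `?` consumes exactly one topic char.
            (Some('?'), Some(_)) => go(&p[1..], &t[1..]),
            // Literal characters must agree.
            (Some(a), Some(b)) if a == b => go(&p[1..], &t[1..]),
            _ => false,
        }
    }
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = topic.chars().collect();
    go(&p, &t)
}

fn main() {
    assert!(matches("data.quotes.*", "data.quotes.BINANCE.BTCUSDT"));
    assert!(matches("data.?uotes", "data.quotes"));
    assert!(!matches("data.trades.*", "data.quotes.BINANCE"));
}
```

The `patterns` map above exists precisely so this matching runs once per new subscription rather than on every publish.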

Some challenges remain around sharing access to the message bus fields, because different components might need shared read and write access.

@twitu twitu added the enhancement New feature or request label Jun 14, 2024
@twitu twitu self-assigned this Jun 14, 2024
@twitu twitu added RFC A request for comment rust Relating to the Rust core labels Jun 14, 2024
@twitu
Collaborator Author

twitu commented Jun 16, 2024

The message bus needs to accept many different kinds of messages. There are a few pre-defined messages used by internal components, but users can also derive their own custom classes and even transfer arbitrary Python objects through the message bus.

  • Command
    • TradingCommand
    • DataCommand
  • Event
    • ComponentStateChanged
    • RiskEvent
    • TimeEvent
    • AccountState
    • OrderEvent
    • PositionEvent
  • Request
    • DataRequest
  • Response
    • DataResponse

The messages can have very different fields, so a single trait cannot encompass all the functionality. However, we can use traits to hide the details of the message and recover the concrete type later (without any serde) by using the dynamic typing functionality provided by the std::any module.

A message handler needs to advertise the concrete type it intends to consume, and the message can then be downcast to the appropriate concrete value.

pub trait MessageHandler {
   fn arg_type_id() -> TypeId;
   fn call(&mut self, arg: Box<dyn Any>);
}
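A runnable sketch of this idea follows. One adjustment: `arg_type_id` takes `&self` here so the trait stays object-safe and can be stored as `Box<dyn MessageHandler>`; `TradeCommand`, `TradeHandler`, and `dispatch` are hypothetical names invented for the example.

```rust
use std::any::{Any, TypeId};

pub trait MessageHandler {
    // `&self` (rather than a static method) keeps the trait object-safe.
    fn arg_type_id(&self) -> TypeId;
    fn call(&mut self, arg: Box<dyn Any>);
}

struct TradeCommand { qty: u64 }        // hypothetical message type

struct TradeHandler { total_qty: u64 }  // hypothetical handler with state

impl MessageHandler for TradeHandler {
    fn arg_type_id(&self) -> TypeId {
        TypeId::of::<TradeCommand>()
    }
    fn call(&mut self, arg: Box<dyn Any>) {
        // Recover the concrete type, no serde involved.
        let cmd = arg.downcast::<TradeCommand>().expect("type mismatch");
        self.total_qty += cmd.qty;
    }
}

fn dispatch(handler: &mut dyn MessageHandler, msg: Box<dyn Any>) {
    // The bus can compare the advertised type against the actual message
    // type before calling, instead of letting the downcast fail inside.
    assert_eq!(handler.arg_type_id(), (*msg).type_id());
    handler.call(msg);
}

fn main() {
    let mut h = TradeHandler { total_qty: 0 };
    dispatch(&mut h, Box::new(TradeCommand { qty: 5 }));
    dispatch(&mut h, Box::new(TradeCommand { qty: 7 }));
    assert_eq!(h.total_qty, 12);
}
```

The `TypeId` check in `dispatch` is what lets the bus reject mismatched messages up front rather than panicking inside a handler.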

Pseudo examples

For example, a user defined custom Python handler that takes a Python object can be wrapped into a message handler like this pseudocode:

struct MessageBusPythonHandler {
   callback: PyObject,
}

impl MessageHandler for MessageBusPythonHandler {
    fn arg_type_id() -> TypeId {
        TypeId::of::<PyObject>()
    }

    fn call(&mut self, arg: Box<dyn Any>) {
        // Downcast the boxed value directly; error handling omitted.
        let arg = arg.downcast::<PyObject>().unwrap();
        Python::with_gil(|py| {
            self.callback.call1(py, (*arg,)).unwrap();
        });
    }
}

On the other hand, internal components can define their handlers in Rust. For example the DataEngine registers itself as an endpoint in the message bus.

        self._msgbus.register(endpoint="DataEngine.execute", handler=self.execute)

This can be ported to message bus v2 as:

struct DataEngineExecuteHandler {
    engine: DataEngine,
}

impl MessageHandler for DataEngineExecuteHandler {
    fn arg_type_id() -> TypeId {
        TypeId::of::<Command>()
    }

    fn call(&mut self, arg: Box<dyn Any>) {
        let command = arg.downcast::<Command>().unwrap();
        self.engine.execute(*command);
    }
}
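Putting the endpoint mapping and the handler together, registration and dispatch could look like the following self-contained sketch. `Engine`, `Command`, and `EngineExecuteHandler` are simplified stand-ins for the real data engine types, and a plain `HashMap` stands in for the bus's `endpoints` index:

```rust
use std::any::Any;
use std::collections::HashMap;

trait MessageHandler {
    fn call(&mut self, arg: Box<dyn Any>);
}

struct Command { payload: String }          // stand-in for the real Command

#[derive(Default)]
struct Engine { executed: Vec<String> }     // stand-in for DataEngine

struct EngineExecuteHandler { engine: Engine }

impl MessageHandler for EngineExecuteHandler {
    fn call(&mut self, arg: Box<dyn Any>) {
        let cmd = arg.downcast::<Command>().expect("expected Command");
        self.engine.executed.push(cmd.payload);
    }
}

fn main() {
    // Mirrors `msgbus.register(endpoint="DataEngine.execute", handler=...)`.
    let mut endpoints: HashMap<String, Box<dyn MessageHandler>> = HashMap::new();
    endpoints.insert(
        "DataEngine.execute".to_string(),
        Box::new(EngineExecuteHandler { engine: Engine::default() }),
    );

    // `send` then reduces to a lookup plus a handler call.
    if let Some(h) = endpoints.get_mut("DataEngine.execute") {
        h.call(Box::new(Command { payload: "subscribe quotes".to_string() }));
    }
}
```

One consequence visible here: the handler owns (or must somehow share) the engine, which is exactly the sharing question raised earlier in this issue.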

Summary

The downside of this approach is that dynamic typing will be costlier than calling the functions directly and will reduce the optimizations the compiler can perform. The exact impact can only be found through perf tests, but intuitively it should still be better than a serde-based approach. It also adds some complexity to how a component receives events, because the component will have to create a unique struct for downcasting each specific "type" of event.

The upside is that this approach keeps the decoupled, modular and user extensible properties of the current message bus.

@Troubladore
Contributor

RE: "A unique characteristic of the nautilus message bus is that it is meant to connect components running on the same thread which means it is a synchronous and deterministic message bus."

Any thoughts on how the recent emergence of subinterpreters and the forthcoming GIL-less Python could impact some of Nautilus' architectural choices?

btw - I really appreciate the deep thought you are giving to these core elements - having an engine that is as robust and performant as possible is a huge selling point for Nautilus in my mind, and these are the tickets that make it happen.

@twitu
Collaborator Author

twitu commented Jun 20, 2024

Thanks for sharing this. I'm excited about GIL-less Python, although from what I've read it appears to be tricky to leverage properly and without bugs.

In the case of Nautilus's core architecture, I don't see it being affected much, because the Nautilus core is CPU-bound and each event is data-dependent on the previous event, leaving little scope for parallelism. Concurrency is leveraged at the fringes, where IO-bound tasks read from files or networks. Perhaps, after careful analysis of data dependence, some components in the pub/sub communication model can benefit from this. But as of now, the message bus is meant to be used in a single-threaded context.

@cjdsellers
Copy link
Member

Just in addition to @twitu's thoughts - performance considerations aside, determinism is a very desirable feature for a backtest engine (and for the core platform), which is what we get with the current (mostly) single-threaded design. As twitu points out, we're already leveraging threads (via Rust) for logging, cache, and message bus (external publishing), and in Python for adapter network peripherals (live), which seems to be working well - as all of those offload I/O overhead from the main thread in some way.

The approach we're settling on right now is to port the basic design of the message bus we have already, but adapted slightly based on the guarantees Rust provides. We'll hopefully be able to keep the full range of features currently available, but the Python/pyo3 interaction does make it a challenge to achieve.

@twitu twitu mentioned this issue Jul 9, 2024
@twitu
Collaborator Author

twitu commented Jul 9, 2024

Based on the new data engine design in #1782, the message bus is drastically simplified: it only needs to map endpoints to handlers and topics to subscribers. However, this approach introduces a few constraints derived directly from Rust's mutability and sharing rules -

  1. Actors cannot be run concurrently
  2. A message handler cannot add new handlers to the message bus
  3. A message handler cannot end up calling itself through any chain of messages - this will result in a panic.
| Concurrent actors | Handler can update handler mapping in message bus | Handler mapping | Message handler internal state sharing/sync primitives |
|---|---|---|---|
|  |  | `Rc<RefCell<HashMap<usize, Rc<RefCell<dyn MessageHandler>>>>>` | `Rc`, `RefCell` |
| ✔️ |  | `Arc<RwLock<HashMap<usize, Rc<dyn MessageHandler>>>>` or entry-level locks like `lockfree::Map` | `Arc`, `Mutex` |
| ✔️ | ✔️ | Entry-level locks like `lockfree::Map` | `Arc`, `Mutex` |
|  | ✔️ | Entry-level locks like `lockfree::Map` | `Arc`, `Mutex` |
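Constraint 3 (a handler re-entering itself panics) follows from how `RefCell` enforces borrowing at runtime: a second mutable borrow of the same cell while the first is still alive is a `BorrowMutError` panic. A minimal stand-in demonstrating the mechanism, without any actual bus machinery:

```rust
use std::cell::RefCell;
use std::panic;

fn main() {
    // Stand-in for one handler's internal state behind Rc<RefCell<...>>.
    let handler_state = RefCell::new(0u32);

    // Outer borrow: the bus is currently "inside" this handler.
    let _outer = handler_state.borrow_mut();

    // A message chain that leads back to the same handler would try to
    // borrow the state again while the outer borrow is still held.
    let reentrant = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        let _inner = handler_state.borrow_mut(); // second mutable borrow
    }));

    // The re-entrant call panicked, exactly as constraint 3 states.
    assert!(reentrant.is_err());
    // try_borrow_mut shows the same conflict without panicking:
    assert!(handler_state.try_borrow_mut().is_err());
}
```

With the `Arc<Mutex<...>>` rows of the table, the analogous re-entrant call would deadlock rather than panic, which is arguably harder to diagnose.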
