A simple yet powerful concurrent data processing dispatcher framework for Rust, built on the Actor model.

`WorkDispatcher` provides a generic and ergonomic way to set up a system that reads items from a source (a `Producer`) and dispatches them to a pool of async workers (a `Processor`) for concurrent processing. It abstracts away the complexities of channels, task spawning, and graceful shutdown, allowing you to focus on your business logic.
- Producer-Consumer Model: Decouples data generation from data processing.
- Concurrent Processing: Leverages a pool of async actors to process items in parallel, maximizing throughput.
- Backpressure Handling: Uses a bounded channel to prevent out-of-memory errors when the producer is faster than the consumers.
- Type-Safe & Generic: Define your own data items, producers, and processors using traits.
- Simple API: Set up and run a complex processing system with a fluent builder-style API.
- Built on Tokio & Flume: Relies on industry-standard crates for high-performance async runtime and channels.
Add `work_dispatcher` to your `Cargo.toml`:

```toml
[dependencies]
work_dispatcher = "0.1.0"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
async-trait = "0.1"
flume = "0.11"
```
Then, create your dispatcher in just a few steps:
```rust
use anyhow::Result;
use async_trait::async_trait;
use flume::Sender;
use std::time::Duration;
use work_dispatcher::{Producer, Processor, WorkDispatcher};

// 1. Define your data structure
#[derive(Debug)]
struct DataItem {
    id: i32,
}

// 2. Implement the Producer trait for your data source
struct MyProducer;

#[async_trait]
impl Producer for MyProducer {
    type Item = DataItem;

    async fn run(self, sender: Sender<Self::Item>) {
        println!("[Producer] Starting to generate items...");
        for i in 0..100 {
            // send_async waits when the buffer is full; stop producing
            // if every receiver has been dropped.
            if sender.send_async(DataItem { id: i }).await.is_err() {
                break;
            }
        }
        println!("[Producer] Finished generating items.");
    }
}

// 3. Implement the Processor trait for your processing logic
#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl Processor for MyProcessor {
    type Item = DataItem;
    type Context = String; // The shared context for all workers

    async fn process(&self, item: Self::Item, context: &Self::Context) {
        println!(
            "[Processor] Processing item #{} with context: '{}'",
            item.id, context
        );
        // Simulate async work like a database call or API request
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

// 4. Build and run the WorkDispatcher
#[tokio::main]
async fn main() -> Result<()> {
    let producer = MyProducer;
    let processor = MyProcessor;
    let context = "Shared DB Connection Pool".to_string();

    WorkDispatcher::new(producer, processor, context)
        .workers(8) // Use 8 concurrent workers
        .buffer(500) // Set channel buffer size to 500
        .run()
        .await?;

    println!("Dispatcher finished successfully!");
    Ok(())
}
```
This project is licensed under the MIT License.
Contributions are welcome! Please feel free to submit a pull request or open an issue.