Skip to content

Commit ac34ff4

Browse files
authored
feat(grpc): subscriptions runtime (#190)
1 parent 0ff72ff commit ac34ff4

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

crates/grpc/server/src/lib.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use torii_proto::Message;
5959

6060
use anyhow::{anyhow, Error};
6161

62-
#[derive(Debug, Clone)]
62+
#[derive(Debug)]
6363
pub struct DojoWorld<P: Provider + Sync> {
6464
storage: Arc<dyn Storage>,
6565
provider: Arc<P>,
@@ -73,6 +73,8 @@ pub struct DojoWorld<P: Provider + Sync> {
7373
token_manager: Arc<TokenManager>,
7474
transaction_manager: Arc<TransactionManager>,
7575
_config: GrpcConfig,
76+
// Keep runtime alive
77+
_subscriptions_runtime: tokio::runtime::Runtime,
7678
}
7779

7880
impl<P: Provider + Sync> DojoWorld<P> {
@@ -91,31 +93,40 @@ impl<P: Provider + Sync> DojoWorld<P> {
9193
let token_manager = Arc::new(TokenManager::new(config.clone()));
9294
let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
9395

94-
tokio::task::spawn(subscriptions::entity::Service::new(Arc::clone(
96+
// Create a separate runtime for subscription processing
97+
// This isolates subscription work from gRPC request handling
98+
let subscriptions_runtime = tokio::runtime::Builder::new_multi_thread()
99+
.worker_threads(num_cpus::get().min(4)) // Dedicated threads for subscriptions
100+
.thread_name("torii-grpc-subscriptions")
101+
.build()
102+
.expect("Failed to create subscriptions runtime");
103+
104+
// Spawn subscription services on the dedicated runtime
105+
subscriptions_runtime.spawn(subscriptions::entity::Service::new(Arc::clone(
95106
&entity_manager,
96107
)));
97108

98-
tokio::task::spawn(subscriptions::event_message::Service::new(Arc::clone(
109+
subscriptions_runtime.spawn(subscriptions::event_message::Service::new(Arc::clone(
99110
&event_message_manager,
100111
)));
101112

102-
tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(
113+
subscriptions_runtime.spawn(subscriptions::event::Service::new(Arc::clone(
103114
&event_manager,
104115
)));
105116

106-
tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(
117+
subscriptions_runtime.spawn(subscriptions::indexer::Service::new(Arc::clone(
107118
&indexer_manager,
108119
)));
109120

110-
tokio::task::spawn(subscriptions::token_balance::Service::new(Arc::clone(
121+
subscriptions_runtime.spawn(subscriptions::token_balance::Service::new(Arc::clone(
111122
&token_balance_manager,
112123
)));
113124

114-
tokio::task::spawn(subscriptions::token::Service::new(Arc::clone(
125+
subscriptions_runtime.spawn(subscriptions::token::Service::new(Arc::clone(
115126
&token_manager,
116127
)));
117128

118-
tokio::task::spawn(subscriptions::transaction::Service::new(Arc::clone(
129+
subscriptions_runtime.spawn(subscriptions::transaction::Service::new(Arc::clone(
119130
&transaction_manager,
120131
)));
121132

@@ -132,6 +143,7 @@ impl<P: Provider + Sync> DojoWorld<P> {
132143
token_manager,
133144
transaction_manager,
134145
_config: config,
146+
_subscriptions_runtime: subscriptions_runtime,
135147
}
136148
}
137149
}

0 commit comments

Comments
 (0)