11import time
22from concurrent .futures import Future
33
4- from bluesky_stomp .messaging import MessageContext , StompClient
5- from bluesky_stomp .models import Broker
6- from observability_utils .tracing import (
7- get_tracer ,
8- start_as_current_span ,
9- )
4+ from bluesky_stomp .messaging import MessageContext
5+ from observability_utils .tracing import get_tracer , start_as_current_span
106
117from blueapi .config import ApplicationConfig , MissingStompConfigurationError
128from blueapi .core .bluesky_types import DataEvent
@@ -38,15 +34,15 @@ class BlueapiClient:
3834 """Unified client for controlling blueapi"""
3935
4036 _rest : BlueapiRestClient
41- _events : EventBusClient | None
37+ _event_bus_client : EventBusClient | None
4238
4339 def __init__ (
4440 self ,
4541 rest : BlueapiRestClient ,
4642 events : EventBusClient | None = None ,
4743 ):
4844 self ._rest = rest
49- self ._events = events
45+ self ._event_bus_client = events
5046
5147 @classmethod
5248 def from_config (cls , config : ApplicationConfig ) -> "BlueapiClient" :
@@ -56,21 +52,8 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
5652 except Exception :
5753 ... # Swallow exceptions
5854 rest = BlueapiRestClient (config .api , session_manager = session_manager )
59- stomp_config = config .stomp if config .stomp .enabled else rest .get_stomp_config ()
60- if stomp_config and stomp_config .enabled :
61- assert stomp_config .url .host is not None , "Stomp URL missing host"
62- assert stomp_config .url .port is not None , "Stomp URL missing port"
63- client = StompClient .for_broker (
64- broker = Broker (
65- host = stomp_config .url .host ,
66- port = stomp_config .url .port ,
67- auth = stomp_config .auth ,
68- )
69- )
70- events = EventBusClient (client )
71- return cls (rest , events )
72- else :
73- return cls (rest )
55+ event_bus = EventBusClient .from_stomp_config (config .stomp )
56+ return cls (rest , event_bus )
7457
7558 @start_as_current_span (TRACER )
7659 def get_plans (self ) -> PlanResponse :
@@ -217,7 +200,7 @@ def run_task(
217200 of task execution.
218201 """
219202
220- if self ._events is None :
203+ if ( event_bus := self ._event_bus ()) is None :
221204 raise MissingStompConfigurationError (
222205 "Stomp configuration required to run plans is missing or disabled"
223206 )
@@ -254,8 +237,8 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
254237 else :
255238 complete .set_result (event )
256239
257- with self . _events :
258- self . _events .subscribe_to_all_events (inner_on_event )
240+ with event_bus :
241+ event_bus .subscribe_to_all_events (inner_on_event )
259242 self .start_task (WorkerTask (task_id = task_id ))
260243 return complete .result (timeout = timeout )
261244
@@ -458,3 +441,10 @@ def get_python_env(
458441 """
459442
460443 return self ._rest .get_python_environment (name = name , source = source )
444+
445+ def _event_bus (self ) -> EventBusClient | None :
446+ if not self ._event_bus_client :
447+ if stomp_config := self ._rest .get_stomp_config ():
448+ self ._event_bus_client = EventBusClient .from_stomp_config (stomp_config )
449+
450+ return self ._event_bus_client
0 commit comments