Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 0 additions & 141 deletions bec_server/bec_server/scan_bundler/bluesky_emitter.py

This file was deleted.

8 changes: 1 addition & 7 deletions bec_server/bec_server/scan_bundler/scan_bundler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from bec_lib.logger import bec_logger

from .bec_emitter import BECEmitter
from .bluesky_emitter import BlueskyEmitter

if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector
Expand Down Expand Up @@ -46,7 +45,6 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None:
self.monitored_devices = {}
self.baseline_devices = {}
self.device_storage = {}
self.scan_motors = {}
self.readout_priority = {}
self.storage_initialized = set()
self.executor = ThreadPoolExecutor(max_workers=4)
Expand All @@ -58,7 +56,7 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None:
self.status = messages.BECStatus.RUNNING

def _initialize_emitters(self):
self._emitter = [BECEmitter(self), BlueskyEmitter(self)]
self._emitter = [BECEmitter(self)]

def run_emitter(self, emitter_method: Callable, *args, **kwargs):
for emi in self._emitter:
Expand Down Expand Up @@ -125,8 +123,6 @@ def _initialize_scan_container(self, scan_msg: messages.ScanStatusMessage):

scan_id = scan_msg.content["scan_id"]
scan_info = scan_msg.content["info"]
scan_motors = list(set(self.device_manager.devices[m] for m in scan_info["scan_motors"]))
self.scan_motors[scan_id] = scan_motors
self.readout_priority[scan_id] = scan_info["readout_priority"]
if scan_id not in self.storage_initialized:
self.sync_storage[scan_id] = {"info": scan_info, "status": "open", "sent": set()}
Expand Down Expand Up @@ -363,14 +359,12 @@ def cleanup_storage(self):
"sync_storage",
"monitored_devices",
"baseline_devices",
"scan_motors",
"readout_priority",
]:
try:
getattr(self, storage).pop(scan_id)
except KeyError:
logger.warning(f"Failed to remove {scan_id} from {storage}.")
# self.bluesky_emitter.cleanup_storage(scan_id)
self.run_emitter("on_cleanup", scan_id)
self.storage_initialized.remove(scan_id)

Expand Down
78 changes: 0 additions & 78 deletions bec_server/tests/tests_scan_bundler/test_bluesky_emitter.py

This file was deleted.

7 changes: 1 addition & 6 deletions bec_server/tests/tests_scan_bundler/test_scan_bundler.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,14 @@ def test_initialize_scan_container(scan_bundler_mock, scan_msg):
sb = scan_bundler_mock
scan_id = scan_msg.content["scan_id"]
scan_info = scan_msg.content["info"]
scan_motors = list(set(sb.device_manager.devices[m] for m in scan_info["scan_motors"]))
readout_priority = scan_info["readout_priority"]
bl_devs = sb.device_manager.devices.baseline_devices(readout_priority=readout_priority)

with mock.patch.object(sb, "run_emitter") as emitter_mock:
sb._initialize_scan_container(
scan_msg
) # The sb.device_manager.devices[m] will crash if m is not a motor in devices
sb._initialize_scan_container(scan_msg)

if scan_msg.content.get("status") != "open":
return
assert sb.scan_motors[scan_id] == scan_motors
assert sb.sync_storage[scan_id] == {"info": scan_info, "status": "open", "sent": set()}
assert sb.monitored_devices[scan_id] == {
"devices": sb.device_manager.devices.monitored_devices(
Expand Down Expand Up @@ -573,7 +569,6 @@ def test_baseline_update(scan_bundler_mock, scan_id, device, signal):
sb = scan_bundler_mock
sb.baseline_devices[scan_id] = {"done": {device: False}}
sb.sync_storage[scan_id] = {}
sb.scan_motors[scan_id] = []
sb.readout_priority[scan_id] = {}
with mock.patch.object(sb, "run_emitter") as emitter:
sb._baseline_update(scan_id, device, signal)
Expand Down
Loading