diff --git a/bec_server/bec_server/scan_bundler/bluesky_emitter.py b/bec_server/bec_server/scan_bundler/bluesky_emitter.py deleted file mode 100644 index 18686e8dc..000000000 --- a/bec_server/bec_server/scan_bundler/bluesky_emitter.py +++ /dev/null @@ -1,141 +0,0 @@ -from __future__ import annotations - -import time -import uuid -from typing import TYPE_CHECKING - -import msgpack - -from bec_lib.endpoints import MessageEndpoints -from bec_lib.logger import bec_logger - -from .emitter import EmitterBase - -logger = bec_logger.logger - -if TYPE_CHECKING: - from .scan_bundler import ScanBundler - - -class BlueskyEmitter(EmitterBase): - def __init__(self, scan_bundler: ScanBundler) -> None: - super().__init__(scan_bundler.connector) - self.scan_bundler = scan_bundler - self.bluesky_metadata = {} - - def send_run_start_document(self, scan_id) -> None: - """Bluesky only: send run start documents.""" - logger.debug(f"sending run start doc for scan_id {scan_id}") - self.bluesky_metadata[scan_id] = {} - doc = self._get_run_start_document(scan_id) - self.bluesky_metadata[scan_id]["start"] = doc - self.connector.raw_send(MessageEndpoints.bluesky_events(), msgpack.dumps(("start", doc))) - self.send_descriptor_document(scan_id) - - def _get_run_start_document(self, scan_id) -> dict: - sb = self.scan_bundler - doc = { - "time": time.time(), - "uid": str(uuid.uuid4()), - "scan_id": scan_id, - "queue_id": sb.sync_storage[scan_id]["info"]["queue_id"], - "scan_id": sb.sync_storage[scan_id]["info"]["scan_number"], - "motors": tuple(dev.name for dev in sb.scan_motors[scan_id]), - } - return doc - - def _get_data_keys(self, scan_id): - sb = self.scan_bundler - signals = {} - for dev in sb.monitored_devices[scan_id]["devices"]: - # copied from bluesky/callbacks/stream.py: - signals[dev.name] = sb.device_manager.devices[dev.name]._info.get("describe", {}) - return signals - - def _get_descriptor_document(self, scan_id) -> dict: - sb = self.scan_bundler - doc = { - "run_start": self.bluesky_metadata[scan_id]["start"]["uid"], - "time": time.time(), - "data_keys": self._get_data_keys(scan_id), - "uid": str(uuid.uuid4()), - "configuration": {}, - "name": "primary", - "hints": {"samx": {"fields": ["samx"]}, "samy": {"fields": ["samy"]}}, - "object_keys": { - dev.name: [val for val in dev._info.get("signals", {})] - for dev in sb.monitored_devices[scan_id]["devices"] - }, - } - return doc - - def send_descriptor_document(self, scan_id) -> None: - """Bluesky only: send descriptor document""" - doc = self._get_descriptor_document(scan_id) - self.bluesky_metadata[scan_id]["descriptor"] = doc - self.connector.raw_send( - MessageEndpoints.bluesky_events(), msgpack.dumps(("descriptor", doc)) - ) - - def cleanup_storage(self, scan_id): - """remove old scan_ids to free memory""" - - for storage in ["bluesky_metadata"]: - try: - getattr(self, storage).pop(scan_id) - except KeyError: - logger.warning(f"Failed to remove {scan_id} from {storage}.") - - def send_bluesky_scan_point(self, scan_id, point_id) -> None: - self.connector.raw_send( - MessageEndpoints.bluesky_events(), - msgpack.dumps(("event", self._prepare_bluesky_event_data(scan_id, point_id))), - ) - - def _prepare_bluesky_event_data(self, scan_id, point_id) -> dict: - # event = { - # "descriptor": "5605e810-bb4e-4e40-b...d45279e3a4", - # "time": 1648468217.524021, - # "data": { - # "det": 1.0, - # "motor1": -10.0, - # "motor1_setpoint": -10.0, - # "motor2": -10.0, - # "motor2_setpoint": -10.0, - # }, - # "timestamps": { - # "det": 1648468209.868633, - # "motor1": 1648468209.862141, - # "motor1_setpoint": 1648468209.8607192, - # "motor2": 1648468209.864479, - # "motor2_setpoint": 1648468209.8629901, - # }, - # "seq_num": 1, - # "uid": "ea83a56e-6af2-4b94-9...44dcc36d4e", - # "filled": {}, - # } - sb = self.scan_bundler - metadata = self.bluesky_metadata[scan_id] - while not metadata.get("descriptor"): - time.sleep(0.01) - - bls_event = { - "descriptor": metadata["descriptor"].get("uid"), - "time": time.time(), - "seq_num": point_id, - "uid": str(uuid.uuid4()), - "filled": {}, - "data": {}, - "timestamps": {}, - } - for data_point in sb.sync_storage[scan_id][point_id].values(): - for key, val in data_point.items(): - bls_event["data"][key] = val["value"] - bls_event["timestamps"][key] = val["timestamp"] - return bls_event - - def on_cleanup(self, scan_id: str): - self.cleanup_storage(scan_id) - - def on_init(self, scan_id: str): - self.send_run_start_document(scan_id) diff --git a/bec_server/bec_server/scan_bundler/scan_bundler.py b/bec_server/bec_server/scan_bundler/scan_bundler.py index e1e63d5f0..5821122f5 100644 --- a/bec_server/bec_server/scan_bundler/scan_bundler.py +++ b/bec_server/bec_server/scan_bundler/scan_bundler.py @@ -15,7 +15,6 @@ from bec_lib.logger import bec_logger from .bec_emitter import BECEmitter -from .bluesky_emitter import BlueskyEmitter if TYPE_CHECKING: from bec_lib.redis_connector import RedisConnector @@ -46,7 +45,6 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None: self.monitored_devices = {} self.baseline_devices = {} self.device_storage = {} - self.scan_motors = {} self.readout_priority = {} self.storage_initialized = set() self.executor = ThreadPoolExecutor(max_workers=4) @@ -58,7 +56,7 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None: self.status = messages.BECStatus.RUNNING def _initialize_emitters(self): - self._emitter = [BECEmitter(self), BlueskyEmitter(self)] + self._emitter = [BECEmitter(self)] def run_emitter(self, emitter_method: Callable, *args, **kwargs): for emi in self._emitter: @@ -125,8 +123,6 @@ def _initialize_scan_container(self, scan_msg: messages.ScanStatusMessage): scan_id = scan_msg.content["scan_id"] scan_info = scan_msg.content["info"] - scan_motors = list(set(self.device_manager.devices[m] for m in scan_info["scan_motors"])) - self.scan_motors[scan_id] = scan_motors self.readout_priority[scan_id] = scan_info["readout_priority"] if scan_id not in self.storage_initialized: self.sync_storage[scan_id] = {"info": scan_info, "status": "open", "sent": set()} @@ -363,14 +359,12 @@ def cleanup_storage(self): "sync_storage", "monitored_devices", "baseline_devices", - "scan_motors", "readout_priority", ]: try: getattr(self, storage).pop(scan_id) except KeyError: logger.warning(f"Failed to remove {scan_id} from {storage}.") - # self.bluesky_emitter.cleanup_storage(scan_id) self.run_emitter("on_cleanup", scan_id) self.storage_initialized.remove(scan_id) diff --git a/bec_server/tests/tests_scan_bundler/test_bluesky_emitter.py b/bec_server/tests/tests_scan_bundler/test_bluesky_emitter.py deleted file mode 100644 index f14edbdbb..000000000 --- a/bec_server/tests/tests_scan_bundler/test_bluesky_emitter.py +++ /dev/null @@ -1,78 +0,0 @@ -from unittest import mock - -import msgpack -import pytest - -from bec_lib.endpoints import MessageEndpoints -from bec_server.scan_bundler.bluesky_emitter import BlueskyEmitter - - -@pytest.fixture -def bls_emitter(scan_bundler_mock): - emitter = BlueskyEmitter(scan_bundler_mock) - yield emitter - emitter.shutdown() - - -@pytest.mark.parametrize("scan_id", ["alskdj"]) -def test_run_start_document(bls_emitter, scan_id): - with mock.patch.object(bls_emitter.connector, "raw_send") as send: - with mock.patch.object(bls_emitter, "send_descriptor_document") as send_descr: - with mock.patch.object( - bls_emitter, "_get_run_start_document", return_value={} - ) as get_doc: - bls_emitter.send_run_start_document(scan_id) - get_doc.assert_called_once_with(scan_id) - send.assert_called_once_with( - MessageEndpoints.bluesky_events(), msgpack.dumps(("start", {})) - ) - send_descr.assert_called_once_with(scan_id) - - -def test_get_run_start_document(bls_emitter): - sb = bls_emitter.scan_bundler - scan_id = "lkajsdl" - sb.sync_storage[scan_id] = {"info": {"queue_id": "jdklj", "scan_number": 5}} - sb.scan_motors[scan_id] = [sb.device_manager.devices.samx, sb.device_manager.devices.samy] - - data = bls_emitter._get_run_start_document(scan_id) - - assert all(key in data for key in ["time", "uid", "scan_id", "queue_id", "scan_id", "motors"]) - assert data["motors"] == ("samx", "samy") - assert data["scan_id"] == 5 - - -def test_send_descriptor_document(bls_emitter): - scan_id = "lkajsdl" - bls_emitter.bluesky_metadata[scan_id] = {} - with mock.patch.object(bls_emitter.connector, "raw_send") as send: - with mock.patch.object( - bls_emitter, "_get_descriptor_document", return_value={} - ) as get_descr: - bls_emitter.send_descriptor_document(scan_id) - get_descr.assert_called_once_with(scan_id) - send.assert_called_once_with( - MessageEndpoints.bluesky_events(), msgpack.dumps(("descriptor", {})) - ) - - -def test_bls_cleanup_storage(bls_emitter): - scan_id = "lkajsdl" - bls_emitter.bluesky_metadata[scan_id] = {} - - bls_emitter.cleanup_storage(scan_id) - assert scan_id not in bls_emitter.bluesky_metadata - - -def test_bls_on_cleanup(bls_emitter): - scan_id = "lkajsdl" - with mock.patch.object(bls_emitter, "cleanup_storage") as cleanup: - bls_emitter.on_cleanup(scan_id) - cleanup.assert_called_once_with(scan_id) - - -def test_bls_on_init(bls_emitter): - scan_id = "lkajsdl" - with mock.patch.object(bls_emitter, "send_run_start_document") as start: - bls_emitter.on_init(scan_id) - start.assert_called_once_with(scan_id) diff --git a/bec_server/tests/tests_scan_bundler/test_scan_bundler.py b/bec_server/tests/tests_scan_bundler/test_scan_bundler.py index 88793c23e..13a6d6dae 100644 --- a/bec_server/tests/tests_scan_bundler/test_scan_bundler.py +++ b/bec_server/tests/tests_scan_bundler/test_scan_bundler.py @@ -390,18 +390,14 @@ def test_initialize_scan_container(scan_bundler_mock, scan_msg): sb = scan_bundler_mock scan_id = scan_msg.content["scan_id"] scan_info = scan_msg.content["info"] - scan_motors = list(set(sb.device_manager.devices[m] for m in scan_info["scan_motors"])) readout_priority = scan_info["readout_priority"] bl_devs = sb.device_manager.devices.baseline_devices(readout_priority=readout_priority) with mock.patch.object(sb, "run_emitter") as emitter_mock: - sb._initialize_scan_container( - scan_msg - ) # The sb.device_manager.devices[m] will crash if m is not a motor in devices + sb._initialize_scan_container(scan_msg) if scan_msg.content.get("status") != "open": return - assert sb.scan_motors[scan_id] == scan_motors assert sb.sync_storage[scan_id] == {"info": scan_info, "status": "open", "sent": set()} assert sb.monitored_devices[scan_id] == { "devices": sb.device_manager.devices.monitored_devices( @@ -573,7 +569,6 @@ def test_baseline_update(scan_bundler_mock, scan_id, device, signal): sb = scan_bundler_mock sb.baseline_devices[scan_id] = {"done": {device: False}} sb.sync_storage[scan_id] = {} - sb.scan_motors[scan_id] = [] sb.readout_priority[scan_id] = {} with mock.patch.object(sb, "run_emitter") as emitter: sb._baseline_update(scan_id, device, signal)