Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def queue_elements(bec_client_mock):
request_block = messages.RequestBlock(
msg=request_msg,
RID="req_id",
scan_motors=["samx"],
report_instructions=[],
readout_priority={"monitored": ["samx"]},
is_scan=True,
Expand Down Expand Up @@ -63,7 +62,6 @@ def sample_request_block(sample_request_msg):
return messages.RequestBlock(
msg=sample_request_msg,
RID="req_id",
scan_motors=["samx"],
report_instructions=[],
readout_priority={"monitored": ["samx"]},
is_scan=True,
Expand Down Expand Up @@ -240,7 +238,6 @@ def test_available_req_blocks_multiple_blocks(bec_client_mock):
request_block1 = messages.RequestBlock(
msg=request_msg,
RID="test_rid",
scan_motors=["samx"],
report_instructions=[],
readout_priority={"monitored": ["samx"]},
is_scan=True,
Expand All @@ -251,7 +248,6 @@ def test_available_req_blocks_multiple_blocks(bec_client_mock):
request_block2 = messages.RequestBlock(
msg=request_msg,
RID="test_rid",
scan_motors=["samy"],
report_instructions=[],
readout_priority={"monitored": ["samy"]},
is_scan=True,
Expand All @@ -262,7 +258,6 @@ def test_available_req_blocks_multiple_blocks(bec_client_mock):
request_block3 = messages.RequestBlock(
msg=request_msg,
RID="different_rid",
scan_motors=["samz"],
report_instructions=[],
readout_priority={"monitored": ["samz"]},
is_scan=True,
Expand Down
2 changes: 0 additions & 2 deletions bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ class RequestBlock(BaseModel):
Args:
msg (ScanQueueMessage): The original scan queue message containing the request details
RID (str): Request ID associated with the request
scan_motors (list[str]): List of motors involved in the scan
readout_priority (dict[Literal["monitored", "baseline", "async", "continuous", "on_request"], list[str]]): Readout priority for the request
is_scan (bool): True if the request is a scan, False if it is an rpc call
scan_number (int | None): Scan number if applicable
Expand All @@ -345,7 +344,6 @@ class RequestBlock(BaseModel):

msg: ScanQueueMessage
RID: str
scan_motors: list[str]
readout_priority: dict[
Literal["monitored", "baseline", "async", "continuous", "on_request"], list[str]
]
Expand Down
141 changes: 0 additions & 141 deletions bec_server/bec_server/scan_bundler/bluesky_emitter.py

This file was deleted.

8 changes: 1 addition & 7 deletions bec_server/bec_server/scan_bundler/scan_bundler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from bec_lib.logger import bec_logger

from .bec_emitter import BECEmitter
from .bluesky_emitter import BlueskyEmitter

if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector
Expand Down Expand Up @@ -46,7 +45,6 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None:
self.monitored_devices = {}
self.baseline_devices = {}
self.device_storage = {}
self.scan_motors = {}
self.readout_priority = {}
self.storage_initialized = set()
self.executor = ThreadPoolExecutor(max_workers=4)
Expand All @@ -58,7 +56,7 @@ def __init__(self, config, connector_cls: type[RedisConnector]) -> None:
self.status = messages.BECStatus.RUNNING

def _initialize_emitters(self):
self._emitter = [BECEmitter(self), BlueskyEmitter(self)]
self._emitter = [BECEmitter(self)]

def run_emitter(self, emitter_method: Callable, *args, **kwargs):
for emi in self._emitter:
Expand Down Expand Up @@ -125,8 +123,6 @@ def _initialize_scan_container(self, scan_msg: messages.ScanStatusMessage):

scan_id = scan_msg.content["scan_id"]
scan_info = scan_msg.content["info"]
scan_motors = list(set(self.device_manager.devices[m] for m in scan_info["scan_motors"]))
self.scan_motors[scan_id] = scan_motors
self.readout_priority[scan_id] = scan_info["readout_priority"]
if scan_id not in self.storage_initialized:
self.sync_storage[scan_id] = {"info": scan_info, "status": "open", "sent": set()}
Expand Down Expand Up @@ -363,14 +359,12 @@ def cleanup_storage(self):
"sync_storage",
"monitored_devices",
"baseline_devices",
"scan_motors",
"readout_priority",
]:
try:
getattr(self, storage).pop(scan_id)
except KeyError:
logger.warning(f"Failed to remove {scan_id} from {storage}.")
# self.bluesky_emitter.cleanup_storage(scan_id)
self.run_emitter("on_cleanup", scan_id)
self.storage_initialized.remove(scan_id)

Expand Down
4 changes: 0 additions & 4 deletions bec_server/bec_server/scan_server/scan_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,6 @@ def __init__(
self, msg: messages.ScanQueueMessage, assembler: ScanAssembler, parent: RequestBlockQueue
) -> None:
self.instructions = None
self.scan_motors = []
self.readout_priority: ReadoutPriorities = {}
self.msg = msg
self.RID = msg.metadata["RID"]
Expand All @@ -931,8 +930,6 @@ def _assemble(self):
self.scan_id = str(uuid.uuid4())
self.scan = self.scan_assembler.assemble_device_instructions(self.msg, self.scan_id)
self.instructions = self.scan.run()
if self.scan.caller_args:
self.scan_motors = self.scan.scan_motors
self.readout_priority = self.scan.readout_priority

@property
Expand Down Expand Up @@ -997,7 +994,6 @@ def describe(self) -> messages.RequestBlock:
return messages.RequestBlock(
msg=self.msg,
RID=self.RID,
scan_motors=self.scan_motors,
readout_priority=self.readout_priority,
is_scan=self.is_scan,
scan_number=self.scan_number,
Expand Down
9 changes: 3 additions & 6 deletions bec_server/bec_server/scan_server/scan_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ def _get_result_from_status(self, status: ScanStubStatus) -> Any:
def open_scan(
self,
*,
scan_motors: list,
readout_priority: dict,
num_pos: int,
scan_name: str,
Expand All @@ -443,7 +442,6 @@ def open_scan(
"""Open a new scan. This is typically not used directly but called by the underlying base class.

Args:
scan_motors (list): List of scan motors.
readout_priority (dict): Modification of the readout priority.
num_pos (int): Number of positions within the scope of this scan.
positions (list): List of positions for this scan.
Expand All @@ -459,7 +457,6 @@ def open_scan(
device=None,
action="open_scan",
parameter={
"scan_motors": scan_motors,
"readout_priority": readout_priority,
"num_points": num_pos,
"positions": positions,
Expand Down Expand Up @@ -721,7 +718,7 @@ def read(
*,
device: list[str] | str | None = None,
point_id: int | None = None,
group: Literal["scan_motor", "monitored", None] = None,
group: Literal["monitored", None] = None,
wait: bool = True,
) -> Generator[messages.DeviceInstructionMessage, None, ScanStubStatus]:
"""
Expand All @@ -733,7 +730,7 @@ def read(
device (list[str], str, optional): Device name. Can be a list of devices or a single device. Defaults to None.
point_id (int, optional): point_id to assign this reading to point within the scan. If None, the read will simply update
the cache without assigning the read to a specific point. Defaults to None.
group (Literal["scan_motor", "monitored", None], optional): Device group. Can be used instead of device. Defaults to None.
group (Literal["monitored", None], optional): Device group. Can be used instead of device. Defaults to None.
wait (bool, optional): If True, the read command will wait for the completion of the read operation before returning. Defaults to True.

Returns:
Expand All @@ -751,7 +748,7 @@ def read(
metadata = {"point_id": point_id, "device_instr_id": status._device_instr_id}
self._exclude_nones(parameter)
self._exclude_nones(metadata)
if device is None:
if device is None and group == "monitored":
device = [
dev.root.name
for dev in self._device_manager.devices.monitored_devices(
Expand Down
9 changes: 1 addition & 8 deletions bec_server/bec_server/scan_server/scan_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def __init__(self, *, parent: ScanServer, queue_name: str = "primary"):
self.status = InstructionQueueStatus.IDLE
self.signal_event = threading.Event()
self.scan_id = None
self.scan_motors = []
self.readout_priority = {}
self.scan_type = None
self.current_scan_id: str = ""
Expand All @@ -59,12 +58,7 @@ def open_scan(self, instr: messages.DeviceInstructionMessage) -> None:
"""
if not self.scan_id:
self.scan_id = instr.metadata.get("scan_id")
if instr.content["parameter"].get("scan_motors") is not None:
self.scan_motors = [
self.device_manager.devices[dev]
for dev in instr.content["parameter"].get("scan_motors")
]
self.readout_priority = instr.content["parameter"].get("readout_priority", {})
self.readout_priority = instr.content["parameter"].get("readout_priority", {})
self.scan_type = instr.content["parameter"].get("scan_type")

if not instr.metadata.get("scan_def_id"):
Expand Down Expand Up @@ -536,7 +530,6 @@ def reset(self):
self.scan_id = None
self.interception_msg = None
self.current_instruction_queue_item = None
self.scan_motors = []

def cleanup(self):
"""perform cleanup instructions"""
Expand Down
1 change: 0 additions & 1 deletion bec_server/bec_server/scan_server/scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ def open_scan(self):
"""open the scan"""
positions = self.positions if isinstance(self.positions, list) else self.positions.tolist()
yield from self.stubs.open_scan(
scan_motors=self.scan_motors,
readout_priority=self.readout_priority,
num_pos=self.num_pos,
positions=positions,
Expand Down
Loading
Loading