diff --git a/doc/architectural_decisions/009-kafka-streaming.md b/doc/architectural_decisions/009-kafka-streaming.md new file mode 100644 index 00000000..0d91c27e --- /dev/null +++ b/doc/architectural_decisions/009-kafka-streaming.md @@ -0,0 +1,58 @@ +# 9. Kafka streaming + +## Status + +Current + +## Context + +Many facilities stream bluesky documents to an event-bus for consumption by out-of-process listeners. +Event buses used for this purpose at other facilities include ZeroMQ, RabbitMQ, Kafka, Redis, NATS, and +others. + +The capability this provides is that callbacks can be run in different processes or on other computers, +without holding up or interfering with the local `RunEngine`. Other groups at ISIS have expressed some +interest in being able to subscribe to bluesky documents. + +## Decision + +- We will stream our messages to Kafka, as opposed to some other message bus. This is because we already +have Kafka infrastructure available for other purposes (e.g. event data & sample-environment data). +- At the time of writing, we will not **depend** on Kafka for anything critical. This is because the +central Kafka instance is not currently considered "reliable" in an experiment controls context. However, +streaming the documents will allow testing to be done. Kafka will eventually be deployed in a "reliable" +way accessible to each instrument. 
+- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because: + - It is the default encoder used by the upstream `bluesky-kafka` integration + - It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the +documents allowed by `event-model` + - It has reasonable performance in terms of encoding speed and message size + - `msgpack` is very widely supported in a range of programming languages +- Kafka brokers will be configurable via an environment variable, `IBEX_BLUESKY_CORE_KAFKA_BROKER` + +```{note} +Wherever Kafka is mentioned above, the actual implementation may be a Kafka-like (e.g. RedPanda). +``` + +### Alternatives considered + +Encoding bluesky documents into JSON and then wrapping them in the +[`json_json.fbs` flatbuffers schema](https://github.com/ess-dmsc/streaming-data-types/blob/58793c3dfa060f60b4a933bc085f831744e43f17/schemas/json_json.fbs) +was considered. + +We chose `msgpack` instead of json strings + flatbuffers because: +- It is more standard in the bluesky community (e.g. it is the default used in `bluesky-kafka`) +- Bluesky events will be streamed to a dedicated topic, which is unlikely to be confused with data +using any other schema. + +Performance/storage impacts are unlikely to be noticeable for bluesky documents, but nonetheless: +- `msgpack`-encoded documents are 30-40% smaller than `json` + flatbuffers +for a typical bluesky document +- `msgpack`-encoding messages is ~5x faster than `json` + flatbuffers encoding +for a typical bluesky document. + +## Justification & Consequences + +We will stream bluesky documents to Kafka, encoded using `msgpack-numpy`. + +At the time of writing this is purely to enable testing, and will not be used for "production" workflows. 
diff --git a/doc/dev/kafka.md b/doc/dev/kafka.md new file mode 100644 index 00000000..71c03939 --- /dev/null +++ b/doc/dev/kafka.md @@ -0,0 +1,15 @@ +# Kafka + +`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents +emitted by the `RunEngine` to kafka. The kafka callback is automatically added by +{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always +enabled. + +Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html) - using the `msgpack-numpy` library +to also handle numpy arrays transparently. + +The kafka broker to send to can be controlled using the `IBEX_BLUESKY_CORE_KAFKA_BROKER` environment variable, if +an instrument needs to override the default. The kafka topic will be `{INSTRUMENT}_bluesky`, where `INSTRUMENT` is the +instrument name with any NDX or NDH prefix stripped. + +The message key will always be `doc` for bluesky documents; specifying a non-null key enforces message ordering. 
diff --git a/pyproject.toml b/pyproject.toml index 34d1fb65..2a993fe9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,13 +41,15 @@ classifiers = [ dependencies = [ "bluesky", # Bluesky framework + "bluesky-kafka", # Bluesky-kafka integration "ophyd-async[ca] == 0.12.3", # Device abstraction - "matplotlib", # Plotting "lmfit", # Fitting - "scipy", # Definitions of erf/erfc functions + "matplotlib", # Plotting + "msgpack-numpy", # Encoding kafka messages "numpy", # General array support "orjson", # json module which handles numpy arrays transparently "scipp", # support for arrays with variances/units + "scipy", # Definitions of erf/erfc functions "scippneutron", # neutron-specific utilities for scipp "typing-extensions", # TypeVar with default-arg support "tzdata", # Windows timezone support diff --git a/src/ibex_bluesky_core/log.py b/src/ibex_bluesky_core/log.py index 4db30255..85945870 100644 --- a/src/ibex_bluesky_core/log.py +++ b/src/ibex_bluesky_core/log.py @@ -18,7 +18,7 @@ # Find the log directory, if already set in the environment, else use the default log_location = os.environ.get("IBEX_BLUESKY_CORE_LOGS", DEFAULT_LOG_FOLDER) -INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async"] +INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async", "bluesky_kafka"] @cache diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index b5a6d68d..077583ea 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -3,29 +3,46 @@ import asyncio import functools import logging +import os +import socket from collections.abc import Generator from functools import cache from threading import Event, Lock from typing import Any, cast import bluesky.preprocessors as bpp +import msgpack_numpy from bluesky.run_engine import RunEngine, RunEngineResult from bluesky.utils import DuringTask, Msg, RunEngineControlException, 
RunEngineInterrupted +from bluesky_kafka import Publisher from ibex_bluesky_core.callbacks import DocLoggingCallback -from ibex_bluesky_core.preprocessors import add_rb_number_processor - -__all__ = ["get_run_engine", "run_plan"] - - from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY +from ibex_bluesky_core.preprocessors import add_rb_number_processor from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler from ibex_bluesky_core.utils import is_matplotlib_backend_qt from ibex_bluesky_core.version import version +__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"] + logger = logging.getLogger(__name__) +DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092" + + +def get_kafka_topic_name() -> str: + """Get the name of the bluesky kafka topic for this machine.""" + computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()) + computer_name = computer_name.upper() + if computer_name.startswith(("NDX", "NDH")): + name = computer_name[3:] + else: + name = computer_name + + return f"{name}_bluesky" + + class _DuringTask(DuringTask): def block(self, blocking_event: Event) -> None: """On windows, event.wait() on the main thread is not interruptible by a CTRL-C. 
@@ -103,6 +120,19 @@ def get_run_engine() -> RunEngine: log_callback = DocLoggingCallback() RE.subscribe(log_callback) + kafka_callback = Publisher( + topic=get_kafka_topic_name(), + bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), + key="doc", + serializer=msgpack_numpy.dumps, + producer_config={ + "enable.idempotence": True, + "log_level": 0, + "log.connection.close": False, + }, + ) + RE.subscribe(kafka_callback) + RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler) RE.register_command(CALL_QT_AWARE_MSG_KEY, call_qt_aware_handler) diff --git a/tests/test_run_engine.py b/tests/test_run_engine.py index 083aa45d..a0b74d23 100644 --- a/tests/test_run_engine.py +++ b/tests/test_run_engine.py @@ -3,6 +3,7 @@ import threading from collections.abc import Generator from typing import Any +from unittest import mock from unittest.mock import MagicMock import bluesky.plan_stubs as bps @@ -11,7 +12,7 @@ from bluesky.run_engine import RunEngineResult from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted -from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine, run_plan +from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan from ibex_bluesky_core.version import version @@ -146,3 +147,14 @@ def plan(): result = run_plan(plan()) assert result.plan_result == "happy_path_result" assert result.exit_status == "success" + + +def test_get_kafka_topic_name(): + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"): + assert get_kafka_topic_name() == "FOO_bluesky" + + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"): + assert get_kafka_topic_name() == "BAR_bluesky" + + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"): + assert get_kafka_topic_name() == "BAZ_bluesky"