58 changes: 58 additions & 0 deletions doc/architectural_decisions/009-kafka-streaming.md
@@ -0,0 +1,58 @@
# 9. Kafka streaming

## Status

Current

## Context

Many facilities stream bluesky documents to an event bus for consumption by out-of-process listeners.
Event buses used for this purpose at other facilities include ZeroMQ, RabbitMQ, Kafka, Redis, NATS, and
others.

This allows callbacks to run in different processes or on other computers, without holding up or
interfering with the local `RunEngine`. Other groups at ISIS have expressed interest in subscribing to
bluesky documents.

## Decision

- We will stream our messages to Kafka, as opposed to some other message bus. This is because we already
have Kafka infrastructure available for other purposes (e.g. event data & sample-environment data).
- At the time of writing, we will not **depend** on Kafka for anything critical. This is because the
central Kafka instance is not currently considered "reliable" in an experiment controls context. However,
streaming the documents will allow testing to be done. Kafka will eventually be deployed in a "reliable"
way accessible to each instrument.
- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because:
- It is the default encoder used by the upstream `bluesky-kafka` integration
- It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the
documents allowed by `event-model`
- It has reasonable performance in terms of encoding speed and message size
- `msgpack` is very widely supported in a range of programming languages
- Kafka brokers will be configurable via an environment variable, `IBEX_BLUESKY_CORE_KAFKA_BROKER`

```{note}
Wherever Kafka is mentioned above, the actual implementation may be a Kafka-like (e.g. RedPanda).
```

### Alternatives considered

Encoding bluesky documents into JSON and then wrapping them in the
[`json_json.fbs` flatbuffers schema](https://github.com/ess-dmsc/streaming-data-types/blob/58793c3dfa060f60b4a933bc085f831744e43f17/schemas/json_json.fbs)
was considered.

We chose `msgpack` instead of json strings + flatbuffers because:
- It is more standard in the bluesky community (e.g. it is the default used in `bluesky-kafka`)
- Bluesky events will be streamed to a dedicated topic, which is unlikely to be confused with data
using any other schema.

Performance/storage impacts are unlikely to be noticeable for bluesky documents, but nonetheless:
- `msgpack`-encoded documents are 30-40% smaller than `json` + flatbuffers
for a typical bluesky document
- `msgpack`-encoding messages is ~5x faster than `json` + flatbuffers encoding
for a typical bluesky document.

## Justification & Consequences

We will stream bluesky documents to Kafka, encoded using `msgpack-numpy`.

At the time of writing this is purely to enable testing, and will not be used for "production" workflows.
15 changes: 15 additions & 0 deletions doc/dev/kafka.md
@@ -0,0 +1,15 @@
# Kafka

`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents
emitted by the `RunEngine` to Kafka. The Kafka callback is automatically added by
{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, so no user configuration is required - the callback is always
enabled.

Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html), with the `msgpack-numpy`
library handling numpy arrays transparently.

The Kafka broker to send to can be controlled using the `IBEX_BLUESKY_CORE_KAFKA_BROKER` environment variable, if
an instrument needs to override the default. The Kafka topic will be `<INSTRUMENT>_bluesky`, where `INSTRUMENT` is the
instrument name with any NDX or NDH prefix stripped.
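The prefix-stripping rule described above can be sketched as a small helper (hypothetical name; the library's own implementation may differ in detail):

```python
def bluesky_topic_name(computer_name: str) -> str:
    """Derive the bluesky Kafka topic from a machine name, stripping any NDX/NDH prefix."""
    name = computer_name.upper()
    if name.startswith(("NDX", "NDH")):
        name = name[3:]
    return f"{name}_bluesky"


print(bluesky_topic_name("NDXPOLARIS"))  # -> POLARIS_bluesky
```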

The message key will always be `doc` for bluesky documents; using a constant, non-null key routes all documents to the same partition, which preserves message ordering.
6 changes: 4 additions & 2 deletions pyproject.toml
@@ -41,13 +41,15 @@ classifiers = [

 dependencies = [
     "bluesky", # Bluesky framework
+    "bluesky-kafka", # Bluesky-kafka integration
     "ophyd-async[ca] == 0.12.3", # Device abstraction
-    "matplotlib", # Plotting
     "lmfit", # Fitting
-    "scipy", # Definitions of erf/erfc functions
+    "matplotlib", # Plotting
+    "msgpack-numpy", # Encoding kafka messages
     "numpy", # General array support
     "orjson", # json module which handles numpy arrays transparently
     "scipp", # support for arrays with variances/units
+    "scipy", # Definitions of erf/erfc functions
     "scippneutron", # neutron-specific utilities for scipp
     "typing-extensions", # TypeVar with default-arg support
     "tzdata", # Windows timezone support
2 changes: 1 addition & 1 deletion src/ibex_bluesky_core/log.py
@@ -18,7 +18,7 @@
# Find the log directory, if already set in the environment, else use the default
log_location = os.environ.get("IBEX_BLUESKY_CORE_LOGS", DEFAULT_LOG_FOLDER)

-INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async"]
+INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async", "bluesky_kafka"]


@cache
40 changes: 35 additions & 5 deletions src/ibex_bluesky_core/run_engine/__init__.py
@@ -3,29 +3,46 @@
import asyncio
import functools
import logging
import os
import socket
from collections.abc import Generator
from functools import cache
from threading import Event, Lock
from typing import Any, cast

import bluesky.preprocessors as bpp
import msgpack_numpy
from bluesky.run_engine import RunEngine, RunEngineResult
from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted
from bluesky_kafka import Publisher

from ibex_bluesky_core.callbacks import DocLoggingCallback
-from ibex_bluesky_core.preprocessors import add_rb_number_processor
-
-__all__ = ["get_run_engine", "run_plan"]
-
-
from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY
from ibex_bluesky_core.preprocessors import add_rb_number_processor
from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler
from ibex_bluesky_core.utils import is_matplotlib_backend_qt
from ibex_bluesky_core.version import version

__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]

logger = logging.getLogger(__name__)


DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092"


def get_kafka_topic_name() -> str:
    """Get the name of the bluesky kafka topic for this machine."""
    computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper()
    if computer_name.startswith(("NDX", "NDH")):
        name = computer_name[3:]
    else:
        name = computer_name

    return f"{name}_bluesky"


class _DuringTask(DuringTask):
    def block(self, blocking_event: Event) -> None:
        """On windows, event.wait() on the main thread is not interruptible by a CTRL-C.
@@ -103,6 +120,19 @@ def get_run_engine() -> RunEngine:
    log_callback = DocLoggingCallback()
    RE.subscribe(log_callback)

    kafka_callback = Publisher(
        topic=get_kafka_topic_name(),
        bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER),
        key="doc",
        serializer=msgpack_numpy.dumps,
        producer_config={
            "enable.idempotence": True,
            "log_level": 0,
            "log.connection.close": False,
        },
    )
    RE.subscribe(kafka_callback)

    RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler)
    RE.register_command(CALL_QT_AWARE_MSG_KEY, call_qt_aware_handler)

14 changes: 13 additions & 1 deletion tests/test_run_engine.py
@@ -3,6 +3,7 @@
import threading
from collections.abc import Generator
from typing import Any
from unittest import mock
from unittest.mock import MagicMock

import bluesky.plan_stubs as bps
@@ -11,7 +12,7 @@
from bluesky.run_engine import RunEngineResult
from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted

from ibex_bluesky_core.run_engine import _DuringTask, get_run_engine, run_plan
from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan
from ibex_bluesky_core.version import version


@@ -146,3 +147,14 @@ def plan():
    result = run_plan(plan())
    assert result.plan_result == "happy_path_result"
    assert result.exit_status == "success"


def test_get_kafka_topic_name():
    with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"):
        assert get_kafka_topic_name() == "FOO_bluesky"

    with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"):
        assert get_kafka_topic_name() == "BAR_bluesky"

    with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"):
        assert get_kafka_topic_name() == "BAZ_bluesky"