Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/snippets/static02.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ class TemperatureController(Controller):


fastcs = FastCS(TemperatureController(), [])
fastcs.run()
# fastcs.run() # Commented as this will block
2 changes: 1 addition & 1 deletion docs/snippets/static03.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ class TemperatureController(Controller):


fastcs = FastCS(TemperatureController(), [])
fastcs.run()
# fastcs.run() # Commented as this will block
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ dependencies = [
"pytango",
"softioc>=4.5.0",
"strawberry-graphql",
"p4p"
"p4p",
"IPython",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
53 changes: 52 additions & 1 deletion src/fastcs/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import inspect
import json
import signal
from collections.abc import Coroutine
from functools import partial
from pathlib import Path
from typing import Annotated, Any, Optional, TypeAlias, get_type_hints

import typer
from IPython.terminal.embed import InteractiveShellEmbed
from pydantic import BaseModel, create_model
from ruamel.yaml import YAML

Expand Down Expand Up @@ -36,6 +39,7 @@ def __init__(
transport_options: TransportOptions,
):
self._loop = asyncio.get_event_loop()
self._controller = controller
self._backend = Backend(controller, self._loop)
transport: TransportAdapter
self._transports: list[TransportAdapter] = []
Expand Down Expand Up @@ -100,12 +104,59 @@ def run(self):

async def serve(self) -> None:
coros = [self._backend.serve()]
coros.extend([transport.serve() for transport in self._transports])
context = {
"controller": self._controller,
"controller_api": self._backend.controller_api,
"transports": [
transport.__class__.__name__ for transport in self._transports
],
}

for transport in self._transports:
coros.append(transport.serve())
common_context = context.keys() & transport.context.keys()
if common_context:
raise RuntimeError(
"Duplicate context keys found between "
f"current context { ({k: context[k] for k in common_context}) } "
f"and {transport.__class__.__name__} context: "
f"{ ({k: transport.context[k] for k in common_context}) }"
)
context.update(transport.context)

coros.append(self._interactive_shell(context))

try:
await asyncio.gather(*coros)
except asyncio.CancelledError:
pass

async def _interactive_shell(self, context: dict[str, Any]):
"""Spawn interactive shell in another thread and wait for it to complete."""

def run(coro: Coroutine[None, None, None]):
"""Run coroutine on FastCS event loop from IPython thread."""

def wrapper():
asyncio.create_task(coro)

self._loop.call_soon_threadsafe(wrapper)

async def interactive_shell(
context: dict[str, object], stop_event: asyncio.Event
):
"""Run interactive shell in a new thread."""
shell = InteractiveShellEmbed()
await asyncio.to_thread(partial(shell.mainloop, local_ns=context))

stop_event.set()

context["run"] = run

stop_event = asyncio.Event()
self._loop.create_task(interactive_shell(context, stop_event))
await stop_event.wait()


def launch(
controller_class: type[Controller],
Expand Down
4 changes: 4 additions & 0 deletions src/fastcs/transport/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ def create_docs(self) -> None:
@abstractmethod
def create_gui(self) -> None:
pass

@property
def context(self) -> dict[str, Any]:
return {}
2 changes: 0 additions & 2 deletions src/fastcs/transport/epics/ca/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,3 @@ def create_gui(self) -> None:
async def serve(self) -> None:
print(f"Running FastCS IOC: {self._pv_prefix}")
self._ioc.run(self._loop)
while True:
await asyncio.sleep(1)
4 changes: 4 additions & 0 deletions tests/example_p4p_ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async def d(self):
print("D: RUNNING")
await asyncio.sleep(0.1)
print("D: FINISHED")
await self.j.set(self.j.get() + 1)

e: AttrR = AttrR(Bool())

Expand All @@ -67,6 +68,9 @@ async def i(self):
else:
self.fail_on_next_e = True
print("I: FINISHED")
await self.j.set(self.j.get() + 1)

j: AttrR = AttrR(Int())


def run(pv_prefix="P4P_TEST_DEVICE"):
Expand Down
14 changes: 14 additions & 0 deletions tests/test_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,17 @@ def test_get_schema(data):

ref_schema = YAML(typ="safe").load(data / "schema.json")
assert target_schema == ref_schema


def test_error_if_identical_context_in_transports(mocker: MockerFixture, data):
mocker.patch("fastcs.launch.FastCS.create_gui")
mocker.patch("fastcs.launch.FastCS.create_docs")
mocker.patch(
"fastcs.transport.adapter.TransportAdapter.context",
new_callable=mocker.PropertyMock,
return_value={"controller": "test"},
)
app = _launch(IsHinted)
result = runner.invoke(app, ["run", str(data / "config.yaml")])
assert isinstance(result.exception, RuntimeError)
assert "Duplicate context keys found" in result.exception.args[0]
33 changes: 14 additions & 19 deletions tests/transport/epics/pva/test_p4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async def test_ioc(p4p_subprocess: tuple[str, Queue]):
"g": {"rw": f"{pv_prefix}:Child1:G"},
"h": {"rw": f"{pv_prefix}:Child1:H"},
"i": {"x": f"{pv_prefix}:Child1:I"},
"j": {"r": f"{pv_prefix}:Child1:J"},
}


Expand Down Expand Up @@ -104,31 +105,29 @@ async def test_scan_method(p4p_subprocess: tuple[str, Queue]):

@pytest.mark.asyncio
async def test_command_method(p4p_subprocess: tuple[str, Queue]):
QUEUE_TIMEOUT = 1
pv_prefix, stdout_queue = p4p_subprocess
pv_prefix, _ = p4p_subprocess
d_values = asyncio.Queue()
i_values = asyncio.Queue()
j_values = asyncio.Queue()
ctxt = Context("pva")

d_monitor = ctxt.monitor(f"{pv_prefix}:Child1:D", d_values.put)
i_monitor = ctxt.monitor(f"{pv_prefix}:Child1:I", i_values.put)
j_monitor = ctxt.monitor(f"{pv_prefix}:Child1:J", j_values.put)

try:
if not stdout_queue.empty():
raise RuntimeError("stdout_queue not empty", stdout_queue.get())
j_initial_value = await j_values.get()
assert (await d_values.get()).raw.value is False
await ctxt.put(f"{pv_prefix}:Child1:D", True)
assert (await d_values.get()).raw.value is True
# D process hangs for 0.1s, so we wait slightly longer
await asyncio.sleep(0.2)
# Value returns to False, signifying completed process
assert (await d_values.get()).raw.value is False

assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: RUNNING"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: FINISHED"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
# D process increments J by 1
assert (await j_values.get()).raw.value == j_initial_value + 1

# First run fails
assert stdout_queue.empty()
before_command_value = (await i_values.get()).raw
assert before_command_value["value"] is False
assert before_command_value["alarm"]["severity"] == 0
Expand All @@ -143,30 +142,26 @@ async def test_command_method(p4p_subprocess: tuple[str, Queue]):
assert (
after_command_value["alarm"]["message"] == "I: FAILED WITH THIS WEIRD ERROR"
)
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
# Failed I process does not increment J
assert j_values.empty()

# Second run succeeds
assert stdout_queue.empty()
await ctxt.put(f"{pv_prefix}:Child1:I", True)
assert (await i_values.get()).raw.value is True
await asyncio.sleep(0.2)
after_command_value = (await i_values.get()).raw
# Successful I process increments J by 1
assert (await j_values.get()).raw.value == j_initial_value + 2

# On the second run the command succeeded so we left the error state
assert after_command_value["value"] is False
assert after_command_value["alarm"]["severity"] == 0
assert after_command_value["alarm"]["message"] == ""

assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: FINISHED"
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
assert stdout_queue.empty()

finally:
d_monitor.close()
i_monitor.close()
j_monitor.close()


@pytest.mark.asyncio
Expand Down
Loading