diff --git a/bootstrap/bootstrap_pi.sh b/bootstrap/bootstrap_pi.sh index 0d28dc2..298a011 100755 --- a/bootstrap/bootstrap_pi.sh +++ b/bootstrap/bootstrap_pi.sh @@ -44,6 +44,8 @@ else echo "Run manually: sudo ${SCRIPT_DIR}/install_publisher.sh $REPO_DIR " fi +"${SCRIPT_DIR}/install_led_service.sh" "$REPO_DIR" + echo "" echo "=== Bootstrap complete for ${BENCH_NAME} ===" echo "Config: /etc/hil-bench/config.yaml" diff --git a/bootstrap/install_led_service.sh b/bootstrap/install_led_service.sh new file mode 100755 index 0000000..0fb4dc6 --- /dev/null +++ b/bootstrap/install_led_service.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +# Install the LED daemon service. +# Idempotent — safe to re-run. +# +# Usage: sudo ./install_led_service.sh +set -euo pipefail + +REPO_DIR="${1:?Usage: $0 }" +VENV="/opt/hil-bench/venv" +SYSTEMD_DST="/etc/systemd/system" + +echo "--- Installing LED service ---" + +# Install rpi_ws281x into venv +if [[ -d "$VENV" ]]; then + echo "Installing rpi_ws281x package..." + "${VENV}/bin/pip" install --quiet "rpi_ws281x>=5.0" +else + echo "WARNING: venv not found at $VENV — run install_python_env.sh first" +fi + +# Install systemd unit +UNIT_SRC="${REPO_DIR}/systemd/hil-bench-led.service" +if [[ -f "$UNIT_SRC" ]]; then + cp "$UNIT_SRC" "${SYSTEMD_DST}/hil-bench-led.service" + systemctl daemon-reload + systemctl enable hil-bench-led.service + echo "Installed and enabled: ${SYSTEMD_DST}/hil-bench-led.service" +else + echo "WARNING: Service file not found: $UNIT_SRC" +fi + +echo "--- LED service install done ---" +echo "Start with: sudo systemctl start hil-bench-led" diff --git a/bootstrap/update.sh b/bootstrap/update.sh index 4eacf11..7088c0b 100755 --- a/bootstrap/update.sh +++ b/bootstrap/update.sh @@ -51,7 +51,17 @@ echo "--- Refreshing health timer ---" echo "--- Refreshing udev rules ---" "${SCRIPT_DIR}/install_udev_rules.sh" "$REPO_DIR" -# ── 6. Done ────────────────────────────────────────────────────────────── +# ── 6. Refresh LED service ─────────────────────────────────────────── + +LED_UNIT_SRC="${REPO_DIR}/systemd/hil-bench-led.service" +if [[ -f "$LED_UNIT_SRC" ]]; then + echo "--- Refreshing LED service ---" + cp "$LED_UNIT_SRC" /etc/systemd/system/hil-bench-led.service + systemctl daemon-reload + echo "LED service unit refreshed" +fi + +# ── 7. Done ────────────────────────────────────────────────────────────── echo "" echo "=== Update complete ===" diff --git a/configs/bench-config.example.yaml b/configs/bench-config.example.yaml index a9749b2..bb46323 100644 --- a/configs/bench-config.example.yaml +++ b/configs/bench-config.example.yaml @@ -30,3 +30,9 @@ paths: wiki: canonical_url: https://wiki.aharoni-lab.com/HIL/Bench/aharoni-samd51-bench-01 + +led: + enabled: true + led_count: 16 + gpio_pin: 18 + brightness: 128 diff --git a/configs/config.template.yaml b/configs/config.template.yaml index 928043b..f4a1e2e 100644 --- a/configs/config.template.yaml +++ b/configs/config.template.yaml @@ -58,3 +58,11 @@ paths: wiki: canonical_url: null # optional wiki link + +led: + enabled: false # set to true to enable WS2812B LED strip + led_count: 16 # number of LEDs on the strip + gpio_pin: 18 # GPIO pin (PWM0) + brightness: 128 # default brightness (0-255) + fps: 30 # animation frame rate + socket_path: /run/hil-bench/led.sock diff --git a/docs/led-status-display.md b/docs/led-status-display.md new file mode 100644 index 0000000..c487b47 --- /dev/null +++ b/docs/led-status-display.md @@ -0,0 +1,322 @@ +# RGB LED Status Display + +The HIL bench controller supports a WS2812B (NeoPixel) addressable LED strip as a visual status indicator. Operators can see bench state at a glance — idle, flashing firmware, running tests, or in an error state. + +## Architecture + +``` +benchctl led set-scene ... ──┐ +Publisher hooks (flash/health)──┼── LedClient ── Unix socket ── LED Daemon ── rpi_ws281x +GitHub Actions steps ──┘ /run/hil-bench/led.sock +``` + +- **Daemon** (`hil-bench-led.service`) owns the LED hardware exclusively, runs a 30 FPS animation loop, and listens on a Unix domain socket for commands. +- **Clients** send JSON commands over the socket. Any process on the Pi can trigger scene changes. +- **Scenes** are self-contained animation classes that produce frames. All animations are time-based (not frame-count-based) so they look consistent at any FPS. + +## Quick Start + +### 1. Enable in config + +Add to `/etc/hil-bench/config.yaml`: + +```yaml +led: + enabled: true + led_count: 16 # number of LEDs on your strip + gpio_pin: 18 # GPIO pin (PWM0 — default for NeoPixels) + brightness: 128 # 0-255 + fps: 30 # animation frame rate + socket_path: /run/hil-bench/led.sock +``` + +### 2. Install the hardware dependency + +```bash +sudo /opt/hil-bench/venv/bin/pip install "rpi_ws281x>=5.0" +``` + +Or during bootstrap, this is handled by `install_led_service.sh`. + +### 3. Start the daemon + +```bash +# Via systemd (production) +sudo systemctl start hil-bench-led +sudo systemctl enable hil-bench-led + +# Or directly (for testing) +benchctl led daemon # real hardware +benchctl led daemon --stub # no hardware, in-memory strip +``` + +### 4. Control the LEDs + +```bash +# Set a scene +benchctl led set-scene idle +benchctl led set-scene rainbow --speed 2.0 +benchctl led set-scene solid --color 255,0,0 --brightness 200 +benchctl led set-scene progress --percent 75 --color 0,255,0 + +# Turn off +benchctl led off + +# Check status +benchctl led status +benchctl led list-scenes +``` + +## Built-in Scenes + +| Scene | Description | Parameters | +|-------|-------------|------------| +| `idle` | Slow sine-wave breathing, dim↔bright | `color` (default: blue), `speed` | +| `flashing` | Knight-rider sweep back and forth | `color` (default: orange), `speed` | +| `testing` | Chase pattern, segments moving along strip | `color` (default: cyan), `speed` | +| `error` | Red pulsing/flashing alert | `color` (default: red), `speed` | +| `success` | Green burst, auto-reverts to idle | `color`, `duration_ms`, `revert_to` | +| `booting` | Sequential fill from one end | `color` (default: blue), `speed` | +| `off` | All pixels off | — | +| `solid` | Static single color | `color`, `brightness` | +| `rainbow` | HSV rainbow cycling across strip | `speed` | +| `progress` | LED bar graph proportional to percent | `color`, `bg_color`, `percent` | + +### Parameters + +- **`color`**: RGB tuple as `R,G,B` on CLI or `[R, G, B]` in JSON/Python (e.g., `255,0,0` for red) +- **`speed`**: Animation speed multiplier (default varies per scene) +- **`brightness`**: Strip brightness 0-255 +- **`percent`**: For `progress` scene, 0-100 +- **`duration_ms`**: For `success` scene, how long before auto-reverting +- **`revert_to`**: For `success` scene, which scene to switch to after duration (default: `idle`) + +## Automatic Scene Changes + +The publisher hooks automatically trigger LED scenes during bench operations: + +| Event | Scene | +|-------|-------| +| Flash starts | `flashing` | +| Flash succeeds | `success` (auto-reverts to `idle`) | +| Flash fails | `error` | +| Health check passes | `idle` | +| Health check fails | `error` | +| Daemon starts | `booting` → `idle` (after 2s) | + +These are **best-effort** — if the LED daemon isn't running, hooks silently continue without error. + +## Using the Python Client + +From any Python code on the Pi: + +```python +from hilbench.led import LedClient + +client = LedClient("/run/hil-bench/led.sock") + +# Check if daemon is running +if client.is_daemon_running(): + # Set a scene + resp = client.set_scene("rainbow", {"speed": 2.0}) + print(resp.ok, resp.current_scene) + + # Get status + status = client.status() + print(f"Scene: {status.current_scene}, LEDs: {status.led_count}") + + # List available scenes + scenes = client.list_scenes() + + # Turn off + client.off() +``` + +## Using from GitHub Actions Workflows + +In your workflow steps, use `benchctl` to signal bench state: + +```yaml +steps: + - name: Signal test start + run: benchctl led set-scene testing --color 0,200,255 + + - name: Run tests + run: pytest tests/ -v + + - name: Signal result + if: always() + run: | + if [ "${{ job.status }}" = "success" ]; then + benchctl led set-scene success + else + benchctl led set-scene error + fi +``` + +## Socket Protocol + +The daemon listens on a Unix domain socket. Commands are newline-delimited JSON: + +```json +// Set scene +→ {"command": "set_scene", "scene": "idle", "params": {"color": [0, 255, 0], "speed": 0.5}} +← {"ok": true, "current_scene": "idle"} + +// Get status +→ {"command": "status"} +← {"running": true, "current_scene": "idle", "led_count": 16, "uptime_s": 42.5} + +// List scenes +→ {"command": "list_scenes"} +← {"ok": true, "scenes": ["booting", "error", "flashing", ...]} +``` + +Any language can communicate with the daemon — just open a Unix socket, send JSON, read the response. + +## Adding a Custom Scene + +### 1. Create the scene class + +Add your scene to `src/hilbench/led/_scenes.py`: + +```python +@register_scene +class MyCustomScene: + """Description of your animation.""" + + @property + def name(self) -> str: + return "my_custom" # This is the name used in CLI and IPC + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + """Called once when the scene is activated. Parse params here.""" + self._color = _parse_color(params, (255, 255, 255)) # default white + self._speed = float(params.get("speed", 1.0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + """Called every frame. elapsed_ms is time since scene started. + + IMPORTANT: Use elapsed_ms for timing, not frame counts. + Use strip.num_pixels to adapt to any LED count. + """ + n = strip.num_pixels + t = elapsed_ms / 1000.0 # convert to seconds + + for i in range(n): + # Your animation logic here + # Use strip.set_pixel(i, r, g, b) for each LED + # Or strip.set_all(r, g, b) for uniform color + pass + + def teardown(self, strip: LedStrip) -> None: + """Called when switching away from this scene. Optional cleanup.""" + pass +``` + +### 2. Key rules for scenes + +- **Use `@register_scene` decorator** — this automatically registers the scene by name. +- **Time-based animations** — use `elapsed_ms` for all timing. Never count frames. +- **Adapt to strip length** — use `strip.num_pixels`, never hardcode LED count. +- **Use `_parse_color()`** — helper that handles `[R,G,B]` lists, `{"r":R,"g":G,"b":B}` dicts, and defaults. +- **Use `_clamp()`** — helper to keep values in 0-255 range. +- **Keep `tick()` fast** — it runs at 30 FPS. Avoid I/O or heavy computation. + +### 3. Auto-reverting scenes + +If your scene should auto-revert (like `success`), add these properties: + +```python +@property +def should_revert(self) -> bool: + return self._done # True when it's time to switch + +@property +def revert_scene(self) -> str: + return self._revert_to # Scene name to switch to +``` + +The daemon checks for `should_revert` on `SuccessScene` instances each frame. To support this on a custom scene, the daemon's animation loop would need to be updated to check your scene type too — or you can subclass `SuccessScene`. + +### 4. Test your scene + +Add tests in `tests/test_led_scenes.py` using `StubStrip`: + +```python +class TestMyCustomScene: + def test_basic(self) -> None: + strip = StubStrip(8) + scene = MyCustomScene() + scene.setup(strip, {}) + + # Tick at t=0 + scene.tick(strip, 0.0) + # Assert something about strip.pixels + assert any(p != (0, 0, 0) for p in strip.pixels) + + # Tick at t=500ms + scene.tick(strip, 500.0) + # Assert animation progressed + + scene.teardown(strip) +``` + +### 5. Verify + +```bash +# Check it appears in the list +benchctl led list-scenes + +# Run the tests +pytest tests/test_led_scenes.py -v + +# Lint +ruff check src/hilbench/led/_scenes.py +ruff format src/hilbench/led/_scenes.py +``` + +## Hardware Wiring + +Connect the WS2812B strip to the Raspberry Pi: + +| Strip Wire | Pi Pin | Notes | +|-----------|--------|-------| +| 5V (red) | Pin 2 or 4 (5V) | Or external 5V supply for long strips | +| GND (white/black) | Pin 6 (GND) | Common ground with Pi | +| DIN (green) | Pin 12 (GPIO 18) | PWM0 — default `gpio_pin` | + +For strips longer than ~30 LEDs, use an external 5V power supply (the Pi's 5V rail can only provide ~1A). Always connect the ground of the external supply to the Pi's ground. + +## Troubleshooting + +**Daemon won't start**: Check `journalctl -u hil-bench-led` for errors. Common issues: +- `rpi_ws281x` not installed: `pip install rpi_ws281x` +- Not running as root: the daemon needs root for GPIO access +- Socket directory doesn't exist: `RuntimeDirectory=hil-bench` in the systemd unit creates it + +**LEDs not lighting up**: +- Check wiring, especially GND connection +- Try `benchctl led daemon --stub` to verify the daemon runs without hardware +- Check brightness isn't 0 in config + +**Health check shows `led_daemon: not reachable`**: +- Is the daemon running? `systemctl status hil-bench-led` +- Is `led.enabled` set to `true` in config? (When `false`, health check auto-passes) + +**Permission denied on socket**: +- The daemon runs as root and creates the socket. Clients need to be able to connect — this works by default since the socket is in `/run/hil-bench/` which is created by systemd's `RuntimeDirectory`. + +## File Reference + +| File | Purpose | +|------|---------| +| `src/hilbench/led/__init__.py` | Package init, re-exports | +| `src/hilbench/led/_models.py` | Pydantic IPC models | +| `src/hilbench/led/_strip.py` | LedStrip Protocol + Ws281xStrip + StubStrip | +| `src/hilbench/led/_scenes.py` | Scene Protocol, registry, built-in scenes | +| `src/hilbench/led/_daemon.py` | Animation loop + Unix socket server | +| `src/hilbench/led/_client.py` | Client library for IPC | +| `src/hilbench/cli/led_cmd.py` | CLI commands | +| `systemd/hil-bench-led.service` | Systemd unit file | +| `bootstrap/install_led_service.sh` | Installation script | diff --git a/pyproject.toml b/pyproject.toml index 747c117..d496ba2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ [project.optional-dependencies] publisher = ["supabase>=2.11"] +led = ["rpi_ws281x>=5.0"] dev = [ "pytest>=8.0", "pytest-cov>=5.0", @@ -53,7 +54,7 @@ warn_return_any = true warn_unused_configs = true [[tool.mypy.overrides]] -module = ["gpiod.*", "serial.*"] +module = ["gpiod.*", "serial.*", "rpi_ws281x.*"] ignore_missing_imports = true [[tool.mypy.overrides]] diff --git a/src/hilbench/cli/led_cmd.py b/src/hilbench/cli/led_cmd.py new file mode 100644 index 0000000..3526723 --- /dev/null +++ b/src/hilbench/cli/led_cmd.py @@ -0,0 +1,145 @@ +"""LED subcommands: set-scene, off, list-scenes, status, daemon.""" + +from __future__ import annotations + +import click +from rich.console import Console + + +@click.group() +def led() -> None: + """Addressable RGB LED strip control.""" + + +@led.command("set-scene") +@click.argument("name") +@click.option("--color", "-c", default=None, help="RGB color as R,G,B (e.g. 255,0,0).") +@click.option("--speed", "-s", type=float, default=None, help="Animation speed multiplier.") +@click.option("--brightness", "-b", type=int, default=None, help="Strip brightness (0-255).") +@click.option("--percent", "-p", type=float, default=None, help="Progress percent (0-100).") +@click.pass_obj +def set_scene( + ctx: object, + name: str, + color: str | None, + speed: float | None, + brightness: int | None, + percent: float | None, +) -> None: + """Set the active LED scene.""" + from hilbench.cli.main import Context + + assert isinstance(ctx, Context) + console = Console() + + params: dict[str, object] = {} + if color is not None: + parts = color.split(",") + if len(parts) == 3: + params["color"] = [int(p.strip()) for p in parts] + if speed is not None: + params["speed"] = speed + if brightness is not None: + params["brightness"] = brightness + if percent is not None: + params["percent"] = percent + + if ctx.dry_run: + console.print(f"[yellow]dry-run:[/yellow] would set LED scene to {name!r} params={params}") + return + + from hilbench.led import LedClient + + client = LedClient(ctx.config.led.socket_path) + resp = client.set_scene(name, params) + if resp.ok: + console.print(f"LED scene → [bold]{resp.current_scene}[/bold]") + else: + console.print(f"[red]Error:[/red] {resp.error}") + raise SystemExit(1) + + +@led.command() +@click.pass_obj +def off(ctx: object) -> None: + """Turn off all LEDs.""" + from hilbench.cli.main import Context + + assert isinstance(ctx, Context) + console = Console() + + if ctx.dry_run: + console.print("[yellow]dry-run:[/yellow] would turn off LEDs") + return + + from hilbench.led import LedClient + + client = LedClient(ctx.config.led.socket_path) + resp = client.off() + if resp.ok: + console.print("LEDs off") + else: + console.print(f"[red]Error:[/red] {resp.error}") + raise SystemExit(1) + + +@led.command("list-scenes") +def list_scenes_cmd() -> None: + """List available LED scenes.""" + from hilbench.led._scenes import list_scenes + + console = Console() + scenes = list_scenes() + for s in scenes: + console.print(f" {s}") + + +@led.command() +@click.pass_obj +def status(ctx: object) -> None: + """Show LED daemon status.""" + from hilbench.cli.main import Context + + assert isinstance(ctx, Context) + console = Console() + + from hilbench.led import LedClient + + client = LedClient(ctx.config.led.socket_path) + if not client.is_daemon_running(): + console.print("[yellow]LED daemon is not running[/yellow]") + return + + st = client.status() + console.print(f"Running: {st.running}") + console.print(f"Scene: {st.current_scene}") + console.print(f"LEDs: {st.led_count}") + console.print(f"Uptime: {st.uptime_s:.0f}s") + + +@led.command() +@click.option("--stub", is_flag=True, help="Use stub strip (no hardware).") +@click.pass_obj +def daemon(ctx: object, stub: bool) -> None: + """Start the LED animation daemon (for systemd ExecStart).""" + from hilbench.cli.main import Context + + assert isinstance(ctx, Context) + console = Console() + cfg = ctx.config.led + + if ctx.dry_run: + console.print("[yellow]dry-run:[/yellow] would start LED daemon") + return + + from hilbench.led._daemon import LedDaemon + + d = LedDaemon( + led_count=cfg.led_count, + gpio_pin=cfg.gpio_pin, + brightness=cfg.brightness, + fps=cfg.fps, + socket_path=str(cfg.socket_path), + use_stub=stub, + ) + d.run() diff --git a/src/hilbench/cli/main.py b/src/hilbench/cli/main.py index ff8e04f..2d859e8 100644 --- a/src/hilbench/cli/main.py +++ b/src/hilbench/cli/main.py @@ -58,6 +58,7 @@ def cli(ctx: click.Context, config_path: str | None, verbose: bool, dry_run: boo from hilbench.cli.flash_cmd import flash # noqa: E402 from hilbench.cli.gpio_cmd import gpio # noqa: E402 from hilbench.cli.health_cmd import health # noqa: E402 +from hilbench.cli.led_cmd import led # noqa: E402 from hilbench.cli.publish_cmd import publish # noqa: E402 from hilbench.cli.serial_cmd import serial # noqa: E402 @@ -66,4 +67,5 @@ def cli(ctx: click.Context, config_path: str | None, verbose: bool, dry_run: boo cli.add_command(serial) cli.add_command(gpio) cli.add_command(health) +cli.add_command(led) cli.add_command(publish) diff --git a/src/hilbench/config.py b/src/hilbench/config.py index 6b256d6..b6176ab 100644 --- a/src/hilbench/config.py +++ b/src/hilbench/config.py @@ -76,6 +76,15 @@ class WikiConfig(BaseModel): canonical_url: str | None = None +class LedConfig(BaseModel): + enabled: bool = False + led_count: int = Field(default=16, ge=1, le=1000) + gpio_pin: int = Field(default=18, ge=0) + brightness: int = Field(default=128, ge=0, le=255) + fps: int = Field(default=30, ge=1, le=120) + socket_path: Path = Path("/run/hil-bench/led.sock") + + # ── Root model ────────────────────────────────────────────────────────────── @@ -86,6 +95,7 @@ class BenchConfig(BaseModel): targets: dict[str, TargetConfig] paths: PathsConfig = PathsConfig() wiki: WikiConfig = WikiConfig() + led: LedConfig = LedConfig() def get_target(self, name: str | None = None) -> tuple[str, TargetConfig]: """Return (name, config) for the named target, or the only target if name is None.""" diff --git a/src/hilbench/exceptions.py b/src/hilbench/exceptions.py index 2f1e79e..aa8e55c 100644 --- a/src/hilbench/exceptions.py +++ b/src/hilbench/exceptions.py @@ -31,3 +31,7 @@ class HealthCheckError(HilBenchError): class ArtifactError(HilBenchError): """Firmware artifact not found or invalid.""" + + +class LedError(HilBenchError): + """LED strip or daemon error.""" diff --git a/src/hilbench/health.py b/src/hilbench/health.py index c9ea04d..8ebf644 100644 --- a/src/hilbench/health.py +++ b/src/hilbench/health.py @@ -98,12 +98,31 @@ def check_runner_service() -> CheckResult: ) +def check_led_daemon(config: BenchConfig) -> CheckResult: + """Check if the LED daemon is reachable (only when LED is enabled).""" + if not config.led.enabled: + return CheckResult(name="led_daemon", passed=True, detail="disabled") + try: + from hilbench.led import LedClient + + client = LedClient(config.led.socket_path) + running = client.is_daemon_running() + except Exception: + running = False + return CheckResult( + name="led_daemon", + passed=running, + detail="running" if running else "not reachable", + ) + + _CHECK_RUNNERS: dict[str, Callable[[BenchConfig], list[CheckResult]]] = { "config": lambda cfg: [check_config(cfg)], "probe": check_probe, "serial": check_serial, "gpio_chip": lambda cfg: [check_gpio_chip()], "runner_service": lambda cfg: [check_runner_service()], + "led_daemon": lambda cfg: [check_led_daemon(cfg)], } CHECK_CATEGORIES: list[str] = list(_CHECK_RUNNERS.keys()) diff --git a/src/hilbench/led/__init__.py b/src/hilbench/led/__init__.py new file mode 100644 index 0000000..7085f8f --- /dev/null +++ b/src/hilbench/led/__init__.py @@ -0,0 +1,8 @@ +"""Addressable RGB LED status display system.""" + +from __future__ import annotations + +from hilbench.led._client import LedClient +from hilbench.led._models import DaemonStatus, LedColor, SceneRequest, SceneResponse + +__all__ = ["DaemonStatus", "LedClient", "LedColor", "SceneRequest", "SceneResponse"] diff --git a/src/hilbench/led/_client.py b/src/hilbench/led/_client.py new file mode 100644 index 0000000..81caf8a --- /dev/null +++ b/src/hilbench/led/_client.py @@ -0,0 +1,69 @@ +"""Client library for communicating with the LED daemon.""" + +from __future__ import annotations + +import json +import logging +import socket +from typing import TYPE_CHECKING + +from hilbench.led._models import DaemonStatus, SceneRequest, SceneResponse + +if TYPE_CHECKING: + from pathlib import Path + +logger = logging.getLogger(__name__) + + +class LedClient: + """Short-lived Unix socket client for the LED daemon.""" + + def __init__(self, socket_path: str | Path = "/run/hil-bench/led.sock") -> None: + self._socket_path = str(socket_path) + + def _send(self, request: SceneRequest) -> str: + """Send a request and return the raw JSON response.""" + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(5.0) + try: + sock.connect(self._socket_path) + sock.sendall((request.model_dump_json() + "\n").encode()) + data = sock.recv(4096) + return data.decode().strip() + finally: + sock.close() + + def set_scene( + self, + scene: str, + params: dict[str, object] | None = None, + ) -> SceneResponse: + """Set the active scene on the daemon.""" + req = SceneRequest(command="set_scene", scene=scene, params=params or {}) + raw = self._send(req) + return SceneResponse.model_validate_json(raw) + + def off(self) -> SceneResponse: + """Turn off all LEDs.""" + return self.set_scene("off") + + def status(self) -> DaemonStatus: + """Get daemon status.""" + req = SceneRequest(command="status") + raw = self._send(req) + return DaemonStatus.model_validate_json(raw) + + def list_scenes(self) -> list[str]: + """List available scene names.""" + req = SceneRequest(command="list_scenes") + raw = self._send(req) + data = json.loads(raw) + return list(data.get("scenes", [])) + + def is_daemon_running(self) -> bool: + """Check if the daemon is reachable.""" + try: + self.status() + except (ConnectionRefusedError, FileNotFoundError, OSError): + return False + return True diff --git a/src/hilbench/led/_daemon.py b/src/hilbench/led/_daemon.py new file mode 100644 index 0000000..2b6f3bb --- /dev/null +++ b/src/hilbench/led/_daemon.py @@ -0,0 +1,217 @@ +"""LED animation daemon with Unix socket IPC.""" + +from __future__ import annotations + +import contextlib +import json +import logging +import os +import selectors +import signal +import socket +import time +from typing import Any + +from hilbench.led._models import SceneRequest, SceneResponse +from hilbench.led._scenes import Scene, SuccessScene, get_scene, list_scenes +from hilbench.led._strip import LedStrip, StubStrip, Ws281xStrip + +logger = logging.getLogger(__name__) + + +class LedDaemon: + """Single-threaded LED animation daemon with Unix socket IPC.""" + + def __init__( + self, + led_count: int = 16, + gpio_pin: int = 18, + brightness: int = 128, + fps: int = 30, + socket_path: str = "/run/hil-bench/led.sock", + use_stub: bool = False, + ) -> None: + self._fps = fps + self._frame_interval = 1.0 / fps + self._socket_path = socket_path + self._shutdown = False + self._start_time = 0.0 + self._scene_start_time = 0.0 + + if use_stub: + self._strip: LedStrip = StubStrip(led_count) + else: + self._strip = Ws281xStrip(led_count, gpio_pin, brightness) + + self._scene: Scene | None = None + self._sel = selectors.DefaultSelector() + self._server_sock: socket.socket | None = None + + def _setup_socket(self) -> None: + """Create the Unix domain socket and start listening.""" + sock_dir = os.path.dirname(self._socket_path) + if sock_dir: + os.makedirs(sock_dir, exist_ok=True) + + if os.path.exists(self._socket_path): + os.unlink(self._socket_path) + + self._server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._server_sock.setblocking(False) + self._server_sock.bind(self._socket_path) + self._server_sock.listen(5) + self._sel.register(self._server_sock, selectors.EVENT_READ, self._accept_client) + logger.info("Listening on %s", self._socket_path) + + def _accept_client(self, sock: socket.socket) -> None: + """Accept a new client connection.""" + conn, _ = sock.accept() + conn.setblocking(False) + self._sel.register(conn, selectors.EVENT_READ, self._handle_client) + + def _handle_client(self, conn: socket.socket) -> None: + """Read a command from a client and send a response.""" + try: + data = conn.recv(4096) + if not data: + self._sel.unregister(conn) + conn.close() + return + + for line in data.decode().strip().split("\n"): + if not line: + continue + response = self._process_command(line) + conn.sendall((response + "\n").encode()) + except (ConnectionResetError, BrokenPipeError): + pass + except Exception: + logger.warning("Error handling client", exc_info=True) + finally: + with contextlib.suppress(KeyError, ValueError): + self._sel.unregister(conn) + conn.close() + + def _process_command(self, raw: str) -> str: + """Process a JSON command and return a JSON response.""" + try: + req = SceneRequest.model_validate_json(raw) + except Exception as exc: + resp = SceneResponse(ok=False, error=f"invalid request: {exc}") + return resp.model_dump_json() + + if req.command == "set_scene": + return self._cmd_set_scene(req.scene, req.params) + if req.command == "status": + return self._cmd_status() + if req.command == "list_scenes": + return json.dumps({"ok": True, "scenes": list_scenes()}) + + resp = SceneResponse(ok=False, error=f"unknown command: {req.command!r}") + return resp.model_dump_json() + + def _cmd_set_scene(self, scene_name: str, params: dict[str, Any]) -> str: + """Switch to a new scene.""" + try: + self._set_scene(scene_name, params) + except Exception as exc: + resp = SceneResponse(ok=False, error=str(exc)) + return resp.model_dump_json() + + resp = SceneResponse(ok=True, current_scene=scene_name) + return resp.model_dump_json() + + def _cmd_status(self) -> str: + """Return daemon status.""" + from hilbench.led._models import DaemonStatus + + status = DaemonStatus( + running=True, + current_scene=self._scene.name if self._scene else "", + led_count=self._strip.num_pixels, + uptime_s=time.monotonic() - self._start_time, + ) + return status.model_dump_json() + + def _set_scene(self, name: str, params: dict[str, Any] | None = None) -> None: + """Internal helper to switch scenes.""" + scene = get_scene(name) + if self._scene is not None: + self._scene.teardown(self._strip) + self._scene = scene + self._scene_start_time = time.monotonic() + self._scene.setup(self._strip, params or {}) + + def run(self) -> None: + """Main animation loop.""" + self._start_time = time.monotonic() + + def _handle_signal(signum: int, frame: Any) -> None: + logger.info("Received signal %s, shutting down LED daemon", signum) + self._shutdown = True + + import threading + + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + self._setup_socket() + self._set_scene("booting") + logger.info("LED daemon started (fps=%d, pixels=%d)", self._fps, self._strip.num_pixels) + + # Transition to idle after boot animation + boot_end = self._start_time + 2.0 + + while not self._shutdown: + frame_start = time.monotonic() + + # Check for boot→idle transition + if ( + self._scene is not None + and self._scene.name == "booting" + and frame_start >= boot_end + ): + self._set_scene("idle") + + # Poll for socket events (non-blocking) + events = self._sel.select(timeout=0) + for key, _ in events: + callback = key.data + callback(key.fileobj) + + # Tick the current scene + if self._scene is not None: + elapsed_ms = (frame_start - self._scene_start_time) * 1000.0 + self._scene.tick(self._strip, elapsed_ms) + + # Handle auto-revert for SuccessScene + if isinstance(self._scene, SuccessScene) and self._scene.should_revert: + self._set_scene(self._scene.revert_scene) + + self._strip.show() + + # Sleep for the remainder of the frame + elapsed = time.monotonic() - frame_start + sleep_time = self._frame_interval - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + self._shutdown_cleanup() + + def _shutdown_cleanup(self) -> None: + """Clean up on shutdown.""" + logger.info("Shutting down LED daemon") + if self._scene is not None: + self._scene.teardown(self._strip) + self._strip.set_all(0, 0, 0) + self._strip.show() + + if self._server_sock is not None: + self._sel.unregister(self._server_sock) + self._server_sock.close() + self._sel.close() + + if os.path.exists(self._socket_path): + os.unlink(self._socket_path) + logger.info("LED daemon stopped") diff --git a/src/hilbench/led/_models.py b/src/hilbench/led/_models.py new file mode 100644 index 0000000..d29f30e --- /dev/null +++ b/src/hilbench/led/_models.py @@ -0,0 +1,75 @@ +"""Pydantic IPC models for LED daemon communication.""" + +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +class LedColor(BaseModel): + """RGB color value.""" + + r: int = Field(ge=0, le=255) + g: int = Field(ge=0, le=255) + b: int = Field(ge=0, le=255) + + @classmethod + def red(cls) -> LedColor: + return cls(r=255, g=0, b=0) + + @classmethod + def green(cls) -> LedColor: + return cls(r=0, g=255, b=0) + + @classmethod + def blue(cls) -> LedColor: + return cls(r=0, g=0, b=255) + + @classmethod + def white(cls) -> LedColor: + return cls(r=255, g=255, b=255) + + @classmethod + def off(cls) -> LedColor: + return cls(r=0, g=0, b=0) + + @classmethod + def yellow(cls) -> LedColor: + return cls(r=255, g=255, b=0) + + @classmethod + def cyan(cls) -> LedColor: + return cls(r=0, g=255, b=255) + + def to_grb_int(self) -> int: + """Convert to 24-bit GRB integer (rpi_ws281x format).""" + return (self.g << 16) | (self.r << 8) | self.b + + def to_rgb_tuple(self) -> tuple[int, int, int]: + return (self.r, self.g, self.b) + + +class SceneRequest(BaseModel): + """Client → daemon command.""" + + command: str + scene: str = "" + params: dict[str, Any] = Field(default_factory=dict) + + +class SceneResponse(BaseModel): + """Daemon → client response.""" + + ok: bool + current_scene: str = "" + error: str = "" + + +class DaemonStatus(BaseModel): + """Daemon status info.""" + + running: bool + current_scene: str = "" + led_count: int = 0 + uptime_s: float = 0.0 diff --git a/src/hilbench/led/_scenes.py b/src/hilbench/led/_scenes.py new file mode 100644 index 0000000..9c7156b --- /dev/null +++ b/src/hilbench/led/_scenes.py @@ -0,0 +1,340 @@ +"""Scene system for LED animations.""" + +from __future__ import annotations + +import colorsys +import math +from typing import TYPE_CHECKING, Any, Protocol + +if TYPE_CHECKING: + from hilbench.led._strip import LedStrip + + +class Scene(Protocol): + """Interface for LED animation scenes.""" + + @property + def name(self) -> str: ... + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: ... + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: ... + + def teardown(self, strip: LedStrip) -> None: ... + + +# ── Registry ──────────────────────────────────────────────────────────────── + +_SCENE_REGISTRY: dict[str, type[Scene]] = {} + + +def register_scene(cls: type[Scene]) -> type[Scene]: + """Decorator to register a scene class.""" + instance = cls() + _SCENE_REGISTRY[instance.name] = cls + return cls + + +def get_scene(name: str) -> Scene: + """Create a scene instance by name.""" + if name not in _SCENE_REGISTRY: + from hilbench.exceptions import LedError + + msg = f"unknown scene {name!r}; available: {list_scenes()}" + raise LedError(msg) + return _SCENE_REGISTRY[name]() + + +def list_scenes() -> list[str]: + """Return sorted list of registered scene names.""" + return sorted(_SCENE_REGISTRY.keys()) + + +# ── Helpers ───────────────────────────────────────────────────────────────── + + +def _parse_color(params: dict[str, Any], default: tuple[int, int, int]) -> tuple[int, int, int]: + """Extract (r,g,b) from params['color'] which may be a list or dict.""" + raw = params.get("color") + if raw is None: + return default + if isinstance(raw, (list, tuple)) and len(raw) == 3: + return (int(raw[0]), int(raw[1]), int(raw[2])) + if isinstance(raw, dict): + return (int(raw.get("r", 0)), int(raw.get("g", 0)), int(raw.get("b", 0))) + return default + + +def _clamp(value: int, lo: int = 0, hi: int = 255) -> int: + return max(lo, min(hi, value)) + + +# ── Built-in scenes ──────────────────────────────────────────────────────── + + +@register_scene +class IdleScene: + """Slow sine-wave breathing, dim to bright.""" + + @property + def name(self) -> str: + return "idle" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (0, 80, 200)) + self._speed = float(params.get("speed", 0.5)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + t = elapsed_ms / 1000.0 + brightness = 0.15 + 0.85 * ((math.sin(2 * math.pi * self._speed * t) + 1) / 2) + r = _clamp(int(self._color[0] * brightness)) + g = _clamp(int(self._color[1] * brightness)) + b = _clamp(int(self._color[2] * brightness)) + strip.set_all(r, g, b) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class FlashingScene: + """Knight-rider sweep back and forth.""" + + @property + def name(self) -> str: + return "flashing" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (255, 165, 0)) + self._speed = float(params.get("speed", 2.0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + n = strip.num_pixels + t = elapsed_ms / 1000.0 + cycle = (t * self._speed) % 2.0 + pos = cycle if cycle < 1.0 else 2.0 - cycle + center = pos * (n - 1) + + for i in range(n): + dist = abs(i - center) + intensity = max(0.0, 1.0 - dist / 3.0) + r = _clamp(int(self._color[0] * intensity)) + g = _clamp(int(self._color[1] * intensity)) + b = _clamp(int(self._color[2] * intensity)) + strip.set_pixel(i, r, g, b) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class TestingScene: + """Chase pattern — segments moving along strip.""" + + @property + def name(self) -> str: + return "testing" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (0, 200, 255)) + self._speed = float(params.get("speed", 3.0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + n = strip.num_pixels + t = elapsed_ms / 1000.0 + offset = t * self._speed + + for i in range(n): + phase = ((i + offset) % 4) / 4.0 + intensity = max(0.0, 1.0 - phase * 2) + r = _clamp(int(self._color[0] * intensity)) + g = _clamp(int(self._color[1] * intensity)) + b = _clamp(int(self._color[2] * intensity)) + strip.set_pixel(i, r, g, b) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class ErrorScene: + """Red pulsing/flashing alert.""" + + @property + def name(self) -> str: + return "error" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (255, 0, 0)) + self._speed = float(params.get("speed", 2.0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + t = elapsed_ms / 1000.0 + pulse = (math.sin(2 * math.pi * self._speed * t) + 1) / 2 + intensity = 0.3 + 0.7 * pulse + r = _clamp(int(self._color[0] * intensity)) + g = _clamp(int(self._color[1] * intensity)) + b = _clamp(int(self._color[2] * intensity)) + strip.set_all(r, g, b) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class SuccessScene: + """Green burst, auto-reverts to idle.""" + + @property + def name(self) -> str: + return "success" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (0, 255, 0)) + self._duration_ms = float(params.get("duration_ms", 3000)) + self._revert_to = str(params.get("revert_to", "idle")) + self._reverted = False + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + if elapsed_ms >= self._duration_ms: + if not self._reverted: + self._reverted = True + return + progress = elapsed_ms / self._duration_ms + intensity = max(0.0, 1.0 - progress * 0.5) + r = _clamp(int(self._color[0] * intensity)) + g = _clamp(int(self._color[1] * intensity)) + b = _clamp(int(self._color[2] * intensity)) + strip.set_all(r, g, b) + + @property + def should_revert(self) -> bool: + return self._reverted + + @property + def revert_scene(self) -> str: + return self._revert_to + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class BootingScene: + """Sequential fill from one end.""" + + @property + def name(self) -> str: + return "booting" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (0, 100, 255)) + self._speed = float(params.get("speed", 1.0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + n = strip.num_pixels + t = elapsed_ms / 1000.0 + filled = int((t * self._speed * n) % (n + 1)) + filled = min(filled, n) + + for i in range(n): + if i < filled: + strip.set_pixel(i, *self._color) + else: + strip.set_pixel(i, 0, 0, 0) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class OffScene: + """All pixels off.""" + + @property + def name(self) -> str: + return "off" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + pass + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + strip.set_all(0, 0, 0) + + def teardown(self, strip: LedStrip) -> None: + strip.set_all(0, 0, 0) + + +@register_scene +class SolidScene: + """Static single color.""" + + @property + def name(self) -> str: + return "solid" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (255, 255, 255)) + if "brightness" in params: + strip.set_brightness(int(params["brightness"])) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + strip.set_all(*self._color) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class RainbowScene: + """HSV rainbow cycling across strip.""" + + @property + def name(self) -> str: + return "rainbow" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._speed = float(params.get("speed", 0.5)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + n = strip.num_pixels + t = elapsed_ms / 1000.0 + + for i in range(n): + hue = (i / n + t * self._speed) % 1.0 + r_f, g_f, b_f = colorsys.hsv_to_rgb(hue, 1.0, 1.0) + strip.set_pixel(i, int(r_f * 255), int(g_f * 255), int(b_f * 255)) + + def teardown(self, strip: LedStrip) -> None: + pass + + +@register_scene +class ProgressScene: + """LED bar graph proportional to percent.""" + + @property + def name(self) -> str: + return "progress" + + def setup(self, strip: LedStrip, params: dict[str, Any]) -> None: + self._color = _parse_color(params, (0, 255, 0)) + self._bg_color = _parse_color({"color": params.get("bg_color")}, (10, 10, 10)) + self._percent = float(params.get("percent", 0)) + + def tick(self, strip: LedStrip, elapsed_ms: float) -> None: + n = strip.num_pixels + filled = int(n * self._percent / 100.0) + filled = max(0, min(n, filled)) + + for i in range(n): + if i < filled: + strip.set_pixel(i, *self._color) + else: + strip.set_pixel(i, *self._bg_color) + + def update_percent(self, percent: float) -> None: + self._percent = max(0.0, min(100.0, percent)) + + def teardown(self, strip: LedStrip) -> None: + pass diff --git a/src/hilbench/led/_strip.py b/src/hilbench/led/_strip.py new file mode 100644 index 0000000..4a0b2e0 --- /dev/null +++ b/src/hilbench/led/_strip.py @@ -0,0 +1,80 @@ +"""LED strip hardware abstraction.""" + +from __future__ import annotations + +from typing import Protocol + + +class LedStrip(Protocol): + """Interface for addressable LED strip control.""" + + @property + def num_pixels(self) -> int: ... + + def set_pixel(self, index: int, r: int, g: int, b: int) -> None: ... + + def set_all(self, r: int, g: int, b: int) -> None: ... + + def show(self) -> None: ... + + def set_brightness(self, brightness: int) -> None: ... + + +class Ws281xStrip: + """Real WS2812B strip using rpi_ws281x.""" + + def __init__( + self, + led_count: int, + gpio_pin: int = 18, + brightness: int = 128, + ) -> None: + from rpi_ws281x import Color, PixelStrip + + self._strip = PixelStrip(led_count, gpio_pin, brightness=brightness) + self._strip.begin() + self._led_count = led_count + self._Color = Color + + @property + def num_pixels(self) -> int: + return self._led_count + + def set_pixel(self, index: int, r: int, g: int, b: int) -> None: + self._strip.setPixelColor(index, self._Color(r, g, b)) + + def set_all(self, r: int, g: int, b: int) -> None: + for i in range(self._led_count): + self.set_pixel(i, r, g, b) + + def show(self) -> None: + self._strip.show() + + def set_brightness(self, brightness: int) -> None: + self._strip.setBrightness(brightness) + + +class StubStrip: + """In-memory LED strip for testing.""" + + def __init__(self, led_count: int = 16) -> None: + self._led_count = led_count + self.pixels: list[tuple[int, int, int]] = [(0, 0, 0)] * led_count + self.brightness: int = 128 + self.show_count: int = 0 + + @property + def num_pixels(self) -> int: + return self._led_count + + def set_pixel(self, index: int, r: int, g: int, b: int) -> None: + self.pixels[index] = (r, g, b) + + def set_all(self, r: int, g: int, b: int) -> None: + self.pixels = [(r, g, b)] * self._led_count + + def show(self) -> None: + self.show_count += 1 + + def set_brightness(self, brightness: int) -> None: + self.brightness = brightness diff --git a/src/hilbench/publisher/_hooks.py b/src/hilbench/publisher/_hooks.py index 066973a..875eda1 100644 --- a/src/hilbench/publisher/_hooks.py +++ b/src/hilbench/publisher/_hooks.py @@ -15,6 +15,17 @@ _publisher: SupabasePublisher | None = None +def _set_led_scene(socket_path: str, scene: str, params: dict[str, object] | None = None) -> None: + """Best-effort LED scene change — never raises.""" + try: + from hilbench.led import LedClient + + client = LedClient(socket_path) + client.set_scene(scene, params) + except Exception: + logger.debug("LED scene change failed (daemon may not be running)", exc_info=True) + + def _get_publisher(bench_config: BenchConfig) -> SupabasePublisher | None: """Return (and lazily create) the module-level publisher singleton.""" global _publisher # noqa: PLW0603 @@ -35,6 +46,7 @@ def _get_publisher(bench_config: BenchConfig) -> SupabasePublisher | None: def on_flash_start(bench_config: BenchConfig, target_name: str, firmware: str) -> None: """Called before flashing begins.""" + _set_led_scene(str(bench_config.led.socket_path), "flashing") pub = _get_publisher(bench_config) if pub is None: return @@ -46,6 +58,8 @@ def on_flash_end( bench_config: BenchConfig, target_name: str, success: bool, duration_s: float ) -> None: """Called after flashing completes (success or failure).""" + led_scene = "success" if success else "error" + _set_led_scene(str(bench_config.led.socket_path), led_scene) pub = _get_publisher(bench_config) if pub is None: return @@ -60,12 +74,14 @@ def on_flash_end( def on_health_complete(bench_config: BenchConfig, results: list[CheckResult]) -> None: """Called after health checks complete.""" + all_passed = all(r.passed for r in results) + led_scene = "idle" if all_passed else "error" + _set_led_scene(str(bench_config.led.socket_path), led_scene) pub = _get_publisher(bench_config) if pub is None: return from hilbench.health import results_to_dicts - all_passed = all(r.passed for r in results) checks = results_to_dicts(results) state = "idle" if all_passed else "error" pub.publish_status(state=state, healthy=all_passed, checks=checks) diff --git a/systemd/hil-bench-led.service b/systemd/hil-bench-led.service new file mode 100644 index 0000000..a51b278 --- /dev/null +++ b/systemd/hil-bench-led.service @@ -0,0 +1,16 @@ +[Unit] +Description=HIL Bench LED Status Display Daemon +After=local-fs.target + +[Service] +Type=simple +ExecStart=/opt/hil-bench/venv/bin/benchctl led daemon +RuntimeDirectory=hil-bench +User=root +Restart=on-failure +RestartSec=5 +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target diff --git a/tests/test_led_cli.py b/tests/test_led_cli.py new file mode 100644 index 0000000..dbc67c0 --- /dev/null +++ b/tests/test_led_cli.py @@ -0,0 +1,141 @@ +"""Tests for LED CLI commands.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from click.testing import CliRunner + +from hilbench.cli.main import cli +from hilbench.led._models import DaemonStatus, SceneResponse + + +class TestLedListScenes: + def test_list_scenes(self, sample_config_path) -> None: + runner = CliRunner() + result = runner.invoke(cli, ["led", "list-scenes"]) + assert result.exit_code == 0 + assert "idle" in result.output + assert "rainbow" in result.output + assert "off" in result.output + + +class TestLedSetScene: + def test_dry_run(self, sample_config_path) -> None: + runner = CliRunner() + result = runner.invoke( + cli, ["--config", str(sample_config_path), "--dry-run", "led", "set-scene", "idle"] + ) + assert result.exit_code == 0 + assert "dry-run" in result.output + + def test_set_scene_success(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.set_scene.return_value = SceneResponse(ok=True, current_scene="rainbow") + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke( + cli, + [ + "--config", + str(sample_config_path), + "led", + "set-scene", + "rainbow", + "--speed", + "2.0", + ], + ) + assert result.exit_code == 0 + assert "rainbow" in result.output + + def test_set_scene_with_color(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.set_scene.return_value = SceneResponse(ok=True, current_scene="solid") + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke( + cli, + [ + "--config", + str(sample_config_path), + "led", + "set-scene", + "solid", + "--color", + "255,0,0", + ], + ) + assert result.exit_code == 0 + mock_client.set_scene.assert_called_once() + call_params = mock_client.set_scene.call_args[0][1] + assert call_params["color"] == [255, 0, 0] + + def test_set_scene_error(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.set_scene.return_value = SceneResponse(ok=False, error="daemon down") + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke( + cli, + ["--config", str(sample_config_path), "led", "set-scene", "idle"], + ) + assert result.exit_code == 1 + + +class TestLedOff: + def test_dry_run(self, sample_config_path) -> None: + runner = CliRunner() + result = runner.invoke( + cli, ["--config", str(sample_config_path), "--dry-run", "led", "off"] + ) + assert result.exit_code == 0 + assert "dry-run" in result.output + + def test_off_success(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.off.return_value = SceneResponse(ok=True, current_scene="off") + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke(cli, ["--config", str(sample_config_path), "led", "off"]) + assert result.exit_code == 0 + + +class TestLedStatus: + def test_daemon_not_running(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.is_daemon_running.return_value = False + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke(cli, ["--config", str(sample_config_path), "led", "status"]) + assert result.exit_code == 0 + assert "not running" in result.output + + def test_daemon_running(self, sample_config_path) -> None: + mock_client = MagicMock() + mock_client.is_daemon_running.return_value = True + mock_client.status.return_value = DaemonStatus( + running=True, current_scene="idle", led_count=16, uptime_s=42.0 + ) + + runner = CliRunner() + with patch("hilbench.led.LedClient", return_value=mock_client): + result = runner.invoke(cli, ["--config", str(sample_config_path), "led", "status"]) + assert result.exit_code == 0 + assert "idle" in result.output + assert "16" in result.output + + +class TestLedDaemonCmd: + def test_dry_run(self, sample_config_path) -> None: + runner = CliRunner() + result = runner.invoke( + cli, ["--config", str(sample_config_path), "--dry-run", "led", "daemon"] + ) + assert result.exit_code == 0 + assert "dry-run" in result.output diff --git a/tests/test_led_client.py b/tests/test_led_client.py new file mode 100644 index 0000000..0aa87aa --- /dev/null +++ b/tests/test_led_client.py @@ -0,0 +1,111 @@ +"""Tests for LED client IPC.""" + +from __future__ import annotations + +import json +import socket +import threading +from typing import TYPE_CHECKING + +from hilbench.led._client import LedClient +from hilbench.led._models import DaemonStatus, SceneResponse + +if TYPE_CHECKING: + from pathlib import Path + + +def _run_mock_server(sock_path: str, responses: list[str], ready: threading.Event) -> None: + """Simple mock daemon that replies with pre-canned responses.""" + srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + srv.bind(sock_path) + srv.listen(1) + srv.settimeout(5.0) + ready.set() + + for resp_data in responses: + try: + conn, _ = srv.accept() + conn.recv(4096) # read request + conn.sendall((resp_data + "\n").encode()) + conn.close() + except TimeoutError: + break + srv.close() + + +class TestLedClient: + def test_set_scene(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + resp = SceneResponse(ok=True, current_scene="rainbow").model_dump_json() + ready = threading.Event() + t = threading.Thread(target=_run_mock_server, args=(sock_path, [resp], ready)) + t.start() + ready.wait() + + client = LedClient(sock_path) + result = client.set_scene("rainbow", {"speed": 2.0}) + assert result.ok + assert result.current_scene == "rainbow" + t.join(timeout=2) + + def test_off(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + resp = SceneResponse(ok=True, current_scene="off").model_dump_json() + ready = threading.Event() + t = threading.Thread(target=_run_mock_server, args=(sock_path, [resp], ready)) + t.start() + ready.wait() + + client = LedClient(sock_path) + result = client.off() + assert result.ok + assert result.current_scene == "off" + t.join(timeout=2) + + def test_status(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + status = DaemonStatus( + running=True, current_scene="idle", led_count=16, uptime_s=10.0 + ).model_dump_json() + ready = threading.Event() + t = threading.Thread(target=_run_mock_server, args=(sock_path, [status], ready)) + t.start() + ready.wait() + + client = LedClient(sock_path) + result = client.status() + assert result.running + assert result.current_scene == "idle" + t.join(timeout=2) + + def test_list_scenes(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + resp = json.dumps({"ok": True, "scenes": ["idle", "off", "rainbow"]}) + ready = threading.Event() + t = threading.Thread(target=_run_mock_server, args=(sock_path, [resp], ready)) + t.start() + ready.wait() + + client = LedClient(sock_path) + result = client.list_scenes() + assert "idle" in result + assert "rainbow" in result + t.join(timeout=2) + + def test_is_daemon_running_false_when_no_socket(self, tmp_path: Path) -> None: + client = LedClient(str(tmp_path / "nonexistent.sock")) + assert client.is_daemon_running() is False + + def test_is_daemon_running_true(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + status = DaemonStatus( + running=True, current_scene="idle", led_count=16, uptime_s=1.0 + ).model_dump_json() + ready = threading.Event() + t = threading.Thread(target=_run_mock_server, args=(sock_path, [status], ready)) + t.start() + ready.wait() + + client = LedClient(sock_path) + assert client.is_daemon_running() is True + t.join(timeout=2) diff --git a/tests/test_led_daemon.py b/tests/test_led_daemon.py new file mode 100644 index 0000000..044baa2 --- /dev/null +++ b/tests/test_led_daemon.py @@ -0,0 +1,158 @@ +"""Tests for LED daemon.""" + +from __future__ import annotations + +import json +import socket +import threading +import time +from typing import TYPE_CHECKING + +from hilbench.led._daemon import LedDaemon + +if TYPE_CHECKING: + from pathlib import Path + + +def _wait_for_socket(sock_path: str, timeout: float = 5.0) -> bool: + """Wait until the daemon socket is accepting connections.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(sock_path) + s.close() + return True + except (ConnectionRefusedError, FileNotFoundError): + time.sleep(0.05) + return False + + +def _send_command(sock_path: str, command: dict[str, object]) -> dict[str, object]: + """Send a JSON command and parse the response.""" + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect(sock_path) + s.sendall((json.dumps(command) + "\n").encode()) + data = s.recv(4096).decode().strip() + s.close() + return json.loads(data) + + +class TestLedDaemon: + def test_starts_and_responds_to_status(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=8, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + resp = _send_command(sock_path, {"command": "status"}) + assert resp["running"] is True + assert resp["led_count"] == 8 + + daemon._shutdown = True + t.join(timeout=3) + + def test_set_scene(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=8, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + resp = _send_command( + sock_path, + {"command": "set_scene", "scene": "solid", "params": {"color": [255, 0, 0]}}, + ) + assert resp["ok"] is True + assert resp["current_scene"] == "solid" + + daemon._shutdown = True + t.join(timeout=3) + + def test_set_unknown_scene(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=4, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + resp = _send_command( + sock_path, {"command": "set_scene", "scene": "does_not_exist", "params": {}} + ) + assert resp["ok"] is False + assert "unknown scene" in resp["error"] + + daemon._shutdown = True + t.join(timeout=3) + + def test_list_scenes(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=4, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + resp = _send_command(sock_path, {"command": "list_scenes"}) + assert resp["ok"] is True + assert "idle" in resp["scenes"] + assert "rainbow" in resp["scenes"] + + daemon._shutdown = True + t.join(timeout=3) + + def test_invalid_json(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=4, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect(sock_path) + s.sendall(b"not json\n") + data = s.recv(4096).decode().strip() + s.close() + resp = json.loads(data) + assert resp["ok"] is False + + daemon._shutdown = True + t.join(timeout=3) + + def test_boots_to_idle(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=4, fps=60, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + # Wait for boot→idle transition (2 seconds) + time.sleep(2.5) + resp = _send_command(sock_path, {"command": "status"}) + assert resp["current_scene"] == "idle" + + daemon._shutdown = True + t.join(timeout=3) + + def test_socket_cleaned_up_on_shutdown(self, tmp_path: Path) -> None: + sock_path = str(tmp_path / "led.sock") + daemon = LedDaemon(led_count=4, fps=30, socket_path=sock_path, use_stub=True) + + t = threading.Thread(target=daemon.run, daemon=True) + t.start() + assert _wait_for_socket(sock_path) + + daemon._shutdown = True + t.join(timeout=3) + + import os + + assert not os.path.exists(sock_path) diff --git a/tests/test_led_models.py b/tests/test_led_models.py new file mode 100644 index 0000000..d7d7ea9 --- /dev/null +++ b/tests/test_led_models.py @@ -0,0 +1,124 @@ +"""Tests for LED models and config.""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from hilbench.config import BenchConfig, LedConfig +from hilbench.led._models import DaemonStatus, LedColor, SceneRequest, SceneResponse +from tests.conftest import SAMPLE_CONFIG + + +class TestLedColor: + def test_factory_red(self) -> None: + c = LedColor.red() + assert c.r == 255 and c.g == 0 and c.b == 0 + + def test_factory_green(self) -> None: + c = LedColor.green() + assert c.r == 0 and c.g == 255 and c.b == 0 + + def test_factory_blue(self) -> None: + c = LedColor.blue() + assert c.r == 0 and c.g == 0 and c.b == 255 + + def test_factory_off(self) -> None: + c = LedColor.off() + assert c.r == 0 and c.g == 0 and c.b == 0 + + def test_factory_white(self) -> None: + c = LedColor.white() + assert c.r == 255 and c.g == 255 and c.b == 255 + + def test_factory_yellow(self) -> None: + c = LedColor.yellow() + assert c.r == 255 and c.g == 255 and c.b == 0 + + def test_factory_cyan(self) -> None: + c = LedColor.cyan() + assert c.r == 0 and c.g == 255 and c.b == 255 + + def test_to_grb_int(self) -> None: + c = LedColor(r=255, g=0, b=0) + # GRB: g=0 << 16, r=255 << 8, b=0 + assert c.to_grb_int() == 0x00FF00 + + def test_to_rgb_tuple(self) -> None: + c = LedColor(r=10, g=20, b=30) + assert c.to_rgb_tuple() == (10, 20, 30) + + def test_out_of_range_rejects(self) -> None: + with pytest.raises(ValidationError): + LedColor(r=256, g=0, b=0) + with pytest.raises(ValidationError): + LedColor(r=0, g=-1, b=0) + + +class TestSceneRequest: + def test_minimal(self) -> None: + req = SceneRequest(command="set_scene", scene="idle") + assert req.command == "set_scene" + assert req.params == {} + + def test_with_params(self) -> None: + req = SceneRequest(command="set_scene", scene="solid", params={"color": [255, 0, 0]}) + assert req.params["color"] == [255, 0, 0] + + def test_json_round_trip(self) -> None: + req = SceneRequest(command="set_scene", scene="rainbow", params={"speed": 2.0}) + raw = req.model_dump_json() + restored = SceneRequest.model_validate_json(raw) + assert restored.scene == "rainbow" + assert restored.params["speed"] == 2.0 + + +class TestSceneResponse: + def test_ok(self) -> None: + resp = SceneResponse(ok=True, current_scene="idle") + assert resp.ok + assert resp.error == "" + + def test_error(self) -> None: + resp = SceneResponse(ok=False, error="unknown scene") + assert not resp.ok + + +class TestDaemonStatus: + def test_fields(self) -> None: + status = DaemonStatus(running=True, current_scene="idle", led_count=16, uptime_s=42.5) + assert status.running + assert status.led_count == 16 + + +class TestLedConfig: + def test_defaults(self) -> None: + cfg = LedConfig() + assert cfg.enabled is False + assert cfg.led_count == 16 + assert cfg.gpio_pin == 18 + assert cfg.brightness == 128 + assert cfg.fps == 30 + + def test_validation_led_count(self) -> None: + with pytest.raises(ValidationError): + LedConfig(led_count=0) + with pytest.raises(ValidationError): + LedConfig(led_count=1001) + + def test_validation_brightness(self) -> None: + with pytest.raises(ValidationError): + LedConfig(brightness=-1) + with pytest.raises(ValidationError): + LedConfig(brightness=256) + + def test_bench_config_has_led(self) -> None: + cfg = BenchConfig.model_validate(SAMPLE_CONFIG) + assert cfg.led.enabled is False + assert cfg.led.led_count == 16 + + def test_bench_config_with_led_override(self) -> None: + data = {**SAMPLE_CONFIG, "led": {"enabled": True, "led_count": 30}} + cfg = BenchConfig.model_validate(data) + assert cfg.led.enabled is True + assert cfg.led.led_count == 30 diff --git a/tests/test_led_scenes.py b/tests/test_led_scenes.py new file mode 100644 index 0000000..86e895a --- /dev/null +++ b/tests/test_led_scenes.py @@ -0,0 +1,249 @@ +"""Tests for LED scene engine.""" + +from __future__ import annotations + +import pytest + +from hilbench.exceptions import LedError +from hilbench.led._scenes import ( + BootingScene, + ErrorScene, + FlashingScene, + IdleScene, + OffScene, + ProgressScene, + RainbowScene, + SolidScene, + SuccessScene, + TestingScene, + get_scene, + list_scenes, +) +from hilbench.led._strip import StubStrip + + +class TestSceneRegistry: + def test_list_scenes_returns_all(self) -> None: + scenes = list_scenes() + expected = [ + "booting", + "error", + "flashing", + "idle", + "off", + "progress", + "rainbow", + "solid", + "success", + "testing", + ] + assert scenes == expected + + def test_get_scene_returns_instance(self) -> None: + scene = get_scene("idle") + assert scene.name == "idle" + + def test_get_unknown_scene_raises(self) -> None: + with pytest.raises(LedError, match="unknown scene"): + get_scene("nonexistent") + + +class TestIdleScene: + def test_breathing(self) -> None: + strip = StubStrip(8) + scene = IdleScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + assert any(p != (0, 0, 0) for p in strip.pixels) + scene.tick(strip, 500.0) + scene.teardown(strip) + + def test_custom_color(self) -> None: + strip = StubStrip(4) + scene = IdleScene() + scene.setup(strip, {"color": [255, 0, 0], "speed": 1.0}) + scene.tick(strip, 0.0) + # All pixels should be the same (breathing applies uniformly) + assert all(p == strip.pixels[0] for p in strip.pixels) + + +class TestFlashingScene: + def test_knight_rider(self) -> None: + strip = StubStrip(16) + scene = FlashingScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + # At t=0, the bright spot should be near the start + assert strip.pixels[0] != (0, 0, 0) + scene.tick(strip, 250.0) # Quarter of the way through + scene.teardown(strip) + + def test_different_times_produce_different_frames(self) -> None: + strip = StubStrip(16) + scene = FlashingScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + frame1 = list(strip.pixels) + scene.tick(strip, 100.0) + frame2 = list(strip.pixels) + assert frame1 != frame2 + + +class TestTestingScene: + def test_chase(self) -> None: + strip = StubStrip(8) + scene = TestingScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + scene.tick(strip, 333.0) + scene.teardown(strip) + + +class TestErrorScene: + def test_pulsing(self) -> None: + strip = StubStrip(8) + scene = ErrorScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + # Red channel should be non-zero (pulsing red) + assert any(p[0] > 0 for p in strip.pixels) + scene.tick(strip, 250.0) + assert any(p[0] > 0 for p in strip.pixels) + + +class TestSuccessScene: + def test_burst_and_revert(self) -> None: + strip = StubStrip(8) + scene = SuccessScene() + scene.setup(strip, {"duration_ms": 100}) + scene.tick(strip, 0.0) + assert not scene.should_revert + assert any(p != (0, 0, 0) for p in strip.pixels) + + scene.tick(strip, 200.0) + assert scene.should_revert + assert scene.revert_scene == "idle" + + def test_custom_revert(self) -> None: + strip = StubStrip(4) + scene = SuccessScene() + scene.setup(strip, {"revert_to": "rainbow", "duration_ms": 50}) + scene.tick(strip, 100.0) + assert scene.revert_scene == "rainbow" + + +class TestBootingScene: + def test_sequential_fill(self) -> None: + strip = StubStrip(8) + scene = BootingScene() + scene.setup(strip, {"speed": 10.0}) + scene.tick(strip, 0.0) + # At t=0 the fill may be partial + scene.tick(strip, 500.0) + scene.teardown(strip) + + +class TestOffScene: + def test_all_off(self) -> None: + strip = StubStrip(8) + # Pre-fill with some color + strip.set_all(255, 0, 0) + scene = OffScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + assert all(p == (0, 0, 0) for p in strip.pixels) + + def test_teardown_clears(self) -> None: + strip = StubStrip(4) + strip.set_all(100, 100, 100) + scene = OffScene() + scene.setup(strip, {}) + scene.teardown(strip) + assert all(p == (0, 0, 0) for p in strip.pixels) + + +class TestSolidScene: + def test_solid_color(self) -> None: + strip = StubStrip(8) + scene = SolidScene() + scene.setup(strip, {"color": [128, 64, 32]}) + scene.tick(strip, 0.0) + assert all(p == (128, 64, 32) for p in strip.pixels) + + def test_sets_brightness(self) -> None: + strip = StubStrip(4) + scene = SolidScene() + scene.setup(strip, {"brightness": 50}) + assert strip.brightness == 50 + + +class TestRainbowScene: + def test_different_pixels(self) -> None: + strip = StubStrip(16) + scene = RainbowScene() + scene.setup(strip, {}) + scene.tick(strip, 0.0) + # Rainbow should produce different colors across pixels + unique = set(strip.pixels) + assert len(unique) > 1 + + +class TestProgressScene: + def test_zero_percent(self) -> None: + strip = StubStrip(8) + scene = ProgressScene() + scene.setup(strip, {"percent": 0}) + scene.tick(strip, 0.0) + # All should be background color + assert all(p == (10, 10, 10) for p in strip.pixels) + + def test_fifty_percent(self) -> None: + strip = StubStrip(8) + scene = ProgressScene() + scene.setup(strip, {"percent": 50, "color": [0, 255, 0]}) + scene.tick(strip, 0.0) + # First 4 should be green, last 4 should be background + assert strip.pixels[0] == (0, 255, 0) + assert strip.pixels[3] == (0, 255, 0) + assert strip.pixels[4] == (10, 10, 10) + + def test_hundred_percent(self) -> None: + strip = StubStrip(4) + scene = ProgressScene() + scene.setup(strip, {"percent": 100, "color": [0, 255, 0]}) + scene.tick(strip, 0.0) + assert all(p == (0, 255, 0) for p in strip.pixels) + + def test_update_percent(self) -> None: + strip = StubStrip(8) + scene = ProgressScene() + scene.setup(strip, {"percent": 0}) + scene.update_percent(75) + scene.tick(strip, 0.0) + # 6 of 8 should be filled + filled = sum(1 for p in strip.pixels if p != (10, 10, 10)) + assert filled == 6 + + +class TestStubStrip: + def test_set_pixel(self) -> None: + strip = StubStrip(4) + strip.set_pixel(2, 100, 200, 50) + assert strip.pixels[2] == (100, 200, 50) + assert strip.pixels[0] == (0, 0, 0) + + def test_set_all(self) -> None: + strip = StubStrip(4) + strip.set_all(10, 20, 30) + assert all(p == (10, 20, 30) for p in strip.pixels) + + def test_show_increments(self) -> None: + strip = StubStrip(4) + assert strip.show_count == 0 + strip.show() + strip.show() + assert strip.show_count == 2 + + def test_num_pixels(self) -> None: + strip = StubStrip(32) + assert strip.num_pixels == 32