diff --git a/examples/service/README.md b/examples/service/README.md new file mode 100644 index 0000000..43703e2 --- /dev/null +++ b/examples/service/README.md @@ -0,0 +1,271 @@ +# ROSE Service Examples + +This directory contains examples for running ROSE workflows through the **ROSE Service** — a daemon-based workflow manager that accepts workflow submissions, tracks execution state, and provides real-time status updates. + +The service uses file-based IPC: the manager polls a local directory for request files, and clients (CLI or Python) write JSON requests into that directory. A shared `registry.json` file reflects the live state of all workflows. + +**Files in this directory:** + +| File | Description | +|------|-------------| +| `service_test.yaml` | Minimal test workflow using `/bin/echo` (no dependencies required) | +| `service_real.yaml` | Real workflow using `ParallelActiveLearner` with Python scripts | +| `debug_workflow.yaml` | Fast test workflow for plugin testing (2 iterations) | +| `run_service.py` | Integration example: launches, submits, monitors, and shuts down programmatically | +| `verify_service.py` | Demonstrates workflow cancellation flow | +| `example_rose_plugin.py` | REST API example using RADICAL-Edge plugin | + +--- + +## Workflow YAML Format + +Both examples use a YAML file to define the workflow. The service loads this file, instantiates the appropriate learner, and registers the component tasks. + +```yaml +learner: + type: SequentialActiveLearner # or ParallelActiveLearner + +components: + simulation: + type: script # or "function" for a Python callable + path: /bin/echo + config: + args: ["Simulation Step"] + training: + type: script + path: /bin/echo + config: + args: ["Training Step"] + active_learn: + type: script + path: /bin/echo + config: + args: ["AL Step"] + +config: + max_iterations: 3 + work_dir: /tmp/rose_test +``` + +--- + +## Option 1 — CLI (Two Terminals) + +The CLI is the simplest way to interact with ROSE service. 
You need two terminal sessions: one to run the service daemon and one to submit and monitor workflows.

> The `--job-id` flag identifies the service instance. If you are inside a SLURM job, it defaults to `$SLURM_JOB_ID`. For local usage, it defaults to `local_job_0`. Both terminals must use the same job ID.

### Terminal 1 — Start the Service

```bash
rose launch
```

The service starts and blocks, printing log output as workflows are received and executed. Keep this terminal open for the lifetime of the session.

To use a custom job ID (e.g. for running multiple isolated services):

```bash
rose launch --job-id job.000001
```

### Terminal 2 — Submit and Monitor a Workflow

The commands below assume the default job ID; if you launched with a custom `--job-id`, see the note at the end of this section.

**Submit a workflow:**

```bash
rose submit examples/service/service_test.yaml
```

Output:

```
Submitted workflow request.
Request ID: 3f2a1b4c-...
Workflow ID: wf.3f2a1b4c
```

**Check workflow status:**

```bash
rose status wf.3f2a1b4c
```

Output (example):

```json
{
  "wf_id": "wf.3f2a1b4c",
  "state": "running",
  "workflow_file": "/path/to/service_test.yaml",
  "start_time": 1700000000.0,
  "end_time": null,
  "stats": {
    "iteration": 2,
    "metric_value": null
  },
  "error": null
}
```

**List all workflows:**

```bash
rose status
```

**Cancel a running workflow:**

```bash
rose cancel wf.3f2a1b4c
```

**Shut down the service when done:**

```bash
rose shutdown
```

> The service in Terminal 1 will exit gracefully after receiving the shutdown request.

If you launched with a custom `--job-id`, pass the same flag to all client commands:

```bash
rose submit --job-id my_session examples/service/service_test.yaml
rose status --job-id my_session
rose shutdown --job-id my_session
```

---

## Option 2 — Python Client

Use `ServiceClient` from `rose.service.client` to drive the service programmatically — useful for integration scripts, notebooks, or automated pipelines.
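Client code for this option typically reduces to: submit, poll until a terminal state, shut down. The polling piece can be factored into a small helper. This is a sketch, assuming only that the status callable returns a dict with a `state` key (as `get_workflow_status` does); `wait_for_workflow` is an illustrative name, not part of the ROSE API:

```python
import time

# Terminal workflow states, as reported by the service registry.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def wait_for_workflow(get_status, wf_id, interval=2.0, timeout=300.0):
    """Poll get_status(wf_id) until the workflow reaches a terminal state.

    get_status: a callable returning a status dict with a "state" key,
    e.g. client.get_workflow_status. Returns the final status dict.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status(wf_id) or {}
        if status.get("state") in TERMINAL_STATES:
            return status
        time.sleep(interval)
    raise TimeoutError(f"workflow {wf_id} did not finish within {timeout}s")
```

With a `ServiceClient` this reads `final = wait_for_workflow(client.get_workflow_status, wf_id)`, after which `final.get("error")` holds any failure details.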
+ +See [`run_service.py`](run_service.py) for a complete working example. It covers the full lifecycle: + +1. Start `ServiceManager` as a background `asyncio` task +2. Initialize `ServiceClient` with the same job ID +3. Call `client.submit_workflow(path)` to submit a YAML workflow +4. Derive the workflow ID with `ServiceClient.get_wf_id(req_id)` +5. Poll `client.get_workflow_status(wf_id)` until the state reaches `COMPLETED`, `FAILED`, or `CANCELED` +6. Call `client.shutdown()` to stop the service + +Run it with: + +```bash +python examples/service/run_service.py +``` + +See [`verify_service.py`](verify_service.py) for an example of cancellation via `client.cancel_workflow(wf_id)`. + +**Key `ServiceClient` methods:** + +| Method | Description | +|--------|-------------| +| `submit_workflow(path)` | Submit a YAML workflow file; returns a `req_id` | +| `ServiceClient.get_wf_id(req_id)` | Derive the workflow ID from a request ID | +| `get_workflow_status(wf_id)` | Return the current state dict for a workflow | +| `list_workflows()` | Return all workflows from the registry | +| `cancel_workflow(wf_id)` | Request cancellation of a running workflow | +| `shutdown()` | Send a graceful shutdown request to the service | + +--- + +## Option 3 — REST API via RADICAL-Edge Plugin + +The ROSE plugin for RADICAL-Edge provides a REST API for workflow management. This is the recommended approach for remote access and integration with other services. + +**Architecture:** +``` +Client (Python/curl/browser) + ↓ HTTP/REST +RADICAL-Edge Bridge + ↓ WebSocket +Edge Service (with ROSE plugin) + ↓ +WorkflowEngine / Learners (embedded) +``` + +The plugin embeds workflow execution directly — no separate `rose launch` daemon required. + +### Prerequisites + +1. RADICAL-Edge bridge running +2. 
RADICAL-Edge service running with ROSE plugin loaded + +### REST Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| `POST` | `/rose/register_session` | Register a new session | +| `POST` | `/rose/submit/{sid}` | Submit a workflow | +| `GET` | `/rose/status/{sid}/{wf_id}` | Get workflow status | +| `GET` | `/rose/workflows/{sid}` | List all workflows | +| `POST` | `/rose/cancel/{sid}/{wf_id}` | Cancel a workflow | +| `POST` | `/rose/unregister_session/{sid}` | Close session | + +### Python Client Example + +See [`example_rose_plugin.py`](example_rose_plugin.py) for a complete working example. + +```python +from radical.edge import BridgeClient +import rose.service.api.rest # Register plugin + +# Connect to bridge +bc = BridgeClient(url='https://localhost:8000') +edges = bc.list_edges() +ec = bc.get_edge_client(edges[0]) + +# Get ROSE plugin client +rose = ec.get_plugin('rose') + +# Submit workflow +result = rose.submit_workflow('/path/to/workflow.yaml') +wf_id = result['wf_id'] + +# Monitor status +status = rose.get_workflow_status(wf_id) +print(f"State: {status['state']}") + +# List all workflows +workflows = rose.list_workflows() + +# Cancel if needed +rose.cancel_workflow(wf_id) + +# Cleanup +rose.close() +bc.close() +``` + +### Notifications + +The plugin sends real-time notifications via SSE when workflow state changes: + +```python +def on_state_change(topic, data): + print(f"Workflow {data['wf_id']}: {data['state']}") + +rose.on_workflow_state(on_state_change) +``` + +### Running the Example + +```bash +# Set bridge URL +export RADICAL_BRIDGE_URL=https://localhost:8000 + +# Run example +python example_rose_plugin.py --workflow debug_workflow.yaml +``` + +--- + +## Additional Files + +| File | Description | +|------|-------------| +| `example_rose_plugin.py` | REST API example using RADICAL-Edge plugin | +| `debug_workflow.yaml` | Fast test workflow (2 iterations, ~6 seconds) | diff --git 
a/examples/service/debug_workflow.yaml b/examples/service/debug_workflow.yaml new file mode 100644 index 0000000..151a905 --- /dev/null +++ b/examples/service/debug_workflow.yaml @@ -0,0 +1,39 @@ +# Debug Workflow for ROSE Plugin Testing +# +# This workflow uses simple echo commands to quickly test +# the ROSE service and plugin without heavy computation. +# +# Usage: +# python example_rose_client.py --workflow debug_workflow.yaml + +learner: + type: SequentialActiveLearner + +components: + simulation: + type: script + path: /bin/bash + config: + args: + - "-c" + - "echo '[SIM] Iteration $ROSE_ITERATION' && sleep 1" + + training: + type: script + path: /bin/bash + config: + args: + - "-c" + - "echo '[TRAIN] Training model...' && sleep 1" + + active_learn: + type: script + path: /bin/bash + config: + args: + - "-c" + - "echo '[AL] Active learning step' && sleep 1" + +config: + max_iterations: 2 + work_dir: /tmp/rose_debug diff --git a/examples/service/example_rose_plugin.py b/examples/service/example_rose_plugin.py new file mode 100755 index 0000000..fa796ac --- /dev/null +++ b/examples/service/example_rose_plugin.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Example: Test ROSE Edge Plugin + +Connects to a running RADICAL-Edge bridge, submits a workflow via the +ROSE plugin, and monitors its status. + +Prerequisites: + 1. Bridge running: radical-edge-bridge + 2. Edge with ROSE plugin: radical-edge-service (with ROSE plugin loaded) + 3. 
ROSE installed (the plugin embeds workflow execution; no separate
       'rose launch' daemon is required)

Usage:
    export RADICAL_BRIDGE_URL=https://localhost:8443
    python example_rose_plugin.py [--workflow FILE] [--job-id ID]
"""

import argparse
import logging
import os
import sys
import time
from pathlib import Path

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s",
    datefmt="%H:%M:%S",
)
for name in ["httpx", "httpcore", "urllib3"]:
    logging.getLogger(name).setLevel(logging.DEBUG)

log = logging.getLogger("rose.example")


def notification_cb(topic: str, data: dict):
    """Handle workflow notifications."""
    log.info(f"[NOTIFY] {topic}: {data}")


def main():
    parser = argparse.ArgumentParser(description="Test ROSE Edge Plugin")
    parser.add_argument("--workflow", "-w", default="debug_workflow.yaml")
    parser.add_argument("--job-id", "-j", default="local_job_0")
    parser.add_argument(
        "--bridge-url", "-b", default=os.environ.get("RADICAL_BRIDGE_URL", "https://localhost:8443")
    )
    args = parser.parse_args()

    # Resolve workflow path
    workflow = Path(args.workflow)
    if not workflow.exists():
        workflow = Path(__file__).parent / args.workflow
    if not workflow.exists():
        log.error(f"Workflow not found: {args.workflow}")
        sys.exit(1)

    log.info(f"Bridge: {args.bridge_url}")
    log.info(f"Workflow: {workflow}")
    log.info(f"Job ID: {args.job_id}")
    log.info("-" * 60)

    from radical.edge import BridgeClient

    # Import ROSE plugin to register client class locally
    import rose.service.api.rest  # noqa: F401

    try:
        bc = BridgeClient(url=args.bridge_url)
        edges = bc.list_edges()
    except Exception as e:
        log.error(f"Cannot connect to bridge: {e}")
        log.error("Make sure bridge and edge service are running.")
        sys.exit(1)

    if not edges:
        log.error("No edges connected to bridge")
        sys.exit(1)

    edge_id = edges[0]
    log.info(f"Using edge: {edge_id}")

    try:
        ec = bc.get_edge_client(edge_id)
        rose = 
ec.get_plugin("rose", job_id=args.job_id) + except Exception as e: + log.error(f"Cannot get ROSE plugin: {e}") + log.error("Make sure ROSE plugin is loaded on edge service.") + sys.exit(1) + + log.info(f"Session: {rose.sid}") + rose.on_workflow_state(notification_cb) + + try: + # Submit workflow + log.info(f"Submitting {workflow}...") + result = rose.submit_workflow(str(workflow.absolute())) + wf_id = result["wf_id"] + log.info(f"Submitted: {wf_id}") + + # Monitor status + log.info("Monitoring (Ctrl+C to cancel)...") + terminal = {"COMPLETED", "FAILED", "CANCELED"} + last_state = None + + for _i in range(120): + time.sleep(2) + try: + status = rose.get_workflow_status(wf_id) + state = status.get("state", "UNKNOWN") + if state != last_state: + log.info(f"State: {state}") + last_state = state + if state in terminal: + if state == "FAILED": + log.error(f"Error: {status.get('error')}") + break + except Exception as e: + log.warning(f"Status error: {e}") + + # Final state + log.info("-" * 60) + for wid, info in rose.list_workflows().items(): + log.info(f"{wid}: {info.get('state')}") + + except KeyboardInterrupt: + log.warning("Interrupted") + + finally: + rose.off_workflow_state(notification_cb) + rose.close() + bc.close() + log.info("Done.") + + +if __name__ == "__main__": + main() diff --git a/examples/service/run_service.py b/examples/service/run_service.py new file mode 100644 index 0000000..3911756 --- /dev/null +++ b/examples/service/run_service.py @@ -0,0 +1,120 @@ +import asyncio +import json +import logging +import os +import shutil +from pathlib import Path + +from rose.service.client import ServiceClient +from rose.service.manager import ServiceManager + +# Configure logging to see what's happening +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("run_service") + +# Job ID for this service instance +JOB_ID = "rose_service_run" +SERVICE_ROOT = Path.home() / ".rose" / 
"services" / JOB_ID + + +def cleanup(): + if SERVICE_ROOT.exists(): + logger.info(f"Cleaning up previous service root at {SERVICE_ROOT}") + shutil.rmtree(SERVICE_ROOT) + + +async def run_workflow(): + print(f"--- Starting ROSE Service for Job {JOB_ID} ---") + cleanup() + + # 1. Start Service Manager in a background task + manager = ServiceManager(JOB_ID) + service_task = asyncio.create_task(manager.run()) + + # Wait for service initialization + await asyncio.sleep(1) + + # 2. Initialize Client + client = ServiceClient(JOB_ID) + + # 3. Submit the realistic workflow + wf_path = "service_real.yaml" + if not os.path.exists(wf_path): + print(f"Error: Workflow file not found at {wf_path}") + await manager.shutdown() + return + + print(f"Submitting workflow: {wf_path}") + req_id = client.submit_workflow(wf_path) + print(f"Submitted. Request ID: {req_id}") + + # 4. Wait for workflow to be picked up and assigned a wf_id + wf_id = None + print("Waiting for service to assign Workflow ID...") + for _ in range(20): + await asyncio.sleep(1) + registry = client.list_workflows() + if registry: + wf_id = list(registry.keys())[0] + print(f"Assigned Workflow ID: {wf_id}") + break + + if not wf_id: + print("Error: Workflow was not picked up by the service.") + await manager.shutdown() + return + + # 5. 
Monitor progress until completion + print(f"Monitoring workflow {wf_id}...") + last_state = None + while True: + status = client.get_workflow_status(wf_id) + if not status: + print("Error: Workflow status lost.") + break + + current_state = status.get("state") + if current_state != last_state: + print(f"Status Change: {current_state}") + last_state = current_state + + if current_state in ["COMPLETED", "FAILED", "CANCELED"]: + print(f"Workflow reached terminal state: {current_state}") + if current_state == "FAILED": + print(f"Error Details: {status.get('error')}") + break + + # Optional: Print iteration progress if available in stats + stats = status.get("stats", {}) + if "iteration" in stats: + print( + f" Iteration: {stats['iteration']} | Metric: {stats.get('metric_value', 'N/A')}", + end="\r", + ) + + await asyncio.sleep(2) + + # 6. Final Report + status = client.get_workflow_status(wf_id) + print("\n--- Final Workflow Status ---") + print(json.dumps(status, indent=2)) + + # 7. Graceful Shutdown + print("Shutting down service...") + await manager.shutdown() + try: + # Give some time for internal tasks to finish before canceling the service loop + await asyncio.wait_for(service_task, timeout=5) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + + print("--- Service Run Finished ---") + + +if __name__ == "__main__": + try: + asyncio.run(run_workflow()) + except KeyboardInterrupt: + print("\nInterrupted by user.") diff --git a/examples/service/service_real.yaml b/examples/service/service_real.yaml new file mode 100644 index 0000000..c4544d4 --- /dev/null +++ b/examples/service/service_real.yaml @@ -0,0 +1,24 @@ +learner: + path: rose.al.active_learner.ParallelActiveLearner + +components: + simulation: + type: script + path: /home/aymen/ve/raas/bin/python3 + config: + args: ["/home/aymen/RADICAL/ROSE-AS-A-SERVICE/ROSE/examples/active_learn/basic/sim.py"] + training: + type: script + path: /home/aymen/ve/raas/bin/python3 + config: + args: 
["/home/aymen/RADICAL/ROSE-AS-A-SERVICE/ROSE/examples/active_learn/basic/train.py"] + active_learn: + type: script + path: /home/aymen/ve/raas/bin/python3 + config: + args: ["/home/aymen/RADICAL/ROSE-AS-A-SERVICE/ROSE/examples/active_learn/basic/active.py"] + +config: + parallel_learners: 4 + max_iterations: 10 + work_dir: /tmp/rose_test diff --git a/examples/service/service_test.yaml b/examples/service/service_test.yaml new file mode 100644 index 0000000..41451c0 --- /dev/null +++ b/examples/service/service_test.yaml @@ -0,0 +1,23 @@ +learner: + type: SequentialActiveLearner + +components: + simulation: + type: script + path: /bin/echo + config: + args: ["Simulation Step"] + training: + type: script + path: /bin/echo + config: + args: ["Training Step"] + active_learn: + type: script + path: /bin/echo + config: + args: ["AL Step"] + +config: + max_iterations: 3 + work_dir: /tmp/rose_test diff --git a/examples/service/verify_service.py b/examples/service/verify_service.py new file mode 100644 index 0000000..3f3c79b --- /dev/null +++ b/examples/service/verify_service.py @@ -0,0 +1,81 @@ +import asyncio +import shutil +from pathlib import Path + +from rose.service.client import ServiceClient +from rose.service.manager import ServiceManager + +# Mock Job ID +JOB_ID = "test_job_123" +SERVICE_ROOT = Path.home() / ".rose" / "services" / JOB_ID + + +def cleanup(): + if SERVICE_ROOT.exists(): + shutil.rmtree(SERVICE_ROOT) + + +async def run_verification(): + print(f"--- Starting Verification for Job {JOB_ID} ---") + cleanup() + + # 1. Start Service in Background Task + manager = ServiceManager(JOB_ID) + service_task = asyncio.create_task(manager.run()) + + # Allow service to init + await asyncio.sleep(2) + + # 2. Initialize Client + client = ServiceClient(JOB_ID) + + # 3. Submit Workflow + wf_path = "service_real.yaml" + print(f"Submitting {wf_path}...") + req_id = client.submit_workflow(wf_path) + print(f"Submitted. Request ID: {req_id}") + + # 4. 
Poll for Status until Running + wf_id = None + for _ in range(10): + await asyncio.sleep(1) + registry = client.list_workflows() + if registry: + wf_id = list(registry.keys())[0] + state = registry[wf_id]["state"] + print(f"Workflow {wf_id} State: {state}") + if state in ["RUNNING", "COMPLETED"]: + break + + if not wf_id: + print("Failed to get workflow ID") + await manager.shutdown() + return + + # 5. Cancel Workflow (if still running) + print(f"Canceling {wf_id}...") + client.cancel_workflow(wf_id) + + # 6. Check for Canceled State + for _ in range(5): + await asyncio.sleep(1) + status = client.get_workflow_status(wf_id) + print(f"Workflow {wf_id} State: {status['state']}") + if status["state"] == "CANCELED": + print("SUCCESS: Workflow Canceled") + break + if status["state"] == "COMPLETED": + print("Workflow finished before cancel (acceptable for short test)") + break + + # Shutdown + await manager.shutdown() + try: + await service_task + except asyncio.CancelledError: + pass + print("--- Verification Finished ---") + + +if __name__ == "__main__": + asyncio.run(run_verification()) diff --git a/pyproject.toml b/pyproject.toml index e8fcca0..266342c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ requires-python = ">=3.10" dependencies = [ "numpy", + "PyYAML", "radical.asyncflow", "rhapsody-py[radical_pilot]", 'rhapsody-py[dragon]; python_version >= "3.10" and python_version <= "3.12"', @@ -29,6 +30,12 @@ Homepage = "https://github.com/radical-cybertools/ROSE" Issues = "https://github.com/radical-cybertools/ROSE/issues" Documentation = "https://radical-cybertools.github.io/ROSE/" +[project.scripts] +rose = "rose.service.api.cli:main" + +[project.entry-points."radical.edge.plugins"] +rose = "rose.service.api.rest:PluginRose" + [project.optional-dependencies] lint = ["ruff"] diff --git a/rose/service/.claude/settings.local.json b/rose/service/.claude/settings.local.json new file mode 100644 index 0000000..4742ca3 --- /dev/null +++ 
b/rose/service/.claude/settings.local.json @@ -0,0 +1,17 @@ +{ + "permissions": { + "allow": [ + "Bash(wc:*)", + "Bash(python -c:*)", + "Bash(source:*)", + "Bash(ruff check:*)", + "Bash(python3:*)", + "Bash(pip show:*)", + "Bash(pkill:*)", + "Bash(pgrep:*)", + "Bash(find:*)", + "Bash(xargs:*)", + "Bash(grep:*)" + ] + } +} diff --git a/rose/service/__init__.py b/rose/service/__init__.py new file mode 100644 index 0000000..e3fddf1 --- /dev/null +++ b/rose/service/__init__.py @@ -0,0 +1,3 @@ +from .models import Workflow, WorkflowState + +__all__ = ["WorkflowState", "Workflow"] diff --git a/rose/service/api/__init__.py b/rose/service/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rose/service/api/cli.py b/rose/service/api/cli.py new file mode 100644 index 0000000..24c21f1 --- /dev/null +++ b/rose/service/api/cli.py @@ -0,0 +1,141 @@ +import argparse +import asyncio +import json +import logging +import os +import sys + +from rose.service.client import ServiceClient +from rose.service.manager import ServiceManager + + +def get_job_id(): + """Get SLURM_JOB_ID from env or argument.""" + # Priority: env var -> but for client, user might validly pass it as arg. + # For 'launch', we usually rely on env var if running inside job. + return os.environ.get("SLURM_JOB_ID", "local_job_0") + + +def cmd_launch(args): + """Start the Service Manager.""" + # Configure logging for the service + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%H:%M:%S", + ) + + job_id = args.job_id or get_job_id() + print(f"Launching ROSE Service for Job ID: {job_id}") + + manager = ServiceManager(job_id) + # The loop in manager.run() handles signals if radical.asyncflow does, + # and the finally block ensures manager.shutdown() is called. 
+ asyncio.run(manager.run()) + + +def cmd_submit(args): + """Submit a workflow.""" + job_id = args.job_id or get_job_id() + client = ServiceClient(job_id) + + try: + req_id = client.submit_workflow(args.workflow_file) + wf_id = ServiceClient.get_wf_id(req_id) + print("Submitted workflow request.") + print(f"Request ID: {req_id}") + print(f"Workflow ID: {wf_id}") + except Exception as e: + print(f"Error submitting workflow: {e}") + sys.exit(1) + + +def cmd_cancel(args): + """Cancel a workflow.""" + job_id = args.job_id or get_job_id() + client = ServiceClient(job_id) + + try: + req_id = client.cancel_workflow(args.wf_id) + print(f"Sent cancellation request for {args.wf_id}. Request ID: {req_id}") + except Exception as e: + print(f"Error cancelling workflow: {e}") + sys.exit(1) + + +def cmd_status(args): + """Get status.""" + job_id = args.job_id or get_job_id() + client = ServiceClient(job_id) + + try: + if args.wf_id: + status = client.get_workflow_status(args.wf_id) + if status: + print(json.dumps(status, indent=2)) + else: + print(f"Workflow {args.wf_id} not found.") + else: + # List all + registry = client.list_workflows() + print(f"Workflows in Job {job_id}:") + for wfid, wfdata in registry.items(): + print(f" - {wfid}: {wfdata.get('state')}") + except Exception as e: + print(f"Error getting status: {e}") + sys.exit(1) + + +def cmd_shutdown(args): + """Shutdown the service.""" + job_id = args.job_id or get_job_id() + client = ServiceClient(job_id) + try: + client.shutdown() + print(f"Shutdown request sent to service (Job ID: {job_id})") + except Exception as e: + print(f"Error sending shutdown request: {e}") + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser(description="ROSE Service CLI") + subparsers = parser.add_subparsers(dest="command", required=True) + + # Common arg for job id + parent_parser = argparse.ArgumentParser(add_help=False) + parent_parser.add_argument("--job-id", help="SLURM Job ID (default: $SLURM_JOB_ID)") + + # Launch + 
p_launch = subparsers.add_parser( + "launch", parents=[parent_parser], help="Start the Service Manager daemon" + ) + p_launch.set_defaults(func=cmd_launch) + + # Submit + p_submit = subparsers.add_parser("submit", parents=[parent_parser], help="Submit a workflow") + p_submit.add_argument("workflow_file", help="Path to workflow YAML file") + p_submit.set_defaults(func=cmd_submit) + + # Cancel + p_cancel = subparsers.add_parser("cancel", parents=[parent_parser], help="Cancel a workflow") + p_cancel.add_argument("wf_id", help="Workflow ID to cancel") + p_cancel.set_defaults(func=cmd_cancel) + + # Status + p_status = subparsers.add_parser("status", parents=[parent_parser], help="Get workflow status") + p_status.add_argument("wf_id", nargs="?", help="Optional Workflow ID") + p_status.set_defaults(func=cmd_status) + + # Shutdown + p_shutdown = subparsers.add_parser( + "shutdown", parents=[parent_parser], help="Shutdown the service" + ) + p_shutdown.set_defaults(func=cmd_shutdown) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/rose/service/api/rest.py b/rose/service/api/rest.py new file mode 100644 index 0000000..935f7e1 --- /dev/null +++ b/rose/service/api/rest.py @@ -0,0 +1,565 @@ +""" +ROSE Plugin for RADICAL-Edge +============================= + +This module provides a RADICAL-Edge plugin for ROSE (Remote Online Smart +Experiment) workflow management. It enables submission, monitoring, and +cancellation of Active Learning workflows through REST endpoints. + +Architecture +------------ +The plugin embeds workflow execution directly within the Edge service, +eliminating the need for a separate ServiceManager daemon. Each RoseSession +maintains its own WorkflowEngine and executes learner loops as async tasks. 
+ +:: + + Client (Python/curl/browser) + ↓ HTTP/REST + RADICAL-Edge Bridge + ↓ WebSocket + Edge Service (with ROSE plugin) + ↓ + WorkflowEngine / Learners (embedded) + +Components +---------- +- **PluginRose**: The plugin class registered with RADICAL-Edge. Defines REST + routes and UI configuration for portal integration. + +- **RoseSession**: Server-side session managing workflow execution. Each + session lazily initializes a WorkflowEngine and tracks all submitted + workflows. + +- **RoseClient**: Application-side client providing synchronous methods for + workflow operations. + +REST Endpoints +-------------- +- ``POST /rose/register_session`` - Create a new session +- ``POST /rose/submit/{sid}`` - Submit a workflow YAML +- ``GET /rose/status/{sid}/{wf_id}`` - Get workflow status +- ``GET /rose/workflows/{sid}`` - List all workflows +- ``POST /rose/cancel/{sid}/{wf_id}`` - Cancel a workflow +- ``POST /rose/unregister_session/{sid}`` - Close session + +Notifications +------------- +The plugin sends real-time notifications via SSE when workflow state changes. +Clients can subscribe using ``RoseClient.on_workflow_state(callback)``. 
+ +See Also +-------- +- ``rose.service.manager.WorkflowLoader`` - YAML parsing and learner creation +- ``rose.al.active_learner`` - SequentialActiveLearner, ParallelActiveLearner +- ``radical.edge.plugin_base.Plugin`` - Base plugin class +""" + +__author__ = "RADICAL Development Team" +__email__ = "radical@radical-project.org" +__copyright__ = "Copyright 2024, RADICAL@Rutgers" +__license__ = "MIT" + + +import asyncio +import logging +import time +import uuid + +from fastapi import FastAPI, HTTPException, Request +from radical.asyncflow import LocalExecutionBackend, WorkflowEngine +from radical.edge.client import PluginClient +from radical.edge.plugin_base import Plugin +from radical.edge.plugin_session_base import PluginSession +from radical.edge.ui_schema import ( + UIConfig, + UIField, + UIForm, + UIFormSubmit, + UIMonitor, + UINotifications, +) +from starlette.responses import JSONResponse + +from rose.al.active_learner import ParallelActiveLearner +from rose.service.manager import WorkflowLoader +from rose.service.models import Workflow, WorkflowState + +log = logging.getLogger("radical.edge") + + +# ------------------------------------------------------------------------------ +# +class RoseSession(PluginSession): + """ROSE session (service-side). + + Directly manages workflow execution using AsyncFlow, eliminating the need for a separate + ServiceManager process. + """ + + # -------------------------------------------------------------------------- + # + def __init__(self, sid: str): + """Initialize a RoseSession. + + Args: + sid (str): Unique session identifier assigned by the plugin. 
+ """ + super().__init__(sid) + + self._workflows: dict[str, Workflow] = {} + self._learner_tasks: dict[str, asyncio.Task] = {} + self._engine: WorkflowEngine | None = None + self._engine_lock = asyncio.Lock() + self._initialized = False + + # -------------------------------------------------------------------------- + # + async def _ensure_engine(self): + """Lazily initialize the workflow engine.""" + if self._engine is not None: + return + + async with self._engine_lock: + if self._engine is not None: + return + + log.info(f"[{self.sid}] Initializing workflow engine") + backend = LocalExecutionBackend() + self._engine = await WorkflowEngine.create(backend) + self._initialized = True + log.info(f"[{self.sid}] Workflow engine ready") + + # -------------------------------------------------------------------------- + # + async def submit_workflow(self, workflow_file: str) -> dict: + """Submit a workflow YAML file for execution. + + Args: + workflow_file (str): Absolute or relative path to the workflow YAML. + + Returns: + dict: ``{wf_id}`` — the workflow ID. 
+ """ + self._check_active() + await self._ensure_engine() + + # Generate workflow ID + wf_id = f"wf.{uuid.uuid4().hex[:8]}" + + # Create workflow record + wf = Workflow(wf_id=wf_id, state=WorkflowState.SUBMITTED, workflow_file=workflow_file) + self._workflows[wf_id] = wf + + # Notify submission + if self._notify: + self._notify( + "workflow_state", + {"wf_id": wf_id, "state": "SUBMITTED", "workflow_file": workflow_file}, + ) + + # Start workflow execution in background + task = asyncio.create_task(self._run_workflow(wf)) + self._learner_tasks[wf_id] = task + + log.info(f"[{self.sid}] Submitted workflow {wf_id}: {workflow_file}") + return {"wf_id": wf_id} + + # -------------------------------------------------------------------------- + # + async def _run_workflow(self, wf: Workflow): + """Execute a workflow (runs as background task).""" + wf_id = wf.wf_id + + try: + # Initialize + wf.state = WorkflowState.INITIALIZING + self._notify_state(wf) + + # Load workflow definition + wf_def = WorkflowLoader.load_yaml(wf.workflow_file) + learner, initial_config = WorkflowLoader.create_learner(wf_id, wf_def, self._engine) + wf.learner_instance = learner + + # Run + wf.state = WorkflowState.RUNNING + wf.start_time = time.time() + self._notify_state(wf) + + config = wf_def.get("config", {}) + learner_cfg = wf_def.get("learner", {}) + max_iter = config.get("max_iterations", learner_cfg.get("max_iterations", 10)) + + log.info(f"[{self.sid}] Running workflow {wf_id} (max_iterations={max_iter})") + + if isinstance(learner, ParallelActiveLearner): + parallel = config.get("parallel_learners", learner_cfg.get("parallel_learners", 2)) + configs = [initial_config] * parallel if initial_config else None + + results = await learner.start( + parallel_learners=parallel, max_iter=max_iter, learner_configs=configs + ) + wf.stats = {"parallel_results": [str(r) for r in results]} + + else: + # Sequential learner - async iterator + async for state in learner.start(max_iter=max_iter, 
initial_config=initial_config): + wf.stats = state.to_dict() + log.info( + f"[{self.sid}] {wf_id} iteration {state.iteration} " + f"(metric={state.metric_value})" + ) + self._notify_state(wf) + + # Completed + wf.state = WorkflowState.COMPLETED + wf.end_time = time.time() + log.info(f"[{self.sid}] Workflow {wf_id} completed") + + except asyncio.CancelledError: + wf.state = WorkflowState.CANCELED + wf.end_time = time.time() + log.info(f"[{self.sid}] Workflow {wf_id} canceled") + + except Exception as e: + wf.state = WorkflowState.FAILED + wf.error = str(e) + wf.end_time = time.time() + log.error(f"[{self.sid}] Workflow {wf_id} failed: {e}") + import traceback + + traceback.print_exc() + + finally: + self._notify_state(wf) + self._learner_tasks.pop(wf_id, None) + + # -------------------------------------------------------------------------- + # + def _notify_state(self, wf: Workflow): + """Send workflow state notification.""" + if self._notify: + self._notify( + "workflow_state", + {"wf_id": wf.wf_id, "state": wf.state.value, "stats": wf.stats, "error": wf.error}, + ) + + # -------------------------------------------------------------------------- + # + async def get_workflow_status(self, wf_id: str) -> dict: + """Return the current status of a workflow. + + Args: + wf_id (str): The workflow ID. + + Returns: + dict: Workflow state dictionary. + + Raises: + HTTPException(404): If the workflow ID is not found. + """ + self._check_active() + + wf = self._workflows.get(wf_id) + if not wf: + raise HTTPException(status_code=404, detail=f"workflow '{wf_id}' not found") + + return wf.to_dict() + + # -------------------------------------------------------------------------- + # + async def list_workflows(self) -> dict: + """List all workflows in this session. + + Returns: + dict: Mapping ``wf_id → state dict``. 
+ """ + self._check_active() + + return {wf_id: wf.to_dict() for wf_id, wf in self._workflows.items()} + + # -------------------------------------------------------------------------- + # + async def cancel_workflow(self, wf_id: str) -> dict: + """Cancel a running workflow. + + Args: + wf_id (str): The workflow ID to cancel. + + Returns: + dict: ``{wf_id}`` confirming the cancellation. + """ + self._check_active() + + wf = self._workflows.get(wf_id) + if not wf: + raise HTTPException(status_code=404, detail=f"workflow '{wf_id}' not found") + + if wf.state not in ( + WorkflowState.RUNNING, + WorkflowState.INITIALIZING, + WorkflowState.SUBMITTED, + ): + raise HTTPException(status_code=400, detail=f"workflow '{wf_id}' not running") + + # Stop the learner + if wf.learner_instance: + wf.learner_instance.stop() + + # Cancel the task + task = self._learner_tasks.get(wf_id) + if task and not task.done(): + task.cancel() + + log.info(f"[{self.sid}] Canceling workflow {wf_id}") + + if self._notify: + self._notify("workflow_state", {"wf_id": wf_id, "state": "CANCELING"}) + + return {"wf_id": wf_id} + + # -------------------------------------------------------------------------- + # + async def close(self) -> dict: + """Close this session, stopping all workflows and cleaning up.""" + log.info(f"[{self.sid}] Closing session") + + # Stop all learners + for wf in self._workflows.values(): + if wf.learner_instance and wf.state == WorkflowState.RUNNING: + wf.learner_instance.stop() + + # Cancel all tasks + for task in self._learner_tasks.values(): + if not task.done(): + task.cancel() + + if self._learner_tasks: + await asyncio.gather(*self._learner_tasks.values(), return_exceptions=True) + self._learner_tasks.clear() + + # Shutdown engine + if self._engine: + await self._engine.shutdown() + self._engine = None + + return await super().close() + + +# ------------------------------------------------------------------------------ +# +class RoseClient(PluginClient): + 
"""Application-side client for the ROSE plugin. + + Provides a thin sync wrapper over the HTTP endpoints exposed by + ``PluginRose``. + """ + + # -------------------------------------------------------------------------- + # + def on_workflow_state(self, callback): + """Register a callback for workflow state change notifications. + + Args: + callback: A callable(topic, data) to invoke on state changes. + """ + self.register_notification_callback(callback) + + # -------------------------------------------------------------------------- + # + def off_workflow_state(self, callback): + """Unregister a workflow state change callback. + + Args: + callback: The callback to unregister. + """ + self.unregister_notification_callback(callback) + + # -------------------------------------------------------------------------- + # + def submit_workflow(self, workflow_file: str) -> dict: + """Submit a workflow YAML file. + + Args: + workflow_file (str): Path to the workflow YAML file. + + Returns: + dict: ``{wf_id}``. + """ + if not self.sid: + raise RuntimeError("No active session") + + resp = self._http.post( + self._url(f"submit/{self.sid}"), json={"workflow_file": workflow_file} + ) + resp.raise_for_status() + + return resp.json() + + # -------------------------------------------------------------------------- + # + def get_workflow_status(self, wf_id: str) -> dict: + """Get the current status of a workflow. + + Args: + wf_id (str): Workflow ID. + + Returns: + dict: Workflow state dictionary. + """ + if not self.sid: + raise RuntimeError("No active session") + + resp = self._http.get(self._url(f"status/{self.sid}/{wf_id}")) + resp.raise_for_status() + + return resp.json() + + # -------------------------------------------------------------------------- + # + def list_workflows(self) -> dict: + """List all workflows in the session. + + Returns: + dict: Registry mapping ``wf_id → state dict``. 
+ """ + if not self.sid: + raise RuntimeError("No active session") + + resp = self._http.get(self._url(f"workflows/{self.sid}")) + resp.raise_for_status() + + return resp.json() + + # -------------------------------------------------------------------------- + # + def cancel_workflow(self, wf_id: str) -> dict: + """Cancel a running workflow. + + Args: + wf_id (str): Workflow ID to cancel. + + Returns: + dict: ``{wf_id}``. + """ + if not self.sid: + raise RuntimeError("No active session") + + resp = self._http.post(self._url(f"cancel/{self.sid}/{wf_id}")) + resp.raise_for_status() + + return resp.json() + + +# ------------------------------------------------------------------------------ +# +class PluginRose(Plugin): + """ROSE plugin for RADICAL-Edge. + + Exposes workflow management via REST endpoints, with embedded execution + (no separate ServiceManager process required). + + Routes: + - POST /rose/register_session + - POST /rose/unregister_session/{sid} + - POST /rose/submit/{sid} + - GET /rose/status/{sid}/{wf_id} + - GET /rose/workflows/{sid} + - POST /rose/cancel/{sid}/{wf_id} + """ + + plugin_name = "rose" + session_class = RoseSession + client_class = RoseClient + version = "0.2.0" + session_ttl = 0 # No timeout - workflows can run for hours/days + + ui_config = UIConfig( + icon="🌹", + title="ROSE Active Learning", + description="Submit and monitor Active Learning workflows", + refresh_button=True, + forms=[ + UIForm( + id="submit", + title="Submit Workflow", + layout="single", + fields=[ + UIField( + name="workflow_file", + type="text", + label="Workflow File", + placeholder="/path/to/workflow.yaml", + required=True, + ) + ], + submit=UIFormSubmit(label="Submit", style="success", endpoint="submit/{sid}"), + ) + ], + monitors=[ + UIMonitor( + id="workflows", + title="Workflows", + type="task_list", + css_class="workflow-list", + empty_text="No workflows submitted yet", + auto_load="workflows/{sid}", + ) + ], + notifications=UINotifications( + 
topic="workflow_state", id_field="wf_id", state_field="state" + ), + ) + + # -------------------------------------------------------------------------- + # + def __init__(self, app: FastAPI, instance_name: str = "rose"): + """Initialize the ROSE plugin, registering all routes.""" + super().__init__(app, instance_name) + + self.add_route_post("submit/{sid}", self.submit_workflow) + self.add_route_get("status/{sid}/{wf_id}", self.get_workflow_status) + self.add_route_get("workflows/{sid}", self.list_workflows) + self.add_route_post("cancel/{sid}/{wf_id}", self.cancel_workflow) + + self._log_routes() + + # -------------------------------------------------------------------------- + # + async def submit_workflow(self, request: Request) -> JSONResponse: + """Submit a workflow YAML file.""" + sid = request.path_params["sid"] + data = await request.json() + + return await self._forward( + sid, RoseSession.submit_workflow, workflow_file=data.get("workflow_file") + ) + + # -------------------------------------------------------------------------- + # + async def get_workflow_status(self, request: Request) -> JSONResponse: + """Return the status of a specific workflow.""" + sid = request.path_params["sid"] + wf_id = request.path_params["wf_id"] + + return await self._forward(sid, RoseSession.get_workflow_status, wf_id=wf_id) + + # -------------------------------------------------------------------------- + # + async def list_workflows(self, request: Request) -> JSONResponse: + """List all workflows in the session.""" + sid = request.path_params["sid"] + + return await self._forward(sid, RoseSession.list_workflows) + + # -------------------------------------------------------------------------- + # + async def cancel_workflow(self, request: Request) -> JSONResponse: + """Cancel a running workflow.""" + sid = request.path_params["sid"] + wf_id = request.path_params["wf_id"] + + return await self._forward(sid, RoseSession.cancel_workflow, wf_id=wf_id) + + +# 
------------------------------------------------------------------------------ diff --git a/rose/service/client.py b/rose/service/client.py new file mode 100644 index 0000000..a168824 --- /dev/null +++ b/rose/service/client.py @@ -0,0 +1,107 @@ +import json +import logging +import time +import uuid +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class ServiceClient: + """Client for interacting with the ROSE Service via File-based IPC. + + Attributes: + job_id (str): The SLURM job ID where the service is running. + service_root (Path): Root directory for service IPC (~/.rose/services/). + """ + + @staticmethod + def get_wf_id(req_id: str) -> str: + """Derive Workflow ID from Request ID.""" + return f"wf.{req_id[:8]}" + + def __init__(self, job_id: str): + self.job_id = job_id + self.service_root = Path.home() / ".rose" / "services" / str(job_id) + self.requests_dir = self.service_root / "requests" + self.registry_file = self.service_root / "registry.json" + + if not self.service_root.exists(): + # It's possible the service hasn't started creating dirs yet, + # or the job ID is wrong. We don't raise immediately to allow + # retry logic in scripts, but warn if needed. + logger.warning(f"Service root {self.service_root} does not exist yet.") + + def _write_request(self, action: str, payload: dict[str, Any]) -> str: + """Write a request file to the requests directory.""" + req_id = str(uuid.uuid4()) + request_data = { + "id": req_id, + "action": action, + "timestamp": time.time(), + "payload": payload, + } + + # Ensure requests dir exists (client might start before service creates it? 
+ # Better to assume service creates it, but safe to check) + if not self.requests_dir.exists(): + raise RuntimeError(f"Service requests directory not found: {self.requests_dir}") + + req_file = self.requests_dir / f"{action}_{req_id}.json" + with open(req_file, "w") as f: + json.dump(request_data, f, indent=2) + + return req_id + + def submit_workflow(self, workflow_file: str) -> str: + """Submit a workflow file to the service. + + Args: + workflow_file (str): Path to the workflow YAML file. + + Returns: + str: Request ID (not yet the wf_id, which depends on service processing). + """ + abs_path = str(Path(workflow_file).resolve()) + return self._write_request("submit", {"workflow_file": abs_path}) + + def cancel_workflow(self, wf_id: str) -> str: + """Request cancellation of a workflow. + + Args: + wf_id (str): The workflow ID to cancel. + """ + return self._write_request("cancel", {"wf_id": wf_id}) + + def shutdown(self) -> str: + """Request graceful shutdown of the service.""" + return self._write_request("shutdown", {}) + + def get_workflow_status(self, wf_id: str) -> dict[str, Any] | None: + """Get the current status of a workflow from the registry. + + Args: + wf_id (str): Workflow ID. + + Returns: + dict: Workflow state dict or None if not found. 
+        """
+        registry = self._read_registry()
+        return registry.get(wf_id)
+
+    def list_workflows(self) -> dict[str, Any]:
+        """List all workflows in the registry."""
+        return self._read_registry()
+
+    def _read_registry(self) -> dict[str, Any]:
+        """Read and parse the registry file."""
+        if not self.registry_file.exists():
+            return {}
+
+        try:
+            with open(self.registry_file) as f:
+                return json.load(f)
+        except (json.JSONDecodeError, FileNotFoundError):
+            # Race condition on read or empty file
+            return {}
diff --git a/rose/service/manager.py b/rose/service/manager.py
new file mode 100644
index 0000000..1d7f4e8
--- /dev/null
+++ b/rose/service/manager.py
@@ -0,0 +1,406 @@
+import asyncio
+import importlib
+import json
+import logging
+import os
+from collections.abc import Callable
+from pathlib import Path
+from typing import Any
+
+from radical.asyncflow import LocalExecutionBackend, WorkflowEngine
+
+from rose.al.active_learner import ParallelActiveLearner, SequentialActiveLearner
+from rose.learner import LearnerConfig, TaskConfig
+
+from .client import ServiceClient
+from .models import Workflow, WorkflowState
+
+logger = logging.getLogger(__name__)
+
+
+class WorkflowLoader:
+    """Helper to load a Learner from a YAML definition."""
+
+    @staticmethod
+    def load_yaml(path: str) -> dict[str, Any]:
+        """Load a workflow definition from a YAML file.
+
+        Uses PyYAML when it is installed; otherwise falls back to parsing the
+        file as JSON, since the standard library ships no YAML parser.
+        """
+        try:
+            import yaml
+
+            with open(path) as f:
+                return yaml.safe_load(f)
+        except ImportError:
+            # Fallback: accept JSON content in the workflow file
+            logger.warning("PyYAML not found, trying JSON parsing for workflow file")
+            with open(path) as f:
+                return json.load(f)
+
+    @staticmethod
+    def _import_function(path_str: str) -> Callable:
+        """Import a function from a module path string 'package.module.func'."""
+        try:
+            module_name, func_name = path_str.rsplit(".", 1)
+            module = importlib.import_module(module_name)
+            return getattr(module, func_name)
+        except (ValueError, ImportError, AttributeError) as e:
+            raise ImportError(f"Could not import function '{path_str}': {e}") from e
+
+    @staticmethod
+    def _create_script_task_factory(script_path: str) -> Callable:
+        """Create a task function that returns the script path + arguments.
+
+        Args:
+            script_path: The base command or script path.
+        """
+
+        async def task_func(*args, **kwargs):
+            # Extract string arguments to append to the command.
+            # Skip Task objects (dependencies).
+            cmd_parts = [script_path]
+            for arg in args:
+                if isinstance(arg, str):
+                    cmd_parts.append(arg)
+
+            for k, v in kwargs.items():
+                if isinstance(v, bool):
+                    if v:
+                        cmd_parts.append(f"--{k}")
+                else:
+                    cmd_parts.append(f"--{k} {v}")
+
+            return " ".join(cmd_parts)
+
+        return task_func
+
+    @classmethod
+    def create_learner(cls, wf_id: str, workflow_def: dict[str, Any], asyncflow: WorkflowEngine):
+        """Create and configure a Learner based on the YAML definition."""
+
+        learner_def = workflow_def.get("learner", {})
+        l_type = learner_def.get("type", "SequentialActiveLearner")
+        l_path = learner_def.get("path")
+
+        # 1.
Instantiate Learner Class
+        if l_path:
+            # Load custom learner class
+            try:
+                module_name, class_name = l_path.rsplit(".", 1)
+                module = importlib.import_module(module_name)
+                learner_cls = getattr(module, class_name)
+            except (ValueError, ImportError, AttributeError) as e:
+                raise ImportError(f"Could not import learner class '{l_path}': {e}") from e
+        elif l_type == "SequentialActiveLearner":
+            learner_cls = SequentialActiveLearner
+        elif l_type == "ParallelActiveLearner":
+            learner_cls = ParallelActiveLearner
+        else:
+            raise ValueError(f"Unknown learner type '{l_type}' and no path provided.")
+
+        learner = learner_cls(asyncflow)
+        # learner_id is typed Optional[int] on the learner, so derive an int
+        # from the string wf_id.
+        learner.learner_id = hash(wf_id)
+
+        components = workflow_def.get("components", {})
+
+        # 2. Register Components
+        # Expecting structure:
+        # components:
+        #   simulation:
+        #     type: function | script
+        #     path: ...
+        #     config: ...
+
+        for name in ["simulation", "training", "active_learn", "criterion"]:
+            comp_def = components.get(name)
+            if not comp_def:
+                continue
+
+            ctype = comp_def.get("type", "script")  # Default to script
+ cpath = comp_def.get("path") + + task_func = None + as_executable = True + + if ctype == "function": + task_func = cls._import_function(cpath) + as_executable = False + else: + # Script + task_func = cls._create_script_task_factory(cpath) + as_executable = True + + # Register using the appropriate decorator + if name == "simulation": + logger.info(f"Registering simulation task for workflow {wf_id}") + learner.simulation_task(as_executable=as_executable)(task_func) + elif name == "training": + logger.info(f"Registering training task for workflow {wf_id}") + learner.training_task(as_executable=as_executable)(task_func) + elif name == "active_learn": + logger.info(f"Registering active_learn task for workflow {wf_id}") + learner.active_learn_task(as_executable=as_executable)(task_func) + elif name == "criterion": + # Special handling for criterion + logger.info(f"Registering criterion task for workflow {wf_id}") + threshold = comp_def.get("threshold", 0.0) + metric = comp_def.get("metric", "CUSTOM") + learner.as_stop_criterion( + metric_name=metric, threshold=threshold, as_executable=as_executable + )(task_func) + + # 3. 
Build initial LearnerConfig from component defs + # This ensures that args/kwargs specified in YAML are used + l_config = LearnerConfig() + for name in ["simulation", "training", "active_learn", "criterion"]: + comp_def = components.get(name) + if comp_def and "config" in comp_def: + c_config = comp_def["config"] + t_config = TaskConfig( + args=tuple(c_config.get("args", ())), kwargs=c_config.get("kwargs", {}) + ) + setattr(l_config, name, t_config) + + return learner, l_config + + +class ServiceManager: + def __init__(self, job_id: str): + self.job_id = job_id + self.service_root = Path.home() / ".rose" / "services" / str(job_id) + self.requests_dir = self.service_root / "requests" + self.registry_file = self.service_root / "registry.json" + + self.workflows: dict[str, Workflow] = {} + self.engine: WorkflowEngine | None = None + self._learner_tasks: list[asyncio.Task] = [] + self._shutdown = False + + async def initialize(self): + """Setup directories and backend.""" + self.requests_dir.mkdir(parents=True, exist_ok=True) + + backend = LocalExecutionBackend() + self.engine = await WorkflowEngine.create(backend) + logger.info(f"Service initialized at {self.service_root}") + + async def _process_requests(self): + """Pick up json files from requests_dir.""" + if not self.requests_dir.exists(): + return + + # Sort by mtime to process in order? 
+        req_files = sorted(self.requests_dir.glob("*.json"), key=os.path.getmtime)
+
+        for req_file in req_files:
+            try:
+                with open(req_file) as f:
+                    req = json.load(f)
+
+                action = req.get("action")
+                payload = req.get("payload", {})
+
+                if action == "submit":
+                    await self._handle_submit(req.get("id"), payload)
+                elif action == "cancel":
+                    await self._handle_cancel(payload)
+                elif action == "shutdown":
+                    logger.info("Shutdown request received via IPC")
+                    self._shutdown = True
+
+                # Remove request file after processing
+                req_file.unlink()
+
+            except Exception as e:
+                logger.error(f"Error processing request {req_file}: {e}")
+                # Delete the malformed request so it is not retried in a loop
+                try:
+                    req_file.unlink()
+                except Exception:
+                    pass
+
+    async def _handle_submit(self, req_id: str, payload: dict[str, Any]):
+        wf_file = payload.get("workflow_file")
+        if not wf_file:
+            logger.error("No workflow file in submit payload")
+            return
+
+        # Derive the workflow ID deterministically from the request ID
+        wf_id = ServiceClient.get_wf_id(req_id)
+
+        wf = Workflow(wf_id=wf_id, state=WorkflowState.INITIALIZING, workflow_file=wf_file)
+        self.workflows[wf_id] = wf
+        self._update_registry()
+
+        try:
+            wf_def = WorkflowLoader.load_yaml(wf_file)
+            learner, initial_l_config = WorkflowLoader.create_learner(wf_id, wf_def, self.engine)
+            wf.learner_instance = learner
+
+            # The LearnerConfig built from component defs is the primary source
+            # of task args/kwargs; top-level config carries run parameters.
+ + # Start the learner loop as a background task + task = asyncio.create_task( + self._run_learner(wf, wf_def.get("config", {}), initial_l_config) + ) + self._learner_tasks.append(task) + + except Exception as e: + logger.error(f"Failed to submit workflow {wf_id}: {e}") + wf.state = WorkflowState.FAILED + wf.error = str(e) + self._update_registry() + + async def _handle_cancel(self, payload: dict[str, Any]): + wf_id = payload.get("wf_id") + wf = self.workflows.get(wf_id) + if wf and wf.state in [ + WorkflowState.RUNNING, + WorkflowState.INITIALIZING, + WorkflowState.SUBMITTED, + ]: + logger.info(f"Canceling workflow {wf_id}") + if wf.learner_instance: + wf.learner_instance.stop() # Cooperative cancel + wf.state = WorkflowState.CANCELED + self._update_registry() + + async def _run_learner( + self, + wf: Workflow, + workflow_def: dict[str, Any], + initial_l_config: LearnerConfig | None = None, + ): + """Driver loop for a single workflow.""" + wf.state = WorkflowState.RUNNING + wf.start_time = asyncio.get_event_loop().time() + logger.info(f"Starting workflow {wf.wf_id} ({wf.workflow_file})") + self._update_registry() + + try: + learner_cfg = workflow_def.get("learner", {}) + max_iter = learner_cfg.get("max_iterations", workflow_def.get("max_iterations", 0)) + + # Identify learner type and call appropriately + if isinstance(wf.learner_instance, ParallelActiveLearner): + parallel_learners = learner_cfg.get( + "parallel_learners", workflow_def.get("parallel_learners", 2) + ) + + # ParallelActiveLearner.start doesn't take initial_config, + # but we can map it to learner_configs + l_configs = None + if initial_l_config: + l_configs = [initial_l_config] * parallel_learners + + async for state in wf.learner_instance.start( + parallel_learners=parallel_learners, + max_iter=max_iter, + learner_configs=l_configs, + ): + wf.stats = ( + state.to_dict() if hasattr(state, "to_dict") else {"result": str(state)} + ) + learner_id = getattr(state, "learner_id", "?") + iteration = 
getattr(state, "iteration", "?") + metric = getattr(state, "metric_value", "?") + logger.info( + f"Workflow {wf.wf_id} - Learner {learner_id}," + f" iteration {iteration} completed (metric: {metric})" + ) + self._update_registry() + logger.info( + f"Workflow {wf.wf_id} - Parallel execution of" + f" {parallel_learners} learners finished" + ) + else: + # SequentialActiveLearner or other async iterator + async for state in wf.learner_instance.start( + max_iter=max_iter, initial_config=initial_l_config + ): + wf.stats = state.to_dict() + logger.info( + f"Workflow {wf.wf_id} - Iteration {state.iteration}" + f" completed (metric: {state.metric_value})" + ) + self._update_registry() + + wf.state = WorkflowState.COMPLETED + logger.info(f"Workflow {wf.wf_id} completed successfully") + except Exception as e: + wf.state = WorkflowState.FAILED + wf.error = str(e) + logger.error(f"Workflow {wf.wf_id} failed: {e}") + import traceback + + traceback.print_exc() + finally: + wf.end_time = asyncio.get_event_loop().time() + self._update_registry() + + def _update_registry(self): + """Dump registry to json.""" + data = {wf_id: wf.to_dict() for wf_id, wf in self.workflows.items()} + tmp_file = self.registry_file.with_suffix(".tmp") + with open(tmp_file, "w") as f: + json.dump(data, f, indent=2) + tmp_file.replace(self.registry_file) + + async def run(self): + """Main Service Loop.""" + try: + await self.initialize() + logger.info("Service Manager Running") + + while not self._shutdown: + await self._process_requests() + await asyncio.sleep(0.1) # Polling interval + finally: + await self.shutdown() + + async def shutdown(self): + self._shutdown = True + logger.info("Service Shutting Down...") + + # 1. Stop all learners + if self.workflows: + logger.info(f"Stopping {len(self.workflows)} workflows") + for wf in self.workflows.values(): + if wf.learner_instance: + wf.learner_instance.stop() + + # 2. 
Cancel and wait for learner tasks + if self._learner_tasks: + logger.info(f"Canceling {len(self._learner_tasks)} learner tasks") + for task in self._learner_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*self._learner_tasks, return_exceptions=True) + self._learner_tasks.clear() + logger.info("All learner tasks stopped") + + # 3. Shutdown engine + if self.engine: + logger.info("Shutting down workflow engine") + await self.engine.shutdown() + self.engine = None + logger.info("Workflow engine shut down") + + logger.info("Service shutdown complete") diff --git a/rose/service/models.py b/rose/service/models.py new file mode 100644 index 0000000..838041e --- /dev/null +++ b/rose/service/models.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + + +class WorkflowState(Enum): + """Lifecycle states for a ROSE Workflow/Learner.""" + + SUBMITTED = "SUBMITTED" # Received but not yet started + INITIALIZING = "INITIALIZING" # Loading config and resources + RUNNING = "RUNNING" # Active execution + COMPLETED = "COMPLETED" # Finished successfully + FAILED = "FAILED" # Terminated with error + CANCELED = "CANCELED" # Stopped by user request + + +@dataclass +class Workflow: + """Represents a managed workflow (learner) instance.""" + + wf_id: str + state: WorkflowState = WorkflowState.SUBMITTED + workflow_file: str = "" + start_time: float = 0.0 + end_time: float = 0.0 + stats: dict[str, Any] = field(default_factory=dict) + error: str | None = None + + # Internal reference to the actual Learner object + # This is not serialized to JSON + learner_instance: Any = field(default=None, repr=False) + + def to_dict(self) -> dict[str, Any]: + """Serializable representation for external monitoring.""" + return { + "wf_id": self.wf_id, + "state": self.state.value, + "workflow_file": self.workflow_file, + "start_time": self.start_time, + "end_time": self.end_time, + "stats": self.stats, + "error": self.error, + } 
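Reviewer note: the registry handoff between `ServiceManager._update_registry` (write to a `.tmp` sibling, then `Path.replace`) and `ServiceClient._read_registry` (return `{}` on missing or half-written files) is the core of the file-based IPC. A minimal self-contained sketch of that pattern, for illustration only — the function names here are stand-ins, not part of the `rose.service` API:

```python
import json
import tempfile
from pathlib import Path


def update_registry(registry_file: Path, data: dict) -> None:
    # Write to a sibling .tmp file, then atomically swap it into place so a
    # concurrent reader never observes a partially written JSON document.
    tmp_file = registry_file.with_suffix(".tmp")
    tmp_file.write_text(json.dumps(data, indent=2))
    tmp_file.replace(registry_file)


def read_registry(registry_file: Path) -> dict:
    # Tolerate a missing or momentarily invalid file instead of raising,
    # mirroring the client's defensive read.
    if not registry_file.exists():
        return {}
    try:
        return json.loads(registry_file.read_text())
    except json.JSONDecodeError:
        return {}


root = Path(tempfile.mkdtemp())
reg = root / "registry.json"
update_registry(reg, {"wf.abc12345": {"state": "RUNNING"}})
print(read_registry(reg)["wf.abc12345"]["state"])  # RUNNING
```

`Path.replace` is atomic on POSIX filesystems when source and destination are on the same filesystem, which holds here because the `.tmp` file is created next to `registry.json`.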
diff --git a/tests/unit/test_rose_plugin.py b/tests/unit/test_rose_plugin.py new file mode 100644 index 0000000..4a96942 --- /dev/null +++ b/tests/unit/test_rose_plugin.py @@ -0,0 +1,377 @@ +"""Unit tests for the ROSE Edge Plugin. + +Tests the RoseSession, RoseClient, and WorkflowLoader classes. +""" + +import asyncio +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from rose.service.api.rest import PluginRose, RoseClient, RoseSession +from rose.service.manager import WorkflowLoader +from rose.service.models import Workflow, WorkflowState + +# ----------------------------------------------------------------------------- +# Fixtures +# ----------------------------------------------------------------------------- + + +@pytest.fixture +def mock_engine(): + """Create a mock WorkflowEngine.""" + engine = AsyncMock() + engine.shutdown = AsyncMock() + return engine + + +@pytest.fixture +def rose_session(): + """Create a RoseSession for testing.""" + session = RoseSession(sid="test-session-001") + return session + + +@pytest.fixture +def sample_workflow_yaml(tmp_path): + """Create a sample workflow YAML file.""" + wf_content = """ +learner: + type: SequentialActiveLearner + +components: + simulation: + type: script + path: /bin/echo + config: + args: ["sim"] + training: + type: script + path: /bin/echo + config: + args: ["train"] + active_learn: + type: script + path: /bin/echo + config: + args: ["al"] + +config: + max_iterations: 2 + work_dir: /tmp/rose_test +""" + wf_file = tmp_path / "test_workflow.yaml" + wf_file.write_text(wf_content) + return str(wf_file) + + +# ----------------------------------------------------------------------------- +# WorkflowLoader Tests +# ----------------------------------------------------------------------------- + + +class TestWorkflowLoader: + """Tests for WorkflowLoader class.""" + + def test_load_yaml_valid(self, sample_workflow_yaml): + """Test loading a valid YAML workflow file.""" + wf_def = 
WorkflowLoader.load_yaml(sample_workflow_yaml) + + assert "learner" in wf_def + assert wf_def["learner"]["type"] == "SequentialActiveLearner" + assert "components" in wf_def + assert "simulation" in wf_def["components"] + assert "config" in wf_def + assert wf_def["config"]["max_iterations"] == 2 + + def test_load_yaml_file_not_found(self): + """Test loading a non-existent file raises error.""" + with pytest.raises(FileNotFoundError): + WorkflowLoader.load_yaml("/nonexistent/path/workflow.yaml") + + def test_create_learner_sequential(self, sample_workflow_yaml, mock_engine): + """Test creating a SequentialActiveLearner from YAML.""" + wf_def = WorkflowLoader.load_yaml(sample_workflow_yaml) + learner, config = WorkflowLoader.create_learner("wf.test001", wf_def, mock_engine) + + from rose.al.active_learner import SequentialActiveLearner + + assert isinstance(learner, SequentialActiveLearner) + assert learner.learner_id == hash("wf.test001") + + def test_import_function_valid(self): + """Test importing a valid function path.""" + func = WorkflowLoader._import_function("os.path.exists") + import os + + assert func == os.path.exists + + def test_import_function_invalid(self): + """Test importing an invalid function path raises error.""" + with pytest.raises(ImportError): + WorkflowLoader._import_function("nonexistent.module.func") + + def test_create_script_task_factory(self): + """Test creating a script task factory.""" + factory = WorkflowLoader._create_script_task_factory("/bin/echo") + + # The factory should be an async function + assert asyncio.iscoroutinefunction(factory) + + +# ----------------------------------------------------------------------------- +# RoseSession Tests +# ----------------------------------------------------------------------------- + + +class TestRoseSession: + """Tests for RoseSession class.""" + + def test_init(self, rose_session): + """Test RoseSession initialization.""" + assert rose_session.sid == "test-session-001" + assert 
rose_session.is_active
+        assert rose_session._workflows == {}
+        assert rose_session._engine is None
+
+    @pytest.mark.asyncio
+    async def test_ensure_engine(self, rose_session):
+        """Test lazy engine initialization."""
+        with (
+            patch("rose.service.api.rest.LocalExecutionBackend") as mock_backend,
+            patch("rose.service.api.rest.WorkflowEngine") as mock_engine_cls,
+        ):
+            mock_backend.return_value = Mock()
+            mock_engine_cls.create = AsyncMock(return_value=Mock())
+
+            await rose_session._ensure_engine()
+
+            assert rose_session._engine is not None
+            mock_engine_cls.create.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_submit_workflow(self, rose_session, sample_workflow_yaml):
+        """Test workflow submission."""
+        # Mock the engine and workflow execution
+        with (
+            patch.object(rose_session, "_ensure_engine", new_callable=AsyncMock),
+            patch.object(rose_session, "_run_workflow", new_callable=AsyncMock),
+        ):
+            rose_session._engine = Mock()
+
+            result = await rose_session.submit_workflow(sample_workflow_yaml)
+
+            assert "wf_id" in result
+            assert result["wf_id"].startswith("wf.")
+            assert result["wf_id"] in rose_session._workflows
+            assert rose_session._workflows[result["wf_id"]].state == WorkflowState.SUBMITTED
+
+    @pytest.mark.asyncio
+    async def test_get_workflow_status_found(self, rose_session):
+        """Test getting status of an existing workflow."""
+        # Add a workflow manually
+        wf = Workflow(wf_id="wf.test123", state=WorkflowState.RUNNING)
+        rose_session._workflows["wf.test123"] = wf
+
+        status = await rose_session.get_workflow_status("wf.test123")
+
+        assert status["wf_id"] == "wf.test123"
+        assert status["state"] == "RUNNING"
+
+    @pytest.mark.asyncio
+    async def test_get_workflow_status_not_found(self, rose_session):
+        """Test getting status of non-existent workflow raises 404."""
+        from fastapi import HTTPException
+
+        with pytest.raises(HTTPException) as exc_info:
+            await rose_session.get_workflow_status("wf.nonexistent")
+
+        assert exc_info.value.status_code == 404
+
+    @pytest.mark.asyncio
+    async def test_list_workflows(self, rose_session):
+        """Test listing all workflows."""
+        # Add some workflows
+        rose_session._workflows["wf.001"] = Workflow(wf_id="wf.001", state=WorkflowState.COMPLETED)
+        rose_session._workflows["wf.002"] = Workflow(wf_id="wf.002", state=WorkflowState.RUNNING)
+
+        result = await rose_session.list_workflows()
+
+        assert len(result) == 2
+        assert "wf.001" in result
+        assert "wf.002" in result
+        assert result["wf.001"]["state"] == "COMPLETED"
+        assert result["wf.002"]["state"] == "RUNNING"
+
+    @pytest.mark.asyncio
+    async def test_cancel_workflow(self, rose_session):
+        """Test canceling a running workflow."""
+        # Add a running workflow with mock learner
+        mock_learner = Mock()
+        mock_task = AsyncMock()
+        mock_task.done.return_value = False
+
+        wf = Workflow(wf_id="wf.cancel", state=WorkflowState.RUNNING)
+        wf.learner_instance = mock_learner
+        rose_session._workflows["wf.cancel"] = wf
+        rose_session._learner_tasks["wf.cancel"] = mock_task
+
+        result = await rose_session.cancel_workflow("wf.cancel")
+
+        assert result["wf_id"] == "wf.cancel"
+        mock_learner.stop.assert_called_once()
+        mock_task.cancel.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_cancel_workflow_not_running(self, rose_session):
+        """Test canceling a completed workflow raises 400."""
+        from fastapi import HTTPException
+
+        wf = Workflow(wf_id="wf.done", state=WorkflowState.COMPLETED)
+        rose_session._workflows["wf.done"] = wf
+
+        with pytest.raises(HTTPException) as exc_info:
+            await rose_session.cancel_workflow("wf.done")
+
+        assert exc_info.value.status_code == 400
+
+    @pytest.mark.asyncio
+    async def test_close_session(self, rose_session):
+        """Test closing session stops all workflows."""
+        # Add a running workflow
+        mock_learner = Mock()
+        mock_task = AsyncMock()
+        mock_task.done.return_value = False
+
+        wf = Workflow(wf_id="wf.close", state=WorkflowState.RUNNING)
+        wf.learner_instance = mock_learner
+        rose_session._workflows["wf.close"] = wf
+        rose_session._learner_tasks["wf.close"] = mock_task
+
+        rose_session._engine = AsyncMock()
+        rose_session._engine.shutdown = AsyncMock()
+
+        result = await rose_session.close()
+
+        assert result == {}
+        assert not rose_session.is_active
+        mock_learner.stop.assert_called_once()
+        rose_session._engine.shutdown.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_session_closed_check(self, rose_session):
+        """Test operations on closed session raise error."""
+        await rose_session.close()
+
+        with pytest.raises(RuntimeError, match="session is closed"):
+            await rose_session.list_workflows()
+
+
+# -----------------------------------------------------------------------------
+# RoseClient Tests
+# -----------------------------------------------------------------------------
+
+
+class TestRoseClient:
+    """Tests for RoseClient class."""
+
+    @pytest.fixture
+    def mock_http(self):
+        """Create a mock HTTP client."""
+        http = Mock()
+        response = Mock()
+        response.json.return_value = {"sid": "session.abc123"}
+        response.raise_for_status = Mock()
+        http.post.return_value = response
+        http.get.return_value = response
+        return http
+
+    @pytest.fixture
+    def rose_client(self, mock_http):
+        """Create a RoseClient with mocked HTTP."""
+        client = RoseClient(mock_http, "/test/rose")
+        client._sid = "session.test"
+        return client
+
+    def test_submit_workflow(self, rose_client, mock_http):
+        """Test submitting a workflow via client."""
+        mock_http.post.return_value.json.return_value = {"wf_id": "wf.new"}
+
+        result = rose_client.submit_workflow("/path/to/wf.yaml")
+
+        assert result == {"wf_id": "wf.new"}
+        mock_http.post.assert_called()
+
+    def test_submit_workflow_no_session(self, mock_http):
+        """Test submit without session raises error."""
+        client = RoseClient(mock_http, "/test/rose")
+        # No session registered
+
+        with pytest.raises(RuntimeError, match="No active session"):
+            client.submit_workflow("/path/to/wf.yaml")
+
+    def test_get_workflow_status(self, rose_client, mock_http):
+        """Test getting workflow status via client."""
+        mock_http.get.return_value.json.return_value = {"wf_id": "wf.123", "state": "RUNNING"}
+
+        result = rose_client.get_workflow_status("wf.123")
+
+        assert result["wf_id"] == "wf.123"
+        assert result["state"] == "RUNNING"
+
+    def test_list_workflows(self, rose_client, mock_http):
+        """Test listing workflows via client."""
+        mock_http.get.return_value.json.return_value = {
+            "wf.001": {"state": "COMPLETED"},
+            "wf.002": {"state": "RUNNING"},
+        }
+
+        result = rose_client.list_workflows()
+
+        assert len(result) == 2
+
+    def test_cancel_workflow(self, rose_client, mock_http):
+        """Test canceling workflow via client."""
+        mock_http.post.return_value.json.return_value = {"wf_id": "wf.cancel"}
+
+        result = rose_client.cancel_workflow("wf.cancel")
+
+        assert result["wf_id"] == "wf.cancel"
+
+    def test_notification_callbacks(self, rose_client):
+        """Test registering notification callbacks."""
+        callback = Mock()
+
+        # Should not raise
+        with patch.object(rose_client, "register_notification_callback"):
+            rose_client.on_workflow_state(callback)
+            rose_client.register_notification_callback.assert_called_with(callback)
+
+        with patch.object(rose_client, "unregister_notification_callback"):
+            rose_client.off_workflow_state(callback)
+            rose_client.unregister_notification_callback.assert_called_with(callback)
+
+
+# -----------------------------------------------------------------------------
+# PluginRose Tests
+# -----------------------------------------------------------------------------
+
+
+class TestPluginRose:
+    """Tests for PluginRose class."""
+
+    def test_plugin_attributes(self):
+        """Test plugin class attributes."""
+        assert PluginRose.plugin_name == "rose"
+        assert PluginRose.session_class == RoseSession
+        assert PluginRose.client_class == RoseClient
+        assert PluginRose.version == "0.2.0"
+        assert PluginRose.session_ttl == 0
+
+    def test_ui_config(self):
+        """Test UI configuration is defined."""
+        assert PluginRose.ui_config is not None
+        assert PluginRose.ui_config.title == "ROSE Active Learning"
+        assert len(PluginRose.ui_config.forms) == 1
+        assert len(PluginRose.ui_config.monitors) == 1
+        assert PluginRose.ui_config.notifications is not None