diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py index 4fb76b4..96d21af 100644 --- a/src/radical/asyncflow/workflow_manager.py +++ b/src/radical/asyncflow/workflow_manager.py @@ -5,6 +5,7 @@ import inspect import logging import os +import shlex import signal from collections import defaultdict, deque from functools import wraps @@ -419,7 +420,20 @@ def wrapper(*args, **kwargs): async def async_wrapper(): try: if task_type == EXECUTABLE: - comp_desc[EXECUTABLE] = await func(*args, **kwargs) + cmd = await func(*args, **kwargs) + if not cmd or not isinstance(cmd, str): + raise ValueError( + f"Executable task '{func.__name__}' must return " + "a non-empty command string" + ) + parts = shlex.split(cmd) + if not parts: + raise ValueError( + f"Executable task '{func.__name__}' must return " + "a non-empty command string" + ) + comp_desc[EXECUTABLE] = parts[0] + comp_desc["arguments"] = parts[1:] elif task_type == PROMPT: comp_desc[PROMPT] = await func(*args, **kwargs) else: @@ -488,18 +502,6 @@ def _register_component( comp_desc["uid"] = self._assign_uid(prefix=comp_type) if task_type == EXECUTABLE: - # For executable tasks, validate the executable value - executable_value = comp_desc.get(EXECUTABLE) - if executable_value is None: - raise ValueError( - f"Executable task '{comp_desc['name']}' returned None — " - "must return a string command" - ) - if not isinstance(executable_value, str): - raise ValueError( - "Executable task must return a string, got " - f"{type(executable_value)}" - ) comp_desc[FUNCTION] = None # Clear function since we're using executable elif task_type == PROMPT: # For prompt tasks, validate the prompt value diff --git a/tests/unit/test_executable_task.py b/tests/unit/test_executable_task.py new file mode 100644 index 0000000..8a56ff7 --- /dev/null +++ b/tests/unit/test_executable_task.py @@ -0,0 +1,153 @@ +"""Unit tests for executable_task decorator — shlex splitting behaviour.""" + +import asyncio + +import pytest + +from radical.asyncflow import WorkflowEngine +from radical.asyncflow.workflow_manager import EXECUTABLE, FUNCTION + +# --------------------------------------------------------------------------- +# Split correctness +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_executable_task_simple_split(): + """'echo hello' → executable='echo', arguments=['hello'].""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return "echo hello" + + await t() + await asyncio.sleep(0.05) + + desc = next(iter(engine.components.values()))["description"] + assert desc[EXECUTABLE] == "echo" + assert desc["arguments"] == ["hello"] + assert desc[FUNCTION] is None + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_no_arguments(): + """'ls' → executable='ls', arguments=[].""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return "ls" + + await t() + await asyncio.sleep(0.05) + + desc = next(iter(engine.components.values()))["description"] + assert desc[EXECUTABLE] == "ls" + assert desc["arguments"] == [] + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_multiple_arguments(): + """'echo hello world' → arguments=['hello', 'world'].""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return "echo hello world" + + await t() + await asyncio.sleep(0.05) + + desc = next(iter(engine.components.values()))["description"] + assert desc[EXECUTABLE] == "echo" + assert desc["arguments"] == ["hello", "world"] + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_quoted_argument(): + """Shlex preserves quoted tokens as single arguments.""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return 'echo "hello world"' + + await t() + await asyncio.sleep(0.05) + + desc = next(iter(engine.components.values()))["description"] + assert desc[EXECUTABLE] == "echo" + assert desc["arguments"] == ["hello world"] + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_path_with_spaces(): + """Executable paths with spaces quoted correctly.""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return '"/usr/my tool" --verbose' + + await t() + await asyncio.sleep(0.05) + + desc = next(iter(engine.components.values()))["description"] + assert desc[EXECUTABLE] == "/usr/my tool" + assert desc["arguments"] == ["--verbose"] + await engine.shutdown() + + +# --------------------------------------------------------------------------- +# Error cases +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_executable_task_none_return_raises(): + """Returning None raises ValueError.""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return None + + with pytest.raises(ValueError, match="non-empty command string"): + await t() + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_empty_string_raises(): + """Returning '' raises ValueError.""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return "" + + with pytest.raises(ValueError, match="non-empty command string"): + await t() + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_executable_task_whitespace_only_raises(): + """Returning whitespace-only string raises ValueError.""" + engine = await WorkflowEngine.create(dry_run=True) + + @engine.executable_task + async def t(): + return " " + + with pytest.raises(ValueError, match="non-empty command string"): + await t() + + await engine.shutdown()