Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/radical/asyncflow/workflow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import inspect
import logging
import os
import shlex
import signal
from collections import defaultdict, deque
from functools import wraps
Expand Down Expand Up @@ -419,7 +420,20 @@ def wrapper(*args, **kwargs):
async def async_wrapper():
try:
if task_type == EXECUTABLE:
comp_desc[EXECUTABLE] = await func(*args, **kwargs)
cmd = await func(*args, **kwargs)
if not cmd or not isinstance(cmd, str):
raise ValueError(
f"Executable task '{func.__name__}' must return "
"a non-empty command string"
)
parts = shlex.split(cmd)
if not parts:
raise ValueError(
f"Executable task '{func.__name__}' must return "
"a non-empty command string"
)
comp_desc[EXECUTABLE] = parts[0]
comp_desc["arguments"] = parts[1:]
elif task_type == PROMPT:
comp_desc[PROMPT] = await func(*args, **kwargs)
else:
Expand Down Expand Up @@ -488,18 +502,6 @@ def _register_component(
comp_desc["uid"] = self._assign_uid(prefix=comp_type)

if task_type == EXECUTABLE:
# For executable tasks, validate the executable value
executable_value = comp_desc.get(EXECUTABLE)
if executable_value is None:
raise ValueError(
f"Executable task '{comp_desc['name']}' returned None — "
"must return a string command"
)
if not isinstance(executable_value, str):
raise ValueError(
"Executable task must return a string, got "
f"{type(executable_value)}"
)
comp_desc[FUNCTION] = None # Clear function since we're using executable
elif task_type == PROMPT:
# For prompt tasks, validate the prompt value
Expand Down
153 changes: 153 additions & 0 deletions tests/unit/test_executable_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Unit tests for executable_task decorator — shlex splitting behaviour."""

import asyncio

import pytest

from radical.asyncflow import WorkflowEngine
from radical.asyncflow.workflow_manager import EXECUTABLE, FUNCTION

# ---------------------------------------------------------------------------
# Split correctness
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_executable_task_simple_split():
"""'echo hello' → executable='echo', arguments=['hello']."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return "echo hello"

await t()
await asyncio.sleep(0.05)

desc = next(iter(engine.components.values()))["description"]
assert desc[EXECUTABLE] == "echo"
assert desc["arguments"] == ["hello"]
assert desc[FUNCTION] is None
await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_no_arguments():
"""'ls' → executable='ls', arguments=[]."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return "ls"

await t()
await asyncio.sleep(0.05)

desc = next(iter(engine.components.values()))["description"]
assert desc[EXECUTABLE] == "ls"
assert desc["arguments"] == []
await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_multiple_arguments():
"""'echo hello world' → arguments=['hello', 'world']."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return "echo hello world"

await t()
await asyncio.sleep(0.05)

desc = next(iter(engine.components.values()))["description"]
assert desc[EXECUTABLE] == "echo"
assert desc["arguments"] == ["hello", "world"]
await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_quoted_argument():
"""Shlex preserves quoted tokens as single arguments."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return 'echo "hello world"'

await t()
await asyncio.sleep(0.05)

desc = next(iter(engine.components.values()))["description"]
assert desc[EXECUTABLE] == "echo"
assert desc["arguments"] == ["hello world"]
await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_path_with_spaces():
"""Executable paths with spaces quoted correctly."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return '"/usr/my tool" --verbose'

await t()
await asyncio.sleep(0.05)

desc = next(iter(engine.components.values()))["description"]
assert desc[EXECUTABLE] == "/usr/my tool"
assert desc["arguments"] == ["--verbose"]
await engine.shutdown()


# ---------------------------------------------------------------------------
# Error cases
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_executable_task_none_return_raises():
"""Returning None raises ValueError."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return None

with pytest.raises(ValueError, match="non-empty command string"):
await t()

await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_empty_string_raises():
"""Returning '' raises ValueError."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return ""

with pytest.raises(ValueError, match="non-empty command string"):
await t()

await engine.shutdown()


@pytest.mark.asyncio
async def test_executable_task_whitespace_only_raises():
"""Returning whitespace-only string raises ValueError."""
engine = await WorkflowEngine.create(dry_run=True)

@engine.executable_task
async def t():
return " "

with pytest.raises(ValueError, match="non-empty command string"):
await t()

await engine.shutdown()