Skip to content

Commit ca8fcd6

Browse files
committed
respond to comments
Signed-off-by: Tim Li <[email protected]>
1 parent 2d999a1 commit ca8fcd6

File tree

10 files changed

+78
-98
lines changed

10 files changed

+78
-98
lines changed

cadence/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# Import main client functionality
88
from .client import Client
99
from .worker import Registry
10-
from .workflow import workflow
10+
from . import workflow
1111

1212
__version__ = "0.1.0"
1313

cadence/_internal/workflow/workflow_engine.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
279279
workflow_input = await self._extract_workflow_input(decision_task)
280280

281281
# Execute workflow function
282-
result = self._execute_workflow_function_once(workflow_func, workflow_input)
282+
result = await self._execute_workflow_function_once(workflow_func, workflow_input)
283283

284284
# Check if workflow is complete
285285
if result is not None:
@@ -341,7 +341,7 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon
341341
logger.warning("No WorkflowExecutionStarted event found in history")
342342
return None
343343

344-
def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
344+
async def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
345345
"""
346346
Execute the workflow function once (not during replay).
347347
@@ -355,23 +355,9 @@ def _execute_workflow_function_once(self, workflow_func: Callable, workflow_inpu
355355
logger.debug(f"Executing workflow function with input: {workflow_input}")
356356
result = workflow_func(workflow_input)
357357

358-
# If the workflow function is async, we need to handle it properly
358+
# If the workflow function is async, await it properly
359359
if asyncio.iscoroutine(result):
360-
# For now, use asyncio.run for async workflow functions
361-
# TODO: Implement proper deterministic event loop for workflow execution
362-
try:
363-
result = asyncio.run(result)
364-
except RuntimeError:
365-
# If we're already in an event loop, create a new task
366-
loop = asyncio.get_event_loop()
367-
if loop.is_running():
368-
# We can't use asyncio.run inside a running loop
369-
# For now, just get the result (this may not be deterministic)
370-
logger.warning("Async workflow function called within running event loop - may not be deterministic")
371-
# This is a workaround - in a real implementation, we'd need proper task scheduling
372-
result = None
373-
else:
374-
result = loop.run_until_complete(result)
360+
result = await result
375361

376362
return result
377363

cadence/worker/_registry.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
"""
88

99
import logging
10-
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type, Union
10+
from typing import Callable, Dict, Optional, Unpack, TypedDict, overload, Type, Union, TypeVar
1111
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
1212
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1313

1414
logger = logging.getLogger(__name__)
1515

16+
# TypeVar for workflow class types
17+
W = TypeVar('W')
18+
1619

1720
class RegisterWorkflowOptions(TypedDict, total=False):
1821
"""Options for registering a workflow."""
@@ -35,9 +38,9 @@ def __init__(self) -> None:
3538

3639
def workflow(
3740
self,
38-
cls: Optional[Type] = None,
41+
cls: Optional[Type[W]] = None,
3942
**kwargs: Unpack[RegisterWorkflowOptions]
40-
) -> Union[Type, Callable[[Type], Type]]:
43+
) -> Union[Type[W], Callable[[Type[W]], Type[W]]]:
4144
"""
4245
Register a workflow class.
4346
@@ -57,7 +60,7 @@ def workflow(
5760
"""
5861
options = RegisterWorkflowOptions(**kwargs)
5962

60-
def decorator(target: Type) -> Type:
63+
def decorator(target: Type[W]) -> Type[W]:
6164
workflow_name = options.get('name') or target.__name__
6265

6366
if workflow_name in self._workflows:
@@ -194,7 +197,7 @@ def of(*args: 'Registry') -> 'Registry':
194197

195198
return result
196199

197-
def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]:
200+
def _find_activity_definitions(instance: object) -> list[ActivityDefinition]:
198201
attr_to_def = {}
199202
for t in instance.__class__.__mro__:
200203
for attr in dir(t):
@@ -206,10 +209,7 @@ def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]
206209
raise ValueError(f"'{attr}' was overridden with a duplicate activity definition")
207210
attr_to_def[attr] = value
208211

209-
# Create new definitions, copying the attributes from the declaring type but using the function
210-
# from the specific object. This allows for the decorator to be applied to the base class and the
211-
# function to be overridden
212-
result = []
212+
result: list[ActivityDefinition] = []
213213
for attr, definition in attr_to_def.items():
214214
result.append(ActivityDefinition(getattr(instance, attr), definition.name, definition.strategy, definition.params))
215215

cadence/workflow.py

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from contextlib import contextmanager
33
from contextvars import ContextVar
44
from dataclasses import dataclass
5-
from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any
6-
from functools import wraps
5+
from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any, Optional, Union
6+
import inspect
77

88
from cadence.client import Client
99

10-
T = TypeVar('T')
10+
T = TypeVar('T', bound=Callable[..., Any])
1111

1212

1313
class WorkflowDefinitionOptions(TypedDict, total=False):
@@ -23,9 +23,10 @@ class WorkflowDefinition:
2323
Provides type safety and metadata for workflow classes.
2424
"""
2525

26-
def __init__(self, cls: Type, name: str):
26+
def __init__(self, cls: Type, name: str, run_method_name: str):
2727
self._cls = cls
2828
self._name = name
29+
self._run_method_name = run_method_name
2930

3031
@property
3132
def name(self) -> str:
@@ -39,13 +40,7 @@ def cls(self) -> Type:
3940

4041
def get_run_method(self, instance: Any) -> Callable:
4142
"""Get the workflow run method from an instance of the workflow class."""
42-
for attr_name in dir(instance):
43-
if attr_name.startswith('_'):
44-
continue
45-
attr = getattr(instance, attr_name)
46-
if callable(attr) and hasattr(attr, '_workflow_run'):
47-
return cast(Callable, attr)
48-
raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}")
43+
return cast(Callable, getattr(instance, self._run_method_name))
4944

5045
@staticmethod
5146
def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
@@ -66,8 +61,8 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
6661
if "name" in opts and opts["name"]:
6762
name = opts["name"]
6863

69-
# Validate that the class has exactly one run method
70-
run_method_count = 0
64+
# Validate that the class has exactly one run method and find it
65+
run_method_name = None
7166
for attr_name in dir(cls):
7267
if attr_name.startswith('_'):
7368
continue
@@ -78,40 +73,54 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
7873

7974
# Check for workflow run method
8075
if hasattr(attr, '_workflow_run'):
81-
run_method_count += 1
76+
if run_method_name is not None:
77+
raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}")
78+
run_method_name = attr_name
8279

83-
if run_method_count == 0:
80+
if run_method_name is None:
8481
raise ValueError(f"No @workflow.run method found in class {cls.__name__}")
85-
elif run_method_count > 1:
86-
raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}")
8782

88-
return WorkflowDefinition(cls, name)
83+
return WorkflowDefinition(cls, name, run_method_name)
8984

9085

91-
def run(func: Callable[..., T]) -> Callable[..., T]:
86+
def run(func: Optional[T] = None) -> Union[T, Callable[[T], T]]:
9287
"""
9388
Decorator to mark a method as the main workflow run method.
9489
90+
Can be used with or without parentheses:
91+
@workflow.run
92+
async def my_workflow(self):
93+
...
94+
95+
@workflow.run()
96+
async def my_workflow(self):
97+
...
98+
9599
Args:
96100
func: The method to mark as the workflow run method
97101
98102
Returns:
99103
The decorated method with workflow run metadata
104+
105+
Raises:
106+
ValueError: If the function is not async
100107
"""
101-
@wraps(func)
102-
def wrapper(*args, **kwargs):
103-
return func(*args, **kwargs)
104-
105-
# Attach metadata to the function
106-
wrapper._workflow_run = True # type: ignore
107-
return wrapper
108-
109-
110-
# Create a simple namespace object for the workflow decorators
111-
class _WorkflowNamespace:
112-
run = staticmethod(run)
113-
114-
workflow = _WorkflowNamespace()
108+
def decorator(f: T) -> T:
109+
# Validate that the function is async
110+
if not inspect.iscoroutinefunction(f):
111+
raise ValueError(f"Workflow run method '{f.__name__}' must be async")
112+
113+
# Attach metadata to the function
114+
f._workflow_run = True # type: ignore
115+
return f
116+
117+
# Support both @workflow.run and @workflow.run()
118+
if func is None:
119+
# Called with parentheses: @workflow.run()
120+
return decorator
121+
else:
122+
# Called without parentheses: @workflow.run
123+
return decorator(func)
115124

116125

117126
@dataclass

tests/cadence/_internal/workflow/test_workflow_engine_integration.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType
1010
from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes
1111
from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult
12-
from cadence.workflow import WorkflowInfo
12+
from cadence import workflow
13+
from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions
1314
from cadence.client import Client
1415

1516

@@ -38,11 +39,9 @@ def workflow_info(self):
3839
@pytest.fixture
3940
def mock_workflow_definition(self):
4041
"""Create a mock workflow definition."""
41-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
42-
4342
class TestWorkflow:
4443
@workflow.run
45-
def weird_name(self, input_data):
44+
async def weird_name(self, input_data):
4645
return f"processed: {input_data}"
4746

4847
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
@@ -214,39 +213,42 @@ async def test_extract_workflow_input_deserialization_error(self, workflow_engin
214213
# Verify no input was extracted due to error
215214
assert input_data is None
216215

217-
def test_execute_workflow_function_sync(self, workflow_engine):
216+
@pytest.mark.asyncio
217+
async def test_execute_workflow_function_sync(self, workflow_engine):
218218
"""Test synchronous workflow function execution."""
219219
input_data = "test-input"
220220

221221
# Get the workflow function from the instance
222222
workflow_func = workflow_engine._workflow_definition.get_run_method(workflow_engine._workflow_instance)
223223

224224
# Execute the workflow function
225-
result = workflow_engine._execute_workflow_function_once(workflow_func, input_data)
225+
result = await workflow_engine._execute_workflow_function_once(workflow_func, input_data)
226226

227227
# Verify the result
228228
assert result == "processed: test-input"
229229

230-
def test_execute_workflow_function_async(self, workflow_engine):
230+
@pytest.mark.asyncio
231+
async def test_execute_workflow_function_async(self, workflow_engine):
231232
"""Test asynchronous workflow function execution."""
232233
async def async_workflow_func(input_data):
233234
return f"async-processed: {input_data}"
234235

235236
input_data = "test-input"
236237

237238
# Execute the async workflow function
238-
result = workflow_engine._execute_workflow_function_once(async_workflow_func, input_data)
239+
result = await workflow_engine._execute_workflow_function_once(async_workflow_func, input_data)
239240

240241
# Verify the result
241242
assert result == "async-processed: test-input"
242243

243-
def test_execute_workflow_function_none(self, workflow_engine):
244+
@pytest.mark.asyncio
245+
async def test_execute_workflow_function_none(self, workflow_engine):
244246
"""Test workflow function execution with None function."""
245247
input_data = "test-input"
246248

247249
# Execute with None workflow function - should raise TypeError
248250
with pytest.raises(TypeError, match="'NoneType' object is not callable"):
249-
workflow_engine._execute_workflow_function_once(None, input_data)
251+
await workflow_engine._execute_workflow_function_once(None, input_data)
250252

251253
def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_definition):
252254
"""Test WorkflowEngine initialization."""
@@ -281,11 +283,9 @@ async def test_workflow_engine_workflow_completion(self, workflow_engine, mock_c
281283
decision_task = self.create_mock_decision_task()
282284

283285
# Create a workflow definition that returns a result (indicating completion)
284-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
285-
286286
class CompletingWorkflow:
287287
@workflow.run
288-
def run(self, input_data):
288+
async def run(self, input_data):
289289
return "workflow-completed"
290290

291291
workflow_opts = WorkflowDefinitionOptions(name="completing_workflow")

tests/cadence/worker/test_decision_task_handler.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from cadence.worker._decision_task_handler import DecisionTaskHandler
1818
from cadence.worker._registry import Registry
1919
from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult
20+
from cadence import workflow
21+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
2022

2123

2224
class TestDecisionTaskHandler:
@@ -83,8 +85,6 @@ def test_initialization(self, mock_client, mock_registry):
8385
async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry):
8486
"""Test successful decision task handling."""
8587
# Create actual workflow definition
86-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
87-
8888
class MockWorkflow:
8989
@workflow.run
9090
async def run(self):
@@ -151,8 +151,6 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp
151151
async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry):
152152
"""Test that decision task handler caches workflow engines for same workflow execution."""
153153
# Create actual workflow definition
154-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
155-
156154
class MockWorkflow:
157155
@workflow.run
158156
async def run(self):
@@ -189,8 +187,6 @@ async def run(self):
189187
async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry):
190188
"""Test that different workflow executions get separate engines."""
191189
# Create actual workflow definition
192-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
193-
194190
class MockWorkflow:
195191
@workflow.run
196192
async def run(self):
@@ -348,8 +344,6 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis
348344
async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry):
349345
"""Test that WorkflowEngine is created with correct WorkflowInfo."""
350346
# Create actual workflow definition
351-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
352-
353347
class MockWorkflow:
354348
@workflow.run
355349
async def run(self):

tests/cadence/worker/test_decision_task_handler_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from cadence.api.v1.decision_pb2 import Decision
1414
from cadence.worker._decision_task_handler import DecisionTaskHandler
1515
from cadence.worker._registry import Registry
16-
from cadence.workflow import workflow
16+
from cadence import workflow
1717
from cadence.client import Client
1818

1919

tests/cadence/worker/test_decision_worker_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes
1212
from cadence.worker._decision import DecisionWorker
1313
from cadence.worker._registry import Registry
14-
from cadence.workflow import workflow
14+
from cadence import workflow
1515
from cadence.client import Client
1616

1717

tests/cadence/worker/test_registry.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import pytest
77

88
from cadence import activity
9+
from cadence import workflow
910
from cadence.worker import Registry
10-
from cadence.workflow import workflow, WorkflowDefinition
11+
from cadence.workflow import WorkflowDefinition
1112
from tests.cadence import common_activities
1213

1314

0 commit comments

Comments
 (0)