Skip to content

Commit abba8c2

Browse files
authored
feat: Register workflow via WorkflowDefinition instead of raw callable (#37)
<!-- Describe what has changed in this PR --> **What changed?** - Added `WorkflowDefinition` class with type-safe wrapper for workflow functions, including `.fn`, `.name`, and `.params` properties for metadata access - Registry now stores `WorkflowDefinition` internally instead of raw callables, with `get_workflow()` returning the typed wrapper - Introduced `WorkflowParameter` dataclass for parameter introspection (name, type hints, defaults) and `@defn` decorator for standalone workflow definition <!-- Tell your future self why have you made these changes --> **Why?** - Enables compile-time type checking via mypy and better IDE support, consistent with existing `ActivityDefinition` pattern - Enables parameter validation, automatic API documentation, dynamic workflow discovery, and better error messages <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Tim Li <[email protected]>
1 parent afef579 commit abba8c2

File tree

11 files changed

+460
-204
lines changed

11 files changed

+460
-204
lines changed

cadence/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66

77
# Import main client functionality
88
from .client import Client
9+
from .worker import Registry
10+
from . import workflow
911

1012
__version__ = "0.1.0"
1113

1214
__all__ = [
1315
"Client",
16+
"Registry",
17+
"workflow",
1418
]

cadence/_internal/workflow/workflow_engine.py

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ class DecisionResult:
2020
decisions: list[Decision]
2121

2222
class WorkflowEngine:
23-
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
23+
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
2424
self._context = Context(client, info)
25-
self._workflow_func = workflow_func
25+
self._workflow_definition = workflow_definition
26+
self._workflow_instance = None
27+
if workflow_definition:
28+
self._workflow_instance = workflow_definition.cls()
2629
self._decision_manager = DecisionManager()
2730
self._decisions_helper = DecisionsHelper(self._decision_manager)
2831
self._is_workflow_complete = False
@@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None:
250253
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
251254
"""
252255
Execute the workflow function to generate new decisions.
253-
256+
254257
This blocks until the workflow schedules an activity or completes.
255-
258+
256259
Args:
257260
decision_task: The decision task containing workflow context
258261
"""
259262
try:
260-
# Execute the workflow function
261-
# The workflow function should block until it schedules an activity
262-
workflow_func = self._workflow_func
263-
if workflow_func is None:
263+
# Execute the workflow function from the workflow instance
264+
if self._workflow_definition is None or self._workflow_instance is None:
264265
logger.warning(
265-
"No workflow function available",
266+
"No workflow definition or instance available",
266267
extra={
267268
"workflow_type": self._context.info().workflow_type,
268269
"workflow_id": self._context.info().workflow_id,
@@ -271,11 +272,14 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
271272
)
272273
return
273274

275+
# Get the workflow run method from the instance
276+
workflow_func = self._workflow_definition.get_run_method(self._workflow_instance)
277+
274278
# Extract workflow input from history
275279
workflow_input = await self._extract_workflow_input(decision_task)
276280

277281
# Execute workflow function
278-
result = self._execute_workflow_function_once(workflow_func, workflow_input)
282+
result = await self._execute_workflow_function_once(workflow_func, workflow_input)
279283

280284
# Check if workflow is complete
281285
if result is not None:
@@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
290294
"completion_type": "success"
291295
}
292296
)
293-
297+
294298
except Exception as e:
295299
logger.error(
296300
"Error executing workflow function",
@@ -337,7 +341,7 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon
337341
logger.warning("No WorkflowExecutionStarted event found in history")
338342
return None
339343

340-
def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
344+
async def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
341345
"""
342346
Execute the workflow function once (not during replay).
343347
@@ -351,23 +355,9 @@ def _execute_workflow_function_once(self, workflow_func: Callable, workflow_inpu
351355
logger.debug(f"Executing workflow function with input: {workflow_input}")
352356
result = workflow_func(workflow_input)
353357

354-
# If the workflow function is async, we need to handle it properly
358+
# If the workflow function is async, await it properly
355359
if asyncio.iscoroutine(result):
356-
# For now, use asyncio.run for async workflow functions
357-
# TODO: Implement proper deterministic event loop for workflow execution
358-
try:
359-
result = asyncio.run(result)
360-
except RuntimeError:
361-
# If we're already in an event loop, create a new task
362-
loop = asyncio.get_event_loop()
363-
if loop.is_running():
364-
# We can't use asyncio.run inside a running loop
365-
# For now, just get the result (this may not be deterministic)
366-
logger.warning("Async workflow function called within running event loop - may not be deterministic")
367-
# This is a workaround - in a real implementation, we'd need proper task scheduling
368-
result = None
369-
else:
370-
result = loop.run_until_complete(result)
360+
result = await result
371361

372362
return result
373363

cadence/worker/_decision_task_handler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
7676
)
7777

7878
try:
79-
workflow_func = self._registry.get_workflow(workflow_type_name)
79+
workflow_definition = self._registry.get_workflow(workflow_type_name)
8080
except KeyError:
8181
logger.error(
8282
"Workflow type not found in registry",
@@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
103103
workflow_engine = self._workflow_engines.get(cache_key)
104104
if workflow_engine is None:
105105
workflow_engine = WorkflowEngine(
106-
info=workflow_info,
107-
client=self._client,
108-
workflow_func=workflow_func
106+
info=workflow_info,
107+
client=self._client,
108+
workflow_definition=workflow_definition
109109
)
110110
self._workflow_engines[cache_key] = workflow_engine
111111

cadence/worker/_registry.py

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
"""
88

99
import logging
10-
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload
10+
from typing import Callable, Dict, Optional, Unpack, TypedDict, overload, Type, Union, TypeVar
1111
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
12+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1213

1314
logger = logging.getLogger(__name__)
1415

16+
# TypeVar for workflow class types
17+
W = TypeVar('W')
18+
1519

1620
class RegisterWorkflowOptions(TypedDict, total=False):
1721
"""Options for registering a workflow."""
@@ -28,53 +32,58 @@ class Registry:
2832

2933
def __init__(self) -> None:
3034
"""Initialize the registry."""
31-
self._workflows: Dict[str, Callable] = {}
35+
self._workflows: Dict[str, WorkflowDefinition] = {}
3236
self._activities: Dict[str, ActivityDefinition] = {}
3337
self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping
3438

3539
def workflow(
3640
self,
37-
func: Optional[Callable] = None,
41+
cls: Optional[Type[W]] = None,
3842
**kwargs: Unpack[RegisterWorkflowOptions]
39-
) -> Callable:
43+
) -> Union[Type[W], Callable[[Type[W]], Type[W]]]:
4044
"""
41-
Register a workflow function.
42-
45+
Register a workflow class.
46+
4347
This method can be used as a decorator or called directly.
44-
48+
Only supports class-based workflows.
49+
4550
Args:
46-
func: The workflow function to register
51+
cls: The workflow class to register
4752
**kwargs: Options for registration (name, alias)
48-
53+
4954
Returns:
50-
The decorated function or the function itself
51-
55+
The decorated class
56+
5257
Raises:
5358
KeyError: If workflow name already exists
59+
ValueError: If class workflow is invalid
5460
"""
5561
options = RegisterWorkflowOptions(**kwargs)
56-
57-
def decorator(f: Callable) -> Callable:
58-
workflow_name = options.get('name') or f.__name__
59-
62+
63+
def decorator(target: Type[W]) -> Type[W]:
64+
workflow_name = options.get('name') or target.__name__
65+
6066
if workflow_name in self._workflows:
6167
raise KeyError(f"Workflow '{workflow_name}' is already registered")
62-
63-
self._workflows[workflow_name] = f
64-
68+
69+
# Create WorkflowDefinition with type information
70+
workflow_opts = WorkflowDefinitionOptions(name=workflow_name)
71+
workflow_def = WorkflowDefinition.wrap(target, workflow_opts)
72+
self._workflows[workflow_name] = workflow_def
73+
6574
# Register alias if provided
6675
alias = options.get('alias')
6776
if alias:
6877
if alias in self._workflow_aliases:
6978
raise KeyError(f"Workflow alias '{alias}' is already registered")
7079
self._workflow_aliases[alias] = workflow_name
71-
80+
7281
logger.info(f"Registered workflow '{workflow_name}'")
73-
return f
74-
75-
if func is None:
82+
return target
83+
84+
if cls is None:
7685
return decorator
77-
return decorator(func)
86+
return decorator(cls)
7887

7988
@overload
8089
def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]:
@@ -135,25 +144,25 @@ def _register_activity(self, defn: ActivityDefinition) -> None:
135144
self._activities[defn.name] = defn
136145

137146

138-
def get_workflow(self, name: str) -> Callable:
147+
def get_workflow(self, name: str) -> WorkflowDefinition:
139148
"""
140149
Get a registered workflow by name.
141-
150+
142151
Args:
143152
name: Name or alias of the workflow
144-
153+
145154
Returns:
146-
The workflow function
147-
155+
The workflow definition
156+
148157
Raises:
149158
KeyError: If workflow is not found
150159
"""
151160
# Check if it's an alias
152161
actual_name = self._workflow_aliases.get(name, name)
153-
162+
154163
if actual_name not in self._workflows:
155164
raise KeyError(f"Workflow '{name}' not found in registry")
156-
165+
157166
return self._workflows[actual_name]
158167

159168
def get_activity(self, name: str) -> ActivityDefinition:
@@ -188,7 +197,7 @@ def of(*args: 'Registry') -> 'Registry':
188197

189198
return result
190199

191-
def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]:
200+
def _find_activity_definitions(instance: object) -> list[ActivityDefinition]:
192201
attr_to_def = {}
193202
for t in instance.__class__.__mro__:
194203
for attr in dir(t):
@@ -200,10 +209,7 @@ def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]
200209
raise ValueError(f"'{attr}' was overridden with a duplicate activity definition")
201210
attr_to_def[attr] = value
202211

203-
# Create new definitions, copying the attributes from the declaring type but using the function
204-
# from the specific object. This allows for the decorator to be applied to the base class and the
205-
# function to be overridden
206-
result = []
212+
result: list[ActivityDefinition] = []
207213
for attr, definition in attr_to_def.items():
208214
result.append(ActivityDefinition(getattr(instance, attr), definition.name, definition.strategy, definition.params))
209215

0 commit comments

Comments
 (0)