Skip to content

Commit 7a49de6

Browse files
authored
Integrate decision handler into worker (cadence-workflow#31)
<!-- Describe what has changed in this PR --> **What changed?** integrate decision handler into worker <!-- Tell your future self why have you made these changes --> **Why?** now that handler is implemented, we will integrate that with decision worker <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Tim Li <[email protected]>
1 parent c183c31 commit 7a49de6

12 files changed

+1796
-75
lines changed

cadence/_internal/workflow/context.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from typing import Optional
12
from cadence.client import Client
23
from cadence.workflow import WorkflowContext, WorkflowInfo
34

@@ -7,9 +8,27 @@ class Context(WorkflowContext):
78
def __init__(self, client: Client, info: WorkflowInfo):
89
self._client = client
910
self._info = info
11+
self._replay_mode = True
12+
self._replay_current_time_milliseconds: Optional[int] = None
1013

1114
def info(self) -> WorkflowInfo:
1215
return self._info
1316

1417
def client(self) -> Client:
1518
return self._client
19+
20+
def set_replay_mode(self, replay: bool) -> None:
21+
"""Set whether the workflow is currently in replay mode."""
22+
self._replay_mode = replay
23+
24+
def is_replay_mode(self) -> bool:
25+
"""Check if the workflow is currently in replay mode."""
26+
return self._replay_mode
27+
28+
def set_replay_current_time_milliseconds(self, time_millis: int) -> None:
29+
"""Set the current replay time in milliseconds."""
30+
self._replay_current_time_milliseconds = time_millis
31+
32+
def get_replay_current_time_milliseconds(self) -> Optional[int]:
33+
"""Get the current replay time in milliseconds."""
34+
return self._replay_current_time_milliseconds

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,10 @@ async def next_decision_events(self) -> DecisionEvents:
157157
decision_events.events.append(decision_task_started)
158158

159159
# Update replay time if available
160-
if hasattr(decision_task_started, 'event_time') and decision_task_started.event_time:
161-
self._replay_current_time_milliseconds = getattr(
162-
decision_task_started.event_time, 'seconds', 0
163-
) * 1000
160+
if decision_task_started.event_time:
161+
self._replay_current_time_milliseconds = (
162+
decision_task_started.event_time.seconds * 1000
163+
)
164164
decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds
165165

166166
# Process subsequent events until we find the corresponding DecisionTask completion
Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
"""
2+
DecisionsHelper manages the next decision ID which is used for tracking decision state machines.
3+
4+
This helper ensures that decision IDs are properly assigned and tracked to maintain
5+
consistency in the workflow execution state.
6+
"""
7+
8+
import logging
9+
from dataclasses import dataclass
10+
from typing import Dict, Optional
11+
12+
from cadence._internal.decision_state_machine import DecisionId, DecisionType, DecisionManager
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
@dataclass
18+
class DecisionTracker:
19+
"""Tracks a decision with its ID and current state."""
20+
21+
decision_id: DecisionId
22+
scheduled_event_id: Optional[int] = None
23+
initiated_event_id: Optional[int] = None
24+
started_event_id: Optional[int] = None
25+
is_completed: bool = False
26+
27+
28+
class DecisionsHelper:
29+
"""
30+
Helper class to manage decision IDs and work with DecisionManager state machines.
31+
32+
This class generates unique decision IDs and integrates with the DecisionManager
33+
state machines for proper decision lifecycle tracking.
34+
"""
35+
36+
def __init__(self, decision_manager: DecisionManager):
37+
"""
38+
Initialize the DecisionsHelper with a DecisionManager reference.
39+
40+
Args:
41+
decision_manager: The DecisionManager containing the state machines
42+
"""
43+
self._next_decision_counters: Dict[DecisionType, int] = {}
44+
self._tracked_decisions: Dict[str, DecisionTracker] = {}
45+
self._decision_id_to_key: Dict[str, str] = {}
46+
self._decision_manager = decision_manager
47+
logger.debug("DecisionsHelper initialized with DecisionManager integration")
48+
49+
def _get_next_counter(self, decision_type: DecisionType) -> int:
50+
"""
51+
Get the next counter value for a given decision type.
52+
53+
Args:
54+
decision_type: The type of decision
55+
56+
Returns:
57+
The next counter value
58+
"""
59+
if decision_type not in self._next_decision_counters:
60+
self._next_decision_counters[decision_type] = 1
61+
else:
62+
self._next_decision_counters[decision_type] += 1
63+
64+
return self._next_decision_counters[decision_type]
65+
66+
def generate_activity_id(self, activity_name: str) -> str:
67+
"""
68+
Generate a unique activity ID.
69+
70+
Args:
71+
activity_name: The name of the activity
72+
73+
Returns:
74+
A unique activity ID
75+
"""
76+
counter = self._get_next_counter(DecisionType.ACTIVITY)
77+
activity_id = f"{activity_name}_{counter}"
78+
79+
# Track this decision
80+
decision_id = DecisionId(DecisionType.ACTIVITY, activity_id)
81+
tracker = DecisionTracker(decision_id)
82+
self._tracked_decisions[activity_id] = tracker
83+
self._decision_id_to_key[str(decision_id)] = activity_id
84+
85+
logger.debug(f"Generated activity ID: {activity_id}")
86+
return activity_id
87+
88+
def generate_timer_id(self, timer_name: str = "timer") -> str:
89+
"""
90+
Generate a unique timer ID.
91+
92+
Args:
93+
timer_name: The name/prefix for the timer
94+
95+
Returns:
96+
A unique timer ID
97+
"""
98+
counter = self._get_next_counter(DecisionType.TIMER)
99+
timer_id = f"{timer_name}_{counter}"
100+
101+
# Track this decision
102+
decision_id = DecisionId(DecisionType.TIMER, timer_id)
103+
tracker = DecisionTracker(decision_id)
104+
self._tracked_decisions[timer_id] = tracker
105+
self._decision_id_to_key[str(decision_id)] = timer_id
106+
107+
logger.debug(f"Generated timer ID: {timer_id}")
108+
return timer_id
109+
110+
def generate_child_workflow_id(self, workflow_name: str) -> str:
111+
"""
112+
Generate a unique child workflow ID.
113+
114+
Args:
115+
workflow_name: The name of the child workflow
116+
117+
Returns:
118+
A unique child workflow ID
119+
"""
120+
counter = self._get_next_counter(DecisionType.CHILD_WORKFLOW)
121+
workflow_id = f"{workflow_name}_{counter}"
122+
123+
# Track this decision
124+
decision_id = DecisionId(DecisionType.CHILD_WORKFLOW, workflow_id)
125+
tracker = DecisionTracker(decision_id)
126+
self._tracked_decisions[workflow_id] = tracker
127+
self._decision_id_to_key[str(decision_id)] = workflow_id
128+
129+
logger.debug(f"Generated child workflow ID: {workflow_id}")
130+
return workflow_id
131+
132+
def generate_marker_id(self, marker_name: str) -> str:
133+
"""
134+
Generate a unique marker ID.
135+
136+
Args:
137+
marker_name: The name of the marker
138+
139+
Returns:
140+
A unique marker ID
141+
"""
142+
counter = self._get_next_counter(DecisionType.MARKER)
143+
marker_id = f"{marker_name}_{counter}"
144+
145+
# Track this decision
146+
decision_id = DecisionId(DecisionType.MARKER, marker_id)
147+
tracker = DecisionTracker(decision_id)
148+
self._tracked_decisions[marker_id] = tracker
149+
self._decision_id_to_key[str(decision_id)] = marker_id
150+
151+
logger.debug(f"Generated marker ID: {marker_id}")
152+
return marker_id
153+
154+
def get_decision_tracker(self, decision_key: str) -> Optional[DecisionTracker]:
155+
"""
156+
Get the decision tracker for a given decision key.
157+
158+
Args:
159+
decision_key: The decision key (activity_id, timer_id, etc.)
160+
161+
Returns:
162+
The DecisionTracker if found, None otherwise
163+
"""
164+
return self._tracked_decisions.get(decision_key)
165+
166+
def update_decision_scheduled(
167+
self, decision_key: str, scheduled_event_id: int
168+
) -> None:
169+
"""
170+
Update a decision tracker when it gets scheduled.
171+
172+
Args:
173+
decision_key: The decision key
174+
scheduled_event_id: The event ID when the decision was scheduled
175+
"""
176+
tracker = self._tracked_decisions.get(decision_key)
177+
if tracker:
178+
tracker.scheduled_event_id = scheduled_event_id
179+
logger.debug(
180+
f"Updated decision {decision_key} with scheduled event ID {scheduled_event_id}"
181+
)
182+
else:
183+
logger.warning(f"No tracker found for decision key: {decision_key}")
184+
185+
def update_decision_initiated(
186+
self, decision_key: str, initiated_event_id: int
187+
) -> None:
188+
"""
189+
Update a decision tracker when it gets initiated.
190+
191+
Args:
192+
decision_key: The decision key
193+
initiated_event_id: The event ID when the decision was initiated
194+
"""
195+
tracker = self._tracked_decisions.get(decision_key)
196+
if tracker:
197+
tracker.initiated_event_id = initiated_event_id
198+
logger.debug(
199+
f"Updated decision {decision_key} with initiated event ID {initiated_event_id}"
200+
)
201+
else:
202+
logger.warning(f"No tracker found for decision key: {decision_key}")
203+
204+
def update_decision_started(self, decision_key: str, started_event_id: int) -> None:
205+
"""
206+
Update a decision tracker when it gets started.
207+
208+
Args:
209+
decision_key: The decision key
210+
started_event_id: The event ID when the decision was started
211+
"""
212+
tracker = self._tracked_decisions.get(decision_key)
213+
if tracker:
214+
tracker.started_event_id = started_event_id
215+
logger.debug(
216+
f"Updated decision {decision_key} with started event ID {started_event_id}"
217+
)
218+
else:
219+
logger.warning(f"No tracker found for decision key: {decision_key}")
220+
221+
def update_decision_completed(self, decision_key: str) -> None:
222+
"""
223+
Mark a decision as completed.
224+
225+
Args:
226+
decision_key: The decision key
227+
"""
228+
tracker = self._tracked_decisions.get(decision_key)
229+
if tracker:
230+
tracker.is_completed = True
231+
logger.debug(f"Marked decision {decision_key} as completed")
232+
else:
233+
logger.warning(f"No tracker found for decision key: {decision_key}")
234+
235+
236+
def _find_decision_by_scheduled_event_id(
237+
self, scheduled_event_id: int
238+
) -> Optional[str]:
239+
"""Find a decision key by its scheduled event ID."""
240+
for key, tracker in self._tracked_decisions.items():
241+
if tracker.scheduled_event_id == scheduled_event_id:
242+
return key
243+
return None
244+
245+
def _find_decision_by_initiated_event_id(
246+
self, initiated_event_id: int
247+
) -> Optional[str]:
248+
"""Find a decision key by its initiated event ID."""
249+
for key, tracker in self._tracked_decisions.items():
250+
if tracker.initiated_event_id == initiated_event_id:
251+
return key
252+
return None
253+
254+
def _find_decision_by_started_event_id(
255+
self, started_event_id: int
256+
) -> Optional[str]:
257+
"""Find a decision key by its started event ID."""
258+
for key, tracker in self._tracked_decisions.items():
259+
if tracker.started_event_id == started_event_id:
260+
return key
261+
return None
262+
263+
def get_pending_decisions_count(self) -> int:
264+
"""
265+
Get the count of decisions that are not yet completed.
266+
267+
Returns:
268+
The number of pending decisions
269+
"""
270+
return sum(
271+
1
272+
for tracker in self._tracked_decisions.values()
273+
if not tracker.is_completed
274+
)
275+
276+
def get_completed_decisions_count(self) -> int:
277+
"""
278+
Get the count of decisions that have been completed.
279+
280+
Returns:
281+
The number of completed decisions
282+
"""
283+
return sum(
284+
1 for tracker in self._tracked_decisions.values() if tracker.is_completed
285+
)
286+
287+
def reset(self) -> None:
288+
"""Reset all decision tracking state."""
289+
self._next_decision_counters.clear()
290+
self._tracked_decisions.clear()
291+
self._decision_id_to_key.clear()
292+
logger.debug("DecisionsHelper reset")
293+
294+
def get_stats(self) -> Dict[str, int]:
295+
"""
296+
Get statistics about tracked decisions.
297+
298+
Returns:
299+
Dictionary with decision statistics
300+
"""
301+
stats = {
302+
"total_decisions": len(self._tracked_decisions),
303+
"pending_decisions": self.get_pending_decisions_count(),
304+
"completed_decisions": self.get_completed_decisions_count(),
305+
}
306+
307+
# Add per-type counts
308+
for decision_type in DecisionType:
309+
type_name = decision_type.name.lower()
310+
stats[f"{type_name}_count"] = self._next_decision_counters.get(
311+
decision_type, 0
312+
)
313+
314+
return stats

0 commit comments

Comments
 (0)