Skip to content

Commit 79e7327

Browse files
committed
feat: Implement signal_with_start_workflow
Signed-off-by: Tim Li <[email protected]>
1 parent f91c925 commit 79e7327

File tree

2 files changed

+151
-0
lines changed

2 files changed

+151
-0
lines changed

cadence/client.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from cadence.api.v1.service_workflow_pb2 import (
1818
StartWorkflowExecutionRequest,
1919
StartWorkflowExecutionResponse,
20+
SignalWithStartWorkflowExecutionRequest,
21+
SignalWithStartWorkflowExecutionResponse,
2022
)
2123
from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution
2224
from cadence.api.v1.tasklist_pb2 import TaskList
@@ -229,6 +231,72 @@ async def start_workflow(
229231
except Exception:
230232
raise
231233

234+
async def signal_with_start_workflow(
235+
self,
236+
workflow: Union[str, Callable],
237+
signal_name: str,
238+
signal_input: Any = None,
239+
*args,
240+
**options_kwargs: Unpack[StartWorkflowOptions],
241+
) -> WorkflowExecution:
242+
"""
243+
Signal a workflow execution, starting it if it is not already running.
244+
245+
Args:
246+
workflow: Workflow function or workflow type name string
247+
signal_name: Name of the signal
248+
signal_input: Input data for the signal
249+
*args: Arguments to pass to the workflow if it needs to be started
250+
**options_kwargs: StartWorkflowOptions as keyword arguments
251+
252+
Returns:
253+
WorkflowExecution with workflow_id and run_id
254+
255+
Raises:
256+
ValueError: If required parameters are missing or invalid
257+
Exception: If the gRPC call fails
258+
"""
259+
# Convert kwargs to StartWorkflowOptions and validate
260+
options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs))
261+
262+
# Build the start workflow request
263+
start_request = await self._build_start_workflow_request(workflow, args, options)
264+
265+
# Encode signal input
266+
signal_payload = None
267+
if signal_input is not None:
268+
try:
269+
signal_payload = await self.data_converter.to_data(signal_input)
270+
except Exception as e:
271+
raise ValueError(f"Failed to encode signal input: {e}")
272+
273+
# Build the SignalWithStartWorkflowExecution request
274+
request = SignalWithStartWorkflowExecutionRequest(
275+
start_request=start_request,
276+
signal_name=signal_name,
277+
)
278+
279+
if signal_payload:
280+
request.signal_input.CopyFrom(signal_payload)
281+
282+
# Execute the gRPC call
283+
try:
284+
response: SignalWithStartWorkflowExecutionResponse = (
285+
await self.workflow_stub.SignalWithStartWorkflowExecution(request)
286+
)
287+
288+
# Emit metrics if available
289+
if self.metrics_emitter:
290+
# TODO: Add metrics similar to Go client
291+
pass
292+
293+
execution = WorkflowExecution()
294+
execution.workflow_id = start_request.workflow_id
295+
execution.run_id = response.run_id
296+
return execution
297+
except Exception:
298+
raise
299+
232300

233301
def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
234302
if "target" not in options:

tests/integration_tests/test_client.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,86 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper):
136136
assert task_timeout_seconds == task_timeout.total_seconds(), (
137137
f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s"
138138
)
139+
140+
# trying parametrized test for table test
141+
@pytest.mark.parametrize(
142+
"test_case,workflow_id,start_first,expected_same_run",
143+
[
144+
(
145+
"new_workflow",
146+
"test-workflow-signal-with-start-123",
147+
False,
148+
False,
149+
),
150+
(
151+
"existing_workflow",
152+
"test-workflow-signal-existing-456",
153+
True,
154+
True,
155+
),
156+
],
157+
)
158+
@pytest.mark.usefixtures("helper")
159+
async def test_signal_with_start_workflow(
160+
helper: CadenceHelper,
161+
test_case: str,
162+
workflow_id: str,
163+
start_first: bool,
164+
expected_same_run: bool,
165+
):
166+
"""Test signal_with_start_workflow method.
167+
168+
Test cases:
169+
1. new_workflow: SignalWithStartWorkflow starts a new workflow if it doesn't exist
170+
2. existing_workflow: SignalWithStartWorkflow signals existing workflow without restart
171+
"""
172+
async with helper.client() as client:
173+
workflow_type = f"test-workflow-signal-{test_case}"
174+
task_list_name = f"test-task-list-signal-{test_case}"
175+
execution_timeout = timedelta(minutes=5)
176+
signal_name = "test-signal"
177+
signal_input = {"data": "test-signal-data"}
178+
179+
first_run_id = None
180+
if start_first:
181+
first_execution = await client.start_workflow(
182+
workflow_type,
183+
task_list=task_list_name,
184+
execution_start_to_close_timeout=execution_timeout,
185+
workflow_id=workflow_id,
186+
)
187+
first_run_id = first_execution.run_id
188+
189+
execution = await client.signal_with_start_workflow(
190+
workflow_type,
191+
signal_name,
192+
signal_input,
193+
"arg1",
194+
"arg2",
195+
task_list=task_list_name,
196+
execution_start_to_close_timeout=execution_timeout,
197+
workflow_id=workflow_id,
198+
)
199+
200+
assert execution is not None
201+
assert execution.workflow_id == workflow_id
202+
assert execution.run_id is not None
203+
assert execution.run_id != ""
204+
205+
if expected_same_run:
206+
assert execution.run_id == first_run_id
207+
208+
describe_request = DescribeWorkflowExecutionRequest(
209+
domain=DOMAIN_NAME,
210+
workflow_execution=WorkflowExecution(
211+
workflow_id=execution.workflow_id,
212+
run_id=execution.run_id,
213+
),
214+
)
215+
216+
response = await client.workflow_stub.DescribeWorkflowExecution(
217+
describe_request
218+
)
219+
220+
assert response.workflow_execution_info.type.name == workflow_type
221+
assert response.workflow_execution_info.task_list == task_list_name

0 commit comments

Comments
 (0)