From 393aa21a1e44c6084e9bd3fd29223398d1aee46c Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Wed, 28 May 2025 16:43:48 +0200 Subject: [PATCH 1/5] chore: New task execution task id test (#1352) * chore: New task execution task id test test how taskExecutionTaskId can be used for idempotency Signed-off-by: Javier Aliaga * chore: Clean up not used files Signed-off-by: Javier Aliaga * docs: Task execution keys Signed-off-by: Javier Aliaga * test: Modify unit tests Signed-off-by: Javier Aliaga * Remove new lines Signed-off-by: artur-ciocanu --------- Signed-off-by: Javier Aliaga Signed-off-by: artur-ciocanu Co-authored-by: Cassie Coyle Co-authored-by: artur-ciocanu Signed-off-by: Javier Aliaga --- .../java-workflow/java-workflow-howto.md | 43 ++++++++++++-- .../io/dapr/it/testcontainers/KeyStore.java | 51 ++++++++++++++++ .../TaskExecutionKeyActivity.java | 35 +++++++++++ .../TestExecutionKeysWorkflow.java | 58 +++++++++++++++++++ .../workflows/DaprWorkflowsIT.java | 26 +++++++++ .../workflows/TestWorkflowsConfiguration.java | 5 +- .../workflows/WorkflowActivityContext.java | 2 + .../DefaultWorkflowActivityContext.java | 5 ++ .../WorkflowActivityClassWrapperTest.java | 5 +- 9 files changed, 223 insertions(+), 7 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index 1c12aa50c2..ccc365cf42 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -6,7 +6,7 @@ weight: 20000 description: How to get up and running with workflows using the Dapr Java SDK --- -Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: +Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: - Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) - Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java) @@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. ``` -## Run the `DemoWorkflowClient +## Run the `DemoWorkflowClient` The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. - ```java public class DemoWorkflowClient { @@ -246,4 +245,40 @@ Exiting DemoWorkflowClient. ## Next steps - [Learn more about Dapr workflow]({{% ref workflow-overview.md %}}) -- [Workflow API reference]({{% ref workflow_api.md %}}) \ No newline at end of file +- [Workflow API reference]({{% ref workflow_api.md %}}) + +## Advanced features + +### Task Execution Keys + +Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for: + +1. **Idempotency**: Ensuring activities are not executed multiple times for the same task +2. **State Management**: Tracking the state of activity execution +3. **Error Handling**: Managing retries and failures in a controlled manner + +Here's an example of how to use task execution keys in your workflow activities: + +```java +public class TaskExecutionKeyActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + // Get the task execution key for this activity + String taskExecutionKey = ctx.getTaskExecutionKey(); + + // Use the key to implement idempotency or state management + // For example, check if this task has already been executed + if (isTaskAlreadyExecuted(taskExecutionKey)) { + return getPreviousResult(taskExecutionKey); + } + + // Execute the activity logic + Object result = executeActivityLogic(); + + // Store the result with the task execution key + storeResult(taskExecutionKey, result); + + return result; + } +} +``` diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java new file mode 100644 index 0000000000..017e1c50be --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers; + +import java.util.HashMap; +import java.util.Map; + +public class KeyStore { + + private final Map keyStore = new HashMap<>(); + + private static KeyStore instance; + + private KeyStore() { + } + + public static KeyStore getInstance() { + if (instance == null) { + synchronized (KeyStore.class) { + if (instance == null) { + instance = new KeyStore(); + } + } + } + return instance; + } + + + public void addKey(String key, Boolean value) { + keyStore.put(key, value); + } + + public Boolean getKey(String key) { + return keyStore.get(key); + } + + public void removeKey(String key) { + keyStore.remove(key); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java new file mode 100644 index 0000000000..c1a5b50381 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java @@ -0,0 +1,35 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; + +public class TaskExecutionKeyActivity implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + KeyStore keyStore = KeyStore.getInstance(); + Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey()); + if (!Boolean.TRUE.equals(exists)) { + keyStore.addKey(ctx.getTaskExecutionKey(), true); + workflowPayload.getPayloads().add("Execution key not found"); + throw new IllegalStateException("Task execution key not found"); + } + workflowPayload.getPayloads().add("Execution key found"); + return workflowPayload; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java new file mode 100644 index 0000000000..30a9ea33f2 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -0,0 +1,58 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.durabletask.Task; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.time.Duration; + +import org.slf4j.Logger; + +public class TestExecutionKeysWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID: " + instanceId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + workflowPayload.setWorkflowId(instanceId); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(3) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(50)) + .build()); + + + Task t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); + + TestWorkflowPayload payloadAfterExecution = t.await(); + + ctx.complete(payloadAfterExecution); + }; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java index 19fe8f986b..c99837e4f3 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java @@ -15,6 +15,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + +import io.dapr.it.testcontainers.KeyStore; +import io.dapr.it.testcontainers.TestExecutionKeysWorkflow; import io.dapr.testcontainers.Component; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; @@ -41,6 +44,7 @@ import java.util.Map; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -148,6 +152,28 @@ public void testSuspendAndResumeWorkflows() throws Exception { } + public void testExecutionKeyWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); + + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false); + + Duration timeout = Duration.ofSeconds(1000); + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(1, workflowOutput.getPayloads().size()); + assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); + + String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity"; + assertTrue(KeyStore.getInstance().getKey(executionKey)); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java index 78f749e9dd..18ce24b9ed 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java @@ -56,9 +56,12 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder( WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); + builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - + builder.registerActivity(TaskExecutionKeyActivity.class); + + return builder; } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 3fe5d88a23..90a2c41a59 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -17,6 +17,8 @@ public interface WorkflowActivityContext { String getName(); + String getTaskExecutionKey(); + T getInput(Class targetType); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index 551c21a373..217c3cd183 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -56,4 +56,9 @@ public String getName() { public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } + + @Override + public String getTaskExecutionKey() { + return this.innerContext.getTaskExecutionKey(); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 76a7e07af1..81ac492e05 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -16,7 +16,7 @@ public static class TestActivity implements WorkflowActivity { @Override public Object run(WorkflowActivityContext ctx) { String activityContextName = ctx.getName(); - return ctx.getInput(String.class) + " world! from " + activityContextName; + return ctx.getInput(String.class) + " world! from " + activityContextName + " with task execution key " + ctx.getTaskExecutionKey(); } } @@ -37,10 +37,11 @@ public void createWithClass() { when(mockContext.getInput(String.class)).thenReturn("Hello"); when(mockContext.getName()).thenReturn("TestActivityContext"); + when(mockContext.getTaskExecutionKey()).thenReturn("123"); Object result = wrapper.create().run(mockContext); verify(mockContext, times(1)).getInput(String.class); - assertEquals("Hello world! from TestActivityContext", result); + assertEquals("Hello world! from TestActivityContext with task execution key 123", result); } } From 5964491eb867144dd93dfeed0be585d88d0c6bf4 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 17 Jul 2025 09:30:44 +0200 Subject: [PATCH 2/5] chore: Use latest durabletask-java version Signed-off-by: Javier Aliaga --- ...ecutionKeyActivity.java => TaskExecutionIdActivity.java} | 6 +++--- .../dapr/it/testcontainers/TestExecutionKeysWorkflow.java | 2 +- .../workflows/TestWorkflowsConfiguration.java | 2 +- sdk-workflows/pom.xml | 2 +- .../java/io/dapr/workflows/WorkflowActivityContext.java | 2 +- .../workflows/runtime/DefaultWorkflowActivityContext.java | 4 ++-- .../workflows/runtime/WorkflowActivityClassWrapperTest.java | 4 ++-- 7 files changed, 11 insertions(+), 11 deletions(-) rename sdk-tests/src/test/java/io/dapr/it/testcontainers/{TaskExecutionKeyActivity.java => TaskExecutionIdActivity.java} (86%) diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java similarity index 86% rename from sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java rename to sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java index c1a5b50381..a75230e502 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java @@ -16,15 +16,15 @@ import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; -public class TaskExecutionKeyActivity implements WorkflowActivity { +public class TaskExecutionIdActivity implements WorkflowActivity { @Override public Object run(WorkflowActivityContext ctx) { TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); KeyStore keyStore = KeyStore.getInstance(); - Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey()); + Boolean exists = keyStore.getKey(ctx.getTaskExecutionId()); if (!Boolean.TRUE.equals(exists)) { - keyStore.addKey(ctx.getTaskExecutionKey(), true); + keyStore.addKey(ctx.getTaskExecutionId(), true); workflowPayload.getPayloads().add("Execution key not found"); throw new IllegalStateException("Task execution key not found"); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java index 30a9ea33f2..6314ce1610 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -47,7 +47,7 @@ public WorkflowStub create() { .build()); - Task t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); + Task t = ctx.callActivity(TaskExecutionIdActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); TestWorkflowPayload payloadAfterExecution = t.await(); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java index 18ce24b9ed..40b7cb1b33 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java @@ -59,7 +59,7 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder( builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - builder.registerActivity(TaskExecutionKeyActivity.class); + builder.registerActivity(TaskExecutionIdActivity.class); return builder; diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index 883489fd01..720a512908 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -47,7 +47,7 @@ io.dapr durabletask-client - 1.5.6 + 1.5.7 + +Execute the following script in order to run IdempotencyWorker: +```sh +dapr run --app-id idempotencyworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyWorker +``` + +Once running, execute the following script in order to run IdempotencyClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyClient +``` + + +The worker logs will show how the activity handles retries idempotently: +```text +== APP == 2023-11-07 15:30:22,145 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow +== APP == 2023-11-07 15:30:22,189 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,192 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 0 +== APP == 2023-11-07 15:30:22,198 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,199 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 1 +== APP == 2023-11-07 15:30:22,205 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,206 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 2 +== APP == 2023-11-07 15:30:22,212 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,226 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 3 +== APP == 2023-11-07 15:30:22,230 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,231 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 0 +== APP == 2023-11-07 15:30:22,236 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,237 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 1 +== APP == 2023-11-07 15:30:22,242 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,255 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 2 +== APP == 2023-11-07 15:30:22,259 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,262 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_003] with limit 1, incrementing counter 0 +== APP == 2023-11-07 15:30:22,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,280 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 1 +``` + +The client logs: +```text +Started a new chaining model workflow with instance ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g +workflow instance with ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g completed with result: [3, 2, 1] +``` + +Key Points: +1. **Task Execution ID**: Each activity call has a unique task execution ID that remains consistent across retries +2. **Retry Policy**: The workflow defines a retry policy with exponential backoff and maximum attempts +3. **State Tracking**: The activity uses a shared key store to track the execution state for each task execution ID +4. **Controlled Failure**: The activity intentionally fails until it reaches the specified limit, demonstrating retry behavior +5. **Idempotent Result**: Once the limit is reached, subsequent retries with the same task execution ID return the same result + +This pattern is essential for building resilient workflows that can handle transient failures without causing duplicate operations or inconsistent state. diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java index a75230e502..e52dc8d574 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java @@ -13,6 +13,7 @@ package io.dapr.it.testcontainers; +import io.dapr.it.testcontainers.workflows.TestWorkflowPayload; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java index 6314ce1610..a83e5b2ff9 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -14,6 +14,7 @@ package io.dapr.it.testcontainers; import io.dapr.durabletask.Task; +import io.dapr.it.testcontainers.workflows.TestWorkflowPayload; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowStub; import io.dapr.workflows.WorkflowTaskOptions; From ad4fdc86902ec00cbcdc041ec8fa95aceaef1782 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 17 Jul 2025 15:51:42 +0200 Subject: [PATCH 5/5] chore: Use dapr from master Signed-off-by: Javier Aliaga --- .github/workflows/validate.yml | 10 +++++----- .../workflows/TestWorkflowsConfiguration.java | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index efeb393910..5e681f4c11 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -32,16 +32,16 @@ jobs: matrix: java: [ 17 ] env: - GOVER: "1.20" + GOVER: "1.24.4" GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} - DAPR_CLI_VER: 1.15.0 - DAPR_RUNTIME_VER: 1.15.4 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh + DAPR_CLI_VER: 1.16.0-rc.1 + DAPR_RUNTIME_VER: 1.16.0-rc.1 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.16.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: + DAPR_REF: steps: - uses: actions/checkout@v4 - name: Set up OpenJDK ${{ env.JDK_VER }} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java index 40b7cb1b33..0217c62041 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java @@ -15,6 +15,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.config.Properties; +import io.dapr.it.testcontainers.TaskExecutionIdActivity; +import io.dapr.it.testcontainers.TestExecutionKeysWorkflow; import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import org.springframework.beans.factory.annotation.Value;