diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 29484a790..a99e2a569 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,14 +33,14 @@ jobs: spring-boot-display-version: 3.3.x experimental: false env: - GOVER: "1.20" + GOVER: "1.24.4" GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} - DAPR_CLI_VER: 1.15.0 - DAPR_RUNTIME_VER: 1.15.4 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh + DAPR_CLI_VER: 1.16.0-rc.1 + DAPR_RUNTIME_VER: 1.16.0-rc.1 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.16.0-rc.1/install/install.sh DAPR_CLI_REF: DAPR_REF: TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64 diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index efeb39391..5e681f4c1 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -32,16 +32,16 @@ jobs: matrix: java: [ 17 ] env: - GOVER: "1.20" + GOVER: "1.24.4" GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} - DAPR_CLI_VER: 1.15.0 - DAPR_RUNTIME_VER: 1.15.4 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh + DAPR_CLI_VER: 1.16.0-rc.1 + DAPR_RUNTIME_VER: 1.16.0-rc.1 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.16.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: + DAPR_REF: steps: - uses: actions/checkout@v4 - name: Set up OpenJDK ${{ env.JDK_VER }} diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index 1c12aa50c..135e1bb73 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -6,7 +6,7 @@ weight: 20000 description: How to get up and running with workflows using the Dapr Java SDK --- -Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: +Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: - Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) - Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java) @@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. ``` -## Run the `DemoWorkflowClient +## Run the `DemoWorkflowClient` The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. - ```java public class DemoWorkflowClient { @@ -246,4 +245,39 @@ Exiting DemoWorkflowClient. ## Next steps - [Learn more about Dapr workflow]({{% ref workflow-overview.md %}}) -- [Workflow API reference]({{% ref workflow_api.md %}}) \ No newline at end of file +- [Workflow API reference]({{% ref workflow_api.md %}}) + +## Advanced features + +### Task Execution Ids + +Task execution ids are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for: + +**Idempotency**: Ensuring activities are not executed multiple times for the same task + + +Here's an example of how to use task execution keys in your workflow activities: + +```java +public class TaskExecutionIdActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + // Get the task execution key for this activity + String taskExecutionId = ctx.getTaskExecutionId(); + + // Use the key to implement idempotency or state management + // For example, check if this task has already been executed + if (isTaskAlreadyExecuted(taskExecutionId)) { + return getPreviousResult(taskExecutionId); + } + + // Execute the activity logic + Object result = executeActivityLogic(); + + // Store the result with the task execution key + storeResult(taskExecutionId, result); + + return result; + } +} +``` diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index b90726080..4adaefb6d 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -53,6 +53,8 @@ Those examples contain the following workflow patterns: 4. [External Event Pattern](#external-event-pattern) 5. [Child-workflow Pattern](#child-workflow-pattern) 6. [Compensation Pattern](#compensation-pattern) +6. [Suspend/resume Pattern](#suspendresume-pattern) +7. [Idempotency Pattern](#idempotency-pattern) ### Chaining Pattern In the chaining pattern, a sequence of activities executes in a specific order. @@ -707,4 +709,143 @@ The client log: ```text Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. -``` \ No newline at end of file +``` + +### Idempotency Pattern + +The idempotency pattern ensures that activities can be safely retried without causing unintended side effects. This pattern is crucial when dealing with potentially unreliable external services or when network failures might cause activities to be retried. In this example, we demonstrate how to use task execution IDs to implement idempotent activities that return consistent results across retries. + +The `IdempotentWorkflow` class defines the workflow with retry policies and calls activities with specific limits. The workflow uses a shared key store to track task execution attempts. See the code snippet below: +```java +public class IdempotentWorkflow implements Workflow { + + private static Map keyStore; + + public static Map getKeyStore() { + if (keyStore == null) { + synchronized (IdempotentWorkflow.class) { + if (keyStore == null) { + keyStore = new ConcurrentHashMap<>(); + } + } + } + return keyStore; + } + + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + var result = new ArrayList(); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(10) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(10)) + .build()); + + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 3, options, Integer.class).await()); + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 2, options, Integer.class).await()); + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 1, options, Integer.class).await()); + + result.forEach(r -> ctx.getLogger().info("Result: " + r)); + + ctx.complete(result); + }; + } +} +``` + +The `IdempotentActivity` class implements the idempotency logic using task execution IDs. Each task execution has a unique ID that remains consistent across retries, allowing the activity to track its state and ensure idempotent behavior. See the code snippet below: +```java +public class IdempotentActivity implements WorkflowActivity { + + Logger logger = LoggerFactory.getLogger(IdempotentActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + + logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName()); + var limit = ctx.getInput(Integer.class); + + var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0)); + if (counter.get() != limit) { + logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get()); + IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet())); + + throw new IllegalStateException("Task execution key not found"); + } + + return counter.get(); + } +} +``` + + + +Execute the following script in order to run IdempotencyWorker: +```sh +dapr run --app-id idempotencyworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyWorker +``` + +Once running, execute the following script in order to run IdempotencyClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyClient +``` + + +The worker logs will show how the activity handles retries idempotently: +```text +== APP == 2023-11-07 15:30:22,145 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow +== APP == 2023-11-07 15:30:22,189 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,192 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 0 +== APP == 2023-11-07 15:30:22,198 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,199 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 1 +== APP == 2023-11-07 15:30:22,205 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,206 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 2 +== APP == 2023-11-07 15:30:22,212 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,226 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 3 +== APP == 2023-11-07 15:30:22,230 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,231 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 0 +== APP == 2023-11-07 15:30:22,236 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,237 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 1 +== APP == 2023-11-07 15:30:22,242 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,255 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 2 +== APP == 2023-11-07 15:30:22,259 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,262 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_003] with limit 1, incrementing counter 0 +== APP == 2023-11-07 15:30:22,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity +== APP == 2023-11-07 15:30:22,280 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 1 +``` + +The client logs: +```text +Started a new chaining model workflow with instance ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g +workflow instance with ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g completed with result: [3, 2, 1] +``` + +Key Points: +1. **Task Execution ID**: Each activity call has a unique task execution ID that remains consistent across retries +2. **Retry Policy**: The workflow defines a retry policy with exponential backoff and maximum attempts +3. **State Tracking**: The activity uses a shared key store to track the execution state for each task execution ID +4. **Controlled Failure**: The activity intentionally fails until it reaches the specified limit, demonstrating retry behavior +5. **Idempotent Result**: Once the limit is reached, subsequent retries with the same task execution ID return the same result + +This pattern is essential for building resilient workflows that can handle transient failures without causing duplicate operations or inconsistent state. diff --git a/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyClient.java b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyClient.java new file mode 100644 index 000000000..a5b92029d --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyClient.java @@ -0,0 +1,54 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.idempotency; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowRuntimeStatus; + +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +public class IdempotencyClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(IdempotentWorkflow.class); + System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId); + WorkflowInstanceStatus workflowInstanceStatus = + client.waitForInstanceCompletion(instanceId, null, true); + + if (workflowInstanceStatus == null) { + System.out.printf("workflow instance with ID: %s not found%n", instanceId); + return; + } + + if (workflowInstanceStatus.getRuntimeStatus() != WorkflowRuntimeStatus.COMPLETED) { + System.out.printf("workflow instance with ID: %s failed", instanceId); + return; + } + + var result = workflowInstanceStatus.readOutputAs(ArrayList.class); + System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result.toString()); + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyWorker.java b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyWorker.java new file mode 100644 index 000000000..4b611dde4 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotencyWorker.java @@ -0,0 +1,36 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.idempotency; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class IdempotencyWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(IdempotentWorkflow.class); + builder.registerActivity(IdempotentActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentActivity.java b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentActivity.java new file mode 100644 index 000000000..a68fc135e --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentActivity.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.idempotency; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IdempotentActivity implements WorkflowActivity { + + Logger logger = LoggerFactory.getLogger(IdempotentActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + + logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName()); + var limit = ctx.getInput(Integer.class); + + var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0)); + if (counter.get() != limit) { + logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get()); + IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet())); + + throw new IllegalStateException("Task execution key not found"); + } + + return counter.get(); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentWorkflow.java new file mode 100644 index 000000000..c2d3bb595 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/idempotency/IdempotentWorkflow.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.idempotency; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + + +public class IdempotentWorkflow implements Workflow { + + + private static Map keyStore; + + + public static Map getKeyStore() { + if (keyStore == null) { + synchronized (IdempotentWorkflow.class) { + if (keyStore == null) { + keyStore = new ConcurrentHashMap<>(); + } + } + } + return keyStore; + } + + + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + var result = new ArrayList(); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(10) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(10)) + .build()); + + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 3, options, Integer.class).await()); + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 2, options, Integer.class).await()); + result.add(ctx.callActivity(IdempotentActivity.class.getName(), 1, options, Integer.class).await()); + + result.forEach(r -> ctx.getLogger().info("Result: " + r)); + + ctx.complete(result); + }; + } +} \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java new file mode 100644 index 000000000..0b096d33a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class KeyStore { + + private final Map keyStore = new ConcurrentHashMap<>(); + + private static KeyStore instance; + + private KeyStore() { + } + + public static KeyStore getInstance() { + if (instance == null) { + synchronized (KeyStore.class) { + if (instance == null) { + instance = new KeyStore(); + } + } + } + return instance; + } + + + public void addKey(String key, Boolean value) { + keyStore.put(key, value); + } + + public Boolean getKey(String key) { + return keyStore.get(key); + } + + public void removeKey(String key) { + keyStore.remove(key); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java new file mode 100644 index 000000000..e52dc8d57 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionIdActivity.java @@ -0,0 +1,36 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.it.testcontainers.workflows.TestWorkflowPayload; +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; + +public class TaskExecutionIdActivity implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + KeyStore keyStore = KeyStore.getInstance(); + Boolean exists = keyStore.getKey(ctx.getTaskExecutionId()); + if (!Boolean.TRUE.equals(exists)) { + keyStore.addKey(ctx.getTaskExecutionId(), true); + workflowPayload.getPayloads().add("Execution key not found"); + throw new IllegalStateException("Task execution key not found"); + } + workflowPayload.getPayloads().add("Execution key found"); + return workflowPayload; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java new file mode 100644 index 000000000..a83e5b2ff --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.durabletask.Task; +import io.dapr.it.testcontainers.workflows.TestWorkflowPayload; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.time.Duration; + +import org.slf4j.Logger; + +public class TestExecutionKeysWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID: " + instanceId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + workflowPayload.setWorkflowId(instanceId); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(3) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(50)) + .build()); + + + Task t = ctx.callActivity(TaskExecutionIdActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); + + TestWorkflowPayload payloadAfterExecution = t.await(); + + ctx.complete(payloadAfterExecution); + }; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java index 19fe8f986..c99837e4f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java @@ -15,6 +15,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + +import io.dapr.it.testcontainers.KeyStore; +import io.dapr.it.testcontainers.TestExecutionKeysWorkflow; import io.dapr.testcontainers.Component; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; @@ -41,6 +44,7 @@ import java.util.Map; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -148,6 +152,28 @@ public void testSuspendAndResumeWorkflows() throws Exception { } + public void testExecutionKeyWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); + + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false); + + Duration timeout = Duration.ofSeconds(1000); + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(1, workflowOutput.getPayloads().size()); + assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); + + String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity"; + assertTrue(KeyStore.getInstance().getKey(executionKey)); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java index 78f749e9d..0217c6204 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/TestWorkflowsConfiguration.java @@ -15,6 +15,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.config.Properties; +import io.dapr.it.testcontainers.TaskExecutionIdActivity; +import io.dapr.it.testcontainers.TestExecutionKeysWorkflow; import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import org.springframework.beans.factory.annotation.Value; @@ -56,9 +58,12 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder( WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); + builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - + builder.registerActivity(TaskExecutionIdActivity.class); + + return builder; } } diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index 883489fd0..720a51290 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -47,7 +47,7 @@ io.dapr durabletask-client - 1.5.6 + 1.5.7