Skip to content

chore: New task execution task id test #1430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ jobs:
matrix:
java: [ 17 ]
env:
GOVER: "1.20"
GOVER: "1.24.4"
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.15.0
DAPR_RUNTIME_VER: 1.15.4
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh
DAPR_CLI_VER: 1.16.0-rc.1
DAPR_RUNTIME_VER: 1.16.0-rc.1
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.16.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF:
steps:
- uses: actions/checkout@v4
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ weight: 20000
description: How to get up and running with workflows using the Dapr Java SDK
---

Lets create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:
Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:

- Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java)
- Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java)
Expand Down Expand Up @@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here.
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
```

## Run the `DemoWorkflowClient
## Run the `DemoWorkflowClient`

The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr.


```java
public class DemoWorkflowClient {

Expand Down Expand Up @@ -246,4 +245,39 @@ Exiting DemoWorkflowClient.

## Next steps
- [Learn more about Dapr workflow]({{% ref workflow-overview.md %}})
- [Workflow API reference]({{% ref workflow_api.md %}})
- [Workflow API reference]({{% ref workflow_api.md %}})

## Advanced features

### Task Execution Ids

Task execution ids are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for:

**Idempotency**: Ensuring activities are not executed multiple times for the same task


Here's an example of how to use task execution keys in your workflow activities:

```java
public class TaskExecutionIdActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
// Get the task execution key for this activity
String taskExecutionId = ctx.getTaskExecutionId();

// Use the key to implement idempotency or state management
// For example, check if this task has already been executed
if (isTaskAlreadyExecuted(taskExecutionId)) {
return getPreviousResult(taskExecutionId);
}

// Execute the activity logic
Object result = executeActivityLogic();

// Store the result with the task execution key
storeResult(taskExecutionId, result);

return result;
}
}
```
143 changes: 142 additions & 1 deletion examples/src/main/java/io/dapr/examples/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Those examples contain the following workflow patterns:
4. [External Event Pattern](#external-event-pattern)
5. [Child-workflow Pattern](#child-workflow-pattern)
6. [Compensation Pattern](#compensation-pattern)
6. [Suspend/resume Pattern](#suspendresume-pattern)
7. [Idempotency Pattern](#idempotency-pattern)

### Chaining Pattern
In the chaining pattern, a sequence of activities executes in a specific order.
Expand Down Expand Up @@ -707,4 +709,143 @@ The client log:
```text
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
```
```

### Idempotency Pattern

The idempotency pattern ensures that activities can be safely retried without causing unintended side effects. This pattern is crucial when dealing with potentially unreliable external services or when network failures might cause activities to be retried. In this example, we demonstrate how to use task execution IDs to implement idempotent activities that return consistent results across retries.

The `IdempotentWorkflow` class defines the workflow with retry policies and calls activities with specific limits. The workflow uses a shared key store to track task execution attempts. See the code snippet below:
```java
public class IdempotentWorkflow implements Workflow {

private static Map<String, AtomicInteger> keyStore;

public static Map<String, AtomicInteger> getKeyStore() {
if (keyStore == null) {
synchronized (IdempotentWorkflow.class) {
if (keyStore == null) {
keyStore = new ConcurrentHashMap<>();
}
}
}
return keyStore;
}

@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());

var result = new ArrayList<Integer>();

WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(10)
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxRetryInterval(Duration.ofSeconds(10))
.setBackoffCoefficient(2.0)
.setRetryTimeout(Duration.ofSeconds(10))
.build());

result.add(ctx.callActivity(IdempotentActivity.class.getName(), 3, options, Integer.class).await());
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 2, options, Integer.class).await());
result.add(ctx.callActivity(IdempotentActivity.class.getName(), 1, options, Integer.class).await());

result.forEach(r -> ctx.getLogger().info("Result: " + r));

ctx.complete(result);
};
}
}
```

The `IdempotentActivity` class implements the idempotency logic using task execution IDs. Each task execution has a unique ID that remains consistent across retries, allowing the activity to track its state and ensure idempotent behavior. See the code snippet below:
```java
public class IdempotentActivity implements WorkflowActivity {

Logger logger = LoggerFactory.getLogger(IdempotentActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {

logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName());
var limit = ctx.getInput(Integer.class);

var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0));
if (counter.get() != limit) {
logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get());
IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet()));

throw new IllegalStateException("Task execution key not found");
}

return counter.get();
}
}
```

<!-- STEP
name: Run Idempotency Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow'
- 'Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity'
- 'Task execution key'
- 'incrementing counter'
- 'Result: 3'
- 'Result: 2'
- 'Result: 1'
background: true
sleep: 60
timeout_seconds: 60
-->

Execute the following script in order to run IdempotencyWorker:
```sh
dapr run --app-id idempotencyworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyWorker
```

Once running, execute the following script in order to run IdempotencyClient:
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.idempotency.IdempotencyClient
```
<!-- END_STEP -->

The worker logs will show how the activity handles retries idempotently:
```text
== APP == 2023-11-07 15:30:22,145 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.idempotency.IdempotentWorkflow
== APP == 2023-11-07 15:30:22,189 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,192 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 0
== APP == 2023-11-07 15:30:22,198 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,199 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 1
== APP == 2023-11-07 15:30:22,205 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,206 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_001] with limit 3, incrementing counter 2
== APP == 2023-11-07 15:30:22,212 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_001] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,226 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 3
== APP == 2023-11-07 15:30:22,230 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,231 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 0
== APP == 2023-11-07 15:30:22,236 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,237 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_002] with limit 2, incrementing counter 1
== APP == 2023-11-07 15:30:22,242 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_002] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,255 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 2
== APP == 2023-11-07 15:30:22,259 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,262 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - Task execution key[task_exec_003] with limit 1, incrementing counter 0
== APP == 2023-11-07 15:30:22,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.idempotency.IdempotentActivity - [task_exec_003] Starting Activity io.dapr.examples.workflows.idempotency.IdempotentActivity
== APP == 2023-11-07 15:30:22,280 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Result: 1
```

The client logs:
```text
Started a new chaining model workflow with instance ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g
workflow instance with ID: 7f8e92a4-5b3c-4d1f-8e2a-9b8c7d6e5f4g completed with result: [3, 2, 1]
```

Key Points:
1. **Task Execution ID**: Each activity call has a unique task execution ID that remains consistent across retries
2. **Retry Policy**: The workflow defines a retry policy with exponential backoff and maximum attempts
3. **State Tracking**: The activity uses a shared key store to track the execution state for each task execution ID
4. **Controlled Failure**: The activity intentionally fails until it reaches the specified limit, demonstrating retry behavior
5. **Idempotent Result**: Once the limit is reached, subsequent retries with the same task execution ID return the same result

This pattern is essential for building resilient workflows that can handle transient failures without causing duplicate operations or inconsistent state.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.workflows.idempotency;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;

import java.util.ArrayList;
import java.util.concurrent.TimeoutException;

public class IdempotencyClient {
/**
* The main method to start the client.
*
* @param args Input arguments (unused).
* @throws InterruptedException If program has been interrupted.
*/
public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
String instanceId = client.scheduleNewWorkflow(IdempotentWorkflow.class);
System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId);
WorkflowInstanceStatus workflowInstanceStatus =
client.waitForInstanceCompletion(instanceId, null, true);

if (workflowInstanceStatus == null) {
System.out.printf("workflow instance with ID: %s not found%n", instanceId);
return;
}

if (workflowInstanceStatus.getRuntimeStatus() != WorkflowRuntimeStatus.COMPLETED) {
System.out.printf("workflow instance with ID: %s failed", instanceId);
return;
}

var result = workflowInstanceStatus.readOutputAs(ArrayList.class);
System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result.toString());

} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.workflows.idempotency;

import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;

public class IdempotencyWorker {
/**
* The main method of this app.
*
* @param args The port the app will listen on.
* @throws Exception An Exception.
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(IdempotentWorkflow.class);
builder.registerActivity(IdempotentActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.workflows.idempotency;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;

import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class IdempotentActivity implements WorkflowActivity {

Logger logger = LoggerFactory.getLogger(IdempotentActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {

logger.info("[{}] Starting Activity {} ", ctx.getTaskExecutionId(), ctx.getName());
var limit = ctx.getInput(Integer.class);

var counter = IdempotentWorkflow.getKeyStore().getOrDefault(ctx.getTaskExecutionId(), new AtomicInteger(0));
if (counter.get() != limit) {
logger.info("Task execution key[{}] with limit {}, incrementing counter {}",ctx.getTaskExecutionId(), limit, counter.get());
IdempotentWorkflow.getKeyStore().put(ctx.getTaskExecutionId(), new AtomicInteger(counter.incrementAndGet()));

throw new IllegalStateException("Task execution key not found");
}

return counter.get();
}
}
Loading
Loading