Skip to content

Commit df5b733

Browse files
committed
adding fanoutfanin example
1 parent 84bf76e commit df5b733

File tree

14 files changed

+527
-3
lines changed

14 files changed

+527
-3
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Fan-out/Fan-in
2+
3+
This tutorial demonstrates how to author a workflow where multiple independent tasks can be scheduled and executed simultaneously. The workflow can either wait until all tasks are completed to proceed, or continue when the fastest task is completed. For more information about the fan-out/fan-in pattern see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/#fan-outfan-in).
4+
5+
## Inspect the code
6+
7+
Open the [`FanOutFanInWorkflow.java`](src/main/java/io/dapr/springboot/examples/fanoutfanin/FanOutFanInWorkflow.java) file in the `tutorials/workflow/java/fan-out-fan-in/src/main/java/io/dapr/springboot/examples/fanoutfanin` folder. This file contains the definition for the workflow.
8+
9+
```mermaid
10+
graph LR
11+
SW((Start
12+
Workflow))
13+
subgraph for each word in the input
14+
GWL[GetWordLength]
15+
end
16+
SHORT[Select the
17+
shortest word]
18+
ALL[Wait until all tasks
19+
are completed]
20+
EW((End
21+
Workflow))
22+
SW --> GWL
23+
GWL --> ALL
24+
ALL --> SHORT
25+
SHORT --> EW
26+
```
27+
28+
## Run the tutorial
29+
30+
1. Use a terminal to navigate to the `tutorials/workflow/java/task-chaining` folder.
31+
2. Build and run the project using Maven.
32+
33+
```bash
34+
mvn spring-boot:test-run
35+
```
36+
37+
3. Use the POST request in the [`fanoutfanin.http`](./fanoutfanin.http) file to start the workflow, or use this cURL command:
38+
39+
```bash
40+
curl -i --request POST \
41+
--url http://localhost:8080/start \
42+
--header 'content-type: application/json' \
43+
--data '["which","word","is","the","shortest"]'
44+
```
45+
46+
The input for the workflow is an array of strings:
47+
48+
```json
49+
[
50+
"which",
51+
"word",
52+
"is",
53+
"the",
54+
"shortest"
55+
]
56+
```
57+
58+
The expected app logs are as follows:
59+
60+
```text
61+
== APP - fanoutfanin == GetWordLength: Received input: word.
62+
== APP - fanoutfanin == GetWordLength: Received input: is.
63+
== APP - fanoutfanin == GetWordLength: Received input: the.
64+
== APP - fanoutfanin == GetWordLength: Received input: shortest.
65+
== APP - fanoutfanin == GetWordLength: Received input: which.
66+
```
67+
68+
> Note that the order of the logs may vary.
69+
70+
4. Use the GET request in the [`fanoutfanin.http`](./fanoutfanin.http) file to get the status of the workflow, or use this cURL command:
71+
72+
```bash
73+
curl --request GET --url http://localhost:8080/output
74+
```
75+
76+
5. The expected serialized output of the workflow is an array with two strings:
77+
78+
```txt
79+
"["is"]"
80+
```
81+
82+
6. Stop the application by pressing `Ctrl+C`.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
@apphost=http://localhost:8080
2+
3+
### Start the FanOutFanIn workflow
4+
# @name startWorkflowRequest
5+
POST {{ apphost }}/start
6+
7+
### Get the workflow status
8+
GET {{ apphost }}/output
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>3.4.5</version>
10+
<relativePath/> <!-- lookup parent from repository -->
11+
</parent>
12+
13+
<artifactId>fan-out-fan-in</artifactId>
14+
<name>fan-out-fan-in</name>
15+
<description>Fan Out/In Workflow Example</description>
16+
17+
<properties>
18+
<dapr.spring.version>0.15.0-rc-7</dapr.spring.version>
19+
</properties>
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.springframework.boot</groupId>
23+
<artifactId>spring-boot-starter-web</artifactId>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.springframework.boot</groupId>
27+
<artifactId>spring-boot-starter-test</artifactId>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.dapr.spring</groupId>
31+
<artifactId>dapr-spring-boot-starter</artifactId>
32+
<version>${dapr.spring.version}</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.dapr.spring</groupId>
36+
<artifactId>dapr-spring-boot-starter-test</artifactId>
37+
<version>${dapr.spring.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.rest-assured</groupId>
42+
<artifactId>rest-assured</artifactId>
43+
<scope>test</scope>
44+
</dependency>
45+
</dependencies>
46+
47+
<build>
48+
<plugins>
49+
<plugin>
50+
<groupId>org.springframework.boot</groupId>
51+
<artifactId>spring-boot-maven-plugin</artifactId>
52+
</plugin>
53+
</plugins>
54+
</build>
55+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples;
15+
16+
import org.springframework.boot.SpringApplication;
17+
import org.springframework.boot.autoconfigure.SpringBootApplication;
18+
19+
20+
@SpringBootApplication
21+
public class FanOutFanInApplication {
22+
23+
public static void main(String[] args) {
24+
SpringApplication.run(FanOutFanInApplication.class, args);
25+
}
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples;
15+
16+
import com.fasterxml.jackson.databind.ObjectMapper;
17+
import org.springframework.boot.web.client.RestTemplateBuilder;
18+
import org.springframework.context.annotation.Bean;
19+
import org.springframework.context.annotation.Configuration;
20+
import org.springframework.web.client.RestTemplate;
21+
22+
@Configuration
23+
public class FanOutFanInConfiguration {
24+
25+
@Bean
26+
public RestTemplate restTemplate() {
27+
return new RestTemplateBuilder().build();
28+
}
29+
30+
@Bean
31+
public ObjectMapper mapper() {
32+
return new ObjectMapper();
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples;
15+
16+
17+
import io.dapr.spring.workflows.config.EnableDaprWorkflows;
18+
import io.dapr.springboot.examples.fanoutfanin.FanOutFanInWorkflow;
19+
import io.dapr.workflows.client.DaprWorkflowClient;
20+
import io.dapr.workflows.client.WorkflowInstanceStatus;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.web.bind.annotation.GetMapping;
25+
import org.springframework.web.bind.annotation.PostMapping;
26+
import org.springframework.web.bind.annotation.RequestBody;
27+
import org.springframework.web.bind.annotation.RestController;
28+
29+
import java.util.List;
30+
import java.util.concurrent.TimeoutException;
31+
32+
@RestController
33+
@EnableDaprWorkflows
34+
public class FanOutFanInRestController {
35+
36+
private final Logger logger = LoggerFactory.getLogger(FanOutFanInRestController.class);
37+
38+
@Autowired
39+
private DaprWorkflowClient daprWorkflowClient;
40+
41+
/*
42+
* **Note:** This local variable is used for examples purposes only.
43+
* For production scenarios, you will need to map workflowInstanceIds to your business scenarios.
44+
*/
45+
private String instanceId;
46+
47+
/**
48+
* Run Fan Out / Fan In Demo Workflow
49+
*
50+
* @return the instanceId of the FanOutFanInWorkflow execution
51+
*/
52+
@PostMapping("start")
53+
public String fanoutfanin(@RequestBody List<String> input) {
54+
instanceId = daprWorkflowClient.scheduleNewWorkflow(FanOutFanInWorkflow.class, input);
55+
return instanceId;
56+
}
57+
58+
/**
59+
* Obtain the output of the workflow
60+
*
61+
* @return the output of the FanOutFanInWorkflow execution
62+
*/
63+
@GetMapping("output")
64+
public String output() throws TimeoutException {
65+
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, true);
66+
if (instanceState != null) {
67+
return instanceState.readOutputAs(String.class);
68+
}
69+
return "N/A";
70+
}
71+
72+
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples.fanoutfanin;
15+
16+
import io.dapr.durabletask.Task;
17+
import io.dapr.workflows.Workflow;
18+
import io.dapr.workflows.WorkflowStub;
19+
import org.springframework.stereotype.Component;
20+
21+
import java.io.Serializable;
22+
import java.util.Comparator;
23+
import java.util.List;
24+
import java.util.stream.Collectors;
25+
26+
@Component
27+
public class FanOutFanInWorkflow implements Workflow {
28+
@Override
29+
public WorkflowStub create() {
30+
return ctx -> {
31+
ctx.getLogger().info("Starting Workflow: {}", ctx.getName());
32+
33+
List<String> inputs = ctx.getInput(List.class);
34+
if (inputs.isEmpty()) {
35+
throw new IllegalStateException("Input cannot be empty.");
36+
}
37+
// This list will contain the tasks that will be executed by the Dapr Workflow engine.
38+
List<Task<WordLength>> tasks = inputs.stream()
39+
.map(input -> ctx.callActivity(GetWordLengthActivity.class.getName(), input,
40+
WordLength.class))
41+
.collect(Collectors.toList());
42+
43+
// The Dapr Workflow engine will schedule all the tasks and wait for all tasks to complete before continuing.
44+
List<WordLength> allWordLengths = ctx.allOf(tasks).await();
45+
//Let's sort the list of WordLengths
46+
allWordLengths.sort(Comparator.comparingInt(WordLength::length));
47+
//Pick the first one
48+
String shortestWord = allWordLengths.get(0).word();
49+
50+
ctx.complete(shortestWord);
51+
};
52+
}
53+
record WordLength(String word, int length) implements Serializable {}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2023 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.springboot.examples.fanoutfanin;
15+
16+
import io.dapr.workflows.WorkflowActivity;
17+
import io.dapr.workflows.WorkflowActivityContext;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.stereotype.Component;
21+
22+
@Component
23+
public class GetWordLengthActivity implements WorkflowActivity {
24+
25+
@Override
26+
public Object run(WorkflowActivityContext ctx) {
27+
Logger logger = LoggerFactory.getLogger(GetWordLengthActivity.class);
28+
var input = ctx.getInput(String.class);
29+
logger.info("{} : Received input: {}", ctx.getName(), input);
30+
return new FanOutFanInWorkflow.WordLength(input, input.length());
31+
}
32+
33+
34+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
spring.application.name=fan-out-fan-in

0 commit comments

Comments
 (0)