-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfork_join
82 lines (70 loc) · 2.47 KB
/
fork_join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.logging.Logger;
public class ForkJoinMain {
public static void main(String[] args) {
ForkJoinPool commonPool = ForkJoinPool.commonPool();
commonPool.invoke(
new CustomRecursiveAction("THis is sdfsdf dsfnsf sdfnsdkf sdfondsf dsofjndsf dskf"));
}
}
class CustomRecursiveAction extends RecursiveAction {
private static final int THRESOLD = 4;
private static Logger logger = Logger.getAnonymousLogger();
private String workload = "";
CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info(
"This result - " + result + " was processed by " + Thread.currentThread().getName());
}
}
class CustomRecursiveTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20;
private int[] arr;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();
}
}