Skip to content

Commit 005af43

Browse files
committed
[Fix #847] Alternative implementation
Signed-off-by: fjtirado <[email protected]>
1 parent 0d40db7 commit 005af43

File tree

8 files changed

+158
-63
lines changed

8 files changed

+158
-63
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.*;
19+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1920

2021
import io.serverlessworkflow.api.types.Input;
2122
import io.serverlessworkflow.api.types.ListenTo;
2223
import io.serverlessworkflow.api.types.Output;
2324
import io.serverlessworkflow.api.types.Schedule;
2425
import io.serverlessworkflow.api.types.Workflow;
2526
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
26-
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
2727
import io.serverlessworkflow.impl.executors.TaskExecutor;
2828
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2929
import io.serverlessworkflow.impl.resources.ResourceLoader;
@@ -41,11 +41,11 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4141
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4242
private Optional<WorkflowFilter> inputFilter = Optional.empty();
4343
private Optional<WorkflowFilter> outputFilter = Optional.empty();
44-
private EventRegistrationInfo registrationInfo;
4544
private final WorkflowApplication application;
4645
private final TaskExecutor<?> taskExecutor;
4746
private final ResourceLoader resourceLoader;
4847
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
48+
private ScheduledEventConsumer scheculedConsumer;
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -83,32 +83,18 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8383
if (schedule != null) {
8484
ListenTo to = schedule.getOn();
8585
if (to != null) {
86-
definition.register(
87-
application.scheduler().eventConsumer(definition, application.modelFactory()::from),
88-
EventRegistrationBuilderInfo.from(application, to, x -> null));
86+
definition.scheculedConsumer =
87+
application
88+
.scheduler()
89+
.eventConsumer(
90+
definition,
91+
application.modelFactory()::from,
92+
EventRegistrationBuilderInfo.from(application, to, x -> null));
8993
}
9094
}
9195
return definition;
9296
}
9397

94-
private void register(ScheduledEventConsumer consumer, EventRegistrationBuilderInfo builderInfo) {
95-
WorkflowModelCollection model = application.modelFactory().createCollection();
96-
registrationInfo =
97-
EventRegistrationInfo.<WorkflowModel>build(
98-
builderInfo.registrations(),
99-
(ce, f) -> consumer.accept(ce, f, model),
100-
application.eventConsumer());
101-
registrationInfo
102-
.completableFuture()
103-
.thenAccept(
104-
x -> {
105-
EventRegistrationInfo prevRegistrationInfo = registrationInfo;
106-
register(consumer, builderInfo);
107-
consumer.start(model);
108-
prevRegistrationInfo.registrations().forEach(application.eventConsumer()::unregister);
109-
});
110-
}
111-
11298
public WorkflowInstance instance(Object input) {
11399
WorkflowModel inputModel = application.modelFactory().fromAny(input);
114100
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
@@ -159,8 +145,6 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
159145

160146
@Override
161147
public void close() {
162-
if (registrationInfo != null) {
163-
registrationInfo.registrations().forEach(application.eventConsumer()::unregister);
164-
}
148+
safeClose(scheculedConsumer);
165149
}
166150
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
1920
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
2021
import java.util.Collection;
2122
import java.util.function.Function;
@@ -24,5 +25,7 @@ public interface WorkflowScheduler {
2425
Collection<WorkflowInstance> scheduledInstances();
2526

2627
ScheduledEventConsumer eventConsumer(
27-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter);
28+
WorkflowDefinition definition,
29+
Function<CloudEvent, WorkflowModel> converter,
30+
EventRegistrationBuilderInfo info);
2831
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowScheduler;
23+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.function.Function;
2728

2829
public class DefaultWorkflowScheduler implements WorkflowScheduler {
2930

30-
private Collection<WorkflowInstance> instances = new ArrayList<>();
31+
private Collection<WorkflowInstance> instances =
32+
Collections.synchronizedCollection(new ArrayList<>());
3133

3234
@Override
3335
public Collection<WorkflowInstance> scheduledInstances() {
@@ -36,8 +38,10 @@ public Collection<WorkflowInstance> scheduledInstances() {
3638

3739
@Override
3840
public ScheduledEventConsumer eventConsumer(
39-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
40-
return new ScheduledEventConsumer(definition, converter) {
41+
WorkflowDefinition definition,
42+
Function<CloudEvent, WorkflowModel> converter,
43+
EventRegistrationBuilderInfo builderInfo) {
44+
return new ScheduledEventConsumer(definition, converter, builderInfo) {
4145
@Override
4246
protected void addScheduledInstance(WorkflowInstance instance) {
4347
instances.add(instance);

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,107 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
23-
import java.util.concurrent.CompletableFuture;
23+
import io.serverlessworkflow.impl.events.EventConsumer;
24+
import io.serverlessworkflow.impl.events.EventRegistration;
25+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
26+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
2432
import java.util.function.Function;
2533

26-
public abstract class ScheduledEventConsumer {
34+
public abstract class ScheduledEventConsumer implements AutoCloseable {
2735

2836
private final Function<CloudEvent, WorkflowModel> converter;
2937
private final WorkflowDefinition definition;
38+
private final EventRegistrationBuilderInfo builderInfo;
39+
private final EventConsumer eventConsumer;
40+
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
41+
private Collection<EventRegistration> registrations = new ArrayList<>();
3042

3143
protected ScheduledEventConsumer(
32-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
44+
WorkflowDefinition definition,
45+
Function<CloudEvent, WorkflowModel> converter,
46+
EventRegistrationBuilderInfo builderInfo) {
3347
this.definition = definition;
3448
this.converter = converter;
49+
this.builderInfo = builderInfo;
50+
this.eventConsumer = definition.application().eventConsumer();
51+
if (builderInfo.registrations().isAnd()
52+
&& builderInfo.registrations().registrations().size() > 1) {
53+
this.correlatedEvents = new HashMap<>();
54+
builderInfo
55+
.registrations()
56+
.registrations()
57+
.forEach(
58+
reg -> {
59+
correlatedEvents.put(reg, new ArrayList<>());
60+
registrations.add(
61+
eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce)));
62+
});
63+
} else {
64+
builderInfo
65+
.registrations()
66+
.registrations()
67+
.forEach(
68+
reg -> registrations.add(eventConsumer.register(reg, ce -> start((CloudEvent) ce))));
69+
}
3570
}
3671

37-
public void accept(
38-
CloudEvent t, CompletableFuture<WorkflowModel> u, WorkflowModelCollection col) {
39-
WorkflowModel model = converter.apply(t);
40-
col.add(model);
41-
u.complete(model);
72+
private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) {
73+
Collection<Collection<CloudEvent>> collections = new ArrayList<>();
74+
// to minimize the critical section, conversion is done later, here we are
75+
// performing
76+
// just collection, if any
77+
synchronized (correlatedEvents) {
78+
correlatedEvents.get(reg).add((CloudEvent) ce);
79+
while (satisfyCondition()) {
80+
Collection<CloudEvent> collection = new ArrayList<>();
81+
for (List<CloudEvent> values : correlatedEvents.values()) {
82+
collection.add(values.remove(0));
83+
}
84+
collections.add(collection);
85+
}
86+
}
87+
// convert and start outside synchronized
88+
collections.forEach(this::start);
4289
}
4390

44-
public void start(Object model) {
91+
private boolean satisfyCondition() {
92+
for (List<CloudEvent> values : correlatedEvents.values()) {
93+
if (values.isEmpty()) {
94+
return false;
95+
}
96+
}
97+
return true;
98+
}
99+
100+
protected void start(CloudEvent ce) {
101+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
102+
model.add(converter.apply(ce));
103+
start(model);
104+
}
105+
106+
protected void start(Collection<CloudEvent> ces) {
107+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
108+
ces.forEach(ce -> model.add(converter.apply(ce)));
109+
start(model);
110+
}
111+
112+
private void start(WorkflowModel model) {
45113
WorkflowInstance instance = definition.instance(model);
46114
addScheduledInstance(instance);
47115
instance.start();
48116
}
49117

118+
public void close() {
119+
if (correlatedEvents != null) {
120+
correlatedEvents.clear();
121+
}
122+
registrations.forEach(eventConsumer::unregister);
123+
}
124+
50125
protected abstract void addScheduledInstance(WorkflowInstance instace);
51126
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventTest.java renamed to impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,41 +32,52 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.concurrent.ExecutionException;
35-
import org.junit.jupiter.api.AfterAll;
36-
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
3838

39-
class ScheduleEventTest {
39+
class ScheduleEventConsumerTest {
4040

41-
private static WorkflowApplication appl;
41+
private WorkflowApplication appl;
4242

43-
@BeforeAll
44-
static void init() throws IOException {
45-
appl = WorkflowApplication.builder().build();
43+
@BeforeEach
44+
void init() throws IOException {
45+
appl = WorkflowApplication.builder().withListener(new TraceExecutionListener()).build();
4646
}
4747

48-
@AfterAll
49-
static void tearDown() throws IOException {
48+
@AfterEach
49+
void tearDown() throws IOException {
5050
appl.close();
5151
}
5252

5353
@Test
54-
void testStartUsingEvent() throws IOException, InterruptedException, ExecutionException {
55-
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
56-
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
54+
void testAllEvent() throws IOException, InterruptedException, ExecutionException {
55+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml"));
5756
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances();
57+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
58+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
5859
await()
59-
.pollDelay(Duration.ofMillis(10))
60-
.atMost(Duration.ofMillis(200))
61-
.until(() -> instances.size() == 1);
60+
.pollDelay(Duration.ofMillis(20))
61+
.atMost(Duration.ofMillis(500))
62+
.until(() -> instances.stream().filter(i -> i.output() != null).count() == 1);
63+
assertThat((Collection) assertThat(instances).singleElement().actual().output().asJavaObject())
64+
.containsExactlyInAnyOrder("Javierito", "Fulanito");
65+
}
66+
67+
@Test
68+
void testOneOfEvent() throws IOException, InterruptedException, ExecutionException {
69+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
70+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
6271
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
72+
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances();
6373
await()
64-
.pollDelay(Duration.ofMillis(10))
65-
.atMost(Duration.ofMillis(200))
66-
.until(() -> instances.size() == 2);
74+
.pollDelay(Duration.ofMillis(20))
75+
.atMost(Duration.ofMillis(500))
76+
.until(() -> instances.stream().filter(i -> i.output() != null).count() == 2);
6777
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
68-
assertThat(outputs.get(0)).isEqualTo(Map.of("recovered", "Javierito"));
69-
assertThat(outputs.get(1)).isEqualTo(Map.of("recovered", "Fulanito"));
78+
assertThat(outputs)
79+
.containsExactlyInAnyOrder(
80+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
7081
}
7182

7283
private CloudEvent buildCloudEvent(Object data) {

impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public class TraceExecutionListener implements WorkflowExecutionListener {
3737
private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class);
3838

3939
public void onWorkflowStarted(WorkflowStartedEvent ev) {
40-
4140
logger.info(
42-
"Workflow definition {} with id {} started at {}",
41+
"Workflow definition {} with id {} started at {} with data {}",
4342
ev.workflowContext().definition().workflow().getDocument().getName(),
4443
ev.workflowContext().instanceData().id(),
45-
ev.eventDate());
44+
ev.eventDate(),
45+
ev.workflowContext().instanceData().input());
4646
}
4747

4848
public void onWorkflowResumed(WorkflowResumedEvent ev) {
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
document:
2+
dsl: '1.0.1'
3+
namespace: test
4+
name: event-driven-schedule-all
5+
version: '0.1.0'
6+
schedule:
7+
on:
8+
all:
9+
- with:
10+
type: com.example.hospital.events.patients.recover
11+
data: ${.name == "Javierito"}
12+
- with:
13+
type: com.example.hospital.events.patients.recover
14+
data: ${.name == "Fulanito"}
15+
do:
16+
- recovered:
17+
set: ${[$workflow.input[]|.data.name]}
18+

impl/test/src/test/resources/workflows-samples/listen-start.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
document:
22
dsl: '1.0.1'
3-
namespace: examples
3+
namespace: test
44
name: event-driven-schedule
55
version: '0.1.0'
66
schedule:

0 commit comments

Comments
 (0)