Skip to content

Commit 87220a4

Browse files
committed
[Fix #727] Adding persistence support
Signed-off-by: fjtirado <[email protected]>
1 parent 79dcd02 commit 87220a4

File tree

11 files changed

+149
-28
lines changed

11 files changed

+149
-28
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowModel;
19+
import java.time.Instant;
20+
21+
public record CompletedTaskInfo(
22+
Instant instant,
23+
WorkflowModel model,
24+
WorkflowModel context,
25+
Boolean isEndNode,
26+
String nextPosition)
27+
implements PersistenceTaskInfo {}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public interface PersistenceInstanceWriter extends AutoCloseable {
3232

3333
void resumed(WorkflowContextData workflowContext);
3434

35+
void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext);
36+
3537
void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext);
3638

3739
void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext);

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,4 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.WorkflowModel;
19-
import java.time.Instant;
20-
21-
public record PersistenceTaskInfo(
22-
Instant instant,
23-
WorkflowModel model,
24-
WorkflowModel context,
25-
Boolean isEndNode,
26-
String nextPosition) {}
18+
public interface PersistenceTaskInfo {}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
public record RetriedTaskInfo(short retryAttempt) implements PersistenceTaskInfo {}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,20 @@ public CompletableFuture<WorkflowModel> start() {
4747
@Override
4848
public void restoreContext(WorkflowContext workflow, TaskContext context) {
4949
PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer());
50-
if (taskInfo != null) {
51-
context.output(taskInfo.model());
52-
context.completedAt(taskInfo.instant());
50+
if (taskInfo instanceof CompletedTaskInfo completedTaskInfo) {
51+
context.output(completedTaskInfo.model());
52+
context.completedAt(completedTaskInfo.instant());
5353
context.transition(
5454
new TransitionInfo(
55-
taskInfo.nextPosition() == null
55+
completedTaskInfo.nextPosition() == null
5656
? null
57-
: workflow.definition().taskExecutor(taskInfo.nextPosition()),
58-
taskInfo.isEndNode()));
59-
workflow.context(taskInfo.context());
57+
: workflow.definition().taskExecutor(completedTaskInfo.nextPosition()),
58+
completedTaskInfo.isEndNode()));
59+
workflow.context(completedTaskInfo.context());
60+
} else if (taskInfo instanceof RetriedTaskInfo retriedTaskInfo) {
61+
if (context.retryAttempt() == 0) {
62+
context.retryAttempt(retriedTaskInfo.retryAttempt());
63+
}
6064
}
6165
}
6266
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1919

2020
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
21+
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
2122
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
2223
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2324
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
@@ -75,6 +76,11 @@ public void onTaskCompleted(TaskCompletedEvent ev) {
7576
persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext());
7677
}
7778

79+
@Override
80+
public void onTaskRetried(TaskRetriedEvent ev) {
81+
persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext());
82+
}
83+
7884
public void close() {
7985
safeClose(persistenceWriter);
8086
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ public void aborted(WorkflowContextData workflowContext) {
6868
@Override
6969
public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {}
7070

71+
@Override
72+
public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) {
73+
doTransaction(
74+
t ->
75+
t.tasks(key(workflowContext))
76+
.put(
77+
taskContext.position().jsonPointer(),
78+
marshallTaskRetried(workflowContext, (TaskContext) taskContext)));
79+
}
80+
7181
@Override
7282
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
7383
doTransaction(
@@ -108,5 +118,8 @@ protected void removeProcessInstance(WorkflowContextData workflowContext) {
108118
protected abstract T marshallTaskCompleted(
109119
WorkflowContextData workflowContext, TaskContext taskContext);
110120

121+
protected abstract T marshallTaskRetried(
122+
WorkflowContextData workflowContext, TaskContext taskContext);
123+
111124
protected abstract S marshallStatus(WorkflowStatus status);
112125
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import io.serverlessworkflow.impl.WorkflowStatus;
2020
import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory;
2121
import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer;
22+
import io.serverlessworkflow.impl.persistence.CompletedTaskInfo;
2223
import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo;
24+
import io.serverlessworkflow.impl.persistence.RetriedTaskInfo;
2325
import java.io.ByteArrayInputStream;
2426
import java.time.Instant;
2527

@@ -36,21 +38,43 @@ public BytesMapInstanceReader(
3638
@Override
3739
protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) {
3840
try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) {
39-
buffer.readByte(); // version byte not used at the moment
40-
Instant date = buffer.readInstant();
41-
WorkflowModel model = (WorkflowModel) buffer.readObject();
42-
WorkflowModel context = (WorkflowModel) buffer.readObject();
43-
Boolean isEndNode = null;
44-
String nextPosition = null;
45-
isEndNode = buffer.readBoolean();
46-
boolean hasNext = buffer.readBoolean();
47-
if (hasNext) {
48-
nextPosition = buffer.readString();
41+
byte version = buffer.readByte();
42+
switch (version) {
43+
case MarshallingUtils.VERSION_0:
44+
default:
45+
return readVersion0(buffer);
46+
case MarshallingUtils.VERSION_1:
47+
return readVersion1(buffer);
4948
}
50-
return new PersistenceTaskInfo(date, model, context, isEndNode, nextPosition);
5149
}
5250
}
5351

52+
private PersistenceTaskInfo readVersion1(WorkflowInputBuffer buffer) {
53+
54+
TaskStatus taskStatus = buffer.readEnum(TaskStatus.class);
55+
switch (taskStatus) {
56+
case COMPLETED:
57+
default:
58+
return readVersion0(buffer);
59+
case RETRIED:
60+
return new RetriedTaskInfo(buffer.readShort());
61+
}
62+
}
63+
64+
private PersistenceTaskInfo readVersion0(WorkflowInputBuffer buffer) {
65+
Instant date = buffer.readInstant();
66+
WorkflowModel model = (WorkflowModel) buffer.readObject();
67+
WorkflowModel context = (WorkflowModel) buffer.readObject();
68+
Boolean isEndNode = null;
69+
String nextPosition = null;
70+
isEndNode = buffer.readBoolean();
71+
boolean hasNext = buffer.readBoolean();
72+
if (hasNext) {
73+
nextPosition = buffer.readString();
74+
}
75+
return new CompletedTaskInfo(date, model, context, isEndNode, nextPosition);
76+
}
77+
5478
@Override
5579
protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) {
5680
try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) {

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public BytesMapInstanceWriter(
4040
protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) {
4141
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
4242
try (WorkflowOutputBuffer writer = factory.output(bytes)) {
43-
writer.writeByte(MarshallingUtils.VERSION_0);
43+
writer.writeByte(MarshallingUtils.VERSION_1);
44+
writer.writeEnum(TaskStatus.COMPLETED);
4445
writer.writeInstant(taskContext.completedAt());
4546
writeModel(writer, taskContext.output());
4647
writeModel(writer, contextData.context());
@@ -82,4 +83,16 @@ protected byte[] marshallInstance(WorkflowInstanceData instance) {
8283
protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) {
8384
writer.writeObject(model);
8485
}
86+
87+
@Override
88+
protected byte[] marshallTaskRetried(
89+
WorkflowContextData workflowContext, TaskContext taskContext) {
90+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
91+
try (WorkflowOutputBuffer writer = factory.output(bytes)) {
92+
writer.writeByte(MarshallingUtils.VERSION_1);
93+
writer.writeEnum(TaskStatus.RETRIED);
94+
writer.writeShort(taskContext.retryAttempt());
95+
}
96+
return bytes.toByteArray();
97+
}
8598
}

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ class MarshallingUtils {
2020
private MarshallingUtils() {}
2121

2222
public static final byte VERSION_0 = 0;
23+
public static final byte VERSION_1 = 1;
2324
}

0 commit comments

Comments
 (0)