Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions experimental/fluent/func/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${version.org.mockito}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallTaskJava;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations;
import io.serverlessworkflow.fluent.spec.TaskBaseBuilder;
Expand Down Expand Up @@ -61,6 +62,16 @@ public <T, V> FuncCallTaskBuilder function(
return this;
}

public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function) {
return function(function, null);
}

public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function, Class<T> argClass) {
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
super.setTask(this.callTaskJava.getCallJava());
return this;
}

/** Accept a side-effect Consumer; engine should pass input through unchanged. */
public <T> FuncCallTaskBuilder consumer(Consumer<T> consumer) {
this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.serverlessworkflow.fluent.func.dsl;

import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder;
import java.util.function.Consumer;
Expand All @@ -26,6 +27,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
private final String name;
private final Function<T, R> fn;
private final JavaContextFunction<T, R> ctxFn;
private final JavaFilterFunction<T, R> filterFn;
private final Class<T> argClass;

/** Function<T,R> variant (unnamed). */
Expand All @@ -38,6 +40,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
this.name = name;
this.fn = fn;
this.ctxFn = null;
this.filterFn = null;
this.argClass = argClass;
}

Expand All @@ -51,6 +54,21 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
this.name = name;
this.fn = null;
this.ctxFn = ctxFn;
this.filterFn = null;
this.argClass = argClass;
}

/** JavaFilterFunction<T,R> variant (unnamed). */
FuncCallStep(JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
this(null, filterFn, argClass);
}

/** JavaFilterFunction<T,R> variant (named). */
FuncCallStep(String name, JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
this.name = name;
this.fn = null;
this.ctxFn = null;
this.filterFn = filterFn;
this.argClass = argClass;
}

Expand All @@ -60,6 +78,8 @@ protected void configure(FuncTaskItemListBuilder list, Consumer<FuncCallTaskBuil
cb -> {
if (ctxFn != null) {
cb.function(ctxFn, argClass);
} else if (filterFn != null) {
cb.function(filterFn, argClass);
} else {
cb.function(fn, argClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.cloudevents.CloudEventData;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
Expand All @@ -26,6 +27,8 @@
import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer;
import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer;
import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps;
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -286,7 +289,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
}

/**
* Build a call step for functions that need {@code WorkflowContextData} as the first parameter.
* Build a call step for functions that need {@link WorkflowContextData} as the first parameter.
* The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context.
*
* <p>Signature expected: {@code (ctx, payload) -> result}
Expand All @@ -297,7 +300,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withContext(CtxBiFunction<T, R> fn, Class<T> in) {
public static <T, R> FuncCallStep<T, R> withContext(JavaContextFunction<T, R> fn, Class<T> in) {
return withContext(null, fn, in);
}

Expand All @@ -319,7 +322,7 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
}

/**
* Named variant of {@link #withContext(CtxBiFunction, Class)}.
* Named variant of {@link #withContext(JavaContextFunction, Class)}.
*
* @param name task name
* @param fn context-aware bi-function
Expand All @@ -329,9 +332,40 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> withContext(
String name, CtxBiFunction<T, R> fn, Class<T> in) {
JavaContextFunction<T, R> jcf = (payload, wctx) -> fn.apply(wctx, payload);
return new FuncCallStep<>(name, jcf, in);
String name, JavaContextFunction<T, R> fn, Class<T> in) {
return new FuncCallStep<>(name, fn, in);
}

/**
* Build a call step for functions that need {@link WorkflowContextData} and {@link
* io.serverlessworkflow.impl.TaskContextData} as the first and second parameter. The DSL wraps it
* as a {@link JavaFilterFunction} and injects the runtime context.
*
* <p>Signature expected: {@code (wctx, tctx, payload) -> result}
*
* @param fn context-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withFilter(JavaFilterFunction<T, R> fn, Class<T> in) {
return withFilter(null, fn, in);
}

/**
* Named variant of {@link #withFilter(JavaFilterFunction, Class)}.
*
* @param name task name
* @param fn context-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> withFilter(
String name, JavaFilterFunction<T, R> fn, Class<T> in) {
return new FuncCallStep<>(name, fn, in);
}

/**
Expand All @@ -350,6 +384,38 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
return new FuncCallStep<>(name, jcf, in);
}

/**
* Builds a composition of the current workflow instance id and the definition of the task
* position as a JSON pointer.
*/
static String defaultUniqueId(WorkflowContextData wctx, TaskContextData tctx) {
return String.format("%s-%s", wctx.instanceData().id(), tctx.position().jsonPointer());
}

/**
* Build a call step for functions that expect a composition with the workflow instance id and the
* task position as the first parameter. The instance ID is extracted from the runtime context,
* the task position from the definition.
*
* <p>Signature expected: {@code (uniqueId, payload) -> result}
*
* @param fn unique-id-aware bi-function
* @param in payload input class
* @param <T> input type
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> withUniqueId(
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
JavaFilterFunction<T, R> jff =
(payload, wctx, tctx) -> fn.apply(defaultUniqueId(wctx, tctx), payload);
return new FuncCallStep<>(name, jff, in);
}

public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(null, fn, in);
}

/**
* Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input.
*
Expand Down Expand Up @@ -387,12 +453,12 @@ public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Clas
* @param <R> result type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Class<T> in) {
return withInstanceId(fn, in);
public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(fn, in);
}

/**
* Named agent-style sugar. See {@link #agent(InstanceIdBiFunction, Class)}.
* Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
*
* @param name task name
* @param fn (instanceId, payload) -> result
Expand All @@ -402,8 +468,8 @@ public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Cla
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> agent(
String name, InstanceIdBiFunction<T, R> fn, Class<T> in) {
return withInstanceId(name, fn, in);
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
return withUniqueId(name, fn, in);
}

/**
Expand Down Expand Up @@ -677,7 +743,7 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
* switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
* </pre>
*
* The JQ expression is evaluated against the task input at runtime.
* <p>The JQ expression is evaluated against the task input at runtime.
*/
public static FuncTaskConfigurer switchWhenOrElse(
String jqExpression, String thenTask, FlowDirectiveEnum otherwise) {
Expand All @@ -698,7 +764,7 @@ public static FuncTaskConfigurer switchWhenOrElse(
* switchWhenOrElse(".score >= 80", "pass", "fail")
* </pre>
*
* The JQ expression is evaluated against the task input at runtime.
* <p>The JQ expression is evaluated against the task input at runtime.
*/
public static FuncTaskConfigurer switchWhenOrElse(
String jqExpression, String thenTask, String otherwiseTask) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.func.dsl;

/**
* Functions that expect a unique ID injection in runtime, typically an idempotent generated unique
* id based on the workflow instance id and task name.
*
* @param <T> The task payload input
* @param <R> The task result output
*/
@FunctionalInterface
public interface UniqueIdBiFunction<T, R> {
R apply(String uniqueId, T object);
}
Loading