Skip to content

Commit 1ae7eac

Browse files
Fix #731 - Add HITL / Agentic DSL documentation (#768)
* Fix #731 - Add HITL / Agentic DSL documentation Signed-off-by: Ricardo Zanini <[email protected]> * Adjusting readme Signed-off-by: Ricardo Zanini <[email protected]> --------- Signed-off-by: Ricardo Zanini <[email protected]>
1 parent 5f8023d commit 1ae7eac

File tree

4 files changed

+207
-19
lines changed

4 files changed

+207
-19
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# CNCF Serverless Workflow SDK Java — Agentic DSL
2+
3+
## What is the DSL?
4+
5+
This module uses the **CNCF Workflow Specification Java Fluent DSL (DSL 1.0.0, tasks‑based)**. It’s a **strongly‑typed, builder‑style API** for composing workflows in Java while staying faithful to the CNCF spec’s execution model and event semantics.
6+
7+
**Core ideas:**
8+
9+
* **Tasks first.** Compose first‑class **tasks** (sequence/branches) rather than legacy “states”.
10+
* **Fluent builders.** Typed verbs guide valid configurations:
11+
12+
* `agent(id, Agents.Foo)`**integrates with LangChain4j (LC4J) agentic modules**. Build the agent via `AgenticServices.agentBuilder(...)`, select the model/provider through config, and capture outputs via `outputName`.
13+
* `callFn(id, c -> c.function(MethodRef, ArgClass))` — call **pure Java functions** with static typing.
14+
* `switchCase(id, s -> …)` — branch with **typed predicates** and `onDefault(...)`.
15+
* `emit(id, e -> e.event(...))` — publish **CloudEvents** with typed payload marshalling.
16+
* `listen(id, l -> l.to(e -> e.any(...) / e.all(...)))`**wait** for one or more events before continuing.
17+
* **Event‑native.** **CloudEvents** are the wire model for `emit`/`listen`, keeping components loosely coupled.
18+
* **Agentic‑AI friendly.** Agents are first‑class tasks; outputs flow into functions and policies naturally.
19+
* **Embeddable runtime.** `WorkflowApplication` runs definitions **in‑process** (great for tests/services).
20+
* **Type‑safe data flow.** Inputs/outputs keep their static types (e.g., `PolicyDecision.class`).
21+
22+
> [!NOTE]
23+
> **Module layering & mix‑and‑match**
24+
>
25+
> ```
26+
> spec → func → agentic
27+
> ```
28+
>
29+
> * **`spec`** — CNCF‑only core (workflows, tasks, events)
30+
> * **`func`** — adds **Java function calls** & **predicate branching** on top of `spec`
31+
> * **`agentic`** — adds **LangChain4j agent calls** on top of `func` + `spec`
32+
>
33+
> Because of this hierarchy, you can **freely mix** `agent(...)` with core CNCF tasks and Java `callFn(...)`/`switchCase(...)` in the same workflow.
34+
35+
---
36+
37+
## Email Drafter Agentic Workflow (use case)
38+
39+
> [!NOTE]
40+
> The full integration test can be seen in [src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java]().
41+
42+
**What it does:**
43+
Drafts an email with an agent, **parses** it, runs a **policy check**, and either:
44+
45+
* **Auto‑marks the email as ready**, or
46+
* **Requests human review** and **waits** for an approval/denial event.
47+
48+
**Main steps:**
49+
50+
1. **`agentEmailDrafter`** – `Agents.EmailDrafter` → `email_draft`
51+
2. **`parseDraft`** – `EmailDrafts::parse(String)` → `EmailDraft`
52+
3. **`policyCheck`** – `EmailPolicies::policyCheck(EmailDraft)` → `PolicyDecision`
53+
4. **`needsHumanReview?`** – if decision ≠ `AUTO_SEND`: emit **`org.acme.email.review.required`** and **listen** for `org.acme.email.approved` **or** `org.acme.email.denied`
54+
5. **`emailFinished`** – emit **`org.acme.email.finished`**
55+
56+
**Mermaid view (generated with our Mermaid library):**
57+
58+
```mermaid
59+
---
60+
config:
61+
look: handDrawn
62+
theme: base
63+
---
64+
flowchart TD
65+
n_agentemaildrafter_760e461ad032@{ shape: rect, label: "call: agentEmailDrafter" }
66+
n_agentemaildrafter_760e461ad032-->n_parsedraft_924cfc80438b
67+
n_parsedraft_924cfc80438b@{ shape: rect, label: "call: parseDraft" }
68+
n_parsedraft_924cfc80438b-->n_policycheck_19f6595cf361
69+
n_policycheck_19f6595cf361@{ shape: rect, label: "call: policyCheck" }
70+
n_policycheck_19f6595cf361-->n_needshumanreview_b6d9ececf6f3
71+
n_needshumanreview_b6d9ececf6f3@{ shape: diam, label: "switch: needsHumanReview?" }
72+
n_needshumanreview_b6d9ececf6f3--default-->n_emailready_931221ddff95
73+
n_needshumanreview_b6d9ececf6f3--default-->n_requestreview_83b9e0aa6873
74+
n_requestreview_83b9e0aa6873@{ shape: lean-r, label: "emit: **org.acme.email.review.required**" }
75+
n_requestreview_83b9e0aa6873-->n_waitforreview_0e05540cda49
76+
subgraph n_waitforreview_0e05540cda49["listen"]
77+
direction TB
78+
n_-vntcpr_0@{ shape: note, label: "to ANY events: <br/>• org.acme.email.approved<br/>• org.acme.email.denied" }
79+
n_-vntcpr_0-->n_-qyvbjm_0
80+
n_-qyvbjm_0@{ shape: f-circ, label: "join" }
81+
n_-qyvbjm_0-->n_-xenlzr_0
82+
n_-xenlzr_0@{ shape: rect, label: "waitForReview" }
83+
end
84+
n_waitforreview_0e05540cda49-->n_emailready_931221ddff95
85+
n_emailready_931221ddff95@{ shape: lean-r, label: "emit: **org.acme.email.finished**" }
86+
n_emailready_931221ddff95-->n__end__
87+
n__start__@{ shape: sm-circ, label: "Start" }
88+
n__start__-->n_agentemaildrafter_760e461ad032
89+
n__end__@{ shape: stop, label: "End" }
90+
```
91+
92+
---
93+
94+
## Human‑in‑the‑Loop (HITL) in Java Enterprise: Why & How
95+
96+
**Why HITL matters:** In real organizations, certain actions must be **reviewed or approved by a person** before they’re executed. Reasons include compliance (SOX, GDPR/PII), brand/reputation risk, contractual obligations, or simply to protect end‑users. With this DSL you can **codify those gates** while keeping the rest fully automated.
97+
98+
### What the Email Drafter workflow demonstrates
99+
100+
* **Risk‑based gating.** A policy converts the draft into a `PolicyDecision`. If risk is low → `AUTO_SEND`; else we **emit** `org.acme.email.review.required` and **listen** for approval/denial.
101+
* **Asynchronous review.** Business users can take minutes or hours. The workflow stays **durably waiting** via `listen(...any(...))` and resumes when an event arrives.
102+
* **Auditable trail.** Every `emit`/`listen` edge is a CloudEvent, so you get a clear audit trail: *who* approved, *when*, and *what* changed.
103+
104+
### Where this shines in Java enterprise apps
105+
106+
* **Customer communications & CRM.** Outbound emails, quotes, and renewals requiring manager/legal sign‑off.
107+
* **Procurement & finance.** PO creation, vendor onboarding, invoice disputes, refunds above threshold.
108+
* **HR & legal.** Offer letters, policy updates, external statements that need counsel approval.
109+
* **ITSM & DevOps.** Change approvals (CAB), production runbooks that pause for human confirmation.
110+
* **Healthcare & insurance.** Sensitive messaging that must be clinician/adjuster‑approved.
111+
* **Marketing & brand.** Campaign copy generation with brand/compliance review before release.
112+
113+
### A typical HITL architecture (event‑native)
114+
115+
1. **Workflow emits** `org.acme.email.review.required` with a correlation key (e.g., `data.workflowInstanceId`).
116+
2. **A reviewer UI** (React/Angular) lists pending items by reading from Kafka/AMQP or via a service (
117+
Quarkus/Spring) that projects CloudEvents into a DB.
118+
3. Reviewer **approves/denies** → UI calls a webhook
119+
(`POST /events/decisions`) that **publishes** a CloudEvent:
120+
121+
* `org.acme.email.approved` **or** `org.acme.email.denied`
122+
* includes the same correlation key so the waiting `listen` matches
123+
4. Workflow **resumes** and continues to `emailReady` or an alternate path.
124+
125+
> [!TIP]
126+
> Use CloudEvents attributes like `subject` (correlation), `type` (routing), and `time` (auditing). Store the event IDs to ensure **idempotency** if a reviewer double‑clicks.
127+
128+
### Example approval payload (conceptual)
129+
130+
```json
131+
{
132+
"type": "org.acme.email.approved",
133+
"subject": "wf:emailDrafterAgentic:2b9ee...",
134+
"data": {
135+
"approvedBy": "jdoe",
136+
"comments": "Looks good; added calendar link.",
137+
"redactions": ["phoneNumber"],
138+
"version": 3
139+
}
140+
}
141+
```
142+
143+
### Production considerations & best practices
144+
145+
* **Timeouts & escalation.** Add a timer branch (SLA breach → notify Slack/Jira or fall back to a safe default).
146+
* **Policy engines.** Externalize complex rules with **OPA** or a Java rules engine; keep `policyCheck` deterministic and testable.
147+
* **PII/redaction gates.** Run a redaction/safety step before approvals (PII scans, external domain checks, prompt‑injection guards).
148+
* **RBAC & separation of duties.** Ensure the reviewer isn’t the same person who drafted (auditors love this).
149+
* **Observability.** Emit metrics on time‑to‑approve, auto‑send rate, denial reasons; add tracing across `emit`/`listen`.
150+
* **Idempotency & retries.** Use event IDs and outbox/inbox patterns with Kafka/AMQP to avoid duplicate advances.
151+
* **Versioning.** Include draft `version` in events; if the text changed during review, request re‑approval.
152+
153+
### Extending the pattern
154+
155+
* **Multi‑stage approvals.** Chain multiple `listen` steps (e.g., manager → legal → compliance) with `any/all` strategies.
156+
* **Conditional reviewers.** Route to different queues based on region/domain or risk score.
157+
* **Partial automation.** Allow **auto‑send** for internal domains but require approval for external ones (`allowedDomains`).
158+
* **A/B safety.** Run two agents (draft + safety critique) before human review; only request HITL if they disagree.
159+
160+
This approach gives teams the **best of both worlds**: fast, LLM‑assisted generation with **governed, observable checkpoints** that fit naturally into Java enterprise stacks (Quarkus/Spring, JPA, Kafka/AMQP, REST/WebSockets) and compliance programs.
161+
162+
---
163+
164+
## Maven setup (single dependency)
165+
166+
**For application projects** a **single dependency** is enough. The `agentic` module brings the required transitive bits for you (core spec, func layer, runtime, etc.).
167+
168+
Requires **Java 17+**.
169+
170+
```xml
171+
<dependency>
172+
<groupId>io.serverlessworkflow</groupId>
173+
<artifactId>serverlessworkflow-experimental-fluent-agentic</artifactId>
174+
<version>YOUR_VERSION</version>
175+
</dependency>
176+
```
177+
178+
> [!TIP]
179+
> You can still mix `agent(...)` calls with `callFn(...)`, `switchCase(...)`, `emit(...)`, and `listen(...)` in the same workflow thanks to the `spec → func → agentic` layering.

experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ void email_drafter_agent() {
5656
.tasks(
5757
tasks ->
5858
tasks
59-
.agent(emailDrafter)
60-
.callFn(c -> c.function(EmailDrafts::parse, String.class))
61-
.callFn(c -> c.function(EmailPolicies::policyCheck, EmailDraft.class))
59+
.agent("agentEmailDrafter", emailDrafter)
60+
.callFn("parseDraft", c -> c.function(EmailDrafts::parse, String.class))
61+
.callFn(
62+
"policyCheck",
63+
c -> c.function(EmailPolicies::policyCheck, EmailDraft.class))
6264
.switchCase(
65+
"needsHumanReview?",
6366
s ->
6467
s.onPredicate(
6568
c ->
@@ -69,7 +72,7 @@ void email_drafter_agent() {
6972
decision.decision()),
7073
PolicyDecision.class)
7174
.then("requestReview"))
72-
.onDefault("emailReady"))
75+
.onDefault("emailFinished"))
7376
.emit(
7477
"requestReview",
7578
emit ->
@@ -94,7 +97,8 @@ void email_drafter_agent() {
9497
any -> any.with(r -> r.type("org.acme.email.approved")),
9598
any -> any.with(r -> r.type("org.acme.email.denied")))))
9699
.emit(
97-
"emailReady", emit -> emit.event(e -> e.type("org.acme.email.ready"))))
100+
"emailFinished",
101+
emit -> emit.event(e -> e.type("org.acme.email.finished"))))
98102
.build();
99103

100104
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
@@ -127,7 +131,7 @@ void email_drafter_agent() {
127131
app.eventConsumer()
128132
.listen(
129133
new EventFilter()
130-
.withWith(new EventProperties().withType("org.acme.email.ready")),
134+
.withWith(new EventProperties().withType("org.acme.email.finished")),
131135
app),
132136
ce -> finishedEvents.add((CloudEvent) ce));
133137

mermaid/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# serverlessworkflow-mermaid
1+
# CNCF Serverless Workflow SDK Java — Mermaid Exporter
22

33
Generate **Mermaid** diagrams for [Serverless Workflow](https://serverlessworkflow.io/) definitions.
44
This library turns a `Workflow` into a Mermaid **flowchart**, with sensible shapes and wiring for common DSL constructs, and can optionally export **SVG/PNG** via a lightweight HTTP helper.

mermaid/src/main/java/io/serverlessworkflow/mermaid/NodeBuilder.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,30 +67,35 @@ public static Node error() {
6767
}
6868

6969
public static TaskNode task(TaskItem task) {
70-
if (task.getTask().get() instanceof TryTask) {
70+
71+
// Sometimes task.getTask().get() is null
72+
73+
if (task.getTask().get() instanceof TryTask || task.getTask().getTryTask() != null) {
7174
return new TryCatchNode(task);
72-
} else if (task.getTask().get() instanceof DoTask) {
75+
} else if (task.getTask().get() instanceof DoTask || task.getTask().getDoTask() != null) {
7376
return new TaskSubgraphNode(task, String.format("do: %s", task.getName()))
7477
.withBranches(task.getTask().getDoTask().getDo());
75-
} else if (task.getTask().get() instanceof SetTask) {
78+
} else if (task.getTask().get() instanceof SetTask || task.getTask().getSetTask() != null) {
7679
return new TaskNode(String.format("set: %s", task.getName()), task, NodeType.RECT);
77-
} else if (task.getTask().get() instanceof ForTask) {
80+
} else if (task.getTask().get() instanceof ForTask || task.getTask().getForTask() != null) {
7881
return new ForNode(task);
79-
} else if (task.getTask().get() instanceof ListenTask) {
82+
} else if (task.getTask().get() instanceof ListenTask
83+
|| task.getTask().getListenTask() != null) {
8084
return new ListenNode(task);
81-
} else if (task.getTask().get() instanceof EmitTask) {
85+
} else if (task.getTask().get() instanceof EmitTask || task.getTask().getEmitTask() != null) {
8286
return new EmitNode(task);
83-
} else if (task.getTask().get() instanceof ForkTask) {
87+
} else if (task.getTask().get() instanceof ForkTask || task.getTask().getForkTask() != null) {
8488
return new ForkNode(task);
85-
} else if (task.getTask().get() instanceof SwitchTask) {
89+
} else if (task.getTask().get() instanceof SwitchTask
90+
|| task.getTask().getSwitchTask() != null) {
8691
return new SwitchNode(task);
87-
} else if (task.getTask().get() instanceof RaiseTask) {
92+
} else if (task.getTask().get() instanceof RaiseTask || task.getTask().getRaiseTask() != null) {
8893
return new RaiseNode(task);
89-
} else if (task.getTask().get() instanceof RunTask) {
94+
} else if (task.getTask().get() instanceof RunTask || task.getTask().getRunTask() != null) {
9095
return new RunNode(task);
91-
} else if (task.getTask().get() instanceof WaitTask) {
96+
} else if (task.getTask().get() instanceof WaitTask || task.getTask().getWaitTask() != null) {
9297
return new WaitNode(task);
93-
} else if (task.getTask().get() instanceof CallTask) {
98+
} else if (task.getTask().get() instanceof CallTask || task.getTask().getCallTask() != null) {
9499
return new CallNode(task);
95100
}
96101

0 commit comments

Comments
 (0)