Skip to content

Add coalesce example #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Or have a look at the general catalog below:
| <a id="database-interaction">Database Interaction Patterns</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#database-interaction-patterns) |
| <a id="sync-to-async">Convert Sync Tasks to Async</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [<img src="https://skillicons.dev/icons?i=go" width="24" height="24">](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [<img src="https://skillicons.dev/icons?i=python&theme=light" width="24" height="24">](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [<img src="https://skillicons.dev/icons?i=java&theme=light" width="24" height="24">](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) |
| <a id="batching">Batching</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#batching) |
| <a id="coalesce-workflows">Coalesce workflows</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#coalesce-workflows) |
| <a id="payment-signals">Payments Signals \(Advanced\)</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#payment-signals) [<img src="https://skillicons.dev/icons?i=python&theme=light" width="24" height="24">](python/patterns-use-cases/README.md#payment-signals) [<img src="https://skillicons.dev/icons?i=java&theme=light" width="24" height="24">](java/patterns-use-cases/README.md#payment-signals) |
| <a id="sagas">Sagas</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#sagas) [<img src="https://skillicons.dev/icons?i=go" width="24" height="24">](go/patterns-use-cases/README.md#sagas) [<img src="https://skillicons.dev/icons?i=python&theme=light" width="24" height="24">](python/patterns-use-cases/README.md#sagas) [<img src="https://skillicons.dev/icons?i=java&theme=light" width="24" height="24">](java/patterns-use-cases/README.md#sagas) [<img src="https://skillicons.dev/icons?i=kotlin&theme=light" width="24" height="24">](kotlin/patterns-use-cases/README.md#sagas) |
| <a id="stateful-actors">Stateful Actors and State Machines</a> | [<img src="https://skillicons.dev/icons?i=ts" width="24" height="24">](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [<img src="https://skillicons.dev/icons?i=go" width="24" height="24">](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [<img src="https://skillicons.dev/icons?i=python&theme=light" width="24" height="24">](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [<img src="https://skillicons.dev/icons?i=java&theme=light" width="24" height="24">](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) |
Expand Down
1 change: 1 addition & 0 deletions typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Common tasks and patterns implemented with Restate:
- **[Database Interaction Patterns](patterns-use-cases/README.md#database-interaction-patterns)**: Recommended approaches for reading from and writing to databases using Restate handlers. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/database/main.ts)
- **[Convert Sync Tasks to Async](patterns-use-cases/README.md#convert-sync-tasks-to-async)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/syncasync/client.ts)
- **[Batching](patterns-use-cases/README.md#batching)**: Group RPCs into batches of a particular size, subject to a max wait time [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/batching/batcher.ts)
- **[Coalesce workflows](patterns-use-cases/README.md#coalesce-workflows)**: Combine incoming workflow submissions into a single in-flight workflow at a time. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/coalesce/coalesce.ts)
- **[Payments Signals (Advanced)](patterns-use-cases/README.md#payment-signals)**: Combining fast synchronous responses and slow async callbacks for payments, with Stripe. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](patterns-use-cases/src/signalspayments/payment_service.ts)

#### Orchestration patterns
Expand Down
33 changes: 33 additions & 0 deletions typescript/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Common tasks and patterns implemented with Restate:
- **[Database Interaction Patterns](README.md#database-interaction-patterns)**: Recommended approaches for reading from and writing to databases using Restate handlers. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/database/main.ts)
- **[Convert Sync Tasks to Async](README.md#convert-sync-tasks-to-async)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/syncasync/client.ts)
- **[Batching](README.md#batching)**: Group RPCs into batches of a particular size, subject to a max wait time [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/batching/batcher.ts)
- **[Coalesce workflows](README.md#coalesce-workflows)**: Combine incoming workflow submissions into a single in-flight workflow at a time. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/coalesce/coalesce.ts)
- **[Payments signals (Advanced)](README.md#payment-signals)**: Combining fast synchronous responses and slow async callbacks for payments, with Stripe. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/signalspayments/payment_service.ts)

#### Orchestration patterns
Expand Down Expand Up @@ -233,6 +234,38 @@ Have a look at the service logs to see how your messages are grouped together in

</details>

## Coalesce Workflows
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/coalesce/coalesce.ts)

This example shows how incoming requests can be deduplicated such that all concurrent clients see the same result.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be improved to explain how the behavior here is different from just using an idempotency key?

It relies on the ability to attach to a Restate workflow execution and retrieve its result. However, the same
technique can be used for other types of Restate services if you provide an idempotency key when calling them, which
allows the resulting invocation ID to be attached to.

A client submits work to the `submit` handler of the `coalesce` object, which then checks to see if there is a workflow
already in flight. If there is, it attaches to the existing run so the client will receive that result. Otherwise, a new
workflow run is kicked off and attached to.

Under the hood, the `coalesce` object uses a separate service, `supervisor`, to send a callback when an invocation finishes. This is optional
but is a useful pattern.

<details>
<summary><strong>Running the example</strong></summary>

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `npx tsx watch ./src/coalesce/coalesce.ts`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Do some work concurrently:
```shell
# the first group of requests will all see the same result
curl --parallel --parallel-immediate localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit
# once that is done, new requests will see a new result
curl --parallel --parallel-immediate localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit
```

</details>

## Payment Signals
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](src/signalspayments/payment_service.ts)

Expand Down
140 changes: 140 additions & 0 deletions typescript/patterns-use-cases/src/coalesce/coalesce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import * as restate from "@restatedev/restate-sdk";

interface TargetWorkflowInput {}
interface TargetWorkflowOutput {
key: string;
}

const targetWorkflow = restate.workflow({
name: "targetWorkflow",
handlers: {
run: async (
ctx: restate.WorkflowContext,
input: TargetWorkflowInput,
): Promise<TargetWorkflowOutput> => {
// simulate some work happening
await ctx.sleep(5_000);

return { key: ctx.key };
},
},
});
export type TargetWorkflow = typeof targetWorkflow;
const TargetWorkflow: TargetWorkflow = { name: "targetWorkflow" };

interface Target {
service: string;
handler: string;
key?: string;
}

// supervisor watches an invocation (must be from a workflow or with an idempotency key) for its completion and then calls back
const supervisor = restate.service({
name: "supervisor",
handlers: {
watch: async (
ctx: restate.Context,
input: { invocationId: restate.InvocationId; target: Target },
): Promise<void> => {
const done = () =>
ctx.genericSend({
service: input.target.service,
method: input.target.handler,
key: input.target.key,
parameter: input.invocationId,
inputSerde: restate.serde.json,
});

try {
await ctx.attach(input.invocationId, restate.serde.binary);
} catch (e) {
if (e instanceof restate.TerminalError) {
done();
}
throw e;
}
done();
},
},
});

type Supervisor = typeof supervisor;
const Supervisor: Supervisor = { name: "supervisor" };

interface CoalesceState {
invocationId: restate.InvocationId;
count: number;
}

const coalesce = restate.object({
name: "coalesce",
handlers: {
submit: restate.handlers.object.shared(
async (
ctx: restate.ObjectSharedContext<CoalesceState>,
input: TargetWorkflowInput,
): Promise<TargetWorkflowOutput> => {
let invocationId = await ctx.get("invocationId");
if (invocationId == null) {
// there might be no in flight invocation, run the slow path to be sure
invocationId = await ctx
.objectClient(Coalesce, ctx.key)
.submitLocked(input);
}

return ctx.attach(invocationId);
},
),
submitLocked: async (
ctx: restate.ObjectContext<CoalesceState>,
input: TargetWorkflowInput,
): Promise<restate.InvocationId> => {
let invocationId = await ctx.get("invocationId");
const count = (await ctx.get("count")) ?? 0;

if (invocationId !== null) {
// we lost the race
return invocationId;
}

// there is no in flight invocation, create one
const workflowID = `${ctx.key}-${count}`;
const handle = ctx
.workflowSendClient(TargetWorkflow, workflowID)
.run(input);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for putting together this example. I guess here we could also do genericSend and pass the target workflow in as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

certainly yes!

invocationId = await handle.invocationId;

ctx.serviceSendClient(Supervisor).watch({
invocationId,
target: {
service: "coalesce",
handler: "onCompletion",
key: ctx.key,
},
});

ctx.set("invocationId", invocationId);
ctx.set("count", count + 1);

return invocationId;
},
onCompletion: async (
ctx: restate.ObjectContext<CoalesceState>,
invocationId: restate.InvocationId,
) => {
if ((await ctx.get("invocationId")) == invocationId) {
ctx.clear("invocationId");
}
},
},
});

type Coalesce = typeof coalesce;
const Coalesce: Coalesce = { name: "coalesce" };

restate
.endpoint()
.bind(targetWorkflow)
.bind(supervisor)
.bind(coalesce)
.listen();
Loading