diff --git a/README.md b/README.md index f255ea7a..f1c36923 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ Or have a look at the general catalog below: | Database Interaction Patterns | [](typescript/patterns-use-cases/README.md#database-interaction-patterns) | | Convert Sync Tasks to Async | [](typescript/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](go/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](python/patterns-use-cases/README.md#convert-sync-tasks-to-async) [](java/patterns-use-cases/README.md#convert-sync-tasks-to-async) | | Batching | [](typescript/patterns-use-cases/README.md#batching) | +| Coalesce workflows | [](typescript/patterns-use-cases/README.md#coalesce-workflows) | | Payments Signals \(Advanced\) | [](typescript/patterns-use-cases/README.md#payment-signals) [](python/patterns-use-cases/README.md#payment-signals) [](java/patterns-use-cases/README.md#payment-signals) | | Sagas | [](typescript/patterns-use-cases/README.md#sagas) [](go/patterns-use-cases/README.md#sagas) [](python/patterns-use-cases/README.md#sagas) [](java/patterns-use-cases/README.md#sagas) [](kotlin/patterns-use-cases/README.md#sagas) | | Stateful Actors and State Machines | [](typescript/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](go/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](python/patterns-use-cases/README.md#stateful-actors-and-state-machines) [](java/patterns-use-cases/README.md#stateful-actors-and-state-machines) | diff --git a/typescript/README.md b/typescript/README.md index b84d47aa..bd9e3d52 100644 --- a/typescript/README.md +++ b/typescript/README.md @@ -24,6 +24,7 @@ Common tasks and patterns implemented with Restate: - **[Database Interaction Patterns](patterns-use-cases/README.md#database-interaction-patterns)**: Recommended approaches for reading from and writing to databases using Restate handlers. [](patterns-use-cases/src/database/main.ts) - **[Convert Sync Tasks to Async](patterns-use-cases/README.md#convert-sync-tasks-to-async)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [](patterns-use-cases/src/syncasync/client.ts) - **[Batching](patterns-use-cases/README.md#batching)**: Group RPCs into batches of a particular size, subject to a max wait time [](patterns-use-cases/src/batching/batcher.ts) +- **[Coalesce workflows](patterns-use-cases/README.md#coalesce-workflows)**: Combine incoming workflow submissions into a single in-flight workflow at a time. [](patterns-use-cases/src/coalesce/coalesce.ts) - **[Payments Signals (Advanced)](patterns-use-cases/README.md#payment-signals)**: Combining fast synchronous responses and slow async callbacks for payments, with Stripe. [](patterns-use-cases/src/signalspayments/payment_service.ts) #### Orchestration patterns diff --git a/typescript/patterns-use-cases/README.md b/typescript/patterns-use-cases/README.md index 3c0eb263..0790da02 100644 --- a/typescript/patterns-use-cases/README.md +++ b/typescript/patterns-use-cases/README.md @@ -9,6 +9,7 @@ Common tasks and patterns implemented with Restate: - **[Database Interaction Patterns](README.md#database-interaction-patterns)**: Recommended approaches for reading from and writing to databases using Restate handlers. [](src/database/main.ts) - **[Convert Sync Tasks to Async](README.md#convert-sync-tasks-to-async)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [](src/syncasync/client.ts) - **[Batching](README.md#batching)**: Group RPCs into batches of a particular size, subject to a max wait time [](src/batching/batcher.ts) +- **[Coalesce workflows](README.md#coalesce-workflows)**: Combine incoming workflow submissions into a single in-flight workflow at a time. [](src/coalesce/coalesce.ts) - **[Payments signals (Advanced)](README.md#payment-signals)**: Combining fast synchronous responses and slow async callbacks for payments, with Stripe. [](src/signalspayments/payment_service.ts) #### Orchestration patterns @@ -233,6 +234,38 @@ Have a look at the service logs to see how your messages are grouped together in +## Coalesce Workflows +[](src/coalesce/coalesce.ts) + +This example shows how incoming requests can be deduplicated such that all concurrent clients see the same result. +It relies on the ability to attach to a Restate workflow execution and retrieve its result. However, the same +technique can be used for other types of Restate services if you provide an idempotency key when calling them, which +allows the resulting invocation ID to be attached to. + +A client submits work to the `submit` handler of the `coalesce` object, which then checks to see if there is a workflow +already in flight. If there is, it attaches to the existing run so the client will receive that result. Otherwise, a new +workflow run is kicked off and attached to. + +Under the hood, the `coalesce` object uses a separate service, `supervisor`, to send a callback when an invocation finishes. This is optional +but is a useful pattern. + +
+Running the example + +1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` +2. Start the service: `npx tsx watch ./src/coalesce/coalesce.ts` +3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` + +Do some work concurrently: +```shell +# the first group of requests will all see the same result +curl --parallel --parallel-immediate localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit +# once that is done, new requests will see a new result +curl --parallel --parallel-immediate localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit localhost:8080/coalesce/my-key/submit +``` + +
+ ## Payment Signals [](src/signalspayments/payment_service.ts) diff --git a/typescript/patterns-use-cases/src/coalesce/coalesce.ts b/typescript/patterns-use-cases/src/coalesce/coalesce.ts new file mode 100644 index 00000000..a699c9b2 --- /dev/null +++ b/typescript/patterns-use-cases/src/coalesce/coalesce.ts @@ -0,0 +1,140 @@ +import * as restate from "@restatedev/restate-sdk"; + +interface TargetWorkflowInput {} +interface TargetWorkflowOutput { + key: string; +} + +const targetWorkflow = restate.workflow({ + name: "targetWorkflow", + handlers: { + run: async ( + ctx: restate.WorkflowContext, + input: TargetWorkflowInput, + ): Promise => { + // simulate some work happening + await ctx.sleep(5_000); + + return { key: ctx.key }; + }, + }, +}); +export type TargetWorkflow = typeof targetWorkflow; +const TargetWorkflow: TargetWorkflow = { name: "targetWorkflow" }; + +interface Target { + service: string; + handler: string; + key?: string; +} + +// supervisor watches an invocation (must be from a workflow or with an idempotency key) for its completion and then calls back +const supervisor = restate.service({ + name: "supervisor", + handlers: { + watch: async ( + ctx: restate.Context, + input: { invocationId: restate.InvocationId; target: Target }, + ): Promise => { + const done = () => + ctx.genericSend({ + service: input.target.service, + method: input.target.handler, + key: input.target.key, + parameter: input.invocationId, + inputSerde: restate.serde.json, + }); + + try { + await ctx.attach(input.invocationId, restate.serde.binary); + } catch (e) { + if (e instanceof restate.TerminalError) { + done(); + } + throw e; + } + done(); + }, + }, +}); + +type Supervisor = typeof supervisor; +const Supervisor: Supervisor = { name: "supervisor" }; + +interface CoalesceState { + invocationId: restate.InvocationId; + count: number; +} + +const coalesce = restate.object({ + name: "coalesce", + handlers: { + submit: restate.handlers.object.shared( + async ( + ctx: restate.ObjectSharedContext, + input: TargetWorkflowInput, + ): Promise => { + let invocationId = await ctx.get("invocationId"); + if (invocationId == null) { + // there might be no in flight invocation, run the slow path to be sure + invocationId = await ctx + .objectClient(Coalesce, ctx.key) + .submitLocked(input); + } + + return ctx.attach(invocationId); + }, + ), + submitLocked: async ( + ctx: restate.ObjectContext, + input: TargetWorkflowInput, + ): Promise => { + let invocationId = await ctx.get("invocationId"); + const count = (await ctx.get("count")) ?? 0; + + if (invocationId !== null) { + // we lost the race + return invocationId; + } + + // there is no in flight invocation, create one + const workflowID = `${ctx.key}-${count}`; + const handle = ctx + .workflowSendClient(TargetWorkflow, workflowID) + .run(input); + invocationId = await handle.invocationId; + + ctx.serviceSendClient(Supervisor).watch({ + invocationId, + target: { + service: "coalesce", + handler: "onCompletion", + key: ctx.key, + }, + }); + + ctx.set("invocationId", invocationId); + ctx.set("count", count + 1); + + return invocationId; + }, + onCompletion: async ( + ctx: restate.ObjectContext, + invocationId: restate.InvocationId, + ) => { + if ((await ctx.get("invocationId")) == invocationId) { + ctx.clear("invocationId"); + } + }, + }, +}); + +type Coalesce = typeof coalesce; +const Coalesce: Coalesce = { name: "coalesce" }; + +restate + .endpoint() + .bind(targetWorkflow) + .bind(supervisor) + .bind(coalesce) + .listen();