Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine: Limit payload sizes of events #890

Merged
merged 17 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @openfn/integration-tests-worker

## 1.0.79

### Patch Changes

- Updated dependencies [deb7293]
- Updated dependencies [d50c05d]
- @openfn/[email protected]
- @openfn/[email protected]
- @openfn/[email protected]

## 1.0.78

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.78",
"version": "1.0.79",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
39 changes: 39 additions & 0 deletions integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,45 @@ test.serial('Redact logs which exceed the payload limit', (t) => {
});
});

test.serial("Don't return dataclips which exceed the payload limit", (t) => {
return new Promise(async (done) => {
if (!worker.destroyed) {
await worker.destroy();
}

({ worker } = await initWorker(lightningPort, {
maxWorkers: 1,
// use the dummy repo to remove autoinstall
repoDir: path.resolve('./dummy-repo'),
}));

const run = {
id: crypto.randomUUID(),
jobs: [
{
adaptor: '@openfn/[email protected]',
body: `fn(() => ({ data: 'abdef' }))`,
},
],
options: {
payload_limit_mb: 0,
},
};

lightning.on('step:complete', (evt) => {
t.is(evt.payload.output_dataclip_error, 'DATACLIP_TOO_LARGE');
t.falsy(evt.payload.output_dataclip_id);
t.falsy(evt.payload.output_dataclip);
});

lightning.enqueueRun(run);

lightning.once('run:complete', () => {
done();
});
});
});

test.serial(
"Don't send job logs to stdout when job_log_level is set to none",
(t) => {
Expand Down
10 changes: 10 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# engine-multi

## 1.6.0

### Minor Changes

- d50c05d: Fix an issue where large payloads can cause the worker to OOM crash

### Patch Changes

- deb7293: Don't return the result of a task unless explicitly requested

## 1.5.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.5.1",
"version": "1.6.0",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export default function initWorkers(
const callWorker: CallWorker = (
task,
args = [],
events = [],
events = {},
options = {}
) => {
return workers.exec(task, args, {
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const execute = async (context: ExecutionContext) => {

const workerOptions = {
memoryLimitMb: options.memoryLimitMb,
payloadLimitMb: options.payloadLimitMb,
timeout: options.runTimeoutMs,
};

Expand Down
3 changes: 2 additions & 1 deletion packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ export const jobComplete = (
context: ExecutionContext,
event: internalEvents.JobCompleteEvent
) => {
const { threadId, state, duration, jobId, next, mem } = event;
const { threadId, state, duration, jobId, next, mem, redacted } = event;

context.emit(externalEvents.JOB_COMPLETE, {
threadId,
state,
duration,
jobId,
next,
redacted,
mem,
time: timestamp(),
});
Expand Down
6 changes: 6 additions & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const DEFAULT_RUN_TIMEOUT = 1000 * 60 * 10; // ms

const DEFAULT_MEMORY_LIMIT_MB = 500;

const DEFAULT_PAYLOAD_LIMIT_MB = 10;

// For each workflow, create an API object with its own event emitter
// this is a bit weird - what if the emitter went on state instead?
const createWorkflowEvents = (
Expand Down Expand Up @@ -72,6 +74,7 @@ export type EngineOptions = {
logger: Logger;
maxWorkers?: number;
memoryLimitMb?: number;
payloadLimitMb?: number;
noCompile?: boolean; // TODO deprecate in favour of compile
repoDir: string;
resolvers?: LazyResolvers;
Expand Down Expand Up @@ -100,6 +103,8 @@ const createEngine = async (

const defaultTimeout = options.runTimeoutMs || DEFAULT_RUN_TIMEOUT;
const defaultMemoryLimit = options.memoryLimitMb || DEFAULT_MEMORY_LIMIT_MB;
const defaultPayloadLimit =
options.payloadLimitMb || DEFAULT_PAYLOAD_LIMIT_MB;

let resolvedWorkerPath;
if (workerPath) {
Expand Down Expand Up @@ -173,6 +178,7 @@ const createEngine = async (
resolvers: opts.resolvers,
runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout,
memoryLimitMb: opts.memoryLimitMb ?? defaultMemoryLimit,
payloadLimitMb: opts.payloadLimitMb ?? defaultPayloadLimit,
jobLogLevel: opts.jobLogLevel,
},
});
Expand Down
5 changes: 4 additions & 1 deletion packages/engine-multi/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export interface JobCompletePayload extends ExternalEvent {
state: any; // the result state
next: string[]; // downstream jobs
time: bigint;
redacted?: boolean;
mem: {
job: number;
system: number;
Expand All @@ -92,7 +93,9 @@ export interface JobErrorPayload extends ExternalEvent {
next: string[]; // downstream jobs
}

export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {}
export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {
redacted?: boolean;
}

export interface EdgeResolvedPayload extends ExternalEvent {
edgeId: string; // interesting, we don't really have this yet. Is index more appropriate? key? yeah, it's target node basically
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = {
};

export type ExecuteOptions = {
payloadLimitMb?: number;
memoryLimitMb?: number;
resolvers?: LazyResolvers;
runTimeoutMs?: number;
Expand Down
50 changes: 50 additions & 0 deletions packages/engine-multi/src/util/ensure-payload-size.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
export const REDACTED_STATE = {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part 1: this file has been moved from the worker to the engine, and lightly modified.

Note that it selectively validates payload objects so that it can generate an appropriate "fix"

data: '[REDACTED_STATE]',
_$REDACTED$_: true,
};

export const REDACTED_LOG = {
message: ['[REDACTED: Message length exceeds payload limit]'],
_$REDACTED$_: true,
};

export const verify = (value: any, limit_mb: number = 10) => {
if (value && !isNaN(limit_mb)) {
let size_mb = 0;
try {
const str = typeof value === 'string' ? value : JSON.stringify(value);
const size_bytes = Buffer.byteLength(str, 'utf8');
size_mb = size_bytes / 1024 / 1024;
} catch (e) {
// do nothing
}

if (size_mb > limit_mb) {
const e = new Error();
// @ts-ignore
e.name = 'PAYLOAD_TOO_LARGE';
e.message = `The payload exceeded the size limit of ${limit_mb}mb`;
throw e;
}
}
};

export default (payload: any, limit_mb: number = 10) => {
const newPayload = { ...payload };

// The payload could be any of the runtime events
// The bits we might want to redact are state and message
try {
verify(payload.state, limit_mb);
} catch (e) {
newPayload.state = REDACTED_STATE;
newPayload.redacted = true;
}
try {
verify(payload.log, limit_mb);
} catch (e) {
Object.assign(newPayload.log, REDACTED_LOG);
newPayload.redacted = true;
}
return newPayload;
};
4 changes: 3 additions & 1 deletion packages/engine-multi/src/worker/child/create-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { ENGINE_RUN_TASK } from '../events';

const scriptPath = process.argv[2];

type ThreadOptions = {
export type ThreadOptions = {
memoryLimitMb?: number;
payloadLimitMb?: number;
};

const createThread = (
Expand All @@ -24,6 +25,7 @@ const createThread = (
type: ENGINE_RUN_TASK,
task,
args,
options,
});

return worker;
Expand Down
8 changes: 6 additions & 2 deletions packages/engine-multi/src/worker/child/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
ENGINE_RESOLVE_TASK,
ENGINE_RUN_TASK,
} from '../events';
import createThread from './create-thread';
import createThread, { ThreadOptions } from './create-thread';
import serializeError from '../../util/serialize-error';

process.on('message', async (evt: WorkerEvent) => {
Expand All @@ -17,7 +17,11 @@ process.on('message', async (evt: WorkerEvent) => {
}
});

const run = async (task: string, args: any[] = [], options = {}) => {
const run = async (
task: string,
args: any[] = [],
options: ThreadOptions = {}
) => {
const thread = createThread(task, args, options);

thread.on('error', (e) => {
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/worker/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export interface JobStartEvent extends InternalEvent {
export interface JobCompleteEvent extends InternalEvent {
jobId: string;
state: any;
redacted?: boolean;
duration: number;
next: string[];
mem: {
Expand Down
4 changes: 4 additions & 0 deletions packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export type ExecOpts = {
timeout?: number; // ms

memoryLimitMb?: number;
payloadLimitMb?: number;
};

export type ChildProcessPool = Array<ChildProcess | false>;
Expand Down Expand Up @@ -210,6 +211,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
args,
options: {
memoryLimitMb: opts.memoryLimitMb,
payloadLimitMb: opts.payloadLimitMb,
},
} as RunTaskEvent);
} catch (e) {
Expand All @@ -220,6 +222,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
worker.on('exit', onExit);

worker.on('message', (evt: any) => {
// TODO I think here we may have to decode the payload

// forward the message out of the pool
opts.on?.(evt);

Expand Down
21 changes: 18 additions & 3 deletions packages/engine-multi/src/worker/thread/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,28 @@ export const createLoggers = (
return { logger, jobLogger, adaptorLogger };
};

type Options = {
/**
* Should we return results directly?
* Useful for tests but dangerous in production
* as can cause OOM errors for large results
* */
directReturn?: boolean;

/**
* Allow a custom publish function to be passed in
*/
publish?: typeof publish;
};

// Execute wrapper function
export const execute = async (
workflowId: string,
executeFn: () => Promise<any> | undefined,
publishFn = publish
options: Options = {}
) => {
const publishFn = options.publish ?? publish;

const handleError = (err: any) => {
publishFn(workerEvents.ERROR, {
// @ts-ignore
Expand Down Expand Up @@ -127,8 +143,7 @@ export const execute = async (
const result = await executeFn();
publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result });

// For tests
return result;
return options.directReturn ? result : {};
} catch (err: any) {
handleError(err);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/src/worker/thread/mock-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ function mockRun(plan: MockExecutionPlan, input: State, _options = {}) {

register({
run: async (plan: MockExecutionPlan, input: State, _options?: any) =>
execute(plan.id, () => mockRun(plan, input)),
execute(plan.id, () => mockRun(plan, input), { directReturn: true }),
});
2 changes: 1 addition & 1 deletion packages/engine-multi/src/worker/thread/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ register({
console = adaptorLogger;

// Leave console.debug for local debugging
// This goes to stdout but not the adpator logger
// This goes to stdout but not the adapator logger
console.debug = debug;

// TODO I would like to pull these options out of here
Expand Down
Loading