diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 105963bf8..d13d9561a 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,15 @@ # @openfn/integration-tests-worker +## 1.0.79 + +### Patch Changes + +- Updated dependencies [deb7293] +- Updated dependencies [d50c05d] + - @openfn/engine-multi@1.6.0 + - @openfn/ws-worker@1.12.0 + - @openfn/lightning-mock@2.1.2 + ## 1.0.78 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 53d7b548c..e5ecb7a2d 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.78", + "version": "1.0.79", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index b0e37f682..f37e49bb6 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -974,6 +974,45 @@ test.serial('Redact logs which exceed the payload limit', (t) => { }); }); +test.serial("Don't return dataclips which exceed the payload limit", (t) => { + return new Promise(async (done) => { + if (!worker.destroyed) { + await worker.destroy(); + } + + ({ worker } = await initWorker(lightningPort, { + maxWorkers: 1, + // use the dummy repo to remove autoinstall + repoDir: path.resolve('./dummy-repo'), + })); + + const run = { + id: crypto.randomUUID(), + jobs: [ + { + adaptor: '@openfn/test-adaptor@1.0.0', + body: `fn(() => ({ data: 'abdef' }))`, + }, + ], + options: { + payload_limit_mb: 0, + }, + }; + + lightning.on('step:complete', (evt) => { + t.is(evt.payload.output_dataclip_error, 'DATACLIP_TOO_LARGE'); + t.falsy(evt.payload.output_dataclip_id); + t.falsy(evt.payload.output_dataclip); + }); + + lightning.enqueueRun(run); + + lightning.once('run:complete', () => { + done(); + }); + }); +}); + test.serial( "Don't send job logs to stdout when job_log_level is set to none", (t) => { diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index ce1d181b6..e1cfa2b30 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,15 @@ # engine-multi +## 1.6.0 + +### Minor Changes + +- d50c05d: Fix an issue where large payloads can cause the worker to OOM crash + +### Patch Changes + +- deb7293: Don't return the result of a task unless explicitly requested + ## 1.5.1 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index 8ea19bdb1..2ba2dfcfe 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.5.1", + "version": "1.6.0", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index 4215091c9..18fddecdb 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -37,7 +37,7 @@ export default function initWorkers( const callWorker: CallWorker = ( task, args = [], - events = [], + events = {}, options = {} ) => { return workers.exec(task, args, { diff --git a/packages/engine-multi/src/api/execute.ts b/packages/engine-multi/src/api/execute.ts index dff4928a3..c54447742 100644 --- a/packages/engine-multi/src/api/execute.ts +++ b/packages/engine-multi/src/api/execute.ts @@ -55,6 +55,7 @@ const execute = async (context: ExecutionContext) => { const workerOptions = { memoryLimitMb: options.memoryLimitMb, + payloadLimitMb: options.payloadLimitMb, timeout: options.runTimeoutMs, }; diff --git a/packages/engine-multi/src/api/lifecycle.ts b/packages/engine-multi/src/api/lifecycle.ts index 715bba775..388a99c5e 100644 --- a/packages/engine-multi/src/api/lifecycle.ts +++ b/packages/engine-multi/src/api/lifecycle.ts @@ -93,7 +93,7 @@ export const jobComplete = ( context: ExecutionContext, event: internalEvents.JobCompleteEvent ) => { - const { threadId, state, duration, jobId, next, mem } = event; + const { threadId, state, duration, jobId, next, mem, redacted } = event; context.emit(externalEvents.JOB_COMPLETE, { threadId, @@ -101,6 +101,7 @@ export const jobComplete = ( duration, jobId, next, + redacted, mem, time: timestamp(), }); diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index bbb595166..05eae4bc7 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -31,6 +31,8 @@ const DEFAULT_RUN_TIMEOUT = 1000 * 60 * 10; // ms const DEFAULT_MEMORY_LIMIT_MB = 500; +const DEFAULT_PAYLOAD_LIMIT_MB = 10; + // For each workflow, create an API object with its own event emitter // this is a bit weird - what if the emitter went on state instead? const createWorkflowEvents = ( @@ -72,6 +74,7 @@ export type EngineOptions = { logger: Logger; maxWorkers?: number; memoryLimitMb?: number; + payloadLimitMb?: number; noCompile?: boolean; // TODO deprecate in favour of compile repoDir: string; resolvers?: LazyResolvers; @@ -100,6 +103,8 @@ const createEngine = async ( const defaultTimeout = options.runTimeoutMs || DEFAULT_RUN_TIMEOUT; const defaultMemoryLimit = options.memoryLimitMb || DEFAULT_MEMORY_LIMIT_MB; + const defaultPayloadLimit = + options.payloadLimitMb || DEFAULT_PAYLOAD_LIMIT_MB; let resolvedWorkerPath; if (workerPath) { @@ -173,6 +178,7 @@ const createEngine = async ( resolvers: opts.resolvers, runTimeoutMs: opts.runTimeoutMs ?? defaultTimeout, memoryLimitMb: opts.memoryLimitMb ?? defaultMemoryLimit, + payloadLimitMb: opts.payloadLimitMb ?? defaultPayloadLimit, jobLogLevel: opts.jobLogLevel, }, }); diff --git a/packages/engine-multi/src/events.ts b/packages/engine-multi/src/events.ts index 24bbac874..f24301a07 100644 --- a/packages/engine-multi/src/events.ts +++ b/packages/engine-multi/src/events.ts @@ -77,6 +77,7 @@ export interface JobCompletePayload extends ExternalEvent { state: any; // the result state next: string[]; // downstream jobs time: bigint; + redacted?: boolean; mem: { job: number; system: number; @@ -92,7 +93,9 @@ export interface JobErrorPayload extends ExternalEvent { next: string[]; // downstream jobs } -export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {} +export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent { + redacted?: boolean; +} export interface EdgeResolvedPayload extends ExternalEvent { edgeId: string; // interesting, we don't really have this yet. Is index more appropriate? key? yeah, it's target node basically diff --git a/packages/engine-multi/src/types.ts b/packages/engine-multi/src/types.ts index f1fb40e43..92044b052 100644 --- a/packages/engine-multi/src/types.ts +++ b/packages/engine-multi/src/types.ts @@ -45,6 +45,7 @@ export type ExecutionContextConstructor = { }; export type ExecuteOptions = { + payloadLimitMb?: number; memoryLimitMb?: number; resolvers?: LazyResolvers; runTimeoutMs?: number; diff --git a/packages/engine-multi/src/util/ensure-payload-size.ts b/packages/engine-multi/src/util/ensure-payload-size.ts new file mode 100644 index 000000000..5234a099a --- /dev/null +++ b/packages/engine-multi/src/util/ensure-payload-size.ts @@ -0,0 +1,50 @@ +export const REDACTED_STATE = { + data: '[REDACTED_STATE]', + _$REDACTED$_: true, +}; + +export const REDACTED_LOG = { + message: ['[REDACTED: Message length exceeds payload limit]'], + _$REDACTED$_: true, +}; + +export const verify = (value: any, limit_mb: number = 10) => { + if (value && !isNaN(limit_mb)) { + let size_mb = 0; + try { + const str = typeof value === 'string' ? value : JSON.stringify(value); + const size_bytes = Buffer.byteLength(str, 'utf8'); + size_mb = size_bytes / 1024 / 1024; + } catch (e) { + // do nothing + } + + if (size_mb > limit_mb) { + const e = new Error(); + // @ts-ignore + e.name = 'PAYLOAD_TOO_LARGE'; + e.message = `The payload exceeded the size limit of ${limit_mb}mb`; + throw e; + } + } +}; + +export default (payload: any, limit_mb: number = 10) => { + const newPayload = { ...payload }; + + // The payload could be any of the runtime events + // The bits we might want to redact are state and message + try { + verify(payload.state, limit_mb); + } catch (e) { + newPayload.state = REDACTED_STATE; + newPayload.redacted = true; + } + try { + verify(payload.log, limit_mb); + } catch (e) { + Object.assign(newPayload.log, REDACTED_LOG); + newPayload.redacted = true; + } + return newPayload; +}; diff --git a/packages/engine-multi/src/worker/child/create-thread.ts b/packages/engine-multi/src/worker/child/create-thread.ts index 29dd8be4d..bca375f2f 100644 --- a/packages/engine-multi/src/worker/child/create-thread.ts +++ b/packages/engine-multi/src/worker/child/create-thread.ts @@ -5,8 +5,9 @@ import { ENGINE_RUN_TASK } from '../events'; const scriptPath = process.argv[2]; -type ThreadOptions = { +export type ThreadOptions = { memoryLimitMb?: number; + payloadLimitMb?: number; }; const createThread = ( @@ -24,6 +25,7 @@ const createThread = ( type: ENGINE_RUN_TASK, task, args, + options, }); return worker; diff --git a/packages/engine-multi/src/worker/child/runner.ts b/packages/engine-multi/src/worker/child/runner.ts index 1a9f6ba25..a33323773 100644 --- a/packages/engine-multi/src/worker/child/runner.ts +++ b/packages/engine-multi/src/worker/child/runner.ts @@ -7,7 +7,7 @@ import { ENGINE_RESOLVE_TASK, ENGINE_RUN_TASK, } from '../events'; -import createThread from './create-thread'; +import createThread, { ThreadOptions } from './create-thread'; import serializeError from '../../util/serialize-error'; process.on('message', async (evt: WorkerEvent) => { @@ -17,7 +17,11 @@ process.on('message', async (evt: WorkerEvent) => { } }); -const run = async (task: string, args: any[] = [], options = {}) => { +const run = async ( + task: string, + args: any[] = [], + options: ThreadOptions = {} +) => { const thread = createThread(task, args, options); thread.on('error', (e) => { diff --git a/packages/engine-multi/src/worker/events.ts b/packages/engine-multi/src/worker/events.ts index eabd8876a..c2cfc13da 100644 --- a/packages/engine-multi/src/worker/events.ts +++ b/packages/engine-multi/src/worker/events.ts @@ -49,6 +49,7 @@ export interface JobStartEvent extends InternalEvent { export interface JobCompleteEvent extends InternalEvent { jobId: string; state: any; + redacted?: boolean; duration: number; next: string[]; mem: { diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 3121fa9f3..2d282dd4b 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -34,6 +34,7 @@ export type ExecOpts = { timeout?: number; // ms memoryLimitMb?: number; + payloadLimitMb?: number; }; export type ChildProcessPool = Array; @@ -210,6 +211,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { args, options: { memoryLimitMb: opts.memoryLimitMb, + payloadLimitMb: opts.payloadLimitMb, }, } as RunTaskEvent); } catch (e) { @@ -220,6 +222,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { worker.on('exit', onExit); worker.on('message', (evt: any) => { + // TODO I think here we may have to decode the payload + // forward the message out of the pool opts.on?.(evt); diff --git a/packages/engine-multi/src/worker/thread/helpers.ts b/packages/engine-multi/src/worker/thread/helpers.ts index f18ec8af3..fbe980560 100644 --- a/packages/engine-multi/src/worker/thread/helpers.ts +++ b/packages/engine-multi/src/worker/thread/helpers.ts @@ -63,12 +63,28 @@ export const createLoggers = ( return { logger, jobLogger, adaptorLogger }; }; +type Options = { + /** + * Should we return results directly? + * Useful for tests but dangerous in production + * as can cause OOM errors for large results + * */ + directReturn?: boolean; + + /** + * Allow a custom publish function to be passed in + */ + publish?: typeof publish; +}; + // Execute wrapper function export const execute = async ( workflowId: string, executeFn: () => Promise | undefined, - publishFn = publish + options: Options = {} ) => { + const publishFn = options.publish ?? publish; + const handleError = (err: any) => { publishFn(workerEvents.ERROR, { // @ts-ignore @@ -127,8 +143,7 @@ export const execute = async ( const result = await executeFn(); publishFn(workerEvents.WORKFLOW_COMPLETE, { workflowId, state: result }); - // For tests - return result; + return options.directReturn ? result : {}; } catch (err: any) { handleError(err); } diff --git a/packages/engine-multi/src/worker/thread/mock-run.ts b/packages/engine-multi/src/worker/thread/mock-run.ts index 194ee5478..52b4bf2ba 100644 --- a/packages/engine-multi/src/worker/thread/mock-run.ts +++ b/packages/engine-multi/src/worker/thread/mock-run.ts @@ -87,5 +87,5 @@ function mockRun(plan: MockExecutionPlan, input: State, _options = {}) { register({ run: async (plan: MockExecutionPlan, input: State, _options?: any) => - execute(plan.id, () => mockRun(plan, input)), + execute(plan.id, () => mockRun(plan, input), { directReturn: true }), }); diff --git a/packages/engine-multi/src/worker/thread/run.ts b/packages/engine-multi/src/worker/thread/run.ts index 4ce11eef0..bb63910e9 100644 --- a/packages/engine-multi/src/worker/thread/run.ts +++ b/packages/engine-multi/src/worker/thread/run.ts @@ -43,7 +43,7 @@ register({ console = adaptorLogger; // Leave console.debug for local debugging - // This goes to stdout but not the adpator logger + // This goes to stdout but not the adapator logger console.debug = debug; // TODO I would like to pull these options out of here diff --git a/packages/engine-multi/src/worker/thread/runtime.ts b/packages/engine-multi/src/worker/thread/runtime.ts index 4f772923a..4b2ac6fac 100644 --- a/packages/engine-multi/src/worker/thread/runtime.ts +++ b/packages/engine-multi/src/worker/thread/runtime.ts @@ -5,6 +5,12 @@ import { ENGINE_RESOLVE_TASK, ENGINE_RUN_TASK, } from '../events'; +import ensurePayloadSize from '../../util/ensure-payload-size'; + +// This constrains the size of any payloads coming out of the worker thread +// Payloads exceeding this will be redacted +// (in practice I think this is specifically the state and message keys) +let payloadLimitMb: number | undefined; type TaskRegistry = Record Promise>; @@ -30,19 +36,28 @@ type Event = { [key: string]: any; }; +type Options = { + memoryLimitMb?: number; + payloadLimitMb?: number; +}; + export const publish = ( type: string, payload: Omit ) => { + // Validate the size of every outgoing message + // Redact any payloads that are too large + const safePayload = ensurePayloadSize(payload, payloadLimitMb); parentPort!.postMessage({ type, threadId, processId, - ...payload, + ...safePayload, }); }; -const run = (task: string, args: any[]) => { +const run = (task: string, args: any[], options: Options = {}) => { + payloadLimitMb = options.payloadLimitMb; tasks[task](...args) .then((result) => { publish(ENGINE_RESOLVE_TASK, { @@ -57,12 +72,15 @@ const run = (task: string, args: any[]) => { type: e.type || e.name, }, }); + }) + .finally(() => { + payloadLimitMb = undefined; }); }; parentPort!.on('message', async (evt) => { if (evt.type === ENGINE_RUN_TASK) { const args = evt.args || []; - run(evt.task, args); + run(evt.task, args, evt.options); } }); diff --git a/packages/engine-multi/test/integration.test.ts b/packages/engine-multi/test/integration.test.ts index a148942d2..74bf1ede7 100644 --- a/packages/engine-multi/test/integration.test.ts +++ b/packages/engine-multi/test/integration.test.ts @@ -5,8 +5,9 @@ import type { ExecutionPlan } from '@openfn/lexicon'; import createAPI from '../src/api'; import type { RuntimeEngine } from '../src'; +import { REDACTED_STATE, REDACTED_LOG } from '../src/util/ensure-payload-size'; -const logger = createMockLogger(); +const logger = createMockLogger(undefined, { level: 'debug' }); let api: RuntimeEngine; const emptyState = {}; @@ -262,6 +263,7 @@ test.serial('run without error if no state is returned', (t) => { api.execute(plan, emptyState).on('workflow-complete', ({ state }) => { t.falsy(state); + t.log(logger._history); // Ensure there are no error logs const err = logger._find('error', /./); @@ -480,3 +482,67 @@ test.serial('accept initial state', (t) => { test.todo('should report an error'); test.todo('various workflow options (start, initial state)'); + +test.serial('redact final state if it exceeds the payload limit', (t) => { + return new Promise(async (done) => { + api = await createAPI({ + logger, + }); + + const expression = ` +export default [(state) => { + state.data = new Array(1024 * 1024).fill('a') + return state; +}]`; + + const plan = createPlan([ + { + expression, + }, + ]); + const options = { + payloadLimitMb: 0.5, + }; + + api + .execute(plan, emptyState, options) + .on('workflow-complete', ({ state }) => { + t.log(state); + t.deepEqual(REDACTED_STATE, state); + done(); + }); + }); +}); + +test.serial('redact log line state if it exceeds the payload limit', (t) => { + return new Promise(async (done) => { + api = await createAPI({ + logger, + }); + + const expression = ` +export default [(state) => { + console.log(new Array(1024 * 1024).fill('a')); + return state; +}]`; + + const plan = createPlan([ + { + expression, + }, + ]); + const options = { + payloadLimitMb: 0.5, + }; + + api.execute(plan, emptyState, options).on('workflow-log', (evt) => { + if (evt.name === 'JOB') { + t.deepEqual(evt.message, REDACTED_LOG.message); + } + }); + + api.execute(plan, emptyState, options).on('workflow-complete', () => { + done(); + }); + }); +}); diff --git a/packages/ws-worker/test/util/ensure-payload-size.test.ts b/packages/engine-multi/test/util/ensure-payload-size.test.ts similarity index 50% rename from packages/ws-worker/test/util/ensure-payload-size.test.ts rename to packages/engine-multi/test/util/ensure-payload-size.test.ts index 20ac97418..05f6beaa8 100644 --- a/packages/ws-worker/test/util/ensure-payload-size.test.ts +++ b/packages/engine-multi/test/util/ensure-payload-size.test.ts @@ -1,52 +1,79 @@ import test from 'ava'; -import ensurePayloadSize from '../../src/util/ensure-payload-size'; +import ensurePayloadSize, { + REDACTED_LOG, + REDACTED_STATE, + verify, +} from '../../src/util/ensure-payload-size'; const mb = (bytes: number) => bytes / 1024 / 1024; test('throw limit 0, payload 1 byte', (t) => { - t.throws(() => ensurePayloadSize('x', 0), { + t.throws(() => verify('x', 0), { name: 'PAYLOAD_TOO_LARGE', }); }); test('ok for limit 1byte, payload 1 byte', (t) => { - t.notThrows(() => ensurePayloadSize('x', mb(1))); + t.notThrows(() => verify('x', mb(1))); }); test('throw for limit 1byte, payload 2 bytes', (t) => { - t.throws(() => ensurePayloadSize('xy', mb(1)), { + t.throws(() => verify('xy', mb(1)), { name: 'PAYLOAD_TOO_LARGE', }); }); test('ok for short string, limit 1mb', (t) => { - t.notThrows(() => ensurePayloadSize('hello world', 1)); + t.notThrows(() => verify('hello world', 1)); }); test('ok for 1mb string, limit 1mb', (t) => { const str = new Array(1024 * 1024).fill('z').join(''); - t.notThrows(() => ensurePayloadSize(str, 1)); + t.notThrows(() => verify(str, 1)); }); test('throw for 1mb string + 1 byte, limit 1mb', (t) => { const str = new Array(1024 * 1024 + 1).fill('z').join(''); - t.throws(() => ensurePayloadSize(str, 1), { + t.throws(() => verify(str, 1), { name: 'PAYLOAD_TOO_LARGE', }); }); test('ok if no limit', (t) => { const str = new Array(1024 * 1024 + 1).fill('z').join(''); - t.notThrows(() => ensurePayloadSize(str)); + t.notThrows(() => verify(str)); }); test('error shape', (t) => { try { const str = new Array(1024 * 1024 + 1).fill('z').join(''); - ensurePayloadSize(str, 1); + verify(str, 1); } catch (e: any) { - t.is(e.severity, 'kill'); t.is(e.name, 'PAYLOAD_TOO_LARGE'); t.is(e.message, 'The payload exceeded the size limit of 1mb'); } }); + +test('redact payload with state', (t) => { + const payload = { + state: { + data: new Array(1024 * 1024).fill('z').join(''), + }, + }; + + const newPayload = ensurePayloadSize(payload, 1); + t.deepEqual(newPayload.state, REDACTED_STATE); + t.true(newPayload.redacted); +}); + +test('redact payload with log message', (t) => { + const payload = { + log: { + message: [new Array(1024 * 1024).fill('z').join('')], + }, + }; + + const newPayload = ensurePayloadSize(payload, 1); + t.deepEqual(newPayload.log, REDACTED_LOG); + t.true(newPayload.redacted); +}); diff --git a/packages/engine-multi/test/worker/helper.test.ts b/packages/engine-multi/test/worker/helper.test.ts index 69ff74502..88a1c36aa 100644 --- a/packages/engine-multi/test/worker/helper.test.ts +++ b/packages/engine-multi/test/worker/helper.test.ts @@ -64,7 +64,7 @@ test('execute: should publish workflow-start', async (t) => { } }; - await execute('abc', async () => {}, publish); + await execute('abc', async () => {}, { publish }); t.deepEqual(event, { workflowId: 'abc' }); }); @@ -78,7 +78,7 @@ test('execute: should publish workflow-complete', async (t) => { } }; - await execute('abc', async () => ({}), publish); + await execute('abc', async () => ({}), { publish }); t.deepEqual(event, { workflowId: 'abc', state: {} }); }); diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index 26bf37170..13def93a9 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.1.2 + +### Patch Changes + +- Updated dependencies [deb7293] +- Updated dependencies [d50c05d] + - @openfn/engine-multi@1.6.0 + ## 2.1.1 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index bea938bde..f54cc6f24 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.1.1", + "version": "2.1.2", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index 45945c196..ba68bc7ac 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,17 @@ # ws-worker +## 1.12.0 + +### Minor Changes + +- d50c05d: Fix an issue where large payloads can cause the worker to OOM crash + +### Patch Changes + +- Updated dependencies [deb7293] +- Updated dependencies [d50c05d] + - @openfn/engine-multi@1.6.0 + ## 1.11.1 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 7a6265d65..9e589f606 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.11.1", + "version": "1.12.0", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/execute.ts b/packages/ws-worker/src/api/execute.ts index 52753bf16..ba30bf581 100644 --- a/packages/ws-worker/src/api/execute.ts +++ b/packages/ws-worker/src/api/execute.ts @@ -1,13 +1,16 @@ import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon'; import type { RunLogPayload } from '@openfn/lexicon/lightning'; import type { Logger } from '@openfn/logger'; -import type { RuntimeEngine, Resolvers } from '@openfn/engine-multi'; +import type { + RuntimeEngine, + Resolvers, + WorkerLogPayload, +} from '@openfn/engine-multi'; import { getWithReply, createRunState, throttle as createThrottle, - stringify, timeInMicroseconds, } from '../util'; import { @@ -25,9 +28,8 @@ import handleStepStart from '../events/step-start'; import handleRunComplete from '../events/run-complete'; import handleRunError from '../events/run-error'; -import type { Channel, RunState, JSONLog } from '../types'; +import type { Channel, RunState } from '../types'; import { WorkerRunOptions } from '../util/convert-lightning-plan'; -import ensurePayloadSize from '../util/ensure-payload-size'; const enc = new TextDecoder('utf-8'); @@ -212,32 +214,26 @@ export function onJobError(context: Context, event: any) { } } -export function onJobLog({ channel, state, options }: Context, event: JSONLog) { - let message = event.message; - try { - // The message body, the actual thing that is logged, - // may be encoded into a string - // Parse it here before sending on to lightning - // TODO this needs optimising! - if (typeof event.message === 'string') { - ensurePayloadSize(event.message, options?.payloadLimitMb); - message = JSON.parse(message); - } else if (event.message) { - const payload = stringify(event.message); - ensurePayloadSize(payload, options?.payloadLimitMb); - } - } catch (e) { +export function onJobLog( + { channel, state, options }: Context, + event: Omit +) { + let message = event.message as any[]; + + if (event.redacted) { message = [ `(Log message redacted: exceeds ${options.payloadLimitMb}mb memory limit)`, ]; + } else if (typeof event.message === 'string') { + message = JSON.parse(event.message); } - // lightning-friendly log object const log: RunLogPayload = { run_id: state.plan.id!, - message: message, + message, source: event.name, level: event.level, + // @ts-ignore timestamp: timeInMicroseconds(event.time) as string, }; diff --git a/packages/ws-worker/src/events/step-complete.ts b/packages/ws-worker/src/events/step-complete.ts index 5307ee2dd..ae37c6105 100644 --- a/packages/ws-worker/src/events/step-complete.ts +++ b/packages/ws-worker/src/events/step-complete.ts @@ -7,7 +7,6 @@ import { STEP_COMPLETE } from '../events'; import { stringify, timeInMicroseconds } from '../util'; import { calculateJobExitReason } from '../api/reasons'; import { sendEvent, onJobLog, Context } from '../api/execute'; -import ensurePayloadSize from '../util/ensure-payload-size'; export default async function onStepComplete( context: Context, @@ -54,30 +53,27 @@ export default async function onStepComplete( timestamp: timeInMicroseconds(event.time), } as StepCompletePayload; - try { - if (!options || options.outputDataclips !== false) { - const payload = stringify(outputState); - ensurePayloadSize(payload, options?.payloadLimitMb); - - // Write the dataclip if it's not too big - evt.output_dataclip = payload; - } - evt.output_dataclip_id = dataclipId; - } catch (e) { + if (event.redacted) { state.withheldDataclips[dataclipId] = true; evt.output_dataclip_error = 'DATACLIP_TOO_LARGE'; - const time = (timestamp() - BigInt(10e6)).toString(); // If the dataclip is too big, return the step without it // (the workflow will carry on internally) await onJobLog(context, { time, message: [ - 'Dataclip too large. This dataclip will not be sent back to lighting.', + 'Dataclip exceeds payload limit: output will not be sent back to the app.', ], level: 'info', name: 'R/T', }); + } else { + evt.output_dataclip_id = dataclipId; + if (!options || options.outputDataclips !== false) { + const payload = stringify(outputState); + // Write the dataclip if it's not too big + evt.output_dataclip = payload; + } } const reason = calculateJobExitReason(job_id, event.state, error); diff --git a/packages/ws-worker/src/util/ensure-payload-size.ts b/packages/ws-worker/src/util/ensure-payload-size.ts deleted file mode 100644 index b5cf6d194..000000000 --- a/packages/ws-worker/src/util/ensure-payload-size.ts +++ /dev/null @@ -1,16 +0,0 @@ -export default (payload: string, limit_mb?: number) => { - // @ts-ignore - if (!isNaN(limit_mb)) { - const limit = limit_mb as number; - const size_bytes = Buffer.byteLength(payload, 'utf8'); - const size_mb = size_bytes / 1024 / 1024; - if (size_mb > limit) { - const e = new Error(); - // @ts-ignore - e.severity = 'kill'; - e.name = 'PAYLOAD_TOO_LARGE'; - e.message = `The payload exceeded the size limit of ${limit}mb`; - throw e; - } - } -}; diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index 926804630..cccc236f2 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -100,7 +100,7 @@ test('jobLog should send a log event outside a run', async (t) => { await onJobLog({ channel, state } as any, log); }); -test('jobLog should redact log messages which are too large', async (t) => { +test('jobLog should replace the message of redacted logs', async (t) => { const plan = { id: 'run-1' }; const jobId = 'job-1'; @@ -108,7 +108,8 @@ test('jobLog should redact log messages which are too large', async (t) => { name: 'R/T', level: 'info', time: getBigIntTimestamp(), - message: JSON.stringify(new Array(1024 * 1024 + 1).fill('z').join('')), + message: [''], + redacted: true, }; const state = { diff --git a/packages/ws-worker/test/events/step-complete.test.ts b/packages/ws-worker/test/events/step-complete.test.ts index 4cc53c750..af476d75b 100644 --- a/packages/ws-worker/test/events/step-complete.test.ts +++ b/packages/ws-worker/test/events/step-complete.test.ts @@ -193,7 +193,6 @@ test('do not include dataclips in step:complete if output_dataclip is false', as test('do not include dataclips in step:complete if output_dataclip is too big', async (t) => { const plan = createPlan(); const jobId = 'job-1'; - const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') }; const state = createRunState(plan); state.activeJob = jobId; @@ -207,7 +206,7 @@ test('do not include dataclips in step:complete if output_dataclip is too big', [RUN_LOG]: () => true, [STEP_COMPLETE]: (evt: StepCompletePayload) => { const clipId = state.inputDataclips['a']; - t.true(state.withheldDataclips[clipId]) + t.true(state.withheldDataclips[clipId]); t.falsy(evt.output_dataclip_id); t.falsy(evt.output_dataclip); @@ -218,7 +217,8 @@ test('do not include dataclips in step:complete if output_dataclip is too big', const event = { jobId, workflowId: plan.id, - state: result, + state: {}, + redacted: true, next: ['a'], mem: { job: 1, system: 10 }, duration: 61, @@ -232,19 +232,16 @@ test('do not include dataclips in step:complete if output_dataclip is too big', test('log when the output_dataclip is too big', async (t) => { const plan = createPlan(); const jobId = 'job-1'; - const result = { data: new Array(1024 * 1024 + 1).fill('z').join('') }; const state = createRunState(plan); state.activeJob = jobId; state.activeStep = 'b'; - const options = { - payloadLimitMb: 1, - }; + const options = {}; const channel = mockChannel({ [RUN_LOG]: (e) => { - t.regex(e.message[0], /dataclip too large/i); + t.regex(e.message[0], /dataclip exceeds payload limit/i); }, [STEP_COMPLETE]: () => true, }); @@ -252,7 +249,8 @@ test('log when the output_dataclip is too big', async (t) => { const event = { jobId, workflowId: plan.id, - state: result, + redacted: true, + state: {}, next: ['a'], mem: { job: 1, system: 10 }, duration: 61, diff --git a/packages/ws-worker/tsconfig.json b/packages/ws-worker/tsconfig.json index 834d5af09..828d33698 100644 --- a/packages/ws-worker/tsconfig.json +++ b/packages/ws-worker/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../../tsconfig.common", "include": ["src/**/*.ts", "test/**/*.ts", "src/channels/runs"], "compilerOptions": { - "module": "ESNext" + "module": "ESNext", + "sourceMap": true } }