diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index c6ae74009..ebb3485a4 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -369,7 +369,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { } } -export function maybePrependProcessMessage (ctx, logger) { +export function maybePrependProcessMessage (ctx, logger, loadTransactionData) { return async function * ($messages) { const isColdStart = isNil(ctx.from) @@ -405,6 +405,15 @@ export function maybePrependProcessMessage (ctx, logger) { */ if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) { logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id) + + /** + * data for a process can potentially be very large, and it's only needed + * as part of the very first process message sent to the process (aka. Bootloader). + * + * So in lieu of caching the process data, we fetch it once here, on cold start, + */ + const processData = await loadTransactionData(ctx.id).then(res => res.text()) + yield { /** * Ensure the noSave flag is set, so evaluation does not persist @@ -416,7 +425,7 @@ export function maybePrependProcessMessage (ctx, logger) { message: { Id: ctx.id, Signature: ctx.signature, - Data: ctx.data, + Data: processData, Owner: ctx.owner, /** * the target of the process message is itself @@ -484,7 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { ) } -function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, saveBlocks, logger }) { +function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) { loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp)) const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks }) @@ -697,7 +706,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save .map($messages => { return composeStreams( $messages, - Transform.from(maybePrependProcessMessage(ctx, logger)) + Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData)) ) }) .map(messages => ({ messages })) diff --git a/servers/cu/src/domain/lib/loadMessages.test.js b/servers/cu/src/domain/lib/loadMessages.test.js index 8c92542e2..d9210edc4 100644 --- a/servers/cu/src/domain/lib/loadMessages.test.js +++ b/servers/cu/src/domain/lib/loadMessages.test.js @@ -475,6 +475,11 @@ describe('loadMessages', () => { } } + const loadTransactionData = async (id) => { + assert.equal(id, 'process-123') + return new Response('process data') + } + describe('should prepend the process message on cold start', () => { test('if first stream message is not the process', async () => { const $messages = Readable.from([ @@ -486,13 +491,14 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 2) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) test('if the first stream message is a cron message', async () => { @@ -506,24 +512,26 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 2) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) test('if there are no messages', async () => { const $messages = Readable.from([]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) assert.equal(results.length, 1) assert.equal(results[0].name, 'Process Message process-123') + assert.equal(results[0].message.Data, 'process data') }) }) @@ -546,7 +554,7 @@ describe('loadMessages', () => { } } ]) - const $merged = maybePrependProcessMessage(ctx, logger)($messages) + const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages) const results = [] for await (const m of $merged) results.push(m) diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index ec6a9dfad..ff4c13692 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -419,6 +419,13 @@ export function saveProcessWith ({ db }) { } return (process) => { return of(process) + /** + * The data for the process could be very large, so we do not persist + * it, and instead hydrate it on the process message later, if needed. + */ + .map(evolve({ + data: () => null + })) /** * Ensure the expected shape before writing to the db */ diff --git a/servers/cu/src/effects/ao-process.test.js b/servers/cu/src/effects/ao-process.test.js index 301af9c9d..278299f29 100644 --- a/servers/cu/src/effects/ao-process.test.js +++ b/servers/cu/src/effects/ao-process.test.js @@ -134,7 +134,7 @@ describe('ao-process', () => { assert.deepStrictEqual(parameters, [ 'process-123', 'sig-123', - 'data-123', + null, // data is nullified null, JSON.stringify({ address: 'owner-123', key: 'key-123' }), JSON.stringify([{ name: 'foo', value: 'bar' }]),