Skip to content

Commit

Permalink
perf(cu): do not cache process data, and instead fetch once on cold s…
Browse files Browse the repository at this point in the history
…tart
  • Loading branch information
TillaTheHun0 committed Dec 23, 2024
1 parent 764f6cd commit 450ffa4
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
17 changes: 13 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
}
}

export function maybePrependProcessMessage (ctx, logger) {
export function maybePrependProcessMessage (ctx, logger, loadTransactionData) {
return async function * ($messages) {
const isColdStart = isNil(ctx.from)

Expand Down Expand Up @@ -405,6 +405,15 @@ export function maybePrependProcessMessage (ctx, logger) {
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)

/**
* data for a process can potentially be very large, and it's only needed
* as part of the very first process message sent to the process (aka. Bootloader).
*
* So in lieu of caching the process data, we fetch it once here, on cold start,
*/
const processData = await loadTransactionData(ctx.id).then(res => res.text())

yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
Expand All @@ -416,7 +425,7 @@ export function maybePrependProcessMessage (ctx, logger) {
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Data: processData,
Owner: ctx.owner,
/**
* the target of the process message is itself
Expand Down Expand Up @@ -484,7 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
)
}

function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, saveBlocks, logger }) {
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) {
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))

const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks })
Expand Down Expand Up @@ -697,7 +706,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
.map($messages => {
return composeStreams(
$messages,
Transform.from(maybePrependProcessMessage(ctx, logger))
Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData))
)
})
.map(messages => ({ messages }))
Expand Down
16 changes: 12 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ describe('loadMessages', () => {
}
}

const loadTransactionData = async (id) => {
assert.equal(id, 'process-123')
return new Response('process data')
}

describe('should prepend the process message on cold start', () => {
test('if first stream message is not the process', async () => {
const $messages = Readable.from([
Expand All @@ -486,13 +491,14 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if the first stream message is a cron message', async () => {
Expand All @@ -506,24 +512,26 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if there are no messages', async () => {
const $messages = Readable.from([])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 1)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})
})

Expand All @@ -546,7 +554,7 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)
Expand Down
7 changes: 7 additions & 0 deletions servers/cu/src/effects/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ export function saveProcessWith ({ db }) {
}
return (process) => {
return of(process)
/**
* The data for the process could be very large, so we do not persist
* it, and instead hydrate it on the process message later, if needed.
*/
.map(evolve({
data: () => null
}))
/**
* Ensure the expected shape before writing to the db
*/
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('ao-process', () => {
assert.deepStrictEqual(parameters, [
'process-123',
'sig-123',
'data-123',
null, // data is nullified
null,
JSON.stringify({ address: 'owner-123', key: 'key-123' }),
JSON.stringify([{ name: 'foo', value: 'bar' }]),
Expand Down

0 comments on commit 450ffa4

Please sign in to comment.