Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Read Unit mode to CU #1096

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions servers/cu/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const server = pipeP(
*/
const server = app.listen({ port: config.port, host: '0.0.0.0' }, () => {
logger(`Server is running on http://localhost:${config.port}`)
logger(`Server in unit mode: "${config.UNIT_MODE}"`)
})

const memMonitor = setInterval(async () => {
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ export const createApis = async (ctx) => {
id: workerId,
MODE: ctx.MODE,
LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH,
DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL
DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL,
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE
}
}
}
Expand Down
34 changes: 31 additions & 3 deletions servers/cu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const DEFAULT_PROCESS_WASM_MODULE_FORMATS = [
*/
const serverConfigSchema = domainConfigSchema.extend({
MODE: z.enum(['development', 'production']),
/**
* Whether the unit is operating as a Compute Unit
* or Read Unit. Defaults to 'cu'.
*/
UNIT_MODE: z.enum(['cu', 'ru']),
port: positiveIntSchema,
ENABLE_METRICS_ENDPOINT: z.preprocess((val) => !!val, z.boolean())
})
Expand All @@ -51,13 +56,32 @@ const serverConfigSchema = domainConfigSchema.extend({
*/
/* eslint-disable no-throw-literal */

const preprocessUnitMode = (envConfig) => {
const { UNIT_MODE } = envConfig

if (UNIT_MODE === 'cu') return envConfig

/**
* A Read Unit's primary concern is serving dry-runs,
* and so does not create checkpoints and does not cache evaluation
* results to be served later.
*/
return {
...envConfig,
DISABLE_PROCESS_EVALUATION_CACHE: true,
DISABLE_PROCESS_CHECKPOINT_CREATION: true,
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: true,
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: 0
}
}

/**
* If the WALLET is defined, then do nothing.
*
* Otherwise, check whether the WALLET_FILE env var is defined and load it contents
* as WALLET
*/
export const preprocessWallet = (envConfig) => {
const preprocessWallet = (envConfig) => {
const { WALLET, WALLET_FILE, ...theRestOfTheConfig } = envConfig

// WALLET takes precendent. nothing to do here
Expand All @@ -83,7 +107,7 @@ export const preprocessWallet = (envConfig) => {
const preprocessedServerConfigSchema = z.preprocess(
(envConfig, zodRefinementContext) => {
try {
return pipe(preprocessWallet, preprocessUrls)(envConfig)
return pipe(preprocessUnitMode, preprocessWallet, preprocessUrls)(envConfig)
} catch (message) {
zodRefinementContext.addIssue({ code: ZodIssueCode.custom, message })
}
Expand All @@ -100,6 +124,7 @@ const preprocessedServerConfigSchema = z.preprocess(
const CONFIG_ENVS = {
development: {
MODE,
UNIT_MODE: process.env.UNIT_MODE || 'cu',
DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug',
LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel',
MODULE_MODE: process.env.MODULE_MODE,
Expand All @@ -117,6 +142,7 @@ const CONFIG_ENVS = {
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute. After 2 hours, this process used
Expand Down Expand Up @@ -149,6 +175,7 @@ const CONFIG_ENVS = {
},
production: {
MODE,
UNIT_MODE: process.env.UNIT_MODE || 'cu',
DEFAULT_LOG_LEVEL: process.env.DEFAULT_LOG_LEVEL || 'debug',
LOG_CONFIG_PATH: process.env.LOG_CONFIG_PATH || '.loglevel',
MODULE_MODE: process.env.MODULE_MODE,
Expand All @@ -164,8 +191,9 @@ const CONFIG_ENVS = {
WALLET_FILE: process.env.WALLET_FILE,
MEM_MONITOR_INTERVAL: process.env.MEM_MONITOR_INTERVAL || ms('30s'),
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', // TODO: disabled by default for now. Enable by default later
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION !== 'false',
DISABLE_PROCESS_EVALUATION_CACHE: process.env.DISABLE_PROCESS_EVALUATION_CACHE,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute by adding and clearing a table.
Expand Down
17 changes: 13 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
}
}

export function maybePrependProcessMessage (ctx, logger) {
export function maybePrependProcessMessage (ctx, logger, loadTransactionData) {
return async function * ($messages) {
const isColdStart = isNil(ctx.from)

Expand Down Expand Up @@ -405,6 +405,15 @@ export function maybePrependProcessMessage (ctx, logger) {
*/
if (done || (parseTags(value.message.Tags).Type !== 'Process') || value.message.Cron) {
logger('Emitting process message at beginning of evaluation stream for process %s cold start', ctx.id)

/**
* data for a process can potentially be very large, and it's only needed
* as part of the very first process message sent to the process (aka. Bootloader).
*
* So in lieu of caching the process data, we fetch it once here, on cold start,
*/
const processData = await loadTransactionData(ctx.id).then(res => res.text())

yield {
/**
* Ensure the noSave flag is set, so evaluation does not persist
Expand All @@ -416,7 +425,7 @@ export function maybePrependProcessMessage (ctx, logger) {
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Data: processData,
Owner: ctx.owner,
/**
* the target of the process message is itself
Expand Down Expand Up @@ -484,7 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
)
}

function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, saveBlocks, logger }) {
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) {
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))

const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks })
Expand Down Expand Up @@ -697,7 +706,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, save
.map($messages => {
return composeStreams(
$messages,
Transform.from(maybePrependProcessMessage(ctx, logger))
Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData))
)
})
.map(messages => ({ messages }))
Expand Down
16 changes: 12 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ describe('loadMessages', () => {
}
}

const loadTransactionData = async (id) => {
assert.equal(id, 'process-123')
return new Response('process data')
}

describe('should prepend the process message on cold start', () => {
test('if first stream message is not the process', async () => {
const $messages = Readable.from([
Expand All @@ -486,13 +491,14 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if the first stream message is a cron message', async () => {
Expand All @@ -506,24 +512,26 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 2)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})

test('if there are no messages', async () => {
const $messages = Readable.from([])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)

assert.equal(results.length, 1)
assert.equal(results[0].name, 'Process Message process-123')
assert.equal(results[0].message.Data, 'process data')
})
})

Expand All @@ -546,7 +554,7 @@ describe('loadMessages', () => {
}
}
])
const $merged = maybePrependProcessMessage(ctx, logger)($messages)
const $merged = maybePrependProcessMessage(ctx, logger, loadTransactionData)($messages)

const results = []
for await (const m of $merged) results.push(m)
Expand Down
5 changes: 5 additions & 0 deletions servers/cu/src/domain/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export const domainConfigSchema = z.object({
* Whether to disable File Process Checkpoint creation entirely.
*/
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: z.preprocess((val) => !!val, z.boolean()),
/**
* Whether to disable caching process evaluations, useful when operating as
* a RU
*/
DISABLE_PROCESS_EVALUATION_CACHE: z.preprocess((val) => !!val, z.boolean()),
/**
* If a process uses this amount of
* gas, then it will immediately create a Checkpoint at the end of the
Expand Down
12 changes: 7 additions & 5 deletions servers/cu/src/effects/ao-evaluation.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export function findEvaluationWith ({ db }) {
}
}

export function saveEvaluationWith ({ db, logger: _logger }) {
export function saveEvaluationWith ({ DISABLE_PROCESS_EVALUATION_CACHE, db, logger: _logger }) {
const toEvaluationDoc = pipe(
converge(
unapply(mergeAll),
Expand Down Expand Up @@ -138,8 +138,10 @@ export function saveEvaluationWith ({ db, logger: _logger }) {

function createQuery (evaluation) {
const evalDoc = toEvaluationDoc(evaluation)
const statements = [
{
const statements = []

if (!DISABLE_PROCESS_EVALUATION_CACHE) {
statements.push({
sql: `
INSERT OR IGNORE INTO ${EVALUATIONS_TABLE}
(id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, ordinate, "blockHeight", cron, "evaluatedAt", output)
Expand All @@ -160,8 +162,8 @@ export function saveEvaluationWith ({ db, logger: _logger }) {
evalDoc.evaluatedAt.getTime(),
JSON.stringify(evalDoc.output)
]
}
]
})
}

/**
* Cron messages are not needed to be saved in the messages table
Expand Down
32 changes: 32 additions & 0 deletions servers/cu/src/effects/ao-evaluation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,38 @@ describe('ao-evaluation', () => {
evaluatedAt
})
})

test('noop insert evaluation if DISABLE_PROCESS_EVALUATION_CACHE', async () => {
const saveEvaluation = saveEvaluationSchema.implement(
saveEvaluationWith({
DISABLE_PROCESS_EVALUATION_CACHE: true,
db: {
transaction: async (statements) => {
assert.equal(statements.length, 1)
const [{ sql: messageDocSql }] = statements
assert.ok(messageDocSql.trim().startsWith(`INSERT OR IGNORE INTO ${MESSAGES_TABLE}`))

return Promise.resolve('process-123,1702677252111,1')
}
},
logger
})
)

await saveEvaluation({
isAssignment: false,
deepHash: 'deepHash-123',
timestamp: 1702677252111,
nonce: '1',
epoch: 0,
ordinate: 1,
blockHeight: 1234,
processId: 'process-123',
messageId: 'message-123',
output: { Messages: [{ foo: 'bar' }], Memory: 'foo' },
evaluatedAt
})
})
})

describe('findEvaluations', () => {
Expand Down
7 changes: 7 additions & 0 deletions servers/cu/src/effects/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ export function saveProcessWith ({ db }) {
}
return (process) => {
return of(process)
/**
* The data for the process could be very large, so we do not persist
* it, and instead hydrate it on the process message later, if needed.
*/
.map(evolve({
data: () => null
}))
/**
* Ensure the expected shape before writing to the db
*/
Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('ao-process', () => {
assert.deepStrictEqual(parameters, [
'process-123',
'sig-123',
'data-123',
null, // data is nullified
null,
JSON.stringify({ address: 'owner-123', key: 'key-123' }),
JSON.stringify([{ name: 'foo', value: 'bar' }]),
Expand Down
6 changes: 5 additions & 1 deletion servers/cu/src/effects/worker/evaluator/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export const createApis = async (ctx) => {
CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL
}),
bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger: ctx.logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE,
db,
logger: ctx.logger
}),
ARWEAVE_URL: ctx.ARWEAVE_URL,
logger: ctx.logger
})
Expand Down
4 changes: 3 additions & 1 deletion servers/cu/src/routes/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { always, compose, identity } from 'ramda'
import { z } from 'zod'

import { withMetrics, withMiddleware, withProcessRestrictionFromPath } from './middleware/index.js'
import { withCuMode } from './middleware/withCuMode.js'

/**
* TODO: could be moved into a route utils or middleware
Expand Down Expand Up @@ -37,8 +38,9 @@ export const withCronRoutes = app => {
'/cron/:processId',
compose(
withMiddleware,
withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }),
withCuMode,
withProcessRestrictionFromPath,
withMetrics({ tracesFrom: (req) => ({ process_id: req.params.processId }) }),
always(async (req, res) => {
const {
params: { processId },
Expand Down
1 change: 1 addition & 0 deletions servers/cu/src/routes/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { withDomain } from './withDomain.js'

export * from './withProcessRestriction.js'
export * from './withMetrics.js'
export * from './withCuMode.js'

/**
* A convenience method that composes common middleware needed on most routes,
Expand Down
12 changes: 12 additions & 0 deletions servers/cu/src/routes/middleware/withCuMode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { config } from '../../config.js'

const withUnitMode = (mode) => (handler) => (req, res, next) => {
const { UNIT_MODE } = config

if (UNIT_MODE !== mode) return res.status(404).send('Not Found')

return handler(req, res, next)
}

export const withCuMode = withUnitMode('cu')
export const withRuMode = withUnitMode('ru')
Loading
Loading