feat(cu): support using postgres as db #588
TillaTheHun0 committed Dec 23, 2024
1 parent 0d3f519 commit 29d5d53
Showing 16 changed files with 413 additions and 53 deletions.
6 changes: 6 additions & 0 deletions servers/cu/.postgres/Dockerfile
@@ -0,0 +1,6 @@
# Only used for development purposes
FROM postgres:17

ENV POSTGRES_PASSWORD=admin
ENV POSTGRES_USER=admin
ENV POSTGRES_DB=cu
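
The credentials this image bakes in are development defaults only; together with Postgres's standard port they imply a connection string of the form postgres://admin:admin@localhost:5432/cu for the CU's DB_URL. A quick sanity check against that container using the newly added pg dependency (a sketch: nothing here is part of the commit, and the host and port are assumed):

import pg from 'pg'

// Connection string assumed from the Dockerfile's POSTGRES_* defaults and the
// standard Postgres port 5432; adjust if you map the container differently.
const pool = new pg.Pool({ connectionString: 'postgres://admin:admin@localhost:5432/cu' })
const { rows } = await pool.query('SELECT 1 AS ok')
console.log(rows[0].ok) // prints 1 when the dev database is reachable
await pool.end()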
138 changes: 138 additions & 0 deletions servers/cu/package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions servers/cu/package.json
@@ -33,6 +33,7 @@
"opossum": "^8.4.0",
"p-map": "^7.0.3",
"p-queue": "^8.0.1",
"pg": "^8.13.1",
"prom-client": "^15.1.3",
"ramda": "^0.30.1",
"undici": "^7.2.0",
37 changes: 18 additions & 19 deletions servers/cu/src/bootstrap.js
@@ -14,8 +14,8 @@ import { fromPromise } from 'hyper-async'
import lt from 'long-timeout'

// Precanned clients to use for OOTB apis
import * as DbClient from './effects/db.js'
import * as ArweaveClient from './effects/arweave.js'
import * as SqliteClient from './effects/sqlite.js'
import * as AoSuClient from './effects/ao-su.js'
import * as WasmClient from './effects/wasm.js'
import * as AoProcessClient from './effects/ao-process.js'
@@ -77,8 +77,7 @@ export const createApis = async (ctx) => {
cacheKeyFn: ({ processId }) => processId
})

const DB_URL = `${ctx.DB_URL}.sqlite`
const sqlite = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: true })
const db = await DbClient.createDbClient({ url: ctx.DB_URL, bootstrap: true })

const BROADCAST = 'workers'
const workerBroadcast = new BroadcastChannel(BROADCAST).unref()
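
The new ./effects/db.js itself is not shown in this excerpt. The call above suggests a thin facade that chooses a backend from the connection string and keeps the old url + '.sqlite' path handling for the sqlite case. A sketch of that shape, with the module layout and the Postgres client name purely assumed:

// Hypothetical facade; the real effects/db.js may differ.
import * as SqliteClient from './sqlite.js'
import * as PostgresClient from './pg.js' // assumed module name

export async function createDbClient ({ url, bootstrap = false }) {
  // Treat postgres:// (or postgresql://) URLs as Postgres and anything else
  // as a sqlite file path, preserving the previous '.sqlite' suffix.
  if (/^postgres(ql)?:\/\//.test(url)) {
    return PostgresClient.createPostgresClient({ url, bootstrap })
  }
  return SqliteClient.createSqliteClient({ url: `${url}.sqlite`, bootstrap })
}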
@@ -98,7 +97,7 @@ export const createApis = async (ctx) => {
ARWEAVE_URL: ctx.ARWEAVE_URL,
GRAPHQL_URL: ctx.GRAPHQL_URL,
CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL,
DB_URL,
DB_URL: ctx.DB_URL,
id: workerId,
MODE: ctx.MODE,
LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH,
@@ -219,9 +218,9 @@ export const createApis = async (ctx) => {
hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }),
uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }),
writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db: sqlite }),
writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db }),
writeFileCheckpointMemory,
writeFileCheckpointRecord: AoProcessClient.writeFileCheckpointRecordWith({ db: sqlite }),
writeFileCheckpointRecord: AoProcessClient.writeFileCheckpointRecordWith({ db }),
logger: ctx.logger,
DISABLE_PROCESS_CHECKPOINT_CREATION: ctx.DISABLE_PROCESS_CHECKPOINT_CREATION,
DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: ctx.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION,
@@ -307,14 +306,14 @@ export const createApis = async (ctx) => {
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }),
findProcess: AoProcessClient.findProcessWith({ db: sqlite, logger }),
findProcess: AoProcessClient.findProcessWith({ db, logger }),
findLatestProcessMemory: AoProcessClient.findLatestProcessMemoryWith({
cache: wasmMemoryCache,
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
readProcessMemoryFile,
readFileCheckpointMemory,
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db: sqlite }),
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db: sqlite }),
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }),
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }),
address,
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }),
@@ -331,14 +330,14 @@ export const createApis = async (ctx) => {
}),
evaluationCounter,
// gasCounter,
saveProcess: AoProcessClient.saveProcessWith({ db: sqlite, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({ db: sqlite, logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db: sqlite, logger }),
findBlocks: AoBlockClient.findBlocksWith({ db: sqlite, logger }),
saveBlocks: AoBlockClient.saveBlocksWith({ db: sqlite, logger }),
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, pageSize: 90, logger }),
findModule: AoModuleClient.findModuleWith({ db: sqlite, logger }),
saveModule: AoModuleClient.saveModuleWith({ db: sqlite, logger }),
findModule: AoModuleClient.findModuleWith({ db, logger }),
saveModule: AoModuleClient.saveModuleWith({ db, logger }),
loadEvaluator: AoModuleClient.evaluatorWith({
loadWasmModule,
evaluateWith: (prep) => primaryWorkQueue.add(() =>
@@ -362,7 +361,7 @@ export const createApis = async (ctx) => {
),
logger
}),
findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db: sqlite, logger }),
findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }),
loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }),
loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }),
@@ -450,13 +449,13 @@ export const createApis = async (ctx) => {
const readCronResultsLogger = ctx.logger.child('readCronResults')
const readCronResults = readCronResultsWith({
...sharedDeps(readCronResultsLogger),
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db: sqlite, logger: readCronResultsLogger })
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readCronResultsLogger })
})

const readResultsLogger = ctx.logger.child('readResults')
const readResults = readResultsWith({
...sharedDeps(readResultsLogger),
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db: sqlite, logger: readResultsLogger })
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readResultsLogger })
})

let checkpointP
10 changes: 9 additions & 1 deletion servers/cu/src/domain/model.js
@@ -411,7 +411,15 @@ export const evaluationSchema = z.object({
evaluatedAt: z.preprocess(
(
arg
) => (typeof arg === 'string' || arg instanceof Date ? new Date(arg) : arg),
) => {
// typeof arg === 'string' || arg instanceof Date ? new Date(arg + 0) : arg

if (arg instanceof Date) return arg
if (typeof arg === 'string') try { arg = parseInt(arg) } catch {}
if (typeof arg === 'number') return new Date(arg)

return arg
},
z.date()
),
/**
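
The looser preprocessing above (together with dropping the evaluatedAt conversion from fromEvaluationDoc in ao-evaluation.js below) is what lets both backends feed the same schema: the sqlite client typically hands the stored timestamp back as a number, while node-postgres returns 64-bit integer columns as strings by default. A sketch of the three shapes the field now accepts, with illustrative values only:

// evaluationSchema is the schema defined in model.js above.
import { evaluationSchema } from '../domain/model.js'

const evaluatedAt = evaluationSchema.shape.evaluatedAt
evaluatedAt.parse(new Date('2024-12-23'))  // Date passes through unchanged
evaluatedAt.parse(1734912000000)           // number (sqlite) becomes a Date
evaluatedAt.parse('1734912000000')         // numeric string (pg bigint) becomes a Date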
5 changes: 3 additions & 2 deletions servers/cu/src/effects/ao-block.js
@@ -2,10 +2,11 @@ import { fromPromise, of } from 'hyper-async'
import { applySpec, last, map, path, pathSatisfies, pipe, pluck, prop, props, splitEvery } from 'ramda'
import { z } from 'zod'
import pMap from 'p-map'
import CircuitBreaker from 'opossum'

import { blockSchema } from '../domain/model.js'
import { BLOCKS_TABLE } from './sqlite.js'
import { backoff, strFromFetchError } from '../domain/utils.js'
import CircuitBreaker from 'opossum'
import { BLOCKS_TABLE } from './db.js'

const okRes = (res) => {
if (res.ok) return res
19 changes: 9 additions & 10 deletions servers/cu/src/effects/ao-evaluation.js
@@ -3,7 +3,7 @@ import { always, applySpec, isEmpty, isNotNil, converge, mergeAll, map, unapply,
import { z } from 'zod'

import { evaluationSchema } from '../domain/model.js'
import { EVALUATIONS_TABLE, MESSAGES_TABLE, COLLATION_SEQUENCE_MAX_CHAR } from './sqlite.js'
import { EVALUATIONS_TABLE, MESSAGES_TABLE, COLLATION_SEQUENCE_MAX_CHAR } from './db.js'

const evaluationDocSchema = z.object({
id: z.string().min(1),
@@ -60,8 +60,7 @@ const toEvaluation = applySpec({

const fromEvaluationDoc = pipe(
evolve({
output: JSON.parse,
evaluatedAt: (timestamp) => new Date(timestamp)
output: JSON.parse
}),
/**
* Ensure the input matches the expected
@@ -76,8 +75,8 @@ export function findEvaluationWith ({ db }) {
return {
sql: `
SELECT
id, processId, messageId, deepHash, nonce, epoch, timestamp,
ordinate, blockHeight, cron, evaluatedAt, output
id, "processId", "messageId", "deepHash", nonce, epoch, timestamp,
ordinate, "blockHeight", cron, "evaluatedAt", output
FROM ${EVALUATIONS_TABLE}
WHERE
id = ?;
@@ -143,7 +142,7 @@ export function saveEvaluationWith ({ db, logger: _logger }) {
{
sql: `
INSERT OR IGNORE INTO ${EVALUATIONS_TABLE}
(id, processId, messageId, deepHash, nonce, epoch, timestamp, ordinate, blockHeight, cron, evaluatedAt, output)
(id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, ordinate, "blockHeight", cron, "evaluatedAt", output)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`,
parameters: [
@@ -170,7 +169,7 @@ export function saveEvaluationWith ({ db, logger: _logger }) {
if (!evaluation.cron) {
statements.push({
sql: `
INSERT OR IGNORE INTO ${MESSAGES_TABLE} (id, processId, seq) VALUES (?, ?, ?);
INSERT OR IGNORE INTO ${MESSAGES_TABLE} (id, "processId", seq) VALUES (?, ?, ?);
`,
parameters: messageDocParamsSchema.parse([
createMessageId({
@@ -204,8 +203,8 @@ export function findEvaluationsWith ({ db }) {
return {
sql: `
SELECT
id, processId, messageId, deepHash, nonce, epoch, timestamp,
ordinate, blockHeight, cron, evaluatedAt, output
id, "processId", "messageId", "deepHash", nonce, epoch, timestamp,
ordinate, "blockHeight", cron, "evaluatedAt", output
FROM ${EVALUATIONS_TABLE}
WHERE
id > ? AND id <= ?
Expand Down Expand Up @@ -251,7 +250,7 @@ export function findMessageBeforeWith ({ db }) {
FROM ${MESSAGES_TABLE}
WHERE
id = ?
AND processId = ?
AND "processId" = ?
AND seq < ?
LIMIT 1;
`,
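
The double quotes added around the camelCase identifiers are the portability fix in this file: Postgres folds unquoted identifiers to lower case, so processId would otherwise resolve as processid, while quoting preserves the case and SQLite accepts the quoted form unchanged. (The statements still use SQLite's INSERT OR IGNORE and ? placeholders, so the new db facade presumably adapts those for Postgres.) A minimal illustration, not part of the commit:

// Assumes an evaluations table created with a camelCase "processId" column,
// as in the queries above.
const unportable = 'SELECT processId FROM evaluations'   // Postgres reads this as processid
const portable = 'SELECT "processId" FROM evaluations'   // case preserved on both backends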
2 changes: 1 addition & 1 deletion servers/cu/src/effects/ao-evaluation.test.js
@@ -5,7 +5,7 @@ import assert from 'node:assert'
import { createTestLogger } from '../domain/logger.js'
import { findEvaluationSchema, findEvaluationsSchema, findMessageBeforeSchema, saveEvaluationSchema } from '../domain/dal.js'
import { findMessageBeforeWith, findEvaluationWith, findEvaluationsWith, saveEvaluationWith } from './ao-evaluation.js'
import { COLLATION_SEQUENCE_MAX_CHAR, EVALUATIONS_TABLE, MESSAGES_TABLE } from './sqlite.js'
import { COLLATION_SEQUENCE_MAX_CHAR, EVALUATIONS_TABLE, MESSAGES_TABLE } from './db.js'

const logger = createTestLogger({ name: 'ao-cu:readState' })
