diff --git a/servers/cu/.postgres/Dockerfile b/servers/cu/.postgres/Dockerfile new file mode 100644 index 000000000..fe4cef7c2 --- /dev/null +++ b/servers/cu/.postgres/Dockerfile @@ -0,0 +1,6 @@ +# Only used for development purposes +FROM postgres:17 + +ENV POSTGRES_PASSWORD=admin +ENV POSTGRES_USER=admin +ENV POSTGRES_DB=cu diff --git a/servers/cu/package-lock.json b/servers/cu/package-lock.json index aa31dc196..14dedf508 100644 --- a/servers/cu/package-lock.json +++ b/servers/cu/package-lock.json @@ -30,6 +30,7 @@ "opossum": "^8.4.0", "p-map": "^7.0.3", "p-queue": "^8.0.1", + "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", "undici": "^7.2.0", @@ -1415,6 +1416,95 @@ "node": ">=16" } }, + "node_modules/pg": { + "version": "8.13.1", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.13.1.tgz", + "integrity": "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ==", + "license": "MIT", + "dependencies": { + "pg-connection-string": "^2.7.0", + "pg-pool": "^3.7.0", + "pg-protocol": "^1.7.0", + "pg-types": "^2.1.0", + "pgpass": "1.x" + }, + "engines": { + "node": ">= 8.0.0" + }, + "optionalDependencies": { + "pg-cloudflare": "^1.1.1" + }, + "peerDependencies": { + "pg-native": ">=3.0.1" + }, + "peerDependenciesMeta": { + "pg-native": { + "optional": true + } + } + }, + "node_modules/pg-cloudflare": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz", + "integrity": "sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==", + "license": "MIT", + "optional": true + }, + "node_modules/pg-connection-string": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.7.0.tgz", + "integrity": "sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA==", + "license": "MIT" + }, + "node_modules/pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", + "license": "ISC", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/pg-pool": { + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", + "integrity": "sha512-ZOBQForurqh4zZWjrgSwwAtzJ7QiRX0ovFkZr2klsen3Nm0aoh33Ls0fzfv3imeH/nw/O27cjdz5kzYJfeGp/g==", + "license": "MIT", + "peerDependencies": { + "pg": ">=8.0" + } + }, + "node_modules/pg-protocol": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.7.0.tgz", + "integrity": "sha512-hTK/mE36i8fDDhgDFjy6xNOG+LCorxLG3WO17tku+ij6sVHXh1jQUJ8hYAnRhNla4QVD2H8er/FOjc/+EgC6yQ==", + "license": "MIT" + }, + "node_modules/pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "license": "MIT", + "dependencies": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + }, + "engines": { + "node": ">=4" + } + }, + "node_modules/pgpass": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "license": "MIT", + "dependencies": { 
+ "split2": "^4.1.0" + } + }, "node_modules/picomatch": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz", @@ -1464,6 +1554,45 @@ "integrity": "sha512-e906FRY0+tV27iq4juKzSYPbUj2do2X2JX4EzSca1631EB2QJQUqGbDuERal7LCtOpxl6x3+nvo9NPZcmjkiFA==", "license": "MIT" }, + "node_modules/postgres-array": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/postgres-bytea": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", + "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "license": "MIT", + "dependencies": { + "xtend": "^4.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/prebuild-install": { "version": "7.1.2", "resolved": "https://registry.npmjs.org/prebuild-install/-/prebuild-install-7.1.2.tgz", @@ -2144,6 +2273,15 @@ "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", "license": "ISC" }, + "node_modules/xtend": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "license": "MIT", + "engines": { + "node": ">=0.4" + } + }, "node_modules/zod": { "version": "3.24.1", "resolved": "https://registry.npmjs.org/zod/-/zod-3.24.1.tgz", diff --git a/servers/cu/package.json b/servers/cu/package.json index ec7a67486..1fd01ce1e 100644 --- a/servers/cu/package.json +++ b/servers/cu/package.json @@ -33,6 +33,7 @@ "opossum": "^8.4.0", "p-map": "^7.0.3", "p-queue": "^8.0.1", + "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", "undici": "^7.2.0", diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index ba52e248d..2f69061b4 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -14,8 +14,8 @@ import { fromPromise } from 'hyper-async' import lt from 'long-timeout' // Precanned clients to use for OOTB apis +import * as DbClient from './effects/db.js' import * as ArweaveClient from './effects/arweave.js' -import * as SqliteClient from './effects/sqlite.js' import * as AoSuClient from './effects/ao-su.js' import * as WasmClient from './effects/wasm.js' import * as AoProcessClient from './effects/ao-process.js' @@ -77,8 +77,7 @@ export const createApis = async (ctx) => { cacheKeyFn: ({ processId }) => processId }) - const DB_URL = `${ctx.DB_URL}.sqlite` - const sqlite = await SqliteClient.createSqliteClient({ url: DB_URL, bootstrap: true }) + const db = await DbClient.createDbClient({ url: 
ctx.DB_URL, bootstrap: true }) const BROADCAST = 'workers' const workerBroadcast = new BroadcastChannel(BROADCAST).unref() @@ -98,7 +97,7 @@ export const createApis = async (ctx) => { ARWEAVE_URL: ctx.ARWEAVE_URL, GRAPHQL_URL: ctx.GRAPHQL_URL, CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, - DB_URL, + DB_URL: ctx.DB_URL, id: workerId, MODE: ctx.MODE, LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH, @@ -219,9 +218,9 @@ export const createApis = async (ctx) => { hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }), buildAndSignDataItem: ArweaveClient.buildAndSignDataItemWith({ WALLET: ctx.WALLET }), uploadDataItem: ArweaveClient.uploadDataItemWith({ UPLOADER_URL: ctx.UPLOADER_URL, fetch: ctx.fetch, logger: ctx.logger }), - writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db: sqlite }), + writeCheckpointRecord: AoProcessClient.writeCheckpointRecordWith({ db }), writeFileCheckpointMemory, - writeFileCheckpointRecord: AoProcessClient.writeFileCheckpointRecordWith({ db: sqlite }), + writeFileCheckpointRecord: AoProcessClient.writeFileCheckpointRecordWith({ db }), logger: ctx.logger, DISABLE_PROCESS_CHECKPOINT_CREATION: ctx.DISABLE_PROCESS_CHECKPOINT_CREATION, DISABLE_PROCESS_FILE_CHECKPOINT_CREATION: ctx.DISABLE_PROCESS_FILE_CHECKPOINT_CREATION, @@ -307,14 +306,14 @@ export const createApis = async (ctx) => { loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }), isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }), - findProcess: AoProcessClient.findProcessWith({ db: sqlite, logger }), + findProcess: AoProcessClient.findProcessWith({ db, logger }), findLatestProcessMemory: AoProcessClient.findLatestProcessMemoryWith({ cache: wasmMemoryCache, loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }), readProcessMemoryFile, readFileCheckpointMemory, - findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db: sqlite }), - findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db: sqlite }), + findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }), + findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }), address, queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }), queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }), @@ -331,14 +330,14 @@ export const createApis = async (ctx) => { }), evaluationCounter, // gasCounter, - saveProcess: AoProcessClient.saveProcessWith({ db: sqlite, logger }), - findEvaluation: AoEvaluationClient.findEvaluationWith({ db: sqlite, logger }), - saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db: sqlite, logger }), - findBlocks: AoBlockClient.findBlocksWith({ db: sqlite, logger }), - saveBlocks: AoBlockClient.saveBlocksWith({ db: sqlite, logger }), + saveProcess: AoProcessClient.saveProcessWith({ db, logger }), + findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }), + saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }), + findBlocks: AoBlockClient.findBlocksWith({ db, logger }), + saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }), loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: 
ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, pageSize: 90, logger }), - findModule: AoModuleClient.findModuleWith({ db: sqlite, logger }), - saveModule: AoModuleClient.saveModuleWith({ db: sqlite, logger }), + findModule: AoModuleClient.findModuleWith({ db, logger }), + saveModule: AoModuleClient.saveModuleWith({ db, logger }), loadEvaluator: AoModuleClient.evaluatorWith({ loadWasmModule, evaluateWith: (prep) => primaryWorkQueue.add(() => @@ -362,7 +361,7 @@ export const createApis = async (ctx) => { ), logger }), - findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db: sqlite, logger }), + findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }), loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }), loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }), loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }), @@ -450,13 +449,13 @@ export const createApis = async (ctx) => { const readCronResultsLogger = ctx.logger.child('readCronResults') const readCronResults = readCronResultsWith({ ...sharedDeps(readCronResultsLogger), - findEvaluations: AoEvaluationClient.findEvaluationsWith({ db: sqlite, logger: readCronResultsLogger }) + findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readCronResultsLogger }) }) const readResultsLogger = ctx.logger.child('readResults') const readResults = readResultsWith({ ...sharedDeps(readResultsLogger), - findEvaluations: AoEvaluationClient.findEvaluationsWith({ db: sqlite, logger: readResultsLogger }) + findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readResultsLogger }) }) let checkpointP diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index 4ba990abe..3271e743a 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -411,7 +411,15 @@ export const evaluationSchema = z.object({ evaluatedAt: z.preprocess( ( arg - ) => (typeof arg === 'string' || arg instanceof Date ? new Date(arg) : arg), + ) => { + // typeof arg === 'string' || arg instanceof Date ? 
new Date(arg) : arg (previous behavior, kept for reference) + + if (arg instanceof Date) return arg + // pg returns BIGINT columns as strings; parseInt never throws, so guard with isNaN instead + if (typeof arg === 'string') arg = isNaN(arg) ? new Date(arg) : parseInt(arg, 10) + if (typeof arg === 'number') return new Date(arg) + + return arg + }, z.date() ), /** diff --git a/servers/cu/src/effects/ao-block.js b/servers/cu/src/effects/ao-block.js index ce970c726..00136a83e 100644 --- a/servers/cu/src/effects/ao-block.js +++ b/servers/cu/src/effects/ao-block.js @@ -2,10 +2,11 @@ import { fromPromise, of } from 'hyper-async' import { applySpec, last, map, path, pathSatisfies, pipe, pluck, prop, props, splitEvery } from 'ramda' import { z } from 'zod' import pMap from 'p-map' +import CircuitBreaker from 'opossum' + import { blockSchema } from '../domain/model.js' -import { BLOCKS_TABLE } from './sqlite.js' import { backoff, strFromFetchError } from '../domain/utils.js' -import CircuitBreaker from 'opossum' +import { BLOCKS_TABLE } from './db.js' const okRes = (res) => { if (res.ok) return res diff --git a/servers/cu/src/effects/ao-evaluation.js b/servers/cu/src/effects/ao-evaluation.js index e1e30f138..33bcd94c1 100644 --- a/servers/cu/src/effects/ao-evaluation.js +++ b/servers/cu/src/effects/ao-evaluation.js @@ -3,7 +3,7 @@ import { always, applySpec, isEmpty, isNotNil, converge, mergeAll, map, unapply, import { z } from 'zod' import { evaluationSchema } from '../domain/model.js' -import { EVALUATIONS_TABLE, MESSAGES_TABLE, COLLATION_SEQUENCE_MAX_CHAR } from './sqlite.js' +import { EVALUATIONS_TABLE, MESSAGES_TABLE, COLLATION_SEQUENCE_MAX_CHAR } from './db.js' const evaluationDocSchema = z.object({ id: z.string().min(1), @@ -60,8 +60,7 @@ const toEvaluation = applySpec({ const fromEvaluationDoc = pipe( evolve({ - output: JSON.parse, - evaluatedAt: (timestamp) => new Date(timestamp) + output: JSON.parse }), /** * Ensure the input matches the expected @@ -76,8 +75,8 @@ export function findEvaluationWith ({ db }) { return { sql: ` SELECT - id, processId, messageId, deepHash, nonce, epoch, timestamp, - ordinate, blockHeight, cron, evaluatedAt, output + id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, + ordinate, "blockHeight", cron, "evaluatedAt", output FROM ${EVALUATIONS_TABLE} WHERE id = ?; `, @@ -143,7 +142,7 @@ export function saveEvaluationWith ({ db, logger: _logger }) { { sql: ` INSERT OR IGNORE INTO ${EVALUATIONS_TABLE} - (id, processId, messageId, deepHash, nonce, epoch, timestamp, ordinate, blockHeight, cron, evaluatedAt, output) + (id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, ordinate, "blockHeight", cron, "evaluatedAt", output) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); `, parameters: [ @@ -170,7 +169,7 @@ export function saveEvaluationWith ({ db, logger: _logger }) { if (!evaluation.cron) { statements.push({ sql: ` - INSERT OR IGNORE INTO ${MESSAGES_TABLE} (id, processId, seq) VALUES (?, ?, ?); + INSERT OR IGNORE INTO ${MESSAGES_TABLE} (id, "processId", seq) VALUES (?, ?, ?); `, parameters: messageDocParamsSchema.parse([ createMessageId({ @@ -204,8 +203,8 @@ export function findEvaluationsWith ({ db }) { return { sql: ` SELECT - id, processId, messageId, deepHash, nonce, epoch, timestamp, - ordinate, blockHeight, cron, evaluatedAt, output + id, "processId", "messageId", "deepHash", nonce, epoch, timestamp, + ordinate, "blockHeight", cron, "evaluatedAt", output FROM ${EVALUATIONS_TABLE} WHERE id > ? AND id <= ? @@ -251,7 +250,7 @@ export function findMessageBeforeWith ({ db }) { FROM ${MESSAGES_TABLE} WHERE id = ? - AND processId = ? + AND "processId" = ? AND seq < ?
LIMIT 1; `, diff --git a/servers/cu/src/effects/ao-evaluation.test.js b/servers/cu/src/effects/ao-evaluation.test.js index 950c1d5b8..d9902e5de 100644 --- a/servers/cu/src/effects/ao-evaluation.test.js +++ b/servers/cu/src/effects/ao-evaluation.test.js @@ -5,7 +5,7 @@ import assert from 'node:assert' import { createTestLogger } from '../domain/logger.js' import { findEvaluationSchema, findEvaluationsSchema, findMessageBeforeSchema, saveEvaluationSchema } from '../domain/dal.js' import { findMessageBeforeWith, findEvaluationWith, findEvaluationsWith, saveEvaluationWith } from './ao-evaluation.js' -import { COLLATION_SEQUENCE_MAX_CHAR, EVALUATIONS_TABLE, MESSAGES_TABLE } from './sqlite.js' +import { COLLATION_SEQUENCE_MAX_CHAR, EVALUATIONS_TABLE, MESSAGES_TABLE } from './db.js' const logger = createTestLogger({ name: 'ao-cu:readState' }) diff --git a/servers/cu/src/effects/ao-module.js b/servers/cu/src/effects/ao-module.js index a112c7ef7..d8600d5b5 100644 --- a/servers/cu/src/effects/ao-module.js +++ b/servers/cu/src/effects/ao-module.js @@ -6,7 +6,7 @@ import { z } from 'zod' import { arrayBufferFromMaybeView, isJsonString } from '../domain/utils.js' import { moduleSchema } from '../domain/model.js' -import { MODULES_TABLE } from './sqlite.js' +import { MODULES_TABLE } from './db.js' import { timer } from './metrics.js' const TWO_GB = 2 * 1024 * 1024 * 1024 diff --git a/servers/cu/src/effects/ao-module.test.js b/servers/cu/src/effects/ao-module.test.js index 8b02c9fac..015597369 100644 --- a/servers/cu/src/effects/ao-module.test.js +++ b/servers/cu/src/effects/ao-module.test.js @@ -12,7 +12,7 @@ import { saveModuleWith } from './ao-module.js' import { createTestLogger } from '../domain/logger.js' -import { MODULES_TABLE } from './sqlite.js' +import { MODULES_TABLE } from './db.js' const logger = createTestLogger({ name: 'ao-cu:readState' }) diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index c86c7af41..ec6a9dfad 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -11,7 +11,7 @@ import AsyncLock from 'async-lock' import { isEarlierThan, isEqualTo, isJsonString, isLaterThan, maybeParseInt, parseTags } from '../domain/utils.js' import { processSchema } from '../domain/model.js' -import { PROCESSES_TABLE, CHECKPOINTS_TABLE, CHECKPOINT_FILES_TABLE, COLLATION_SEQUENCE_MIN_CHAR } from './sqlite.js' +import { PROCESSES_TABLE, CHECKPOINTS_TABLE, CHECKPOINT_FILES_TABLE, COLLATION_SEQUENCE_MIN_CHAR } from './db.js' import { timer } from './metrics.js' const gunzipP = promisify(gunzip) @@ -519,7 +519,7 @@ export function findRecordCheckpointBeforeWith ({ db }) { SELECT * FROM ${CHECKPOINTS_TABLE} WHERE - processId = ? AND timestamp < ? + "processId" = ? AND timestamp < ? ORDER BY timestamp DESC LIMIT 5; `, @@ -573,7 +573,7 @@ export function writeCheckpointRecordWith ({ db }) { return { sql: ` INSERT OR IGNORE INTO ${CHECKPOINTS_TABLE} - (id, processId, timestamp, ordinate, cron, memory, evaluation) + (id, "processId", timestamp, ordinate, cron, memory, evaluation) VALUES (?, ?, ?, ?, ?, ?, ?) 
`, parameters: [ @@ -617,7 +617,7 @@ export function findFileCheckpointBeforeWith ({ db }) { sql: ` SELECT * FROM ${CHECKPOINT_FILES_TABLE} - WHERE processId = ?; + WHERE "processId" = ?; `, parameters: [processId] } @@ -679,7 +679,7 @@ export function writeFileCheckpointRecordWith ({ db }) { return { sql: ` INSERT OR REPLACE INTO ${CHECKPOINT_FILES_TABLE} - (id, processId, timestamp, ordinate, cron, file, evaluation, cachedAt) + (id, "processId", timestamp, ordinate, cron, file, evaluation, "cachedAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?) `, parameters: [ diff --git a/servers/cu/src/effects/ao-process.test.js b/servers/cu/src/effects/ao-process.test.js index e3c151708..301af9c9d 100644 --- a/servers/cu/src/effects/ao-process.test.js +++ b/servers/cu/src/effects/ao-process.test.js @@ -9,7 +9,7 @@ import bytes from 'bytes' import { createTestLogger } from '../domain/logger.js' import { findLatestProcessMemorySchema, findProcessSchema, saveLatestProcessMemorySchema, saveProcessSchema } from '../domain/dal.js' -import { PROCESSES_TABLE } from './sqlite.js' +import { PROCESSES_TABLE } from './db.js' import { LATEST, createProcessMemoryCache, findFileCheckpointBeforeWith, findLatestProcessMemoryWith, findProcessWith, saveCheckpointWith, saveLatestProcessMemoryWith, saveProcessWith } from './ao-process.js' const gzipP = promisify(gzip) diff --git a/servers/cu/src/effects/db.js b/servers/cu/src/effects/db.js new file mode 100644 index 000000000..b152f82aa --- /dev/null +++ b/servers/cu/src/effects/db.js @@ -0,0 +1,38 @@ +import * as SqliteClient from './sqlite.js' +import * as PostgresClient from './pg.js' + +/** + * Shared db primitives + * + * TODO: better ways to do this, but this is fine for now, since their use + * is encapsulated within the effects + */ + +export const [PROCESSES_TABLE, BLOCKS_TABLE, MODULES_TABLE, EVALUATIONS_TABLE, MESSAGES_TABLE, CHECKPOINTS_TABLE, CHECKPOINT_FILES_TABLE] = [ + 'processes', + 'blocks', + 'modules', + 'evaluations', + 'messages', + 'checkpoints', + 'checkpoint_files' +] + +/** + * Use a high value unicode character to terminate a range query prefix. 
+ * This will cause only strings with a given prefix to match a range query + */ +export const COLLATION_SEQUENCE_MAX_CHAR = '\u{10FFFF}' + +/** + * This technically isn't the smallest char, but it's small enough for our needs + */ +export const COLLATION_SEQUENCE_MIN_CHAR = '0' + +export async function createDbClient ({ url, ...rest }) { + if (url.startsWith('postgres://') || url.startsWith('postgresql://')) { + return PostgresClient.createPostgresClient({ url, ...rest }) + } + + return SqliteClient.createSqliteClient({ url: `${url}.sqlite`, ...rest }) +} diff --git a/servers/cu/src/effects/pg.js b/servers/cu/src/effects/pg.js new file mode 100644 index 000000000..88e810af1 --- /dev/null +++ b/servers/cu/src/effects/pg.js @@ -0,0 +1,178 @@ +import pg from 'pg' + +import { BLOCKS_TABLE, CHECKPOINT_FILES_TABLE, CHECKPOINTS_TABLE, EVALUATIONS_TABLE, MESSAGES_TABLE, MODULES_TABLE, PROCESSES_TABLE } from './db.js' + +const { Pool } = pg + +const rows = (res) => res.rows + +export async function createPostgresClient ({ url, bootstrap = false, ...rest }) { + const pool = new Pool({ + connectionString: url, + max: 10, + idleTimeoutMillis: 10000, + allowExitOnIdle: true, + ...rest + }) + + if (bootstrap) { + await Promise.resolve() + .then(() => createProcesses(pool)) + .then(() => createBlocks(pool)) + .then(() => createModules(pool)) + .then(() => createEvaluations(pool)) + .then(() => createMessages(pool)) + .then(() => createCheckpoints(pool)) + .then(() => createCheckpointFiles(pool)) + .then(() => createBlocksIndexes(pool)) + .then(() => createMessagesIndexes(pool)) + .then(() => createCheckpointsIndexes(pool)) + .then(() => createCheckpointFilesIndexes(pool)) + } + + return { + query: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows), + run: async ({ sql, parameters }) => pool.query(toOrdinals(sql), parameters).then(rows), + transaction: async (statements) => pool.connect() + .then((client) => + /** + * https://node-postgres.com/features/transactions + */ + Promise.resolve() + .then(() => client.query('BEGIN')) + .then(() => Promise.all(statements.map( + ({ sql, parameters }) => client.query(toOrdinals(sql), parameters).then(rows) + ))) + .then(() => client.query('COMMIT')) + .catch((e) => client.query('ROLLBACK').then(() => { throw e })) + .finally(() => client.release()) + ) + } +} + +const createProcesses = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${PROCESSES_TABLE}( + id TEXT PRIMARY KEY, + signature TEXT, + data TEXT, + anchor TEXT, + owner TEXT, + tags TEXT, + block TEXT + );` +) + +const createBlocks = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${BLOCKS_TABLE}( + id BIGINT PRIMARY KEY, + height BIGINT, + timestamp BIGINT + );` +) + +const createModules = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${MODULES_TABLE}( + id TEXT PRIMARY KEY, + owner TEXT, + tags TEXT + );` +) + +const createEvaluations = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${EVALUATIONS_TABLE}( + id TEXT PRIMARY KEY, + "processId" TEXT, + "messageId" TEXT, + "deepHash" TEXT, + nonce BIGINT, + epoch BIGINT, + timestamp BIGINT, + ordinate TEXT, + "blockHeight" BIGINT, + cron TEXT, + output TEXT, + "evaluatedAt" BIGINT + );` +) + +const createMessages = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${MESSAGES_TABLE}( + id TEXT, + "processId" TEXT, + seq TEXT, + PRIMARY KEY (id, "processId") + );` +) + +const createCheckpoints = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${CHECKPOINTS_TABLE}( + id TEXT PRIMARY KEY, + "processId" TEXT,
+ timestamp BIGINT, + ordinate TEXT, + cron TEXT, + memory TEXT, + evaluation TEXT + );` +) + +const createCheckpointFiles = async (pool) => pool.query( + `CREATE TABLE IF NOT EXISTS ${CHECKPOINT_FILES_TABLE}( + id TEXT PRIMARY KEY, + "processId" TEXT UNIQUE, + timestamp BIGINT, + ordinate TEXT, + cron TEXT, + file TEXT, + evaluation TEXT, + "cachedAt" BIGINT + );` +) + +const createBlocksIndexes = async (pool) => pool.query( + `CREATE INDEX IF NOT EXISTS idx_${BLOCKS_TABLE}_height_timestamp + ON ${BLOCKS_TABLE} + (height, timestamp);` +) + +const createMessagesIndexes = async (pool) => pool.query( + `CREATE INDEX IF NOT EXISTS idx_${MESSAGES_TABLE}_id_processId_seq + ON ${MESSAGES_TABLE} + (id, "processId", seq); + ` +) + +const createCheckpointsIndexes = async (pool) => pool.query( + `CREATE INDEX IF NOT EXISTS idx_${CHECKPOINTS_TABLE}_processId_timestamp + ON ${CHECKPOINTS_TABLE} + ("processId", timestamp);` +) + +const createCheckpointFilesIndexes = async (pool) => pool.query( + `CREATE INDEX IF NOT EXISTS idx_${CHECKPOINT_FILES_TABLE}_processId_timestamp + ON ${CHECKPOINT_FILES_TABLE} + ("processId", timestamp);` +) + +/** + * HACK to convert SQLite queries to Postgres queries: + * 1. replace all parameters like '?' with the ordinal parameter like '$1' + * 2. replace INSERT OR IGNORE with INSERT ... ON CONFLICT DO NOTHING + * + * This circumvents the diverging SQL dialects and the different + * parameterization styles supported by the two clients. + * + * We could eventually use a query builder, e.g. knex or kysely, + * that translates multiple dialects, but this is fewer changes, simpler, + * and fewer deps. So we'll do this for now. + */ +function toOrdinals (sql) { + let count = 0 + sql = sql.trim() + if (sql.startsWith('INSERT')) { + sql = sql.replace('INSERT OR IGNORE', 'INSERT') + if (sql.endsWith(';')) sql = sql.slice(0, -1) + sql += ' ON CONFLICT DO NOTHING;' + } + return sql.replace(/\?/g, () => `$${++count}`) +} diff --git a/servers/cu/src/effects/sqlite.js b/servers/cu/src/effects/sqlite.js index d15e3c140..95f968afb 100644 --- a/servers/cu/src/effects/sqlite.js +++ b/servers/cu/src/effects/sqlite.js @@ -3,15 +3,7 @@ import { stat } from 'node:fs' import Database from 'better-sqlite3' import bytes from 'bytes' -export const [PROCESSES_TABLE, BLOCKS_TABLE, MODULES_TABLE, EVALUATIONS_TABLE, MESSAGES_TABLE, CHECKPOINTS_TABLE, CHECKPOINT_FILES_TABLE] = [ - 'processes', - 'blocks', - 'modules', - 'evaluations', - 'messages', - 'checkpoints', - 'checkpoint_files' -] +import { BLOCKS_TABLE, CHECKPOINT_FILES_TABLE, CHECKPOINTS_TABLE, EVALUATIONS_TABLE, MESSAGES_TABLE, MODULES_TABLE, PROCESSES_TABLE } from './db.js' const createProcesses = async (db) => db.prepare( `CREATE TABLE IF NOT EXISTS ${PROCESSES_TABLE}( diff --git a/servers/cu/src/effects/worker/evaluator/main.js b/servers/cu/src/effects/worker/evaluator/main.js index 92064f5b9..69fec8d48 100644 --- a/servers/cu/src/effects/worker/evaluator/main.js +++ b/servers/cu/src/effects/worker/evaluator/main.js @@ -1,11 +1,11 @@ import * as WasmClient from '../../wasm.js' import * as AoEvaluationClient from '../../ao-evaluation.js' -import * as SqliteClient from '../../sqlite.js' +import * as DbClient from '../../db.js' import { evaluateWith } from '../evaluate.js' export const createApis = async (ctx) => { - const sqlite = await SqliteClient.createSqliteClient({ url: ctx.DB_URL, bootstrap: false }) + const db = await DbClient.createDbClient({ url: ctx.DB_URL, bootstrap: false, max: 5 }) const wasmInstanceCache = WasmClient.createWasmInstanceCache({
MAX_SIZE: ctx.WASM_INSTANCE_CACHE_MAX_SIZE }) const close = async (streamId) => wasmInstanceCache.delete(streamId) @@ -18,7 +18,7 @@ export const createApis = async (ctx) => { CHECKPOINT_GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL }), bootstrapWasmInstance: WasmClient.bootstrapWasmInstanceWith(), - saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db: sqlite, logger: ctx.logger }), + saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger: ctx.logger }), ARWEAVE_URL: ctx.ARWEAVE_URL, logger: ctx.logger })
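Note on wiring: createDbClient dispatches on the connection string, so the single DB_URL config value now selects the backend. A minimal usage sketch of both paths follows; the admin/admin/cu credentials come from the .postgres/Dockerfile above, while the localhost:5432 host/port, the SQLite path, and the query values are illustrative assumptions:

import * as DbClient from './effects/db.js'

// Postgres: a postgres:// url routes to createPostgresClient; bootstrap: true
// creates the tables and indexes on startup (assumes a locally-run container)
const pgDb = await DbClient.createDbClient({
  url: 'postgres://admin:admin@localhost:5432/cu',
  bootstrap: true
})

// SQLite: any other url is treated as a file path prefix, so this opens
// ./cu.sqlite (createDbClient appends the '.sqlite' suffix itself)
const sqliteDb = await DbClient.createDbClient({ url: './cu', bootstrap: true })

// Both clients expose the same { query, run, transaction } surface, so the
// effects stay backend-agnostic; SQL keeps SQLite-style '?' placeholders,
// which pg.js rewrites to $1, $2, ... via toOrdinals
const evaluations = await pgDb.query({
  sql: `SELECT id, "processId" FROM evaluations WHERE "processId" = ?;`,
  parameters: ['some-process-id']
})

Design note: the newly double-quoted identifiers ("processId", "blockHeight", "evaluatedAt", ...) are what make the shared SQL portable. Postgres folds unquoted identifiers to lowercase, while SQLite accepts the quoted camelCase form unchanged.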
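The evaluatedAt preprocess change in model.js (and the matching removal of the Date conversion in fromEvaluationDoc) exists because the two drivers return different types for the same logical column: better-sqlite3 returns INTEGER values as JS numbers, while node-postgres returns BIGINT (int8) values as strings to avoid precision loss. A self-contained sketch of the normalization, with illustrative timestamp values:

// better-sqlite3: INTEGER column -> number
const fromSqlite = 1736359054000
// node-postgres: BIGINT column -> string (pg leaves int8 unparsed by default)
const fromPostgres = '1736359054000'

const toDate = (arg) => {
  if (arg instanceof Date) return arg
  // numeric strings are ms-since-epoch; anything else falls back to the Date constructor
  if (typeof arg === 'string') arg = isNaN(arg) ? new Date(arg) : parseInt(arg, 10)
  if (typeof arg === 'number') return new Date(arg)
  return arg
}

// both inputs normalize to the same Date
console.log(toDate(fromSqlite).getTime() === toDate(fromPostgres).getTime()) // true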
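One seam in the toOrdinals hack worth flagging: it only rewrites INSERT OR IGNORE, but writeFileCheckpointRecordWith in ao-process.js issues INSERT OR REPLACE, which would reach Postgres unchanged (plus an appended ON CONFLICT DO NOTHING) and fail to parse. A sketch of one way the hack could be extended, emulating SQLite's REPLACE semantics with an upsert keyed on the id primary key; the update list mirrors the checkpoint_files schema above, and the separate UNIQUE constraint on "processId" would still need its own handling:

// Sketch only, not part of this diff: toOrdinals() extended so that
// INSERT OR REPLACE becomes a Postgres upsert instead of invalid SQL
function toOrdinals (sql) {
  let count = 0
  sql = sql.trim()
  if (sql.endsWith(';')) sql = sql.slice(0, -1)

  if (sql.startsWith('INSERT OR IGNORE')) {
    sql = sql.replace('INSERT OR IGNORE', 'INSERT') + ' ON CONFLICT DO NOTHING'
  } else if (sql.startsWith('INSERT OR REPLACE')) {
    // checkpoint_files is the only INSERT OR REPLACE caller; overwrite every
    // non-key column on conflict, approximating SQLite's REPLACE behavior
    sql = sql.replace('INSERT OR REPLACE', 'INSERT') + ` ON CONFLICT (id) DO UPDATE SET
      "processId" = EXCLUDED."processId",
      timestamp = EXCLUDED.timestamp,
      ordinate = EXCLUDED.ordinate,
      cron = EXCLUDED.cron,
      file = EXCLUDED.file,
      evaluation = EXCLUDED.evaluation,
      "cachedAt" = EXCLUDED."cachedAt"`
  }

  return (sql + ';').replace(/\?/g, () => `$${++count}`)
}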