From 503eaaf5da182051e5d6fa9b2c8ec6a74153e557 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sun, 7 Sep 2025 23:16:24 -0500 Subject: [PATCH 1/7] early draft of connector cli program --- defra-connector/cli.js | 294 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 defra-connector/cli.js diff --git a/defra-connector/cli.js b/defra-connector/cli.js new file mode 100644 index 0000000..3c2c203 --- /dev/null +++ b/defra-connector/cli.js @@ -0,0 +1,294 @@ +"use strict"; + +process.on("uncaughtException",function(err){ + console.log(err.stack); +}); + +var util = require("util"); +var fs = require("fs/promises"); + +var { MongoClient, } = require("mongodb"); +// var { createClient: GQLWSClient, } = require("graphql-ws"); +var { createClient: GQLSSEClient, } = require("graphql-sse"); +// var ws = require("ws"); +var fetch = require("node-fetch"); + +var { + logError, + ...localDefra +} = require("./defra-client.js"); + +const MONGO_ENDPOINT = "mongodb://127.0.0.1:27017/?replicaSet=rs0"; +const LOG_MONGO = false; + +// const DEFRA_WS_ENDPOINT = "ws://127.0.0.1:9181/api/v0/graphql"; +const DEFRA_SSE_ENDPOINT = "http://127.0.0.1:9181/api/v0/graphql"; +const LOG_DEFRA = false; + +var mClient; +var mEvents; +var mResumeToken; +var mStartToken; + +var dConnSeq = 0; +var dClient; +var dEvents; +var dLogFile; + + +main().catch(console.error); + +// ************************************* + +async function main() { + localDefra.init(); + + try { + // NOTE: just temporary tests + var res = await localDefra.has("foo"); + console.log("has foo",res); + + var res = await localDefra.set("foo",{ bar: 1 }); + console.log("set",res); + + var res = await localDefra.set("foo",{ bar: 2 }); + console.log("set",res); + + var res = await localDefra.get("foo"); + console.log("get",res); + + var res = await localDefra.remove("foo"); + console.log("remove",res); + } + catch (err) { + logError(err); + } + + + // await Promise.all([ + // listenMongo(), + // listenDefra(), + // ]); +} + +async function listenMongo() { + mClient = new MongoClient(MONGO_ENDPOINT,{ + directConnection: true, + ...( + LOG_MONGO ? { + mongodbLogComponentSeverities: { + default: "debug", + }, + mongodbLogPath: { + file: await fs.open(`./mongo-${+new Date()}.log`,"w"), + async write(log) { + try { + await this.file.appendFile(util.inspect(log) + "\n"); + } + catch (fileError) { + this.file = null; + console.error("MONGO LOGGING FAILED:",String(fileError)); + } + } + }, + } : null + ), + }); + + await mClient.connect(); + + console.log("MongoDB connected."); + + return startMongoWatcher(); +} + +async function startMongoWatcher() { + console.log("Listening for MongoDB change events..."); + + // filter if you want; otherwise omit pipeline + // const pipeline = [ + // { $match: { operationType: { $in: ["insert","update","replace","delete","drop","rename","invalidate"] } } } + // ]; + var pipeline = []; + + mEvents = mClient.watch(pipeline,{ + fullDocument: "updateLookup", + ...(mResumeToken != null ? { resumeAfter: mResumeToken, } : null), + ...(mStartToken != null ? { startAfter: mStartToken, } : null), + }); + + mEvents.on("change",mOnChange); + mEvents.on("error",mOnError); + mEvents.on("close",mOnClose); + + // for await (const ev of mEvents) { + // console.log(ev); + // } +} + +function mTeardownEvents() { + if (mEvents != null) { + mEvents.off("change",mOnChange); + mEvents.off("error",mOnError); + mEvents.off("close",mOnClose); + mEvents = null; + } +} + +function mOnChange(data) { + if (data.operationType == "invalidate") { + mStartToken = data._id; + mResumeToken = null; + console.log("MONGO INVALIDATE:",util.inspect(data,{depth:10})); + mTeardownEvents(); + return startMongoWatcher(); + } + else { + mStartToken = null; + mResumeToken = data._id; + console.log("MONGO CHANGE:",util.inspect(data,{depth:10})); + } +} + +function mOnError(err) { + console.error("MONGO WATCH ERROR:",String(err)); +} + +function mOnClose() { + console.log("MONGO CLOSE"); + mTeardownEvents(); + setTimeout(() => { + listenMongo().catch(console.error); + },250); +} + +async function listenDefra() { + // var dClient = GQLWSClient({ + // url: DEFRA_WS_ENDPOINT, + // webSocketImpl: ws, + // lazy: true, + // retryAttempts: Infinity, + // connectionParams: { + // "Content-Type": "application/json", + // "Accept": "text/event-stream", + // "Cache-Control": "no-cache", + // }, + // }); + + if (LOG_DEFRA && dLogFile == null) { + dLogFile = await fs.open(`./defra-${+new Date()}.log`,"w") + } + + dClient = GQLSSEClient({ + url: DEFRA_SSE_ENDPOINT, + fetchFn: fetch, + lazy: true, + retryAttempts: Infinity, + // singleConnection: true, + headers: { + "Content-Type": "application/json", + "Accept": "text/event-stream", + "Cache-Control": "no-cache", + }, + ...( + LOG_DEFRA && dLogFile != null ? + { + onMessage(msg) { + return dLogFile.appendFile(util.inspect(msg,{depth:10}) + "\n"); + }, + async fetchFn(url,opts) { + var id = ++dConnSeq; + await dLogFile.appendFile(`[DEFRA CONNECT ${id}] ${opts?.method || "GET"}: ${url}\n`); + var fetchRes = fetch(url,opts); + try { + return await fetchRes; + } + catch (err) { + await dLogFile.appendFile(`[DEFRA CONNECT ${id}] ERROR: ${String(err)}\n`); + console.error(); + return fetchRes; + } + }, + } : + + { + fetchFn: fetch, + } + ), + }); + + console.log("DefraDB Connected."); + + return startDefraListener(); +} + +async function startDefraListener() { + console.log("Listening for DefraDB updates..."); + + dEvents = dClient.subscribe( + { + // not currently supported: + // + // query: ` + // subscription { + // Commit { + // CID + // DocID + // CollectionID + // Delta + // } + // } + // `, + query: ` + subscription { + User { + _docID + name + email + counter + } + } + `, + }, + { + next: dOnNext, + error: dOnError, + complete: dOnComplete, + } + ); + // "dEvents.return()" to dispose + + + // var dEvents = dClient.iterate({ + // query: ` + // subscription { + // User { + // _docID + // name + // email + // counter + // } + // } + // `, + // }); + // for await (let result of dEvents) { + // // next = result = { data: { greetings: 5x } } + // console.log("DEFRA:",result); + // // "break" to dispose + // } +} + +function dOnNext(evt) { + console.log("DEFRA UPDATE:",util.inspect(evt.data,{depth:10})); +} + +function dOnError(err) { + console.error("DEFRA WATCH ERROR:",String(err)); +} + +function dOnComplete() { + console.log("DEFRA SUBSCRIPTION CLOSED"); + setTimeout(() => { + listenDefra().catch(console.error); + },250); +} From 95a6599427190c8bd40ec29a69e1984301a49eab Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sun, 7 Sep 2025 23:18:40 -0500 Subject: [PATCH 2/7] early draft of client module for defra-kv client module that negotiates communications with side-car `defra-kv` tool for persisting KV info --- defra-connector/defra-client.js | 454 ++++++++++++++++++++++++++++++++ 1 file changed, 454 insertions(+) create mode 100644 defra-connector/defra-client.js diff --git a/defra-connector/defra-client.js b/defra-connector/defra-client.js new file mode 100644 index 0000000..17fa7a4 --- /dev/null +++ b/defra-connector/defra-client.js @@ -0,0 +1,454 @@ +"use strict"; + +var util = require("util"); +var { spawn } = require("child_process"); +var path = require("path"); + +var transport = null; +var queue = Promise.resolve(); + +const DEFAULT_TIMEOUT_MS = 10000; + + +module.exports = { + execQuery, + close, + + has: hasKV, + get: getKV, + set: setKV, + remove: removeKV, + + logError, + + // not using WASM transport yet + // init: initWASM, + init: initCLI, +}; + + +// ************************************** + +function execQuery(query,vars,timeoutMs) { + if (!transport) { + // not using WASM transport yet + // initWASM(); + initCLI(); + } + if (typeof query != "string" || query == "") { + return Promise.reject(new Error("Query must be a non-empty string")); + } + + var run = () => transport.execute({ query, vars, timeoutMs, }); + + // keep the chain alive even after failures + var p = queue.then(run,run); + queue = p.catch(() => {}); + return p; +} + +async function hasKV(key) { + // var res = await execQuery( + // `query hasKV($key:String!) { + // KV(filter: { key: { _eq:$key } }) + // { _docID } + // }`, + // { key, } + // ); + // if (res && res.KV) { + // return res.KV.some(entry => entry.key == key); + // } + // return false; + + return transport.execute({ has: key, }); +} + +async function getKV(key) { + // var res = await execQuery( + // `query getKV($key:String!) { + // KV(filter: { key: { _eq: $key } }) + // { key value } + // }`, + // { key, } + // ); + // if (res && res.KV) { + // return res.KV.find(entry => entry.key == key); + // } + + return transport.execute({ get: key, }); +} + +async function setKV(key,value) { + var now = new Date().toISOString(); + + // Note: this style (passing in `value` as an external variable) + // does not currently work, due to a bug in defra + // + // await execQuery( + // `mutation setKV($key:String!,$value:JSON!,$now:DateTime!) { + // upsert_KV( + // filter: { key: { _eq: $key } } + // create: { key: $key, value: $value, updatedAt: $now } + // update: { value: $value, updatedAt: $now } + // ) + // { _docID } + // }`, + // { key, value, now, } + // ); + + // var valueLiteral = toGraphQLLiteral(value); + // var res = await execQuery( + // `mutation setKV($key:String!,$now:DateTime!) { + // upsert_KV( + // filter: { key: { _eq: $key } } + // create: { key: $key, value: ${valueLiteral}, updatedAt: $now } + // update: { value: ${valueLiteral}, updatedAt: $now } + // ) + // { _docID } + // }`, + // { key, now, } + // ); + + // return ( + // res && + // Array.isArray(res.upsert_KV) && + // res.upsert_KV.length > 0 + // ); + + return transport.execute({ set: { key, value, }, }); +} + +async function removeKV(key) { + // var res = await execQuery( + // `mutation removeKV($key:String!) { + // delete_KV( + // filter: { key: { _eq: $key } } + // ) + // { _docID } + // }`, + // { key, } + // ); + + // return ( + // res && + // Array.isArray(res.delete_KV) && + // res.delete_KV.length > 0 + // ); + + return transport.execute({ remove: key, }); +} + +function close() { + if (!transport || typeof transport.close !== "function") { + return Promise.resolve(); + } + return transport.close(); +} + + +// ************************************** + +// current (hack) +function initCLI() { + var bin = path.resolve(process.cwd(),"defra-kv"); + var dataConfigDir = path.resolve(process.cwd(),".defra-kv"); + var keyringSecret = "dev-dev-dev"; + var dev = false; + + transport = { + execute({ query, vars, has, get, set, remove, timeoutMs, } = {}) { + return execCLI({ + bin, + dataConfigDir, + keyringSecret, + dev, + query, + vars, + has, + get, + set, + remove, + timeoutMs, + }) + }, + + close: async () => {}, + }; +} + +function execCLI({ + bin, + dataConfigDir, + keyringSecret = process.env.DEFRA_KEYRING_SECRET, + dev, + query, + vars, + has, + get, + set, + remove, + timeoutMs, +} = {}) { + var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; + + var args = [ + "-dir", dataConfigDir, + "-timeout", `${Math.max(1,Math.ceil(tmo / 1000))}s`, + "-pretty=false", + ]; + if (dev) { + args.push("-dev"); + } + // query/vars takes precedence over KV actions + if (typeof query == "string" && query != "") { + if (vars != null) { + args.push("-vars",JSON.stringify(vars)); + } + } + else if (typeof has == "string" && has != "") { + args.push("-has",has); + } + else if (typeof get == "string" && get != "") { + args.push("-get",get); + } + else if (set && typeof set.key == "string" && set.key != "") { + args.push("-set",set.key); + } + else if (typeof remove == "string" && remove != "") { + args.push("-remove",remove); + } + + var env = { + ...process.env, + ...( + !!keyringSecret ? + { DEFRA_KEYRING_SECRET: keyringSecret, } : + null + ), + }; + + return new Promise((resolve,reject) => { + var child = spawn( + bin, + args, + { + stdio: ["pipe","pipe","pipe",], + cwd: process.cwd(), + env, + } + ); + + var stdout = ""; + var stderr = ""; + var timedOut = false; + + var timer = setTimeout( + () => { + timedOut = true; + child.kill("SIGKILL"); + }, + + // cushion beyond CLI's own -timeout + tmo + 1000 + ); + + child.stdout.on("data",d => { stdout += d.toString("utf8"); }); + child.stderr.on("data",d => { stderr += d.toString("utf8"); }); + child.on("error", (err) => { + clearTimeout(timer); + reject(new Error(`Failed to spawn defra-kv: ${err.message}`)); + }); + + child.on("close",code => { + clearTimeout(timer); + if (timedOut) return reject(new Error(`defra-kv timed out after ${tmo}ms`)); + + // explicit exit-code handling for KV actions? + if ( + args.includes("-has") || + args.includes("-get") || + args.includes("-set") || + args.includes("-remove") + ) { + if (code === 0) { + if (args.includes("-get")) { + try { + resolve(JSON.parse(stdout)); + } + catch (err) { + reject( + new Error( + `Could not parse defra-kv output: ${stdout}`, + { + cause: err, + } + ) + ); + } + } + else { + return resolve(true); + } + } + else if (code == 3) { + return resolve(false); + } + else { + return reject( + new Error(`defra-kv exited with code: ${code}`) + ); + } + } + else if (code !== 0) { + // stderr may contain JSON array of errors or plain text + return reject( + new GraphQLError( + `defra-kv exited with code: ${code}`, + { stdout, stderr, } + ) + ); + } + else { + try { + var parsed = JSON.parse(stdout); // {"data": ...} + resolve(parsed.data); + } + catch (err) { + reject( + new GraphQLError( + "Could not parse defra-kv output", + { + stdout, + cause: err, + } + ) + ); + } + } + }); + + // feed query/set via stdin to avoid shell quoting issues + if (query || (set && set.value)) { + child.stdin.write(query || JSON.stringify(set.value)); + } + child.stdin.end(); + }); +} + +// (future) +function initWASM({ instance, DEFAULT_TIMEOUT_MS = 10000 } = {}) { + if (!instance || typeof instance.exec !== "function") { + throw new Error("initWASM requires an instance exposing exec(query, vars, {timeoutMs})"); + } + transport = { + async execute({ query, vars, timeoutMs, } = {}) { + var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; + // expect instance.exec to throw on errors, return {"data": ...} or just the data + var out = await instance.exec( + query, + vars, + { timeoutMs: tmo, } + ); + // normalize to "data" like the CLI + return out && out.data !== undefined ? out.data : out; + }, + async close() { + if (typeof instance.close == "function") { + await instance.close(); + } + }, + }; + return state; +} + +function logError(err) { + if (err.errors && err.errors.length > 0) { + console.error(util.inspect(err.errors,{depth:10})); + } + else if (err.stderr) { + console.error(err.stderr); + } + else { + console.error(String(err)); + } +} + +class GraphQLError extends Error { + constructor( + message, + { + errors = [], + stdout = "", + stderr = "", + cause, + } = {} + ) { + super(message,cause ? { cause, } : undefined); + + this.name = "GraphQLError"; + + if (stderr != "") { + try { + // TODO: remove temporary hack to skip the sonic warning + stderr = stderr.replace( + "WARNING:(ast) sonic only supports go1.17~1.23, but your environment is not suitable\n", + "" + ); + + stderr = JSON.parse(stderr); + if (Array.isArray(stderr)) { + this.errors = stderr; + stderr = ""; + } + } + catch (err) {} + } + else { + this.errors = errors; + } + if (stdout) { + this.stdout = stdout; + } + if (stderr) { + this.stderr = stderr; + } + + if (Error.captureStackTrace) { + Error.captureStackTrace(this,GraphQLError); + } + } +} + +// Convert a JS value into a GraphQL input literal (strings quoted, keys unquoted) +// +// Note: temporary hack, only needed because can't send JSON/object value +// into defra as external variable currently, have to inline them into +// GraphQL query +function toGraphQLLiteral(v) { + if (v == null) return "null"; + var t = typeof v; + + if (t == "string") return JSON.stringify(v); // "..." + if (t == "number") { + if (!Number.isFinite(v)) throw new Error("Non-finite number not allowed"); + return String(v); + } + if (t == "boolean") return v ? "true" : "false"; + if (v instanceof Date) return JSON.stringify(v.toISOString()); + if (Array.isArray(v)) return `[${v.map(toGraphQLLiteral).join(", ")}]`; + + if (t == "object") { + let fields = Object.entries(v) + .filter(([, val ]) => val != null) + .map(([ k, val ]) => { + if (!/^[_A-Za-z][_0-9A-Za-z]*$/.test(k)) { + throw new Error(`Key "${k}" is not a valid GraphQL name; can't inline`); + } + return `${k}: ${toGraphQLLiteral(val)}`; + }) + .join(", "); + return `{${fields}}`; + } + + throw new Error(`Unsupported type: ${t}`); +} From 78f986119873c3c645bc60bacee0b83e31f57f55 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Tue, 9 Sep 2025 15:20:18 -0500 Subject: [PATCH 3/7] Delete defra-connector/defra-client.js --- defra-connector/defra-client.js | 454 -------------------------------- 1 file changed, 454 deletions(-) delete mode 100644 defra-connector/defra-client.js diff --git a/defra-connector/defra-client.js b/defra-connector/defra-client.js deleted file mode 100644 index 17fa7a4..0000000 --- a/defra-connector/defra-client.js +++ /dev/null @@ -1,454 +0,0 @@ -"use strict"; - -var util = require("util"); -var { spawn } = require("child_process"); -var path = require("path"); - -var transport = null; -var queue = Promise.resolve(); - -const DEFAULT_TIMEOUT_MS = 10000; - - -module.exports = { - execQuery, - close, - - has: hasKV, - get: getKV, - set: setKV, - remove: removeKV, - - logError, - - // not using WASM transport yet - // init: initWASM, - init: initCLI, -}; - - -// ************************************** - -function execQuery(query,vars,timeoutMs) { - if (!transport) { - // not using WASM transport yet - // initWASM(); - initCLI(); - } - if (typeof query != "string" || query == "") { - return Promise.reject(new Error("Query must be a non-empty string")); - } - - var run = () => transport.execute({ query, vars, timeoutMs, }); - - // keep the chain alive even after failures - var p = queue.then(run,run); - queue = p.catch(() => {}); - return p; -} - -async function hasKV(key) { - // var res = await execQuery( - // `query hasKV($key:String!) { - // KV(filter: { key: { _eq:$key } }) - // { _docID } - // }`, - // { key, } - // ); - // if (res && res.KV) { - // return res.KV.some(entry => entry.key == key); - // } - // return false; - - return transport.execute({ has: key, }); -} - -async function getKV(key) { - // var res = await execQuery( - // `query getKV($key:String!) { - // KV(filter: { key: { _eq: $key } }) - // { key value } - // }`, - // { key, } - // ); - // if (res && res.KV) { - // return res.KV.find(entry => entry.key == key); - // } - - return transport.execute({ get: key, }); -} - -async function setKV(key,value) { - var now = new Date().toISOString(); - - // Note: this style (passing in `value` as an external variable) - // does not currently work, due to a bug in defra - // - // await execQuery( - // `mutation setKV($key:String!,$value:JSON!,$now:DateTime!) { - // upsert_KV( - // filter: { key: { _eq: $key } } - // create: { key: $key, value: $value, updatedAt: $now } - // update: { value: $value, updatedAt: $now } - // ) - // { _docID } - // }`, - // { key, value, now, } - // ); - - // var valueLiteral = toGraphQLLiteral(value); - // var res = await execQuery( - // `mutation setKV($key:String!,$now:DateTime!) { - // upsert_KV( - // filter: { key: { _eq: $key } } - // create: { key: $key, value: ${valueLiteral}, updatedAt: $now } - // update: { value: ${valueLiteral}, updatedAt: $now } - // ) - // { _docID } - // }`, - // { key, now, } - // ); - - // return ( - // res && - // Array.isArray(res.upsert_KV) && - // res.upsert_KV.length > 0 - // ); - - return transport.execute({ set: { key, value, }, }); -} - -async function removeKV(key) { - // var res = await execQuery( - // `mutation removeKV($key:String!) { - // delete_KV( - // filter: { key: { _eq: $key } } - // ) - // { _docID } - // }`, - // { key, } - // ); - - // return ( - // res && - // Array.isArray(res.delete_KV) && - // res.delete_KV.length > 0 - // ); - - return transport.execute({ remove: key, }); -} - -function close() { - if (!transport || typeof transport.close !== "function") { - return Promise.resolve(); - } - return transport.close(); -} - - -// ************************************** - -// current (hack) -function initCLI() { - var bin = path.resolve(process.cwd(),"defra-kv"); - var dataConfigDir = path.resolve(process.cwd(),".defra-kv"); - var keyringSecret = "dev-dev-dev"; - var dev = false; - - transport = { - execute({ query, vars, has, get, set, remove, timeoutMs, } = {}) { - return execCLI({ - bin, - dataConfigDir, - keyringSecret, - dev, - query, - vars, - has, - get, - set, - remove, - timeoutMs, - }) - }, - - close: async () => {}, - }; -} - -function execCLI({ - bin, - dataConfigDir, - keyringSecret = process.env.DEFRA_KEYRING_SECRET, - dev, - query, - vars, - has, - get, - set, - remove, - timeoutMs, -} = {}) { - var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; - - var args = [ - "-dir", dataConfigDir, - "-timeout", `${Math.max(1,Math.ceil(tmo / 1000))}s`, - "-pretty=false", - ]; - if (dev) { - args.push("-dev"); - } - // query/vars takes precedence over KV actions - if (typeof query == "string" && query != "") { - if (vars != null) { - args.push("-vars",JSON.stringify(vars)); - } - } - else if (typeof has == "string" && has != "") { - args.push("-has",has); - } - else if (typeof get == "string" && get != "") { - args.push("-get",get); - } - else if (set && typeof set.key == "string" && set.key != "") { - args.push("-set",set.key); - } - else if (typeof remove == "string" && remove != "") { - args.push("-remove",remove); - } - - var env = { - ...process.env, - ...( - !!keyringSecret ? - { DEFRA_KEYRING_SECRET: keyringSecret, } : - null - ), - }; - - return new Promise((resolve,reject) => { - var child = spawn( - bin, - args, - { - stdio: ["pipe","pipe","pipe",], - cwd: process.cwd(), - env, - } - ); - - var stdout = ""; - var stderr = ""; - var timedOut = false; - - var timer = setTimeout( - () => { - timedOut = true; - child.kill("SIGKILL"); - }, - - // cushion beyond CLI's own -timeout - tmo + 1000 - ); - - child.stdout.on("data",d => { stdout += d.toString("utf8"); }); - child.stderr.on("data",d => { stderr += d.toString("utf8"); }); - child.on("error", (err) => { - clearTimeout(timer); - reject(new Error(`Failed to spawn defra-kv: ${err.message}`)); - }); - - child.on("close",code => { - clearTimeout(timer); - if (timedOut) return reject(new Error(`defra-kv timed out after ${tmo}ms`)); - - // explicit exit-code handling for KV actions? - if ( - args.includes("-has") || - args.includes("-get") || - args.includes("-set") || - args.includes("-remove") - ) { - if (code === 0) { - if (args.includes("-get")) { - try { - resolve(JSON.parse(stdout)); - } - catch (err) { - reject( - new Error( - `Could not parse defra-kv output: ${stdout}`, - { - cause: err, - } - ) - ); - } - } - else { - return resolve(true); - } - } - else if (code == 3) { - return resolve(false); - } - else { - return reject( - new Error(`defra-kv exited with code: ${code}`) - ); - } - } - else if (code !== 0) { - // stderr may contain JSON array of errors or plain text - return reject( - new GraphQLError( - `defra-kv exited with code: ${code}`, - { stdout, stderr, } - ) - ); - } - else { - try { - var parsed = JSON.parse(stdout); // {"data": ...} - resolve(parsed.data); - } - catch (err) { - reject( - new GraphQLError( - "Could not parse defra-kv output", - { - stdout, - cause: err, - } - ) - ); - } - } - }); - - // feed query/set via stdin to avoid shell quoting issues - if (query || (set && set.value)) { - child.stdin.write(query || JSON.stringify(set.value)); - } - child.stdin.end(); - }); -} - -// (future) -function initWASM({ instance, DEFAULT_TIMEOUT_MS = 10000 } = {}) { - if (!instance || typeof instance.exec !== "function") { - throw new Error("initWASM requires an instance exposing exec(query, vars, {timeoutMs})"); - } - transport = { - async execute({ query, vars, timeoutMs, } = {}) { - var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; - // expect instance.exec to throw on errors, return {"data": ...} or just the data - var out = await instance.exec( - query, - vars, - { timeoutMs: tmo, } - ); - // normalize to "data" like the CLI - return out && out.data !== undefined ? out.data : out; - }, - async close() { - if (typeof instance.close == "function") { - await instance.close(); - } - }, - }; - return state; -} - -function logError(err) { - if (err.errors && err.errors.length > 0) { - console.error(util.inspect(err.errors,{depth:10})); - } - else if (err.stderr) { - console.error(err.stderr); - } - else { - console.error(String(err)); - } -} - -class GraphQLError extends Error { - constructor( - message, - { - errors = [], - stdout = "", - stderr = "", - cause, - } = {} - ) { - super(message,cause ? { cause, } : undefined); - - this.name = "GraphQLError"; - - if (stderr != "") { - try { - // TODO: remove temporary hack to skip the sonic warning - stderr = stderr.replace( - "WARNING:(ast) sonic only supports go1.17~1.23, but your environment is not suitable\n", - "" - ); - - stderr = JSON.parse(stderr); - if (Array.isArray(stderr)) { - this.errors = stderr; - stderr = ""; - } - } - catch (err) {} - } - else { - this.errors = errors; - } - if (stdout) { - this.stdout = stdout; - } - if (stderr) { - this.stderr = stderr; - } - - if (Error.captureStackTrace) { - Error.captureStackTrace(this,GraphQLError); - } - } -} - -// Convert a JS value into a GraphQL input literal (strings quoted, keys unquoted) -// -// Note: temporary hack, only needed because can't send JSON/object value -// into defra as external variable currently, have to inline them into -// GraphQL query -function toGraphQLLiteral(v) { - if (v == null) return "null"; - var t = typeof v; - - if (t == "string") return JSON.stringify(v); // "..." - if (t == "number") { - if (!Number.isFinite(v)) throw new Error("Non-finite number not allowed"); - return String(v); - } - if (t == "boolean") return v ? "true" : "false"; - if (v instanceof Date) return JSON.stringify(v.toISOString()); - if (Array.isArray(v)) return `[${v.map(toGraphQLLiteral).join(", ")}]`; - - if (t == "object") { - let fields = Object.entries(v) - .filter(([, val ]) => val != null) - .map(([ k, val ]) => { - if (!/^[_A-Za-z][_0-9A-Za-z]*$/.test(k)) { - throw new Error(`Key "${k}" is not a valid GraphQL name; can't inline`); - } - return `${k}: ${toGraphQLLiteral(val)}`; - }) - .join(", "); - return `{${fields}}`; - } - - throw new Error(`Unsupported type: ${t}`); -} From db8f56f9114f027719276d08293c802b3042bb60 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Tue, 9 Sep 2025 15:21:44 -0500 Subject: [PATCH 4/7] renaming "defra-client" module to "defra-kv" --- defra-connector/cli.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/defra-connector/cli.js b/defra-connector/cli.js index 3c2c203..c5d484c 100644 --- a/defra-connector/cli.js +++ b/defra-connector/cli.js @@ -15,8 +15,8 @@ var fetch = require("node-fetch"); var { logError, - ...localDefra -} = require("./defra-client.js"); + ...defraKV +} = require("./defra-kv.js"); const MONGO_ENDPOINT = "mongodb://127.0.0.1:27017/?replicaSet=rs0"; const LOG_MONGO = false; @@ -41,23 +41,22 @@ main().catch(console.error); // ************************************* async function main() { - localDefra.init(); + defraKV.init(); try { - // NOTE: just temporary tests - var res = await localDefra.has("foo"); + var res = await defraKV.has("foo"); console.log("has foo",res); - var res = await localDefra.set("foo",{ bar: 1 }); + var res = await defraKV.set("foo",{ bar: 1 }); console.log("set",res); - var res = await localDefra.set("foo",{ bar: 2 }); + var res = await defraKV.set("foo",{ bar: 2 }); console.log("set",res); - var res = await localDefra.get("foo"); + var res = await defraKV.get("foo"); console.log("get",res); - var res = await localDefra.remove("foo"); + var res = await defraKV.remove("foo"); console.log("remove",res); } catch (err) { From fa9c5abfc94da50e706fd8ae37f399325700f3ac Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Tue, 9 Sep 2025 15:22:44 -0500 Subject: [PATCH 5/7] Add files via upload --- defra-connector/defra-kv.js | 454 ++++++++++++++++++++++++++++++++++++ 1 file changed, 454 insertions(+) create mode 100644 defra-connector/defra-kv.js diff --git a/defra-connector/defra-kv.js b/defra-connector/defra-kv.js new file mode 100644 index 0000000..17fa7a4 --- /dev/null +++ b/defra-connector/defra-kv.js @@ -0,0 +1,454 @@ +"use strict"; + +var util = require("util"); +var { spawn } = require("child_process"); +var path = require("path"); + +var transport = null; +var queue = Promise.resolve(); + +const DEFAULT_TIMEOUT_MS = 10000; + + +module.exports = { + execQuery, + close, + + has: hasKV, + get: getKV, + set: setKV, + remove: removeKV, + + logError, + + // not using WASM transport yet + // init: initWASM, + init: initCLI, +}; + + +// ************************************** + +function execQuery(query,vars,timeoutMs) { + if (!transport) { + // not using WASM transport yet + // initWASM(); + initCLI(); + } + if (typeof query != "string" || query == "") { + return Promise.reject(new Error("Query must be a non-empty string")); + } + + var run = () => transport.execute({ query, vars, timeoutMs, }); + + // keep the chain alive even after failures + var p = queue.then(run,run); + queue = p.catch(() => {}); + return p; +} + +async function hasKV(key) { + // var res = await execQuery( + // `query hasKV($key:String!) { + // KV(filter: { key: { _eq:$key } }) + // { _docID } + // }`, + // { key, } + // ); + // if (res && res.KV) { + // return res.KV.some(entry => entry.key == key); + // } + // return false; + + return transport.execute({ has: key, }); +} + +async function getKV(key) { + // var res = await execQuery( + // `query getKV($key:String!) { + // KV(filter: { key: { _eq: $key } }) + // { key value } + // }`, + // { key, } + // ); + // if (res && res.KV) { + // return res.KV.find(entry => entry.key == key); + // } + + return transport.execute({ get: key, }); +} + +async function setKV(key,value) { + var now = new Date().toISOString(); + + // Note: this style (passing in `value` as an external variable) + // does not currently work, due to a bug in defra + // + // await execQuery( + // `mutation setKV($key:String!,$value:JSON!,$now:DateTime!) { + // upsert_KV( + // filter: { key: { _eq: $key } } + // create: { key: $key, value: $value, updatedAt: $now } + // update: { value: $value, updatedAt: $now } + // ) + // { _docID } + // }`, + // { key, value, now, } + // ); + + // var valueLiteral = toGraphQLLiteral(value); + // var res = await execQuery( + // `mutation setKV($key:String!,$now:DateTime!) { + // upsert_KV( + // filter: { key: { _eq: $key } } + // create: { key: $key, value: ${valueLiteral}, updatedAt: $now } + // update: { value: ${valueLiteral}, updatedAt: $now } + // ) + // { _docID } + // }`, + // { key, now, } + // ); + + // return ( + // res && + // Array.isArray(res.upsert_KV) && + // res.upsert_KV.length > 0 + // ); + + return transport.execute({ set: { key, value, }, }); +} + +async function removeKV(key) { + // var res = await execQuery( + // `mutation removeKV($key:String!) { + // delete_KV( + // filter: { key: { _eq: $key } } + // ) + // { _docID } + // }`, + // { key, } + // ); + + // return ( + // res && + // Array.isArray(res.delete_KV) && + // res.delete_KV.length > 0 + // ); + + return transport.execute({ remove: key, }); +} + +function close() { + if (!transport || typeof transport.close !== "function") { + return Promise.resolve(); + } + return transport.close(); +} + + +// ************************************** + +// current (hack) +function initCLI() { + var bin = path.resolve(process.cwd(),"defra-kv"); + var dataConfigDir = path.resolve(process.cwd(),".defra-kv"); + var keyringSecret = "dev-dev-dev"; + var dev = false; + + transport = { + execute({ query, vars, has, get, set, remove, timeoutMs, } = {}) { + return execCLI({ + bin, + dataConfigDir, + keyringSecret, + dev, + query, + vars, + has, + get, + set, + remove, + timeoutMs, + }) + }, + + close: async () => {}, + }; +} + +function execCLI({ + bin, + dataConfigDir, + keyringSecret = process.env.DEFRA_KEYRING_SECRET, + dev, + query, + vars, + has, + get, + set, + remove, + timeoutMs, +} = {}) { + var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; + + var args = [ + "-dir", dataConfigDir, + "-timeout", `${Math.max(1,Math.ceil(tmo / 1000))}s`, + "-pretty=false", + ]; + if (dev) { + args.push("-dev"); + } + // query/vars takes precedence over KV actions + if (typeof query == "string" && query != "") { + if (vars != null) { + args.push("-vars",JSON.stringify(vars)); + } + } + else if (typeof has == "string" && has != "") { + args.push("-has",has); + } + else if (typeof get == "string" && get != "") { + args.push("-get",get); + } + else if (set && typeof set.key == "string" && set.key != "") { + args.push("-set",set.key); + } + else if (typeof remove == "string" && remove != "") { + args.push("-remove",remove); + } + + var env = { + ...process.env, + ...( + !!keyringSecret ? + { DEFRA_KEYRING_SECRET: keyringSecret, } : + null + ), + }; + + return new Promise((resolve,reject) => { + var child = spawn( + bin, + args, + { + stdio: ["pipe","pipe","pipe",], + cwd: process.cwd(), + env, + } + ); + + var stdout = ""; + var stderr = ""; + var timedOut = false; + + var timer = setTimeout( + () => { + timedOut = true; + child.kill("SIGKILL"); + }, + + // cushion beyond CLI's own -timeout + tmo + 1000 + ); + + child.stdout.on("data",d => { stdout += d.toString("utf8"); }); + child.stderr.on("data",d => { stderr += d.toString("utf8"); }); + child.on("error", (err) => { + clearTimeout(timer); + reject(new Error(`Failed to spawn defra-kv: ${err.message}`)); + }); + + child.on("close",code => { + clearTimeout(timer); + if (timedOut) return reject(new Error(`defra-kv timed out after ${tmo}ms`)); + + // explicit exit-code handling for KV actions? + if ( + args.includes("-has") || + args.includes("-get") || + args.includes("-set") || + args.includes("-remove") + ) { + if (code === 0) { + if (args.includes("-get")) { + try { + resolve(JSON.parse(stdout)); + } + catch (err) { + reject( + new Error( + `Could not parse defra-kv output: ${stdout}`, + { + cause: err, + } + ) + ); + } + } + else { + return resolve(true); + } + } + else if (code == 3) { + return resolve(false); + } + else { + return reject( + new Error(`defra-kv exited with code: ${code}`) + ); + } + } + else if (code !== 0) { + // stderr may contain JSON array of errors or plain text + return reject( + new GraphQLError( + `defra-kv exited with code: ${code}`, + { stdout, stderr, } + ) + ); + } + else { + try { + var parsed = JSON.parse(stdout); // {"data": ...} + resolve(parsed.data); + } + catch (err) { + reject( + new GraphQLError( + "Could not parse defra-kv output", + { + stdout, + cause: err, + } + ) + ); + } + } + }); + + // feed query/set via stdin to avoid shell quoting issues + if (query || (set && set.value)) { + child.stdin.write(query || JSON.stringify(set.value)); + } + child.stdin.end(); + }); +} + +// (future) +function initWASM({ instance, DEFAULT_TIMEOUT_MS = 10000 } = {}) { + if (!instance || typeof instance.exec !== "function") { + throw new Error("initWASM requires an instance exposing exec(query, vars, {timeoutMs})"); + } + transport = { + async execute({ query, vars, timeoutMs, } = {}) { + var tmo = Number.isFinite(timeoutMs) ? timeoutMs : DEFAULT_TIMEOUT_MS; + // expect instance.exec to throw on errors, return {"data": ...} or just the data + var out = await instance.exec( + query, + vars, + { timeoutMs: tmo, } + ); + // normalize to "data" like the CLI + return out && out.data !== undefined ? out.data : out; + }, + async close() { + if (typeof instance.close == "function") { + await instance.close(); + } + }, + }; + return state; +} + +function logError(err) { + if (err.errors && err.errors.length > 0) { + console.error(util.inspect(err.errors,{depth:10})); + } + else if (err.stderr) { + console.error(err.stderr); + } + else { + console.error(String(err)); + } +} + +class GraphQLError extends Error { + constructor( + message, + { + errors = [], + stdout = "", + stderr = "", + cause, + } = {} + ) { + super(message,cause ? { cause, } : undefined); + + this.name = "GraphQLError"; + + if (stderr != "") { + try { + // TODO: remove temporary hack to skip the sonic warning + stderr = stderr.replace( + "WARNING:(ast) sonic only supports go1.17~1.23, but your environment is not suitable\n", + "" + ); + + stderr = JSON.parse(stderr); + if (Array.isArray(stderr)) { + this.errors = stderr; + stderr = ""; + } + } + catch (err) {} + } + else { + this.errors = errors; + } + if (stdout) { + this.stdout = stdout; + } + if (stderr) { + this.stderr = stderr; + } + + if (Error.captureStackTrace) { + Error.captureStackTrace(this,GraphQLError); + } + } +} + +// Convert a JS value into a GraphQL input literal (strings quoted, keys unquoted) +// +// Note: temporary hack, only needed because can't send JSON/object value +// into defra as external variable currently, have to inline them into +// GraphQL query +function toGraphQLLiteral(v) { + if (v == null) return "null"; + var t = typeof v; + + if (t == "string") return JSON.stringify(v); // "..." + if (t == "number") { + if (!Number.isFinite(v)) throw new Error("Non-finite number not allowed"); + return String(v); + } + if (t == "boolean") return v ? "true" : "false"; + if (v instanceof Date) return JSON.stringify(v.toISOString()); + if (Array.isArray(v)) return `[${v.map(toGraphQLLiteral).join(", ")}]`; + + if (t == "object") { + let fields = Object.entries(v) + .filter(([, val ]) => val != null) + .map(([ k, val ]) => { + if (!/^[_A-Za-z][_0-9A-Za-z]*$/.test(k)) { + throw new Error(`Key "${k}" is not a valid GraphQL name; can't inline`); + } + return `${k}: ${toGraphQLLiteral(val)}`; + }) + .join(", "); + return `{${fields}}`; + } + + throw new Error(`Unsupported type: ${t}`); +} From 96a96945a0958ca47ef32f2788e493a32ee3dd8e Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Tue, 9 Sep 2025 15:39:57 -0500 Subject: [PATCH 6/7] switching to level-db for local KV store per @jsminz, switching from the (hacky) `defra-kv` approach to using `classic-level` (node wrapper around level-db) for KV operations (local sync metadata) --- defra-connector/cli.js | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/defra-connector/cli.js b/defra-connector/cli.js index c5d484c..fbb2627 100644 --- a/defra-connector/cli.js +++ b/defra-connector/cli.js @@ -4,6 +4,7 @@ process.on("uncaughtException",function(err){ console.log(err.stack); }); +var path = require("path"); var util = require("util"); var fs = require("fs/promises"); @@ -12,11 +13,7 @@ var { MongoClient, } = require("mongodb"); var { createClient: GQLSSEClient, } = require("graphql-sse"); // var ws = require("ws"); var fetch = require("node-fetch"); - -var { - logError, - ...defraKV -} = require("./defra-kv.js"); +var { ClassicLevel, } = require("classic-level"); const MONGO_ENDPOINT = "mongodb://127.0.0.1:27017/?replicaSet=rs0"; const LOG_MONGO = false; @@ -25,6 +22,11 @@ const LOG_MONGO = false; const DEFRA_SSE_ENDPOINT = "http://127.0.0.1:9181/api/v0/graphql"; const LOG_DEFRA = false; +var localKV = new ClassicLevel( + path.join(".",".local-kv"), + { valueEncoding: "json", } +); + var mClient; var mEvents; var mResumeToken; @@ -41,26 +43,24 @@ main().catch(console.error); // ************************************* async function main() { - defraKV.init(); - try { - var res = await defraKV.has("foo"); + var res = await localKV.has("foo"); console.log("has foo",res); - var res = await defraKV.set("foo",{ bar: 1 }); - console.log("set",res); + var res = await localKV.put("foo",{ bar: 1 }); + console.log("put",res); - var res = await defraKV.set("foo",{ bar: 2 }); - console.log("set",res); + var res = await localKV.put("foo",{ bar: 2 }); + console.log("put",res); - var res = await defraKV.get("foo"); + var res = await localKV.get("foo"); console.log("get",res); - var res = await defraKV.remove("foo"); - console.log("remove",res); + var res = await localKV.del("foo"); + console.log("del",res); } catch (err) { - logError(err); + console.error(err); } From 977a7bfac724d6a64864ef4849d67b8dbf7cfd11 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Thu, 11 Sep 2025 14:56:48 -0500 Subject: [PATCH 7/7] still very rough wip status: have achieved first 2-way sync (insert + update) between defra and mongo still plenty of caveats/bugs to test and workout. --- defra-connector/cli.js | 517 +++++++++++++++++++++++++++++++++++------ 1 file changed, 447 insertions(+), 70 deletions(-) diff --git a/defra-connector/cli.js b/defra-connector/cli.js index fbb2627..4608f3b 100644 --- a/defra-connector/cli.js +++ b/defra-connector/cli.js @@ -1,3 +1,5 @@ +#!/bin/env node + "use strict"; process.on("uncaughtException",function(err){ @@ -8,19 +10,37 @@ var path = require("path"); var util = require("util"); var fs = require("fs/promises"); -var { MongoClient, } = require("mongodb"); +var { + MongoClient, + ObjectId: MongoObjectId, +} = require("mongodb"); // var { createClient: GQLWSClient, } = require("graphql-ws"); var { createClient: GQLSSEClient, } = require("graphql-sse"); // var ws = require("ws"); var fetch = require("node-fetch"); var { ClassicLevel, } = require("classic-level"); +var cliArgs = require("minimist")( + process.argv.slice(2), + { + boolean: [ "help", ], + string: [ "collection", "db" ], + alias: { + collection: "c" + }, + default: {}, + } +); const MONGO_ENDPOINT = "mongodb://127.0.0.1:27017/?replicaSet=rs0"; const LOG_MONGO = false; // const DEFRA_WS_ENDPOINT = "ws://127.0.0.1:9181/api/v0/graphql"; const DEFRA_SSE_ENDPOINT = "http://127.0.0.1:9181/api/v0/graphql"; -const LOG_DEFRA = false; +const DEFRA_SCHEMA_ENDPOINT = "http://127.0.0.1:9181/api/v0/schema"; +const DEFRA_GRAPHQL_ENDPOINT = "http://127.0.0.1:9181/api/v0/graphql"; +const LOG_DEFRA = true; + + var localKV = new ClassicLevel( path.join(".",".local-kv"), @@ -28,13 +48,15 @@ var localKV = new ClassicLevel( ); var mClient; +var mSession; +var mSessionID; var mEvents; var mResumeToken; var mStartToken; var dConnSeq = 0; var dClient; -var dEvents; +var dEvents = []; var dLogFile; @@ -43,31 +65,55 @@ main().catch(console.error); // ************************************* async function main() { - try { - var res = await localKV.has("foo"); - console.log("has foo",res); + if (cliArgs.help) { + printHelp(); + process.exit(0); + } + else if (!(cliArgs.collection && cliArgs.db)) { + console.error("Missing required --collection and --db"); + console.log(""); + printHelp(); + process.exit(1); + } + else if (Array.isArray(cliArgs.db)) { + console.error("Only one --db may be specified"); + console.log(""); + printHelp(); + process.exit(1); + } - var res = await localKV.put("foo",{ bar: 1 }); - console.log("put",res); + cliArgs.collection = ( + Array.isArray(cliArgs.collection) ? + cliArgs.collection : - var res = await localKV.put("foo",{ bar: 2 }); - console.log("put",res); + [ cliArgs.collection, ] + ); - var res = await localKV.get("foo"); - console.log("get",res); + // try { + // var res = await localKV.has("foo"); + // console.log("has foo",res); - var res = await localKV.del("foo"); - console.log("del",res); - } - catch (err) { - console.error(err); - } + // var res = await localKV.put("foo",{ bar: 1 }); + // console.log("put",res); + // var res = await localKV.put("foo",{ bar: 2 }); + // console.log("put",res); - // await Promise.all([ - // listenMongo(), - // listenDefra(), - // ]); + // var res = await localKV.get("foo"); + // console.log("get",res); + + // var res = await localKV.del("foo"); + // console.log("del",res); + // } + // catch (err) { + // console.error(err); + // } + + + await Promise.all([ + listenMongo(), + listenDefra(), + ]); } async function listenMongo() { @@ -94,23 +140,47 @@ async function listenMongo() { ), }); - await mClient.connect(); + try { + mSession = mClient.startSession(); + mSessionID = mSession.id.id; - console.log("MongoDB connected."); + await mClient.connect(); - return startMongoWatcher(); + console.log("MongoDB connected."); + + return await startMongoWatcher(); + } + catch (err) { + console.error(err); + process.exit(1); + } } async function startMongoWatcher() { - console.log("Listening for MongoDB change events..."); + console.log(`Listening for MongoDB change events on (${ + cliArgs.collection + .map(typeName => `${cliArgs.db}:${typeName}`) + .join(",") + })...`); // filter if you want; otherwise omit pipeline // const pipeline = [ // { $match: { operationType: { $in: ["insert","update","replace","delete","drop","rename","invalidate"] } } } // ]; - var pipeline = []; + var pipeline = [ + { + $match: { + $or: [ + { + lsid: { $exists: false, }, + "lsid.id": { $ne: mSessionID, }, + }, + ], + }, + }, + ]; - mEvents = mClient.watch(pipeline,{ + mEvents = mClient.db(cliArgs.db).watch(pipeline,{ fullDocument: "updateLookup", ...(mResumeToken != null ? { resumeAfter: mResumeToken, } : null), ...(mStartToken != null ? { startAfter: mStartToken, } : null), @@ -134,18 +204,104 @@ function mTeardownEvents() { } } -function mOnChange(data) { - if (data.operationType == "invalidate") { - mStartToken = data._id; +async function mOnChange(changeEntry) { + if (changeEntry.operationType == "invalidate") { + mStartToken = changeEntry._id; mResumeToken = null; - console.log("MONGO INVALIDATE:",util.inspect(data,{depth:10})); + console.log("MONGO INVALIDATE:",util.inspect(changeEntry,{depth:10})); mTeardownEvents(); return startMongoWatcher(); } else { mStartToken = null; - mResumeToken = data._id; - console.log("MONGO CHANGE:",util.inspect(data,{depth:10})); + mResumeToken = changeEntry._id; + + let { coll: collName, } = changeEntry.ns || {}; + let { _id: mID, ...fullDocument } = changeEntry.fullDocument || {} + + if (mID != null && collName != null && fullDocument != null) { + console.log("MONGO CHANGE:",util.inspect(changeEntry,{depth:10})); + + let _docID = await localKV.get(`md_${String(mID)}`); + let resp; + + // already a known document in defra? + if (_docID != null) { + resp = await defraGraphQL({ + query: ` + mutation Update${collName}($docID:ID!,$input:${collName}MutationInputArg!) { + update_${collName}( + filter: { _docID: { _eq: $docID } }, + input: $input + ) + { + _docID + _version { cid height signature { __typename } } + } + } + `, + variables: { + docID: _docID, + input: fullDocument, + }, + }); + } + // otherwise, insert new document + else { + resp = await defraGraphQL({ + query: ` + mutation Insert${collName}($input:${collName}MutationInputArg!) { + ${collName}(input: $input) + { + _docID + _version { cid height signature { __typename } } + } + } + `, + variables: { + input: fullDocument, + }, + }); + } + + // cache defra commit info (if successful) + let respEntry = ( + ((resp ? + resp[Object.keys(resp)[0]] : + null + ) || [])[0] + ); + + if (respEntry != null) { + let headsEntry = extractHeads(respEntry._version); + if ( + headsEntry.height >= 0 && + headsEntry.heads.length > 0 + ) { + + if (_docID == null) { + _docID = respEntry._docID; + console.log(`insert; mongo (${mID}) -> defra (${_docID})`); + await localKV.put(`dh_${_docID}`,headsEntry); + await localKV.put(`dm_${_docID}`,mID); + await localKV.put(`md_${mID}`,_docID); + } + else { + await localKV.put(`dh_${_docID}`,headsEntry); + console.log(`update; mongo (${mID}) -> defra (${_docID})`); + } + } + else { + console.error(`MONGO->DEFRA INSERT/UPDATE ERROR: ${JSON.stringify(resp)}`) + } + } + else { + console.error(`MONGO->DEFRA ERROR: ${JSON.stringify(resp)}`); + } + } + else { + console.error(`MONGO UNRECOGNIZED CHANGE: ${JSON.stringify(changeEntry)}`); + } } } @@ -162,7 +318,7 @@ function mOnClose() { } async function listenDefra() { - // var dClient = GQLWSClient({ + // dClient = GQLWSClient({ // url: DEFRA_WS_ENDPOINT, // webSocketImpl: ws, // lazy: true, @@ -218,43 +374,149 @@ async function listenDefra() { console.log("DefraDB Connected."); - return startDefraListener(); + try { + let typeDefs = ( + Object.fromEntries( + (await Promise.all( + cliArgs.collection.map(fetchDefraTypeFields) + )) + ) + ); + + return await startDefraListener(typeDefs); + } + catch (err) { + console.error(err.toString()); + process.exit(1); + } +} + +async function defraGraphQL({ query, schema, variables = {}, } = {}) { + try { + let apiResp = await fetch( + ( + schema != null ? + DEFRA_SCHEMA_ENDPOINT : + + DEFRA_GRAPHQL_ENDPOINT + ), + { + method: "POST", + headers: { + "Content-Type": ( + schema != null ? + "text/plain" : + + "application/json" + ), + }, + body: ( + schema || + (query ? JSON.stringify({ query, variables, }) : null) || + "" + ), + } + ); + if (apiResp.ok) { + return ( + schema != null ? + (await apiResp.text()) : + + (await apiResp.json()).data + ); + } + else { + let apiRespBody = await apiResp.text(); + throw new Error(apiRespBody); + } + } + catch (err) { + console.error(err.toString()); + } } -async function startDefraListener() { - console.log("Listening for DefraDB updates..."); +async function fetchDefraTypeFields(collection) { + console.log(`Fetching DefraDB type definition for ${collection}...`); - dEvents = dClient.subscribe( - { - // not currently supported: - // - // query: ` - // subscription { - // Commit { - // CID - // DocID - // CollectionID - // Delta - // } - // } - // `, - query: ` - subscription { - User { - _docID - name - email - counter - } + var resp = await defraGraphQL({ + query: ` + query { + __type(name:"${collection}") { + fields { name } } - `, - }, - { - next: dOnNext, - error: dOnError, - complete: dOnComplete, - } - ); + } + `, + }); + + if (resp && resp.__type && resp.__type.fields) { + let fieldNames = ( + resp.__type.fields + .map(entry => entry.name) + .filter(name => name[0] != "_") + ); + return [ collection, fieldNames, ]; + } + + throw new Error(`Type '${collection}' not found/retrieved properly`); +} + +async function startDefraListener(typeDefs) { + console.log(`Subscribing to DefraDB updates (${Object.keys(typeDefs).join(",")})...`); + + for (let [ typeName, typeFields, ] of Object.entries(typeDefs)) { + dEvents.push( + dClient.subscribe( + { + query: ` + subscription { + ${typeName}(showDeleted: true) { + _docID ${typeFields.join(" ")} + _version { cid height signature { __typename } } + } + } + `, + }, + { + next: dOnNext, + error: dOnError, + complete: dOnComplete, + } + ) + ); + } + + + // dEvents = dClient.subscribe( + // { + // // not currently supported: + // // + // // query: ` + // // subscription { + // // Commit { + // // CID + // // DocID + // // CollectionID + // // Delta + // // } + // // } + // // `, + // query: ` + // subscription { + // User { + // _docID + // name + // email + // counter + // } + // } + // `, + // }, + // { + // next: dOnNext, + // error: dOnError, + // complete: dOnComplete, + // } + // ); // "dEvents.return()" to dispose @@ -277,8 +539,114 @@ async function startDefraListener() { // } } -function dOnNext(evt) { - console.log("DEFRA UPDATE:",util.inspect(evt.data,{depth:10})); +function extractHeads(versions) { + if (versions != null) { + let maxHeight = Math.max(...versions.map(v => v.height)); + return { + heads: [ + ...(new Set( + versions + .filter(v => v.height == maxHeight) + .map(v => v.cid) + )), + ], + height: maxHeight, + }; + } + return { heads: [], height: -1, }; +} + +async function dOnNext(evt) { + var changeEntry = ( + ((evt && evt.data ? + evt.data[Object.keys(evt.data)[0]] : + null + ) || [])[0] + ); + + if (changeEntry != null) { + let collName = Object.keys(evt.data)[0]; + let { _docID, _version, ...$set } = changeEntry; + let headsEntry = extractHeads(_version); + let cachedHeads = ( + (await localKV.get(`dh_${_docID}`)) || + { heads: [], height: -1 } + ); + + if ( + // higher commit height? + headsEntry.height > cachedHeads.height || + + ( + // same commit height? + headsEntry.height == cachedHeads.height && + + // but includes commit(s) we didn't do and cache? + ![ ...headsEntry.heads, ] + .every(cID => cachedHeads.heads.includes(cID)) + ) + ) { + console.log("DEFRA CHANGE:",util.inspect(evt.data,{depth:10})); + + try { + await mSession.withTransaction(async () => { + var mID = (await localKV.get(`dm_${_docID}`)) || null; + + try { + let res = await ( + mClient + .db(cliArgs.db) + .collection(collName) + .updateOne( + /*filter=*/{ + _id: new MongoObjectId(...( + mID != null ? [ mID ] : [] + )), + }, + /*update=*/{ + $set, + }, + /*options=*/{ + upsert: true, + session: mSession, + } + ) + ); + + // mongo upsert succeeded? + if ( + res && + res.acknowledged && + (res.upsertedCount == 1 || res.modifiedCount == 1) + ) { + // inserting new document into mongo? + if (mID == null) { + mID = String(res.upsertedId); + console.log(`insert; defra (${_docID}) -> mongo (${mID})`); + await localKV.put(`dm_${_docID}`,mID); + await localKV.put(`md_${mID}`,_docID); + } + else { + console.log(`update; defra (${_docID}) -> mongo (${mID})`); + } + } + else { + console.error(`DEFRA->MONGO UPSERT ERROR: ${util.inspect(res,{depth:10})}`); + } + } + catch (err) { + console.error(`DEFRA->MONGO ERROR: ${err.toString()}`); + } + }); + } + catch (err) { + console.error(`DEFRA->MONGO TRANSACTION ERROR: ${err.toString()}`); + } + } + } + else { + console.error(`DEFRA UNRECOGNIZED CHANGE: ${JSON.stringify(evt)}`); + } } function dOnError(err) { @@ -291,3 +659,12 @@ function dOnComplete() { listenDefra().catch(console.error); },250); } + +function printHelp() { + console.log("DefraConnector"); + console.log(" Usage: cli.js --db=DB_NAME -c COLL_NAME [OPTION]..."); + console.log(""); + console.log("--db=DB mongo DB name"); + console.log("-c, --collection=COLL_NAME collection to subscribe to by name"); + console.log("--help print this help"); +}