Move the persist worker handle from Api onto YSocketIO and supervise it with a
ready/ping/pong health check that recreates the worker (through the new
getPersistWorker factory) when it stops answering.

Review fixes folded into this patch:
  * destroy() cancels the health-check timer with clearTimeout (the handle
    comes from setTimeout) and nulls it, so a check that is still awaiting
    cannot re-arm the timer after destroy().
  * A skipped health check (null, i.e. a ping is already in flight) no longer
    counts as "unhealthy"; only a timed-out ping or a missing worker triggers
    a restart.
  * The timer callback catches errors and always reschedules, instead of
    dying on the first rejected await.
  * waitUntilWorkerReady() gives up after a timeout so a worker that never
    reports in cannot stall the loop; the next check then retries.
  * workerHealthCheck() always clears its in-flight marker.

Line breaks, indentation and the JSDoc generics were reconstructed from a
whitespace-collapsed copy of the patch; check context lines against the tree
before applying. The index hashes below are those of the original patch.

diff --git a/src/api.js b/src/api.js
index 4509d95..73618b7 100644
--- a/src/api.js
+++ b/src/api.js
@@ -122,8 +122,6 @@ export class Api {
     this.redisWorkerGroupName = this.prefix + ':worker'
     this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
     this._destroyed = false
-    /** @type {import('worker_threads').Worker | null} */
-    this.persistWorker = null
 
     const addScript = WORKER_DISABLED
       ? `
diff --git a/src/persist-worker-thread.js b/src/persist-worker-thread.js
index 4e588be..6cb3af0 100644
--- a/src/persist-worker-thread.js
+++ b/src/persist-worker-thread.js
@@ -18,7 +18,11 @@ export class PersistWorkerThread {
       return
     }
     this.store = store
-    parentPort?.on('message', this.persist)
+    parentPort?.postMessage({ event: 'ready' })
+    parentPort?.on('message', ({ event, ...rest }) => {
+      if (event === 'ping') parentPort?.postMessage({ event: 'pong' })
+      else this.persist(rest)
+    })
   }
 
   /**
diff --git a/src/socketio.js b/src/socketio.js
index 2388385..8208877 100644
--- a/src/socketio.js
+++ b/src/socketio.js
@@ -37,17 +37,17 @@ class YSocketIOServer {
  * @param {string} [conf.redisPrefix]
  * @param {string} [conf.redisUrl]
  * @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
- * @param {import('worker_threads').Worker=} [conf.persistWorker]
+ * @param {() => import('worker_threads').Worker=} [conf.getPersistWorker]
  * @param {boolean} [conf.enableAwareness]
  */
 export const registerYSocketIOServer = async (io, store, {
   authenticate,
   redisUrl,
   redisPrefix,
-  persistWorker,
+  getPersistWorker,
   enableAwareness = true
 }) => {
   const app = new YSocketIO(io, { authenticate, enableAwareness })
-  const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
+  const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, getPersistWorker })
   return new YSocketIOServer(app, client, subscriber)
 }
diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js
index 6cf4ac8..8f395f5 100644
--- a/src/y-socket-io/y-socket-io.js
+++ b/src/y-socket-io/y-socket-io.js
@@ -21,6 +21,7 @@ const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max
 const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000')
 const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
 const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000')
+const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000')
 
 process.on('SIGINT', function () {
   // calling .shutdown allows your process to exit normally
@@ -146,6 +147,26 @@ export class YSocketIO {
    * @readonly
    */
   awaitingCleanupNamespace = new Map()
+  /**
+   * @type {boolean}
+   * @private
+   */
+  workerReady = false
+  /**
+   * @type {number | null}
+   * @private
+   */
+  workerLastHeartbeat = null
+  /**
+   * @type {{ promise: Promise<boolean>, resolve: (result: boolean) => void } | null}
+   * @private
+   */
+  workerHeartbeatContext = null
+  /**
+   * @type {NodeJS.Timeout | null}
+   * @private
+   */
+  persistWorkerHealthCheckTimeout = null
 
   /**
    * YSocketIO constructor.
@@ -169,10 +190,10 @@ export class YSocketIO {
    *
    * It also starts socket connection listeners.
    * @param {import('../storage.js').AbstractStorage} store
-   * @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts
+   * @param {{ redisPrefix?: string, redisUrl?: string, getPersistWorker?: () => import('worker_threads').Worker }=} opts
    * @public
    */
-  async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) {
+  async initialize (store, { redisUrl, redisPrefix = 'y', getPersistWorker } = {}) {
     const { enableAwareness } = this.configuration
     const [client, subscriber] = await promise.all([
       api.createApiClient(store, { redisUrl, redisPrefix, enableAwareness }),
@@ -180,9 +201,11 @@ export class YSocketIO {
     ])
     this.client = client
     this.subscriber = subscriber
-    if (persistWorker) {
-      this.client.persistWorker = persistWorker
+    if (getPersistWorker) {
+      this.getPersistWorker = getPersistWorker
+      this.persistWorker = getPersistWorker()
       this.registerPersistWorkerResolve()
+      this.registerPersistWorkerHealthCheck()
     }
 
     this.nsp = this.io.of(/^\/yjs\|.*$/)
@@ -518,16 +541,15 @@ export class YSocketIO {
     const doc = this.namespaceDocMap.get(namespace)?.ydoc
     logSocketIO(`trying to persist ${namespace}`)
     if (!doc) return
-    if (this.client.persistWorker) {
+    if (this.persistWorker && this.workerReady) {
       /** @type {ReturnType<typeof promiseWithResolvers<void>>} */
       const { promise, resolve } = promiseWithResolvers()
-      assert(this.client?.persistWorker)
       this.awaitingPersistMap.set(namespace, { promise, resolve })
 
       const docState = Y.encodeStateAsUpdateV2(doc)
       const buf = new Uint8Array(new SharedArrayBuffer(docState.length))
       buf.set(docState)
-      this.client.persistWorker.postMessage({
+      this.persistWorker.postMessage({
         room: namespace,
         docstate: buf
       })
@@ -627,6 +649,10 @@ export class YSocketIO {
 
   destroy () {
     try {
+      if (this.persistWorkerHealthCheckTimeout) {
+        clearTimeout(this.persistWorkerHealthCheckTimeout)
+        this.persistWorkerHealthCheckTimeout = null
+      }
       this.subscriber?.destroy()
       return this.client?.destroy()
     } catch (e) {
@@ -635,9 +661,13 @@ export class YSocketIO {
   }
 
   registerPersistWorkerResolve () {
-    if (!this.client?.persistWorker) return
-    this.client.persistWorker.on('message', ({ event, room }) => {
+    if (!this.persistWorker) return
+    this.persistWorker.on('message', ({ event, room }) => {
       if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve()
+      if (event === 'pong' && this.workerHeartbeatContext) {
+        this.workerHeartbeatContext.resolve(true)
+      }
+      this.workerReady = true
     })
   }
 
@@ -677,4 +707,88 @@ export class YSocketIO {
     this.namespaceDocMap.delete(namespace)
     this.namespacePersistentMap.delete(namespace)
   }
+
+  /**
+   * Wait until the persist worker has sent its first message, polling every
+   * 100ms. Gives up after `timeout` ms so that a worker which never reports
+   * in cannot stall the health-check loop forever.
+   * @param {number} [timeout]
+   */
+  async waitUntilWorkerReady (timeout = WORKER_HEALTH_CHECK_INTERVAL) {
+    if (!this.persistWorker || this.workerReady) return
+    /** @type {ReturnType<typeof promiseWithResolvers<void>>} */
+    const { promise, resolve } = promiseWithResolvers()
+    const deadline = Date.now() + timeout
+    const timer = setInterval(() => {
+      if (!this.workerReady && Date.now() < deadline) return
+      clearInterval(timer)
+      resolve()
+    }, 100)
+    await promise
+  }
+
+  /**
+   * Schedule the next persist worker health check. The worker is replaced
+   * when a ping goes unanswered or when no worker exists; a skipped check
+   * (`null`) never triggers a restart.
+   */
+  registerPersistWorkerHealthCheck () {
+    this.persistWorkerHealthCheckTimeout = setTimeout(async () => {
+      try {
+        const workerHealth = await this.workerHealthCheck()
+        if (workerHealth === false || !this.persistWorker) {
+          logSocketIO('worker thread is unhealthy, recreating')
+          assert(this.getPersistWorker)
+          this.workerReady = false
+          await this.persistWorker?.removeAllListeners().terminate()
+          this.persistWorker = this.getPersistWorker()
+          this.registerPersistWorkerResolve()
+          await this.waitUntilWorkerReady()
+        }
+      } catch (e) {
+        console.error(e)
+      } finally {
+        // destroy() nulls the handle; do not re-arm the timer after that
+        if (this.persistWorkerHealthCheckTimeout !== null) {
+          this.registerPersistWorkerHealthCheck()
+        }
+      }
+    }, WORKER_HEALTH_CHECK_INTERVAL)
+  }
+
+  /**
+   * Ping the persist worker and wait up to 3s for its pong.
+   * @returns {Promise<boolean | null>} `true` when the worker answered (or a
+   * heartbeat was seen within two check intervals), `false` on timeout,
+   * `null` when there is no worker or a ping is already in flight.
+   */
+  async workerHealthCheck () {
+    if (!this.persistWorker || this.workerHeartbeatContext) return null
+    if (
+      this.workerLastHeartbeat &&
+      Date.now() - this.workerLastHeartbeat < WORKER_HEALTH_CHECK_INTERVAL * 2
+    ) {
+      return true
+    }
+
+    /** @type {ReturnType<typeof promiseWithResolvers<boolean>>} */
+    const { promise: heartbeatPromise, resolve } = promiseWithResolvers()
+    this.workerHeartbeatContext = { promise: heartbeatPromise, resolve }
+    const now = performance.now()
+    try {
+      this.persistWorker.postMessage({ event: 'ping' })
+      const health = await Promise.race([
+        heartbeatPromise,
+        promise.wait(3000).then(() => false)
+      ])
+      if (health) {
+        logSocketIO(`worker health check: responded in ${performance.now() - now}ms`)
+        this.workerLastHeartbeat = Date.now()
+      }
+      return health
+    } finally {
+      // always release the in-flight marker, even if something above throws
+      this.workerHeartbeatContext = null
+    }
+  }
 }