Skip to content

feature/worker health check & recreation #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ export class Api {
this.redisWorkerGroupName = this.prefix + ':worker'
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
this._destroyed = false
/** @type {import('worker_threads').Worker | null} */
this.persistWorker = null

const addScript = WORKER_DISABLED
? `
Expand Down
6 changes: 5 additions & 1 deletion src/persist-worker-thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export class PersistWorkerThread {
return
}
this.store = store
parentPort?.on('message', this.persist)
parentPort?.postMessage({ event: 'ready' })
parentPort?.on('message', ({ event, ...rest }) => {
if (event === 'ping') parentPort?.postMessage({ event: 'pong' })
else this.persist(rest)
})
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ class YSocketIOServer {
* @param {string} [conf.redisPrefix]
* @param {string} [conf.redisUrl]
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
* @param {import('worker_threads').Worker=} [conf.persistWorker]
* @param {() => import('worker_threads').Worker=} [conf.getPersistWorker]
* @param {boolean} [conf.enableAwareness]
*/
export const registerYSocketIOServer = async (io, store, {
authenticate,
redisUrl,
redisPrefix,
persistWorker,
getPersistWorker,
enableAwareness = true
}) => {
const app = new YSocketIO(io, { authenticate, enableAwareness })
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, getPersistWorker })
return new YSocketIOServer(app, client, subscriber)
}
101 changes: 92 additions & 9 deletions src/y-socket-io/y-socket-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max
const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000')
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000')
const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000')

process.on('SIGINT', function () {
// calling .shutdown allows your process to exit normally
Expand Down Expand Up @@ -146,6 +147,26 @@ export class YSocketIO {
* @readonly
*/
awaitingCleanupNamespace = new Map()
/**
* @type {boolean}
* @private
*/
workerReady = false
/**
* @type {number | null}
* @private
*/
workerLastHeartbeat = null
/**
* @type {{ promise: Promise<boolean>, resolve: (result: boolean) => void } | null}
* @private
*/
workerHeartbeatContext = null
/**
* @type {NodeJS.Timeout | null}
* @private
*/
persistWorkerHealthCheckTimeout = null

/**
* YSocketIO constructor.
Expand All @@ -169,20 +190,22 @@ export class YSocketIO {
*
* It also starts socket connection listeners.
* @param {import('../storage.js').AbstractStorage} store
* @param {{ redisPrefix?: string, redisUrl?: string, persistWorker?: import('worker_threads').Worker }=} opts
* @param {{ redisPrefix?: string, redisUrl?: string, getPersistWorker?: () => import('worker_threads').Worker }=} opts
* @public
*/
async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) {
async initialize (store, { redisUrl, redisPrefix = 'y', getPersistWorker } = {}) {
const { enableAwareness } = this.configuration
const [client, subscriber] = await promise.all([
api.createApiClient(store, { redisUrl, redisPrefix, enableAwareness }),
createSubscriber(store, { redisUrl, redisPrefix, enableAwareness })
])
this.client = client
this.subscriber = subscriber
if (persistWorker) {
this.client.persistWorker = persistWorker
if (getPersistWorker) {
this.getPersistWorker = getPersistWorker
this.persistWorker = getPersistWorker()
this.registerPersistWorkerResolve()
this.registerPersistWorkerHealthCheck()
}

this.nsp = this.io.of(/^\/yjs\|.*$/)
Expand Down Expand Up @@ -518,16 +541,15 @@ export class YSocketIO {
const doc = this.namespaceDocMap.get(namespace)?.ydoc
logSocketIO(`trying to persist ${namespace}`)
if (!doc) return
if (this.client.persistWorker) {
if (this.persistWorker && this.workerReady) {
/** @type {ReturnType<typeof promiseWithResolvers<void>>} */
const { promise, resolve } = promiseWithResolvers()
assert(this.client?.persistWorker)
this.awaitingPersistMap.set(namespace, { promise, resolve })

const docState = Y.encodeStateAsUpdateV2(doc)
const buf = new Uint8Array(new SharedArrayBuffer(docState.length))
buf.set(docState)
this.client.persistWorker.postMessage({
this.persistWorker.postMessage({
room: namespace,
docstate: buf
})
Expand Down Expand Up @@ -627,6 +649,9 @@ export class YSocketIO {

destroy () {
try {
if (this.persistWorkerHealthCheckTimeout) {
clearInterval(this.persistWorkerHealthCheckTimeout)
}
this.subscriber?.destroy()
return this.client?.destroy()
} catch (e) {
Expand All @@ -635,9 +660,13 @@ export class YSocketIO {
}

registerPersistWorkerResolve () {
if (!this.client?.persistWorker) return
this.client.persistWorker.on('message', ({ event, room }) => {
if (!this.persistWorker) return
this.persistWorker.on('message', ({ event, room }) => {
if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve()
if (event === 'pong' && this.workerHeartbeatContext) {
this.workerHeartbeatContext.resolve(true)
}
this.workerReady = true
})
}

Expand Down Expand Up @@ -677,4 +706,58 @@ export class YSocketIO {
this.namespaceDocMap.delete(namespace)
this.namespacePersistentMap.delete(namespace)
}

async waitUntilWorkerReady () {
if (!this.persistWorker || this.workerReady) return
/** @type {ReturnType<typeof promiseWithResolvers<void>>} */
const { promise, resolve } = promiseWithResolvers()
const timer = setInterval(() => {
if (!this.workerReady) return
clearInterval(timer)
resolve()
}, 100)
await promise
}

registerPersistWorkerHealthCheck () {
this.persistWorkerHealthCheckTimeout = setTimeout(async () => {
const workerHealth = await this.workerHealthCheck()
if (!workerHealth) {
logSocketIO('worker thread is unhealthy, recreating')
assert(this.getPersistWorker)
this.workerReady = false
await this.persistWorker?.removeAllListeners().terminate()
this.persistWorker = this.getPersistWorker()
this.registerPersistWorkerResolve()
await this.waitUntilWorkerReady()
}
this.registerPersistWorkerHealthCheck()
}, WORKER_HEALTH_CHECK_INTERVAL)
}

async workerHealthCheck () {
if (!this.persistWorker || this.workerHeartbeatContext) return null
if (
this.workerLastHeartbeat &&
Date.now() - this.workerLastHeartbeat < WORKER_HEALTH_CHECK_INTERVAL * 2
) {
return true
}

/** @type {ReturnType<typeof promiseWithResolvers<boolean>>} */
const { promise: heartbeatPromise, resolve } = promiseWithResolvers()
this.workerHeartbeatContext = { promise: heartbeatPromise, resolve }
const now = performance.now()
this.persistWorker.postMessage({ event: 'ping' })
const health = await Promise.race([
heartbeatPromise,
promise.wait(3000).then(() => false)
])
this.workerHeartbeatContext = null
if (health) {
logSocketIO(`worker health check: responded in ${performance.now() - now}ms`)
this.workerLastHeartbeat = Date.now()
}
return health
}
}