Skip to content

Commit 56fe6b0

Browse files
authored
Merge pull request #29 from hackmdio/fix/qol-and-client-resync
fix/client resync mechanism & server persist leader & quality-of-life improvements
2 parents a1db206 + 3a792db commit 56fe6b0

File tree

7 files changed

+205
-47
lines changed

7 files changed

+205
-47
lines changed

bin/worker.js

100644100755
File mode changed.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
"socket.io-client": "^4.8.0",
6969
"toobusy-js": "^0.5.1",
7070
"y-protocols": "^1.0.6",
71-
"yjs": "^13.6.18"
71+
"yjs": "^13.6.27"
7272
},
7373
"optionalDependencies": {
7474
"minio": "^7.1.3",

pnpm-lock.yaml

Lines changed: 18 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/api.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as Y from 'yjs'
2-
import * as redis from 'redis'
2+
import { createClient, defineScript, commandOptions } from 'redis'
33
import * as map from 'lib0/map'
44
import * as decoding from 'lib0/decoding'
55
import * as awarenessProtocol from 'y-protocols/awareness'
@@ -97,6 +97,7 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar
9797
}
9898

9999
export class Api {
100+
redis
100101
/**
101102
* @param {import('./storage.js').AbstractStorage} store
102103
* @param {string=} prefix
@@ -138,11 +139,11 @@ export class Api {
138139
redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL})
139140
`
140141

141-
this.redis = redis.createClient({
142+
this.redis = createClient({
142143
url,
143144
// scripting: https://github.com/redis/node-redis/#lua-scripts
144145
scripts: {
145-
addMessage: redis.defineScript({
146+
addMessage: defineScript({
146147
NUMBER_OF_KEYS: 1,
147148
SCRIPT: addScript,
148149
/**
@@ -159,7 +160,7 @@ export class Api {
159160
return x
160161
}
161162
}),
162-
xDelIfEmpty: redis.defineScript({
163+
xDelIfEmpty: defineScript({
163164
NUMBER_OF_KEYS: 1,
164165
SCRIPT: `
165166
if redis.call("XLEN", KEYS[1]) == 0 then
@@ -193,7 +194,7 @@ export class Api {
193194
return []
194195
}
195196
const reads = await this.redis.xRead(
196-
redis.commandOptions({ returnBuffers: true }),
197+
commandOptions({ returnBuffers: true }),
197198
streams,
198199
{ BLOCK: 1000, COUNT: 1000 }
199200
)
@@ -241,7 +242,7 @@ export class Api {
241242
* @param {string} docid
242243
*/
243244
async getDoc (room, docid) {
244-
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
245+
const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
245246
const docMessages = ms.get(room)?.get(docid) || null
246247
if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`)
247248
const docstate = await this.store.retrieveDoc(room, docid)
@@ -289,7 +290,7 @@ export class Api {
289290
* @param {string} docid
290291
*/
291292
async getRedisLastId (room, docid) {
292-
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
293+
const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
293294
const docMessages = ms.get(room)?.get(docid) || null
294295
return docMessages?.lastId.toString() || '0'
295296
}
@@ -370,6 +371,7 @@ export class Api {
370371
// call YDOC_UPDATE_CALLBACK here
371372
const formData = new FormData()
372373
// @todo only convert ydoc to updatev2 once
374+
// @ts-ignore
373375
formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)]))
374376
// @todo should add a timeout to fetch (see fetch signal abortcontroller)
375377
const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' })

src/server.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as env from 'lib0/environment'
2-
import * as logging from 'lib0/logging'
32
import * as jwt from 'lib0/crypto/jwt'
43
import * as ecdsa from 'lib0/crypto/ecdsa'
54
import * as json from 'lib0/json'
@@ -50,8 +49,12 @@ export const createYSocketIOServer = async ({
5049
}
5150
})
5251

53-
httpServer.listen(port, undefined, undefined, () => {
54-
logging.print(logging.GREEN, '[y-redis] Listening to port ', port)
55-
})
52+
httpServer.listen(port, undefined, undefined)
53+
54+
const oriDestroy = server.destroy
55+
server.destroy = async () => {
56+
await oriDestroy.call(server)
57+
await new Promise((resolve) => httpServer.close(resolve))
58+
}
5659
return server
5760
}

src/y-socket-io/client.js

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ import { io } from 'socket.io-client'
3737
*
3838
* @prop {Record<string, unknown>=} auth
3939
* (Optional) Add the authentication data
40+
*
41+
* @prop {ClientSocket=} socket
42+
* (Optional) Supply custom socket.io client socket. If supplied, `socketIoOptions` will be ignored.
4043
*/
4144

4245
/**
@@ -138,7 +141,8 @@ export class SocketIOProvider extends Observable {
138141
awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined,
139142
resyncInterval = -1,
140143
disableBc = false,
141-
auth = {}
144+
auth = {},
145+
socket
142146
} = {},
143147
socketIoOptions = undefined
144148
) {
@@ -157,14 +161,17 @@ export class SocketIOProvider extends Observable {
157161
this.disableBc = disableBc
158162
this._socketIoOptions = socketIoOptions
159163

160-
this.socket = io(`${this.url}/yjs|${roomName}`, {
161-
autoConnect: false,
162-
transports: ['websocket'],
163-
forceNew: true,
164-
auth,
165-
...socketIoOptions
166-
})
167-
this._socketIoOptions = socketIoOptions
164+
if (socket) {
165+
this.socket = socket
166+
} else {
167+
this.socket = io(`${this.url}/yjs|${roomName}`, {
168+
autoConnect: false,
169+
transports: ['websocket'],
170+
forceNew: true,
171+
auth,
172+
...socketIoOptions
173+
})
174+
}
168175

169176
this.doc.on('update', this.onUpdateDoc)
170177

@@ -333,19 +340,24 @@ export class SocketIOProvider extends Observable {
333340
)
334341
}
335342
if (resyncInterval > 0) {
336-
this.resyncInterval = setInterval(() => {
337-
if (this.socket.disconnected) return
338-
this.socket.emit(
339-
'sync-step-1',
340-
Y.encodeStateVector(this.doc),
341-
(/** @type {Uint8Array} */ update) => {
342-
Y.applyUpdate(this.doc, new Uint8Array(update), this)
343-
}
344-
)
345-
}, resyncInterval)
343+
this.resyncInterval = setInterval(() => this.resync(), resyncInterval)
346344
}
347345
}
348346

347+
/**
348+
* Resynchronize the document with the server by firing `sync-step-1`.
349+
*/
350+
resync () {
351+
if (this.socket.disconnected) return
352+
this.socket.emit(
353+
'sync-step-1',
354+
Y.encodeStateVector(this.doc),
355+
(/** @type {Uint8Array} */ update) => {
356+
Y.applyUpdate(this.doc, new Uint8Array(update), this)
357+
}
358+
)
359+
}
360+
349361
/**
350362
* Disconnect provider's socket
351363
* @type {() => void}
@@ -406,6 +418,11 @@ export class SocketIOProvider extends Observable {
406418
super.destroy()
407419
}
408420

421+
/**
422+
* @type {number}
423+
* @private
424+
*/
425+
_updateRetries = 0
409426
/**
410427
* This function is executed when the document is updated, if the instance that
411428
* emit the change is not this, it emit the changes by socket and broadcast channel.
@@ -414,9 +431,28 @@ export class SocketIOProvider extends Observable {
414431
* @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change.
415432
* @readonly
416433
*/
417-
onUpdateDoc = (update, origin) => {
434+
onUpdateDoc = async (update, origin) => {
435+
if (this._updateRetries > 3) {
436+
this._updateRetries = 0
437+
this.disconnect()
438+
this.connect()
439+
return
440+
}
441+
418442
if (origin !== this) {
419-
this.socket.emit('sync-update', update)
443+
/** @type {boolean} */
444+
const ack = await Promise.race([
445+
new Promise((resolve) => this.socket.emit('sync-update', update, () => resolve(true))),
446+
new Promise((resolve) => setTimeout(() => resolve(false), 3000))
447+
])
448+
if (!ack) {
449+
this._updateRetries++
450+
if (this.socket.disconnected) return
451+
await this.onUpdateDoc(update, origin)
452+
return
453+
} else {
454+
this._updateRetries = 0
455+
}
420456
if (this.bcconnected) {
421457
bc.publish(
422458
this._broadcastChannel,

0 commit comments

Comments
 (0)