diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index ebeacde..d93a18c 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -157,7 +157,7 @@ export class SocketIOProvider extends Observable { this.doc.on('update', this.onUpdateDoc) - this.socket.on('connect', () => this.onSocketConnection(resyncInterval)) + this.socket.once('ready-for-sync', () => this.onSocketConnection(resyncInterval)) this.socket.on('disconnect', (event) => this.onSocketDisconnection(event)) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 2993fd0..1779033 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -90,6 +90,12 @@ export class YSocketIO { */ namespaceMap = new Map() + /** + * @type {Promise[]} + * @private + */ + syncQueue = [] + /** * YSocketIO constructor. * @constructor @@ -157,16 +163,27 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) - const doc = await this.client.getDoc(namespace, 'index') - - if ( - api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) - ) { - // our subscription is newer than the content that we received from the api - // need to renew subscription id and make sure that we catch the latest content. - this.subscriber.ensureSubId(stream, doc.redisLastId) - } - this.startSynchronization(socket, doc) + /** + * @type {Promise} + */ + const task = new Promise((resolve) => { + assert(this.client) + this.client.getDoc(namespace, 'index').then((doc) => { + assert(socket.user) + assert(this.subscriber) + socket.emit('ready-for-sync') + if ( + api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) + ) { + // our subscription is newer than the content that we received from the api + // need to renew subscription id and make sure that we catch the latest content. + this.subscriber.ensureSubId(stream, doc.redisLastId) + } + this.startSynchronization(socket, doc) + resolve() + }) + }) + this.queueUpSyncTask(task) }) return { client, subscriber } @@ -336,6 +353,25 @@ export class YSocketIO { } } + /** + * @private + * @param {Promise} task + */ + queueUpSyncTask (task) { + const len = this.syncQueue.push(task) + if (len === 1) this.consumeSyncQueue() + } + + /** + * @private + */ + async consumeSyncQueue () { + if (this.syncQueue.length === 0) return + const task = this.syncQueue.shift() + await task + this.consumeSyncQueue() + } + /** * @param {Namespace} namespace */