Skip to content

Commit 2187bc4

Browse files
author
Alec Gibson
committed
Add Presence functionality
This change adds the ability for clients to broadcast information about "Presence" - the notion of a client's position or state in a particular document. This might be represent a cursor in a text document, or a highlighted field in a more complex JSON document, or any other transient, current information about a client that shouldn't necessarily be stored in the document's chain of ops. The main complication that this feature solves is the issue of keeping presence correctly associated with the version of a `Doc` it was created at. For example, in a "naive" implementation of presence, presence information can arrive ahead of or behind ops, which - in a text-based example - can cause the cursor to "jitter" around the change. Using the ShareDB implementation will ensure that the presence is correctly transformed against any ops, and will ensure that presence information is always consistent with the version of the document. We also locally transform existing presence, which should help to keep (static) remote presence correctly positioned, independent of latency. In order to facilitate this, the feature must be used with an OT type that supports presence. The only requirement for supporting presence is the support of a `transformPresence` method: ```javascript type.transformPresence(presence, op, isOwnOperation): presence; ``` * `presence` _Object_: the presence data being transformed. The type will define this shape to be whatever is appropriate for the type. * `op` _Op_: the operation against which to transform the presence * `isOwnOperation`: _boolean_: whether the presence and the op have the same "owner". This information can be useful for some types to break ties when transforming a presence, for example as used in [`rich-text`][1] This work is based on the [work][2] by @gkubisa and @curran, but with the following aims: - avoid modifying the existing `Doc` class as much as possible, and instead use lifecycle hooks - keep presence separate as its own conceptual entity - break the presence subscriptions apart from `Doc` subscriptions (although in practice, the two are obviously tightly coupled) - allow multiple presences on a single `Doc` on the same `Connection` [1]: https://github.com/quilljs/delta#tranformposition [2]: #288
1 parent d5b5b5e commit 2187bc4

20 files changed

+2589
-19
lines changed

README.md

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ Register a new middleware.
158158
the database.
159159
* `'receive'`: Received a message from a client
160160
* `'reply'`: About to send a non-error reply to a client message
161+
* `'sendPresence'`: About to send presence information to a client
161162
* `fn` _(Function(context, callback))_
162163
Call this function at the time specified by `action`.
163164
* `context` will always have the following properties:
@@ -307,6 +308,20 @@ Get a read-only snapshot of a document at the requested version.
307308
}
308309
```
309310

311+
`connection.getPresence(channel): Presence;`
312+
Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence.
313+
314+
* `channel` _(String)_
315+
Presence channel to subscribe to
316+
317+
`connection.getDocPresence(collection, id): DocPresence;`
318+
Get a special [`DocPresence`](#class-sharedbdocpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence.
319+
320+
* `collection` _(String)_
321+
Document collection
322+
* `id` _(String)_
323+
Document ID
324+
310325
### Class: `ShareDB.Doc`
311326

312327
`doc.type` _(String_)
@@ -640,6 +655,109 @@ const connectionInfo = getUserPermissions();
640655
const connection = backend.connect(null, connectionInfo);
641656
```
642657

658+
### Class: `ShareDB.Presence`
659+
660+
Representation of the presence data associated with a given channel.
661+
662+
#### `subscribe`
663+
664+
```javascript
665+
presence.subscribe(callback): void;
666+
```
667+
668+
Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence from you if you are not subscribed.
669+
670+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
671+
672+
#### `unsubscribe`
673+
674+
```javascript
675+
presence.unsubscribe(callback): void;
676+
```
677+
678+
Unsubscribe from presence updates from remote clients.
679+
680+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
681+
682+
#### `on`
683+
684+
```javascript
685+
presence.on('receive', callback): void;
686+
```
687+
688+
An update from a remote presence client has been received.
689+
690+
* `callback` _Function_: callback for handling the received presence: `function (presenceId, presenceValue): void;`
691+
692+
```javascript
693+
presence.on('error', callback): void;
694+
```
695+
696+
A presence-related error has occurred.
697+
698+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
699+
700+
#### `create`
701+
702+
```javascript
703+
presence.create(presenceId): LocalPresence;
704+
```
705+
706+
Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance.
707+
708+
* `presenceId` _string_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID.
709+
710+
#### `destroy`
711+
712+
```javascript
713+
presence.destroy(callback);
714+
```
715+
716+
Updates all remote clients with a `null` presence, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates.
717+
718+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
719+
720+
### Class: `ShareDB.DocPresence`
721+
722+
Specialised case of [`Presence`](#class-sharedbpresence), which is tied to a specific [`Doc`](#class-sharedbdoc). When using presence with an associated `Doc`, any ops applied to the `Doc` will automatically be used to transform associated presence. On destroy, the `DocPresence` will unregister its listeners from the `Doc`.
723+
724+
See [`Presence`](#class-sharedbpresence) for available methods.
725+
726+
### Class: `ShareDB.LocalPresence`
727+
728+
`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client.
729+
730+
#### `submit`
731+
732+
```javascript
733+
localPresence.submit(presence, callback): void;
734+
```
735+
736+
Update the local representation of presence, and broadcast that presence to any other document presence subscribers.
737+
738+
* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type
739+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
740+
741+
#### `send`
742+
743+
```javascript
744+
localPresence.send(callback): void;
745+
```
746+
747+
Send presence like `submit`, but without updating the value. Can be useful if local presences expire periodically.
748+
749+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
750+
751+
#### `destroy`
752+
753+
```javascript
754+
localPresence.destroy(callback): void;
755+
```
756+
757+
Informs all remote clients that this presence is now `null`, and deletes itself for garbage collection.
758+
759+
* `callback` _Function_: a callback with the signature `function (error: Error): void;`
760+
643761
### Logging
644762

645763
By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service.

lib/agent.js

Lines changed: 141 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,19 @@ function Agent(backend, stream) {
3535
// Map from queryId -> emitter
3636
this.subscribedQueries = {};
3737

38+
// Track which documents are subscribed to presence by the client. This is a
39+
// map of channel -> stream
40+
this.subscribedPresences = {};
41+
// Highest seq received for a subscription request. Any seq lower than this
42+
// value is stale, and should be ignored. Used for keeping the subscription
43+
// state in sync with the client's desired state
44+
this.presenceSubscriptionSeq = 0;
45+
// Keep track of the last request that has been sent by each local presence
46+
// belonging to this agent. This is used to generate a new disconnection
47+
// request if the client disconnects ungracefully. This is a
48+
// map of channel -> id -> request
49+
this.presenceRequests = {};
50+
3851
// We need to track this manually to make sure we don't reply to messages
3952
// after the stream was closed.
4053
this.closed = false;
@@ -78,6 +91,11 @@ Agent.prototype._cleanup = function() {
7891
}
7992
this.subscribedDocs = {};
8093

94+
for (var channel in this.subscribedPresences) {
95+
this.subscribedPresences[channel].destroy();
96+
}
97+
this.subscribedPresences = {};
98+
8199
// Clean up query subscription streams
82100
for (var id in this.subscribedQueries) {
83101
var emitter = this.subscribedQueries[id];
@@ -121,6 +139,31 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
121139
});
122140
};
123141

142+
Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
143+
if (this.closed) return stream.destroy();
144+
145+
stream.on('data', function(data) {
146+
if (data.error) {
147+
logger.error('Presence subscription stream error', channel, data.error);
148+
}
149+
this._handlePresenceData(data);
150+
}.bind(this));
151+
152+
stream.on('end', function() {
153+
var requests = this.presenceRequests[channel] || {};
154+
for (var id in requests) {
155+
var request = this.presenceRequests[channel][id];
156+
request.seq++;
157+
request.p = null;
158+
this._broadcastPresence(request, function(error) {
159+
if (error) logger.error('Error broadcasting disconnect presence', channel, error);
160+
});
161+
}
162+
delete this.subscribedPresences[channel];
163+
delete this.presenceRequests[channel];
164+
}.bind(this));
165+
};
166+
124167
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
125168
var previous = this.subscribedQueries[queryId];
126169
if (previous) previous.destroy();
@@ -311,14 +354,18 @@ Agent.prototype._checkRequest = function(request) {
311354
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
312355
// Query messages need an ID property.
313356
if (typeof request.id !== 'number') return 'Missing query ID';
314-
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') {
357+
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
315358
// Doc-based request.
316359
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
317360
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';
318361

319-
if (request.a === 'op') {
362+
if (request.a === 'op' || request.a === 'p') {
320363
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
321364
}
365+
366+
if (request.a === 'p') {
367+
if (typeof request.id !== 'string') return 'Missing presence ID';
368+
}
322369
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
323370
// Bulk request
324371
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
@@ -369,6 +416,19 @@ Agent.prototype._handleMessage = function(request, callback) {
369416
return this._fetchSnapshot(request.c, request.d, request.v, callback);
370417
case 'nt':
371418
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
419+
case 'p':
420+
var presence = this._createPresence(request);
421+
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
422+
return callback({
423+
code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE,
424+
message: 'Type does not support presence: ' + presence.t
425+
});
426+
}
427+
return this._broadcastPresence(presence, callback);
428+
case 'ps':
429+
return this._subscribePresence(request.ch, request.seq, callback);
430+
case 'pu':
431+
return this._unsubscribePresence(request.ch, request.seq, callback);
372432
default:
373433
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
374434
}
@@ -669,6 +729,85 @@ Agent.prototype._src = function() {
669729
return this.src || this.clientId;
670730
};
671731

732+
Agent.prototype._broadcastPresence = function(presence, callback) {
733+
var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {});
734+
var previousRequest = requests[presence.id];
735+
if (!previousRequest || previousRequest.seq < presence.seq) {
736+
this.presenceRequests[presence.ch][presence.id] = presence;
737+
}
738+
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
739+
if (error) return callback(error);
740+
var channel = this._getPresenceChannel(presence.ch);
741+
this.backend.pubsub.publish([channel], presence, function(error) {
742+
if (error) return callback(error);
743+
callback(null, presence);
744+
});
745+
}.bind(this));
746+
};
747+
748+
Agent.prototype._createPresence = function(request) {
749+
return {
750+
a: 'p',
751+
ch: request.ch,
752+
src: this.clientId,
753+
seq: request.seq,
754+
id: request.id,
755+
p: request.p,
756+
c: request.c,
757+
d: request.d,
758+
v: request.v,
759+
t: request.t
760+
};
761+
};
762+
763+
Agent.prototype._subscribePresence = function(channel, seq, callback) {
764+
var presenceChannel = this._getPresenceChannel(channel);
765+
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
766+
if (error) return callback(error);
767+
if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq});
768+
this.presenceSubscriptionSeq = seq;
769+
this.subscribedPresences[channel] = stream;
770+
this._subscribeToPresenceStream(channel, stream);
771+
this._requestPresence(channel, function(error) {
772+
callback(error, {ch: channel, seq: seq});
773+
});
774+
}.bind(this));
775+
};
776+
777+
Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
778+
if (seq < this.presenceSubscriptionSeq) return;
779+
this.presenceSubscriptionSeq = seq;
780+
var stream = this.subscribedPresences[channel];
781+
if (stream) stream.destroy();
782+
callback(null, {ch: channel, seq: seq});
783+
};
784+
785+
Agent.prototype._getPresenceChannel = function(channel) {
786+
return '$presence.' + channel;
787+
};
788+
789+
Agent.prototype._requestPresence = function(channel, callback) {
790+
var presenceChannel = this._getPresenceChannel(channel);
791+
this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback);
792+
};
793+
794+
Agent.prototype._handlePresenceData = function(presence) {
795+
if (presence.src === this.clientId) return;
796+
797+
if (presence.r) return this.send({a: 'pr', ch: presence.ch});
798+
799+
var backend = this.backend;
800+
var context = {
801+
collection: presence.c,
802+
presence: presence
803+
};
804+
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
805+
if (error) {
806+
return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
807+
}
808+
this.send(presence);
809+
}.bind(this));
810+
};
672811

673812
function createClientOp(request, clientId) {
674813
// src can be provided if it is not the same as the current agent,

lib/backend.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = {
6666
// by design, changing existing reply properties can cause weird bugs, since
6767
// the rest of ShareDB would be unaware of those changes.
6868
reply: 'reply',
69+
// About to send presence information to a client
70+
sendPresence: 'sendPresence',
6971
// An operation is about to be submitted to the database
7072
submit: 'submit'
7173
};
@@ -822,6 +824,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
822824
callback(error, snapshot);
823825
};
824826

827+
Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
828+
if (!presence.c || !presence.d) return callback(null, presence);
829+
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
830+
if (error) return callback(error);
831+
for (var i = 0; i < ops.length; i++) {
832+
var op = ops[i];
833+
var isOwnOp = op.src === presence.src;
834+
var transformError = ot.transformPresence(presence, op, isOwnOp);
835+
if (transformError) {
836+
return callback(transformError);
837+
}
838+
}
839+
callback(null, presence);
840+
});
841+
};
842+
825843
function pluckIds(snapshots) {
826844
var ids = [];
827845
for (var i = 0; i < snapshots.length; i++) {

0 commit comments

Comments
 (0)