Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 8705abd

Browse files
authored
Merge pull request share#220 from alecgibson/get-snapshot
Add support for fetching a particular version of a snapshot
2 parents 08f3339 + e80fe46 commit 8705abd

File tree

11 files changed

+624
-29
lines changed

11 files changed

+624
-29
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
# Emacs
99
\#*\#
1010

11+
# VS Code
12+
.vscode/
13+
1114
# Logs
1215
logs
1316
*.log

README.md

+23-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var socket = new WebSocket('ws://' + window.location.host);
3838
var connection = new sharedb.Connection(socket);
3939
```
4040

41-
The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.
41+
The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.
4242

4343
The easiest way is to give it a WebSocket object that does reconnect. There are plenty of example on the web. The most important thing is that the custom reconnecting websocket, must have the same API as the native rfc6455 version.
4444

@@ -227,6 +227,27 @@ changes. Returns a [`ShareDB.Query`](#class-sharedbquery) instance.
227227
* `options.*`
228228
All other options are passed through to the database adapter.
229229

230+
`connection.fetchSnapshot(collection, id, version, callback): void;`
231+
Get a read-only snapshot of a document at the requested version.
232+
233+
* `collection` _(String)_
234+
Collection name of the snapshot
235+
* `id` _(String)_
236+
ID of the snapshot
237+
* `version` _(number) [optional]_
238+
The version number of the desired snapshot
239+
* `callback` _(Function)_
240+
Called with `(error, snapshot)`, where `snapshot` takes the following form:
241+
242+
```javascript
243+
{
244+
id: string; // ID of the snapshot
245+
v: number; // version number of the snapshot
246+
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
247+
data: any; // the snapshot
248+
}
249+
```
250+
230251
### Class: `ShareDB.Doc`
231252

232253
`doc.type` _(String_)
@@ -375,6 +396,7 @@ Additional fields may be added to the error object for debugging context dependi
375396
* 4021 - Invalid client id
376397
* 4022 - Database adapter does not support queries
377398
* 4023 - Cannot project snapshots of this type
399+
* 4024 - Invalid version
378400

379401
### 5000 - Internal error
380402

lib/agent.js

+6
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ Agent.prototype._handleMessage = function(request, callback) {
300300
var op = this._createOp(request);
301301
if (!op) return callback({code: 4000, message: 'Invalid op message'});
302302
return this._submit(request.c, request.d, op, callback);
303+
case 'nf':
304+
return this._fetchSnapshot(request.c, request.d, request.v, callback);
303305
default:
304306
callback({code: 4000, message: 'Invalid or unknown message'});
305307
}
@@ -582,3 +584,7 @@ Agent.prototype._createOp = function(request) {
582584
return new DeleteOp(src, request.seq, request.v, request.del);
583585
}
584586
};
587+
588+
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
589+
this.backend.fetchSnapshot(this, collection, id, version, callback);
590+
};

lib/backend.js

+66
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ var MemoryPubSub = require('./pubsub/memory');
77
var ot = require('./ot');
88
var projections = require('./projections');
99
var QueryEmitter = require('./query-emitter');
10+
var Snapshot = require('./snapshot');
1011
var StreamSocket = require('./stream-socket');
1112
var SubmitRequest = require('./submit-request');
13+
var types = require('./types');
14+
1215
var warnDeprecatedDoc = true;
1316
var warnDeprecatedAfterSubmit = true;
1417

@@ -580,6 +583,69 @@ Backend.prototype.getChannels = function(collection, id) {
580583
];
581584
};
582585

586+
Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback) {
587+
var start = Date.now();
588+
var backend = this;
589+
var projection = this.projections[index];
590+
var collection = projection ? projection.target : index;
591+
var request = {
592+
agent: agent,
593+
index: index,
594+
collection: collection,
595+
id: id,
596+
version: version
597+
};
598+
599+
this._fetchSnapshot(collection, id, version, function (error, snapshot) {
600+
if (error) return callback(error);
601+
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
602+
var snapshots = [snapshot];
603+
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, function (error) {
604+
if (error) return callback(error);
605+
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
606+
callback(null, snapshot);
607+
});
608+
});
609+
};
610+
611+
Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
612+
// Bypass backend.getOps so that we don't call _sanitizeOps. We want to avoid this, because:
613+
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
614+
// - we handle the projection in _sanitizeSnapshots
615+
this.db.getOps(collection, id, 0, version, null, function (error, ops) {
616+
if (error) return callback(error);
617+
618+
var type = null;
619+
var data;
620+
var fetchedVersion = 0;
621+
622+
for (var index = 0; index < ops.length; index++) {
623+
var op = ops[index];
624+
fetchedVersion = op.v + 1;
625+
626+
if (op.create) {
627+
type = types.map[op.create.type];
628+
if (!type) return callback({ code: 4008, message: 'Unknown type' });
629+
data = type.create(op.create.data);
630+
} else if (op.del) {
631+
data = undefined;
632+
type = null;
633+
} else {
634+
data = type.apply(data, op.op);
635+
}
636+
}
637+
638+
type = type ? type.uri : null;
639+
640+
if (version > fetchedVersion) {
641+
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
642+
}
643+
644+
var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
645+
callback(null, snapshot);
646+
});
647+
};
648+
583649
function pluckIds(snapshots) {
584650
var ids = [];
585651
for (var i = 0; i < snapshots.length; i++) {

lib/client/connection.js

+63-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
var Doc = require('./doc');
22
var Query = require('./query');
3+
var SnapshotRequest = require('./snapshot-request');
34
var emitter = require('../emitter');
45
var ShareDBError = require('../error');
56
var types = require('../types');
@@ -33,13 +34,17 @@ function Connection(socket) {
3334
// (created documents MUST BE UNIQUE)
3435
this.collections = {};
3536

36-
// Each query is created with an id that the server uses when it sends us
37-
// info about the query (updates, etc)
37+
// Each query and snapshot request is created with an id that the server uses when it sends us
38+
// info about the request (updates, etc)
3839
this.nextQueryId = 1;
40+
this.nextSnapshotRequestId = 1;
3941

4042
// Map from query ID -> query object.
4143
this.queries = {};
4244

45+
// Map from snapshot request ID -> snapshot request
46+
this._snapshotRequests = {};
47+
4348
// A unique message number for the given id
4449
this.seq = 1;
4550

@@ -226,6 +231,9 @@ Connection.prototype.handleMessage = function(message) {
226231
case 'bu':
227232
return this._handleBulkMessage(message, '_handleUnsubscribe');
228233

234+
case 'nf':
235+
return this._handleSnapshotFetch(err, message);
236+
229237
case 'f':
230238
var doc = this.getExisting(message.c, message.d);
231239
if (doc) doc._handleFetch(err, message.data);
@@ -310,6 +318,11 @@ Connection.prototype._setState = function(newState, reason) {
310318
docs[id]._onConnectionStateChanged();
311319
}
312320
}
321+
// Emit the event to all snapshots
322+
for (var id in this._snapshotRequests) {
323+
var snapshotRequest = this._snapshotRequests[id];
324+
snapshotRequest._onConnectionStateChanged();
325+
}
313326
this.endBulk();
314327

315328
this.emit(newState, reason);
@@ -523,7 +536,8 @@ Connection.prototype.createSubscribeQuery = function(collection, q, options, cal
523536
Connection.prototype.hasPending = function() {
524537
return !!(
525538
this._firstDoc(hasPending) ||
526-
this._firstQuery(hasPending)
539+
this._firstQuery(hasPending) ||
540+
this._firstSnapshotRequest()
527541
);
528542
};
529543
function hasPending(object) {
@@ -552,6 +566,11 @@ Connection.prototype.whenNothingPending = function(callback) {
552566
query.once('ready', this._nothingPendingRetry(callback));
553567
return;
554568
}
569+
var snapshotRequest = this._firstSnapshotRequest();
570+
if (snapshotRequest) {
571+
snapshotRequest.once('ready', this._nothingPendingRetry(callback));
572+
return;
573+
}
555574
// Call back when no pending operations
556575
process.nextTick(callback);
557576
};
@@ -584,3 +603,44 @@ Connection.prototype._firstQuery = function(fn) {
584603
}
585604
}
586605
};
606+
607+
Connection.prototype._firstSnapshotRequest = function () {
608+
for (var id in this._snapshotRequests) {
609+
return this._snapshotRequests[id];
610+
}
611+
};
612+
613+
/**
614+
* Fetch a read-only snapshot at a given version
615+
*
616+
* @param collection - the collection name of the snapshot
617+
* @param id - the ID of the snapshot
618+
* @param version (optional) - the version number to fetch
619+
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
620+
*
621+
* {
622+
* id: string; // ID of the snapshot
623+
* v: number; // version number of the snapshot
624+
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
625+
* data: any; // the snapshot
626+
* }
627+
*
628+
*/
629+
Connection.prototype.fetchSnapshot = function(collection, id, version, callback) {
630+
if (typeof version === 'function') {
631+
callback = version;
632+
version = null;
633+
}
634+
635+
var requestId = this.nextSnapshotRequestId++;
636+
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
637+
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
638+
snapshotRequest.send();
639+
};
640+
641+
Connection.prototype._handleSnapshotFetch = function (error, message) {
642+
var snapshotRequest = this._snapshotRequests[message.id];
643+
if (!snapshotRequest) return;
644+
delete this._snapshotRequests[message.id];
645+
snapshotRequest._handleResponse(error, message);
646+
};

lib/client/snapshot-request.js

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
var Snapshot = require('../snapshot');
2+
var util = require('../util');
3+
var emitter = require('../emitter');
4+
5+
module.exports = SnapshotRequest;
6+
7+
function SnapshotRequest(connection, requestId, collection, id, version, callback) {
8+
emitter.EventEmitter.call(this);
9+
10+
if (typeof callback !== 'function') {
11+
throw new Error('Callback is required for SnapshotRequest');
12+
}
13+
14+
if (!this.isValidVersion(version)) {
15+
throw new Error('Snapshot version must be a positive integer or null');
16+
}
17+
18+
this.requestId = requestId;
19+
this.connection = connection;
20+
this.id = id;
21+
this.collection = collection;
22+
this.version = version;
23+
this.callback = callback;
24+
25+
this.sent = false;
26+
}
27+
emitter.mixin(SnapshotRequest);
28+
29+
SnapshotRequest.prototype.isValidVersion = function (version) {
30+
if (version === null) {
31+
return true;
32+
}
33+
34+
if (!util.isInteger(version)) {
35+
return false;
36+
}
37+
38+
return version >= 0;
39+
}
40+
41+
SnapshotRequest.prototype.send = function () {
42+
if (!this.connection.canSend) {
43+
return;
44+
}
45+
46+
var message = {
47+
a: 'nf',
48+
id: this.requestId,
49+
c: this.collection,
50+
d: this.id,
51+
v: this.version,
52+
};
53+
54+
this.connection.send(message);
55+
this.sent = true;
56+
};
57+
58+
SnapshotRequest.prototype._onConnectionStateChanged = function () {
59+
if (this.connection.canSend && !this.sent) {
60+
this.send();
61+
} else if (!this.connection.canSend) {
62+
this.sent = false;
63+
}
64+
};
65+
66+
SnapshotRequest.prototype._handleResponse = function (error, message) {
67+
this.emit('ready');
68+
69+
if (error) {
70+
return this.callback(error);
71+
}
72+
73+
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
74+
this.callback(null, snapshot);
75+
};

lib/db/memory.js

+4-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
var DB = require('./index');
2+
var Snapshot = require('../snapshot');
23

34
// In-memory ShareDB database
45
//
@@ -151,24 +152,15 @@ MemoryDB.prototype._getSnapshotSync = function(collection, id, includeMetadata)
151152
var snapshot;
152153
if (doc) {
153154
var data = clone(doc.data);
154-
var meta = (includeMetadata) ? clone(doc.m) : undefined;
155-
snapshot = new MemorySnapshot(id, doc.v, doc.type, data, meta);
155+
var meta = (includeMetadata) ? clone(doc.m) : null;
156+
snapshot = new Snapshot(id, doc.v, doc.type, data, meta);
156157
} else {
157158
var version = this._getVersionSync(collection, id);
158-
snapshot = new MemorySnapshot(id, version, null, undefined);
159+
snapshot = new Snapshot(id, version, null, undefined, null);
159160
}
160161
return snapshot;
161162
};
162163

163-
// `id`, and `v` should be on every returned snapshot
164-
function MemorySnapshot(id, version, type, data, meta) {
165-
this.id = id;
166-
this.v = version;
167-
this.type = type;
168-
this.data = data;
169-
if (meta) this.m = meta;
170-
}
171-
172164
MemoryDB.prototype._getOpLogSync = function(collection, id) {
173165
var collectionOps = this.ops[collection] || (this.ops[collection] = {});
174166
return collectionOps[id] || (collectionOps[id] = []);

lib/snapshot.js

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module.exports = Snapshot;
2+
function Snapshot(id, version, type, data, meta) {
3+
this.id = id;
4+
this.v = version;
5+
this.type = type;
6+
this.data = data;
7+
this.m = meta;
8+
}

lib/util.js

+7
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,10 @@ exports.hasKeys = function(object) {
66
for (var key in object) return true;
77
return false;
88
};
9+
10+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/isInteger#Polyfill
11+
exports.isInteger = Number.isInteger || function (value) {
12+
return typeof value === 'number' &&
13+
isFinite(value) &&
14+
Math.floor(value) === value;
15+
};

0 commit comments

Comments
 (0)