Skip to content

Commit

Permalink
feat: switch to ystream persistence (#4)
Browse files Browse the repository at this point in the history
* feat: add basic ystream setup

* chore: update upstream

* chore: bump ystream

* refactor: bump api

* feat: switch persistence
  • Loading branch information
doodlewind authored May 28, 2024
1 parent 8601bf9 commit 2aeb83e
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"skipFiles": ["<node_internals>/**"],
"env": {
"NODE_OPTIONS": "--import=./register.js",
"INSTANCE_NAME": "",
"INSTANCE_NAME": "demo",
"PORT": "3000"
},
"program": "${workspaceFolder}/packages/server/src/index.ts",
Expand Down
4 changes: 2 additions & 2 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"type": "module",
"scripts": {
"start": "NODE_OPTIONS=--import=./register.js node src/index.ts",
"clean": "rm -rf ./.db*"
"clean": "rm -rf ./.db* db*.json"
},
"dependencies": {
"@notesuite/common": "workspace:*",
"@y/stream": "^0.0.3",
"body-parser": "^1.20.2",
"cors": "^2.8.5",
"express": "^4.19.2",
Expand All @@ -19,7 +20,6 @@
"ts-node": "^10.9.2",
"webdav-server": "^2.6.2",
"ws": "^8.16.0",
"y-leveldb": "^0.1.2",
"y-protocols": "^1.0.6",
"y-websocket": "^2.0.2",
"yjs": "^13.5.6"
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Y from 'yjs';
// @ts-ignore
import { getYDoc } from './third-party/y-websocket.js';
import { getYDoc } from './ystream/adaptor.js';
import type { AppContext } from './utils.js';
// import { exportSnapshot } from '@notesuite/common/dist/editor.js';

Expand Down
46 changes: 6 additions & 40 deletions packages/server/src/third-party/y-websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
import * as map from 'lib0/map'
import * as lodash from 'lodash'
import { getYDoc as getYstreamDoc } from '../ystream/adaptor.js'

const { debounce } = lodash

Expand All @@ -25,44 +26,8 @@ const wsReadyStateClosed = 3 // eslint-disable-line

// disable gc when using snapshots!
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0'
const persistenceDir = `./.db-${process.env.INSTANCE_NAME || 'default'}`
/**
* @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
*/
let persistence = null
if (typeof persistenceDir === 'string') {
console.info('Persisting documents to "' + persistenceDir + '"')
// @ts-ignore
const { LeveldbPersistence } = await import('y-leveldb')
const ldb = new LeveldbPersistence(persistenceDir)
persistence = {
provider: ldb,
bindState: async (docName, ydoc) => {
const persistedYdoc = await ldb.getYDoc(docName)
const newUpdates = Y.encodeStateAsUpdate(ydoc)
ldb.storeUpdate(docName, newUpdates)
Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc))
ydoc.on('update', update => {
ldb.storeUpdate(docName, update)
})
},
writeState: async (_docName, _ydoc) => {}
}
}

/**
* @param {{bindState: function(string,WSSharedDoc):void,
* writeState:function(string,WSSharedDoc):Promise<any>,provider:any}|null} persistence_
*/
export const setPersistence = persistence_ => {
persistence = persistence_
}

/**
* @return {null|{bindState: function(string,WSSharedDoc):void,
* writeState:function(string,WSSharedDoc):Promise<any>}|null} used persistence layer
*/
export const getPersistence = () => persistence
let persistence = null

/**
* @type {Map<string,WSSharedDoc>}
Expand Down Expand Up @@ -167,10 +132,11 @@ export const getYDoc = async (docname, gc = true) => {
if (!doc) {
doc = new WSSharedDoc(docname)
doc.gc = gc
if (persistence !== null) {
await persistence.bindState(docname, doc)
}
docs.set(docname, doc)

const peerDoc = await getYstreamDoc(docname)
peerDoc.on('update', update => Y.applyUpdate(doc, update))
doc.on('update', update => Y.applyUpdate(peerDoc, update))
}
return doc;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/server/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import { JSONFilePreset } from 'lowdb/node';
import type http from 'http';
import {
setupWSConnection,
getYDoc,
docs as serverDocs,
// @ts-ignore
} from './third-party/y-websocket.js';
import * as Y from 'yjs';
import { getYDoc } from './ystream/adaptor.js';

const instanceName = process.env.INSTANCE_NAME || 'default';
const docs = serverDocs as Map<string, Y.Doc>;
Expand Down Expand Up @@ -44,7 +44,7 @@ async function initDB() {
activeWorkspaceId?: string;
workspaces: { id: string; rootId: string; name: string }[];
} = { workspaces: [] };
const db = await JSONFilePreset(`./.db-${instanceName}/db.json`, defaultData);
const db = await JSONFilePreset(`./db-${instanceName}.json`, defaultData);
await db.read();
await db.write();
return db;
Expand Down
42 changes: 42 additions & 0 deletions packages/server/src/ystream/adaptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import * as Ystream from '@y/stream';
import * as wscomm from '@y/stream/comms/websocket';
import * as authentication from '@y/stream/api/authentication';
import { createWSServer } from '@y/stream/comms/websocket-server';
import {
collectionDef,
testServerIdentity,
testUser,
} from './auth.js';

const dbname = `./.db-ystream-${process.env.INSTANCE_NAME}`;

const remoteServer = await createWSServer({
port: 9000,
dbname,
identity: testServerIdentity,
});

// const comm = new wscomm.WebSocketComm('ws://localhost:9000', collectionDef);
// await Ystream.remove(dbname);
const ystream = await Ystream.open(dbname, {
// comms: [comm],
});
await authentication.registerUser(ystream, testServerIdentity.user, {
isTrusted: true,
});
await authentication.setUserIdentity(
ystream,
testUser.user,
await testUser.user.publicKey,
testUser.privateKey
);

const { owner, name } = collectionDef;
const collection = ystream.getCollection(owner, name) as Ystream.Collection;


export async function getYDoc(id: string) {
const ydoc = collection.getYdoc(id);
await ydoc.whenLoaded;
return ydoc;
}
39 changes: 39 additions & 0 deletions packages/server/src/ystream/auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import * as dbtypes from '@y/stream/api/dbtypes';
import * as json from 'lib0/json';
import * as buffer from 'lib0/buffer';
import * as decoding from 'lib0/decoding';
import * as ecdsa from 'lib0/crypto/ecdsa';

const testUserRaw = {
privateKey:
'{"key_ops":["sign"],"ext":true,"kty":"EC","x":"pAUmLYc-UFmPIt7leafPTbhxQyygcaW7__nPcUNCuu0wH27yS9P_pWFP1GwcsoAN","y":"u3109KjrPGsNUn2k5Whn2uHLAckQPdLNqtM4GpBEpUJwlvVDvk71-lS3YOEYJ_Sq","crv":"P-384","d":"OHnRw5an9hlSqSKg966lFRvB7dow669pVSn7sFZUi7UQh_Y9Xc95SQ6pEWsofsYD"}',
user: 'AMgBeyJrZXlfb3BzIjpbInZlcmlmeSJdLCJleHQiOnRydWUsImt0eSI6IkVDIiwieCI6InBBVW1MWWMtVUZtUEl0N2xlYWZQVGJoeFF5eWdjYVc3X19uUGNVTkN1dTB3SDI3eVM5UF9wV0ZQMUd3Y3NvQU4iLCJ5IjoidTMxMDlLanJQR3NOVW4yazVXaG4ydUhMQWNrUVBkTE5xdE00R3BCRXBVSndsdlZEdms3MS1sUzNZT0VZSl9TcSIsImNydiI6IlAtMzg0In0=',
};

const testServerIdentityRaw = {
privateKey:
'{"key_ops":["sign"],"ext":true,"kty":"EC","x":"CYwMakpn0onaNeCa-wqLn4Fzsris_UY4Z5gRQUA9xQOoh94YG9OHhItr6rovaYpZ","y":"74Ulju86IUMJZsYsSjxSjusLjj9U6rozZwbK9Xaqj3MgIWtnjNyjL1D-NzOP3FJ7","crv":"P-384","d":"-yKNOty9EshGL0yAOQ2q6c_b_PNCpeEK9FVPoB0wc9EUyt9BR4DZuqrC9t_DgNaF"}',
user: 'AMgBeyJrZXlfb3BzIjpbInZlcmlmeSJdLCJleHQiOnRydWUsImt0eSI6IkVDIiwieCI6IkNZd01ha3BuMG9uYU5lQ2Etd3FMbjRGenNyaXNfVVk0WjVnUlFVQTl4UU9vaDk0WUc5T0hoSXRyNnJvdmFZcFoiLCJ5IjoiNzRVbGp1ODZJVU1KWnNZc1NqeFNqdXNMamo5VTZyb3pad2JLOVhhcWozTWdJV3Ruak55akwxRC1Oek9QM0ZKNyIsImNydiI6IlAtMzg0In0=',
};

export const testUser = {
privateKey: await ecdsa.importKeyJwk(json.parse(testUserRaw.privateKey)),
user: dbtypes.UserIdentity.decode(
decoding.createDecoder(buffer.fromBase64(testUserRaw.user))
),
};

export const testServerIdentity = {
privateKey: await ecdsa.importKeyJwk(
json.parse(testServerIdentityRaw.privateKey)
),
user: dbtypes.UserIdentity.decode(
decoding.createDecoder(buffer.fromBase64(testServerIdentityRaw.user))
),
};

export const owner = testUser.user.hash;
export const collectionDef = {
owner: buffer.toBase64(owner),
name: 'c1',
};
Loading

0 comments on commit 2aeb83e

Please sign in to comment.