-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlinksAdapter.ts
204 lines (180 loc) · 6.3 KB
/
linksAdapter.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import { LinkSyncAdapter, PerspectiveDiffObserver, HolochainLanguageDelegate, LanguageContext, PerspectiveDiff, LinkExpression, DID } from "@perspect3vism/ad4m";
import { Perspective } from "@perspect3vism/ad4m";
import { DNA_NICK, ZOME_NAME } from "./dna";
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
class PeerInfo {
currentRevision: Buffer;
lastSeen: Date;
};
//@ts-ignore
export class LinkAdapter implements LinkSyncAdapter {
hcDna: HolochainLanguageDelegate;
linkCallback?: PerspectiveDiffObserver
peers: Map<DID, PeerInfo> = new Map();
me: DID
gossipLogCount: number = 0;
myCurrentRevision: Buffer | null = null;
constructor(context: LanguageContext) {
//@ts-ignore
this.hcDna = context.Holochain as HolochainLanguageDelegate;
this.me = context.agent.did;
}
writable(): boolean {
return true;
}
public(): boolean {
return false;
}
async others(): Promise<DID[]> {
return await this.hcDna.call(DNA_NICK, ZOME_NAME, "get_others", null);
}
async currentRevision(): Promise<string> {
let res = await this.hcDna.call(DNA_NICK, ZOME_NAME, "current_revision", null);
return res as string;
}
async sync(): Promise<PerspectiveDiff> {
let current_revision = await this.hcDna.call(DNA_NICK, ZOME_NAME, "sync", null);
if (current_revision && Buffer.isBuffer(current_revision)) {
this.myCurrentRevision = current_revision;
}
await this.gossip();
return new PerspectiveDiff()
}
async gossip() {
this.gossipLogCount += 1;
let lostPeers: DID[] = [];
this.peers.forEach( (peerInfo, peer) => {
if (peerInfo.lastSeen.getTime() + 10000 < new Date().getTime()) {
lostPeers.push(peer);
}
});
for (const peer of lostPeers) {
this.peers.delete(peer);
}
// flatten the map into an array of peers
let peers = Array.from(this.peers.keys());
peers.push(this.me);
// Lexically sort the peers
peers.sort();
// If we are the first peer, we are the scribe
let is_scribe = peers[0] == this.me;
// Get a deduped set of all peer's current revisions
let revisions = new Set<Buffer>();
for(const peerInfo of this.peers.values()) {
if (peerInfo.currentRevision) revisions.add(peerInfo.currentRevision);
}
revisions.forEach( async (hash) => {
if(!hash) return
if (this.myCurrentRevision && hash.equals(this.myCurrentRevision)) return
let pullResult = await this.hcDna.call(DNA_NICK, ZOME_NAME, "pull", {
hash,
is_scribe
});
if (pullResult) {
if (pullResult.current_revision && Buffer.isBuffer(pullResult.current_revision)) {
let myRevision = pullResult.current_revision;
this.myCurrentRevision = myRevision;
}
}
})
//Only show the gossip log every 10th iteration
if (this.gossipLogCount == 10) {
console.log(`
======
GOSSIP
--
me: ${this.me}
is scribe: ${is_scribe}
--
${Array.from(this.peers.entries()).map( ([peer, peerInfo]) => {
//@ts-ignore
return `${peer}: ${peerInfo.currentRevision.toString('base64')} ${peerInfo.lastSeen.toISOString()}\n`
})}
--
revisions: ${Array.from(revisions).map( (hash) => {
//@ts-ignore
return hash.toString('base64')
})}
`);
this.gossipLogCount = 0;
}
}
async render(): Promise<Perspective> {
let res = await this.hcDna.call(DNA_NICK, ZOME_NAME, "render", null);
return new Perspective(res.links);
}
async commit(diff: PerspectiveDiff): Promise<string> {
let prep_diff = {
additions: diff.additions.map((diff) => prepareLinkExpression(diff)),
removals: diff.removals.map((diff) => prepareLinkExpression(diff))
}
let res = await this.hcDna.call(DNA_NICK, ZOME_NAME, "commit", prep_diff);
if (res && Buffer.isBuffer(res)) {
this.myCurrentRevision = res;
}
return res as string;
}
addCallback(callback: PerspectiveDiffObserver): number {
this.linkCallback = callback;
return 1;
}
async handleHolochainSignal(signal: any): Promise<void> {
const { diff, reference_hash, reference, broadcast_author } = signal.payload;
//Check if this signal came from another agent & contains a diff and reference_hash
if (diff && reference_hash && reference && broadcast_author) {
// console.log(`PerspectiveDiffSync.handleHolochainSignal:
// diff: ${JSON.stringify(diff)}
// reference_hash: ${reference_hash.toString('base64')}
// reference: {
// diff: ${reference.diff?.toString('base64')}
// parents: ${reference.parents ? reference.parents.map( (parent: Buffer) => parent ? parent.toString('base64') : 'null').join(', '):'none'}
// diffs_since_snapshot: ${reference?.diffs_since_snapshot}
// }
// broadcast_author: ${broadcast_author}
// `)
this.peers.set(broadcast_author, { currentRevision: reference_hash, lastSeen: new Date() });
} else {
//console.log("PerspectiveDiffSync.handleHolochainSignal: received a signals from ourselves in fast_forward_signal or in a pull: ", signal.payload);
//This signal only contains link data and no reference, and therefore came from us in a pull in fast_forward_signal
if (this.linkCallback) {
this.linkCallback(signal.payload);
}
}
}
async addActiveAgentLink(hcDna: HolochainLanguageDelegate): Promise<any> {
if (hcDna == undefined) {
console.warn("===Perspective-diff-sync: Error tried to add an active agent link but received no hcDna to add the link onto");
} else {
return await hcDna.call(
DNA_NICK,
ZOME_NAME,
"add_active_agent_link",
null
);
}
}
}
function prepareLinkExpression(link: LinkExpression): object {
const data = Object.assign(link);
if (data.data.source == "") {
data.data.source = null;
}
if (data.data.target == "") {
data.data.target = null;
}
if (data.data.predicate == "") {
data.data.predicate = null;
}
if (data.data.source == undefined) {
data.data.source = null;
}
if (data.data.target == undefined) {
data.data.target = null;
}
if (data.data.predicate == undefined) {
data.data.predicate = null;
}
return data;
}