Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mixmix committed Sep 11, 2023
1 parent 6ff280c commit a8004e5
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 31 deletions.
54 changes: 36 additions & 18 deletions lib/epochs.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const isSubsetOf = require('set.prototype.issubsetof')
const intersection = require('set.prototype.intersection')

const { groupRecp } = require('./operators')
const hookClose = require('./hook-close')
const getTangleUpdates = require('./tangles/get-tangle-updates')

const msgPattern = toPattern(new Butt64('ssb:message/[a-zA-Z0-9-]+/', null, 32))
Expand Down Expand Up @@ -71,8 +72,9 @@ const strategy = OverwriteFields({
// PATCH: @tangle/reduce needs this
strategy.mapToPure = (T) => T || strategy.identity()

module.exports = Epochs
function Epochs(ssb) {
module.exports = function Epochs(ssb) {
hookClose(ssb)

const allGetters = {
author(epochRoot, cb) {
ssb.metafeeds.findRootFeedId(epochRoot.value.author, cb)
Expand Down Expand Up @@ -255,7 +257,8 @@ function Epochs(ssb) {
epochsReduce(groupId, { getters: allGetters }, (err, reduce) => {
if (err) return cb(clarify(err, 'Failed to resolve epoch @tangle/reduce'))

buildPreferredEpoch(reduce, cb)
const handleDisjointEpochs = FixDisjointEpochs(ssb, groupId)
buildPreferredEpoch(reduce, handleDisjointEpochs, cb)
})
}

Expand All @@ -276,10 +279,12 @@ function Epochs(ssb) {
return
}

const handleDisjointEpochs = FixDisjointEpochs(ssb, groupId)

var sync = false
const source = pull(
epochsReduce.stream(groupId, { getters: allGetters, live }),
pull.asyncMap(buildPreferredEpoch),
pull.asyncMap((reduce, cb) => buildPreferredEpoch(reduce, handleDisjointEpochs, cb)),
pull.filter(epoch => {
// if have seen current preferredEpoch, allow through
if (sync) return true
Expand Down Expand Up @@ -442,17 +447,6 @@ function Epochs(ssb) {
}
}

function tieBreak(epochs) {
if (!epochs || !Array.isArray(epochs))
throw Error('tieBreak requires an Array of epochs')

const keys = epochs.map((epoch) => epoch.secret.toString('hex')).sort()

const winningKey = Buffer.from(keys[0], 'hex')

return epochs.find((epoch) => epoch.secret.equals(winningKey))
}

function epochNodeStream(ssb, groupId, opts = {}) {
const { getters, live } = opts
const deferredSource = pullDefer.source()
Expand Down Expand Up @@ -530,8 +524,30 @@ function getGroupInit(ssb, groupId, cb) {
})
}

function FixDisjointEpochs(ssb, groupId) {
const timeout = (110 * Math.random() + 10) * 1000 // 10-120 seconds

const timeoutId = setTimeout(() => {
// check if disjoint still
// start resolving
}, timeout)

hookClose.onClose(() => clearTimeout(timeoutId))
}

/* HELPERS */
function buildPreferredEpoch(reduce, cb) {
function tieBreak(epochs) {
if (!epochs || !Array.isArray(epochs))
throw Error('tieBreak requires an Array of epochs')

const keys = epochs.map((epoch) => epoch.secret.toString('hex')).sort()

const winningKey = Buffer.from(keys[0], 'hex')

return epochs.find((epoch) => epoch.secret.equals(winningKey))
}

function buildPreferredEpoch(reduce, handleDisjointEpochs, cb) {
const tips = Object.keys(reduce.state).map((id) => {
const info = {
id,
Expand Down Expand Up @@ -561,10 +577,12 @@ function buildPreferredEpoch(reduce, cb) {
}

// case 4.6 - groups have overlapping membership, but disjoint
else if (intersection(members0, members1).size > 0) {
else if (
intersection(members0, members1).size > 0
) {
// choose one, but also kick off resolution
preferredEpoch = tieBreak(tips)
// after a random time, check
if (handleDisjointEpochs) handleDisjointEpochs()
}

// case 4.7 - disjoint membership (no overlap!)
Expand Down
27 changes: 27 additions & 0 deletions lib/hook-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2023 Mix Irving <[email protected]>
//
// SPDX-License-Identifier: LGPL-3.0-only

let closeHooked = false
const calls = []

function hookClose(ssb) {
if (closeHooked) return

ssb.close.hook((close, args) => {
close.apply(ssb, args)
calls.forEach((fn) => fn())
})

closeHooked = true
}

// push a function to this list to have it called when the client is closing
hookClose.onClose = (call) => {
if (!closeHooked) throw Error('onClose requires ssb.close to already be hooked!')

if (typeof call === 'function') calls.push(call)
else throw Error('addCall only accepts functions')
}

module.exports = hookClose
20 changes: 7 additions & 13 deletions listeners.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-FileCopyrightText: 2023 Mix Irving <[email protected]>
//
// SPDX-License-Identifier: LGPL-3.0-only
//

const {
where,
Expand All @@ -25,11 +24,10 @@ const {
const pull = require('pull-stream')
const paraMap = require('pull-paramap')
const clarify = require('clarify-error')

const Epochs = require('./lib/epochs')
const { reAddMembers, createNewEpoch } = require('./lib/exclude')

// push a function to this list to have it called when the client is closing
const closeCalls = []
const hookClose = require('./lib/hook-close')

function randomTimeout(config) {
if (!config) throw new Error('Please give config')
Expand All @@ -40,15 +38,11 @@ function randomTimeout(config) {
}

module.exports = function startListeners(ssb, config, onError) {
const { getTipEpochs, getPreferredEpoch, getMembers } = Epochs(ssb)

hookClose(ssb)
let isClosed = false
ssb.close.hook((close, args) => {
isClosed = true
close.apply(ssb, args)
hookClose.onClose(() => { isClosed = true })

closeCalls.forEach((fn) => fn())
})
const { getTipEpochs, getPreferredEpoch, getMembers } = Epochs(ssb)

ssb.metafeeds.findOrCreate((err, myRoot) => {
// prettier-ignore
Expand Down Expand Up @@ -229,7 +223,7 @@ module.exports = function startListeners(ssb, config, onError) {
if (err && !isClosed) return onError(clarify(err, 'Failed re-adding members to epoch that missed some'))
})
}, timeout)
closeCalls.push(() => clearTimeout(timeoutId))
hookClose.onClose(() => clearTimeout(timeoutId))

// if we find an exclude and it's not excluding us but we don't find a new epoch, even after a while, then create a new epoch, since we assume that the excluder crashed or something
pull(
Expand Down Expand Up @@ -266,7 +260,7 @@ module.exports = function startListeners(ssb, config, onError) {
})
}, timeout)

closeCalls.push(() => clearTimeout(timeoutId))
hookClose.onClose(() => clearTimeout(timeoutId))
},
(err) => {
// prettier-ignore
Expand Down

0 comments on commit a8004e5

Please sign in to comment.