Skip to content
152 changes: 130 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ function Multifeed (storage, opts) {

// random-access-storage wrapper that wraps all hypercores in a directory
// structures. (dir/0, dir/1, ...)
this._dirs = {}
this._max_dir = -1
this._storage = function (dir) {
return function (name) {
var s = storage
Expand Down Expand Up @@ -92,6 +94,51 @@ Multifeed.prototype._addFeed = function (feed, name) {
this._forwardLiveFeedAnnouncements(feed, name)
}

Multifeed.prototype.removeFeed = function (nameOrKey, cb) {
if (typeof cb !== 'function') cb = function noop () {}

var self = this

var feed = null
var name = null
var key = null

if (nameOrKey in self._feeds) {
name = nameOrKey
feed = self._feeds[name]
key = feed.key.toString('hex')
} else {
key = nameOrKey
feed = self._feedKeyToFeed[key]
name = Object.keys(self._feeds).find(key => self._feeds[key] === feed)
}

delete self._feeds[name]
delete self._feedKeyToFeed[key]

// Remove from dirs index
Object.keys(self._dirs).forEach((dir) => {
if (self._dirs[dir] === feed) {
delete self._dirs[dir]
}
})

// Remove from mux offering
self._streams.forEach((mux) => {
var idx = mux._localOffer.indexOf(key)
if (idx !== -1) {
mux._localOffer.splice(idx, 1)
}
})

self.writerLock(function (release) {
feed.destroyStorage(function (err) {
if (err) return self.emit('error', err)
self._updateStorageIndex(function () { release(cb) })
})
})
}

Multifeed.prototype.ready = function (cb) {
this._ready(cb)
}
Expand Down Expand Up @@ -133,32 +180,52 @@ function _close (cb) {
})
}

Multifeed.prototype._updateStorageIndex = function (cb) {
if (typeof cb !== 'function') cb = function noop () {}

var self = this

var dirs = Object.keys(self._dirs).join(',')
self._max_dir = Math.max.apply(null, Object.keys(self._dirs).map(Number))

var st = self._storage('index')('dirs')
writeStringToStorage(dirs, st, cb)
}

Multifeed.prototype._loadFeeds = function (cb) {
var self = this

// Hypercores are stored starting at 0 and incrementing by 1. A failed read
// at position 0 implies non-existance of the hypercore.
// Hypercores are stored via an index file in numbers directories. If no index
// is found, the structure is assumed to be legacy which starts at 0 and
// increments by 1. A failed read at position 0 implies non-existance of the
// hypercore and if legacy means the end of loading.
var doneOnErr = true
var nextDir = function (dir) { return dir + 1 }

var pending = 1
function next (n) {
var storage = self._storage('' + n)
function next (dir) {
if (!dir && typeof dir !== 'number') return done()

var storage = self._storage('' + dir)
var st = storage('key')
st.read(0, 4, function (err) {
if (err) return done() // means there are no more feeds to read
debug(self._id + ' [INIT] loading feed #' + n)
if (err && doneOnErr) return done() // means there are no more feeds to read
debug(self._id + ' [INIT] loading feed #' + dir)
pending++
var feed = self._hypercore(storage, self._opts)
process.nextTick(next, n + 1)
process.nextTick(next, nextDir(dir))

feed.ready(function () {
readStringFromStorage(storage('localname'), function (err, name) {
if (!err && name) {
self._addFeed(feed, name)
} else {
self._addFeed(feed, String(n))
self._addFeed(feed, String(dir))
}
self._dirs[dir] = feed
st.close(function (err) {
if (err) return done(err)
debug(self._id + ' [INIT] loaded feed #' + n)
debug(self._id + ' [INIT] loaded feed #' + dir)
done()
})
})
Expand All @@ -174,7 +241,30 @@ Multifeed.prototype._loadFeeds = function (cb) {
if (!--pending) cb()
}

next(0)
var indexSt = self._storage('index')('dirs')

readStringFromStorage(indexSt, function (err, dirs) {
if (err) {
next(0)
} else {
doneOnErr = false
dirs = dirs ? dirs.split(',') : []

// Update max dir on load
self._max_dir = Math.max.apply(null, dirs.map(Number).concat(self._max_dir))

nextDir = function (dir) {
var idx = dirs.indexOf(dir)
if (idx < dirs.length - 1) {
return dirs[idx + 1]
} else {
return ''
}
}

next(dirs[0])
}
})
}

Multifeed.prototype.writer = function (name, opts, cb) {
Expand All @@ -201,10 +291,10 @@ Multifeed.prototype.writer = function (name, opts, cb) {
debug(self._id + ' [WRITER] creating new writer: ' + name)

self.writerLock(function (release) {
var len = Object.keys(self._feeds).length
var storage = self._storage('' + len)
var dir = self._max_dir + 1
var storage = self._storage('' + dir)

var idx = name || String(len)
var idx = name || String(dir)

var nameStore = storage('localname')
writeStringToStorage(idx, nameStore, function (err) {
Expand All @@ -224,9 +314,12 @@ Multifeed.prototype.writer = function (name, opts, cb) {

feed.ready(function () {
self._addFeed(feed, String(idx))
release(function () {
if (err) cb(err)
else cb(null, feed, idx)
self._dirs[dir] = feed
self._updateStorageIndex(function (err) {
release(function () {
if (err) cb(err)
else cb(null, feed, idx)
})
})
})
})
Expand Down Expand Up @@ -332,7 +425,7 @@ Multifeed.prototype.replicate = function (isInitiator, opts) {
return !Number.isNaN(parseInt(key, 16)) && key.length === 64
})

var numFeeds = Object.keys(self._feeds).length
var numFeeds = self._max_dir + 1
var keyId = numFeeds
filtered.forEach(function (key) {
var feeds = values(self._feeds).filter(function (feed) {
Expand All @@ -355,6 +448,8 @@ Multifeed.prototype.replicate = function (isInitiator, opts) {
}
feed.ready(function () {
self._addFeed(feed, myKey)
self._dirs[myKey] = feed
self._updateStorageIndex()
keyId++
debug(self._id + ' [REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex'))
if (!--pending) cb()
Expand All @@ -378,12 +473,25 @@ Multifeed.prototype._forwardLiveFeedAnnouncements = function (feed, name) {
})
}

// TODO: what if the new data is shorter than the old data? things will break!
function writeStringToStorage (string, storage, cb) {
var buf = Buffer.from(string, 'utf8')
storage.write(0, buf, function (err) {
storage.close(function (err2) {
cb(err || err2)
function writeBuffer () {
var buf = Buffer.from(string, 'utf8')
storage.write(0, buf, function (err) {
storage.close(function (err2) {
cb(err || err2)
})
})
}

// Check if data already exists
storage.stat(function (err, stat) {
if (err) return writeBuffer()

var len = stat.size
storage.del(0, len, function (err) {
if (err) return cb(err)

writeBuffer()
})
})
}
Expand Down
Loading