diff --git a/index.js b/index.js index 5d2236a..595a95c 100644 --- a/index.js +++ b/index.js @@ -39,6 +39,8 @@ function Multifeed (storage, opts) { // random-access-storage wrapper that wraps all hypercores in a directory // structures. (dir/0, dir/1, ...) + this._dirs = {} + this._max_dir = -1 this._storage = function (dir) { return function (name) { var s = storage @@ -92,6 +94,51 @@ Multifeed.prototype._addFeed = function (feed, name) { this._forwardLiveFeedAnnouncements(feed, name) } +Multifeed.prototype.removeFeed = function (nameOrKey, cb) { + if (typeof cb !== 'function') cb = function noop () {} + + var self = this + + var feed = null + var name = null + var key = null + + if (nameOrKey in self._feeds) { + name = nameOrKey + feed = self._feeds[name] + key = feed.key.toString('hex') + } else { + key = nameOrKey + feed = self._feedKeyToFeed[key] + name = Object.keys(self._feeds).find(key => self._feeds[key] === feed) + } + + delete self._feeds[name] + delete self._feedKeyToFeed[key] + + // Remove from dirs index + Object.keys(self._dirs).forEach((dir) => { + if (self._dirs[dir] === feed) { + delete self._dirs[dir] + } + }) + + // Remove from mux offering + self._streams.forEach((mux) => { + var idx = mux._localOffer.indexOf(key) + if (idx !== -1) { + mux._localOffer.splice(idx, 1) + } + }) + + self.writerLock(function (release) { + feed.destroyStorage(function (err) { + if (err) return self.emit('error', err) + self._updateStorageIndex(function () { release(cb) }) + }) + }) +} + Multifeed.prototype.ready = function (cb) { this._ready(cb) } @@ -133,32 +180,52 @@ function _close (cb) { }) } +Multifeed.prototype._updateStorageIndex = function (cb) { + if (typeof cb !== 'function') cb = function noop () {} + + var self = this + + var dirs = Object.keys(self._dirs).join(',') + self._max_dir = Math.max.apply(null, Object.keys(self._dirs).map(Number)) + + var st = self._storage('index')('dirs') + writeStringToStorage(dirs, st, cb) +} + Multifeed.prototype._loadFeeds = function (cb) { var self = this - // Hypercores are stored starting at 0 and incrementing by 1. A failed read - // at position 0 implies non-existance of the hypercore. + // Hypercores are stored via an index file in numbers directories. If no index + // is found, the structure is assumed to be legacy which starts at 0 and + // increments by 1. A failed read at position 0 implies non-existance of the + // hypercore and if legacy means the end of loading. + var doneOnErr = true + var nextDir = function (dir) { return dir + 1 } + var pending = 1 - function next (n) { - var storage = self._storage('' + n) + function next (dir) { + if (!dir && typeof dir !== 'number') return done() + + var storage = self._storage('' + dir) var st = storage('key') st.read(0, 4, function (err) { - if (err) return done() // means there are no more feeds to read - debug(self._id + ' [INIT] loading feed #' + n) + if (err && doneOnErr) return done() // means there are no more feeds to read + debug(self._id + ' [INIT] loading feed #' + dir) pending++ var feed = self._hypercore(storage, self._opts) - process.nextTick(next, n + 1) + process.nextTick(next, nextDir(dir)) feed.ready(function () { readStringFromStorage(storage('localname'), function (err, name) { if (!err && name) { self._addFeed(feed, name) } else { - self._addFeed(feed, String(n)) + self._addFeed(feed, String(dir)) } + self._dirs[dir] = feed st.close(function (err) { if (err) return done(err) - debug(self._id + ' [INIT] loaded feed #' + n) + debug(self._id + ' [INIT] loaded feed #' + dir) done() }) }) @@ -174,7 +241,30 @@ Multifeed.prototype._loadFeeds = function (cb) { if (!--pending) cb() } - next(0) + var indexSt = self._storage('index')('dirs') + + readStringFromStorage(indexSt, function (err, dirs) { + if (err) { + next(0) + } else { + doneOnErr = false + dirs = dirs ? dirs.split(',') : [] + + // Update max dir on load + self._max_dir = Math.max.apply(null, dirs.map(Number).concat(self._max_dir)) + + nextDir = function (dir) { + var idx = dirs.indexOf(dir) + if (idx < dirs.length - 1) { + return dirs[idx + 1] + } else { + return '' + } + } + + next(dirs[0]) + } + }) } Multifeed.prototype.writer = function (name, opts, cb) { @@ -201,10 +291,10 @@ Multifeed.prototype.writer = function (name, opts, cb) { debug(self._id + ' [WRITER] creating new writer: ' + name) self.writerLock(function (release) { - var len = Object.keys(self._feeds).length - var storage = self._storage('' + len) + var dir = self._max_dir + 1 + var storage = self._storage('' + dir) - var idx = name || String(len) + var idx = name || String(dir) var nameStore = storage('localname') writeStringToStorage(idx, nameStore, function (err) { @@ -224,9 +314,12 @@ Multifeed.prototype.writer = function (name, opts, cb) { feed.ready(function () { self._addFeed(feed, String(idx)) - release(function () { - if (err) cb(err) - else cb(null, feed, idx) + self._dirs[dir] = feed + self._updateStorageIndex(function (err) { + release(function () { + if (err) cb(err) + else cb(null, feed, idx) + }) }) }) }) @@ -332,7 +425,7 @@ Multifeed.prototype.replicate = function (isInitiator, opts) { return !Number.isNaN(parseInt(key, 16)) && key.length === 64 }) - var numFeeds = Object.keys(self._feeds).length + var numFeeds = self._max_dir + 1 var keyId = numFeeds filtered.forEach(function (key) { var feeds = values(self._feeds).filter(function (feed) { @@ -355,6 +448,8 @@ Multifeed.prototype.replicate = function (isInitiator, opts) { } feed.ready(function () { self._addFeed(feed, myKey) + self._dirs[myKey] = feed + self._updateStorageIndex() keyId++ debug(self._id + ' [REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex')) if (!--pending) cb() @@ -378,12 +473,25 @@ Multifeed.prototype._forwardLiveFeedAnnouncements = function (feed, name) { }) } -// TODO: what if the new data is shorter than the old data? things will break! function writeStringToStorage (string, storage, cb) { - var buf = Buffer.from(string, 'utf8') - storage.write(0, buf, function (err) { - storage.close(function (err2) { - cb(err || err2) + function writeBuffer () { + var buf = Buffer.from(string, 'utf8') + storage.write(0, buf, function (err) { + storage.close(function (err2) { + cb(err || err2) + }) + }) + } + + // Check if data already exists + storage.stat(function (err, stat) { + if (err) return writeBuffer() + + var len = stat.size + storage.del(0, len, function (err) { + if (err) return cb(err) + + writeBuffer() }) }) } diff --git a/test/basic.js b/test/basic.js index ac27a57..0d2ffb9 100644 --- a/test/basic.js +++ b/test/basic.js @@ -247,6 +247,86 @@ test('get localfeed by name across disk loads', function (t) { }) }) +test('load all feeds from disk after removing one', function (t) { + t.plan(14) + + var storage = tmp() + var multi = multifeed(storage, { valueEncoding: 'json' }) + + multi.writer('foo', function (err, wFoo) { + t.error(err) + t.ok(wFoo.key) + + multi.writer('bar', function (err, wBar) { + t.error(err) + t.ok(wBar.key) + wBar.append('a') + + multi.writer('baz', function (err, wBaz) { + t.error(err) + t.ok(wBaz.key) + + multi.removeFeed('bar', function () { + t.deepEquals(multi.feeds().length, 2, 'baz successfully deleted') + + multi.close(function () { + var multi2 = multifeed(storage, { valueEncoding: 'json' }) + multi2.writer('foo', function (err, wFoo2) { + t.error(err) + t.ok(wFoo2.key) + t.deepEquals(multi2.feeds().length, 2) + t.deepEquals(wFoo2.key, wFoo.key, 'keys match') + + multi2.writer('baz', function (err, wBaz2) { + t.error(err) + t.ok(wBaz2.key) + t.deepEquals(wBaz2.key, wBaz.key, 'keys match') + }) + }) + }) + }) + }) + }) + }) +}) + +test('load all feeds from legacy storage', function (t) { + t.plan(9) + + var storage = tmp() + var multi = multifeed(storage, { valueEncoding: 'json' }) + + // Create a legacy storage structure by creating two feeds & deleting the + // index. + multi.writer(function (err, wFoo) { + t.error(err, 'no error getting writer') + t.ok(wFoo.key, 'got a key') + + // Add second feed to highlight that feeds aren't loaded + // This is necessary because `writer()` implicitly loads a feed by + // "creating" it. + multi.writer('foo', function (err, wBar) { + t.error(err, 'no error getting second feed') + t.ok(wBar.key, 'got a key') + t.deepEquals(multi.feeds().length, 2, 'now have 2 feeds') + + // Remove index to replicate legacy storage configuration + multi._storage('index')('dirs').destroy(function () { + multi.close(function () { + var multi2 = multifeed(storage, { valueEncoding: 'json' }) + multi2.writer(function (err, wFoo2) { + t.error(err) + t.ok(wFoo2.key) + console.log('multi2._feeds', multi2._feeds) + t.deepEquals(multi2.feeds().length, 2) + t.deepEquals(wFoo2.key, wFoo.key, 'keys match') + }) + }) + }) + }) + }) +}) + test('close', function (t) { var storage = tmp() var multi = multifeed(storage, { valueEncoding: 'json' }) @@ -291,6 +371,112 @@ test('close after double-open', function (t) { } }) +test('remove feed w/ name', function (t) { + t.plan(6) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', remove) + }) + }) + + function remove () { + t.equals(m1.feeds().length, 2) + var feeds = m1.feeds() + var idx = feeds.length - 1 + m1.removeFeed(idx, function (err) { + t.error(err) + check() + }) + } + + function check () { + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + } +}) + +test('remove feed w/ key', function (t) { + t.plan(6) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', remove) + }) + }) + + function remove () { + t.equals(m1.feeds().length, 2) + var feeds = m1.feeds() + var feed = feeds[feeds.length - 1] + var key = feed.key.toString('hex') + m1.removeFeed(key, function (err) { + t.error(err) + check() + }) + } + + function check () { + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + } +}) + +test('remove feed updates mux\'s knownFeeds()', function (t) { + t.plan(8) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true, { live: true }) + r.pipe(m2.replicate(false, { live: true })).pipe(r) + setTimeout(remove, 1000) + }) + }) + + function remove () { + var feeds = m1.feeds() + var idx = feeds.length - 1 + var feed = feeds[idx] + var mux = m1._streams[0] + var key = feed.key.toString('hex') + + // Force feed key to be available in mux localOffer. This would be true of + // future replications. + mux.offerFeeds([key]) + + // Check it exists before removing + t.equals(m1.feeds().length, 2) + t.notEquals(mux._localOffer.indexOf(key), -1) + + m1.removeFeed(idx, function (err) { + t.error(err) + // Check it was removed from only m1 + t.equals(mux._localOffer.indexOf(key), -1) + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + }) + } +}) + test('can provide custom encryption key', function (t) { t.plan(2) diff --git a/test/regression.js b/test/regression.js index 18d5dd9..64618a1 100644 --- a/test/regression.js +++ b/test/regression.js @@ -356,7 +356,7 @@ test('regression: ensure encryption key is not written to disk', function (t) { t.error(err) fs.readdir(storage, function (err, res) { t.error(err) - t.equals(res.length, 1) + t.equals(res.length, 2) t.equals(res[0], '0') }) })