Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/Replication Manager v3 #32

Closed
wants to merge 13 commits into from
Closed
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ and are the same as
Valid `opts` include:
- `opts.key` (string): optional encryption key to use during replication.

- `opts.headerOrigin` (string): when using multiple multifeeds or stores in
a single replication stack, this option controls the 'origin' tag which
is used as an address label to ensure that feeds end up in expected
stores during replication. defaults to: `'multifeed'`


### multi.writer([name, ]cb)

If no `name` is given, a new local writeable feed is created and returned via
Expand Down Expand Up @@ -101,12 +107,39 @@ Create a duplex stream for replication.
Works just like hypercore, except *all* local hypercores are exchanged between
replication endpoints.

**Note**: this stream is *not* an encrypted channel.
~~**Note**: this stream is *not* an encrypted channel.~~ it is now.

### multi.on('feed', function (feed, name) { ... })

Emitted whenever a new feed is added, whether locally or remotely.

### multi.use([namespace,] function middleware() { ... })

Helper for backwards compatibility.
Forwards the `use` call to the current ReplicationManager.
If the multifeed instance has no replication manager, then one will be lazily
initialized.

**Note:** When assembling complex replication stacks it is recommended to do the
reverse and include multifeed into an existing stack instead of using the
internal lazily initialized one:
```js
// Manually initialize your replication stack
var app = replic8(encryptionKey)

// Initialize stores & middleware
var multi1 = multifeed(storage1, { headerOrigin: 'texts' })
var multi2 = multifeed(storage2, { headerOrigin: 'images' })

// Register multifeeds as a middleware in the stack
app.use(multi1)
app.use(multi2)

// Replicate entire stack
var stream = app.replicate({ live: true })
```


## multi.close(cb)

Close all file resources being held by the multifeed instance. `cb` is called once this is complete.
Expand Down
239 changes: 127 additions & 112 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var readyify = require('./ready')
var mutexify = require('mutexify')
var through = require('through2')
var debug = require('debug')('multifeed')
var multiplexer = require('./mux')
var decentstack = require('decentstack')

// Key-less constant hypercore to bootstrap hypercore-protocol replication.
var defaultEncryptionKey = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')
Expand All @@ -19,8 +19,10 @@ function Multifeed (hypercore, storage, opts) {
this._feeds = {}
this._feedKeyToFeed = {}
this._streams = []
this._replicationManager = null

opts = opts || {}
this.headerOrigin = opts.headerOrigin || 'multifeed'

// Support legacy opts.key
if (opts.key) opts.encryptionKey = opts.key
Expand Down Expand Up @@ -86,7 +88,6 @@ Multifeed.prototype._addFeed = function (feed, name) {
this._feedKeyToFeed[feed.key.toString('hex')] = feed
feed.setMaxListeners(Infinity)
this.emit('feed', feed, name)
this._forwardLiveFeedAnnouncements(feed, name)
}

Multifeed.prototype.ready = function (cb) {
Expand Down Expand Up @@ -225,137 +226,151 @@ Multifeed.prototype.feed = function (key) {
else return null
}

Multifeed.prototype.replicate = function (opts) {
if (!this._root) {
var tmp = through()
process.nextTick(function () {
tmp.emit('error', new Error('tried to use "replicate" before multifeed is ready'))
})
return tmp
}
/**
* Multifeed implements middleware interface
*/

if (!opts) opts = {}
var self = this
var mux = multiplexer(self._root.key, opts)
// Share all feeds
Multifeed.prototype.share = function (next) {
if (this.closed) return next()

// Add key exchange listener
var onManifest = function (m) {
mux.requestFeeds(m.keys)
}
mux.on('manifest', onManifest)

// Add replication listener
var onReplicate = function (keys, repl) {
addMissingKeys(keys, function (err) {
if (err) return mux.destroy(err)

// Create a look up table with feed-keys as keys
// (since not all keys in self._feeds are actual feed-keys)
var key2feed = values(self._feeds).reduce(function (h, feed) {
h[feed.key.toString('hex')] = feed
return h
}, {})

// Select feeds by key from LUT
var feeds = keys.map(function (k) { return key2feed[k] })
repl(feeds)
})
}
mux.on('replicate', onReplicate)

// Start streaming
this.ready(function(err){
if (err) return mux.stream.destroy(err)
if (mux.stream.destroyed) return
mux.ready(function(){
var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
mux.offerFeeds(keys)
})
var self = this
this.ready(function () {
var feeds = self.feeds()
next(null, feeds)
})
}

// Push session to _streams array
self._streams.push(mux)
// Tag all our own feeds with 'origin' header
Multifeed.prototype.describe = function (ctx, next) {
if (this.closed) return next()

// Register removal
var cleanup = function (err) {
mux.removeListener('manifest', onManifest)
mux.removeListener('replicate', onReplicate)
self._streams.splice(self._streams.indexOf(mux), 1)
debug('[REPLICATION] Client connection destroyed', err)
}
mux.stream.once('end', cleanup)
mux.stream.once('error', cleanup)
var self = this
this.ready(function () {
if (self.feed(ctx.key)) next(null, { origin: self.headerOrigin })
else next() // don't care about unknown keys.
})
}

return mux.stream

// Helper functions
// Accept all feeds with correct 'origin' header
// initializes new feeds if missing
Multifeed.prototype.store = function (ctx, next) {
if (this.closed) return next()

function addMissingKeys (keys, cb) {
self.ready(function (err) {
if (err) return cb(err)
self.writerLock(function (release) {
addMissingKeysLocked(keys, function (err) {
release(cb, err)
})
})
})
}
var self = this
var key = ctx.key
var meta = ctx.meta
// Ignore non-multifeed feeds
if (meta.origin !== self.headerOrigin) return next()

function addMissingKeysLocked (keys, cb) {
var pending = 0
debug(self._id + ' [REPLICATION] recv\'d ' + keys.length + ' keys')
var filtered = keys.filter(function (key) {
return !Number.isNaN(parseInt(key, 16)) && key.length === 64
})
this.ready(function () {
var feed = self.feed(key)
// accept the feed if it already exist
if (feed) return next(null, feed)

var numFeeds = Object.keys(self._feeds).length
var keyId = numFeeds
filtered.forEach(function (key) {
var feeds = values(self._feeds).filter(function (feed) {
return feed.key.toString('hex') === key
})
if (!feeds.length) {
var myKey = String(keyId)
var storage = self._storage(myKey)
keyId++
pending++
var feed
try {
debug(self._id + ' [REPLICATION] trying to create new local hypercore, key=' + key.toString('hex'))
feed = self._hypercore(storage, Buffer.from(key, 'hex'), self._opts)
} catch (e) {
debug(self._id + ' [REPLICATION] failed to create new local hypercore, key=' + key.toString('hex'))
debug(self._id + e.toString())
if (!--pending) cb()
return
}
// If not, then create the feed.
self.writerLock(function (release) {
var keyId = Object.keys(self._feeds).length
var myKey = String(keyId)
var storage = self._storage(myKey)
try {
debug(self._id + ' [REPLICATION] trying to create new local hypercore, key=' + key.toString('hex'))
var feed = self._hypercore(storage, Buffer.from(key, 'hex'), self._opts)
feed.ready(function () {
self._addFeed(feed, myKey)
keyId++
debug(self._id + ' [REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex'))
if (!--pending) cb()
release(next, null, feed)
})
} catch (e) {
debug(self._id + ' [REPLICATION] failed to create new local hypercore, key=' + key.toString('hex'))
debug(self._id + e.toString())
release(next, e) // something went wrong, manager will disconnect the peer.
}
})
if (!pending) cb()
}
})
}

Multifeed.prototype._forwardLiveFeedAnnouncements = function (feed, name) {
if (!this._streams.length) return // no-op if no live-connections
// Provide key to feed lookup for replication and other middleware
Multifeed.prototype.resolve = function (key, next) {
if (this.closed) return next()

var self = this
var hexKey = feed.key.toString('hex');
// Tell each remote that we have a new key available unless
// it's already being replicated
this._streams.forEach(function(mux) {
if (mux.knownFeeds().indexOf(hexKey) === -1) {
self._streams
debug("Forwarding new feed to existing peer:", hexKey)
mux.offerFeeds([hexKey])
}
this.ready(function () {
next(null, self.feed(key))
})
}

// Multifeed used to include replication manager capabilities.
// Now it might be included in an external replication stack
// and by storing a reference to the external manager on inclusion
// we will continue to support `replicate(boolean, opts)` calls to follow the
// standard interface
Multifeed.prototype.mounted = function (mgr, namespace) {
if (this._replicationManager && this._replicationManager !== mgr) {
console.warn('WARNING! Calling multifeed.replicate() is unsafe when used in more than one stacks. use mgr.replicate() instead!')
}
this._replicationManager = mgr
}
/*
* End of middleware interface
*/

// Forward .use() call to replicationManger
Multifeed.prototype.use = function (namespace, middleware, prepend) {
this._lazyInitReplicationManager()
this._replicationManager.use(namespace, middleware, prepend)
}

Multifeed.prototype._lazyInitReplicationManager = function (opts) {
if (this._replicationManager) return

var mgr = decentstack(this._root.key, opts)
// Automatic error logger for backwards compatibility.
var errLogger = function (err) {
// Ignore errors if the manager has other error handlers registered.
if (mgr.listeners('error').find(function (l) { return l !== errLogger })) {
return
}

// If manager dosent have another handler, and neither does this multifeed
// instance, then log a warning on the console for now.
// Applications should handle their own errors even if they simply log them.
if (!this.listeners('error').length) {
console.warn('WARNING! multifeed will not log errors in the future,' +
'please add an "error" event listener to either multifeed or your replication manager')
console.error(err)
} else {
// There's an errorhandler registered on this instance.
// forward the error event
this.emit('error', err)
}
}.bind(this)

mgr.on('error', errLogger)

// register multifeed in the replication stack.
// the mounted() hook above will save the
// `mgr` instance as this._replicationManager
mgr.use(this)
}

Multifeed.prototype.replicate = function (initiator, opts) {
if (!this._root) {
var tmp = through()
process.nextTick(function () {
tmp.emit('error', new Error('tried to use "replicate" before multifeed is ready'))
})
return tmp
}

// Lazy manager initialization / Legacy support
this._lazyInitReplicationManager(opts)

// Let replication manager take care of replication
// requests
return this._replicationManager.replicate(initiator, opts)
}

// TODO: what if the new data is shorter than the old data? things will break!
function writeStringToStorage (string, storage, cb) {
var buf = Buffer.from(string, 'utf8')
Expand Down
Loading