Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACK feature #215

Merged
merged 4 commits into from Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,22 @@ Options include:
``` js
{
live: false, // keep replicating after all remote data has been downloaded?
ack: false, // set to true to get explicit acknowledgement when a peer has written a block
download: true, // download data from peers?
encrypt: true // encrypt the data sent using the hypercore key pair
}
```

When `ack` is `true`, you can listen on the replication `stream` for an `ack`
event:

``` js
var stream = feed.replicate({ ack: true })
stream.on('ack', function (ack) {
console.log(ack.start, ack.length)
})
```

#### `feed.close([callback])`

Fully close this feed.
Expand Down
13 changes: 13 additions & 0 deletions lib/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function replicate (feed, opts) {
if (!triggerReady()) {
peer.feed.emit('remote-update', peer)
}
peer.remoteAck = stream.remoteAck
})
var firstTime = true

Expand Down Expand Up @@ -70,8 +71,10 @@ function Peer (feed, opts) {
this.remoteLength = 0
this.remoteWant = false
this.remoteTree = null
this.remoteAck = false
this.live = !!opts.live
this.sparse = feed.sparse
this.ack = !!opts.ack

this.remoteDownloading = true
this.downloading = typeof opts.download === 'boolean' ? opts.download : !feed.writable
Expand Down Expand Up @@ -130,6 +133,11 @@ Peer.prototype.ondata = function (data) {
this.feed._putBuffer(data.index, data.value, data, this, function (err) {
if (err) return self.destroy(err)
if (data.value) self.remoteBitfield.set(data.index, false)
if (self.remoteAck) {
// Send acknowledgement.
// In the future this could batch several ACKs at once
self.stream.have({start: data.index, length: 1, ack: true})
}
if (self._stats && data.value) {
self._stats.downloadedBlocks += 1
self._stats.downloadedBytes += data.value.length
Expand Down Expand Up @@ -237,6 +245,11 @@ Peer.prototype.ontick = function () {
}

Peer.prototype.onhave = function (have) {
if (this.ack && have.ack && !have.bitfield && this.feed.bitfield.get(have.start)) {
this.stream.stream.emit('ack', have)
return
}

var updated = this._first
if (this._first) this._first = false

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"flat-tree": "^1.6.0",
"from2": "^2.3.0",
"hypercore-crypto": "^1.0.0",
"hypercore-protocol": "^6.4.1",
"hypercore-protocol": "^6.5.0",
"inherits": "^2.0.3",
"inspect-custom-symbol": "^1.1.0",
"last-one-wins": "^1.0.4",
Expand Down
311 changes: 311 additions & 0 deletions test/ack.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
var create = require('./helpers/create')
var tape = require('tape')

tape('replicate with ack', function (t) {
var feed = create()
feed.on('ready', function () {
var clone = create(feed.key)

var stream = feed.replicate({live: true, ack: true})
stream.pipe(clone.replicate({live: true})).pipe(stream)

stream.on('handshake', function () {
feed.append(['a', 'b', 'c'])
})
var seen = 0
stream.on('ack', function (ack) {
seen++
if (seen > 3) t.fail()
if (seen === 3) t.end()
})
})
})

tape('ack only when something is downloaded', function (t) {
t.plan(1)
var feed = create()
feed.on('ready', function () {
var clone = create(feed.key)
var stream1 = clone.replicate()
stream1.on('ack', function (ack) {
t.fail('unexpected ack')
})
feed.append(['a', 'b', 'c'], function () {
// pre-populate with 3 records
stream1.pipe(feed.replicate()).pipe(stream1)
})
stream1.on('end', function () {
feed.append([ 'd', 'e' ])
// add 2 more records. only these should be ACK'd
var acks = []
var stream2 = feed.replicate({ ack: true })
stream2.on('ack', function (ack) {
acks.push(ack.start)
})
stream2.pipe(clone.replicate()).pipe(stream2)
stream2.on('end', function () {
t.deepEqual(acks.sort(), [3, 4])
t.end()
})
})
})
})

tape('simultaneous replication with ack and no-ack', function (t) {
t.plan(1)
var feed = create()
feed.on('ready', function () {
feed.append(['a', 'b', 'c'])

var clone1 = create(feed.key)
var clone2 = create(feed.key)
var stream0 = feed.replicate({ ack: true })
var stream1 = clone1.replicate()
var stream2 = clone2.replicate()
var stream3 = feed.replicate()
stream1.pipe(stream0).pipe(stream1)
stream2.pipe(stream3).pipe(stream2)

var acks = []
stream0.on('ack', function (ack) {
acks.push(ack.start)
})
stream1.on('ack', function (ack) {
t.fail('unexpected ack')
})
stream2.on('ack', function (ack) {
t.fail('unexpected ack')
})
stream3.on('ack', function (ack) {
t.fail('unexpected ack')
})
stream1.on('end', function () {
t.deepEqual(acks.sort(), [0, 1, 2])
t.end()
})
})
})

tape('simultaneous replication with two acks', function (t) {
t.plan(1)
var feed = create()
feed.on('ready', function () {
feed.append(['a', 'b', 'c'])

var clone1 = create(feed.key)
var clone2 = create(feed.key)
var stream0 = feed.replicate({ ack: true })
var stream1 = clone1.replicate()
var stream2 = clone2.replicate()
var stream3 = feed.replicate({ ack: true })
stream1.pipe(stream0).pipe(stream1)
stream2.pipe(stream3).pipe(stream2)

var acks = [[], []]
stream0.on('ack', function (ack) {
acks[0].push(ack.start)
})
stream1.on('ack', function (ack) {
t.fail('unexpected ack')
})
stream2.on('ack', function (ack) {
t.fail('unexpected ack')
})
stream3.on('ack', function (ack) {
acks[1].push(ack.start)
})
var pending = 2
stream1.on('end', function () {
if (--pending === 0) check()
})
stream2.on('end', function () {
if (--pending === 0) check()
})
function check () {
acks.forEach(function (r) { r.sort() })
t.deepEqual(acks, [[0, 1, 2], [0, 1, 2]])
t.end()
}
})
})

tape('acks where clones should not ack', function (t) {
t.plan(1)
var feed = create()
feed.on('ready', function () {
feed.append(['a', 'b', 'c'])

var clone1 = create(feed.key)
var clone2 = create(feed.key)
var stream1 = feed.replicate({ ack: true })
var stream2 = feed.replicate({ ack: true })
var cstream1 = clone1.replicate({ ack: true }) // but shouldn't get any acks
var cstream2 = clone2.replicate({ ack: true }) // but shouldn't get any acks
stream1.pipe(cstream1).pipe(stream1)
stream2.pipe(cstream2).pipe(stream2)

cstream1.on('ack', function (ack) {
t.fail('unexpected ack')
})
cstream2.on('ack', function (ack) {
t.fail('unexpected ack')
})
var acks = [[], []]
stream1.on('ack', function (ack) {
acks[0].push(ack.start)
})
stream2.on('ack', function (ack) {
acks[1].push(ack.start)
})
var pending = 2
stream1.on('end', function () {
if (--pending === 0) check()
})
stream2.on('end', function () {
if (--pending === 0) check()
})
function check () {
acks.forEach(function (r) { r.sort() })
t.deepEqual(acks, [[0, 1, 2], [0, 1, 2]])
t.end()
}
})
})

tape('transitive clone acks', function (t) {
t.plan(2)
var feed = create()
feed.on('ready', function () {
feed.append(['a', 'b', 'c'], ready)
})
function ready (err) {
t.ifError(err)
var clone1 = create(feed.key)
var clone2 = create(feed.key)
var stream1 = feed.replicate({ live: true, ack: true })
var stream2 = clone1.replicate({ live: true, ack: true })
var stream3 = clone1.replicate({ live: true, ack: true })
var stream4 = clone2.replicate({ live: true, ack: true })
var acks = [[], [], [], []]
;[stream1, stream2, stream3, stream4].forEach(function (stream, i) {
stream.on('ack', function (ack) {
acks[i].push(ack.start)
})
})
stream1.pipe(stream2).pipe(stream1)
stream3.pipe(stream4).pipe(stream3)
var dl = 0
clone2.on('download', function () {
// allow an extra tick for ack response to arrive
if (++dl === 3) ntick(2, check)
})
function check () {
acks.forEach(function (r) { r.sort() })
t.deepEqual(acks, [[0, 1, 2], [], [0, 1, 2], []])
}
}
})

tape('larger gossip network acks', function (t) {
t.plan(16)
var feed = create()
var cores = [feed]
var acks = {}
feed.on('ready', function () {
for (var i = 1; i < 10; i++) {
cores.push(create(feed.key))
}
next(0)
})
var ops = [
['append', 'A'],
['connect', 0, 1], // acks["0,1"].push(0)
['append', 'B'],
['append', 'C'],
['connect', 1, 2], // acks["1,2"].push(0)
['connect', 0, 1], // acks["0,1"].push(1,2)
['append', 'D'],
['append', 'E'],
['append', 'F'],
['connect', 0, 5], // acks["0,5"].push(0,1,2,3,4,5)
['connect', 2, 5], // acks["5,2"].push(1,2,3,4,5)
['connect', 5, 6], // acks["5,6"].push(0,1,2,3,4,5)
['connect', 1, 6], // acks["6,1"].push(3,4,5)
['append', 'G'],
['append', 'H'],
['connect', 4, 2], // acks["2,4"].push(0,1,2,3,4,5)
['connect', 0, 7], // acks["0,7"].push(0,1,2,3,4,5,6,7)
['connect', 4, 7], // acks["7,4"].push(6,7)
['connect', 4, 5], // acks["4,5"].push(6,7)
['connect', 5, 8], // acks["5,8"].push(0,1,2,3,4,5,6,7)
['append', 'I'],
['append', 'J'],
['append', 'K'],
['connect', 0, 8], // acks["0,8"].push(8,9,10)
['connect', 5, 9], // acks["5,9"].push(0,1,2,3,4,5,6,7)
['connect', 8, 4], // acks["8,4"].push(8,9,10)
['append', 'L'],
['append', 'M'],
['append', 'N'],
['append', 'O'],
['connect', 9, 0], // acks["0,9"].push(8,9,10,11,12,13,14)
['connect', 2, 9] // acks["9,2"].push(6,7,8,9,10,11,12,13,14)
]
function next (i) {
var op = ops[i]
if (!op) return check()
if (op[0] === 'append') {
feed.append(op[1], function (err) {
t.ifError(err)
next(i + 1)
})
} else if (op[0] === 'connect') {
var src = cores[op[1]]
var dst = cores[op[2]]
var sr = src.replicate({ ack: true })
var dr = dst.replicate({ ack: true })
sr.on('ack', function (ack) {
var key = op[1] + ',' + op[2]
if (!acks[key]) acks[key] = []
acks[key].push(ack.start)
})
dr.on('ack', function (ack) {
var key = op[2] + ',' + op[1]
if (!acks[key]) acks[key] = []
acks[key].push(ack.start)
})
sr.pipe(dr).pipe(sr)
var pending = 2
sr.on('end', function () { if (--pending === 0) next(i + 1) })
dr.on('end', function () { if (--pending === 0) next(i + 1) })
}
}
function check () {
Object.keys(acks).forEach(function (key) {
acks[key].sort(function (a, b) { return a - b })
})
t.deepEqual(acks, {
'0,1': [ 0, 1, 2 ],
'1,2': [ 0 ],
'0,5': [ 0, 1, 2, 3, 4, 5 ],
'5,2': [ 1, 2, 3, 4, 5 ],
'5,6': [ 0, 1, 2, 3, 4, 5 ],
'6,1': [ 3, 4, 5 ],
'2,4': [ 0, 1, 2, 3, 4, 5 ],
'0,7': [ 0, 1, 2, 3, 4, 5, 6, 7 ],
'7,4': [ 6, 7 ],
'4,5': [ 6, 7 ],
'5,8': [ 0, 1, 2, 3, 4, 5, 6, 7 ],
'0,8': [ 8, 9, 10 ],
'5,9': [ 0, 1, 2, 3, 4, 5, 6, 7 ],
'8,4': [ 8, 9, 10 ],
'0,9': [ 8, 9, 10, 11, 12, 13, 14 ],
'9,2': [ 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
})
}
})

function ntick (times, cb) {
if (times === 0) cb()
else process.nextTick(ntick, times - 1, cb)
}