Skip to content

Commit

Permalink
fix(sender-link): settle send promises on client disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Oct 4, 2016
1 parent 49edd34 commit 98bd734
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 22 deletions.
62 changes: 40 additions & 22 deletions lib/sender_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,28 @@ SenderLink.prototype.send = function(msg, options) {
var deliveryTag = self.session._deliveryTag++;
var sendMessage = function(err) {
if (err) {
reject(err);
return reject(err);
}

debug('sending: ', msg);
var messageId = self._sendMessage(message, {
deliveryTag: new Buffer(deliveryTag.toString())
});

var cbPolicy = self.policy.callback;
if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) {
var deferredSender = function(err, state) {
if (!!err) {
reject(err);
} else {
resolve(state);
}
};
self._unsettledSends[messageId] = deferredSender;
} else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) {
resolve();
} else {
debug('sending: ', msg);
var messageId = self._sendMessage(message, {
deliveryTag: new Buffer(deliveryTag.toString())
});

var cbPolicy = self.policy.callback;
if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) {
var deferredSender = function(err, state) {
if (!!err) {
reject(err);
} else {
resolve(state);
}
};
self._unsettledSends[messageId] = deferredSender;
} else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) {
resolve();
} else {
reject(new errors.ArgumentError('Invalid sender callback policy: ' + cbPolicy));
}
reject(new errors.ArgumentError('Invalid sender callback policy: ' + cbPolicy));
}
};

Expand Down Expand Up @@ -216,10 +216,22 @@ SenderLink.prototype._detached = function(frame) {
};

SenderLink.prototype._dispatchPendingSends = function(err) {
while (this._pendingSends && this._pendingSends.length > 0 && this.canSend()) {
if (!this._pendingSends || this._pendingSends.length === 0) return;
while (this._pendingSends.length > 0 && (!!err ? true : this.canSend())) {
var sendMessage = this._pendingSends.shift();
sendMessage(err);
}

if (!!err) {
var self = this;
Object.keys(this._unsettledSends).forEach(function(id) {
var deferredSender = self._unsettledSends[id];
deferredSender(err, null);
delete self._unsettledSends[id];
});

this._unsettledSends = {};
}
};

SenderLink.prototype._dispositionReceived = function(details) {
Expand All @@ -244,4 +256,10 @@ SenderLink.prototype._dispositionReceived = function(details) {
}
};

// @override
SenderLink.prototype.forceDetach = function() {
this._dispatchPendingSends(new errors.ProtocolError('amqp:link:detach-forced', 'detach-forced'));
Link.prototype.forceDetach.call(this);
};

module.exports = SenderLink;
10 changes: 10 additions & 0 deletions test/integration/qpid/sender_link.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,15 @@ describe('SenderLink', function() {
});
});

it('should resolve pending messages on disconnect', function(done) {
test.client.connect(config.address)
.then(function() { return test.client.createSender('amq.topic'); })
.then(function(sender) {
sender.linkCredit = 0;
sender.send({ test: 'data' }).catch(function(err) { done(); });
return test.client.disconnect();
});
});

});
});

0 comments on commit 98bd734

Please sign in to comment.