Skip to content

Commit

Permalink
fix(sender-link): resolve send promises for certain sndSettleMode
Browse files Browse the repository at this point in the history
When the user has defined a link with `sndSettleMode` set to
`settled`, then all sent messages should implicitly not await a
disposition message.
  • Loading branch information
mbroadst committed Apr 20, 2017
1 parent fc99e24 commit be48605
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 16 deletions.
17 changes: 12 additions & 5 deletions lib/sender_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ SenderLink.prototype.send = function(msg, options) {
}

var self = this,
cbPolicy = self.policy.callback;
cbPolicy = self.policy.callback,
sndSettleMode = self.policy.attach.sndSettleMode;

if (cbPolicy === putils.SenderCallbackPolicies.None) {
var sendMessage = function(err) {
Expand Down Expand Up @@ -128,17 +129,23 @@ SenderLink.prototype.send = function(msg, options) {
deliveryTag: new Buffer(deliveryTag.toString())
});

if (sndSettleMode === constants.senderSettleMode.settled ||
cbPolicy === putils.SenderCallbackPolicies.OnSent) {
return resolve();
}


if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) {
var deferredSender = function(err, state) {
if (!!err) return reject(err);
resolve(state);
};

self._unsettledSends[messageId] = deferredSender;
} else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) {
resolve();
} else {
reject(new TypeError('Invalid sender callback policy: ' + cbPolicy));
return;
}

reject(new TypeError('Invalid sender callback policy: ' + cbPolicy));
};

if (!self.canSend()) {
Expand Down
18 changes: 18 additions & 0 deletions test/integration/qpid/disposition.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ describe('Disposition', function() {
});
});

it('should immediately resolve send promises if `sndSettleMode` is `settled`', function(done) {
test.client = new AMQPClient({
senderLink: {
attach: {
sndSettleMode: c.senderSettleMode.settled
}
}
});

test.client.connect(config.address)
.then(function() { return test.client.createSender('test.disposition.queue'); })
.then(function(sender) { return sender.send({ llamas: 'are cool' }); })

// now drain the queue, so we don't leave state on the server
.then(function() { return test.client.createReceiver('test.disposition.queue'); })
.then(function(receiver) { receiver.on('message', function(msg) { done(); }); });
});

it('should allow for manual disposition of received messages', function(done) {
var queueName = 'test.disposition.queue';
var messageCount = 5, receivedCount = 0;
Expand Down
9 changes: 5 additions & 4 deletions test/unit/mocks/receiver_link.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
'use strict';
var Link = require('../../../lib/link'),
ReceiverLink = require('../../../lib/receiver_link'),

Policy = require('../../../lib/policies/policy'),
putils = require('../../../lib/policies/policy_utilities'),
util = require('util');

function MockReceiverLink(session, options) {
MockReceiverLink.super_.call(this);
function MockReceiverLink(session, options, policyOverrides) {
var linkPolicy = putils.Merge(policyOverrides, (new Policy()).senderLink);
MockReceiverLink.super_.call(this, session, null, linkPolicy);

this._created = 0;
this.session = session;
this.options = options;
this._clearState();
}
Expand Down
11 changes: 4 additions & 7 deletions test/unit/mocks/sender_link.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
'use strict';
var Link = require('../../../lib/link'),
SenderLink = require('../../../lib/sender_link'),

Policy = require('../../../lib/policies/policy'),
putils = require('../../../lib/policies/policy_utilities'),
util = require('util');

function MockSenderLink(session, options) {
MockSenderLink.super_.call(this, session, null, {
encoder: function(body) { return body; },
callback: putils.SenderCallbackPolicies.OnSent
});
function MockSenderLink(session, options, policyOverrides) {
var linkPolicy = putils.Merge(policyOverrides, (new Policy()).senderLink);
MockSenderLink.super_.call(this, session, null, linkPolicy);

this._created = 0;
this.session = session;
this.options = options;
this._clearState();
}
Expand Down
1 change: 1 addition & 0 deletions test/unit/sender_link.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var TestPolicy = new Policy({
function AttachFrameWithReceivedName(role, offset) {
offset = offset || 1;
role = role || constants.linkRole.sender;

return function(prev) {
var data = prev[prev.length - offset].duplicate();
var lastAttach = frames.readFrame(data, { verbose: false });
Expand Down
4 changes: 4 additions & 0 deletions test/unit/test_amqpclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ describe('AMQPClient', function() {
name: 'queue_TX',
isSender: true,
capacity: 0
}, {
attach: { sndSettleMode: 1 }
});

s._addMockLink(l);
Expand Down Expand Up @@ -133,6 +135,8 @@ describe('AMQPClient', function() {
name: 'queue_TX',
isSender: true,
capacity: 100
}, {
attach: { sndSettleMode: 1 }
});

s._addMockLink(l);
Expand Down

0 comments on commit be48605

Please sign in to comment.