Skip to content

Commit

Permalink
feat(disposition): add accept and reject disposition methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Jun 24, 2015
1 parent 1dc4004 commit 423952e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
5 changes: 2 additions & 3 deletions lib/frames/disposition_frame.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ var debug = require('debug')('amqp10:framing:disposition'),
ForcedType = require('../types/forced_type'),
AMQPSymbol = require('../types/amqp_symbol'),

FrameBase = require('./frame'),
BeginFrame = require('./begin_frame');
FrameBase = require('./frame');



Expand Down Expand Up @@ -115,7 +114,7 @@ DispositionFrame.Descriptor = {

DispositionFrame.prototype._getPerformative = function() {
var self = this;
return new DescribedType(BeginFrame.Descriptor.code, {
return new DescribedType(DispositionFrame.Descriptor.code, {

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Jun 24, 2015

Owner

Really?! Crap, sorry.

This comment has been minimized.

Copy link
@mbroadst

mbroadst Jun 24, 2015

Author Collaborator

you're actually the one who caught this in your branch!

role: self.role,
first: new ForcedType('uint', self.first),
last: new ForcedType('uint', self.last),
Expand Down
102 changes: 68 additions & 34 deletions lib/link.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var EventEmitter = require('events').EventEmitter,
var _ = require('lodash'),
EventEmitter = require('events').EventEmitter,
Promise = require('bluebird'),
util = require('util'),

Expand All @@ -15,7 +16,9 @@ var EventEmitter = require('events').EventEmitter,
DetachFrame = require('./frames/detach_frame'),
FlowFrame = require('./frames/flow_frame'),
TransferFrame = require('./frames/transfer_frame'),
DispositionFrame = require('./frames/disposition_frame'),

DeliveryStates = require('./types/delivery_states'),
Session = require('./session');

function Link(session, handle, linkPolicy) {
Expand Down Expand Up @@ -127,6 +130,58 @@ Link.prototype.sendMessage = function(message, options) {
return this.session.sendMessage(this, message, options);
};

Link.prototype.addCredits = function(credits, flowOptions) {
if (this.role === constants.linkRole.sender) {
throw new errors.InvalidStateError('Cannot add link credits as a sender');
}

var opts = flowOptions || {};
this.linkCredit += credits;
this.totalCredits += credits;
opts.linkCredit = this.totalCredits;
this.session._sessionParams.incomingWindow += credits;
opts.nextIncomingId = this.session._sessionParams.nextIncomingId;
opts.incomingWindow = this.session._sessionParams.incomingWindow;
opts.nextOutgoingId = this.session._sessionParams.nextOutgoingId;
opts.outgoingWindow = this.session._sessionParams.outgoingWindow;
opts.handle = this.handle;
opts.available = this.available;
opts.deliveryCount = this.deliveryCount;
opts.drain = false;
var flow = new FlowFrame(opts);
flow.channel = this.session.channel;
this.session.connection.sendFrame(flow);

var self = this;
return new Promise(function(resolve, reject) {
var onError = function(err) { reject(err); };
self.once(Link.ErrorReceived, onError);
self.once(Link.CreditChange, function() {
self.removeListener(Link.ErrorReceived, onError);
resolve();
});
});
};

Link.prototype.accept = function(message) {
// @todo: handle accepting an array of messages
this._sendDisposition({
first: message._deliveryId,
last: message._deliveryId,
settled: true,
state: new DeliveryStates.Accepted()
});
};

Link.prototype.reject = function(message, reason) {
// @todo: handle accepting an array of messages
this._sendDisposition({
first: message._deliveryId,
last: message._deliveryId,
settled: true,
state: new DeliveryStates.Rejected({ error: reason })
});
};

// private api
Link.prototype._attachReceived = function(attachFrame) {
Expand Down Expand Up @@ -161,39 +216,6 @@ Link.prototype._flowReceived = function(flowFrame) {
}
};

Link.prototype.addCredits = function(credits, flowOptions) {
if (this.role === constants.linkRole.sender) {
throw new errors.InvalidStateError('Cannot add link credits as a sender');
}

var opts = flowOptions || {};
this.linkCredit += credits;
this.totalCredits += credits;
opts.linkCredit = this.totalCredits;
this.session._sessionParams.incomingWindow += credits;
opts.nextIncomingId = this.session._sessionParams.nextIncomingId;
opts.incomingWindow = this.session._sessionParams.incomingWindow;
opts.nextOutgoingId = this.session._sessionParams.nextOutgoingId;
opts.outgoingWindow = this.session._sessionParams.outgoingWindow;
opts.handle = this.handle;
opts.available = this.available;
opts.deliveryCount = this.deliveryCount;
opts.drain = false;
var flow = new FlowFrame(opts);
flow.channel = this.session.channel;
this.session.connection.sendFrame(flow);

var self = this;
return new Promise(function(resolve, reject) {
var onError = function(err) { reject(err); };
self.once(Link.ErrorReceived, onError);
self.once(Link.CreditChange, function() {
self.removeListener(Link.ErrorReceived, onError);
resolve();
});
});
};

Link.prototype._checkCredit = function() {
if (this.role === constants.linkRole.receiver) {
if (this.policy.credit && typeof this.policy.credit === 'function') {
Expand Down Expand Up @@ -258,4 +280,16 @@ Link.prototype._detached = function(frame) {
this.session._linkDetached(this);
};

Link.prototype._sendDisposition = function(options) {
var dispositionOptions = _.defaults(options, {
role: constants.linkRole.receiver,
channel: this.session.channel,
handle: this.handle
});

this.session.connection.sendFrame(
new DispositionFrame(dispositionOptions)
);
};

module.exports = Link;

0 comments on commit 423952e

Please sign in to comment.