Skip to content

Commit

Permalink
feat(connection): terminate connection when frame steam interrupted
Browse files Browse the repository at this point in the history
Previously, if invalid data was encountered while processing
incoming frames we would silently accept the data and continue on
our merry way - except we wouldn't process any future data. This
patch introduces a rather harsh assumption that incoming junk data
indicates a bad connection, and therefore terminates the connection
It could be argued that in the future we simply throw the junk data
away and wait for more frames, though it remains to be seen who
would actually want that functionality.
  • Loading branch information
mbroadst committed Nov 18, 2016
1 parent 2003782 commit 5455179
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 55 deletions.
46 changes: 28 additions & 18 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ Connection.prototype.close = function() {
this.connSM.sendClose();
};

Connection.prototype.sendFrame = function(frame) {
Connection.prototype.sendFrame = function(frame, callback) {
this._lastOutgoing = Date.now();
frames.writeFrame(frame, this._transport);
frames.writeFrame(frame, this._transport, callback);
};

Connection.prototype.associateSession = function(session) {
Expand Down Expand Up @@ -397,18 +397,26 @@ Connection.prototype._tryReceiveHeader = function(header) {
};

Connection.prototype._receiveAny = function() {
var frame = null;
while (true) {
if (this.sasl && !this.sasl.receivedHeader) {
if (!this._tryReceiveHeader(constants.saslVersion)) break;
} else if (!this.sasl && !this._receivedHeader) {
if (!this._tryReceiveHeader()) break;
}

var frame = frames.readFrame(this._buffer);
if (!frame) {
try {
frame = frames.readFrame(this._buffer);
} catch (e) {
var self = this;
this._sendCloseFrame(ErrorCondition.ConnectionFramingError, e.message, function(err) { // jshint ignore:line
self._terminate(true);
});

break;
}

if (!frame) break;
if (frame instanceof frames.OpenFrame) {
this._processOpenFrame(frame);
} else if (frame instanceof frames.CloseFrame) {
Expand Down Expand Up @@ -473,8 +481,13 @@ Connection.prototype._processOpenFrame = function(frame) {
if (this._buffer.length) this._receiveAny(); // Might have more frames pending.
};

Connection.prototype._sendCloseFrame = function(condition, description) {
this.sendFrame(new frames.CloseFrame({ condition: condition, description: description }));
Connection.prototype._sendCloseFrame = function(condition, description, callback) {
var error = null;
if (!!condition) {
error = { condition: condition, description: description };
}

this.sendFrame(new frames.CloseFrame({ error: error }), callback);
};

Connection.prototype._processCloseFrame = function(frame) {
Expand All @@ -492,22 +505,19 @@ Connection.prototype._connect = function(address, sasl) {
self.dataHandler = self._receiveHeader;
self._transport = TransportProvider.getTransportFor(address.protocol);

if(!self._transport) {
if (!self._transport) {
throw new Error('Invalid protocol, no associated transport. Please use client.register(protocol, transport) to register a new transport.');
}

self._transport.connect(address, self._sslOptions);

self._transport.on('connect', function() {
self.connSM.connected(sasl);
}).on('data', function(buf) {
self._receiveData(buf);
}).on('error', function(err) {
self.connSM.error(err);
}).on('end', function() {
debug('on(end)');
self.connSM.terminated();
});
self._transport
.on('connect', function() { self.connSM.connected(sasl); })
.on('data', function(buf) { self._receiveData(buf); })
.on('error', function(err) { self.connSM.error(err); })
.on('end', function() {
debug('on(end)');
self.connSM.terminated();
});
};

Connection.prototype._saslComplete = function(err) {
Expand Down
7 changes: 4 additions & 3 deletions lib/frames.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ frames.SaslFrame = function() {
};
util.inherits(frames.SaslFrame, frames.Frame);

frames.writeFrame = function(frame, stream) {
frames.writeFrame = function(frame, stream, callback) {
if (!(frame instanceof frames.Frame)) {
throw new errors.EncodingError(frame, 'unknown frame type');
}
Expand All @@ -53,17 +53,18 @@ frames.writeFrame = function(frame, stream) {
buffer.writeUInt32BE(buffer.length, 0);
debug('sending frame: ', frame);
trace('raw: [' + buffer.toString('hex') + ']');
stream.write(buffer);
stream.write(buffer, callback);
};

frames.readFrame = function(buffer) {
if (buffer.length < 8) return undefined;

var sizeAndDoff = buffer.slice(0, 8);
var size = sizeAndDoff.readUInt32BE(0);
var doff = sizeAndDoff.readUInt8(4);
if (doff !== 2) throw new errors.MalformedHeaderError('Invalid DOFF');
if (size > buffer.length) return undefined;

var doff = sizeAndDoff[4];
var frameType = sizeAndDoff[5];
if (frameType !== FrameType.AMQP && frameType !== FrameType.SASL) {
throw new errors.NotImplementedError('Unsupported frame type: ' + frameType);
Expand Down
2 changes: 1 addition & 1 deletion lib/transport/abstract_transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ AbstractTransport.prototype.connect = function (address) {
throwBecauseAbstract('connect');
};

AbstractTransport.prototype.write = function (data) {
AbstractTransport.prototype.write = function (data, callback) {
throwBecauseAbstract('write');
};

Expand Down
4 changes: 2 additions & 2 deletions lib/transport/net_transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ NetTransport.prototype.connect = function (address) {
this._socket.on('end', function() { self.emit('end'); });
};

NetTransport.prototype.write = function (data) {
NetTransport.prototype.write = function (data, callback) {
if (!this._socket) {
throw new errors.TransportError('Socket not connected');
}

this._socket.write(data);
this._socket.write(data, callback);
};

NetTransport.prototype.end = function() {
Expand Down
4 changes: 2 additions & 2 deletions lib/transport/tls_transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ TlsTransport.prototype.connect = function (address, sslOpts) {
this._socket.on('end', function() { self.emit('end'); });
};

TlsTransport.prototype.write = function (data) {
TlsTransport.prototype.write = function (data, callback) {
if (!this._socket) {
throw new errors.TransportError('Socket not connected');
}

this._socket.write(data);
this._socket.write(data, callback);
};

TlsTransport.prototype.end = function() {
Expand Down
36 changes: 23 additions & 13 deletions test/unit/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,31 @@ describe('Client', function() {
});
});

it('should disconnect if a framing error occurs', function() {
test.server.setResponseSequence([
constants.amqpVersion,
new Buffer([ 0x41, 0x4d, 0x51, 0x50, 0x01, 0x01, 0x00, 0x0a ])
]);

test.server.setExpectedFrameSequence([
constants.amqpVersion,
false,
new frames.CloseFrame({
error: { condition: ErrorCondition.ConnectionFramingError, description: 'malformed header: Invalid DOFF' }
})
]);

return expect(test.client.connect(test.server.address()))
.to.eventually.be.rejectedWith(errors.DisconnectedError);
});

it('should receive multi-frame messages', function(done) {
var message = { body: { test: 'Really long message' } };
var messageBuf = encodeMessagePayload(message);
var buf1 = messageBuf.slice(0, 10);
var buf2 = messageBuf.slice(10, 15);
var buf3 = messageBuf.slice(15);

test.server.setResponseSequence([
constants.amqpVersion,
new frames.OpenFrame(test.client.policy.connect.options),
Expand All @@ -185,26 +204,17 @@ describe('Client', function() {
},
[
function (prev) {
var txFrame = new frames.TransferFrame({
handle: 1, deliveryId: 1,
more: true
});
var txFrame = new frames.TransferFrame({ handle: 1, deliveryId: 1, more: true });
txFrame.payload = buf1;
return txFrame;
},
function (prev) {
var txFrame = new frames.TransferFrame({
handle: 1, deliveryId: 1,
more: true
});
var txFrame = new frames.TransferFrame({ handle: 1, deliveryId: 1, more: true });
txFrame.payload = buf2;
return txFrame;
},
function (prev) {
var txFrame = new frames.TransferFrame({
handle: 1,
more: false
});
var txFrame = new frames.TransferFrame({ handle: 1, more: false });
txFrame.payload = buf3;
return txFrame;
}
Expand Down Expand Up @@ -575,7 +585,7 @@ describe('Client', function() {
})
]);

// restart the server after 1s
// restart the server after 10ms
setTimeout(function() { return test.server.setup(); }, 10);

var address = test.server.address();
Expand Down
8 changes: 8 additions & 0 deletions test/unit/frames.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var frames = require('../../lib/frames'),
errors = require('../../lib/errors'),
builder = require('buffer-builder'),
constants = require('../../lib/constants'),
tu = require('./../testing_utils'),
Expand All @@ -14,6 +15,13 @@ var frames = require('../../lib/frames'),
translator = require('../../lib/adapters/translate_encoder');

describe('Frames', function() {
describe('errors', function() {
it('should throw an error on invalid DOFF', function() {
var actual = new Buffer([ 0x41, 0x4d, 0x51, 0x50, 0x01, 0x01, 0x00, 0x0a ]);
expect(function() { frames.readFrame(actual); }).to.throw(errors.MalformedHeaderError);
});
}); // Errors

describe('OpenFrame', function() {
it('should encode performative correctly', function() {
var open = new frames.OpenFrame({ containerId: 'test', hostname: 'localhost' });
Expand Down
31 changes: 15 additions & 16 deletions test/unit/mocks/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,22 @@ MockServer.prototype.setup = function() {
};

MockServer.prototype._checkExpectations = function(data) {
if (this._expectedFrames.length) {
var idx = 0;
var expectedFrame = this._expectedFrames.shift();
while (true) {
if (data.length <= idx + expectedFrame.length) break;
if (expectedFrame === false || expectedFrame === undefined) break;
var actualFrame = data.slice(idx, idx + expectedFrame.length);
debug('expected(', expectedFrame.length, '): ' + expectedFrame.toString('hex'));
debug(' actual(', actualFrame.length, '): ', actualFrame.toString('hex'));
expect(actualFrame).to.eql(expectedFrame);

if (this._expectedFrames[0] === false) break;
if (idx >= data.length) break;

idx += expectedFrame.length;
expectedFrame = this._expectedFrames.shift();
var idx = 0, expectedFrame;
while (this._expectedFrames.length) {
expectedFrame = this._expectedFrames.shift();
if (data.length < idx + expectedFrame.length) {
this._expectedFrames.unshift(expectedFrame);
break;
}

if (expectedFrame === false || expectedFrame === undefined) break;
var actualFrame = data.slice(idx, idx + expectedFrame.length);
debug('expected(', expectedFrame.length, '):', expectedFrame.toString('hex'));
debug(' actual(', actualFrame.length, '):', actualFrame.toString('hex'));
expect(actualFrame).to.eql(expectedFrame);
if (this._expectedFrames[0] === false) break;
if (idx >= data.length) break;
idx += expectedFrame.length;
}

this._seenFrames.push(new BufferList(data));
Expand Down

0 comments on commit 5455179

Please sign in to comment.