diff --git a/src/lease-manager.ts b/src/lease-manager.ts index b616e0ce2..084f4b759 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -15,7 +15,7 @@ */ import {EventEmitter} from 'events'; -import {Message, Subscriber} from './subscriber'; +import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; export interface FlowControlOptions { @@ -257,7 +257,16 @@ export class LeaseManager extends EventEmitter { const lifespan = (Date.now() - message.received) / (60 * 1000); if (lifespan < this._options.maxExtensionMinutes!) { - message.modAck(deadline); + if (this._subscriber.isExactlyOnceDelivery) { + message.modAckWithResponse(deadline).catch(e => { + // In the case of a permanent failure (temporary failures are retried), + // we need to stop trying to lease-manage the message. + message.ackFailed(e as AckError); + this.remove(message); + }); + } else { + message.modAck(deadline); + } } else { this.remove(message); } diff --git a/src/subscriber.ts b/src/subscriber.ts index 46d4b8a84..fe5defc8e 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -103,6 +103,8 @@ export class Message { private _handled: boolean; private _length: number; private _subscriber: Subscriber; + private _ackFailed?: AckError; + /** * @hideconstructor * @@ -194,6 +196,16 @@ export class Message { return this._length; } + /** + * Sets this message's exactly once delivery acks to permanent failure. This is + * meant for internal library use only. + * + * @private + */ + ackFailed(error: AckError): void { + this._ackFailed = error; + } + /** * Acknowledges the message. * @@ -228,9 +240,18 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { this._handled = true; - return await this._subscriber.ackWithResponse(this); + try { + return await this._subscriber.ackWithResponse(this); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } @@ -261,8 +282,17 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { - return await this._subscriber.modAckWithResponse(this, deadline); + try { + return await this._subscriber.modAckWithResponse(this, deadline); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } @@ -303,9 +333,18 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { this._handled = true; - return await this._subscriber.nackWithResponse(this); + try { + return await this._subscriber.nackWithResponse(this); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } @@ -824,8 +863,23 @@ export class Subscriber extends EventEmitter { const span: Span | undefined = this._constructSpan(message); if (this.isOpen) { - message.modAck(this.ackDeadline); - this._inventory.add(message); + if (this.isExactlyOnceDelivery) { + // For exactly-once delivery, we must validate that we got a valid + // lease on the message before actually leasing it. + message + .modAckWithResponse(this.ackDeadline) + .then(() => { + this._inventory.add(message); + }) + .catch(() => { + // Temporary failures will retry, so if an error reaches us + // here, that means a permanent failure. Silently drop these. + this._discardMessage(message); + }); + } else { + message.modAck(this.ackDeadline); + this._inventory.add(message); + } } else { message.nack(); } @@ -835,6 +889,11 @@ export class Subscriber extends EventEmitter { } } + // Internal: This is here to provide a hook for unit testing, at least for now. + private _discardMessage(message: Message): void { + message; + } + /** * Returns a promise that will resolve once all pending requests have settled. * diff --git a/test/lease-manager.ts b/test/lease-manager.ts index cf8def1a6..2ceb17cf7 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -19,9 +19,16 @@ import {describe, it, before, beforeEach, afterEach} from 'mocha'; import {EventEmitter} from 'events'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; +import * as defer from 'p-defer'; import * as leaseTypes from '../src/lease-manager'; -import {Message, Subscriber} from '../src/subscriber'; +import { + AckError, + AckResponse, + AckResponses, + Message, + Subscriber, +} from '../src/subscriber'; import {defaultOptions} from '../src/default-options'; const FREE_MEM = 9376387072; @@ -34,6 +41,10 @@ class FakeSubscriber extends EventEmitter { isOpen = true; modAckLatency = 2000; async modAck(): Promise {} + async modAckWithResponse(): Promise { + return AckResponses.Success; + } + isExactlyOnceDelivery = false; } class FakeMessage { @@ -43,6 +54,21 @@ class FakeMessage { this.received = Date.now(); } modAck(): void {} + async modAckWithResponse(): Promise { + return AckResponses.Success; + } + ackFailed() {} +} + +interface LeaseManagerInternals { + _extendDeadlines(): void; + _messages: Set; + _isLeasing: boolean; + _scheduleExtension(): void; +} + +function getLMInternals(mgr: leaseTypes.LeaseManager): LeaseManagerInternals { + return mgr as unknown as LeaseManagerInternals; } describe('LeaseManager', () => { @@ -207,6 +233,18 @@ describe('LeaseManager', () => { assert.strictEqual(stub.callCount, 1); }); + it('should schedule a lease extension for exactly-once delivery', () => { + const message = new FakeMessage() as {} as Message; + const stub = sandbox + .stub(message, 'modAck') + .withArgs(subscriber.ackDeadline); + + leaseManager.add(message); + clock.tick(expectedTimeout); + + assert.strictEqual(stub.callCount, 1); + }); + it('should not schedule a lease extension if already in progress', () => { const messages = [new FakeMessage(), new FakeMessage()]; const stubs = messages.map(message => sandbox.stub(message, 'modAck')); @@ -274,6 +312,32 @@ describe('LeaseManager', () => { assert.strictEqual(deadline, subscriber.ackDeadline); }); + it('should remove and ackFailed any messages that fail to ack', done => { + (subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true; + + leaseManager.setOptions({ + maxExtensionMinutes: 600, + }); + + const goodMessage = new FakeMessage(); + + const removeStub = sandbox.stub(leaseManager, 'remove'); + const mawrStub = sandbox + .stub(goodMessage, 'modAckWithResponse') + .rejects(new AckError(AckResponses.Invalid)); + const failed = sandbox.stub(goodMessage, 'ackFailed'); + + removeStub.callsFake(() => { + assert.strictEqual(mawrStub.callCount, 1); + assert.strictEqual(removeStub.callCount, 1); + assert.strictEqual(failed.callCount, 1); + done(); + }); + + leaseManager.add(goodMessage as {} as Message); + clock.tick(halfway * 2 + 1); + }); + it('should continuously extend the deadlines', () => { const message = new FakeMessage(); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -473,4 +537,86 @@ describe('LeaseManager', () => { assert.strictEqual(leaseManager.isFull(), true); }); }); + + describe('deadline extension', () => { + beforeEach(() => { + sandbox.useFakeTimers(); + }); + afterEach(() => { + sandbox.clock.restore(); + }); + + it('calls regular modAck periodically w/o exactly-once', () => { + const lmi = getLMInternals(leaseManager); + const msg = new Message(subscriber, { + ackId: 'ackack', + message: {data: ''}, + deliveryAttempt: 0, + }); + sandbox.clock.tick(1); + + const maStub = sandbox.stub(msg, 'modAck'); + + lmi._messages.add(msg); + lmi._extendDeadlines(); + + assert.ok(maStub.calledOnce); + }); + + it('calls modAckWithResponse periodically w/exactly-once, successful', async () => { + const lmi = getLMInternals(leaseManager); + const msg = new Message(subscriber, { + ackId: 'ackack', + message: {data: ''}, + deliveryAttempt: 0, + }); + sandbox.clock.tick(1); + (subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true; + + const done = defer(); + sandbox.stub(msg, 'modAck').callsFake(() => { + console.error('oops we did it wrong'); + }); + + const maStub = sandbox.stub(msg, 'modAckWithResponse'); + maStub.callsFake(async () => { + done.resolve(); + return AckResponses.Success; + }); + + lmi._messages.add(msg); + lmi._extendDeadlines(); + + await done.promise; + assert.ok(maStub.calledOnce); + }); + + it('calls modAckWithResponse periodically w/exactly-once, failure', async () => { + const lmi = getLMInternals(leaseManager); + const msg = new Message(subscriber, { + ackId: 'ackack', + message: {data: ''}, + deliveryAttempt: 0, + }); + sandbox.clock.tick(1); + (subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true; + + const done = defer(); + + const maStub = sandbox.stub(msg, 'modAckWithResponse'); + maStub.callsFake(async () => { + done.resolve(); + throw new AckError(AckResponses.Invalid); + }); + const rmStub = sandbox.stub(leaseManager, 'remove'); + + lmi._messages.add(msg); + lmi._extendDeadlines(); + + await done.promise; + + assert.ok(maStub.calledOnce); + assert.ok(rmStub.calledOnce); + }); + }); }); diff --git a/test/subscriber.ts b/test/subscriber.ts index aa9a953ef..cab00e286 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -24,6 +24,8 @@ import * as sinon from 'sinon'; import {PassThrough} from 'stream'; import * as uuid from 'uuid'; import * as opentelemetry from '@opentelemetry/api'; +import {google} from '../protos/protos'; +import * as defer from 'p-defer'; import {HistogramOptions} from '../src/histogram'; import {FlowControlOptions} from '../src/lease-manager'; @@ -35,6 +37,8 @@ import {SpanKind} from '@opentelemetry/api'; import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {Duration} from '../src'; +type PullResponse = google.pubsub.v1.IStreamingPullResponse; + const stubs = new Map(); class FakeClient {} @@ -129,6 +133,7 @@ class FakeMessageStream extends PassThrough { this.options = options; stubs.set('messageStream', this); } + setStreamAckDeadline(): void {} _destroy( // eslint-disable-next-line @typescript-eslint/no-unused-vars _error: Error | null, @@ -156,6 +161,17 @@ const RECEIVED_MESSAGE = { }, }; +interface SubInternals { + _stream: FakeMessageStream; + _inventory: FakeLeaseManager; + _onData(response: PullResponse): void; + _discardMessage(message: s.Message): void; +} + +function getSubInternals(sub: s.Subscriber) { + return sub as unknown as SubInternals; +} + describe('Subscriber', () => { let sandbox: sinon.SinonSandbox; @@ -233,6 +249,106 @@ describe('Subscriber', () => { }); }); + describe('receive', () => { + it('should add incoming messages to inventory w/o exactly-once', () => { + const sub = new Subscriber(subscription); + sub.isOpen = true; + const subint = getSubInternals(sub); + const modAckStub = sandbox.stub(sub, 'modAck'); + subint._inventory = new FakeLeaseManager(sub, {}); + const addStub = sandbox.stub(subint._inventory, 'add'); + subint._onData({ + subscriptionProperties: { + exactlyOnceDeliveryEnabled: false, + messageOrderingEnabled: false, + }, + receivedMessages: [ + { + ackId: 'ackack', + message: { + data: 'foo', + attributes: {}, + }, + }, + ], + }); + + assert.ok(modAckStub.calledOnce); + assert.ok(addStub.calledOnce); + }); + + it('should add incoming messages to inventory w/exactly-once, success', async () => { + const sub = new Subscriber(subscription); + sub.isOpen = true; + const subint = getSubInternals(sub); + subint._stream = new FakeMessageStream(sub, {}); + subint._inventory = new FakeLeaseManager(sub, {}); + const modAckStub = sandbox.stub(sub, 'modAckWithResponse'); + modAckStub.callsFake(async () => s.AckResponses.Success); + const addStub = sandbox.stub(subint._inventory, 'add'); + const done = defer(); + addStub.callsFake(() => { + assert.ok(modAckStub.calledOnce); + done.resolve(); + }); + subint._onData({ + subscriptionProperties: { + exactlyOnceDeliveryEnabled: true, + messageOrderingEnabled: false, + }, + receivedMessages: [ + { + ackId: 'ackack', + message: { + data: 'foo', + attributes: {}, + }, + }, + ], + }); + + await done.promise; + }); + + it('should add incoming messages to inventory w/exactly-once, permanent failure', async () => { + const sub = new Subscriber(subscription); + sub.isOpen = true; + const subint = getSubInternals(sub); + subint._stream = new FakeMessageStream(sub, {}); + subint._inventory = new FakeLeaseManager(sub, {}); + + const done = defer(); + + const modAckStub = sandbox.stub(sub, 'modAckWithResponse'); + modAckStub.rejects(new s.AckError(s.AckResponses.Invalid)); + const addStub = sandbox.stub(subint._inventory, 'add'); + const discardStub = sandbox.stub(subint, '_discardMessage'); + discardStub.callsFake(() => { + assert.ok(modAckStub.calledOnce); + assert.ok(addStub.notCalled); + done.resolve(); + }); + + subint._onData({ + subscriptionProperties: { + exactlyOnceDeliveryEnabled: true, + messageOrderingEnabled: false, + }, + receivedMessages: [ + { + ackId: 'ackack', + message: { + data: 'foo', + attributes: {}, + }, + }, + ], + }); + + await done.promise; + }); + }); + describe('modAckLatency', () => { it('should get the 99th percentile latency', () => { const latencies: FakeHistogram = stubs.get('latencies'); @@ -932,6 +1048,27 @@ describe('Subscriber', () => { assert.strictEqual(msg, message); }); + it('should ack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'ackWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.ackWithResponse(); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to ack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'ackWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.ackWithResponse()); + + // Should cache the result also. + await assert.rejects(message.ackWithResponse()); + assert.strictEqual(stub.callCount, 1); + }); + it('should not ack the message if its been handled', () => { const stub = sandbox.stub(subscriber, 'ack'); @@ -954,6 +1091,27 @@ describe('Subscriber', () => { assert.strictEqual(deadline, fakeDeadline); }); + it('should modAck the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'modAckWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.modAckWithResponse(0); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to modAck the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'modAckWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.modAckWithResponse(0)); + + // Should cache the result also. + await assert.rejects(message.modAckWithResponse(0)); + assert.strictEqual(stub.callCount, 1); + }); + it('should not modAck the message if its been handled', () => { const deadline = 10; const stub = sandbox.stub(subscriber, 'modAck'); @@ -976,6 +1134,27 @@ describe('Subscriber', () => { assert.strictEqual(delay, 0); }); + it('should nack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'nackWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.nackWithResponse(); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to nack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'nackWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.nackWithResponse()); + + // Should cache the result also. + await assert.rejects(message.nackWithResponse()); + assert.strictEqual(stub.callCount, 1); + }); + it('should not nack the message if its been handled', () => { const stub = sandbox.stub(subscriber, 'modAck');