Skip to content

Commit

Permalink
fix: handle receipt modAck and lease extensions with exactly-once del…
Browse files Browse the repository at this point in the history
…ivery correctly (#1709)

* fix: don't try to lease messages with failed acks in exactly-once delivery

* fix: don't pass messages to clients or start leasing in exactly-once delivery if the receipt modAck fails

* fix: handle receipt modAcks and lease extensions properly with exactly-once delivery enabled

* fix: permanently fail any ack/modAck/nack that fails once under exactly-once delivery
  • Loading branch information
feywind authored Apr 20, 2023
1 parent 0eb1ca8 commit d786d22
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 8 deletions.
13 changes: 11 additions & 2 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import {EventEmitter} from 'events';
import {Message, Subscriber} from './subscriber';
import {AckError, Message, Subscriber} from './subscriber';
import {defaultOptions} from './default-options';

export interface FlowControlOptions {
Expand Down Expand Up @@ -257,7 +257,16 @@ export class LeaseManager extends EventEmitter {
const lifespan = (Date.now() - message.received) / (60 * 1000);

if (lifespan < this._options.maxExtensionMinutes!) {
message.modAck(deadline);
if (this._subscriber.isExactlyOnceDelivery) {
message.modAckWithResponse(deadline).catch(e => {
// In the case of a permanent failure (temporary failures are retried),
// we need to stop trying to lease-manage the message.
message.ackFailed(e as AckError);
this.remove(message);
});
} else {
message.modAck(deadline);
}
} else {
this.remove(message);
}
Expand Down
69 changes: 64 additions & 5 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ export class Message {
private _handled: boolean;
private _length: number;
private _subscriber: Subscriber;
private _ackFailed?: AckError;

/**
* @hideconstructor
*
Expand Down Expand Up @@ -194,6 +196,16 @@ export class Message {
return this._length;
}

/**
* Sets this message's exactly once delivery acks to permanent failure. This is
* meant for internal library use only.
*
* @private
*/
ackFailed(error: AckError): void {
this._ackFailed = error;
}

/**
* Acknowledges the message.
*
Expand Down Expand Up @@ -228,9 +240,18 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
this._handled = true;
return await this._subscriber.ackWithResponse(this);
try {
return await this._subscriber.ackWithResponse(this);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down Expand Up @@ -261,8 +282,17 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
return await this._subscriber.modAckWithResponse(this, deadline);
try {
return await this._subscriber.modAckWithResponse(this, deadline);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down Expand Up @@ -303,9 +333,18 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
this._handled = true;
return await this._subscriber.nackWithResponse(this);
try {
return await this._subscriber.nackWithResponse(this);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down Expand Up @@ -824,8 +863,23 @@ export class Subscriber extends EventEmitter {
const span: Span | undefined = this._constructSpan(message);

if (this.isOpen) {
message.modAck(this.ackDeadline);
this._inventory.add(message);
if (this.isExactlyOnceDelivery) {
// For exactly-once delivery, we must validate that we got a valid
// lease on the message before actually leasing it.
message
.modAckWithResponse(this.ackDeadline)
.then(() => {
this._inventory.add(message);
})
.catch(() => {
// Temporary failures will retry, so if an error reaches us
// here, that means a permanent failure. Silently drop these.
this._discardMessage(message);
});
} else {
message.modAck(this.ackDeadline);
this._inventory.add(message);
}
} else {
message.nack();
}
Expand All @@ -835,6 +889,11 @@ export class Subscriber extends EventEmitter {
}
}

// Internal: This is here to provide a hook for unit testing, at least for now.
private _discardMessage(message: Message): void {
message;
}

/**
* Returns a promise that will resolve once all pending requests have settled.
*
Expand Down
148 changes: 147 additions & 1 deletion test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ import {describe, it, before, beforeEach, afterEach} from 'mocha';
import {EventEmitter} from 'events';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as defer from 'p-defer';

import * as leaseTypes from '../src/lease-manager';
import {Message, Subscriber} from '../src/subscriber';
import {
AckError,
AckResponse,
AckResponses,
Message,
Subscriber,
} from '../src/subscriber';
import {defaultOptions} from '../src/default-options';

const FREE_MEM = 9376387072;
Expand All @@ -34,6 +41,10 @@ class FakeSubscriber extends EventEmitter {
isOpen = true;
modAckLatency = 2000;
async modAck(): Promise<void> {}
async modAckWithResponse(): Promise<AckResponse> {
return AckResponses.Success;
}
isExactlyOnceDelivery = false;
}

class FakeMessage {
Expand All @@ -43,6 +54,21 @@ class FakeMessage {
this.received = Date.now();
}
modAck(): void {}
async modAckWithResponse(): Promise<AckResponse> {
return AckResponses.Success;
}
ackFailed() {}
}

interface LeaseManagerInternals {
_extendDeadlines(): void;
_messages: Set<Message>;
_isLeasing: boolean;
_scheduleExtension(): void;
}

function getLMInternals(mgr: leaseTypes.LeaseManager): LeaseManagerInternals {
return mgr as unknown as LeaseManagerInternals;
}

describe('LeaseManager', () => {
Expand Down Expand Up @@ -207,6 +233,18 @@ describe('LeaseManager', () => {
assert.strictEqual(stub.callCount, 1);
});

it('should schedule a lease extension for exactly-once delivery', () => {
const message = new FakeMessage() as {} as Message;
const stub = sandbox
.stub(message, 'modAck')
.withArgs(subscriber.ackDeadline);

leaseManager.add(message);
clock.tick(expectedTimeout);

assert.strictEqual(stub.callCount, 1);
});

it('should not schedule a lease extension if already in progress', () => {
const messages = [new FakeMessage(), new FakeMessage()];
const stubs = messages.map(message => sandbox.stub(message, 'modAck'));
Expand Down Expand Up @@ -274,6 +312,32 @@ describe('LeaseManager', () => {
assert.strictEqual(deadline, subscriber.ackDeadline);
});

it('should remove and ackFailed any messages that fail to ack', done => {
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;

leaseManager.setOptions({
maxExtensionMinutes: 600,
});

const goodMessage = new FakeMessage();

const removeStub = sandbox.stub(leaseManager, 'remove');
const mawrStub = sandbox
.stub(goodMessage, 'modAckWithResponse')
.rejects(new AckError(AckResponses.Invalid));
const failed = sandbox.stub(goodMessage, 'ackFailed');

removeStub.callsFake(() => {
assert.strictEqual(mawrStub.callCount, 1);
assert.strictEqual(removeStub.callCount, 1);
assert.strictEqual(failed.callCount, 1);
done();
});

leaseManager.add(goodMessage as {} as Message);
clock.tick(halfway * 2 + 1);
});

it('should continuously extend the deadlines', () => {
const message = new FakeMessage();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -473,4 +537,86 @@ describe('LeaseManager', () => {
assert.strictEqual(leaseManager.isFull(), true);
});
});

describe('deadline extension', () => {
beforeEach(() => {
sandbox.useFakeTimers();
});
afterEach(() => {
sandbox.clock.restore();
});

it('calls regular modAck periodically w/o exactly-once', () => {
const lmi = getLMInternals(leaseManager);
const msg = new Message(subscriber, {
ackId: 'ackack',
message: {data: ''},
deliveryAttempt: 0,
});
sandbox.clock.tick(1);

const maStub = sandbox.stub(msg, 'modAck');

lmi._messages.add(msg);
lmi._extendDeadlines();

assert.ok(maStub.calledOnce);
});

it('calls modAckWithResponse periodically w/exactly-once, successful', async () => {
const lmi = getLMInternals(leaseManager);
const msg = new Message(subscriber, {
ackId: 'ackack',
message: {data: ''},
deliveryAttempt: 0,
});
sandbox.clock.tick(1);
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;

const done = defer();
sandbox.stub(msg, 'modAck').callsFake(() => {
console.error('oops we did it wrong');
});

const maStub = sandbox.stub(msg, 'modAckWithResponse');
maStub.callsFake(async () => {
done.resolve();
return AckResponses.Success;
});

lmi._messages.add(msg);
lmi._extendDeadlines();

await done.promise;
assert.ok(maStub.calledOnce);
});

it('calls modAckWithResponse periodically w/exactly-once, failure', async () => {
const lmi = getLMInternals(leaseManager);
const msg = new Message(subscriber, {
ackId: 'ackack',
message: {data: ''},
deliveryAttempt: 0,
});
sandbox.clock.tick(1);
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;

const done = defer();

const maStub = sandbox.stub(msg, 'modAckWithResponse');
maStub.callsFake(async () => {
done.resolve();
throw new AckError(AckResponses.Invalid);
});
const rmStub = sandbox.stub(leaseManager, 'remove');

lmi._messages.add(msg);
lmi._extendDeadlines();

await done.promise;

assert.ok(maStub.calledOnce);
assert.ok(rmStub.calledOnce);
});
});
});
Loading

0 comments on commit d786d22

Please sign in to comment.