Skip to content

Commit

Permalink
Fix BackbeatConsumer breakbeat tests
Browse files Browse the repository at this point in the history
The tests used to run setup hooks in parallel, so both consumers could
"see" the bootstrap message, causing the test to fail.

Also, we need to push some messages: otherwise we cannot see that
breakbeat is really "blocking" the consumption.

Issue: BB-441
  • Loading branch information
francoisferrand committed Oct 25, 2023
1 parent c35ac9c commit 4093ca7
Showing 1 changed file with 50 additions and 56 deletions.
106 changes: 50 additions & 56 deletions tests/functional/lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,12 +559,56 @@ describe('BackbeatConsumer "deferred committable" tests', () => {
});

describe('BackbeatConsumer with circuit breaker', () => {
const nMessages = 0;
const topicBreaker = 'backbeat-consumer-spec-breaker';
let groupIdBreaker;
let producer;
let consumer;
let consumedMessages = [];

function queueProcessor(message, cb) {
consumedMessages.push(message.value);
process.nextTick(cb);
}

beforeEach(function before(done) {
this.timeout(60000);

groupIdBreaker = `replication-group-breaker-${Math.random()}`;

producer = new BackbeatProducer({
kafka: producerKafkaConf,
topic: topicBreaker,
pollIntervalMs: 100,
});
consumer = new BackbeatConsumer({
zookeeper: zookeeperConf,
kafka: consumerKafkaConf, groupId: groupIdBreaker, topic: topicBreaker,
queueProcessor,
concurrency: 10,
bootstrap: true,
circuitBreaker: this.currentTest.breakerConf,
});
async.parallel([
innerDone => producer.on('ready', innerDone),
innerDone => consumer.on('ready', innerDone),
], done);
});

afterEach(done => {
consumedMessages = [];
consumer.removeAllListeners('consumed');

async.parallel([
innerDone => producer.close(innerDone),
innerDone => consumer.close(innerDone),
], done);
});

const nMessages = 50;

const testCases = [
{
description: 'should consume if breaker state nominal',
startDelayMs: 0,
expectedMessages: nMessages,
breakerConf: {
probes: [
Expand All @@ -577,7 +621,6 @@ describe('BackbeatConsumer with circuit breaker', () => {
},
{
description: 'should not consume if breaker state not nominal',
startDelayMs: 50,
expectedMessages: 0,
breakerConf: {
nominalEvaluateIntervalMs: 1,
Expand All @@ -592,59 +635,7 @@ describe('BackbeatConsumer with circuit breaker', () => {
];

testCases.forEach(t => {
const topicBreaker = 'backbeat-consumer-spec-breaker';
const groupIdBreaker = `replication-group-breaker-${Math.random()}`;
let producer;
let consumer;
let consumedMessages = [];
let taskStuckCallbacks = [];

function queueProcessor(message, cb) {
if (message.value.toString() !== 'taskStuck') {
consumedMessages.push(message.value);
process.nextTick(cb);
} else {
taskStuckCallbacks.push(cb);
}
}
before(function before(done) {
this.timeout(60000);

producer = new BackbeatProducer({
kafka: producerKafkaConf,
topic: topicBreaker,
pollIntervalMs: 100,
});
consumer = new BackbeatConsumer({
zookeeper: zookeeperConf,
kafka: consumerKafkaConf, groupId: groupIdBreaker, topic: topicBreaker,
queueProcessor,
concurrency: 10,
bootstrap: true,
circuitBreaker: t.breakerConf,
});
async.parallel([
innerDone => producer.on('ready', innerDone),
innerDone => consumer.on('ready', innerDone),
], (err, res) => {
setTimeout(() => done(err, res), t.startDelayMs);
});
});
afterEach(() => {
consumedMessages = [];
consumer.removeAllListeners('consumed');

taskStuckCallbacks.map(cb => cb());
taskStuckCallbacks = [];
});
after(done => {
async.parallel([
innerDone => producer.close(innerDone),
innerDone => consumer.close(innerDone),
], done);
});

it(t.description, done => {
const test = it(t.description, done => {
const boatloadOfMessages = [];
for (let i = 0; i < nMessages; ++i) {
boatloadOfMessages.push({
Expand Down Expand Up @@ -672,5 +663,8 @@ describe('BackbeatConsumer with circuit breaker', () => {
},
], done);
});

// Attach breakerConf to the test, so it can be used from the hooks
test.breakerConf = t.breakerConf;
});
});

0 comments on commit 4093ca7

Please sign in to comment.