Skip to content

Commit

Permalink
Fix BackbeatProducer event.error handlers set twice.
Browse files Browse the repository at this point in the history
Issue : BB-523
  • Loading branch information
benzekrimaha committed Sep 23, 2024
1 parent 0a3aa3c commit 92520d5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class BackbeatProducer extends EventEmitter {
this._producer.on('event', event => this._log.info('rdkafka.event', { event }));
this._producer.on('event.log', log => this._log.info('rdkafka.log', { log }));
this._producer.on('warning', warning => this._log.warn('rdkafka.warning', { warning }));
this._producer.on('event.error', err => this._log.error('rdkafka.error', { err }));
this._producer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle }));
this._producer.on('event.stats', observeKafkaStats);

Expand Down Expand Up @@ -196,6 +195,8 @@ class BackbeatProducer extends EventEmitter {
method: `${this.getClientId()}.constructor`,
});
this.emit('error', error);
} else {
this._log.error('rdkafka.error', { error });
}
}

Expand Down
46 changes: 46 additions & 0 deletions tests/unit/backbeatProducer.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,56 @@
const assert = require('assert');

const BackbeatProducer = require('../../lib/BackbeatProducer');
const sinon = require('sinon');
const CODES = require('node-rdkafka').CODES;

const { kafka } = require('../config.json');

describe('backbeatProducer', () => {
describe('onEventError', () => {
let backbeatProducer;
let logErrorStub;
let emitStub;

beforeEach(() => {
backbeatProducer = new BackbeatProducer({ kafka });
logErrorStub = sinon.stub(backbeatProducer._log, 'error');
emitStub = sinon.stub(backbeatProducer, 'emit');
});

afterEach(() => {
sinon.restore();
});

it('should log and emit error for ERR__ALL_BROKERS_DOWN', () => {
const error = { code: CODES.ERRORS.ERR__ALL_BROKERS_DOWN, message: 'All brokers down' };
backbeatProducer.onEventError(error);

assert(logErrorStub.calledOnce);
assert(logErrorStub.calledWith('error with producer'));
assert(emitStub.calledOnce);
assert(emitStub.calledWith('error', error));
});

it('should log and emit error for ERR__TRANSPORT', () => {
const error = { code: CODES.ERRORS.ERR__TRANSPORT, message: 'Transport error' };
backbeatProducer.onEventError(error);

assert(logErrorStub.calledOnce);
assert(logErrorStub.calledWith('error with producer'));
assert(emitStub.calledOnce);
assert(emitStub.calledWith('error', error));
});

it('should log rdkafka.error for other errors', () => {
const error = { code: 'OTHER_ERROR', message: 'Some other error' };
backbeatProducer.onEventError(error);

assert(logErrorStub.calledOnce);
assert(logErrorStub.calledWith('rdkafka.error', { error }));
assert(emitStub.notCalled);
});
});
it('should use default topic name without prefix', () => {
const backbeatProducer = new BackbeatProducer({
kafka,
Expand Down

0 comments on commit 92520d5

Please sign in to comment.