From 92520d512600a474c3d104607ac23f8fec783ccd Mon Sep 17 00:00:00 2001 From: Maha Benzekri Date: Tue, 3 Sep 2024 15:00:20 +0200 Subject: [PATCH] Fix BackbeatProducer event.error handlers set twice. Issue : BB-523 --- lib/BackbeatProducer.js | 3 ++- tests/unit/backbeatProducer.js | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js index b8aca3930..df8b00c7e 100644 --- a/lib/BackbeatProducer.js +++ b/lib/BackbeatProducer.js @@ -36,7 +36,6 @@ class BackbeatProducer extends EventEmitter { this._producer.on('event', event => this._log.info('rdkafka.event', { event })); this._producer.on('event.log', log => this._log.info('rdkafka.log', { log })); this._producer.on('warning', warning => this._log.warn('rdkafka.warning', { warning })); - this._producer.on('event.error', err => this._log.error('rdkafka.error', { err })); this._producer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle })); this._producer.on('event.stats', observeKafkaStats); @@ -196,6 +195,8 @@ class BackbeatProducer extends EventEmitter { method: `${this.getClientId()}.constructor`, }); this.emit('error', error); + } else { + this._log.error('rdkafka.error', { error }); } } diff --git a/tests/unit/backbeatProducer.js b/tests/unit/backbeatProducer.js index 2b6582142..793e2c856 100644 --- a/tests/unit/backbeatProducer.js +++ b/tests/unit/backbeatProducer.js @@ -1,10 +1,56 @@ const assert = require('assert'); const BackbeatProducer = require('../../lib/BackbeatProducer'); +const sinon = require('sinon'); +const CODES = require('node-rdkafka').CODES; const { kafka } = require('../config.json'); describe('backbeatProducer', () => { + describe('onEventError', () => { + let backbeatProducer; + let logErrorStub; + let emitStub; + + beforeEach(() => { + backbeatProducer = new BackbeatProducer({ kafka }); + logErrorStub = sinon.stub(backbeatProducer._log, 'error'); + emitStub = sinon.stub(backbeatProducer, 'emit'); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('should log and emit error for ERR__ALL_BROKERS_DOWN', () => { + const error = { code: CODES.ERRORS.ERR__ALL_BROKERS_DOWN, message: 'All brokers down' }; + backbeatProducer.onEventError(error); + + assert(logErrorStub.calledOnce); + assert(logErrorStub.calledWith('error with producer')); + assert(emitStub.calledOnce); + assert(emitStub.calledWith('error', error)); + }); + + it('should log and emit error for ERR__TRANSPORT', () => { + const error = { code: CODES.ERRORS.ERR__TRANSPORT, message: 'Transport error' }; + backbeatProducer.onEventError(error); + + assert(logErrorStub.calledOnce); + assert(logErrorStub.calledWith('error with producer')); + assert(emitStub.calledOnce); + assert(emitStub.calledWith('error', error)); + }); + + it('should log rdkafka.error for other errors', () => { + const error = { code: 'OTHER_ERROR', message: 'Some other error' }; + backbeatProducer.onEventError(error); + + assert(logErrorStub.calledOnce); + assert(logErrorStub.calledWith('rdkafka.error', { error })); + assert(emitStub.notCalled); + }); + }); it('should use default topic name without prefix', () => { const backbeatProducer = new BackbeatProducer({ kafka,