diff --git a/src/consumer/fetchManager.js b/src/consumer/fetchManager.js index 7ec46eac1..309a0986e 100644 --- a/src/consumer/fetchManager.js +++ b/src/consumer/fetchManager.js @@ -38,7 +38,7 @@ const createFetchManager = ({ const current = getNodeIds() const hasChanged = nodeIds.length !== current.length || nodeIds.some(nodeId => !current.includes(nodeId)) - if (hasChanged) { + if (hasChanged && current.length !== 0) { throw new KafkaJSFetcherRebalanceError() } } diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index 5438a5410..9dbf26df8 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -4,6 +4,7 @@ const createFetchManager = require('./fetchManager') const Batch = require('./batch') const { newLogger } = require('testHelpers') const waitFor = require('../utils/waitFor') +const { KafkaJSNonRetriableError } = require('../errors') describe('FetchManager', () => { let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize @@ -72,4 +73,25 @@ describe('FetchManager', () => { fetchers = fetchManager.getFetchers() expect(fetchers).toHaveLength(3) }) + + describe('when all brokers have become unavailable', () => { + it('should not rebalance and let the error bubble up', async () => { + const fetchMock = jest.fn().mockImplementation(async nodeId => { + if (!getNodeIds().includes(nodeId)) { + throw new KafkaJSNonRetriableError('Node not found') + } + + return fetch(nodeId) + }) + getNodeIds.mockImplementation(() => seq(1)) + + fetchManager = createTestFetchManager({ concurrency: 1, fetch: fetchMock }) + const fetchManagerPromise = fetchManager.start() + + expect(fetchManager.getFetchers()).toHaveLength(1) + + getNodeIds.mockImplementation(() => seq(0)) + await expect(fetchManagerPromise).rejects.toThrow('Node not found') + }) + }) })