diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index ae9454071..f1bd46f16 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -1,4 +1,5 @@ const joi = require('joi'); +const uuid = require('uuid'); const { errors } = require('arsenal'); const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper'); @@ -109,6 +110,14 @@ class Connector { } } + /** + * Updates partition name in connector config + * @returns {undefined} + */ + updatePartitionName() { + this._config['offset.partition.name'] = `partition-${uuid.v4()}`; + } + /** * Creates the Kafka-connect mongo connector * @returns {Promise|undefined} undefined @@ -122,6 +131,8 @@ class Connector { }); return; } + // reset resume token to avoid getting outdated token + this.updatePartitionName(); try { await this._kafkaConnect.createConnector({ name: this._name, diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index e3fed22e0..2c95edd78 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -49,6 +49,15 @@ describe('Connector', () => { })); assert.strictEqual(connector.isRunning, true); }); + it('Should change partition name on creation', async () => { + sinon.stub(connector._kafkaConnect, 'createConnector') + .resolves(); + await connector.spawn(); + const partitionName = connector.config['offset.partition.name']; + connector._isRunning = false; + await connector.spawn(); + assert.notStrictEqual(partitionName, connector.config['offset.partition.name']); + }); it('Should not try spawning a new connector when on is already existent', async () => { const createStub = sinon.stub(connector._kafkaConnect, 'createConnector') .resolves(); @@ -239,4 +248,12 @@ describe('Connector', () => { assert.strictEqual(size, 15); }); }); + + describe('updatePartitionName', () => { + it('Should update partition name in config', () => { + connector._config['offset.partition.name'] = 'partition-name'; + connector.updatePartitionName(); + assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name'); + }); + }); });