From 8d95c25f78d830ff132c7263a5a3ce564de218f4 Mon Sep 17 00:00:00 2001 From: Kerkesni Date: Wed, 25 Oct 2023 16:26:42 +0200 Subject: [PATCH] change connector partition each time it is created The partition name is used as an indicator in the offsets topic to distinguish between resume tokens of the different connectors. When a connector is respawned it should start processing event from the current moment and not use the previous resume token as it can be invalid if the connector stayed shut for a long time and the mongo oplog rotated. Issue: BB-463 --- extensions/oplogPopulator/modules/Connector.js | 11 +++++++++++ tests/unit/oplogPopulator/Connector.js | 17 +++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index ae9454071..f1bd46f16 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -1,4 +1,5 @@ const joi = require('joi'); +const uuid = require('uuid'); const { errors } = require('arsenal'); const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper'); @@ -109,6 +110,14 @@ class Connector { } } + /** + * Updates partition name in connector config + * @returns {undefined} + */ + updatePartitionName() { + this._config['offset.partition.name'] = `partition-${uuid.v4()}`; + } + /** * Creates the Kafka-connect mongo connector * @returns {Promise|undefined} undefined @@ -122,6 +131,8 @@ class Connector { }); return; } + // reset resume token to avoid getting outdated token + this.updatePartitionName(); try { await this._kafkaConnect.createConnector({ name: this._name, diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index e3fed22e0..2c95edd78 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -49,6 +49,15 @@ describe('Connector', () => { })); assert.strictEqual(connector.isRunning, true); }); + it('Should change partition name on creation', async () => { + sinon.stub(connector._kafkaConnect, 'createConnector') + .resolves(); + await connector.spawn(); + const partitionName = connector.config['offset.partition.name']; + connector._isRunning = false; + await connector.spawn(); + assert.notStrictEqual(partitionName, connector.config['offset.partition.name']); + }); it('Should not try spawning a new connector when on is already existent', async () => { const createStub = sinon.stub(connector._kafkaConnect, 'createConnector') .resolves(); @@ -239,4 +248,12 @@ describe('Connector', () => { assert.strictEqual(size, 15); }); }); + + describe('updatePartitionName', () => { + it('Should update partition name in config', () => { + connector._config['offset.partition.name'] = 'partition-name'; + connector.updatePartitionName(); + assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name'); + }); + }); });