Skip to content

Commit

Permalink
change connector partition each time it is created
Browse files Browse the repository at this point in the history
The partition name is used as an indicator in the offsets topic to
distinguish between resume tokens of the different connectors.

When a connector is respawned it should start processing event
from the current moment and not use the previous resume token as it
can be invalid if the connector stayed shut for a long time and the
mongo oplog rotated.

Issue: BB-463
  • Loading branch information
Kerkesni committed Oct 26, 2023
1 parent cb3f321 commit 8d95c25
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
11 changes: 11 additions & 0 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');

Expand Down Expand Up @@ -109,6 +110,14 @@ class Connector {
}
}

/**
* Updates partition name in connector config
* @returns {undefined}
*/
updatePartitionName() {
this._config['offset.partition.name'] = `partition-${uuid.v4()}`;
}

/**
* Creates the Kafka-connect mongo connector
* @returns {Promise|undefined} undefined
Expand All @@ -122,6 +131,8 @@ class Connector {
});
return;
}
// reset resume token to avoid getting outdated token
this.updatePartitionName();
try {
await this._kafkaConnect.createConnector({
name: this._name,
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ describe('Connector', () => {
}));
assert.strictEqual(connector.isRunning, true);
});
it('Should change partition name on creation', async () => {
sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
await connector.spawn();
const partitionName = connector.config['offset.partition.name'];
connector._isRunning = false;
await connector.spawn();
assert.notStrictEqual(partitionName, connector.config['offset.partition.name']);
});
it('Should not try spawning a new connector when on is already existent', async () => {
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
Expand Down Expand Up @@ -239,4 +248,12 @@ describe('Connector', () => {
assert.strictEqual(size, 15);
});
});

describe('updatePartitionName', () => {
it('Should update partition name in config', () => {
connector._config['offset.partition.name'] = 'partition-name';
connector.updatePartitionName();
assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name');
});
});
});

0 comments on commit 8d95c25

Please sign in to comment.