Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change connector partition name on restart #2465

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');

Expand Down Expand Up @@ -109,12 +110,29 @@ class Connector {
}
}

/**
* Updates partition name in connector config
* @returns {undefined}
*/
updatePartitionName() {
this._config['offset.partition.name'] = `partition-${uuid.v4()}`;
}

/**
* Creates the Kafka-connect mongo connector
* @returns {Promise|undefined} undefined
* @throws {InternalError}
*/
async spawn() {
if (this._isRunning) {
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
this._logger.error('tried spawning an already created connector', {
method: 'Connector.spawn',
connector: this._name,
});
return;
}
// reset resume token to avoid getting outdated token
this.updatePartitionName();
try {
await this._kafkaConnect.createConnector({
name: this._name,
Expand All @@ -137,6 +155,13 @@ class Connector {
* @throws {InternalError}
*/
async destroy() {
if (!this._isRunning) {
this._logger.error('tried destroying an already destroyed connector', {
method: 'Connector.destroy',
connector: this._name,
});
return;
}
try {
await this._kafkaConnect.deleteConnector(this._name);
this._isRunning = false;
Expand Down Expand Up @@ -262,7 +287,7 @@ class Connector {
}
this._config.pipeline = this._generateConnectorPipeline([...this._buckets]);
try {
if (doUpdate) {
if (doUpdate && this._isRunning) {
const timeBeforeUpdate = Date.now();
this._state.isUpdating = true;
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ describe('Connector', () => {
}));
assert.strictEqual(connector.isRunning, true);
});
it('Should change partition name on creation', async () => {
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
await connector.spawn();
const partitionName = connector.config['offset.partition.name'];
connector._isRunning = false;
await connector.spawn();
assert.notStrictEqual(partitionName, connector.config['offset.partition.name']);
});
it('Should not try spawning a new connector when on is already existent', async () => {
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
connector._isRunning = true;
await connector.spawn();
assert(createStub.notCalled);
});
});

describe('destroy', () => {
Expand All @@ -61,6 +77,13 @@ describe('Connector', () => {
assert(deleteStub.calledOnceWith('example-connector'));
assert.strictEqual(connector.isRunning, false);
});
it('Should not try destroying a new connector when connector is already destroyed', async () => {
const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
.resolves();
connector._isRunning = false;
await connector.destroy();
assert(deleteStub.notCalled);
});
});

describe('addBucket', () => {
Expand Down Expand Up @@ -166,6 +189,7 @@ describe('Connector', () => {
it('Should update connector', async () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = false;
connector._isRunning = true;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
Expand Down Expand Up @@ -201,6 +225,20 @@ describe('Connector', () => {
assert(pipelineStub.notCalled);
assert(updateStub.notCalled);
});

it('Should not update destroyed connector', async () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = false;
connector._isRunning = false;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
const didUpdate = await connector.updatePipeline(true);
assert.strictEqual(didUpdate, false);
assert(pipelineStub.calledOnceWith([]));
assert(updateStub.notCalled);
});
});

describe('getConfigSizeInBytes', () => {
Expand All @@ -210,4 +248,12 @@ describe('Connector', () => {
assert.strictEqual(size, 15);
});
});

describe('updatePartitionName', () => {
it('Should update partition name in config', () => {
connector._config['offset.partition.name'] = 'partition-name';
connector.updatePartitionName();
assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name');
});
});
});
Loading