From 4c2ec050d8302d6ab6ab582e91c3b44b0a153e8e Mon Sep 17 00:00:00 2001 From: Brian Vuyk Date: Wed, 26 May 2021 16:58:51 -0400 Subject: [PATCH 1/3] Make the default partitioner topic-aware when in round-robin mode. --- src/producer/partitioners/default/partitioner.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/producer/partitioners/default/partitioner.js b/src/producer/partitioners/default/partitioner.js index 7de41c9b2..efd26a8ba 100644 --- a/src/producer/partitioners/default/partitioner.js +++ b/src/producer/partitioners/default/partitioner.js @@ -18,9 +18,13 @@ const toPositive = x => x & 0x7fffffff * - If no partition or key is present choose a partition in a round-robin fashion */ module.exports = murmur2 => () => { - let counter = randomBytes(32).readUInt32BE(0) + let counters = {} return ({ topic, partitionMetadata, message }) => { + + if (!(topic in counters)) { + counters[topic] = randomBytes(32).readUInt32BE(0); + } const numPartitions = partitionMetadata.length const availablePartitions = partitionMetadata.filter(p => p.leader >= 0) const numAvailablePartitions = availablePartitions.length @@ -34,11 +38,11 @@ module.exports = murmur2 => () => { } if (numAvailablePartitions > 0) { - const i = toPositive(++counter) % numAvailablePartitions + const i = toPositive(++counters[topic]) % numAvailablePartitions return availablePartitions[i].partitionId } // no partitions are available, give a non-available partition - return toPositive(++counter) % numPartitions + return toPositive(++counters[topic]) % numPartitions } } From df8d7a8bcff0bdba214d42d9ede965946b8cdafb Mon Sep 17 00:00:00 2001 From: Brian Vuyk Date: Wed, 26 May 2021 17:12:59 -0400 Subject: [PATCH 2/3] Fix linting errors. --- src/producer/partitioners/default/partitioner.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/producer/partitioners/default/partitioner.js b/src/producer/partitioners/default/partitioner.js index efd26a8ba..413d97e88 100644 --- a/src/producer/partitioners/default/partitioner.js +++ b/src/producer/partitioners/default/partitioner.js @@ -18,12 +18,11 @@ const toPositive = x => x & 0x7fffffff * - If no partition or key is present choose a partition in a round-robin fashion */ module.exports = murmur2 => () => { - let counters = {} + const counters = {} return ({ topic, partitionMetadata, message }) => { - if (!(topic in counters)) { - counters[topic] = randomBytes(32).readUInt32BE(0); + counters[topic] = randomBytes(32).readUInt32BE(0) } const numPartitions = partitionMetadata.length const availablePartitions = partitionMetadata.filter(p => p.leader >= 0) From b47f6632d4ca865e0fb4feef22c153552654aaa5 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 8 Feb 2022 16:13:27 +0100 Subject: [PATCH 3/3] Test round-robin partitioning across multiple topics --- .../partitioners/default/index.spec.js | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/producer/partitioners/default/index.spec.js b/src/producer/partitioners/default/index.spec.js index da33da786..ca796f5eb 100644 --- a/src/producer/partitioners/default/index.spec.js +++ b/src/producer/partitioners/default/index.spec.js @@ -51,6 +51,30 @@ describe('Producer > Partitioner > Default', () => { expect(partitionCount[2]).toEqual(10) }) + test('messages are partitioned in a round-robin fashion for each topic', () => { + const partitionMetadata = [ + { partitionId: 1, leader: 1 }, + { partitionId: 2, leader: 2 }, + ] + const topics = [ + { topic: 'topic-a', partitionMetadata, partitionCount: {} }, + { topic: 'topic-b', partitionMetadata, partitionCount: {} }, + ] + + for (let i = 0; i < 30; ++i) { + for (const { topic, partitionMetadata, partitionCount } of topics) { + const partition = partitioner({ topic, partitionMetadata, message: {} }) + const count = partitionCount[partition] || 0 + partitionCount[partition] = count + 1 + } + } + + expect(topics[0].partitionCount[1]).toEqual(15) + expect(topics[0].partitionCount[2]).toEqual(15) + expect(topics[1].partitionCount[1]).toEqual(15) + expect(topics[1].partitionCount[2]).toEqual(15) + }) + test('returns the configured partition if it exists', () => { const partition = partitioner({ topic,