Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer partitioners #260

Merged
merged 3 commits into from
Jan 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ Closes the connection to Zookeeper and the brokers so that the node process can
## Producer
### Producer(client, [options])
* `client`: client which keeps a connection with the Kafka server.
* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`
* `options`: options for producer,

```js
{
// Configuration for when to consider a message as acknowledged, default 1
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 0
partitionerType: 2
}
```

``` js
var kafka = require('kafka-node'),
Expand All @@ -49,9 +60,10 @@ var kafka = require('kafka-node'),
``` js
{
topic: 'topicName',
messages: ['message body'],// multi messages should be a array, single message can be just a string or a KeyedMessage instance
partition: 0, //default 0
attributes: 2, // default: 0
messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
key: 'theKey', // only needed when using keyed partitioner
partition: 0, // default 0
attributes: 2 // default: 0
}
```

Expand Down Expand Up @@ -112,7 +124,18 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
## HighLevelProducer
### HighLevelProducer(client, [options])
* `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`
* `options`: options for producer,

```js
{
// Configuration for when to consider a message as acknowledged, default 1
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 2
partitionerType: 3
}
```

``` js
var kafka = require('kafka-node'),
Expand All @@ -132,7 +155,8 @@ var kafka = require('kafka-node'),
``` js
{
topic: 'topicName',
messages: ['message body'],// multi messages should be a array, single message can be just a string
messages: ['message body'], // multi messages should be a array, single message can be just a string,
key: 'theKey', // only needed when using keyed partitioner
attributes: 1
}
```
Expand Down
4 changes: 4 additions & 0 deletions kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ exports.Producer = require('./lib/producer');
exports.Client = require('./lib/client');
exports.Offset = require('./lib/offset');
exports.KeyedMessage = require('./lib/protocol').KeyedMessage;
exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner;
exports.CyclicPartitioner = require('./lib/partitioner').CyclicPartitioner;
exports.RandomPartitioner = require('./lib/partitioner').RandomPartitioner;
exports.KeyedPartitioner = require('./lib/partitioner').KeyedPartitioner;
13 changes: 2 additions & 11 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,6 @@ Client.prototype.nextSocketId = function () {
return this._socketId++;
};

Client.prototype.nextPartition = (function cycle() {
var c = 0;
return function (topic) {
if (_.isEmpty(this.topicPartitions)) return 0;
if (_.isEmpty(this.topicPartitions[topic])) return 0;
return this.topicPartitions[topic][ c++ % this.topicPartitions[topic].length ];
}
})();

Client.prototype.refreshBrokers = function (brokerMetadata) {
var self = this;
this.brokerMetadata = brokerMetadata;
Expand Down Expand Up @@ -369,7 +360,7 @@ Client.prototype.refreshMetadata = function (topicNames, cb) {

Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this, _payloads = payloads;
// payloads: [ [metadata exists], [metadta not exists] ]
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
Expand Down Expand Up @@ -430,7 +421,7 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {

Client.prototype.checkMetadatas = function (payloads) {
if (_.isEmpty(this.topicMetadata)) return [ [],payloads ];
// out: [ [metadata exists], [metadta not exists] ]
// out: [ [metadata exists], [metadata not exists] ]
var out = [ [], [] ];
payloads.forEach(function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p)
Expand Down
45 changes: 36 additions & 9 deletions lib/highLevelProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ var util = require('util'),
Message = protocol.Message,
KeyedMessage = protocol.KeyedMessage,
ProduceRequest = protocol.ProduceRequest,
DEFAULTS = {
requireAcks: 1,
ackTimeoutMs: 100
};
partitioner = require('./partitioner'),
DefaultPartitioner = partitioner.DefaultPartitioner,
RandomPartitioner = partitioner.RandomPartitioner,
CyclicPartitioner = partitioner.CyclicPartitioner,
KeyedPartitioner = partitioner.KeyedPartitioner;

var PARTITIONER_TYPES = {
default: 0,
random: 1,
cyclic: 2,
keyed: 3
};

var DEFAULTS = {
requireAcks: 1,
ackTimeoutMs: 100,
partitionerType: PARTITIONER_TYPES.cyclic
};

/**
* Provides common functionality for a kafka producer
Expand Down Expand Up @@ -41,6 +55,16 @@ var HighLevelProducer = function (client, options) {
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

if (options.partitionerType === PARTITIONER_TYPES.default) {
this.partitioner = new DefaultPartitioner();
} else if (options.partitionerType === PARTITIONER_TYPES.random) {
this.partitioner = new RandomPartitioner();
} else if (options.partitionerType === PARTITIONER_TYPES.keyed) {
this.partitioner = new KeyedPartitioner();
} else {
this.partitioner = new CyclicPartitioner();
}

this.connect();
};

Expand All @@ -58,7 +82,7 @@ HighLevelProducer.prototype.connect = function () {
}
});
this.client.on('brokersChanged', function () {
this.topicMetadata = {}
this.topicMetadata = {};
});
this.client.on('error', function (err) {
self.emit('error', err);
Expand All @@ -77,14 +101,15 @@ HighLevelProducer.prototype.connect = function () {
* @param {HighLevelProducer~sendCallback} cb A function to call once the send has completed
*/
HighLevelProducer.prototype.send = function (payloads, cb) {
this.client.sendProduceRequest(this.buildPayloads(payloads), this.requireAcks, this.ackTimeoutMs, cb);
this.client.sendProduceRequest(this.buildPayloads(payloads, this.client.topicMetadata), this.requireAcks, this.ackTimeoutMs, cb);
};

HighLevelProducer.prototype.buildPayloads = function (payloads) {
HighLevelProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
var self = this;
return payloads.map(function (p) {
p.partition = p.hasOwnProperty('partition') ? p.partition : self.client.nextPartition(p.topic);
p.attributes = p.attributes || 0;
var topicPartitions = _.pluck(topicMetadata[p.topic], 'partition');
p.partition = p.hasOwnProperty('partition') ? p.partition : self.partitioner.getPartition(topicPartitions, p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
var messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
Expand Down Expand Up @@ -112,4 +137,6 @@ function noAcks() {
return 'Not require ACK';
}

HighLevelProducer.PARTITIONER_TYPES = PARTITIONER_TYPES;

module.exports = HighLevelProducer;
68 changes: 68 additions & 0 deletions lib/partitioner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict';

var util = require('util');
var _ = require('lodash');

var Partitioner = function () {
};

var DefaultPartitioner = function () {
};
util.inherits(DefaultPartitioner, Partitioner);

DefaultPartitioner.prototype.getPartition = function (partitions) {
if (partitions && _.isArray(partitions) && partitions.length > 0) {
return partitions[0];
} else {
return 0;
}
};

var CyclicPartitioner = function () {
};
util.inherits(CyclicPartitioner, Partitioner);

CyclicPartitioner.prototype.getPartition = (function cycle() {
var c = 0;
return function (partitions) {
if (_.isEmpty(partitions)) return 0;
return partitions[ c++ % partitions.length ];
};
})();

var RandomPartitioner = function () {
};
util.inherits(RandomPartitioner, Partitioner);

RandomPartitioner.prototype.getPartition = function (partitions) {
return partitions[Math.floor(Math.random() * partitions.length)];
};

var KeyedPartitioner = function () {
};
util.inherits(KeyedPartitioner, Partitioner);

// Taken from oid package (Dan Bornstein)
// Copyright The Obvious Corporation.
KeyedPartitioner.prototype.hashCode = function(string) {
var hash = 0;
var length = string.length;

for (var i = 0; i < length; i++) {
hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
}

return (hash === 0) ? 1 : hash;
};

KeyedPartitioner.prototype.getPartition = function (partitions, key) {
key = key || '';

var index = this.hashCode(key) % partitions.length;
return partitions[index];
};

module.exports.DefaultPartitioner = DefaultPartitioner;
module.exports.CyclicPartitioner = CyclicPartitioner;
module.exports.RandomPartitioner = RandomPartitioner;
module.exports.KeyedPartitioner = KeyedPartitioner;
48 changes: 38 additions & 10 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,29 @@
var util = require('util'),
events = require('events'),
_ = require('lodash'),
async = require('async'),
Client = require('./client'),
protocol = require('./protocol'),
Message = protocol.Message,
KeyedMessage = protocol.KeyedMessage,
ProduceRequest = protocol.ProduceRequest,
DEFAULTS = {
requireAcks: 1,
ackTimeoutMs: 100
};
partitioner = require('./partitioner'),
DefaultPartitioner = partitioner.DefaultPartitioner,
RandomPartitioner = partitioner.RandomPartitioner,
CyclicPartitioner = partitioner.CyclicPartitioner,
KeyedPartitioner = partitioner.KeyedPartitioner;

var PARTITIONER_TYPES = {
default: 0,
random: 1,
cyclic: 2,
keyed: 3
};

var DEFAULTS = {
requireAcks: 1,
ackTimeoutMs: 100,
partitionerType: PARTITIONER_TYPES.default
};

/**
* Provides common functionality for a kafka producer
Expand Down Expand Up @@ -42,6 +55,16 @@ var Producer = function (client, options) {
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

if (options.partitionerType === PARTITIONER_TYPES.random) {
this.partitioner = new RandomPartitioner();
} else if (options.partitionerType === PARTITIONER_TYPES.cyclic) {
this.partitioner = new CyclicPartitioner();
} else if (options.partitionerType === PARTITIONER_TYPES.keyed) {
this.partitioner = new KeyedPartitioner();
} else {
this.partitioner = new DefaultPartitioner();
}

this.connect();
};

Expand All @@ -59,7 +82,7 @@ Producer.prototype.connect = function () {
}
});
this.client.on('brokersChanged', function () {
this.topicMetadata = {}
this.topicMetadata = {};
});
this.client.on('error', function (err) {
self.emit('error', err);
Expand All @@ -82,13 +105,15 @@ Producer.prototype.send = function (payloads, cb) {
requireAcks = this.requireAcks,
ackTimeoutMs = this.ackTimeoutMs;

client.sendProduceRequest(this.buildPayloads(payloads), requireAcks, ackTimeoutMs, cb);
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};

Producer.prototype.buildPayloads = function (payloads) {
Producer.prototype.buildPayloads = function (payloads, topicMetadata) {
var self = this;
return payloads.map(function (p) {
p.partition = p.partition || 0;
p.attributes = p.attributes || 0;
var topicPartitions = _.pluck(topicMetadata[p.topic], 'partition');
p.partition = p.hasOwnProperty('partition') ? p.partition : self.partitioner.getPartition(topicPartitions, p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
var messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
Expand All @@ -115,4 +140,7 @@ Producer.prototype.close = function (cb) {
function noAcks() {
return 'Not require ACK';
}

Producer.PARTITIONER_TYPES = PARTITIONER_TYPES;

module.exports = Producer;
Loading