Skip to content

Commit

Permalink
Merge pull request #171 from kadishmal/fix.169
Browse files Browse the repository at this point in the history
Fix #169: When paused why try to fetch every 1000 ms?
  • Loading branch information
haio committed Mar 6, 2015
2 parents fd7d4df + 249e5be commit 78044da
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 45 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# kafka-node CHANGELOG

## Version NEXT (Unreleased)

- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused.
- Fix #169: When paused why try to fetch every 1000 ms?
- Ref: remove unused variables.
78 changes: 38 additions & 40 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ var Client = function (connectionString, clientId, zkOptions) {
this.correlationId = 0;
this._socketId = 0;
this.cbqueue = {};
this.brokerMetadata = {}
this.brokerMetadata = {};
this.ready = false;
this.connect();
}
};

util.inherits(Client, events.EventEmitter);

Expand Down Expand Up @@ -81,14 +81,14 @@ Client.prototype.connect = function () {
zk.on('error', function (err) {
self.emit('error', err);
});
}
};

Client.prototype.close = function (cb) {
this.closeBrokers(this.brokers);
this.closeBrokers(this.longpollingBrokers);
this.zk.client.close();
cb && cb();
}
};

Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
Expand All @@ -98,16 +98,13 @@ Client.prototype.closeBrokers = function (brokers) {
}

Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var topics = {},
count = payloads.length,
offsetError = false,
encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes),
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes),
decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
offsetError = true;
return consumer.emit('offsetOutOfRange', err);
}

return consumer.emit('error', err);
}

Expand All @@ -117,19 +114,20 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs
if (encoding !== 'buffer' && message.value) {
message.value = message.value.toString(encoding);
}

consumer.emit('message', message);
} else {
consumer.emit('done', message);
}
}, maxTickMessages);

this.send(payloads, encoder, decoder, function (err) {
if (err) {
Array.prototype.unshift.call(arguments, 'error')
consumer.emit.apply(consumer, arguments);
}
});
}
this.send(payloads, encoder, decoder, function (err) {
if (err) {
Array.prototype.unshift.call(arguments, 'error');
consumer.emit.apply(consumer, arguments);
}
});
};

Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs),
Expand All @@ -139,7 +137,7 @@ Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeou
async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
self.send(payloads, encoder, decoder, cb);
})
});

function buildRequest (payload, cb) {
var attributes = payload.attributes;
Expand All @@ -154,25 +152,25 @@ Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeou
cb();
});
}
}
};

Client.prototype.sendOffsetCommitRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetCommitRequest(group),
decoder = protocol.decodeOffsetCommitResponse;
this.send(payloads, encoder, decoder, cb);
}
};

Client.prototype.sendOffsetFetchRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchRequest(group),
decoder = protocol.decodeOffsetFetchResponse;
this.send(payloads, encoder, decoder, cb);
}
};

Client.prototype.sendOffsetRequest = function (payloads, cb) {
var encoder = protocol.encodeOffsetRequest,
decoder = protocol.decodeOffsetResponse;
this.send(payloads, encoder, decoder, cb);
}
};

/*
* Helper method
Expand Down Expand Up @@ -212,7 +210,7 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) {

this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
broker && broker.write(request);
}
};

Client.prototype.createTopics = function (topics, isAsync, cb) {
topics = typeof topics === 'string' ? [topics] : topics;
Expand Down Expand Up @@ -258,7 +256,7 @@ Client.prototype.createTopics = function (topics, isAsync, cb) {
debug('craete topic by sending metadata request');
attemptCreateTopics(topicsNotExists, cb);
});
}
};

/**
* Checks to see if a given array of topics exists
Expand All @@ -284,7 +282,7 @@ Client.prototype.topicExists = function (topics, cb) {
cb();
});
}
}
};

Client.prototype.addTopics = function (topics, cb) {
var self = this;
Expand All @@ -296,15 +294,15 @@ Client.prototype.addTopics = function (topics, cb) {
cb(null, topics);
});
});
}
};

Client.prototype.nextId = function () {
return this.correlationId++;
}
};

Client.prototype.nextSocketId = function () {
return this._socketId++;
}
};

Client.prototype.nextPartition = (function cycle() {
var c = 0;
Expand All @@ -328,7 +326,7 @@ Client.prototype.refreshBrokers = function (brokerMetadata) {
delete brokers[deadKey];
}.bind(this));
}
}
};

Client.prototype.refreshMetadata = function (topicNames, retry, cb) {
var self = this;
Expand All @@ -353,7 +351,7 @@ Client.prototype.refreshMetadata = function (topicNames, retry, cb) {
}
}
});
}
};

Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this, _payloads = payloads;
Expand Down Expand Up @@ -385,7 +383,7 @@ Client.prototype.send = function (payloads, encoder, decoder, cb) {
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});
}
}
};

Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
var longpolling = encoder.name === 'encodeFetchRequest';
Expand All @@ -408,7 +406,7 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
this.queueCallback(broker, correlationId, [decoder, cb]);
broker && broker.write(request);
}
}
};

Client.prototype.checkMetadatas = function (payloads) {
if (_.isEmpty(this.topicMetadata)) return [ [],payloads ];
Expand All @@ -419,15 +417,15 @@ Client.prototype.checkMetadatas = function (payloads) {
else out[1].push(p)
}.bind(this));
return out;
}
};

Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata,
topicMetadata = this.topicMetadata;
var leader = this.leaderByPartition(topic, partition);

return (leader !== undefined) && brokerMetadata[leader];
}
};

Client.prototype.updateMetadatas = function (metadatas) {
// _.extend(this.brokerMetadata, metadatas[0]);
Expand All @@ -437,14 +435,14 @@ Client.prototype.updateMetadatas = function (metadatas) {
return parseInt(val, 10);
});
}
}
};

Client.prototype.removeTopicMetadata = function (topics, cb) {
topics.forEach(function (t) {
if (this.topicMetadata[t]) delete this.topicMetadata[t];
}.bind(this));
cb(null, topics.length);
}
};

Client.prototype.payloadsByLeader = function (payloads) {
return payloads.reduce(function (out, p) {
Expand All @@ -453,12 +451,12 @@ Client.prototype.payloadsByLeader = function (payloads) {
out[leader].push(p);
return out;
}.bind(this), {});
}
};

Client.prototype.leaderByPartition = function (topic, partition) {
var topicMetadata = this.topicMetadata;
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition]['leader'];
}
};

Client.prototype.brokerForLeader = function (leader, longpolling) {
var brokers = longpolling ? this.longpollingBrokers : this.brokers;
Expand All @@ -478,7 +476,7 @@ Client.prototype.brokerForLeader = function (leader, longpolling) {
addr = metadata.host + ':' + metadata.port;
brokers[addr] = brokers[addr] || this.createBroker(metadata.host, metadata.port, longpolling);
return brokers[addr];
}
};

Client.prototype.createBroker = function connect(host, port, longpolling) {
var self = this;
Expand Down Expand Up @@ -526,7 +524,7 @@ Client.prototype.createBroker = function connect(host, port, longpolling) {
}, 1000);
}
return socket;
}
};

Client.prototype.handleReceivedData = function (socket) {
var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars,
Expand All @@ -546,7 +544,7 @@ Client.prototype.handleReceivedData = function (socket) {

if (socket.buffer.length)
setImmediate(function () { this.handleReceivedData(socket);}.bind(this));
}
};

Client.prototype.queueCallback = function (broker, id, data) {
var brokerId = broker.socketId;
Expand Down
7 changes: 2 additions & 5 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ HighLevelConsumer.prototype.connect = function () {
});

this.on('offsetOutOfRange', function (topic) {
self.pause();
topic.maxNum = self.options.maxNumSegments;
topic.metadata = 'm';
topic.time = Date.now();
Expand All @@ -229,6 +230,7 @@ HighLevelConsumer.prototype.connect = function () {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
// set minimal offset
self.setOffset(topic.topic, topic.partition, min);
self.resume();
}
});
});
Expand All @@ -242,11 +244,6 @@ HighLevelConsumer.prototype.connect = function () {
self.fetch();
});
}
else {
setTimeout(function () {
self.fetch();
}, 1000);
}
});
}

Expand Down

0 comments on commit 78044da

Please sign in to comment.