Skip to content

Commit

Permalink
bf: crash backbeat components when unable to connect to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
giacomoguiulfo committed Jun 13, 2018
1 parent 96a0729 commit 3c2aa76
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
13 changes: 12 additions & 1 deletion extensions/lifecycle/lifecycleProducer/LifecycleProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class LifecycleProducer {
* @return {undefined}
*/
_setupConsumer() {
let consumerReady = false;
const consumer = new BackbeatConsumer({
zookeeper: {
connectionString: this._zkConfig.connectionString,
Expand All @@ -273,8 +274,17 @@ class LifecycleProducer {
autoCommit: true,
backlogMetrics: this._lcConfig.backlogMetrics,
});
consumer.on('error', () => {});
consumer.on('error', err => {
if (!consumerReady) {
this._log.fatal('unable to start lifecycle consumer', {
error: err,
method: 'LifecycleProducer._setupConsumer',
});
process.exit(1);
}
});
consumer.on('ready', () => {
consumerReady = true;
consumer.subscribe();
});
}
Expand Down Expand Up @@ -425,6 +435,7 @@ class LifecycleProducer {
error: err,
method: 'LifecycleProducer.start',
});
process.exit(1);
}
this._setupConsumer();
this._log.info('lifecycle producer successfully started');
Expand Down
12 changes: 11 additions & 1 deletion extensions/replication/failedCRR/FailedCRRConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class FailedCRRConsumer {
* @return {undefined}
*/
start(cb) {
let consumerReady = false;
const consumer = new BackbeatConsumer({
kafka: { hosts: this._kafkaConfig.hosts },
topic: this._topic,
Expand All @@ -41,8 +42,17 @@ class FailedCRRConsumer {
queueProcessor: this.processKafkaEntry.bind(this),
fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
});
consumer.on('error', () => {});
consumer.on('error', err => {
if (!consumerReady) {
this.logger.fatal('could not setup a backbeat consumer', {
method: 'FailedCRRConsumer.start',
error: err,
});
process.exit(1);
}
});
consumer.on('ready', () => {
consumerReady = true;
consumer.subscribe();
this.logger.info('retry consumer is ready to consume entries');
});
Expand Down
10 changes: 9 additions & 1 deletion extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class QueueProcessor extends EventEmitter {
*/
start(options) {
this._setupProducer(err => {
let consumerReady = false;
if (err) {
this.logger.info('error setting up kafka producer',
{ error: err.message });
Expand All @@ -249,8 +250,15 @@ class QueueProcessor extends EventEmitter {
concurrency: this.repConfig.queueProcessor.concurrency,
queueProcessor: this.processKafkaEntry.bind(this),
});
this._consumer.on('error', () => {});
this._consumer.on('error', () => {
if (!consumerReady) {
this.logger.fatal('queue processor failed to start a ' +
'backbeat consumer');
process.exit(1);
}
});
this._consumer.on('ready', () => {
consumerReady = true;
this._consumer.subscribe();
this.logger.info('queue processor is ready to consume ' +
'replication entries');
Expand Down
9 changes: 8 additions & 1 deletion lib/MetricsConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MetricsConsumer {
}

start() {
let consumerReady = false;
const consumer = new BackbeatConsumer({
kafka: { hosts: this.kafkaConfig.hosts },
topic: this.mConfig.topic,
Expand All @@ -46,8 +47,14 @@ class MetricsConsumer {
queueProcessor: this.processKafkaEntry.bind(this),
fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
});
consumer.on('error', () => {});
consumer.on('error', () => {
if (!consumerReady) {
this.logger.fatal('error starting metrics consumer');
process.exit(1);
}
});
consumer.on('ready', () => {
consumerReady = true;
consumer.subscribe();
this.logger.info('metrics processor is ready to consume entries');
});
Expand Down

0 comments on commit 3c2aa76

Please sign in to comment.