Consumer runner tries to start the fetch manager without delay #1384

Closed
adripc64 opened this issue Jun 2, 2022 · 13 comments · Fixed by #1402

adripc64 commented Jun 2, 2022

Describe the bug
When the FetchManager detects that a Kafka node is lost, it tries to rebalance the fetchers. If there are no Kafka nodes available, no fetchers are created, which causes the consumer runner to call FetchManager.start() in an infinite loop without any delay.
This blocks any other processing, such as responding to HTTP requests. The consumer is also not disconnected properly and cannot recover when Kafka becomes available again.

To Reproduce

  1. Have a Kafka cluster with a single node
  2. Run a consumer that subscribes to a topic and consumes messages
  3. Stop the Kafka instance
  4. The consumer runner tries to rebalance the fetchers. As there are no nodes, no fetchers are created. The consumer runner then calls FetchManager.start() continuously without any delay, blocking any other kind of processing in Node.js

Expected behavior
The consumer should be disconnected properly and start retrying to connect again.
As a hint, we managed to work around this issue by adding a simple 1 ms delay after the call to FetchManager.start(). Example:

async scheduleFetchManager() {
    this.consuming = true

    while (this.running) {
      try {
        await this.fetchManager.start()
        // workaround: yield back to the event loop for ~1 ms between iterations
        await new Promise(resolve => setTimeout(resolve, 1));
      } catch (e) {
      ...
      }
    }
    ...
}
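
To see why the 1 ms timer helps: when no fetchers are created, fetchManager.start() resolves using only microtasks, so the while loop never returns control to the event loop, and timers, I/O, and HTTP callbacks starve. A setTimeout schedules a macrotask, which forces at least one full event-loop turn per iteration. Here is a minimal standalone illustration (not KafkaJS code; the function names are made up for the demo):

// Minimal illustration (not KafkaJS code): a loop that only awaits already-resolved
// promises never yields to the event loop, so the timer below cannot fire until the
// loop has finished. Yielding through setTimeout lets other work run between iterations.
async function tightLoop() {
  for (let i = 0; i < 1_000_000; i++) {
    await Promise.resolve() // microtask only: the event loop never reaches the timer phase
  }
}

async function yieldingLoop() {
  for (let i = 0; i < 5; i++) {
    await new Promise(resolve => setTimeout(resolve, 1)) // macrotask: timers and I/O get a turn
  }
}

setTimeout(() => console.log('timer fired'), 0)
tightLoop().then(() => console.log('tight loop done'))
// Output: "tight loop done" first, then "timer fired".
// Replace tightLoop() with yieldingLoop() and the timer fires almost immediately.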

Observed behavior

{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.213Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Created 0 fetchers","nodeIds":[],"concurrency":1}
{"level":"DEBUG","timestamp":"2022-06-02T09:57:05.214Z","logger":"kafkajs","message":"[FetchManager] Starting..."}

Environment:

  • OS: Mac OS 11.3.1
  • KafkaJS version 2.0.2
  • Kafka version 6.0.0
  • NodeJS version 17.1.0

@tugtugtug

@Nevon do we have an ETA for a fix for this one? This issue has a huge impact on our services, as it causes the entire service to hang.
If you have a workaround in the meantime, please let us know too.

Nevon (Collaborator) commented Jun 10, 2022

do we have an ETA for a fix for this one?

No, that's not how free, open-source projects work. If this is very important to you, you are more than welcome to spend the resources to fix the issue. I'm in the middle of my vacation, and even once I'm back, no one other than me decides what I should spend my free time on.

tugtugtug commented Jun 10, 2022

do we have an ETA for a fix for this one?

No, that's not how free, open-source projects work. If this is very important to you, you are more than welcome to spend the resources to fix the issue. I'm in the middle of my vacation, and even once I'm back, no one other than me decides what I should spend my free time on.

Sorry @Nevon if my question offended you, and sorry to have interrupted your vacation. Maybe we have a different view on how open-source projects work. If I were a maintainer of an open-source project, I would certainly feel some responsibility for the quality of my product, and if an issue had wide impact, I would probably treat it as a priority (if I were not on vacation and had free time, of course).
I think your vent above certainly doesn't mean that you will just ignore this issue. Maybe you felt I was pushing you as if I were a paying customer. I did try to add some urgency to the issue, but that's a typical response when a user of the product is desperately seeking help with a critical problem; I hope you can understand.
I'm not sure I'm familiar enough with the project to contribute code to the same quality standard I would hold for my own code, so I decided to create an issue with the details of my analysis (#1387) to help whoever may be able to fix this more professionally.

@adripc64 (Author)

In the issue description I shared a one-line fix that worked for me. I didn't open a PR with that fix because I thought someone with more knowledge of the project could implement a better solution. However, if you think that fix is acceptable, I'll be pleased to open a PR.

tugtugtug commented Jun 10, 2022

In our debugging session, we simply set isRunning to false to exit the loop, but a better solution might be to throw an error when nodeIds comes back as an empty array, or to stop the run loop before the cluster becomes empty. Roughly, the guard could look like the sketch below.
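
A minimal sketch of that guard (illustrative only; the names are placeholders, and the actual patch-package diff appears later in this thread):

// Sketch of the suggested guard: fail fast when there are no fetchers to start,
// instead of letting the runner re-enter start() in a tight loop.
// `createFetchers` stands in for the fetch manager's internal fetcher factory.
class NoBrokersAvailableError extends Error {}

async function startFetchers(createFetchers) {
  const fetchers = createFetchers() || []
  if (fetchers.length === 0) {
    // Throwing lets the runner's catch block decide: re-join, reconnect, or crash.
    throw new NoBrokersAvailableError('No brokers available to fetch from')
  }
  await Promise.all(fetchers.map(fetcher => fetcher.start()))
}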

ixalon commented Jun 18, 2022

I've hit this problem too. I've applied a workaround using logging (horrid, hacky, but temporary) for the time being. The following approach prevents the tight loop (it needs tweaking for whatever logging system you're using) without needing a patched copy of kafkajs.

// Note: KafkaJSConnectionError must be in scope; it lives in kafkajs's errors module.
new Kafka({
  ...
  logLevel: logLevel.DEBUG,
  logCreator: (level) => (entry) => {
    if (level == logLevel.DEBUG) {
      // Throwing from the log creator escapes the tight loop as soon as the
      // fetch manager reports that it created zero fetchers.
      if (entry.log.message.startsWith('Created 0 fetchers')) {
        throw new KafkaJSConnectionError('No brokers available! Avoiding tight loop. See: https://github.com/tulios/kafkajs/issues/1384');
      }
    } else {
      console.log(entry.log.message);
    }
  }
});

tugtugtug commented Jun 20, 2022

I've hit this problem too. I've applied a workaround using logging (horrid, hacky, but temporary) for the time being. […]

Nice find, but brutal! For people who don't want to patch, this is probably a workaround until there's an official fix.
For people who already have patch-package (or are okay with installing it and applying a patch), I've attached what I've been using, which fixes two problems:

  1. the infinite tight loop
  2. uncaught exceptions caused by awaited calls failing inside the catch block

patches/kafkajs+2.0.2.patch

diff --git a/node_modules/kafkajs/src/consumer/fetchManager.js b/node_modules/kafkajs/src/consumer/fetchManager.js
index 7ec46ea..ebe28fb 100644
--- a/node_modules/kafkajs/src/consumer/fetchManager.js
+++ b/node_modules/kafkajs/src/consumer/fetchManager.js
@@ -2,7 +2,7 @@ const seq = require('../utils/seq')
 const createFetcher = require('./fetcher')
 const createWorker = require('./worker')
 const createWorkerQueue = require('./workerQueue')
-const { KafkaJSFetcherRebalanceError } = require('../errors')
+const { KafkaJSFetcherRebalanceError, KafkaJSBrokerNotFound } = require('../errors')
 
 /** @typedef {ReturnType<typeof createFetchManager>} FetchManager */
 
@@ -65,7 +65,9 @@ const createFetchManager = ({
 
     while (true) {
       fetchers = createFetchers()
-
+      if ((fetchers || []).length === 0) {
+        throw new KafkaJSBrokerNotFound();
+      }
       try {
         await Promise.all(fetchers.map(fetcher => fetcher.start()))
       } catch (error) {
diff --git a/node_modules/kafkajs/src/consumer/runner.js b/node_modules/kafkajs/src/consumer/runner.js
index 686549c..632ac43 100644
--- a/node_modules/kafkajs/src/consumer/runner.js
+++ b/node_modules/kafkajs/src/consumer/runner.js
@@ -97,6 +97,7 @@ module.exports = class Runner extends EventEmitter {
       try {
         await this.fetchManager.start()
       } catch (e) {
+        let retry;
         if (isRebalancing(e)) {
           this.logger.warn('The group is rebalancing, re-joining', {
             groupId: this.consumerGroup.groupId,
@@ -109,11 +110,10 @@ module.exports = class Runner extends EventEmitter {
             memberId: this.consumerGroup.memberId,
           })
 
-          await this.consumerGroup.joinAndSync()
-          continue
+          retry = true;
         }
 
-        if (e.type === 'UNKNOWN_MEMBER_ID') {
+        if (!retry && e.type === 'UNKNOWN_MEMBER_ID') {
           this.logger.error('The coordinator is not aware of this member, re-joining the group', {
             groupId: this.consumerGroup.groupId,
             memberId: this.consumerGroup.memberId,
@@ -121,10 +121,16 @@ module.exports = class Runner extends EventEmitter {
           })
 
           this.consumerGroup.memberId = null
-          await this.consumerGroup.joinAndSync()
-          continue
+          retry = true;
+        }
+        if (retry) {
+          try {
+            await this.consumerGroup.joinAndSync();
+            continue;
+          } catch(err_in_retry) {
+            e = err_in_retry;
+          }
         }
-
         this.onCrash(e)
         break
       }

@chranmat

Today I discovered that we are facing the same issue you describe, after several weeks of intermittent high CPU load. I have now tried to implement the workaround that @adripc64 describes by adding the following line:

await new Promise(resolve => setTimeout(resolve, 1));

When doing this, a new issue occurs. Every ~10 seconds it reports the following messages and re-connects:

{"level":"ERROR","timestamp":"2022-06-22T17:57:55.692Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"servicebus:9092","clientId":"6b128c7bf805","error":"The coordinator is not aware of this member","correlationId":97,"size":32}

{"level":"ERROR","timestamp":"2022-06-22T17:57:55.694Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"servicebus:9092","clientId":"6b128c7bf805","error":"The coordinator is not aware of this member","correlationId":99,"size":10}

{"level":"ERROR","timestamp":"2022-06-22T17:57:55.694Z","logger":"kafkajs","message":"[Runner] The coordinator is not aware of this member, re-joining the group","groupId":"orderservice","memberId":"6b128c7bf805-8f19dce5-b0cf-4c3c-b178-37642d7a1ca2","error":"The coordinator is not aware of this member"}

{"level":"INFO","timestamp":"2022-06-22T17:57:55.696Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"orderservice","memberId":"6b128c7bf805-3bcbbf76-ad8e-46aa-b33b-8d512fb2ad2f","leaderId":"6b128c7bf805-3bcbbf76-ad8e-46aa-b33b-8d512fb2ad2f","isLeader":true,"memberAssignment":{"ACL":[0],"announce":[0],"ORDER":[0]},"groupProtocol":"RoundRobinAssigner","duration":30010}

It seems like this additional 1 ms wait causes the heartbeat to get "out of sync". @adripc64 or @Nevon, is there another workaround I could implement? I will keep investigating to find one, but if you know of an obvious one, please shout.

@chranmat

I've now also tried to apply the patch described by @tugtugtug, and the same issue occurs:

[Runner] The coordinator is not aware of this member, re-joining the group

@tugtugtug

I've now also tried to apply the patch described by @tugtugtug, and the same issue occurs:

[Runner] The coordinator is not aware of this member, re-joining the group

AFAIK this isn't what we encountered, so the fixes wouldn't apply in your case. You may want to create a separate issue for tracking.
In our case, the broker list became empty and the FetchManager stayed in an infinite loop; nothing was printed while it looped, starving the entire Node.js process.

chranmat commented Jun 23, 2022

the FetchManager stayed in an infinite loop; nothing was printed while it looped, starving the entire Node.js process.

@tugtugtug yes, this is exactly the same for us, but with the patch applied something strange happens that produces this message. Exactly the same happened with the first workaround described by @adripc64.

Nevon added a commit that referenced this issue Jun 27, 2022
The fetch manager rebalancing mechanism caused an infinite loop when
there were no brokers available, causing the consumer to never become
aware of any connection issues.

Fixes #1384

Nevon commented Jun 27, 2022

Opened up #1402 to fix this. The issue was that the rebalancing mechanism in the FetchManager would detect that the number of available nodes had changed, and trigger the recreation of the fetchers. Because there were no available nodes, no fetchers were created, and we entered the infinite tight loop. By bypassing the rebalancing mechanism in case there are no nodes available, the fetcher ends up attempting to fetch from the now unavailable node, and a KafkaJSConnectionError bubbles up as expected, causing the consumer to try to reconnect and eventually crash - same as in 1.16.0.
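
The shape of the change is roughly the following (a minimal sketch of the idea only, not the actual #1402 diff; the function and variable names here are illustrative):

// Sketch: only treat a change in the broker node list as a reason to recreate the
// fetchers when the new list is non-empty. With zero nodes, keep fetching from the
// stale node so the resulting connection error bubbles up and the consumer can
// reconnect or crash instead of looping.
function shouldRebalanceFetchers(previousNodeIds, currentNodeIds) {
  const changed =
    previousNodeIds.length !== currentNodeIds.length ||
    currentNodeIds.some(id => !previousNodeIds.includes(id))
  return changed && currentNodeIds.length > 0
}

// Example: losing the only broker no longer triggers a (futile) fetcher rebalance.
console.log(shouldRebalanceFetchers([0], []))     // false: let the fetch fail instead
console.log(shouldRebalanceFetchers([0], [0, 1])) // true: rebalance onto the new node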

It's out in 2.1.0-beta.5, and will be included in 2.0.3 later this week.

@pravin-raha

Still facing the same problem. Is anyone else seeing this even with version 2.1.0? Am I missing something? Do I need to set something in the configuration? I tried to resolve the problem with #1428.

NotoriousBIT pushed a commit to cybusio/kafkajs that referenced this issue Sep 7, 2022
The fetch manager rebalancing mechanism caused an infinite loop when
there were no brokers available, causing the consumer to never become
aware of any connection issues.

Fixes tulios#1384