
Kafka consumer (v1.4.2) gets stuck (NOT_COORDINATOR) whilst rejoining a group after a broker rolling update #2944

Closed
5 of 7 tasks
dimpavloff opened this issue Jun 27, 2020 · 19 comments · Fixed by #3238

Comments


dimpavloff commented Jun 27, 2020

Hi,

First off, my company is a long-time user of rdkafka, thanks for all the work!

tl;dr We've had consumers stall indefinitely after a rolling update of Kafka brokers, with "not coordinator" errors returned to JoinGroup requests. The issue is present in 1.4.0 and 1.4.2 but not in 1.2.0, and happens fairly often after a rollout. This seems similar to #2791, but that should have been fixed in 1.4.2 according to that thread. Further, we've tried not subscribing to rebalance events, without any difference. The issue may be related to autocommits. This also looks similar to #2933; however, there the reporter says the issue does not exist in 1.4.0 whilst for us it does, as far as we can tell, and there's no SESSTMOUT reported in their logs.

Description

Background:

We use the Go and Python bindings for rdkafka; we build these ourselves, including rdkafka, which we compile against musl. The Python and Go configurations are almost identical, with the main exception that in Go we use enable.auto.commit and make calls to StoreOffsets, whilst in Python we call consumer.commit() (in either case, the goal is at-least-once delivery to the business code).
I mention this because this issue seems never to have been detected in our Python consumers whilst it frequently happens to the Go ones (we do have more Go consumers, however). In Go, we periodically call Poll(), just like in Python.
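For context, here is a minimal sketch of the at-least-once pattern our Go consumers follow (the broker address, group and topic names are placeholders, and the real code has more error handling): offsets are only stored via StoreOffsets after the business code has handled the message, and the auto-commit timer then commits them.

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Auto-commit runs on its timer, but offsets only become eligible for
	// commit once we store them explicitly after processing.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "broker:9092",   // placeholder
		"group.id":                 "example.group", // placeholder
		"enable.auto.commit":       true,
		"enable.auto.offset.store": false,
		"auto.offset.reset":        "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.Subscribe("example.topic", nil); err != nil { // placeholder topic
		panic(err)
	}

	for {
		switch ev := c.Poll(100).(type) {
		case *kafka.Message:
			handle(ev) // business code runs first (at-least-once)
			next := ev.TopicPartition
			next.Offset++ // store the next offset to consume
			if _, err := c.StoreOffsets([]kafka.TopicPartition{next}); err != nil {
				fmt.Println("store offsets failed:", err)
			}
		case kafka.Error:
			fmt.Println("consumer error:", ev)
		}
	}
}

func handle(m *kafka.Message) { fmt.Printf("processed %s\n", m.TopicPartition) }
```

The Python consumers follow the same loop but call consumer.commit() explicitly instead of relying on the auto-commit timer.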

We perform rolling updates of our 3 Kafka brokers fairly regularly. The whole rollout takes about 12-13 minutes -- ~30s to shut down, ~30s to attach the persistent volume to the new instance (a Kubernetes pod), and a 180s grace period before moving on to the next one. (This process could be improved to be signal-based and to be mindful of the current controller :) ).

The Issue:

With rdkafka 1.2, we can see the brokers handing over the coordinator role and the clients rejoining the consumer groups -- sometimes there are temporary NOT_COORDINATOR responses, which can be expected during the rollout. Here is an example of the broker JoinGroup responses during a rolling update:

[Screenshot: broker JoinGroup responses during the rollout with rdkafka 1.2 clients (normal behaviour)]

Here's the same rollout but on an environment with rdkafka 1.4 clients:
[Screenshot: the same rollout with rdkafka 1.4.2 clients (stuck consumers)]

Note the "NOT_COORDINATOR" errors already present and how they persist long after the 3 successful JoinGroup response peaks (matching the 3 brokers getting replaced). These errors persist until the stuck consumers get restarted. The way we identify the stuck consumers is with a goroutine that reports the length of consumer.Assignment(): if that is 0 for a prolonged period of time, we flag the consumer (this is adjusted for other factors that could lead to a 0 assignment, of course; a sketch of this check is included at the end of this section). In the logs of these affected consumers, there's always a

%4|1593263797.864|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10488 ms without a successful response from the group coordinator (broker 0, last error was Broker: Not coordinator): revoking assignment and rejoining group

or a variation of this with "last error was Success".

The issue (which I assume starts with SESSTMOUT) seems to happen a bit after the last broker has been replaced. It seems that the NOT_COORDINATOR is always, or most of the time, returned by the first broker that was replaced during the rollout. To me, it looks like the consumer still thinks the coordinator is that broker, missing the fact that the last broker to be replaced has successfully rejoined the cluster and become the group coordinator in the meantime. However, this may just be heavily correlated due to the constant speed at which the rollout happens and how that interacts with some consumer timeout parameters.

Of course, SESSTMOUT alone isn't the issue as long as rdkafka recovers from it. Our rdkafka 1.4 Python consumers do seem to recover from it, which is why we've been suspicious of our Go code, confluent-kafka-go, and rdkafka. We've ruled out the first two and confirmed it's the latter by reverting to 1.2, where the issue disappeared.
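For reference, the stuck-consumer detection mentioned above is roughly the following sketch (watchAssignment, the broker address, group and topic names are placeholders; the real check also accounts for cases where an empty assignment is expected):

```go
package main

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// watchAssignment (hypothetical helper) logs when a consumer's assignment
// stays empty for longer than alertAfter.
func watchAssignment(c *kafka.Consumer, alertAfter time.Duration) {
	var zeroSince time.Time
	for range time.Tick(30 * time.Second) {
		parts, err := c.Assignment()
		if err != nil || len(parts) > 0 {
			zeroSince = time.Time{} // reset when we have partitions (or can't tell)
			continue
		}
		if zeroSince.IsZero() {
			zeroSince = time.Now()
		} else if time.Since(zeroSince) > alertAfter {
			log.Printf("consumer looks stuck: 0 assigned partitions for %s", time.Since(zeroSince))
		}
	}
}

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "broker:9092",   // placeholder
		"group.id":          "example.group", // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	if err := c.Subscribe("example.topic", nil); err != nil { // placeholder topic
		log.Fatal(err)
	}
	go watchAssignment(c, 10*time.Minute)
	for {
		c.Poll(100)
	}
}
```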

How to reproduce

Have an idle consumer (single consumer in a CG is enough), then trigger a rolling update.
What we have noticed is that the issue does not occur if this consumer does not have any offsets stored (i.e. all offsets are -1001); otherwise, the consumer can get stuck after a broker rolling update. This is why I am suspicious of autocommit with StoreOffsets versus explicitly calling commit().

We've ruled out the following:

  • given we weren't seeing the issue in Python, we considered the confluent-kafka-go wrapper. However, rolling it back to 1.1 (whilst still building it against rdkafka 1.4) did not make a difference
  • in the past we had observed some surprising behaviour with rebalance events, so we tried disabling go.application.rebalance.enable and either passing a nil callback or a rebalanceCb function when calling Go's consumer.Subscribe() (both variants are sketched after this list). In all cases, this did not make a difference
  • slowing down the broker rollout (from 180s to 240s pause) made no difference.
  • setting topic.metadata.refresh.interval.ms to something like 150s did not make a difference (as expected, as that shouldn't be necessary).
  • tested with Kafka 2.1 brokers -- no change (although we no longer saw "MEMBER_ID_REQUIRED" and "UNKNOWN_MEMBER_ID" errors returned -- I assume the protocol is now such that if we don't use static group membership, the brokers respond with this message, then rdkafka generates an ID and succeeds).
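For reference, the two Subscribe variants from the second bullet look roughly like this (a fragment; the package and function names are placeholders, and the Assign/Unassign handling follows the library's usual callback example):

```go
package example

import "github.com/confluentinc/confluent-kafka-go/kafka"

// subscribeWithCallback is the explicit-callback variant we tried; the other
// variant simply passes nil instead of the callback.
func subscribeWithCallback(c *kafka.Consumer, topic string) error {
	return c.Subscribe(topic, func(c *kafka.Consumer, ev kafka.Event) error {
		switch e := ev.(type) {
		case kafka.AssignedPartitions:
			return c.Assign(e.Partitions)
		case kafka.RevokedPartitions:
			return c.Unassign()
		}
		return nil
	})
}
```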

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 1.4.0, 1.4.2
  • Apache Kafka version: 2.4.1 (with log.message.format.version=2.1)
  • librdkafka client configuration:

Python (no issue observed):

'default.topic.config': {
    'auto.offset.reset': 'earliest'
},
'metadata.request.timeout.ms': 20000,
'enable.auto.commit': False,
'api.version.request': True,
'fetch.wait.max.ms': 100,
'log.connection.close': False,
'max.partition.fetch.bytes': MEBIBYTE * 5,
'statistics.interval.ms': 15000,
'queued.max.messages.kbytes': 1024 * 64,

Go (issue observed):

"auto.offset.reset":           "earliest",
"enable.auto.commit":          true,
"enable.auto.offset.store":    false,
"metadata.request.timeout.ms": 20000,
"auto.commit.interval.ms":     2000,
"log.connection.close":        false,
"max.poll.interval.ms":        300000,
"max.partition.fetch.bytes":   int(units.Mebibyte * 5),
"queued.max.messages.kbytes": 1024 * 64,
"go.application.rebalance.enable": true,

  • Operating system: Alpine Linux container, kernel 4.14.165
  • Provide logs (with debug=.. as necessary) from librdkafka:
    Using debug=consumer,cgrp,topic,metadata for a consumer that has stored offsets (via StoreOffsets), starting right before a rolling update starts.
    Below is an excerpt; the full gist (3.5k lines) is at https://gist.github.com/dimpavloff/f7c8ee9975eb96b1bfe89ec385d78178
%7|1592857801.772|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 11
%7|1592857802.697|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op GET_ASSIGNMENT (v0) in state up (join state started, v4 vs 0)
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [2]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [3]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [4]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [5]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [6]: stored offset 6, committed offset 6: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [7]: stored offset -1001, committed offset -1001: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [8]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [9]: stored offset 3, committed offset 3: not including in commit
%7|1592857802.734|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 10 partition(s): cgrp auto commit timer: returned: Local: No offset stored

...

%7|1592858026.776|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 11
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0: ===== Received metadata (for 1 requested topics): broker down =====
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0: ClusterId: 9GR7XMKHSVOWc2aa3DQDUQ, ControllerId: 0
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0: 3 brokers, 1 topics
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0:   Broker #0/3: 10.137.8.12:9094 NodeId 0
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0:   Broker #1/3: 10.137.70.154:9094 NodeId 2
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0:   Broker #2/3: 10.137.150.246:9094 NodeId 1
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]: 10.137.8.12:9094/0:   Topic #0/1: infrastructure.postmerge_tests.kafka.testconsumer with 10 partitions
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic infrastructure.postmerge_tests.kafka.testconsumer partition 0 Leader 2
%7|1592858026.815|TOPICUPD|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: migrating from broker 1 to 2 (leader is 2): leader updated
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: delegate to broker 10.137.70.154:9094/2 (rktp 0x261ee60, term 0, ref 5)
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: no longer delegated to broker 10.137.150.246:9094/1
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: delegating to broker 10.137.70.154:9094/2 for partition with 0 messages (0 bytes) queued
%7|1592858026.815|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic infrastructure.postmerge_tests.kafka.testconsumer [0] 0x261ee60 from 10.137.150.246:9094/1 to 10.137.70.154:9094/2 (sending PARTITION_LEAVE to 10.137.150.246:9094/1)
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic infrastructure.postmerge_tests.kafka.testconsumer partition 1 Leader 0
%7|1592858026.815|TOPICUPD|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: migrating from broker 1 to 0 (leader is 0): leader updated
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: delegate to broker 10.137.8.12:9094/0 (rktp 0x261f260, term 0, ref 5)
%7|1592858026.815|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.150.246:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [0] from fetch list (4 entries, opv 2): forced removal
%7|1592858026.815|TOPBRK|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.150.246:9094/1: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: leaving broker (0 messages in xmitq, next broker 10.137.70.154:9094/2, rktp 0x261ee60)
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: no longer delegated to broker 10.137.150.246:9094/1
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: delegating to broker 10.137.8.12:9094/0 for partition with 0 messages (0 bytes) queued
%7|1592858026.815|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic infrastructure.postmerge_tests.kafka.testconsumer [1] 0x261f260 from 10.137.150.246:9094/1 to 10.137.8.12:9094/0 (sending PARTITION_LEAVE to 10.137.150.246:9094/1)
%7|1592858026.815|TOPBRK|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: joining broker (rktp 0x261ee60, 0 message(s) queued)
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic infrastructure.postmerge_tests.kafka.testconsumer partition 2 Leader 0
%7|1592858026.815|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Added infrastructure.postmerge_tests.kafka.testconsumer [0] to fetch list (1 entries, opv 2, 0 messages queued): fetchable
%7|1592858026.815|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.150.246:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [1] from fetch list (3 entries, opv 2): forced removal
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic infrastructure.postmerge_tests.kafka.testconsumer partition 3 Leader 2
%7|1592858026.815|TOPBRK|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.150.246:9094/1: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: leaving broker (0 messages in xmitq, next broker 10.137.8.12:9094/0, rktp 0x261f260)
%7|1592858026.815|TOPICUPD|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [3]: migrating from broker 0 to 2 (leader is 2): leader updated
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: delegate to broker 10.137.70.154:9094/2 (rktp 0x2622400, term 0, ref 6)
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: no longer delegated to broker 10.137.8.12:9094/0
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: delegating to broker 10.137.70.154:9094/2 for partition with 0 messages (0 bytes) queued
%7|1592858026.815|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic infrastructure.postmerge_tests.kafka.testconsumer [3] 0x2622400 from 10.137.8.12:9094/0 to 10.137.70.154:9094/2 (sending PARTITION_LEAVE to 10.137.8.12:9094/0)
%7|1592858026.815|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic infrastructure.postmerge_tests.kafka.testconsumer partition 4 Leader 2
%7|1592858026.815|TOPICUPD|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [4]: migrating from broker 1 to 2 (leader is 2): leader updated
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: delegate to broker 10.137.70.154:9094/2 (rktp 0x2622800, term 0, ref 5)
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: no longer delegated to broker 10.137.150.246:9094/1
%7|1592858026.815|BRKDELGT|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: delegating to broker 10.137.70.154:9094/2 for partition with 0 messages (0 bytes) queued
%7|1592858026.815|TOPBRK|rdkafka#consumer-1| [thrd:10.137.13.66:9094/0]: 10.137.8.12:9094/0: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: joining broker (rktp 0x261f260, 0 message(s) queued)
%7|1592858026.815|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic infrastructure.postmerge_tests.kafka.testconsumer [4] 0x2622800 from 10.137.150.246:9094/1 to 10.137.70.154:9094/2 (sending PARTITION_LEAVE to 10.137.150.246:9094/1)
%7|1592858026.815|FETCHADD|rdkafka#consumer-1| [thrd:10.137.13.66:9094/0]: 10.137.8.12:9094/0: Removed infrastructure.postmerge_tests.kafka.testconsumer [3] from fetch list (4 entries, opv 2): forced removal

...
%7|1592858374.785|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 11
%7|1592858374.790|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" heartbeat error response in state up (join state started, 10 partition(s) assigned): Broker: Not coordinator
%7|1592858374.790|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed due to coordinator (10.137.12.3:9094/0) no longer available: Broker: Not coordinator: re-querying for coordinator
%7|1592858374.790|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858374.813|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%7|1592858377.697|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op GET_ASSIGNMENT (v0) in state up (join state started, v4 vs 0)
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [2]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [3]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [4]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [5]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [6]: stored offset 6, committed offset 6: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [7]: stored offset -1001, committed offset -1001: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [8]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [9]: stored offset 3, committed offset 3: not including in commit
%7|1592858377.747|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 10 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1592858377.785|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 11
%7|1592858377.786|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" heartbeat error response in state up (join state started, 10 partition(s) assigned): Broker: Not coordinator
%7|1592858377.786|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed due to coordinator (10.137.12.3:9094/0) no longer available: Broker: Not coordinator: re-querying for coordinator
%7|1592858377.786|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858377.854|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%4|1592858379.285|SESSTMOUT|rdkafka#consumer-1| [thrd:main]: Consumer group session timed out (in join-state started) after 10499 ms without a successful response from the group coordinator (broker 0, last error was Broker: Not coordinator): revoking assignment and rejoining group
%7|1592858379.286|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": updating member id "rdkafka-630bf2ad-18fb-4f69-bac7-37dd8ea27809" -> ""
%7|1592858379.286|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" is rebalancing in state up (join-state started) with assignment: Consumer group session timed out (in join-state started) after 10499 ms without a successful response from the group coordinator (broker 0, last error was Broker: Not coordinator)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Library pausing 10 partition(s)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [0] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [1] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [2] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [3] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [4] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [5] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [6] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [7] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [8] (v3)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v3
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [9] (v3)
%7|1592858379.286|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delegating revoke of 10 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: Consumer group session timed out (in join-state started) after 10499 ms without a successful response from the group coordinator (broker 0, last error was Broker: Not coordinator)
%7|1592858379.286|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state started -> wait-revoke-rebalance_cb (v4, state up)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [0]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [1]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [2]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [3]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [4]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [5]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [6]: at offset 6 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [7]: at offset 0 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [8]: at offset 3 (state active, v3)
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9] received op PAUSE (v3) in fetch-state active (opv2)
%7|1592858379.286|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause infrastructure.postmerge_tests.kafka.testconsumer [9]: at offset 3 (state active, v3)
%7|1592858379.286|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op ASSIGN (v0) in state up (join state wait-revoke-rebalance_cb, v4 vs 0)
%7|1592858379.286|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": rd_kafka_cgrp_assign:2572: new version barrier v5
%7|1592858379.286|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-revoke-rebalance_cb -> wait-unassign (v5, state up)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": rd_kafka_cgrp_unassign:2476: new version barrier v6
%7|1592858379.286|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": unassigning 10 partition(s) (v6)
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [0]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [1]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [2]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [3]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [4]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [5]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [6]: stored offset 6, committed offset 6: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [7]: stored offset -1001, committed offset -1001: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [8]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic infrastructure.postmerge_tests.kafka.testconsumer [9]: stored offset 3, committed offset 3: not including in commit
%7|1592858379.286|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 10 partition(s): unassign: returned: Local: No offset stored
%7|1592858379.286|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (0 wait_unassign, 10 assigned, 0 wait commit, join state wait-unassign): OffsetCommit done (__NO_OFFSET)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [0] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [0]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [1] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [1]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [2] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [2]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [3] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [3]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [4] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [4]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [5] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [5]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [6] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [6]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [7] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [7]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [8] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [8]
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9]: rd_kafka_toppar_op_fetch_stop:2334: new version barrier v4
%7|1592858379.286|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming infrastructure.postmerge_tests.kafka.testconsumer [9] (v4)
%7|1592858379.286|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic infrastructure.postmerge_tests.kafka.testconsumer [9]
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Library resuming 10 partition(s)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [0] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [1] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [2] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [3] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [4] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [5] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [6] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [7] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [8] (v5)
%7|1592858379.286|BARRIER|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9]: rd_kafka_toppar_op_pause_resume:2393: new version barrier v5
%7|1592858379.286|RESUME|rdkafka#consumer-1| [thrd:main]: Resume infrastructure.postmerge_tests.kafka.testconsumer [9] (v5)
%7|1592858379.286|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (10 wait_unassign, 10 assigned, 0 wait commit, join state wait-unassign): unassign
%7|1592858379.286|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": assigning 0 partition(s) in join state wait-unassign
%7|1592858379.286|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.286|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [0] in state active (v4)
%7|1592858379.286|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [0] changed fetch state active -> stopping
%7|1592858379.286|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [0] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [1] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [1] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [1] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [2] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [2] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [2] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [3] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [3] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [3] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [4] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [4] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [4] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [5] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [5] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [5] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [6] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [6] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [6] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [7] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [7] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [7] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [8] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [8] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [8] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1592858379.287|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for infrastructure.postmerge_tests.kafka.testconsumer [9] in state active (v4)
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [9] changed fetch state active -> stopping
%7|1592858379.287|STORETERM|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9]: offset store terminating
%7|1592858379.287|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition infrastructure.postmerge_tests.kafka.testconsumer [9] changed fetch state stopping -> stopped
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [0] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [0]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [1] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [1]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [2] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [2]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [3] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [3]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [4] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [4]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [5] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [5]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [6] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [6]: at offset 6 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [7] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [7]: at offset 0 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [8] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [8]: at offset 3 (state stopped, v5)
%7|1592858379.287|OP|rdkafka#consumer-1| [thrd:main]: infrastructure.postmerge_tests.kafka.testconsumer [9] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1592858379.287|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped infrastructure.postmerge_tests.kafka.testconsumer [9]: at offset 3 (state stopped, v5)
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [0]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [0]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [0]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (9 wait_unassign, 9 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [1]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [1]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [1]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (8 wait_unassign, 8 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [2]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [2]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [2]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (7 wait_unassign, 7 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [3]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [3]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [3]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (6 wait_unassign, 6 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [4]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [4]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [4]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (5 wait_unassign, 5 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [5]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [5]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [5]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (4 wait_unassign, 4 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [6]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [6]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [6]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (3 wait_unassign, 3 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [7]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [7]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [7]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (2 wait_unassign, 2 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [8]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [8]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [8]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [9]
%7|1592858379.287|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": delete infrastructure.postmerge_tests.kafka.testconsumer [9]
%7|1592858379.287|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v6) for infrastructure.postmerge_tests.kafka.testconsumer [9]
%7|1592858379.287|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
%7|1592858379.287|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-unassign -> init (v6, state up)
%7|1592858379.287|JOIN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": join with 1 (1) subscribed topic(s)
%7|1592858379.287|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (90024ms old)
%7|1592858379.287|JOIN|rdkafka#consumer-1| [thrd:main]: 10.137.12.3:9094/0: Joining group "infrastructure.postmerge_tests.kafka.testconsumer" with 1 subscribed topic(s)
%7|1592858379.287|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state init -> wait-join (v6, state up)
%7|1592858379.288|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId  (me), my MemberId , 0 members in group: Broker: Not coordinator
%7|1592858379.288|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-join -> init (v6, state up)
%7|1592858379.288|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state init, v6 vs 0)
%7|1592858379.288|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [0] from fetch list (5 entries, opv 2): not in active fetch state
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [3] from fetch list (4 entries, opv 2): not in active fetch state
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [6] from fetch list (3 entries, opv 2): not in active fetch state
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [9] from fetch list (2 entries, opv 2): not in active fetch state
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [2] from fetch list (1 entries, opv 2): not in active fetch state
%7|1592858379.288|FETCHADD|rdkafka#consumer-1| [thrd:10.137.91.181:9094/2]: 10.137.70.154:9094/2: Removed infrastructure.postmerge_tests.kafka.testconsumer [8] from fetch list (0 entries, opv 2): not in active fetch state
%7|1592858379.291|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.132.178:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [1] from fetch list (3 entries, opv 2): not in active fetch state
%7|1592858379.291|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.132.178:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [4] from fetch list (2 entries, opv 2): not in active fetch state
%7|1592858379.291|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.132.178:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [7] from fetch list (1 entries, opv 2): not in active fetch state
%7|1592858379.291|FETCHADD|rdkafka#consumer-1| [thrd:10.137.148.17:9094/1]: 10.137.132.178:9094/1: Removed infrastructure.postmerge_tests.kafka.testconsumer [5] from fetch list (0 entries, opv 2): not in active fetch state
%7|1592858379.323|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%7|1592858381.753|JOIN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": join with 1 (1) subscribed topic(s)
%7|1592858381.753|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (92490ms old)
%7|1592858381.753|JOIN|rdkafka#consumer-1| [thrd:main]: 10.137.12.3:9094/0: Joining group "infrastructure.postmerge_tests.kafka.testconsumer" with 1 subscribed topic(s)
%7|1592858381.753|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state init -> wait-join (v6, state up)
%7|1592858381.754|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId  (me), my MemberId , 0 members in group: Broker: Not coordinator
%7|1592858381.754|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-join -> init (v6, state up)
%7|1592858381.754|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state init, v6 vs 0)
%7|1592858381.754|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858381.755|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%7|1592858382.697|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op GET_ASSIGNMENT (v0) in state up (join state init, v6 vs 0)
%7|1592858382.747|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1592858382.747|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": unassign done in state up (join state init): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1592858383.753|JOIN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": join with 1 (1) subscribed topic(s)
%7|1592858383.753|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (94490ms old)
%7|1592858383.753|JOIN|rdkafka#consumer-1| [thrd:main]: 10.137.12.3:9094/0: Joining group "infrastructure.postmerge_tests.kafka.testconsumer" with 1 subscribed topic(s)
%7|1592858383.753|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state init -> wait-join (v6, state up)
%7|1592858383.754|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId  (me), my MemberId , 0 members in group: Broker: Not coordinator
%7|1592858383.754|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-join -> init (v6, state up)
%7|1592858383.754|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state init, v6 vs 0)
%7|1592858383.754|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858383.755|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.132.178:9094/1: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%7|1592858385.753|JOIN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": join with 1 (1) subscribed topic(s)
%7|1592858385.753|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (96490ms old)
%7|1592858385.753|JOIN|rdkafka#consumer-1| [thrd:main]: 10.137.12.3:9094/0: Joining group "infrastructure.postmerge_tests.kafka.testconsumer" with 1 subscribed topic(s)
%7|1592858385.753|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state init -> wait-join (v6, state up)
%7|1592858385.754|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId  (me), my MemberId , 0 members in group: Broker: Not coordinator
%7|1592858385.754|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-join -> init (v6, state up)
%7|1592858385.754|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state init, v6 vs 0)
%7|1592858385.754|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858385.755|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0
%7|1592858387.697|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op GET_ASSIGNMENT (v0) in state up (join state init, v6 vs 0)
%7|1592858387.747|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1592858387.747|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer": unassign done in state up (join state init): without new assignment: OffsetCommit done (__NO_OFFSET)

< stuck at this point already, last lines repeated many times > 

  • Provide broker log excerpts -- happy to provide these although I'm unclear what to look for
  • Critical issue: this was a major issue for us, but the revert reduced the urgency. It does seem quite important, but the lack of other reports makes me wonder if we are doing something incorrect
@edenhill
Copy link
Contributor

This is a fantastic bug report! 💯

@edenhill
Copy link
Contributor

%7|1592858603.758|JOIN|rdkafka#consumer-1| [thrd:main]: 10.137.12.3:9094/0: Joining group "infrastructure.postmerge_tests.kafka.testconsumer" with 1 subscribed topic(s)
%7|1592858603.758|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state init -> wait-join (v6, state up)
%7|1592858604.134|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId  (me), my MemberId , 0 members in group: Broker: Not coordinator
%7|1592858604.134|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed join state wait-join -> init (v6, state up)
%7|1592858604.134|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state init, v6 vs 0)
%7|1592858604.134|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator
%7|1592858604.154|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.137.70.154:9094/2: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.137.12.3:9094 id 0

Broker 0 responds to FindCoordinator requests by saying that broker 0 (itself) is the coordinator for the given group, but on JoinGroup it responds with Not coordinator.
This suggests that broker 0 has stale metadata or state information. Is there anything of interest in the broker 0 logs?
Or, is it possible that the client is connected to an old broker instance that is still running despite being replaced with a newer instance with the same broker id?

It would be useful to have this reproduced with debug=all to see exactly which requests are being sent to which broker.
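
For reference, a minimal debug=all consumer along the lines of the sketch below would capture those traces. This is only an illustration — it assumes the confluent-kafka-go binding, and the broker address, group and topic names are placeholders taken from the logs above, not the reporter's actual setup.

package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // "debug": "all" makes librdkafka emit the verbose CGRP/JOINGROUP/HEARTBEAT
    // traces quoted in this thread. Broker, group and topic names are placeholders.
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka:9092",
        "group.id":          "infrastructure.postmerge_tests.kafka.testconsumer",
        "debug":             "all",
    })
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    defer c.Close()

    if err := c.SubscribeTopics([]string{"infrastructure"}, nil); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }

    for {
        switch e := c.Poll(1000).(type) {
        case *kafka.Message:
            fmt.Printf("message at %v\n", e.TopicPartition)
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "error event: %v\n", e)
        }
    }
}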

@gridaphobe
Copy link
Contributor

The other interesting difference between this issue and #2933 is that this one seems to rely on the auto-commit, whereas I couldn't reproduce #2933 with auto-commit enabled.

@dimpavloff
Copy link
Author

dimpavloff commented Jun 29, 2020

Thanks for the quick response! I have been trying all day to reproduce this with the same binary (same artifact) with debug=all and only managed to force it once (more on that later). I should have mentioned earlier that we have 150-170 CGs per environment and only 1 or 2 would get stuck during a rolling update. Unfortunately it is tricky for me to create that many consumers with debug=all.

Or, is it possible that the client is connected to an old broker instance that is still running despite being replaced with a newer instance with the same broker id?

This shouldn't be possible (apart from a bug in Kafka itself) because of how the rollout is done: a broker instance (a kubernetes pod) is terminated. Only then a new one is created, with a new IP and the same persistent volume attached to the new pod.

However, whilst the IPs will differ, the hostname and FQDN will be the same -- is it possible that there's some client-side caching / temporary blacklisting that confuses rdkafka? Although that should be fine, as broker.address.ttl is the default 1 second. The advertised listeners on 9092 and 9094 are IP-based, but on our usual listener (which I can't use in my test binary) they are FQDN-based. So far I have ruled this out because I was able to get that one consumer stuck on :9092 and :9094, but please let me know if I've missed something.
Furthermore, the bootstrap server we provide is kafka, which resolves to multiple CNAMEs, one per broker -- would this be a concern?

This suggests that broker 0 has stale metadata or state information,

But would this be the case over many hours, though? We monitor the brokers' health and there are no issues after the rollout. The JoinGroup error starts happening whilst there are still under-replicated partitions (replication factor 3, min.insync.replicas=2), which I assume includes __consumer_offsets, but those catch up within a minute or two after the last broker has been booted.
Furthermore, a restart of the affected consumer immediately fixes the issue, which shouldn't be the case if the problem were the broker holding incorrect metadata?

As I said, I managed to reproduce it once. However, this actually happened with librdkafka 1.2.1! I think it's possible that over the half year we've used 1.2.1 we've had a small handful of occurrences where this could have been the root cause without us noticing; however, this is nowhere near as frequent as it is with 1.4.2 -- which we observed more or less daily for 1-2 consumers. So perhaps something changed in newer versions to make this more probable.

Also, what I've captured, if I'm reading it correctly, is whilst the consumer is in state "up"; the originally stuck consumer seems to be in "join-state started". Do you think this could result in NOT_COORDINATOR errors during heartbeats / commits? During my experiments, I have now started seeing the 1.2.1 environments report such errors (but no JoinGroup request errors in those environments).

Here's a breakdown of the rollout which I captured (for some irrelevant reasons, the rollout in this env was slower today ..)
kafka-2: TERM at 19:30:00, 19:37:00 ready (accepting requests; there may still be some replicas remaining that are not in sync)
kafka-1: TERM at 19:35:23, 19:42:35 ready
kafka-0: TERM at 19:41:50, 19:48:44 ready

The only interesting bits I've managed to find in the sea of broker logs are around broker-2 handing over the coordinator role and, soon after, the newly recovered broker-0 reporting that the CG is empty. For the real consumer groups which we've detected as stuck, I haven't been able to confirm or rule out whether the below is always emitted, but I have seen it; also, in some cases I have seen the group declared dead, too.

Broker logs:

broker-0
20591:[2020-06-29 19:49:04,211] INFO [GroupCoordinator 0]: Loading group metadata for infrastructure.postmerge_tests.kafka.testconsumer with generation 56 (kafka.coordinator.group.GroupCoordinator)
20592:[2020-06-29 19:49:04,211] INFO [GroupCoordinator 0]: Loading group metadata for infrastructure.postmerge_tests.kafka.testconsumer with generation 56 (kafka.coordinator.group.GroupCoordinator)
21087:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Member rdkafka-f1ae5a77-5639-413d-a06e-6c2a1ed9ec85 in group infrastructure.postmerge_tests.kafka.testconsumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
21088:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Member rdkafka-f1ae5a77-5639-413d-a06e-6c2a1ed9ec85 in group infrastructure.postmerge_tests.kafka.testconsumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
21089:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Preparing to rebalance group infrastructure.postmerge_tests.kafka.testconsumer in state PreparingRebalance with old generation 56 (__consumer_offsets-45) (reason: removing member rdkafka-f1ae5a77-5639-413d-a06e-6c2a1ed9ec85 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
21090:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Preparing to rebalance group infrastructure.postmerge_tests.kafka.testconsumer in state PreparingRebalance with old generation 56 (__consumer_offsets-45) (reason: removing member rdkafka-f1ae5a77-5639-413d-a06e-6c2a1ed9ec85 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
21091:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Group infrastructure.postmerge_tests.kafka.testconsumer with generation 57 is now empty (__consumer_offsets-45) (kafka.coordinator.group.GroupCoordinator)
21092:[2020-06-29 19:49:14,212] INFO [GroupCoordinator 0]: Group infrastructure.postmerge_tests.kafka.testconsumer with generation 57 is now empty (__consumer_offsets-45) (kafka.coordinator.group.GroupCoordinator)

broker-2
2474:[2020-06-29 19:43:26,491] INFO [GroupCoordinator 2]: Unloading group metadata for infrastructure.postmerge_tests.kafka.testconsumer.1 with generation 56 (kafka.coordinator.group.GroupCoordinator)
2475:[2020-06-29 19:43:26,491] INFO [GroupCoordinator 2]: Unloading group metadata for infrastructure.postmerge_tests.kafka.testconsumer.1 with generation 56 (kafka.coordinator.group.GroupCoordinator)
26572:[2020-06-29 19:48:43,098] INFO [GroupCoordinator 2]: Unloading group metadata for infrastructure.postmerge_tests.kafka.testconsumer with generation 56 (kafka.coordinator.group.GroupCoordinator)
26573:[2020-06-29 19:48:43,098] INFO [GroupCoordinator 2]: Unloading group metadata for infrastructure.postmerge_tests.kafka.testconsumer with generation 56 (kafka.coordinator.group.GroupCoordinator)


some consumer logs with debug=all

%7|1593460123.705|SEND|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Sent FetchRequest (v4, 100 bytes @ 0, CorrId 3103)
%7|1593460123.705|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Fetch 2/2/2 toppar(s)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Topic infrastructure [9] MessageSet size 0, error "Success", MaxOffset 8, LSO 8, Ver 2/2
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Topic infrastructure [5] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Topic infrastructure [8] MessageSet size 0, error "Success", MaxOffset 5, LSO 5, Ver 2/2
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Topic infrastructure [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch topic infrastructure [2] at offset 1 (v2)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Topic infrastructure [3] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch topic infrastructure [5] at offset 1 (v2)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch topic infrastructure [8] at offset 5 (v2)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch topic infrastructure [3] at offset 1 (v2)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch 5/5/5 toppar(s)
%7|1593460123.720|FETCH|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Fetch topic infrastructure [9] at offset 8 (v2)
%7|1593460123.720|SEND|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Sent FetchRequest (v4, 148 bytes @ 0, CorrId 5370)
%7|1593460123.746|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 56
%7|1593460123.746|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v0, 128 bytes @ 0, CorrId 194)
%7|1593460123.747|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v0, 2 bytes, CorrId 194, rtt 0.93ms)
%7|1593460123.747|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" heartbeat error response in state up (join state started, 10 partition(s) assigned): Broker: Not coordinator for group
%7|1593460123.747|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed due to coordinator (10.136.22.113:9092/0) no longer available: Broker: Not coordinator for group: re-querying for coordinator
%7|1593460123.747|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.136.22.113:9092/0: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: intervaled in state up
%7|1593460123.747|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: HeartbeatRequest failed: Broker: Not coordinator for group: actions Refresh
%7|1593460123.747|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: kafka:9092/bootstrap: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator for group
%7|1593460123.747|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)
%7|1593460123.747|SEND|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 1039)
%7|1593460123.748|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: kafka:9092/bootstrap: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.136.22.113:9092 id 0
%7|1593460123.753|RECV|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Received FetchResponse (v4, 124 bytes, CorrId 1038, rtt 118.54ms)
%7|1593460123.747|SEND|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 101)
%7|1593460123.748|RECV|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Received GroupCoordinatorResponse (v0, 25 bytes, CorrId 101, rtt 1.10ms)
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Topic infrastructure [7] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Topic infrastructure [1] MessageSet size 0, error "Success", MaxOffset 5, LSO 5, Ver 2/2
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Topic infrastructure [4] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Fetch topic infrastructure [1] at offset 5 (v2)
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Fetch topic infrastructure [4] at offset 1 (v2)
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Fetch topic infrastructure [7] at offset 0 (v2)
%7|1593460123.753|SEND|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Sent FetchRequest (v4, 116 bytes @ 0, CorrId 1040)
%7|1593460123.753|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Fetch 3/3/3 toppar(s)
%7|1593460123.754|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changing coordinator 0 -> 2
%7|1593460123.753|RECV|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Received GroupCoordinatorResponse (v0, 25 bytes, CorrId 1039, rtt 6.47ms)
%7|1593460123.754|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.136.22.113:9092/0: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.136.28.148:9092 id 2
%7|1593460123.754|COORDCLEAR|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" broker 10.136.22.113:9092/0 is no longer coordinator
%7|1593460123.754|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Broker nodename changed from "10.136.4.14:9092" to ""
%7|1593460123.754|COORDSET|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator set to broker 10.136.28.148:9092/2
%7|1593460123.754|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Resource temporarily unavailable)
%7|1593460123.754|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" changed state up -> wait-broker-transport (v4, join-state started)
%7|1593460123.754|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "10.136.28.148:9092"
%7|1593460123.754|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 455923ms in state UP)
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1593460123.754|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state UP -> DOWN
%7|1593460123.754|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Purging bufq with 0 buffers
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1593460123.754|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Purging bufq with 0 buffers
%7|1593460123.754|METADATA|rdkafka#consumer-1| [thrd:GroupCoordinator]: 10.136.156.76:9092/1: Request metadata for 1 topic(s): broker down
%7|1593460123.754|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Updating 0 buffers on connection reset
%7|1593460123.754|METADATA|rdkafka#consumer-1| [thrd:GroupCoordinator]: Requesting metadata for 1/1 topics: broker down
%7|1593460123.754|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.136.28.148:9092/2: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: intervaled in state wait-broker-transport
%7|1593460123.754|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1593460123.754|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1593460123.754|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1593460123.754|SEND|rdkafka#consumer-1| [thrd:10.136.28.148:9092/2]: 10.136.28.148:9092/2: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 5371)
%7|1593460123.754|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: broker in state TRY_CONNECT connecting
%7|1593460123.754|SEND|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Sent MetadataRequest (v2, 47 bytes @ 0, CorrId 3104)
%7|1593460123.754|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Connecting to ipv4#10.136.28.148:9092 (plaintext) with socket 17
%7|1593460123.754|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Connected to ipv4#10.136.28.148:9092
%7|1593460123.754|CONNECTED|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Connected (#4)
%7|1593460123.756|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Received ApiVersionResponse (v0, 294 bytes, CorrId 195, rtt 1.95ms)
%7|1593460123.754|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 195)
%7|1593460123.754|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1593460123.754|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change

...

%7|1593460132.746|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Heartbeat for group "infrastructure.postmerge_tests.kafka.testconsumer" generation id 56
%7|1593460132.747|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v0, 128 bytes @ 0, CorrId 199)
%7|1593460132.748|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" heartbeat error response in state up (join state started, 10 partition(s) assigned): Broker: Not coordinator for group
%7|1593460132.748|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: HeartbeatRequest failed: Broker: Not coordinator for group: actions Refresh
%7|1593460132.748|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v0, 2 bytes, CorrId 199, rtt 1.05ms)
%7|1593460132.748|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)
%7|1593460132.748|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.136.156.76:9092/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator for group
%7|1593460132.748|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed due to coordinator (10.136.22.113:9092/0) no longer available: Broker: Not coordinator for group: re-querying for coordinator
%7|1593460132.748|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.136.22.113:9092/0: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: intervaled in state up
%7|1593460132.748|SEND|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 1127)
%7|1593460132.748|SEND|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 3194)
%7|1593460132.794|RECV|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Received FetchResponse (v4, 94 bytes, CorrId 3193, rtt 100.27ms)
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Topic infrastructure [0] MessageSet size 0, error "Success", MaxOffset 3, LSO 3, Ver 2/2
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Fetch topic infrastructure [6] at offset 10 (v2)
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Fetch topic infrastructure [0] at offset 3 (v2)
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Fetch 2/2/2 toppar(s)
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Topic infrastructure [6] MessageSet size 0, error "Success", MaxOffset 10, LSO 10, Ver 2/2
%7|1593460132.794|RECV|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Received GroupCoordinatorResponse (v0, 25 bytes, CorrId 3194, rtt 46.29ms)
%7|1593460132.794|SEND|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Sent FetchRequest (v4, 100 bytes @ 0, CorrId 3195)
%7|1593460132.794|FETCH|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Topic infrastructure [4] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1593460132.794|RECV|rdkafka#consumer-1| [thrd:10.136.4.14:9092/0]: 10.136.22.113:9092/0: Received FetchResponse (v4, 124 bytes, CorrId 1126, rtt 101.32ms)
%7|1593460132.794|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: 10.136.156.76:9092/1: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.136.22.113:9092 id 0
%7|1593460135.747|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: HeartbeatRequest failed: Broker: Not coordinator for group: actions Refresh
%7|1593460135.748|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" received op COORD_QUERY (v0) in state up (join state started, v4 vs 0)
%7|1593460135.747|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Heartbeat failed due to coordinator (10.136.22.113:9092/0) no longer available: Broker: Not coordinator for group: re-querying for coordinator
%7|1593460135.747|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: 10.136.156.76:9092/1: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: intervaled in state up
%7|1593460135.747|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "infrastructure.postmerge_tests.kafka.testconsumer" heartbeat error response in state up (join state started, 10 partition(s) assigned): Broker: Not coordinator for group
%7|1593460135.748|SEND|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 105)
%7|1593460135.748|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: kafka:9092/bootstrap: Group "infrastructure.postmerge_tests.kafka.testconsumer": querying for coordinator: Broker: Not coordinator for group
%7|1593460135.748|SEND|rdkafka#consumer-1| [thrd:10.136.156.76:9092/1]: 10.136.156.76:9092/1: Sent GroupCoordinatorRequest (v0, 78 bytes @ 0, CorrId 3225)
%7|1593460135.749|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: kafka:9092/bootstrap: Group "infrastructure.postmerge_tests.kafka.testconsumer" coordinator is 10.136.22.113:9092 id 0
%7|1593460135.749|RECV|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Received GroupCoordinatorResponse (v0, 25 bytes, CorrId 105, rtt 1.05ms)

full consumer logs (577k lines):
stuck-consumer-debug-all.zip

Hopefully this helps.

@dimpavloff
Copy link
Author

Hi @edenhill , is the above useful or do you need more information?

Thanks!

@edenhill
Copy link
Contributor

However, whilst the IPs will differ, the hostname and FQDN will be the same -- is it possible that there's some client-side caching / temporary blacklisting that confuses rdkafka? although that should be fine as the broker.address.ttl is the default 1 second.

If the broker id changes but the address:port remains the same then librdkafka will associate that address:port with the old broker id until metadata is refreshed (up to topic.metadata.refresh.interval.ms), so this definitely sounds like it could be the issue here.
There is no way within a Kafka protocol connection to identify a broker's id; it is only done through address:port->brokerId mappings in Metadata responses.

@dimpavloff
Copy link
Author

If the broker id changes but the address:port remains the same then librdkafka will associate that address:port with the old broker id until metadata is refreshed (up to topic.metadata.refresh.interval.ms), so this definitely sounds like it could be the issue here.

The broker ID doesn't change AFAIK as the new instance reuses the storage volume of the old instance and therefore adopts that old ID.
It's the IP which changes.

@edenhill
Copy link
Contributor

Does this mean that the old IP might be reused for a broker with a different broker id?

@dimpavloff
Copy link
Author

Does this mean that the old IP might be reused for a broker with a different broker id?

It's definitely possible but I think unlikely as the IP CIDR blocks are large; from looking at the last 10+ rollouts, this hasn't happened. I'll keep an eye on that and report if it correlates, however.

@ajbarb
Copy link
Contributor

ajbarb commented Dec 7, 2020

I am hitting the same issue with 1.4.2 and it's happening only in our prod environment. It's been difficult to collect logs at debug level since we have too many client instances running. @dimpavloff, is this issue still repro'ing for you? Did you add any resiliency on the client side?

@edenhill, any suggestions here?

@MaximGurschi
Copy link

@ajbarb Not sure about 1.5.2, but in 1.4.2, yes, I was getting the issue. I implement a reconnect when this is detected.

@ajbarb
Copy link
Contributor

ajbarb commented Dec 7, 2020

@ajbarb Not sure about 1.5.2, but in 1.4.2, yes, I was getting the issue. I implement a reconnect when this is detected.

How did you detect this issue on the client side? Do you restart the client or just reconnect the broker connections under the hood?

@MaximGurschi
Copy link

@ajbarb Similar to #2630. Check the error code and, if it keeps happening, cycle the connection: close the old connection and then open a new one. The process itself is not restarted.
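
A rough sketch of that cycle-the-connection approach (an illustration, not the implementation referenced above), assuming the confluent-kafka-go binding. newConsumer, handle and maxNotCoordinator are hypothetical helpers and parameters, and whether the NOT_COORDINATOR error actually surfaces to the application as an error event is part of what this thread is trying to establish, so treat the trigger condition as an example only.

package kafkautil

import (
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// consumeWithRecycle keeps polling and, if NOT_COORDINATOR errors keep coming
// back for longer than maxNotCoordinator, closes the consumer and builds a new
// one instead of restarting the whole process. newConsumer and handle are
// hypothetical helpers supplied by the caller.
func consumeWithRecycle(newConsumer func() (*kafka.Consumer, error),
    handle func(*kafka.Message), maxNotCoordinator time.Duration) error {
    for {
        c, err := newConsumer()
        if err != nil {
            return err
        }

        var firstSeen time.Time // start of the current NOT_COORDINATOR streak
        recycle := false

        for !recycle {
            switch e := c.Poll(1000).(type) {
            case *kafka.Message:
                handle(e)
                firstSeen = time.Time{} // got data; reset the error streak
            case kafka.Error:
                if e.Code() != kafka.ErrNotCoordinatorForGroup {
                    continue
                }
                if firstSeen.IsZero() {
                    firstSeen = time.Now()
                } else if time.Since(firstSeen) > maxNotCoordinator {
                    recycle = true // stuck well past the rolling update
                }
            }
        }

        // Cycle the connection: close the old consumer and loop to create a new one.
        c.Close()
    }
}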

@koushikchitta
Copy link

seems to be related to https://issues.apache.org/jira/browse/KAFKA-9752

@ajbarb
Copy link
Contributor

ajbarb commented Jan 15, 2021

I have been able to confirm that the client actually connects to the wrong broker IP when the consumer group coordinator switch is happening, because the cached broker address still points to the earlier broker instance.

Broker/ip
1685982883/ 100.126.14.175
1685982916/ 100.126.61.68

12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:main]: GroupCoordinator/1685982883: Broker nodename changed from "100.126.14.175:9092" to ""
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "100.126.61.68:9092"
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: Closing connection due to nodename change (after 0ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: Delaying next reconnect by 638ms
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: broker in state TRY_CONNECT connecting
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: Connecting to ipv4#100.126.14.175:9092 (plaintext) with socket 11864
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: Connected to ipv4#100.126.14.175:9092
12-25-2020 13:32:48[ProcessId: 63772] Level: Debug, Message: [thrd:GroupCoordinator]: GroupCoordinator/1685982916: Connected (#15)

I was able to repro this locally with these steps:

  • Created a Kafka instance with 3 brokers (default.replication.factor=3 offsets.topic.replication.factor=3)
  • Started a test producer producing messages in a loop
  • Created a test consumer with broker.address.ttl config set to 80000ms to keep the address cached for long enough
  • Started a test consumer and set a breakpoint on the code where the broker address ttl expiry check is happening. This is to ensure that the client process is blocked till the current consumer group coordinator broker (which will be restarted) is back up
  • Brought down the current group coordinator
  • Observed the group coordinator change to another broker
  • Hit the breakpoint and started the broker back up
  • Let the breakpoint go and I saw the JoinGroupRequests failing with broker not coordinator error.

The problem seems to be in the code below (rdkafka_broker.c): if the client observes multiple group coordinator switches within broker.address.ttl, it reuses the cached address:

  if (rkb->rkb_rsal &&
        rkb->rkb_ts_rsal_last + (rkb->rkb_rk->rk_conf.broker_addr_ttl*1000)
        < rd_clock()) {
            /* Address list has expired. */

            /* Save the address index to make sure we still round-robin
             * if we get the same address list back */
            save_idx = rkb->rkb_rsal->rsal_curr;

            rd_sockaddr_list_destroy(rkb->rkb_rsal);
            rkb->rkb_rsal = NULL;
    }

    if (!rkb->rkb_rsal) {
            /* Resolve */

@edenhill, what would be the right fix here?

@dimpavloff
Copy link
Author

dimpavloff commented Jan 26, 2021

@ajbarb , looks like a very nice investigation, thanks for this!
I was sceptical that this would be exactly the issue in our env because we use the default value of 1 second for broker.address.ttl. And yet, it seems that for the 4 days I've had this changed to 0 in our dev environment, we have not seen any NOT_COORDINATOR errors after a broker rolling update has completed! I'll continue to monitor and report if the issue re-occurs but so far this is looking like a promising workaround.

One thing that slightly worries me is that setting the value to 0 removes the dampening on repeated DNS requests during downtime. I wonder if this would put a lot more pressure on the DNS servers if neither the brokers nor the bootstrap server can be resolved. I assume reconnect.backoff.ms and reconnect.backoff.max.ms would help avoid this (i.e. they apply even before we have a resolved IP), but it would be good if @edenhill could confirm these parameters are relevant.
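
As an illustration (not the exact config we run), the workaround amounts to something like the sketch below with the confluent-kafka-go binding; bootstrap address and group name are placeholders, and the backoff values are just librdkafka's documented defaults spelled out.

package kafkautil

import "github.com/confluentinc/confluent-kafka-go/kafka"

// newWorkaroundConsumer builds a consumer with broker address caching disabled,
// as discussed above. Bootstrap address and group name are placeholders; the
// backoff values are librdkafka's documented defaults, spelled out for clarity.
func newWorkaroundConsumer() (*kafka.Consumer, error) {
    return kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka:9092",
        "group.id":          "infrastructure.postmerge_tests.kafka.testconsumer",
        // Re-resolve broker hostnames instead of reusing a cached address that
        // may point at a replaced pod's old IP.
        "broker.address.ttl": 0,
        // Backoff still dampens reconnect (and hence DNS) attempts while
        // brokers are unreachable.
        "reconnect.backoff.ms":     100,
        "reconnect.backoff.max.ms": 10000,
    })
}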

Btw, here's a reminder of the env we have, hopefully useful for debugging:

  • Kafka brokers on k8s. The bootstrap server is a K8s Service which resolves to all brokers' IPs. The brokers themselves have hostnames and it's these that they advertise (as we use SSL). The hostnames resolve to the broker's current IP. In both cases, the DNS records have a 5-second TTL. When the broker (pod) gets shut down, the records will temporarily not resolve. When a new broker pod is created (even if there's no connectivity to it yet), the DNS records (both the K8s Service and the pod's hostname) get updated almost immediately.
  • we compile rdkafka 1.2 ourselves against musl

@edenhill
Copy link
Contributor

I assume reconnect.backoff.ms and reconnect.backoff.max.ms would help avoid this (i.e they aren't used only after we have an IP)

Yep

@BEagle1984
Copy link

BEagle1984 commented Jan 27, 2021

We are experiencing what looks to be the same issue described here.
We are using librdkafka 1.5.3 through https://github.com/confluentinc/confluent-kafka-dotnet (1.5.3). It is fairly easy to reproduce the issue consistently:

  • Connect a consumer with default settings to a local kafka broker running in docker (zookeeper + a broker)
  • Stop the broker container and restart it
  • A warning like Consumer group session timed out (in join-state started) after xxxxx ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group is logged (no error is caught through the error callback; only this warning is forwarded to the log callback)
  • The consumer is stuck and you can't even disconnect it without killing the process

I made some tests with v1.6.0 and with a downgrade to v1.4.0, observing the exact same behavior.

What seems to solve the issue for us is setting session.timeout.ms to a higher value (e.g. 20000).

@edenhill do you think this is the same issue? Is the session.timeout.ms workaround legit, or do we have to expect the error to randomly pop up again sooner or later?

EDIT:
Setting session.timeout.ms to a higher value mitigates the issue, but the consumer needs to be able to reconnect within the defined timeout to avoid it, which doesn't seem optimal.

@edenhill: Is this the only possible workaround? Is it a bad idea to set session.timeout.ms to a value that's large enough to accommodate the rolling update?

@mhowlett: FYI

EDIT2: Never mind, it turned out to be due to some race conditions on our side, in the partitions-revoked / assigned callbacks.

@mikeatlas
Copy link

mikeatlas commented Sep 15, 2022

I hit a similar error-loop situation where I'd see these debug logs for the consumer:

Rejoining group without an assignment: JoinGroup error: Broker: Not coordinator

%7|1663256004.711|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "mygroup1": Rejoining group without an assignment: JoinGroup error: Broker: Not coordinator
%7|1663256004.711|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "mygroup1" changed join state wait-join -> init (state up)
%7|1663256004.711|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "mygroup1" received op COORD_QUERY in state up (join-state init)
%7|1663256004.711|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:9096/1: Group "mygroup1": querying for coordinator: Broker: Not coordinator
%7|1663256004.711|SEND|rdkafka#consumer-1| [thrd:sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:]: sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:9096/1: Sent FindCoordinatorRequest (v2, 53 bytes @ 0, CorrId 37)
%7|1663256004.712|RECV|rdkafka#consumer-1| [thrd:sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:]: sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:9096/1: Received FindCoordinatorResponse (v2, 73 bytes, CorrId 37, rtt 0.56ms)
%7|1663256004.712|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:9096/1: Group "mygroup1" coordinator is b-1.msk.123.4566.kafka.us-east-1.amazonaws.com:9096 id 1

It might be apparent these are AWS MSK brokers. What's not obvious is that the MSK cluster lives in a different VPC and I'm connecting using private DNS resolution for those hostnames to resolve VPC Private Links that are exposed to my consumer. A blog post called Secure connectivity patterns to access Amazon MSK across AWS Regions describes the setup.

My error situation was a mistake on my part: the Route53 private domain aliases for the bootstrap servers were wrong for all 3 VPC endpoint DNS routes. As a result, some connectivity appeared to be fine while joining a group would fail in a loop; the reason is that the DNS aliases in my case were actually pointing to the wrong destination advertised by the broker for the group coordinator. As such, the client would connect to b-1... just fine, and get a message from the broker to switch to b-1 (but how could that be ❓❓❓).

The problem melted away when I straightened out the DNS aliases to match the broker addresses' VPC endpoints, as one might expect. I'm sharing this on this thread since the hints about DNS and IP changes caused me to triple-check my endpoint configurations, and indeed the issue was that the aliases were mixed up (b-1 was resolving to b-3, etc.).
