From 65275ab8112364a5933699b0c40a7f7f1510f030 Mon Sep 17 00:00:00 2001 From: Michael Parrott Date: Wed, 18 Mar 2020 14:43:26 -0400 Subject: [PATCH 1/2] Increment attempt number instead of decrement when joining a consumer group --- lib/kafka_ex/consumer_group/manager.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 1a2d576d..dff119c6 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -292,7 +292,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do ) :timer.sleep(1000) - join(state, attempt_number - 1) + join(state, attempt_number + 1) %{error_code: error_code} -> raise "Error joining consumer group #{group_name}: " <> From 4444c9651676e58afa19fccef47b293b00f13963 Mon Sep 17 00:00:00 2001 From: Michael Parrott Date: Mon, 6 Apr 2020 18:22:29 -0400 Subject: [PATCH 2/2] Be more defensive when checking for consumer group retry count --- lib/kafka_ex/consumer_group/manager.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index dff119c6..505f6b1e 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -281,7 +281,7 @@ defmodule KafkaEx.ConsumerGroup.Manager do on_successful_join(state, join_response) {:error, :no_broker} -> - if attempt_number == @max_join_retries do + if attempt_number >= @max_join_retries do raise "Unable to join consumer group #{state.group_name} after " <> "#{@max_join_retries} attempts" end