From df72ac501551b1a874a99dbdfa10d41a848da9b3 Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Wed, 22 May 2019 13:36:17 +0200 Subject: [PATCH] add :uris as a valid option for ConsumerGroup --- README.md | 1 + lib/kafka_ex/consumer_group.ex | 2 ++ lib/kafka_ex/server_0_p_10_and_later.ex | 2 +- test/integration/server0_p_10_and_later_test.exs | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f6b2a149..1c575ac1 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ KakfaEx supports the following Kafka features: * Message Compression with Snappy and gzip * Offset Management (fetch / commit / autocommit) * Consumer Groups +* Topics Management (create / delete) See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and [A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index dd1909a4..41f69368 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -94,6 +94,7 @@ defmodule KafkaEx.ConsumerGroup do * `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters * `:partition_assignment_callback` - See `t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0` + * `:uris` - See `KafkaEx.create_worker/2` Note `:session_timeout` is registered with the broker and determines how long before the broker will de-register a consumer from which it has not heard a @@ -111,6 +112,7 @@ defmodule KafkaEx.ConsumerGroup do | {:name, Supervisor.name()} | {:max_restarts, non_neg_integer} | {:max_seconds, non_neg_integer} + | {:uris, KafkaEx.uri()} @type options :: [option] diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 26f16a48..0e6ccbe2 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -99,7 +99,7 @@ defmodule KafkaEx.Server0P10AndLater do %KafkaEx.Protocol.ApiVersions.Response{ api_versions: api_versions, error_code: error_code - }, state} = kafka_api_versions(%State{brokers: brokers}) + }, state} = kafka_server_api_versions(%State{brokers: brokers}) if error_code == :no_response do sleep_for_reconnect() raise "Brokers sockets are closed" diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index 8ef00881..b8e32737 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -20,7 +20,7 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do resp = create_topic(name, config) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - Enum.member?(existing_topics(), name) + assert Enum.member?(existing_topics(), name) assert @num_partitions == KafkaEx.Protocol.Metadata.Response.partitions_for_topic(