Skip to content

Commit

Permalink
add :uris as a valid option for ConsumerGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
jbruggem committed Jun 3, 2019
1 parent e1e4872 commit df72ac5
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ KakfaEx supports the following Kafka features:
* Message Compression with Snappy and gzip
* Offset Management (fetch / commit / autocommit)
* Consumer Groups
* Topics Management (create / delete)

See [Kafka Protocol Documentation](http://kafka.apache.org/protocol.html) and
[A Guide to the Kafka Protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ defmodule KafkaEx.ConsumerGroup do
* `:max_restarts`, `:max_seconds` - Supervisor restart policy parameters
* `:partition_assignment_callback` - See
`t:KafkaEx.ConsumerGroup.PartitionAssignment.callback/0`
* `:uris` - See `KafkaEx.create_worker/2`
Note `:session_timeout` is registered with the broker and determines how long
before the broker will de-register a consumer from which it has not heard a
Expand All @@ -111,6 +112,7 @@ defmodule KafkaEx.ConsumerGroup do
| {:name, Supervisor.name()}
| {:max_restarts, non_neg_integer}
| {:max_seconds, non_neg_integer}
| {:uris, KafkaEx.uri()}

@type options :: [option]

Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule KafkaEx.Server0P10AndLater do
%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_api_versions(%State{brokers: brokers})
}, state} = kafka_server_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
Expand Down
2 changes: 1 addition & 1 deletion test/integration/server0_p_10_and_later_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do
resp = create_topic(name, config)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

Enum.member?(existing_topics(), name)
assert Enum.member?(existing_topics(), name)

assert @num_partitions ==
KafkaEx.Protocol.Metadata.Response.partitions_for_topic(
Expand Down

0 comments on commit df72ac5

Please sign in to comment.