Skip to content

Commit

Permalink
Merge pull request #367 from kafkaex/api_versions_in_consumer_groups
Browse files Browse the repository at this point in the history
Support setting api versions for consumer groups
  • Loading branch information
dantswain authored Oct 1, 2019
2 parents a1cb7ca + 2e7678e commit 39cc974
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 67 deletions.
11 changes: 11 additions & 0 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ defmodule KafkaEx.ConsumerGroup do
implements the `KafkaEx.GenConsumer` behaviour and start a
`KafkaEx.ConsumerGroup` configured to use that module.
The api versions of some of the underlying messages can be specified in the
`:api_versions` option. Note that these will be ignored (api version 0 used)
unless you have `kafka_version: "kayrock"` set in the KafkaEx application
config. The following versions can be specified:
* `:fetch` - Fetch requests - use v2+ for newer versions of Kafka
* `:offset_fetch` - Offset fetch requests - use v1+ for offsets stored in
Kafka (as opposed to zookeeper)
* `:offset_commit` - Offset commit requests - use v1+ to store offsets in
Kafka (as opposed to zookeeper)
## Example
Suppose we want to consume from a topic called `"example_topic"` with a
Expand Down
14 changes: 13 additions & 1 deletion lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do
"#{inspect(reason)}"
end

Logger.debug(fn -> "Joined consumer group #{group_name}" end)
Logger.debug(fn ->
"Joined consumer group #{group_name} generation " <>
"#{join_response.generation_id} as #{join_response.member_id}"
end)

new_state = %State{
state
Expand Down Expand Up @@ -439,10 +442,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do
gen_consumer_module: gen_consumer_module,
consumer_opts: consumer_opts,
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
supervisor_pid: pid
} = state,
assignments
) do
# add member_id and generation_id to the consumer opts
consumer_opts =
Keyword.merge(consumer_opts,
generation_id: generation_id,
member_id: member_id
)

{:ok, consumer_supervisor_pid} =
ConsumerGroup.start_consumer(
pid,
Expand Down
36 changes: 31 additions & 5 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ defmodule KafkaEx.GenConsumer do
{:commit_interval, non_neg_integer}
| {:commit_threshold, non_neg_integer}
| {:auto_offset_reset, :none | :earliest | :latest}
| {:api_versions, map()}
| {:extra_consumer_args, map()}

@typedoc """
Expand Down Expand Up @@ -388,12 +389,15 @@ defmodule KafkaEx.GenConsumer do
:group,
:topic,
:partition,
:member_id,
:generation_id,
:current_offset,
:committed_offset,
:acked_offset,
:last_commit,
:auto_offset_reset,
:fetch_options
:fetch_options,
:api_versions
]
end

Expand Down Expand Up @@ -530,6 +534,13 @@ defmodule KafkaEx.GenConsumer do
:extra_consumer_args
)

generation_id = Keyword.get(opts, :generation_id)
member_id = Keyword.get(opts, :member_id)

default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
api_versions = Keyword.get(opts, :api_versions, %{})
api_versions = Map.merge(default_api_versions, api_versions)

{:ok, consumer_state} =
consumer_module.init(topic, partition, extra_consumer_args)

Expand Down Expand Up @@ -559,7 +570,10 @@ defmodule KafkaEx.GenConsumer do
group: group_name,
topic: topic,
partition: partition,
fetch_options: fetch_options
generation_id: generation_id,
member_id: member_id,
fetch_options: fetch_options,
api_versions: api_versions
}

Process.flag(:trap_exit, true)
Expand Down Expand Up @@ -675,7 +689,10 @@ defmodule KafkaEx.GenConsumer do
KafkaEx.fetch(
topic,
partition,
Keyword.merge(fetch_options, offset: offset)
Keyword.merge(fetch_options,
offset: offset,
api_version: Map.fetch!(state.api_versions, :fetch)
)
)

response
Expand Down Expand Up @@ -812,14 +829,19 @@ defmodule KafkaEx.GenConsumer do
group: group,
topic: topic,
partition: partition,
member_id: member_id,
generation_id: generation_id,
acked_offset: offset
} = state
) do
request = %OffsetCommitRequest{
consumer_group: group,
topic: topic,
partition: partition,
offset: offset
offset: offset,
member_id: member_id,
generation_id: generation_id,
api_version: Map.fetch!(state.api_versions, :offset_fetch)
}

[%OffsetCommitResponse{topic: ^topic, partitions: [partition_response]}] =
Expand Down Expand Up @@ -855,7 +877,8 @@ defmodule KafkaEx.GenConsumer do
request = %OffsetFetchRequest{
consumer_group: group,
topic: topic,
partition: partition
partition: partition,
api_version: Map.fetch!(state.api_versions, :offset_fetch)
}

[
Expand All @@ -867,6 +890,9 @@ defmodule KafkaEx.GenConsumer do
}
] = KafkaEx.offset_fetch(worker_name, request)

# newer api versions will return -1 if the consumer group does not exist
offset = max(offset, 0)

case error_code do
:no_error ->
%State{
Expand Down
13 changes: 8 additions & 5 deletions lib/kafka_ex/new/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,8 @@ defmodule KafkaEx.New.Adapter do

%{
request
| # offset_commit_request.generation_id,
generation_id: -1,
# offset_commit_request.member_id,
member_id: "",
| generation_id: offset_commit_request.generation_id,
member_id: offset_commit_request.member_id,
topics: [
%{
topic
Expand All @@ -445,7 +443,12 @@ defmodule KafkaEx.New.Adapter do
}

v when v >= 2 ->
%{request | generation_id: -1, member_id: "", retention_time: -1}
%{
request
| generation_id: offset_commit_request.generation_id,
member_id: offset_commit_request.member_id,
retention_time: -1
}

_ ->
request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = create_topic(name, config, client)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

TestHelper.wait_for(fn ->
{:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name])
length(metadatas) > 0
end)

{:ok, [metadata]} = KafkaExAPI.topics_metadata(client, [name])
assert @num_partitions == length(metadata.partitions)
end
Expand All @@ -48,7 +53,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = create_topic(name, [], client)
assert {:no_error, name} == parse_create_topic_resp(resp)

{:ok, [_metadata]} = KafkaExAPI.topics_metadata(client, [name])
{:ok, _metadata} = KafkaExAPI.topics_metadata(client, [name])

resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client)
assert {:no_error, name} = parse_delete_topic_resp(resp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

alias KafkaEx.ConsumerGroup
alias KafkaEx.GenConsumer
alias KafkaEx.New.Client
alias KafkaEx.Protocol.OffsetFetch

# note this topic is created by docker_up.sh
@topic_name "consumer_group_implementation_test"
@topic_name "consumer_group_implementation_test_kayrock"
@partition_count 4
@consumer_group_name "consumer_group_implementation"

Expand Down Expand Up @@ -106,7 +108,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end

def produce(message, partition) do
KafkaEx.produce(@topic_name, partition, message)
:ok = KafkaEx.produce(@topic_name, partition, message)
message
end

Expand All @@ -123,6 +125,10 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
message.value == expected_message && message.offset == expected_offset
end

def has_timestamp?(message) do
is_integer(message.timestamp) && message.timestamp > 0
end

def sync_stop(pid) when is_pid(pid) do
TestHelper.wait_for(fn ->
if Process.alive?(pid) do
Expand Down Expand Up @@ -157,17 +163,27 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end

setup do
ports_before = num_open_ports()
{:ok, _} = TestPartitioner.start_link()

{:ok, client_args} = KafkaEx.build_worker_options([])

{:ok, client_pid} = Client.start_link(client_args, :no_name)

# the client will die on its own, so don't count that
ports_before = num_open_ports()

{:ok, @topic_name} =
TestHelper.ensure_append_timestamp_topic(client_pid, @topic_name)

{:ok, consumer_group_pid1} =
ConsumerGroup.start_link(
TestConsumer,
@consumer_group_name,
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
session_timeout_padding: 30000,
api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
)

{:ok, consumer_group_pid2} =
Expand All @@ -177,7 +193,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
session_timeout_padding: 30000,
api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
)

# wait for both consumer groups to join
Expand All @@ -195,7 +212,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
:ok,
consumer_group_pid1: consumer_group_pid1,
consumer_group_pid2: consumer_group_pid2,
ports_before: ports_before
ports_before: ports_before,
client: client_pid
}
end

Expand Down Expand Up @@ -302,6 +320,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end)

last_message = List.last(TestConsumer.last_message_set(consumer_pid))
# should have a timestamp because we're using fetch v3
assert has_timestamp?(last_message)
{px, last_message.offset}
end)
|> Enum.into(%{})
Expand All @@ -319,7 +339,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
TestHelper.latest_consumer_offset_number(
@topic_name,
px,
@consumer_group_name
@consumer_group_name,
context[:client],
3
)

last_offset = Map.get(last_offsets, px)
Expand All @@ -329,5 +351,18 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

# ports should be released
assert context[:ports_before] == num_open_ports()

# since we're using v3 for OffsetCommit, we should get an error if we try to
# fetch with v0
[resp] =
KafkaEx.offset_fetch(context[:client], %OffsetFetch.Request{
topic: @topic_name,
consumer_group: @consumer_group_name,
partition: 0,
api_version: 0
})

[partition_resp] = resp.partitions
assert partition_resp.error_code == :unknown_topic_or_partition
end
end
Loading

0 comments on commit 39cc974

Please sign in to comment.