Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting api versions for consumer groups #367

Merged
merged 5 commits into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ defmodule KafkaEx.ConsumerGroup do
implements the `KafkaEx.GenConsumer` behaviour and start a
`KafkaEx.ConsumerGroup` configured to use that module.
The api versions of some of the underlying messages can be specified in the
`:api_versions` option. Note that these will be ignored (api version 0 used)
unless you have `kafka_version: "kayrock"` set in the KafkaEx application
config. The following versions can be specified:
* `:fetch` - Fetch requests - use v2+ for newer versions of Kafka
* `:offset_fetch` - Offset fetch requests - use v1+ for offsets stored in
Kafka (as opposed to zookeeper)
* `:offset_commit` - Offset commit requests - use v1+ to store offsets in
Kafka (as opposed to zookeeper)
## Example
Suppose we want to consume from a topic called `"example_topic"` with a
Expand Down
14 changes: 13 additions & 1 deletion lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do
"#{inspect(reason)}"
end

Logger.debug(fn -> "Joined consumer group #{group_name}" end)
Logger.debug(fn ->
"Joined consumer group #{group_name} generation " <>
"#{join_response.generation_id} as #{join_response.member_id}"
end)

new_state = %State{
state
Expand Down Expand Up @@ -439,10 +442,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do
gen_consumer_module: gen_consumer_module,
consumer_opts: consumer_opts,
group_name: group_name,
member_id: member_id,
generation_id: generation_id,
supervisor_pid: pid
} = state,
assignments
) do
# add member_id and generation_id to the consumer opts
consumer_opts =
Keyword.merge(consumer_opts,
generation_id: generation_id,
member_id: member_id
)

{:ok, consumer_supervisor_pid} =
ConsumerGroup.start_consumer(
pid,
Expand Down
36 changes: 31 additions & 5 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ defmodule KafkaEx.GenConsumer do
{:commit_interval, non_neg_integer}
| {:commit_threshold, non_neg_integer}
| {:auto_offset_reset, :none | :earliest | :latest}
| {:api_versions, map()}
| {:extra_consumer_args, map()}

@typedoc """
Expand Down Expand Up @@ -388,12 +389,15 @@ defmodule KafkaEx.GenConsumer do
:group,
:topic,
:partition,
:member_id,
:generation_id,
:current_offset,
:committed_offset,
:acked_offset,
:last_commit,
:auto_offset_reset,
:fetch_options
:fetch_options,
:api_versions
]
end

Expand Down Expand Up @@ -530,6 +534,13 @@ defmodule KafkaEx.GenConsumer do
:extra_consumer_args
)

generation_id = Keyword.get(opts, :generation_id)
member_id = Keyword.get(opts, :member_id)

default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0}
api_versions = Keyword.get(opts, :api_versions, %{})
api_versions = Map.merge(default_api_versions, api_versions)

{:ok, consumer_state} =
consumer_module.init(topic, partition, extra_consumer_args)

Expand Down Expand Up @@ -559,7 +570,10 @@ defmodule KafkaEx.GenConsumer do
group: group_name,
topic: topic,
partition: partition,
fetch_options: fetch_options
generation_id: generation_id,
member_id: member_id,
fetch_options: fetch_options,
api_versions: api_versions
}

Process.flag(:trap_exit, true)
Expand Down Expand Up @@ -675,7 +689,10 @@ defmodule KafkaEx.GenConsumer do
KafkaEx.fetch(
topic,
partition,
Keyword.merge(fetch_options, offset: offset)
Keyword.merge(fetch_options,
offset: offset,
api_version: Map.fetch!(state.api_versions, :fetch)
)
)

response
Expand Down Expand Up @@ -812,14 +829,19 @@ defmodule KafkaEx.GenConsumer do
group: group,
topic: topic,
partition: partition,
member_id: member_id,
generation_id: generation_id,
acked_offset: offset
} = state
) do
request = %OffsetCommitRequest{
consumer_group: group,
topic: topic,
partition: partition,
offset: offset
offset: offset,
member_id: member_id,
generation_id: generation_id,
api_version: Map.fetch!(state.api_versions, :offset_fetch)
}

[%OffsetCommitResponse{topic: ^topic, partitions: [partition_response]}] =
Expand Down Expand Up @@ -855,7 +877,8 @@ defmodule KafkaEx.GenConsumer do
request = %OffsetFetchRequest{
consumer_group: group,
topic: topic,
partition: partition
partition: partition,
api_version: Map.fetch!(state.api_versions, :offset_fetch)
}

[
Expand All @@ -867,6 +890,9 @@ defmodule KafkaEx.GenConsumer do
}
] = KafkaEx.offset_fetch(worker_name, request)

# newer api versions will return -1 if the consumer group does not exist
offset = max(offset, 0)

case error_code do
:no_error ->
%State{
Expand Down
13 changes: 8 additions & 5 deletions lib/kafka_ex/new/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,8 @@ defmodule KafkaEx.New.Adapter do

%{
request
| # offset_commit_request.generation_id,
generation_id: -1,
# offset_commit_request.member_id,
member_id: "",
| generation_id: offset_commit_request.generation_id,
member_id: offset_commit_request.member_id,
topics: [
%{
topic
Expand All @@ -445,7 +443,12 @@ defmodule KafkaEx.New.Adapter do
}

v when v >= 2 ->
%{request | generation_id: -1, member_id: "", retention_time: -1}
%{
request
| generation_id: offset_commit_request.generation_id,
member_id: offset_commit_request.member_id,
retention_time: -1
}

_ ->
request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = create_topic(name, config, client)
assert {:topic_already_exists, name} == parse_create_topic_resp(resp)

TestHelper.wait_for(fn ->
{:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name])
length(metadatas) > 0
end)

{:ok, [metadata]} = KafkaExAPI.topics_metadata(client, [name])
assert @num_partitions == length(metadata.partitions)
end
Expand All @@ -48,7 +53,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do
resp = create_topic(name, [], client)
assert {:no_error, name} == parse_create_topic_resp(resp)

{:ok, [_metadata]} = KafkaExAPI.topics_metadata(client, [name])
{:ok, _metadata} = KafkaExAPI.topics_metadata(client, [name])

resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client)
assert {:no_error, name} = parse_delete_topic_resp(resp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

alias KafkaEx.ConsumerGroup
alias KafkaEx.GenConsumer
alias KafkaEx.New.Client
alias KafkaEx.Protocol.OffsetFetch

# note this topic is created by docker_up.sh
@topic_name "consumer_group_implementation_test"
@topic_name "consumer_group_implementation_test_kayrock"
@partition_count 4
@consumer_group_name "consumer_group_implementation"

Expand Down Expand Up @@ -106,7 +108,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end

def produce(message, partition) do
KafkaEx.produce(@topic_name, partition, message)
:ok = KafkaEx.produce(@topic_name, partition, message)
message
end

Expand All @@ -123,6 +125,10 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
message.value == expected_message && message.offset == expected_offset
end

def has_timestamp?(message) do
is_integer(message.timestamp) && message.timestamp > 0
end

def sync_stop(pid) when is_pid(pid) do
TestHelper.wait_for(fn ->
if Process.alive?(pid) do
Expand Down Expand Up @@ -157,17 +163,27 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end

setup do
ports_before = num_open_ports()
{:ok, _} = TestPartitioner.start_link()

{:ok, client_args} = KafkaEx.build_worker_options([])

{:ok, client_pid} = Client.start_link(client_args, :no_name)

# the client will die on its own, so don't count that
ports_before = num_open_ports()

{:ok, @topic_name} =
TestHelper.ensure_append_timestamp_topic(client_pid, @topic_name)

{:ok, consumer_group_pid1} =
ConsumerGroup.start_link(
TestConsumer,
@consumer_group_name,
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
session_timeout_padding: 30000,
api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
)

{:ok, consumer_group_pid2} =
Expand All @@ -177,7 +193,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
[@topic_name],
heartbeat_interval: 100,
partition_assignment_callback: &TestPartitioner.assign_partitions/2,
session_timeout_padding: 30000
session_timeout_padding: 30000,
api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3}
)

# wait for both consumer groups to join
Expand All @@ -195,7 +212,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
:ok,
consumer_group_pid1: consumer_group_pid1,
consumer_group_pid2: consumer_group_pid2,
ports_before: ports_before
ports_before: ports_before,
client: client_pid
}
end

Expand Down Expand Up @@ -302,6 +320,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
end)

last_message = List.last(TestConsumer.last_message_set(consumer_pid))
# should have a timestamp because we're using fetch v3
assert has_timestamp?(last_message)
{px, last_message.offset}
end)
|> Enum.into(%{})
Expand All @@ -319,7 +339,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do
TestHelper.latest_consumer_offset_number(
@topic_name,
px,
@consumer_group_name
@consumer_group_name,
context[:client],
3
)

last_offset = Map.get(last_offsets, px)
Expand All @@ -329,5 +351,18 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do

# ports should be released
assert context[:ports_before] == num_open_ports()

# since we're using v3 for OffsetCommit, we should get an error if we try to
# fetch with v0
[resp] =
KafkaEx.offset_fetch(context[:client], %OffsetFetch.Request{
topic: @topic_name,
consumer_group: @consumer_group_name,
partition: 0,
api_version: 0
})

[partition_resp] = resp.partitions
assert partition_resp.error_code == :unknown_topic_or_partition
end
end
Loading