diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index b071cb7a..f79da66c 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -27,6 +27,17 @@ defmodule KafkaEx.ConsumerGroup do implements the `KafkaEx.GenConsumer` behaviour and start a `KafkaEx.ConsumerGroup` configured to use that module. + The api versions of some of the underlying messages can be specified in the + `:api_versions` option. Note that these will be ignored (api version 0 used) + unless you have `kafka_version: "kayrock"` set in the KafkaEx application + config. The following versions can be specified: + + * `:fetch` - Fetch requests - use v2+ for newer versions of Kafka + * `:offset_fetch` - Offset fetch requests - use v1+ for offsets stored in + Kafka (as opposed to zookeeper) + * `:offset_commit` - Offset commit requests - use v1+ to store offsets in + Kafka (as opposed to zookeeper) + ## Example Suppose we want to consume from a topic called `"example_topic"` with a diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index a7e40953..744ee76e 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -285,7 +285,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do "#{inspect(reason)}" end - Logger.debug(fn -> "Joined consumer group #{group_name}" end) + Logger.debug(fn -> + "Joined consumer group #{group_name} generation " <> + "#{join_response.generation_id} as #{join_response.member_id}" + end) new_state = %State{ state @@ -439,10 +442,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do gen_consumer_module: gen_consumer_module, consumer_opts: consumer_opts, group_name: group_name, + member_id: member_id, + generation_id: generation_id, supervisor_pid: pid } = state, assignments ) do + # add member_id and generation_id to the consumer opts + consumer_opts = + Keyword.merge(consumer_opts, + generation_id: generation_id, + member_id: member_id + ) + {:ok, consumer_supervisor_pid} = ConsumerGroup.start_consumer( pid, diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index b13f3824..15c33051 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -208,6 +208,7 @@ defmodule KafkaEx.GenConsumer do {:commit_interval, non_neg_integer} | {:commit_threshold, non_neg_integer} | {:auto_offset_reset, :none | :earliest | :latest} + | {:api_versions, map()} | {:extra_consumer_args, map()} @typedoc """ @@ -388,12 +389,15 @@ defmodule KafkaEx.GenConsumer do :group, :topic, :partition, + :member_id, + :generation_id, :current_offset, :committed_offset, :acked_offset, :last_commit, :auto_offset_reset, - :fetch_options + :fetch_options, + :api_versions ] end @@ -530,6 +534,13 @@ defmodule KafkaEx.GenConsumer do :extra_consumer_args ) + generation_id = Keyword.get(opts, :generation_id) + member_id = Keyword.get(opts, :member_id) + + default_api_versions = %{fetch: 0, offset_fetch: 0, offset_commit: 0} + api_versions = Keyword.get(opts, :api_versions, %{}) + api_versions = Map.merge(default_api_versions, api_versions) + {:ok, consumer_state} = consumer_module.init(topic, partition, extra_consumer_args) @@ -559,7 +570,10 @@ defmodule KafkaEx.GenConsumer do group: group_name, topic: topic, partition: partition, - fetch_options: fetch_options + generation_id: generation_id, + member_id: member_id, + fetch_options: fetch_options, + api_versions: api_versions } Process.flag(:trap_exit, true) @@ -675,7 +689,10 @@ defmodule KafkaEx.GenConsumer do KafkaEx.fetch( topic, partition, - Keyword.merge(fetch_options, offset: offset) + Keyword.merge(fetch_options, + offset: offset, + api_version: Map.fetch!(state.api_versions, :fetch) + ) ) response @@ -812,6 +829,8 @@ defmodule KafkaEx.GenConsumer do group: group, topic: topic, partition: partition, + member_id: member_id, + generation_id: generation_id, acked_offset: offset } = state ) do @@ -819,7 +838,10 @@ defmodule KafkaEx.GenConsumer do consumer_group: group, topic: topic, partition: partition, - offset: offset + offset: offset, + member_id: member_id, + generation_id: generation_id, + api_version: Map.fetch!(state.api_versions, :offset_fetch) } [%OffsetCommitResponse{topic: ^topic, partitions: [partition_response]}] = @@ -855,7 +877,8 @@ defmodule KafkaEx.GenConsumer do request = %OffsetFetchRequest{ consumer_group: group, topic: topic, - partition: partition + partition: partition, + api_version: Map.fetch!(state.api_versions, :offset_fetch) } [ @@ -867,6 +890,9 @@ defmodule KafkaEx.GenConsumer do } ] = KafkaEx.offset_fetch(worker_name, request) + # newer api versions will return -1 if the consumer group does not exist + offset = max(offset, 0) + case error_code do :no_error -> %State{ diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index 05974add..75e0cd18 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -430,10 +430,8 @@ defmodule KafkaEx.New.Adapter do %{ request - | # offset_commit_request.generation_id, - generation_id: -1, - # offset_commit_request.member_id, - member_id: "", + | generation_id: offset_commit_request.generation_id, + member_id: offset_commit_request.member_id, topics: [ %{ topic @@ -445,7 +443,12 @@ defmodule KafkaEx.New.Adapter do } v when v >= 2 -> - %{request | generation_id: -1, member_id: "", retention_time: -1} + %{ + request + | generation_id: offset_commit_request.generation_id, + member_id: offset_commit_request.member_id, + retention_time: -1 + } _ -> request diff --git a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs index 0611d8a5..9040d91f 100644 --- a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs +++ b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs @@ -38,6 +38,11 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = create_topic(name, config, client) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) + TestHelper.wait_for(fn -> + {:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name]) + length(metadatas) > 0 + end) + {:ok, [metadata]} = KafkaExAPI.topics_metadata(client, [name]) assert @num_partitions == length(metadata.partitions) end @@ -48,7 +53,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = create_topic(name, [], client) assert {:no_error, name} == parse_create_topic_resp(resp) - {:ok, [_metadata]} = KafkaExAPI.topics_metadata(client, [name]) + {:ok, _metadata} = KafkaExAPI.topics_metadata(client, [name]) resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client) assert {:no_error, name} = parse_delete_topic_resp(resp) diff --git a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs index 84221e9e..b55f2090 100644 --- a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs @@ -16,9 +16,11 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do alias KafkaEx.ConsumerGroup alias KafkaEx.GenConsumer + alias KafkaEx.New.Client + alias KafkaEx.Protocol.OffsetFetch # note this topic is created by docker_up.sh - @topic_name "consumer_group_implementation_test" + @topic_name "consumer_group_implementation_test_kayrock" @partition_count 4 @consumer_group_name "consumer_group_implementation" @@ -106,7 +108,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do end def produce(message, partition) do - KafkaEx.produce(@topic_name, partition, message) + :ok = KafkaEx.produce(@topic_name, partition, message) message end @@ -123,6 +125,10 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do message.value == expected_message && message.offset == expected_offset end + def has_timestamp?(message) do + is_integer(message.timestamp) && message.timestamp > 0 + end + def sync_stop(pid) when is_pid(pid) do TestHelper.wait_for(fn -> if Process.alive?(pid) do @@ -157,9 +163,18 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do end setup do - ports_before = num_open_ports() {:ok, _} = TestPartitioner.start_link() + {:ok, client_args} = KafkaEx.build_worker_options([]) + + {:ok, client_pid} = Client.start_link(client_args, :no_name) + + # the client will die on its own, so don't count that + ports_before = num_open_ports() + + {:ok, @topic_name} = + TestHelper.ensure_append_timestamp_topic(client_pid, @topic_name) + {:ok, consumer_group_pid1} = ConsumerGroup.start_link( TestConsumer, @@ -167,7 +182,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do [@topic_name], heartbeat_interval: 100, partition_assignment_callback: &TestPartitioner.assign_partitions/2, - session_timeout_padding: 30000 + session_timeout_padding: 30000, + api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3} ) {:ok, consumer_group_pid2} = @@ -177,7 +193,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do [@topic_name], heartbeat_interval: 100, partition_assignment_callback: &TestPartitioner.assign_partitions/2, - session_timeout_padding: 30000 + session_timeout_padding: 30000, + api_versions: %{fetch: 3, offset_fetch: 3, offset_commit: 3} ) # wait for both consumer groups to join @@ -195,7 +212,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do :ok, consumer_group_pid1: consumer_group_pid1, consumer_group_pid2: consumer_group_pid2, - ports_before: ports_before + ports_before: ports_before, + client: client_pid } end @@ -302,6 +320,8 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do end) last_message = List.last(TestConsumer.last_message_set(consumer_pid)) + # should have a timestamp because we're using fetch v3 + assert has_timestamp?(last_message) {px, last_message.offset} end) |> Enum.into(%{}) @@ -319,7 +339,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do TestHelper.latest_consumer_offset_number( @topic_name, px, - @consumer_group_name + @consumer_group_name, + context[:client], + 3 ) last_offset = Map.get(last_offsets, px) @@ -329,5 +351,18 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do # ports should be released assert context[:ports_before] == num_open_ports() + + # since we're using v3 for OffsetCommit, we should get an error if we try to + # fetch with v0 + [resp] = + KafkaEx.offset_fetch(context[:client], %OffsetFetch.Request{ + topic: @topic_name, + consumer_group: @consumer_group_name, + partition: 0, + api_version: 0 + }) + + [partition_resp] = resp.partitions + assert partition_resp.error_code == :unknown_topic_or_partition end end diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs index 6242f056..c343b23a 100644 --- a/test/integration/kayrock/timestamp_test.exs +++ b/test/integration/kayrock/timestamp_test.exs @@ -6,7 +6,6 @@ defmodule KafkaEx.KayrockTimestampTest do use ExUnit.Case alias KafkaEx.New.Client - alias KafkaEx.New.NodeSelector alias KafkaEx.TimestampNotSupportedError require Logger @@ -21,45 +20,6 @@ defmodule KafkaEx.KayrockTimestampTest do {:ok, %{client: pid}} end - defp ensure_append_timestamp_topic(client) do - topic_name = "test_log_append_timestamp" - - resp = - Client.send_request( - client, - %Kayrock.CreateTopics.V0.Request{ - create_topic_requests: [ - %{ - topic: topic_name, - num_partitions: 4, - replication_factor: 1, - replica_assignment: [], - config_entries: [ - %{ - config_name: "message.timestamp.type", - config_value: "LogAppendTime" - } - ] - } - ], - timeout: 1000 - }, - NodeSelector.controller() - ) - - {:ok, - %Kayrock.CreateTopics.V0.Response{ - topic_errors: [%{error_code: error_code}] - }} = resp - - unless error_code in [0, 36] do - Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") - assert false - end - - topic_name - end - test "fetch timestamp is nil by default on v0 messages", %{client: client} do topic = "food" msg = TestHelper.generate_random_string() @@ -151,7 +111,11 @@ defmodule KafkaEx.KayrockTimestampTest do end test "log with append time - v0", %{client: client} do - topic = ensure_append_timestamp_topic(client) + {:ok, topic} = + TestHelper.ensure_append_timestamp_topic( + client, + "test_log_append_timestamp" + ) msg = TestHelper.generate_random_string() @@ -182,7 +146,11 @@ defmodule KafkaEx.KayrockTimestampTest do end test "log with append time - v3", %{client: client} do - topic = ensure_append_timestamp_topic(client) + {:ok, topic} = + TestHelper.ensure_append_timestamp_topic( + client, + "test_log_append_timestamp" + ) msg = TestHelper.generate_random_string() @@ -214,7 +182,11 @@ defmodule KafkaEx.KayrockTimestampTest do end test "log with append time - v5", %{client: client} do - topic = ensure_append_timestamp_topic(client) + {:ok, topic} = + TestHelper.ensure_append_timestamp_topic( + client, + "test_log_append_timestamp" + ) msg = TestHelper.generate_random_string() diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index b8e32737..8ee52953 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -20,7 +20,9 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do resp = create_topic(name, config) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - assert Enum.member?(existing_topics(), name) + wait_for(fn -> + Enum.member?(existing_topics(), name) + end) assert @num_partitions == KafkaEx.Protocol.Metadata.Response.partitions_for_topic( diff --git a/test/test_helper.exs b/test/test_helper.exs index feaf2554..e36e8c86 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -13,6 +13,10 @@ ExUnit.configure( ) defmodule TestHelper do + alias KafkaEx.New.Client + alias KafkaEx.New.NodeSelector + require Logger + def generate_random_string(string_length \\ 20) do 1..string_length |> Enum.map(fn _ -> round(:rand.uniform() * 25 + 65) end) @@ -69,16 +73,55 @@ defmodule TestHelper do topic, partition, consumer_group, - worker \\ :kafka_ex + worker \\ :kafka_ex, + api_version \\ 0 ) do request = %KafkaEx.Protocol.OffsetFetch.Request{ topic: topic, partition: partition, - consumer_group: consumer_group + consumer_group: consumer_group, + api_version: api_version } - KafkaEx.offset_fetch(worker, request) - |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() + resp = KafkaEx.offset_fetch(worker, request) + resp |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() + end + + def ensure_append_timestamp_topic(client, topic_name) do + resp = + Client.send_request( + client, + %Kayrock.CreateTopics.V0.Request{ + create_topic_requests: [ + %{ + topic: topic_name, + num_partitions: 4, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{ + config_name: "message.timestamp.type", + config_value: "LogAppendTime" + } + ] + } + ], + timeout: 1000 + }, + NodeSelector.controller() + ) + + {:ok, + %Kayrock.CreateTopics.V0.Response{ + topic_errors: [%{error_code: error_code}] + }} = resp + + if error_code in [0, 36] do + {:ok, topic_name} + else + Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") + {:error, topic_name} + end end defp first_partition_offset(:topic_not_found) do