diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index 6f4c7ea7..b13f3824 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -822,9 +822,17 @@ defmodule KafkaEx.GenConsumer do offset: offset } - [%OffsetCommitResponse{topic: ^topic, partitions: [^partition]}] = + [%OffsetCommitResponse{topic: ^topic, partitions: [partition_response]}] = KafkaEx.offset_commit(worker_name, request) + # one of these needs to match, depending on which client + case partition_response do + # old client + ^partition -> :ok + # new client + %{error_code: :no_error, partition: ^partition} -> :ok + end + Logger.debug(fn -> "Committed offset #{topic}/#{partition}@#{offset} for #{group}" end) diff --git a/lib/kafka_ex/new/adapter.ex b/lib/kafka_ex/new/adapter.ex index b520999b..05974add 100644 --- a/lib/kafka_ex/new/adapter.ex +++ b/lib/kafka_ex/new/adapter.ex @@ -343,18 +343,26 @@ defmodule KafkaEx.New.Adapter do } end - def offset_fetch_request(request, client_consumer_group) do - consumer_group = request.consumer_group || client_consumer_group - - {%Kayrock.OffsetFetch.V0.Request{ - group_id: consumer_group, - topics: [ - %{topic: request.topic, partitions: [%{partition: request.partition}]} - ] + def offset_fetch_request(offset_fetch_request, client_consumer_group) do + consumer_group = + offset_fetch_request.consumer_group || client_consumer_group + + request = + Kayrock.OffsetFetch.get_request_struct(offset_fetch_request.api_version) + + {%{ + request + | group_id: consumer_group, + topics: [ + %{ + topic: offset_fetch_request.topic, + partitions: [%{partition: offset_fetch_request.partition}] + } + ] }, consumer_group} end - def offset_fetch_response(%Kayrock.OffsetFetch.V0.Response{ + def offset_fetch_response(%{ responses: [ %{ topic: topic, @@ -384,32 +392,77 @@ defmodule KafkaEx.New.Adapter do ] end - def offset_commit_request(request, client_consumer_group) do - consumer_group = request.consumer_group || client_consumer_group - - {%Kayrock.OffsetCommit.V0.Request{ - group_id: consumer_group, - topics: [ - %{ - topic: request.topic, - partitions: [ - %{ - partition: request.partition, - offset: request.offset, - metadata: "" - } - ] - } - ] - }, consumer_group} + def offset_commit_request(offset_commit_request, client_consumer_group) do + consumer_group = + offset_commit_request.consumer_group || client_consumer_group + + request = + Kayrock.OffsetCommit.get_request_struct(offset_commit_request.api_version) + + request = %{ + request + | group_id: consumer_group, + topics: [ + %{ + topic: offset_commit_request.topic, + partitions: [ + %{ + partition: offset_commit_request.partition, + offset: offset_commit_request.offset, + metadata: "" + } + ] + } + ] + } + + request = + case offset_commit_request.api_version do + 1 -> + timestamp = + case offset_commit_request.timestamp do + t when t > 0 -> t + _ -> millis_timestamp_now() + end + + [topic] = request.topics + [partition] = topic.partitions + + %{ + request + | # offset_commit_request.generation_id, + generation_id: -1, + # offset_commit_request.member_id, + member_id: "", + topics: [ + %{ + topic + | partitions: [ + Map.put_new(partition, :timestamp, timestamp) + ] + } + ] + } + + v when v >= 2 -> + %{request | generation_id: -1, member_id: "", retention_time: -1} + + _ -> + request + end + + {request, consumer_group} end - def offset_commit_response(%Kayrock.OffsetCommit.V0.Response{ + @spec offset_commit_response(%{ + responses: [%{partition_responses: [...], topic: any}, ...] + }) :: [KafkaEx.Protocol.OffsetCommit.Response.t(), ...] + def offset_commit_response(%{ responses: [ %{ topic: topic, partition_responses: [ - %{partition: partition} + %{partition: partition, error_code: error_code} ] } ] @@ -418,7 +471,12 @@ defmodule KafkaEx.New.Adapter do [ %OffsetCommitResponse{ topic: topic, - partitions: [partition] + partitions: [ + %{ + partition: partition, + error_code: Kayrock.ErrorCode.code_to_atom(error_code) + } + ] } ] end @@ -591,4 +649,8 @@ defmodule KafkaEx.New.Adapter do defp minus_one_if_nil(nil), do: -1 defp minus_one_if_nil(x), do: x + + defp millis_timestamp_now do + :os.system_time(:millisecond) + end end diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 75c8b617..22517731 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -579,23 +579,6 @@ defmodule KafkaEx.New.Client do end, state} end - defp get_send_request_function( - %NodeSelector{strategy: :controller}, - state, - network_timeout, - _synchronous - ) do - {:ok, broker} = State.select_broker(state, NodeSelector.controller()) - - {fn wire_request -> - NetworkClient.send_sync_request( - broker, - wire_request, - network_timeout - ) - end, state} - end - defp get_send_request_function( %NodeSelector{ strategy: :topic_partition, @@ -656,6 +639,23 @@ defmodule KafkaEx.New.Client do end, updated_state} end + defp get_send_request_function( + %NodeSelector{} = node_selector, + state, + network_timeout, + _synchronous + ) do + {:ok, broker} = State.select_broker(state, node_selector) + + {fn wire_request -> + NetworkClient.send_sync_request( + broker, + wire_request, + network_timeout + ) + end, state} + end + defp deserialize(data, request) do try do deserializer = Kayrock.Request.response_deserializer(request) diff --git a/lib/kafka_ex/protocol/offset_commit.ex b/lib/kafka_ex/protocol/offset_commit.ex index c72ba53f..b2ba9407 100644 --- a/lib/kafka_ex/protocol/offset_commit.ex +++ b/lib/kafka_ex/protocol/offset_commit.ex @@ -11,20 +11,29 @@ defmodule KafkaEx.Protocol.OffsetCommit do topic: nil, partition: nil, offset: nil, - metadata: "" + metadata: "", + # NOTE api_version, generation_id, member_id, and timestamp only used in new client + api_version: 0, + generation_id: -1, + member_id: "kafkaex", + timestamp: 0 @type t :: %Request{ consumer_group: binary, topic: binary, partition: integer, - offset: integer + offset: integer, + api_version: integer, + generation_id: integer, + member_id: binary, + timestamp: integer } end defmodule Response do @moduledoc false defstruct partitions: [], topic: nil - @type t :: %Response{partitions: [] | [integer], topic: binary} + @type t :: %Response{partitions: [] | [integer] | [map], topic: binary} end @spec create_request(integer, binary, Request.t()) :: binary diff --git a/lib/kafka_ex/protocol/offset_fetch.ex b/lib/kafka_ex/protocol/offset_fetch.ex index efa585e8..45a8e382 100644 --- a/lib/kafka_ex/protocol/offset_fetch.ex +++ b/lib/kafka_ex/protocol/offset_fetch.ex @@ -8,12 +8,17 @@ defmodule KafkaEx.Protocol.OffsetFetch do defmodule Request do @moduledoc false - defstruct consumer_group: nil, topic: nil, partition: nil + defstruct consumer_group: nil, + topic: nil, + partition: nil, + # NOTE api_version only used in new client + api_version: 0 @type t :: %Request{ consumer_group: nil | binary, topic: binary, - partition: integer + partition: integer, + api_version: integer } end @@ -68,14 +73,19 @@ defmodule KafkaEx.Protocol.OffsetFetch do partitions, topic ) do - parse_partitions(partitions_size - 1, rest, [ - %{ - partition: partition, - offset: offset, - metadata: metadata, - error_code: Protocol.error(error_code) - } - | partitions - ], topic) + parse_partitions( + partitions_size - 1, + rest, + [ + %{ + partition: partition, + offset: offset, + metadata: metadata, + error_code: Protocol.error(error_code) + } + | partitions + ], + topic + ) end end diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index a9123f44..0ce80cfc 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -219,7 +219,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do ) == [ %Proto.OffsetCommit.Response{ - partitions: [0], + partitions: [%{error_code: :no_error, partition: 0}], topic: random_string } ] diff --git a/test/integration/kayrock/offset_test.exs b/test/integration/kayrock/offset_test.exs new file mode 100644 index 00000000..f5181adc --- /dev/null +++ b/test/integration/kayrock/offset_test.exs @@ -0,0 +1,363 @@ +defmodule KafkaEx.KayrockOffsetTest do + @moduledoc """ + Tests for offset commit and fetch API versioning + """ + + use ExUnit.Case + + alias KafkaEx.New.Client + alias KafkaEx.Protocol.OffsetCommit + alias KafkaEx.Protocol.OffsetFetch + + @moduletag :new_client + + setup do + {:ok, args} = KafkaEx.build_worker_options([]) + + {:ok, pid} = Client.start_link(args, :no_name) + + {:ok, %{client: pid}} + end + + test "offset commit v0 and fetch v0", %{client: client} do + topic = "food" + consumer_group = "commit_v0_fetch_v0" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 0 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 0 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + assert got_offset == offset + end + + test "offset commit v1 and fetch v0", %{client: client} do + topic = "food" + consumer_group = "commit_v1_fetch_v0" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 1 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 0 + }) + + # we get an error code when we commit with v > 0 and fetch with v0 + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{ + error_code: :unknown_topic_or_partition, + offset: -1, + partition: 0 + } + ] + } = resp + end + + test "offset commit v1 and fetch v1", %{client: client} do + topic = "food" + consumer_group = "commit_v1_fetch_v1" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 1 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 1 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + assert got_offset == offset + end + + test "offset commit v0 and fetch v1", %{client: client} do + topic = "food" + consumer_group = "commit_v0_fetch_v1" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 0 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 1 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + # can't commit with v0 and fetch with v > 0 + assert got_offset == -1 + end + + test "offset commit v0 and fetch v2", %{client: client} do + topic = "food" + consumer_group = "commit_v0_fetch_v2" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 0 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 2 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + # can't commit with v0 and fetch with v > 0 + assert got_offset == -1 + end + + test "offset commit v0 and fetch v3", %{client: client} do + topic = "food" + consumer_group = "commit_v0_fetch_v3" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 0 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 3 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + # can't commit with v0 and fetch with v > 0 + assert got_offset == -1 + end + + test "offset commit v2 and fetch v2", %{client: client} do + topic = "food" + consumer_group = "commit_v2_fetch_v2" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 2 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 2 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + assert got_offset == offset + end + + test "offset commit v3 and fetch v3", %{client: client} do + topic = "food" + consumer_group = "commit_v3_fetch_v3" + msg = TestHelper.generate_random_string() + + {:ok, offset} = + KafkaEx.produce( + topic, + 0, + msg, + worker_name: client, + required_acks: 1 + ) + + [resp] = + KafkaEx.offset_commit(client, %OffsetCommit.Request{ + consumer_group: consumer_group, + topic: topic, + partition: 0, + offset: offset, + api_version: 3 + }) + + %OffsetCommit.Response{partitions: [%{error_code: :no_error}]} = resp + + [resp] = + KafkaEx.offset_fetch(client, %OffsetFetch.Request{ + topic: topic, + consumer_group: consumer_group, + partition: 0, + api_version: 3 + }) + + %KafkaEx.Protocol.OffsetFetch.Response{ + partitions: [ + %{error_code: :no_error, offset: got_offset, partition: 0} + ] + } = resp + + assert got_offset == offset + end +end