From 0f58fbe04a50febba3621a12442934cc911733b5 Mon Sep 17 00:00:00 2001 From: Jehan Bruggeman Date: Mon, 22 Oct 2018 10:43:28 +0200 Subject: [PATCH] Check api_version for create_topics to stay compatible 0.10.0.0 --- lib/kafka_ex/protocol/create_topics.ex | 18 ++++++++++++++---- lib/kafka_ex/protocol/metadata.ex | 5 +++++ lib/kafka_ex/server_0_p_10_and_later.ex | 11 +++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex index 44531a54..4a37ce51 100644 --- a/lib/kafka_ex/protocol/create_topics.ex +++ b/lib/kafka_ex/protocol/create_topics.ex @@ -1,6 +1,8 @@ defmodule KafkaEx.Protocol.CreateTopics do alias KafkaEx.Protocol + @supported_versions_range {0, 0} + @default_api_version 0 @moduledoc """ Implementation of the Kafka CreateTopics request and response APIs @@ -63,8 +65,14 @@ defmodule KafkaEx.Protocol.CreateTopics do @type t :: %Response{topic_errors: [TopicError]} end - @spec create_request(integer, binary, Request.t) :: binary - def create_request(correlation_id, client_id, create_topics_request) do + def api_version(api_versions) do + KafkaEx.ApiVersions.find_api_version(api_versions, :create_topics, @supported_versions_range) + end + + @spec create_request(integer, binary, Request.t, integer) :: binary + def create_request(correlation_id, client_id, create_topics_request, api_version) + + def create_request(correlation_id, client_id, create_topics_request, 0) do Protocol.create_request(:create_topics, correlation_id, client_id) <> encode_topic_requests(create_topics_request.create_topic_requests) <> << create_topics_request.timeout :: 32-signed >> @@ -129,8 +137,10 @@ defmodule KafkaEx.Protocol.CreateTopics do end end - @spec parse_response(binary) :: [] | Response.t - def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do + @spec parse_response(binary, integer) :: [] | Response.t + def parse_response(message, api_version) + + def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>, 0) do %Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)} end diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex index ca08ae8c..d790d151 100644 --- a/lib/kafka_ex/protocol/metadata.ex +++ b/lib/kafka_ex/protocol/metadata.ex @@ -101,6 +101,11 @@ defmodule KafkaEx.Protocol.Metadata do def api_version(api_versions) do case KafkaEx.ApiVersions.find_api_version(api_versions, :metadata, @supported_versions_range) do {:ok, version} -> version + # those three should never happen since :metadata is part of the protocol since the beginning. + # they are left here as this will server as reference implementation + # :unknown_message_for_server -> + # :unknown_message_for_client -> + # :no_version_supported -> _ -> @default_api_version end end diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 8a958c4e..dc8ce6c8 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -120,12 +120,19 @@ defmodule KafkaEx.Server0P10AndLater do end def kafka_create_topics(requests, network_timeout, state) do + api_version = case CreateTopics.api_version(state.api_versions) do + {:ok, api_version} -> api_version + _ -> raise "CreateTopic is not supported in this version of Kafka, or the versions supported by the client do not match the ones supported by the server." + end + + IO.puts "API version for create_topics: #{api_version}" + create_topics_request = %CreateTopics.Request{ create_topic_requests: requests, timeout: network_timeout } - mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request) + mainRequest = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request, api_version) broker = state.brokers |> Enum.find(&(&1.is_controller)) @@ -138,7 +145,7 @@ defmodule KafkaEx.Server0P10AndLater do |> NetworkClient.send_sync_request(mainRequest, config_sync_timeout()) |> case do {:error, reason} -> {:error, reason} - response -> CreateTopics.parse_response(response) + response -> CreateTopics.parse_response(response, api_version) end {response, %{state | correlation_id: state.correlation_id + 1}} end