diff --git a/config/config.exs b/config/config.exs
index 5aaac334..f18b1408 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -54,7 +54,7 @@ config :kafka_ex,
   ],
   # set this to the version of the kafka broker that you are using
   # include only major.minor.patch versions. must be at least 0.8.0
-  kafka_version: "0.9.0"
+  kafka_version: "0.10.1"
 
 env_config = Path.expand("#{Mix.env}.exs", __DIR__)
 if File.exists?(env_config) do
diff --git a/docker-compose.yml b/docker-compose.yml
index 45ea1d72..57ffc642 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -5,32 +5,32 @@ services:
     ports:
       - "2181:2181"
   kafka1:
-    image: wurstmeister/kafka:0.9.0.1
+    image: wurstmeister/kafka:0.10.1.0
     ports:
       - "9092:9092"
     depends_on:
      - zookeeper
     volumes:
-      - ./kafka1/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
+      - ./kafka1/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
      - ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
      - ./ssl:/ssl
   kafka2:
-    image: wurstmeister/kafka:0.9.0.1
+    image: wurstmeister/kafka:0.10.1.0
     ports:
       - "9093:9093"
     depends_on:
      - zookeeper
     volumes:
-      - ./kafka2/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
+      - ./kafka2/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
      - ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
      - ./ssl:/ssl
   kafka3:
-    image: wurstmeister/kafka:0.9.0.1
+    image: wurstmeister/kafka:0.10.1.0
     ports:
       - "9094:9094"
     depends_on:
      - zookeeper
     volumes:
-      - ./kafka3/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
+      - ./kafka3/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
      - ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
      - ./ssl:/ssl
diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex
index 1a3994f7..d1e8af70 100644
--- a/lib/kafka_ex.ex
+++ b/lib/kafka_ex.ex
@@ -527,6 +527,23 @@ defmodule KafkaEx do
   def valid_consumer_group?(b) when is_binary(b), do: byte_size(b) > 0
   def valid_consumer_group?(_), do: false
 
+  @doc """
+  Sends a CreateTopics request for the given list of topic request maps.
+
+  ## Example
+
+  ```elixir
+  iex> request = %{topic: "new_topic", num_partitions: 1, replication_factor: 1,
+  ...>   replica_assignment: [], config_entries: []}
+  iex> KafkaEx.create_topics([request])
+  ```
+  """
+  @spec create_topics([map], Keyword.t) :: CreateTopics.Response.t
+  def create_topics(requests, opts \\ []) do
+    worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
+    Server.call(worker_name, {:create_topics, requests})
+  end
+
   #OTP API
   def start(_type, _args) do
     max_restarts = Application.get_env(:kafka_ex, :max_restarts, 10)
diff --git a/lib/kafka_ex/config.ex b/lib/kafka_ex/config.ex
index 551bed60..bc12d879 100644
--- a/lib/kafka_ex/config.ex
+++ b/lib/kafka_ex/config.ex
@@ -74,6 +74,7 @@ defmodule KafkaEx.Config do
 
   defp server("0.8.0"), do: KafkaEx.Server0P8P0
   defp server("0.8.2"), do: KafkaEx.Server0P8P2
+  defp server("0.10.1"), do: KafkaEx.Server0P10P1
   defp server(_), do: KafkaEx.Server0P9P0
 
   # ssl_options should be an empty list by default if use_ssl is false
diff --git a/lib/kafka_ex/protocol.ex b/lib/kafka_ex/protocol.ex
index 00d1db5c..6b345d44 100644
--- a/lib/kafka_ex/protocol.ex
+++ b/lib/kafka_ex/protocol.ex
@@ -12,6 +12,9 @@ defmodule KafkaEx.Protocol do
   @heartbeat_request 12
   @leave_group_request 13
   @sync_group_request 14
+  @create_topics_request 19
+  # DescribeConfigs (32) and AlterConfigs (33) are not implemented yet;
+  # for those APIs the valid resource types are "Topic" and "Broker".
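+  # As a sketch (values hypothetical): the header that create_request/4 below
+  # builds for a CreateTopics call with correlation_id 1 and client_id
+  # "kafka_ex" is
+  #   << 19 :: 16, 0 :: 16, 1 :: 32, 8 :: 16, "kafka_ex" :: binary >>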
   @api_version 0
@@ -59,8 +62,16 @@ defmodule KafkaEx.Protocol do
     @sync_group_request
   end
 
+  defp api_key(:create_topics) do
+    @create_topics_request
+  end
+
   def create_request(type, correlation_id, client_id) do
-    << api_key(type) :: 16, @api_version :: 16, correlation_id :: 32,
+    create_request(type, correlation_id, client_id, @api_version)
+  end
+
+  def create_request(type, correlation_id, client_id, api_version) do
+    << api_key(type) :: 16, api_version :: 16, correlation_id :: 32,
        byte_size(client_id) :: 16, client_id :: binary >>
   end
 
diff --git a/lib/kafka_ex/protocol/create_topics.ex b/lib/kafka_ex/protocol/create_topics.ex
new file mode 100644
index 00000000..c54dfd7a
--- /dev/null
+++ b/lib/kafka_ex/protocol/create_topics.ex
@@ -0,0 +1,146 @@
+defmodule KafkaEx.Protocol.CreateTopics do
+  @moduledoc """
+  Implementation of the Kafka CreateTopics request and response APIs
+  """
+
+  alias KafkaEx.Protocol
+
+  # CreateTopics Request (Version: 0) => [create_topic_requests] timeout
+  #   create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]
+  #     topic => STRING
+  #     num_partitions => INT32
+  #     replication_factor => INT16
+  #     replica_assignment => partition [replicas]
+  #       partition => INT32
+  #       replicas => INT32
+  #     config_entries => config_name config_value
+  #       config_name => STRING
+  #       config_value => NULLABLE_STRING
+  #   timeout => INT32
+
+  defmodule ReplicaAssignment do
+    defstruct partition: nil, replicas: nil
+    @type t :: %ReplicaAssignment{partition: integer, replicas: [integer]}
+  end
+
+  defmodule ConfigEntry do
+    defstruct config_name: nil, config_value: nil
+    @type t :: %ConfigEntry{config_name: binary, config_value: binary}
+  end
+
+  defmodule TopicRequest do
+    defstruct topic: nil,
+              num_partitions: nil,
+              replication_factor: nil,
+              replica_assignment: nil,
+              config_entries: nil
+    @type t :: %TopicRequest{
+      topic: binary,
+      num_partitions: integer,
+      replication_factor: integer,
+      replica_assignment: [ReplicaAssignment.t],
+      config_entries: [ConfigEntry.t]
+    }
+  end
+
+  defmodule Request do
+    @moduledoc false
+    defstruct create_topic_requests: nil, timeout: nil
+    @type t :: %Request{create_topic_requests: [TopicRequest.t], timeout: integer}
+  end
+
+  defmodule TopicError do
+    defstruct topic_name: nil, error_code: nil
+    @type t :: %TopicError{topic_name: binary, error_code: integer}
+  end
+
+  defmodule Response do
+    @moduledoc false
+    defstruct topic_errors: nil
+    @type t :: %Response{topic_errors: [TopicError.t]}
+  end
+
+  @spec create_request(integer, binary, Request.t) :: binary
+  def create_request(correlation_id, client_id, create_topics_request) do
+    Protocol.create_request(:create_topics, correlation_id, client_id) <>
+      encode_topic_requests(create_topics_request.create_topic_requests) <>
+      << create_topics_request.timeout :: 32-signed >>
+  end
+
+  @spec encode_topic_requests([TopicRequest.t]) :: binary
+  defp encode_topic_requests(requests) do
+    requests |> map_encode(&encode_topic_request/1)
+  end
+
+  @spec encode_topic_request(TopicRequest.t) :: binary
+  defp encode_topic_request(request) do
+    encode_string(request.topic) <>
+      << request.num_partitions :: 32-signed, request.replication_factor :: 16-signed >> <>
+      encode_replica_assignments(request.replica_assignment) <>
+      encode_config_entries(request.config_entries)
+  end
+
+  @spec encode_replica_assignments([ReplicaAssignment.t]) :: binary
+  defp encode_replica_assignments(replica_assignments) do
+    replica_assignments |> map_encode(&encode_replica_assignment/1)
+  end
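+  # Sketch of the encoding (hypothetical values): a single assignment of
+  # partition 0 to replicas [1, 2] encodes as
+  #   << 1 :: 32, 0 :: 32, 2 :: 32, 1 :: 32, 2 :: 32 >>
+  # i.e. array length, then partition id, replica count, and the replica ids.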
+
+  @spec encode_replica_assignment(ReplicaAssignment.t) :: binary
+  defp encode_replica_assignment(replica_assignment) do
+    # the replica list must be encoded on its own; piping it here would bind
+    # to the whole concatenation, since <> binds tighter than |>
+    << replica_assignment.partition :: 32-signed >> <>
+      map_encode(replica_assignment.replicas, &(<< &1 :: 32-signed >>))
+  end
+
+  @spec encode_config_entries([ConfigEntry.t]) :: binary
+  defp encode_config_entries(config_entries) do
+    config_entries |> map_encode(&encode_config_entry/1)
+  end
+
+  @spec encode_config_entry(ConfigEntry.t) :: binary
+  defp encode_config_entry(config_entry) do
+    encode_string(config_entry.config_name) <> encode_nullable_string(config_entry.config_value)
+  end
+
+  @spec encode_nullable_string(String.t | nil) :: binary
+  defp encode_nullable_string(text) do
+    if text == nil do
+      << -1 :: 16-signed >>
+    else
+      encode_string(text)
+    end
+  end
+
+  @spec encode_string(String.t) :: binary
+  defp encode_string(text) do
+    << byte_size(text) :: 16-signed, text :: binary >>
+  end
+
+  defp map_encode(elems, function) do
+    if nil == elems or 0 == length(elems) do
+      << 0 :: 32-signed >>
+    else
+      << length(elems) :: 32-signed >> <>
+        (elems
+         |> Enum.map(function)
+         # &2 <> &1 because reduce/2 passes the accumulator second; this
+         # preserves the original element order in the concatenated binary
+         |> Enum.reduce(&(&2 <> &1)))
+    end
+  end
+
+  @spec parse_response(binary) :: Response.t
+  def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do
+    %Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
+  end
+
+  @spec parse_topic_errors(integer, binary) :: [TopicError.t]
+  defp parse_topic_errors(0, _), do: []
+
+  defp parse_topic_errors(topic_errors_count,
+      << topic_name_size :: 16-signed, topic_name :: size(topic_name_size)-binary,
+         error_code :: 16-signed, rest :: binary >>) do
+    [%TopicError{topic_name: topic_name, error_code: error_code} |
+      parse_topic_errors(topic_errors_count - 1, rest)]
+  end
+end
diff --git a/lib/kafka_ex/protocol/metadata.ex b/lib/kafka_ex/protocol/metadata.ex
index da38033d..b108064d 100644
--- a/lib/kafka_ex/protocol/metadata.ex
+++ b/lib/kafka_ex/protocol/metadata.ex
@@ -2,6 +2,8 @@ defmodule KafkaEx.Protocol.Metadata do
   alias KafkaEx.Protocol
   import KafkaEx.Protocol.Common
 
+  @default_api_version 0
+
   @moduledoc """
   Implementation of the Kafka Hearbeat request and response APIs
   """
@@ -16,8 +18,8 @@ defmodule KafkaEx.Protocol.Metadata do
 
     alias KafkaEx.Socket
 
-    defstruct node_id: -1, host: "", port: 0, socket: nil
-    @type t :: %__MODULE__{}
+    defstruct node_id: -1, host: "", port: 0, socket: nil, is_controller: nil
+    @type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean}
 
     def connected?(%Broker{} = broker) do
       broker.socket != nil && Socket.open?(broker.socket)
@@ -28,10 +30,11 @@ defmodule KafkaEx.Protocol.Metadata do
     @moduledoc false
     alias KafkaEx.Protocol.Metadata.Broker
     alias KafkaEx.Protocol.Metadata.TopicMetadata
-    defstruct brokers: [], topic_metadatas: []
+    defstruct brokers: [], topic_metadatas: [], controller_id: nil
     @type t :: %Response{
       brokers: [Broker.t],
-      topic_metadatas: [TopicMetadata.t]
+      topic_metadatas: [TopicMetadata.t],
+      controller_id: integer
     }
 
     def broker_for_topic(metadata, brokers, topic, partition) do
@@ -73,10 +76,11 @@ defmodule KafkaEx.Protocol.Metadata do
   defmodule TopicMetadata do
     @moduledoc false
     alias KafkaEx.Protocol.Metadata.PartitionMetadata
-    defstruct error_code: 0, topic: nil, partition_metadatas: []
+    defstruct error_code: 0, topic: nil, is_internal: nil, partition_metadatas: []
     @type t :: %TopicMetadata{
       error_code: integer | :no_error,
       topic: nil | binary,
+      is_internal: nil | boolean,
       partition_metadatas: [PartitionMetadata.t]
     }
   end
@@ -93,18 +97,44 @@ defmodule KafkaEx.Protocol.Metadata do
     }
   end
 
-  def create_request(correlation_id, client_id, ""), do: KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id) <> << 0 :: 32-signed >>
+  def valid_api_version(v) do
+    case v do
+      nil -> @default_api_version
+      v -> v
+    end
+  end
+
+  def create_request(correlation_id, client_id, ""), do: create_request(correlation_id, client_id, "", nil)
+  def create_request(correlation_id, client_id, topics) when is_list(topics), do: create_request(correlation_id, client_id, topics, nil)
 
-  def create_request(correlation_id, client_id, topic) when is_binary(topic), do: create_request(correlation_id, client_id, [topic])
+  def create_request(correlation_id, client_id, "", api_version) do
+    version = valid_api_version(api_version)
+    # v0 encodes "all topics" as an empty array; v1 uses a null array (-1)
+    topic_count = if 0 == version, do: 0, else: -1
+    KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, version) <> << topic_count :: 32-signed >>
+  end
+  def create_request(correlation_id, client_id, topic, api_version) when is_binary(topic), do: create_request(correlation_id, client_id, [topic], valid_api_version(api_version))
+
+  def create_request(correlation_id, client_id, topics, api_version) when is_list(topics) do
+    KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, valid_api_version(api_version)) <> << length(topics) :: 32-signed, topic_data(topics) :: binary >>
+  end
 
-  def create_request(correlation_id, client_id, topics) when is_list(topics) do
-    KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id) <> << length(topics) :: 32-signed, topic_data(topics) :: binary >>
+  def parse_response(data) do
+    parse_response(data, nil)
   end
 
-  def parse_response(<< _correlation_id :: 32-signed, brokers_size :: 32-signed, rest :: binary >>) do
-    {brokers, rest} = parse_brokers(brokers_size, rest, [])
-    << topic_metadatas_size :: 32-signed, rest :: binary >> = rest
-    %Response{brokers: brokers, topic_metadatas: parse_topic_metadatas(topic_metadatas_size, rest)}
+  def parse_response(<< _correlation_id :: 32-signed, brokers_size :: 32-signed, rest :: binary >>, api_version) do
+    version = valid_api_version(api_version)
+    case version do
+      1 ->
+        {brokers, rest} = parse_brokers_v1(brokers_size, rest, [])
+        << controller_id :: 32-signed, rest :: binary >> = rest
+        << topic_metadatas_size :: 32-signed, rest :: binary >> = rest
+        %Response{brokers: brokers, controller_id: controller_id, topic_metadatas: parse_topic_metadatas_v1(topic_metadatas_size, rest)}
+      0 ->
+        {brokers, rest} = parse_brokers(brokers_size, rest, [])
+        << topic_metadatas_size :: 32-signed, rest :: binary >> = rest
+        %Response{brokers: brokers, topic_metadatas: parse_topic_metadatas(topic_metadatas_size, rest)}
+    end
   end
 
   defp parse_brokers(0, rest, brokers), do: {brokers, rest}
@@ -113,6 +143,32 @@ defmodule KafkaEx.Protocol.Metadata do
     parse_brokers(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
   end
 
+  defp parse_brokers_v1(0, rest, brokers), do: {brokers, rest}
+
+  defp parse_brokers_v1(brokers_size, <<
+    node_id :: 32-signed,
+    host_len :: 16-signed,
+    host :: size(host_len)-binary,
+    port :: 32-signed,
+    # rack is nullable
+    -1 :: 16-signed,
+    rest :: binary
+  >>, brokers) do
+    parse_brokers_v1(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
+  end
+
+  defp parse_brokers_v1(brokers_size, <<
+    node_id :: 32-signed,
+    host_len :: 16-signed,
+    host :: size(host_len)-binary,
+    port :: 32-signed,
+    rack_len :: 16-signed,
+    _rack :: size(rack_len)-binary,
+    rest :: binary
+  >>, brokers) do
+    parse_brokers_v1(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
+  end
+
   defp parse_topic_metadatas(0, _), do: []
 
   defp parse_topic_metadatas(topic_metadatas_size, << error_code :: 16-signed, topic_len :: 16-signed, topic :: size(topic_len)-binary, partition_metadatas_size :: 32-signed, rest :: binary >>) do
@@ -120,6 +176,22 @@ defmodule KafkaEx.Protocol.Metadata do
     [%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas} | parse_topic_metadatas(topic_metadatas_size - 1, rest)]
   end
 
+  defp parse_topic_metadatas_v1(0, _), do: []
+
+  defp parse_topic_metadatas_v1(
+    topic_metadatas_size,
+    << error_code :: 16-signed,
+       topic_len :: 16-signed,
+       topic :: size(topic_len)-binary,
+       # booleans are encoded as 8-bit signed integers
+       is_internal :: 8-signed,
+       partition_metadatas_size :: 32-signed,
+       rest :: binary >>) do
+    {partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest)
+    [%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1} |
+      parse_topic_metadatas_v1(topic_metadatas_size - 1, rest)]
+  end
+
   defp parse_partition_metadatas(0, partition_metadatas, rest), do: {partition_metadatas, rest}
 
   defp parse_partition_metadatas(partition_metadatas_size, partition_metadatas, << error_code :: 16-signed, partition_id :: 32-signed, leader :: 32-signed, rest :: binary >>) do
diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex
index 4a006581..d94f0052 100644
--- a/lib/kafka_ex/server.ex
+++ b/lib/kafka_ex/server.ex
@@ -260,6 +260,10 @@ defmodule KafkaEx.Server do
     kafka_server_heartbeat(request, network_timeout, state)
   end
 
+  def handle_call({:create_topics, requests}, _from, state) do
+    kafka_create_topics(requests, state)
+  end
+
   def handle_info(:update_metadata, state) do
     kafka_server_update_metadata(state)
   end
@@ -367,9 +371,11 @@ defmodule KafkaEx.Server do
     {:noreply, update_metadata(state)}
   end
 
-  def update_metadata(state) do
-    {correlation_id, metadata} = retrieve_metadata(state.brokers, state.correlation_id, config_sync_timeout())
-    metadata_brokers = metadata.brokers
+  def update_metadata(state), do: update_metadata(state, nil)
+
+  def update_metadata(state, api_version) do
+    {correlation_id, metadata} = retrieve_metadata_with_version(state.brokers, state.correlation_id, config_sync_timeout(), api_version)
+    metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
     brokers = state.brokers
       |> remove_stale_brokers(metadata_brokers)
       |> add_new_brokers(metadata_brokers, state.ssl_options, state.use_ssl)
@@ -378,14 +384,24 @@ defmodule KafkaEx.Server do
 
   # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
   def retrieve_metadata(brokers, correlation_id, sync_timeout, topic \\ []), do: retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, 0)
+
+  # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
+  def retrieve_metadata_with_version(brokers, correlation_id, sync_timeout, api_version, topic \\ []), do: retrieve_metadata(brokers, correlation_id, sync_timeout, topic, @retry_count, api_version, 0)
 
   # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity
   def retrieve_metadata(_, correlation_id, _sync_timeout, topic, 0, error_code) do
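+    # retries exhausted: log the failure and fall back to an empty response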
Logger.log(:error, "Metadata request for topic #{inspect topic} failed with error_code #{inspect error_code}") {correlation_id, %Metadata.Response{}} end + + # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity + def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, error_code) do + retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, nil, error_code) + end + # credo:disable-for-next-line Credo.Check.Refactor.FunctionArity - def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, _error_code) do - metadata_request = Metadata.create_request(correlation_id, @client_id, topic) + def retrieve_metadata(brokers, correlation_id, sync_timeout, topic, retry, api_version, _error_code) do + metadata_request = Metadata.create_request(correlation_id, @client_id, topic, api_version) data = first_broker_response(metadata_request, brokers, sync_timeout) response = case data do nil -> @@ -393,10 +409,10 @@ defmodule KafkaEx.Server do raise "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}." :no_metadata_available data -> - Metadata.parse_response(data) + Metadata.parse_response(data, api_version) end - case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do + case Enum.find(response.topic_metadatas, &(&1.error_code == :leader_not_available)) do nil -> {correlation_id + 1, response} topic_metadata -> :timer.sleep(300) diff --git a/lib/kafka_ex/server_0_p_10_p_1.ex b/lib/kafka_ex/server_0_p_10_p_1.ex new file mode 100644 index 00000000..154ccb49 --- /dev/null +++ b/lib/kafka_ex/server_0_p_10_p_1.ex @@ -0,0 +1,184 @@ +defmodule KafkaEx.Server0P10P1 do + @moduledoc """ + Implements kafkaEx.Server behaviors for kafka 0.10.1 API. + """ + use KafkaEx.Server + alias KafkaEx.Protocol.CreateTopics + alias KafkaEx.Server0P8P2 + alias KafkaEx.Server0P9P0 + + # alias KafkaEx.ConsumerGroupRequiredError + alias KafkaEx.InvalidConsumerGroupError + alias KafkaEx.Protocol.ConsumerMetadata + alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse + # alias KafkaEx.Protocol.Heartbeat + # alias KafkaEx.Protocol.JoinGroup + # alias KafkaEx.Protocol.LeaveGroup + alias KafkaEx.Protocol.Metadata.Broker + # alias KafkaEx.Protocol.SyncGroup + alias KafkaEx.Server.State + # alias KafkaEx.NetworkClient + + require Logger + + @metadata_api_version 1 + @consumer_group_update_interval 30_000 + + + def start_link(args, name \\ __MODULE__) + + def start_link(args, :no_name) do + GenServer.start_link(__MODULE__, [args]) + end + + def start_link(args, name) do + GenServer.start_link(__MODULE__, [args, name], [name: name]) + end + + # The functions below are all defined in KafkaEx.Server0P8P2 + defdelegate kafka_server_consumer_group(state), to: Server0P8P2 + defdelegate kafka_server_fetch(fetch_request, state), to: Server0P8P2 + defdelegate kafka_server_offset_fetch(offset_fetch, state), to: Server0P8P2 + defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2 + defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2 + defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2 + + # The functions below are all defined in KafkaEx.Server0P9P0 + defdelegate kafka_server_join_group(request, network_timeout, state_in), to: Server0P9P0 + defdelegate kafka_server_sync_group(request, network_timeout, state_in), to: Server0P9P0 + defdelegate kafka_server_leave_group(request, network_timeout, state_in), to: Server0P9P0 + defdelegate 
+  defdelegate kafka_server_heartbeat(request, network_timeout, state_in), to: Server0P9P0
+  defdelegate consumer_group?(state), to: Server0P9P0
+
+  def kafka_server_init([args]) do
+    kafka_server_init([args, self()])
+  end
+
+  def kafka_server_init([args, name]) do
+    uris = Keyword.get(args, :uris, [])
+    metadata_update_interval = Keyword.get(args, :metadata_update_interval, @metadata_update_interval)
+    consumer_group_update_interval = Keyword.get(args, :consumer_group_update_interval, @consumer_group_update_interval)
+
+    # this should have already been validated, but it's possible someone could
+    # try to short-circuit the start call
+    consumer_group = Keyword.get(args, :consumer_group)
+    unless KafkaEx.valid_consumer_group?(consumer_group) do
+      raise InvalidConsumerGroupError, consumer_group
+    end
+
+    use_ssl = Keyword.get(args, :use_ssl, false)
+    ssl_options = Keyword.get(args, :ssl_options, [])
+
+    brokers = Enum.map(uris, fn({host, port}) -> %Broker{host: host, port: port, socket: NetworkClient.create_socket(host, port, ssl_options, use_ssl)} end)
+
+    {correlation_id, metadata} = retrieve_metadata_with_version(brokers, 0, config_sync_timeout(), @metadata_api_version)
+    state = %State{metadata: metadata, brokers: brokers, correlation_id: correlation_id, consumer_group: consumer_group, metadata_update_interval: metadata_update_interval, consumer_group_update_interval: consumer_group_update_interval, worker_name: name, ssl_options: ssl_options, use_ssl: use_ssl}
+    # Get the initial "real" broker list and start a regular refresh cycle.
+    state = update_metadata(state, @metadata_api_version)
+    {:ok, _} = :timer.send_interval(state.metadata_update_interval, :update_metadata)
+
+    state =
+      if consumer_group?(state) do
+        # If we are using consumer groups then initialize the state and start the update cycle
+        {_, updated_state} = update_consumer_metadata(state)
+        {:ok, _} = :timer.send_interval(state.consumer_group_update_interval, :update_consumer_metadata)
+        updated_state
+      else
+        state
+      end
+
+    {:ok, state}
+  end
+
+  def kafka_server_metadata(topic, state) do
+    {correlation_id, metadata} = retrieve_metadata_with_version(state.brokers, state.correlation_id, config_sync_timeout(), @metadata_api_version, topic)
+    updated_state = %{state | metadata: metadata, correlation_id: correlation_id}
+    {:reply, metadata, updated_state}
+  end
+
+  def kafka_server_update_metadata(state) do
+    {:noreply, update_metadata(state, @metadata_api_version)}
+  end
+
+  def kafka_create_topics(requests, state) do
+    create_topics_request = %CreateTopics.Request{
+      create_topic_requests: requests,
+      # broker-side timeout for the creation, in milliseconds
+      timeout: 2000
+    }
+
+    main_request = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request)
+
+    # only the cluster controller may handle CreateTopics requests
+    broker = state.brokers |> Enum.find(&(&1.is_controller))
+
+    {response, state} = case broker do
+      nil ->
+        Logger.log(:error, "Controller broker for create_topics is not available")
+        {:topic_not_found, state}
+      _ ->
+        response = broker
+          |> NetworkClient.send_sync_request(main_request, config_sync_timeout())
+          |> case do
+            {:error, reason} -> {:error, reason}
+            response -> CreateTopics.parse_response(response)
+          end
+        {response, %{state | correlation_id: state.correlation_id + 1}}
+    end
+
+    {:reply, response, state}
+  end
+
+  defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0)
+
+  defp update_consumer_metadata(%State{consumer_group: consumer_group} = state, 0, error_code) do
+    Logger.log(:error, "Fetching consumer_group #{consumer_group} metadata failed with error_code #{inspect error_code}")
error_code}") + {%ConsumerMetadataResponse{error_code: error_code}, state} + end + + defp update_consumer_metadata(%State{consumer_group: consumer_group, correlation_id: correlation_id} = state, retry, _error_code) do + response = correlation_id + |> ConsumerMetadata.create_request(@client_id, consumer_group) + |> first_broker_response(state) + |> ConsumerMetadata.parse_response + + case response.error_code do + :no_error -> + { + response, + %{ + state | + consumer_metadata: response, + correlation_id: state.correlation_id + 1 + } + } + _ -> :timer.sleep(400) + update_consumer_metadata( + %{state | correlation_id: state.correlation_id + 1}, + retry - 1, + response.error_code + ) + end + end + + defp broker_for_consumer_group(state) do + ConsumerMetadataResponse.broker_for_consumer_group(state.brokers, state.consumer_metadata) + end + + # refactored from two versions, one that used the first broker as valid answer, hence + # the optional extra flag to do that. Wraps broker_for_consumer_group with an update + # call if no broker was found. + def broker_for_consumer_group_with_update(state, use_first_as_default \\ false) do + case broker_for_consumer_group(state) do + nil -> + {_, updated_state} = update_consumer_metadata(state) + default_broker = if use_first_as_default, do: hd(state.brokers), else: nil + {broker_for_consumer_group(updated_state) || default_broker, updated_state} + broker -> + {broker, state} + end + end + + defp first_broker_response(request, state) do + first_broker_response(request, state.brokers, config_sync_timeout()) + end + +end diff --git a/lib/kafka_ex/server_0_p_8_p_0.ex b/lib/kafka_ex/server_0_p_8_p_0.ex index a6c0d3ff..5481b10d 100644 --- a/lib/kafka_ex/server_0_p_8_p_0.ex +++ b/lib/kafka_ex/server_0_p_8_p_0.ex @@ -62,6 +62,7 @@ defmodule KafkaEx.Server0P8P0 do def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" def kafka_server_update_consumer_metadata(_state), do: raise "Consumer Group Metadata is not supported in 0.8.0 version of kafka" + def kafka_create_topics(_, _state), do: raise "CreateTopic is not supported in 0.9.0 version of kafka" defp fetch(request, state) do case network_request(request, Fetch, state) do diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 3591bc5f..cec40e72 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -140,6 +140,7 @@ defmodule KafkaEx.Server0P8P2 do def kafka_server_leave_group(_, _, _state), do: raise "Leave Group is not supported in 0.8.0 version of Kafka" def kafka_server_heartbeat(_, _, _state), do: raise "Heartbeat is not supported in 0.8.0 version of kafka" defp update_consumer_metadata(state), do: update_consumer_metadata(state, @retry_count, 0) + def kafka_create_topics(_, _state), do: raise "CreateTopic is not supported in 0.9.0 version of kafka" defp update_consumer_metadata(%State{consumer_group: consumer_group} = state, 0, error_code) do Logger.log(:error, "Fetching consumer_group #{consumer_group} metadata failed with error_code #{inspect error_code}") diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex b/lib/kafka_ex/server_0_p_9_p_0.ex index 0112f4ed..66c9294c 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -1,7 +1,14 @@ defmodule KafkaEx.Server0P9P0 do @moduledoc """ - Implements kafkaEx.Server behaviors for kafka 0.9.0 API. 
+  Implements KafkaEx.Server behaviors for the Kafka APIs >= 0.9.0 and < 0.10.1.
+  """
+
+  # this function isn't implemented for 0.9.0
+  @dialyzer [
+    {:nowarn_function, kafka_create_topics: 2}
+  ]
+
   use KafkaEx.Server
   alias KafkaEx.ConsumerGroupRequiredError
   alias KafkaEx.InvalidConsumerGroupError
@@ -38,6 +45,7 @@ defmodule KafkaEx.Server0P9P0 do
   defdelegate kafka_server_offset_commit(offset_commit_request, state), to: Server0P8P2
   defdelegate kafka_server_consumer_group_metadata(state), to: Server0P8P2
   defdelegate kafka_server_update_consumer_metadata(state), to: Server0P8P2
+  def kafka_create_topics(_, _state), do: raise "CreateTopic is not supported in 0.9.0 version of kafka"
 
   def kafka_server_init([args]) do
     kafka_server_init([args, self()])
@@ -207,7 +215,7 @@ defmodule KafkaEx.Server0P9P0 do
   # refactored from two versions, one that used the first broker as valid answer, hence
   # the optional extra flag to do that. Wraps broker_for_consumer_group with an update
   # call if no broker was found.
-  defp broker_for_consumer_group_with_update(state, use_first_as_default \\ false) do
+  def broker_for_consumer_group_with_update(state, use_first_as_default \\ false) do
     case broker_for_consumer_group(state) do
       nil ->
         {_, updated_state} = update_consumer_metadata(state)
@@ -224,6 +232,7 @@ defmodule KafkaEx.Server0P9P0 do
 
   def consumer_group?(%State{consumer_group: :no_consumer_group}), do: false
   def consumer_group?(_), do: true
+
   defp first_broker_response(request, state) do
     first_broker_response(request, state.brokers, config_sync_timeout())
   end
diff --git a/scripts/docker_up.sh b/scripts/docker_up.sh
index 5ae17653..657a0e7f 100755
--- a/scripts/docker_up.sh
+++ b/scripts/docker_up.sh
@@ -14,6 +14,7 @@ set -e
 
 # Kafka needs to know our ip address so that it can advertise valid
 # connnection details
+
 if [ -z ${IP_IFACE} ]
 then
     echo Detecting active network interface
diff --git a/test/integration/server0_p_10_p_1_test.exs b/test/integration/server0_p_10_p_1_test.exs
new file mode 100644
index 00000000..ddef60bb
--- /dev/null
+++ b/test/integration/server0_p_10_p_1_test.exs
@@ -0,0 +1,42 @@
+defmodule KafkaEx.Server0P10P1.Test do
+  use ExUnit.Case
+
+  @moduletag :server_0_p_10_p_1
+
+  @tag :createtopic
+  test "can create a topic" do
+    name = "topic3_#{:rand.uniform(2000000)}"
+
+    request = %{
+      topic: name,
+      num_partitions: 10,
+      replication_factor: 1,
+      replica_assignment: [],
+      config_entries: [
+        %{config_name: "cleanup.policy", config_value: "compact"},
+        %{config_name: "min.compaction.lag.ms", config_value: "0"}
+      ]
+    }
+
+    resp = KafkaEx.create_topics([request])
+    # error code 0 = NONE
+    assert {0, name} == parse_create_topic_resp(resp)
+
+    resp = KafkaEx.create_topics([request])
+    # error code 36 = TOPIC_ALREADY_EXISTS
+    assert {36, name} == parse_create_topic_resp(resp)
+
+    topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic))
+    assert Enum.member?(topics, name)
+  end
+
+  def parse_create_topic_resp(response) do
+    %KafkaEx.Protocol.CreateTopics.Response{
+      topic_errors: [
+        %KafkaEx.Protocol.CreateTopics.TopicError{
+          error_code: error_code,
+          topic_name: topic_name
+        }
+      ]
+    } = response
+    {error_code, topic_name}
+  end
+end
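+
+# For reference, a single-topic CreateTopics response destructures as
+# (values illustrative, not from a real run):
+#
+#   %KafkaEx.Protocol.CreateTopics.Response{
+#     topic_errors: [
+#       %KafkaEx.Protocol.CreateTopics.TopicError{topic_name: "topic3_42", error_code: 0}
+#     ]
+#   }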