Skip to content

Commit

Permalink
Add preliminary support for CreateTopics API
Browse files Browse the repository at this point in the history
  • Loading branch information
jbruggem committed Jul 18, 2018
1 parent 236629e commit d0300b4
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 30 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ config :kafka_ex,
],
# set this to the version of the kafka broker that you are using
# include only major.minor.patch versions. must be at least 0.8.0
kafka_version: "0.9.0"
kafka_version: "0.10.1"

env_config = Path.expand("#{Mix.env}.exs", __DIR__)
if File.exists?(env_config) do
Expand Down
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,32 @@ services:
ports:
- "2181:2181"
kafka1:
image: wurstmeister/kafka:0.9.0.1
image: wurstmeister/kafka:0.10.1.0
ports:
- "9092:9092"
depends_on:
- zookeeper
volumes:
- ./kafka1/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
- ./kafka1/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
kafka2:
image: wurstmeister/kafka:0.9.0.1
image: wurstmeister/kafka:0.10.1.0
ports:
- "9093:9093"
depends_on:
- zookeeper
volumes:
- ./kafka2/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
- ./kafka2/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
kafka3:
image: wurstmeister/kafka:0.9.0.1
image: wurstmeister/kafka:0.10.1.0
ports:
- "9094:9094"
depends_on:
- zookeeper
volumes:
- ./kafka3/server.properties.in:/opt/kafka_2.11-0.9.0.1/config/server.properties.in
- ./kafka3/server.properties.in:/opt/kafka_2.11-0.10.1.0/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
17 changes: 17 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,23 @@ defmodule KafkaEx do
def valid_consumer_group?(b) when is_binary(b), do: byte_size(b) > 0
def valid_consumer_group?(_), do: false


@doc """
## Example
```elixir
iex> KafkaEx.create_worker(:mt)
```
"""
@spec create_topics(String.t) :: CreateTopics.Response.t
def create_topics(topic_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker)
# topic = Keyword.get(opts, :topic, "")
Server.call(worker_name, {:create_topics, topic_name})
end

#OTP API
def start(_type, _args) do
max_restarts = Application.get_env(:kafka_ex, :max_restarts, 10)
Expand Down
1 change: 1 addition & 0 deletions lib/kafka_ex/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ defmodule KafkaEx.Config do

defp server("0.8.0"), do: KafkaEx.Server0P8P0
defp server("0.8.2"), do: KafkaEx.Server0P8P2
defp server("0.10.1"), do: KafkaEx.Server0P10P1
defp server(_), do: KafkaEx.Server0P9P0

# ssl_options should be an empty list by default if use_ssl is false
Expand Down
13 changes: 12 additions & 1 deletion lib/kafka_ex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ defmodule KafkaEx.Protocol do
@heartbeat_request 12
@leave_group_request 13
@sync_group_request 14
@create_topics_request 19
# DescribeConfigs 32
# AlterConfigs 33 Valid resource types are "Topic" and "Broker".

@api_version 0

Expand Down Expand Up @@ -59,8 +62,16 @@ defmodule KafkaEx.Protocol do
@sync_group_request
end

defp api_key(:create_topics) do
@create_topics_request
end

def create_request(type, correlation_id, client_id) do
<< api_key(type) :: 16, @api_version :: 16, correlation_id :: 32,
create_request(type, correlation_id, client_id, @api_version)
end

def create_request(type, correlation_id, client_id, api_version) do
<< api_key(type) :: 16, api_version :: 16, correlation_id :: 32,
byte_size(client_id) :: 16, client_id :: binary >>
end

Expand Down
146 changes: 146 additions & 0 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@

defmodule KafkaEx.Protocol.CreateTopics do
alias KafkaEx.Protocol

@moduledoc """
Implementation of the Kafka CreateTopics request and response APIs
"""

# CreateTopics Request (Version: 0) => [create_topic_requests] timeout
# create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]
# topic => STRING
# num_partitions => INT32
# replication_factor => INT16
# replica_assignment => partition [replicas]
# partition => INT32
# replicas => INT32
# config_entries => config_name config_value
# config_name => STRING
# config_value => NULLABLE_STRING
# timeout => INT32

defmodule ReplicaAssignment do
defstruct partition: nil, replicas: nil
@type t :: %ReplicaAssignment{ partition: integer, replicas: [integer] }
end

defmodule ConfigEntry do
defstruct config_name: nil, config_value: nil
@type t :: %ConfigEntry{ config_name: binary, config_value: binary }
end

defmodule TopicRequest do
defstruct topic: nil,
num_partitions: nil,
replication_factor: nil,
replica_assignment: nil,
config_entries: nil
@type t :: %TopicRequest{
topic: binary,
num_partitions: integer,
replication_factor: integer,
replica_assignment: [ReplicaAssignment],
config_entries: [ConfigEntry],
}
end

defmodule Request do
@moduledoc false
defstruct create_topic_requests: nil, timeout: nil
@type t :: %Request{create_topic_requests: [TopicRequest], timeout: integer}
end

defmodule TopicError do
defstruct topic_name: nil, error_code: nil
@type t :: %TopicError{ topic_name: binary, error_code: integer }
end

defmodule Response do
@moduledoc false
defstruct topic_errors: nil
@type t :: %Response{topic_errors: [TopicError]}
end

@spec create_request(integer, binary, Request.t) :: binary
def create_request(correlation_id, client_id, create_topics_request) do
Protocol.create_request(:create_topics, correlation_id, client_id) <>
encode_topic_requests(create_topics_request.create_topic_requests) <>
<< create_topics_request.timeout :: 32-signed >>
end

@spec encode_topic_requests([TopicRequest.t]) :: binary
defp encode_topic_requests(requests) do
requests
|> map_encode(&encode_topic_request/1)
end

@spec encode_topic_request(TopicRequest.t) :: binary
defp encode_topic_request(request) do
encode_string(request.topic) <>
<< request.num_partitions :: 32-signed, request.replication_factor :: 16-signed >> <>
encode_replica_assignments(request.replica_assignment) <>
encode_config_entries(request.config_entries)
end


@spec encode_replica_assignments([ReplicaAssignment.t]) :: binary
defp encode_replica_assignments(replica_assignments) do
replica_assignments |> map_encode(&encode_replica_assignment/1)
end

@spec encode_replica_assignment(ReplicaAssignment.t) :: binary
defp encode_replica_assignment(replica_assignment) do
<< replica_assignment.partition :: 32-signed >> <>
replica_assignment.replicas |> map_encode(&(<< &1 :: 32-signed >>))
end

@spec encode_config_entries([ConfigEntry.t]) :: binary
defp encode_config_entries(config_entries) do
config_entries |> map_encode(&encode_config_entry/1)
end

@spec encode_config_entry(ConfigEntry.t) :: binary
defp encode_config_entry(config_entry) do
encode_string(config_entry.config_name) <> encode_nullable_string(config_entry.config_value)
end

@spec encode_nullable_string(String.t) :: binary
defp encode_nullable_string(text) do
if text == nil do
<< -1 :: 16-signed >>
else
encode_string(text)
end
end

@spec encode_string(String.t) :: binary
defp encode_string(text) do
<< byte_size(text) :: 16-signed, text :: binary, >>
end

defp map_encode(elems, function) do
if nil == elems or 0 == length(elems) do
<< 0 :: 32-signed >>
else
<< length(elems) :: 32-signed >> <>
(elems
|> Enum.map(function)
|> Enum.reduce(&(&1 <> &2)))
end

end

@spec parse_response(binary) :: [] | Response.t
def parse_response(<< _correlation_id :: 32-signed, topic_errors_count :: 32-signed, topic_errors :: binary >>) do
%Response{topic_errors: parse_topic_errors(topic_errors_count, topic_errors)}
end

defp parse_topic_errors(0, _), do: []

@spec parse_topic_errors(integer, binary) :: [TopicError.t]
defp parse_topic_errors(topic_errors_count,
<< topic_name_size :: 16-signed, topic_name :: size(topic_name_size)-binary, error_code :: 16-signed, rest :: binary >>) do
[%TopicError{topic_name: topic_name, error_code: error_code} | parse_topic_errors(topic_errors_count - 1, rest)]
end

end
Loading

0 comments on commit d0300b4

Please sign in to comment.