Skip to content

Commit

Permalink
Working !
Browse files Browse the repository at this point in the history
  • Loading branch information
jbruggem committed Jul 18, 2018
1 parent e1d85e8 commit 5a5494d
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 74 deletions.
17 changes: 9 additions & 8 deletions lib/kafka_ex/protocol/create_topics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,15 @@ defmodule KafkaEx.Protocol.CreateTopics do
# Encodes a list of topic creation requests into their concatenated
# binary wire representation, one encoded request per element.
# (Leftover debug `IO.inspect` calls removed.)
@spec encode_topic_requests([TopicRequest.t]) :: binary
defp encode_topic_requests(requests) do
  requests
  |> map_encode(&encode_topic_request/1)
end

# Encodes a single TopicRequest in Kafka CreateTopics wire order:
# topic name, partition count (int32), replication factor (int16),
# replica assignments, then config entries.
# (Diff residue resolved: the old debug-laden body is dropped in
# favor of the clean concatenation the commit introduced.)
@spec encode_topic_request(TopicRequest.t) :: binary
defp encode_topic_request(request) do
  encode_string(request.topic) <>
    << request.num_partitions :: 32-signed, request.replication_factor :: 16-signed >> <>
    encode_replica_assignments(request.replica_assignment) <>
    encode_config_entries(request.config_entries)
end


Expand All @@ -103,17 +101,20 @@ defmodule KafkaEx.Protocol.CreateTopics do

# Encodes one config entry as its name followed by its value.
# The name is always present (plain string encoding); the value may
# be nil, so it uses the nullable-string encoding.
# (Fixes the old line that mistakenly encoded `config_value` twice,
# never writing the config name.)
@spec encode_config_entry(ConfigEntry.t) :: binary
defp encode_config_entry(config_entry) do
  encode_string(config_entry.config_name) <> encode_nullable_string(config_entry.config_value)
end

# Encodes `text` using the Kafka nullable-string wire format:
# nil is written as a bare 16-bit signed length of -1 with no payload,
# any other value is encoded as a regular length-prefixed string.
@spec encode_nullable_string(String.t) :: binary
defp encode_nullable_string(nil), do: << -1 :: 16-signed >>
defp encode_nullable_string(text), do: encode_string(text)

# Encodes `text` as a Kafka length-prefixed string: a 16-bit signed
# byte length followed by the raw UTF-8 bytes.
# (Removes the leftover debug `IO.inspect` and the trailing comma
# inside the binary constructor, which is a syntax error.)
@spec encode_string(String.t) :: binary
defp encode_string(text) do
  << byte_size(text) :: 16-signed, text :: binary >>
end

Expand Down
18 changes: 5 additions & 13 deletions lib/kafka_ex/protocol/metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule KafkaEx.Protocol.Metadata do

alias KafkaEx.Socket

defstruct node_id: -1, host: "", port: 0, socket: nil
@type t :: %__MODULE__{}
defstruct node_id: -1, host: "", port: 0, socket: nil, is_controller: nil
@type t :: %__MODULE__{node_id: integer, host: binary, port: integer, socket: KafkaEx.Socket.t, is_controller: boolean }

def connected?(%Broker{} = broker) do
broker.socket != nil && Socket.open?(broker.socket)
Expand Down Expand Up @@ -105,17 +105,16 @@ defmodule KafkaEx.Protocol.Metadata do
end

def create_request(correlation_id, client_id, ""), do: create_request(correlation_id, client_id, "", nil)
def create_request(correlation_id, client_id, topics) when is_list(topics), do: create_request(correlation_id, client_id, topics, nil)

# Builds a metadata request when no specific topics are given (the
# `""` clause). The topic count written is 0 for API version 0 and
# -1 for later versions — presumably the "all topics" marker differs
# between versions; confirm against the Kafka Metadata API spec.
# (Leftover debug `IO.inspect` removed.)
def create_request(correlation_id, client_id, "", api_version) do
  version = valid_api_version(api_version)
  topic_count = if 0 == version, do: 0, else: -1
  KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, version) <> << topic_count :: 32-signed >>
end

def create_request(correlation_id, client_id, topic, api_version) when is_binary(topic), do: create_request(correlation_id, client_id, [topic], valid_api_version(api_version))

# Builds a metadata request for an explicit list of topics: the
# protocol header, then the topic count (int32) and the encoded
# topic names. (Leftover debug `IO.inspect` removed.)
def create_request(correlation_id, client_id, topics, api_version) when is_list(topics) do
  KafkaEx.Protocol.create_request(:metadata, correlation_id, client_id, valid_api_version(api_version)) <> << length(topics) :: 32-signed, topic_data(topics) :: binary >>
end

Expand All @@ -125,13 +124,11 @@ defmodule KafkaEx.Protocol.Metadata do

def parse_response(<< _correlation_id :: 32-signed, brokers_size :: 32-signed, rest :: binary >>, api_version) do
version = valid_api_version(api_version)
IO.inspect("******** parse_response *********** version #{version}")
case version do
1 ->
{brokers, rest} = parse_brokers_v1(brokers_size, rest, [])
<< controller_id :: 32-signed, rest :: binary >> = rest
<< topic_metadatas_size :: 32-signed, rest :: binary >> = rest
IO.inspect("topics metadata size #{topic_metadatas_size}")
%Response{brokers: brokers, controller_id: controller_id, topic_metadatas: parse_topic_metadatas_v1(topic_metadatas_size, rest)}
0 ->
{brokers, rest} = parse_brokers(brokers_size, rest, [])
Expand All @@ -157,7 +154,6 @@ defmodule KafkaEx.Protocol.Metadata do
-1 :: 16-signed,
rest :: binary
>>, brokers) do
IO.inspect(" parse_brokers null RACK")
parse_brokers_v1(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
end

Expand All @@ -167,11 +163,9 @@ defmodule KafkaEx.Protocol.Metadata do
host :: size(host_len)-binary,
port :: 32-signed,
rack_len :: 16-signed,
rack :: size(rack_len)-binary,
_rack :: size(rack_len)-binary,
rest :: binary
>>, brokers) do
IO.inspect(" parse_brokers with RACK")
IO.inspect({node_id, host_len, host, port, rack_len, rack})
parse_brokers_v1(brokers_size - 1, rest, [%Broker{node_id: node_id, host: host, port: port} | brokers])
end

Expand All @@ -194,8 +188,6 @@ defmodule KafkaEx.Protocol.Metadata do
partition_metadatas_size :: 32-signed,
rest :: binary >>) do
{partition_metadatas, rest} = parse_partition_metadatas(partition_metadatas_size, [], rest)
IO.inspect("******** parse_topic_metadatas_v1 ***************")
IO.inspect({topic_len, topic, is_internal, partition_metadatas_size})
[%TopicMetadata{error_code: Protocol.error(error_code), topic: topic, partition_metadatas: partition_metadatas, is_internal: is_internal == 1 } |
parse_topic_metadatas_v1(topic_metadatas_size - 1, rest)]
end
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ defmodule KafkaEx.Server do

def update_metadata(state, api_version) do
{correlation_id, metadata} = retrieve_metadata_with_version(state.brokers, state.correlation_id, config_sync_timeout(), api_version)
metadata_brokers = metadata.brokers
metadata_brokers = metadata.brokers |> Enum.map(&(%{&1 | is_controller: &1.node_id == metadata.controller_id}))
brokers = state.brokers
|> remove_stale_brokers(metadata_brokers)
|> add_new_brokers(metadata_brokers, state.ssl_options, state.use_ssl)
Expand Down
68 changes: 19 additions & 49 deletions lib/kafka_ex/server_0_p_10_p_1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,59 +100,29 @@ defmodule KafkaEx.Server0P10P1 do
{:noreply, update_metadata(state, @metadata_api_version)}
end

# Handles a create_topics call: wraps the caller-supplied topic
# requests in a CreateTopics.Request, sends it to the controller
# broker, and replies with the parsed response.
# (Diff residue resolved: the old hard-coded single-topic request and
# the send-to-every-broker loop are dropped in favor of the new
# controller-targeted request; debug inspects and dead commented-out
# code removed; `mainRequest` renamed to snake_case.)
def kafka_create_topics(requests, state) do
  create_topics_request = %CreateTopics.Request{
    create_topic_requests: requests,
    # NOTE(review): hard-coded 2000 ms broker-side timeout — consider
    # making this configurable.
    timeout: 2000
  }

  main_request = CreateTopics.create_request(state.correlation_id, @client_id, create_topics_request)

  # CreateTopics requests must be sent to the cluster controller.
  broker = state.brokers |> Enum.find(&(&1.is_controller))

  {response, state} = case broker do
    nil ->
      Logger.log(:error, "Coordinator for topic is not available")
      {:topic_not_found, state}
    _ ->
      response = broker
      |> NetworkClient.send_sync_request(main_request, config_sync_timeout())
      |> case do
        {:error, reason} -> {:error, reason}
        response -> CreateTopics.parse_response(response)
      end
      {response, %{state | correlation_id: state.correlation_id + 1}}
  end

  {:reply, response, state}
end
Expand Down
35 changes: 32 additions & 3 deletions test/integration/server0_p_10_p_1_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,38 @@ defmodule KafkaEx.Server0P10P1.Test do

@tag :createtopic
# Integration test: creating a topic succeeds once (error code 0 /
# NONE) and fails the second time with 36 (TOPIC_ALREADY_EXISTS),
# after which the topic is visible in cluster metadata.
# (Diff residue resolved: stale "ploup2" lines and the debug
# `IO.inspect` removed.)
test "can create a topic" do
  # Random suffix so reruns don't collide with topics left over from
  # earlier test runs.
  name = "topic3_#{:rand.uniform(2000000)}"

  request = %{
    topic: name,
    num_partitions: 10,
    replication_factor: 1,
    replica_assignment: [],
    config_entries: [
      %{config_name: "cleanup.policy", config_value: "compact"},
      %{config_name: "min.compaction.lag.ms", config_value: "0"}
    ]}

  resp = KafkaEx.create_topics([request])
  # error = NONE
  assert {0, name} == parse_create_topic_resp(resp)

  resp = KafkaEx.create_topics([request])
  # error = TOPIC_ALREADY_EXISTS
  assert {36, name} == parse_create_topic_resp(resp)

  topics = KafkaEx.metadata.topic_metadatas |> Enum.map(&(&1.topic))
  assert Enum.member?(topics, name)
end

# Extracts the {error_code, topic_name} pair from a CreateTopics
# response that is expected to contain exactly one topic error entry.
# Raises MatchError if the response has a different shape.
def parse_create_topic_resp(response) do
  %KafkaEx.Protocol.CreateTopics.Response{topic_errors: topic_errors} = response

  [%KafkaEx.Protocol.CreateTopics.TopicError{error_code: code, topic_name: topic}] =
    topic_errors

  {code, topic}
end
end

0 comments on commit 5a5494d

Please sign in to comment.