Skip to content

Commit

Permalink
Merge pull request #406 from kafkaex/handle_tcp_close
Browse files Browse the repository at this point in the history
Handle closed sockets in new client
  • Loading branch information
dantswain authored May 4, 2020
2 parents 1113a82 + 4ba3d32 commit 5a0707b
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 7 deletions.
14 changes: 13 additions & 1 deletion lib/kafka_ex/new/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,19 @@ defmodule KafkaEx.New.Adapter do
}
]
}) do
base_offset
{:ok, base_offset}
end

def produce_response(%{
responses: [
%{
partition_responses: [
%{error_code: error_code}
]
}
]
}) do
{:error, Kayrock.ErrorCode.code_to_atom(error_code)}
end

def metadata_response(cluster_metadata) do
Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/new/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule KafkaEx.New.Broker do
host: nil,
port: nil,
socket: nil,
rack: nil,
socket: nil
rack: nil

@type t :: %__MODULE__{}

Expand Down
30 changes: 27 additions & 3 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ defmodule KafkaEx.New.Client do
{:noreply, update_metadata(state)}
end

def handle_info({:tcp_closed, socket}, state) do
state_out = close_broker_by_socket(state, socket)
{:noreply, state_out}
end

def handle_info({:ssl_closed, socket}, state) do
state_out = close_broker_by_socket(state, socket)
{:noreply, state_out}
end

@impl true
def terminate(reason, state) do
Logger.log(
Expand Down Expand Up @@ -222,9 +232,8 @@ defmodule KafkaEx.New.Client do
for broker <- brokers_to_close do
Logger.log(
:debug,
"Closing connection to broker #{broker.node_id}: #{
inspect(broker.host)
} on port #{inspect(broker.port)}"
"Closing connection to broker #{broker.node_id}: " <>
"#{inspect(broker.host)} on port #{inspect(broker.port)}"
)

NetworkClient.close_socket(broker.socket)
Expand Down Expand Up @@ -688,4 +697,19 @@ defmodule KafkaEx.New.Client do
{topic_metadata,
%{updated_state | allow_auto_topic_creation: allow_auto_topic_creation}}
end

defp close_broker_by_socket(state, socket) do
State.update_brokers(state, fn broker ->
if broker.socket.socket == socket do
Logger.log(
:debug,
"Broker #{inspect(broker.host)}:#{inspect(broker.port)} closed connection"
)

Broker.put_socket(broker, nil)
else
broker
end
end)
end
end
2 changes: 1 addition & 1 deletion lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ defmodule KafkaEx.New.ClientCompatibility do
response =
case response do
{:ok, :ok} -> {:ok, :ok}
{:ok, val} -> {:ok, Adapter.produce_response(val)}
{:ok, val} -> Adapter.produce_response(val)
_ -> response
end

Expand Down
18 changes: 18 additions & 0 deletions test/integration/new_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,22 @@ defmodule KafkaEx.New.Client.Test do
{:ok, offset_after} = KafkaExAPI.latest_offset(client, topic, partition)
assert offset_after == offset_before + 3
end

test "client can receive {:ssl_closed, _}", %{client: client} do
send(client, {:ssl_closed, :unused})
TestHelper.wait_for(fn ->
{:message_queue_len, m} = Process.info(client, :message_queue_len)
m == 0
end)
assert Process.alive?(client)
end

test "client can receive {:tcp_closed, _}", %{client: client} do
send(client, {:tcp_closed, :unused})
TestHelper.wait_for(fn ->
{:message_queue_len, m} = Process.info(client, :message_queue_len)
m == 0
end)
assert Process.alive?(client)
end
end

0 comments on commit 5a0707b

Please sign in to comment.