diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 6e93b8ac..43ea3ab6 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -160,9 +160,7 @@ defmodule Finch.HTTP2.Pool do end @impl true - def init({{scheme, host, port} = shp, registry, _pool_size, pool_opts}) do - {:ok, _} = Registry.register(registry, shp, __MODULE__) - + def init({{scheme, host, port}, registry, _pool_size, pool_opts}) do data = %{ conn: nil, scheme: scheme, @@ -171,7 +169,8 @@ defmodule Finch.HTTP2.Pool do requests: %{}, backoff_base: 500, backoff_max: 10_000, - connect_opts: pool_opts[:conn_opts] || [] + connect_opts: pool_opts[:conn_opts] || [], + registry: registry } {:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}} @@ -187,6 +186,8 @@ defmodule Finch.HTTP2.Pool do # When entering a disconnected state we need to fail all of the pending # requests def disconnected(:enter, _, data) do + Registry.unregister(data.registry, {data.scheme, data.host, data.port}) + :ok = Enum.each(data.requests, fn {ref, request} -> send(request.from_pid, {:error, ref, Error.exception(:connection_closed)}) @@ -276,7 +277,8 @@ defmodule Finch.HTTP2.Pool do @doc false def connected(event, content, data) - def connected(:enter, _old_state, _data) do + def connected(:enter, _old_state, data) do + {:ok, _} = Registry.register(data.registry, {data.scheme, data.host, data.port}, __MODULE__) :keep_state_and_data end @@ -398,6 +400,8 @@ defmodule Finch.HTTP2.Pool do def connected_read_only(event, content, data) def connected_read_only(:enter, _old_state, data) do + Registry.unregister(data.registry, {data.scheme, data.host, data.port}) + {actions, data} = Enum.flat_map_reduce(data.requests, data, fn # request is awaiting a response and should stay in state diff --git a/test/finch/http2/integration_test.exs b/test/finch/http2/integration_test.exs index 4ba922e1..35cc5ae0 100644 --- a/test/finch/http2/integration_test.exs +++ b/test/finch/http2/integration_test.exs @@ -81,20 +81,24 @@ defmodule Finch.HTTP2.IntegrationTest do # they shouldn't block each other which we check with a rough time estimates request = Finch.build(:get, url <> "/wait/1000") + # Warm the connection + {:ok, _} = Finch.request(request, TestFinch) + results = - 1..50 - |> Enum.map(fn _ -> - Task.async(fn -> + Task.async_stream( + 1..50, + fn _ -> start = System.monotonic_time() {:ok, _} = Finch.request(request, TestFinch) System.monotonic_time() - start - end) - end) - |> Enum.map(&Task.await/1) + end, + ordered: false + ) + |> Enum.into([]) - for result <- results do + for {:ok, result} <- results do time = System.convert_time_unit(result, :native, :millisecond) - assert time <= 1200 + assert time <= 1_050 end end diff --git a/test/finch/http2/pool_test.exs b/test/finch/http2/pool_test.exs index 7b680d05..11fe9ce6 100644 --- a/test/finch/http2/pool_test.exs +++ b/test/finch/http2/pool_test.exs @@ -3,6 +3,7 @@ defmodule Finch.HTTP2.PoolTest do import Mint.HTTP2.Frame + alias Finch.PoolManager alias Finch.HTTP2.Pool alias Finch.MockHTTP2Server @@ -162,6 +163,97 @@ defmodule Finch.HTTP2.PoolTest do request(pool, req, []) end + test "if connections are in connected_read_only state, don't let clients check them out from the pool", + %{ + request: req + } do + us = self() + + port = + start_server_and_connect_with(fn port -> + start_supervised!( + {Finch, + name: TestFinch, + pools: %{ + "https://localhost:#{port}" => [ + protocol: :http2, + count: 1, + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + }} + ) + + port + end) + + {pool, _} = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) + + spawn(fn -> + result = request(pool, req, []) + send(us, {:resp, result}) + end) + + assert_recv_frames([headers(stream_id: stream_id)]) + + # Force the connection to enter read only mode + server_send_frames([ + goaway(last_stream_id: stream_id, error_code: :no_error, debug_data: "all good") + ]) + + :timer.sleep(50) + # The connection should be discarded from the pool + assert :none = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) + end + + test "if connections are in disconnected state, don't let clients check them out from the pool", + %{ + request: req + } do + us = self() + + port = + start_server_and_connect_with(fn port -> + start_supervised!( + {Finch, + name: TestFinch, + pools: %{ + "https://localhost:#{port}" => [ + protocol: :http2, + count: 1, + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + }} + ) + + port + end) + + {pool, _} = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) + + spawn(fn -> + result = request(pool, req, []) + send(us, {:resp, result}) + end) + + # If the server closes the socket, the connection should be discarded from the pool + :ok = :ssl.close(server_socket()) + :timer.sleep(50) + assert :none = PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) + + # But after the client reconnects, it should be added back in + server_accept_socket() + :timer.sleep(50) + assert PoolManager.lookup_pool(TestFinch, {:https, "localhost", port}) != :none + end + test "request timeout with timeout of 0", %{request: req} do us = self() @@ -288,6 +380,11 @@ defmodule Finch.HTTP2.PoolTest do result end + defp server_accept_socket() do + server = Process.get(@pdict_key) + MockHTTP2Server.accept_socket(server) + end + defp recv_next_frames(n) do server = Process.get(@pdict_key) MockHTTP2Server.recv_next_frames(server, n) diff --git a/test/support/mock_http2_server.ex b/test/support/mock_http2_server.ex index 0577f246..a2bf58b6 100644 --- a/test/support/mock_http2_server.ex +++ b/test/support/mock_http2_server.ex @@ -4,7 +4,7 @@ defmodule Finch.MockHTTP2Server do alias Mint.{HTTP2.Frame, HTTP2.HPACK} - defstruct [:socket, :encode_table, :decode_table] + defstruct [:socket, :encode_table, :decode_table, :listen_socket, :server_settings] @fixtures_dir Path.expand("../fixtures", __DIR__) @@ -36,7 +36,9 @@ defmodule Finch.MockHTTP2Server do server = %__MODULE__{ socket: server_socket, encode_table: HPACK.new(4096), - decode_table: HPACK.new(4096) + decode_table: HPACK.new(4096), + listen_socket: listen_socket, + server_settings: server_settings } {result, server} @@ -107,6 +109,10 @@ defmodule Finch.MockHTTP2Server do server.socket end + def accept_socket(%__MODULE__{listen_socket: listen_socket, server_settings: server_settings}) do + accept(listen_socket, self(), server_settings) + end + defp accept(listen_socket, parent, server_settings) do {:ok, socket} = :ssl.transport_accept(listen_socket) {:ok, socket} = :ssl.handshake(socket)