Skip to content

Commit

Permalink
Merge pull request #295 from jonatanklosko/jk-stream-acc
Browse files Browse the repository at this point in the history
Return acc when Finch.{stream,stream_while}/5 fails
  • Loading branch information
sneako authored Nov 22, 2024
2 parents 94d9f9f + 45cd5ac commit 93dd445
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 67 deletions.
24 changes: 14 additions & 10 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ defmodule Finch do
File.close(file)
"""
@spec stream(Request.t(), name(), acc, stream(acc), request_opts()) ::
{:ok, acc} | {:error, Exception.t()}
{:ok, acc} | {:error, Exception.t(), acc}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
fun = fn entry, acc ->
Expand Down Expand Up @@ -437,7 +437,7 @@ defmodule Finch do
File.close(file)
"""
@spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) ::
{:ok, acc} | {:error, Exception.t()}
{:ok, acc} | {:error, Exception.t(), acc}
when acc: term()
def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
request_span req, name do
Expand Down Expand Up @@ -490,14 +490,18 @@ defmodule Finch do
{:cont, {status, headers, body, trailers ++ value}}
end

with {:ok, {status, headers, body, trailers}} <- __stream__(req, name, acc, fun, opts) do
{:ok,
%Response{
status: status,
headers: headers,
body: IO.iodata_to_binary(body),
trailers: trailers
}}
case __stream__(req, name, acc, fun, opts) do
{:ok, {status, headers, body, trailers}} ->
{:ok,
%Response{
status: status,
headers: headers,
body: IO.iodata_to_binary(body),
trailers: trailers
}}

{:error, error, _acc} ->
{:error, error}
end
end
end
Expand Down
19 changes: 10 additions & 9 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,15 @@ defmodule Finch.HTTP1.Conn do
conn,
mint,
error,
acc,
metadata,
start_time,
extra_measurements
)
end

{:error, mint, error} ->
handle_request_error(conn, mint, error, metadata, start_time, extra_measurements)
handle_request_error(conn, mint, error, acc, metadata, start_time, extra_measurements)
end
catch
kind, error ->
Expand All @@ -166,10 +167,10 @@ defmodule Finch.HTTP1.Conn do
defp stream_or_body({:stream, _}), do: :stream
defp stream_or_body(body), do: body

defp handle_request_error(conn, mint, error, metadata, start_time, extra_measurements) do
defp handle_request_error(conn, mint, error, acc, metadata, start_time, extra_measurements) do
metadata = Map.put(metadata, :error, error)
Telemetry.stop(:send, start_time, metadata, extra_measurements)
{:error, %{conn | mint: mint}, error}
{:error, %{conn | mint: mint}, error, acc}
end

defp maybe_stream_request_body(mint, ref, {:stream, stream}) do
Expand Down Expand Up @@ -201,10 +202,10 @@ defmodule Finch.HTTP1.Conn do
Telemetry.stop(:recv, start_time, metadata, extra_measurements)
{:ok, %{conn | mint: mint}, acc}

{:error, mint, error, resp_metadata} ->
{:error, mint, error, acc, resp_metadata} ->
metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error))
Telemetry.stop(:recv, start_time, metadata, extra_measurements)
{:error, %{conn | mint: mint}, error}
{:error, %{conn | mint: mint}, error, acc}
end
end

Expand Down Expand Up @@ -234,7 +235,7 @@ defmodule Finch.HTTP1.Conn do

defp receive_response(
_,
_acc,
acc,
_fun,
mint,
_ref,
Expand All @@ -244,7 +245,7 @@ defmodule Finch.HTTP1.Conn do
)
when timeouts.request_timeout < 0 do
{:ok, mint} = Mint.HTTP1.close(mint)
{:error, mint, %Mint.TransportError{reason: :timeout}, resp_metadata}
{:error, mint, %Mint.TransportError{reason: :timeout}, acc, resp_metadata}
end

defp receive_response(
Expand Down Expand Up @@ -281,7 +282,7 @@ defmodule Finch.HTTP1.Conn do
)

{:error, mint, error, _responses} ->
{:error, mint, error, resp_metadata}
{:error, mint, error, acc, resp_metadata}
end
end

Expand Down Expand Up @@ -365,7 +366,7 @@ defmodule Finch.HTTP1.Conn do
end

{:error, ^ref, error} ->
{:error, mint, error, resp_metadata}
{:error, mint, error, acc, resp_metadata}
end
end
end
29 changes: 13 additions & 16 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,19 @@ defmodule Finch.HTTP1.Pool do
fn from, {state, conn, idle_time} ->
Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})

with {:ok, conn} <- Conn.connect(conn, name),
{:ok, conn, acc} <-
Conn.request(
conn,
req,
acc,
fun,
name,
receive_timeout,
request_timeout,
idle_time
) do
{{:ok, acc}, transfer_if_open(conn, state, from)}
else
case Conn.connect(conn, name) do
{:ok, conn} ->
Conn.request(conn, req, acc, fun, name, receive_timeout, request_timeout, idle_time)
|> case do
{:ok, conn, acc} ->
{{:ok, acc}, transfer_if_open(conn, state, from)}

{:error, conn, error, acc} ->
{{:error, error, acc}, transfer_if_open(conn, state, from)}
end

{:error, conn, error} ->
{{:error, error}, transfer_if_open(conn, state, from)}
{{:error, error, acc}, transfer_if_open(conn, state, from)}
end
end,
pool_timeout
Expand Down Expand Up @@ -116,7 +113,7 @@ defmodule Finch.HTTP1.Pool do
opts
) do
{:ok, _} -> send(owner, {request_ref, :done})
{:error, error} -> send(owner, {request_ref, {:error, error}})
{:error, error, _acc} -> send(owner, {request_ref, {:error, error}})
end
end)

Expand Down
50 changes: 27 additions & 23 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,32 @@ defmodule Finch.HTTP2.Pool do
timeout = opts[:receive_timeout]
request_ref = make_request_ref(pool)

with {:ok, recv_start} <- :gen_statem.call(pool, {:request, request_ref, request, opts}) do
monitor = Process.monitor(pool)
# If the timeout is an integer, we add a fail-safe "after" clause that fires
# after a timeout that is double the original timeout (min 2000ms). This means
# that if there are no bugs in our code, then the normal :request_timeout is
# returned, but otherwise we have a way to escape this code, raise an error, and
# get the process unstuck.
fail_safe_timeout = if is_integer(timeout), do: max(2000, timeout * 2), else: :infinity

try do
response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout, :headers)
catch
kind, error ->
metadata = %{request: request, name: name}
Telemetry.exception(:recv, recv_start, kind, error, __STACKTRACE__, metadata)

:ok = :gen_statem.call(pool, {:cancel, request_ref})
clean_responses(request_ref)
Process.demonitor(monitor)

:erlang.raise(kind, error, __STACKTRACE__)
end
case :gen_statem.call(pool, {:request, request_ref, request, opts}) do
{:ok, recv_start} ->
monitor = Process.monitor(pool)
# If the timeout is an integer, we add a fail-safe "after" clause that fires
# after a timeout that is double the original timeout (min 2000ms). This means
# that if there are no bugs in our code, then the normal :request_timeout is
# returned, but otherwise we have a way to escape this code, raise an error, and
# get the process unstuck.
fail_safe_timeout = if is_integer(timeout), do: max(2000, timeout * 2), else: :infinity

try do
response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout, :headers)
catch
kind, error ->
metadata = %{request: request, name: name}
Telemetry.exception(:recv, recv_start, kind, error, __STACKTRACE__, metadata)

:ok = :gen_statem.call(pool, {:cancel, request_ref})
clean_responses(request_ref)
Process.demonitor(monitor)

:erlang.raise(kind, error, __STACKTRACE__)
end

{:error, error} ->
{:error, error, acc}
end
end

Expand Down Expand Up @@ -171,7 +175,7 @@ defmodule Finch.HTTP2.Pool do

{^request_ref, {:error, error}} ->
Process.demonitor(monitor_ref)
{:error, error}
{:error, error, acc}

{:DOWN, ^monitor_ref, _, _, _} ->
{:error, :connection_process_went_down}
Expand Down
2 changes: 1 addition & 1 deletion lib/finch/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Finch.Pool do
Finch.stream(acc),
Finch.name(),
list()
) :: {:ok, acc} | {:error, term()}
) :: {:ok, acc} | {:error, term(), acc}
when acc: term()

@callback async_request(
Expand Down
16 changes: 8 additions & 8 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ defmodule Finch.HTTP2.PoolTest do
start_pool(port)
end)

assert {:error, error} = request(pool, %{req | headers: [{"foo", "bar"}]}, [])
assert {:error, error, _acc} = request(pool, %{req | headers: [{"foo", "bar"}]}, [])
assert %{reason: {:max_header_list_size_exceeded, _, _}} = error
end

Expand Down Expand Up @@ -105,7 +105,7 @@ defmodule Finch.HTTP2.PoolTest do
:timer.sleep(10)

# We can't send any more requests since the connection is closed for writing.
assert {:error, %Finch.Error{reason: :read_only}} = request(pool, req, [])
assert {:error, %Finch.Error{reason: :read_only}, _acc} = request(pool, req, [])

server_send_frames([
headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])),
Expand All @@ -120,7 +120,7 @@ defmodule Finch.HTTP2.PoolTest do
Process.sleep(50)

# If we try to make a request now that the server shut down, we get an error.
assert {:error, %Finch.Error{reason: :disconnected}} = request(pool, req, [])
assert {:error, %Finch.Error{reason: :disconnected}, _acc} = request(pool, req, [])
end

test "if server disconnects while there are waiting clients, we notify those clients", %{
Expand Down Expand Up @@ -148,7 +148,7 @@ defmodule Finch.HTTP2.PoolTest do

:ok = :ssl.close(server_socket())

assert_receive {:resp, {:error, %Finch.Error{reason: :connection_closed}}}
assert_receive {:resp, {:error, %Finch.Error{reason: :connection_closed}, _acc}}
end

test "if connections reaches max concurrent streams, we return an error", %{request: req} do
Expand All @@ -165,7 +165,7 @@ defmodule Finch.HTTP2.PoolTest do

assert_recv_frames([headers(stream_id: _stream_id)])

assert {:error, %Mint.HTTPError{reason: :too_many_concurrent_requests}} =
assert {:error, %Mint.HTTPError{reason: :too_many_concurrent_requests}, _acc} =
request(pool, req, [])
end

Expand All @@ -184,7 +184,7 @@ defmodule Finch.HTTP2.PoolTest do

assert_recv_frames([headers(stream_id: stream_id), rst_stream(stream_id: stream_id)])

assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}}
assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}, _acc}}
end

test "request timeout with timeout > 0", %{request: req} do
Expand All @@ -208,7 +208,7 @@ defmodule Finch.HTTP2.PoolTest do
headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers]))
])

assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}}
assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}, _acc}}
end

test "request timeout with timeout > 0 that fires after request is done", %{request: req} do
Expand Down Expand Up @@ -267,7 +267,7 @@ defmodule Finch.HTTP2.PoolTest do
# When there's a timeout, we cancel the request.
assert_recv_frames([rst_stream(stream_id: ^stream_id, error_code: :cancel)])

assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}}
assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}, _acc}}
end
end

Expand Down
31 changes: 31 additions & 0 deletions test/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,21 @@ defmodule FinchTest do
|> Finch.stream(finch_name, acc, fun)
end

test "unsuccessful get request", %{finch_name: finch_name} do
start_supervised!({Finch, name: finch_name})

acc = {nil, [], ""}

fun = fn
{:status, value}, {_, headers, body} -> {value, headers, body}
{:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
{:data, value}, {status, headers, body} -> {status, headers, body <> value}
end

assert {:error, %{reason: :nxdomain}, ^acc} =
Finch.build(:get, "http://idontexist.wat") |> Finch.stream(finch_name, acc, fun)
end

test "HTTP/1 with atom accumulator, illustrating that the type/shape of the accumulator is not important",
%{bypass: bypass, finch_name: finch_name} do
start_supervised!({Finch, name: finch_name})
Expand Down Expand Up @@ -769,6 +784,22 @@ defmodule FinchTest do
|> Finch.stream_while(finch_name, acc, fun)
end

test "unsuccessful get request", %{finch_name: finch_name} do
start_supervised!({Finch, name: finch_name})

acc = {nil, [], ""}

fun = fn
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}}
end

assert {:error, %{reason: :nxdomain}, ^acc} =
Finch.build(:get, "http://idontexist.wat")
|> Finch.stream_while(finch_name, acc, fun)
end

defmodule InfiniteStream do
def init(options), do: options

Expand Down

0 comments on commit 93dd445

Please sign in to comment.