Skip to content

Commit

Permalink
plug: Handle response streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach committed Sep 3, 2023
1 parent 75f83f2 commit 0e3f26b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ jobs:
REQ_NOWARN_OUTPUT: true
# TODO: Remove on Finch 0.17
FINCH_REF: b08b594
# TODO: Remove on Plug 1.15
PLUG_REF: 7b859ef
strategy:
fail-fast: false
matrix:
Expand Down
69 changes: 64 additions & 5 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,8 @@ defmodule Req.Steps do
end

defp run_plug(request) do
plug = request.options[:plug]

req_body =
case request.body do
iodata when is_binary(iodata) or is_list(iodata) ->
Expand All @@ -1009,19 +1011,76 @@ defmodule Req.Steps do
end
end

conn = Plug.Test.conn(request.method, request.url, req_body)

conn =
Plug.Test.conn(request.method, request.url, req_body)
if request.into do
register_before_chunk(conn, fn conn, chunk ->
update_in(conn.private[:req_plug_chunks], fn chunks ->
(chunks || []) ++ [chunk]
end)
end)
else
conn
end

conn =
conn
|> Map.replace!(:req_headers, req_headers)
|> call_plug(request.options[:plug])
|> call_plug(plug)

response =
Req.Response.new(
status: conn.status,
headers: conn.resp_headers,
body: conn.resp_body
headers: conn.resp_headers
)

{request, response}
case request.into do
nil ->
response = put_in(response.body, conn.resp_body)
{request, response}

fun when is_function(fun, 2) ->
chunks =
conn.private[:req_plug_chunks] ||
raise "plug #{inspect(plug)} does not send chunked response"

Enum.reduce_while(
chunks,
{request, response},
fn chunk, acc ->
fun.({:data, chunk}, acc)
end
)

collectable ->
chunks =
conn.private[:req_plug_chunks] ||
raise "plug #{inspect(plug)} does not send chunked response"

{acc, collector} = Collectable.into(collectable)

{acc, {request, response}} =
Enum.reduce(
chunks,
{acc, {request, response}},
fn chunk, {acc1, acc2} ->
{collector.(acc1, {:cont, chunk}), acc2}
end
)

acc = collector.(acc, :done)
{request, %{response | body: acc}}
end
end

# TODO: remove when we depend on Plug 1.15
defp register_before_chunk(conn, callback) do
if Code.ensure_loaded?(Plug.Conn) and function_exported?(Plug.Conn, :register_before_chunk, 2) do
Plug.Conn.register_before_chunk(conn, callback)
else
raise "using :into and :plug requires Plug 1.15"
end
end

defp call_plug(conn, plug) when is_atom(plug) do
Expand Down
19 changes: 17 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ defmodule Req.MixProject do
:brotli,
:ezstd,
# TODO: Wait for async_request/3
Finch
Finch,
# TODO: Wait for Plug 1.15
Plug.Conn
]
]
]
Expand Down Expand Up @@ -58,7 +60,7 @@ defmodule Req.MixProject do
{:mime, "~> 1.6 or ~> 2.0"},
{:jason, "~> 1.0"},
{:nimble_csv, "~> 1.0", optional: true},
{:plug, "~> 1.0", optional: true},
{:plug, "~> 1.0", [optional: true] ++ plug_opts()},
{:brotli, "~> 0.3.1", optional: true},
{:ezstd, "~> 1.0", optional: true},
{:bypass, "~> 2.1", only: :test},
Expand All @@ -79,6 +81,19 @@ defmodule Req.MixProject do
end
end

defp plug_opts do
cond do
path = System.get_env("PLUG_PATH") ->
[path: path]

ref = System.get_env("PLUG_REF") ->
[github: "elixir-plug/plug", ref: ref]

true ->
[]
end
end

defp docs do
[
main: "readme",
Expand Down
21 changes: 21 additions & 0 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,27 @@ defmodule Req.StepsTest do

assert Req.request!(req).body == "foofoo"
end

# TODO: Remove on Plug 1.15
plug_register_before_chunk? =
Code.ensure_loaded?(Plug.Conn) and function_exported?(Plug.Conn, :register_before_chunk, 2)

@tag skip: not plug_register_before_chunk?
test "response stream" do
req =
Req.new(
plug: fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
conn
end,
into: []
)

resp = Req.request!(req)
assert resp.body == ["foo", "bar"]
end
end

describe "run_finch" do
Expand Down

0 comments on commit 0e3f26b

Please sign in to comment.