Skip to content

Commit

Permalink
Add request streaming (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach authored Aug 24, 2023
1 parent de6c16b commit c2a3ffe
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 15 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Req is a batteries-included HTTP client for Elixir.

* Request body encoding and automatic response body decoding (via [`encode_body`] and [`decode_body`] steps.)

* Request body streaming

* Encode params as query string (via [`put_params`] step.)

* Basic, bearer, and `.netrc` authentication (via [`auth`] step.)
Expand Down Expand Up @@ -53,6 +55,13 @@ Req.get!("https://api.github.com/repos/wojtekmach/req").body["description"]

If you want to use Req in a Mix project, you can add the above dependency to your `mix.exs`.

Here's an example POST request (which sends data as `application/x-www-form-urlencoded`):

```elixir
iex> Req.post!("https://httpbin.org/post", form: [comments: "hello!"]).body["form"]
%{"comments" => "hello!"}
```

If you are planning to make several similar requests, you can build up a request struct with
desired common options and re-use it:

Expand Down
12 changes: 12 additions & 0 deletions lib/req.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ defmodule Req do
iex> Req.post!("https://httpbin.org/post", form: [comments: "hello!"]).body["form"]
%{"comments" => "hello!"}
Stream request body:
iex> stream = Stream.duplicate("foo", 3)
iex> Req.post!("https://httpbin.org/post", body: {:stream, stream}).body["data"]
"foofoofoo"
"""

@type url() :: URI.t() | String.t()
Expand Down Expand Up @@ -64,6 +70,12 @@ defmodule Req do
* `:body` - the request body.
Can be one of:
* `iodata`
* `{:stream, enumerable}`
Additional URL options:
* `:base_url` - if set, the request URL is prepended with this base URL (via
Expand Down
8 changes: 7 additions & 1 deletion lib/req/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ defmodule Req.Request do
* `:body` - the HTTP request body
Can be one of:
* `iodata`
* `{:stream, enumerable}`
* `:options` - the options to be used by steps. See ["Options"](#module-options) section below
for more information.
Expand Down Expand Up @@ -311,7 +317,7 @@ defmodule Req.Request do
method: atom(),
url: URI.t(),
headers: [{binary(), binary()}],
body: iodata() | nil,
body: iodata() | {:stream, Enumerable.t()} | nil,
options: map(),
registered_options: MapSet.t(),
halted: boolean(),
Expand Down
51 changes: 49 additions & 2 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,51 @@ defmodule Req.Steps do
@doc step: :request
def compress_body(request) do
if request.options[:compress_body] do
body =
case request.body do
{:stream, enumerable} ->
{:stream, gzip_stream(enumerable)}

iodata ->
:zlib.gzip(iodata)
end

request
|> Map.update!(:body, &:zlib.gzip/1)
|> Map.replace!(:body, body)
|> Req.Request.put_header("content-encoding", "gzip")
else
request
end
end

defp gzip_stream(enumerable) do
eof = make_ref()

enumerable
|> Stream.concat([eof])
|> Stream.transform(
fn ->
z = :zlib.open()
# https://github.com/erlang/otp/blob/OTP-26.0/erts/preloaded/src/zlib.erl#L551
:ok = :zlib.deflateInit(z, :default, :deflated, 16 + 15, 8, :default)
z
end,
fn
^eof, z ->
buf = :zlib.deflate(z, [], :finish)
{buf, z}

data, z ->
buf = :zlib.deflate(z, data)
{buf, z}
end,
fn z ->
:ok = :zlib.deflateEnd(z)
:ok = :zlib.close(z)
end
)
end

@doc """
Runs the request using `Finch`.
Expand Down Expand Up @@ -777,7 +814,17 @@ defmodule Req.Steps do
end

defp run_plug(request) do
body = IO.iodata_to_binary(request.body || "")
body =
case request.body do
nil ->
""

{:stream, enumerable} ->
enumerable |> Enum.to_list() |> IO.iodata_to_binary()

iodata ->
IO.iodata_to_binary(iodata)
end

conn =
Plug.Test.conn(request.method, request.url, body)
Expand Down
94 changes: 82 additions & 12 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,39 @@ defmodule Req.StepsTest do
end

describe "encode_body" do
# neither `body: data` nor `body: {:stream, data}` is used by the step but testing these
# here for locality
test "body", c do
Bypass.expect(c.bypass, "POST", "/", fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
Plug.Conn.send_resp(conn, 200, body)
end)

req =
Req.new(
url: c.url,
body: "foo"
)

assert Req.post!(req).body == "foo"
end

test "body stream", c do
Bypass.expect(c.bypass, "POST", "/", fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
Plug.Conn.send_resp(conn, 200, body)
end)

req =
Req.new(
url: c.url,
# TODO: use Stream.duplicate("foo", 3) when we require Elixir 1.14
body: {:stream, ["foo"] |> Stream.cycle() |> Stream.take(3)}
)

assert Req.post!(req).body == "foofoofoo"
end

test "json", c do
Bypass.expect(c.bypass, "POST", "/", fn conn ->
assert {:ok, ~s|{"a":1}|, conn} = Plug.Conn.read_body(conn)
Expand Down Expand Up @@ -217,13 +250,34 @@ defmodule Req.StepsTest do
{"range", "bytes=0-20"}
end

test "compress_body" do
req = Req.new(method: :post, json: %{a: 1}) |> Req.Request.prepare()
assert Jason.decode!(req.body) == %{"a" => 1}
describe "compress_body" do
test "request" do
req = Req.new(method: :post, json: %{a: 1}) |> Req.Request.prepare()
assert Jason.decode!(req.body) == %{"a" => 1}

req = Req.new(method: :post, json: %{a: 1}, compress_body: true) |> Req.Request.prepare()
assert :zlib.gunzip(req.body) |> Jason.decode!() == %{"a" => 1}
assert List.keyfind(req.headers, "content-encoding", 0) == {"content-encoding", "gzip"}
req = Req.new(method: :post, json: %{a: 1}, compress_body: true) |> Req.Request.prepare()
assert :zlib.gunzip(req.body) |> Jason.decode!() == %{"a" => 1}
assert List.keyfind(req.headers, "content-encoding", 0) == {"content-encoding", "gzip"}
end

test "stream", c do
Bypass.expect(c.bypass, "POST", "/", fn conn ->
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
body = :zlib.gunzip(body)
Plug.Conn.send_resp(conn, 200, body)
end)

req =
Req.new(
url: c.url,
method: :post,
# TODO: use Stream.duplicate("foo", 3) when we require Elixir 1.14
body: {:stream, ["foo"] |> Stream.cycle() |> Stream.take(3)},
compress_body: true
)

assert Req.post!(req).body == "foofoofoo"
end
end

## Response steps
Expand Down Expand Up @@ -1237,14 +1291,30 @@ defmodule Req.StepsTest do
refute_received _
end

test "put_plug" do
plug = fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
assert body == ~s|{"a":1}|
Plug.Conn.send_resp(conn, 200, "ok")
describe "put_plug" do
test "request" do
plug = fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
assert body == ~s|{"a":1}|
Plug.Conn.send_resp(conn, 200, "ok")
end

assert Req.request!(plug: plug, json: %{a: 1}).body == "ok"
end

assert Req.request!(plug: plug, json: %{a: 1}).body == "ok"
test "request stream" do
req =
Req.new(
plug: fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
Plug.Conn.send_resp(conn, 200, body)
end,
# TODO: use Stream.duplicate("foo", 3) when we require Elixir 1.14
body: {:stream, ["foo"] |> Stream.cycle() |> Stream.take(3)}
)

assert Req.request!(req).body == "foofoofoo"
end
end

describe "run_finch" do
Expand Down

0 comments on commit c2a3ffe

Please sign in to comment.