Skip to content

Commit

Permalink
Add response streaming (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach authored Aug 25, 2023
1 parent 8e87950 commit c149383
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 177 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Req is a batteries-included HTTP client for Elixir.

* Range requests (via [`put_range`]) step.)

* Response streaming

* Follows redirects (via [`follow_redirects`] step.)

* Retries on errors (via [`retry`] step.)
Expand Down Expand Up @@ -183,11 +185,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

[`Req.request/1`]: https://hexdocs.pm/req/Req.html#request/1
[`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1
[`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2
[`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2
[`Req.Request`]: https://hexdocs.pm/req/Req.Request.html
[`Req.request/1`]: https://hexdocs.pm/req/Req.html#request/1
[`Req.new/1`]: https://hexdocs.pm/req/Req.html#new/1
[`Req.get!/2`]: https://hexdocs.pm/req/Req.html#get!/2
[`Req.post!/2`]: https://hexdocs.pm/req/Req.html#post!/2
[`Req.async_request/2`]: https://hexdocs.pm/req/Req.html#async_request/2
[`Req.Request`]: https://hexdocs.pm/req/Req.Request.html

[`auth`]: https://hexdocs.pm/req/Req.Steps.html#auth/1
[`cache`]: https://hexdocs.pm/req/Req.Steps.html#cache/1
Expand Down
99 changes: 91 additions & 8 deletions lib/req.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Req do
@moduledoc """
@moduledoc ~S"""
The high-level API.
Req is composed of three main pieces:
Expand Down Expand Up @@ -35,8 +35,38 @@ defmodule Req do
iex> stream = Stream.duplicate("foo", 3)
iex> Req.post!("https://httpbin.org/post", body: {:stream, stream}).body["data"]
"foofoofoo"
Response streaming using callback:
iex> resp =
...> Req.get!("http://httpbin.org/stream/2", stream: fn {:data, data}, acc ->
...> IO.puts("got chunk with #{byte_size(data)} bytes")
...> {:cont, acc}
...> end)
# Outputs: got chunk with 249 bytes
# Outputs: got chunk with 249 bytes
iex> resp.status
200
iex> resp.body
""
"""

# TODO: Add when new version of Finch is out.
# Response streaming to caller:
#
# iex> {req, resp} = Req.async_request!("http://httpbin.org/stream/2")
# iex> resp.status
# 200
# iex> resp.body
# ""
# iex> Req.parse_message(req, receive do message -> message end)
# [{:data, "{\"url\": \"http://httpbin.org/stream/2\"" <> ...}]
# iex> Req.parse_message(req, receive do message -> message end)
# [{:data, "{\"url\": \"http://httpbin.org/stream/2\"" <> ...}]
# iex> Req.parse_message(req, receive do message -> message end)
# [:done]
# ""

@type url() :: URI.t() | String.t()

@doc """
Expand Down Expand Up @@ -71,9 +101,9 @@ defmodule Req do
Can be one of:
* `iodata`
* `iodata` - send request body eagerly
* `{:stream, enumerable}`
* `{:stream, enumerable}` - stream `enumerable` as request body
Additional URL options:
Expand Down Expand Up @@ -146,9 +176,20 @@ defmodule Req do
* `:max_redirects` - the maximum number of redirects, defaults to `10`.
Response streaming:
* `:stream` - a 2-arity function used to stream response. The first argument is a "Stream Command" tuple
described below. The second argument is a `{request, response}` tuple.
The "Stream Command" is one of:
* `{:data, data}` - a chunk of the response body.
See module documentation for an example of streaming responses.
Retry options ([`retry`](`Req.Steps.retry/1`) step):
* `:retry`: can be set to: `:safe` (default) to only retry GET/HEAD requests on HTTP 408/5xx
* `:retry` - can be set to: `:safe` (default) to only retry GET/HEAD requests on HTTP 408/5xx
responses or exceptions, `false` to never retry, and `fun` - a 1-arity function that accepts
either a `Req.Response` or an exception struct and returns boolean whether to retry
Expand Down Expand Up @@ -212,13 +253,13 @@ defmodule Req do
iex> URI.to_string(req.url)
"https://elixir-lang.org"
With mock adapter:
Fake adapter:
iex> mock = fn request ->
iex> fake = fn request ->
...> {request, Req.Response.new(status: 200, body: "it works!")}
...> end
iex>
iex> req = Req.new(adapter: mock)
iex> req = Req.new(adapter: fake)
iex> Req.get!(req).body
"it works!"
Expand Down Expand Up @@ -353,7 +394,7 @@ defmodule Req do
"""
@spec update(Req.Request.t(), options :: keyword()) :: Req.Request.t()
def update(%Req.Request{} = request, options) when is_list(options) do
request_option_names = [:method, :url, :headers, :body, :adapter]
request_option_names = [:method, :url, :headers, :body, :adapter, :stream]

{request_options, options} = Keyword.split(options, request_option_names)

Expand Down Expand Up @@ -918,6 +959,48 @@ defmodule Req do
end
end

# TODO
@doc false
def async_request(request, options \\ []) do
Req.Request.run_request(%{new(request, options) | stream: :self})
end

# TODO
@doc false
def async_request!(request, options \\ []) do
case async_request(request, options) do
{request, %Req.Response{} = response} ->
{request, response}

{_request, exception} ->
raise exception
end
end

def parse_message(%Req.Request{} = request, message) do
request.async.stream_fun.(request.async.ref, message)
end

def cancel_async_request(%Req.Request{} = request) do
request.async.cancel_fun.(request.async.ref)
end

def run_request(request, options \\ []) do
request
|> Req.update(options)
|> Req.Request.run_request()
end

def run_request!(request, options \\ []) do
case run_request(request, options) do
{request, %Req.Response{} = response} ->
{request, response}

{_request, exception} ->
raise exception
end
end

@doc """
Returns default options.
Expand Down
5 changes: 5 additions & 0 deletions lib/req/async.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Req.Async do
# TODO
@moduledoc false
defstruct [:ref, :stream_fun, :cancel_fun]
end
4 changes: 3 additions & 1 deletion lib/req/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ defmodule Req.Request do
error_steps: [],
private: %{},
registered_options: MapSet.new(),
current_request_steps: []
current_request_steps: [],
stream: nil,
async: nil

@doc """
Returns a new request struct.
Expand Down
70 changes: 68 additions & 2 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ defmodule Req.Steps do
iex> Req.get!(url, connect_options: [transport_opts: [cacerts: :public_key.cacerts_get()]])
Stream response body:
Stream response body using `Finch.stream/5`:
fun = fn request, finch_request, finch_name, finch_options ->
fun = fn
Expand Down Expand Up @@ -682,7 +682,56 @@ defmodule Req.Steps do
{request, run_finch_request(deprecated_fun.(finch_request), finch_name, finch_options)}

nil ->
{request, run_finch_request(finch_request, finch_name, finch_options)}
case request.stream do
nil ->
{request, run_finch_request(finch_request, finch_name, finch_options)}

fun when is_function(fun, 2) ->
response = Req.Response.new()

fun = fn
{:status, status}, {request, response} ->
{request, %{response | status: status}}

{:headers, headers}, {request, response} ->
{request, %{response | headers: headers}}

{:data, data}, acc ->
{:cont, result} = fun.({:data, data}, acc)
# TODO: handle {:halt, result}
result
end

case Finch.stream(finch_request, finch_name, {request, response}, fun, finch_options) do
{:ok, acc} ->
acc
end

:self ->
ref = Finch.async_request(finch_request, finch_name)

{:status, status} =
receive do
{^ref, message} ->
message
end

{:headers, headers} =
receive do
{^ref, message} ->
message
end

async = %Req.Async{
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

request = put_in(request.async, async)
response = Req.Response.new(status: status, headers: headers)
{request, response}
end
end
end

Expand All @@ -693,6 +742,23 @@ defmodule Req.Steps do
end
end

defp finch_parse_message(ref, {ref, {:data, data}}) do
{:ok, [{:data, data}]}
end

defp finch_parse_message(ref, {ref, :done}) do
{:ok, [:done]}
end

# TODO: handle remaining possible Finch results
defp finch_parse_message(_ref, _other) do
:unknown
end

defp finch_cancel(ref) do
Finch.cancel_async_request(ref)
end

defp finch_name(request) do
if name = request.options[:finch] do
if request.options[:connect_options] do
Expand Down
16 changes: 13 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ defmodule Req.MixProject do
NimbleCSV.RFC4180,
Plug.Test,
:brotli,
:ezstd
:ezstd,
# TODO: Remove on next Finch release
Finch
]
]
]
Expand All @@ -35,7 +37,7 @@ defmodule Req.MixProject do
def application do
[
mod: {Req.Application, []},
extra_applications: [:logger]
extra_applications: [:logger, :inets]
]
end

Expand All @@ -52,7 +54,7 @@ defmodule Req.MixProject do

defp deps do
[
{:finch, "~> 0.9"},
{:finch, "~> 0.9", finch_opts()},
{:mime, "~> 1.6 or ~> 2.0"},
{:jason, "~> 1.0"},
{:nimble_csv, "~> 1.0", optional: true},
Expand All @@ -64,6 +66,14 @@ defmodule Req.MixProject do
]
end

defp finch_opts do
if path = System.get_env("FINCH_PATH") do
[path: path]
else
[]
end
end

defp docs do
[
main: "readme",
Expand Down
Loading

0 comments on commit c149383

Please sign in to comment.