Merge pull request #33 from akash-akya/dev
Fix stderr FD leak
akash-akya authored Dec 15, 2023
2 parents 774ce7e + e014fdc commit 6ecf4e0
Showing 9 changed files with 84 additions and 28 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yaml
@@ -12,8 +12,6 @@ jobs:
strategy:
matrix:
include:
- elixir: 1.10.x
otp: 22.x
- elixir: 1.12.x
otp: 23.x
- elixir: 1.14.x
47 changes: 39 additions & 8 deletions README.md
@@ -4,14 +4,45 @@
[![Hex.pm](https://img.shields.io/hexpm/v/exile.svg)](https://hex.pm/packages/exile)
[![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/exile/)

Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix ports issues.

Exile is built around the idea of having demand-driven, asynchronous
interaction with external process. Think of streaming a video through
`ffmpeg` to serve a web request. Exile internally uses NIF. See
[Rationale](#rationale) for details. It also provides stream
abstraction for interacting with an external program. For example,
getting audio out of a stream is as simple as
Exile is an alternative to
[ports](https://hexdocs.pm/elixir/Port.html) for running external
programs. It lets you stream input and output to an external program with
back-pressure and non-blocking IO. It also fixes other port-related
issues, such as selectively closing stdin.

### Port IO Issue

With [Port](https://hexdocs.pm/elixir/Port.html), suppose you run an external
program that generates a lot of output on stdout, such as
streaming video using `ffmpeg` to serve a web request. If you try
this with a port, you will quickly run out of memory, because port
IO is not demand-driven. The port consumes output from stdout as soon as
it is available and `send`s it to the process mailbox. Since a BEAM
process mailbox is unbounded, the output sits there waiting to be `receive`d.

#### Let's take an example

Memory consumption with Port

```elixir
Port.open({:spawn_executable, "/bin/cat"}, [{:args, ["/dev/random"]}, {:line, 10}, :binary, :use_stdio])
```

![Port memory consumption](./images/port.png)

#### Memory consumption with Exile

```elixir
Exile.stream!(~w(cat /dev/random))
|> Enum.each(fn data ->
  IO.puts(IO.iodata_length(data))
end)
```

![Exile memory consumption](./images/exile.png)

Exile achieves this by implementing a demand-driven, asynchronous IO mechanism for the external process using a NIF.
See [Rationale](#rationale) for details. For example, getting audio out of a stream is as simple as

``` elixir
Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535))
2 changes: 1 addition & 1 deletion c_src/exile.c
@@ -196,7 +196,7 @@ static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc,

return make_ok(env, term);

error_exit:
error_exit:
enif_release_resource(fd);
return ATOM_ERROR;
}
1 change: 0 additions & 1 deletion c_src/spawner.c
@@ -147,7 +147,6 @@ static int exec_process(char const *bin, char *const *args, int socket,
}

if (strcmp(stderr_str, "consume") == 0) {
debug("== %d", strcmp(stderr_str, "consume"));
close(STDERR_FILENO);
close(r_cmderr);
if (dup2(w_cmderr, STDERR_FILENO) < 0) {
Binary file added images/exile.png
Binary file added images/port.png
29 changes: 14 additions & 15 deletions lib/exile/process/exec.ex
@@ -3,27 +3,22 @@ defmodule Exile.Process.Exec do

alias Exile.Process.Nif
alias Exile.Process.Pipe
alias Exile.Process.State

@type args :: %{
cmd_with_args: [String.t()],
cd: String.t(),
env: [{String.t(), String.t()}]
}

@spec start(args, boolean()) :: %{
@spec start(args, State.stderr_mode()) :: %{
port: port,
stdin: non_neg_integer(),
stdout: non_neg_integer(),
stderr: non_neg_integer()
}
def start(
%{
cmd_with_args: cmd_with_args,
cd: cd,
env: env
},
stderr
) do
def start(args, stderr) do
%{cmd_with_args: cmd_with_args, cd: cd, env: env} = args
socket_path = socket_path()
{:ok, sock} = :socket.open(:local, :stream, :default)

@@ -78,8 +73,8 @@ defmodule Exile.Process.Exec do

@socket_timeout 2000

@spec receive_fds(:socket.socket(), boolean) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
defp receive_fds(lsock, stderr) do
@spec receive_fds(:socket.socket(), State.stderr_mode()) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()}
defp receive_fds(lsock, stderr_mode) do
{:ok, sock} = :socket.accept(lsock, @socket_timeout)

try do
@@ -91,12 +86,16 @@ defmodule Exile.Process.Exec do
# FDs are managed by the NIF resource life-cycle
{:ok, stdout} = Nif.nif_create_fd(stdout_fd)
{:ok, stdin} = Nif.nif_create_fd(stdin_fd)
{:ok, stderr} = Nif.nif_create_fd(stderr_fd)

{:ok, stderr} =
if stderr == :consume do
Nif.nif_create_fd(stderr_fd)
stderr =
if stderr_mode == :consume do
stderr
else
{:ok, nil}
# we have to explicitly close FD passed over socket.
# Since it will be tracked by the OS and kept open until we close.
Nif.nif_close(stderr)
nil
end

{stdin, stdout, stderr}
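
The leak fixed in this hunk comes down to a general OS rule: a descriptor handed to a process stays open until that process closes it, whether or not anything reads from it. The C sketch below illustrates that rule in isolation and is not code from this repository. The helpers `count_open_fds` and `spawn_once` are hypothetical names, a plain `pipe()` stands in for the descriptor received over the socket, and the scan limit passed to `count_open_fds` is an arbitrary choice.

```c
#include <fcntl.h>
#include <unistd.h>

/* Count descriptors currently open in this process by probing each
 * number below max_fd with fcntl(F_GETFD), which returns -1 for a
 * descriptor that is not open. max_fd is an illustrative scan limit. */
static int count_open_fds(int max_fd) {
  int count = 0;
  for (int fd = 0; fd < max_fd; fd++) {
    if (fcntl(fd, F_GETFD) != -1) {
      count++;
    }
  }
  return count;
}

/* Stand-in for one spawn that hands us a stderr descriptor we do not
 * intend to read (hypothetical helper, not the Exile implementation).
 * With close_unused == 0 the read end is simply forgotten, which leaks
 * it; with close_unused != 0 it is closed explicitly.
 * Returns 0 on success, -1 if the pipe could not be created. */
static int spawn_once(int close_unused) {
  int fds[2];
  if (pipe(fds) != 0) {
    return -1;
  }
  close(fds[1]); /* the write end would belong to the child */
  if (close_unused) {
    close(fds[0]); /* explicit close of the unused descriptor */
  }
  return 0;
}
```

Calling `count_open_fds` before and after a batch of `spawn_once(0)` calls shows the count growing by one per call, while `spawn_once(1)` leaves it unchanged. This is the same before/after comparison the commit's new test performs with `lsof`.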
4 changes: 3 additions & 1 deletion lib/exile/process/state.ex
@@ -9,6 +9,8 @@ defmodule Exile.Process.State do

@type read_mode :: :stdout | :stderr | :stdout_or_stderr

@type stderr_mode :: :console | :disable | :consume

@type pipes :: %{
stdin: Pipe.t(),
stdout: Pipe.t(),
@@ -27,7 +29,7 @@ defmodule Exile.Process.State do
port: port(),
pipes: pipes,
status: status,
stderr: :console | :disable | :consume,
stderr: stderr_mode,
operations: Operations.t(),
exit_ref: reference(),
monitor_ref: reference()
27 changes: 27 additions & 0 deletions test/exile/sync_process_test.exs
@@ -45,6 +45,33 @@ defmodule Exile.SyncProcessTest do
assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor)
end

test "FDs are not leaked" do
before_count = opened_pipes()

for _ <- 1..100 do
{:ok, s} = Process.start_link(~w(date))
:ok = Process.close_stdin(s)
assert {:ok, _} = Process.read_any(s, 100)
assert :eof = Process.read_any(s, 100)
assert {:ok, 0} = Process.await_exit(s, 100)
end

# let the dust settle
:timer.sleep(2000)

after_count = opened_pipes()

assert before_count == after_count
end

defp opened_pipes do
{pipe_count, 0} = System.shell(~s(lsof -a -p #{:os.getpid()} | grep " PIPE " | wc -l))

pipe_count
|> String.trim()
|> String.to_integer()
end

defp stop_all_children(sup) do
DynamicSupervisor.which_children(sup)
|> Enum.each(fn {_, pid, _, _} ->
