Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle abrupt VM termination #31

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,4 @@ jobs:

- run: mix deps.get
- run: mix compile --warnings-as-errors
- run: mix test --trace
- run: mix test --trace --exclude os:unix
25 changes: 21 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)

func execute(workdir string, args []string) error {
done := make(chan struct{})

sigs := make(chan os.Signal, 1)
input := make(chan Packet, 1)
outputDemand := make(chan Packet)
inputDemand := make(chan Packet)
Expand All @@ -20,8 +23,17 @@ func execute(workdir string, args []string) error {
logger.Printf("Command path: %v\n", proc.Path)

output := startCommandPipeline(proc, input, inputDemand, outputDemand)

// Capture common signals.
// Registering a notification for SIGPIPE is important: without it
// we won't be able to detect and handle abrupt BEAM VM terminations.
// Also, SIGPIPE behaviour in Go is a bit complex,
// see: https://pkg.go.dev/os/signal@go1.22.4#hdr-SIGPIPE
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)

// go handleSignals(input, outputDemand, done)
go dispatchStdin(input, outputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done)

// wait for pipeline to exit
<-done
Expand All @@ -33,8 +45,8 @@ func execute(workdir string, args []string) error {
logger.Printf("Command exited with error: %v\n", e)
os.Exit(3)
}
// TODO: return Stderr and exit stauts to beam process
logger.Printf("Command exited: %#v\n", err)
// TODO: return Stderr and exit status to beam process
logger.Printf("Command exited\n")
return err
}

Expand All @@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st
stdinReader(dispatch, done)
}

func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) {
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) {
defer func() {
close(done)
}()

merged := func() (Packet, bool) {
select {
case sig := <-sigs:
logger.Printf("Received OS Signal: ", sig)
return Packet{}, false

case v, ok := <-inputDemand:
return v, ok

case v, ok := <-output:
return v, ok
}
Expand Down
5 changes: 4 additions & 1 deletion go_src/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ func writePacket(tag uint8, data []byte) {
_, writeErr := os.Stdout.Write(buf[:payloadLen+4])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
// ignore broken pipe or closed pipe errors here.
// currently readCommandStdout closes output chan, making the
// flow break.
case *os.PathError:
logger.Printf("os.PathError: ", writeErr)
return
default:
fatal(writeErr)
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
%{
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"},
"credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.33.0", "690562b153153c7e4d455dc21dab86e445f66ceba718defe64b0ef6f0bd83ba0", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "3f69adc28274cb51be37d09b03e4565232862a4b10288a3894587b0131412124"},
"ex_doc": {:hex, :ex_doc, "0.34.1", "9751a0419bc15bc7580c73fde506b17b07f6402a1e5243be9e0f05a68c723368", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d441f1a86a235f59088978eff870de2e815e290e44a8bd976fe5d64470a4c9d2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
Expand Down
80 changes: 80 additions & 0 deletions test/ex_cmd_exit_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule ExCmdExitTest do
  use ExUnit.Case, async: false

  # Maximum time (ms) to wait for the killed VM and its external command to
  # disappear. Polling lets the test finish as soon as cleanup is done instead
  # of always sleeping for the full duration.
  @cleanup_timeout 5000

  # Delay (ms) between two liveness checks while waiting for the cleanup.
  @poll_interval 100

  # Maximum time (ms) to wait for the next chunk of output from the port.
  @output_timeout 5000

  # Running the `elixir` command this way currently does not work on Windows.
  @tag os: :unix
  test "if it kills external command on abnormal vm exit" do
    ex_cmd_expr = ~S{ExCmd.stream!(["cat"]) |> Stream.run()}

    # Start a second BEAM VM that runs `cat` through ExCmd and blocks forever.
    port =
      Port.open(
        {:spawn, "elixir -S mix run -e '#{ex_cmd_expr}'"},
        [:stderr_to_stdout, :use_stdio, :exit_status, :binary, :hide]
      )

    # Assertive match: fail right here if the port has no OS pid.
    {:os_pid, os_pid} = Port.info(port, :os_pid)

    # Make sure the spawned VM never outlives the test, even on failure.
    on_exit(fn ->
      os_process_alive?(os_pid) && os_process_kill(os_pid)
    end)

    assert os_process_alive?(os_pid)

    # The spawned VM prints the OS pid of the external command; wait for it.
    [_, cmd_pid] = capture_output!(port, ~r/os pid: ([0-9]+)/)

    cmd_pid = String.to_integer(cmd_pid)
    assert os_process_alive?(cmd_pid)

    # Simulate an abrupt VM termination.
    assert {:ok, _msg} = os_process_kill(os_pid)

    # Wait for the cleanup, but no longer than necessary.
    wait_until(
      fn -> not os_process_alive?(os_pid) and not os_process_alive?(cmd_pid) end,
      @cleanup_timeout
    )

    refute os_process_alive?(os_pid)
    refute os_process_alive?(cmd_pid)
  end

  # Returns `true` when an OS process with the given pid exists.
  defp os_process_alive?(pid) do
    if windows?() do
      case cmd(["tasklist", "/fi", "pid eq #{pid}"]) do
        {"INFO: No tasks are running which match the specified criteria.\r\n", 0} -> false
        {_, 0} -> true
      end
    else
      # `ps -p PID` exits with status 0 only when the process exists.
      match?({_, 0}, cmd(["ps", "-p", to_string(pid)]))
    end
  end

  # Forcefully kills the OS process with the given pid.
  #
  # Returns `{:ok, output}` when the kill command exits with status 0 and
  # `{:error, status, output}` otherwise.
  defp os_process_kill(pid) do
    result =
      if windows?() do
        cmd(["taskkill", "/pid", "#{pid}", "/f"])
      else
        cmd(["kill", "-SIGKILL", "#{pid}"])
      end

    case result do
      {msg, 0} -> {:ok, msg}
      {msg, status} -> {:error, status, msg}
    end
  end

  defp windows?, do: :os.type() == {:win32, :nt}

  # Runs `cmd` with `args` and returns `{output, exit_status}`; stderr is
  # merged into the output.
  def cmd([cmd | args]), do: System.cmd(cmd, args, stderr_to_stdout: true)

  # Repeatedly evaluates `condition` until it returns a truthy value or
  # `timeout_ms` milliseconds have elapsed. Returns `true` when the condition
  # was met and `false` on timeout.
  defp wait_until(condition, timeout_ms) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    do_wait_until(condition, deadline)
  end

  defp do_wait_until(condition, deadline) do
    cond do
      condition.() ->
        true

      System.monotonic_time(:millisecond) >= deadline ->
        false

      true ->
        Process.sleep(@poll_interval)
        do_wait_until(condition, deadline)
    end
  end

  # Accumulates data sent by `port` until the accumulated output matches
  # `regexp`, then returns the result of `Regex.run/2`.
  #
  # Raises when the spawned command exits before a match is found, or when no
  # new output arrives within `@output_timeout` milliseconds.
  defp capture_output!(port, regexp, acc \\ "") do
    receive do
      {^port, {:data, bin}} ->
        output = acc <> bin

        case Regex.run(regexp, output) do
          nil -> capture_output!(port, regexp, output)
          match -> match
        end

      # Fail fast instead of waiting for the timeout when the command died.
      {^port, {:exit_status, status}} ->
        raise "command exited with status #{status} before output matched " <>
                "#{inspect(regexp)}, acc: #{acc}"
    after
      @output_timeout ->
        raise "timeout while waiting for output matching #{inspect(regexp)}, acc: #{acc}"
    end
  end
end
Loading