Skip to content

Commit

Permalink
Handle abrupt terminations
Browse files Browse the repository at this point in the history
  • Loading branch information
akash-akya committed Jun 21, 2024
1 parent 7d83642 commit b6633e0
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 5 deletions.
25 changes: 21 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)

func execute(workdir string, args []string) error {
done := make(chan struct{})

sigs := make(chan os.Signal, 1)
input := make(chan Packet, 1)
outputDemand := make(chan Packet)
inputDemand := make(chan Packet)
Expand All @@ -20,8 +23,17 @@ func execute(workdir string, args []string) error {
logger.Printf("Command path: %v\n", proc.Path)

output := startCommandPipeline(proc, input, inputDemand, outputDemand)

// Capture common signals.
// Setting notify for SIGPIPE is important to capture and without that
// we won't be able to handle abrupt beam vm terminations
// Also, SIGPIPE behaviour in golang is bit complex,
// see: https://pkg.go.dev/os/signal@go1.22.4#hdr-SIGPIPE
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)

// go handleSignals(input, outputDemand, done)
go dispatchStdin(input, outputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done)

// wait for pipline to exit
<-done
Expand All @@ -33,8 +45,8 @@ func execute(workdir string, args []string) error {
logger.Printf("Command exited with error: %v\n", e)
os.Exit(3)
}
// TODO: return Stderr and exit stauts to beam process
logger.Printf("Command exited: %#v\n", err)
// TODO: return Stderr and exit status to beam process
logger.Printf("Command exited\n")
return err
}

Expand All @@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st
stdinReader(dispatch, done)
}

func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) {
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) {
defer func() {
close(done)
}()

merged := func() (Packet, bool) {
select {
case sig := <-sigs:
logger.Printf("Received OS Signal: ", sig)
return Packet{}, false

case v, ok := <-inputDemand:
return v, ok

case v, ok := <-output:
return v, ok
}
Expand Down
5 changes: 4 additions & 1 deletion go_src/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ func writePacket(tag uint8, data []byte) {
_, writeErr := os.Stdout.Write(buf[:payloadLen+4])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
// ignore broken pipe or closed pipe errors here.
// currently readCommandStdout closes output chan, making the
// flow break.
case *os.PathError:
logger.Printf("os.PathError: ", writeErr)
return
default:
fatal(writeErr)
Expand Down
78 changes: 78 additions & 0 deletions test/ex_cmd_exit_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
defmodule ExCmdExitTest do
use ExUnit.Case, async: false

test "if it kills external command on abnormal vm exit" do

Check failure on line 4 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Windows OTP 25 / Elixir 1.14

test if it kills external command on abnormal vm exit (ExCmdExitTest)

Check failure on line 4 in test/ex_cmd_exit_test.exs

View workflow job for this annotation

GitHub Actions / Windows OTP 25 / Elixir 1.14

test if it kills external command on abnormal vm exit (ExCmdExitTest)
ex_cmd_expr = ~S{ExCmd.stream!(["cat"]) |> Stream.run()}

port =
Port.open(
{:spawn, "elixir -S mix run -e '#{ex_cmd_expr}'"},
[:stderr_to_stdout, :use_stdio, :exit_status, :binary, :hide]
)

port_info = Port.info(port)
os_pid = port_info[:os_pid]

on_exit(fn ->
os_process_alive?(os_pid) && os_process_kill(os_pid)
end)

assert os_process_alive?(os_pid)

[_, cmd_pid] = capture_output!(port, ~r/os pid: ([0-9]+)/)

cmd_pid = String.to_integer(cmd_pid)
assert os_process_alive?(cmd_pid)

assert {:ok, _msg} = os_process_kill(os_pid)

# wait for the cleanup
:timer.sleep(5000)

refute os_process_alive?(os_pid)
refute os_process_alive?(cmd_pid)
end

defp os_process_alive?(pid) do
if windows?() do
case cmd(["tasklist", "/fi", "pid eq #{pid}"]) do
{"INFO: No tasks are running which match the specified criteria.\r\n", 0} -> false
{_, 0} -> true
end
else
match?({_, 0}, cmd(["ps", "-p", to_string(pid)]))
end
end

defp os_process_kill(pid) do
if windows?() do
cmd(["taskkill", "/pid", "#{pid}", "/f"])
else
cmd(["kill", "-SIGKILL", "#{pid}"])
end
|> case do
{msg, 0} -> {:ok, msg}
{msg, status} -> {:error, status, msg}
end
end

defp windows?, do: :os.type() == {:win32, :nt}

def cmd([cmd | args]), do: System.cmd(cmd, args, stderr_to_stdout: true)

defp capture_output!(port, regexp, acc \\ "") do
receive do
{^port, {:data, bin}} ->
output = acc <> bin

if match = Regex.run(regexp, output) do
match
else
capture_output!(port, regexp, output)
end
after
5000 ->
raise "timeout while waiting for the iex prompt, acc: #{acc}"
end
end
end

0 comments on commit b6633e0

Please sign in to comment.