Skip to content

Commit

Permalink
Handle abrupt terminations
Browse files Browse the repository at this point in the history
  • Loading branch information
akash-akya committed Jun 9, 2024
1 parent 7d83642 commit dc24002
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
25 changes: 21 additions & 4 deletions go_src/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"os"
"os/exec"
"os/signal"
"syscall"
"time"
)

func execute(workdir string, args []string) error {
done := make(chan struct{})

sigs := make(chan os.Signal, 1)
input := make(chan Packet, 1)
outputDemand := make(chan Packet)
inputDemand := make(chan Packet)
Expand All @@ -20,8 +23,17 @@ func execute(workdir string, args []string) error {
logger.Printf("Command path: %v\n", proc.Path)

output := startCommandPipeline(proc, input, inputDemand, outputDemand)

// Capture common signals.
// Setting notify for SIGPIPE is important to capture and without that
// we won't be able to handle abrupt beam vm terminations
// Also, SIGPIPE behaviour in golang is bit complex,
// see: https://pkg.go.dev/os/signal@go1.22.4#hdr-SIGPIPE
signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)

// go handleSignals(input, outputDemand, done)
go dispatchStdin(input, outputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, done)
go collectStdout(proc.Process.Pid, output, inputDemand, sigs, done)

// wait for pipline to exit
<-done
Expand All @@ -33,8 +45,8 @@ func execute(workdir string, args []string) error {
logger.Printf("Command exited with error: %v\n", e)
os.Exit(3)
}
// TODO: return Stderr and exit stauts to beam process
logger.Printf("Command exited: %#v\n", err)
// TODO: return Stderr and exit status to beam process
logger.Printf("Command exited\n")
return err
}

Expand All @@ -57,15 +69,20 @@ func dispatchStdin(input chan<- Packet, outputDemand chan<- Packet, done chan st
stdinReader(dispatch, done)
}

func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, done chan struct{}) {
func collectStdout(pid int, output <-chan Packet, inputDemand <-chan Packet, sigs <-chan os.Signal, done chan struct{}) {
defer func() {
close(done)
}()

merged := func() (Packet, bool) {
select {
case sig := <-sigs:
logger.Printf("Received OS Signal: ", sig)
return Packet{}, false

case v, ok := <-inputDemand:
return v, ok

case v, ok := <-output:
return v, ok
}
Expand Down
5 changes: 4 additions & 1 deletion go_src/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ func writePacket(tag uint8, data []byte) {
_, writeErr := os.Stdout.Write(buf[:payloadLen+4])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
// ignore broken pipe or closed pipe errors here.
// currently readCommandStdout closes output chan, making the
// flow break.
case *os.PathError:
logger.Printf("os.PathError: ", writeErr)
return
default:
fatal(writeErr)
Expand Down

0 comments on commit dc24002

Please sign in to comment.