Skip to content

Commit

Permalink
fix(docker): fix streaming of combined stdout/stderr (#2368)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Mar 12, 2020
1 parent 64b6f3a commit 09ec9a0
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"os/exec"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -69,21 +70,17 @@ func (d *DockerExecutor) CopyFile(containerID string, sourcePath string, destPat
return nil
}

type multiReadCloser struct {
type cmdCloser struct {
io.Reader
closers map[string]io.Closer
cmd *exec.Cmd
}

func (mc *multiReadCloser) Close() error {
var lastErr error
for name, c := range mc.closers {
err := c.Close()
if err != nil {
log.Errorf("error closing docker logs %s pipe , %v", name, err)
lastErr = err
}
func (c *cmdCloser) Close() error {
err := c.cmd.Wait()
if err != nil {
return errors.InternalWrapError(err)
}
return lastErr
return nil
}

func (d *DockerExecutor) GetOutputStream(containerID string, combinedOutput bool) (io.ReadCloser, error) {
Expand All @@ -95,27 +92,42 @@ func (d *DockerExecutor) GetOutputStream(containerID string, combinedOutput bool
return nil, errors.InternalWrapError(err)
}

reader := stdout

if combinedOutput {
stderr, err := cmd.StderrPipe()
if !combinedOutput {
err = cmd.Start()
if err != nil {
return nil, errors.InternalWrapError(err)
}
reader = &multiReadCloser{
Reader: io.MultiReader(stdout, stderr),
closers: map[string]io.Closer{
"stdout": stdout,
"stderr": stderr,
},
}
return stdout, nil
}

stderr, err := cmd.StderrPipe()
if err != nil {
return nil, errors.InternalWrapError(err)
}

err = cmd.Start()
if err != nil {
return nil, errors.InternalWrapError(err)
}
return reader, nil

wg := &sync.WaitGroup{}
wg.Add(2)
reader, writer := io.Pipe()
go func() {
defer wg.Done()
_, _ = io.Copy(writer, stdout)
}()
go func() {
defer wg.Done()
_, _ = io.Copy(writer, stderr)
}()

go func() {
defer writer.Close()
wg.Wait()
}()

return &cmdCloser{Reader: reader, cmd: cmd}, nil
}

func (d *DockerExecutor) WaitInit() error {
Expand Down

0 comments on commit 09ec9a0

Please sign in to comment.