Skip to content

Commit

Permalink
TEP-0011: Add StdoutConfig and StderrConfig to steps.
Browse files Browse the repository at this point in the history
Implements Option 1 of [TEP-0011](https://github.com/tektoncd/community/blob/master/teps/0011-redirecting-step-output-streams.md)

Resurrects [#3103](#3103)

Closes [#2925](#2925)

Signed-off-by: Brad Beck <bradley.beck@gmail.com>
  • Loading branch information
bradbeck authored and tekton-robot committed Jul 15, 2022
1 parent 7ac4c0f commit a0c7d31
Show file tree
Hide file tree
Showing 24 changed files with 1,313 additions and 17 deletions.
7 changes: 7 additions & 0 deletions cmd/entrypoint/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ The following flags are available:
- `-wait_file_content`: expects the `wait_file` to contain actual
contents. It will continue watching for `wait_file` until it has
content.
- `-stdout_path`: If specified, the stdout of the sub-process will be
copied to the given path on the local filesystem.
- `-stderr_path`: If specified, the stderr of the sub-process will be
copied to the given path on the local filesystem. It can be set to the
same value as `{{stdout_path}}` so both streams are copied to the same
file. However, there is no ordering guarantee on data copied from both
streams.

Any extra positional arguments are passed to the original entrypoint command.

Expand Down
121 changes: 121 additions & 0 deletions cmd/entrypoint/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2022 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"errors"
"io"
"math"
"os"
"time"
)

type ioResult struct {
numBytes int
err error
}

// readAsync implements a non-blocking read.
func readAsync(r io.Reader, p []byte) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)
n, err := r.Read(p)
resultCh <- ioResult{n, err}
}()
return resultCh
}

// copyAsync performs a non-blocking copy from src to dst.
func copyAsync(dst io.Writer, src io.Reader, stopCh <-chan struct{}) <-chan ioResult {
resultCh := make(chan ioResult, 1)
go func() {
defer close(resultCh)

buf := make([]byte, 1024)
result := ioResult{}
readCh := readAsync(src, buf)
stopped := false
done := false
timer := time.NewTimer(time.Duration(math.MaxInt64))
defer timer.Stop()

for !done {
// If the stop channel is signalled, continue the loop to read the rest of the available
// data with a short timeout instead of a non-blocking read to mitigate the race between
// this loop and Read() running in another goroutine.
if stopped {
if !timer.Stop() {
<-timer.C
}
timer.Reset(100 * time.Millisecond)
}
select {
case r := <-readCh:
if r.numBytes != 0 {
nw, err := dst.Write(buf[:r.numBytes])
result.numBytes += nw
if err != nil {
result.err = err
done = true
} else if nw < r.numBytes {
result.err = io.ErrShortWrite
done = true
}
}
if r.err != nil {
if !errors.Is(r.err, io.EOF) {
result.err = r.err
}
done = true
}
if !done {
readCh = readAsync(src, buf)
}
case <-stopCh:
stopped = true
stopCh = nil
case <-timer.C:
done = true
}
}

resultCh <- result
}()
return resultCh
}

// asyncWriter creates a write that duplicates its writes to the provided writer asynchronously.
func asyncWriter(w io.Writer, stopCh <-chan struct{}) (io.Writer, <-chan error, error) {
pr, pw, err := os.Pipe()
if err != nil {
return nil, nil, err
}

doneCh := make(chan error, 1)
go func() {
defer close(doneCh)

if err := (<-copyAsync(w, pr, stopCh)).err; err != nil {
doneCh <- err
}
pr.Close()
pw.Close()
}()

return pw, doneCh, nil
}
92 changes: 92 additions & 0 deletions cmd/entrypoint/io_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"bytes"
"errors"
"io"
"testing"
)

func TestCopyAsyncEOF(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))
pw.Close()

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncStop(t *testing.T) {
stopCh := make(chan struct{}, 1)

pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
pw.Write([]byte(expectedString))

close(stopCh)

if c := <-copyCh; c.err != nil {
t.Fatalf("Unexpected error: %v", c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}

func TestCopyAsyncError(t *testing.T) {
stopCh := make(chan struct{}, 1)
defer close(stopCh)

pr, pw := io.Pipe()
defer pr.Close()

buf := &bytes.Buffer{}
copyCh := copyAsync(buf, pr, stopCh)

expectedString := "hello world"
expectedError := errors.New("test error")
pw.Write([]byte(expectedString))
pw.CloseWithError(expectedError)

if c := <-copyCh; !errors.Is(c.err, expectedError) {
t.Errorf("Expected error %v but got %v", expectedError, c.err)
}
if buf.String() != expectedString {
t.Errorf("got: %v, wanted: %v", buf.String(), expectedString)
}
}
19 changes: 12 additions & 7 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
terminationPath = flag.String("termination_path", "/tekton/termination", "If specified, file to write upon termination")
results = flag.String("results", "", "If specified, list of file names that might contain task results")
timeout = flag.Duration("timeout", time.Duration(0), "If specified, sets timeout for step")
stdoutPath = flag.String("stdout_path", "", "If specified, file to copy stdout to")
stderrPath = flag.String("stderr_path", "", "If specified, file to copy stderr to")
breakpointOnFailure = flag.Bool("breakpoint_on_failure", false, "If specified, expect steps to not skip on failure")
onError = flag.String("on_error", "", "Set to \"continue\" to ignore an error and continue when a container terminates with a non-zero exit code."+
" Set to \"stopAndFail\" to declare a failure with a step error and stop executing the rest of the steps.")
Expand Down Expand Up @@ -130,13 +132,16 @@ func main() {
}

e := entrypoint.Entrypointer{
Command: append(cmd, commandArgs...),
WaitFiles: strings.Split(*waitFiles, ","),
WaitFileContent: *waitFileContent,
PostFile: *postFile,
TerminationPath: *terminationPath,
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Runner: &realRunner{},
Command: append(cmd, commandArgs...),
WaitFiles: strings.Split(*waitFiles, ","),
WaitFileContent: *waitFileContent,
PostFile: *postFile,
TerminationPath: *terminationPath,
Waiter: &realWaiter{waitPollingInterval: defaultWaitPollingInterval, breakpointOnFailure: *breakpointOnFailure},
Runner: &realRunner{
stdoutPath: *stdoutPath,
stderrPath: *stderrPath,
},
PostWriter: &realPostWriter{},
Results: strings.Split(*results, ","),
Timeout: timeout,
Expand Down
94 changes: 92 additions & 2 deletions cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ package main

import (
"context"
"io"
"log"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"

"github.com/tektoncd/pipeline/pkg/entrypoint"
Expand All @@ -34,11 +38,34 @@ import (

// realRunner actually runs commands.
type realRunner struct {
signals chan os.Signal
sync.Mutex
signals chan os.Signal
signalsClosed bool
stdoutPath string
stderrPath string
}

var _ entrypoint.Runner = (*realRunner)(nil)

// close closes the signals channel which is used to receive system signals.
func (rr *realRunner) close() {
rr.Lock()
defer rr.Unlock()
if rr.signals != nil && !rr.signalsClosed {
close(rr.signals)
rr.signalsClosed = true
}
}

// signal allows the caller to simulate the sending of a system signal.
func (rr *realRunner) signal(signal os.Signal) {
rr.Lock()
defer rr.Unlock()
if rr.signals != nil && !rr.signalsClosed {
rr.signals <- signal
}
}

// Run executes the entrypoint.
func (rr *realRunner) Run(ctx context.Context, args ...string) error {
if len(args) == 0 {
Expand All @@ -50,13 +77,76 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
if rr.signals == nil {
rr.signals = make(chan os.Signal, 1)
}
defer close(rr.signals)
defer rr.close()
signal.Notify(rr.signals)
defer signal.Reset()

cmd := exec.CommandContext(ctx, name, args...)
stopCh := make(chan struct{}, 1)
defer close(stopCh)

cmd.Stdout = os.Stdout
var stdoutFile *os.File
if rr.stdoutPath != "" {
var err error
var doneCh <-chan error
// Create directory if it doesn't already exist
if err = os.MkdirAll(filepath.Dir(rr.stdoutPath), os.ModePerm); err != nil {
return err
}
if stdoutFile, err = os.Create(rr.stdoutPath); err != nil {
return err
}
// We use os.Pipe in asyncWriter to copy stdout instead of cmd.StdoutPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stdout, doneCh, err = asyncWriter(io.MultiWriter(os.Stdout, stdoutFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stdout: %v", err)
}
stdoutFile.Close()
}()
}

cmd.Stderr = os.Stderr
var stderrFile *os.File
if rr.stderrPath != "" {
var err error
var doneCh <-chan error
if rr.stderrPath == rr.stdoutPath {
fd, err := syscall.Dup(int(stdoutFile.Fd()))
if err != nil {
return err
}
stderrFile = os.NewFile(uintptr(fd), rr.stderrPath)
} else {
// Create directory if it doesn't already exist
if err = os.MkdirAll(filepath.Dir(rr.stderrPath), os.ModePerm); err != nil {
return err
}
if stderrFile, err = os.Create(rr.stderrPath); err != nil {
return err
}
}
// We use os.Pipe in asyncWriter to copy stderr instead of cmd.StderrPipe or providing an
// io.Writer directly because otherwise Go would wait for the underlying fd to be closed by the
// child process before returning from cmd.Wait even if the process is no longer running. This
// would cause a deadlock if the child spawns a long running descendant process before exiting.
if cmd.Stderr, doneCh, err = asyncWriter(io.MultiWriter(os.Stderr, stderrFile), stopCh); err != nil {
return err
}
go func() {
if err := <-doneCh; err != nil {
log.Fatalf("Copying stderr: %v", err)
}
stderrFile.Close()
}()
}

// dedicated PID group used to forward signals to
// main process and all children
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
Expand Down
Loading

0 comments on commit a0c7d31

Please sign in to comment.