Skip to content

Commit

Permalink
The cancellation of taskruns is now done through the entrypoint binary
Browse files Browse the repository at this point in the history
through a new flag called 'stop_on_cancel'. This removes the need for
deleting the pods to cancel a taskrun, allowing examination of the logs
on the pods from cancelled taskruns. Part of work on issue tektoncd#3238

Signed-off-by: chengjoey <zchengjoey@gmail.com>
  • Loading branch information
chengjoey committed Apr 10, 2023
1 parent f543fdb commit bc96a34
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 27 deletions.
10 changes: 10 additions & 0 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification")
socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.")
resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.")
stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled")
)

const (
Expand Down Expand Up @@ -164,6 +165,7 @@ func main() {
StepMetadataDir: *stepMetadataDir,
SpireWorkloadAPI: spireWorkloadAPI,
ResultExtractionMethod: *resultExtractionMethod,
StopOnCancel: *stopOnCancel,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand All @@ -181,6 +183,14 @@ func main() {
case termination.MessageLengthError:
log.Print(err.Error())
os.Exit(1)
case entrypoint.ContextError:
if errors.Is(err, entrypoint.ContextCanceledError) {
log.Print("Step was cancelled")
os.Exit(int(syscall.SIGKILL))
} else {
log.Print(err.Error())
os.Exit(1)
}
case *exec.ExitError:
// Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go
// This works on both Unix and Windows. Although
Expand Down
8 changes: 7 additions & 1 deletion cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,15 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
}()

// Wait for command to exit
// as os.exec [note](https://github.com/golang/go/blob/ee522e2cdad04a43bc9374776483b6249eb97ec9/src/os/exec/exec.go#L897-L906)
// cmd.Wait prefer Process error over context error
// but we want to return context error instead
if err := cmd.Wait(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ContextDeadlineExceededError
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ContextCanceledError
}
return err
}
Expand Down
21 changes: 20 additions & 1 deletion cmd/entrypoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"syscall"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

// TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan.
Expand Down Expand Up @@ -183,10 +185,27 @@ func TestRealRunnerTimeout(t *testing.T) {
defer cancel()

if err := rr.Run(ctx, "sleep", "0.01"); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, entrypoint.ContextDeadlineExceededError) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't timeout")
}
}

func TestRealRunnerCanceled(t *testing.T) {
rr := realRunner{}
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
cancel()
}()

if err := rr.Run(ctx, "sleep", "3"); err != nil {
if !errors.Is(err, entrypoint.ContextCanceledError) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't cancel")
}
}
40 changes: 40 additions & 0 deletions cmd/entrypoint/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package main

import (
"context"
"errors"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -74,6 +76,44 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure
}
}

func (rw *realWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error {
if file == "" {
return nil
}
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ContextCanceledError
}
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return entrypoint.ContextDeadlineExceededError
}
return nil
case <-time.After(rw.waitPollingInterval):
}
if info, err := os.Stat(file); err == nil {
if !expectContent || info.Size() > 0 {
return nil
}
} else if !os.IsNotExist(err) {
return fmt.Errorf("waiting for %q: %w", file, err)
}
// When a .err file is read by this step, it means that a previous step has failed
// We wouldn't want this step to stop executing because the previous step failed during debug
// That is counterproductive to debugging
// Hence we disable skipError here so that the other steps in the failed taskRun can continue
// executing if breakpointOnFailure is enabled for the taskRun
// TLDR: Do not return skipError when breakpointOnFailure is enabled as it breaks execution of the TaskRun
if _, err := os.Stat(file + ".err"); err == nil {
if breakpointOnFailure {
return nil
}
return skipError("error file present, bail and skip the step")
}
}
}

type skipError string

func (e skipError) Error() string {
Expand Down
59 changes: 59 additions & 0 deletions cmd/entrypoint/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package main

import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

const testWaitPollingInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -191,3 +194,59 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitWithContextCanceled(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithCancel(context.Background())
rw := realWaiter{}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context canceled error")
}
errCh <- err
}()
cancel()
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ContextCanceledError) {
t.Errorf("expected ContextCanceledError, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ContextCanceledError")
}
}

func TestRealWaiterWaitWithTimeout(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
rw := realWaiter{}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context deadline error")
}
errCh <- err
}()
delay := time.NewTimer(2 * time.Second)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ContextDeadlineExceededError) {
t.Errorf("expected ContextDeadlineExceededError, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ContextDeadlineExceededError")
}
}
3 changes: 3 additions & 0 deletions docs/taskruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ When you cancel a TaskRun, the running pod associated with that `TaskRun` is del
means that the logs of the `TaskRun` are not preserved. The deletion of the `TaskRun` pod is necessary
in order to stop `TaskRun` step containers from running.

**Note: if `enable-cancel-using-entrypoint` is set to
`"true"` in the `feature-flags`, the pod associated with that `TaskRun` will not be deleted**

Example of cancelling a `TaskRun`:

```yaml
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
DefaultEnableAPIFields = StableAPIFields
// DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs".
DefaultSendCloudEventsForRuns = false
// DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint"
DefaultEnableCancelUsingEntrypoint = false
// EnforceNonfalsifiabilityWithSpire is the value used for "enable-nonfalsifiability" when SPIRE is used to enable non-falsifiability.
EnforceNonfalsifiabilityWithSpire = "spire"
// EnforceNonfalsifiabilityNone is the value used for "enable-nonfalsifiability" when non-falsifiability is not enabled.
Expand All @@ -76,6 +78,8 @@ const (
DefaultResultExtractionMethod = ResultExtractionMethodTerminationMessage
// DefaultMaxResultSize is the default value in bytes for the size of a result
DefaultMaxResultSize = 4096
// EnableCancelUsingEntrypoint is the flag used to enable cancelling a pod using the entrypoint
EnableCancelUsingEntrypoint = "enable-cancel-using-entrypoint"

disableAffinityAssistantKey = "disable-affinity-assistant"
disableCredsInitKey = "disable-creds-init"
Expand Down Expand Up @@ -105,6 +109,7 @@ type FeatureFlags struct {
SendCloudEventsForRuns bool
AwaitSidecarReadiness bool
EnforceNonfalsifiability string
EnableCancelUsingEntrypoint bool
// VerificationNoMatchPolicy is the feature flag for "trusted-resources-verification-no-match-policy"
// VerificationNoMatchPolicy can be set to "ignore", "warn" and "fail" values.
// ignore: skip trusted resources verification when no matching verification policies found
Expand Down Expand Up @@ -190,6 +195,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) {
if err := setMaxResultSize(cfgMap, DefaultMaxResultSize, &tc.MaxResultSize); err != nil {
return nil, err
}
if err := setFeature(EnableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil {
return nil, err
}

// Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if
// enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/feature_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) {
EnableProvenanceInStatus: true,
ResultExtractionMethod: "termination-message",
MaxResultSize: 4096,
EnableCancelUsingEntrypoint: true,
},
fileName: "feature-flags-all-flags-set",
},
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/testdata/feature-flags-all-flags-set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ data:
enforce-nonfalsifiability: "spire"
trusted-resources-verification-no-match-policy: "fail"
enable-provenance-in-status: "true"
enable-cancel-using-entrypoint: "true"
61 changes: 54 additions & 7 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/pod"
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/termination"
"go.uber.org/zap"
Expand All @@ -43,6 +45,21 @@ const (
FailOnError = "stopAndFail"
)

// ContextError context error type
type ContextError string

// Error implements error interface
func (e ContextError) Error() string {
return string(e)
}

var (
// ContextDeadlineExceededError is the error returned when the context deadline is exceeded
ContextDeadlineExceededError = ContextError(context.DeadlineExceeded.Error())
// ContextCanceledError is the error returned when the context is canceled
ContextCanceledError = ContextError(context.Canceled.Error())
)

// Entrypointer holds fields for running commands with redirected
// entrypoints.
type Entrypointer struct {
Expand Down Expand Up @@ -87,12 +104,17 @@ type Entrypointer struct {
ResultsDirectory string
// ResultExtractionMethod is the method using which the controller extracts the results from the task pod.
ResultExtractionMethod string

// StopOnCancel indicates if the entrypoint should stop the command when taskrun is canceled
StopOnCancel bool
}

// Waiter encapsulates waiting for files to exist.
type Waiter interface {
// Wait blocks until the specified file exists.
Wait(file string, expectContent bool, breakpointOnFailure bool) error
// WaitWithContext blocks until the specified file exists or the context is done.
WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error
}

// Runner encapsulates running commands.
Expand Down Expand Up @@ -143,21 +165,25 @@ func (e Entrypointer) Go() error {
ResultType: v1beta1.InternalTektonResultType,
})

ctx := context.Background()
var err error

if e.Timeout != nil && *e.Timeout < time.Duration(0) {
err = fmt.Errorf("negative timeout specified")
}

ctx := context.Background()
var cancel context.CancelFunc
if err == nil {
var cancel context.CancelFunc
if e.Timeout != nil && *e.Timeout != time.Duration(0) {
ctx, cancel = context.WithCancel(ctx)
if e.Timeout != nil && *e.Timeout > time.Duration(0) {
ctx, cancel = context.WithTimeout(ctx, *e.Timeout)
defer cancel()
}
// start a goroutine to listen for cancellation file
go func() {
if err := e.waitingCancellation(ctx, cancel); err != nil {
logger.Error("Error while waiting for cancellation", zap.Error(err))
}
}()
err = e.Runner.Run(ctx, e.Command...)
if errors.Is(err, context.DeadlineExceeded) {
if errors.Is(err, ContextDeadlineExceededError) {
output = append(output, v1beta1.RunResult{
Key: "Reason",
Value: "TimeoutExceeded",
Expand All @@ -168,6 +194,15 @@ func (e Entrypointer) Go() error {

var ee *exec.ExitError
switch {
case err != nil && errors.Is(err, ContextCanceledError):
logger.Info("Step was canceling")
output = append(output, v1beta1.RunResult{
Key: "Reason",
Value: "Cancelled",
ResultType: v1beta1.InternalTektonResultType,
})
e.WritePostFile(e.PostFile, ContextCanceledError)
e.WriteExitCodeFile(e.StepMetadataDir, syscall.SIGKILL.String())
case err != nil && e.BreakpointOnFailure:
logger.Info("Skipping writing to PostFile")
case e.OnError == ContinueOnError && errors.As(err, &ee):
Expand Down Expand Up @@ -267,3 +302,15 @@ func (e Entrypointer) WriteExitCodeFile(stepPath, content string) {
exitCodeFile := filepath.Join(stepPath, "exitCode")
e.PostWriter.Write(exitCodeFile, content)
}

// waitingCancellation waiting cancellation file, if no error occurs, call cancelFunc to cancel the context
func (e Entrypointer) waitingCancellation(ctx context.Context, cancel context.CancelFunc) error {
if !e.StopOnCancel {
return nil
}
if err := e.Waiter.WaitWithContext(ctx, pod.DownwardMountCancelFile, true, false); err != nil {
return err
}
cancel()
return nil
}
Loading

0 comments on commit bc96a34

Please sign in to comment.