Skip to content

Commit

Permalink
The cancellation of taskruns is now done through the entrypoint binary
Browse files Browse the repository at this point in the history
through a new flag called 'stop_on_cancel'. This removes the need for
deleting the pods to cancel a taskrun, allowing examination of the logs
on the pods from cancelled taskruns. Part of work on issue tektoncd#3238

Signed-off-by: chengjoey <zchengjoey@gmail.com>
  • Loading branch information
chengjoey committed Apr 10, 2023
1 parent f543fdb commit cc99b2e
Show file tree
Hide file tree
Showing 17 changed files with 652 additions and 28 deletions.
10 changes: 10 additions & 0 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification")
socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.")
resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.")
stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled")
)

const (
Expand Down Expand Up @@ -164,6 +165,7 @@ func main() {
StepMetadataDir: *stepMetadataDir,
SpireWorkloadAPI: spireWorkloadAPI,
ResultExtractionMethod: *resultExtractionMethod,
StopOnCancel: *stopOnCancel,
}

// Copy any creds injected by the controller into the $HOME directory of the current
Expand All @@ -181,6 +183,14 @@ func main() {
case termination.MessageLengthError:
log.Print(err.Error())
os.Exit(1)
case entrypoint.ContextError:
if errors.Is(err, entrypoint.ErrContextCanceled) {
log.Print("Step was cancelled")
os.Exit(int(syscall.SIGKILL))
} else {
log.Print(err.Error())
os.Exit(1)
}
case *exec.ExitError:
// Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go
// This works on both Unix and Windows. Although
Expand Down
13 changes: 11 additions & 2 deletions cmd/entrypoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
// Start defined command
if err := cmd.Start(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ErrContextDeadlineExceeded
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ErrContextCanceled
}
return err
}
Expand All @@ -134,9 +137,15 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error {
}()

// Wait for command to exit
// as os.exec [note](https://github.com/golang/go/blob/ee522e2cdad04a43bc9374776483b6249eb97ec9/src/os/exec/exec.go#L897-L906)
// cmd.Wait prefer Process error over context error
// but we want to return context error instead
if err := cmd.Wait(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return context.DeadlineExceeded
return entrypoint.ErrContextDeadlineExceeded
}
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ErrContextCanceled
}
return err
}
Expand Down
21 changes: 20 additions & 1 deletion cmd/entrypoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"syscall"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

// TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan.
Expand Down Expand Up @@ -183,10 +185,27 @@ func TestRealRunnerTimeout(t *testing.T) {
defer cancel()

if err := rr.Run(ctx, "sleep", "0.01"); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't timeout")
}
}

func TestRealRunnerCanceled(t *testing.T) {
rr := realRunner{}
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
cancel()
}()

if err := rr.Run(ctx, "sleep", "3"); err != nil {
if !errors.Is(err, entrypoint.ErrContextCanceled) {
t.Fatalf("unexpected error received: %v", err)
}
} else {
t.Fatalf("step didn't cancel")
}
}
40 changes: 40 additions & 0 deletions cmd/entrypoint/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package main

import (
"context"
"errors"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -74,6 +76,44 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure
}
}

func (rw *realWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error {
if file == "" {
return nil
}
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return entrypoint.ErrContextCanceled
}
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return entrypoint.ErrContextDeadlineExceeded
}
return nil
case <-time.After(rw.waitPollingInterval):
}
if info, err := os.Stat(file); err == nil {
if !expectContent || info.Size() > 0 {
return nil
}
} else if !os.IsNotExist(err) {
return fmt.Errorf("waiting for %q: %w", file, err)
}
// When a .err file is read by this step, it means that a previous step has failed
// We wouldn't want this step to stop executing because the previous step failed during debug
// That is counterproductive to debugging
// Hence we disable skipError here so that the other steps in the failed taskRun can continue
// executing if breakpointOnFailure is enabled for the taskRun
// TLDR: Do not return skipError when breakpointOnFailure is enabled as it breaks execution of the TaskRun
if _, err := os.Stat(file + ".err"); err == nil {
if breakpointOnFailure {
return nil
}
return skipError("error file present, bail and skip the step")
}
}
}

type skipError string

func (e skipError) Error() string {
Expand Down
174 changes: 174 additions & 0 deletions cmd/entrypoint/waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package main

import (
"context"
"errors"
"os"
"strings"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/entrypoint"
)

const testWaitPollingInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -191,3 +194,174 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) {
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitWithContextCanceled(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithCancel(context.Background())
rw := realWaiter{}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context canceled error")
}
errCh <- err
}()
cancel()
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ErrContextCanceled) {
t.Errorf("expected ErrContextCanceled, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ErrContextCanceled")
}
}

func TestRealWaiterWaitWithTimeout(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
rw := realWaiter{}
errCh := make(chan error)
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false)
if err == nil {
t.Errorf("expected context deadline error")
}
errCh <- err
}()
delay := time.NewTimer(2 * time.Second)
select {
case err := <-errCh:
if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) {
t.Errorf("expected ErrContextDeadlineExceeded, got %T", err)
}
case <-delay.C:
t.Errorf("expected Wait() to have a ErrContextDeadlineExceeded")
}
}

func TestRealWaiterWaitContextWithBreakpointOnFailure(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file*.err")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1)
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
go func() {
// When breakpoint on failure is enabled skipError shouldn't be returned for a error waitfile
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmpFileName, false, true)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
close(doneCh)
}()
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case <-doneCh:
// Success
case <-delay.C:
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitContextWithErrorWaitfile(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file*.err")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1)
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
go func() {
// error of type skipError is returned after encountering a error waitfile
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmpFileName, false, false)
if err == nil {
t.Errorf("expected skipError upon encounter error waitfile")
}
var skipErr skipError
if errors.As(err, &skipErr) {
close(doneCh)
} else {
t.Errorf("unexpected error type %T", err)
}
}()
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case <-doneCh:
// Success
case <-delay.C:
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitContextWithContent(t *testing.T) {
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
defer os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmp.Name(), true, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
close(doneCh)
}()
if err := os.WriteFile(tmp.Name(), []byte("😺"), 0700); err != nil {
t.Errorf("error writing content to temp file: %v", err)
}
delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case <-doneCh:
// Success
case <-delay.C:
t.Errorf("expected Wait() to have detected a non-zero file size by now")
}
}

func TestRealWaiterWaitContextMissingFile(t *testing.T) {
// Create a temp file and then immediately delete it to get
// a legitimate tmp path and ensure the file doesnt exist
// prior to testing Wait().
tmp, err := os.CreateTemp("", "real_waiter_test_file")
if err != nil {
t.Errorf("error creating temp file: %v", err)
}
os.Remove(tmp.Name())
rw := realWaiter{}
doneCh := make(chan struct{})
go func() {
err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmp.Name(), false, false)
if err != nil {
t.Errorf("error waiting on tmp file %q", tmp.Name())
}
close(doneCh)
}()

delay := time.NewTimer(2 * testWaitPollingInterval)
select {
case <-delay.C:
// Success
case <-doneCh:
t.Errorf("did not expect Wait() to have detected a file at path %q", tmp.Name())
if !delay.Stop() {
<-delay.C
}
}
}
3 changes: 3 additions & 0 deletions docs/taskruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,9 @@ When you cancel a TaskRun, the running pod associated with that `TaskRun` is del
means that the logs of the `TaskRun` are not preserved. The deletion of the `TaskRun` pod is necessary
in order to stop `TaskRun` step containers from running.

**Note: if `enable-cancel-using-entrypoint` is set to
`"true"` in the `feature-flags`, the pod associated with that `TaskRun` will not be deleted**

Example of cancelling a `TaskRun`:

```yaml
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/config/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
DefaultEnableAPIFields = StableAPIFields
// DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs".
DefaultSendCloudEventsForRuns = false
// DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint"
DefaultEnableCancelUsingEntrypoint = false
// EnforceNonfalsifiabilityWithSpire is the value used for "enable-nonfalsifiability" when SPIRE is used to enable non-falsifiability.
EnforceNonfalsifiabilityWithSpire = "spire"
// EnforceNonfalsifiabilityNone is the value used for "enable-nonfalsifiability" when non-falsifiability is not enabled.
Expand All @@ -76,6 +78,8 @@ const (
DefaultResultExtractionMethod = ResultExtractionMethodTerminationMessage
// DefaultMaxResultSize is the default value in bytes for the size of a result
DefaultMaxResultSize = 4096
// EnableCancelUsingEntrypoint is the flag used to enable cancelling a pod using the entrypoint
EnableCancelUsingEntrypoint = "enable-cancel-using-entrypoint"

disableAffinityAssistantKey = "disable-affinity-assistant"
disableCredsInitKey = "disable-creds-init"
Expand Down Expand Up @@ -105,6 +109,7 @@ type FeatureFlags struct {
SendCloudEventsForRuns bool
AwaitSidecarReadiness bool
EnforceNonfalsifiability string
EnableCancelUsingEntrypoint bool
// VerificationNoMatchPolicy is the feature flag for "trusted-resources-verification-no-match-policy"
// VerificationNoMatchPolicy can be set to "ignore", "warn" and "fail" values.
// ignore: skip trusted resources verification when no matching verification policies found
Expand Down Expand Up @@ -190,6 +195,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) {
if err := setMaxResultSize(cfgMap, DefaultMaxResultSize, &tc.MaxResultSize); err != nil {
return nil, err
}
if err := setFeature(EnableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil {
return nil, err
}

// Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if
// enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/config/feature_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) {
EnableProvenanceInStatus: true,
ResultExtractionMethod: "termination-message",
MaxResultSize: 4096,
EnableCancelUsingEntrypoint: true,
},
fileName: "feature-flags-all-flags-set",
},
Expand Down
Loading

0 comments on commit cc99b2e

Please sign in to comment.