diff --git a/cmd/entrypoint/main.go b/cmd/entrypoint/main.go index f13c512e584..747e255dbca 100644 --- a/cmd/entrypoint/main.go +++ b/cmd/entrypoint/main.go @@ -58,6 +58,7 @@ var ( enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification") socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.") resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.") + stopOnCancel = flag.Bool("stop_on_cancel", false, "If specified, stop the step when the taskrun is cancelled") ) const ( @@ -164,6 +165,7 @@ func main() { StepMetadataDir: *stepMetadataDir, SpireWorkloadAPI: spireWorkloadAPI, ResultExtractionMethod: *resultExtractionMethod, + StopOnCancel: *stopOnCancel, } // Copy any creds injected by the controller into the $HOME directory of the current @@ -181,6 +183,14 @@ func main() { case termination.MessageLengthError: log.Print(err.Error()) os.Exit(1) + case entrypoint.ContextError: + if errors.Is(err, entrypoint.ErrContextCanceled) { + log.Print("Step was cancelled") + os.Exit(int(syscall.SIGKILL)) + } else { + log.Print(err.Error()) + os.Exit(1) + } case *exec.ExitError: // Copied from https://stackoverflow.com/questions/10385551/get-exit-code-go // This works on both Unix and Windows. Although diff --git a/cmd/entrypoint/runner.go b/cmd/entrypoint/runner.go index c9031e4ae73..6e137378531 100644 --- a/cmd/entrypoint/runner.go +++ b/cmd/entrypoint/runner.go @@ -118,7 +118,10 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error { // Start defined command if err := cmd.Start(); err != nil { if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return context.DeadlineExceeded + return entrypoint.ErrContextDeadlineExceeded + } + if errors.Is(ctx.Err(), context.Canceled) { + return entrypoint.ErrContextCanceled } return err } @@ -134,9 +137,15 @@ func (rr *realRunner) Run(ctx context.Context, args ...string) error { }() // Wait for command to exit + // as os.exec [note](https://github.com/golang/go/blob/ee522e2cdad04a43bc9374776483b6249eb97ec9/src/os/exec/exec.go#L897-L906) + // cmd.Wait prefer Process error over context error + // but we want to return context error instead if err := cmd.Wait(); err != nil { if errors.Is(ctx.Err(), context.DeadlineExceeded) { - return context.DeadlineExceeded + return entrypoint.ErrContextDeadlineExceeded + } + if errors.Is(ctx.Err(), context.Canceled) { + return entrypoint.ErrContextCanceled } return err } diff --git a/cmd/entrypoint/runner_test.go b/cmd/entrypoint/runner_test.go index b2af31eb1da..4568f9394d0 100644 --- a/cmd/entrypoint/runner_test.go +++ b/cmd/entrypoint/runner_test.go @@ -28,6 +28,8 @@ import ( "syscall" "testing" "time" + + "github.com/tektoncd/pipeline/pkg/entrypoint" ) // TestRealRunnerSignalForwarding will artificially put an interrupt signal (SIGINT) in the rr.signals chan. @@ -183,10 +185,27 @@ func TestRealRunnerTimeout(t *testing.T) { defer cancel() if err := rr.Run(ctx, "sleep", "0.01"); err != nil { - if !errors.Is(err, context.DeadlineExceeded) { + if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) { t.Fatalf("unexpected error received: %v", err) } } else { t.Fatalf("step didn't timeout") } } + +func TestRealRunnerCanceled(t *testing.T) { + rr := realRunner{} + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + if err := rr.Run(ctx, "sleep", "3"); err != nil { + if !errors.Is(err, entrypoint.ErrContextCanceled) { + t.Fatalf("unexpected error received: %v", err) + } + } else { + t.Fatalf("step didn't cancel") + } +} diff --git a/cmd/entrypoint/waiter.go b/cmd/entrypoint/waiter.go index 29ebf9b4441..6842ea420be 100644 --- a/cmd/entrypoint/waiter.go +++ b/cmd/entrypoint/waiter.go @@ -17,6 +17,8 @@ limitations under the License. package main import ( + "context" + "errors" "fmt" "os" "time" @@ -74,6 +76,44 @@ func (rw *realWaiter) Wait(file string, expectContent bool, breakpointOnFailure } } +func (rw *realWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error { + if file == "" { + return nil + } + for { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return entrypoint.ErrContextCanceled + } + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return entrypoint.ErrContextDeadlineExceeded + } + return nil + case <-time.After(rw.waitPollingInterval): + } + if info, err := os.Stat(file); err == nil { + if !expectContent || info.Size() > 0 { + return nil + } + } else if !os.IsNotExist(err) { + return fmt.Errorf("waiting for %q: %w", file, err) + } + // When a .err file is read by this step, it means that a previous step has failed + // We wouldn't want this step to stop executing because the previous step failed during debug + // That is counterproductive to debugging + // Hence we disable skipError here so that the other steps in the failed taskRun can continue + // executing if breakpointOnFailure is enabled for the taskRun + // TLDR: Do not return skipError when breakpointOnFailure is enabled as it breaks execution of the TaskRun + if _, err := os.Stat(file + ".err"); err == nil { + if breakpointOnFailure { + return nil + } + return skipError("error file present, bail and skip the step") + } + } +} + type skipError string func (e skipError) Error() string { diff --git a/cmd/entrypoint/waiter_test.go b/cmd/entrypoint/waiter_test.go index 422211802bc..7ffb708d8b8 100644 --- a/cmd/entrypoint/waiter_test.go +++ b/cmd/entrypoint/waiter_test.go @@ -17,11 +17,14 @@ limitations under the License. package main import ( + "context" "errors" "os" "strings" "testing" "time" + + "github.com/tektoncd/pipeline/pkg/entrypoint" ) const testWaitPollingInterval = 50 * time.Millisecond @@ -191,3 +194,174 @@ func TestRealWaiterWaitWithBreakpointOnFailure(t *testing.T) { t.Errorf("expected Wait() to have detected a non-zero file size by now") } } + +func TestRealWaiterWaitWithContextCanceled(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + defer os.Remove(tmp.Name()) + ctx, cancel := context.WithCancel(context.Background()) + rw := realWaiter{} + errCh := make(chan error) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false) + if err == nil { + t.Errorf("expected context canceled error") + } + errCh <- err + }() + cancel() + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case err := <-errCh: + if !errors.Is(err, entrypoint.ErrContextCanceled) { + t.Errorf("expected ErrContextCanceled, got %T", err) + } + case <-delay.C: + t.Errorf("expected Wait() to have a ErrContextCanceled") + } +} + +func TestRealWaiterWaitWithTimeout(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + defer os.Remove(tmp.Name()) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + rw := realWaiter{} + errCh := make(chan error) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(ctx, tmp.Name(), true, false) + if err == nil { + t.Errorf("expected context deadline error") + } + errCh <- err + }() + delay := time.NewTimer(2 * time.Second) + select { + case err := <-errCh: + if !errors.Is(err, entrypoint.ErrContextDeadlineExceeded) { + t.Errorf("expected ErrContextDeadlineExceeded, got %T", err) + } + case <-delay.C: + t.Errorf("expected Wait() to have a ErrContextDeadlineExceeded") + } +} + +func TestRealWaiterWaitContextWithBreakpointOnFailure(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file*.err") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1) + defer os.Remove(tmp.Name()) + rw := realWaiter{} + doneCh := make(chan struct{}) + go func() { + // When breakpoint on failure is enabled skipError shouldn't be returned for a error waitfile + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmpFileName, false, true) + if err != nil { + t.Errorf("error waiting on tmp file %q", tmp.Name()) + } + close(doneCh) + }() + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case <-doneCh: + // Success + case <-delay.C: + t.Errorf("expected Wait() to have detected a non-zero file size by now") + } +} + +func TestRealWaiterWaitContextWithErrorWaitfile(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file*.err") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + tmpFileName := strings.Replace(tmp.Name(), ".err", "", 1) + defer os.Remove(tmp.Name()) + rw := realWaiter{} + doneCh := make(chan struct{}) + go func() { + // error of type skipError is returned after encountering a error waitfile + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmpFileName, false, false) + if err == nil { + t.Errorf("expected skipError upon encounter error waitfile") + } + var skipErr skipError + if errors.As(err, &skipErr) { + close(doneCh) + } else { + t.Errorf("unexpected error type %T", err) + } + }() + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case <-doneCh: + // Success + case <-delay.C: + t.Errorf("expected Wait() to have detected a non-zero file size by now") + } +} + +func TestRealWaiterWaitContextWithContent(t *testing.T) { + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + defer os.Remove(tmp.Name()) + rw := realWaiter{} + doneCh := make(chan struct{}) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmp.Name(), true, false) + if err != nil { + t.Errorf("error waiting on tmp file %q", tmp.Name()) + } + close(doneCh) + }() + if err := os.WriteFile(tmp.Name(), []byte("😺"), 0700); err != nil { + t.Errorf("error writing content to temp file: %v", err) + } + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case <-doneCh: + // Success + case <-delay.C: + t.Errorf("expected Wait() to have detected a non-zero file size by now") + } +} + +func TestRealWaiterWaitContextMissingFile(t *testing.T) { + // Create a temp file and then immediately delete it to get + // a legitimate tmp path and ensure the file doesnt exist + // prior to testing Wait(). + tmp, err := os.CreateTemp("", "real_waiter_test_file") + if err != nil { + t.Errorf("error creating temp file: %v", err) + } + os.Remove(tmp.Name()) + rw := realWaiter{} + doneCh := make(chan struct{}) + go func() { + err := rw.setWaitPollingInterval(testWaitPollingInterval).WaitWithContext(context.Background(), tmp.Name(), false, false) + if err != nil { + t.Errorf("error waiting on tmp file %q", tmp.Name()) + } + close(doneCh) + }() + + delay := time.NewTimer(2 * testWaitPollingInterval) + select { + case <-delay.C: + // Success + case <-doneCh: + t.Errorf("did not expect Wait() to have detected a file at path %q", tmp.Name()) + if !delay.Stop() { + <-delay.C + } + } +} diff --git a/docs/taskruns.md b/docs/taskruns.md index 2c7b9fc897d..4dae3c0f237 100644 --- a/docs/taskruns.md +++ b/docs/taskruns.md @@ -803,6 +803,9 @@ When you cancel a TaskRun, the running pod associated with that `TaskRun` is del means that the logs of the `TaskRun` are not preserved. The deletion of the `TaskRun` pod is necessary in order to stop `TaskRun` step containers from running. +**Note: if `enable-cancel-using-entrypoint` is set to +`"true"` in the `feature-flags`, the pod associated with that `TaskRun` will not be deleted** + Example of cancelling a `TaskRun`: ```yaml diff --git a/pkg/apis/config/feature_flags.go b/pkg/apis/config/feature_flags.go index aa532fafdf9..3685a2405e0 100644 --- a/pkg/apis/config/feature_flags.go +++ b/pkg/apis/config/feature_flags.go @@ -62,6 +62,8 @@ const ( DefaultEnableAPIFields = StableAPIFields // DefaultSendCloudEventsForRuns is the default value for "send-cloudevents-for-runs". DefaultSendCloudEventsForRuns = false + // DefaultEnableCancelUsingEntrypoint is the default value for "enable-cancel-using-entrypoint" + DefaultEnableCancelUsingEntrypoint = false // EnforceNonfalsifiabilityWithSpire is the value used for "enable-nonfalsifiability" when SPIRE is used to enable non-falsifiability. EnforceNonfalsifiabilityWithSpire = "spire" // EnforceNonfalsifiabilityNone is the value used for "enable-nonfalsifiability" when non-falsifiability is not enabled. @@ -76,6 +78,8 @@ const ( DefaultResultExtractionMethod = ResultExtractionMethodTerminationMessage // DefaultMaxResultSize is the default value in bytes for the size of a result DefaultMaxResultSize = 4096 + // EnableCancelUsingEntrypoint is the flag used to enable cancelling a pod using the entrypoint + EnableCancelUsingEntrypoint = "enable-cancel-using-entrypoint" disableAffinityAssistantKey = "disable-affinity-assistant" disableCredsInitKey = "disable-creds-init" @@ -105,6 +109,7 @@ type FeatureFlags struct { SendCloudEventsForRuns bool AwaitSidecarReadiness bool EnforceNonfalsifiability string + EnableCancelUsingEntrypoint bool // VerificationNoMatchPolicy is the feature flag for "trusted-resources-verification-no-match-policy" // VerificationNoMatchPolicy can be set to "ignore", "warn" and "fail" values. // ignore: skip trusted resources verification when no matching verification policies found @@ -190,6 +195,9 @@ func NewFeatureFlagsFromMap(cfgMap map[string]string) (*FeatureFlags, error) { if err := setMaxResultSize(cfgMap, DefaultMaxResultSize, &tc.MaxResultSize); err != nil { return nil, err } + if err := setFeature(EnableCancelUsingEntrypoint, DefaultEnableCancelUsingEntrypoint, &tc.EnableCancelUsingEntrypoint); err != nil { + return nil, err + } // Given that they are alpha features, Tekton Bundles and Custom Tasks should be switched on if // enable-api-fields is "alpha". If enable-api-fields is not "alpha" then fall back to the value of diff --git a/pkg/apis/config/feature_flags_test.go b/pkg/apis/config/feature_flags_test.go index c63d7d8a922..73fca325dc1 100644 --- a/pkg/apis/config/feature_flags_test.go +++ b/pkg/apis/config/feature_flags_test.go @@ -68,6 +68,7 @@ func TestNewFeatureFlagsFromConfigMap(t *testing.T) { EnableProvenanceInStatus: true, ResultExtractionMethod: "termination-message", MaxResultSize: 4096, + EnableCancelUsingEntrypoint: true, }, fileName: "feature-flags-all-flags-set", }, diff --git a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml index 07f5f33a9de..4259a6282ba 100644 --- a/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml +++ b/pkg/apis/config/testdata/feature-flags-all-flags-set.yaml @@ -29,3 +29,4 @@ data: enforce-nonfalsifiability: "spire" trusted-resources-verification-no-match-policy: "fail" enable-provenance-in-status: "true" + enable-cancel-using-entrypoint: "true" diff --git a/pkg/entrypoint/entrypointer.go b/pkg/entrypoint/entrypointer.go index 6d7273b5a14..032b4454b82 100644 --- a/pkg/entrypoint/entrypointer.go +++ b/pkg/entrypoint/entrypointer.go @@ -26,11 +26,13 @@ import ( "path/filepath" "strconv" "strings" + "syscall" "time" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/pod" "github.com/tektoncd/pipeline/pkg/spire" "github.com/tektoncd/pipeline/pkg/termination" "go.uber.org/zap" @@ -43,6 +45,21 @@ const ( FailOnError = "stopAndFail" ) +// ContextError context error type +type ContextError string + +// Error implements error interface +func (e ContextError) Error() string { + return string(e) +} + +var ( + // ErrContextDeadlineExceeded is the error returned when the context deadline is exceeded + ErrContextDeadlineExceeded = ContextError(context.DeadlineExceeded.Error()) + // ErrContextCanceled is the error returned when the context is canceled + ErrContextCanceled = ContextError(context.Canceled.Error()) +) + // Entrypointer holds fields for running commands with redirected // entrypoints. type Entrypointer struct { @@ -87,12 +104,17 @@ type Entrypointer struct { ResultsDirectory string // ResultExtractionMethod is the method using which the controller extracts the results from the task pod. ResultExtractionMethod string + + // StopOnCancel indicates if the entrypoint should stop the command when taskrun is canceled + StopOnCancel bool } // Waiter encapsulates waiting for files to exist. type Waiter interface { // Wait blocks until the specified file exists. Wait(file string, expectContent bool, breakpointOnFailure bool) error + // WaitWithContext blocks until the specified file exists or the context is done. + WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error } // Runner encapsulates running commands. @@ -143,21 +165,25 @@ func (e Entrypointer) Go() error { ResultType: v1beta1.InternalTektonResultType, }) - ctx := context.Background() var err error - if e.Timeout != nil && *e.Timeout < time.Duration(0) { err = fmt.Errorf("negative timeout specified") } - + ctx := context.Background() + var cancel context.CancelFunc if err == nil { - var cancel context.CancelFunc - if e.Timeout != nil && *e.Timeout != time.Duration(0) { + ctx, cancel = context.WithCancel(ctx) + if e.Timeout != nil && *e.Timeout > time.Duration(0) { ctx, cancel = context.WithTimeout(ctx, *e.Timeout) - defer cancel() } + // start a goroutine to listen for cancellation file + go func() { + if err := e.waitingCancellation(ctx, cancel); err != nil { + logger.Error("Error while waiting for cancellation", zap.Error(err)) + } + }() err = e.Runner.Run(ctx, e.Command...) - if errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, ErrContextDeadlineExceeded) { output = append(output, v1beta1.RunResult{ Key: "Reason", Value: "TimeoutExceeded", @@ -168,6 +194,15 @@ func (e Entrypointer) Go() error { var ee *exec.ExitError switch { + case err != nil && errors.Is(err, ErrContextCanceled): + logger.Info("Step was canceling") + output = append(output, v1beta1.RunResult{ + Key: "Reason", + Value: "Cancelled", + ResultType: v1beta1.InternalTektonResultType, + }) + e.WritePostFile(e.PostFile, ErrContextCanceled) + e.WriteExitCodeFile(e.StepMetadataDir, syscall.SIGKILL.String()) case err != nil && e.BreakpointOnFailure: logger.Info("Skipping writing to PostFile") case e.OnError == ContinueOnError && errors.As(err, &ee): @@ -267,3 +302,15 @@ func (e Entrypointer) WriteExitCodeFile(stepPath, content string) { exitCodeFile := filepath.Join(stepPath, "exitCode") e.PostWriter.Write(exitCodeFile, content) } + +// waitingCancellation waiting cancellation file, if no error occurs, call cancelFunc to cancel the context +func (e Entrypointer) waitingCancellation(ctx context.Context, cancel context.CancelFunc) error { + if !e.StopOnCancel { + return nil + } + if err := e.Waiter.WaitWithContext(ctx, pod.DownwardMountCancelFile, true, false); err != nil { + return err + } + cancel() + return nil +} diff --git a/pkg/entrypoint/entrypointer_test.go b/pkg/entrypoint/entrypointer_test.go index 2d6bcfa5275..e9c619478d0 100644 --- a/pkg/entrypoint/entrypointer_test.go +++ b/pkg/entrypoint/entrypointer_test.go @@ -601,6 +601,88 @@ func TestEntrypointerResults(t *testing.T) { } } +func Test_waitingCancellation(t *testing.T) { + type args struct { + stopOnCancel bool + } + testCases := []struct { + name string + args args + expectCtxErr error + }{ + { + name: "stopOnCancel is false", + args: args{ + stopOnCancel: false, + }, + }, + { + name: "stopOnCancel is true and want context canceled", + args: args{ + stopOnCancel: true, + }, + expectCtxErr: context.Canceled, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + fw := &fakeWaiter{} + err := Entrypointer{ + Waiter: fw, + StopOnCancel: tc.args.stopOnCancel, + }.waitingCancellation(ctx, cancel) + if err != nil { + t.Fatalf("Entrypointer waitingCancellation failed: %v", err) + } + if tc.expectCtxErr != nil && !errors.Is(ctx.Err(), tc.expectCtxErr) { + t.Errorf("expected context error %v, got %v", tc.expectCtxErr, ctx.Err()) + } + }) + } +} + +func TestEntrypointerStopOnCancel(t *testing.T) { + testCases := []struct { + name string + stopOnCancel bool + expectError error + }{ + { + name: "stopOnCancel is false, expect no error", + }, + { + name: "stopOnCancel is true, expect context canceled error", + stopOnCancel: true, + expectError: ErrContextCanceled, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + terminationPath := "termination" + if terminationFile, err := os.CreateTemp("", "termination"); err != nil { + t.Fatalf("unexpected error creating temporary termination file: %v", err) + } else { + terminationPath = terminationFile.Name() + defer os.Remove(terminationFile.Name()) + } + fw := &fakeWaiter{} + fr := &fakeLongRunner{duration: 1 * time.Second} + fp := &fakePostWriter{} + err := Entrypointer{ + Waiter: fw, + StopOnCancel: tc.stopOnCancel, + Runner: fr, + PostWriter: fp, + TerminationPath: terminationPath, + }.Go() + if !errors.Is(err, tc.expectError) { + t.Errorf("expected error %v, got %v", tc.expectError, err) + } + }) + } +} + type fakeWaiter struct{ waited []string } func (f *fakeWaiter) Wait(file string, _ bool, _ bool) error { @@ -608,6 +690,11 @@ func (f *fakeWaiter) Wait(file string, _ bool, _ bool) error { return nil } +func (f *fakeWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error { + f.waited = append(f.waited, file) + return nil +} + type fakeRunner struct{ args *[]string } func (f *fakeRunner) Run(ctx context.Context, args ...string) error { @@ -637,6 +724,11 @@ func (f *fakeErrorWaiter) Wait(file string, expectContent bool, breakpointOnFail return errors.New("waiter failed") } +func (f *fakeErrorWaiter) WaitWithContext(ctx context.Context, file string, expectContent bool, breakpointOnFailure bool) error { + f.waited = &file + return errors.New("waiter failed") +} + type fakeErrorRunner struct{ args *[]string } func (f *fakeErrorRunner) Run(ctx context.Context, args ...string) error { @@ -671,6 +763,23 @@ func (f *fakeExitErrorRunner) Run(ctx context.Context, args ...string) error { return exec.Command("ls", "/bogus/path").Run() } +type fakeLongRunner struct{ duration time.Duration } + +func (f *fakeLongRunner) Run(ctx context.Context, _ ...string) error { + select { + case <-time.After(f.duration): + return nil + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return ErrContextCanceled + } + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return ErrContextDeadlineExceeded + } + return nil + } +} + type fakeResultsWriter struct { args *[]string resultsToWrite map[string]string diff --git a/pkg/pod/entrypoint.go b/pkg/pod/entrypoint.go index e62b04b6fc3..aa1c6ff77db 100644 --- a/pkg/pod/entrypoint.go +++ b/pkg/pod/entrypoint.go @@ -61,6 +61,10 @@ const ( sidecarPrefix = "sidecar-" breakpointOnFailure = "onFailure" + + downwardMountCancelFile = "cancel" + cancelAnnotation = "tekton.dev/cancel" + cancelAnnotationValue = "CANCEL" ) var ( @@ -83,6 +87,12 @@ var ( MountPath: pipeline.StepsDir, } + downwardCancelVolumeItem = corev1.DownwardAPIVolumeFile{ + Path: downwardMountCancelFile, + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", cancelAnnotation), + }, + } // TODO(#1605): Signal sidecar readiness by injecting entrypoint, // remove dependency on Downward API. downwardVolume = corev1.Volume{ @@ -105,6 +115,8 @@ var ( // since the volume itself is readonly, but including for completeness. ReadOnly: true, } + // DownwardMountCancelFile is cancellation file mount to step, entrypoint will check this file to cancel the step. + DownwardMountCancelFile = filepath.Join(downwardMountPoint, downwardMountCancelFile) ) // orderContainers returns the specified steps, modified so that they are @@ -114,7 +126,7 @@ var ( // command, we must have fetched the image's ENTRYPOINT before calling this // method, using entrypoint_lookup.go. // Additionally, Step timeouts are added as entrypoint flag. -func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1beta1.TaskSpec, breakpointConfig *v1beta1.TaskRunDebug, waitForReadyAnnotation bool) ([]corev1.Container, error) { +func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1beta1.TaskSpec, breakpointConfig *v1beta1.TaskRunDebug, waitForReadyAnnotation, enableCancelUsingEntrypoint bool) ([]corev1.Container, error) { if len(steps) == 0 { return nil, errors.New("No steps specified") } @@ -172,6 +184,10 @@ func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Containe } } + if enableCancelUsingEntrypoint { + argsForEntrypoint = append(argsForEntrypoint, "-stop_on_cancel") + } + cmd, args := s.Command, s.Args if len(cmd) > 0 { argsForEntrypoint = append(argsForEntrypoint, "-entrypoint", cmd[0]) @@ -209,7 +225,7 @@ func collectResultsName(results []v1beta1.TaskResult) string { return strings.Join(resultNames, ",") } -var replaceReadyPatchBytes []byte +var replaceReadyPatchBytes, replaceCancelPatchBytes []byte func init() { // https://stackoverflow.com/questions/55573724/create-a-patch-to-add-a-kubernetes-annotation @@ -223,6 +239,33 @@ func init() { if err != nil { log.Fatalf("failed to marshal replace ready patch bytes: %v", err) } + + cancelAnnotationPath := "/metadata/annotations/" + strings.Replace(cancelAnnotation, "/", "~1", 1) + replaceCancelPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{ + Operation: "replace", + Path: cancelAnnotationPath, + Value: cancelAnnotationValue, + }}) + if err != nil { + log.Fatalf("failed to marshal replace cancel patch bytes: %v", err) + } +} + +// CancelPod updates the Pod's annotations to signal the cancellation +// by projecting the cancel annotation via the Downward API. +// Do not wrap the error, return directly, because the caller needs to judge whether it is a not found error +func CancelPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, podName string) error { + // PATCH the Pod's annotations to replace the cancel annotation with the + // "CANCEL" value, to signal the pod to be cancelled. + _, err := kubeClient.CoreV1().Pods(namespace).Patch(ctx, podName, types.JSONPatchType, replaceCancelPatchBytes, metav1.PatchOptions{}) + return err +} + +// DeletePod delete specify pod by namespace and pod name. +// Do not wrap the error, return directly, because the caller needs to judge whether it is a not found error +func DeletePod(ctx context.Context, kubeClient kubernetes.Interface, namespace, podName string) error { + err := kubeClient.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + return err } // UpdateReady updates the Pod's annotations to signal the first step to start diff --git a/pkg/pod/entrypoint_test.go b/pkg/pod/entrypoint_test.go index 261862f1360..6cb5161be45 100644 --- a/pkg/pod/entrypoint_test.go +++ b/pkg/pod/entrypoint_test.go @@ -95,7 +95,7 @@ func TestOrderContainers(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, nil, nil, true) + got, err := orderContainers([]string{}, steps, nil, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -163,7 +163,7 @@ func TestOrderContainersWithResultsSidecarLogs(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true) + got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -209,7 +209,7 @@ func TestOrderContainersWithNoWait(t *testing.T) { VolumeMounts: []corev1.VolumeMount{volumeMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, nil, nil, false) + got, err := orderContainers([]string{}, steps, nil, nil, false, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -243,7 +243,7 @@ func TestOrderContainersWithDebugOnFailure(t *testing.T) { taskRunDebugConfig := &v1beta1.TaskRunDebug{ Breakpoint: []string{"onFailure"}, } - got, err := orderContainers([]string{}, steps, nil, taskRunDebugConfig, true) + got, err := orderContainers([]string{}, steps, nil, taskRunDebugConfig, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -321,7 +321,38 @@ func TestEntryPointResults(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) + if err != nil { + t.Fatalf("orderContainers: %v", err) + } + if d := cmp.Diff(want, got); d != "" { + t.Errorf("Diff %s", diff.PrintWantGot(d)) + } +} + +func TestEntryPointEnableCancelUsingEntrypoint(t *testing.T) { + steps := []corev1.Container{{ + Image: "step-1", + Command: []string{"cmd"}, + Args: []string{"arg1", "arg2"}, + }} + want := []corev1.Container{{ + Image: "step-1", + Command: []string{entrypointBinary}, + Args: []string{ + "-wait_file", "/tekton/downward/ready", + "-wait_file_content", + "-post_file", "/tekton/run/0/out", + "-termination_path", "/tekton/termination", + "-step_metadata_dir", "/tekton/run/0/status", + "-stop_on_cancel", + "-entrypoint", "cmd", "--", + "arg1", "arg2", + }, + VolumeMounts: []corev1.VolumeMount{downwardMount}, + TerminationMessagePath: "/tekton/termination", + }} + got, err := orderContainers([]string{}, steps, nil, nil, true, true) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -362,7 +393,7 @@ func TestEntryPointResultsSingleStep(t *testing.T) { VolumeMounts: []corev1.VolumeMount{downwardMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -399,7 +430,7 @@ func TestEntryPointSingleResultsSingleStep(t *testing.T) { VolumeMounts: []corev1.VolumeMount{downwardMount}, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } @@ -470,7 +501,7 @@ func TestEntryPointOnError(t *testing.T) { err: errors.New("task step onError must be either \"continue\" or \"stopAndFail\" but it is set to an invalid value \"invalid-on-error\""), }} { t.Run(tc.desc, func(t *testing.T) { - got, err := orderContainers([]string{}, steps, &tc.taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &tc.taskSpec, nil, true, false) if len(tc.wantContainers) == 0 { if err == nil { t.Fatalf("expected an error for an invalid value for onError but received none") @@ -569,7 +600,7 @@ func TestEntryPointStepOutputConfigs(t *testing.T) { }, TerminationMessagePath: "/tekton/termination", }} - got, err := orderContainers([]string{}, steps, &taskSpec, nil, true) + got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false) if err != nil { t.Fatalf("orderContainers: %v", err) } diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go index 3bdd2fc49a6..ae508d2be0b 100644 --- a/pkg/pod/pod.go +++ b/pkg/pod/pod.go @@ -123,6 +123,7 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec defaultForbiddenEnv := config.FromContextOrDefaults(ctx).Defaults.DefaultForbiddenEnv alphaAPIEnabled := featureFlags.EnableAPIFields == config.AlphaAPIFields sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs + enableCancelUsingEntrypoint := featureFlags.EnableCancelUsingEntrypoint // Add our implicit volumes first, so they can be overridden by the user if they prefer. volumes = append(volumes, implicitVolumes...) @@ -204,16 +205,20 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1beta1.TaskRun, taskSpec readyImmediately := isPodReadyImmediately(*featureFlags, taskSpec.Sidecars) if alphaAPIEnabled { - stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, taskRun.Spec.Debug, !readyImmediately, enableCancelUsingEntrypoint) } else { - stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately) + stepContainers, err = orderContainers(commonExtraEntrypointArgs, stepContainers, &taskSpec, nil, !readyImmediately, enableCancelUsingEntrypoint) } if err != nil { return nil, err } volumes = append(volumes, binVolume) if !readyImmediately { - volumes = append(volumes, downwardVolume) + downwardVolumeDup := downwardVolume.DeepCopy() + if enableCancelUsingEntrypoint { + downwardVolumeDup.VolumeSource.DownwardAPI.Items = append(downwardVolumeDup.VolumeSource.DownwardAPI.Items, downwardCancelVolumeItem) + } + volumes = append(volumes, *downwardVolumeDup) } // Order of precedence for envs diff --git a/pkg/pod/pod_test.go b/pkg/pod/pod_test.go index 37f62924c00..a471d0f1a00 100644 --- a/pkg/pod/pod_test.go +++ b/pkg/pod/pod_test.go @@ -1963,6 +1963,64 @@ _EOF_ }), ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, }, + }, { + desc: "cancel use entrypoint enabled", + featureFlags: map[string]string{"enable-cancel-using-entrypoint": "true"}, + ts: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{ + Name: "name", + Image: "image", + Command: []string{"cmd"}, // avoid entrypoint lookup. + }}, + }, + want: &corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + InitContainers: []corev1.Container{ + entrypointInitContainer(images.EntrypointImage, []v1beta1.Step{{Name: "name"}}), + }, + Containers: []corev1.Container{{ + Name: "step-name", + Image: "image", + Command: []string{"/tekton/bin/entrypoint"}, + Args: []string{ + "-wait_file", + "/tekton/downward/ready", + "-wait_file_content", + "-post_file", + "/tekton/run/0/out", + "-termination_path", + "/tekton/termination", + "-step_metadata_dir", + "/tekton/run/0/status", + "-stop_on_cancel", + "-entrypoint", + "cmd", + "--", + }, + VolumeMounts: append([]corev1.VolumeMount{binROMount, runMount(0, false), downwardMount, { + Name: "tekton-creds-init-home-0", + MountPath: "/tekton/creds", + }}, implicitVolumeMounts...), + TerminationMessagePath: "/tekton/termination", + }}, + Volumes: append(implicitVolumes, binVolume, runVolume(0), corev1.Volume{ + Name: downwardVolumeName, + VolumeSource: corev1.VolumeSource{ + DownwardAPI: &corev1.DownwardAPIVolumeSource{ + Items: []corev1.DownwardAPIVolumeFile{{ + Path: downwardMountReadyFile, + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", readyAnnotation), + }, + }, downwardCancelVolumeItem}, + }, + }, + }, corev1.Volume{ + Name: "tekton-creds-init-home-0", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{Medium: corev1.StorageMediumMemory}}, + }), + ActiveDeadlineSeconds: &defaultActiveDeadlineSeconds, + }, }} { t.Run(c.desc, func(t *testing.T) { names.TestingSeed() @@ -2260,6 +2318,10 @@ debug-fail-continue-heredoc-randomly-generated-mz4c7 } } +func TestPodBuildWithEnabledCancelUseEntrypoint(t *testing.T) { + +} + type ExpectedComputeResources struct { name string ResourceRequirements corev1.ResourceRequirements diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index bbcf5436553..a79b2b63267 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -648,10 +648,17 @@ func (c *Reconciler) failTaskRun(ctx context.Context, tr *v1beta1.TaskRun, reaso return nil } - // tr.Status.PodName will be empty if the pod was never successfully created. This condition - // can be reached, for example, by the pod never being schedulable due to limits imposed by - // a namespace's ResourceQuota. - err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete(ctx, tr.Status.PodName, metav1.DeleteOptions{}) + var err error + switch { + case reason == v1beta1.TaskRunReasonCancelled && config.FromContextOrDefaults(ctx).FeatureFlags.EnableCancelUsingEntrypoint: + logger.Infof("canceling task run %q by entrypoint", tr.Name) + err = podconvert.CancelPod(ctx, c.KubeClientSet, tr.Namespace, tr.Status.PodName) + default: + // tr.Status.PodName will be empty if the pod was never successfully created. This condition + // can be reached, for example, by the pod never being schedulable due to limits imposed by + // a namespace's ResourceQuota. + err = podconvert.DeletePod(ctx, c.KubeClientSet, tr.Namespace, tr.Status.PodName) + } if err != nil && !k8serrors.IsNotFound(err) { logger.Infof("Failed to terminate pod: %v", err) return err diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index 65bb2e93bd7..ff19efee49f 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -3372,6 +3372,7 @@ func TestFailTaskRun(t *testing.T) { pod *corev1.Pod reason v1beta1.TaskRunReason message string + featureFlags map[string]string expectedStatus apis.Condition expectedStepStates []v1beta1.StepState }{{ @@ -3468,6 +3469,51 @@ status: }, }, }, + }, { + name: "step-status-update-cancel-with-enable-cancel-using-entrypoint", + taskRun: parse.MustParseV1beta1TaskRun(t, ` +metadata: + name: test-taskrun-run-cancel + namespace: foo +spec: + status: TaskRunCancelled + statusMessage: "Test cancellation message." + taskRef: + name: test-task +status: + conditions: + - status: Unknown + type: Succeeded + podName: foo-is-bar + steps: + - running: + startedAt: "2022-01-01T00:00:00Z" +`), + pod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "foo-is-bar", + }}, + reason: v1beta1.TaskRunReasonCancelled, + message: "TaskRun test-taskrun-run-cancel was cancelled. Test cancellation message.", + featureFlags: map[string]string{ + "enable-cancel-using-entrypoint": "true", + }, + expectedStatus: apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: v1beta1.TaskRunReasonCancelled.String(), + Message: "TaskRun test-taskrun-run-cancel was cancelled. Test cancellation message.", + }, + expectedStepStates: []v1beta1.StepState{ + { + ContainerState: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 1, + Reason: v1beta1.TaskRunReasonCancelled.String(), + }, + }, + }, + }, }, { name: "step-status-update-timeout", taskRun: parse.MustParseV1beta1TaskRun(t, ` @@ -3693,6 +3739,15 @@ status: t.Run(tc.name, func(t *testing.T) { d := test.Data{ TaskRuns: []*v1beta1.TaskRun{tc.taskRun}, + ConfigMaps: []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: config.GetFeatureFlagsConfigName(), + Namespace: system.Namespace(), + }, + Data: tc.featureFlags, + }, + }, } if tc.pod != nil { d.Pods = []*corev1.Pod{tc.pod}