Skip to content

Commit

Permalink
Switch PipelineRun timeout -> TaskRun logic to instead signal the Tas…
Browse files Browse the repository at this point in the history
…kRuns to stop

Fixes #5127

As noted in #5127, the logic around calculating a timeout for a `PipelineRun`'s `TaskRun` to make sure that the `TaskRun`'s timeout is always going to end before the `PipelineRun`'s timeout ends is problematic. It can result in race conditions where a `TaskRun` gets timed out, immediately followed by the `PipelineRun` being reconciled while not yet having hit the end of its own timeout. This change gets rid of that behavior, and instead sets the `TaskRun.Spec.Status` to a new value, `TaskRunTimedOut`, with the `TaskRun` reconciler handling that in the same way it does setting `TaskRun.Spec.Status` to `TaskRunCancelled`.

By doing this, we can unify the behavior for both `TaskRun`s and `Run`s upon `PipelineRun`s timing out, given that we already cancel `Run`s upon `PipelineRun` timeout, and we can kill off a bunch of flaky outcomes for `PipelineRun`s.

Signed-off-by: Andrew Bayer <andrew.bayer@gmail.com>
  • Loading branch information
abayer committed Jul 13, 2022
1 parent 5126d15 commit 0d32280
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 16 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ const (
// TaskRunSpecStatusCancelled indicates that the user wants to cancel the task,
// if not already cancelled or terminated
TaskRunSpecStatusCancelled = "TaskRunCancelled"
// TaskRunSpecStatusTimedOut indicates that the PipelineRun owning this task wants to mark it as timed out,
// if not already cancelled or terminated
TaskRunSpecStatusTimedOut = "TaskRunTimedOut"
)

// TaskRunDebug defines the breakpoint config for a particular TaskRun
Expand Down Expand Up @@ -424,6 +427,11 @@ func (tr *TaskRun) IsCancelled() bool {
return tr.Spec.Status == TaskRunSpecStatusCancelled
}

// IsSpecTimedOut returns true if the TaskRun's spec status is set to TimedOut state
func (tr *TaskRun) IsSpecTimedOut() bool {
return tr.Spec.Status == TaskRunSpecStatusTimedOut
}

// HasTimedOut returns true if the TaskRun runtime is beyond the allowed timeout
func (tr *TaskRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bool {
if tr.Status.StartTime.IsZero() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pipeline/v1beta1/taskrun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (ts *TaskRunSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
}

if ts.Status != "" {
if ts.Status != TaskRunSpecStatusCancelled {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s", ts.Status, TaskRunSpecStatusCancelled), "status"))
if ts.Status != TaskRunSpecStatusCancelled && ts.Status != TaskRunSpecStatusTimedOut {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ts.Status, TaskRunSpecStatusCancelled, TaskRunSpecStatusTimedOut), "status"))
}
}
if ts.Timeout != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1beta1/taskrun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestTaskRunSpec_Invalidate(t *testing.T) {
},
Status: "TaskRunCancell",
},
wantErr: apis.ErrInvalidValue("TaskRunCancell should be TaskRunCancelled", "status"),
wantErr: apis.ErrInvalidValue("TaskRunCancell should be TaskRunCancelled or TaskRunTimedOut", "status"),
}, {
name: "invalid taskspec",
spec: v1beta1.TaskRunSpec{
Expand Down
18 changes: 13 additions & 5 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ const (
// ReasonCouldntCancel indicates that a PipelineRun was cancelled but attempting to update
// all of the running TaskRuns as cancelled failed.
ReasonCouldntCancel = "PipelineRunCouldntCancel"
// ReasonCouldntTimeOut indicates that a PipelineRun was timed out but attempting to update
// all of the running TaskRuns as timed out failed.
ReasonCouldntTimeOut = "PipelineRunCouldntTimeOut"
// ReasonInvalidTaskResultReference indicates a task result was declared
// but was not initialized by that task
ReasonInvalidTaskResultReference = "InvalidTaskResultReference"
Expand Down Expand Up @@ -579,6 +582,13 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
// Reset the skipped status to trigger recalculation
pipelineRunFacts.ResetSkippedCache()

// If the pipelinerun has timed out, mark tasks as timed out and update status
if pr.HasTimedOut(ctx, c.Clock) {
if err := timeoutPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil {
return err
}
}

after := pipelineRunFacts.GetPipelineConditionStatus(ctx, pr, logger, c.Clock)
switch after.Status {
case corev1.ConditionTrue:
Expand Down Expand Up @@ -621,7 +631,7 @@ func (c *Reconciler) processRunTimeouts(ctx context.Context, pr *v1beta1.Pipelin
}
for _, rpt := range pipelineState {
if rpt.IsCustomTask() {
if rpt.Run != nil && !rpt.Run.IsCancelled() && (pr.HasTimedOut(ctx, c.Clock) || (rpt.Run.HasTimedOut(c.Clock) && !rpt.Run.IsDone())) {
if rpt.Run != nil && !rpt.Run.IsCancelled() && rpt.Run.HasTimedOut(c.Clock) && !rpt.Run.IsDone() {
logger.Infof("Cancelling run task: %s due to timeout.", rpt.RunName)
err := cancelRun(ctx, rpt.RunName, pr.Namespace, c.PipelineClientSet)
if err != nil {
Expand Down Expand Up @@ -1141,12 +1151,10 @@ func calculateTaskRunTimeout(timeout time.Duration, pr *v1beta1.PipelineRun, rpt
if pElapsedTime > timeout {
return &metav1.Duration{Duration: 1 * time.Second}
}
timeRemaining := (timeout - pElapsedTime)
// Return the smaller of timeRemaining and rpt.pipelineTask.timeout
if rpt.PipelineTask.Timeout != nil && rpt.PipelineTask.Timeout.Duration < timeRemaining {
if rpt.PipelineTask.Timeout != nil && rpt.PipelineTask.Timeout.Duration < timeout {
return &metav1.Duration{Duration: rpt.PipelineTask.Timeout.Duration}
}
return &metav1.Duration{Duration: timeRemaining}
return &metav1.Duration{Duration: timeout}
}

if timeout == apisconfig.NoTimeoutDuration && rpt.PipelineTask.Timeout != nil {
Expand Down
158 changes: 150 additions & 8 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,16 +1736,44 @@ spec:
name: test-pipeline
serviceAccountName: test-sa
status:
conditions:
- message: running...
reason: Running
status: Unknown
type: Succeeded
startTime: "2021-12-31T00:00:00Z"
runs:
test-pipeline-run-custom-task-hello-world-1:
pipelineTaskName: hello-world-1
status:
conditions:
- status: Unknown
type: Succeeded
`)}
prs[0].Spec.Timeout = tc.timeout
prs[0].Spec.Timeouts = tc.timeouts

runs := []*v1alpha1.Run{mustParseRunWithObjectMeta(t,
taskRunObjectMeta("test-pipeline-run-custom-task-hello-world-1", "test", "test-pipeline-run-custom-task",
"test-pipeline", "hello-world-1", true),
`
spec:
ref:
apiVersion: example.dev/v0
kind: Example
status:
conditions:
- status: Unknown
type: Succeeded
startTime: "2021-12-31T11:58:59Z"
`)}

cms := []*corev1.ConfigMap{withCustomTasks(newFeatureFlagsConfigMap())}
d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
ConfigMaps: cms,
Runs: runs,
}
prt := newPipelineRunTest(d, t)
defer prt.Cancel()
Expand All @@ -1763,9 +1791,9 @@ status:
}

gotTimeoutValue := postReconcileRun.GetTimeout()
expectedTimeoutValue := time.Second
expectedTimeoutValue := time.Hour

if d := cmp.Diff(gotTimeoutValue, expectedTimeoutValue); d != "" {
if d := cmp.Diff(expectedTimeoutValue, gotTimeoutValue); d != "" {
t.Fatalf("Expected timeout for created Run, but got a mismatch %s", diff.PrintWantGot(d))
}

Expand Down Expand Up @@ -2755,6 +2783,120 @@ spec:
}
}

func TestReconcileFailsTaskRunTimeOut(t *testing.T) {
prName := "test-pipeline-fails-to-timeout"

// TestReconcileFailsTaskRunTimeOut runs "Reconcile" on a PipelineRun with a single TaskRun.
// The TaskRun cannot be timed out. Check that the pipelinerun timeout fails, that reconcile fails and
// an event is generated
names.TestingSeed()
prs := []*v1beta1.PipelineRun{parse.MustParsePipelineRun(t, `
metadata:
name: test-pipeline-fails-to-timeout
namespace: foo
spec:
pipelineRef:
name: test-pipeline
timeout: 1h0m0s
status:
conditions:
- message: running...
reason: Running
status: Unknown
type: Succeeded
startTime: "2021-12-31T22:59:00Z"
taskRuns:
test-pipeline-fails-to-timeouthello-world-1:
pipelineTaskName: hello-world-1
`)}
ps := []*v1beta1.Pipeline{parse.MustParsePipeline(t, `
metadata:
name: test-pipeline
namespace: foo
spec:
tasks:
- name: hello-world-1
taskRef:
name: hello-world
- name: hello-world-2
taskRef:
name: hello-world
`)}
tasks := []*v1beta1.Task{simpleHelloWorldTask}
taskRuns := []*v1beta1.TaskRun{
getTaskRun(
t,
"test-pipeline-fails-to-timeouthello-world-1",
prName,
"test-pipeline",
"hello-world",
corev1.ConditionUnknown,
),
}

cms := []*corev1.ConfigMap{withEnabledAlphaAPIFields(newFeatureFlagsConfigMap())}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: tasks,
TaskRuns: taskRuns,
ConfigMaps: cms,
}

testAssets, cancel := getPipelineRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
failingReactorActivated := true

// Make the patch call fail, i.e. make it so that the controller fails to cancel the TaskRun
clients.Pipeline.PrependReactor("patch", "taskruns", func(action ktesting.Action) (bool, runtime.Object, error) {
return failingReactorActivated, nil, fmt.Errorf("i'm sorry Dave, i'm afraid i can't do that")
})

err := c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-timeout")
if err == nil {
t.Errorf("Expected to see error returned from reconcile after failing to timeout TaskRun but saw none!")
}

// Check that the PipelineRun is still running with correct error message
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get(testAssets.Ctx, "test-pipeline-fails-to-timeout", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}

if val, ok := reconciledRun.GetLabels()[pipeline.PipelineLabelKey]; !ok {
t.Fatalf("expected pipeline label")
} else if d := cmp.Diff("test-pipeline", val); d != "" {
t.Errorf("expected to see pipeline label. Diff %s", diff.PrintWantGot(d))
}

// The PipelineRun should not be timed out b/c we couldn't timeout the TaskRun
checkPipelineRunConditionStatusAndReason(t, reconciledRun, corev1.ConditionUnknown, ReasonCouldntTimeOut)
// The event here is "Normal" because in case we fail to timeout we leave the condition to unknown
// Further reconcile might converge then the status of the pipeline.
// See https://github.com/tektoncd/pipeline/issues/2647 for further details.
wantEvents := []string{
"Normal PipelineRunCouldntTimeOut PipelineRun \"test-pipeline-fails-to-timeout\" was timed out but had errors trying to time out TaskRuns and/or Runs",
"Warning InternalError 1 error occurred",
}
err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents)
if err != nil {
t.Errorf(err.Error())
}

// Turn off failing reactor and retry reconciliation
failingReactorActivated = false

err = c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-timeout")
if err == nil {
// No error is ok
} else if ok, _ := controller.IsRequeueKey(err); !ok { // Requeue is also fine.
t.Errorf("Expected to timeout TaskRun successfully!")
}
}

func TestReconcilePropagateLabelsAndAnnotations(t *testing.T) {
names.TestingSeed()

Expand Down Expand Up @@ -3094,7 +3236,7 @@ status:
reconciledRun, _ := prt.reconcileRun("foo", "test-pipeline-retry-run-with-timeout", []string{}, false)

if len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus) != tc.retries {
t.Fatalf(" %d retry expected but %d ", tc.retries, len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
t.Fatalf(" %d retries expected but got %d ", tc.retries, len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
}

if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != tc.conditionSucceeded {
Expand Down Expand Up @@ -3240,7 +3382,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 20 * time.Minute},
}, {
name: "taskrun with elapsed time; task.timeout applies",
timeoutFields: &v1beta1.TimeoutFields{
Expand All @@ -3259,7 +3401,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}, {
name: "taskrun with elapsed time; timeouts.pipeline applies",
timeoutFields: &v1beta1.TimeoutFields{
Expand All @@ -3278,7 +3420,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}}

for _, tc := range tcs {
Expand All @@ -3296,7 +3438,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
}
if d := cmp.Diff(getTaskRunTimeout(context.TODO(), pr, tc.rpt, testClock), tc.expected); d != "" {
if d := cmp.Diff(tc.expected, getTaskRunTimeout(context.TODO(), pr, tc.rpt, testClock)); d != "" {
t.Errorf("Unexpected task run timeout. Diff %s", diff.PrintWantGot(d))
}
})
Expand Down Expand Up @@ -3449,7 +3591,7 @@ func TestGetFinallyTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 11 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}}

for _, tc := range tcs {
Expand Down
Loading

0 comments on commit 0d32280

Please sign in to comment.