Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple TaskRun startTime from pod start time ⌚ #780

Merged
merged 3 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ func (pr *PipelineRunStatus) GetCondition(t apis.ConditionType) *apis.Condition
}

// InitializeConditions will set all conditions in pipelineRunCondSet to unknown for the PipelineRun
// and set the started time to the current time
func (pr *PipelineRunStatus) InitializeConditions() {
if pr.TaskRuns == nil {
pr.TaskRuns = make(map[string]*PipelineRunTaskRunStatus)
}
if pr.StartTime.IsZero() {
pr.StartTime = &metav1.Time{time.Now()}
pr.StartTime = &metav1.Time{Time: time.Now()}
}
pipelineRunCondSet.Manage(pr).InitializeConditions()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestPipelineRunHasStarted(t *testing.T) {
}, {
name: "prWithStartTime",
prStatus: PipelineRunStatus{
StartTime: &metav1.Time{time.Now()},
StartTime: &metav1.Time{Time: time.Now()},
},
expectedValue: true,
}, {
Expand Down
12 changes: 10 additions & 2 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ type TaskRunStatus struct {
func (tr *TaskRunStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return taskRunCondSet.Manage(tr).GetCondition(t)
}

// InitializeConditions will set all conditions in taskRunCondSet to unknown for the TaskRun
// and set the started time to the current time
func (tr *TaskRunStatus) InitializeConditions() {
if tr.StartTime.IsZero() {
tr.StartTime = &metav1.Time{time.Now()}
tr.StartTime = &metav1.Time{Time: time.Now()}
}
taskRunCondSet.Manage(tr).InitializeConditions()
}
Expand Down Expand Up @@ -213,7 +216,7 @@ func (tr *TaskRun) GetPipelineRunPVCName() string {
return ""
}

// HasPipeluneRunOwnerReference returns true of TaskRun has
// HasPipelineRunOwnerReference returns true of TaskRun has
// owner reference of type PipelineRun
func (tr *TaskRun) HasPipelineRunOwnerReference() bool {
for _, ref := range tr.GetOwnerReferences() {
Expand All @@ -229,6 +232,11 @@ func (tr *TaskRun) IsDone() bool {
return !tr.Status.GetCondition(apis.ConditionSucceeded).IsUnknown()
}

// HasStarted function check whether taskrun has valid start time set in its status
func (tr *TaskRun) HasStarted() bool {
return tr.Status.StartTime != nil && !tr.Status.StartTime.IsZero()
}

// IsCancelled returns true if the TaskRun's spec status is set to Cancelled state
func (tr *TaskRun) IsCancelled() bool {
return tr.Spec.Status == TaskRunSpecStatusCancelled
Expand Down
39 changes: 39 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/knative/pkg/apis"
Expand Down Expand Up @@ -163,3 +164,41 @@ func TestTaskRunKey(t *testing.T) {
t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
}
}

func TestTaskRunHasStarted(t *testing.T) {
params := []struct {
name string
trStatus TaskRunStatus
expectedValue bool
}{{
name: "trWithNoStartTime",
trStatus: TaskRunStatus{},
expectedValue: false,
}, {
name: "trWithStartTime",
trStatus: TaskRunStatus{
StartTime: &metav1.Time{Time: time.Now()},
},
expectedValue: true,
}, {
name: "trWithZeroStartTime",
trStatus: TaskRunStatus{
StartTime: &metav1.Time{},
},
expectedValue: false,
}}
for _, tc := range params {
t.Run(tc.name, func(t *testing.T) {
tr := &TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "prunname",
Namespace: "testns",
},
Status: tc.trStatus,
}
if tr.HasStarted() != tc.expectedValue {
t.Fatalf("Expected taskrun HasStarted() to return %t but got %t", tc.expectedValue, tr.HasStarted())
}
})
}
}
100 changes: 31 additions & 69 deletions pkg/reconciler/timeout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,6 @@ func (t *TimeoutSet) getOrCreateFinishedChan(runObj StatusKey) chan bool {
return finished
}

// StatusLock function acquires lock for taskrun/pipelinerun status key
func (t *TimeoutSet) StatusLock(runObj StatusKey) {
m, _ := t.statusMap.LoadOrStore(runObj.GetRunKey(), &sync.Mutex{})
mut := m.(*sync.Mutex)
mut.Lock()
}

// StatusUnlock function releases lock for taskrun/pipelinerun status key
func (t *TimeoutSet) StatusUnlock(runObj StatusKey) {
m, ok := t.statusMap.Load(runObj.GetRunKey())
if !ok {
return
}
mut := m.(*sync.Mutex)
mut.Unlock()
}

func getTimeout(d *metav1.Duration) time.Duration {
timeout := defaultTimeout
if d != nil {
Expand All @@ -131,7 +114,9 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
continue
}
go t.WaitPipelineRun(&pipelineRun)
if pipelineRun.HasStarted() {
go t.WaitPipelineRun(&pipelineRun, pipelineRun.Status.StartTime)
}
}
}

Expand Down Expand Up @@ -162,74 +147,51 @@ func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
if taskrun.IsDone() || taskrun.IsCancelled() {
continue
}
go t.WaitTaskRun(&taskrun)
if taskrun.HasStarted() {
go t.WaitTaskRun(&taskrun, taskrun.Status.StartTime)
}
}
}

// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
timeout := getTimeout(tr.Spec.Timeout)
runtime := time.Duration(0)

t.StatusLock(tr)
if tr.Status.StartTime != nil && !tr.Status.StartTime.Time.IsZero() {
runtime = time.Since(tr.Status.StartTime.Time)
}
t.StatusUnlock(tr)
timeout -= runtime
finished := t.getOrCreateFinishedChan(tr)
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun, startTime *metav1.Time) {
t.waitRun(tr, getTimeout(tr.Spec.Timeout), startTime, t.taskRunCallbackFunc)
}

defer t.Release(tr)
// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun, startTime *metav1.Time) {
t.waitRun(pr, getTimeout(pr.Spec.Timeout), startTime, t.pipelineRunCallbackFunc)
}

select {
case <-t.stopCh:
// we're stopping, give up
return
case <-finished:
// taskrun finished, we can stop watching
func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
if startTime == nil {
t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
return
case <-time.After(timeout):
t.StatusLock(tr)
defer t.StatusUnlock(tr)
if t.taskRunCallbackFunc != nil {
t.taskRunCallbackFunc(tr)
} else {
defaultFunc(tr)
}
}
}
runtime := time.Since(startTime.Time)
finished := t.getOrCreateFinishedChan(runObj)

// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
timeout := getTimeout(pr.Spec.Timeout)

runtime := time.Duration(0)
t.StatusLock(pr)
if pr.Status.StartTime != nil && !pr.Status.StartTime.Time.IsZero() {
runtime = time.Since(pr.Status.StartTime.Time)
}
t.StatusUnlock(pr)
timeout -= runtime
finished := t.getOrCreateFinishedChan(pr)
defer t.Release(runObj)

defer t.Release(pr)
t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)

select {
case <-t.stopCh:
// we're stopping, give up
t.logger.Info("Stopping timeout timer for %s", runObj.GetRunKey())
return
case <-finished:
// pipelinerun finished, we can stop watching
t.logger.Info("%s finished, stopping the timeout timer", runObj.GetRunKey())
return
case <-time.After(timeout):
t.StatusLock(pr)
defer t.StatusUnlock(pr)
if t.pipelineRunCallbackFunc != nil {
t.pipelineRunCallbackFunc(pr)
case <-time.After(timeout - runtime):
t.logger.Info("Timeout timer for %s has timed out (started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime, timeout, time.Since(startTime.Time))
if callback != nil {
callback(runObj)
} else {
defaultFunc(pr)
defaultFunc(runObj)
}
}
}
4 changes: 2 additions & 2 deletions pkg/reconciler/timeout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
}
return true, nil
}); err != nil {
t.Fatalf("Expected %s callback to be %t but got callback to be %#v", tc.name, tc.expectCallback, gotCallback)
t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err)
}
})
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
}
return true, nil
}); err != nil {
t.Fatalf("Expected %s callback to be %t but got callback to be %#+v", tc.name, tc.expectCallback, gotCallback)
t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err)
}
})
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,13 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {

// Don't modify the informer's copy.
pr := original.DeepCopy()
c.timeoutHandler.StatusLock(pr)
if !pr.HasStarted() {
pr.Status.InitializeConditions()
// start goroutine to track pipelinerun timeout only startTime is not set
go c.timeoutHandler.WaitPipelineRun(pr)
go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
} else {
pr.Status.InitializeConditions()
}
pr.Status.InitializeConditions()

c.timeoutHandler.StatusUnlock(original)

if pr.IsDone() {
c.timeoutHandler.Release(pr)
Expand Down Expand Up @@ -372,10 +371,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}
}
before := pr.Status.GetCondition(apis.ConditionSucceeded)
c.timeoutHandler.StatusLock(pr)
after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger, pr.Status.StartTime, pr.Spec.Timeout)
pr.Status.SetCondition(after)
c.timeoutHandler.StatusUnlock(pr)
reconciler.EmitEvent(c.Recorder, before, after, pr)

updateTaskRunsStatus(pr, pipelineState)
Expand Down
10 changes: 6 additions & 4 deletions pkg/reconciler/v1alpha1/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {

// Don't modify the informer's copy.
tr := original.DeepCopy()

// If the TaskRun is just starting, this will also set the starttime,
// from which the timeout will immediately begin counting down.
tr.Status.InitializeConditions()

if tr.IsDone() {
Expand Down Expand Up @@ -303,7 +306,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
c.Logger.Errorf("Failed to create build pod for task %q :%v", err, tr.Name)
return nil
}
go c.timeoutHandler.WaitTaskRun(tr)
go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
}
if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
c.Logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
Expand All @@ -312,9 +315,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

before := tr.Status.GetCondition(apis.ConditionSucceeded)

c.timeoutHandler.StatusLock(tr)
updateStatusFromPod(tr, pod)
c.timeoutHandler.StatusUnlock(tr)

after := tr.Status.GetCondition(apis.ConditionSucceeded)

Expand All @@ -336,7 +337,6 @@ func updateStatusFromPod(taskRun *v1alpha1.TaskRun, pod *corev1.Pod) {
})
}

taskRun.Status.StartTime = &pod.CreationTimestamp
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this is the line that actually fixes the problem XD

taskRun.Status.PodName = pod.Name

taskRun.Status.Steps = []v1alpha1.StepState{}
Expand Down Expand Up @@ -534,6 +534,8 @@ func (c *Reconciler) checkTimeout(tr *v1alpha1.TaskRun, ts *v1alpha1.TaskSpec, d
if tr.Spec.Timeout != nil {
timeout := tr.Spec.Timeout.Duration
runtime := time.Since(tr.Status.StartTime.Time)

c.Logger.Infof("Checking timeout for TaskRun %q (startTime %s, timeout %s, runtime %s)", tr.Name, tr.Status.StartTime, timeout, runtime)
if runtime > timeout {
c.Logger.Infof("TaskRun %q is timeout (runtime %s over %s), deleting pod", tr.Name, runtime, timeout)
if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
Expand Down
Loading