Skip to content

Commit

Permalink
jobs: retry non-cancelable running and all reverting jobs
Browse files Browse the repository at this point in the history
Previously, non-cancelable jobs were retried in running state
only if their errors were marked as retryable. Moreover, only
non-cancelable reverting jobs were retried by default. This
commit makes non-cancelable jobs always retry in running
state unless their error is marked as a permanent error. In
addition, this commit makes all reverting jobs to retry when
they fail. As a result, non-cancelable running jobs and all
reverting jobs do not fail due to transient errors.

Release justification: low-risk updates to new functionality.

Release note (general change): Non-cancelable jobs, such as
schema-change GC jobs, now do not fail unless they fail with
a permanent error. They retry with exponential-backoff if
they fail due to a transient error. Furthermore, Jobs that
perform reverting tasks do not fail. Instead, they are retried
with exponential-backoff if an error is encountered while
reverting. As a result, transient errors do not impact jobs that
are reverting.

Fixes: #66685
  • Loading branch information
Sajjad Rizvi committed Sep 13, 2021
1 parent 3dde366 commit d7ab27e
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 181 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
<-rbrOpFinished
if !regionAlterCmd.shouldSucceed {
// Trigger a roll-back.
return errors.New("boom")
return jobs.MarkAsPermanentJobError(errors.New("boom"))
}
// Trod on.
return nil
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestDroppingPrimaryRegionAsyncJobFailure(t *testing.T) {
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeExec: func() error {
return errors.New("yikes")
return jobs.MarkAsPermanentJobError(errors.New("yikes"))
},
RunAfterOnFailOrCancel: func() error {
mu.Lock()
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) {
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeMultiRegionUpdates: func() error {
return errors.New("boom")
return jobs.MarkAsPermanentJobError(errors.New("boom"))
},
},
// Decrease the adopt loop interval so that retries happen quickly.
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestRollbackDuringAddDropRegionPlacementRestricted(t *testing.T) {
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeMultiRegionUpdates: func() error {
return errors.New("boom")
return jobs.MarkAsPermanentJobError(errors.New("boom"))
},
},
// Decrease the adopt loop interval so that retries happen quickly.
Expand Down
258 changes: 197 additions & 61 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
rts.mu.Lock()
rts.mu.a.OnFailOrCancelExit = true
rts.mu.Unlock()
t.Log("Exiting OnFailOrCancel")
t.Log("Exiting FailOrCancel")
return err
}
},
Expand Down Expand Up @@ -744,17 +744,20 @@ func TestRegistryLifecycle(t *testing.T) {
})

// Attempt to mark success, but fail, but fail that also.
// TODO(ajwerner): This test seems a bit stale in that it really
// fails the resume rather than succeeding but failing to mark success.
// I think this is due to changes in responsibilities of the jobs
// lifecycle.
t.Run("fail marking success and fail OnFailOrCancel", func(t *testing.T) {
rts := registryTestSuite{}
var triedToMarkSucceeded atomic.Value
triedToMarkSucceeded.Store(false)
rts := registryTestSuite{beforeUpdate: func(orig, updated jobs.JobMetadata) error {
// Fail marking succeeded.
if updated.Status == jobs.StatusSucceeded {
triedToMarkSucceeded.Store(true)
return errors.New("injected failure at marking as succeeded")
}
return nil
}}
rts.setUp(t)
defer rts.tearDown()

// Make marking success fail.
rts.successErr = errors.New("injected failure at marking as succeeded")
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
Expand All @@ -764,20 +767,47 @@ func TestRegistryLifecycle(t *testing.T) {
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Let the resumer complete without error.
rts.resumeCh <- nil
rts.mu.e.ResumeExit++
rts.mu.e.Success = true
rts.mu.e.OnFailOrCancelStart = true

// The job is retried as we failed to mark the job successful.
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
// Fail the resumer to transition to reverting state.
rts.resumeCh <- errors.New("injected error in resume")
rts.mu.e.ResumeExit++

// The job is now in state reverting and will never resume again because
// OnFailOrCancel also fails.
rts.check(t, jobs.StatusReverting)
//
// First retry.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
require.True(t, triedToMarkSucceeded.Load().(bool))
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- errors.New("injected failure while blocked in reverting")
rts.mu.e.OnFailOrCancelExit = true
close(rts.failOrCancelCheckCh)

// The job will be retried as all reverting jobs are retried.
//
// Second retry.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- errors.New("injected failure while blocked in reverting")
rts.check(t, jobs.StatusRevertFailed)
rts.mu.e.OnFailOrCancelExit = true

// The job will stay in reverting state. Let it fail to exit the test.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true

rts.check(t, jobs.StatusFailed)
})
// Succeed the job but inject an error actually marking the jobs successful.
// This could happen due to a transient network error or something like that.
Expand Down Expand Up @@ -827,12 +857,22 @@ func TestRegistryLifecycle(t *testing.T) {

// Fail the job, but also fail to mark it failed.
t.Run("fail marking failed", func(t *testing.T) {
rts := registryTestSuite{}
var triedToMarkFailed atomic.Value
triedToMarkFailed.Store(false)
rts := registryTestSuite{beforeUpdate: func(orig, updated jobs.JobMetadata) error {
if triedToMarkFailed.Load().(bool) == true {
return nil
}
if updated.Status == jobs.StatusFailed {
triedToMarkFailed.Store(true)
return errors.New("injected error while marking as failed")
}
return nil
}}
rts.setUp(t)
defer rts.tearDown()

// Make marking success fail.
rts.successErr = errors.New("resume failed")
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
Expand All @@ -845,16 +885,24 @@ func TestRegistryLifecycle(t *testing.T) {

rts.resumeCh <- errors.New("resume failed")
rts.mu.e.ResumeExit++

rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
// The job is now in state reverting and will never resume again.
rts.check(t, jobs.StatusReverting)
// The job is now in state reverting and will never resume again.
// Let revert complete without error so that the job is attempted to mark as failed.
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

// But let it fail.
// We failed to mark the jobs as failed, resulting in the job to be retried.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
require.True(t, triedToMarkFailed.Load().(bool))
// Let the job complete to exit the test.
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true
rts.failOrCancelCh <- errors.New("resume failed")
rts.check(t, jobs.StatusRevertFailed)
rts.check(t, jobs.StatusFailed)
})

t.Run("OnPauseRequest", func(t *testing.T) {
Expand Down Expand Up @@ -3210,53 +3258,141 @@ func TestPauseReason(t *testing.T) {
}
}

// TestNonCancelableJobsRetry tests that a non-cancelable job is retried when
// failed with a non-retryable error.
func TestNonCancelableJobsRetry(t *testing.T) {
// TestJobsRetry tests that (1) non-cancelable jobs retry if they fail with an
// error marked as permanent, (2) reverting job always retry instead of failing.
func TestJobsRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Create a non-cancelable job.
// Fail the job in resume to cause the job to revert.
// Fail the job in revert state using a non-retryable error.
// Make sure that the jobs is retried and is again in the revert state.
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
// Make mockJob non-cancelable.
rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool {
return true
})
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
t.Run("retry non-cancelable running", func(t *testing.T) {
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
// Make mockJob non-cancelable, ensuring that non-cancelable jobs are retried in running state.
rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool {
return true
})
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

// First job run in running state.
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
// Make Resume fail.
rts.resumeCh <- errors.New("non-permanent error")
rts.mu.e.ResumeExit++

// Job should be retried in running state.
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
rts.resumeCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error"))
rts.mu.e.ResumeExit++

// Job should now revert.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})

t.Run("retry reverting", func(t *testing.T) {
// - Create a job.
// - Fail the job in resume to cause the job to revert.
// - Fail the job in revert state using a non-retryable error.
// - Make sure that the jobs is retried and is again in the revert state.
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

// Make Resume fail.
rts.resumeCh <- errors.New("failing resume to revert")
rts.mu.e.ResumeExit++
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Job is now reverting.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
// Make Resume fail.
rts.resumeCh <- errors.New("failing resume to revert")
rts.mu.e.ResumeExit++

// Job is now reverting.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)

// Fail the job in reverting state without a retryable error.
rts.failOrCancelCh <- errors.New("failing with non-retryable error")
rts.mu.e.OnFailOrCancelExit = true
// Fail the job in reverting state without a retryable error.
rts.failOrCancelCh <- errors.New("failing with a non-retryable error")
rts.mu.e.OnFailOrCancelExit = true

// Job should be retried even though it is non-cancelable.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true
// Job should be retried.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})

t.Run("retry non-cancelable reverting", func(t *testing.T) {
// - Create a non-cancelable job.
// - Fail the job in resume with a permanent error to cause the job to revert.
// - Fail the job in revert state using a permanent error to ensure that the
// retries with a permanent error as well.
// - Make sure that the jobs is retried and is again in the revert state.
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
// Make mockJob non-cancelable, ensuring that non-cancelable jobs are retried in reverting state.
rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool {
return true
})
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Make Resume fail with a permanent error.
rts.resumeCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error"))
rts.mu.e.ResumeExit++

// Job is now reverting.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)

// Fail the job in reverting state with a permanent error a retryable error.
rts.failOrCancelCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error"))
rts.mu.e.OnFailOrCancelExit = true

// Job should be retried.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})
}
4 changes: 3 additions & 1 deletion pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type JobTypeMetrics struct {
ResumeFailed *metric.Counter
FailOrCancelCompleted *metric.Counter
FailOrCancelRetryError *metric.Counter
FailOrCancelFailed *metric.Counter
// TODO (sajjad): FailOrCancelFailed metric is not updated after the modification
// of retrying all reverting jobs. Remove this metric in v22.1.
FailOrCancelFailed *metric.Counter
}

// MetricStruct implements the metric.Struct interface.
Expand Down
Loading

0 comments on commit d7ab27e

Please sign in to comment.