Skip to content

Commit

Permalink
jobs: retry all reverting jobs
Browse files Browse the repository at this point in the history
Previously, only non-cancelable reverting jobs were retried
by default. This commit makes all reverting jobs retry when
they fail. As a result, reverting jobs no longer fail due to
transient errors.

Release justification: a bug fix and low-risk updates to
new functionality.

Release note: Jobs that perform reverting tasks do not
fail. Instead, they are retried with exponential backoff
if an error is encountered while reverting. As a result,
transient errors do not impact jobs that are reverting.

Fixes: cockroachdb#66685
  • Loading branch information
Sajjad Rizvi committed Aug 26, 2021
1 parent 84562fa commit ac1a9b9
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 17 deletions.
91 changes: 91 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3253,6 +3253,97 @@ func TestJobsRetry(t *testing.T) {
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})

t.Run("retry reverting", func(t *testing.T) {
// Verifies that a job whose revert fails with an ordinary (unmarked) error
// is retried instead of being terminated:
// - Create a job.
// - Fail the job in resume to cause the job to revert.
// - Fail the job in revert state using a non-retryable error.
// - Make sure that the job is retried and is again in the revert state.
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

// Expect Resume to have started and the job to be running.
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Make Resume fail.
rts.resumeCh <- errors.New("failing resume to revert")
rts.mu.e.ResumeExit++

// Job is now reverting.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)

// Fail the job in reverting state without a retryable error.
rts.failOrCancelCh <- errors.New("failing with a non-retryable error")
rts.mu.e.OnFailOrCancelExit = true

// Job should be retried.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
// Send a nil error this time so that the second revert attempt completes.
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

// With the revert finished, the job is expected to end up in the failed state.
close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})

t.Run("retry non-cancelable reverting", func(t *testing.T) {
// Verifies that a non-cancelable job is retried while reverting even when
// the revert fails with an error marked as permanent:
// - Create a non-cancelable job.
// - Fail the job in resume with a permanent error to cause the job to revert.
// - Fail the job in revert state using a permanent error to ensure that the
// job retries with a permanent error as well.
// - Make sure that the job is retried and is again in the revert state.
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()
// Make mockJob non-cancelable, ensuring that non-cancelable jobs are retried in reverting state.
rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool {
return true
})
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

// Expect Resume to have started and the job to be running.
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Make Resume fail with a permanent error.
rts.resumeCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error"))
rts.mu.e.ResumeExit++

// Job is now reverting.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)

// Fail the job in reverting state with a permanent error.
rts.failOrCancelCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error"))
rts.mu.e.OnFailOrCancelExit = true

// Job should be retried.
rts.mu.e.OnFailOrCancelStart = true
rts.failOrCancelCheckCh <- struct{}{}
rts.check(t, jobs.StatusReverting)
// Send a nil error this time so that the second revert attempt completes.
rts.failOrCancelCh <- nil
rts.mu.e.OnFailOrCancelExit = true

// With the revert finished, the job is expected to end up in the failed state.
close(rts.failOrCancelCh)
close(rts.failOrCancelCheckCh)
rts.check(t, jobs.StatusFailed)
})
}

// TestExecutionLogToJSON tests conversion of an executionLog in jobs payload
Expand Down
15 changes: 12 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -1275,6 +1276,9 @@ func (r *Registry) stepThroughStateMachine(
defer jm.CurrentlyRunning.Dec(1)
err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx)
}()
if r.knobs.ModifyErrorAfterOnFailOrCancel != nil {
err = r.knobs.ModifyErrorAfterOnFailOrCancel(job.ID(), err)
}
if successOnFailOrCancel := err == nil; successOnFailOrCancel {
jm.FailOrCancelCompleted.Inc(1)
// If the job has failed with any error different than canceled we
Expand All @@ -1295,11 +1299,13 @@ func (r *Registry) stepThroughStateMachine(
jm.FailOrCancelRetryError.Inc(1)
return errors.Errorf("job %d: %s: restarting in background", job.ID(), err)
}
// A non-cancelable job is always retried while reverting unless the error is marked as permanent.
if job.Payload().Noncancelable && !IsPermanentJobError(err) {
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// All reverting jobs are retried.
jm.FailOrCancelRetryError.Inc(1)
return errors.Wrapf(err, "job %d: job is non-cancelable, restarting in background", job.ID())
return errors.Wrapf(err, "job %d: failed to revert, restarting in background", job.ID())
}
// TODO(sajjad): Remove the rest of the code in this case after v21.2. All
// reverting jobs are retried after v21.2.
jm.FailOrCancelFailed.Inc(1)
if sErr := (*InvalidStatusError)(nil); errors.As(err, &sErr) {
if sErr.status != StatusPauseRequested {
Expand All @@ -1322,6 +1328,9 @@ func (r *Registry) stepThroughStateMachine(
telemetry.Inc(TelemetryMetrics[jobType].Failed)
return jobErr
case StatusRevertFailed:
// TODO(sajjad): Remove StatusRevertFailed and related code in other places in v22.1.
// v21.2 modified all reverting jobs to retry instead of going to revert-failed.
// Therefore, the revert-failed state is not reachable after v21.2.
if jobErr == nil {
return errors.AssertionFailedf("job %d: has StatusRevertFailed but no error was provided",
job.ID())
Expand Down
5 changes: 5 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -66,6 +67,10 @@ type TestingKnobs struct {

// DisableAdoptions disables job adoptions.
DisableAdoptions bool

// ModifyErrorAfterOnFailOrCancel captures the error returned from OnFailOrCancel
// and sets the error to the returned value of this function.
ModifyErrorAfterOnFailOrCancel func(jobspb.JobID, error) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError(
// than tables. For jobs intended to drop other types of descriptors, we do
// nothing.
if _, ok := desc.(catalog.TableDescriptor); !ok {
// Mark the error as permanent so that is not retried in reverting state.
return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted"))
return errors.Newf("schema change jobs on databases and schemas cannot be reverted")
}

// Check that we aren't queued behind another schema changer.
Expand Down
61 changes: 49 additions & 12 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,12 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) {
StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{
DisableBackfillMigrations: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error {
// Allow jobs to terminate.
return nil
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())
Expand Down Expand Up @@ -6365,16 +6371,29 @@ SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNI
require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// TestPermanentErrorDuringRollback tests that an error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
func TestPermanentErrorDuringRollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var sqlDB *gosql.DB
validateError := func(jobID jobspb.JobID, err error) error {
var jobIDFromTable jobspb.JobID
row := sqlDB.QueryRow("SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
assert.NoError(t, row.Scan(&jobIDFromTable))
if jobIDFromTable == jobID {
assert.Error(t, err, "the job must fail with an error")
// Allow the job to terminate.
return nil
}
return err
}
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
var s serverutils.TestServerInterface
s, sqlDB, _ = serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec(`
Expand All @@ -6394,7 +6413,7 @@ CREATE UNIQUE INDEX i ON t.test(v);
var jobErr string
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)
require.Regexp(t, `violates unique constraint "i"`, jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
Expand Down Expand Up @@ -6427,9 +6446,11 @@ CREATE UNIQUE INDEX i ON t.test(v);
return errors.New("permanent error")
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = validateError
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
Expand Down Expand Up @@ -6916,6 +6937,9 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
ctx := context.Background()

t.Run("failed due to injected error", func(t *testing.T) {
const maxToRetry = 2
var lastJobError error
retryCnt := int32(0)
var s serverutils.TestServerInterface
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
Expand All @@ -6938,9 +6962,19 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
return nil
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, err error) error {
if retryCnt == maxToRetry {
// Let the job complete after a few retries.
lastJobError = err
return nil
}
retryCnt++
return err
}
var db *gosql.DB
s, db, _ = serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
Expand All @@ -6951,15 +6985,13 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
sqlDB.Exec(t, tc.setupStmts)
sqlDB.ExpectErr(t, "injected permanent error", tc.scStmt)
result := sqlDB.QueryStr(t,
`SELECT status, error FROM crdb_internal.jobs WHERE description ~ $1`,
`SELECT status FROM crdb_internal.jobs WHERE description ~ $1`,
tc.jobRegex)
require.Len(t, result, 1)
status, jobError := result[0][0], result[0][1]
require.Equal(t, string(jobs.StatusRevertFailed), status)
require.Equal(t, string(jobs.StatusFailed), result[0][0])
require.Regexp(t,
"cannot be reverted, manual cleanup may be required: "+
"schema change jobs on databases and schemas cannot be reverted",
jobError)
"schema change jobs on databases and schemas cannot be reverted",
lastJobError)
})
}
})
Expand Down Expand Up @@ -7015,7 +7047,7 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) {
return nil
},
},
// Decrease the adopt loop interval so that retries happen quickly.
// Decrease the adopt-loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
var db *gosql.DB
Expand Down Expand Up @@ -7093,6 +7125,11 @@ func TestDropColumnAfterMutations(t *testing.T) {
// Decrease the adopt loop interval so that retries happen quickly.
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}
jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs)
jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error {
// Allow jobs to terminate.
return nil
}

s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
Expand Down

0 comments on commit ac1a9b9

Please sign in to comment.