diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 264fe80c31e0..069f802d1d26 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -3253,6 +3253,97 @@ func TestJobsRetry(t *testing.T) { close(rts.failOrCancelCheckCh) rts.check(t, jobs.StatusFailed) }) + + t.Run("retry reverting", func(t *testing.T) { + // - Create a job. + // - Fail the job in resume to cause the job to revert. + // - Fail the job in revert state using a non-retryable error. + // - Make sure that the jobs is retried and is again in the revert state. + rts := registryTestSuite{} + rts.setUp(t) + defer rts.tearDown() + j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + + // Make Resume fail. + rts.resumeCh <- errors.New("failing resume to revert") + rts.mu.e.ResumeExit++ + + // Job is now reverting. + rts.mu.e.OnFailOrCancelStart = true + rts.failOrCancelCheckCh <- struct{}{} + rts.check(t, jobs.StatusReverting) + + // Fail the job in reverting state without a retryable error. + rts.failOrCancelCh <- errors.New("failing with a non-retryable error") + rts.mu.e.OnFailOrCancelExit = true + + // Job should be retried. + rts.mu.e.OnFailOrCancelStart = true + rts.failOrCancelCheckCh <- struct{}{} + rts.check(t, jobs.StatusReverting) + rts.failOrCancelCh <- nil + rts.mu.e.OnFailOrCancelExit = true + + close(rts.failOrCancelCh) + close(rts.failOrCancelCheckCh) + rts.check(t, jobs.StatusFailed) + }) + + t.Run("retry non-cancelable reverting", func(t *testing.T) { + // - Create a non-cancelable job. + // - Fail the job in resume with a permanent error to cause the job to revert. + // - Fail the job in revert state using a permanent error to ensure that the + // retries with a permanent error as well. + // - Make sure that the jobs is retried and is again in the revert state. + rts := registryTestSuite{} + rts.setUp(t) + defer rts.tearDown() + // Make mockJob non-cancelable, ensuring that non-cancelable jobs are retried in reverting state. + rts.mockJob.SetNonCancelable(rts.ctx, func(ctx context.Context, nonCancelable bool) bool { + return true + }) + j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j + + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) + + // Make Resume fail with a permanent error. + rts.resumeCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error")) + rts.mu.e.ResumeExit++ + + // Job is now reverting. + rts.mu.e.OnFailOrCancelStart = true + rts.failOrCancelCheckCh <- struct{}{} + rts.check(t, jobs.StatusReverting) + + // Fail the job in reverting state with a permanent error a retryable error. + rts.failOrCancelCh <- jobs.MarkAsPermanentJobError(errors.New("permanent error")) + rts.mu.e.OnFailOrCancelExit = true + + // Job should be retried. + rts.mu.e.OnFailOrCancelStart = true + rts.failOrCancelCheckCh <- struct{}{} + rts.check(t, jobs.StatusReverting) + rts.failOrCancelCh <- nil + rts.mu.e.OnFailOrCancelExit = true + + close(rts.failOrCancelCh) + close(rts.failOrCancelCheckCh) + rts.check(t, jobs.StatusFailed) + }) } // TestExecutionLogToJSON tests conversion of an executionLog in jobs payload diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 8485a7b83953..f096ff7853b1 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1275,6 +1276,9 @@ func (r *Registry) stepThroughStateMachine( defer jm.CurrentlyRunning.Dec(1) err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx) }() + if r.knobs.ModifyErrorAfterOnFailOrCancel != nil { + err = r.knobs.ModifyErrorAfterOnFailOrCancel(job.ID(), err) + } if successOnFailOrCancel := err == nil; successOnFailOrCancel { jm.FailOrCancelCompleted.Inc(1) // If the job has failed with any error different than canceled we @@ -1295,11 +1299,13 @@ func (r *Registry) stepThroughStateMachine( jm.FailOrCancelRetryError.Inc(1) return errors.Errorf("job %d: %s: restarting in background", job.ID(), err) } - // A non-cancelable job is always retried while reverting unless the error is marked as permanent. - if job.Payload().Noncancelable && !IsPermanentJobError(err) { + if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + // All reverting jobs are retried. jm.FailOrCancelRetryError.Inc(1) - return errors.Wrapf(err, "job %d: job is non-cancelable, restarting in background", job.ID()) + return errors.Wrapf(err, "job %d: failed to revert, restarting in background", job.ID()) } + // TODO(sajjad): Remove rest of the code in this case after v21.2. All reverting jobs + // are retried after v21.2. jm.FailOrCancelFailed.Inc(1) if sErr := (*InvalidStatusError)(nil); errors.As(err, &sErr) { if sErr.status != StatusPauseRequested { @@ -1322,6 +1328,9 @@ func (r *Registry) stepThroughStateMachine( telemetry.Inc(TelemetryMetrics[jobType].Failed) return jobErr case StatusRevertFailed: + // TODO(sajjad): Remove StatusRevertFailed and related code in other places in v22.1. + // v21.2 modified all reverting jobs to retry instead of go to revert-failed. Therefore, + // revert-failed state is not reachable after 21.2. if jobErr == nil { return errors.AssertionFailedf("job %d: has StatusRevertFailed but no error was provided", job.ID()) diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index ea0a1bf9c974..f3b27fdc6f0c 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -66,6 +67,10 @@ type TestingKnobs struct { // DisableAdoptions disables job adoptions. DisableAdoptions bool + + // ModifyErrorAfterOnFailOrCancel captures the error returned from OnFailOorCancel + // and sets the error to the returned value of this function. + ModifyErrorAfterOnFailOrCancel func(jobspb.JobID, error) error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index d46053e77e91..6cd8c0a2f77a 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -740,8 +740,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError( // than tables. For jobs intended to drop other types of descriptors, we do // nothing. if _, ok := desc.(catalog.TableDescriptor); !ok { - // Mark the error as permanent so that is not retried in reverting state. - return jobs.MarkAsPermanentJobError(errors.Newf("schema change jobs on databases and schemas cannot be reverted")) + return errors.Newf("schema change jobs on databases and schemas cannot be reverted") } // Check that we aren't queued behind another schema changer. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index c325a02c86bc..46051f2a7ffc 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -2187,6 +2187,12 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) { StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{ DisableBackfillMigrations: true, }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error { + // Allow jobs to terminate. + return nil } server, sqlDB, kvDB := serverutils.StartServer(t, params) defer server.Stopper().Stop(context.Background()) @@ -6365,7 +6371,7 @@ SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNI require.Regexp(t, "violates unique constraint", jobError) } -// TestPermanentErrorDuringRollback tests that a permanent error while rolling +// TestPermanentErrorDuringRollback tests that an error while rolling // back a schema change causes the job to fail, and that the appropriate error // is displayed in the jobs table. func TestPermanentErrorDuringRollback(t *testing.T) { @@ -6373,8 +6379,21 @@ func TestPermanentErrorDuringRollback(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + var sqlDB *gosql.DB + validateError := func(jobID jobspb.JobID, err error) error { + var jobIDFromTable jobspb.JobID + row := sqlDB.QueryRow("SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") + assert.NoError(t, row.Scan(&jobIDFromTable)) + if jobIDFromTable == jobID { + assert.Error(t, err, "the job must fail with an error") + // Allow the job to terminate. + return nil + } + return err + } runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) { - s, sqlDB, _ := serverutils.StartServer(t, params) + var s serverutils.TestServerInterface + s, sqlDB, _ = serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) _, err := sqlDB.Exec(` @@ -6394,7 +6413,7 @@ CREATE UNIQUE INDEX i ON t.test(v); var jobErr string row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") require.NoError(t, row.Scan(&jobID, &jobErr)) - require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr) + require.Regexp(t, `violates unique constraint "i"`, jobErr) if gcJobRecord { _, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID) @@ -6427,9 +6446,11 @@ CREATE UNIQUE INDEX i ON t.test(v); return errors.New("permanent error") }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = validateError // Don't GC the job record after the schema change, so we can test dropping // the table with a failed mutation job. runTest(t, params, false /* gcJobRecord */) @@ -6916,6 +6937,9 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { ctx := context.Background() t.Run("failed due to injected error", func(t *testing.T) { + const maxToRetry = 2 + var lastJobError error + retryCnt := int32(0) var s serverutils.TestServerInterface params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ @@ -6938,9 +6962,19 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { return nil }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, err error) error { + if retryCnt == maxToRetry { + // Let the job complete after a few retries. + lastJobError = err + return nil + } + retryCnt++ + return err + } var db *gosql.DB s, db, _ = serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -6951,15 +6985,13 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { sqlDB.Exec(t, tc.setupStmts) sqlDB.ExpectErr(t, "injected permanent error", tc.scStmt) result := sqlDB.QueryStr(t, - `SELECT status, error FROM crdb_internal.jobs WHERE description ~ $1`, + `SELECT status FROM crdb_internal.jobs WHERE description ~ $1`, tc.jobRegex) require.Len(t, result, 1) - status, jobError := result[0][0], result[0][1] - require.Equal(t, string(jobs.StatusRevertFailed), status) + require.Equal(t, string(jobs.StatusFailed), result[0][0]) require.Regexp(t, - "cannot be reverted, manual cleanup may be required: "+ - "schema change jobs on databases and schemas cannot be reverted", - jobError) + "schema change jobs on databases and schemas cannot be reverted", + lastJobError) }) } }) @@ -7015,7 +7047,7 @@ func TestRevertingJobsOnDatabasesAndSchemas(t *testing.T) { return nil }, }, - // Decrease the adopt loop interval so that retries happen quickly. + // Decrease the adopt-loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } var db *gosql.DB @@ -7093,6 +7125,11 @@ func TestDropColumnAfterMutations(t *testing.T) { // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + jobKnobs := params.Knobs.JobsTestingKnobs.(*jobs.TestingKnobs) + jobKnobs.ModifyErrorAfterOnFailOrCancel = func(_ jobspb.JobID, _ error) error { + // Allow jobs to terminate. + return nil + } s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx)