From b0d7cff69c753ff0673686e8a4f4d32e1671a5b2 Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Wed, 8 Dec 2021 21:16:07 +0800 Subject: [PATCH 1/3] run in new txn --- ddl/db_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ executor/builder.go | 16 +++++++--------- kv/txn.go | 21 +++++++++++++++++++-- 3 files changed, 72 insertions(+), 11 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 31d50861bb0ab..677d48a335383 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/israce" "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testutil" "github.com/tikv/client-go/v2/testutils" @@ -7290,6 +7291,51 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C) c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'") } +// Close issue #24172. +// See https://github.com/pingcap/tidb/issues/24172 +func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(id int)") + + var cancelErr error + var rs []sqlexec.RecordSet + hook := &ddl.TestDDLCallback{} + d := s.dom.DDL() + originalHook := d.GetHook() + d.(ddl.DDLForTest).SetHook(hook) + defer d.(ddl.DDLForTest).SetHook(originalHook) + + // Cancelling will be retried but it still fails, and adding index succeeds. + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("everytime")`), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() + rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) + } + } + tk.MustExec("alter table t add index (id)") + c.Assert(cancelErr.Error(), Equals, "mock commit error") + + // Cancelling will be retried only once and it succeeds at the end. + var jobID int64 + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + jobID = job.ID + stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("once")`), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() + rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) + } + } + tk.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob) + c.Assert(cancelErr, IsNil) + result := tk1.ResultSetToResultWithCtx(context.Background(), rs[0], Commentf("cancel ddl job fails")) + result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID))) +} + // For Close issue #24288 // see https://github.com/pingcap/tidb/issues/24288 func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) { diff --git a/executor/builder.go b/executor/builder.go index 37cb9b63d5f01..f2a575d48560a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -283,15 +283,13 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Execu baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), jobIDs: v.JobIDs, } - txn, err := e.ctx.Txn(true) - if err != nil { - b.err = err - return nil - } - - e.errs, b.err = admin.CancelJobs(txn, e.jobIDs) - if b.err != nil { - return nil + // Run within a new transaction. If it runs within the session transaction, commit failure won't be reported to the user. + errInTxn := kv.RunInNewTxn(context.Background(), e.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { + e.errs, err = admin.CancelJobs(txn, e.jobIDs) + return + }) + if errInTxn != nil { + b.err = errInTxn } return e } diff --git a/kv/txn.go b/kv/txn.go index 1359e60abb47d..27afbdc7e537c 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -16,10 +16,12 @@ package kv import ( "context" + "errors" "math" "math/rand" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -58,9 +60,24 @@ func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx return err } - err = txn.Commit(ctx) + failpoint.Inject("mockCommitErrorInNewTxn", func(val failpoint.Value) { + if v := val.(string); len(v) > 0 { + switch v { + case "once": + if i == 0 { + err = ErrTxnRetryable + } + case "everytime": + failpoint.Return(errors.New("mock commit error")) + } + } + }) + if err == nil { - break + err = txn.Commit(ctx) + if err == nil { + break + } } if retryable && IsTxnRetryableError(err) { logutil.BgLogger().Warn("RunInNewTxn", From ed0505411e53d7191c9dba80299d9556f1c535bb Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Thu, 9 Dec 2021 17:21:56 +0800 Subject: [PATCH 2/3] rename failpoint params --- ddl/db_test.go | 4 ++-- kv/txn.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 677d48a335383..b570529429b1d 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -7311,7 +7311,7 @@ func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { hook.OnJobRunBeforeExported = func(job *model.Job) { if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("everytime")`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) } @@ -7325,7 +7325,7 @@ func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { jobID = job.ID stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("once")`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("retry_once")`), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) } diff --git a/kv/txn.go b/kv/txn.go index 27afbdc7e537c..7701dcd870b06 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -63,11 +63,11 @@ func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx failpoint.Inject("mockCommitErrorInNewTxn", func(val failpoint.Value) { if v := val.(string); len(v) > 0 { switch v { - case "once": + case "retry_once": if i == 0 { err = ErrTxnRetryable } - case "everytime": + case "no_retry": failpoint.Return(errors.New("mock commit error")) } } From 3b7e55dd49e7172f4c97b6ad63d220a55a896760 Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Thu, 9 Dec 2021 17:25:01 +0800 Subject: [PATCH 3/3] fix comments --- ddl/db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index b570529429b1d..deecc27b8974b 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -7307,7 +7307,7 @@ func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { d.(ddl.DDLForTest).SetHook(hook) defer d.(ddl.DDLForTest).SetHook(originalHook) - // Cancelling will be retried but it still fails, and adding index succeeds. + // Test when cancelling cannot be retried and adding index succeeds. hook.OnJobRunBeforeExported = func(job *model.Job) { if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) @@ -7319,7 +7319,7 @@ func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { tk.MustExec("alter table t add index (id)") c.Assert(cancelErr.Error(), Equals, "mock commit error") - // Cancelling will be retried only once and it succeeds at the end. + // Test when cancelling is retried only once and adding index is cancelled in the end. var jobID int64 hook.OnJobRunBeforeExported = func(job *model.Job) { if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {