Skip to content

Commit

Permalink
ddl: get latest old table ID before replace view (#53720) (#55095)
Browse files Browse the repository at this point in the history
close #53673
  • Loading branch information
ti-chi-bot authored Aug 1, 2024
1 parent 45fd6e5 commit 6d7687a
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 28 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5845,13 +5845,13 @@ def go_deps():
name = "com_github_pingcap_failpoint",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/failpoint",
sha256 = "ea37b4dddfbccaaed9b313f9f1099dfbf00d36d768a8416d6d175cbe2c8b1254",
strip_prefix = "github.com/pingcap/failpoint@v0.0.0-20220801062533-2eaa32854a6c",
sha256 = "fb2b8146ff608751050d56d0506d271f75afa030d2d09d2da9e2bac562f6a866",
strip_prefix = "github.com/pingcap/failpoint@v0.0.0-20240528011301-b51a646c7c86",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f h1:FxA+NgsdHNOv+/hZGxUh8Gb3WuZqgqmxDwztEOiA1v4=
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g=
github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z24=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ go_library(
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/stringutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/topsql",
"//pkg/util/topsql/state",
Expand Down Expand Up @@ -328,6 +327,7 @@ go_test(
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
6 changes: 0 additions & 6 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -383,11 +382,6 @@ type ddlCtx struct {

// reorgCtx is used for reorganization.
reorgCtx reorgContexts
// backfillCtx is used for backfill workers.
backfillCtx struct {
syncutil.RWMutex
jobCtxMap map[int64]*JobContext
}

jobCtx struct {
sync.RWMutex
Expand Down
41 changes: 41 additions & 0 deletions pkg/ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestGetDDLJobs(t *testing.T) {
Expand Down Expand Up @@ -151,6 +152,46 @@ func enQueueDDLJobs(t *testing.T, sess sessiontypes.Session, txn kv.Transaction,
}
}

func TestCreateViewConcurrently(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int);")
tk.MustExec("create view v as select * from t;")
var (
counterErr error
counter int
)
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) {
counter++
if counter > 1 {
counterErr = fmt.Errorf("create view job should not run concurrently")
return
}
})
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
if job.Type == model.ActionCreateView {
counter--
}
})
var eg errgroup.Group
for i := 0; i < 5; i++ {
eg.Go(func() error {
newTk := testkit.NewTestKit(t, store)
_, err := newTk.Exec("use test")
if err != nil {
return err
}
_, err = newTk.Exec("create or replace view v as select * from t;")
return err
})
}
err := eg.Wait()
require.NoError(t, err)
require.NoError(t, counterErr)
}

func TestCreateDropCreateTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
d.wg.Run(func() {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
defer func() {
failpoint.InjectCall("afterDelivery2Worker", job)
d.runningJobs.remove(job)
asyncNotify(d.ddlJobCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) (
}

is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
// Use cached policy.
policy, ok := is.PolicyByName(policyName)
if ok {
Expand Down Expand Up @@ -320,7 +320,7 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
err = CheckPlacementPolicyNotInUseFromInfoSchema(is, policy)
} else {
err = CheckPlacementPolicyNotInUseFromMeta(t, policy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo)
}
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
Expand Down
63 changes: 55 additions & 8 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,19 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
var orReplace bool
var oldTbInfoID int64
if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil {
var _placeholder int64 // oldTblInfoID
if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tbInfo.State = model.StateNone
err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L)

oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L)
if infoschema.ErrTableNotExists.Equal(err) {
err = nil
}
failpoint.InjectCall("onDDLCreateView", job)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) {
job.State = model.JobStateCancelled
Expand All @@ -329,13 +334,13 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
// none -> public
tbInfo.State = model.StatePublic
tbInfo.UpdateTS = t.StartTS
if oldTbInfoID > 0 && orReplace {
err = t.DropTableOrView(schemaID, job.SchemaName, oldTbInfoID, tbInfo.Name.L)
if oldTableID > 0 && orReplace {
err = t.DropTableOrView(schemaID, job.SchemaName, oldTableID, tbInfo.Name.L)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del()
err = t.GetAutoIDAccessors(schemaID, oldTableID).Del()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -1505,7 +1510,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
}

Expand All @@ -1519,7 +1524,7 @@ func checkTableNotExistsByName(d *ddlCtx, t *meta.Meta, schemaID int64, schemaNa
return err
}
is := d.infoCache.GetLatest()
if is.SchemaMetaVersion() == currVer {
if is != nil && is.SchemaMetaVersion() == currVer {
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
}
return t.CheckTableNameNotExists(t.TableNameKey(schemaName, tableName))
Expand Down Expand Up @@ -1593,6 +1598,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string
return nil
}

func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) {
// Try to use memory schema info to check first.
currVer, err := t.GetSchemaVersion()
if err != nil {
return 0, err
}
is := d.infoCache.GetLatest()
if is != nil && is.SchemaMetaVersion() == currVer {
return findTableIDFromInfoSchema(is, schemaID, tableName)
}

return findTableIDFromStore(t, schemaID, tableName)
}

func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) {
schema, ok := is.SchemaByID(schemaID)
if !ok {
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
}
tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName))
if err != nil {
return 0, err
}
return tbl.Meta().ID, nil
}

func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) {
tbls, err := t.ListSimpleTables(schemaID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
}
return 0, errors.Trace(err)
}
for _, tbl := range tbls {
if tbl.Name.L == tableName {
return tbl.ID, nil
}
}
return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName)
}

// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
ver int64, err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func TestCreateView(t *testing.T) {
}
ctx.SetValue(sessionctx.QueryString, "skip")
err = d.DoDDLJob(ctx, job)
require.Error(t, err)
// The non-existing table id in job args will not be considered anymore.
require.NoError(t, err)
}

func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
Expand Down

0 comments on commit 6d7687a

Please sign in to comment.