Skip to content

Commit

Permalink
*: remove infoschema.SchemaTables() API, replace it with SchemaTableI…
Browse files Browse the repository at this point in the history
…nfos() (#54664)

ref #50959
  • Loading branch information
tiancaiamao authored Jul 23, 2024
1 parent 2d8753b commit a18b3c5
Show file tree
Hide file tree
Showing 26 changed files with 396 additions and 303 deletions.
7 changes: 5 additions & 2 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,11 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
info := rc.dom.InfoSchema()
shcemas := info.AllSchemaNames()
for _, schema := range shcemas {
for _, table := range info.SchemaTables(schema) {
tableInfo := table.Meta()
tblInfos, err := info.SchemaTableInfos(ctx, schema)
if err != nil {
return nil, errors.Trace(err)
}
for _, tableInfo := range tblInfos {
if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 {
return nil, errors.Errorf("exist table(s) have tiflash replica, please remove it before restore")
}
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,11 @@ func resetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage,
expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchemaName {
for _, t := range info.SchemaTables(s) {
t := t.Meta()
tblInfos, err := info.SchemaTableInfos(ctx, s)
if err != nil {
return errors.Trace(err)
}
for _, t := range tblInfos {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
Expand Down
19 changes: 12 additions & 7 deletions pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,10 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt *ast.A
return errors.Trace(dbterror.ErrUnsupportedTiFlashOperationForSysOrMemTable)
}

tbls := is.SchemaTables(dbInfo.Name)
tbls, err := is.SchemaTableInfos(context.Background(), dbInfo.Name)
if err != nil {
return errors.Trace(err)
}
total := len(tbls)
succ := 0
skip := 0
Expand All @@ -408,7 +411,7 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt *ast.A
if total == 0 {
return infoschema.ErrEmptyDatabase.GenWithStack("Empty database '%v'", dbName.O)
}
err := checkTiFlashReplicaCount(sctx, tiflashReplica.Count)
err = checkTiFlashReplicaCount(sctx, tiflashReplica.Count)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -421,7 +424,6 @@ func (d *ddl) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt *ast.A
threshold := uint32(sctx.GetSessionVars().BatchPendingTiFlashCount)

for _, tbl := range tbls {
tbl := tbl.Meta()
done, killed := isSessionDone(sctx)
if done {
logutil.DDLLogger().Info("abort batch add TiFlash replica", zap.Int64("schemaID", dbInfo.ID), zap.Uint32("isKilled", killed))
Expand Down Expand Up @@ -666,7 +668,7 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (er
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
fkCheck := ctx.GetSessionVars().ForeignKeyChecks
err = checkDatabaseHasForeignKeyReferred(is, old.Name, fkCheck)
err = checkDatabaseHasForeignKeyReferred(d.ctx, is, old.Name, fkCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -700,11 +702,14 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (er
return nil
}
// Clear table locks hold by the session.
tbs := is.SchemaTables(stmt.Name)
tbs, err := is.SchemaTableInfos(d.ctx, stmt.Name)
if err != nil {
return errors.Trace(err)
}
lockTableIDs := make([]int64, 0)
for _, tb := range tbs {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
lockTableIDs = append(lockTableIDs, tb.Meta().ID)
if ok, _ := ctx.CheckTableLocked(tb.ID); ok {
lockTableIDs = append(lockTableIDs, tb.ID)
}
}
ctx.ReleaseTableLockByTableIDs(lockTableIDs)
Expand Down
15 changes: 9 additions & 6 deletions pkg/ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,18 +601,21 @@ func (h *foreignKeyHelper) getTableFromStorage(is infoschema.InfoSchema, t *meta
return result, nil
}

func checkDatabaseHasForeignKeyReferred(is infoschema.InfoSchema, schema model.CIStr, fkCheck bool) error {
func checkDatabaseHasForeignKeyReferred(ctx context.Context, is infoschema.InfoSchema, schema model.CIStr, fkCheck bool) error {
if !fkCheck {
return nil
}
tables := is.SchemaTables(schema)
tables, err := is.SchemaTableInfos(ctx, schema)
if err != nil {
return errors.Trace(err)
}
tableNames := make([]ast.Ident, len(tables))
for i := range tables {
tableNames[i] = ast.Ident{Schema: schema, Name: tables[i].Meta().Name}
tableNames[i] = ast.Ident{Schema: schema, Name: tables[i].Name}
}
for _, tbl := range tables {
if referredFK := checkTableHasForeignKeyReferred(is, schema.L, tbl.Meta().Name.L, tableNames, fkCheck); referredFK != nil {
return errors.Trace(dbterror.ErrForeignKeyCannotDropParent.GenWithStackByArgs(tbl.Meta().Name, referredFK.ChildFKName, referredFK.ChildTable))
if referredFK := checkTableHasForeignKeyReferred(is, schema.L, tbl.Name.L, tableNames, fkCheck); referredFK != nil {
return errors.Trace(dbterror.ErrForeignKeyCannotDropParent.GenWithStackByArgs(tbl.Name, referredFK.ChildFKName, referredFK.ChildTable))
}
}
return nil
Expand All @@ -635,7 +638,7 @@ func checkDatabaseHasForeignKeyReferredInOwner(d *ddlCtx, t *meta.Meta, job *mod
if err != nil {
return errors.Trace(err)
}
err = checkDatabaseHasForeignKeyReferred(is, model.NewCIStr(job.SchemaName), fkCheck)
err = checkDatabaseHasForeignKeyReferred(d.ctx, is, model.NewCIStr(job.SchemaName), fkCheck)
if err != nil {
job.State = model.JobStateCancelled
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,11 @@ func CheckPlacementPolicyNotInUseFromInfoSchema(is infoschema.InfoSchema, policy
return dbterror.ErrPlacementPolicyInUse.GenWithStackByArgs(policy.Name)
}

for _, tbl := range is.SchemaTables(dbInfo.Name) {
tblInfo := tbl.Meta()
tblInfos, err := is.SchemaTableInfos(context.Background(), dbInfo.Name)
if err != nil {
return errors.Trace(err)
}
for _, tblInfo := range tblInfos {
if err := checkPlacementPolicyNotUsedByTable(tblInfo, policy); err != nil {
return err
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/ddl/tiflash_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
Expand Down Expand Up @@ -161,6 +162,15 @@ func setUpRPCService(t *testing.T, addr string, dom *domain.Domain, sm util.Sess
return srv, addr
}

func updateTableMeta(t *testing.T, store kv.Storage, dbID int64, tableInfo *model.TableInfo) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
return m.UpdateTable(dbID, tableInfo)
})
require.NoError(t, err)
}

func TestInfoSchemaForTiFlashReplica(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
Expand All @@ -176,9 +186,12 @@ func TestInfoSchemaForTiFlashReplica(t *testing.T) {
tk.MustExec("create table t (a int, b int, index idx(a))")
tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0 0"))
tbl, err := domain.GetDomain(tk.Session()).InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
dom := domain.GetDomain(tk.Session())
tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tbl.Meta().TiFlashReplica.Available = true
updateTableMeta(t, store, tbl.Meta().DBID, tbl.Meta())
dom.Reload()
tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE,PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1 0"))
}

Expand Down
Loading

0 comments on commit a18b3c5

Please sign in to comment.