Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: DROP TABLE/VIEW/SEQUENCE now use XXXStmt as parameter #35741

Merged
merged 6 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type DDL interface {
DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr, ifExists bool) error
Expand All @@ -122,7 +122,7 @@ type DDL interface {
UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error
RepairTable(ctx sessionctx.Context, table *ast.TableName, createStmt *ast.CreateTableStmt) error
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)
DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error)
AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
Expand Down
237 changes: 156 additions & 81 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5326,71 +5326,172 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
return errors.Trace(err)
}

// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}
// If one drop those tables by mistake, it's difficult to recover.
// In the worst case, the whole TiDB cluster fails to bootstrap, so we prevent user from dropping them.
var systemTables = map[string]struct{}{
"tidb": {},
"gc_delete_range": {},
"gc_delete_range_done": {},
}

if tb.Meta().IsView() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}
if tb.Meta().IsSequence() {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
func isSystemTable(schema, table string) bool {
if schema != "mysql" {
return false
}
if tb.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Table")
if _, ok := systemTables[table]; ok {
return true
}
return false
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}
type objectType int

const (
tableObject objectType = iota
viewObject
sequenceObject
)

// dropTableObject provides common logic to DROP TABLE/VIEW/SEQUENCE.
func (d *ddl) dropTableObject(
ctx sessionctx.Context,
objects []*ast.TableName,
ifExists bool,
tableObjectType objectType,
) error {
var (
notExistTables []string
sessVars = ctx.GetSessionVars()
is = d.GetInfoSchemaWithInterceptor(ctx)
dropExistErr *terror.Error
jobType model.ActionType
)

switch tableObjectType {
case tableObject:
dropExistErr = infoschema.ErrTableDropExists
jobType = model.ActionDropTable
case viewObject:
dropExistErr = infoschema.ErrTableDropExists
jobType = model.ActionDropView
case sequenceObject:
dropExistErr = infoschema.ErrSequenceDropExists
jobType = model.ActionDropSequence
}

for _, tn := range objects {
fullti := ast.Ident{Schema: tn.Schema, Name: tn.Name}
schema, ok := is.SchemaByName(tn.Schema)
if !ok {
// TODO: we should return special error for table not exist, checking "not exist" is not enough,
// because some other errors may contain this error string too.
notExistTables = append(notExistTables, fullti.String())
continue
}
tableInfo, err := is.TableByName(tn.Schema, tn.Name)
if err != nil && infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
continue
} else if err != nil {
return err
}

// prechecks before build DDL job

// Protect important system table from been dropped by a mistake.
// I can hardly find a case that a user really need to do this.
if isSystemTable(tn.Schema.L, tn.Name.L) {
return errors.Errorf("Drop tidb system table '%s.%s' is forbidden", tn.Schema.L, tn.Name.L)
}
switch tableObjectType {
case tableObject:
if !tableInfo.Meta().IsBaseTable() {
notExistTables = append(notExistTables, fullti.String())
continue
}

tempTableType := tableInfo.Meta().TempTableType
if config.CheckTableBeforeDrop && tempTableType == model.TempTableNone {
logutil.BgLogger().Warn("admin check table before drop",
zap.String("database", fullti.Schema.O),
zap.String("table", fullti.Name.O),
)
exec := ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(context.TODO(), nil, "admin check table %n.%n", fullti.Schema.O, fullti.Name.O)
if err != nil {
return err
}
}

if tableInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Table")
}
case viewObject:
if !tableInfo.Meta().IsView() {
return dbterror.ErrWrongObject.GenWithStackByArgs(fullti.Schema, fullti.Name, "VIEW")
}
case sequenceObject:
if !tableInfo.Meta().IsSequence() {
err = dbterror.ErrWrongObject.GenWithStackByArgs(fullti.Schema, fullti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
return err
}
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tableInfo.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tableInfo.Meta().Name.L,
Type: jobType,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
notExistTables = append(notExistTables, fullti.String())
continue
} else if err != nil {
return errors.Trace(err)
}

// unlock table after drop
if tableObjectType != tableObject {
continue
}
if !config.TableLockEnabled() {
continue
}
if ok, _ := ctx.CheckTableLocked(tableInfo.Meta().ID); ok {
ctx.ReleaseTableLockByTableIDs([]int64{tableInfo.Meta().ID})
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if err != nil {
return errors.Trace(err)
}
if !config.TableLockEnabled() {
return nil
if len(notExistTables) > 0 && !ifExists {
return dropExistErr.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
ctx.ReleaseTableLockByTableIDs([]int64{tb.Meta().ID})
// We need add warning when use if exists.
if len(notExistTables) > 0 && ifExists {
for _, table := range notExistTables {
sessVars.StmtCtx.AppendNote(dropExistErr.GenWithStackByArgs(table))
}
}
return nil
}

// DropView will proceed even if some view in the list does not exists.
func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tb.Meta().IsView() {
return dbterror.ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "VIEW")
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tb.Meta().State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropView,
BinlogInfo: &model.HistoryInfo{},
}
// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
return d.dropTableObject(ctx, stmt.Tables, stmt.IfExists, tableObject)
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
// DropView will proceed even if some view in the list does not exists.
func (d *ddl) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
return d.dropTableObject(ctx, stmt.Tables, stmt.IfExists, viewObject)
}

func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
Expand Down Expand Up @@ -6676,34 +6777,8 @@ func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt)
return errors.Trace(err)
}

func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool) (err error) {
schema, tbl, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

if !tbl.Meta().IsSequence() {
err = dbterror.ErrWrongObject.GenWithStackByArgs(ti.Schema, ti.Name, "SEQUENCE")
if ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tbl.Meta().State,
TableName: tbl.Meta().Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
func (d *ddl) DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error) {
return d.dropTableObject(ctx, stmt.Sequences, stmt.IfExists, sequenceObject)
}

func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, indexName model.CIStr, visibility ast.IndexVisibility) error {
Expand Down
Loading