Skip to content

Commit

Permalink
ddl: validate table info before doing ddl job to avoid load infoschem…
Browse files Browse the repository at this point in the history
…a error (#10464) (#10547)
  • Loading branch information
crazycs520 authored and winkyao committed May 20, 2019
1 parent cc0f1ce commit 329afda
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 5 deletions.
6 changes: 3 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
// none -> delete only
job.SchemaState = model.StateDeleteOnly
columnInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -260,7 +260,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != colInfo.State)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
Expand Down Expand Up @@ -407,7 +407,7 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN
}
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
40 changes: 40 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,14 @@ func (s *testDBSuite) TestDropColumn(c *C) {
}
}

// Test for drop partition table column.
s.tk.MustExec("drop table if exists t1")
s.tk.MustExec("set @@tidb_enable_table_partition = 1")
s.tk.MustExec("create table t1 (a bigint, b int, c int generated always as (b+1)) partition by range(a) (partition p0 values less than (10));")
_, err := s.tk.Exec("alter table t1 drop column a")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")

s.tk.MustExec("drop database drop_col_db")
}

Expand Down Expand Up @@ -4684,3 +4692,35 @@ func (s *testDBSuite) TestAlterShardRowIDBits(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine")
}

func (s *testDBSuite) TestDDLWithInvalidTableInfo(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
tk := s.tk

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("set @@tidb_enable_table_partition = 1")
// Test create with invalid expression.
_, err := s.tk.Exec(`CREATE TABLE t (
c0 int(11) ,
c1 int(11),
c2 decimal(16,4) GENERATED ALWAYS AS ((case when (c0 = 0) then 0 when (c0 > 0) then (c1 / c0) end))
);`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "line 1 column 26 near \" 0) THEN 0WHEN (`c0` > 0) THEN (`c1` / `c0`) END)\" (total length 75)")

tk.MustExec("create table t (a bigint, b int, c int generated always as (b+1)) partition by range(a) (partition p0 values less than (10));")
// Test drop partition column.
_, err = tk.Exec("alter table t drop column a;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")
// Test modify column with invalid expression.
_, err = tk.Exec("alter table t modify column c int GENERATED ALWAYS AS ((case when (a = 0) then 0 when (a > 0) then (b / a) end));")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]line 1 column 25 near \" 0) THEN 0WHEN (`a` > 0) THEN (`b` / `a`) END)\" (total length 71)")
// Test add column with invalid expression.
_, err = tk.Exec("alter table t add column d int GENERATED ALWAYS AS ((case when (a = 0) then 0 when (a > 0) then (b / a) end));")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:-1]line 1 column 25 near \" 0) THEN 0WHEN (`a` > 0) THEN (`b` / `a`) END)\" (total length 71)")
}
14 changes: 14 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/schemautil"
Expand Down Expand Up @@ -994,6 +995,12 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
return errors.Trace(err)
}

// checkTableInfoValid uses to check table info valid. This is used to validate table info.
func checkTableInfoValid(tblInfo *model.TableInfo) error {
_, err := tables.TableFromMeta(nil, tblInfo)
return err
}

func buildTableInfoWithLike(ident ast.Ident, referTblInfo *model.TableInfo) model.TableInfo {
tblInfo := *referTblInfo
// Check non-public column and adjust column offset.
Expand Down Expand Up @@ -1122,6 +1129,13 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
tbInfo.Partition = pi
}

tbInfo.State = model.StatePublic
err = checkTableInfoValid(tbInfo)
if err != nil {
return err
}
tbInfo.State = model.StateNone

job := &model.Job{
SchemaID: schema.ID,
TableID: tbInfo.ID,
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
// none -> delete only
job.SchemaState = model.StateDeleteOnly
indexInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != indexInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
Expand Down
22 changes: 21 additions & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
// none -> public
tbInfo.State = model.StatePublic
tbInfo.UpdateTS = t.StartTS
err = t.CreateTable(schemaID, tbInfo)
err = createTableWithCheck(t, job, schemaID, tbInfo)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -83,6 +83,15 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}
}

func createTableWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tbInfo *model.TableInfo) error {
err := checkTableInfoValid(tbInfo)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
return t.CreateTable(schemaID, tbInfo)
}

func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tableID := job.TableID
Expand Down Expand Up @@ -521,6 +530,17 @@ func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName
return nil
}

// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
func updateVersionAndTableInfoWithCheck(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
ver int64, err error) {
err = checkTableInfoValid(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return updateVersionAndTableInfo(t, job, tblInfo, shouldUpdateVer)
}

// updateVersionAndTableInfo updates the schema version and the table information.
func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
ver int64, err error) {
Expand Down

0 comments on commit 329afda

Please sign in to comment.