Skip to content

Commit

Permalink
*: add foreign key constraint check when execute update statement
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed Sep 26, 2022
1 parent 4e4169b commit df27883
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 1 deletion.
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2248,6 +2248,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
tblColPosInfos: v.TblColPosInfos,
assignFlag: assignFlag,
}
updateExec.fkChecks, b.err = buildTblID2FKCheckExecs(b.ctx, tblID2table, v.FKChecks)
if b.err != nil {
return nil
}
return updateExec
}

Expand Down
12 changes: 12 additions & 0 deletions executor/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ type FKCheckExec struct {
checkRowsCache map[string]bool
}

func buildTblID2FKCheckExecs(sctx sessionctx.Context, tblID2Table map[int64]table.Table, tblID2FKChecks map[int64][]*plannercore.FKCheck) (map[int64][]*FKCheckExec, error) {
var err error
fkChecks := make(map[int64][]*FKCheckExec)
for tid, tbl := range tblID2Table {
fkChecks[tid], err = buildFKCheckExecs(sctx, tbl, tblID2FKChecks[tid])
if err != nil {
return nil, err
}
}
return fkChecks, nil
}

func buildFKCheckExecs(sctx sessionctx.Context, tbl table.Table, fkChecks []*plannercore.FKCheck) ([]*FKCheckExec, error) {
fkCheckExecs := make([]*FKCheckExec, 0, len(fkChecks))
for _, fkCheck := range fkChecks {
Expand Down
234 changes: 234 additions & 0 deletions executor/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,52 @@ func TestForeignKeyCheckAndLock(t *testing.T) {
err := tk.ExecToErr("commit")
require.Error(t, err)
require.Contains(t, err.Error(), "Write conflict")

// Test update child table
tk.MustExec("insert into t1 (id, name) values (1, 'a'), (2, 'b');")
tk.MustExec("insert into t2 (a, name) values (1, 'a');")
tk.MustExec("begin optimistic")
tk.MustExec("update t2 set a=2 where a = 1")
tk2.MustExec("delete from t1 where id = 2")
err = tk.ExecToErr("commit")
require.Error(t, err)
require.Contains(t, err.Error(), "Write conflict")
tk.MustQuery("select id, name from t1 order by name").Check(testkit.Rows("1 a"))
tk.MustQuery("select a, name from t2 order by name").Check(testkit.Rows("1 a"))

// Test in pessimistic txn
tk.MustExec("delete from t2")
// Test insert child table
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t2 (a, name) values (1, 'a');")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := tk2.ExecToErr("update t1 set id = 2 where id = 1")
require.Error(t, err)
require.Equal(t, "[planner:1451]Cannot delete or update a parent row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk` FOREIGN KEY (`a`) REFERENCES `t1` (`id`))", err.Error())
}()
tk.MustExec("commit")
wg.Wait()
tk.MustQuery("select id, name from t1 order by name").Check(testkit.Rows("1 a"))
tk.MustQuery("select a, name from t2 order by name").Check(testkit.Rows("1 a"))

// Test update child table
tk.MustExec("insert into t1 (id, name) values (2, 'b');")
tk.MustExec("begin pessimistic")
tk.MustExec("update t2 set a=2 where a = 1")
wg.Add(1)
go func() {
defer wg.Done()
err := tk2.ExecToErr("update t1 set id = 3 where id = 2")
require.Error(t, err)
require.Equal(t, "[planner:1451]Cannot delete or update a parent row: a foreign key constraint fails (`test`.`t2`, CONSTRAINT `fk` FOREIGN KEY (`a`) REFERENCES `t1` (`id`))", err.Error())
}()
tk.MustExec("commit")
wg.Wait()
tk.MustQuery("select id, name from t1 order by name").Check(testkit.Rows("1 a", "2 b"))
tk.MustQuery("select a, name from t2 order by name").Check(testkit.Rows("2 a"))
}
}

Expand Down Expand Up @@ -443,6 +489,22 @@ func TestForeignKey(t *testing.T) {
tk.MustExec("insert into t2 (id, a, b) values (2, 22, 222);")
tk.MustGetDBError("insert into t3 (id, a, b) values (1, 1, 1)", plannercore.ErrNoReferencedRow2)
tk.MustGetDBError("insert into t3 (id, a, b) values (2, 3, 2)", plannercore.ErrNoReferencedRow2)
tk.MustExec("insert into t3 (id, a, b) values (0, 1, 2);")
tk.MustExec("insert into t3 (id, a, b) values (1, 2, 2);")
tk.MustGetDBError("update t3 set a=3 where a=1", plannercore.ErrNoReferencedRow2)
tk.MustGetDBError("update t3 set b=4 where id=1", plannercore.ErrNoReferencedRow2)

// Test table has been referenced by more than tables.
tk.MustExec("drop table if exists t3,t2,t1;")
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
tk.MustExec("create table t2 (b int, a int, id int, primary key (a), foreign key (a) references t1(id));")
tk.MustExec("create table t3 (b int, a int, id int, primary key (a), foreign key (a) references t1(id));")
tk.MustExec("insert into t1 (id, a, b) values (1, 1, 1);")
tk.MustExec("insert into t2 (id, a, b) values (1, 1, 1);")
tk.MustExec("insert into t3 (id, a, b) values (1, 1, 1);")
tk.MustGetDBError(" update t1 set id=2 where id = 1", plannercore.ErrRowIsReferenced2)
tk.MustExec(" update t1 set a=2 where id = 1")
tk.MustExec(" update t1 set b=2 where id = 1")
}

func TestForeignKeyConcurrentInsertChildTable(t *testing.T) {
Expand Down Expand Up @@ -472,3 +534,175 @@ func TestForeignKeyConcurrentInsertChildTable(t *testing.T) {
}
wg.Wait()
}

func TestForeignKeyOnUpdateChildTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")

for _, ca := range foreignKeyTestCase1 {
tk.MustExec("drop table if exists t2;")
tk.MustExec("drop table if exists t1;")
for _, sql := range ca.prepareSQLs {
tk.MustExec(sql)
}
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a')")

sqls := []string{
"update t2 set a=100, b = 200 where id = 1",
"update t2 set a=a+10, b = b+20 where a = 11",
"update t2 set a=a+100, b = b+200",
"update t2 set a=12, b = 23 where id = 1",
}
for _, sqlStr := range sqls {
err := tk.ExecToErr(sqlStr)
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
}
tk.MustExec("update t2 set a=12, b = 22 where id = 1")
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 12 22 a"))
if !ca.notNull {
tk.MustExec("update t2 set a=null, b = 22 where a = 12 ")
tk.MustExec("update t2 set b = null where b = 22 ")
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 <nil> <nil> a"))
}
tk.MustExec("update t2 set a=13, b=23 where id = 1")
tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("1 13 23 a"))

// Test In txn.
tk.MustExec("delete from t2")
tk.MustExec("delete from t1")
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a')")
tk.MustExec("begin")
tk.MustExec("update t2 set a=12, b=22 where id=1")
tk.MustExec("rollback")

tk.MustExec("begin")
tk.MustExec("delete from t1 where id=2")
err := tk.ExecToErr("update t2 set a=12, b=22 where id=1")
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
tk.MustExec("update t2 set a=13, b=23 where id=1")
tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)")
tk.MustExec("update t2 set a=15, b=25 where id=1")
tk.MustExec("delete from t1 where id=1")
err = tk.ExecToErr("update t2 set a=11, b=21 where id=1")
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
tk.MustExec("commit")
tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("1 15 25 a"))
}

// Case-9: test primary key is handle and contain foreign key column.
tk.MustExec("drop table if exists t2;")
tk.MustExec("drop table if exists t1;")
tk.MustExec("set @@tidb_enable_clustered_index=0;")
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));")
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')")
tk.MustExec("update t2 set a = 2 where id = 11")
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 2 21 a"))
tk.MustExec("update t2 set a = 3 where id = 11")
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 3 21 a"))
tk.MustExec("update t2 set b=b+1 where id = 11")
tk.MustQuery("select id, a, b , name from t2 order by id").Check(testkit.Rows("11 3 22 a"))
tk.MustExec("update t2 set id = 1 where id = 11")
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 22 a"))
err := tk.ExecToErr("update t2 set a = 10 where id = 1")
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 3 22 a"))

// Test In txn.
tk.MustExec("delete from t2")
tk.MustExec("delete from t1")
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (1, 1, 21, 'a')")
tk.MustExec("begin")
tk.MustExec("update t2 set a=2, b=22 where id=1")
tk.MustExec("rollback")

tk.MustExec("begin")
tk.MustExec("delete from t1 where id=2")
err = tk.ExecToErr("update t2 set a=2, b=22 where id=1")
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
tk.MustExec("update t2 set a=3, b=23 where id=1")
tk.MustExec("insert into t1 (id, a, b) values (5, 15, 25)")
tk.MustExec("update t2 set a=5, b=25 where id=1")
tk.MustExec("delete from t1 where id=1")
err = tk.ExecToErr("update t2 set a=1, b=21 where id=1")
require.NotNil(t, err)
require.True(t, plannercore.ErrNoReferencedRow2.Equal(err))
tk.MustExec("commit")
tk.MustQuery("select id, a, b, name from t2").Check(testkit.Rows("1 5 25 a"))
}

func TestForeignKeyOnUpdateParentTableCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")

for _, ca := range foreignKeyTestCase1 {
tk.MustExec("drop table if exists t2;")
tk.MustExec("drop table if exists t1;")
for _, sql := range ca.prepareSQLs {
tk.MustExec(sql)
}
if !ca.notNull {
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24), (5, 15, null), (6, null, 26), (7, null, null);")
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a'), (5, 15, null, 'e'), (6, null, 26, 'f'), (7, null, null, 'g');")

tk.MustExec("update t1 set a=a+100, b = b+200 where id = 2")
tk.MustExec("update t1 set a=a+1000, b = b+2000 where a = 13 or b=222")
tk.MustExec("update t1 set a=a+10000, b = b+20000 where id = 5 or a is null or b is null")
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 <nil>", "6 <nil> 20026", "7 <nil> <nil>"))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 <nil> e", "6 <nil> 26 f", "7 <nil> <nil> g"))

err := tk.ExecToErr("update t1 set a=a+10, b = b+20 where id = 1 or a = 1112 or b = 24")
require.NotNil(t, err)
require.True(t, plannercore.ErrRowIsReferenced2.Equal(err))
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24", "5 10015 <nil>", "6 <nil> 20026", "7 <nil> <nil>"))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a", "5 15 <nil> e", "6 <nil> 26 f", "7 <nil> <nil> g"))
} else {
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (1, 11, 21, 'a');")

tk.MustExec("update t1 set a=a+100, b = b+200 where id = 2")
tk.MustExec("update t1 set a=a+1000, b = b+2000 where a = 13 or b=222")
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24"))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a"))

err := tk.ExecToErr("update t1 set a=a+10, b = b+20 where id = 1 or a = 1112 or b = 24")
require.NotNil(t, err)
require.True(t, plannercore.ErrRowIsReferenced2.Equal(err))
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "2 1112 2222", "3 1013 2023", "4 14 24"))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("1 11 21 a"))
}

}

// Case-9: test primary key is handle and contain foreign key column.
tk.MustExec("drop table if exists t2;")
tk.MustExec("drop table if exists t1;")
tk.MustExec("set @@tidb_enable_clustered_index=0;")
tk.MustExec("create table t1 (id int, a int, b int, primary key (id));")
tk.MustExec("create table t2 (b int, a int, id int, name varchar(10), primary key (a), foreign key fk(a) references t1(id));")
tk.MustExec("insert into t1 (id, a, b) values (1, 11, 21),(2, 12, 22), (3, 13, 23), (4, 14, 24)")
tk.MustExec("insert into t2 (id, a, b, name) values (11, 1, 21, 'a')")
tk.MustExec("update t1 set id = id + 100 where id =2 or a = 13")
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "4 14 24", "102 12 22", "103 13 23"))

err := tk.ExecToErr("update t1 set id = id+10 where id = 1 or b = 24")
require.NotNil(t, err)
require.True(t, plannercore.ErrRowIsReferenced2.Equal(err))
tk.MustQuery("select id, a, b from t1 order by id").Check(testkit.Rows("1 11 21", "4 14 24", "102 12 22", "103 13 23"))
tk.MustQuery("select id, a, b, name from t2 order by id").Check(testkit.Rows("11 1 21 a"))
}
14 changes: 13 additions & 1 deletion executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type UpdateExec struct {
tableUpdatable []bool
changed []bool
matches []bool
// fkChecks contains the foreign key checkers. the map is tableID -> []*FKCheckExec
fkChecks map[int64][]*FKCheckExec
}

// prepare `handles`, `tableUpdatable`, `changed` to avoid re-computations.
Expand Down Expand Up @@ -191,7 +193,8 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n
flags := bAssignFlag[content.Start:content.End]

// Update row
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, nil)
fkChecks := e.fkChecks[content.TblID]
changed, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker, fkChecks)
if err1 == nil {
_, exist := e.updatedRowKeys[content.Start].Get(handle)
memDelta := e.updatedRowKeys[content.Start].Set(handle, changed)
Expand Down Expand Up @@ -537,3 +540,12 @@ func (e *updateRuntimeStats) Merge(other execdetails.RuntimeStats) {
func (e *updateRuntimeStats) Tp() int {
return execdetails.TpUpdateRuntimeStats
}

// GetFKChecks implements WithForeignKeyTrigger interface.
func (e *UpdateExec) GetFKChecks() []*FKCheckExec {
fkChecks := []*FKCheckExec{}
for _, fkcs := range e.fkChecks {
fkChecks = append(fkChecks, fkcs...)
}
return fkChecks
}
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ type Update struct {
PartitionedTable []table.PartitionedTable

tblID2Table map[int64]table.Table

FKChecks map[int64][]*FKCheck
}

// Delete represents a delete plan.
Expand Down
Loading

0 comments on commit df27883

Please sign in to comment.