Skip to content

Commit

Permalink
tables: fix insert ignore on duplicate with dup prefix 2nd index
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu committed Jul 2, 2021
1 parent 0b5c154 commit 1cce3e9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
// If the new handle or unique index exists, this will avoid to remove the record.
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData, modified)
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData, oldData, modified)
if err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,12 @@ func (s *testSuite4) TestInsertIgnoreOnDup(c *C) {
tk.MustQuery("select * from t6").Check(testkit.Rows("100 10 20"))
tk.MustExec("insert ignore into t6 set a = 200, b= 10 on duplicate key update c = 1000")
tk.MustQuery("select * from t6").Check(testkit.Rows("100 10 1000"))

tk.MustExec("drop table if exists t8")
tk.MustExec("CREATE TABLE `t8` (`col_70` varbinary(444) NOT NULL DEFAULT 'bezhs', PRIMARY KEY (`col_70`) clustered, UNIQUE KEY `idx_22` (`col_70`(1)))")
tk.MustExec("insert into t8 values('lldcxiyfjrqzgj')")
tk.MustExec("insert ignore into t8 values ( 'lalozlkdosasfklmflo' ) on duplicate key update col_70 = 'lyhohxtby'")
tk.MustQuery("select * from t8").Check(testkit.Rows("lyhohxtby"))
}

func (s *testSuite4) TestInsertSetWithDefault(c *C) {
Expand Down
32 changes: 26 additions & 6 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tables

import (
"context"
"github.com/pingcap/parser/charset"
"math"
"strconv"
"strings"
Expand Down Expand Up @@ -1449,7 +1450,7 @@ func FindIndexByColName(t table.Table, name string) table.Index {

// CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore check whether recordID key or unique index key exists. if not exists, return nil,
// otherwise return kv.ErrKeyExists error.
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, newRow []types.Datum, modified []bool) error {
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, newRow []types.Datum, oldRow []types.Datum, modified []bool) error {
physicalTableID := t.Meta().ID
if pt, ok := t.(*partitionedTable); ok {
info := t.Meta().GetPartitionInfo()
Expand Down Expand Up @@ -1480,20 +1481,39 @@ func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.C

// Check unique key exists.
{
shouldSkipIgnoreCheck := func(idx table.Index) bool {
shouldSkipIgnoreCheck := func(idx table.Index) (bool, error) {
if !IsIndexWritable(idx) || !idx.Meta().Unique || (t.Meta().IsCommonHandle && idx.Meta().Primary) {
return true
return true, nil
}
for _, c := range idx.Meta().Columns {
if modified[c.Offset] {
return false
if c.Length != types.UnspecifiedLength && (newRow[c.Offset].Kind() == types.KindString || newRow[c.Offset].Kind() == types.KindBytes) {
newCol := newRow[c.Offset].Clone()
tablecodec.TruncateIndexValue(newCol, c, t.Meta().Columns[c.Offset])
oldCol := oldRow[c.Offset].Clone()
tablecodec.TruncateIndexValue(oldCol, c, t.Meta().Columns[c.Offset])
// We should use binary collation to compare datum, otherwise the result will be incorrect.
newCol.SetCollation(charset.CollationBin)
cmp, err := newCol.CompareDatum(sctx.GetSessionVars().StmtCtx, oldCol)
if err != nil {
return false, errors.Trace(err)
}
if cmp == 0 {
continue
}
}
return false, nil
}
}
return true
return true, nil
}

for _, idx := range t.Indices() {
if shouldSkipIgnoreCheck(idx) {
skip, err := shouldSkipIgnoreCheck(idx)
if err != nil {
return err
}
if skip {
continue
}
newVals, err := idx.FetchValues(newRow, nil)
Expand Down

0 comments on commit 1cce3e9

Please sign in to comment.