diff --git a/kv/option.go b/kv/option.go index 7692c2f74a4a0..1b2f12dacd5a1 100644 --- a/kv/option.go +++ b/kv/option.go @@ -70,6 +70,9 @@ const ( // 2. Ranges of Retrievers do not intersect each other. // 3. Retrievers are sorted by range. SortedCustomRetrievers + // TableToColumnMaps is a map from tableID to a series of maps. The maps are needed when checking data consistency. + // Save them here to reduce redundant computations. + TableToColumnMaps ) // ReplicaReadType is the type of replica to read data from diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c6bd9c718851..af8a7d5bf900d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -947,7 +947,7 @@ type SessionVars struct { // MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash. MPPStoreFailTTL string - // cached is used to optimze the object allocation. + // cached is used to optimize the object allocation. cached struct { curr int8 data [2]stmtctx.StatementContext diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go new file mode 100644 index 0000000000000..c127945652389 --- /dev/null +++ b/table/tables/mutation_checker.go @@ -0,0 +1,314 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/rowcodec" + "go.uber.org/zap" +) + +type mutation struct { + key kv.Key + flags kv.KeyFlags + value []byte +} + +type columnMaps struct { + ColumnIDToInfo map[int64]*model.ColumnInfo + ColumnIDToFieldType map[int64]*types.FieldType + IndexIDToInfo map[int64]*model.IndexInfo + IndexIDToRowColInfos map[int64][]rowcodec.ColInfo +} + +// CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. +// Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. +// It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. +// +// The check doesn't work and just returns nil when: +// (1) the table is partitioned +// (2) new collation is enabled and restored data is needed +// +// How it works: +// +// Assume the set of row values changes from V1 to V2, we check +// (1) V2 - V1 = {added indices} +// (2) V1 - V2 = {deleted indices} +// +// To check (1), we need +// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value +// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate +// the mutations, thus ignored. +func CheckIndexConsistency( + txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, + rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, +) error { + if t.Meta().GetPartitionInfo() != nil { + return nil + } + if sh == 0 { + // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv + return nil + } + indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + if err != nil { + return errors.Trace(err) + } + + columnMaps := getColumnMaps(txn, t) + + if rowToInsert != nil { + if err := checkRowInsertionConsistency( + sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType, + ); err != nil { + return errors.Trace(err) + } + } + if err := checkIndexKeys( + sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, + ); err != nil { + return errors.Trace(err) + } + return nil +} + +// checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. +func checkIndexKeys( + sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, + indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, + indexIDToRowColInfos map[int64][]rowcodec.ColInfo, +) error { + + var indexData []types.Datum + for _, m := range indexMutations { + _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) + if err != nil { + return errors.Trace(err) + } + + indexInfo, ok := indexIDToInfo[indexID] + if !ok { + return errors.New("index not found") + } + rowColInfos, ok := indexIDToRowColInfos[indexID] + if !ok { + return errors.New("index not found") + } + + // when we cannot decode the key to get the original value + if len(m.value) == 0 && NeedRestoredData(indexInfo.Columns, t.Meta().Columns) { + continue + } + + decodedIndexValues, err := tablecodec.DecodeIndexKV( + m.key, m.value, len(indexInfo.Columns), tablecodec.HandleNotNeeded, rowColInfos, + ) + if err != nil { + return errors.Trace(err) + } + + // reuse the underlying memory, save an allocation + if indexData == nil { + indexData = make([]types.Datum, 0, len(decodedIndexValues)) + } else { + indexData = indexData[:0] + } + + for i, v := range decodedIndexValues { + fieldType := &t.Columns[indexInfo.Columns[i].Offset].FieldType + datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) + if err != nil { + return errors.Trace(err) + } + indexData = append(indexData, datum) + } + + if len(m.value) == 0 { + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo) + } else { + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo) + } + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +// checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones +// We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) +func checkRowInsertionConsistency( + sessVars *variable.SessionVars, rowToInsert []types.Datum, rowInsertion mutation, + columnIDToInfo map[int64]*model.ColumnInfo, columnIDToFieldType map[int64]*types.FieldType, +) error { + if rowToInsert == nil { + // it's a deletion + return nil + } + + decodedData, err := tablecodec.DecodeRowToDatumMap(rowInsertion.value, columnIDToFieldType, sessVars.Location()) + if err != nil { + return errors.Trace(err) + } + + // NOTE: we cannot check if the decoded values contain all columns since some columns may be skipped. It can even be empty + // Instead, we check that decoded index values are consistent with the input row. + + for columnID, decodedDatum := range decodedData { + inputDatum := rowToInsert[columnIDToInfo[columnID].Offset] + cmp, err := decodedDatum.CompareDatum(sessVars.StmtCtx, &inputDatum) + if err != nil { + return errors.Trace(err) + } + if cmp != 0 { + logutil.BgLogger().Error( + "inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), + zap.String("input datum", inputDatum.String()), + ) + return errors.Errorf( + "inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), + inputDatum.String(), + ) + } + } + return nil +} + +// collectTableMutationsFromBufferStage collects mutations of the current table from the mem buffer stage +// It returns: (1) all index mutations (2) the only row insertion +// If there are no row insertions, the 2nd returned value is nil +// If there are multiple row insertions, an error is returned +func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ( + []mutation, mutation, error, +) { + indexMutations := make([]mutation, 0) + var rowInsertion mutation + var err error + inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { + // only check the current table + if tablecodec.DecodeTableID(key) == t.physicalTableID { + m := mutation{key, flags, data} + if rowcodec.IsRowKey(key) { + if len(data) > 0 { + if rowInsertion.key == nil { + rowInsertion = m + } else { + err = errors.Errorf( + "multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m, + ) + } + } + } else { + indexMutations = append(indexMutations, m) + } + } + } + memBuffer.InspectStage(sh, inspector) + return indexMutations, rowInsertion, err +} + +// compareIndexData compares the decoded index data with the input data. +// Returns error if the index data is not a subset of the input data. +func compareIndexData( + sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, +) error { + for i, decodedMutationDatum := range indexData { + expectedDatum := input[indexInfo.Columns[i].Offset] + + tablecodec.TruncateIndexValue( + &expectedDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo, + ) + tablecodec.TruncateIndexValue( + &decodedMutationDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo, + ) + + comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) + if err != nil { + return errors.Trace(err) + } + + if comparison != 0 { + logutil.BgLogger().Error( + "inconsistent index values", + zap.String("truncated mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), + zap.String("truncated expected datum", fmt.Sprintf("%v", expectedDatum)), + ) + return errors.New("inconsistent index values") + } + } + return nil +} + +// getColumnMaps tries to get the columnMaps from transaction options. If there isn't one, it builds one and stores it. +// It saves redundant computations of the map. +func getColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { + getter := func() (map[int64]columnMaps, bool) { + m, ok := txn.GetOption(kv.TableToColumnMaps).(map[int64]columnMaps) + return m, ok + } + setter := func(maps map[int64]columnMaps) { + txn.SetOption(kv.TableToColumnMaps, maps) + } + columnMaps := getOrBuildColumnMaps(getter, setter, t) + return columnMaps +} + +// getOrBuildColumnMaps tries to get the columnMaps from some place. If there isn't one, it builds one and stores it. +// It saves redundant computations of the map. +func getOrBuildColumnMaps( + getter func() (map[int64]columnMaps, bool), setter func(map[int64]columnMaps), t *TableCommon, +) columnMaps { + tableMaps, ok := getter() + if !ok || tableMaps == nil { + tableMaps = make(map[int64]columnMaps) + } + maps, ok := tableMaps[t.tableID] + if !ok { + maps = columnMaps{ + make(map[int64]*model.ColumnInfo, len(t.Meta().Columns)), + make(map[int64]*types.FieldType, len(t.Meta().Columns)), + make(map[int64]*model.IndexInfo, len(t.Indices())), + make(map[int64][]rowcodec.ColInfo, len(t.Indices())), + } + + for _, col := range t.Meta().Columns { + maps.ColumnIDToInfo[col.ID] = col + maps.ColumnIDToFieldType[col.ID] = &col.FieldType + } + for _, index := range t.Indices() { + if index.Meta().Primary && t.meta.IsCommonHandle { + continue + } + maps.IndexIDToInfo[index.Meta().ID] = index.Meta() + maps.IndexIDToRowColInfos[index.Meta().ID] = BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()) + } + + tableMaps[t.tableID] = maps + setter(tableMaps) + } + return maps +} diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go new file mode 100644 index 0000000000000..ae41933f8522c --- /dev/null +++ b/table/tables/mutation_checker_test.go @@ -0,0 +1,306 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "testing" + "time" + + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/rowcodec" + "github.com/stretchr/testify/require" +) + +func TestCompareIndexData(t *testing.T) { + // dimensions of the domain of compareIndexData + // 1. table structure, where we only care about column types that influence truncating values + // 2. comparison of row data & index data + + type caseData struct { + indexData []types.Datum + inputData []types.Datum + fts []*types.FieldType + indexLength []int + correct bool + } + + // assume the index is on all columns + testData := []caseData{ + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, types.UnspecifiedLength}, + true, + }, + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string2")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, types.UnspecifiedLength}, + false, + }, + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string2")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, 11}, + true, + }, + } + + for caseID, data := range testData { + sc := &stmtctx.StatementContext{} + cols := make([]*table.Column, 0) + indexCols := make([]*model.IndexColumn, 0) + for i, ft := range data.fts { + cols = append(cols, &table.Column{ColumnInfo: &model.ColumnInfo{FieldType: *ft}}) + indexCols = append(indexCols, &model.IndexColumn{Offset: i, Length: data.indexLength[i]}) + } + indexInfo := &model.IndexInfo{Columns: indexCols} + + err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo) + require.Equal(t, data.correct, err == nil, "case id = %v", caseID) + } +} + +func TestCheckRowInsertionConsistency(t *testing.T) { + sessVars := variable.NewSessionVars() + rd := rowcodec.Encoder{Enable: true} + + // mocked data + mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233)) + mockValue233, err := tablecodec.EncodeRow( + sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd, + ) + require.Nil(t, err) + fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}} + + type caseData struct { + columnIDToInfo map[int64]*model.ColumnInfo + columnIDToFieldType map[int64]*types.FieldType + rowToInsert []types.Datum + rowInsertion mutation + correct bool + } + + testData := []caseData{ + { + // expected correct behavior + map[int64]*model.ColumnInfo{ + 101: { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, + []types.Datum{types.NewIntDatum(233)}, + mutation{key: mockRowKey233, value: mockValue233}, + true, + }, + { + // mismatching mutation + map[int64]*model.ColumnInfo{ + 101: { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, + []types.Datum{types.NewIntDatum(1)}, + fakeRowInsertion, + false, + }, + { + // no input row + map[int64]*model.ColumnInfo{}, + map[int64]*types.FieldType{}, + nil, + fakeRowInsertion, + true, + }, + { + // invalid value + map[int64]*model.ColumnInfo{ + 101: { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, + []types.Datum{types.NewIntDatum(233)}, + mutation{key: mockRowKey233, value: []byte{0, 1, 2, 3}}, + false, + }, + } + + for caseID, data := range testData { + err := checkRowInsertionConsistency( + sessVars, data.rowToInsert, data.rowInsertion, data.columnIDToInfo, data.columnIDToFieldType, + ) + require.Equal(t, data.correct, err == nil, "case id = %v", caseID) + } +} + +func TestCheckIndexKeys(t *testing.T) { + // dimensions of the domain of checkIndexKeys: + // 1. location *2 + // 2. table structure + // (1) unique index/non-unique index *2 + // (2) clustered index *2 + // (3) string collation *2 + // We don't test primary clustered index and int handle, since they should not have index mutations. + + // cases + locations := []*time.Location{time.UTC, time.Local} + indexInfos := []*model.IndexInfo{ + { + ID: 1, + State: model.StatePublic, + Primary: false, + Unique: true, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Length: types.UnspecifiedLength, + }, + { + Offset: 0, + Length: types.UnspecifiedLength, + }, + }, + }, + { + ID: 2, + State: model.StatePublic, + Primary: false, + Unique: false, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Length: types.UnspecifiedLength, + }, + { + Offset: 0, + Length: types.UnspecifiedLength, + }, + }, + }, + } + columnInfoSets := [][]*model.ColumnInfo{ + { + {ID: 1, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeString)}, + {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, + }, + { + {ID: 1, Offset: 0, FieldType: *types.NewFieldTypeWithCollation(mysql.TypeString, "big5_chinese_ci", + types.UnspecifiedLength)}, + {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, + }, + } + sessVars := variable.NewSessionVars() + + now := types.CurrentTime(mysql.TypeDatetime) + rowToInsert := []types.Datum{ + types.NewStringDatum("some string"), + types.NewTimeDatum(now), + } + anotherTime, err := now.Add(sessVars.StmtCtx, types.NewDuration(24, 0, 0, 0, 0)) + require.Nil(t, err) + rowToRemove := []types.Datum{ + types.NewStringDatum("old string"), + types.NewTimeDatum(anotherTime), + } + + getter := func() (map[int64]columnMaps, bool) { + return nil, false + } + setter := func(maps map[int64]columnMaps) {} + + // test + collate.SetNewCollationEnabledForTest(true) + defer collate.SetNewCollationEnabledForTest(false) + for _, isCommonHandle := range []bool{true, false} { + for _, lc := range locations { + for _, columnInfos := range columnInfoSets { + sessVars.StmtCtx.TimeZone = lc + tableInfo := model.TableInfo{ + ID: 1, + Name: model.NewCIStr("t"), + Columns: columnInfos, + Indices: indexInfos, + PKIsHandle: false, + IsCommonHandle: isCommonHandle, + } + table := MockTableFromMeta(&tableInfo).(*TableCommon) + + for i, indexInfo := range indexInfos { + index := table.indices[i] + maps := getOrBuildColumnMaps(getter, setter, table) + insertionKey, insertionValue, err := buildKeyValue(index, rowToInsert, sessVars, tableInfo, indexInfo, table) + require.Nil(t, err) + deletionKey, _, err := buildKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table) + require.Nil(t, err) + indexMutations := []mutation{{key: insertionKey, value: insertionValue}, {key: deletionKey}} + err = checkIndexKeys( + sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo, + maps.IndexIDToRowColInfos, + ) + require.Nil(t, err) + } + } + } + } +} + +func buildKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *variable.SessionVars, + tableInfo model.TableInfo, indexInfo *model.IndexInfo, table *TableCommon) ([]byte, []byte, error) { + indexedValues, err := index.FetchValues(rowToInsert, nil) + if err != nil { + return nil, nil, err + } + key, distinct, err := tablecodec.GenIndexKey( + sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, kv.IntHandle(1), nil, + ) + if err != nil { + return nil, nil, err + } + rsData := TryGetHandleRestoredDataWrapper(table, rowToInsert, nil, indexInfo) + value, err := tablecodec.GenIndexValuePortal( + sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns), + distinct, false, indexedValues, kv.IntHandle(1), 0, rsData, + ) + if err != nil { + return nil, nil, err + } + return key, value, nil +} diff --git a/table/tables/tables.go b/table/tables/tables.go index dc4a7594d5924..c3537f5173f7c 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -420,6 +420,9 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } + if err = CheckIndexConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil { + return errors.Trace(err) + } memBuffer.Release(sh) if shouldWriteBinlog(sctx, t.meta) { if !t.meta.PKIsHandle && !t.meta.IsCommonHandle { @@ -836,6 +839,10 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } + if err = CheckIndexConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil { + return nil, errors.Trace(err) + } + memBuffer.Release(sh) if shouldWriteBinlog(sctx, t.meta) { @@ -1047,15 +1054,20 @@ func GetChangingColVal(ctx sessionctx.Context, cols []*table.Column, col *table. // RemoveRecord implements table.Table RemoveRecord interface. func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - err := t.removeRowData(ctx, h) + txn, err := ctx.Txn(true) if err != nil { return err } - txn, err := ctx.Txn(true) + memBuffer := txn.GetMemBuffer() + sh := memBuffer.Staging() + defer memBuffer.Cleanup(sh) + + err = t.removeRowData(ctx, h) if err != nil { return err } + if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil { if err := checkTempTableSize(ctx, tmpTable, m); err != nil { @@ -1083,6 +1095,13 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } + sessVars := ctx.GetSessionVars() + sc := sessVars.StmtCtx + if err = CheckIndexConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil { + return errors.Trace(err) + } + memBuffer.Release(sh) + if shouldWriteBinlog(ctx, t.meta) { cols := t.Cols() colIDs := make([]int64, 0, len(cols)+1) @@ -1108,7 +1127,6 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return nil } colSize := make(map[int64]int64, len(t.Cols())) - sc := ctx.GetSessionVars().StmtCtx for id, col := range t.Cols() { size, err := codec.EstimateValueSize(sc, r[id]) if err != nil { diff --git a/util/rowcodec/row.go b/util/rowcodec/row.go index 2cafab6c875a9..0e6b1a9ede213 100644 --- a/util/rowcodec/row.go +++ b/util/rowcodec/row.go @@ -18,7 +18,7 @@ import ( "encoding/binary" ) -// row is the struct type used to access the a row. +// row is the struct type used to access a row. type row struct { // small: colID []byte, offsets []uint16, optimized for most cases. // large: colID []uint32, offsets []uint32.