From aceb3d0cf5d6ba0cef3561f11e0c00ee70b8db47 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 27 Jul 2021 22:59:53 +0800 Subject: [PATCH 01/21] feat: check index key in AddRecord Signed-off-by: ekexium --- table/tables/index.go | 3 + table/tables/tables.go | 100 +++++++++++++++++++++++++++++++++- tablecodec/tablecodec.go | 6 ++ tablecodec/tablecodec_test.go | 1 + 4 files changed, 108 insertions(+), 2 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index b592c894f74ad..f529ada5af318 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,6 +16,8 @@ package tables import ( "context" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "io" "sync" "time" @@ -155,6 +157,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue writeBufs := vars.GetWriteStmtBufs() skipCheck := vars.StmtCtx.BatchCheck key, distinct, err := c.GenIndexKey(vars.StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) + logutil.BgLogger().Warn("index.create", zap.ByteString("key", key)) if err != nil { return nil, err } diff --git a/table/tables/tables.go b/table/tables/tables.go index dc4a7594d5924..9410a90e043a5 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -20,6 +20,10 @@ package tables import ( "context" + "fmt" + "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tipb/go-tipb" "math" "strconv" "strings" @@ -48,8 +52,6 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" - "github.com/pingcap/tipb/go-binlog" - "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -836,6 +838,97 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } + // double check the data consistency + type Mutation = struct { + key kv.Key + flags kv.KeyFlags + value []byte + } + mutations := make([]Mutation, 0) + inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { + mutations = append(mutations, Mutation{key, flags, data}) + } + memBuffer.InspectStage(sh, inspector) + + // get the handle + handles := make([]kv.Handle, 0) + for _, m := range mutations { + handle, err := tablecodec.DecodeRowKey(m.key) + if err != nil { + handles = append(handles, handle) + } + } + logutil.BgLogger().Info("handles in AddRecord", zap.Any("handles", handles)) + //handle := handles[0] + + // index_id -> index + type IndexHelperInfo = struct { + indexInfo *model.IndexInfo + colInfos []rowcodec.ColInfo + } + indexIdMap := make(map[int64]IndexHelperInfo) + for _, index := range t.indices { + indexIdMap[index.Meta().ID] = IndexHelperInfo{ + index.Meta(), + BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), + } + } + + // check each mutation + for _, m := range mutations { + if !tablecodec.IsIndexKey(m.key) { + continue + } + + tableId, indexId, _, err := tablecodec.DecodeIndexKey(m.key) + if err != nil { + continue + } + if tableId != t.tableID { + logutil.BgLogger().Info("different table id", zap.Int64("expected", t.tableID), zap.Int64("in mutation", tableId)) + continue + } + + indexHelperInfo, ok := indexIdMap[indexId] + if !ok { + return nil, errors.New("index not found") + } + + colInfos := BuildRowcodecColInfoForIndexColumns(indexHelperInfo.indexInfo, t.Meta()) + decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns), tablecodec.HandleNotNeeded, colInfos) + if err != nil { + return nil, errors.Trace(err) + } + indexData := make([]types.Datum, 0) + for i, v := range decodedIndexValues { + d, err := tablecodec.DecodeColumnValue(v, &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType, sessVars.TimeZone) + if err != nil { + return nil, errors.Trace(err) + } + indexData = append(indexData, d) + logutil.BgLogger().Warn("decoded index value", zap.String("datum", d.String())) + } + + for i, decodedMutationDatum := range indexData { + expectedDatum := r[indexHelperInfo.indexInfo.Columns[i].Offset] + // FIXME: should we truncate index? + //tablecodec.TruncateIndexValue(&expectedDatum, indexHelperInfo.indexInfo.Columns[i], + // t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) + + comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) + if err != nil { + return nil, errors.Trace(err) + } + + if comparison != 0 { + logutil.BgLogger().Error("inconsistent index values", + zap.String("mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), + zap.String("input datum", fmt.Sprintf("%v", expectedDatum))) + return nil, errors.New("inconsistent index values") + } + } + } + memBuffer.Release(sh) if shouldWriteBinlog(sctx, t.meta) { @@ -906,6 +999,9 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r dupErr = kv.ErrKeyExists.FastGenByArgs(entryKey, idxMeta.Name.String()) } rsData := TryGetHandleRestoredDataWrapper(t, r, nil, v.Meta()) + for _, datum := range indexVals { + logutil.BgLogger().Warn("creating index", zap.String("datum", datum.String())) + } if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil { if kv.ErrKeyExists.Equal(err) { return dupHandle, dupErr diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 2fca105e8e259..2b7ff17768b44 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -324,6 +324,12 @@ func EncodeOldRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int6 return codec.EncodeValue(sc, valBuf, values...) } +func Flatten(sc *stmtctx.StatementContext, data types.Datum) (types.Datum, error) { + var ret types.Datum + err := flatten(sc, data, &ret) + return ret, err +} + func flatten(sc *stmtctx.StatementContext, data types.Datum, ret *types.Datum) error { switch data.Kind() { case types.KindMysqlTime: diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index af440cdf555f4..f1054887cb472 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -478,6 +478,7 @@ func TestDecodeIndexKey(t *testing.T) { str = fmt.Sprintf("%d-%v", v.Kind(), v.GetValue()) } valueStrs = append(valueStrs, str) + fmt.Println(valueStrs) } sc := &stmtctx.StatementContext{TimeZone: time.UTC} encodedValue, err := codec.EncodeKey(sc, nil, values...) From d01995bddfc652bbd2e10c474bef6843bb48fa7b Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 5 Aug 2021 00:58:32 +0800 Subject: [PATCH 02/21] feat: check for AddRecord/UpdateRecord Signed-off-by: ekexium --- table/tables/index.go | 3 - table/tables/mutation_checker.go | 188 +++++++++++++++++++++++++++++++ table/tables/tables.go | 103 ++--------------- tablecodec/tablecodec.go | 6 - tablecodec/tablecodec_test.go | 1 - 5 files changed, 195 insertions(+), 106 deletions(-) create mode 100644 table/tables/mutation_checker.go diff --git a/table/tables/index.go b/table/tables/index.go index f529ada5af318..b592c894f74ad 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,8 +16,6 @@ package tables import ( "context" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" "io" "sync" "time" @@ -157,7 +155,6 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue writeBufs := vars.GetWriteStmtBufs() skipCheck := vars.StmtCtx.BatchCheck key, distinct, err := c.GenIndexKey(vars.StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) - logutil.BgLogger().Warn("index.create", zap.ByteString("key", key)) if err != nil { return nil, err } diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go new file mode 100644 index 0000000000000..ddfe5150bae14 --- /dev/null +++ b/table/tables/mutation_checker.go @@ -0,0 +1,188 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "fmt" + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/rowcodec" + "go.uber.org/zap" +) + +type Mutation = struct { + key kv.Key + flags kv.KeyFlags + value []byte +} + +type IndexHelperInfo = struct { + indexInfo *model.IndexInfo + colInfos []rowcodec.ColInfo +} + +// CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. +// Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. +// It aims at reducing bugs that will corrupt data, and prevent mistakes from spreading if possible. +// +// Assume the set of row values changes from V1 to V2. We check +// (1) V2 - V1 = {added indices} +// (2) V1 - V2 = {deleted indices} +// +// To check (1), we need +// (a) {added indices} is a subset of {needed indices} <=> each index mutation is consistent with the input/row key/value +// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we calculate +// the mutations, thus ignored. +func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, + dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { + // collect mutations + mutations := make([]Mutation, 0) + inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { + // TODO: shall we check only the current table, or all tables involved? + if tablecodec.DecodeTableID(key) == t.physicalTableID { + mutations = append(mutations, Mutation{key, flags, data}) + } + } + memBuffer.InspectStage(sh, inspector) + + // get the handle + handlesAdded, handlesRemoved := ExtractHandles(mutations, t) + if len(handlesAdded) > 1 || len(handlesRemoved) > 1 { + // TODO: is it possible? + logutil.BgLogger().Error("multiple handles added/mutated", zap.Any("handlesAdded", handlesAdded), + zap.Any("handlesRemoved", handlesRemoved)) + return errors.New("multiple handles added/mutated") + } + + // 1. TODO: compare handlesAdded vs. dataAdded, handlesRemoved vs. dataRemoved + indexIdMap := make(map[int64]IndexHelperInfo) + for _, index := range t.indices { + indexIdMap[index.Meta().ID] = IndexHelperInfo{ + index.Meta(), + BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), + } + } + + // 2. check index keys: consistent with input data + for _, m := range mutations { + if !tablecodec.IsIndexKey(m.key) { + continue + } + + _, indexId, _, err := tablecodec.DecodeIndexKey(m.key) + if err != nil { + continue + } + + indexHelperInfo, ok := indexIdMap[indexId] + if !ok { + return errors.New("index not found") + } + + colInfos := BuildRowcodecColInfoForIndexColumns(indexHelperInfo.indexInfo, t.Meta()) + + if len(m.value) == 0 { + // FIXME: for a delete index mutation, we cannot know the value of it. + // When new collation is enabled, we cannot decode value from the key. + // => ignore it for now + continue + } + decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns), + tablecodec.HandleNotNeeded, colInfos) + if err != nil { + return errors.Trace(err) + } + indexData := make([]types.Datum, 0) + for i, v := range decodedIndexValues { + d, err := tablecodec.DecodeColumnValue(v, &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType, sessVars.TimeZone) + if err != nil { + return errors.Trace(err) + } + indexData = append(indexData, d) + logutil.BgLogger().Warn("decoded index value", zap.String("datum", d.String())) + } + + // TODO: when is it nil? + if m.value == nil { + continue + } + + if len(m.value) == 0 { + err = compareIndexData(sc, t, indexData, dataRemoved, indexHelperInfo) + } else { + err = compareIndexData(sc, t, indexData, dataAdded, indexHelperInfo) + } + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, input []types.Datum, indexHelperInfo IndexHelperInfo) error { + for i, decodedMutationDatum := range indexData { + expectedDatum := input[indexHelperInfo.indexInfo.Columns[i].Offset] + + tablecodec.TruncateIndexValue(&expectedDatum, indexHelperInfo.indexInfo.Columns[i], + t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) + + tablecodec.TruncateIndexValue(&decodedMutationDatum, indexHelperInfo.indexInfo.Columns[i], + t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) + + comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) + if err != nil { + return errors.Trace(err) + } + + if comparison != 0 { + logutil.BgLogger().Error("inconsistent index values", + zap.String("truncated mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), + zap.String("truncated expected datum", fmt.Sprintf("%v", expectedDatum))) + return errors.New("inconsistent index values") + } + } + return nil +} + +// ExtractHandles extract handles of row mutations and classify them into 2 categories: put and delete +func ExtractHandles(mutations []Mutation, t *TableCommon) ([]kv.Handle, []kv.Handle) { + handlesAdded := make([]kv.Handle, 0) + handlesRemoved := make([]kv.Handle, 0) + for _, m := range mutations { + handle, err := tablecodec.DecodeRowKey(m.key) + if err != nil { + // TODO: remove it later + logutil.BgLogger().Warn("decode row key failed", zap.Error(err)) + continue + } + + // TODO: distinguish between nil and empty value + if m.value == nil { + logutil.BgLogger().Warn("row.value = nil", zap.String("handle", handle.String())) + continue + } + if len(m.value) > 0 { + handlesAdded = append(handlesAdded, handle) + } else { + handlesRemoved = append(handlesRemoved, handle) + } + } + return handlesAdded, handlesRemoved +} diff --git a/table/tables/tables.go b/table/tables/tables.go index 9410a90e043a5..ae1434471e652 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -20,10 +20,6 @@ package tables import ( "context" - "fmt" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/pingcap/tipb/go-binlog" - "github.com/pingcap/tipb/go-tipb" "math" "strconv" "strings" @@ -52,6 +48,8 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" + "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -422,6 +420,9 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } + if err = CheckIndexConsistency(sc, sessVars, t, newData, oldData, memBuffer, sh); err != nil { + return errors.Trace(err) + } memBuffer.Release(sh) if shouldWriteBinlog(sctx, t.meta) { if !t.meta.PKIsHandle && !t.meta.IsCommonHandle { @@ -838,95 +839,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } - // double check the data consistency - type Mutation = struct { - key kv.Key - flags kv.KeyFlags - value []byte - } - mutations := make([]Mutation, 0) - inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { - mutations = append(mutations, Mutation{key, flags, data}) - } - memBuffer.InspectStage(sh, inspector) - - // get the handle - handles := make([]kv.Handle, 0) - for _, m := range mutations { - handle, err := tablecodec.DecodeRowKey(m.key) - if err != nil { - handles = append(handles, handle) - } - } - logutil.BgLogger().Info("handles in AddRecord", zap.Any("handles", handles)) - //handle := handles[0] - - // index_id -> index - type IndexHelperInfo = struct { - indexInfo *model.IndexInfo - colInfos []rowcodec.ColInfo - } - indexIdMap := make(map[int64]IndexHelperInfo) - for _, index := range t.indices { - indexIdMap[index.Meta().ID] = IndexHelperInfo{ - index.Meta(), - BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), - } - } - - // check each mutation - for _, m := range mutations { - if !tablecodec.IsIndexKey(m.key) { - continue - } - - tableId, indexId, _, err := tablecodec.DecodeIndexKey(m.key) - if err != nil { - continue - } - if tableId != t.tableID { - logutil.BgLogger().Info("different table id", zap.Int64("expected", t.tableID), zap.Int64("in mutation", tableId)) - continue - } - - indexHelperInfo, ok := indexIdMap[indexId] - if !ok { - return nil, errors.New("index not found") - } - - colInfos := BuildRowcodecColInfoForIndexColumns(indexHelperInfo.indexInfo, t.Meta()) - decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns), tablecodec.HandleNotNeeded, colInfos) - if err != nil { - return nil, errors.Trace(err) - } - indexData := make([]types.Datum, 0) - for i, v := range decodedIndexValues { - d, err := tablecodec.DecodeColumnValue(v, &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType, sessVars.TimeZone) - if err != nil { - return nil, errors.Trace(err) - } - indexData = append(indexData, d) - logutil.BgLogger().Warn("decoded index value", zap.String("datum", d.String())) - } - - for i, decodedMutationDatum := range indexData { - expectedDatum := r[indexHelperInfo.indexInfo.Columns[i].Offset] - // FIXME: should we truncate index? - //tablecodec.TruncateIndexValue(&expectedDatum, indexHelperInfo.indexInfo.Columns[i], - // t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) - - comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) - if err != nil { - return nil, errors.Trace(err) - } - - if comparison != 0 { - logutil.BgLogger().Error("inconsistent index values", - zap.String("mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), - zap.String("input datum", fmt.Sprintf("%v", expectedDatum))) - return nil, errors.New("inconsistent index values") - } - } + if err = CheckIndexConsistency(sc, sessVars, t, r, nil, memBuffer, sh); err != nil { + return nil, errors.Trace(err) } memBuffer.Release(sh) @@ -999,9 +913,6 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r dupErr = kv.ErrKeyExists.FastGenByArgs(entryKey, idxMeta.Name.String()) } rsData := TryGetHandleRestoredDataWrapper(t, r, nil, v.Meta()) - for _, datum := range indexVals { - logutil.BgLogger().Warn("creating index", zap.String("datum", datum.String())) - } if dupHandle, err := v.Create(sctx, txn, indexVals, recordID, rsData, opts...); err != nil { if kv.ErrKeyExists.Equal(err) { return dupHandle, dupErr diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 2b7ff17768b44..2fca105e8e259 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -324,12 +324,6 @@ func EncodeOldRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int6 return codec.EncodeValue(sc, valBuf, values...) } -func Flatten(sc *stmtctx.StatementContext, data types.Datum) (types.Datum, error) { - var ret types.Datum - err := flatten(sc, data, &ret) - return ret, err -} - func flatten(sc *stmtctx.StatementContext, data types.Datum, ret *types.Datum) error { switch data.Kind() { case types.KindMysqlTime: diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index f1054887cb472..af440cdf555f4 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -478,7 +478,6 @@ func TestDecodeIndexKey(t *testing.T) { str = fmt.Sprintf("%d-%v", v.Kind(), v.GetValue()) } valueStrs = append(valueStrs, str) - fmt.Println(valueStrs) } sc := &stmtctx.StatementContext{TimeZone: time.UTC} encodedValue, err := codec.EncodeKey(sc, nil, values...) From 8a753822dbcbce40bdd500a26208e9ec5f48c85c Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 10 Aug 2021 19:48:25 +0800 Subject: [PATCH 03/21] ignore deletion of indices if NeedRestoredData Signed-off-by: ekexium --- table/tables/mutation_checker.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index ddfe5150bae14..1da2d61838f27 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -34,8 +34,8 @@ type Mutation = struct { } type IndexHelperInfo = struct { - indexInfo *model.IndexInfo - colInfos []rowcodec.ColInfo + indexInfo *model.IndexInfo + rowColInfos []rowcodec.ColInfo } // CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. @@ -96,16 +96,13 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess return errors.New("index not found") } - colInfos := BuildRowcodecColInfoForIndexColumns(indexHelperInfo.indexInfo, t.Meta()) - - if len(m.value) == 0 { - // FIXME: for a delete index mutation, we cannot know the value of it. - // When new collation is enabled, we cannot decode value from the key. - // => ignore it for now + if len(m.value) == 0 && NeedRestoredData(indexHelperInfo.indexInfo.Columns, t.Meta().Columns) { + // when we cannot decode the key to get the original value continue } + decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns), - tablecodec.HandleNotNeeded, colInfos) + tablecodec.HandleNotNeeded, indexHelperInfo.rowColInfos) if err != nil { return errors.Trace(err) } From 11fa2936f32f05861325470207f0fd1c9e2b3dc7 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 12 Aug 2021 22:31:37 +0800 Subject: [PATCH 04/21] check values of row mutations Signed-off-by: ekexium --- table/tables/mutation_checker.go | 150 ++++++++++++++++++++----------- util/rowcodec/row.go | 2 +- 2 files changed, 99 insertions(+), 53 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 1da2d61838f27..281577da787f8 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -34,44 +34,38 @@ type Mutation = struct { } type IndexHelperInfo = struct { - indexInfo *model.IndexInfo - rowColInfos []rowcodec.ColInfo + indexInfo *model.IndexInfo + rowColInfos []rowcodec.ColInfo } // CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. // Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. -// It aims at reducing bugs that will corrupt data, and prevent mistakes from spreading if possible. +// It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. // // Assume the set of row values changes from V1 to V2. We check // (1) V2 - V1 = {added indices} // (2) V1 - V2 = {deleted indices} // // To check (1), we need -// (a) {added indices} is a subset of {needed indices} <=> each index mutation is consistent with the input/row key/value -// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we calculate +// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value +// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate // the mutations, thus ignored. func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { - // collect mutations - mutations := make([]Mutation, 0) - inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { - // TODO: shall we check only the current table, or all tables involved? - if tablecodec.DecodeTableID(key) == t.physicalTableID { - mutations = append(mutations, Mutation{key, flags, data}) - } + mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh) + if err := checkRowValues(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { + return errors.Trace(err) } - memBuffer.InspectStage(sh, inspector) - - // get the handle - handlesAdded, handlesRemoved := ExtractHandles(mutations, t) - if len(handlesAdded) > 1 || len(handlesRemoved) > 1 { - // TODO: is it possible? - logutil.BgLogger().Error("multiple handles added/mutated", zap.Any("handlesAdded", handlesAdded), - zap.Any("handlesRemoved", handlesRemoved)) - return errors.New("multiple handles added/mutated") + if err := checkIndexKeys(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { + return errors.Trace(err) } + // TODO: check whether handles match in index and row mutations + return nil +} - // 1. TODO: compare handlesAdded vs. dataAdded, handlesRemoved vs. dataRemoved +// checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. +func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, + dataAdded []types.Datum, dataRemoved []types.Datum, mutations []Mutation) error { indexIdMap := make(map[int64]IndexHelperInfo) for _, index := range t.indices { indexIdMap[index.Meta().ID] = IndexHelperInfo{ @@ -80,7 +74,6 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess } } - // 2. check index keys: consistent with input data for _, m := range mutations { if !tablecodec.IsIndexKey(m.key) { continue @@ -96,8 +89,8 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess return errors.New("index not found") } + // when we cannot decode the key to get the original value if len(m.value) == 0 && NeedRestoredData(indexHelperInfo.indexInfo.Columns, t.Meta().Columns) { - // when we cannot decode the key to get the original value continue } @@ -108,17 +101,12 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess } indexData := make([]types.Datum, 0) for i, v := range decodedIndexValues { - d, err := tablecodec.DecodeColumnValue(v, &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType, sessVars.TimeZone) + fieldType := &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType + datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) if err != nil { return errors.Trace(err) } - indexData = append(indexData, d) - logutil.BgLogger().Warn("decoded index value", zap.String("datum", d.String())) - } - - // TODO: when is it nil? - if m.value == nil { - continue + indexData = append(indexData, datum) } if len(m.value) == 0 { @@ -133,6 +121,73 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess return nil } +// checkRowValues checks whether the values of row mutations are consistent with the expected ones +func checkRowValues(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, + dataAdded, dataRemoved []types.Datum, mutations []Mutation) error { + rowsAdded, rowsRemoved := ExtractRowMutations(mutations) + if len(rowsAdded) > 1 || len(rowsRemoved) > 1 { + // TODO: is it possible? + logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded), + zap.Any("rowsRemoved", rowsRemoved)) + return errors.New("multiple row mutations added/mutated") + } + + columnMap := make(map[int64]*model.ColumnInfo) + columnFieldMap := make(map[int64]*types.FieldType) + for _, col := range t.Meta().Columns { + columnMap[col.ID] = col + columnFieldMap[col.ID] = &col.FieldType + } + + if err := checkRowMutationsWithData(sc, sessVars, rowsAdded, columnFieldMap, dataAdded, columnMap); err != nil { + return errors.Trace(err) + } + if err := checkRowMutationsWithData(sc, sessVars, rowsRemoved, columnFieldMap, dataRemoved, columnMap); err != nil { + return errors.Trace(err) + } + return nil +} + +func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, rowMutations []Mutation, + columnFieldMap map[int64]*types.FieldType, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { + if len(rowMutations) > 0 { + decodedData, err := tablecodec.DecodeRowToDatumMap(rowMutations[0].value, columnFieldMap, sessVars.Location()) + if err != nil { + return errors.Trace(err) + } + + // TODO: we cannot check if the decoded values contain all columns since some columns may be skipped. + // Instead we check data in the value are consistent with input. + + for columnId, decodedDatum := range decodedData { + inputDatum := expectedData[columnMap[columnId].Offset] + cmp, err := decodedDatum.CompareDatum(sc, &inputDatum) + if err != nil { + return errors.Trace(err) + } + if cmp != 0 { + logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), + zap.String("input datum", inputDatum.String())) + return errors.New(fmt.Sprintf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", + decodedDatum.String(), inputDatum.String())) + } + } + } + return nil +} + +func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []Mutation { + mutations := make([]Mutation, 0) + inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { + // TODO: shall we check only the current table, or all tables involved? + if tablecodec.DecodeTableID(key) == t.physicalTableID { + mutations = append(mutations, Mutation{key, flags, data}) + } + } + memBuffer.InspectStage(sh, inspector) + return mutations +} + func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, input []types.Datum, indexHelperInfo IndexHelperInfo) error { for i, decodedMutationDatum := range indexData { expectedDatum := input[indexHelperInfo.indexInfo.Columns[i].Offset] @@ -158,27 +213,18 @@ func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, i return nil } -// ExtractHandles extract handles of row mutations and classify them into 2 categories: put and delete -func ExtractHandles(mutations []Mutation, t *TableCommon) ([]kv.Handle, []kv.Handle) { - handlesAdded := make([]kv.Handle, 0) - handlesRemoved := make([]kv.Handle, 0) +// ExtractRowMutations extracts row mutations and classify them into 2 categories: put and delete +func ExtractRowMutations(mutations []Mutation) ([]Mutation, []Mutation) { + handlesAdded := make([]Mutation, 0) + handlesRemoved := make([]Mutation, 0) + // TODO: assumption: value in mem buffer for _, m := range mutations { - handle, err := tablecodec.DecodeRowKey(m.key) - if err != nil { - // TODO: remove it later - logutil.BgLogger().Warn("decode row key failed", zap.Error(err)) - continue - } - - // TODO: distinguish between nil and empty value - if m.value == nil { - logutil.BgLogger().Warn("row.value = nil", zap.String("handle", handle.String())) - continue - } - if len(m.value) > 0 { - handlesAdded = append(handlesAdded, handle) - } else { - handlesRemoved = append(handlesRemoved, handle) + if rowcodec.IsRowKey(m.key) { + if len(m.value) > 0 { + handlesAdded = append(handlesAdded, m) + } else { + handlesRemoved = append(handlesRemoved, m) + } } } return handlesAdded, handlesRemoved diff --git a/util/rowcodec/row.go b/util/rowcodec/row.go index 2cafab6c875a9..0e6b1a9ede213 100644 --- a/util/rowcodec/row.go +++ b/util/rowcodec/row.go @@ -18,7 +18,7 @@ import ( "encoding/binary" ) -// row is the struct type used to access the a row. +// row is the struct type used to access a row. type row struct { // small: colID []byte, offsets []uint16, optimized for most cases. // large: colID []uint32, offsets []uint32. From f5160e1941a4d7d06bd54ca06cd2d86a329ce43f Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 12 Aug 2021 22:49:00 +0800 Subject: [PATCH 05/21] style: fix naming Signed-off-by: ekexium --- table/tables/mutation_checker.go | 37 ++++++++++++++++---------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 281577da787f8..845cb34760632 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,6 +15,7 @@ package tables import ( "fmt" + "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" @@ -27,13 +28,13 @@ import ( "go.uber.org/zap" ) -type Mutation = struct { +type mutation = struct { key kv.Key flags kv.KeyFlags value []byte } -type IndexHelperInfo = struct { +type indexHelperInfo = struct { indexInfo *model.IndexInfo rowColInfos []rowcodec.ColInfo } @@ -65,10 +66,10 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded []types.Datum, dataRemoved []types.Datum, mutations []Mutation) error { - indexIdMap := make(map[int64]IndexHelperInfo) + dataAdded []types.Datum, dataRemoved []types.Datum, mutations []mutation) error { + indexIDMap := make(map[int64]indexHelperInfo) for _, index := range t.indices { - indexIdMap[index.Meta().ID] = IndexHelperInfo{ + indexIDMap[index.Meta().ID] = indexHelperInfo{ index.Meta(), BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), } @@ -79,12 +80,12 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars continue } - _, indexId, _, err := tablecodec.DecodeIndexKey(m.key) + _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) if err != nil { continue } - indexHelperInfo, ok := indexIdMap[indexId] + indexHelperInfo, ok := indexIDMap[indexID] if !ok { return errors.New("index not found") } @@ -123,7 +124,7 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars // checkRowValues checks whether the values of row mutations are consistent with the expected ones func checkRowValues(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded, dataRemoved []types.Datum, mutations []Mutation) error { + dataAdded, dataRemoved []types.Datum, mutations []mutation) error { rowsAdded, rowsRemoved := ExtractRowMutations(mutations) if len(rowsAdded) > 1 || len(rowsRemoved) > 1 { // TODO: is it possible? @@ -148,7 +149,7 @@ func checkRowValues(sc *stmtctx.StatementContext, sessVars *variable.SessionVars return nil } -func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, rowMutations []Mutation, +func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, rowMutations []mutation, columnFieldMap map[int64]*types.FieldType, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { if len(rowMutations) > 0 { decodedData, err := tablecodec.DecodeRowToDatumMap(rowMutations[0].value, columnFieldMap, sessVars.Location()) @@ -159,8 +160,8 @@ func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable. // TODO: we cannot check if the decoded values contain all columns since some columns may be skipped. // Instead we check data in the value are consistent with input. - for columnId, decodedDatum := range decodedData { - inputDatum := expectedData[columnMap[columnId].Offset] + for columnID, decodedDatum := range decodedData { + inputDatum := expectedData[columnMap[columnID].Offset] cmp, err := decodedDatum.CompareDatum(sc, &inputDatum) if err != nil { return errors.Trace(err) @@ -176,19 +177,19 @@ func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable. return nil } -func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []Mutation { - mutations := make([]Mutation, 0) +func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []mutation { + mutations := make([]mutation, 0) inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { // TODO: shall we check only the current table, or all tables involved? if tablecodec.DecodeTableID(key) == t.physicalTableID { - mutations = append(mutations, Mutation{key, flags, data}) + mutations = append(mutations, mutation{key, flags, data}) } } memBuffer.InspectStage(sh, inspector) return mutations } -func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, input []types.Datum, indexHelperInfo IndexHelperInfo) error { +func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, input []types.Datum, indexHelperInfo indexHelperInfo) error { for i, decodedMutationDatum := range indexData { expectedDatum := input[indexHelperInfo.indexInfo.Columns[i].Offset] @@ -214,9 +215,9 @@ func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, i } // ExtractRowMutations extracts row mutations and classify them into 2 categories: put and delete -func ExtractRowMutations(mutations []Mutation) ([]Mutation, []Mutation) { - handlesAdded := make([]Mutation, 0) - handlesRemoved := make([]Mutation, 0) +func ExtractRowMutations(mutations []mutation) ([]mutation, []mutation) { + handlesAdded := make([]mutation, 0) + handlesRemoved := make([]mutation, 0) // TODO: assumption: value in mem buffer for _, m := range mutations { if rowcodec.IsRowKey(m.key) { From be15c781d59e14a991a146c99a6ac6f04ce759f1 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 12 Aug 2021 23:44:48 +0800 Subject: [PATCH 06/21] skip when sh == 0 Signed-off-by: ekexium --- table/tables/mutation_checker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 845cb34760632..51c9d2ecbdb24 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -53,6 +53,9 @@ type indexHelperInfo = struct { // the mutations, thus ignored. func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { + if sh == 0 { + return nil + } mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh) if err := checkRowValues(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { return errors.Trace(err) From dca9d75fb42d22397b7dad9c9541e2b99d76b35c Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 1 Sep 2021 15:51:13 +0800 Subject: [PATCH 07/21] check in RemoveRecord Signed-off-by: ekexium --- table/tables/mutation_checker.go | 20 +++++++++----------- table/tables/tables.go | 17 ++++++++++++++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 51c9d2ecbdb24..6eb7e866282a9 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -57,7 +57,7 @@ func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.Sess return nil } mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh) - if err := checkRowValues(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { + if err := checkRowAdditionConsistency(sc, sessVars, t, dataAdded, mutations); err != nil { return errors.Trace(err) } if err := checkIndexKeys(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { @@ -125,14 +125,14 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars return nil } -// checkRowValues checks whether the values of row mutations are consistent with the expected ones -func checkRowValues(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded, dataRemoved []types.Datum, mutations []mutation) error { - rowsAdded, rowsRemoved := ExtractRowMutations(mutations) - if len(rowsAdded) > 1 || len(rowsRemoved) > 1 { +// checkRowAdditionConsistency checks whether the values of row mutations are consistent with the expected ones +// We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) +func checkRowAdditionConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, + dataAdded []types.Datum, mutations []mutation) error { + rowsAdded, _ := ExtractRowMutations(mutations) + if len(rowsAdded) > 1 { // TODO: is it possible? - logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded), - zap.Any("rowsRemoved", rowsRemoved)) + logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded)) return errors.New("multiple row mutations added/mutated") } @@ -146,15 +146,13 @@ func checkRowValues(sc *stmtctx.StatementContext, sessVars *variable.SessionVars if err := checkRowMutationsWithData(sc, sessVars, rowsAdded, columnFieldMap, dataAdded, columnMap); err != nil { return errors.Trace(err) } - if err := checkRowMutationsWithData(sc, sessVars, rowsRemoved, columnFieldMap, dataRemoved, columnMap); err != nil { - return errors.Trace(err) - } return nil } func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, rowMutations []mutation, columnFieldMap map[int64]*types.FieldType, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { if len(rowMutations) > 0 { + // FIXME: len(value) == 0 (delete) decodedData, err := tablecodec.DecodeRowToDatumMap(rowMutations[0].value, columnFieldMap, sessVars.Location()) if err != nil { return errors.Trace(err) diff --git a/table/tables/tables.go b/table/tables/tables.go index ae1434471e652..0970443afc5fa 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -1054,15 +1054,20 @@ func GetChangingColVal(ctx sessionctx.Context, cols []*table.Column, col *table. // RemoveRecord implements table.Table RemoveRecord interface. func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { - err := t.removeRowData(ctx, h) + txn, err := ctx.Txn(true) if err != nil { return err } - txn, err := ctx.Txn(true) + memBuffer := txn.GetMemBuffer() + sh := memBuffer.Staging() + defer memBuffer.Cleanup(sh) + + err = t.removeRowData(ctx, h) if err != nil { return err } + if m := t.Meta(); m.TempTableType != model.TempTableNone { if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil { if err := checkTempTableSize(ctx, tmpTable, m); err != nil { @@ -1090,6 +1095,13 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } + sessVars := ctx.GetSessionVars() + sc := sessVars.StmtCtx + if err = CheckIndexConsistency(sc, sessVars, t, nil, r, memBuffer, sh); err != nil { + return errors.Trace(err) + } + memBuffer.Release(sh) + if shouldWriteBinlog(ctx, t.meta) { cols := t.Cols() colIDs := make([]int64, 0, len(cols)+1) @@ -1115,7 +1127,6 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return nil } colSize := make(map[int64]int64, len(t.Cols())) - sc := ctx.GetSessionVars().StmtCtx for id, col := range t.Cols() { size, err := codec.EstimateValueSize(sc, r[id]) if err != nil { From ee5a80191b021c0db0f1fc90e98752797708b57a Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 1 Sep 2021 16:05:32 +0800 Subject: [PATCH 08/21] modify license header Signed-off-by: ekexium --- table/tables/mutation_checker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 6eb7e866282a9..488799731a688 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -8,6 +8,7 @@ // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. From ada4dc50cc80eff22fbf665f0266a12ac0589c5e Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 9 Sep 2021 00:36:51 +0800 Subject: [PATCH 09/21] test: add unit tests Signed-off-by: ekexium --- table/tables/mutation_checker.go | 104 +++++++------ table/tables/mutation_checker_test.go | 201 ++++++++++++++++++++++++++ table/tables/tables.go | 6 +- 3 files changed, 263 insertions(+), 48 deletions(-) create mode 100644 table/tables/mutation_checker_test.go diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 488799731a688..ebb64c2407cbe 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -17,6 +17,8 @@ package tables import ( "fmt" + "github.com/pingcap/tidb/table" + "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" @@ -52,13 +54,14 @@ type indexHelperInfo = struct { // (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value // (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate // the mutations, thus ignored. -func CheckIndexConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, +func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { + sc := sessVars.StmtCtx if sh == 0 { return nil } mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh) - if err := checkRowAdditionConsistency(sc, sessVars, t, dataAdded, mutations); err != nil { + if err := checkRowAdditionConsistency(sessVars, t.Meta().Columns, dataAdded, mutations); err != nil { return errors.Trace(err) } if err := checkIndexKeys(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { @@ -115,9 +118,9 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } if len(m.value) == 0 { - err = compareIndexData(sc, t, indexData, dataRemoved, indexHelperInfo) + err = compareIndexData(sc, t.Columns, indexData, dataRemoved, indexHelperInfo.indexInfo) } else { - err = compareIndexData(sc, t, indexData, dataAdded, indexHelperInfo) + err = compareIndexData(sc, t.Columns, indexData, dataAdded, indexHelperInfo.indexInfo) } if err != nil { return errors.Trace(err) @@ -128,9 +131,15 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars // checkRowAdditionConsistency checks whether the values of row mutations are consistent with the expected ones // We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) -func checkRowAdditionConsistency(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded []types.Datum, mutations []mutation) error { - rowsAdded, _ := ExtractRowMutations(mutations) +func checkRowAdditionConsistency(sessVars *variable.SessionVars, tableColumns []*model.ColumnInfo, dataAdded []types.Datum, mutations []mutation) error { + rowsAdded, _ := extractRowMutations(mutations) + if dataAdded == nil { + // should be unreachable + return nil + } + if len(rowsAdded) == 0 { + return errors.New("record mutations not found for a put of a row") + } if len(rowsAdded) > 1 { // TODO: is it possible? logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded)) @@ -138,44 +147,46 @@ func checkRowAdditionConsistency(sc *stmtctx.StatementContext, sessVars *variabl } columnMap := make(map[int64]*model.ColumnInfo) - columnFieldMap := make(map[int64]*types.FieldType) - for _, col := range t.Meta().Columns { + for _, col := range tableColumns { columnMap[col.ID] = col - columnFieldMap[col.ID] = &col.FieldType } - if err := checkRowMutationsWithData(sc, sessVars, rowsAdded, columnFieldMap, dataAdded, columnMap); err != nil { + if err := checkRowMutationWithData(sessVars, rowsAdded[0].value, dataAdded, columnMap); err != nil { return errors.Trace(err) } return nil } -func checkRowMutationsWithData(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, rowMutations []mutation, - columnFieldMap map[int64]*types.FieldType, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { - if len(rowMutations) > 0 { - // FIXME: len(value) == 0 (delete) - decodedData, err := tablecodec.DecodeRowToDatumMap(rowMutations[0].value, columnFieldMap, sessVars.Location()) +// checkRowMutationWithData checks if the given row mutation is consistent with the expected one +// precondition: expectedData contains all fields in order +func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []byte, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { + columnFieldMap := make(map[int64]*types.FieldType) + for id, col := range columnMap { + columnFieldMap[id] = &col.FieldType + } + decodedData, err := tablecodec.DecodeRowToDatumMap(mutationValue, columnFieldMap, sessVars.Location()) + // NOTE: decodedData can be empty + if err != nil { + return errors.Trace(err) + } + + // NOTE: we cannot check if the decoded values contain all columns since some columns may be skipped. It can even be empty + // Instead, we check that decoded index values are consistent with the input row. + + for columnID, decodedDatum := range decodedData { + inputDatum := expectedData[columnMap[columnID].Offset] + cmp, err := decodedDatum.CompareDatum(sessVars.StmtCtx, &inputDatum) if err != nil { return errors.Trace(err) } - - // TODO: we cannot check if the decoded values contain all columns since some columns may be skipped. - // Instead we check data in the value are consistent with input. - - for columnID, decodedDatum := range decodedData { - inputDatum := expectedData[columnMap[columnID].Offset] - cmp, err := decodedDatum.CompareDatum(sc, &inputDatum) - if err != nil { - return errors.Trace(err) - } - if cmp != 0 { - logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), - zap.String("input datum", inputDatum.String())) - return errors.New(fmt.Sprintf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", - decodedDatum.String(), inputDatum.String())) - } + if cmp != 0 { + logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), + zap.String("input datum", inputDatum.String())) + return errors.New(fmt.Sprintf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", + decodedDatum.String(), inputDatum.String())) } } + return nil } @@ -191,15 +202,18 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer return mutations } -func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, input []types.Datum, indexHelperInfo indexHelperInfo) error { +// compareIndexData compares the decoded index data with the input data. +// Returns error if the index data is not a subset of the input data. +func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo) error { for i, decodedMutationDatum := range indexData { - expectedDatum := input[indexHelperInfo.indexInfo.Columns[i].Offset] + expectedDatum := input[indexInfo.Columns[i].Offset] - tablecodec.TruncateIndexValue(&expectedDatum, indexHelperInfo.indexInfo.Columns[i], - t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) + tablecodec.TruncateIndexValue(&expectedDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo) - tablecodec.TruncateIndexValue(&decodedMutationDatum, indexHelperInfo.indexInfo.Columns[i], - t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].ColumnInfo) + // TODO: no need to truncate index data? + //tablecodec.TruncateIndexValue(&decodedMutationDatum, indexInfo.Columns[i], + // cols[indexInfo.Columns[i].Offset].ColumnInfo) comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) if err != nil { @@ -216,19 +230,19 @@ func compareIndexData(sc *stmtctx.StatementContext, t *TableCommon, indexData, i return nil } -// ExtractRowMutations extracts row mutations and classify them into 2 categories: put and delete -func ExtractRowMutations(mutations []mutation) ([]mutation, []mutation) { - handlesAdded := make([]mutation, 0) - handlesRemoved := make([]mutation, 0) +// extractRowMutations extracts row mutations and classify them into 2 categories: put and delete +func extractRowMutations(mutations []mutation) ([]mutation, []mutation) { + insertions := make([]mutation, 0) + deletions := make([]mutation, 0) // TODO: assumption: value in mem buffer for _, m := range mutations { if rowcodec.IsRowKey(m.key) { if len(m.value) > 0 { - handlesAdded = append(handlesAdded, m) + insertions = append(insertions, m) } else { - handlesRemoved = append(handlesRemoved, m) + deletions = append(deletions, m) } } } - return handlesAdded, handlesRemoved + return insertions, deletions } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go new file mode 100644 index 0000000000000..3508f48151e89 --- /dev/null +++ b/table/tables/mutation_checker_test.go @@ -0,0 +1,201 @@ +package tables + +import ( + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/rowcodec" + "github.com/stretchr/testify/require" + "testing" +) + +func TestCompareIndexData(t *testing.T) { + // dimensions of the domain of compareIndexData + // 1. table structure, where we only care about column types that influence truncating values + // 2. comparison of row data & index data + + type caseData = struct { + indexData []types.Datum + inputData []types.Datum + fts []*types.FieldType + indexLength []int + correct bool + } + + // assume the index is on all columns + testData := []caseData{ + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, types.UnspecifiedLength}, + true, + }, + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string2")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, types.UnspecifiedLength}, + false, + }, + { + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string")}, + []types.Datum{types.NewIntDatum(1), types.NewStringDatum("some string2")}, + []*types.FieldType{types.NewFieldType(mysql.TypeShort), types.NewFieldType(mysql.TypeString)}, + []int{types.UnspecifiedLength, 11}, + true, + }, + } + + for caseID, data := range testData { + sc := &stmtctx.StatementContext{} + cols := make([]*table.Column, 0) + indexCols := make([]*model.IndexColumn, 0) + for i, ft := range data.fts { + cols = append(cols, &table.Column{ColumnInfo: &model.ColumnInfo{FieldType: *ft}}) + indexCols = append(indexCols, &model.IndexColumn{Offset: i, Length: data.indexLength[i]}) + } + indexInfo := &model.IndexInfo{Columns: indexCols} + + err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo) + require.Equal(t, data.correct, err == nil, "case id = %v", caseID) + } +} + +func TestCheckRowAdditionConsistency(t *testing.T) { + sessVars := variable.NewSessionVars() + rd := rowcodec.Encoder{Enable: true} + + // mocked data + mockRowKey := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(1)) + mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233)) + mockValue233, err := tablecodec.EncodeRow(sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd) + require.Nil(t, err) + fakeMutations := []mutation{ + {key: []byte{1, 1}, value: []byte{1, 1, 1}}, + } + type caseData = struct { + tableColumns []*model.ColumnInfo + inputRow []types.Datum + mutations []mutation + autoEncode bool // encode mutations from row and cols, then append the result to the mutations + correct bool + } + + testData := []caseData{ + { // no mutations + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + []types.Datum{types.NewIntDatum(1)}, + nil, + false, + false, + }, + { // no corresponding mutation + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + []types.Datum{types.NewIntDatum(1)}, + fakeMutations, + false, + false, + }, + { + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + []types.Datum{types.NewIntDatum(1)}, + fakeMutations, + true, + true, + }, + { // no input row + []*model.ColumnInfo{}, + nil, + fakeMutations, + true, + true, + }, + { // duplicated mutation + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + []types.Datum{types.NewIntDatum(233)}, + []mutation{ + {key: mockRowKey233, value: mockValue233}, + }, + true, + false, + }, + { // different value + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeInt24), + }, + }, + []types.Datum{types.NewIntDatum(1)}, + []mutation{ + {key: mockRowKey233, value: mockValue233}, + }, + false, + false, + }, + { // invalid value + []*model.ColumnInfo{ + { + ID: 101, + Offset: 0, + FieldType: *types.NewFieldType(mysql.TypeShort), + }, + }, + []types.Datum{types.NewIntDatum(233)}, + []mutation{ + {key: mockRowKey233, value: []byte{0, 1, 2, 3}}, + }, + false, + false, + }, + } + + for caseID, data := range testData { + if data.autoEncode { + value, err := tablecodec.EncodeRow(sessVars.StmtCtx, data.inputRow, columnsToIDs(data.tableColumns), nil, nil, &rd) + require.Nil(t, err) + data.mutations = append(data.mutations, mutation{key: mockRowKey, value: value}) + } + err := checkRowAdditionConsistency(sessVars, data.tableColumns, data.inputRow, data.mutations) + require.Equal(t, data.correct, err == nil, "case id = %v", caseID) + } +} + +func columnsToIDs(columns []*model.ColumnInfo) []int64 { + colIDs := make([]int64, 0, len(columns)) + for _, col := range columns { + colIDs = append(colIDs, col.ID) + } + return colIDs +} diff --git a/table/tables/tables.go b/table/tables/tables.go index 0970443afc5fa..e141d8cc812db 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -420,7 +420,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } - if err = CheckIndexConsistency(sc, sessVars, t, newData, oldData, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(sessVars, t, newData, oldData, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) @@ -839,7 +839,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } - if err = CheckIndexConsistency(sc, sessVars, t, r, nil, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(sessVars, t, r, nil, memBuffer, sh); err != nil { return nil, errors.Trace(err) } @@ -1097,7 +1097,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type sessVars := ctx.GetSessionVars() sc := sessVars.StmtCtx - if err = CheckIndexConsistency(sc, sessVars, t, nil, r, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(sessVars, t, nil, r, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) From 5c1483abf4d46675106d38fbad773acb3e0224de Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 9 Sep 2021 10:18:23 +0800 Subject: [PATCH 10/21] add lisence header Signed-off-by: ekexium --- config/config.toml | 7 +++++++ table/tables/mutation_checker_test.go | 14 ++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 config/config.toml diff --git a/config/config.toml b/config/config.toml new file mode 100644 index 0000000000000..3fc28cd644e05 --- /dev/null +++ b/config/config.toml @@ -0,0 +1,7 @@ + +enable-table-lock = true + +enable-telemetry = false + +[security] +spilled-file-encryption-method = "aes128-ctr" diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 3508f48151e89..bd74713205c56 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -1,3 +1,17 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tables import ( From 4e72298232c003293523b8fc54822e57e8b8c4a5 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 9 Sep 2021 10:28:53 +0800 Subject: [PATCH 11/21] also truncate decoded mutation Signed-off-by: ekexium --- config/config.toml | 7 ------- table/tables/mutation_checker.go | 6 ++---- table/tables/mutation_checker_test.go | 3 ++- 3 files changed, 4 insertions(+), 12 deletions(-) delete mode 100644 config/config.toml diff --git a/config/config.toml b/config/config.toml deleted file mode 100644 index 3fc28cd644e05..0000000000000 --- a/config/config.toml +++ /dev/null @@ -1,7 +0,0 @@ - -enable-table-lock = true - -enable-telemetry = false - -[security] -spilled-file-encryption-method = "aes128-ctr" diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index ebb64c2407cbe..b47fefe2f0373 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -210,10 +210,8 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD tablecodec.TruncateIndexValue(&expectedDatum, indexInfo.Columns[i], cols[indexInfo.Columns[i].Offset].ColumnInfo) - - // TODO: no need to truncate index data? - //tablecodec.TruncateIndexValue(&decodedMutationDatum, indexInfo.Columns[i], - // cols[indexInfo.Columns[i].Offset].ColumnInfo) + tablecodec.TruncateIndexValue(&decodedMutationDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo) comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) if err != nil { diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index bd74713205c56..100d09e14dff5 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -15,6 +15,8 @@ package tables import ( + "testing" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" @@ -25,7 +27,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/rowcodec" "github.com/stretchr/testify/require" - "testing" ) func TestCompareIndexData(t *testing.T) { From 9e01dca51bb34e2579e90dc8b747e024ebd2fc12 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 9 Sep 2021 12:13:37 +0800 Subject: [PATCH 12/21] tidy up Signed-off-by: ekexium --- table/tables/mutation_checker.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index b47fefe2f0373..6ae37626f875a 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -141,7 +141,7 @@ func checkRowAdditionConsistency(sessVars *variable.SessionVars, tableColumns [] return errors.New("record mutations not found for a put of a row") } if len(rowsAdded) > 1 { - // TODO: is it possible? + // impossible to have multiple insertions of records when modifying a single row of in a table logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded)) return errors.New("multiple row mutations added/mutated") } @@ -165,7 +165,6 @@ func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []by columnFieldMap[id] = &col.FieldType } decodedData, err := tablecodec.DecodeRowToDatumMap(mutationValue, columnFieldMap, sessVars.Location()) - // NOTE: decodedData can be empty if err != nil { return errors.Trace(err) } @@ -193,7 +192,7 @@ func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []by func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []mutation { mutations := make([]mutation, 0) inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { - // TODO: shall we check only the current table, or all tables involved? + // only check the current table if tablecodec.DecodeTableID(key) == t.physicalTableID { mutations = append(mutations, mutation{key, flags, data}) } @@ -232,7 +231,6 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD func extractRowMutations(mutations []mutation) ([]mutation, []mutation) { insertions := make([]mutation, 0) deletions := make([]mutation, 0) - // TODO: assumption: value in mem buffer for _, m := range mutations { if rowcodec.IsRowKey(m.key) { if len(m.value) > 0 { From 1aab79ea20410316101a5db817ca974807538c97 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 Sep 2021 19:21:35 +0800 Subject: [PATCH 13/21] refactor according to comments Signed-off-by: ekexium Auto stash before rebase of "ft-data-inconsistency" --- table/tables/mutation_checker.go | 103 +++++++++++++------------- table/tables/mutation_checker_test.go | 97 ++++-------------------- 2 files changed, 67 insertions(+), 133 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 6ae37626f875a..723f3bf6f1856 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -17,13 +17,12 @@ package tables import ( "fmt" - "github.com/pingcap/tidb/table" - "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" @@ -31,13 +30,13 @@ import ( "go.uber.org/zap" ) -type mutation = struct { +type mutation struct { key kv.Key flags kv.KeyFlags value []byte } -type indexHelperInfo = struct { +type indexHelperInfo struct { indexInfo *model.IndexInfo rowColInfos []rowcodec.ColInfo } @@ -55,16 +54,25 @@ type indexHelperInfo = struct { // (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate // the mutations, thus ignored. func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, - dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { + rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { + if t.Meta().GetPartitionInfo() != nil { + return nil + } sc := sessVars.StmtCtx if sh == 0 { - return nil + logutil.BgLogger().Error("No membuffer stage when checking index consistency") + return errors.New("No membuffer stage when checking index consistency") } - mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh) - if err := checkRowAdditionConsistency(sessVars, t.Meta().Columns, dataAdded, mutations); err != nil { + indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) + if err != nil { return errors.Trace(err) } - if err := checkIndexKeys(sc, sessVars, t, dataAdded, dataRemoved, mutations); err != nil { + if rowToInsert != nil { + if err := checkRowInsertionConsistency(sessVars, t.Meta().Columns, rowToInsert, rowInsertion); err != nil { + return errors.Trace(err) + } + } + if err := checkIndexKeys(sc, sessVars, t, rowToInsert, rowToRemove, indexMutations); err != nil { return errors.Trace(err) } // TODO: check whether handles match in index and row mutations @@ -73,23 +81,23 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded []types.Datum, dataRemoved []types.Datum, mutations []mutation) error { + dataAdded []types.Datum, dataRemoved []types.Datum, indexMutations []mutation) error { indexIDMap := make(map[int64]indexHelperInfo) for _, index := range t.indices { + if index.Meta().Primary && t.meta.IsCommonHandle { + continue + } indexIDMap[index.Meta().ID] = indexHelperInfo{ index.Meta(), BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), } } - for _, m := range mutations { - if !tablecodec.IsIndexKey(m.key) { - continue - } - + for _, m := range indexMutations { _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) if err != nil { - continue + //FIXME: continue or return error? + return errors.Trace(err) } indexHelperInfo, ok := indexIDMap[indexID] @@ -129,29 +137,20 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars return nil } -// checkRowAdditionConsistency checks whether the values of row mutations are consistent with the expected ones +// checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones // We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) -func checkRowAdditionConsistency(sessVars *variable.SessionVars, tableColumns []*model.ColumnInfo, dataAdded []types.Datum, mutations []mutation) error { - rowsAdded, _ := extractRowMutations(mutations) - if dataAdded == nil { - // should be unreachable +func checkRowInsertionConsistency(sessVars *variable.SessionVars, tableColumns []*model.ColumnInfo, rowToInsert []types.Datum, rowInsertion mutation) error { + if rowToInsert == nil { + // it's a deletion return nil } - if len(rowsAdded) == 0 { - return errors.New("record mutations not found for a put of a row") - } - if len(rowsAdded) > 1 { - // impossible to have multiple insertions of records when modifying a single row of in a table - logutil.BgLogger().Error("multiple row mutations added/mutated", zap.Any("rowsAdded", rowsAdded)) - return errors.New("multiple row mutations added/mutated") - } columnMap := make(map[int64]*model.ColumnInfo) for _, col := range tableColumns { columnMap[col.ID] = col } - if err := checkRowMutationWithData(sessVars, rowsAdded[0].value, dataAdded, columnMap); err != nil { + if err := checkRowMutationWithData(sessVars, rowInsertion.value, rowToInsert, columnMap); err != nil { return errors.Trace(err) } return nil @@ -181,24 +180,40 @@ func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []by if cmp != 0 { logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), zap.String("input datum", inputDatum.String())) - return errors.New(fmt.Sprintf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", - decodedDatum.String(), inputDatum.String())) + return errors.Errorf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), inputDatum.String()) } } return nil } -func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []mutation { - mutations := make([]mutation, 0) +// collectTableMutationsFromBufferStage collects mutations of the current table from the mem buffer stage +// It returns: (1) all index mutations (2) the only row insertion +// If there are no row insertions, the 2nd returned value is nil +// If there are multiple row insertions, an error is returned +func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ([]mutation, mutation, error) { + indexMutations := make([]mutation, 0) + var rowInsertion mutation + var err error inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { // only check the current table if tablecodec.DecodeTableID(key) == t.physicalTableID { - mutations = append(mutations, mutation{key, flags, data}) + m := mutation{key, flags, data} + if rowcodec.IsRowKey(key) { + if len(data) > 0 { + if rowInsertion.key == nil { + rowInsertion = m + } else { + err = errors.Errorf("multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m) + } + } + } else { + indexMutations = append(indexMutations, m) + } } } memBuffer.InspectStage(sh, inspector) - return mutations + return indexMutations, rowInsertion, err } // compareIndexData compares the decoded index data with the input data. @@ -226,19 +241,3 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD } return nil } - -// extractRowMutations extracts row mutations and classify them into 2 categories: put and delete -func extractRowMutations(mutations []mutation) ([]mutation, []mutation) { - insertions := make([]mutation, 0) - deletions := make([]mutation, 0) - for _, m := range mutations { - if rowcodec.IsRowKey(m.key) { - if len(m.value) > 0 { - insertions = append(insertions, m) - } else { - deletions = append(deletions, m) - } - } - } - return insertions, deletions -} diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 100d09e14dff5..0ce8e7c8d6ba3 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -34,7 +34,7 @@ func TestCompareIndexData(t *testing.T) { // 1. table structure, where we only care about column types that influence truncating values // 2. comparison of row data & index data - type caseData = struct { + type caseData struct { indexData []types.Datum inputData []types.Datum fts []*types.FieldType @@ -82,28 +82,25 @@ func TestCompareIndexData(t *testing.T) { } } -func TestCheckRowAdditionConsistency(t *testing.T) { +func TestCheckRowInsertionConsistency(t *testing.T) { sessVars := variable.NewSessionVars() rd := rowcodec.Encoder{Enable: true} // mocked data - mockRowKey := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(1)) mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233)) mockValue233, err := tablecodec.EncodeRow(sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd) require.Nil(t, err) - fakeMutations := []mutation{ - {key: []byte{1, 1}, value: []byte{1, 1, 1}}, - } - type caseData = struct { + fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}} + + type caseData struct { tableColumns []*model.ColumnInfo - inputRow []types.Datum - mutations []mutation - autoEncode bool // encode mutations from row and cols, then append the result to the mutations + rowToInsert []types.Datum + rowInsertion mutation correct bool } testData := []caseData{ - { // no mutations + { // expected correct behavior []*model.ColumnInfo{ { ID: 101, @@ -111,12 +108,11 @@ func TestCheckRowAdditionConsistency(t *testing.T) { FieldType: *types.NewFieldType(mysql.TypeShort), }, }, - []types.Datum{types.NewIntDatum(1)}, - nil, - false, - false, + []types.Datum{types.NewIntDatum(233)}, + mutation{key: mockRowKey233, value: mockValue233}, + true, }, - { // no corresponding mutation + { // mismatching mutation []*model.ColumnInfo{ { ID: 101, @@ -125,60 +121,15 @@ func TestCheckRowAdditionConsistency(t *testing.T) { }, }, []types.Datum{types.NewIntDatum(1)}, - fakeMutations, - false, + fakeRowInsertion, false, }, - { - []*model.ColumnInfo{ - { - ID: 101, - Offset: 0, - FieldType: *types.NewFieldType(mysql.TypeShort), - }, - }, - []types.Datum{types.NewIntDatum(1)}, - fakeMutations, - true, - true, - }, { // no input row []*model.ColumnInfo{}, nil, - fakeMutations, - true, + fakeRowInsertion, true, }, - { // duplicated mutation - []*model.ColumnInfo{ - { - ID: 101, - Offset: 0, - FieldType: *types.NewFieldType(mysql.TypeShort), - }, - }, - []types.Datum{types.NewIntDatum(233)}, - []mutation{ - {key: mockRowKey233, value: mockValue233}, - }, - true, - false, - }, - { // different value - []*model.ColumnInfo{ - { - ID: 101, - Offset: 0, - FieldType: *types.NewFieldType(mysql.TypeInt24), - }, - }, - []types.Datum{types.NewIntDatum(1)}, - []mutation{ - {key: mockRowKey233, value: mockValue233}, - }, - false, - false, - }, { // invalid value []*model.ColumnInfo{ { @@ -188,29 +139,13 @@ func TestCheckRowAdditionConsistency(t *testing.T) { }, }, []types.Datum{types.NewIntDatum(233)}, - []mutation{ - {key: mockRowKey233, value: []byte{0, 1, 2, 3}}, - }, - false, + mutation{key: mockRowKey233, value: []byte{0, 1, 2, 3}}, false, }, } for caseID, data := range testData { - if data.autoEncode { - value, err := tablecodec.EncodeRow(sessVars.StmtCtx, data.inputRow, columnsToIDs(data.tableColumns), nil, nil, &rd) - require.Nil(t, err) - data.mutations = append(data.mutations, mutation{key: mockRowKey, value: value}) - } - err := checkRowAdditionConsistency(sessVars, data.tableColumns, data.inputRow, data.mutations) + err := checkRowInsertionConsistency(sessVars, data.tableColumns, data.rowToInsert, data.rowInsertion) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } - -func columnsToIDs(columns []*model.ColumnInfo) []int64 { - colIDs := make([]int64, 0, len(columns)) - for _, col := range columns { - colIDs = append(colIDs, col.ID) - } - return colIDs -} From 836f4e3e048b45c8f13049a7832088469712d436 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 Sep 2021 13:57:18 +0800 Subject: [PATCH 14/21] skip partitioned table Signed-off-by: ekexium --- table/tables/mutation_checker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 723f3bf6f1856..3c07009398a05 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -45,7 +45,11 @@ type indexHelperInfo struct { // Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. // It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. // -// Assume the set of row values changes from V1 to V2. We check +// The check doesn't work and just returns nil when: +// (1) the table is partitioned +// (2) new collation is enabled and restored data is needed +// +// Assume the set of row values changes from V1 to V2, we check // (1) V2 - V1 = {added indices} // (2) V1 - V2 = {deleted indices} // From 5eeb405691ab1a7dbc2d0edcd40333885aed166c Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 Sep 2021 20:59:45 +0800 Subject: [PATCH 15/21] save columnMap in stmtctx --- sessionctx/stmtctx/stmtctx.go | 3 +++ sessionctx/variable/session.go | 2 +- table/tables/mutation_checker.go | 29 ++++++++++++++++++++------- table/tables/mutation_checker_test.go | 18 ++++++++--------- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 0689c4551b197..4ce9ef7a1b7ff 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -186,6 +186,9 @@ type StatementContext struct { OptimInfo map[int]string // InVerboseExplain indicates the statement is "explain format='verbose' ...". InVerboseExplain bool + + // columnMap maps tableID -> ColID -> ColumnInfo. It saves redundant computations when checking data consistency. + ColumnMap map[int64]map[int64]*model.ColumnInfo } // StmtHints are SessionVars related sql hints. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c6bd9c718851..af8a7d5bf900d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -947,7 +947,7 @@ type SessionVars struct { // MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash. MPPStoreFailTTL string - // cached is used to optimze the object allocation. + // cached is used to optimize the object allocation. cached struct { curr int8 data [2]stmtctx.StatementContext diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 3c07009398a05..c153c66794fd5 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -49,6 +49,8 @@ type indexHelperInfo struct { // (1) the table is partitioned // (2) new collation is enabled and restored data is needed // +// How it works: +// // Assume the set of row values changes from V1 to V2, we check // (1) V2 - V1 = {added indices} // (2) V1 - V2 = {deleted indices} @@ -71,8 +73,9 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, if err != nil { return errors.Trace(err) } + if rowToInsert != nil { - if err := checkRowInsertionConsistency(sessVars, t.Meta().Columns, rowToInsert, rowInsertion); err != nil { + if err := checkRowInsertionConsistency(sessVars, getOrBuildColumnMap(sessVars.StmtCtx, t), rowToInsert, rowInsertion); err != nil { return errors.Trace(err) } } @@ -83,6 +86,23 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, return nil } +// getOrBuildColumnMap tries to get the column map from stmt ctx. If there isn't one, it builds one and stores it. +// It saves redundant computations of the map. +func getOrBuildColumnMap(sc *stmtctx.StatementContext, t *TableCommon) map[int64]*model.ColumnInfo { + if sc.ColumnMap == nil { + sc.ColumnMap = make(map[int64]map[int64]*model.ColumnInfo) + } + columnMap, ok := sc.ColumnMap[t.tableID] + if !ok { + columnMap = make(map[int64]*model.ColumnInfo) + for _, col := range t.Meta().Columns { + columnMap[col.ID] = col + } + sc.ColumnMap[t.tableID] = columnMap + } + return columnMap +} + // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, dataAdded []types.Datum, dataRemoved []types.Datum, indexMutations []mutation) error { @@ -143,17 +163,12 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars // checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones // We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) -func checkRowInsertionConsistency(sessVars *variable.SessionVars, tableColumns []*model.ColumnInfo, rowToInsert []types.Datum, rowInsertion mutation) error { +func checkRowInsertionConsistency(sessVars *variable.SessionVars, columnMap map[int64]*model.ColumnInfo, rowToInsert []types.Datum, rowInsertion mutation) error { if rowToInsert == nil { // it's a deletion return nil } - columnMap := make(map[int64]*model.ColumnInfo) - for _, col := range tableColumns { - columnMap[col.ID] = col - } - if err := checkRowMutationWithData(sessVars, rowInsertion.value, rowToInsert, columnMap); err != nil { return errors.Trace(err) } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 0ce8e7c8d6ba3..48e7745f92fa7 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -93,7 +93,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) { fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}} type caseData struct { - tableColumns []*model.ColumnInfo + columnMap map[int64]*model.ColumnInfo rowToInsert []types.Datum rowInsertion mutation correct bool @@ -101,8 +101,8 @@ func TestCheckRowInsertionConsistency(t *testing.T) { testData := []caseData{ { // expected correct behavior - []*model.ColumnInfo{ - { + map[int64]*model.ColumnInfo{ + 101: { ID: 101, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeShort), @@ -113,8 +113,8 @@ func TestCheckRowInsertionConsistency(t *testing.T) { true, }, { // mismatching mutation - []*model.ColumnInfo{ - { + map[int64]*model.ColumnInfo{ + 101: { ID: 101, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeShort), @@ -125,14 +125,14 @@ func TestCheckRowInsertionConsistency(t *testing.T) { false, }, { // no input row - []*model.ColumnInfo{}, + map[int64]*model.ColumnInfo{}, nil, fakeRowInsertion, true, }, { // invalid value - []*model.ColumnInfo{ - { + map[int64]*model.ColumnInfo{ + 101: { ID: 101, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeShort), @@ -145,7 +145,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } for caseID, data := range testData { - err := checkRowInsertionConsistency(sessVars, data.tableColumns, data.rowToInsert, data.rowInsertion) + err := checkRowInsertionConsistency(sessVars, data.columnMap, data.rowToInsert, data.rowInsertion) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } From 6431cbec3fb80763afca578c49f0ef9d83a2d708 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 15 Sep 2021 13:22:28 +0800 Subject: [PATCH 16/21] skip when sh == 0 Some implementations of MemBuffer doesn't support staging. We don't care about them for now --- table/tables/mutation_checker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index c153c66794fd5..fd1c73a21ea8b 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -66,8 +66,8 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, } sc := sessVars.StmtCtx if sh == 0 { - logutil.BgLogger().Error("No membuffer stage when checking index consistency") - return errors.New("No membuffer stage when checking index consistency") + // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv + return nil } indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) if err != nil { From 2ee348cda0432db18e247d476fceea8934fcde72 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 16 Sep 2021 16:09:21 +0800 Subject: [PATCH 17/21] reuse a slice in a loop --- table/tables/mutation_checker.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index fd1c73a21ea8b..95103ee24bb30 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -105,7 +105,7 @@ func getOrBuildColumnMap(sc *stmtctx.StatementContext, t *TableCommon) map[int64 // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - dataAdded []types.Datum, dataRemoved []types.Datum, indexMutations []mutation) error { + rowToInsert, rowToRemove []types.Datum, indexMutations []mutation) error { indexIDMap := make(map[int64]indexHelperInfo) for _, index := range t.indices { if index.Meta().Primary && t.meta.IsCommonHandle { @@ -117,6 +117,7 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } } + var indexData []types.Datum for _, m := range indexMutations { _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) if err != nil { @@ -139,7 +140,14 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars if err != nil { return errors.Trace(err) } - indexData := make([]types.Datum, 0) + + // reuse the underlying memory, save an allocation + if indexData == nil { + indexData = make([]types.Datum, 0, len(decodedIndexValues)) + } else { + indexData = indexData[:0] + } + for i, v := range decodedIndexValues { fieldType := &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) @@ -150,9 +158,9 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } if len(m.value) == 0 { - err = compareIndexData(sc, t.Columns, indexData, dataRemoved, indexHelperInfo.indexInfo) + err = compareIndexData(sc, t.Columns, indexData, rowToRemove, indexHelperInfo.indexInfo) } else { - err = compareIndexData(sc, t.Columns, indexData, dataAdded, indexHelperInfo.indexInfo) + err = compareIndexData(sc, t.Columns, indexData, rowToInsert, indexHelperInfo.indexInfo) } if err != nil { return errors.Trace(err) From e448d4952e7c2fba4b4a02466a50b70e9aa4056d Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 16 Sep 2021 16:14:43 +0800 Subject: [PATCH 18/21] save reusable maps in stmtctx --- sessionctx/stmtctx/stmtctx.go | 5 +- table/tables/mutation_checker.go | 117 +++++++++++++------------- table/tables/mutation_checker_test.go | 21 +++-- 3 files changed, 78 insertions(+), 65 deletions(-) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 4ce9ef7a1b7ff..1f9f5d68864ea 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -187,8 +187,9 @@ type StatementContext struct { // InVerboseExplain indicates the statement is "explain format='verbose' ...". InVerboseExplain bool - // columnMap maps tableID -> ColID -> ColumnInfo. It saves redundant computations when checking data consistency. - ColumnMap map[int64]map[int64]*model.ColumnInfo + // TableToColumnMaps is a map from tableID to a series of maps. The maps are needed when checking data consistency. + // Save them here to reduce redundant computations. + TableToColumnMaps map[int64]interface{} } // StmtHints are SessionVars related sql hints. diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 95103ee24bb30..2a127faef5b93 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -36,9 +36,11 @@ type mutation struct { value []byte } -type indexHelperInfo struct { - indexInfo *model.IndexInfo - rowColInfos []rowcodec.ColInfo +type columnMaps struct { + ColumnIDToInfo map[int64]*model.ColumnInfo + ColumnIDToFieldType map[int64]*types.FieldType + IndexIDToInfo map[int64]*model.IndexInfo + IndexIDToRowColInfos map[int64][]rowcodec.ColInfo } // CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. @@ -74,48 +76,23 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, return errors.Trace(err) } + columnMaps := getOrBuildColumnMaps(sessVars.StmtCtx, t) + if rowToInsert != nil { - if err := checkRowInsertionConsistency(sessVars, getOrBuildColumnMap(sessVars.StmtCtx, t), rowToInsert, rowInsertion); err != nil { + if err := checkRowInsertionConsistency(sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType); err != nil { return errors.Trace(err) } } - if err := checkIndexKeys(sc, sessVars, t, rowToInsert, rowToRemove, indexMutations); err != nil { + if err := checkIndexKeys(sc, sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos); err != nil { return errors.Trace(err) } // TODO: check whether handles match in index and row mutations return nil } -// getOrBuildColumnMap tries to get the column map from stmt ctx. If there isn't one, it builds one and stores it. -// It saves redundant computations of the map. -func getOrBuildColumnMap(sc *stmtctx.StatementContext, t *TableCommon) map[int64]*model.ColumnInfo { - if sc.ColumnMap == nil { - sc.ColumnMap = make(map[int64]map[int64]*model.ColumnInfo) - } - columnMap, ok := sc.ColumnMap[t.tableID] - if !ok { - columnMap = make(map[int64]*model.ColumnInfo) - for _, col := range t.Meta().Columns { - columnMap[col.ID] = col - } - sc.ColumnMap[t.tableID] = columnMap - } - return columnMap -} - // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. -func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, - rowToInsert, rowToRemove []types.Datum, indexMutations []mutation) error { - indexIDMap := make(map[int64]indexHelperInfo) - for _, index := range t.indices { - if index.Meta().Primary && t.meta.IsCommonHandle { - continue - } - indexIDMap[index.Meta().ID] = indexHelperInfo{ - index.Meta(), - BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()), - } - } +func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, + indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, indexIDToRowColInfos map[int64][]rowcodec.ColInfo) error { var indexData []types.Datum for _, m := range indexMutations { @@ -125,18 +102,22 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars return errors.Trace(err) } - indexHelperInfo, ok := indexIDMap[indexID] + indexInfo, ok := indexIDToInfo[indexID] + if !ok { + return errors.New("index not found") + } + rowColInfos, ok := indexIDToRowColInfos[indexID] if !ok { return errors.New("index not found") } // when we cannot decode the key to get the original value - if len(m.value) == 0 && NeedRestoredData(indexHelperInfo.indexInfo.Columns, t.Meta().Columns) { + if len(m.value) == 0 && NeedRestoredData(indexInfo.Columns, t.Meta().Columns) { continue } - decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns), - tablecodec.HandleNotNeeded, indexHelperInfo.rowColInfos) + decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexInfo.Columns), + tablecodec.HandleNotNeeded, rowColInfos) if err != nil { return errors.Trace(err) } @@ -149,7 +130,7 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } for i, v := range decodedIndexValues { - fieldType := &t.Columns[indexHelperInfo.indexInfo.Columns[i].Offset].FieldType + fieldType := &t.Columns[indexInfo.Columns[i].Offset].FieldType datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) if err != nil { return errors.Trace(err) @@ -158,9 +139,9 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } if len(m.value) == 0 { - err = compareIndexData(sc, t.Columns, indexData, rowToRemove, indexHelperInfo.indexInfo) + err = compareIndexData(sc, t.Columns, indexData, rowToRemove, indexInfo) } else { - err = compareIndexData(sc, t.Columns, indexData, rowToInsert, indexHelperInfo.indexInfo) + err = compareIndexData(sc, t.Columns, indexData, rowToInsert, indexInfo) } if err != nil { return errors.Trace(err) @@ -171,26 +152,14 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars // checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones // We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) -func checkRowInsertionConsistency(sessVars *variable.SessionVars, columnMap map[int64]*model.ColumnInfo, rowToInsert []types.Datum, rowInsertion mutation) error { +func checkRowInsertionConsistency(sessVars *variable.SessionVars, rowToInsert []types.Datum, rowInsertion mutation, + columnIDToInfo map[int64]*model.ColumnInfo, columnIDToFieldType map[int64]*types.FieldType) error { if rowToInsert == nil { // it's a deletion return nil } - if err := checkRowMutationWithData(sessVars, rowInsertion.value, rowToInsert, columnMap); err != nil { - return errors.Trace(err) - } - return nil -} - -// checkRowMutationWithData checks if the given row mutation is consistent with the expected one -// precondition: expectedData contains all fields in order -func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []byte, expectedData []types.Datum, columnMap map[int64]*model.ColumnInfo) error { - columnFieldMap := make(map[int64]*types.FieldType) - for id, col := range columnMap { - columnFieldMap[id] = &col.FieldType - } - decodedData, err := tablecodec.DecodeRowToDatumMap(mutationValue, columnFieldMap, sessVars.Location()) + decodedData, err := tablecodec.DecodeRowToDatumMap(rowInsertion.value, columnIDToFieldType, sessVars.Location()) if err != nil { return errors.Trace(err) } @@ -199,7 +168,7 @@ func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []by // Instead, we check that decoded index values are consistent with the input row. for columnID, decodedDatum := range decodedData { - inputDatum := expectedData[columnMap[columnID].Offset] + inputDatum := rowToInsert[columnIDToInfo[columnID].Offset] cmp, err := decodedDatum.CompareDatum(sessVars.StmtCtx, &inputDatum) if err != nil { return errors.Trace(err) @@ -210,7 +179,6 @@ func checkRowMutationWithData(sessVars *variable.SessionVars, mutationValue []by return errors.Errorf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), inputDatum.String()) } } - return nil } @@ -268,3 +236,36 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD } return nil } + +// getOrBuildColumnMaps tries to get the columnMaps from stmt ctx. If there isn't one, it builds one and stores it. +// It saves redundant computations of the map. +func getOrBuildColumnMaps(sc *stmtctx.StatementContext, t *TableCommon) columnMaps { + if sc.TableToColumnMaps == nil { + sc.TableToColumnMaps = make(map[int64]interface{}) + } + originalMaps, ok := sc.TableToColumnMaps[t.tableID] + if !ok { + maps := columnMaps{ + make(map[int64]*model.ColumnInfo, len(t.Meta().Columns)), + make(map[int64]*types.FieldType, len(t.Meta().Columns)), + make(map[int64]*model.IndexInfo, len(t.Indices())), + make(map[int64][]rowcodec.ColInfo, len(t.Indices())), + } + + for _, col := range t.Meta().Columns { + maps.ColumnIDToInfo[col.ID] = col + maps.ColumnIDToFieldType[col.ID] = &col.FieldType + } + for _, index := range t.Indices() { + if index.Meta().Primary && t.meta.IsCommonHandle { + continue + } + maps.IndexIDToInfo[index.Meta().ID] = index.Meta() + maps.IndexIDToRowColInfos[index.Meta().ID] = BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()) + } + + sc.TableToColumnMaps[t.tableID] = maps + return maps + } + return originalMaps.(columnMaps) +} diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 48e7745f92fa7..b88ff655a6ebe 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -93,10 +93,11 @@ func TestCheckRowInsertionConsistency(t *testing.T) { fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}} type caseData struct { - columnMap map[int64]*model.ColumnInfo - rowToInsert []types.Datum - rowInsertion mutation - correct bool + columnIDToInfo map[int64]*model.ColumnInfo + columnIDToFieldType map[int64]*types.FieldType + rowToInsert []types.Datum + rowInsertion mutation + correct bool } testData := []caseData{ @@ -108,6 +109,9 @@ func TestCheckRowInsertionConsistency(t *testing.T) { FieldType: *types.NewFieldType(mysql.TypeShort), }, }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, []types.Datum{types.NewIntDatum(233)}, mutation{key: mockRowKey233, value: mockValue233}, true, @@ -120,12 +124,16 @@ func TestCheckRowInsertionConsistency(t *testing.T) { FieldType: *types.NewFieldType(mysql.TypeShort), }, }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, []types.Datum{types.NewIntDatum(1)}, fakeRowInsertion, false, }, { // no input row map[int64]*model.ColumnInfo{}, + map[int64]*types.FieldType{}, nil, fakeRowInsertion, true, @@ -138,6 +146,9 @@ func TestCheckRowInsertionConsistency(t *testing.T) { FieldType: *types.NewFieldType(mysql.TypeShort), }, }, + map[int64]*types.FieldType{ + 101: types.NewFieldType(mysql.TypeShort), + }, []types.Datum{types.NewIntDatum(233)}, mutation{key: mockRowKey233, value: []byte{0, 1, 2, 3}}, false, @@ -145,7 +156,7 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } for caseID, data := range testData { - err := checkRowInsertionConsistency(sessVars, data.columnMap, data.rowToInsert, data.rowInsertion) + err := checkRowInsertionConsistency(sessVars, data.rowToInsert, data.rowInsertion, data.columnIDToInfo, data.columnIDToFieldType) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } From 220f0f31a607cdbc5bf14e45dafaeb5f64a4a112 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 16 Sep 2021 16:38:57 +0800 Subject: [PATCH 19/21] save reusable maps in txn option --- kv/option.go | 3 +++ sessionctx/stmtctx/stmtctx.go | 4 ---- table/tables/mutation_checker.go | 21 +++++++++++---------- table/tables/tables.go | 6 +++--- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/kv/option.go b/kv/option.go index 7692c2f74a4a0..1b2f12dacd5a1 100644 --- a/kv/option.go +++ b/kv/option.go @@ -70,6 +70,9 @@ const ( // 2. Ranges of Retrievers do not intersect each other. // 3. Retrievers are sorted by range. SortedCustomRetrievers + // TableToColumnMaps is a map from tableID to a series of maps. The maps are needed when checking data consistency. + // Save them here to reduce redundant computations. + TableToColumnMaps ) // ReplicaReadType is the type of replica to read data from diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 1f9f5d68864ea..0689c4551b197 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -186,10 +186,6 @@ type StatementContext struct { OptimInfo map[int]string // InVerboseExplain indicates the statement is "explain format='verbose' ...". InVerboseExplain bool - - // TableToColumnMaps is a map from tableID to a series of maps. The maps are needed when checking data consistency. - // Save them here to reduce redundant computations. - TableToColumnMaps map[int64]interface{} } // StmtHints are SessionVars related sql hints. diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 2a127faef5b93..121c98ad71cf6 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -61,7 +61,7 @@ type columnMaps struct { // (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value // (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate // the mutations, thus ignored. -func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, +func CheckIndexConsistency(txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { if t.Meta().GetPartitionInfo() != nil { return nil @@ -76,7 +76,7 @@ func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon, return errors.Trace(err) } - columnMaps := getOrBuildColumnMaps(sessVars.StmtCtx, t) + columnMaps := getOrBuildColumnMaps(txn, t) if rowToInsert != nil { if err := checkRowInsertionConsistency(sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType); err != nil { @@ -239,13 +239,14 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD // getOrBuildColumnMaps tries to get the columnMaps from stmt ctx. If there isn't one, it builds one and stores it. // It saves redundant computations of the map. -func getOrBuildColumnMaps(sc *stmtctx.StatementContext, t *TableCommon) columnMaps { - if sc.TableToColumnMaps == nil { - sc.TableToColumnMaps = make(map[int64]interface{}) +func getOrBuildColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { + tableMaps, ok := txn.GetOption(kv.TableToColumnMaps).(map[int64]columnMaps) + if !ok || tableMaps == nil { + tableMaps = make(map[int64]columnMaps) } - originalMaps, ok := sc.TableToColumnMaps[t.tableID] + maps, ok := tableMaps[t.tableID] if !ok { - maps := columnMaps{ + maps = columnMaps{ make(map[int64]*model.ColumnInfo, len(t.Meta().Columns)), make(map[int64]*types.FieldType, len(t.Meta().Columns)), make(map[int64]*model.IndexInfo, len(t.Indices())), @@ -264,8 +265,8 @@ func getOrBuildColumnMaps(sc *stmtctx.StatementContext, t *TableCommon) columnMa maps.IndexIDToRowColInfos[index.Meta().ID] = BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()) } - sc.TableToColumnMaps[t.tableID] = maps - return maps + tableMaps[t.tableID] = maps + txn.SetOption(kv.TableToColumnMaps, tableMaps) } - return originalMaps.(columnMaps) + return maps } diff --git a/table/tables/tables.go b/table/tables/tables.go index e141d8cc812db..c3537f5173f7c 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -420,7 +420,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if err = memBuffer.Set(key, value); err != nil { return err } - if err = CheckIndexConsistency(sessVars, t, newData, oldData, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(txn, sessVars, t, newData, oldData, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) @@ -839,7 +839,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return h, err } - if err = CheckIndexConsistency(sessVars, t, r, nil, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(txn, sessVars, t, r, nil, memBuffer, sh); err != nil { return nil, errors.Trace(err) } @@ -1097,7 +1097,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type sessVars := ctx.GetSessionVars() sc := sessVars.StmtCtx - if err = CheckIndexConsistency(sessVars, t, nil, r, memBuffer, sh); err != nil { + if err = CheckIndexConsistency(txn, sessVars, t, nil, r, memBuffer, sh); err != nil { return errors.Trace(err) } memBuffer.Release(sh) From b90202c2b1fb0cd70507cae558c31328c380a535 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 21 Sep 2021 22:29:35 +0800 Subject: [PATCH 20/21] add unit test for checkIndexKeys --- table/tables/mutation_checker.go | 106 +++++++++++------ table/tables/mutation_checker_test.go | 157 +++++++++++++++++++++++++- 2 files changed, 225 insertions(+), 38 deletions(-) diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 121c98ad71cf6..c127945652389 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -61,12 +61,13 @@ type columnMaps struct { // (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value // (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate // the mutations, thus ignored. -func CheckIndexConsistency(txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, - rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error { +func CheckIndexConsistency( + txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, + rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, +) error { if t.Meta().GetPartitionInfo() != nil { return nil } - sc := sessVars.StmtCtx if sh == 0 { // some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv return nil @@ -76,29 +77,34 @@ func CheckIndexConsistency(txn kv.Transaction, sessVars *variable.SessionVars, t return errors.Trace(err) } - columnMaps := getOrBuildColumnMaps(txn, t) + columnMaps := getColumnMaps(txn, t) if rowToInsert != nil { - if err := checkRowInsertionConsistency(sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType); err != nil { + if err := checkRowInsertionConsistency( + sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType, + ); err != nil { return errors.Trace(err) } } - if err := checkIndexKeys(sc, sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos); err != nil { + if err := checkIndexKeys( + sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, + ); err != nil { return errors.Trace(err) } - // TODO: check whether handles match in index and row mutations return nil } // checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. -func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, - indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, indexIDToRowColInfos map[int64][]rowcodec.ColInfo) error { +func checkIndexKeys( + sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, + indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, + indexIDToRowColInfos map[int64][]rowcodec.ColInfo, +) error { var indexData []types.Datum for _, m := range indexMutations { _, indexID, _, err := tablecodec.DecodeIndexKey(m.key) if err != nil { - //FIXME: continue or return error? return errors.Trace(err) } @@ -116,8 +122,9 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars continue } - decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexInfo.Columns), - tablecodec.HandleNotNeeded, rowColInfos) + decodedIndexValues, err := tablecodec.DecodeIndexKV( + m.key, m.value, len(indexInfo.Columns), tablecodec.HandleNotNeeded, rowColInfos, + ) if err != nil { return errors.Trace(err) } @@ -139,9 +146,9 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars } if len(m.value) == 0 { - err = compareIndexData(sc, t.Columns, indexData, rowToRemove, indexInfo) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo) } else { - err = compareIndexData(sc, t.Columns, indexData, rowToInsert, indexInfo) + err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo) } if err != nil { return errors.Trace(err) @@ -152,8 +159,10 @@ func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars // checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones // We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) -func checkRowInsertionConsistency(sessVars *variable.SessionVars, rowToInsert []types.Datum, rowInsertion mutation, - columnIDToInfo map[int64]*model.ColumnInfo, columnIDToFieldType map[int64]*types.FieldType) error { +func checkRowInsertionConsistency( + sessVars *variable.SessionVars, rowToInsert []types.Datum, rowInsertion mutation, + columnIDToInfo map[int64]*model.ColumnInfo, columnIDToFieldType map[int64]*types.FieldType, +) error { if rowToInsert == nil { // it's a deletion return nil @@ -174,9 +183,14 @@ func checkRowInsertionConsistency(sessVars *variable.SessionVars, rowToInsert [] return errors.Trace(err) } if cmp != 0 { - logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), - zap.String("input datum", inputDatum.String())) - return errors.Errorf("inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), inputDatum.String()) + logutil.BgLogger().Error( + "inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), + zap.String("input datum", inputDatum.String()), + ) + return errors.Errorf( + "inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), + inputDatum.String(), + ) } } return nil @@ -186,7 +200,9 @@ func checkRowInsertionConsistency(sessVars *variable.SessionVars, rowToInsert [] // It returns: (1) all index mutations (2) the only row insertion // If there are no row insertions, the 2nd returned value is nil // If there are multiple row insertions, an error is returned -func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ([]mutation, mutation, error) { +func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ( + []mutation, mutation, error, +) { indexMutations := make([]mutation, 0) var rowInsertion mutation var err error @@ -199,7 +215,9 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer if rowInsertion.key == nil { rowInsertion = m } else { - err = errors.Errorf("multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m) + err = errors.Errorf( + "multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m, + ) } } } else { @@ -213,14 +231,20 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer // compareIndexData compares the decoded index data with the input data. // Returns error if the index data is not a subset of the input data. -func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo) error { +func compareIndexData( + sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, +) error { for i, decodedMutationDatum := range indexData { expectedDatum := input[indexInfo.Columns[i].Offset] - tablecodec.TruncateIndexValue(&expectedDatum, indexInfo.Columns[i], - cols[indexInfo.Columns[i].Offset].ColumnInfo) - tablecodec.TruncateIndexValue(&decodedMutationDatum, indexInfo.Columns[i], - cols[indexInfo.Columns[i].Offset].ColumnInfo) + tablecodec.TruncateIndexValue( + &expectedDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo, + ) + tablecodec.TruncateIndexValue( + &decodedMutationDatum, indexInfo.Columns[i], + cols[indexInfo.Columns[i].Offset].ColumnInfo, + ) comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) if err != nil { @@ -228,19 +252,37 @@ func compareIndexData(sc *stmtctx.StatementContext, cols []*table.Column, indexD } if comparison != 0 { - logutil.BgLogger().Error("inconsistent index values", + logutil.BgLogger().Error( + "inconsistent index values", zap.String("truncated mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), - zap.String("truncated expected datum", fmt.Sprintf("%v", expectedDatum))) + zap.String("truncated expected datum", fmt.Sprintf("%v", expectedDatum)), + ) return errors.New("inconsistent index values") } } return nil } -// getOrBuildColumnMaps tries to get the columnMaps from stmt ctx. If there isn't one, it builds one and stores it. +// getColumnMaps tries to get the columnMaps from transaction options. If there isn't one, it builds one and stores it. // It saves redundant computations of the map. -func getOrBuildColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { - tableMaps, ok := txn.GetOption(kv.TableToColumnMaps).(map[int64]columnMaps) +func getColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { + getter := func() (map[int64]columnMaps, bool) { + m, ok := txn.GetOption(kv.TableToColumnMaps).(map[int64]columnMaps) + return m, ok + } + setter := func(maps map[int64]columnMaps) { + txn.SetOption(kv.TableToColumnMaps, maps) + } + columnMaps := getOrBuildColumnMaps(getter, setter, t) + return columnMaps +} + +// getOrBuildColumnMaps tries to get the columnMaps from some place. If there isn't one, it builds one and stores it. +// It saves redundant computations of the map. +func getOrBuildColumnMaps( + getter func() (map[int64]columnMaps, bool), setter func(map[int64]columnMaps), t *TableCommon, +) columnMaps { + tableMaps, ok := getter() if !ok || tableMaps == nil { tableMaps = make(map[int64]columnMaps) } @@ -266,7 +308,7 @@ func getOrBuildColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { } tableMaps[t.tableID] = maps - txn.SetOption(kv.TableToColumnMaps, tableMaps) + setter(tableMaps) } return maps } diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index b88ff655a6ebe..bfd5c32e1f7f6 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -16,6 +16,7 @@ package tables import ( "testing" + "time" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/rowcodec" "github.com/stretchr/testify/require" ) @@ -88,7 +90,9 @@ func TestCheckRowInsertionConsistency(t *testing.T) { // mocked data mockRowKey233 := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(233)) - mockValue233, err := tablecodec.EncodeRow(sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd) + mockValue233, err := tablecodec.EncodeRow( + sessVars.StmtCtx, []types.Datum{types.NewIntDatum(233)}, []int64{101}, nil, nil, &rd, + ) require.Nil(t, err) fakeRowInsertion := mutation{key: []byte{1, 1}, value: []byte{1, 1, 1}} @@ -101,7 +105,8 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } testData := []caseData{ - { // expected correct behavior + { + // expected correct behavior map[int64]*model.ColumnInfo{ 101: { ID: 101, @@ -116,7 +121,8 @@ func TestCheckRowInsertionConsistency(t *testing.T) { mutation{key: mockRowKey233, value: mockValue233}, true, }, - { // mismatching mutation + { + // mismatching mutation map[int64]*model.ColumnInfo{ 101: { ID: 101, @@ -131,14 +137,16 @@ func TestCheckRowInsertionConsistency(t *testing.T) { fakeRowInsertion, false, }, - { // no input row + { + // no input row map[int64]*model.ColumnInfo{}, map[int64]*types.FieldType{}, nil, fakeRowInsertion, true, }, - { // invalid value + { + // invalid value map[int64]*model.ColumnInfo{ 101: { ID: 101, @@ -156,7 +164,144 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } for caseID, data := range testData { - err := checkRowInsertionConsistency(sessVars, data.rowToInsert, data.rowInsertion, data.columnIDToInfo, data.columnIDToFieldType) + err := checkRowInsertionConsistency( + sessVars, data.rowToInsert, data.rowInsertion, data.columnIDToInfo, data.columnIDToFieldType, + ) require.Equal(t, data.correct, err == nil, "case id = %v", caseID) } } + +func TestCheckIndexKeys(t *testing.T) { + /* + dimensions of the domain of checkIndexKeys: + 1. location *2 + 2. table structure + (1) unique index/non-unique index *2 + (2) clustered index *2 + (3) string collation *2 + We don't test primary clustered index and int handle, since they should not have index mutations. + */ + + // cases + locations := []*time.Location{time.UTC, time.Local} + indexInfos := []*model.IndexInfo{ + { + ID: 1, + State: model.StatePublic, + Primary: false, + Unique: true, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Length: types.UnspecifiedLength, + }, + { + Offset: 0, + Length: types.UnspecifiedLength, + }, + }, + }, + { + ID: 2, + State: model.StatePublic, + Primary: false, + Unique: false, + Columns: []*model.IndexColumn{ + { + Offset: 1, + Length: types.UnspecifiedLength, + }, + { + Offset: 0, + Length: types.UnspecifiedLength, + }, + }, + }, + } + columnInfoSets := [][]*model.ColumnInfo{ + { + {ID: 1, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeString)}, + {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, + }, + { + {ID: 1, Offset: 0, FieldType: *types.NewFieldTypeWithCollation(mysql.TypeString, "big5_chinese_ci", + types.UnspecifiedLength)}, + {ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)}, + }, + } + sessVars := variable.NewSessionVars() + + now := types.CurrentTime(mysql.TypeDatetime) + rowToInsert := []types.Datum{ + types.NewStringDatum("some string"), + types.NewTimeDatum(now), + } + anotherTime, err := now.Add(sessVars.StmtCtx, types.NewDuration(24, 0, 0, 0, 0)) + require.Nil(t, err) + rowToRemove := []types.Datum{ + types.NewStringDatum("old string"), + types.NewTimeDatum(anotherTime), + } + + getter := func() (map[int64]columnMaps, bool) { + return nil, false + } + setter := func(maps map[int64]columnMaps) {} + + // test + collate.SetNewCollationEnabledForTest(true) + for _, isCommonHandle := range []bool{true, false} { + for _, lc := range locations { + for _, columnInfos := range columnInfoSets { + sessVars.StmtCtx.TimeZone = lc + tableInfo := model.TableInfo{ + ID: 1, + Name: model.NewCIStr("t"), + Columns: columnInfos, + Indices: indexInfos, + PKIsHandle: false, + IsCommonHandle: isCommonHandle, + } + table := MockTableFromMeta(&tableInfo).(*TableCommon) + + for i, indexInfo := range indexInfos { + index := table.indices[i] + maps := getOrBuildColumnMaps(getter, setter, table) + insertionKey, insertionValue, err := buildKeyValue(index, rowToInsert, sessVars, tableInfo, indexInfo, table) + require.Nil(t, err) + deletionKey, _, err := buildKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table) + require.Nil(t, err) + indexMutations := []mutation{{key: insertionKey, value: insertionValue}, {key: deletionKey}} + err = checkIndexKeys( + sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo, + maps.IndexIDToRowColInfos, + ) + require.Nil(t, err) + } + } + } + } +} + +func buildKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *variable.SessionVars, + tableInfo model.TableInfo, indexInfo *model.IndexInfo, table *TableCommon) ([]byte, []byte, error) { + indexedValues, err := index.FetchValues(rowToInsert, nil) + if err != nil { + return nil, nil, err + } + key, distinct, err := tablecodec.GenIndexKey( + sessVars.StmtCtx, &tableInfo, indexInfo, 1, indexedValues, kv.IntHandle(1), nil, + ) + if err != nil { + return nil, nil, err + } + rsData := TryGetHandleRestoredDataWrapper(table, rowToInsert, nil, indexInfo) + value, err := tablecodec.GenIndexValuePortal( + sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns), + distinct, false, indexedValues, kv.IntHandle(1), 0, rsData, + ) + if err != nil { + return nil, nil, err + } + return key, value, nil +} From 841044a7d95de8ecc14aa6cfc6a41ae220682e00 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 22 Sep 2021 14:05:29 +0800 Subject: [PATCH 21/21] address comments in test --- table/tables/mutation_checker_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index bfd5c32e1f7f6..ae41933f8522c 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -172,15 +172,13 @@ func TestCheckRowInsertionConsistency(t *testing.T) { } func TestCheckIndexKeys(t *testing.T) { - /* - dimensions of the domain of checkIndexKeys: - 1. location *2 - 2. table structure - (1) unique index/non-unique index *2 - (2) clustered index *2 - (3) string collation *2 - We don't test primary clustered index and int handle, since they should not have index mutations. - */ + // dimensions of the domain of checkIndexKeys: + // 1. location *2 + // 2. table structure + // (1) unique index/non-unique index *2 + // (2) clustered index *2 + // (3) string collation *2 + // We don't test primary clustered index and int handle, since they should not have index mutations. // cases locations := []*time.Location{time.UTC, time.Local} @@ -250,6 +248,7 @@ func TestCheckIndexKeys(t *testing.T) { // test collate.SetNewCollationEnabledForTest(true) + defer collate.SetNewCollationEnabledForTest(false) for _, isCommonHandle := range []bool{true, false} { for _, lc := range locations { for _, columnInfos := range columnInfoSets {