-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: Check mutations for single-row changes #27920
Merged
cfzjywxk
merged 21 commits into
pingcap:ft-data-inconsistency
from
ekexium:assertion-in-tables
Sep 22, 2021
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
aceb3d0
feat: check index key in AddRecord
ekexium d01995b
feat: check for AddRecord/UpdateRecord
ekexium 8a75382
ignore deletion of indices if NeedRestoredData
ekexium 11fa293
check values of row mutations
ekexium f5160e1
style: fix naming
ekexium be15c78
skip when sh == 0
ekexium dca9d75
check in RemoveRecord
ekexium ee5a801
modify license header
ekexium ada4dc5
test: add unit tests
ekexium 5c1483a
add lisence header
ekexium 4e72298
also truncate decoded mutation
ekexium 9e01dca
tidy up
ekexium 1aab79e
refactor according to comments
ekexium 836f4e3
skip partitioned table
ekexium 5eeb405
save columnMap in stmtctx
ekexium 6431cbe
skip when sh == 0
ekexium 2ee348c
reuse a slice in a loop
ekexium e448d49
save reusable maps in stmtctx
ekexium 220f0f3
save reusable maps in txn option
ekexium b90202c
add unit test for checkIndexKeys
ekexium 841044a
address comments in test
ekexium File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
// Copyright 2021 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package tables | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/parser/model" | ||
"github.com/pingcap/tidb/kv" | ||
"github.com/pingcap/tidb/sessionctx/stmtctx" | ||
"github.com/pingcap/tidb/sessionctx/variable" | ||
"github.com/pingcap/tidb/table" | ||
"github.com/pingcap/tidb/tablecodec" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util/logutil" | ||
"github.com/pingcap/tidb/util/rowcodec" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type mutation struct { | ||
key kv.Key | ||
flags kv.KeyFlags | ||
value []byte | ||
} | ||
|
||
type columnMaps struct { | ||
ColumnIDToInfo map[int64]*model.ColumnInfo | ||
ColumnIDToFieldType map[int64]*types.FieldType | ||
IndexIDToInfo map[int64]*model.IndexInfo | ||
IndexIDToRowColInfos map[int64][]rowcodec.ColInfo | ||
} | ||
|
||
// CheckIndexConsistency checks whether the given set of mutations corresponding to a single row is consistent. | ||
// Namely, assume the database is consistent before, applying the mutations shouldn't break the consistency. | ||
// It aims at reducing bugs that will corrupt data, and preventing mistakes from spreading if possible. | ||
// | ||
// The check doesn't work and just returns nil when: | ||
// (1) the table is partitioned | ||
// (2) new collation is enabled and restored data is needed | ||
// | ||
// How it works: | ||
// | ||
// Assume the set of row values changes from V1 to V2, we check | ||
// (1) V2 - V1 = {added indices} | ||
// (2) V1 - V2 = {deleted indices} | ||
// | ||
// To check (1), we need | ||
// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value | ||
// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate | ||
// the mutations, thus ignored. | ||
func CheckIndexConsistency( | ||
txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon, | ||
rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle, | ||
) error { | ||
if t.Meta().GetPartitionInfo() != nil { | ||
return nil | ||
} | ||
if sh == 0 { | ||
// some implementations of MemBuffer doesn't support staging, e.g. that in br/pkg/lightning/backend/kv | ||
return nil | ||
} | ||
indexMutations, rowInsertion, err := collectTableMutationsFromBufferStage(t, memBuffer, sh) | ||
if err != nil { | ||
return errors.Trace(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The errors are temporary. They might change after #27388 is merged |
||
} | ||
|
||
columnMaps := getColumnMaps(txn, t) | ||
|
||
if rowToInsert != nil { | ||
if err := checkRowInsertionConsistency( | ||
sessVars, rowToInsert, rowInsertion, columnMaps.ColumnIDToInfo, columnMaps.ColumnIDToFieldType, | ||
); err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
if err := checkIndexKeys( | ||
sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos, | ||
); err != nil { | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} | ||
|
||
// checkIndexKeys checks whether the decoded data from keys of index mutations are consistent with the expected ones. | ||
func checkIndexKeys( | ||
sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum, | ||
indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, | ||
indexIDToRowColInfos map[int64][]rowcodec.ColInfo, | ||
) error { | ||
|
||
var indexData []types.Datum | ||
for _, m := range indexMutations { | ||
_, indexID, _, err := tablecodec.DecodeIndexKey(m.key) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
indexInfo, ok := indexIDToInfo[indexID] | ||
if !ok { | ||
return errors.New("index not found") | ||
} | ||
rowColInfos, ok := indexIDToRowColInfos[indexID] | ||
if !ok { | ||
return errors.New("index not found") | ||
} | ||
|
||
// when we cannot decode the key to get the original value | ||
if len(m.value) == 0 && NeedRestoredData(indexInfo.Columns, t.Meta().Columns) { | ||
continue | ||
} | ||
|
||
decodedIndexValues, err := tablecodec.DecodeIndexKV( | ||
m.key, m.value, len(indexInfo.Columns), tablecodec.HandleNotNeeded, rowColInfos, | ||
) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
// reuse the underlying memory, save an allocation | ||
if indexData == nil { | ||
indexData = make([]types.Datum, 0, len(decodedIndexValues)) | ||
} else { | ||
indexData = indexData[:0] | ||
} | ||
|
||
for i, v := range decodedIndexValues { | ||
fieldType := &t.Columns[indexInfo.Columns[i].Offset].FieldType | ||
datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
indexData = append(indexData, datum) | ||
} | ||
|
||
if len(m.value) == 0 { | ||
err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo) | ||
} else { | ||
err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo) | ||
} | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// checkRowInsertionConsistency checks whether the values of row mutations are consistent with the expected ones | ||
// We only check data added since a deletion of a row doesn't care about its value (and we cannot know it) | ||
func checkRowInsertionConsistency( | ||
sessVars *variable.SessionVars, rowToInsert []types.Datum, rowInsertion mutation, | ||
columnIDToInfo map[int64]*model.ColumnInfo, columnIDToFieldType map[int64]*types.FieldType, | ||
) error { | ||
if rowToInsert == nil { | ||
// it's a deletion | ||
return nil | ||
} | ||
ekexium marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
decodedData, err := tablecodec.DecodeRowToDatumMap(rowInsertion.value, columnIDToFieldType, sessVars.Location()) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
// NOTE: we cannot check if the decoded values contain all columns since some columns may be skipped. It can even be empty | ||
// Instead, we check that decoded index values are consistent with the input row. | ||
|
||
for columnID, decodedDatum := range decodedData { | ||
inputDatum := rowToInsert[columnIDToInfo[columnID].Offset] | ||
cmp, err := decodedDatum.CompareDatum(sessVars.StmtCtx, &inputDatum) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if cmp != 0 { | ||
logutil.BgLogger().Error( | ||
"inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()), | ||
zap.String("input datum", inputDatum.String()), | ||
) | ||
return errors.Errorf( | ||
"inconsistent row mutation, row datum = {%v}, input datum = {%v}", decodedDatum.String(), | ||
inputDatum.String(), | ||
) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// collectTableMutationsFromBufferStage collects mutations of the current table from the mem buffer stage | ||
// It returns: (1) all index mutations (2) the only row insertion | ||
// If there are no row insertions, the 2nd returned value is nil | ||
// If there are multiple row insertions, an error is returned | ||
func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) ( | ||
[]mutation, mutation, error, | ||
) { | ||
indexMutations := make([]mutation, 0) | ||
var rowInsertion mutation | ||
var err error | ||
inspector := func(key kv.Key, flags kv.KeyFlags, data []byte) { | ||
// only check the current table | ||
if tablecodec.DecodeTableID(key) == t.physicalTableID { | ||
m := mutation{key, flags, data} | ||
if rowcodec.IsRowKey(key) { | ||
if len(data) > 0 { | ||
if rowInsertion.key == nil { | ||
rowInsertion = m | ||
} else { | ||
err = errors.Errorf( | ||
"multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m, | ||
) | ||
} | ||
} | ||
} else { | ||
indexMutations = append(indexMutations, m) | ||
} | ||
} | ||
} | ||
memBuffer.InspectStage(sh, inspector) | ||
return indexMutations, rowInsertion, err | ||
} | ||
|
||
// compareIndexData compares the decoded index data with the input data. | ||
// Returns error if the index data is not a subset of the input data. | ||
func compareIndexData( | ||
sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo, | ||
) error { | ||
for i, decodedMutationDatum := range indexData { | ||
expectedDatum := input[indexInfo.Columns[i].Offset] | ||
|
||
tablecodec.TruncateIndexValue( | ||
&expectedDatum, indexInfo.Columns[i], | ||
cols[indexInfo.Columns[i].Offset].ColumnInfo, | ||
) | ||
tablecodec.TruncateIndexValue( | ||
&decodedMutationDatum, indexInfo.Columns[i], | ||
cols[indexInfo.Columns[i].Offset].ColumnInfo, | ||
) | ||
|
||
comparison, err := decodedMutationDatum.CompareDatum(sc, &expectedDatum) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
if comparison != 0 { | ||
logutil.BgLogger().Error( | ||
"inconsistent index values", | ||
zap.String("truncated mutation datum", fmt.Sprintf("%v", decodedMutationDatum)), | ||
zap.String("truncated expected datum", fmt.Sprintf("%v", expectedDatum)), | ||
) | ||
return errors.New("inconsistent index values") | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// getColumnMaps tries to get the columnMaps from transaction options. If there isn't one, it builds one and stores it. | ||
// It saves redundant computations of the map. | ||
func getColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps { | ||
getter := func() (map[int64]columnMaps, bool) { | ||
m, ok := txn.GetOption(kv.TableToColumnMaps).(map[int64]columnMaps) | ||
return m, ok | ||
} | ||
setter := func(maps map[int64]columnMaps) { | ||
txn.SetOption(kv.TableToColumnMaps, maps) | ||
} | ||
columnMaps := getOrBuildColumnMaps(getter, setter, t) | ||
return columnMaps | ||
} | ||
|
||
// getOrBuildColumnMaps tries to get the columnMaps from some place. If there isn't one, it builds one and stores it. | ||
// It saves redundant computations of the map. | ||
func getOrBuildColumnMaps( | ||
getter func() (map[int64]columnMaps, bool), setter func(map[int64]columnMaps), t *TableCommon, | ||
) columnMaps { | ||
tableMaps, ok := getter() | ||
if !ok || tableMaps == nil { | ||
tableMaps = make(map[int64]columnMaps) | ||
} | ||
maps, ok := tableMaps[t.tableID] | ||
if !ok { | ||
maps = columnMaps{ | ||
make(map[int64]*model.ColumnInfo, len(t.Meta().Columns)), | ||
make(map[int64]*types.FieldType, len(t.Meta().Columns)), | ||
make(map[int64]*model.IndexInfo, len(t.Indices())), | ||
make(map[int64][]rowcodec.ColInfo, len(t.Indices())), | ||
} | ||
|
||
for _, col := range t.Meta().Columns { | ||
maps.ColumnIDToInfo[col.ID] = col | ||
maps.ColumnIDToFieldType[col.ID] = &col.FieldType | ||
} | ||
for _, index := range t.Indices() { | ||
if index.Meta().Primary && t.meta.IsCommonHandle { | ||
continue | ||
} | ||
maps.IndexIDToInfo[index.Meta().ID] = index.Meta() | ||
maps.IndexIDToRowColInfos[index.Meta().ID] = BuildRowcodecColInfoForIndexColumns(index.Meta(), t.Meta()) | ||
} | ||
|
||
tableMaps[t.tableID] = maps | ||
setter(tableMaps) | ||
} | ||
return maps | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will this happend?If it's unexpected should we print an error log here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite remember when it's needed. I will let it return an error and see if any test fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some implementations of MemBuffer doesn't support staging. For example, the one in lightning:
tidb/br/pkg/lightning/backend/kv/session.go
Lines 138 to 140 in b25a392
I think we can just ignore them.