*: Check mutations for single-row changes #27920
func CheckIndexConsistency(sessVars *variable.SessionVars, t *TableCommon,
	dataAdded, dataRemoved []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle) error {
	sc := sessVars.StmtCtx
	if sh == 0 {
When will this happen? If it's unexpected, should we print an error log here?
I don't quite remember when it's needed. I will let it return an error and see if any test fails.
Some implementations of MemBuffer don't support staging. For example, the one in lightning:
tidb/br/pkg/lightning/backend/kv/session.go
Lines 138 to 140 in b25a392
func (mb *kvMemBuf) Staging() kv.StagingHandle {
	return 0
}
I think we can just ignore them.
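A minimal sketch of the "just ignore them" behavior, for illustration only. `StagingHandle`, `memBuffer`, `noStagingBuf`, `stagingBuf` and `checkConsistency` are hypothetical stand-ins, not TiDB's types. The only details taken from this thread are that a buffer without staging support returns handle 0 and that the check is skipped in that case.

```go
package main

import "fmt"

// StagingHandle is a hypothetical stand-in for kv.StagingHandle.
// As in the thread, the zero value means "staging not supported".
type StagingHandle int

// memBuffer is a hypothetical, trimmed-down buffer interface.
type memBuffer interface {
	Staging() StagingHandle
}

// noStagingBuf mimics a buffer without staging support
// (like the lightning kvMemBuf quoted above): it always returns 0.
type noStagingBuf struct{}

func (noStagingBuf) Staging() StagingHandle { return 0 }

// stagingBuf hands out increasing non-zero handles.
type stagingBuf struct{ next StagingHandle }

func (b *stagingBuf) Staging() StagingHandle {
	b.next++
	return b.next
}

// checkConsistency reports whether the check actually ran.
// With a zero handle there is no stage to inspect, so it is skipped
// without returning an error.
func checkConsistency(sh StagingHandle) (checked bool, err error) {
	if sh == 0 {
		return false, nil
	}
	// A real checker would inspect the mutations in stage sh here.
	return true, nil
}

func main() {
	for _, buf := range []memBuffer{noStagingBuf{}, &stagingBuf{}} {
		checked, err := checkConsistency(buf.Staging())
		fmt.Printf("%T: checked=%v err=%v\n", buf, checked, err)
	}
}
```

Skipping silently keeps callers such as the lightning backend working, at the cost of no mutation checking on those paths.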
table/tables/mutation_checker.go
Outdated
	return errors.Trace(err)
}
if cmp != 0 {
	logutil.BgLogger().Error("inconsistent row mutation", zap.String("decoded datum", decodedDatum.String()),
For the error report we could use the reporter from #27388, and inconsistency events could be traced by the transaction event.
/cc @MyonKeminta @longfangsong
table/tables/mutation_checker.go
Outdated
func checkIndexKeys(sc *stmtctx.StatementContext, sessVars *variable.SessionVars, t *TableCommon,
	dataAdded []types.Datum, dataRemoved []types.Datum, mutations []mutation) error {
	indexIDMap := make(map[int64]indexHelperInfo)
	for _, index := range t.indices {
Should the clustered primary index be skipped here? If so, we could rename it to checkSecondaryIndexKeys.
It can be skipped.
I think there are still cases where the (non-clustered) primary indices need to be checked?
table/tables/mutation_checker.go
Outdated
if len(m.value) == 0 && NeedRestoredData(indexHelperInfo.indexInfo.Columns, t.Meta().Columns) {
	continue
}

decodedIndexValues, err := tablecodec.DecodeIndexKV(m.key, m.value, len(indexHelperInfo.indexInfo.Columns),
	tablecodec.HandleNotNeeded, indexHelperInfo.rowColInfos)
Will index key utilities like expression index, collations or row formats introduce corner cases here?
Need help /cc @lysu @wjhuang2016
No, I think it's fine here.
}
mutations := collectTableMutationsFromBufferStage(t, memBuffer, sh)
if err := checkRowAdditionConsistency(sessVars, t.Meta().Columns, dataAdded, mutations); err != nil {
	return errors.Trace(err)
No need to use Trace if the err is generated by errors.New.
The errors are temporary. They might change after #27388 is merged
/run-all-tests
Some of the comments seem outdated... but please still take a look.
table/tables/mutation_checker.go
Outdated
}

func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer, sh kv.StagingHandle) []mutation {
	mutations := make([]mutation, 0)
I think it's possible to make the membuffer support getting the size of the current stage, so that we can reserve enough space and allocate exactly once. But the change might be too much for a single PR.
table/tables/mutation_checker.go
Outdated
columnMap := make(map[int64]*model.ColumnInfo)
for _, col := range tableColumns {
	columnMap[col.ID] = col
}
This looks too expensive if we do this for each row... If this is really necessary, can we try to store it somewhere like the sessionctx, to make it reusable?
I tried to save it in the stmtctx. PTAL if it's reasonable
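For illustration, the reuse idea could look roughly like the sketch below. It is not the change made in this PR: `columnInfo`, `stmtCache` and `columnMapFor` are hypothetical names, and keying the cache by table ID is an assumption.

```go
package main

import "fmt"

// columnInfo is a hypothetical stand-in for model.ColumnInfo.
type columnInfo struct {
	ID   int64
	Name string
}

// stmtCache is a hypothetical per-statement cache, keyed by table ID.
type stmtCache struct {
	columnMaps map[int64]map[int64]*columnInfo
	builds     int // how many times a map was actually built
}

// columnMapFor returns the cached ID -> column map for tableID,
// building it only on first use.
func (c *stmtCache) columnMapFor(tableID int64, cols []*columnInfo) map[int64]*columnInfo {
	if m, ok := c.columnMaps[tableID]; ok {
		return m
	}
	if c.columnMaps == nil {
		c.columnMaps = make(map[int64]map[int64]*columnInfo)
	}
	m := make(map[int64]*columnInfo, len(cols))
	for _, col := range cols {
		m[col.ID] = col
	}
	c.columnMaps[tableID] = m
	c.builds++
	return m
}

func main() {
	cols := []*columnInfo{{ID: 1, Name: "a"}, {ID: 2, Name: "b"}}
	cache := &stmtCache{}
	// Simulate three rows of the same statement touching the same table.
	for row := 0; row < 3; row++ {
		m := cache.columnMapFor(42, cols)
		fmt.Println(len(m), cache.builds)
	}
}
```

With this shape the per-row cost drops to one map lookup. The cached map is only valid while the table schema is unchanged, which is why a statement-scoped lifetime is a natural fit.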
Signed-off-by: ekexium <ekexium@gmail.com>
Force-pushed from 0996a1a to 5eeb405.
Some implementations of MemBuffer don't support staging. We don't care about them for now.
/run-all-tests
table/tables/mutation_checker.go
Outdated
if rowInsertion.key == nil {
	rowInsertion = m
} else {
	err = errors.Errorf("multiple row mutations added/mutated, one = %+v, another = %+v", rowInsertion, m)
Be careful: the data may need to be redacted if you are going to print the error to the log.
The errors will be replaced with the reporter (in the following PR, I suppose). I think we can handle redactions there.
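As a rough illustration of the redaction concern (not the reporter from #27388, whose design is not shown in this thread), an error formatter could hide user data behind a flag. `mutation`, `describe` and the `redact` parameter are hypothetical.

```go
package main

import "fmt"

// mutation is a hypothetical stand-in for the checker's mutation type.
type mutation struct {
	key, value []byte
}

// describe renders a mutation for an error message or log line.
// When redact is true, user data is replaced by a placeholder.
func describe(m mutation, redact bool) string {
	if redact {
		return "{key: ?, value: ?}"
	}
	return fmt.Sprintf("{key: %x, value: %x}", m.key, m.value)
}

func main() {
	m := mutation{key: []byte("k1"), value: []byte("v")}
	fmt.Println(describe(m, false))
	fmt.Println(describe(m, true))
}
```

Routing every message through one helper like this keeps the redaction decision in a single place instead of at each `errors.Errorf` call site.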
table/tables/mutation_checker.go
Outdated
columnFieldMap := make(map[int64]*types.FieldType)
for id, col := range columnMap {
	columnFieldMap[id] = &col.FieldType
}
Here's another one. How about adding a new version of DecodeRowToDatumMap that accepts the ColumnInfo map?
Maybe the small allocations don't have much direct effect on performance, but they increase GC pressure, so allocations on such very frequent paths should be treated very carefully IMO 🤔
And when a map or slice is necessary, consider specifying the capacity in the make statement if possible, to reduce potential reallocation when expanding. For example, here len(columnMap) can be used as the initial capacity of columnFieldMap, if this new map is really needed.
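The capacity hint suggested above, as a small self-contained sketch. `fieldType`, `columnInfo` and `buildFieldMap` are hypothetical stand-ins for the TiDB types and the code under review.

```go
package main

import "fmt"

// fieldType and columnInfo are hypothetical stand-ins for
// types.FieldType and model.ColumnInfo.
type fieldType struct{ Tp byte }

type columnInfo struct {
	ID        int64
	FieldType fieldType
}

// buildFieldMap sizes the result map up front, so the map does not
// have to grow while it is being filled.
func buildFieldMap(columnMap map[int64]*columnInfo) map[int64]*fieldType {
	columnFieldMap := make(map[int64]*fieldType, len(columnMap))
	for id, col := range columnMap {
		// col is a pointer, so this points into the shared columnInfo,
		// not at a per-iteration copy.
		columnFieldMap[id] = &col.FieldType
	}
	return columnFieldMap
}

func main() {
	columnMap := map[int64]*columnInfo{
		1: {ID: 1, FieldType: fieldType{Tp: 3}},
		2: {ID: 2, FieldType: fieldType{Tp: 15}},
	}
	fieldMap := buildFieldMap(columnMap)
	fmt.Println(len(fieldMap), fieldMap[2].Tp)
}
```

The hint only removes incremental growth of the map. The map itself is still allocated on every call, which is the GC-pressure point raised above.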
...actually I'm afraid I'm overdoing it... or maybe we could do these kinds of optimizations in a new PR...? @cfzjywxk what do you think...
> How about adding a new version of DecodeRowToDatumMap that accepts the ColumnInfo map?
I think it would be less maintainable.
Force-pushed from 5f4bbe0 to f868207.
Force-pushed from f868207 to e448d49.
Force-pushed from 1045e73 to b90202c.
/run-check_dev
@cfzjywxk PTAL, do you think the unit tests are enough now?
We may need to design a specific mechanism to cover more combinations of columns and indexes, which does not seem easy to do in unit tests. For example, to test the clustered index, we've added something like https://github.com/pingcap/automated-tests/blob/master/ticases/clustered_index/dml/basic_generator.go to generate combinations.
As it's in the development branch, maybe we could merge this first and then start on a detailed test plan? @MyonKeminta @ekexium What do you think?
🤔 I'm fine with it
What problem does this PR solve?
Issue Number: part of #26833
Problem Summary:
Reduce data-index inconsistency issues by checking whether single-row changes generate corrupted mutations.
What is changed and how it works?
What's Changed:
RemoveRecord creates a mem buffer stage, which is used to collect mutations generated by the operation.
AddRecord, UpdateRecord and RemoveRecord check the consistency of their mutations before releasing their mem buffer stages.
How it Works:
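A rough sketch of the stage-then-check flow described under What's Changed, using a toy buffer rather than TiDB's `kv.MemBuffer`. Everything here (`toyBuffer`, `writeRow`, `singleRowCheck`, the `r:`/`i:` key prefixes, and discarding the stage on failure) is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// toyBuffer is a hypothetical, heavily simplified staged write buffer.
// stages[0] is the base layer; each Staging call pushes a new layer.
type toyBuffer struct {
	stages []map[string]string
}

func newToyBuffer() *toyBuffer {
	return &toyBuffer{stages: []map[string]string{{}}}
}

// Staging opens a new stage and returns its non-zero handle.
func (b *toyBuffer) Staging() int {
	b.stages = append(b.stages, map[string]string{})
	return len(b.stages) - 1
}

// Set writes into the innermost stage.
func (b *toyBuffer) Set(k, v string) { b.stages[len(b.stages)-1][k] = v }

// Release merges stage h into its parent and closes it.
func (b *toyBuffer) Release(h int) {
	for k, v := range b.stages[h] {
		b.stages[h-1][k] = v
	}
	b.stages = b.stages[:h]
}

// Cleanup discards stage h without merging it.
func (b *toyBuffer) Cleanup(h int) { b.stages = b.stages[:h] }

// singleRowCheck is a toy consistency rule: a single-row change must
// not produce more than one row mutation (keys prefixed "r:").
func singleRowCheck(stage map[string]string) error {
	rows := 0
	for k := range stage {
		if strings.HasPrefix(k, "r:") {
			rows++
		}
	}
	if rows > 1 {
		return errors.New("multiple row mutations in one stage")
	}
	return nil
}

// writeRow collects one operation's mutations in a fresh stage and
// checks them before the stage is released into the parent buffer.
func writeRow(b *toyBuffer, muts map[string]string, check func(map[string]string) error) error {
	sh := b.Staging()
	for k, v := range muts {
		b.Set(k, v)
	}
	if err := check(b.stages[sh]); err != nil {
		b.Cleanup(sh) // assumption: drop the suspicious mutations
		return err
	}
	b.Release(sh)
	return nil
}

func main() {
	buf := newToyBuffer()
	good := map[string]string{"r:1": "row", "i:1": "idx"}
	bad := map[string]string{"r:2": "row", "r:3": "row"}
	fmt.Println(writeRow(buf, good, singleRowCheck))
	fmt.Println(writeRow(buf, bad, singleRowCheck))
	fmt.Println(len(buf.stages[0]))
}
```

Because the stage holds only the mutations of one operation, the check sees just the keys written for a single row change instead of the whole transaction buffer.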
Check List
Tests
Side effects
Documentation
Release note