pessimistic: record latest done ddl to handle conflict #1626
base: master
Conversation
[REVIEW NOTIFICATION] This pull request has not been approved. To complete the pull request process, please ask the reviewers in the list to review. The full list of commands accepted by this bot can be found here. Reviewers can indicate their review by writing a comment.
Could we let the DM-worker put … And I don't know whether there are some DDLs that could be run twice, with some DML interleaved, and still take effect (like TRUNCATE, but TRUNCATE is filtered; I don't know if we should support pessimistic sharding TRUNCATE someday). For this case, we might add the binlog location as part of the key of …
So you mean we should record the last done DDL for every worker? Currently we only put the last done DDL after the lock is resolved (all sources done). Here we handle the situation where DM-master goes down.
I don't think we will support TRUNCATE in pessimistic mode 🤣. We only skip the DDL when a conflict error happens. So the case only happens like …, and we handle this situation.
pkg/shardddl/pessimism/operation.go
Outdated
// only used to report to the caller of the watcher, do not marsh it.
// if it's true, it means the Operation has been deleted in etcd.
IsDeleted bool `json:"-"`
}

// NewOperation creates a new Operation instance.
func NewOperation(id, task, source string, ddls []string, exec, done bool) Operation {
func NewOperation(id, task, source string, ddls []string, exec, done bool, skip bool) Operation {
Suggested change:
func NewOperation(id, task, source string, ddls []string, exec, done bool, skip bool) Operation {
func NewOperation(id, task, source string, ddls []string, exec, done, skip bool) Operation {
pkg/shardddl/pessimism/lock.go
Outdated
@@ -64,13 +68,46 @@ func NewLock(id, task, owner string, ddls, sources []string) *Lock {
// TrySync tries to sync the lock, does decrease on remain, re-entrant.
// new upstream sources may join when the DDL lock is in syncing,
// so we need to merge these new sources.
func (l *Lock) TrySync(caller string, ddls, sources []string) (bool, int, error) {
func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources []string, latestDoneDDLs []string) (bool, int, error) {
Suggested change:
func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources []string, latestDoneDDLs []string) (bool, int, error) {
func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources, latestDoneDDLs []string) (bool, int, error) {
pkg/shardddl/pessimism/lock.go
Outdated
}

// current ddls idempotent, skip it.
if utils.CompareShardingDDLs(latestDoneDDLs, ddls) {
We can save the result of this call in a bool variable to save one comparison.
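For example (a minimal sketch reusing the names from this diff; the surrounding code is assumed):

```go
// compute the comparison once and reuse the result later instead of calling
// utils.CompareShardingDDLs a second time
curIdempotent := utils.CompareShardingDDLs(latestDoneDDLs, ddls)
if curIdempotent {
	// current ddls are idempotent, skip them
}
```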
will review later
pkg/shardddl/pessimism/ddls.go
Outdated
}

// PutLatestDoneDDLs puts the latest done shard DDLs into etcd.
func PutLatestDoneDDLs(cli *clientv3.Client, lockID string, ddls []string) (int64, error) {
We may reuse the function above.
pkg/shardddl/pessimism/keeper.go
Outdated
if t := utils.ExtractTaskFromLockID(lockID); t == task {
	lockIDs = append(lockIDs, lockID)
}
delete(lk.latestDoneDDLs, lockID)
Should we remove the lockID only when it matches the task? 🤔
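Something like this (a sketch of the suggestion, assuming lk.latestDoneDDLs is keyed by lock ID):

```go
// only drop the latest-done-DDLs entry when the lock actually belongs to this task
if t := utils.ExtractTaskFromLockID(lockID); t == task {
	lockIDs = append(lockIDs, lockID)
	delete(lk.latestDoneDDLs, lockID)
}
```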
@@ -35,21 +35,23 @@ type Operation struct {
	DDLs []string `json:"ddls"` // DDL statements
	Exec bool `json:"exec"` // execute or skip the DDL statements
	Done bool `json:"done"` // whether the `Exec` operation has done
	Skip bool `json:"skip"` // whether the worker skips this operation
Could we avoid adding Skip and instead use empty DDLs?
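A possible shape of that alternative (a sketch of the reviewer's idea, not the PR's code; the worker-side interpretation is an assumption):

```go
// keep the original constructor and signal "skip" by putting an operation
// with an empty DDL list instead of adding a dedicated Skip field
op := NewOperation(l.ID, l.Task, caller, []string{}, false, false)
// on the DM-worker side, an operation with len(op.DDLs) == 0 would mean
// "nothing to execute, just mark the operation done"
```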
need shfmt
will review later
_, _, err := PutOperations(cli, true, NewOperation(l.ID, l.Task, caller, latestDoneDDLs, false, false, true))
return l.remain <= 0, l.remain, err
}
could you give an example of the cases that enter here 😵 ?
curIdempotent: worker1, worker2 ddl1 -> worker2 ddl2 -> worker1 resyncs ddl1
otherIdempotent: worker1, worker2 ddl1 -> worker2 resyncs ddl1 -> worker1 ddl2
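In code terms, these map to the two comparisons used in this PR (a sketch; the exact placement inside TrySync is an assumption):

```go
// the caller re-syncs DDLs that were already resolved for this lock
curIdempotent := utils.CompareShardingDDLs(latestDoneDDLs, ddls)
// the DDLs currently recorded in the lock (put by other sources) were already resolved
otherIdempotent := utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs)
```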
}

// current ddls idempotent, skip it.
if curIdempotent {
I still prefer that we check curIdempotent before utils.CompareShardingDDLs(ddls, l.DDLs).

Background: the lock is resolved, but one non-owner fails before saving its checkpoint and then re-syncs the DDL of the already-resolved lock.

My preferred way:
- the failed DM-worker puts the outdated DDL. If we check curIdempotent here, we can stop blocking that DM-worker earlier (the blocking may be caused by the DDL lock).

Current way:
1. the failed DM-worker (worker1) puts the outdated DDL; utils.CompareShardingDDLs(ddls, l.DDLs) is true
2. another DM-worker (worker2) puts a DDL, more likely a new, different one, and enters the if-then branch; curIdempotent is false and utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs) is true. (And I think in most cases curIdempotent is false; when curIdempotent is true, worker1 waits even longer to get the skip Op.)
3. in step 2 we enter the "other sources' ddls idempotent, skip them" branch, and only here do we put the skip Op for worker1.

Above all, worker1 may be blocked for a longer time.
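Roughly, the preferred ordering would look like this (a sketch only; the error at the end is a placeholder, not the PR's actual code):

```go
// check the caller's own idempotence first, so a failed DM-worker that
// re-syncs an outdated DDL gets its skip operation immediately
if curIdempotent {
	_, _, err := PutOperations(cli, true, NewOperation(l.ID, l.Task, caller, latestDoneDDLs, false, false, true))
	return l.remain <= 0, l.remain, err
}
// only then compare the caller's DDLs against the lock's DDLs
if !utils.CompareShardingDDLs(ddls, l.DDLs) {
	return false, l.remain, fmt.Errorf("DDLs %v do not match the lock's DDLs %v", ddls, l.DDLs) // placeholder error
}
```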
And I still prefer saving a binlog position through the same transaction in etcd 😂 When the DM-worker starts, this position would have higher priority than the one in the downstream checkpoint.
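For reference, a rough sketch of that idea, saving the latest done DDLs and the binlog position in one etcd transaction; the key paths, JSON encoding, and helper name here are hypothetical, not part of this PR:

```go
import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// putDoneDDLsAndPosition writes the latest done DDLs and the matching binlog
// position atomically, so a restarted DM-worker could prefer this position
// over the downstream checkpoint (hypothetical helper).
func putDoneDDLsAndPosition(cli *clientv3.Client, lockID, ddlsJSON, posJSON string) (int64, error) {
	ctx, cancel := context.WithTimeout(cli.Ctx(), 5*time.Second)
	defer cancel()
	resp, err := cli.Txn(ctx).Then(
		clientv3.OpPut("/dm/shardddl/pessimism/done-ddls/"+lockID, ddlsJSON),     // hypothetical key
		clientv3.OpPut("/dm/shardddl/pessimism/done-position/"+lockID, posJSON), // hypothetical key
	).Commit()
	if err != nil {
		return 0, err
	}
	return resp.Header.Revision, nil
}
```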
/hold
@GMHDBJD: PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/hold
What problem does this PR solve?
issue
What is changed and how it works?
when the latest DDLs are resolved, put them into etcd (see the sketch below).
when conflicting DDLs are detected in the master, ignore DDLs that equal the latest done DDLs.
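A rough sketch of the first step (where exactly this is called is an assumption; PutLatestDoneDDLs comes from this PR's diff):

```go
// once the lock is resolved (all sources have done the DDLs), persist the
// resolved DDLs so that a later re-sync of the same DDLs can be skipped
if l.IsResolved() {
	if _, err := PutLatestDoneDDLs(cli, l.ID, l.DDLs); err != nil {
		return err
	}
}
```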
Check List
Tests