-
Notifications
You must be signed in to change notification settings - Fork 188
syncer/: add async flush checkpoint feature #605
Conversation
Codecov Report
@@ Coverage Diff @@
## master #605 +/- ##
================================================
- Coverage 57.7236% 57.7212% -0.0024%
================================================
Files 203 201 -2
Lines 20515 20353 -162
================================================
- Hits 11842 11748 -94
+ Misses 7526 7474 -52
+ Partials 1147 1131 -16 |
@lichunzhu please resolve conflicts |
/run-unit-test |
@@ -721,7 +745,11 @@ func (s *Syncer) addJob(job *job) error { | |||
} | |||
s.jobWg.Wait() | |||
finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() | |||
return s.flushCheckPoints() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the old code, flush
job will wait for checkpoint flush to complete, do we still need to keep the behavior? or is there any mechanism to wait for checkpoint flushed (like wait before executing DDL)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Newly added structure flushHelper
may make flush checkpoint job wait.
syncer/syncer.go
Outdated
@@ -974,6 +1015,10 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo | |||
if !ok { | |||
return | |||
} | |||
if sqlJob.tp == xid { | |||
lastAddedXidPos.save(sqlJob.location.Clone(), nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is currentLocation
more accurate? (although location
and currentLocation
are the same for XID job).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in c1bef9f
…t chan" This reverts commit d9dbdd1.
…o asyncCheckpoint
@lichunzhu: PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
Won't merge |
cherry-pick of #595
If this PR is merged and operate stably on master branch we will then merge #595.
What problem does this PR solve?
Async flush checkpoint to improve effeciency.
What is changed and how it works?
worker goroutine: concurrent worker count
(The checkpoint saved by the worker goroutine is only global checkpoint, and the table checkpoint is refreshed by ddl job)
flush goroutine:
for loop
1. Get flush type from channel. Check whether we need to update global checkpoint this time. If so, update global checkpoint with min worker pos.
2. Flush checkpoint.
add job:
No longer scheduled wait dml job execution is complete
a. The ddl job is executed, the wait job is completed, the checkpoint is updated, and submitted to the flush goroutine
b. If there is no flush checkpoint for more than 30s, submit a request to the flush goroutine
fake checkpoint:
We will use added dml job location as fake checkpoint to avoid unbalanced dmls to cause an extremely small checkpoint.
Check List
Tests
Code changes