-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TiCDC supports a snapshot-map in replication #932
Conversation
…point has not add yet)
@Colins110 Could you explain more about how a user would make use of this feature? What is the potential user scenario and how would the user make use of the "safepoint mapping"? |
you can refer to this issue: TiDB Cluster supports snapshot level consistency replication in time |
/run-integration-tests tidb=release-4.0 tikv=release-4.0 pd=release-4.0 |
Codecov Report
@@ Coverage Diff @@
## master #932 +/- ##
================================================
- Coverage 32.1982% 31.5646% -0.6337%
================================================
Files 100 101 +1
Lines 10836 11025 +189
================================================
- Hits 3489 3480 -9
- Misses 6956 7152 +196
- Partials 391 393 +2 |
@Colins110,Thanks for your review. The bot only counts LGTMs from Reviewers and higher roles, but you're still welcome to leave your comments.See the corresponding SIG page for more information. Related SIG: migrate(slack). |
@@ -333,13 +333,13 @@ func (o *Owner) newChangeFeed( | |||
} | |||
errCh := make(chan error, 1) | |||
|
|||
sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) | |||
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) | |
sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should use package sink
in L360 (var syncpointStore sink.SyncpointStore
) to create a syncpointStore
,if the var named sink,the package sink
will not be used in the follow code,there is a conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move the sink.SyncpointStore
to another package is better, but it reuses some code in the sink package.
We can refine this in another PR.
/run-integration-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM |
/merge |
/run-all-tests |
@Colins110 merge failed. |
/run-integration-tests |
2 similar comments
/run-integration-tests |
/run-integration-tests |
What problem does this PR solve?
TiDB Cluster supports snapshot level consistency replication in time
What is changed and how it works?
Make MySQL sink supports to replicate to a global consistent state dynamically.
set a syncpoint at regular intervals for get globally consistent state;
by stopping updating the global resolvedTS util the global checkpointTS == global resolvedTS, then set the sync point TS
=checkpointTS;
Store a safepoint mapping in downstream, recording the TSO mapping between upstream and downstream.
the map in the downstream like this:
Check List
Tests
a integration test named
syncpoint
test the syncpoint's correctness by sync_diff_inspector tool. you can usemake integration_test CASE=syncpoint
to run the test.Code changes
in cdc/changeFeed.go:
add function:
startSyncPeriod()
createSynctable()
sinkSyncpoint(ctx context.Context)
add some property in
changeFeed struct
some change in function
calcResolvedTs(ctx context.Context)
:to judge the syncpoint and sink the map to downstream
in cdc/owner.go:
add function:
configureSinkURI(ctx context.Context, dsnCfg *dmysql.Config, tz *time.Location)
createSyncpointSink(ctx context.Context, info *model.ChangeFeedInfo, id string)
some change in function
newChangeFeed( ctx context.Context, id model.ChangeFeedID, processorsInfos model.ProcessorsInfos, taskPositions map[string]*model.TaskPosition, info *model.ChangeFeedInfo, checkpointTs uint64)
:for every changefeed,we need create a standalone mysql link to record the syncpoint map in the future.
the new flags for
cdc cli changefeed
:you can open the feature to record the syncpoint like this when you create a new changefeed or update a changefeed:
cdc changefeed create --sink-uri="$SINK_URI" --sync-point --sync-interval=10s
the
--sync-point
mean open the syncpoint feature, the--sync-interval
can set the interval for recording the sync point(the default value is 10min).Release note