Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiCDC support checking if data is entirely replicated to Downstream #10133

Merged
merged 39 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2898d74
first commit
hongyunyan Nov 22, 2023
73bc607
update code
hongyunyan Nov 22, 2023
6b92aa4
fix code
hongyunyan Nov 23, 2023
f45c7ae
add test
hongyunyan Nov 23, 2023
cbd4b7d
add test
hongyunyan Nov 27, 2023
37add5b
update test
hongyunyan Nov 28, 2023
45e0273
update code
hongyunyan Nov 28, 2023
8dedd2a
add ft
hongyunyan Nov 29, 2023
95f9204
update ft
hongyunyan Nov 30, 2023
8431839
update test
hongyunyan Nov 30, 2023
e2e3350
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Nov 30, 2023
9b37e32
update run_group
hongyunyan Dec 11, 2023
ad5e53b
fix conflict
hongyunyan Dec 11, 2023
a3f8736
fix typo
hongyunyan Dec 11, 2023
275ac70
update
hongyunyan Dec 11, 2023
e32aef5
update
hongyunyan Dec 11, 2023
c6bf615
fix ut
hongyunyan Dec 11, 2023
aee4463
fix test
hongyunyan Dec 12, 2023
c62086d
update
hongyunyan Dec 12, 2023
d5ead0a
revert change
hongyunyan Dec 12, 2023
6edef57
update it
hongyunyan Dec 12, 2023
440b4e8
update
hongyunyan Dec 12, 2023
1cc43cf
update
hongyunyan Dec 12, 2023
a921613
update
hongyunyan Dec 12, 2023
b9f7b1b
update
hongyunyan Dec 12, 2023
573ef55
update
hongyunyan Dec 12, 2023
e276bde
for test
hongyunyan Dec 13, 2023
f6f8d0a
update
hongyunyan Dec 13, 2023
9cfd6f2
update
hongyunyan Dec 13, 2023
40522d7
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Dec 13, 2023
317d58b
update
hongyunyan Dec 18, 2023
50b0a15
fix comments
hongyunyan Dec 19, 2023
74e9b82
fix comments
hongyunyan Dec 19, 2023
49d4583
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Dec 19, 2023
5f51903
update code
hongyunyan Dec 19, 2023
9680570
update
hongyunyan Dec 20, 2023
70866f5
update
hongyunyan Dec 20, 2023
fca13f0
fix
hongyunyan Dec 20, 2023
f9ea16c
fix typo
hongyunyan Dec 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedSyncedStatusForAPI, error) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedSyncedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) IsHealthy(ctx context.Context) (bool, error) {
args := p.Called(ctx)
return args.Get(0).(bool), args.Error(1)
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status)
changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced)

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
25 changes: 17 additions & 8 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context,

// GetTS of mockPDClient returns a mock tso
func (c *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return c.logicTime, c.timestamp, nil
return c.timestamp, c.logicTime, nil
}

// GetClusterID of mockPDClient returns a mock ClusterID
Expand All @@ -62,13 +62,14 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
err error
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
changeFeedSyncedStatus *model.ChangeFeedSyncedStatusForAPI
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand Down Expand Up @@ -120,6 +121,14 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
return m.changefeedStatuses, m.err
}

// GetChangeFeedSyncedStatus returns a mock changefeed status.
func (m *mockStatusProvider) GetChangeFeedSyncedStatus(_ context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedSyncedStatusForAPI,
error,
) {
return m.changeFeedSyncedStatus, m.err
}

func (m *mockStatusProvider) IsChangefeedOwner(_ context.Context, id model.ChangeFeedID) (bool, error) {
return true, nil
}
138 changes: 138 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
apiOpVarChangefeedID = "changefeed_id"
// apiOpVarNamespace is the key of changefeed namespace in HTTP API
apiOpVarNamespace = "namespace"
// timeout for pd client
timeout = 30 * time.Second
)

// createChangefeed handles create changefeed request,
Expand Down Expand Up @@ -895,6 +897,142 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

// synced get the synced status of a changefeed
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}

// try to get pd client to get pd time, and determine synced status based on the pd time
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

// We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs
if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
(physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
// case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval
// --> reach strict synced status
c.JSON(http.StatusOK, SyncedStatus{
Synced: true,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "Data syncing is finished",
})
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
return
}

if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
// case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval
// we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs.
// if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced
// otherwise, if pd & tikv is healthy --> data is not synced
// if not healthy --> data is synced
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) <
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("Please check whether pd is healthy and tikv region is all available. " +
"If pd is not healthy or tikv region is not available, the data syncing is finished. " +
"When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info " +
"in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, " +
"it means some regions in tikv are unavailable. " +
" Otherwise the data syncing is not finished, please wait")
} else {
message = "The data syncing is not finished, please wait"
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: message,
})
return
}

// case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "The data syncing is not finished, please wait",
})
}

func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
Loading
Loading