Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiCDC support checking if data is entirely replicated to Downstream #10133

Merged
merged 39 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2898d74
first commit
hongyunyan Nov 22, 2023
73bc607
update code
hongyunyan Nov 22, 2023
6b92aa4
fix code
hongyunyan Nov 23, 2023
f45c7ae
add test
hongyunyan Nov 23, 2023
cbd4b7d
add test
hongyunyan Nov 27, 2023
37add5b
update test
hongyunyan Nov 28, 2023
45e0273
update code
hongyunyan Nov 28, 2023
8dedd2a
add ft
hongyunyan Nov 29, 2023
95f9204
update ft
hongyunyan Nov 30, 2023
8431839
update test
hongyunyan Nov 30, 2023
e2e3350
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Nov 30, 2023
9b37e32
update run_group
hongyunyan Dec 11, 2023
ad5e53b
fix conflict
hongyunyan Dec 11, 2023
a3f8736
fix typo
hongyunyan Dec 11, 2023
275ac70
update
hongyunyan Dec 11, 2023
e32aef5
update
hongyunyan Dec 11, 2023
c6bf615
fix ut
hongyunyan Dec 11, 2023
aee4463
fix test
hongyunyan Dec 12, 2023
c62086d
update
hongyunyan Dec 12, 2023
d5ead0a
revert change
hongyunyan Dec 12, 2023
6edef57
update it
hongyunyan Dec 12, 2023
440b4e8
update
hongyunyan Dec 12, 2023
1cc43cf
update
hongyunyan Dec 12, 2023
a921613
update
hongyunyan Dec 12, 2023
b9f7b1b
update
hongyunyan Dec 12, 2023
573ef55
update
hongyunyan Dec 12, 2023
e276bde
for test
hongyunyan Dec 13, 2023
f6f8d0a
update
hongyunyan Dec 13, 2023
9cfd6f2
update
hongyunyan Dec 13, 2023
40522d7
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Dec 13, 2023
317d58b
update
hongyunyan Dec 18, 2023
50b0a15
fix comments
hongyunyan Dec 19, 2023
74e9b82
fix comments
hongyunyan Dec 19, 2023
49d4583
Merge branch 'master' of https://github.com/pingcap/tiflow into hongy…
hongyunyan Dec 19, 2023
5f51903
update code
hongyunyan Dec 19, 2023
9680570
update
hongyunyan Dec 20, 2023
70866f5
update
hongyunyan Dec 20, 2023
fca13f0
fix
hongyunyan Dec 20, 2023
f9ea16c
fix typo
hongyunyan Dec 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedSyncedStatusForAPI, error) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedSyncedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) IsHealthy(ctx context.Context) (bool, error) {
args := p.Called(ctx)
return args.Get(0).(bool), args.Error(1)
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status)
changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced)

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
25 changes: 17 additions & 8 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context,

// GetTS of mockPDClient returns a mock tso
func (c *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return c.logicTime, c.timestamp, nil
return c.timestamp, c.logicTime, nil
}

// GetClusterID of mockPDClient returns a mock ClusterID
Expand All @@ -62,13 +62,14 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
err error
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
changeFeedSyncedStatus *model.ChangeFeedSyncedStatusForAPI
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand Down Expand Up @@ -120,6 +121,14 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
return m.changefeedStatuses, m.err
}

// GetChangeFeedSyncedStatus returns a mock changefeed status.
func (m *mockStatusProvider) GetChangeFeedSyncedStatus(_ context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedSyncedStatusForAPI,
error,
) {
return m.changeFeedSyncedStatus, m.err
}

func (m *mockStatusProvider) IsChangefeedOwner(_ context.Context, id model.ChangeFeedID) (bool, error) {
return true, nil
}
130 changes: 130 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -895,6 +896,135 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

// transformer timestamp to readable format
func transformerTime(timestamp int64) string {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
location := time.Local
tm := time.Unix((timestamp / 1000), 0).In(location)
return tm.Format("2006-01-02 15:04:05")
}

// synced get the synced status of a changefeed
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
ctx,
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
changefeedID,
)
if err != nil {
_ = c.Error(err)
return
}

log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}

// get pd client
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// pd is unavailable
var message string
if (status.PullerResolvedTs - status.CheckpointTs) > cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { // 5s
message = fmt.Sprintf("%s. Besides the data is not finish syncing", terror.Message(err))
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
} else {
message = fmt.Sprintf("%s. You can check the pd first, and if pd is available, means we don't finish sync data. "+
"If pd is not available, please check the whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: transformerTime(status.CheckpointTs),
PullerResolvedTs: transformerTime(status.PullerResolvedTs),
LastSyncedTs: transformerTime(status.LastSyncedTs),
NowTs: transformerTime(0),
Info: message,
})
return
}
defer pdClient.Close()

// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

if (physicalNow-status.LastSyncedTs > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
(physicalNow-status.CheckpointTs < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
// reach strict synced condition
c.JSON(http.StatusOK, SyncedStatus{
Synced: true,
SinkCheckpointTs: transformerTime(status.CheckpointTs),
PullerResolvedTs: transformerTime(status.PullerResolvedTs),
LastSyncedTs: transformerTime(status.LastSyncedTs),
NowTs: transformerTime(physicalNow),
Info: "Data syncing is finished",
})
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
} else if physicalNow-status.LastSyncedTs > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
// lastSyncedTs reach the synced condition, while checkpoint-ts doesn't
var message string
if (status.PullerResolvedTs - status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("Please check whether pd is health and tikv region is all available. " +
"If pd is not health or tikv region is not available, the data syncing is finished. " +
" Otherwise the data syncing is not finished, please wait")
} else {
message = "The data syncing is not finished, please wait"
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: transformerTime(status.CheckpointTs),
PullerResolvedTs: transformerTime(status.PullerResolvedTs),
LastSyncedTs: transformerTime(status.LastSyncedTs),
NowTs: transformerTime(physicalNow),
Info: message,
})
} else {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
// lastSyncedTs doesn't reach the synced condition
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: transformerTime(status.CheckpointTs),
PullerResolvedTs: transformerTime(status.PullerResolvedTs),
LastSyncedTs: transformerTime(status.LastSyncedTs),
NowTs: transformerTime(physicalNow),
Info: "The data syncing is not finished, please wait",
})
}
}

func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
Loading
Loading