Skip to content

Commit

Permalink
This is an automated cherry-pick of #10399
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hongyunyan authored and ti-chi-bot committed Jan 29, 2024
1 parent 12bae00 commit 00ad118
Show file tree
Hide file tree
Showing 3 changed files with 657 additions and 0 deletions.
139 changes: 139 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,145 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

<<<<<<< HEAD
=======
// synced get the synced status of a changefeed
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}

// try to get pd client to get pd time, and determine synced status based on the pd time
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

// We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs
if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
(physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
// case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval
// --> reach strict synced status
c.JSON(http.StatusOK, SyncedStatus{
Synced: true,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "Data syncing is finished",
})
return
}

if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
// case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval
// we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs.
// if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced
// otherwise, if pd & tikv is healthy --> data is not synced
// if not healthy --> data is synced
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) <
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("Please check whether PD is online and TiKV Regions are all available. " +
"If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. " +
"To check whether TiKV regions are all available, you can view " +
"'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. " +
"If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. " +
"Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait")
} else {
message = "The data syncing is not finished, please wait"
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: message,
})
return
}

// case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "The data syncing is not finished, please wait",
})
}

>>>>>>> 36aed02d3e (cdc: update info message for/api/v2/changefeeds/{changefeed_id}/synced’ (#10399))
func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
210 changes: 210 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,216 @@ func TestPauseChangefeed(t *testing.T) {
require.Equal(t, "{}", w.Body.String())
}

<<<<<<< HEAD
=======
func TestChangefeedSynced(t *testing.T) {
syncedInfo := testCase{url: "/api/v2/changefeeds/%s/synced?namespace=abc", method: "GET"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
cp := mock_capture.NewMockCapture(gomock.NewController(t))
owner := mock_owner.NewMockOwner(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
router := newRouter(apiV2)

statusProvider := &mockStatusProvider{}

cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()
cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes()

pdClient := &mockPDClient{}
mockUpManager := upstream.NewManager4Test(pdClient)
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()

{
// case 1: invalid changefeed id
w := httptest.NewRecorder()
invalidID := "@^Invalid"
req, _ := http.NewRequestWithContext(context.Background(),
syncedInfo.method, fmt.Sprintf(syncedInfo.url, invalidID), nil)
router.ServeHTTP(w, req)
respErr := model.HTTPError{}
err := json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrAPIInvalidParam")
}

{
// case 2: not existed changefeed id
validID := changeFeedID.ID
statusProvider.err = cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID), nil)
router.ServeHTTP(w, req)
respErr := model.HTTPError{}
err := json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrChangeFeedNotExists")
require.Equal(t, http.StatusBadRequest, w.Code)
}

validID := changeFeedID.ID
cfInfo := &model.ChangeFeedInfo{
ID: validID,
}
statusProvider.err = nil
statusProvider.changefeedInfo = cfInfo
{
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case3: pd is offline,resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153217279 << 18,
PullerResolvedTs: 1701153247279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+
"please recheck. Besides the data is not finish syncing", resp.Info)
}

{
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case4: pd is offline,resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153217279 << 18,
PullerResolvedTs: 1701153217479 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+
"You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than 300 secs. "+
"If it's satisfied, means the data syncing is totally finished", resp.Info)
}

helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(pdClient, nil).AnyTimes()
pdClient.logicTime = 1000
pdClient.timestamp = 1701153217279
{
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217209 << 18,
LastSyncedTs: 1701152217279 << 18,
PullerResolvedTs: 1701153217229 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, true, resp.Synced)
require.Equal(t, "Data syncing is finished", resp.Info)
}

{
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
LastSyncedTs: 1701152217279 << 18,
PullerResolvedTs: 1701153201379 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+
"If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. "+
"To check whether TiKV regions are all available, you can view "+
"'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. "+
"If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. "+
"Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait", resp.Info)
}

{
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
LastSyncedTs: 1701152207279 << 18,
PullerResolvedTs: 1701153218279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)
}

{
// case8: pdTs - lastSyncedTs < 5min
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153213279 << 18,
PullerResolvedTs: 1701153217279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)
}
}

>>>>>>> 36aed02d3e (cdc: update info message for/api/v2/changefeeds/{changefeed_id}/synced’ (#10399))
func TestHasRunningImport(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(
Expand Down
Loading

0 comments on commit 00ad118

Please sign in to comment.