Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: update info message for ‘/api/v2/changefeeds/{changefeed_id}/synced’ (#10399) #10401

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,145 @@
})
}

<<<<<<< HEAD

Check failure on line 752 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 752 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
=======
// synced get the synced status of a changefeed
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}

// try to get pd client to get pd time, and determine synced status based on the pd time
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Info: message,
})
return
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

// We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs
if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
(physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
// case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval
// --> reach strict synced status
c.JSON(http.StatusOK, SyncedStatus{
Synced: true,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "Data syncing is finished",
})
return
}

if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
// case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval
// we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs.
// if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced
// otherwise, if pd & tikv is healthy --> data is not synced
// if not healthy --> data is synced
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) <
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("Please check whether PD is online and TiKV Regions are all available. " +
"If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. " +
"To check whether TiKV regions are all available, you can view " +
"'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. " +
"If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. " +
"Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait")
} else {
message = "The data syncing is not finished, please wait"
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: message,
})
return
}

// case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "The data syncing is not finished, please wait",
})
}

>>>>>>> 36aed02d3e (cdc: update info message for ‘/api/v2/changefeeds/{changefeed_id}/synced’ (#10399))

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+2018 '‘' in identifier

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+2019 '’' in identifier

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+2018 '‘' in identifier

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+2019 '’' in identifier

Check failure on line 890 in cdc/api/v2/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
238 changes: 238 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,3 +941,241 @@ func TestPauseChangefeed(t *testing.T) {
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "{}", w.Body.String())
}
<<<<<<< HEAD
=======

func TestChangefeedSynced(t *testing.T) {
syncedInfo := testCase{url: "/api/v2/changefeeds/%s/synced?namespace=abc", method: "GET"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
cp := mock_capture.NewMockCapture(gomock.NewController(t))
owner := mock_owner.NewMockOwner(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
router := newRouter(apiV2)

statusProvider := &mockStatusProvider{}

cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()
cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes()

pdClient := &mockPDClient{}
mockUpManager := upstream.NewManager4Test(pdClient)
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()

{
// case 1: invalid changefeed id
w := httptest.NewRecorder()
invalidID := "@^Invalid"
req, _ := http.NewRequestWithContext(context.Background(),
syncedInfo.method, fmt.Sprintf(syncedInfo.url, invalidID), nil)
router.ServeHTTP(w, req)
respErr := model.HTTPError{}
err := json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrAPIInvalidParam")
}

{
// case 2: not existed changefeed id
validID := changeFeedID.ID
statusProvider.err = cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID)
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID), nil)
router.ServeHTTP(w, req)
respErr := model.HTTPError{}
err := json.NewDecoder(w.Body).Decode(&respErr)
require.Nil(t, err)
require.Contains(t, respErr.Code, "ErrChangeFeedNotExists")
require.Equal(t, http.StatusBadRequest, w.Code)
}

validID := changeFeedID.ID
cfInfo := &model.ChangeFeedInfo{
ID: validID,
}
statusProvider.err = nil
statusProvider.changefeedInfo = cfInfo
{
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case3: pd is offline,resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153217279 << 18,
PullerResolvedTs: 1701153247279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+
"please recheck. Besides the data is not finish syncing", resp.Info)
}

{
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
// case4: pd is offline,resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153217279 << 18,
PullerResolvedTs: 1701153217479 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+
"You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than 300 secs. "+
"If it's satisfied, means the data syncing is totally finished", resp.Info)
}

helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(pdClient, nil).AnyTimes()
pdClient.logicTime = 1000
pdClient.timestamp = 1701153217279
{
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217209 << 18,
LastSyncedTs: 1701152217279 << 18,
PullerResolvedTs: 1701153217229 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, true, resp.Synced)
require.Equal(t, "Data syncing is finished", resp.Info)
}

{
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
LastSyncedTs: 1701152217279 << 18,
PullerResolvedTs: 1701153201379 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+
"If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. "+
"To check whether TiKV regions are all available, you can view "+
"'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. "+
"If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. "+
"Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait", resp.Info)
}

{
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153201279 << 18,
LastSyncedTs: 1701152207279 << 18,
PullerResolvedTs: 1701153218279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)
}

{
// case8: pdTs - lastSyncedTs < 5min
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
CheckpointTs: 1701153217279 << 18,
LastSyncedTs: 1701153213279 << 18,
PullerResolvedTs: 1701153217279 << 18,
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(
context.Background(),
syncedInfo.method,
fmt.Sprintf(syncedInfo.url, validID),
nil,
)
router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
resp := SyncedStatus{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, false, resp.Synced)
require.Equal(t, "The data syncing is not finished, please wait", resp.Info)
}
}

func TestHasRunningImport(t *testing.T) {
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(
t, &integration.ClusterConfig{Size: 1},
)
defer testEtcdCluster.Terminate(t)

ctx := context.Background()
client := testEtcdCluster.RandClient()
hasImport := hasRunningImport(ctx, client)
require.NoError(t, hasImport)

lease, err := client.Lease.Grant(ctx, 3*60)
require.NoError(t, err)

_, err = client.KV.Put(
ctx, filepath.Join(RegisterImportTaskPrefix, "pitr"),
"", clientv3.WithLease(lease.ID),
)
require.NoError(t, err)

hasImport = hasRunningImport(ctx, client)
require.NotNil(t, hasImport)
require.Contains(
t, hasImport.Error(), "There are lightning/restore tasks running",
)
}
>>>>>>> 36aed02d3e (cdc: update info message for ‘/api/v2/changefeeds/{changefeed_id}/synced’ (#10399))
Loading
Loading