diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go
index 1cc070d7618..2173aa92b0b 100644
--- a/cdc/api/v2/changefeed.go
+++ b/cdc/api/v2/changefeed.go
@@ -889,6 +889,145 @@ func (h *OpenAPIV2) status(c *gin.Context) {
 	})
 }
 
+// synced returns the synced status of a changefeed
+// @Summary Get synced status
+// @Description get the synced status of a changefeed
+// @Tags changefeed,v2
+// @Accept json
+// @Produce json
+// @Param changefeed_id path string true "changefeed_id"
+// @Param namespace query string false "default"
+// @Success 200 {object} SyncedStatus
+// @Failure 500,400 {object} model.HTTPError
+// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
+func (h *OpenAPIV2) synced(c *gin.Context) {
+	ctx := c.Request.Context()
+
+	namespace := getNamespaceValueWithDefault(c)
+	changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
+	if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
+			changefeedID.ID))
+		return
+	}
+
+	status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
+	if err != nil {
+		_ = c.Error(err)
+		return
+	}
+
+	log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))
+
+	cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
+	if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
+		cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
+		cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
+	}
+
+	// try to get a pd client to fetch the pd time, and determine the synced status based on the pd time
+	if len(cfg.PDAddrs) == 0 {
+		up, err := getCaptureDefaultUpstream(h.capture)
+		if err != nil {
+			_ = c.Error(err)
+			return
+		}
+		cfg.PDConfig = getUpstreamPDConfig(up)
+	}
+	credential := cfg.PDConfig.toCredential()
+
+	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
+	if err != nil {
+		// case 1. we can't get a pd client; pd may be unavailable.
+		// if pullerResolvedTs - checkpointTs > checkpointInterval, the data is not synced;
+		// otherwise, since pd is unavailable, we decide whether the data is synced based on
+		// the time difference between the current time and lastSyncedTs.
+		var message string
+		if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
+			cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
+			message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
+		} else {
+			message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
+				"If pd is offline, please check whether we satisfy the condition that "+
+				"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
+				"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
+		}
+		c.JSON(http.StatusOK, SyncedStatus{
+			Synced:           false,
+			SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
+			PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
+			LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
+			NowTs:            model.JSONTime(time.Unix(0, 0)),
+			Info:             message,
+		})
+		return
+	}
+	defer pdClient.Close()
+	// get time from pd
+	physicalNow, _, _ := pdClient.GetTS(ctx)
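+	// Note (illustrative): a TSO packs a physical timestamp in milliseconds
+	// into the high bits and an 18-bit logical counter into the low bits, so
+	// oracle.ExtractPhysical(ts) is essentially ts>>18, i.e. milliseconds.
+	// That is why the second-based intervals below are multiplied by 1000
+	// before being compared against physical-time differences.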
"+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return + } + defer pdClient.Close() + // get time from pd + physicalNow, _, _ := pdClient.GetTS(ctx) + + // We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs + if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) && + (physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) { + // case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval + // --> reach strict synced status + c.JSON(http.StatusOK, SyncedStatus{ + Synced: true, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: "Data syncing is finished", + }) + return + } + + if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 { + // case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval + // we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs. + // if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced + // otherwise, if pd & tikv is healthy --> data is not synced + // if not healthy --> data is synced + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) < + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("Please check whether PD is online and TiKV Regions are all available. " + + "If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. " + + "To check whether TiKV regions are all available, you can view " + + "'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. " + + "If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. 
" + + "Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait") + } else { + message = "The data syncing is not finished, please wait" + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: message, + }) + return + } + + // case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: "The data syncing is not finished, please wait", + }) +} + +>>>>>>> 36aed02d3e (cdc: update info message for ‘/api/v2/changefeeds/{changefeed_id}/synced’ (#10399)) func toAPIModel( info *model.ChangeFeedInfo, resolvedTs uint64, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index e37cb4b4cda..99ed2842618 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -964,6 +964,216 @@ func TestPauseChangefeed(t *testing.T) { require.Equal(t, "{}", w.Body.String()) } +<<<<<<< HEAD +======= +func TestChangefeedSynced(t *testing.T) { + syncedInfo := testCase{url: "/api/v2/changefeeds/%s/synced?namespace=abc", method: "GET"} + helpers := NewMockAPIV2Helpers(gomock.NewController(t)) + cp := mock_capture.NewMockCapture(gomock.NewController(t)) + owner := mock_owner.NewMockOwner(gomock.NewController(t)) + apiV2 := NewOpenAPIV2ForTest(cp, helpers) + router := newRouter(apiV2) + + statusProvider := &mockStatusProvider{} + + cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + cp.EXPECT().IsReady().Return(true).AnyTimes() + cp.EXPECT().IsController().Return(true).AnyTimes() + cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes() + + pdClient := &mockPDClient{} + mockUpManager := upstream.NewManager4Test(pdClient) + cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes() + + { + // case 1: invalid changefeed id + w := httptest.NewRecorder() + invalidID := "@^Invalid" + req, _ := http.NewRequestWithContext(context.Background(), + syncedInfo.method, fmt.Sprintf(syncedInfo.url, invalidID), nil) + router.ServeHTTP(w, req) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrAPIInvalidParam") + } + + { + // case 2: not existed changefeed id + validID := changeFeedID.ID + statusProvider.err = cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID) + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext(context.Background(), syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), nil) + router.ServeHTTP(w, req) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrChangeFeedNotExists") + require.Equal(t, http.StatusBadRequest, w.Code) + } + + validID := changeFeedID.ID + cfInfo := &model.ChangeFeedInfo{ + ID: validID, + } + statusProvider.err = nil + statusProvider.changefeedInfo = cfInfo + { + helpers.EXPECT().getPDClient(gomock.Any(), 
+		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
+			CheckpointTs:     1701153217279 << 18,
+			LastSyncedTs:     1701153217279 << 18,
+			PullerResolvedTs: 1701153247279 << 18,
+		}
+		w := httptest.NewRecorder()
+		req, _ := http.NewRequestWithContext(
+			context.Background(),
+			syncedInfo.method,
+			fmt.Sprintf(syncedInfo.url, validID),
+			nil,
+		)
+		router.ServeHTTP(w, req)
+		require.Equal(t, http.StatusOK, w.Code)
+		resp := SyncedStatus{}
+		err := json.NewDecoder(w.Body).Decode(&resp)
+		require.Nil(t, err)
+		require.Equal(t, false, resp.Synced)
+		require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+
+			"please recheck. Besides the data is not finish syncing", resp.Info)
+	}
+
+	{
+		helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
+		// case 4: pd is offline, resolvedTs - checkpointTs < 15s
+		statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
+			CheckpointTs:     1701153217279 << 18,
+			LastSyncedTs:     1701153217279 << 18,
+			PullerResolvedTs: 1701153217479 << 18,
+		}
+		w := httptest.NewRecorder()
+		req, _ := http.NewRequestWithContext(
+			context.Background(),
+			syncedInfo.method,
+			fmt.Sprintf(syncedInfo.url, validID),
+			nil,
+		)
+		router.ServeHTTP(w, req)
+		require.Equal(t, http.StatusOK, w.Code)
+		resp := SyncedStatus{}
+		err := json.NewDecoder(w.Body).Decode(&resp)
+		require.Nil(t, err)
+		require.Equal(t, false, resp.Synced)
+		require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+
+			"You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
+			"If pd is offline, please check whether we satisfy the condition that "+
+			"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than 300 secs. "+
+			"If it's satisfied, means the data syncing is totally finished", resp.Info)
+	}
+
+	helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(pdClient, nil).AnyTimes()
+	pdClient.logicTime = 1000
+	pdClient.timestamp = 1701153217279
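+	// Illustrative note: the cases below exercise the defaults of a 300s
+	// (5 min) synced check interval and a 15s checkpoint interval; each
+	// fixture is a millisecond offset from pdClient.timestamp above.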
"+ + "If it's satisfied, means the data syncing is totally finished", resp.Info) + } + + helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(pdClient, nil).AnyTimes() + pdClient.logicTime = 1000 + pdClient.timestamp = 1701153217279 + { + // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217209 << 18, + LastSyncedTs: 1701152217279 << 18, + PullerResolvedTs: 1701153217229 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, true, resp.Synced) + require.Equal(t, "Data syncing is finished", resp.Info) + } + + { + // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153201279 << 18, + LastSyncedTs: 1701152217279 << 18, + PullerResolvedTs: 1701153201379 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "Please check whether PD is online and TiKV Regions are all available. "+ + "If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. "+ + "To check whether TiKV regions are all available, you can view "+ + "'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. "+ + "If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. 
"+ + "Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait", resp.Info) + } + + { + // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153201279 << 18, + LastSyncedTs: 1701152207279 << 18, + PullerResolvedTs: 1701153218279 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "The data syncing is not finished, please wait", resp.Info) + } + + { + // case8: pdTs - lastSyncedTs < 5min + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217279 << 18, + LastSyncedTs: 1701153213279 << 18, + PullerResolvedTs: 1701153217279 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "The data syncing is not finished, please wait", resp.Info) + } +} + +>>>>>>> 36aed02d3e (cdc: update info message for ‘/api/v2/changefeeds/{changefeed_id}/synced’ (#10399)) func TestHasRunningImport(t *testing.T) { integration.BeforeTestExternal(t) testEtcdCluster := integration.NewClusterV3( diff --git a/tests/integration_tests/synced_status/run.sh b/tests/integration_tests/synced_status/run.sh new file mode 100644 index 00000000000..6b7fb4d0a05 --- /dev/null +++ b/tests/integration_tests/synced_status/run.sh @@ -0,0 +1,308 @@ +#!/bin/bash + +# [DISCRIPTION]: +# This test is related to +# It will test the sync status request of cdc server in the following scenarios: +# 1. The sync status request of cdc server when the upstream cluster is available +# 1.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold +# 1.2 pdNow - lastSyncedTs < threshold +# 1.3 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold, resolvedTs - checkpointTs > threshold +# 2. The sync status request of cdc server when the upstream pd is unavailable +# 2.1 resolvedTs - checkpointTs < threshold +# 3. The sync status request of cdc server when the upstream tikv is unavailable +# 3.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs > threshold, resolvedTs - checkpointTs < threshold +# 3.2 pdNow - lastSyncedTs < threshold +# 4. The sync status request of cdc server when the downstream tidb is available +# 4.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold +# 4.2 pdNow - lastSyncedTs < threshold +# [STEP]: +# 1. Create changefeed with synced-time-config = xx +# 2. insert data to upstream cluster, and do the related actions for each scenarios +# 3. do the query of synced status of cdc server +# 4. 
diff --git a/tests/integration_tests/synced_status/run.sh b/tests/integration_tests/synced_status/run.sh
new file mode 100644
index 00000000000..6b7fb4d0a05
--- /dev/null
+++ b/tests/integration_tests/synced_status/run.sh
@@ -0,0 +1,308 @@
+#!/bin/bash
+
+# [DESCRIPTION]:
+# This test checks the synced status request of the cdc server in the following scenarios:
+# 1. The synced status request of the cdc server when the upstream cluster is available
+#   1.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
+#   1.2 pdNow - lastSyncedTs < threshold
+#   1.3 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold, resolvedTs - checkpointTs > threshold
+# 2. The synced status request of the cdc server when the upstream pd is unavailable
+#   2.1 resolvedTs - checkpointTs < threshold
+# 3. The synced status request of the cdc server when the upstream tikv is unavailable
+#   3.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs > threshold, resolvedTs - checkpointTs < threshold
+#   3.2 pdNow - lastSyncedTs < threshold
+# 4. The synced status request of the cdc server when the downstream tidb is unavailable
+#   4.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold
+#   4.2 pdNow - lastSyncedTs < threshold
+# [STEP]:
+# 1. Create a changefeed with synced-time-config = xx
+# 2. Insert data into the upstream cluster, and do the related actions for each scenario
+# 3. Query the synced status of the cdc server
+# 4. Check the info and status fields of the response
+
+set -xeu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+CDC_COUNT=3
+DB_COUNT=4
+
+function kill_pd() {
+	info=$(ps aux | grep pd-server | grep $WORK_DIR) || true
+	$(ps aux | grep pd-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function kill_tikv() {
+	info=$(ps aux | grep tikv-server | grep $WORK_DIR) || true
+	$(ps aux | grep tikv-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function kill_tidb() {
+	info=$(ps aux | grep tidb-server | grep $WORK_DIR) || true
+	$(ps aux | grep tidb-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true
+}
+
+function run_normal_case_and_unavailable_pd() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 1: test in an available cluster
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+
+	status=$(echo $synced_status | jq '.synced')
+	sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts')
+	puller_resolved_ts=$(echo $synced_status | jq -r '.puller_resolved_ts')
+	last_synced_ts=$(echo $synced_status | jq -r '.last_synced_ts')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
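+	# Illustrative note: TSO 0 renders as the Unix epoch; the expected strings
+	# below assume the test runs in a UTC+8 (Asia/Shanghai) time zone.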
+	# the timestamp for puller_resolved_ts is 0 before any data is inserted
+	if [ "$puller_resolved_ts" != "1970-01-01 08:00:00.000" ]; then
+		echo "puller_resolved_ts is not 1970-01-01 08:00:00.000"
+		exit 1
+	fi
+	# the timestamp for last_synced_ts is 0 before any data is inserted
+	if [ "$last_synced_ts" != "1970-01-01 08:00:00.000" ]; then
+		echo "last_synced_ts is not 1970-01-01 08:00:00.000"
+		exit 1
+	fi
+
+	# compare sink_checkpoint_ts with the current time
+	current=$(date +"%Y-%m-%d %H:%M:%S")
+	echo "sink_checkpoint_ts is "$sink_checkpoint_ts
+	checkpoint_timestamp=$(date -d "$sink_checkpoint_ts" +%s)
+	current_timestamp=$(date -d "$current" +%s)
+	if [ $(($current_timestamp - $checkpoint_timestamp)) -gt 300 ]; then # give a soft check
+		echo "sink_checkpoint_ts is not correct"
+		exit 1
+	fi
+
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the data to be inserted
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	if [ "$info" != "The data syncing is not finished, please wait" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to exceed the synced-check-interval
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+
+	#==========
+	# case 2: test with unavailable pd; the query cannot get a normal response
+	kill_pd
+
+	sleep 20
+
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	error_code=$(echo $synced_status | jq -r '.error_code')
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run_case_with_unavailable_tikv() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 3: test with an unavailable tikv cluster
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the data to be replicated to the downstream
+	kill_tikv
+
+	# test the case when pdNow - lastSyncedTs < threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="The data syncing is not finished, please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to exceed the synced-check-interval
+	# test the case when pdNow - lastSyncedTs > threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Please check whether PD is online and TiKV Regions are all available. \
+If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
+To check whether TiKV regions are all available, you can view \
+'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
+If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
+Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run_case_with_unavailable_tidb() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	# case 4: test with an unavailable tidb
+	run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);"
+	check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	sleep 5 # wait for the data to be replicated to the downstream
+	kill_tidb
+
+	# test the case when pdNow - lastSyncedTs < threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="The data syncing is not finished, please wait"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	sleep 130 # wait long enough to exceed the synced-check-interval
+	# test the case when pdNow - lastSyncedTs > threshold
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != true ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Data syncing is finished"
+
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+function run_case_with_failpoint() {
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	# enable the failpoint to block the advancing of checkpoint-ts
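+	# Illustrative note: with this failpoint enabled the owner stops advancing
+	# checkpoint-ts, so pdNow - checkpointTs soon exceeds the 15s checkpoint
+	# interval while resolvedTs - checkpointTs stays small; since no data is
+	# written, lastSyncedTs stays 0 and the 5 min condition holds trivially,
+	# which should produce the "Please check whether PD is online..." message.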
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerNotUpdateCheckpoint=return(true)'
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	config_path=$1
+
+	SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path"
+
+	sleep 20 # wait long enough to exceed the checkpoint-check-interval
+	synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced)
+	status=$(echo $synced_status | jq '.synced')
+	if [ $status != false ]; then
+		echo "synced status isn't correct"
+		exit 1
+	fi
+	info=$(echo $synced_status | jq -r '.info')
+	target_message="Please check whether PD is online and TiKV Regions are all available. \
+If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. \
+To check whether TiKV regions are all available, you can view \
+'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. \
+If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. \
+Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
+	if [ "$info" != "$target_message" ]; then
+		echo "synced status info is not correct"
+		exit 1
+	fi
+
+	export GO_FAILPOINTS=''
+
+	cleanup_process $CDC_BINARY
+	stop_tidb_cluster
+}
+
+trap stop_tidb_cluster EXIT
+run_normal_case_and_unavailable_pd "conf/changefeed.toml"
+run_case_with_unavailable_tikv "conf/changefeed.toml"
+run_case_with_unavailable_tidb "conf/changefeed.toml"
+run_case_with_failpoint "conf/changefeed.toml"
+
+# enable redo
+run_normal_case_and_unavailable_pd "conf/changefeed-redo.toml"
+run_case_with_unavailable_tikv "conf/changefeed-redo.toml"
+run_case_with_unavailable_tidb "conf/changefeed-redo.toml"
+run_case_with_failpoint "conf/changefeed-redo.toml"
+
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"