From 9d01e29c3fdaf8fcc1320aba8d6572da5f4af588 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 21 Dec 2023 13:04:54 +0800 Subject: [PATCH] This is an automated cherry-pick of #10133 Signed-off-by: ti-chi-bot --- cdc/api/v1/api_test.go | 7 + cdc/api/v2/api.go | 13 + cdc/api/v2/api_test.go | 35 +- cdc/api/v2/changefeed.go | 143 ++++++++ cdc/api/v2/changefeed_test.go | 207 ++++++++++++ cdc/api/v2/model.go | 32 ++ cdc/api/v2/model_test.go | 3 +- cdc/model/changefeed.go | 88 +++++ cdc/owner/changefeed.go | 65 +++- cdc/owner/changefeed_test.go | 9 +- cdc/owner/mock/status_provider_mock.go | 15 + cdc/owner/owner.go | 31 ++ cdc/owner/status_provider.go | 32 ++ cdc/processor/processor.go | 1 + cdc/processor/sinkmanager/manager.go | 3 + cdc/processor/sinkmanager/manager_test.go | 29 +- .../sinkmanager/table_sink_wrapper.go | 11 + cdc/processor/tablepb/table.pb.go | 123 ++++--- cdc/processor/tablepb/table.proto | 1 + cdc/scheduler/internal/scheduler.go | 2 +- cdc/scheduler/internal/v3/coordinator.go | 47 ++- cdc/scheduler/internal/v3/coordinator_test.go | 64 +++- .../v3/replication/replication_manager.go | 92 +++--- .../replication/replication_manager_test.go | 164 ++++++++-- .../v3/replication/replication_set.go | 4 + cdc/scheduler/schedulepb/watermark.go | 24 ++ cdc/sink/tablesink/table_sink.go | 5 + cdc/sink/tablesink/table_sink_impl.go | 45 ++- cdc/sink/tablesink/table_sink_impl_test.go | 6 + pkg/config/config_test_data.go | 18 +- pkg/config/replica_config.go | 31 ++ pkg/config/synced_status_config.go | 23 ++ pkg/orchestrator/reactor_state_test.go | 11 +- tests/integration_tests/run_group.sh | 5 + .../synced_status/conf/changefeed-redo.toml | 7 + .../synced_status/conf/changefeed.toml | 3 + tests/integration_tests/synced_status/run.sh | 308 ++++++++++++++++++ 37 files changed, 1537 insertions(+), 170 deletions(-) mode change 100644 => 100755 cdc/owner/changefeed.go create mode 100644 cdc/scheduler/schedulepb/watermark.go create mode 100644 pkg/config/synced_status_config.go create mode 100644 tests/integration_tests/synced_status/conf/changefeed-redo.toml create mode 100644 tests/integration_tests/synced_status/conf/changefeed.toml create mode 100644 tests/integration_tests/synced_status/run.sh diff --git a/cdc/api/v1/api_test.go b/cdc/api/v1/api_test.go index fd56c7f7a21..e2a51d1effd 100644 --- a/cdc/api/v1/api_test.go +++ b/cdc/api/v1/api_test.go @@ -99,6 +99,13 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI return args.Get(0).([]*model.CaptureInfo), args.Error(1) } +func (p *mockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context, + changefeedID model.ChangeFeedID, +) (*model.ChangeFeedSyncedStatusForAPI, error) { + args := p.Called(ctx) + return args.Get(0).(*model.ChangeFeedSyncedStatusForAPI), args.Error(1) +} + func (p *mockStatusProvider) IsHealthy(ctx context.Context) (bool, error) { args := p.Called(ctx) return args.Get(0).(bool), args.Error(1) diff --git a/cdc/api/v2/api.go b/cdc/api/v2/api.go index d00a0a577ad..123c9470a5e 100644 --- a/cdc/api/v2/api.go +++ b/cdc/api/v2/api.go @@ -49,6 +49,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { // changefeed apis changefeedGroup := v2.Group("/changefeeds") +<<<<<<< HEAD changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture)) changefeedGroup.GET("/:changefeed_id", api.getChangeFeed) changefeedGroup.POST("", api.createChangefeed) @@ -59,6 +60,18 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed) changefeedGroup.POST("/:changefeed_id/pause", api.pauseChangefeed) changefeedGroup.GET("/:changefeed_id/status", api.status) +======= + changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.getChangeFeed) + changefeedGroup.POST("", controllerMiddleware, api.createChangefeed) + changefeedGroup.GET("", controllerMiddleware, api.listChangeFeeds) + changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.updateChangefeed) + changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.deleteChangefeed) + changefeedGroup.GET("/:changefeed_id/meta_info", changefeedOwnerMiddleware, api.getChangeFeedMetaInfo) + changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed) + changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed) + changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status) + changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced) +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) // capture apis captureGroup := v2.Group("/captures") diff --git a/cdc/api/v2/api_test.go b/cdc/api/v2/api_test.go index 29f57285175..31c4d0899be 100644 --- a/cdc/api/v2/api_test.go +++ b/cdc/api/v2/api_test.go @@ -48,8 +48,13 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, } // GetTS of mockPDClient returns a mock tso +<<<<<<< HEAD func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { return m.logicTime, m.timestamp, nil +======= +func (c *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return c.timestamp, c.logicTime, nil +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) } // GetClusterID of mockPDClient returns a mock ClusterID @@ -62,13 +67,14 @@ func (c *mockPDClient) Close() {} type mockStatusProvider struct { owner.StatusProvider - changefeedStatus *model.ChangeFeedStatusForAPI - changefeedInfo *model.ChangeFeedInfo - processors []*model.ProcInfoSnap - taskStatus map[model.CaptureID]*model.TaskStatus - changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo - changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI - err error + changefeedStatus *model.ChangeFeedStatusForAPI + changefeedInfo *model.ChangeFeedInfo + processors []*model.ProcInfoSnap + taskStatus map[model.CaptureID]*model.TaskStatus + changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo + changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI + changeFeedSyncedStatus *model.ChangeFeedSyncedStatusForAPI + err error } // GetChangeFeedStatus returns a changefeeds' runtime status. @@ -119,3 +125,18 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) ( ) { return m.changefeedStatuses, m.err } +<<<<<<< HEAD +======= + +// GetChangeFeedSyncedStatus returns a mock changefeed status. +func (m *mockStatusProvider) GetChangeFeedSyncedStatus(_ context.Context, changefeedID model.ChangeFeedID) ( + *model.ChangeFeedSyncedStatusForAPI, + error, +) { + return m.changeFeedSyncedStatus, m.err +} + +func (m *mockStatusProvider) IsChangefeedOwner(_ context.Context, id model.ChangeFeedID) (bool, error) { + return true, nil +} +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 1f7f2ec6648..654b7cac0e2 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -44,6 +44,13 @@ const ( apiOpVarChangefeedState = "state" // apiOpVarChangefeedID is the key of changefeed ID in HTTP API apiOpVarChangefeedID = "changefeed_id" +<<<<<<< HEAD +======= + // apiOpVarNamespace is the key of changefeed namespace in HTTP API + apiOpVarNamespace = "namespace" + // timeout for pd client + timeout = 30 * time.Second +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) ) // createChangefeed handles create changefeed request, @@ -826,6 +833,142 @@ func (h *OpenAPIV2) status(c *gin.Context) { }) } +// synced get the synced status of a changefeed +// @Summary Get synced status +// @Description get the synced status of a changefeed +// @Tags changefeed,v2 +// @Accept json +// @Produce json +// @Param changefeed_id path string true "changefeed_id" +// @Param namespace query string false "default" +// @Success 200 {object} SyncedStatus +// @Failure 500,400 {object} model.HTTPError +// @Router /api/v2/changefeeds/{changefeed_id}/synced [get] +func (h *OpenAPIV2) synced(c *gin.Context) { + ctx := c.Request.Context() + + namespace := getNamespaceValueWithDefault(c) + changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)} + if err := model.ValidateChangefeedID(changefeedID.ID); err != nil { + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", + changefeedID.ID)) + return + } + + status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID) + if err != nil { + _ = c.Error(err) + return + } + + log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID)) + + cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()} + if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) { + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval + cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval + } + + // try to get pd client to get pd time, and determine synced status based on the pd time + if len(cfg.PDAddrs) == 0 { + up, err := getCaptureDefaultUpstream(h.capture) + if err != nil { + _ = c.Error(err) + return + } + cfg.PDConfig = getUpstreamPDConfig(up) + } + credential := cfg.PDConfig.toCredential() + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // case 1. we can't get pd client, pd may be unavailable. + // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced + // otherwise, if pd is unavailable, we decide data whether is synced based on + // the time difference between current time and lastSyncedTs. + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) + } else { + message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ + "If pd is offline, please check whether we satisfy the condition that "+ + "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return + } + defer pdClient.Close() + // get time from pd + physicalNow, _, _ := pdClient.GetTS(ctx) + + // We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs + if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) && + (physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) { + // case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval + // --> reach strict synced status + c.JSON(http.StatusOK, SyncedStatus{ + Synced: true, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: "Data syncing is finished", + }) + return + } + + if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 { + // case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval + // we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs. + // if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced + // otherwise, if pd & tikv is healthy --> data is not synced + // if not healthy --> data is synced + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) < + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("Please check whether pd is healthy and tikv region is all available. " + + "If pd is not healthy or tikv region is not available, the data syncing is finished. " + + "When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info " + + "in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, " + + "it means some regions in tikv are unavailable. " + + " Otherwise the data syncing is not finished, please wait") + } else { + message = "The data syncing is not finished, please wait" + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: message, + }) + return + } + + // case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)), + Info: "The data syncing is not finished, please wait", + }) +} + func toAPIModel( info *model.ChangeFeedInfo, resolvedTs uint64, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 5b7dcfb03c0..26498813116 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -951,6 +951,213 @@ func TestPauseChangefeed(t *testing.T) { require.Equal(t, "{}", w.Body.String()) } +func TestChangefeedSynced(t *testing.T) { + syncedInfo := testCase{url: "/api/v2/changefeeds/%s/synced?namespace=abc", method: "GET"} + helpers := NewMockAPIV2Helpers(gomock.NewController(t)) + cp := mock_capture.NewMockCapture(gomock.NewController(t)) + owner := mock_owner.NewMockOwner(gomock.NewController(t)) + apiV2 := NewOpenAPIV2ForTest(cp, helpers) + router := newRouter(apiV2) + + statusProvider := &mockStatusProvider{} + + cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes() + cp.EXPECT().IsReady().Return(true).AnyTimes() + cp.EXPECT().IsController().Return(true).AnyTimes() + cp.EXPECT().GetOwner().Return(owner, nil).AnyTimes() + + pdClient := &mockPDClient{} + mockUpManager := upstream.NewManager4Test(pdClient) + cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes() + + { + // case 1: invalid changefeed id + w := httptest.NewRecorder() + invalidID := "@^Invalid" + req, _ := http.NewRequestWithContext(context.Background(), + syncedInfo.method, fmt.Sprintf(syncedInfo.url, invalidID), nil) + router.ServeHTTP(w, req) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrAPIInvalidParam") + } + + { + // case 2: not existed changefeed id + validID := changeFeedID.ID + statusProvider.err = cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID) + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext(context.Background(), syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), nil) + router.ServeHTTP(w, req) + respErr := model.HTTPError{} + err := json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Code, "ErrChangeFeedNotExists") + require.Equal(t, http.StatusBadRequest, w.Code) + } + + validID := changeFeedID.ID + cfInfo := &model.ChangeFeedInfo{ + ID: validID, + } + statusProvider.err = nil + statusProvider.changefeedInfo = cfInfo + { + helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) + // case3: pd is offline,resolvedTs - checkpointTs > 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217279 << 18, + LastSyncedTs: 1701153217279 << 18, + PullerResolvedTs: 1701153247279 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, "+ + "please recheck. Besides the data is not finish syncing", resp.Info) + } + + { + helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) + // case4: pd is offline,resolvedTs - checkpointTs < 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217279 << 18, + LastSyncedTs: 1701153217279 << 18, + PullerResolvedTs: 1701153217479 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "[CDC:ErrAPIGetPDClientFailed]failed to get PDClient to connect PD, please recheck. "+ + "You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ + "If pd is offline, please check whether we satisfy the condition that "+ + "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than 300 secs. "+ + "If it's satisfied, means the data syncing is totally finished", resp.Info) + } + + helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(pdClient, nil).AnyTimes() + pdClient.logicTime = 1000 + pdClient.timestamp = 1701153217279 + { + // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217209 << 18, + LastSyncedTs: 1701152217279 << 18, + PullerResolvedTs: 1701153217229 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, true, resp.Synced) + require.Equal(t, "Data syncing is finished", resp.Info) + } + + { + // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153201279 << 18, + LastSyncedTs: 1701152217279 << 18, + PullerResolvedTs: 1701153201379 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "Please check whether pd is healthy and tikv region is all available. "+ + "If pd is not healthy or tikv region is not available, the data syncing is finished. "+ + "When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info "+ + "in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, "+ + "it means some regions in tikv are unavailable. "+ + " Otherwise the data syncing is not finished, please wait", resp.Info) + } + + { + // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153201279 << 18, + LastSyncedTs: 1701152207279 << 18, + PullerResolvedTs: 1701153218279 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "The data syncing is not finished, please wait", resp.Info) + } + + { + // case8: pdTs - lastSyncedTs < 5min + statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ + CheckpointTs: 1701153217279 << 18, + LastSyncedTs: 1701153213279 << 18, + PullerResolvedTs: 1701153217279 << 18, + } + w := httptest.NewRecorder() + req, _ := http.NewRequestWithContext( + context.Background(), + syncedInfo.method, + fmt.Sprintf(syncedInfo.url, validID), + nil, + ) + router.ServeHTTP(w, req) + require.Equal(t, http.StatusOK, w.Code) + resp := SyncedStatus{} + err := json.NewDecoder(w.Body).Decode(&resp) + require.Nil(t, err) + require.Equal(t, false, resp.Synced) + require.Equal(t, "The data syncing is not finished, please wait", resp.Info) + } +} + func TestHasRunningImport(t *testing.T) { integration.BeforeTestExternal(t) testEtcdCluster := integration.NewClusterV3( diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index bc5157da987..b8472ff0122 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -102,6 +102,15 @@ type ChangefeedCommonInfo struct { RunningError *model.RunningError `json:"error"` } +// SyncedStatusConfig represents synced check interval config for a changefeed +type SyncedStatusConfig struct { + // The minimum interval between the latest synced ts and now required to reach synced state + SyncedCheckInterval int64 `json:"synced_check_interval"` + // The maximum interval between latest checkpoint ts and now or + // between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state + CheckpointInterval int64 `json:"checkpoint_interval"` +} + // MarshalJSON marshal changefeed common info to json // we need to set feed state to normal if it is uninitialized and pending to warning // to hide the detail of uninitialized and pending state from user @@ -195,6 +204,7 @@ type ReplicaConfig struct { Integrity *IntegrityConfig `json:"integrity"` ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"` SQLMode string `json:"sql_mode,omitempty"` + SyncedStatus *SyncedStatusConfig `json:"synced_status,omitempty"` } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -445,6 +455,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( if c.ChangefeedErrorStuckDuration != nil { res.ChangefeedErrorStuckDuration = &c.ChangefeedErrorStuckDuration.duration } + if c.SyncedStatus != nil { + res.SyncedStatus = &config.SyncedStatusConfig{ + SyncedCheckInterval: c.SyncedStatus.SyncedCheckInterval, + CheckpointInterval: c.SyncedStatus.CheckpointInterval, + } + } return res } @@ -692,6 +708,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { if cloned.ChangefeedErrorStuckDuration != nil { res.ChangefeedErrorStuckDuration = &JSONDuration{*cloned.ChangefeedErrorStuckDuration} } + if cloned.SyncedStatus != nil { + res.SyncedStatus = &SyncedStatusConfig{ + SyncedCheckInterval: cloned.SyncedStatus.SyncedCheckInterval, + CheckpointInterval: cloned.SyncedStatus.CheckpointInterval, + } + } return res } @@ -920,6 +942,16 @@ type ChangeFeedInfo struct { TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"` } +// SyncedStatus describes the detail of a changefeed's synced status +type SyncedStatus struct { + Synced bool `json:"synced"` + SinkCheckpointTs model.JSONTime `json:"sink_checkpoint_ts"` + PullerResolvedTs model.JSONTime `json:"puller_resolved_ts"` + LastSyncedTs model.JSONTime `json:"last_synced_ts"` + NowTs model.JSONTime `json:"now_ts"` + Info string `json:"info"` +} + // RunningError represents some running error from cdc components, // such as processor. type RunningError struct { diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index ed08876ab40..b6558f24dea 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -85,7 +85,8 @@ var defaultAPIConfig = &ReplicaConfig{ }, ChangefeedErrorStuckDuration: &JSONDuration{*config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, - SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SyncedStatus: (*SyncedStatusConfig)(config.GetDefaultReplicaConfig().SyncedStatus), } func TestDefaultReplicaConfig(t *testing.T) { diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 6868add16ed..608cc328551 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -339,6 +339,85 @@ func (info *ChangeFeedInfo) VerifyAndComplete() { if info.Config.SQLMode == "" { info.Config.SQLMode = defaultConfig.SQLMode } +<<<<<<< HEAD +======= + if info.Config.SyncedStatus == nil { + info.Config.SyncedStatus = defaultConfig.SyncedStatus + } + info.RmUnusedFields() +} + +// RmUnusedFields removes unnecessary fields based on the downstream type and +// the protocol. Since we utilize a common changefeed configuration template, +// certain fields may not be utilized for certain protocols. +func (info *ChangeFeedInfo) RmUnusedFields() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn( + "failed to parse the sink uri", + zap.Error(err), + zap.Any("sinkUri", info.SinkURI), + ) + return + } + // blackhole is for testing purpose, no need to remove fields + if sink.IsBlackHoleScheme(uri.Scheme) { + return + } + if !sink.IsMQScheme(uri.Scheme) { + info.rmMQOnlyFields() + } else { + // remove schema registry for MQ downstream with + // protocol other than avro + if util.GetOrZero(info.Config.Sink.Protocol) != config.ProtocolAvro.String() { + info.Config.Sink.SchemaRegistry = nil + } + } + + if !sink.IsStorageScheme(uri.Scheme) { + info.rmStorageOnlyFields() + } + + if !sink.IsMySQLCompatibleScheme(uri.Scheme) { + info.rmDBOnlyFields() + } else { + // remove fields only being used by MQ and Storage downstream + info.Config.Sink.Protocol = nil + info.Config.Sink.Terminator = nil + } +} + +func (info *ChangeFeedInfo) rmMQOnlyFields() { + log.Info("since the downstream is not a MQ, remove MQ only fields", + zap.String("namespace", info.Namespace), + zap.String("changefeed", info.ID)) + info.Config.Sink.DispatchRules = nil + info.Config.Sink.SchemaRegistry = nil + info.Config.Sink.EncoderConcurrency = nil + info.Config.Sink.EnableKafkaSinkV2 = nil + info.Config.Sink.OnlyOutputUpdatedColumns = nil + info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil + info.Config.Sink.ContentCompatible = nil + info.Config.Sink.KafkaConfig = nil +} + +func (info *ChangeFeedInfo) rmStorageOnlyFields() { + info.Config.Sink.CSVConfig = nil + info.Config.Sink.DateSeparator = nil + info.Config.Sink.EnablePartitionSeparator = nil + info.Config.Sink.FileIndexWidth = nil + info.Config.Sink.CloudStorageConfig = nil +} + +func (info *ChangeFeedInfo) rmDBOnlyFields() { + info.Config.EnableSyncPoint = nil + info.Config.BDRMode = nil + info.Config.SyncPointInterval = nil + info.Config.SyncPointRetention = nil + info.Config.Consistent = nil + info.Config.Sink.SafeMode = nil + info.Config.Sink.MySQLConfig = nil +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) } // FixIncompatible fixes incompatible changefeed meta info. @@ -582,3 +661,12 @@ type ChangeFeedStatusForAPI struct { // initializing the changefeed. MinTableBarrierTs uint64 `json:"min-table-barrier-ts"` } + +// ChangeFeedSyncedStatusForAPI uses to transfer the synced status of changefeed for API. +type ChangeFeedSyncedStatusForAPI struct { + CheckpointTs uint64 `json:"checkpoint-ts"` + LastSyncedTs uint64 `json:"last-sync-time"` + PullerResolvedTs uint64 `json:"puller-resolved-ts"` + SyncedCheckInterval int64 `json:"synced-check-interval"` + CheckpointInterval int64 `json:"checkpoint-interval"` +} diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go old mode 100644 new mode 100755 index d9788608373..29ba54f0439 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -16,6 +16,7 @@ package owner import ( "context" "fmt" + "math" "strings" "sync" "time" @@ -78,6 +79,13 @@ type changefeed struct { feedStateManager *feedStateManager resolvedTs model.Ts + // lastSyncedTs is the lastest resolvedTs that has been synced to downstream. + // pullerResolvedTs is the minimum resolvedTs of all pullers. + // we don't need to initialize lastSyncedTs and pullerResolvedTs specially + // because it will be updated in tick. + lastSyncedTs model.Ts + pullerResolvedTs model.Ts + // ddl related fields ddlManager *ddlManager redoDDLMgr redo.DDLManager @@ -370,19 +378,47 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* return nil } +<<<<<<< HEAD newCheckpointTs, newResolvedTs, err := c.scheduler.Tick( ctx, preCheckpointTs, allPhysicalTables, captures, barrier) +======= + watermark, err := c.scheduler.Tick( + ctx, preCheckpointTs, allPhysicalTables, captures, + barrier) +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) if err != nil { return errors.Trace(err) } + if c.lastSyncedTs < watermark.LastSyncedTs { + c.lastSyncedTs = watermark.LastSyncedTs + } else if c.lastSyncedTs > watermark.LastSyncedTs { + log.Warn("LastSyncedTs should not be greater than newLastSyncedTs", + zap.Uint64("c.LastSyncedTs", c.lastSyncedTs), + zap.Uint64("newLastSyncedTs", watermark.LastSyncedTs)) + } + + if watermark.PullerResolvedTs != scheduler.CheckpointCannotProceed && watermark.PullerResolvedTs != math.MaxUint64 { + if watermark.PullerResolvedTs > c.pullerResolvedTs { + c.pullerResolvedTs = watermark.PullerResolvedTs + } else if watermark.PullerResolvedTs < c.pullerResolvedTs { + log.Warn("the newPullerResolvedTs should not be smaller than c.pullerResolvedTs", + zap.Uint64("c.pullerResolvedTs", c.pullerResolvedTs), + zap.Uint64("newPullerResolvedTs", watermark.PullerResolvedTs)) + } + } pdTime, _ := c.upstream.PDClock.CurrentTime() currentTs := oracle.GetPhysical(pdTime) // CheckpointCannotProceed implies that not all tables are being replicated normally, // so in that case there is no need to advance the global watermarks. +<<<<<<< HEAD if newCheckpointTs == scheduler.CheckpointCannotProceed { if c.state.Status != nil { +======= + if watermark.CheckpointTs == scheduler.CheckpointCannotProceed { + if c.latestStatus != nil { +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) // We should keep the metrics updated even if the scheduler cannot // advance the watermarks for now. c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.resolvedTs) @@ -392,13 +428,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* log.Debug("owner prepares to update status", zap.Uint64("prevResolvedTs", c.resolvedTs), - zap.Uint64("newResolvedTs", newResolvedTs), - zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("newResolvedTs", watermark.ResolvedTs), + zap.Uint64("newCheckpointTs", watermark.CheckpointTs), zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID)) // resolvedTs should never regress. - if newResolvedTs > c.resolvedTs { - c.resolvedTs = newResolvedTs + if watermark.ResolvedTs > c.resolvedTs { + c.resolvedTs = watermark.ResolvedTs } // MinTableBarrierTs should never regress @@ -411,6 +447,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* log.Info("owner won't update checkpoint because of failpoint", zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), +<<<<<<< HEAD zap.Uint64("keepCheckpoint", c.state.Status.CheckpointTs), zap.Uint64("skipCheckpoint", newCheckpointTs)) newCheckpointTs = c.state.Status.CheckpointTs @@ -422,6 +459,22 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* c.tickDownstreamObserver(ctx) return nil +======= + zap.Uint64("keepCheckpoint", c.latestStatus.CheckpointTs), + zap.Uint64("skipCheckpoint", watermark.CheckpointTs)) + watermark.CheckpointTs = c.latestStatus.CheckpointTs + } + }) + + failpoint.Inject("ChangefeedOwnerNotUpdateCheckpoint", func() { + watermark.CheckpointTs = c.latestStatus.CheckpointTs + }) + + c.updateMetrics(currentTs, watermark.CheckpointTs, c.resolvedTs) + c.tickDownstreamObserver(ctx) + + return watermark.CheckpointTs, barrier.MinTableBarrierTs, nil +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) } func (c *changefeed) initialize(ctx cdcContext.Context) (err error) { @@ -459,7 +512,11 @@ LOOP2: c.resolvedTs = checkpointTs } +<<<<<<< HEAD minTableBarrierTs := c.state.Status.MinTableBarrierTs +======= + minTableBarrierTs := c.latestStatus.MinTableBarrierTs +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) failpoint.Inject("NewChangefeedNoRetryError", func() { failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs)) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 5d125418437..73b09a0aa19 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -164,9 +164,14 @@ func (m *mockScheduler) Tick( currentTables []model.TableID, captures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.BarrierWithMinTs, -) (newCheckpointTs, newResolvedTs model.Ts, err error) { +) (watermark schedulepb.Watermark, err error) { m.currentTables = currentTables - return barrier.MinTableBarrierTs, barrier.GlobalBarrierTs, nil + return schedulepb.Watermark{ + CheckpointTs: barrier.MinTableBarrierTs, + ResolvedTs: barrier.GlobalBarrierTs, + LastSyncedTs: scheduler.CheckpointCannotProceed, + PullerResolvedTs: scheduler.CheckpointCannotProceed, + }, nil } // MoveTable is used to trigger manual table moves. diff --git a/cdc/owner/mock/status_provider_mock.go b/cdc/owner/mock/status_provider_mock.go index 5e1230f6291..f86021b291f 100644 --- a/cdc/owner/mock/status_provider_mock.go +++ b/cdc/owner/mock/status_provider_mock.go @@ -125,6 +125,21 @@ func (mr *MockStatusProviderMockRecorder) GetChangeFeedStatus(ctx, changefeedID return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangeFeedStatus", reflect.TypeOf((*MockStatusProvider)(nil).GetChangeFeedStatus), ctx, changefeedID) } +// GetChangeFeedSyncedStatus mocks base method. +func (m *MockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChangeFeedSyncedStatus", ctx, changefeedID) + ret0, _ := ret[0].(*model.ChangeFeedSyncedStatusForAPI) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetChangeFeedSyncedStatus indicates an expected call of GetChangeFeedSyncedStatus. +func (mr *MockStatusProviderMockRecorder) GetChangeFeedSyncedStatus(ctx, changefeedID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangeFeedSyncedStatus", reflect.TypeOf((*MockStatusProvider)(nil).GetChangeFeedSyncedStatus), ctx, changefeedID) +} + // GetProcessors mocks base method. func (m *MockStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { m.ctrl.T.Helper() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 12275337e91..b977fc072e7 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -505,6 +505,7 @@ func (o *ownerImpl) handleQueries(query *Query) error { ret[cfID].CheckpointTs = cfReactor.state.Status.CheckpointTs } query.Data = ret +<<<<<<< HEAD case QueryAllChangeFeedInfo: ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{} for cfID, cfReactor := range o.changefeeds { @@ -515,6 +516,36 @@ func (o *ownerImpl) handleQueries(query *Query) error { ret[cfID] = &model.ChangeFeedInfo{} continue } +======= + case QueryChangeFeedSyncedStatus: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] + if !ok { + query.Data = nil + return nil + } + ret := &model.ChangeFeedSyncedStatusForAPI{} + ret.LastSyncedTs = cfReactor.lastSyncedTs + ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs + ret.PullerResolvedTs = cfReactor.pullerResolvedTs + + if cfReactor.latestInfo == nil { + ret.CheckpointInterval = 0 + ret.SyncedCheckInterval = 0 + } else { + ret.CheckpointInterval = cfReactor.latestInfo.Config.SyncedStatus.CheckpointInterval + ret.SyncedCheckInterval = cfReactor.latestInfo.Config.SyncedStatus.SyncedCheckInterval + } + query.Data = ret + case QueryChangefeedInfo: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] + if !ok { + query.Data = nil + return nil + } + if cfReactor.latestInfo == nil { + query.Data = &model.ChangeFeedInfo{} + } else { +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) var err error ret[cfID], err = cfReactor.state.Info.Clone() if err != nil { diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index 9644f34e7d9..85e4cd9a499 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -30,8 +30,13 @@ type StatusProvider interface { // GetChangeFeedStatus returns a changefeeds' runtime status. GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error) +<<<<<<< HEAD // GetAllChangeFeedInfo returns all changefeeds' info. GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) +======= + // GetChangeFeedSyncedStatus returns a changefeeds' synced status. + GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error) +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) // GetChangeFeedInfo returns a changefeeds' info. GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) @@ -65,6 +70,17 @@ const ( QueryCaptures // QueryHealth is the type of query cluster health info. QueryHealth +<<<<<<< HEAD +======= + // QueryOwner is the type of query changefeed owner + QueryOwner + // QueryChangefeedInfo is the type of query changefeed info + QueryChangefeedInfo + // QueryChangeFeedStatuses is the type of query changefeed status + QueryChangeFeedStatuses + // QueryChangeFeedSyncedStatus is the type of query changefeed synced status + QueryChangeFeedSyncedStatus +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) ) // Query wraps query command and return results. @@ -122,6 +138,22 @@ func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) ( return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil } +func (p *ownerStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context, + changefeedID model.ChangeFeedID, +) (*model.ChangeFeedSyncedStatusForAPI, error) { + query := &Query{ + Tp: QueryChangeFeedSyncedStatus, + ChangeFeedID: changefeedID, + } + if err := p.sendQueryToOwner(ctx, query); err != nil { + return nil, errors.Trace(err) + } + if query.Data == nil { + return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID) + } + return query.Data.(*model.ChangeFeedSyncedStatusForAPI), nil +} + func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID, ) (*model.ChangeFeedInfo, error) { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index e9202dc321e..3acb2ebfa02 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -342,6 +342,7 @@ func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tabl Checkpoint: tablepb.Checkpoint{ CheckpointTs: sinkStats.CheckpointTs, ResolvedTs: sinkStats.ResolvedTs, + LastSyncedTs: sinkStats.LastSyncedTs, }, State: state, Stats: stats, diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 776af8cdf01..c2765e72af6 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -58,6 +58,7 @@ const ( type TableStats struct { CheckpointTs model.Ts ResolvedTs model.Ts + LastSyncedTs model.Ts BarrierTs model.Ts } @@ -1006,6 +1007,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { tableSink := value.(*tableSinkWrapper) checkpointTs := tableSink.getCheckpointTs() + lastSyncedTs := tableSink.getLastSyncedTs() m.sinkMemQuota.Release(span, checkpointTs) m.redoMemQuota.Release(span, checkpointTs) @@ -1048,6 +1050,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { return TableStats{ CheckpointTs: checkpointTs.ResolvedMark(), ResolvedTs: resolvedTs, + LastSyncedTs: lastSyncedTs, BarrierTs: tableSink.barrierTs.Load(), } } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 00eeebb3702..992743db857 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -96,6 +96,13 @@ func addTableAndAddEventsToSortEngine( CRTs: 4, }, }, + { + CRTs: 6, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypeResolved, + CRTs: 6, + }, + }, } for _, event := range events { engine.Add(span, event) @@ -198,10 +205,16 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - tableSink, ok := manager.tableSinks.Load(span) - require.True(t, ok) - checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() - return checkpointTS.ResolvedMark() == 4 + s := manager.GetTableStats(span) + return s.CheckpointTs == 4 && s.LastSyncedTs == 4 + }, 5*time.Second, 10*time.Millisecond) + + manager.UpdateBarrierTs(6, nil) + manager.UpdateReceivedSorterResolvedTs(span, 6) + manager.schemaStorage.AdvanceResolvedTs(6) + require.Eventually(t, func() bool { + s := manager.GetTableStats(span) + return s.CheckpointTs == 6 && s.LastSyncedTs == 4 }, 5*time.Second, 10*time.Millisecond) } @@ -229,10 +242,8 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - tableSink, ok := manager.tableSinks.Load(span) - require.True(t, ok) - checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() - return checkpointTS.ResolvedMark() == 3 + s := manager.GetTableStats(span) + return s.CheckpointTs == 3 && s.LastSyncedTs == 3 }, 5*time.Second, 10*time.Millisecond) } @@ -260,7 +271,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { require.Eventually(t, func() bool { s := manager.GetTableStats(span) - return manager.sinkMemQuota.GetUsedBytes() == 0 && s.CheckpointTs == 4 + return manager.sinkMemQuota.GetUsedBytes() == 0 && s.CheckpointTs == 4 && s.LastSyncedTs == 4 }, 5*time.Second, 10*time.Millisecond) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 5e10a4bdcb7..2171f04bfb9 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -58,6 +58,7 @@ type tableSinkWrapper struct { advanced time.Time resolvedTs model.ResolvedTs checkpointTs model.ResolvedTs + lastSyncedTs model.Ts } // state used to control the lifecycle of the table. @@ -223,6 +224,15 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { return t.tableSink.s.UpdateResolvedTs(ts) } +func (t *tableSinkWrapper) getLastSyncedTs() uint64 { + t.tableSink.RLock() + defer t.tableSink.RUnlock() + if t.tableSink.s != nil { + return t.tableSink.s.GetLastSyncedTs() + } + return t.tableSink.lastSyncedTs +} + func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs { t.tableSink.RLock() defer t.tableSink.RUnlock() @@ -362,6 +372,7 @@ func (t *tableSinkWrapper) doTableSinkClear() { t.tableSink.checkpointTs = checkpointTs } t.tableSink.resolvedTs = checkpointTs + t.tableSink.lastSyncedTs = t.tableSink.s.GetLastSyncedTs() t.tableSink.advanced = time.Now() t.tableSink.innerMu.Unlock() t.tableSink.s = nil diff --git a/cdc/processor/tablepb/table.pb.go b/cdc/processor/tablepb/table.pb.go index da01e6d59ef..c4b86e83406 100644 --- a/cdc/processor/tablepb/table.pb.go +++ b/cdc/processor/tablepb/table.pb.go @@ -117,6 +117,7 @@ var xxx_messageInfo_Span proto.InternalMessageInfo type Checkpoint struct { CheckpointTs Ts `protobuf:"varint,1,opt,name=checkpoint_ts,json=checkpointTs,proto3,casttype=Ts" json:"checkpoint_ts,omitempty"` ResolvedTs Ts `protobuf:"varint,2,opt,name=resolved_ts,json=resolvedTs,proto3,casttype=Ts" json:"resolved_ts,omitempty"` + LastSyncedTs Ts `protobuf:"varint,3,opt,name=last_synced_ts,json=lastSyncedTs,proto3,casttype=Ts" json:"last_synced_ts,omitempty"` } func (m *Checkpoint) Reset() { *m = Checkpoint{} } @@ -166,6 +167,13 @@ func (m *Checkpoint) GetResolvedTs() Ts { return 0 } +func (m *Checkpoint) GetLastSyncedTs() Ts { + if m != nil { + return m.LastSyncedTs + } + return 0 +} + // Stats holds a statistic for a table. type Stats struct { // Number of captured regions. @@ -329,50 +337,52 @@ func init() { func init() { proto.RegisterFile("processor/tablepb/table.proto", fileDescriptor_ae83c9c6cf5ef75c) } var fileDescriptor_ae83c9c6cf5ef75c = []byte{ - // 688 bytes of a gzipped FileDescriptorProto + // 706 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xbf, 0x6f, 0xd3, 0x4e, - 0x1c, 0xb5, 0x9d, 0x5f, 0xcd, 0x27, 0xf9, 0x7e, 0xe5, 0xde, 0xb7, 0xed, 0x37, 0x44, 0x22, 0x31, - 0x51, 0x81, 0xaa, 0x95, 0x1c, 0x28, 0x0b, 0xea, 0xd6, 0xb4, 0x80, 0xaa, 0x0a, 0x09, 0xb9, 0x81, - 0x81, 0x25, 0xf2, 0x8f, 0xc3, 0xb5, 0x1a, 0xce, 0x96, 0xef, 0xd2, 0x2a, 0x1b, 0x23, 0xca, 0x02, - 0x13, 0x62, 0x89, 0xd4, 0x3f, 0xa7, 0x63, 0x47, 0x06, 0x14, 0x41, 0x2a, 0x66, 0xf6, 0x4e, 0xe8, - 0xee, 0xdc, 0xb8, 0x09, 0x0c, 0xa1, 0x4b, 0x72, 0xbe, 0xf7, 0x3e, 0xcf, 0xef, 0xbd, 0x3b, 0x19, - 0x6e, 0x47, 0x71, 0xe8, 0x62, 0x4a, 0xc3, 0xb8, 0xc9, 0x6c, 0xa7, 0x8b, 0x23, 0x47, 0xfe, 0x9b, - 0x51, 0x1c, 0xb2, 0x10, 0xad, 0x46, 0x01, 0xf1, 0x5d, 0x3b, 0x32, 0x59, 0xf0, 0xa6, 0x1b, 0x9e, - 0x98, 0xae, 0xe7, 0x9a, 0x93, 0x09, 0x33, 0x99, 0xa8, 0x2e, 0xf9, 0xa1, 0x1f, 0x8a, 0x81, 0x26, - 0x5f, 0xc9, 0xd9, 0xc6, 0x07, 0x15, 0xb2, 0x07, 0x91, 0x4d, 0xd0, 0x43, 0x58, 0x10, 0xcc, 0x4e, - 0xe0, 0x55, 0x54, 0x43, 0x5d, 0xcb, 0xb4, 0x56, 0xc6, 0xa3, 0x7a, 0xa1, 0xcd, 0xf7, 0xf6, 0x76, - 0x2f, 0xd3, 0xa5, 0x55, 0x10, 0xbc, 0x3d, 0x0f, 0xad, 0x42, 0x91, 0x32, 0x3b, 0x66, 0x9d, 0x23, - 0xdc, 0xaf, 0x68, 0x86, 0xba, 0x56, 0x6e, 0x15, 0x2e, 0x47, 0xf5, 0xcc, 0x3e, 0xee, 0x5b, 0x0b, - 0x02, 0xd9, 0xc7, 0x7d, 0x64, 0x40, 0x01, 0x13, 0x4f, 0x70, 0x32, 0xd3, 0x9c, 0x3c, 0x26, 0xde, - 0x3e, 0xee, 0x6f, 0x95, 0xdf, 0x9f, 0xd6, 0x95, 0xcf, 0xa7, 0x75, 0xe5, 0xdd, 0x57, 0x43, 0x69, - 0x38, 0x00, 0x3b, 0x87, 0xd8, 0x3d, 0x8a, 0xc2, 0x80, 0x30, 0xb4, 0x01, 0xff, 0xb8, 0x93, 0xa7, - 0x0e, 0xa3, 0xc2, 0x5b, 0xb6, 0x95, 0xbf, 0x1c, 0xd5, 0xb5, 0x36, 0xb5, 0xca, 0x29, 0xd8, 0xa6, - 0xe8, 0x3e, 0x94, 0x62, 0x4c, 0xc3, 0xee, 0x31, 0xf6, 0x38, 0x55, 0x9b, 0xa2, 0xc2, 0x15, 0xd4, - 0xa6, 0x8d, 0x1f, 0x1a, 0xe4, 0x0e, 0x98, 0xcd, 0x28, 0xba, 0x03, 0xe5, 0x18, 0xfb, 0x41, 0x48, - 0x3a, 0x6e, 0xd8, 0x23, 0x4c, 0xca, 0x5b, 0x25, 0xb9, 0xb7, 0xc3, 0xb7, 0xd0, 0x5d, 0x00, 0xb7, - 0x17, 0xc7, 0x58, 0xbe, 0x7f, 0x5a, 0xb4, 0x98, 0x20, 0x6d, 0x8a, 0x18, 0x2c, 0x52, 0x66, 0xfb, - 0xb8, 0x93, 0x5a, 0xa2, 0x95, 0x8c, 0x91, 0x59, 0x2b, 0x6d, 0x6e, 0x9b, 0xf3, 0x9c, 0x90, 0x29, - 0x1c, 0xf1, 0x5f, 0x1f, 0xa7, 0x0d, 0xd0, 0x27, 0x84, 0xc5, 0xfd, 0x56, 0xf6, 0x6c, 0x54, 0x57, - 0x2c, 0x9d, 0xce, 0x80, 0xdc, 0x9c, 0x63, 0xc7, 0x71, 0x80, 0x63, 0x6e, 0x2e, 0x3b, 0x6d, 0x2e, - 0x41, 0xda, 0xb4, 0xda, 0x83, 0xe5, 0x3f, 0xea, 0x22, 0x1d, 0x32, 0xfc, 0x64, 0x78, 0xec, 0xa2, - 0xc5, 0x97, 0xe8, 0x29, 0xe4, 0x8e, 0xed, 0x6e, 0x0f, 0x8b, 0xa4, 0xa5, 0xcd, 0x07, 0xf3, 0x79, - 0x4f, 0x85, 0x2d, 0x39, 0xbe, 0xa5, 0x3d, 0x56, 0x1b, 0x3f, 0x35, 0x28, 0x89, 0x6b, 0xc3, 0xa3, - 0xf5, 0xe8, 0x4d, 0x2e, 0xd9, 0x2e, 0x64, 0x69, 0x64, 0x93, 0x4a, 0x4e, 0xb8, 0x59, 0x9f, 0xb3, - 0xc9, 0xc8, 0x26, 0x49, 0x65, 0x62, 0x9a, 0x87, 0xa2, 0xcc, 0x66, 0x32, 0xd4, 0xbf, 0xf3, 0x86, - 0x9a, 0x58, 0xc7, 0x96, 0x1c, 0x47, 0xaf, 0x00, 0xd2, 0xe3, 0x15, 0xf7, 0xf9, 0x06, 0x0d, 0x25, - 0xce, 0xae, 0x29, 0xa1, 0x67, 0xd2, 0x9f, 0x3c, 0xc1, 0xd2, 0xe6, 0xc6, 0x5f, 0x5c, 0x98, 0x44, - 0x4d, 0xce, 0xaf, 0x7f, 0xd2, 0x00, 0x52, 0xdb, 0xa8, 0x01, 0x85, 0x97, 0xe4, 0x88, 0x84, 0x27, - 0x44, 0x57, 0xaa, 0xcb, 0x83, 0xa1, 0xb1, 0x98, 0x82, 0x09, 0x80, 0x0c, 0xc8, 0x6f, 0x3b, 0x14, - 0x13, 0xa6, 0xab, 0xd5, 0xa5, 0xc1, 0xd0, 0xd0, 0x53, 0x8a, 0xdc, 0x47, 0xf7, 0xa0, 0xf8, 0x22, - 0xc6, 0x91, 0x1d, 0x07, 0xc4, 0xd7, 0xb5, 0xea, 0xff, 0x83, 0xa1, 0xf1, 0x5f, 0x4a, 0x9a, 0x40, - 0x68, 0x15, 0x16, 0xe4, 0x03, 0xf6, 0xf4, 0x4c, 0x75, 0x65, 0x30, 0x34, 0xd0, 0x2c, 0x0d, 0x7b, - 0x68, 0x1d, 0x4a, 0x16, 0x8e, 0xba, 0x81, 0x6b, 0x33, 0xae, 0x97, 0xad, 0xde, 0x1a, 0x0c, 0x8d, - 0xe5, 0x6b, 0x5d, 0xa7, 0x20, 0x57, 0x3c, 0x60, 0x61, 0xc4, 0xdb, 0xd0, 0x73, 0xb3, 0x8a, 0x57, - 0x08, 0x4f, 0x29, 0xd6, 0xd8, 0xd3, 0xf3, 0xb3, 0x29, 0x13, 0xa0, 0xf5, 0xfc, 0xfc, 0x7b, 0x4d, - 0x39, 0x1b, 0xd7, 0xd4, 0xf3, 0x71, 0x4d, 0xfd, 0x36, 0xae, 0xa9, 0x1f, 0x2f, 0x6a, 0xca, 0xf9, - 0x45, 0x4d, 0xf9, 0x72, 0x51, 0x53, 0x5e, 0x37, 0xfd, 0x80, 0x1d, 0xf6, 0x1c, 0xd3, 0x0d, 0xdf, - 0x36, 0x93, 0xea, 0x9b, 0xb2, 0xfa, 0xa6, 0xeb, 0xb9, 0xcd, 0xdf, 0xbe, 0xbf, 0x4e, 0x5e, 0x7c, - 0x3e, 0x1f, 0xfd, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x80, 0x0e, 0x45, 0x99, 0x9b, 0x05, 0x00, 0x00, + 0x1c, 0xb5, 0x9d, 0x5f, 0xcd, 0x27, 0xf9, 0x56, 0xee, 0x7d, 0xdb, 0x52, 0x22, 0x91, 0x98, 0xa8, + 0x40, 0xd5, 0x22, 0x07, 0xca, 0x82, 0xba, 0x35, 0x2d, 0xa0, 0xaa, 0x42, 0x42, 0x4e, 0x60, 0x60, + 0x89, 0x1c, 0xfb, 0x70, 0xad, 0x86, 0xb3, 0xe5, 0xbb, 0xb4, 0xca, 0xc6, 0x88, 0xb2, 0xd0, 0x09, + 0xb1, 0x44, 0xea, 0x9f, 0xd3, 0xb1, 0x23, 0x03, 0x8a, 0x20, 0x15, 0x33, 0x7b, 0x27, 0x74, 0x77, + 0x6e, 0xdc, 0x04, 0x86, 0xd0, 0x25, 0x39, 0xdf, 0x7b, 0x9f, 0xa7, 0xf7, 0xde, 0x9d, 0x0e, 0xee, + 0x84, 0x51, 0xe0, 0x60, 0x4a, 0x83, 0xa8, 0xc6, 0xec, 0x76, 0x07, 0x87, 0x6d, 0xf9, 0x6f, 0x86, + 0x51, 0xc0, 0x02, 0xb4, 0x1a, 0xfa, 0xc4, 0x73, 0xec, 0xd0, 0x64, 0xfe, 0xbb, 0x4e, 0x70, 0x6c, + 0x3a, 0xae, 0x63, 0x8e, 0x27, 0xcc, 0x78, 0xa2, 0xb4, 0xe8, 0x05, 0x5e, 0x20, 0x06, 0x6a, 0x7c, + 0x25, 0x67, 0xab, 0x9f, 0x54, 0x48, 0x37, 0x42, 0x9b, 0xa0, 0xc7, 0x30, 0x27, 0x98, 0x2d, 0xdf, + 0x5d, 0x51, 0x0d, 0x75, 0x2d, 0x55, 0x5f, 0x1e, 0x0d, 0x2b, 0xb9, 0x26, 0xdf, 0xdb, 0xdb, 0xbd, + 0x4c, 0x96, 0x56, 0x4e, 0xf0, 0xf6, 0x5c, 0xb4, 0x0a, 0x79, 0xca, 0xec, 0x88, 0xb5, 0x0e, 0x71, + 0x6f, 0x45, 0x33, 0xd4, 0xb5, 0x62, 0x3d, 0x77, 0x39, 0xac, 0xa4, 0xf6, 0x71, 0xcf, 0x9a, 0x13, + 0xc8, 0x3e, 0xee, 0x21, 0x03, 0x72, 0x98, 0xb8, 0x82, 0x93, 0x9a, 0xe4, 0x64, 0x31, 0x71, 0xf7, + 0x71, 0x6f, 0xab, 0xf8, 0xf1, 0xb4, 0xa2, 0x7c, 0x39, 0xad, 0x28, 0x1f, 0xbe, 0x19, 0x4a, 0xf5, + 0x44, 0x05, 0xd8, 0x39, 0xc0, 0xce, 0x61, 0x18, 0xf8, 0x84, 0xa1, 0x0d, 0xf8, 0xcf, 0x19, 0x7f, + 0xb5, 0x18, 0x15, 0xe6, 0xd2, 0xf5, 0xec, 0xe5, 0xb0, 0xa2, 0x35, 0xa9, 0x55, 0x4c, 0xc0, 0x26, + 0x45, 0x0f, 0xa0, 0x10, 0x61, 0x1a, 0x74, 0x8e, 0xb0, 0xcb, 0xa9, 0xda, 0x04, 0x15, 0xae, 0xa0, + 0x26, 0x45, 0x0f, 0x61, 0xbe, 0x63, 0x53, 0xd6, 0xa2, 0x3d, 0xe2, 0x48, 0x6e, 0x6a, 0x52, 0x96, + 0xa3, 0x0d, 0x01, 0x36, 0x69, 0xf5, 0xa7, 0x06, 0x99, 0x06, 0xb3, 0x19, 0x45, 0x77, 0xa1, 0x18, + 0x61, 0xcf, 0x0f, 0x48, 0xcb, 0x09, 0xba, 0x84, 0x49, 0x33, 0x56, 0x41, 0xee, 0xed, 0xf0, 0x2d, + 0x74, 0x0f, 0xc0, 0xe9, 0x46, 0x11, 0x96, 0x6e, 0x27, 0x2d, 0xe4, 0x63, 0xa4, 0x49, 0x11, 0x83, + 0x05, 0xca, 0x6c, 0x0f, 0xb7, 0x92, 0x00, 0xdc, 0x44, 0x6a, 0xad, 0xb0, 0xb9, 0x6d, 0xce, 0x72, + 0xa0, 0xa6, 0x70, 0xc4, 0x7f, 0x3d, 0x9c, 0xf4, 0x45, 0x9f, 0x11, 0x16, 0xf5, 0xea, 0xe9, 0xb3, + 0x61, 0x45, 0xb1, 0x74, 0x3a, 0x05, 0x72, 0x73, 0x6d, 0x3b, 0x8a, 0x7c, 0x1c, 0x71, 0x73, 0xe9, + 0x49, 0x73, 0x31, 0xd2, 0xa4, 0xa5, 0x2e, 0x2c, 0xfd, 0x55, 0x17, 0xe9, 0x90, 0xe2, 0x07, 0xc9, + 0x63, 0xe7, 0x2d, 0xbe, 0x44, 0xcf, 0x21, 0x73, 0x64, 0x77, 0xba, 0x58, 0x24, 0x2d, 0x6c, 0x3e, + 0x9a, 0xcd, 0x7b, 0x22, 0x6c, 0xc9, 0xf1, 0x2d, 0xed, 0xa9, 0x5a, 0xfd, 0xa5, 0x41, 0x41, 0xdc, + 0x32, 0x1e, 0xad, 0x4b, 0x6f, 0x72, 0x27, 0x77, 0x21, 0x4d, 0x43, 0x9b, 0xac, 0x64, 0x84, 0x9b, + 0xf5, 0x19, 0x9b, 0x0c, 0x6d, 0x12, 0x57, 0x26, 0xa6, 0x79, 0x28, 0xca, 0x6c, 0x26, 0x43, 0xcd, + 0xcf, 0x1a, 0x6a, 0x6c, 0x1d, 0x5b, 0x72, 0x1c, 0xbd, 0x01, 0x48, 0x8e, 0x57, 0x5c, 0xb1, 0x1b, + 0x34, 0x14, 0x3b, 0xbb, 0xa6, 0x84, 0x5e, 0x48, 0x7f, 0xf2, 0x04, 0x0b, 0x9b, 0x1b, 0xff, 0x70, + 0x61, 0x62, 0x35, 0x39, 0xbf, 0xfe, 0x59, 0x03, 0x48, 0x6c, 0xa3, 0x2a, 0xe4, 0x5e, 0x93, 0x43, + 0x12, 0x1c, 0x13, 0x5d, 0x29, 0x2d, 0xf5, 0x07, 0xc6, 0x42, 0x02, 0xc6, 0x00, 0x32, 0x20, 0xbb, + 0xdd, 0xa6, 0x98, 0x30, 0x5d, 0x2d, 0x2d, 0xf6, 0x07, 0x86, 0x9e, 0x50, 0xe4, 0x3e, 0xba, 0x0f, + 0xf9, 0x57, 0x11, 0x0e, 0xed, 0xc8, 0x27, 0x9e, 0xae, 0x95, 0x6e, 0xf5, 0x07, 0xc6, 0xff, 0x09, + 0x69, 0x0c, 0xa1, 0x55, 0x98, 0x93, 0x1f, 0xd8, 0xd5, 0x53, 0xa5, 0xe5, 0xfe, 0xc0, 0x40, 0xd3, + 0x34, 0xec, 0xa2, 0x75, 0x28, 0x58, 0x38, 0xec, 0xf8, 0x8e, 0xcd, 0xb8, 0x5e, 0xba, 0x74, 0xbb, + 0x3f, 0x30, 0x96, 0xae, 0x75, 0x9d, 0x80, 0x5c, 0xb1, 0xc1, 0x82, 0x90, 0xb7, 0xa1, 0x67, 0xa6, + 0x15, 0xaf, 0x10, 0x9e, 0x52, 0xac, 0xb1, 0xab, 0x67, 0xa7, 0x53, 0xc6, 0x40, 0xfd, 0xe5, 0xf9, + 0x8f, 0xb2, 0x72, 0x36, 0x2a, 0xab, 0xe7, 0xa3, 0xb2, 0xfa, 0x7d, 0x54, 0x56, 0x4f, 0x2e, 0xca, + 0xca, 0xf9, 0x45, 0x59, 0xf9, 0x7a, 0x51, 0x56, 0xde, 0xd6, 0x3c, 0x9f, 0x1d, 0x74, 0xdb, 0xa6, + 0x13, 0xbc, 0xaf, 0xc5, 0xd5, 0xd7, 0x64, 0xf5, 0x35, 0xc7, 0x75, 0x6a, 0x7f, 0x3c, 0xd7, 0xed, + 0xac, 0x78, 0x6d, 0x9f, 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xa3, 0xeb, 0x06, 0x9b, 0xca, 0x05, + 0x00, 0x00, } func (m *Span) Marshal() (dAtA []byte, err error) { @@ -437,6 +447,11 @@ func (m *Checkpoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.LastSyncedTs != 0 { + i = encodeVarintTable(dAtA, i, uint64(m.LastSyncedTs)) + i-- + dAtA[i] = 0x18 + } if m.ResolvedTs != 0 { i = encodeVarintTable(dAtA, i, uint64(m.ResolvedTs)) i-- @@ -618,6 +633,9 @@ func (m *Checkpoint) Size() (n int) { if m.ResolvedTs != 0 { n += 1 + sovTable(uint64(m.ResolvedTs)) } + if m.LastSyncedTs != 0 { + n += 1 + sovTable(uint64(m.LastSyncedTs)) + } return n } @@ -879,6 +897,25 @@ func (m *Checkpoint) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastSyncedTs", wireType) + } + m.LastSyncedTs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastSyncedTs |= Ts(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipTable(dAtA[iNdEx:]) diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto index 97de3a23e57..3727edc0c13 100644 --- a/cdc/processor/tablepb/table.proto +++ b/cdc/processor/tablepb/table.proto @@ -57,6 +57,7 @@ enum TableState { message Checkpoint { uint64 checkpoint_ts = 1 [(gogoproto.casttype) = "Ts"]; uint64 resolved_ts = 2 [(gogoproto.casttype) = "Ts"]; + uint64 last_synced_ts = 3 [(gogoproto.casttype) = "Ts"]; } // Stats holds a statistic for a table. diff --git a/cdc/scheduler/internal/scheduler.go b/cdc/scheduler/internal/scheduler.go index 111911b5632..28e241b9dd8 100644 --- a/cdc/scheduler/internal/scheduler.go +++ b/cdc/scheduler/internal/scheduler.go @@ -47,7 +47,7 @@ type Scheduler interface { // ddl jobs that need to be replicated. The Scheduler will // broadcast the barrierTs to all captures through the Heartbeat. barrier *schedulepb.BarrierWithMinTs, - ) (newCheckpointTs, newResolvedTs model.Ts, err error) + ) (watermark schedulepb.Watermark, err error) // MoveTable requests that a table be moved to target. // It is thread-safe. diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 2a98c7be959..cb184cdf691 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -127,7 +127,7 @@ func (c *coordinator) Tick( // All captures that are alive according to the latest Etcd states. aliveCaptures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.BarrierWithMinTs, -) (newCheckpointTs, newResolvedTs model.Ts, err error) { +) (watermark schedulepb.Watermark, err error) { startTime := time.Now() defer func() { costTime := time.Since(startTime) @@ -273,7 +273,7 @@ func (c *coordinator) poll( currentTables []model.TableID, aliveCaptures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.BarrierWithMinTs, -) (newCheckpointTs, newResolvedTs model.Ts, err error) { +) (watermark schedulepb.Watermark, err error) { c.maybeCollectMetrics() if c.compat.UpdateCaptureInfo(aliveCaptures) { spanReplicationEnabled := c.compat.CheckSpanReplicationEnabled() @@ -284,7 +284,12 @@ func (c *coordinator) poll( recvMsgs, err := c.recvMsgs(ctx) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, + }, errors.Trace(err) } var msgBuf []*schedulepb.Message @@ -296,7 +301,12 @@ func (c *coordinator) poll( // Handle received messages to advance replication set. msgs, err = c.replicationM.HandleMessage(recvMsgs) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, + }, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) @@ -313,13 +323,13 @@ func (c *coordinator) poll( if !c.captureM.CheckAllCaptureInitialized() { // Skip generating schedule tasks for replication manager, // as not all capture are initialized. - newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) + watermark = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) // tick capture manager after checkpoint calculation to take account resolvedTs in barrier // when redo is enabled msgs = c.captureM.Tick(c.replicationM.ReplicationSets(), c.schedulerM.DrainingTarget(), barrier.Barrier) msgBuf = append(msgBuf, msgs...) - return newCheckpointTs, newResolvedTs, c.sendMsgs(ctx, msgBuf) + return watermark, c.sendMsgs(ctx, msgBuf) } // Handle capture membership changes. @@ -327,7 +337,12 @@ func (c *coordinator) poll( msgs, err = c.replicationM.HandleCaptureChanges( changes.Init, changes.Removed, checkpointTs) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, + }, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) } @@ -343,12 +358,17 @@ func (c *coordinator) poll( // Handle generated schedule tasks. msgs, err = c.replicationM.HandleTasks(allTasks) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, + }, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) // Checkpoint calculation - newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) + watermark = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) // tick capture manager after checkpoint calculation to take account resolvedTs in barrier // when redo is enabled @@ -359,10 +379,15 @@ func (c *coordinator) poll( // Send new messages. err = c.sendMsgs(ctx, msgBuf) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, + }, errors.Trace(err) } - return newCheckpointTs, newResolvedTs, nil + return watermark, nil } func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { diff --git a/cdc/scheduler/internal/v3/coordinator_test.go b/cdc/scheduler/internal/v3/coordinator_test.go index 8d691b4e01f..8caacc30ae3 100644 --- a/cdc/scheduler/internal/v3/coordinator_test.go +++ b/cdc/scheduler/internal/v3/coordinator_test.go @@ -251,7 +251,7 @@ func TestCoordinatorHeartbeat(t *testing.T) { ctx := context.Background() currentTables := []model.TableID{1, 2, 3} aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} - _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) msgs := trans.SendBuffer require.Len(t, msgs, 2) @@ -283,7 +283,7 @@ func TestCoordinatorHeartbeat(t *testing.T) { }, }) trans.SendBuffer = []*schedulepb.Message{} - _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) require.True(t, coord.captureM.CheckAllCaptureInitialized()) msgs = trans.SendBuffer @@ -324,7 +324,7 @@ func TestCoordinatorAddCapture(t *testing.T) { ctx := context.Background() currentTables := []model.TableID{1, 2, 3} aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} - _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) msgs = trans.SendBuffer require.Len(t, msgs, 1) @@ -340,7 +340,7 @@ func TestCoordinatorAddCapture(t *testing.T) { HeartbeatResponse: &schedulepb.HeartbeatResponse{}, }) trans.SendBuffer = []*schedulepb.Message{} - _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) msgs = trans.SendBuffer require.Len(t, msgs, 1) @@ -381,7 +381,7 @@ func TestCoordinatorRemoveCapture(t *testing.T) { ctx := context.Background() currentTables := []model.TableID{1, 2, 3} aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} - _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) msgs = trans.SendBuffer require.Len(t, msgs, 1) @@ -456,7 +456,7 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) { ctx := context.Background() currentTables := []model.TableID{1, 2} aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} - _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) + _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0)) require.Nil(t, err) // Initialize captures. @@ -482,24 +482,40 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) { Span: spanz.TableIDToComparableSpan(1), State: tablepb.TableStateReplicating, Checkpoint: tablepb.Checkpoint{ - CheckpointTs: 2, ResolvedTs: 4, + CheckpointTs: 2, ResolvedTs: 4, LastSyncedTs: 3, + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(5), + }, + }, }, }, { Span: spanz.TableIDToComparableSpan(2), State: tablepb.TableStateReplicating, Checkpoint: tablepb.Checkpoint{ - CheckpointTs: 2, ResolvedTs: 4, + CheckpointTs: 2, ResolvedTs: 4, LastSyncedTs: 4, + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(6), + }, + }, }, }, }, }, }) - cts, rts, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(5)) + watermark, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(5)) require.Nil(t, err) require.True(t, coord.captureM.CheckAllCaptureInitialized()) - require.EqualValues(t, 2, cts) - require.EqualValues(t, 4, rts) + require.EqualValues(t, 2, watermark.CheckpointTs) + require.EqualValues(t, 4, watermark.ResolvedTs) + require.EqualValues(t, 4, watermark.LastSyncedTs) + require.EqualValues(t, 5, watermark.PullerResolvedTs) // Checkpoint should be advanced even if there is an uninitialized capture. aliveCaptures["c"] = &model.CaptureInfo{} @@ -517,24 +533,40 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) { Span: spanz.TableIDToComparableSpan(1), State: tablepb.TableStateReplicating, Checkpoint: tablepb.Checkpoint{ - CheckpointTs: 3, ResolvedTs: 5, + CheckpointTs: 3, ResolvedTs: 5, LastSyncedTs: 4, + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(7), + }, + }, }, }, { Span: spanz.TableIDToComparableSpan(2), State: tablepb.TableStateReplicating, Checkpoint: tablepb.Checkpoint{ - CheckpointTs: 4, ResolvedTs: 5, + CheckpointTs: 4, ResolvedTs: 5, LastSyncedTs: 6, + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(7), + }, + }, }, }, }, }, }) - cts, rts, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(5)) + watermark, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(5)) require.Nil(t, err) require.False(t, coord.captureM.CheckAllCaptureInitialized()) - require.EqualValues(t, 3, cts) - require.EqualValues(t, 5, rts) + require.EqualValues(t, 3, watermark.CheckpointTs) + require.EqualValues(t, 5, watermark.ResolvedTs) + require.EqualValues(t, 6, watermark.LastSyncedTs) + require.EqualValues(t, 7, watermark.PullerResolvedTs) } func TestCoordinatorDropMsgIfChangefeedEpochMismatch(t *testing.T) { diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 4228cc0ec75..8196a06a0f1 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -557,9 +557,9 @@ func (r *Manager) AdvanceCheckpoint( currentPDTime time.Time, barrier *schedulepb.BarrierWithMinTs, redoMetaManager redo.MetaManager, -) (newCheckpointTs, newResolvedTs model.Ts) { +) (watermark schedulepb.Watermark) { var redoFlushedResolvedTs model.Ts - limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) { + limitBarrierWithRedo := func(watermark *schedulepb.Watermark) { flushedMeta := redoMetaManager.GetFlushedMeta() redoFlushedResolvedTs = flushedMeta.ResolvedTs log.Debug("owner gets flushed redo meta", @@ -567,18 +567,17 @@ func (r *Manager) AdvanceCheckpoint( zap.String("changefeed", r.changefeedID.ID), zap.Uint64("flushedCheckpointTs", flushedMeta.CheckpointTs), zap.Uint64("flushedResolvedTs", flushedMeta.ResolvedTs)) - if flushedMeta.ResolvedTs < newResolvedTs { - newResolvedTs = flushedMeta.ResolvedTs + if flushedMeta.ResolvedTs < watermark.ResolvedTs { + watermark.ResolvedTs = flushedMeta.ResolvedTs } - if newCheckpointTs > newResolvedTs { - newCheckpointTs = newResolvedTs + if watermark.CheckpointTs > watermark.ResolvedTs { + watermark.CheckpointTs = watermark.ResolvedTs } - if barrier.GlobalBarrierTs > newResolvedTs { - barrier.GlobalBarrierTs = newResolvedTs + if barrier.GlobalBarrierTs > watermark.ResolvedTs { + barrier.GlobalBarrierTs = watermark.ResolvedTs } - return newCheckpointTs, newResolvedTs } defer func() { if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs { @@ -592,9 +591,14 @@ func (r *Manager) AdvanceCheckpoint( r.slowestPuller = tablepb.Span{} r.slowestSink = tablepb.Span{} - var slowestPullerResolvedTs uint64 = math.MaxUint64 - newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64 + watermark = schedulepb.Watermark{ + CheckpointTs: math.MaxUint64, + ResolvedTs: math.MaxUint64, + LastSyncedTs: 0, + PullerResolvedTs: math.MaxUint64, + } + cannotProceed := false currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool { tableSpanFound, tableHasHole := false, false @@ -621,21 +625,26 @@ func (r *Manager) AdvanceCheckpoint( } // Find the minimum checkpoint ts and resolved ts. - if newCheckpointTs > table.Checkpoint.CheckpointTs { - newCheckpointTs = table.Checkpoint.CheckpointTs + if watermark.CheckpointTs > table.Checkpoint.CheckpointTs { + watermark.CheckpointTs = table.Checkpoint.CheckpointTs r.slowestSink = span } - if newResolvedTs > table.Checkpoint.ResolvedTs { - newResolvedTs = table.Checkpoint.ResolvedTs + if watermark.ResolvedTs > table.Checkpoint.ResolvedTs { + watermark.ResolvedTs = table.Checkpoint.ResolvedTs } + // Find the max lastSyncedTs of all tables. + if watermark.LastSyncedTs < table.Checkpoint.LastSyncedTs { + watermark.LastSyncedTs = table.Checkpoint.LastSyncedTs + } // Find the minimum puller resolved ts. if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok { - if slowestPullerResolvedTs > pullerCkpt.ResolvedTs { - slowestPullerResolvedTs = pullerCkpt.ResolvedTs + if watermark.PullerResolvedTs > pullerCkpt.ResolvedTs { + watermark.PullerResolvedTs = pullerCkpt.ResolvedTs r.slowestPuller = span } } + return true }) if !tableSpanFound || !tableSpanStartFound || !tableSpanEndFound || tableHasHole { @@ -663,32 +672,39 @@ func (r *Manager) AdvanceCheckpoint( if cannotProceed { if redoMetaManager.Enabled() { // If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta. - newResolvedTs = barrier.RedoBarrierTs - limitBarrierWithRedo(newCheckpointTs, newResolvedTs) + watermark.ResolvedTs = barrier.RedoBarrierTs + watermark.LastSyncedTs = checkpointCannotProceed + watermark.PullerResolvedTs = checkpointCannotProceed + limitBarrierWithRedo(&watermark) + } + return schedulepb.Watermark{ + CheckpointTs: checkpointCannotProceed, + ResolvedTs: checkpointCannotProceed, + LastSyncedTs: checkpointCannotProceed, + PullerResolvedTs: checkpointCannotProceed, } - return checkpointCannotProceed, checkpointCannotProceed } // If currentTables is empty, we should advance newResolvedTs to global barrier ts and // advance newCheckpointTs to min table barrier ts. - if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 { - if newCheckpointTs != newResolvedTs || currentTables.Len() != 0 { + if watermark.ResolvedTs == math.MaxUint64 || watermark.CheckpointTs == math.MaxUint64 { + if watermark.CheckpointTs != watermark.ResolvedTs || currentTables.Len() != 0 { log.Panic("schedulerv3: newCheckpointTs and newResolvedTs should be both maxUint64 "+ "if currentTables is empty", - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("newResolvedTs", newResolvedTs), + zap.Uint64("newCheckpointTs", watermark.CheckpointTs), + zap.Uint64("newResolvedTs", watermark.ResolvedTs), zap.Any("currentTables", currentTables)) } - newResolvedTs = barrier.GlobalBarrierTs - newCheckpointTs = barrier.MinTableBarrierTs + watermark.ResolvedTs = barrier.GlobalBarrierTs + watermark.CheckpointTs = barrier.MinTableBarrierTs } - if newCheckpointTs > barrier.MinTableBarrierTs { - newCheckpointTs = barrier.MinTableBarrierTs + if watermark.CheckpointTs > barrier.MinTableBarrierTs { + watermark.CheckpointTs = barrier.MinTableBarrierTs // TODO: add panic after we fix the bug that newCheckpointTs > minTableBarrierTs. // log.Panic("schedulerv3: newCheckpointTs should not be larger than minTableBarrierTs", - // zap.Uint64("newCheckpointTs", newCheckpointTs), - // zap.Uint64("newResolvedTs", newResolvedTs), + // zap.Uint64("newCheckpointTs", watermark.CheckpointTs), + // zap.Uint64("newResolvedTs", watermark.ResolvedTs), // zap.Any("currentTables", currentTables.currentTables), // zap.Any("barrier", barrier.Barrier), // zap.Any("minTableBarrierTs", barrier.MinTableBarrierTs)) @@ -697,7 +713,7 @@ func (r *Manager) AdvanceCheckpoint( // If changefeed's checkpoint lag is larger than 30s, // log the 4 slowlest table infos every minute, which can // help us find the problematic tables. - checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(newCheckpointTs)) + checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(watermark.CheckpointTs)) if checkpointLag > logSlowTablesLagThreshold && time.Since(r.lastLogSlowTablesTime) > logSlowTablesInterval { r.logSlowTableInfo(currentPDTime) @@ -705,19 +721,19 @@ func (r *Manager) AdvanceCheckpoint( } if redoMetaManager.Enabled() { - if newResolvedTs > barrier.RedoBarrierTs { - newResolvedTs = barrier.RedoBarrierTs + if watermark.ResolvedTs > barrier.RedoBarrierTs { + watermark.ResolvedTs = barrier.RedoBarrierTs } - redoMetaManager.UpdateMeta(newCheckpointTs, newResolvedTs) + redoMetaManager.UpdateMeta(watermark.CheckpointTs, watermark.ResolvedTs) log.Debug("owner updates redo meta", zap.String("namespace", r.changefeedID.Namespace), zap.String("changefeed", r.changefeedID.ID), - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("newResolvedTs", newResolvedTs)) - return limitBarrierWithRedo(newCheckpointTs, newResolvedTs) + zap.Uint64("newCheckpointTs", watermark.CheckpointTs), + zap.Uint64("newResolvedTs", watermark.ResolvedTs)) + limitBarrierWithRedo(&watermark) } - return newCheckpointTs, newResolvedTs + return watermark } func (r *Manager) logSlowTableInfo(currentPDTime time.Time) { diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 6d8cb9f3be5..98335ca23f2 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -15,6 +15,7 @@ package replication import ( "context" + "math" "testing" "time" @@ -621,6 +622,14 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(10), ResolvedTs: model.Ts(20), + LastSyncedTs: model.Ts(15), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(30), + }, + }, }, }, }, model.ChangeFeedID{}) @@ -636,6 +645,14 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(15), ResolvedTs: model.Ts(30), + LastSyncedTs: model.Ts(20), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(40), + }, + }, }, }, }, model.ChangeFeedID{}) @@ -647,21 +664,30 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { // no tables are replicating, resolvedTs should be advanced to globalBarrierTs and checkpoint // should be advanced to minTableBarrierTs. currentTables := &TableRanges{} - checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(5), redoMetaManager) - require.Equal(t, model.Ts(5), checkpoint) - require.Equal(t, model.Ts(5), resolved) + watermark := r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(5), redoMetaManager) + require.Equal(t, model.Ts(5), watermark.CheckpointTs) + require.Equal(t, model.Ts(5), watermark.ResolvedTs) + require.Equal(t, model.Ts(0), watermark.LastSyncedTs) + require.Equal(t, model.Ts(math.MaxUint64), watermark.PullerResolvedTs) // all tables are replicating currentTables.UpdateTables([]model.TableID{1, 2}) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, model.Ts(10), checkpoint) - require.Equal(t, model.Ts(20), resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, model.Ts(10), watermark.CheckpointTs) + require.Equal(t, model.Ts(20), watermark.ResolvedTs) + require.Equal(t, model.Ts(20), watermark.LastSyncedTs) + require.Equal(t, model.Ts(30), watermark.PullerResolvedTs) // some table not exist yet. currentTables.UpdateTables([]model.TableID{1, 2, 3}) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, checkpointCannotProceed, checkpoint) - require.Equal(t, checkpointCannotProceed, resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, checkpointCannotProceed, watermark.CheckpointTs) + require.Equal(t, checkpointCannotProceed, watermark.ResolvedTs) + require.Equal(t, checkpointCannotProceed, watermark.LastSyncedTs) + require.Equal(t, checkpointCannotProceed, watermark.PullerResolvedTs) span3 := spanz.TableIDToComparableSpan(3) rs, err = NewReplicationSet(span3, model.Ts(5), @@ -672,6 +698,14 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(5), ResolvedTs: model.Ts(40), + LastSyncedTs: model.Ts(30), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(50), + }, + }, }, }, "2": { @@ -680,14 +714,25 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(5), ResolvedTs: model.Ts(40), + LastSyncedTs: model.Ts(32), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(50), + }, + }, }, }, }, model.ChangeFeedID{}) require.NoError(t, err) r.spans.ReplaceOrInsert(span3, rs) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, model.Ts(5), checkpoint) - require.Equal(t, model.Ts(20), resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, model.Ts(5), watermark.CheckpointTs) + require.Equal(t, model.Ts(20), watermark.ResolvedTs) + require.Equal(t, model.Ts(32), watermark.LastSyncedTs) + require.Equal(t, model.Ts(30), watermark.PullerResolvedTs) currentTables.UpdateTables([]model.TableID{1, 2, 3, 4}) span4 := spanz.TableIDToComparableSpan(4) @@ -699,14 +744,25 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(3), ResolvedTs: model.Ts(10), + LastSyncedTs: model.Ts(5), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(12), + }, + }, }, }, }, model.ChangeFeedID{}) require.NoError(t, err) r.spans.ReplaceOrInsert(span4, rs) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, model.Ts(3), checkpoint) - require.Equal(t, model.Ts(10), resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, model.Ts(3), watermark.CheckpointTs) + require.Equal(t, model.Ts(10), watermark.ResolvedTs) + require.Equal(t, model.Ts(32), watermark.LastSyncedTs) + require.Equal(t, model.Ts(12), watermark.PullerResolvedTs) // Split table 5 into 2 spans. currentTables.UpdateTables([]model.TableID{1, 2, 3, 4, 5}) @@ -723,28 +779,45 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(3), ResolvedTs: model.Ts(10), + LastSyncedTs: model.Ts(8), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(11), + }, + }, }, }, }, model.ChangeFeedID{}) require.NoError(t, err) r.spans.ReplaceOrInsert(span, rs) } - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, model.Ts(3), checkpoint) - require.Equal(t, model.Ts(10), resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, model.Ts(3), watermark.CheckpointTs) + require.Equal(t, model.Ts(10), watermark.ResolvedTs) + require.Equal(t, model.Ts(32), watermark.LastSyncedTs) + require.Equal(t, model.Ts(11), watermark.PullerResolvedTs) // The start span is missing rs5_1, _ := r.spans.Delete(span5_1) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, checkpointCannotProceed, checkpoint) - require.Equal(t, checkpointCannotProceed, resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, checkpointCannotProceed, watermark.CheckpointTs) + require.Equal(t, checkpointCannotProceed, watermark.ResolvedTs) + require.Equal(t, checkpointCannotProceed, watermark.LastSyncedTs) + require.Equal(t, checkpointCannotProceed, watermark.PullerResolvedTs) // The end span is missing r.spans.ReplaceOrInsert(span5_1, rs5_1) r.spans.Delete(span5_2) - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) - require.Equal(t, checkpointCannotProceed, checkpoint) - require.Equal(t, checkpointCannotProceed, resolved) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), schedulepb.NewBarrierWithMinTs(30), redoMetaManager) + require.Equal(t, checkpointCannotProceed, watermark.CheckpointTs) + require.Equal(t, checkpointCannotProceed, watermark.ResolvedTs) + require.Equal(t, checkpointCannotProceed, watermark.LastSyncedTs) + require.Equal(t, checkpointCannotProceed, watermark.PullerResolvedTs) // redo is enabled currentTables.UpdateTables([]model.TableID{4}) @@ -757,6 +830,14 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(10), ResolvedTs: model.Ts(15), + LastSyncedTs: model.Ts(12), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(16), + }, + }, }, }, }, model.ChangeFeedID{}) @@ -765,9 +846,12 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { barrier := schedulepb.NewBarrierWithMinTs(30) redoMetaManager.enable = true redoMetaManager.resolvedTs = 9 - checkpoint, resolved = r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager) - require.Equal(t, model.Ts(9), resolved) - require.Equal(t, model.Ts(9), checkpoint) + watermark = r.AdvanceCheckpoint( + currentTables, time.Now(), barrier, redoMetaManager) + require.Equal(t, model.Ts(9), watermark.ResolvedTs) + require.Equal(t, model.Ts(9), watermark.CheckpointTs) + require.Equal(t, model.Ts(12), watermark.LastSyncedTs) + require.Equal(t, model.Ts(16), watermark.PullerResolvedTs) require.Equal(t, model.Ts(9), barrier.GetGlobalBarrierTs()) } @@ -783,6 +867,14 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(10), ResolvedTs: model.Ts(20), + LastSyncedTs: model.Ts(12), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(26), + }, + }, }, }, }, model.ChangeFeedID{}) @@ -798,6 +890,14 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { Checkpoint: tablepb.Checkpoint{ CheckpointTs: model.Ts(15), ResolvedTs: model.Ts(30), + LastSyncedTs: model.Ts(18), + }, + Stats: tablepb.Stats{ + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-egress": { + ResolvedTs: model.Ts(39), + }, + }, }, }, }, model.ChangeFeedID{}) @@ -810,9 +910,13 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { currentTables := &TableRanges{} currentTables.UpdateTables([]model.TableID{1, 2, 3}) barrier := schedulepb.NewBarrierWithMinTs(30) - checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager) - require.Equal(t, checkpointCannotProceed, checkpoint) - require.Equal(t, checkpointCannotProceed, resolved) + watermark := r.AdvanceCheckpoint( + currentTables, + time.Now(), barrier, redoMetaManager) + require.Equal(t, checkpointCannotProceed, watermark.CheckpointTs) + require.Equal(t, checkpointCannotProceed, watermark.ResolvedTs) + require.Equal(t, checkpointCannotProceed, watermark.LastSyncedTs) + require.Equal(t, checkpointCannotProceed, watermark.PullerResolvedTs) require.Equal(t, uint64(25), barrier.Barrier.GetGlobalBarrierTs()) } diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index eab841b246c..158fe527129 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -1021,6 +1021,10 @@ func (r *ReplicationSet) updateCheckpointAndStats( zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) } + if r.Checkpoint.LastSyncedTs < checkpoint.LastSyncedTs { + r.Checkpoint.LastSyncedTs = checkpoint.LastSyncedTs + } + // we only update stats when stats is not empty, because we only collect stats every 10s. if stats.Size() > 0 { r.Stats = stats diff --git a/cdc/scheduler/schedulepb/watermark.go b/cdc/scheduler/schedulepb/watermark.go new file mode 100644 index 00000000000..81123734e74 --- /dev/null +++ b/cdc/scheduler/schedulepb/watermark.go @@ -0,0 +1,24 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulepb + +import "github.com/pingcap/tiflow/cdc/model" + +// Watermark contains various ts variables to make code easier +type Watermark struct { + CheckpointTs model.Ts + ResolvedTs model.Ts + LastSyncedTs model.Ts + PullerResolvedTs model.Ts +} diff --git a/cdc/sink/tablesink/table_sink.go b/cdc/sink/tablesink/table_sink.go index 69f6f61db11..e5e8cf19b3d 100644 --- a/cdc/sink/tablesink/table_sink.go +++ b/cdc/sink/tablesink/table_sink.go @@ -32,6 +32,11 @@ type TableSink interface { // For example, calculating the current progress from the statistics of the table sink. // This is a thread-safe method. GetCheckpointTs() model.ResolvedTs + // GetLastSyncedTs returns the last synced ts of table sink. + // the last synced ts means the biggest commits of the events + // that have been flushed to the downstream. + // This is a thread-safe method. + GetLastSyncedTs() model.Ts // Close closes the table sink. // After it returns, no more events will be sent out from this capture. Close() diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 5ee1b869dba..76b65cc7781 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -15,6 +15,7 @@ package tablesink import ( "sort" + "sync" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -31,6 +32,21 @@ var ( _ TableSink = (*EventTableSink[*model.SingleTableTxn, *dmlsink.TxnEventAppender])(nil) ) +// LastSyncedTsRecord is used to record the last synced ts of table sink with lock +// lastSyncedTs means the biggest commits of the events +// that have been flushed to the downstream. +type LastSyncedTsRecord struct { + sync.Mutex + lastSyncedTs model.Ts +} + +// getLastSyncedTs get value from LastSyncedTsRecord +func (r *LastSyncedTsRecord) getLastSyncedTs() model.Ts { + r.Lock() + defer r.Unlock() + return r.lastSyncedTs +} + // EventTableSink is a table sink that can write events. type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { changefeedID model.ChangeFeedID @@ -46,6 +62,8 @@ type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { eventBuffer []E state state.TableSinkState + lastSyncedTs LastSyncedTsRecord + // For dataflow metrics. metricsTableSinkTotalRows prometheus.Counter } @@ -69,6 +87,7 @@ func New[E dmlsink.TableEvent, P dmlsink.Appender[E]]( eventAppender: appender, eventBuffer: make([]E, 0, 1024), state: state.TableSinkSinking, + lastSyncedTs: LastSyncedTsRecord{lastSyncedTs: startTs}, metricsTableSinkTotalRows: totalRowsCounter, } } @@ -114,9 +133,24 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err return SinkInternalError{err} } // We have to record the event ID for the callback. + postEventFlushFunc := e.progressTracker.addEvent() + evCommitTs := ev.GetCommitTs() ce := &dmlsink.CallbackableEvent[E]{ - Event: ev, - Callback: e.progressTracker.addEvent(), + Event: ev, + Callback: func() { + // Due to multi workers will call this callback concurrently, + // we need to add lock to protect lastSyncedTs + // we need make a performance test for it + { + e.lastSyncedTs.Lock() + defer e.lastSyncedTs.Unlock() + + if e.lastSyncedTs.lastSyncedTs < evCommitTs { + e.lastSyncedTs.lastSyncedTs = evCommitTs + } + } + postEventFlushFunc() + }, SinkState: &e.state, } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) @@ -140,6 +174,13 @@ func (e *EventTableSink[E, P]) GetCheckpointTs() model.ResolvedTs { return e.progressTracker.advance() } +// GetLastSyncedTs returns the last synced ts of table sink. +// lastSyncedTs means the biggest commits of all the events +// that have been flushed to the downstream. +func (e *EventTableSink[E, P]) GetLastSyncedTs() model.Ts { + return e.lastSyncedTs.getLastSyncedTs() +} + // Close closes the table sink. // After it returns, no more events will be sent out from this capture. func (e *EventTableSink[E, P]) Close() { diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 9381a7c8105..0c8ce77b359 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -247,6 +247,7 @@ func TestGetCheckpointTs(t *testing.T) { tb.AppendRowChangedEvents(getTestRows()...) require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(0), "lastSyncedTs should be not updated") // One event will be flushed. err := tb.UpdateResolvedTs(model.NewResolvedTs(101)) @@ -254,11 +255,13 @@ func TestGetCheckpointTs(t *testing.T) { require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") sink.acknowledge(101) require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be the same as the flushed event") // Flush all events. err = tb.UpdateResolvedTs(model.NewResolvedTs(105)) require.Nil(t, err) require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(101), "lastSyncedTs should be not updated") // Only acknowledge some events. sink.acknowledge(102) @@ -268,10 +271,12 @@ func TestGetCheckpointTs(t *testing.T) { tb.GetCheckpointTs(), "checkpointTs should still be 101", ) + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(102), "lastSyncedTs should be updated") // Ack all events. sink.acknowledge(105) require.Equal(t, model.NewResolvedTs(105), tb.GetCheckpointTs(), "checkpointTs should be 105") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should be updated") } func TestClose(t *testing.T) { @@ -404,4 +409,5 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) { currentTs := tb.GetCheckpointTs() sink.acknowledge(105) require.Equal(t, currentTs, tb.GetCheckpointTs(), "checkpointTs should not be updated") + require.Equal(t, tb.lastSyncedTs.getLastSyncedTs(), uint64(105), "lastSyncedTs should not change") } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 846d523ac00..1499cf5e769 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -80,7 +80,11 @@ const ( "corruption-handle-level": "warn" }, "changefeed-error-stuck-duration": 1800000000000, - "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "synced-status": { + "synced-check-interval": 300, + "checkpoint-interval": 15 + } }` testCfgTestServerConfigMarshal = `{ @@ -309,7 +313,11 @@ const ( "corruption-handle-level": "warn" }, "changefeed-error-stuck-duration": 1800000000000, - "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "synced-status": { + "synced-check-interval": 300, + "checkpoint-interval": 15 + } }` testCfgTestReplicaConfigMarshal2 = `{ @@ -449,6 +457,10 @@ const ( "corruption-handle-level": "warn" }, "changefeed-error-stuck-duration": 1800000000000, - "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" + "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "synced-status": { + "synced-check-interval": 300, + "checkpoint-interval": 15 + } }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 8be46c978d8..82519af5626 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -101,6 +101,7 @@ var defaultReplicaConfig = &ReplicaConfig{ }, ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), SQLMode: defaultSQLMode, + SyncedStatus: &SyncedStatusConfig{SyncedCheckInterval: 5 * 60, CheckpointInterval: 15}, } // GetDefaultReplicaConfig returns the default replica config. @@ -141,10 +142,40 @@ type replicaConfig struct { Sink *SinkConfig `toml:"sink" json:"sink"` Consistent *ConsistentConfig `toml:"consistent" json:"consistent"` // Scheduler is the configuration for scheduler. +<<<<<<< HEAD Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` Integrity *integrity.Config `toml:"integrity" json:"integrity"` ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` SQLMode string `toml:"sql-mode" json:"sql-mode"` +======= + Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` + // Integrity is only available when the downstream is MQ. + Integrity *integrity.Config `toml:"integrity" json:"integrity"` + ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` + SQLMode string `toml:"sql-mode" json:"sql-mode"` + SyncedStatus *SyncedStatusConfig `toml:"synced-status" json:"synced-status,omitempty"` +} + +// Value implements the driver.Valuer interface +func (c ReplicaConfig) Value() (driver.Value, error) { + cfg, err := c.Marshal() + if err != nil { + return nil, err + } + + // TODO: refactor the meaningless type conversion. + return []byte(cfg), nil +} + +// Scan implements the sql.Scanner interface +func (c *ReplicaConfig) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return c.UnmarshalJSON(b) +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) } // Marshal returns the json marshal format of a ReplicationConfig diff --git a/pkg/config/synced_status_config.go b/pkg/config/synced_status_config.go new file mode 100644 index 00000000000..144851f8354 --- /dev/null +++ b/pkg/config/synced_status_config.go @@ -0,0 +1,23 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// SyncedStatusConfig represents synced check interval config for a changefeed +type SyncedStatusConfig struct { + // The minimum interval between the latest synced ts and now required to reach synced state + SyncedCheckInterval int64 `toml:"synced-check-interval" json:"synced-check-interval"` + // The maximum interval between latest checkpoint ts and now or + // between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state + CheckpointInterval int64 `toml:"checkpoint-interval" json:"checkpoint-interval"` +} diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 64998c6d194..f4285c3a147 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -130,7 +130,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, - SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SyncedStatus: config.GetDefaultReplicaConfig().SyncedStatus, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -186,7 +187,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, - SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SyncedStatus: config.GetDefaultReplicaConfig().SyncedStatus, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -247,7 +249,8 @@ func TestChangefeedStateUpdate(t *testing.T) { Integrity: config.GetDefaultReplicaConfig().Integrity, ChangefeedErrorStuckDuration: config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration, - SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SQLMode: config.GetDefaultReplicaConfig().SQLMode, + SyncedStatus: config.GetDefaultReplicaConfig().SyncedStatus, }, }, Status: &model.ChangeFeedStatus{CheckpointTs: 421980719742451713}, @@ -340,6 +343,7 @@ func TestPatchInfo(t *testing.T) { Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, SQLMode: defaultConfig.SQLMode, + SyncedStatus: defaultConfig.SyncedStatus, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { @@ -360,6 +364,7 @@ func TestPatchInfo(t *testing.T) { Integrity: defaultConfig.Integrity, ChangefeedErrorStuckDuration: defaultConfig.ChangefeedErrorStuckDuration, SQLMode: defaultConfig.SQLMode, + SyncedStatus: defaultConfig.SyncedStatus, }, }) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 1e5c4392b63..7e29ec24ccd 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -39,8 +39,13 @@ groups=( ["G05"]='charset_gbk ddl_manager multi_source' ["G06"]='sink_retry changefeed_error ddl_sequence resourcecontrol' ["G07"]='kv_client_stream_reconnect cdc split_region' +<<<<<<< HEAD ["G08"]='server_config_compatibility processor_err_chan changefeed_reconstruct multi_capture' ["G09"]='gc_safepoint changefeed_pause_resume cli savepoint' +======= + ["G08"]='processor_err_chan changefeed_reconstruct multi_capture' + ["G09"]='gc_safepoint changefeed_pause_resume cli savepoint synced_status' +>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133)) ["G10"]='default_value simple cdc_server_tips event_filter' ["G11"]='resolve_lock move_table autorandom generate_column' ["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes' diff --git a/tests/integration_tests/synced_status/conf/changefeed-redo.toml b/tests/integration_tests/synced_status/conf/changefeed-redo.toml new file mode 100644 index 00000000000..8f67ecb9d8f --- /dev/null +++ b/tests/integration_tests/synced_status/conf/changefeed-redo.toml @@ -0,0 +1,7 @@ +[synced-status] +synced-check-interval = 120 +checkpoint-interval = 20 + +[consistent] +level="eventual" +storage = "file:///tmp/tidb_cdc_test/synced_status/redo" \ No newline at end of file diff --git a/tests/integration_tests/synced_status/conf/changefeed.toml b/tests/integration_tests/synced_status/conf/changefeed.toml new file mode 100644 index 00000000000..0d8aa19fbf5 --- /dev/null +++ b/tests/integration_tests/synced_status/conf/changefeed.toml @@ -0,0 +1,3 @@ +[synced-status] +synced-check-interval = 120 +checkpoint-interval = 20 \ No newline at end of file diff --git a/tests/integration_tests/synced_status/run.sh b/tests/integration_tests/synced_status/run.sh new file mode 100644 index 00000000000..04f55009dad --- /dev/null +++ b/tests/integration_tests/synced_status/run.sh @@ -0,0 +1,308 @@ +#!/bin/bash + +# [DISCRIPTION]: +# This test is related to +# It will test the sync status request of cdc server in the following scenarios: +# 1. The sync status request of cdc server when the upstream cluster is available +# 1.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold +# 1.2 pdNow - lastSyncedTs < threshold +# 1.3 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold, resolvedTs - checkpointTs > threshold +# 2. The sync status request of cdc server when the upstream pd is unavailable +# 2.1 resolvedTs - checkpointTs < threshold +# 3. The sync status request of cdc server when the upstream tikv is unavailable +# 3.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs > threshold, resolvedTs - checkpointTs < threshold +# 3.2 pdNow - lastSyncedTs < threshold +# 4. The sync status request of cdc server when the downstream tidb is available +# 4.1 pdNow - lastSyncedTs > threshold, pdNow - checkpointTs < threshold +# 4.2 pdNow - lastSyncedTs < threshold +# [STEP]: +# 1. Create changefeed with synced-time-config = xx +# 2. insert data to upstream cluster, and do the related actions for each scenarios +# 3. do the query of synced status of cdc server +# 4. check the info and status of query + +set -xeu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function kill_pd() { + info=$(ps aux | grep pd-server | grep $WORK_DIR) || true + $(ps aux | grep pd-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true +} + +function kill_tikv() { + info=$(ps aux | grep tikv-server | grep $WORK_DIR) || true + $(ps aux | grep tikv-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true +} + +function kill_tidb() { + info=$(ps aux | grep tidb-server | grep $WORK_DIR) || true + $(ps aux | grep tidb-server | grep $WORK_DIR | awk '{print $2}' | xargs kill -9 &>/dev/null) || true +} + +function run_normal_case_and_unavailable_pd() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + config_path=$1 + + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" + + # case 1: test in available cluster + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + + status=$(echo $synced_status | jq '.synced') + sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts') + puller_resolved_ts=$(echo $synced_status | jq -r '.puller_resolved_ts') + last_synced_ts=$(echo $synced_status | jq -r '.last_synced_ts') + if [ $status != true ]; then + echo "synced status isn't correct" + exit 1 + fi + # the timestamp for puller_resolved_ts is 0 when do data insert + if [ "$puller_resolved_ts" != "1970-01-01 08:00:00.000" ]; then + echo "puller_resolved_ts is not 1970-01-01 08:00:00.000" + exit 1 + fi + # the timestamp for last_synced_ts is 0 when do data insert + if [ "$last_synced_ts" != "1970-01-01 08:00:00.000" ]; then + echo "last_synced_ts is not 1970-01-01 08:00:00.000" + exit 1 + fi + + # compare sink_checkpoint_ts with current time + current=$(date +"%Y-%m-%d %H:%M:%S") + echo "sink_checkpoint_ts is "$sink_checkpoint_ts + checkpoint_timestamp=$(date -d "$sink_checkpoint_ts" +%s) + current_timestamp=$(date -d "$current" +%s) + if [ $(($current_timestamp - $checkpoint_timestamp)) -gt 300 ]; then # give a soft check + echo "sink_checkpoint_ts is not correct" + exit 1 + fi + + run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);" + check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + sleep 5 # wait data insert + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != false ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + if [ "$info" != "The data syncing is not finished, please wait" ]; then + echo "synced status info is not correct" + exit 1 + fi + + sleep 130 # wait enough time for pass synced-check-interval + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != true ]; then + echo "synced status isn't correct" + exit 1 + fi + + #========== + # case 2: test with unavailable pd, query will not get the available response + kill_pd + + sleep 20 + + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + error_code=$(echo $synced_status | jq -r '.error_code') + cleanup_process $CDC_BINARY + stop_tidb_cluster +} + +function run_case_with_unavailable_tikv() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + config_path=$1 + + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" + + # case 3: test in unavailable tikv cluster + run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);" + check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + sleep 5 # make data inserted into downstream + kill_tikv + + # test the case when pdNow - lastSyncedTs < threshold + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != false ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + target_message="The data syncing is not finished, please wait" + + if [ "$info" != "$target_message" ]; then + echo "synced status info is not correct" + exit 1 + fi + + sleep 130 # wait enough time for pass synced-check-interval + # test the case when pdNow - lastSyncedTs > threshold + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != false ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + target_message="Please check whether pd is healthy and tikv region is all available. \ +If pd is not healthy or tikv region is not available, the data syncing is finished. \ +When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info \ +in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, \ +it means some regions in tikv are unavailable. \ +Otherwise the data syncing is not finished, please wait" + + if [ "$info" != "$target_message" ]; then + echo "synced status info is not correct" + exit 1 + fi + + cleanup_process $CDC_BINARY + stop_tidb_cluster +} + +function run_case_with_unavailable_tidb() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + config_path=$1 + + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" + + # case 3: test in unavailable tikv cluster + run_sql "USE TEST;Create table t1(a int primary key, b int);insert into t1 values(1,2);insert into t1 values(2,3);" + check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + sleep 5 # make data inserted into downstream + kill_tidb + + # test the case when pdNow - lastSyncedTs < threshold + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != false ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + target_message="The data syncing is not finished, please wait" + + if [ "$info" != "$target_message" ]; then + echo "synced status info is not correct" + exit 1 + fi + + sleep 130 # wait enough time for pass synced-check-interval + # test the case when pdNow - lastSyncedTs > threshold + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != true ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + target_message="Data syncing is finished" + + if [ "$info" != "$target_message" ]; then + echo "synced status info is not correct" + exit 1 + fi + + cleanup_process $CDC_BINARY + stop_tidb_cluster +} + +function run_case_with_failpoint() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # make failpoint to block checkpoint-ts + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedOwnerNotUpdateCheckpoint=return(true)' + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + config_path=$1 + + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" + + sleep 20 # wait enough time for pass checkpoint-check-interval + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced) + status=$(echo $synced_status | jq '.synced') + if [ $status != false ]; then + echo "synced status isn't correct" + exit 1 + fi + info=$(echo $synced_status | jq -r '.info') + target_message="Please check whether pd is healthy and tikv region is all available. \ +If pd is not healthy or tikv region is not available, the data syncing is finished. \ +When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info \ +in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, \ +it means some regions in tikv are unavailable. \ +Otherwise the data syncing is not finished, please wait" + if [ "$info" != "$target_message" ]; then + echo "synced status info is not correct" + exit 1 + fi + + export GO_FAILPOINTS='' + + cleanup_process $CDC_BINARY + stop_tidb_cluster +} + +trap stop_tidb_cluster EXIT +run_normal_case_and_unavailable_pd "conf/changefeed.toml" +run_case_with_unavailable_tikv "conf/changefeed.toml" +run_case_with_unavailable_tidb "conf/changefeed.toml" +run_case_with_failpoint "conf/changefeed.toml" + +# enable redo +run_normal_case_and_unavailable_pd "conf/changefeed-redo.toml" +run_case_with_unavailable_tikv "conf/changefeed-redo.toml" +run_case_with_unavailable_tidb "conf/changefeed-redo.toml" +run_case_with_failpoint "conf/changefeed-redo.toml" + +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"