From 66000960342156ce0d2efaa593663f4e318e45ca Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Fri, 26 Jan 2024 13:10:50 +0800 Subject: [PATCH] api(ticdc): Check min service gc safepoint when resume changefeed (#10546) close pingcap/tiflow#10463 --- cdc/api/v2/api_helpers.go | 14 +++++++------- cdc/api/v2/api_helpers_mock.go | 8 ++++---- cdc/api/v2/changefeed.go | 17 ++++++++++++----- cdc/api/v2/changefeed_test.go | 4 +++- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 4f0bf88783b..d7780510e35 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -92,7 +92,7 @@ type APIV2Helpers interface { pdClient pd.Client, gcServiceID string, changefeedID model.ChangeFeedID, - checkpointTs uint64, + overrideCheckpointTs uint64, ) error // getPDClient returns a PDClient given the PD cluster addresses and a credential @@ -396,16 +396,16 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig( return newInfo, newUpInfo, nil } +// verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed +// overrideCheckpointTs is the checkpointTs of the changefeed that specified by the user. +// or it is the checkpointTs of the changefeed before it is paused. +// we need to check weather the resuming changefeed is gc safe or not. func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context, pdClient pd.Client, gcServiceID string, changefeedID model.ChangeFeedID, - checkpointTs uint64, + overrideCheckpointTs uint64, ) error { - if checkpointTs == 0 { - return nil - } - // 1h is enough for resuming a changefeed. gcTTL := int64(60 * 60) err := gc.EnsureChangefeedStartTsSafety( @@ -413,7 +413,7 @@ func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context, pdClient, gcServiceID, changefeedID, - gcTTL, checkpointTs) + gcTTL, overrideCheckpointTs) if err != nil { if !cerror.ErrStartTsBeforeGC.Equal(err) { return cerror.ErrPDEtcdAPIError.Wrap(err) diff --git a/cdc/api/v2/api_helpers_mock.go b/cdc/api/v2/api_helpers_mock.go index e687ff4cc11..78aebd54bca 100644 --- a/cdc/api/v2/api_helpers_mock.go +++ b/cdc/api/v2/api_helpers_mock.go @@ -119,17 +119,17 @@ func (mr *MockAPIV2HelpersMockRecorder) verifyCreateChangefeedConfig(ctx, cfg, p } // verifyResumeChangefeedConfig mocks base method. -func (m *MockAPIV2Helpers) verifyResumeChangefeedConfig(ctx context.Context, pdClient client.Client, gcServiceID string, changefeedID model.ChangeFeedID, checkpointTs uint64) error { +func (m *MockAPIV2Helpers) verifyResumeChangefeedConfig(ctx context.Context, pdClient client.Client, gcServiceID string, changefeedID model.ChangeFeedID, overrideCheckpointTs uint64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "verifyResumeChangefeedConfig", ctx, pdClient, gcServiceID, changefeedID, checkpointTs) + ret := m.ctrl.Call(m, "verifyResumeChangefeedConfig", ctx, pdClient, gcServiceID, changefeedID, overrideCheckpointTs) ret0, _ := ret[0].(error) return ret0 } // verifyResumeChangefeedConfig indicates an expected call of verifyResumeChangefeedConfig. -func (mr *MockAPIV2HelpersMockRecorder) verifyResumeChangefeedConfig(ctx, pdClient, gcServiceID, changefeedID, checkpointTs interface{}) *gomock.Call { +func (mr *MockAPIV2HelpersMockRecorder) verifyResumeChangefeedConfig(ctx, pdClient, gcServiceID, changefeedID, overrideCheckpointTs interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyResumeChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyResumeChangefeedConfig), ctx, pdClient, gcServiceID, changefeedID, checkpointTs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "verifyResumeChangefeedConfig", reflect.TypeOf((*MockAPIV2Helpers)(nil).verifyResumeChangefeedConfig), ctx, pdClient, gcServiceID, changefeedID, overrideCheckpointTs) } // verifyUpdateChangefeedConfig mocks base method. diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index d44c3a5effa..7b68644a9ab 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -739,6 +739,11 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + status, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID) + if err != nil { + _ = c.Error(err) + return + } var pdClient pd.Client // if PDAddrs is empty, use the default pdClient @@ -760,13 +765,17 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { } defer pdClient.Close() } - + // If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not. + newCheckpointTs := status.CheckpointTs + if cfg.OverwriteCheckpointTs != 0 { + newCheckpointTs = cfg.OverwriteCheckpointTs + } if err := h.helpers.verifyResumeChangefeedConfig( ctx, pdClient, h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming), changefeedID, - cfg.OverwriteCheckpointTs); err != nil { + newCheckpointTs); err != nil { _ = c.Error(err) return } @@ -794,9 +803,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { } if err := api.HandleOwnerJob(ctx, h.capture, job); err != nil { - if cfg.OverwriteCheckpointTs > 0 { - needRemoveGCSafePoint = true - } + needRemoveGCSafePoint = true _ = c.Error(err) return } diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 94ff9e29133..f99e9e46e76 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -714,7 +714,9 @@ func TestResumeChangefeed(t *testing.T) { pdClient := &mockPDClient{} etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t)) mockUpManager := upstream.NewManager4Test(pdClient) - statusProvider := &mockStatusProvider{} + statusProvider := &mockStatusProvider{ + changefeedStatus: &model.ChangeFeedStatusForAPI{}, + } etcdClient.EXPECT(). GetEnsureGCServiceID(gomock.Any()).