Skip to content

Commit

Permalink
api(ticdc): Check min service gc safepoint when resume changefeed (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Jan 26, 2024
1 parent 63ccb9d commit 6600096
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
14 changes: 7 additions & 7 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type APIV2Helpers interface {
pdClient pd.Client,
gcServiceID string,
changefeedID model.ChangeFeedID,
checkpointTs uint64,
overrideCheckpointTs uint64,
) error

// getPDClient returns a PDClient given the PD cluster addresses and a credential
Expand Down Expand Up @@ -396,24 +396,24 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return newInfo, newUpInfo, nil
}

// verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed
// overrideCheckpointTs is the checkpointTs of the changefeed that specified by the user.
// or it is the checkpointTs of the changefeed before it is paused.
// we need to check weather the resuming changefeed is gc safe or not.
func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context,
pdClient pd.Client,
gcServiceID string,
changefeedID model.ChangeFeedID,
checkpointTs uint64,
overrideCheckpointTs uint64,
) error {
if checkpointTs == 0 {
return nil
}

// 1h is enough for resuming a changefeed.
gcTTL := int64(60 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx,
pdClient,
gcServiceID,
changefeedID,
gcTTL, checkpointTs)
gcTTL, overrideCheckpointTs)
if err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
return cerror.ErrPDEtcdAPIError.Wrap(err)
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,11 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
status, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

var pdClient pd.Client
// if PDAddrs is empty, use the default pdClient
Expand All @@ -760,13 +765,17 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}
defer pdClient.Close()
}

// If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not.
newCheckpointTs := status.CheckpointTs
if cfg.OverwriteCheckpointTs != 0 {
newCheckpointTs = cfg.OverwriteCheckpointTs
}
if err := h.helpers.verifyResumeChangefeedConfig(
ctx,
pdClient,
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
changefeedID,
cfg.OverwriteCheckpointTs); err != nil {
newCheckpointTs); err != nil {
_ = c.Error(err)
return
}
Expand Down Expand Up @@ -794,9 +803,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}

if err := api.HandleOwnerJob(ctx, h.capture, job); err != nil {
if cfg.OverwriteCheckpointTs > 0 {
needRemoveGCSafePoint = true
}
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,9 @@ func TestResumeChangefeed(t *testing.T) {
pdClient := &mockPDClient{}
etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
mockUpManager := upstream.NewManager4Test(pdClient)
statusProvider := &mockStatusProvider{}
statusProvider := &mockStatusProvider{
changefeedStatus: &model.ChangeFeedStatusForAPI{},
}

etcdClient.EXPECT().
GetEnsureGCServiceID(gomock.Any()).
Expand Down

0 comments on commit 6600096

Please sign in to comment.