Skip to content

Commit

Permalink
changefeed(ticdc): add namespace info into ChangeFeedID (#5300)
Browse files Browse the repository at this point in the history
ref #5301
  • Loading branch information
sdojjy authored May 5, 2022
1 parent a2403ce commit 196de8b
Show file tree
Hide file tree
Showing 125 changed files with 2,104 additions and 1,149 deletions.
70 changes: 42 additions & 28 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (h *openAPI) ListChangefeed(c *gin.Context) {
}

resp := &model.ChangefeedCommonInfo{
ID: cfID,
Namespace: cfID.Namespace,
ID: cfID.ID,
}

if cfInfo != nil {
Expand Down Expand Up @@ -187,9 +188,10 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

Expand Down Expand Up @@ -221,7 +223,8 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
}

changefeedDetail := &model.ChangefeedDetail{
ID: changefeedID,
Namespace: changefeedID.Namespace,
ID: changefeedID.ID,
SinkURI: info.SinkURI,
CreateTime: model.JSONTime(info.CreateTime),
StartTs: info.StartTs,
Expand Down Expand Up @@ -272,7 +275,8 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
return
}

err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID)
err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -300,9 +304,10 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -341,9 +346,10 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -388,10 +394,11 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID)
Expand Down Expand Up @@ -444,9 +451,10 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -485,10 +493,11 @@ func (h *openAPI) RebalanceTables(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -524,9 +533,10 @@ func (h *openAPI) MoveTable(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -600,9 +610,10 @@ func (h *openAPI) GetProcessor(c *gin.Context) {

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

Expand Down Expand Up @@ -685,7 +696,10 @@ func (h *openAPI) ListProcessor(c *gin.Context) {
}
resps := make([]*model.ProcessorCommonInfo, len(infos))
for i, info := range infos {
resp := &model.ProcessorCommonInfo{CfID: info.CfID, CaptureID: info.CaptureID}
resp := &model.ProcessorCommonInfo{
Namespace: info.CfID.Namespace,
CfID: info.CfID.ID, CaptureID: info.CaptureID,
}
resps[i] = resp
}
c.IndentedJSON(http.StatusOK, resps)
Expand Down
70 changes: 43 additions & 27 deletions cdc/api/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"go.uber.org/zap"
)

const (
changeFeedID = "test-changeFeed"
var (
changeFeedID = model.DefaultChangeFeedID("test-changeFeed")
captureID = "test-capture"
nonExistChangefeedID = "non-exist-changefeed"
nonExistChangefeedID = model.DefaultChangeFeedID("non-exist-changefeed")
)

type mockStatusProvider struct {
Expand Down Expand Up @@ -115,14 +115,14 @@ func newStatusProvider() *mockStatusProvider {

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
changeFeedID + "1": {CheckpointTs: 1},
changeFeedID + "2": {CheckpointTs: 2},
model.DefaultChangeFeedID(changeFeedID.ID + "1"): {CheckpointTs: 1},
model.DefaultChangeFeedID(changeFeedID.ID + "2"): {CheckpointTs: 2},
}, nil)

statusProvider.On("GetAllChangeFeedInfo", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedInfo{
changeFeedID + "1": {State: model.StateNormal},
changeFeedID + "2": {State: model.StateStopped},
model.DefaultChangeFeedID(changeFeedID.ID + "1"): {State: model.StateNormal},
model.DefaultChangeFeedID(changeFeedID.ID + "2"): {State: model.StateStopped},
}, nil)

statusProvider.On("GetAllTaskStatuses", mock.Anything).
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestGetChangefeed(t *testing.T) {
router := newRouter(cp, newStatusProvider())

// test get changefeed succeeded
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "GET"}
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "GET"}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -191,7 +191,10 @@ func TestGetChangefeed(t *testing.T) {
require.Equal(t, model.StateNormal, resp.FeedState)

// test get changefeed failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "GET"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID),
method: "GET",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -217,7 +220,10 @@ func TestPauseChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminStop, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -230,7 +236,10 @@ func TestPauseChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -242,7 +251,7 @@ func TestPauseChangefeed(t *testing.T) {

// test pause changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -270,7 +279,10 @@ func TestResumeChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminResume, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -283,7 +295,10 @@ func TestResumeChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -295,7 +310,7 @@ func TestResumeChangefeed(t *testing.T) {

// test resume changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -323,7 +338,7 @@ func TestRemoveChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminRemove, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"}
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -336,7 +351,7 @@ func TestRemoveChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"}
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -348,7 +363,7 @@ func TestRemoveChangefeed(t *testing.T) {

// test remove changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID),
method: "DELETE",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -376,7 +391,7 @@ func TestRebalanceTables(t *testing.T) {
close(done)
})
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
Expand All @@ -392,7 +407,7 @@ func TestRebalanceTables(t *testing.T) {
close(done)
})
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand All @@ -406,7 +421,8 @@ func TestRebalanceTables(t *testing.T) {

// test rebalance table failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table",
nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -447,7 +463,7 @@ func TestMoveTable(t *testing.T) {
close(done)
})
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
Expand Down Expand Up @@ -476,7 +492,7 @@ func TestMoveTable(t *testing.T) {
close(done)
})
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand All @@ -490,7 +506,7 @@ func TestMoveTable(t *testing.T) {

// test move table failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -526,7 +542,7 @@ func TestGetProcessor(t *testing.T) {
router := newRouter(cp, newStatusProvider())
// test get processor succeeded
api := testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID),
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID.ID, captureID),
method: "GET",
}
w := httptest.NewRecorder()
Expand All @@ -540,7 +556,7 @@ func TestGetProcessor(t *testing.T) {

// test get processor fail due to capture ID error
api = testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"),
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID.ID, "non-exist-capture"),
method: "GET",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -568,7 +584,7 @@ func TestListProcessor(t *testing.T) {
var resp []model.ProcessorCommonInfo
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, changeFeedID, resp[0].CfID)
require.Equal(t, changeFeedID, model.DefaultChangeFeedID(resp[0].CfID))
}

func TestListCapture(t *testing.T) {
Expand Down
Loading

0 comments on commit 196de8b

Please sign in to comment.