Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed(ticdc): add namespace info into ChangeFeedID #5300

Merged
merged 14 commits into from
May 5, 2022
Merged
70 changes: 42 additions & 28 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (h *openAPI) ListChangefeed(c *gin.Context) {
}

resp := &model.ChangefeedCommonInfo{
ID: cfID,
Namespace: cfID.Namespace,
ID: cfID.ID,
}

if cfInfo != nil {
Expand Down Expand Up @@ -187,9 +188,10 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

Expand Down Expand Up @@ -221,7 +223,8 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
}

changefeedDetail := &model.ChangefeedDetail{
ID: changefeedID,
Namespace: changefeedID.Namespace,
ID: changefeedID.ID,
SinkURI: info.SinkURI,
CreateTime: model.JSONTime(info.CreateTime),
StartTs: info.StartTs,
Expand Down Expand Up @@ -272,7 +275,8 @@ func (h *openAPI) CreateChangefeed(c *gin.Context) {
return
}

err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID)
err = h.capture.EtcdClient.CreateChangefeedInfo(ctx, info,
model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -300,9 +304,10 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -341,9 +346,10 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -388,10 +394,11 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID)
Expand Down Expand Up @@ -444,9 +451,10 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -485,10 +493,11 @@ func (h *openAPI) RebalanceTables(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -524,9 +533,10 @@ func (h *openAPI) MoveTable(c *gin.Context) {
}

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
// check if the changefeed exists
Expand Down Expand Up @@ -600,9 +610,10 @@ func (h *openAPI) GetProcessor(c *gin.Context) {

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

Expand Down Expand Up @@ -685,7 +696,10 @@ func (h *openAPI) ListProcessor(c *gin.Context) {
}
resps := make([]*model.ProcessorCommonInfo, len(infos))
for i, info := range infos {
resp := &model.ProcessorCommonInfo{CfID: info.CfID, CaptureID: info.CaptureID}
resp := &model.ProcessorCommonInfo{
Namespace: info.CfID.Namespace,
CfID: info.CfID.ID, CaptureID: info.CaptureID,
}
resps[i] = resp
}
c.IndentedJSON(http.StatusOK, resps)
Expand Down
70 changes: 43 additions & 27 deletions cdc/api/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"go.uber.org/zap"
)

const (
changeFeedID = "test-changeFeed"
var (
changeFeedID = model.DefaultChangeFeedID("test-changeFeed")
captureID = "test-capture"
nonExistChangefeedID = "non-exist-changefeed"
nonExistChangefeedID = model.DefaultChangeFeedID("non-exist-changefeed")
)

type mockStatusProvider struct {
Expand Down Expand Up @@ -115,14 +115,14 @@ func newStatusProvider() *mockStatusProvider {

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
changeFeedID + "1": {CheckpointTs: 1},
changeFeedID + "2": {CheckpointTs: 2},
model.DefaultChangeFeedID(changeFeedID.ID + "1"): {CheckpointTs: 1},
model.DefaultChangeFeedID(changeFeedID.ID + "2"): {CheckpointTs: 2},
}, nil)

statusProvider.On("GetAllChangeFeedInfo", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedInfo{
changeFeedID + "1": {State: model.StateNormal},
changeFeedID + "2": {State: model.StateStopped},
model.DefaultChangeFeedID(changeFeedID.ID + "1"): {State: model.StateNormal},
model.DefaultChangeFeedID(changeFeedID.ID + "2"): {State: model.StateStopped},
}, nil)

statusProvider.On("GetAllTaskStatuses", mock.Anything).
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestGetChangefeed(t *testing.T) {
router := newRouter(cp, newStatusProvider())

// test get changefeed succeeded
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "GET"}
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "GET"}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -191,7 +191,10 @@ func TestGetChangefeed(t *testing.T) {
require.Equal(t, model.StateNormal, resp.FeedState)

// test get changefeed failed
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "GET"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID),
method: "GET",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -217,7 +220,10 @@ func TestPauseChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminStop, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -230,7 +236,10 @@ func TestPauseChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -242,7 +251,7 @@ func TestPauseChangefeed(t *testing.T) {

// test pause changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -270,7 +279,10 @@ func TestResumeChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminResume, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"}
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -283,7 +295,10 @@ func TestResumeChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"}
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -295,7 +310,7 @@ func TestResumeChangefeed(t *testing.T) {

// test resume changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -323,7 +338,7 @@ func TestRemoveChangefeed(t *testing.T) {
require.EqualValues(t, model.AdminRemove, adminJob.Type)
close(done)
})
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"}
api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"}
w := httptest.NewRecorder()
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -336,7 +351,7 @@ func TestRemoveChangefeed(t *testing.T) {
done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID)
close(done)
})
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"}
api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID.ID), method: "DELETE"}
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
Expand All @@ -348,7 +363,7 @@ func TestRemoveChangefeed(t *testing.T) {

// test remove changefeed failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID.ID),
method: "DELETE",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -376,7 +391,7 @@ func TestRebalanceTables(t *testing.T) {
close(done)
})
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
Expand All @@ -392,7 +407,7 @@ func TestRebalanceTables(t *testing.T) {
close(done)
})
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand All @@ -406,7 +421,8 @@ func TestRebalanceTables(t *testing.T) {

// test rebalance table failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table",
nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -447,7 +463,7 @@ func TestMoveTable(t *testing.T) {
close(done)
})
api := testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID.ID),
method: "POST",
}
w := httptest.NewRecorder()
Expand Down Expand Up @@ -476,7 +492,7 @@ func TestMoveTable(t *testing.T) {
close(done)
})
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand All @@ -490,7 +506,7 @@ func TestMoveTable(t *testing.T) {

// test move table failed
api = testCase{
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID),
url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID.ID),
method: "POST",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -526,7 +542,7 @@ func TestGetProcessor(t *testing.T) {
router := newRouter(cp, newStatusProvider())
// test get processor succeeded
api := testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID),
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID.ID, captureID),
method: "GET",
}
w := httptest.NewRecorder()
Expand All @@ -540,7 +556,7 @@ func TestGetProcessor(t *testing.T) {

// test get processor fail due to capture ID error
api = testCase{
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"),
url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID.ID, "non-exist-capture"),
method: "GET",
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -568,7 +584,7 @@ func TestListProcessor(t *testing.T) {
var resp []model.ProcessorCommonInfo
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, changeFeedID, resp[0].CfID)
require.Equal(t, changeFeedID, model.DefaultChangeFeedID(resp[0].CfID))
}

func TestListCapture(t *testing.T) {
Expand Down
Loading