From e2d529cc9887ab61063555dbbbf1d9f9765581e3 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 10 Feb 2022 16:39:36 +0800 Subject: [PATCH] api,owner(ticdc): return error when api fails (#4494) close pingcap/tiflow#1710, ref pingcap/tiflow#3456 --- Makefile | 7 +- cdc/api/open.go | 65 ++++---- cdc/api/open_test.go | 286 +++++++++++++++++++++++++++++----- cdc/api/owner.go | 28 +--- cdc/api/status.go | 5 +- cdc/api/util.go | 58 +++++++ cdc/capture/capture.go | 62 +++++--- cdc/http_test.go | 4 +- cdc/owner/mock/owner_mock.go | 126 +++++++++++++++ cdc/owner/owner.go | 206 +++++++++++++----------- cdc/owner/owner_test.go | 47 +++--- cdc/owner/status_provider.go | 97 ++++++------ cdc/processor/manager.go | 46 +++--- cdc/processor/manager_test.go | 21 ++- cdc/server_test.go | 4 +- errors.toml | 10 +- pkg/errors/errors.go | 10 +- scripts/check-copyright.sh | 2 +- 18 files changed, 776 insertions(+), 308 deletions(-) create mode 100644 cdc/owner/mock/owner_mock.go diff --git a/Makefile b/Makefile index c338ba4dac1..32abfbaea10 100644 --- a/Makefile +++ b/Makefile @@ -112,7 +112,7 @@ kafka_consumer: install: go install ./... -unit_test: check_failpoint_ctl +unit_test: check_failpoint_ctl generate_mock mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE) @export log_level=error;\ @@ -167,7 +167,7 @@ integration_test_mysql: integration_test_kafka: check_third_party_binary tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)" -fmt: tools/bin/gofumports tools/bin/shfmt +fmt: tools/bin/gofumports tools/bin/shfmt generate_mock @echo "gofmt (simplify)" tools/bin/gofumports -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT) @echo "run shfmt" @@ -234,6 +234,9 @@ data-flow-diagram: docs/data-flow.dot swagger-spec: tools/bin/swag tools/bin/swag init --parseVendor -generalInfo cdc/api/open.go --output docs/swagger +generate_mock: tools/bin/mockgen + tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go + clean: go clean -i ./... rm -rf *.out diff --git a/cdc/api/open.go b/cdc/api/open.go index 14f0b29c403..feacbd8525d 100644 --- a/cdc/api/open.go +++ b/cdc/api/open.go @@ -90,7 +90,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) { changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed) changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed) changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed) - changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable) + changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTables) changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable) // owner API @@ -319,11 +319,10 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) { Type: model.AdminStop, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) - return nil - }) - + if err := handleOwnerJob(ctx, h.capture, job); err != nil { + _ = c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -361,11 +360,10 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) { Type: model.AdminResume, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) - return nil - }) - + if err := handleOwnerJob(ctx, h.capture, job); err != nil { + _ = c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -465,15 +463,14 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { Type: model.AdminRemove, } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) - return nil - }) - + if err := handleOwnerJob(ctx, h.capture, job); err != nil { + _ = c.Error(err) + return + } c.Status(http.StatusAccepted) } -// RebalanceTable rebalances tables +// RebalanceTables rebalances tables // @Summary rebalance tables // @Description rebalance all tables of a changefeed // @Tags changefeed @@ -483,7 +480,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) { // @Success 202 // @Failure 500,400 {object} model.HTTPError // @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post] -func (h *openAPI) RebalanceTable(c *gin.Context) { +func (h *openAPI) RebalanceTables(c *gin.Context) { if !h.capture.IsOwner() { h.forwardToOwner(c) return @@ -503,11 +500,10 @@ func (h *openAPI) RebalanceTable(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.TriggerRebalance(changefeedID) - return nil - }) - + if err := handleOwnerRebalance(ctx, h.capture, changefeedID); err != nil { + _ = c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -557,11 +553,12 @@ func (h *openAPI) MoveTable(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID) - return nil - }) - + err = handleOwnerScheduleTable( + ctx, h.capture, changefeedID, data.CaptureID, data.TableID) + if err != nil { + _ = c.Error(err) + return + } c.Status(http.StatusAccepted) } @@ -580,10 +577,10 @@ func (h *openAPI) ResignOwner(c *gin.Context) { return } - _ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, _ := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } c.Status(http.StatusAccepted) } @@ -748,7 +745,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) { func (h *openAPI) Health(c *gin.Context) { ctx := c.Request.Context() - if _, err := h.capture.GetOwner(ctx); err != nil { + if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil { c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return } @@ -798,7 +795,7 @@ func (h *openAPI) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner err := retry.Do(ctx, func() error { - o, err := h.capture.GetOwner(ctx) + o, err := h.capture.GetOwnerCaptureInfo(ctx) if err != nil { log.Info("get owner failed, retry later", zap.Error(err)) return err diff --git a/cdc/api/open_test.go b/cdc/api/open_test.go index 48a1fb53d4d..6541b63ba80 100644 --- a/cdc/api/open_test.go +++ b/cdc/api/open_test.go @@ -23,9 +23,11 @@ import ( "testing" "github.com/gin-gonic/gin" + "github.com/golang/mock/gomock" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" + mock_owner "github.com/pingcap/tiflow/cdc/owner/mock" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -88,8 +90,7 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI return args.Get(0).([]*model.CaptureInfo), args.Error(1) } -func newRouter(p *mockStatusProvider) *gin.Engine { - c := capture.NewCapture4Test(true) +func newRouter(c *capture.Capture, p *mockStatusProvider) *gin.Engine { router := gin.New() RegisterOpenAPIRoutes(router, NewOpenAPI4Test(c, p)) return router @@ -108,7 +109,9 @@ func newStatusProvider() *mockStatusProvider { Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil) statusProvider.On("GetTaskPositions", mock.Anything). - Return(map[model.CaptureID]*model.TaskPosition{captureID: {Error: &model.RunningError{Message: "test"}}}, nil) + Return(map[model.CaptureID]*model.TaskPosition{ + captureID: {Error: &model.RunningError{Message: "test"}}, + }, nil) statusProvider.On("GetAllChangeFeedStatuses", mock.Anything). Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{ @@ -139,7 +142,10 @@ func newStatusProvider() *mockStatusProvider { func TestListChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list changefeed succeeded api := testCase{url: "/api/v1/changefeeds", method: "GET"} @@ -168,7 +174,10 @@ func TestListChangefeed(t *testing.T) { func TestGetChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test get changefeed succeeded api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "GET"} @@ -195,16 +204,33 @@ func TestGetChangefeed(t *testing.T) { func TestPauseChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test pause changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminStop, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test pause changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), method: "POST"} + // test pause changefeed failed from owner side + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", changeFeedID), method: "POST"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -213,20 +239,51 @@ func TestPauseChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test pause changefeed failed + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/pause", nonExistChangefeedID), + method: "POST", + } + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestResumeChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test resume changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminResume, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test resume changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), method: "POST"} + // test resume changefeed failed from owner side. + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", changeFeedID), method: "POST"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -235,20 +292,51 @@ func TestResumeChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test resume changefeed failed + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/resume", nonExistChangefeedID), + method: "POST", + } + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestRemoveChangefeed(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test remove changefeed succeeded + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + require.EqualValues(t, changeFeedID, adminJob.CfID) + require.EqualValues(t, model.AdminRemove, adminJob.Type) + close(done) + }) api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test remove changefeed failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), method: "DELETE"} + // test remove changefeed failed from owner size + mo.EXPECT(). + EnqueueJob(gomock.Any(), gomock.Any()). + Do(func(adminJob model.AdminJob, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(adminJob.CfID) + close(done) + }) + api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s", changeFeedID), method: "DELETE"} w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -257,20 +345,56 @@ func TestRemoveChangefeed(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test remove changefeed failed + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s", nonExistChangefeedID), + method: "DELETE", + } + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } -func TestRebalanceTable(t *testing.T) { +func TestRebalanceTables(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + // test rebalance table succeeded - api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), method: "POST"} + mo.EXPECT(). + RebalanceTables(gomock.Any(), gomock.Any()). + Do(func(cfID model.ChangeFeedID, done chan<- error) { + require.EqualValues(t, cfID, changeFeedID) + close(done) + }) + api := testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), + method: "POST", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test rebalance table failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), method: "POST"} + // test rebalance table failed from owner side. + mo.EXPECT(). + RebalanceTables(gomock.Any(), gomock.Any()). + Do(func(cfID model.ChangeFeedID, done chan<- error) { + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) + close(done) + }) + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", changeFeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -279,11 +403,31 @@ func TestRebalanceTable(t *testing.T) { err := json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test rebalance table failed + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/rebalance_table", nonExistChangefeedID), + method: "POST", + } + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, nil) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestMoveTable(t *testing.T) { t.Parallel() + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) + + // test move table succeeded data := struct { CaptureID string `json:"capture_id"` TableID int64 `json:"table_id"` @@ -291,17 +435,50 @@ func TestMoveTable(t *testing.T) { b, err := json.Marshal(&data) require.Nil(t, err) body := bytes.NewReader(b) - - router := newRouter(newStatusProvider()) - // test move table succeeded - api := testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), method: "POST"} + mo.EXPECT(). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) { + require.EqualValues(t, cfID, changeFeedID) + require.EqualValues(t, toCapture, data.CaptureID) + require.EqualValues(t, tableID, data.TableID) + close(done) + }) + api := testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), + method: "POST", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) require.Equal(t, 202, w.Code) - // test move table failed - api = testCase{url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), method: "POST"} + // test move table failed from owner side. + data = struct { + CaptureID string `json:"capture_id"` + TableID int64 `json:"table_id"` + }{captureID, 1} + b, err = json.Marshal(&data) + require.Nil(t, err) + body = bytes.NewReader(b) + mo.EXPECT(). + ScheduleTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) { + require.EqualValues(t, cfID, changeFeedID) + require.EqualValues(t, toCapture, data.CaptureID) + require.EqualValues(t, tableID, data.TableID) + done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(cfID) + close(done) + }) + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", changeFeedID), + method: "POST", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, body) router.ServeHTTP(w, req) @@ -310,12 +487,30 @@ func TestMoveTable(t *testing.T) { err = json.NewDecoder(w.Body).Decode(&respErr) require.Nil(t, err) require.Contains(t, respErr.Error, "changefeed not exists") + + // test move table failed + api = testCase{ + url: fmt.Sprintf("/api/v1/changefeeds/%s/tables/move_table", nonExistChangefeedID), + method: "POST", + } + w = httptest.NewRecorder() + req, _ = http.NewRequest(api.method, api.url, body) + router.ServeHTTP(w, req) + require.Equal(t, 400, w.Code) + respErr = model.HTTPError{} + err = json.NewDecoder(w.Body).Decode(&respErr) + require.Nil(t, err) + require.Contains(t, respErr.Error, "changefeed not exists") } func TestResignOwner(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test resign owner succeeded + mo.EXPECT().AsyncStop() api := testCase{url: "/api/v1/owner/resign", method: "POST"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) @@ -325,9 +520,15 @@ func TestResignOwner(t *testing.T) { func TestGetProcessor(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test get processor succeeded - api := testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), method: "GET"} + api := testCase{ + url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, captureID), + method: "GET", + } w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -338,7 +539,10 @@ func TestGetProcessor(t *testing.T) { require.Equal(t, "test", processorDetail.Error.Message) // test get processor fail due to capture ID error - api = testCase{url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), method: "GET"} + api = testCase{ + url: fmt.Sprintf("/api/v1/processors/%s/%s", changeFeedID, "non-exist-capture"), + method: "GET", + } w = httptest.NewRecorder() req, _ = http.NewRequest(api.method, api.url, nil) router.ServeHTTP(w, req) @@ -346,12 +550,15 @@ func TestGetProcessor(t *testing.T) { httpError := &model.HTTPError{} err = json.NewDecoder(w.Body).Decode(httpError) require.Nil(t, err) - require.Contains(t, httpError.Error, "capture not exists, key: non-exist-capture") + require.Contains(t, httpError.Error, "capture not exists, non-exist-capture") } func TestListProcessor(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list processor succeeded api := testCase{url: "/api/v1/processors", method: "GET"} w := httptest.NewRecorder() @@ -366,7 +573,10 @@ func TestListProcessor(t *testing.T) { func TestListCapture(t *testing.T) { t.Parallel() - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) // test list processor succeeded api := testCase{url: "/api/v1/captures", method: "GET"} w := httptest.NewRecorder() @@ -382,7 +592,10 @@ func TestListCapture(t *testing.T) { func TestServerStatus(t *testing.T) { t.Parallel() // capture is owner - ownerRouter := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + ownerRouter := newRouter(cp, newStatusProvider()) api := testCase{url: "/api/v1/status", method: "GET"} w := httptest.NewRecorder() req, _ := http.NewRequest(api.method, api.url, nil) @@ -395,7 +608,7 @@ func TestServerStatus(t *testing.T) { require.True(t, resp.IsOwner) // capture is not owner - c := capture.NewCapture4Test(false) + c := capture.NewCapture4Test(nil) r := gin.New() RegisterOpenAPIRoutes(r, NewOpenAPI4Test(c, nil)) api = testCase{url: "/api/v1/status", method: "GET"} @@ -416,7 +629,10 @@ func TestSetLogLevel(t *testing.T) { data := struct { Level string `json:"log_level"` }{"warn"} - router := newRouter(newStatusProvider()) + ctrl := gomock.NewController(t) + mo := mock_owner.NewMockOwner(ctrl) + cp := capture.NewCapture4Test(mo) + router := newRouter(cp, newStatusProvider()) api := testCase{url: "/api/v1/log", method: "POST"} w := httptest.NewRecorder() b, err := json.Marshal(&data) diff --git a/cdc/api/owner.go b/cdc/api/owner.go index a8b565c4a1a..d1275d8ea35 100644 --- a/cdc/api/owner.go +++ b/cdc/api/owner.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/owner" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" "github.com/tikv/client-go/v2/oracle" @@ -112,10 +111,10 @@ func (h *ownerAPI) handleResignOwner(w http.ResponseWriter, req *http.Request) { handleOwnerResp(w, concurrency.ErrElectionNotLeader) return } - err := h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.AsyncStop() - return nil - }) + o, err := h.capture.GetOwner() + if o != nil { + o.AsyncStop() + } handleOwnerResp(w, err) } @@ -153,11 +152,7 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques Opts: opts, } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.EnqueueJob(job) - return nil - }) - + err = handleOwnerJob(req.Context(), h.capture, job) handleOwnerResp(w, err) } @@ -180,11 +175,7 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque return } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.TriggerRebalance(changefeedID) - return nil - }) - + err = handleOwnerRebalance(req.Context(), h.capture, changefeedID) handleOwnerResp(w, err) } @@ -221,11 +212,8 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) { return } - err = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error { - owner.ManualSchedule(changefeedID, to, tableID) - return nil - }) - + err = handleOwnerScheduleTable( + req.Context(), h.capture, changefeedID, to, tableID) handleOwnerResp(w, err) } diff --git a/cdc/api/status.go b/cdc/api/status.go index eb93bfc5982..e71c1528c03 100644 --- a/cdc/api/status.go +++ b/cdc/api/status.go @@ -60,9 +60,10 @@ func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli *etcd.CDCEtcdClient, } func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) { - h.capture.WriteDebugInfo(w) + ctx := req.Context() + h.capture.WriteDebugInfo(ctx, w) fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n") - h.writeEtcdInfo(req.Context(), h.capture.EtcdClient, w) + h.writeEtcdInfo(ctx, h.capture.EtcdClient, w) } func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) { diff --git a/cdc/api/util.go b/cdc/api/util.go index 4baebf54e64..7f9650972a0 100644 --- a/cdc/api/util.go +++ b/cdc/api/util.go @@ -14,12 +14,15 @@ package api import ( + "context" "encoding/json" "net/http" "strings" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -76,3 +79,58 @@ func writeData(w http.ResponseWriter, data interface{}) { log.Error("fail to write data", zap.Error(err)) } } + +func handleOwnerJob( + ctx context.Context, capture *capture.Capture, job model.AdminJob, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.EnqueueJob(job, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerRebalance( + ctx context.Context, capture *capture.Capture, changefeedID string, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.RebalanceTables(changefeedID, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} + +func handleOwnerScheduleTable( + ctx context.Context, capture *capture.Capture, + changefeedID string, captureID string, tableID int64, +) error { + // Use buffered channel to prevernt blocking owner. + done := make(chan error, 1) + o, err := capture.GetOwner() + if err != nil { + return errors.Trace(err) + } + o.ScheduleTable(changefeedID, captureID, tableID, done) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-done: + return errors.Trace(err) + } +} diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index b2f8fae43aa..aa5681e608f 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -53,7 +53,7 @@ type Capture struct { info *model.CaptureInfo ownerMu sync.Mutex - owner *owner.Owner + owner owner.Owner processorManager *processor.Manager // session keeps alive between the capture and etcd @@ -88,7 +88,7 @@ type Capture struct { cancel context.CancelFunc newProcessorManager func() *processor.Manager - newOwner func(pd.Client) *owner.Owner + newOwner func(pd.Client) owner.Owner } // NewCapture returns a new Capture instance @@ -107,13 +107,11 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.C } } -func NewCapture4Test(isOwner bool) *Capture { +func NewCapture4Test(o owner.Owner) *Capture { res := &Capture{ info: &model.CaptureInfo{ID: "capture-for-test", AdvertiseAddr: "127.0.0.1", Version: "test"}, } - if isOwner { - res.owner = &owner.Owner{} - } + res.owner = o return res } @@ -482,20 +480,20 @@ func (c *Capture) runEtcdWorker( return nil } -func (c *Capture) setOwner(owner *owner.Owner) { +func (c *Capture) setOwner(owner owner.Owner) { c.ownerMu.Lock() defer c.ownerMu.Unlock() c.owner = owner } -// OperateOwnerUnderLock operates the owner with lock -func (c *Capture) OperateOwnerUnderLock(fn func(*owner.Owner) error) error { +// GetOwner returns owner if it is the owner. +func (c *Capture) GetOwner() (owner.Owner, error) { c.ownerMu.Lock() defer c.ownerMu.Unlock() if c.owner == nil { - return cerror.ErrNotOwner.GenWithStackByArgs() + return nil, cerror.ErrNotOwner.GenWithStackByArgs() } - return fn(c.owner) + return c.owner, nil } // campaign to be an owner. @@ -529,10 +527,10 @@ func (c *Capture) AsyncClose() { defer c.cancel() // Safety: Here we mainly want to stop the owner // and ignore it if the owner does not exist or is not set. - _ = c.OperateOwnerUnderLock(func(o *owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { o.AsyncStop() - return nil - }) + } c.captureMu.Lock() defer c.captureMu.Unlock() if c.processorManager != nil { @@ -571,20 +569,38 @@ func (c *Capture) AsyncClose() { } // WriteDebugInfo writes the debug info into writer. -func (c *Capture) WriteDebugInfo(w io.Writer) { +func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) { + wait := func(done <-chan error) { + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-done: + } + if err != nil { + log.Warn("write debug info failed", zap.Error(err)) + } + } // Safety: Because we are mainly outputting information about the owner here, // if the owner does not exist or is not set, the information will not be output. - _ = c.OperateOwnerUnderLock(func(o *owner.Owner) error { + o, _ := c.GetOwner() + if o != nil { + doneOwner := make(chan error, 1) fmt.Fprintf(w, "\n\n*** owner info ***:\n\n") - o.WriteDebugInfo(w) - return nil - }) + o.WriteDebugInfo(w, doneOwner) + // wait the debug info printed + wait(doneOwner) + } + + doneM := make(chan error, 1) c.captureMu.Lock() defer c.captureMu.Unlock() if c.processorManager != nil { fmt.Fprintf(w, "\n\n*** processors info ***:\n\n") - c.processorManager.WriteDebugInfo(w) + c.processorManager.WriteDebugInfo(ctx, w, doneM) } + // wait the debug info printed + wait(doneM) } // IsOwner returns whether the capture is an owner @@ -594,8 +610,8 @@ func (c *Capture) IsOwner() bool { return c.owner != nil } -// GetOwner return the owner of current TiCDC cluster -func (c *Capture) GetOwner(ctx context.Context) (*model.CaptureInfo, error) { +// GetOwnerCaptureInfo return the owner capture info of current TiCDC cluster +func (c *Capture) GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, error) { _, captureInfos, err := c.EtcdClient.GetCaptures(ctx) if err != nil { return nil, err @@ -621,5 +637,5 @@ func (c *Capture) StatusProvider() owner.StatusProvider { if c.owner == nil { return nil } - return c.owner.StatusProvider() + return owner.NewStatusProvider(c.owner) } diff --git a/cdc/http_test.go b/cdc/http_test.go index 39398a82639..6aaa47f3605 100644 --- a/cdc/http_test.go +++ b/cdc/http_test.go @@ -38,7 +38,7 @@ func (a *testCase) String() string { func TestPProfPath(t *testing.T) { router := gin.New() - RegisterRoutes(router, capture.NewCapture4Test(false), nil) + RegisterRoutes(router, capture.NewCapture4Test(nil), nil) apis := []*testCase{ {"/debug/pprof/", http.MethodGet}, @@ -63,7 +63,7 @@ func TestPProfPath(t *testing.T) { func TestHandleFailpoint(t *testing.T) { router := gin.New() - RegisterRoutes(router, capture.NewCapture4Test(false), nil) + RegisterRoutes(router, capture.NewCapture4Test(nil), nil) fp := "github.com/pingcap/tiflow/cdc/TestHandleFailpoint" uri := fmt.Sprintf("/debug/fail/%s", fp) body := bytes.NewReader([]byte("return(true)")) diff --git a/cdc/owner/mock/owner_mock.go b/cdc/owner/mock/owner_mock.go new file mode 100644 index 00000000000..b99c3b1d38f --- /dev/null +++ b/cdc/owner/mock/owner_mock.go @@ -0,0 +1,126 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: cdc/owner/owner.go + +// Package mock_owner is a generated GoMock package. +package mock_owner + +import ( + context "context" + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + model "github.com/pingcap/tiflow/cdc/model" + owner "github.com/pingcap/tiflow/cdc/owner" + orchestrator "github.com/pingcap/tiflow/pkg/orchestrator" +) + +// MockOwner is a mock of Owner interface. +type MockOwner struct { + ctrl *gomock.Controller + recorder *MockOwnerMockRecorder +} + +// MockOwnerMockRecorder is the mock recorder for MockOwner. +type MockOwnerMockRecorder struct { + mock *MockOwner +} + +// NewMockOwner creates a new mock instance. +func NewMockOwner(ctrl *gomock.Controller) *MockOwner { + mock := &MockOwner{ctrl: ctrl} + mock.recorder = &MockOwnerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOwner) EXPECT() *MockOwnerMockRecorder { + return m.recorder +} + +// AsyncStop mocks base method. +func (m *MockOwner) AsyncStop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AsyncStop") +} + +// AsyncStop indicates an expected call of AsyncStop. +func (mr *MockOwnerMockRecorder) AsyncStop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncStop", reflect.TypeOf((*MockOwner)(nil).AsyncStop)) +} + +// EnqueueJob mocks base method. +func (m *MockOwner) EnqueueJob(adminJob model.AdminJob, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "EnqueueJob", adminJob, done) +} + +// EnqueueJob indicates an expected call of EnqueueJob. +func (mr *MockOwnerMockRecorder) EnqueueJob(adminJob, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueJob", reflect.TypeOf((*MockOwner)(nil).EnqueueJob), adminJob, done) +} + +// Query mocks base method. +func (m *MockOwner) Query(query *owner.Query, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Query", query, done) +} + +// Query indicates an expected call of Query. +func (mr *MockOwnerMockRecorder) Query(query, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Query", reflect.TypeOf((*MockOwner)(nil).Query), query, done) +} + +// RebalanceTables mocks base method. +func (m *MockOwner) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RebalanceTables", cfID, done) +} + +// RebalanceTables indicates an expected call of RebalanceTables. +func (mr *MockOwnerMockRecorder) RebalanceTables(cfID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceTables", reflect.TypeOf((*MockOwner)(nil).RebalanceTables), cfID, done) +} + +// ScheduleTable mocks base method. +func (m *MockOwner) ScheduleTable(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ScheduleTable", cfID, toCapture, tableID, done) +} + +// ScheduleTable indicates an expected call of ScheduleTable. +func (mr *MockOwnerMockRecorder) ScheduleTable(cfID, toCapture, tableID, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleTable", reflect.TypeOf((*MockOwner)(nil).ScheduleTable), cfID, toCapture, tableID, done) +} + +// Tick mocks base method. +func (m *MockOwner) Tick(ctx context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Tick", ctx, state) + ret0, _ := ret[0].(orchestrator.ReactorState) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Tick indicates an expected call of Tick. +func (mr *MockOwnerMockRecorder) Tick(ctx, state interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockOwner)(nil).Tick), ctx, state) +} + +// WriteDebugInfo mocks base method. +func (m *MockOwner) WriteDebugInfo(w io.Writer, done chan<- error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WriteDebugInfo", w, done) +} + +// WriteDebugInfo indicates an expected call of WriteDebugInfo. +func (mr *MockOwnerMockRecorder) WriteDebugInfo(w, done interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteDebugInfo", reflect.TypeOf((*MockOwner)(nil).WriteDebugInfo), w, done) +} diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 53af624c721..70c546e0a17 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -15,7 +15,6 @@ package owner import ( "context" - "fmt" "io" "math" "sync" @@ -41,7 +40,7 @@ type ownerJobType int // All OwnerJob types const ( ownerJobTypeRebalance ownerJobType = iota - ownerJobTypeManualSchedule + ownerJobTypeScheduleTable ownerJobTypeAdminJob ownerJobTypeDebugInfo ownerJobTypeQuery @@ -51,30 +50,45 @@ const ( // captures with versions different from that of the owner const versionInconsistentLogRate = 1 +// Export field names for pretty printing. type ownerJob struct { - tp ownerJobType - changefeedID model.ChangeFeedID + Tp ownerJobType + ChangefeedID model.ChangeFeedID - // for ManualSchedule only - targetCaptureID model.CaptureID - // for ManualSchedule only - tableID model.TableID + // for ScheduleTable only + TargetCaptureID model.CaptureID + // for ScheduleTable only + TableID model.TableID // for Admin Job only - adminJob *model.AdminJob + AdminJob *model.AdminJob // for debug info only debugInfoWriter io.Writer // for status provider - query *ownerQuery + query *Query - done chan struct{} + done chan<- error } -// Owner manages many changefeeds -// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker -type Owner struct { +// Owner managers TiCDC cluster. +// +// The interface is thread-safe, except for Tick, it's only used by etcd worker. +type Owner interface { + orchestrator.Reactor + EnqueueJob(adminJob model.AdminJob, done chan<- error) + RebalanceTables(cfID model.ChangeFeedID, done chan<- error) + ScheduleTable( + cfID model.ChangeFeedID, toCapture model.CaptureID, + tableID model.TableID, done chan<- error, + ) + WriteDebugInfo(w io.Writer, done chan<- error) + Query(query *Query, done chan<- error) + AsyncStop() +} + +type ownerImpl struct { changefeeds map[model.ChangeFeedID]*changefeed captures map[model.CaptureID]*model.CaptureInfo @@ -88,15 +102,16 @@ type Owner struct { closed int32 // bootstrapped specifies whether the owner has been initialized. // This will only be done when the owner starts the first Tick. - // NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value. + // NOTICE: Do not use it in a method other than tick unexpectedly, + // as it is not a thread-safe value. bootstrapped bool newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } // NewOwner creates a new Owner -func NewOwner(pdClient pd.Client) *Owner { - return &Owner{ +func NewOwner(pdClient pd.Client) Owner { + return &ownerImpl{ changefeeds: make(map[model.ChangeFeedID]*changefeed), gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), @@ -110,8 +125,8 @@ func NewOwner4Test( newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error), newSink func() DDLSink, pdClient pd.Client, -) *Owner { - o := NewOwner(pdClient) +) Owner { + o := NewOwner(pdClient).(*ownerImpl) // Most tests do not need to test bootstrap. o.bootstrapped = true o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { @@ -121,7 +136,7 @@ func NewOwner4Test( } // Tick implements the Reactor interface -func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { +func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { failpoint.Inject("owner-run-with-error", func() { failpoint.Return(nil, errors.New("owner run with injected error")) }) @@ -201,58 +216,65 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) } // EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick -func (o *Owner) EnqueueJob(adminJob model.AdminJob) { +// `done` must be buffered to prevernt blocking owner. +func (o *ownerImpl) EnqueueJob(adminJob model.AdminJob, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeAdminJob, - adminJob: &adminJob, - changefeedID: adminJob.CfID, - done: make(chan struct{}), + Tp: ownerJobTypeAdminJob, + AdminJob: &adminJob, + ChangefeedID: adminJob.CfID, + done: done, }) } -// TriggerRebalance triggers a rebalance for the specified changefeed -func (o *Owner) TriggerRebalance(cfID model.ChangeFeedID) { +// RebalanceTables triggers a rebalance for the specified changefeed +// `done` must be buffered to prevernt blocking owner. +func (o *ownerImpl) RebalanceTables(cfID model.ChangeFeedID, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeRebalance, - changefeedID: cfID, - done: make(chan struct{}), + Tp: ownerJobTypeRebalance, + ChangefeedID: cfID, + done: done, }) } -// ManualSchedule moves a table from a capture to another capture -func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) { +// ScheduleTable moves a table from a capture to another capture +// `done` must be buffered to prevernt blocking owner. +func (o *ownerImpl) ScheduleTable( + cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID, + done chan<- error, +) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeManualSchedule, - changefeedID: cfID, - targetCaptureID: toCapture, - tableID: tableID, - done: make(chan struct{}), + Tp: ownerJobTypeScheduleTable, + ChangefeedID: cfID, + TargetCaptureID: toCapture, + TableID: tableID, + done: done, }) } // WriteDebugInfo writes debug info into the specified http writer -func (o *Owner) WriteDebugInfo(w io.Writer) { - timeout := time.Second * 3 - done := make(chan struct{}) +func (o *ownerImpl) WriteDebugInfo(w io.Writer, done chan<- error) { o.pushOwnerJob(&ownerJob{ - tp: ownerJobTypeDebugInfo, + Tp: ownerJobTypeDebugInfo, debugInfoWriter: w, done: done, }) - // wait the debug info printed - select { - case <-done: - case <-time.After(timeout): - fmt.Fprintf(w, "failed to print debug info for owner\n") - } +} + +// Query queries owner internal information. +func (o *ownerImpl) Query(query *Query, done chan<- error) { + o.pushOwnerJob(&ownerJob{ + Tp: ownerJobTypeQuery, + query: query, + done: done, + }) } // AsyncStop stops the owner asynchronously -func (o *Owner) AsyncStop() { +func (o *ownerImpl) AsyncStop() { atomic.StoreInt32(&o.closed, 1) } -func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { +func (o *ownerImpl) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return nil, info != nil, nil }) @@ -277,7 +299,7 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { } // Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it. -func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) { +func (o *ownerImpl) Bootstrap(state *orchestrator.GlobalReactorState) { log.Info("Start bootstrapping") fixChangefeedInfos(state) } @@ -297,7 +319,7 @@ func fixChangefeedInfos(state *orchestrator.GlobalReactorState) { } } -func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { +func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) { // Keep the value of prometheus expression `rate(counter)` = 1 // Please also change alert rule in ticdc.rules.yml when change the expression value. now := time.Now() @@ -321,7 +343,7 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { } } -func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.CaptureInfo) bool { +func (o *ownerImpl) clusterVersionConsistent(captures map[model.CaptureID]*model.CaptureInfo) bool { myVersion := version.ReleaseVersion for _, capture := range captures { if myVersion != capture.Version { @@ -335,24 +357,26 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap return true } -func (o *Owner) handleJobs() { +func (o *ownerImpl) handleJobs() { jobs := o.takeOwnerJobs() for _, job := range jobs { - changefeedID := job.changefeedID + changefeedID := job.ChangefeedID cfReactor, exist := o.changefeeds[changefeedID] - if !exist && job.tp != ownerJobTypeQuery { + if !exist && job.Tp != ownerJobTypeQuery { log.Warn("changefeed not found when handle a job", zap.Reflect("job", job)) + job.done <- cerror.ErrChangeFeedNotExists.FastGenByArgs(job.ChangefeedID) + close(job.done) continue } - switch job.tp { + switch job.Tp { case ownerJobTypeAdminJob: - cfReactor.feedStateManager.PushAdminJob(job.adminJob) - case ownerJobTypeManualSchedule: - cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID) + cfReactor.feedStateManager.PushAdminJob(job.AdminJob) + case ownerJobTypeScheduleTable: + cfReactor.scheduler.MoveTable(job.TableID, job.TargetCaptureID) case ownerJobTypeRebalance: cfReactor.scheduler.Rebalance() case ownerJobTypeQuery: - o.handleQueries(job.query) + job.done <- o.handleQueries(job.query) case ownerJobTypeDebugInfo: // TODO: implement this function } @@ -360,9 +384,9 @@ func (o *Owner) handleJobs() { } } -func (o *Owner) handleQueries(query *ownerQuery) { - switch query.tp { - case ownerQueryAllChangeFeedStatuses: +func (o *ownerImpl) handleQueries(query *Query) error { + switch query.Tp { + case QueryAllChangeFeedStatuses: ret := map[model.ChangeFeedID]*model.ChangeFeedStatus{} for cfID, cfReactor := range o.changefeeds { ret[cfID] = &model.ChangeFeedStatus{} @@ -376,8 +400,8 @@ func (o *Owner) handleQueries(query *ownerQuery) { ret[cfID].CheckpointTs = cfReactor.state.Status.CheckpointTs ret[cfID].AdminJobType = cfReactor.state.Status.AdminJobType } - query.data = ret - case ownerQueryAllChangeFeedInfo: + query.Data = ret + case QueryAllChangeFeedInfo: ret := map[model.ChangeFeedID]*model.ChangeFeedInfo{} for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -390,20 +414,17 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret[cfID], err = cfReactor.state.Info.Clone() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } - query.data = ret - case ownerQueryAllTaskStatuses: - cfReactor, ok := o.changefeeds[query.changeFeedID] + query.Data = ret + case QueryAllTaskStatuses: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } if cfReactor.state == nil { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } var ret map[model.CaptureID]*model.TaskStatus @@ -412,8 +433,7 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret, err = provider.GetTaskStatuses() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } else { ret = map[model.CaptureID]*model.TaskStatus{} @@ -421,12 +441,11 @@ func (o *Owner) handleQueries(query *ownerQuery) { ret[captureID] = taskStatus.Clone() } } - query.data = ret - case ownerQueryTaskPositions: - cfReactor, ok := o.changefeeds[query.changeFeedID] + query.Data = ret + case QueryTaskPositions: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } var ret map[model.CaptureID]*model.TaskPosition @@ -435,21 +454,19 @@ func (o *Owner) handleQueries(query *ownerQuery) { var err error ret, err = provider.GetTaskPositions() if err != nil { - query.err = errors.Trace(err) - return + return errors.Trace(err) } } else { if cfReactor.state == nil { - query.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.changeFeedID) - return + return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(query.ChangeFeedID) } ret = map[model.CaptureID]*model.TaskPosition{} for captureID, taskPosition := range cfReactor.state.TaskPositions { ret[captureID] = taskPosition.Clone() } } - query.data = ret - case ownerQueryProcessors: + query.Data = ret + case QueryProcessors: var ret []*model.ProcInfoSnap for cfID, cfReactor := range o.changefeeds { if cfReactor.state == nil { @@ -462,8 +479,8 @@ func (o *Owner) handleQueries(query *ownerQuery) { }) } } - query.data = ret - case ownerQueryCaptures: + query.Data = ret + case QueryCaptures: var ret []*model.CaptureInfo for _, captureInfo := range o.captures { ret = append(ret, &model.CaptureInfo{ @@ -472,11 +489,12 @@ func (o *Owner) handleQueries(query *ownerQuery) { Version: captureInfo.Version, }) } - query.data = ret + query.Data = ret } + return nil } -func (o *Owner) takeOwnerJobs() []*ownerJob { +func (o *ownerImpl) takeOwnerJobs() []*ownerJob { o.ownerJobQueueMu.Lock() defer o.ownerJobQueueMu.Unlock() @@ -485,13 +503,13 @@ func (o *Owner) takeOwnerJobs() []*ownerJob { return jobs } -func (o *Owner) pushOwnerJob(job *ownerJob) { +func (o *ownerImpl) pushOwnerJob(job *ownerJob) { o.ownerJobQueueMu.Lock() defer o.ownerJobQueueMu.Unlock() o.ownerJobQueue = append(o.ownerJobQueue, job) } -func (o *Owner) updateGCSafepoint( +func (o *ownerImpl) updateGCSafepoint( ctx context.Context, state *orchestrator.GlobalReactorState, ) error { forceUpdate := false @@ -524,6 +542,6 @@ func (o *Owner) updateGCSafepoint( } // StatusProvider returns a StatusProvider -func (o *Owner) StatusProvider() StatusProvider { +func (o *ownerImpl) StatusProvider() StatusProvider { return &ownerStatusProvider{owner: o} } diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index bcca13f5dad..4bf12827ac3 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -44,7 +44,7 @@ func (m *mockManager) CheckStaleCheckpointTs( var _ gc.Manager = (*mockManager)(nil) -func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*Owner, *orchestrator.GlobalReactorState, *orchestrator.ReactorStateTester) { +func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orchestrator.GlobalReactorState, *orchestrator.ReactorStateTester) { ctx.GlobalVars().PDClient = &gc.MockPDClient{ UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { return safePoint, nil @@ -68,7 +68,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*Owner, *orchestrat captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() require.Nil(t, err) tester.MustUpdate(cdcKey.String(), captureBytes) - return owner, state, tester + return owner.(*ownerImpl), state, tester } func TestCreateRemoveChangefeed(t *testing.T) { @@ -128,9 +128,11 @@ func TestCreateRemoveChangefeed(t *testing.T) { require.NotNil(t, err) // this tick create remove changefeed patches - owner.EnqueueJob(removeJob) + done := make(chan error, 1) + owner.EnqueueJob(removeJob, done) _, err = owner.Tick(ctx, state) require.Nil(t, err) + require.Nil(t, <-done) // apply patches and update owner's in memory changefeed states tester.MustApplyPatches() @@ -162,17 +164,20 @@ func TestStopChangefeed(t *testing.T) { require.Nil(t, err) require.Contains(t, owner.changefeeds, changefeedID) // remove changefeed forcibly + done := make(chan error, 1) owner.EnqueueJob(model.AdminJob{ CfID: changefeedID, Type: model.AdminRemove, Opts: &model.AdminJobOption{ ForceRemove: true, }, - }) + }, done) // this tick to clean the leak info fo the removed changefeed _, err = owner.Tick(ctx, state) require.Nil(t, err) + require.Nil(t, <-done) + // this tick to remove the changefeed state in memory tester.MustApplyPatches() _, err = owner.Tick(ctx, state) @@ -301,15 +306,19 @@ func TestAdminJob(t *testing.T) { ctx, cancel := cdcContext.WithCancel(ctx) defer cancel() + done1 := make(chan error, 1) owner, _, _ := createOwner4Test(ctx, t) owner.EnqueueJob(model.AdminJob{ CfID: "test-changefeed1", Type: model.AdminResume, - }) - owner.TriggerRebalance("test-changefeed2") - owner.ManualSchedule("test-changefeed3", "test-caputre1", 10) + }, done1) + done2 := make(chan error, 1) + owner.RebalanceTables("test-changefeed2", done2) + done3 := make(chan error, 1) + owner.ScheduleTable("test-changefeed3", "test-caputre1", 10, done3) + done4 := make(chan error, 1) var buf bytes.Buffer - owner.WriteDebugInfo(&buf) + owner.WriteDebugInfo(&buf, done4) // remove job.done, it's hard to check deep equals jobs := owner.takeOwnerJobs() @@ -320,22 +329,22 @@ func TestAdminJob(t *testing.T) { } require.Equal(t, jobs, []*ownerJob{ { - tp: ownerJobTypeAdminJob, - adminJob: &model.AdminJob{ + Tp: ownerJobTypeAdminJob, + AdminJob: &model.AdminJob{ CfID: "test-changefeed1", Type: model.AdminResume, }, - changefeedID: "test-changefeed1", + ChangefeedID: "test-changefeed1", }, { - tp: ownerJobTypeRebalance, - changefeedID: "test-changefeed2", + Tp: ownerJobTypeRebalance, + ChangefeedID: "test-changefeed2", }, { - tp: ownerJobTypeManualSchedule, - changefeedID: "test-changefeed3", - targetCaptureID: "test-caputre1", - tableID: 10, + Tp: ownerJobTypeScheduleTable, + ChangefeedID: "test-changefeed3", + TargetCaptureID: "test-caputre1", + TableID: 10, }, { - tp: ownerJobTypeDebugInfo, + Tp: ownerJobTypeDebugInfo, debugInfoWriter: &buf, }, }) @@ -344,7 +353,7 @@ func TestAdminJob(t *testing.T) { func TestUpdateGCSafePoint(t *testing.T) { mockPDClient := &gc.MockPDClient{} - o := NewOwner(mockPDClient) + o := NewOwner(mockPDClient).(*ownerImpl) o.gcManager = gc.NewManager(mockPDClient) ctx := cdcContext.NewBackendContext4Test(true) ctx, cancel := cdcContext.WithCancel(ctx) diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index 22ecdb2c072..54337e796f7 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -50,37 +50,49 @@ type StatusProvider interface { GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) } -type ownerQueryType int32 +// QueryType is the type of different queries. +type QueryType int32 const ( - ownerQueryAllChangeFeedStatuses = iota - ownerQueryAllChangeFeedInfo - ownerQueryAllTaskStatuses - ownerQueryTaskPositions - ownerQueryProcessors - ownerQueryCaptures + // QueryAllChangeFeedStatuses query all changefeed status. + QueryAllChangeFeedStatuses QueryType = iota + // QueryAllChangeFeedInfo is the type of query all changefeed info. + QueryAllChangeFeedInfo + // QueryAllTaskStatuses is the type of query all task statuses. + QueryAllTaskStatuses + // QueryTaskPositions is the type of query task positions. + QueryTaskPositions + // QueryProcessors is the type of query processors. + QueryProcessors + // QueryCaptures is the type of query captures info. + QueryCaptures ) -type ownerQuery struct { - tp ownerQueryType - changeFeedID model.ChangeFeedID +// Query wraps query command and return results. +type Query struct { + Tp QueryType + ChangeFeedID model.ChangeFeedID - data interface{} - err error + Data interface{} +} + +// NewStatusProvider returns a new StatusProvider for the owner. +func NewStatusProvider(owner Owner) StatusProvider { + return &ownerStatusProvider{owner: owner} } type ownerStatusProvider struct { - owner *Owner + owner Owner } func (p *ownerStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) { - query := &ownerQuery{ - tp: ownerQueryAllChangeFeedStatuses, + query := &Query{ + Tp: QueryAllChangeFeedStatuses, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.ChangeFeedID]*model.ChangeFeedStatus), nil + return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedStatus), nil } func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatus, error) { @@ -96,13 +108,13 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, changefee } func (p *ownerStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) { - query := &ownerQuery{ - tp: ownerQueryAllChangeFeedInfo, + query := &Query{ + Tp: QueryAllChangeFeedInfo, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil + return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil } func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) { @@ -118,64 +130,59 @@ func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedI } func (p *ownerStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) { - query := &ownerQuery{ - tp: ownerQueryAllTaskStatuses, - changeFeedID: changefeedID, + query := &Query{ + Tp: QueryAllTaskStatuses, + ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.CaptureID]*model.TaskStatus), nil + return query.Data.(map[model.CaptureID]*model.TaskStatus), nil } func (p *ownerStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) { - query := &ownerQuery{ - tp: ownerQueryTaskPositions, - changeFeedID: changefeedID, + query := &Query{ + Tp: QueryTaskPositions, + ChangeFeedID: changefeedID, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.(map[model.CaptureID]*model.TaskPosition), nil + return query.Data.(map[model.CaptureID]*model.TaskPosition), nil } func (p *ownerStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) { - query := &ownerQuery{ - tp: ownerQueryProcessors, + query := &Query{ + Tp: QueryProcessors, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.([]*model.ProcInfoSnap), nil + return query.Data.([]*model.ProcInfoSnap), nil } func (p *ownerStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error) { - query := &ownerQuery{ - tp: ownerQueryCaptures, + query := &Query{ + Tp: QueryCaptures, } if err := p.sendQueryToOwner(ctx, query); err != nil { return nil, errors.Trace(err) } - return query.data.([]*model.CaptureInfo), nil + return query.Data.([]*model.CaptureInfo), nil } -func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *ownerQuery) error { - doneCh := make(chan struct{}) - job := &ownerJob{ - tp: ownerJobTypeQuery, - query: query, - done: doneCh, - } - p.owner.pushOwnerJob(job) +func (p *ownerStatusProvider) sendQueryToOwner(ctx context.Context, query *Query) error { + doneCh := make(chan error, 1) + p.owner.Query(query, doneCh) select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case <-doneCh: + case err := <-doneCh: + if err != nil { + return errors.Trace(err) + } } - if query.err != nil { - return errors.Trace(query.err) - } return nil } diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index e386f6cc995..a02ac106f86 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -43,7 +43,7 @@ const ( type command struct { tp commandTp payload interface{} - done chan struct{} + done chan<- error } // Manager is a manager of processor, which maintains the state and behavior of processors @@ -152,33 +152,42 @@ func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) { } } -// AsyncClose sends a close signal to Manager and closing all processors +// AsyncClose sends a signal to Manager to close all processors. func (m *Manager) AsyncClose() { - m.sendCommand(commandTpClose, nil) + timeout := 3 * time.Second + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + done := make(chan error, 1) + err := m.sendCommand(ctx, commandTpClose, nil, done) + if err != nil { + log.Warn("async close failed", zap.Error(err)) + } } // WriteDebugInfo write the debug info to Writer -func (m *Manager) WriteDebugInfo(w io.Writer) { - timeout := time.Second * 3 - done := m.sendCommand(commandTpWriteDebugInfo, w) - // wait the debug info printed - select { - case <-done: - case <-time.After(timeout): - fmt.Fprintf(w, "failed to print debug info for processor\n") +func (m *Manager) WriteDebugInfo( + ctx context.Context, w io.Writer, done chan<- error, +) { + err := m.sendCommand(ctx, commandTpWriteDebugInfo, w, done) + if err != nil { + log.Warn("send command commandTpWriteDebugInfo failed", zap.Error(err)) } } -func (m *Manager) sendCommand(tp commandTp, payload interface{}) chan struct{} { - timeout := time.Second * 3 - cmd := &command{tp: tp, payload: payload, done: make(chan struct{})} +// sendCommands sends command to manager. +// `done` is closed upon command completion or sendCommand returns error. +func (m *Manager) sendCommand( + ctx context.Context, tp commandTp, payload interface{}, done chan<- error, +) error { + cmd := &command{tp: tp, payload: payload, done: done} select { + case <-ctx.Done(): + close(done) + return errors.Trace(ctx.Err()) case m.commandQueue <- cmd: - case <-time.After(timeout): - close(cmd.done) - log.Warn("the command queue is full, ignore this command", zap.Any("command", cmd)) + // FIXME: signal EtcdWorker to handle commands ASAP. } - return cmd.done + return nil } func (m *Manager) handleCommand() error { @@ -194,6 +203,7 @@ func (m *Manager) handleCommand() error { for changefeedID := range m.processors { m.closeProcessor(changefeedID) } + // FIXME: we should drain command queue and signal callers an error. return cerrors.ErrReactorFinished case commandTpWriteDebugInfo: w := cmd.payload.(io.Writer) diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 8415af305aa..8d0fe700995 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -15,6 +15,7 @@ package processor import ( "bytes" + "context" "fmt" "math" "testing" @@ -170,8 +171,10 @@ func TestDebugInfo(t *testing.T) { s.tester.MustApplyPatches() } }() + doneM := make(chan error, 1) buf := bytes.NewBufferString("") - s.manager.WriteDebugInfo(buf) + s.manager.WriteDebugInfo(ctx, buf, doneM) + <-doneM require.Greater(t, len(buf.String()), 0) s.manager.AsyncClose() <-done @@ -218,3 +221,19 @@ func TestClose(t *testing.T) { s.tester.MustApplyPatches() require.Len(t, s.manager.processors, 0) } + +func TestSendCommandError(t *testing.T) { + m := NewManager() + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + // Use unbuffered channel to stable test. + m.commandQueue = make(chan *command) + done := make(chan error, 1) + err := m.sendCommand(ctx, commandTpClose, nil, done) + require.Error(t, err) + select { + case <-done: + case <-time.After(time.Second): + require.FailNow(t, "done must be closed") + } +} diff --git a/cdc/server_test.go b/cdc/server_test.go index b6eccf3c74b..784b7f15543 100644 --- a/cdc/server_test.go +++ b/cdc/server_test.go @@ -188,7 +188,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) { config.StoreGlobalServerConfig(conf) server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test(false) + server.capture = capture.NewCapture4Test(nil) require.Nil(t, err) err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) require.Nil(t, err) @@ -264,7 +264,7 @@ func TestServerTLSWithCommonName(t *testing.T) { config.StoreGlobalServerConfig(conf) server, err := NewServer([]string{"https://127.0.0.1:2379"}) - server.capture = capture.NewCapture4Test(false) + server.capture = capture.NewCapture4Test(nil) require.Nil(t, err) err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) require.Nil(t, err) diff --git a/errors.toml b/errors.toml index 8726dc63a35..7e918972366 100755 --- a/errors.toml +++ b/errors.toml @@ -103,7 +103,7 @@ campaign owner failed ["CDC:ErrCaptureNotExist"] error = ''' -capture not exists, key: %s +capture not exists, %s ''' ["CDC:ErrCaptureRegister"] @@ -123,12 +123,12 @@ capture suicide ["CDC:ErrChangeFeedAlreadyExists"] error = ''' -changefeed already exists, key: %s +changefeed already exists, %s ''' ["CDC:ErrChangeFeedNotExists"] error = ''' -changefeed not exists, key: %s +changefeed not exists, %s ''' ["CDC:ErrChangefeedAbnormalState"] @@ -978,12 +978,12 @@ fail to create changefeed because target-ts %d is earlier than start-ts %d ["CDC:ErrTaskPositionNotExists"] error = ''' -task position not exists, key: %s +task position not exists, %s ''' ["CDC:ErrTaskStatusNotExists"] error = ''' -task status not exists, key: %s +task status not exists, %s ''' ["CDC:ErrTiKVEventFeed"] diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5acab118024..94c26419d7d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -21,11 +21,11 @@ import ( var ( // kv related errors ErrWriteTsConflict = errors.Normalize("write ts conflict", errors.RFCCodeText("CDC:ErrWriteTsConflict")) - ErrChangeFeedNotExists = errors.Normalize("changefeed not exists, key: %s", errors.RFCCodeText("CDC:ErrChangeFeedNotExists")) - ErrChangeFeedAlreadyExists = errors.Normalize("changefeed already exists, key: %s", errors.RFCCodeText("CDC:ErrChangeFeedAlreadyExists")) - ErrTaskStatusNotExists = errors.Normalize("task status not exists, key: %s", errors.RFCCodeText("CDC:ErrTaskStatusNotExists")) - ErrTaskPositionNotExists = errors.Normalize("task position not exists, key: %s", errors.RFCCodeText("CDC:ErrTaskPositionNotExists")) - ErrCaptureNotExist = errors.Normalize("capture not exists, key: %s", errors.RFCCodeText("CDC:ErrCaptureNotExist")) + ErrChangeFeedNotExists = errors.Normalize("changefeed not exists, %s", errors.RFCCodeText("CDC:ErrChangeFeedNotExists")) + ErrChangeFeedAlreadyExists = errors.Normalize("changefeed already exists, %s", errors.RFCCodeText("CDC:ErrChangeFeedAlreadyExists")) + ErrTaskStatusNotExists = errors.Normalize("task status not exists, %s", errors.RFCCodeText("CDC:ErrTaskStatusNotExists")) + ErrTaskPositionNotExists = errors.Normalize("task position not exists, %s", errors.RFCCodeText("CDC:ErrTaskPositionNotExists")) + ErrCaptureNotExist = errors.Normalize("capture not exists, %s", errors.RFCCodeText("CDC:ErrCaptureNotExist")) ErrGetAllStoresFailed = errors.Normalize("get stores from pd failed", errors.RFCCodeText("CDC:ErrGetAllStoresFailed")) ErrMetaListDatabases = errors.Normalize("meta store list databases", errors.RFCCodeText("CDC:ErrMetaListDatabases")) ErrGRPCDialFailed = errors.Normalize("grpc dial failed", errors.RFCCodeText("CDC:ErrGRPCDialFailed")) diff --git a/scripts/check-copyright.sh b/scripts/check-copyright.sh index 99b8848f15f..bd2d3387a78 100755 --- a/scripts/check-copyright.sh +++ b/scripts/check-copyright.sh @@ -1,4 +1,4 @@ -result=$(find ./ -name "*.go" | grep -vE '\.pb\.go|vendor/|leaktest.go|kv_gen|redo_gen|sink_gen|pbmock|\.pb\.gw\.go|statik.go|openapi/gen\..*\.go|embedded_asserts.go|empty_asserts.go|docs/swagger' | +result=$(find ./ -name "*.go" | grep -vE '\.pb\.go|vendor/|leaktest.go|kv_gen|redo_gen|sink_gen|pbmock|\.pb\.gw\.go|statik.go|openapi/gen\..*\.go|embedded_asserts.go|empty_asserts.go|docs/swagger|owner/mock' | while read -r file_path; do head=$(head -n 1 "$file_path") if [[ ! "$head" =~ Copyright\ 20[0-9][0-9]\ PingCAP,\ Inc\. ]]; then