Skip to content

Commit

Permalink
api,owner(ticdc): return error when api fails (#4494)
Browse files Browse the repository at this point in the history
close #1710, ref #3456
  • Loading branch information
overvenus authored Feb 10, 2022
1 parent 85116e4 commit e2d529c
Show file tree
Hide file tree
Showing 18 changed files with 776 additions and 308 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ kafka_consumer:
install:
go install ./...

unit_test: check_failpoint_ctl
unit_test: check_failpoint_ctl generate_mock
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@export log_level=error;\
Expand Down Expand Up @@ -167,7 +167,7 @@ integration_test_mysql:
integration_test_kafka: check_third_party_binary
tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)"

fmt: tools/bin/gofumports tools/bin/shfmt
fmt: tools/bin/gofumports tools/bin/shfmt generate_mock
@echo "gofmt (simplify)"
tools/bin/gofumports -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
@echo "run shfmt"
Expand Down Expand Up @@ -234,6 +234,9 @@ data-flow-diagram: docs/data-flow.dot
swagger-spec: tools/bin/swag
tools/bin/swag init --parseVendor -generalInfo cdc/api/open.go --output docs/swagger

generate_mock: tools/bin/mockgen
tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go

clean:
go clean -i ./...
rm -rf *.out
Expand Down
65 changes: 31 additions & 34 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) {
changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTables)
changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable)

// owner API
Expand Down Expand Up @@ -319,11 +319,10 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
Type: model.AdminStop,
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})

if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

Expand Down Expand Up @@ -361,11 +360,10 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
Type: model.AdminResume,
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})

if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

Expand Down Expand Up @@ -465,15 +463,14 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
Type: model.AdminRemove,
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.EnqueueJob(job)
return nil
})

if err := handleOwnerJob(ctx, h.capture, job); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

// RebalanceTable rebalances tables
// RebalanceTables rebalances tables
// @Summary rebalance tables
// @Description rebalance all tables of a changefeed
// @Tags changefeed
Expand All @@ -483,7 +480,7 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
// @Success 202
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v1/changefeeds/{changefeed_id}/tables/rebalance_table [post]
func (h *openAPI) RebalanceTable(c *gin.Context) {
func (h *openAPI) RebalanceTables(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
Expand All @@ -503,11 +500,10 @@ func (h *openAPI) RebalanceTable(c *gin.Context) {
return
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.TriggerRebalance(changefeedID)
return nil
})

if err := handleOwnerRebalance(ctx, h.capture, changefeedID); err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

Expand Down Expand Up @@ -557,11 +553,12 @@ func (h *openAPI) MoveTable(c *gin.Context) {
return
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.ManualSchedule(changefeedID, data.CaptureID, data.TableID)
return nil
})

err = handleOwnerScheduleTable(
ctx, h.capture, changefeedID, data.CaptureID, data.TableID)
if err != nil {
_ = c.Error(err)
return
}
c.Status(http.StatusAccepted)
}

Expand All @@ -580,10 +577,10 @@ func (h *openAPI) ResignOwner(c *gin.Context) {
return
}

_ = h.capture.OperateOwnerUnderLock(func(owner *owner.Owner) error {
owner.AsyncStop()
return nil
})
o, _ := h.capture.GetOwner()
if o != nil {
o.AsyncStop()
}

c.Status(http.StatusAccepted)
}
Expand Down Expand Up @@ -748,7 +745,7 @@ func (h *openAPI) ServerStatus(c *gin.Context) {
func (h *openAPI) Health(c *gin.Context) {
ctx := c.Request.Context()

if _, err := h.capture.GetOwner(ctx); err != nil {
if _, err := h.capture.GetOwnerCaptureInfo(ctx); err != nil {
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
return
}
Expand Down Expand Up @@ -798,7 +795,7 @@ func (h *openAPI) forwardToOwner(c *gin.Context) {
var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
o, err := h.capture.GetOwnerCaptureInfo(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
Expand Down
Loading

0 comments on commit e2d529c

Please sign in to comment.