From 0730c2ee61e84eb25ee40b27b38068fdf1bd616a Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 20 Dec 2021 14:47:46 +0800 Subject: [PATCH] http_*: add log for http api and refine the err handle logic (#2997) (#3307) --- cdc/capture/http_errors.go | 51 +++++++++++ cdc/capture/http_errors_test.go | 33 +++++++ cdc/capture/http_handler.go | 150 ++++++++++---------------------- cdc/capture/http_validator.go | 2 +- cdc/http_router.go | 56 +++++++++++- cdc/http_router_test.go | 3 +- cdc/http_status.go | 2 +- cdc/sink/mysql.go | 24 +++-- cdc/sink/simple_mysql_tester.go | 2 +- cdc/sink/sink_test.go | 2 +- pkg/config/config_test.go | 1 + 11 files changed, 203 insertions(+), 123 deletions(-) create mode 100644 cdc/capture/http_errors.go create mode 100644 cdc/capture/http_errors_test.go diff --git a/cdc/capture/http_errors.go b/cdc/capture/http_errors.go new file mode 100644 index 00000000000..ed58b71e108 --- /dev/null +++ b/cdc/capture/http_errors.go @@ -0,0 +1,51 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "strings" + + "github.com/pingcap/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// httpBadRequestError is some errors that will cause a BadRequestError in http handler +var httpBadRequestError = []*errors.Error{ + cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC, + cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible, + cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError, + cerror.ErrMySQLInvalidConfig, +} + +// IsHTTPBadRequestError check if a error is a http bad request error +func IsHTTPBadRequestError(err error) bool { + if err == nil { + return false + } + for _, e := range httpBadRequestError { + if e.Equal(err) { + return true + } + + rfcCode, ok := cerror.RFCCode(err) + if ok && e.RFCCode() == rfcCode { + return true + } + + if strings.Contains(err.Error(), string(e.RFCCode())) { + return true + } + } + return false +} diff --git a/cdc/capture/http_errors_test.go b/cdc/capture/http_errors_test.go new file mode 100644 index 00000000000..8437577a20d --- /dev/null +++ b/cdc/capture/http_errors_test.go @@ -0,0 +1,33 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "testing" + + "github.com/pingcap/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestIsHTTPBadRequestError(t *testing.T) { + err := cerror.ErrAPIInvalidParam.GenWithStack("aa") + require.Equal(t, true, IsHTTPBadRequestError(err)) + err = cerror.ErrAPIInvalidParam.Wrap(errors.New("aa")) + require.Equal(t, true, IsHTTPBadRequestError(err)) + err = cerror.ErrPDEtcdAPIError.GenWithStack("aa") + require.Equal(t, false, IsHTTPBadRequestError(err)) + err = nil + require.Equal(t, false, IsHTTPBadRequestError(err)) +} diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index 1b340c7477e..133e7cc9e92 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -78,13 +78,15 @@ func (h *HTTPHandler) ListChangefeed(c *gin.Context) { // get all changefeed status statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } // get all changefeed infos infos, err := statusProvider.GetAllChangeFeedInfo(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + // this call will return a parsedError generated by the error we passed in + // so it is no need to check the parsedError + _ = c.Error(err) return } @@ -140,38 +142,25 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } status, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } processorInfos, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -197,7 +186,7 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) { TaskStatus: taskStatus, } - c.JSON(http.StatusOK, changefeedDetail) + c.IndentedJSON(http.StatusOK, changefeedDetail) } // CreateChangefeed creates a changefeed @@ -219,29 +208,25 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) { ctx := c.Request.Context() var changefeedConfig model.ChangefeedConfig if err := c.BindJSON(&changefeedConfig); err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err)) return } info, err := verifyCreateChangefeedConfig(c, changefeedConfig, h.capture) if err != nil { - if cerror.ErrPDEtcdAPIError.Equal(err) { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(err) return } infoStr, err := info.Marshal() if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } err = h.capture.etcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -269,18 +254,13 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -316,18 +296,13 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -370,21 +345,16 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } if info.State != model.StateStopped { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped"))) + _ = c.Error(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped")) return } @@ -392,19 +362,19 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) { // filter_rules, ignore_txn_start_ts, mounter_worker_num, sink_config var changefeedConfig model.ChangefeedConfig if err = c.BindJSON(&changefeedConfig); err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } newInfo, err := verifyUpdateChangefeedConfig(ctx, changefeedConfig, info) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(err) return } err = h.capture.etcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -430,18 +400,13 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -478,18 +443,13 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -522,18 +482,13 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -543,12 +498,12 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) { }{} err = c.BindJSON(&data) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err)) + return } if err := model.ValidateChangefeedID(data.CaptureID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID)) return } @@ -603,42 +558,34 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } captureID := c.Param(apiOpVarCaptureID) if err := model.ValidateChangefeedID(captureID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", captureID)) return } statuses, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } status, exist := statuses[captureID] if !exist { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))) + _ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)) } positions, err := statusProvider.GetTaskPositions(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } position, exist := positions[captureID] if !exist { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))) + _ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)) } processorDetail := &model.ProcessorDetail{CheckPointTs: position.CheckPointTs, ResolvedTs: position.ResolvedTs, Error: position.Error} @@ -647,7 +594,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) { tables = append(tables, tableID) } processorDetail.Tables = tables - c.JSON(http.StatusOK, processorDetail) + c.IndentedJSON(http.StatusOK, processorDetail) } // ListProcessor lists all processors in the TiCDC cluster @@ -669,7 +616,7 @@ func (h *HTTPHandler) ListProcessor(c *gin.Context) { ctx := c.Request.Context() infos, err := statusProvider.GetProcessors(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } resps := make([]*model.ProcessorCommonInfo, len(infos)) @@ -699,7 +646,7 @@ func (h *HTTPHandler) ListCapture(c *gin.Context) { ctx := c.Request.Context() captureInfos, err := statusProvider.GetCaptures(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -781,14 +728,13 @@ func SetLogLevel(c *gin.Context) { }{} err := c.BindJSON(&data) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid log level: %s", err.Error())) return } err = logutil.SetLogLevel(data.Level) if err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", data.Level)) return } log.Warn("log level changed", zap.String("level", data.Level)) @@ -800,7 +746,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { ctx := c.Request.Context() // every request can only forward to owner one time if len(c.GetHeader(forWardFromCapture)) != 0 { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(cerror.ErrRequestForwardErr.FastGenByArgs())) + _ = c.Error(cerror.ErrRequestForwardErr.FastGenByArgs()) return } c.Header(forWardFromCapture, h.capture.Info().ID) @@ -817,13 +763,13 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { return nil }, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime)) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } tslConfig, err := config.GetGlobalServerConfig().Security.ToTLSConfigWithVerify() if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -845,7 +791,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { cli := httputil.NewClient(tslConfig) resp, err := cli.Do(req) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -863,7 +809,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { defer resp.Body.Close() _, err = bufio.NewReader(resp.Body).WriteTo(c.Writer) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } } diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go index 0281a23d63f..029cfd68368 100644 --- a/cdc/capture/http_validator.go +++ b/cdc/capture/http_validator.go @@ -139,7 +139,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch tz, err := util.GetTimezone(changefeedConfig.TimeZone) if err != nil { - return nil, errors.Annotate(err, "invalid timezone:"+changefeedConfig.TimeZone) + return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone)) } ctx = util.PutTimezoneInCtx(ctx, tz) if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil { diff --git a/cdc/http_router.go b/cdc/http_router.go index 51019609629..dd75deef268 100644 --- a/cdc/http_router.go +++ b/cdc/http_router.go @@ -21,23 +21,29 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" + "github.com/pingcap/tiflow/cdc/model" swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" + "go.uber.org/zap" // use for OpenAPI online docs _ "github.com/pingcap/tiflow/docs/api" ) // newRouter create a router for OpenAPI + func newRouter(captureHandler capture.HTTPHandler) *gin.Engine { // discard gin log output gin.DefaultWriter = io.Discard router := gin.New() + router.Use(logMiddleware()) // request will timeout after 10 second router.Use(timeoutMiddleware(time.Second * 10)) + router.Use(errorHandleMiddleware()) // OpenAPI online docs router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) @@ -96,7 +102,7 @@ func newRouter(captureHandler capture.HTTPHandler) *gin.Engine { } // timeoutMiddleware wraps the request context with a timeout -func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) { +func timeoutMiddleware(timeout time.Duration) gin.HandlerFunc { return func(c *gin.Context) { // wrap the request context with a timeout ctx, cancel := context.WithTimeout(c.Request.Context(), timeout) @@ -119,3 +125,51 @@ func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) { c.Next() } } + +func logMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + query := c.Request.URL.RawQuery + c.Next() + + cost := time.Since(start) + + err := c.Errors.Last() + var stdErr error + if err != nil { + stdErr = err.Err + } + + log.Info(path, + zap.Int("status", c.Writer.Status()), + zap.String("method", c.Request.Method), + zap.String("path", path), + zap.String("query", query), + zap.String("ip", c.ClientIP()), + zap.String("user-agent", c.Request.UserAgent()), + zap.Error(stdErr), + zap.Duration("cost", cost), + ) + } +} + +func errorHandleMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + // because we will return immediately after an error occurs in http_handler + // there wil be only one error in c.Errors + lastError := c.Errors.Last() + if lastError != nil { + err := lastError.Err + // put the error into response + if capture.IsHTTPBadRequestError(err) { + c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + } else { + c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + } + c.Abort() + return + } + } +} diff --git a/cdc/http_router_test.go b/cdc/http_router_test.go index 47ebb4245b9..4f779c4adec 100644 --- a/cdc/http_router_test.go +++ b/cdc/http_router_test.go @@ -23,9 +23,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestPProfRouter(t *testing.T) { +func TestPProfPath(t *testing.T) { t.Parallel() - router := newRouter(capture.NewHTTPHandler(nil)) apis := []*openAPI{ diff --git a/cdc/http_status.go b/cdc/http_status.go index 7b534b502ec..4b3890b3238 100644 --- a/cdc/http_status.go +++ b/cdc/http_status.go @@ -39,6 +39,7 @@ import ( ) func (s *Server) startStatusHTTP() error { + conf := config.GetGlobalServerConfig() router := newRouter(capture.NewHTTPHandler(s.capture)) router.GET("/status", gin.WrapF(s.handleStatus)) @@ -58,7 +59,6 @@ func (s *Server) startStatusHTTP() error { prometheus.DefaultGatherer = registry router.Any("/metrics", gin.WrapH(promhttp.Handler())) - conf := config.GetGlobalServerConfig() err := conf.Security.AddSelfCommonName() if err != nil { log.Error("status server set tls config failed", zap.Error(err)) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index d0704ba8a92..aeab222697d 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -334,7 +334,7 @@ func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultVal err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value) if err != nil && err != sql.ErrNoRows { errMsg := "fail to query session variable " + variableName - return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) + return "", cerror.ErrMySQLQueryError.Wrap(err).GenWithStack(errMsg) } // session variable works, use given default value if err == nil { @@ -438,13 +438,12 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string) } tlsCfg, err := credential.ToTLSConfig() if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, errors.Trace(err) } name := "cdc_mysql_tls" + params.changefeedID err = dmysql.RegisterTLSConfig(name, tlsCfg) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } params.tls = "?tls=" + name } @@ -513,8 +512,7 @@ var GetDBConnImpl = getDBConn func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { db, err := sql.Open("mysql", dsnStr) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } err = db.PingContext(ctx) if err != nil { @@ -522,8 +520,7 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { if closeErr := db.Close(); closeErr != nil { log.Warn("close db failed", zap.Error(err)) } - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } return db, nil } @@ -1361,12 +1358,12 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } tlsCfg, err := credential.ToTLSConfig() if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } name := "cdc_mysql_tls" + "syncpoint" + id err = dmysql.RegisterTLSConfig(name, tlsCfg) if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } tlsParam = "?tls=" + name } @@ -1406,8 +1403,7 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } testDB, err := sql.Open("mysql", dsn.FormatDSN()) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection when configuring sink") } defer testDB.Close() dsnStr, err = configureSinkURI(ctx, dsn, params, testDB) @@ -1416,11 +1412,11 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } syncDB, err = sql.Open("mysql", dsnStr) if err != nil { - return nil, errors.Annotate(err, "Open database connection failed") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } err = syncDB.PingContext(ctx) if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } log.Info("Start mysql syncpoint sink") diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 2bb46c14bb9..4bfd5abf7ae 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -85,7 +85,7 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re db, err = sql.Open("mysql", dsnStr) if err != nil { return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed") + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") } err = db.PingContext(ctx) if err != nil { diff --git a/cdc/sink/sink_test.go b/cdc/sink/sink_test.go index ae68c9f3984..63acc486138 100644 --- a/cdc/sink/sink_test.go +++ b/cdc/sink/sink_test.go @@ -35,7 +35,7 @@ func TestValidateSink(t *testing.T) { sinkURI := "mysql://root:111@127.0.0.1:3306/" err := Validate(ctx, sinkURI, replicateConfig, opts) require.NotNil(t, err) - require.Regexp(t, "fail to open MySQL connection.*ErrMySQLConnectionError.*", err) + require.Contains(t, err.Error(), "fail to open MySQL connection") // test sink uri right sinkURI = "blackhole://" diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 05f523b33b7..09430135d3a 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -69,6 +69,7 @@ func TestReplicaConfigOutDated(t *testing.T) { func TestServerConfigMarshal(t *testing.T) { t.Parallel() + rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","log":{"file":{"max-size":300,"max-days":0,"max-backups":0}},"data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":10485760,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` conf := GetDefaultServerConfig()