Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add-min-service-g…
Browse files Browse the repository at this point in the history
…c-safepoint-metrics
  • Loading branch information
sdojjy committed Mar 6, 2024
2 parents aec9e6e + 71f4f7f commit a806281
Show file tree
Hide file tree
Showing 678 changed files with 16,078 additions and 8,252 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/upgrade_dm_via_tiup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ concurrency:
jobs:
from_v1:
name: From V1
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
# skip it now, the ansible script fails with "Could not find the requested service prometheus-9090.service: host"
# TODO enable it later.
if: ${{ false }}
env:
working-directory: ${{ github.workspace }}/go/src/github.com/pingcap/tiflow

Expand Down Expand Up @@ -111,21 +114,21 @@ jobs:
echo "package dm-master"
mkdir dm-master
cp ${{ env.working-directory }}/bin/dm-master dm-master
cp -r ${{ env.working-directory }}/dm/metrics/grafana/ dm-master/scripts
cp -r ${{ env.working-directory }}/metrics/grafana/ dm-master/scripts
cp -r ${{ env.working-directory }}/dm/metrics/alertmanager/ dm-master/conf
tar -czvf dm-master-nightly-linux-amd64.tar.gz dm-master
echo "package dm-worker"
mkdir dm-worker
cp ${{ env.working-directory }}/bin/dm-worker dm-worker
cp -r ${{ env.working-directory }}/dm/metrics/grafana/ dm-worker/scripts
cp -r ${{ env.working-directory }}/metrics/grafana/ dm-worker/scripts
cp -r ${{ env.working-directory }}/dm/metrics/alertmanager/ dm-worker/conf
tar -czvf dm-worker-nightly-linux-amd64.tar.gz dm-worker
echo "package dmctl"
mkdir dmctl
cp ${{ env.working-directory }}/bin/dmctl dmctl
cp -r ${{ env.working-directory }}/dm/metrics/grafana/ dmctl/scripts
cp -r ${{ env.working-directory }}/metrics/grafana/ dmctl/scripts
cp -r ${{ env.working-directory }}/dm/metrics/alertmanager/ dmctl/conf
tar -czvf dmctl-nightly-linux-amd64.tar.gz dmctl
Expand Down Expand Up @@ -158,6 +161,7 @@ jobs:
docker cp -L master1:/home/tidb/dm/deploy/dm-master-8261/log ./logs/master
docker cp -L worker1:/home/tidb/dm/deploy/dm-worker-8262/log ./logs/worker1
docker cp -L worker2:/home/tidb/dm/deploy/dm-worker-8262/log ./logs/worker2
docker cp -L control:/tmp/tiup-dm-operation.log ./logs
sudo chown -R runner ./logs
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
Expand Down
2 changes: 2 additions & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ approvers:
- 5kbpers
- amyangfei
- asddongmen
- Benjamin2037
- buchuitoudegou
- CharlesCheung96
- csuzhangxc
Expand All @@ -17,6 +18,7 @@ approvers:
- hi-rustin
- hicqu
- holys
- hongyunyan
- IANTHEREAL
- july2993
- kennytm
Expand Down
60 changes: 60 additions & 0 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/upstream"
"go.uber.org/zap"
)

Expand All @@ -36,6 +38,7 @@ func LogMiddleware() gin.HandlerFunc {
start := time.Now()
path := c.Request.URL.Path
query := c.Request.URL.RawQuery
user, _, _ := c.Request.BasicAuth()
c.Next()

cost := time.Since(start)
Expand All @@ -53,6 +56,7 @@ func LogMiddleware() gin.HandlerFunc {
zap.String("query", query),
zap.String("ip", c.ClientIP()),
zap.String("user-agent", c.Request.UserAgent()), zap.String("client-version", version),
zap.String("username", user),
zap.Error(stdErr),
zap.Duration("duration", cost),
)
Expand Down Expand Up @@ -185,3 +189,59 @@ func CheckServerReadyMiddleware(capture capture.Capture) gin.HandlerFunc {
}
}
}

// AuthenticateMiddleware authenticates the request by query upstream TiDB.
func AuthenticateMiddleware(capture capture.Capture) gin.HandlerFunc {
return func(ctx *gin.Context) {
serverCfg := config.GetGlobalServerConfig()
if serverCfg.Security.ClientUserRequired {
up, err := getUpstream(capture)
if err != nil {
_ = ctx.Error(err)
ctx.Abort()
return
}

if err := verify(ctx, up); err != nil {
ctx.IndentedJSON(http.StatusUnauthorized, model.NewHTTPError(err))
ctx.Abort()
return
}
}
ctx.Next()
}
}

func getUpstream(capture capture.Capture) (*upstream.Upstream, error) {
m, err := capture.GetUpstreamManager()
if err != nil {
return nil, errors.Trace(err)
}
return m.GetDefaultUpstream()
}

func verify(ctx *gin.Context, up *upstream.Upstream) error {
// get the username and password from the authorization header
username, password, ok := ctx.Request.BasicAuth()
if !ok {
errMsg := "please specify the user and password via authorization header"
return errors.ErrCredentialNotFound.GenWithStackByArgs(errMsg)
}

allowed := false
serverCfg := config.GetGlobalServerConfig()
for _, user := range serverCfg.Security.ClientAllowedUser {
if user == username {
allowed = true
break
}
}
if !allowed {
errMsg := "The user is not allowed."
return errors.ErrUnauthorized.GenWithStackByArgs(username, errMsg)
}
if err := up.VerifyTiDBUser(ctx, username, password); err != nil {
return errors.ErrUnauthorized.GenWithStackByArgs(username, err.Error())
}
return nil
}
25 changes: 7 additions & 18 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ import (
"go.uber.org/zap"
)

const (
// OpVarAdminJob is the key of admin job in HTTP API
OpVarAdminJob = "admin-job"
// OpVarChangefeedID is the key of changefeed ID in HTTP API
OpVarChangefeedID = "cf-id"
// OpVarTargetCaptureID is the key of to-capture ID in HTTP API
OpVarTargetCaptureID = "target-cp-id"
// OpVarTableID is the key of table ID in HTTP API
OpVarTableID = "table-id"
)

type commonResp struct {
Status bool `json:"status"`
Message string `json:"message"`
Expand Down Expand Up @@ -130,15 +119,15 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques
api.WriteError(w, http.StatusInternalServerError, err)
return
}
typeStr := req.Form.Get(OpVarAdminJob)
typeStr := req.Form.Get(api.OpVarAdminJob)
typ, err := strconv.ParseInt(typeStr, 10, 64)
if err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid admin job type: %s", typeStr))
return
}
job := model.AdminJob{
CfID: model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID)),
CfID: model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID)),
Type: model.AdminJobType(typ),
}

Expand All @@ -158,7 +147,7 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque
api.WriteError(w, http.StatusInternalServerError, err)
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
Expand All @@ -182,19 +171,19 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
cerror.WrapError(cerror.ErrInternalServerError, err))
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
return
}
to := req.Form.Get(OpVarTargetCaptureID)
to := req.Form.Get(api.OpVarTargetCaptureID)
if err := model.ValidateChangefeedID(to); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid target capture id: %s", to))
return
}
tableIDStr := req.Form.Get(OpVarTableID)
tableIDStr := req.Form.Get(api.OpVarTableID)
tableID, err := strconv.ParseInt(tableIDStr, 10, 64)
if err != nil {
api.WriteError(w, http.StatusBadRequest,
Expand All @@ -219,7 +208,7 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
api.WriteError(w, http.StatusInternalServerError, err)
return
}
changefeedID := model.DefaultChangeFeedID(req.Form.Get(OpVarChangefeedID))
changefeedID := model.DefaultChangeFeedID(req.Form.Get(api.OpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID.ID))
Expand Down
3 changes: 2 additions & 1 deletion cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/api/middleware"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/version"
Expand All @@ -44,7 +45,7 @@ type statusAPI struct {
func RegisterStatusAPIRoutes(router *gin.Engine, capture capture.Capture) {
statusAPI := statusAPI{capture: capture}
router.GET("/status", gin.WrapF(statusAPI.handleStatus))
router.GET("/debug/info", gin.WrapF(statusAPI.handleDebugInfo))
router.GET("/debug/info", middleware.AuthenticateMiddleware(capture), gin.WrapF(statusAPI.handleDebugInfo))
}

func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli etcd.CDCEtcdClient, w io.Writer) {
Expand Down
22 changes: 22 additions & 0 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ var httpBadRequestError = []*errors.Error{
}

const (
// OpVarAdminJob is the key of admin job in HTTP API
OpVarAdminJob = "admin-job"
// OpVarChangefeedID is the key of changefeed ID in HTTP API
OpVarChangefeedID = "cf-id"
// OpVarTargetCaptureID is the key of to-capture ID in HTTP API
OpVarTargetCaptureID = "target-cp-id"
// OpVarTableID is the key of table ID in HTTP API
OpVarTableID = "table-id"

// APIOpVarChangefeedState is the key of changefeed state in HTTP API.
APIOpVarChangefeedState = "state"
// APIOpVarChangefeedID is the key of changefeed ID in HTTP API.
APIOpVarChangefeedID = "changefeed_id"
// APIOpVarCaptureID is the key of capture ID in HTTP API.
APIOpVarCaptureID = "capture_id"
// APIOpVarNamespace is the key of changefeed namespace in HTTP API.
APIOpVarNamespace = "namespace"
// APIOpVarTiCDCUser is the key of ticdc user in HTTP API.
APIOpVarTiCDCUser = "user"
// APIOpVarTiCDCPassword is the key of ticdc password in HTTP API.
APIOpVarTiCDCPassword = "password"

// forwardFromCapture is a header to be set when forwarding requests to owner
forwardFromCapture = "TiCDC-ForwardFromCapture"
// forwardTimes is a header to identify how many times the request has been forwarded
Expand Down
Loading

0 comments on commit a806281

Please sign in to comment.