Skip to content

Commit

Permalink
placement: add API support (#1894)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored and sre-bot committed Nov 6, 2019
1 parent 05dbc14 commit b7d614e
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/api.html

Large diffs are not rendered by default.

134 changes: 133 additions & 1 deletion server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,32 @@ types:
type: string
enum: [ leader, region ]
count: integer

Rule:
type: object
properties:
group_id: string
id: string
index?: integer
override?: boolean
start_key: string
end_key: string
role:
type: string
enum: [voter, leader, follower, learner]
count:
type: integer
minimum: 1
label_constraints: LabelConstraint[]
location_labels: string[]
LabelConstraint:
type: object
properties:
key: string
op:
type: string
enum: [ in, notIn, exists, notExists ]
values?: string[]

/cluster/status:
description: Cluster status.
Expand Down Expand Up @@ -608,7 +634,113 @@ types:
description: The config is updated.
500:
description: PD server failed to proceed the request.

/rules:
description: Placement rules.
get:
description: Get all placement rules.
responses:
200:
body:
application/json:
type: Rule[]
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.
/rules/group/{group}:
description: Placement rules of a group.
uriParameters:
group: string
get:
description: Get placement rules of a group.
responses:
200:
body:
application/json:
type: Rule[]
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.
/rules/region/{region}:
description: Placement rules matched by a region.
uriParameters:
region: integer
get:
description: Get placement rules matched by a region.
responses:
200:
body:
application/json:
type: Rule[]
400:
description: The region ID is invalid.
404:
description: The region is not found.
500:
description: PD server failed to proceed the request.
/rules/key/{key}:
description: Placement rules matched by a key.
uriParameters:
key: string
get:
description: Get placement rules matched by a key.
responses:
200:
body:
application/json:
type: Rule[]
400:
description: The key is not in hex format.
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.
/rule/{group}/{id}:
description: A Placement Rule.
uriParameters:
group: string
id: string
get:
description: Get a single Placement Rule.
responses:
200:
body:
application/json:
type: Rule
404:
description: The Rule is not found.
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.
delete:
description: Delete a Placement Rule.
responses:
200:
description: The Rule is delete.
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.
/rule:
description: A Placement Rule.
post:
description: Add or update a Placement rule.
body:
application/json:
description: Placement Rule.
type: Rule
responses:
200:
description: The rule is created or updated.
400:
description: The input is invalid.
412:
description: Placement rules feature is not enabled.
500:
description: PD server failed to proceed the request.

/stores:
description: The stores in the cluster.
get:
Expand Down
9 changes: 9 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/config/cluster-version", confHandler.GetClusterVersion).Methods("GET")
router.HandleFunc("/api/v1/config/cluster-version", confHandler.SetClusterVersion).Methods("POST")

rulesHandler := newRulesHandler(svr, rd)
router.HandleFunc("/api/v1/config/rules", rulesHandler.GetAll).Methods("GET")
router.HandleFunc("/api/v1/config/rules/group/{group}", rulesHandler.GetAllByGroup).Methods("GET")
router.HandleFunc("/api/v1/config/rules/region/{region}", rulesHandler.GetAllByRegion).Methods("GET")
router.HandleFunc("/api/v1/config/rules/key/{key}", rulesHandler.GetAllByKey).Methods("GET")
router.HandleFunc("/api/v1/config/rule/{group}/{id}", rulesHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/config/rule", rulesHandler.Set).Methods("POST")
router.HandleFunc("/api/v1/config/rule/{group}/{id}", rulesHandler.Delete).Methods("DELETE")

storeHandler := newStoreHandler(handler, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
Expand Down
210 changes: 210 additions & 0 deletions server/api/rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"bytes"
"encoding/hex"
"net/http"
"strconv"

"github.com/gorilla/mux"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/pkg/codec"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/placement"
"github.com/pkg/errors"
"github.com/unrolled/render"
)

var errPlacementDisabled = errors.New("placement rules feature is disabled")

type ruleHandler struct {
svr *server.Server
rd *render.Render
}

func newRulesHandler(svr *server.Server, rd *render.Render) *ruleHandler {
return &ruleHandler{
svr: svr,
rd: rd,
}
}

func (h *ruleHandler) GetAll(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
rules := cluster.GetRuleManager().GetAllRules()
h.rd.JSON(w, http.StatusOK, rules)
}

func (h *ruleHandler) GetAllByGroup(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
group := mux.Vars(r)["group"]
rules := cluster.GetRuleManager().GetRulesByGroup(group)
h.rd.JSON(w, http.StatusOK, rules)
}

func (h *ruleHandler) GetAllByRegion(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
regionStr := mux.Vars(r)["region"]
regionID, err := strconv.ParseUint(regionStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "invalid region id")
return
}
region := cluster.GetRegion(regionID)
if region == nil {
h.rd.JSON(w, http.StatusNotFound, server.ErrRegionNotFound(regionID).Error())
return
}
rules := cluster.GetRuleManager().GetRulesForApplyRegion(region)
h.rd.JSON(w, http.StatusOK, rules)
}

func (h *ruleHandler) GetAllByKey(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
keyHex := mux.Vars(r)["key"]
key, err := hex.DecodeString(keyHex)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "key should be in hex format")
return
}
rules := cluster.GetRuleManager().GetRulesByKey(key)
h.rd.JSON(w, http.StatusOK, rules)
}

func (h *ruleHandler) Get(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
group, id := mux.Vars(r)["group"], mux.Vars(r)["id"]
rule := cluster.GetRuleManager().GetRule(group, id)
if rule == nil {
h.rd.JSON(w, http.StatusNotFound, nil)
return
}
h.rd.JSON(w, http.StatusOK, rule)
}

func (h *ruleHandler) Set(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
var rule placement.Rule
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &rule); err != nil {
return
}
if err := h.checkRule(&rule); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
if err := cluster.GetRuleManager().SetRule(&rule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *ruleHandler) checkRule(r *placement.Rule) error {
start, err := hex.DecodeString(r.StartKeyHex)
if err != nil {
return errors.Wrap(err, "start key is not in hex format")
}
end, err := hex.DecodeString(r.EndKeyHex)
if err != nil {
return errors.Wrap(err, "end key is not hex format")
}
if len(start) > 0 && bytes.Compare(end, start) <= 0 {
return errors.New("endKey should be greater than startKey")
}

keyType := h.svr.GetConfig().PDServerCfg.KeyType
if keyType == core.Table.String() || keyType == core.Txn.String() {
if len(start) > 0 {
if _, _, err = codec.DecodeBytes(start); err != nil {
return errors.Wrapf(err, "start key should be encoded in %s mode", keyType)
}
}
if len(end) > 0 {
if _, _, err = codec.DecodeBytes(end); err != nil {
return errors.Wrapf(err, "end key should be encoded in %s mode", keyType)
}
}
}

return nil
}

func (h *ruleHandler) Delete(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}
if !cluster.IsPlacementRulesEnabled() {
h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error())
return
}
group, id := mux.Vars(r)["group"], mux.Vars(r)["id"]
if err := cluster.GetRuleManager().DeleteRule(group, id); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

0 comments on commit b7d614e

Please sign in to comment.