Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: supports delete store label #5510

Merged
merged 6 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ error = '''
get TSO timeout
'''

["PD:cluster:ErrInvalidStoreID"]
error = '''
invalid store id %d, not found
'''

["PD:cluster:ErrNotBootstrapped"]
error = '''
TiKV cluster not bootstrapped, please start TiKV first
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var (
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
ErrInvalidStoreID = errors.Normalize("invalid store id %d, not found", errors.RFCCodeText("PD:cluster:ErrInvalidStoreID"))
)

// versioninfo errors
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/store/{id}", storeHandler.DeleteStore, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/store/{id}/state", storeHandler.SetStoreState, setMethods(http.MethodPost), setAuditBackend(localLog))
registerFunc(clusterRouter, "/store/{id}/label", storeHandler.SetStoreLabel, setMethods(http.MethodPost), setAuditBackend(localLog))
registerFunc(clusterRouter, "/store/{id}/label", storeHandler.DeleteStoreLabel, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/store/{id}/weight", storeHandler.SetStoreWeight, setMethods(http.MethodPost), setAuditBackend(localLog))
registerFunc(clusterRouter, "/store/{id}/limit", storeHandler.SetStoreLimit, setMethods(http.MethodPost), setAuditBackend(localLog))

Expand Down
34 changes: 34 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,40 @@ func (h *storeHandler) SetStoreLabel(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, "The store's label is updated.")
}

// @Tags store
// @Summary delete the store's label.
// @Param id path integer true "Store Id"
// @Param body body object true "Labels in json format"
// @Produce json
// @Success 200 {string} string "The store's label is updated."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /store/{id}/label [delete]
func (h *storeHandler) DeleteStoreLabel(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
vars := mux.Vars(r)
storeID, errParse := apiutil.ParseUint64VarsField(vars, "id")
if errParse != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errParse))
return
}

var labelKey string
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &labelKey); err != nil {
return
}
if err := config.ValidateLabelKey(labelKey); err != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(err))
return
}
if err := rc.DeleteStoreLabel(storeID, labelKey); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The label %s is deleted for store %d.", labelKey, storeID))
}

// FIXME: details of input json body params
// @Tags store
// @Summary Set the store's leader/region weight.
Expand Down
39 changes: 31 additions & 8 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,17 +1046,44 @@ func (c *RaftCluster) GetRangeHoles() [][]string {
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel, force bool) error {
store := c.GetStore(storeID)
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
return errs.ErrInvalidStoreID.FastGenByArgs(storeID)
}
newStore := proto.Clone(store.GetMeta()).(*metapb.Store)
if force {
newStore.Labels = labels
} else {
// If 'force' isn't set, the given labels will merge into those labels which already existed in the store.
newStore.Labels = core.MergeLabels(newStore.GetLabels(), labels)
}
// PutStore will perform label merge.
return c.putStoreImpl(newStore)
}

// DeleteStoreLabel updates a store's location labels
func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error {
store := c.GetStore(storeID)
if store == nil {
return errs.ErrInvalidStoreID.FastGenByArgs(storeID)
}
newStore := proto.Clone(store.GetMeta()).(*metapb.Store)
labels := make([]*metapb.StoreLabel, 0, len(newStore.GetLabels())-1)
for _, label := range newStore.GetLabels() {
if label.Key == labelKey {
continue
}
labels = append(labels, label)
}
if len(labels) == len(store.GetLabels()) {
return errors.Errorf("the label key %s does not exist", labelKey)
}
newStore.Labels = labels
// PutStore will perform label merge.
return c.putStoreImpl(newStore, force)
return c.putStoreImpl(newStore)
}

// PutStore puts a store.
func (c *RaftCluster) PutStore(store *metapb.Store) error {
if err := c.putStoreImpl(store, false); err != nil {
if err := c.putStoreImpl(store); err != nil {
return err
}
c.OnStoreVersionChange()
Expand All @@ -1066,7 +1093,7 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {

// putStoreImpl puts a store.
// If 'force' is true, then overwrite the store's labels.
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
func (c *RaftCluster) putStoreImpl(store *metapb.Store) error {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -1096,10 +1123,6 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
} else {
// Use the given labels to update the store.
labels := store.GetLabels()
if !force {
// If 'force' isn't set, the given labels will merge into those labels which already existed in the store.
labels = s.MergeLabels(labels)
}
// Update an existed store.
s = s.Clone(
core.SetStoreAddress(store.Address, store.StatusAddress, store.PeerAddress),
Expand Down
5 changes: 5 additions & 0 deletions server/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func ValidateLabels(labels []*metapb.StoreLabel) error {
return nil
}

// ValidateLabelKey checks the legality of the label key.
func ValidateLabelKey(key string) error {
return validateFormat(key, keyFormat)
}

// ValidateURLWithScheme checks the format of the URL.
func ValidateURLWithScheme(rawURL string) error {
u, err := url.ParseRequestURI(rawURL)
Expand Down
4 changes: 2 additions & 2 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ func DistinctScore(labels []string, stores []*StoreInfo, other *StoreInfo) float

// MergeLabels merges the passed in labels with origins, overriding duplicated
// ones.
func (s *StoreInfo) MergeLabels(labels []*metapb.StoreLabel) []*metapb.StoreLabel {
storeLabels := s.GetLabels()
func MergeLabels(origin []*metapb.StoreLabel, labels []*metapb.StoreLabel) []*metapb.StoreLabel {
storeLabels := origin
L:
for _, newLabel := range labels {
for _, label := range storeLabels {
Expand Down
113 changes: 67 additions & 46 deletions tests/pdctl/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
cmd "github.com/tikv/pd/tools/pd-ctl/pdctl"
ctl "github.com/tikv/pd/tools/pd-ctl/pdctl"
)

func TestStore(t *testing.T) {
Expand All @@ -41,7 +41,7 @@ func TestStore(t *testing.T) {
re.NoError(err)
cluster.WaitLeader()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := cmd.GetRootCmd()
cmd := ctl.GetRootCmd()

stores := []*api.StoreInfo{
{
Expand Down Expand Up @@ -114,51 +114,69 @@ func TestStore(t *testing.T) {
re.NoError(json.Unmarshal(output, &storeInfo))

pdctl.CheckStoresInfo(re, []*api.StoreInfo{storeInfo}, stores[:1])

// store label <store_id> <key> <value> [<key> <value>]... [flags] command
re.Nil(storeInfo.Store.Labels)

args = []string{"-u", pdAddr, "store", "label", "1", "zone", "cn"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "store", "1"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &storeInfo))

label := storeInfo.Store.Labels[0]
re.Equal("zone", label.Key)
re.Equal("cn", label.Value)

// store label <store_id> <key> <value> <key> <value>... command
args = []string{"-u", pdAddr, "store", "label", "1", "zone", "us", "language", "English"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "store", "1"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &storeInfo))

label0 := storeInfo.Store.Labels[0]
re.Equal("zone", label0.Key)
re.Equal("us", label0.Value)
label1 := storeInfo.Store.Labels[1]
re.Equal("language", label1.Key)
re.Equal("English", label1.Value)

// store label <store_id> <key> <value> <key> <value>... -f command
args = []string{"-u", pdAddr, "store", "label", "1", "zone", "uk", "-f"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "store", "1"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &storeInfo))

label0 = storeInfo.Store.Labels[0]
re.Equal("zone", label0.Key)
re.Equal("uk", label0.Value)
re.Len(storeInfo.Store.Labels, 1)
// store <store_id> label command
labelTestCases := []struct {
args []string
newArgs []string
expectLabelLength int
expectKeys []string
expectValues []string
}{
{ // add label
args: []string{"-u", pdAddr, "store", "label", "1", "zone", "cn"},
newArgs: []string{"-u", pdAddr, "store", "label", "1", "zone=cn"},
expectLabelLength: 1,
expectKeys: []string{"zone"},
expectValues: []string{"cn"},
},
{ // update label
args: []string{"-u", pdAddr, "store", "label", "1", "zone", "us", "language", "English"},
newArgs: []string{"-u", pdAddr, "store", "label", "1", "zone=us", "language=English"},
expectLabelLength: 2,
expectKeys: []string{"zone", "language"},
expectValues: []string{"us", "English"},
},
{ // rewrite label
args: []string{"-u", pdAddr, "store", "label", "1", "zone", "uk", "-f"},
newArgs: []string{"-u", pdAddr, "store", "label", "1", "zone=uk", "--rewrite"},
expectLabelLength: 1,
expectKeys: []string{"zone"},
expectValues: []string{"uk"},
},
{ // delete label
args: []string{"-u", pdAddr, "store", "label", "1", "zone", "--delete"},
newArgs: []string{"-u", pdAddr, "store", "label", "1", "zone", "--delete"},
expectLabelLength: 0,
expectKeys: []string{""},
expectValues: []string{""},
},
}
for i := 0; i <= 1; i++ {
for _, testcase := range labelTestCases {
switch {
case i == 0: // old way
args = testcase.args
case i == 1: // new way
args = testcase.newArgs
}
cmd := ctl.GetRootCmd()
storeInfo := new(api.StoreInfo)
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "store", "1"}
output, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.NoError(json.Unmarshal(output, &storeInfo))
labels := storeInfo.Store.Labels
re.Len(labels, testcase.expectLabelLength)
for i := 0; i < testcase.expectLabelLength; i++ {
re.Equal(testcase.expectKeys[i], labels[i].Key)
re.Equal(testcase.expectValues[i], labels[i].Value)
}
}
}

// store weight <store_id> <leader_weight> <region_weight> command
re.Equal(float64(1), storeInfo.Status.LeaderWeight)
Expand Down Expand Up @@ -229,6 +247,9 @@ func TestStore(t *testing.T) {
re.Equal(float64(25), limit2)

// store limit all <key> <value> <rate> <type>
args = []string{"-u", pdAddr, "store", "label", "1", "zone=uk"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "store", "limit", "all", "zone", "uk", "20", "remove-peer"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
Expand Down Expand Up @@ -447,7 +468,7 @@ func TestTombstoneStore(t *testing.T) {
re.NoError(err)
cluster.WaitLeader()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := cmd.GetRootCmd()
cmd := ctl.GetRootCmd()

stores := []*api.StoreInfo{
{
Expand Down
Loading