Skip to content

Commit

Permalink
keyspace: add priority of tso node for the keyspace group (tikv#6602)
Browse files Browse the repository at this point in the history
ref tikv#6599

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent d5093eb commit 29263bc
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 39 deletions.
60 changes: 54 additions & 6 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
Expand All @@ -182,8 +182,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
Expand Down Expand Up @@ -525,7 +525,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
return ErrKeyspaceGroupInMerging
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
Expand Down Expand Up @@ -696,7 +696,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
continue
}
exists[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr})
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = nodes
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down Expand Up @@ -731,7 +734,52 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
members = append(members, endpoint.KeyspaceGroupMember{
Address: node,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// SetPriorityForKeyspaceGroup sets the priority of node for the keyspace group.
func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, priority int) error {
m.Lock()
defer m.Unlock()
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
inKeyspaceGroup := false
members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members))
for _, member := range kg.Members {
if member.Address == node {
inKeyspaceGroup = true
member.Priority = priority
}
members = append(members, member)
}
if !inKeyspaceGroup {
return ErrNodeNotInKeyspaceGroup
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
Expand Down Expand Up @@ -330,7 +330,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
{
ID: uint32(3),
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group.
ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group")
// ErrModifyDefaultKeyspaceGroup is used to indicate that default keyspace group cannot be modified.
Expand Down
7 changes: 5 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const (
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
// DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group.
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
DefaultKeyspaceGroupReplicaPriority = 0
)
6 changes: 5 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func IsUserKindValid(kind string) bool {
}

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
// Its `Priority` is used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
type KeyspaceGroupMember struct {
Address string `json:"address"`
Address string `json:"address"`
Priority int `json:"priority"`
}

// SplitState defines the split state of a keyspace group.
Expand Down
23 changes: 16 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,11 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
if !defaultKGConfigured {
log.Info("initializing default keyspace group")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down Expand Up @@ -400,7 +403,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone.
if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 {
// TODO: fill members with all tso nodes/pods.
group.Members = []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}
group.Members = []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}}
}

if !kgm.isAssignedToMe(group) {
Expand Down Expand Up @@ -493,12 +499,12 @@ func validateSplit(
// could not be modified during the split process, so we can only check the
// member count of the source group here.
memberCount := len(sourceGroup.Members)
if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount {
if memberCount < mcsutils.DefaultKeyspaceGroupReplicaCount {
log.Error("the split source keyspace group does not have enough members",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID),
zap.Int("member-count", memberCount),
zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount))
zap.Int("replica-count", mcsutils.DefaultKeyspaceGroupReplicaCount))
return false
}
return true
Expand Down Expand Up @@ -611,8 +617,11 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
log.Info("removed default keyspace group meta config from the storage. " +
"now every tso node/pod will initialize it")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down
55 changes: 52 additions & 3 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
Expand All @@ -40,6 +41,7 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/priority", SetPriorityForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
router.POST("/:id/merge", MergeKeyspaceGroups)
Expand Down Expand Up @@ -325,7 +327,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
Expand All @@ -347,7 +349,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace group.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
Expand Down Expand Up @@ -379,7 +381,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
return
}
// check if nodes is less than default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
if len(setParams.Nodes) < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
Expand All @@ -399,6 +401,53 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// SetPriorityForKeyspaceGroupParams defines the params for setting priority of tso node for the keyspace group.
type SetPriorityForKeyspaceGroupParams struct {
Node string `json:"node"`
Priority int `json:"priority"`
}

// SetPriorityForKeyspaceGroup sets priority of tso node for the keyspace group.
func SetPriorityForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
return
}
setParams := &SetPriorityForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check if keyspace group exists
kg, err := manager.GetKeyspaceGroupByID(id)
if err != nil || kg == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check if node exists
members := kg.Members
if slice.NoneOf(members, func(i int) bool {
return members[i].Address == setParams.Node
}) {
c.AbortWithStatusJSON(http.StatusBadRequest, "tso node does not exist in the keyspace group")
}
// set priority
err = manager.SetPriorityForKeyspaceGroup(id, setParams.Node, setParams.Priority)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading

0 comments on commit 29263bc

Please sign in to comment.