Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add set handler for balancer and alloc node for default keyspace group #6342

Merged
merged 16 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 117 additions & 25 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ import (
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesForDefaultKeyspaceGroupInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
Expand Down Expand Up @@ -114,6 +115,15 @@ func (m *GroupManager) Bootstrap() error {

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
Expand All @@ -126,17 +136,16 @@ func (m *GroupManager) Bootstrap() error {
return err
}
for _, group := range groups {
if group.ID == utils.DefaultKeyspaceGroupID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that, not just about default keyspace group, this logic applies to any keyspace group instead, i.e., if any keyspace group's member/replica count is under the desired replica count, then alloc nodes to it. The desired replica count is specified as one of parameters when creating keyspace group and the default value is 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to check params for set-nodes and alloc-nodes interface?

if len(group.Members) == 0 && m.client != nil {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// The default keyspace group should have one replica at least.
m.wg.Add(1)
go m.allocNodesForDefaultKeyspaceGroup()
}
}
userKind := endpoint.StringUserKind(group.UserKind)
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}
return nil
}

Expand All @@ -146,6 +155,31 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesForDefaultKeyspaceGroup() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesForDefaultKeyspaceGroupInterval)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need timeout for it?

select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
nodes := m.nodesBalancer.GetAll()
if err == nil && kg != nil && equalMembers(kg.Members, nodes) {
continue
}
err = m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, nodes)
if err == nil {
log.Info("alloc nodes for default keyspace group", zap.Reflect("nodes", nodes))
} else {
log.Warn("failed to alloc nodes for default keyspace group", zap.Error(err))
}
}
}

func (m *GroupManager) startWatchLoop() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand All @@ -156,12 +190,9 @@ func (m *GroupManager) startWatchLoop() {
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
Expand All @@ -177,6 +208,11 @@ func (m *GroupManager) startWatchLoop() {
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down Expand Up @@ -603,18 +639,23 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
return nil
}

// GetNodesNum returns the number of nodes.
func (m *GroupManager) GetNodesNum() int {
// GetNodesCount returns the count of nodes.
func (m *GroupManager) GetNodesCount() int {
if m.nodesBalancer == nil {
return 0
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
Expand All @@ -628,14 +669,17 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
if len(exists) >= desiredReplicaCount {
return nil
}
for len(exists) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := m.GetNodesNum()
if num < replica || num == 0 { // double check
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need countOfNodes == 0 when countOfNodes >= desiredReplicaCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider meeting offline node

return ErrNoAvailableNode
}
addr := m.nodesBalancer.Next()
Expand All @@ -656,3 +700,51 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
}
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel here we have more work to do in the future, such as sanity check and service availability check for the nodes switched to before actually refreshing the member list. Please ignore this comment for now.

for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
}
}
return false
}

func equalMembers(a []endpoint.KeyspaceGroupMember, b []string) bool {
if len(a) != len(b) {
return false
}
isExist := make(map[string]struct{}, len(b))
for _, node := range b {
isExist[node] = struct{}{}
}
for _, member := range a {
if _, ok := isExist[member.Address]; !ok {
return false
}
}
return true
}
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
if am == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if it is primary after kgm.deleteKeyspaceGroup with default keyspace group

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean delete the default keyspace group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default keyspace group no longer contain this member

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it actually goes here, it will panic I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be removed after @binshi-bing has fixed

return nil, nil
}
return am.getMember(), nil
}

Expand Down
63 changes: 56 additions & 7 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
Expand Down Expand Up @@ -190,27 +191,27 @@ func FinishSplitKeyspaceByID(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 {
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < 1 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]")
return
}
Expand All @@ -232,6 +233,54 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check keyspace group whether exist
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check nodes whether empty
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
if len(setParams.Nodes) == 0 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty nodes")
return
}
// check nodes whether exist
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading