Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move keyspace group primary path code to key_path.go #6755

Merged
merged 7 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package discovery
import (
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils"
)

const (
registryPrefix = "/ms"
registryKey = "registry"
registryKey = "registry"
)

// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -366,10 +367,10 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)
utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down
9 changes: 3 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -64,11 +65,7 @@ import (

const (
// pdRootPath is the old path for storing the tso related root path.
pdRootPath = "/pd"
msServiceRootPath = "/ms"
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName
pdRootPath = "/pd"
// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -536,7 +533,7 @@ func (s *Server) startServer() (err error) {
// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
Expand All @@ -59,6 +59,8 @@ const (
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"
// KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups.
KeyspaceGroupsPrimaryKey = "primary"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
47 changes: 43 additions & 4 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -228,13 +229,13 @@ func EncodeKeyspaceID(spaceID uint32) string {
// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id))
}

// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Expand All @@ -243,6 +244,44 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
return regexp.MustCompile(pattern)
}

// ResourceManagerSvcRootPath returns the root path of resource manager service.
func ResourceManagerSvcRootPath(clusterID uint64) string {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}

// KeyspaceGroupPrimaryPath returns the path of keyspace group primary.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID)
return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string {
if keyspaceGroupID == utils.DefaultKeyspaceGroupID {
return path.Join(rootPath, "00000")
}
return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID))
}

// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp {
rootPath := TSOSvcRootPath(clusterID)
pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand Down
41 changes: 3 additions & 38 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -51,9 +50,6 @@ import (
)

const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
Expand Down Expand Up @@ -231,32 +227,6 @@ func (s *state) getNextPrimaryToReset(
return nil, nil, 0, groupID
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -330,7 +300,6 @@ type KeyspaceGroupManager struct {
// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc

primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
Expand Down Expand Up @@ -381,10 +350,6 @@ func NewKeyspaceGroupManager(
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand Down Expand Up @@ -692,8 +657,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1248,7 +1213,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey)
leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
guid := uuid.New().String()
tsoServiceKey := discovery.ServicePath(guid, "tso") + "/"
legacySvcRootPath := path.Join("/pd", guid)
tsoSvcRootPath := path.Join("/ms", guid, "tso")
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, guid, "tso")
electionNamePrefix := "tso-server-" + guid

kgm := NewKeyspaceGroupManager(
Expand Down Expand Up @@ -766,7 +766,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()}
tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/"
legacySvcRootPath := path.Join("/pd", uniqueStr)
tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso")
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, uniqueStr, "tso")
electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr()

kgm := NewKeyspaceGroupManager(
Expand Down
9 changes: 1 addition & 8 deletions pkg/tso/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tso

import (
"path"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -75,13 +74,7 @@ func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) {
func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) {
re := require.New(t)

tsoSvcRootPath := "/ms/111/tso"
primaryPathBuilder := &kgPrimaryPathBuilder{
rootPath: tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"),
}

compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp()
compiledRegexp := endpoint.GetCompiledNonDefaultIDRegexp(uint64(111))

rightCases := []struct {
path string
Expand Down
7 changes: 2 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,13 +1844,10 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}

func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoServicePrimaryKey := s.servicePrimaryKey(serviceName)
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
if err := proto.Unmarshal(kv.Value, primary); err != nil {
Expand Down
10 changes: 3 additions & 7 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,15 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
// Make sure every keyspace group is using the right timestamp path
// for loading/saving timestamp from/to etcd and the right primary path
// for primary election.
var (
timestampPath string
primaryPath string
)
var timestampPath string
clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10)
rootPath := endpoint.TSOSvcRootPath(suite.pdLeaderServer.GetClusterID())
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID)
if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID)
primaryPath = fmt.Sprintf("/ms/%s/tso/00000/primary", clusterID)
} else {
timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp",
clusterID, param.keyspaceGroupID)
primaryPath = fmt.Sprintf("/ms/%s/tso/%s/election/%05d/primary",
clusterID, mcsutils.KeyspaceGroupsKey, param.keyspaceGroupID)
}
re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation))
re.Equal(primaryPath, am.GetMember().GetLeaderPath())
Expand Down