Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso, server: refine the TSO allocator manager parameters #6269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ type ElectionMember interface {
PrecheckLeader() error
}

// ConfigProvider is used to provide TSO configuration.
type ConfigProvider interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there is no need to define a new interface. The newly added interface tso.ServiceConfig in pkg/tso/config.go was designed for this purpose. It's already used by tso.KeyspaceGroupManager and planned to use it for simplifying the interface of NewAllocatorManager too.

pd server config.Config just needs to implement tso.ServiceConfig, i.e., only needs to add the following methods (the other methods have already been implemented, and GeBackendEndpoints() can be removed).

// GetName returns the Name
func (c *Config) GetName() string {
return c.Name
}

// GetListenAddr returns the ListenAddr
func (c *Config) GetListenAddr() string {
return c.ClientUrls
}

// GetAdvertiseListenAddr returns the AdvertiseListenAddr
func (c *Config) GetAdvertiseListenAddr() string {
return c.AdvertiseClientUrls
}

// GetMaxResetTSGap returns the MaxResetTSGap.
func (c *Config) GetMaxResetTSGap() time.Duration {
return ...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that, for PD/API GetMaxResetTSGap() needs to return s.persistOptions.GetMaxResetTSGap(). Yes, you're right to let Server implement the interface. Shall we combine ConfigProvider and tso.ServiceConfig into one interface? I'm ok to either way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for merging this PR without noticing your comment. I filed a new PR to address this problem. PTAL.

#6272

IsLocalTSOEnabled() bool
GetLeaderLease() int64
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetMaxResetTSGap() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
Expand Down Expand Up @@ -193,17 +203,12 @@ type AllocatorManager struct {
// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
ctx context.Context,
startGlobalLeaderLoop bool,
keyspaceGroupID uint32,
member ElectionMember,
rootPath string,
storage endpoint.TSOStorage,
enableLocalTSO bool,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
leaderLease int64,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
configProvider ConfigProvider,
startGlobalLeaderLoop bool,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -213,19 +218,21 @@ func NewAllocatorManager(
member: member,
rootPath: rootPath,
storage: storage,
enableLocalTSO: enableLocalTSO,
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
leaderLease: leaderLease,
maxResetTSGap: maxResetTSGap,
securityConfig: tlsConfig,
enableLocalTSO: configProvider.IsLocalTSOEnabled(),
saveInterval: configProvider.GetTSOSaveInterval(),
updatePhysicalInterval: configProvider.GetTSOUpdatePhysicalInterval(),
leaderLease: configProvider.GetLeaderLease(),
maxResetTSGap: configProvider.GetMaxResetTSGap,
securityConfig: configProvider.GetTLSConfig(),
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)

// Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully.
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop)
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()

return am
}
Expand All @@ -247,11 +254,6 @@ func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership
leadership: leadership,
allocator: allocator,
}

if startGlobalLeaderLoop {
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()
}
}

// setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
Expand Down Expand Up @@ -279,7 +281,6 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
// tso service starts the loop here, but pd starts its own loop.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer am.svcLoopWG.Done()
Expand Down
11 changes: 3 additions & 8 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -74,8 +73,7 @@ type KeyspaceGroupManager struct {
// Note: The {group} is 5 digits integer with leading zeros.
tsoSvcRootPath string
// cfg is the TSO config
cfg ServiceConfig
maxResetTSGap func() time.Duration
cfg ServiceConfig
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand All @@ -102,7 +100,6 @@ func NewKeyspaceGroupManager(
defaultKsgStorageTSRootPath: defaultKsgStorageTSRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
maxResetTSGap: func() time.Duration { return cfg.GetMaxResetTSGap() },
}

return ksgMgr
Expand All @@ -127,11 +124,9 @@ func (kgm *KeyspaceGroupManager) initDefaultKeyspaceGroup() {
defaultKsgGroupStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(kgm.etcdClient, kgm.defaultKsgStorageTSRootPath), nil)
kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID] =
NewAllocatorManager(
kgm.ctx, true, mcsutils.DefaultKeySpaceGroupID, participant,
kgm.ctx, mcsutils.DefaultKeySpaceGroupID, participant,
kgm.defaultKsgStorageTSRootPath, defaultKsgGroupStorage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap)
kgm.cfg, true)
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
Expand Down
46 changes: 26 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
if !s.IsAPIServiceMode() {
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.ctx, false, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s.cfg.IsLocalTSOEnabled(),
s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetLeaderLease(), s.cfg.GetTLSConfig(),
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })
// Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully.
s.tsoAllocatorManager.SetUpGlobalAllocator(ctx, s.member.GetLeadership(), false)
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s, false)
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down Expand Up @@ -564,9 +559,6 @@ func (s *Server) startServerLoop(ctx context.Context) {
if s.IsAPIServiceMode() { // disable tso service in api server
s.serverLoopWg.Add(1)
go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName)
} else { // enable tso service
s.serverLoopWg.Add(1)
go s.tsoAllocatorLoop()
}
}

Expand All @@ -592,17 +584,6 @@ func (s *Server) serverMetricsLoop() {
}
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

// encryptionKeyManagerLoop is used to start monitor encryption key changes.
func (s *Server) encryptionKeyManagerLoop() {
defer logutil.LogPanic()
Expand Down Expand Up @@ -1813,3 +1794,28 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error {
s.GetRaftCluster().SetExternalTS(externalTS)
return nil
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (s *Server) IsLocalTSOEnabled() bool {
return s.cfg.IsLocalTSOEnabled()
}

// GetLeaderLease returns the leader lease.
func (s *Server) GetLeaderLease() int64 {
return s.cfg.GetLeaderLease()
}

// GetTSOSaveInterval returns TSO save interval.
func (s *Server) GetTSOSaveInterval() time.Duration {
return s.cfg.GetTSOSaveInterval()
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
return s.cfg.GetTSOUpdatePhysicalInterval()
}

// GetMaxResetTSGap gets the max gap to reset the tso.
func (s *Server) GetMaxResetTSGap() time.Duration {
return s.persistOptions.GetMaxResetTSGap()
}