Skip to content

Commit

Permalink
tso, server: refine the TSO allocator manager parameters (#6269)
Browse files Browse the repository at this point in the history
ref #5895

- Refine the TSO allocator manager parameters.
- Always run `tsoAllocatorLoop` to advance the Global TSO.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Apr 6, 2023
1 parent da43376 commit 33daef0
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 47 deletions.
39 changes: 20 additions & 19 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ type ElectionMember interface {
PrecheckLeader() error
}

// ConfigProvider is used to provide TSO configuration.
type ConfigProvider interface {
IsLocalTSOEnabled() bool
GetLeaderLease() int64
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetMaxResetTSGap() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
Expand Down Expand Up @@ -193,17 +203,12 @@ type AllocatorManager struct {
// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
ctx context.Context,
startGlobalLeaderLoop bool,
keyspaceGroupID uint32,
member ElectionMember,
rootPath string,
storage endpoint.TSOStorage,
enableLocalTSO bool,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
leaderLease int64,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
configProvider ConfigProvider,
startGlobalLeaderLoop bool,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -213,19 +218,21 @@ func NewAllocatorManager(
member: member,
rootPath: rootPath,
storage: storage,
enableLocalTSO: enableLocalTSO,
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
leaderLease: leaderLease,
maxResetTSGap: maxResetTSGap,
securityConfig: tlsConfig,
enableLocalTSO: configProvider.IsLocalTSOEnabled(),
saveInterval: configProvider.GetTSOSaveInterval(),
updatePhysicalInterval: configProvider.GetTSOUpdatePhysicalInterval(),
leaderLease: configProvider.GetLeaderLease(),
maxResetTSGap: configProvider.GetMaxResetTSGap,
securityConfig: configProvider.GetTLSConfig(),
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)

// Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully.
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop)
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()

return am
}
Expand All @@ -247,11 +254,6 @@ func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership
leadership: leadership,
allocator: allocator,
}

if startGlobalLeaderLoop {
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()
}
}

// setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
Expand Down Expand Up @@ -279,7 +281,6 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
// tso service starts the loop here, but pd starts its own loop.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer am.svcLoopWG.Done()
Expand Down
11 changes: 3 additions & 8 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -74,8 +73,7 @@ type KeyspaceGroupManager struct {
// Note: The {group} is 5 digits integer with leading zeros.
tsoSvcRootPath string
// cfg is the TSO config
cfg ServiceConfig
maxResetTSGap func() time.Duration
cfg ServiceConfig
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand All @@ -102,7 +100,6 @@ func NewKeyspaceGroupManager(
defaultKsgStorageTSRootPath: defaultKsgStorageTSRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
maxResetTSGap: func() time.Duration { return cfg.GetMaxResetTSGap() },
}

return ksgMgr
Expand All @@ -127,11 +124,9 @@ func (kgm *KeyspaceGroupManager) initDefaultKeyspaceGroup() {
defaultKsgGroupStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(kgm.etcdClient, kgm.defaultKsgStorageTSRootPath), nil)
kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID] =
NewAllocatorManager(
kgm.ctx, true, mcsutils.DefaultKeySpaceGroupID, participant,
kgm.ctx, mcsutils.DefaultKeySpaceGroupID, participant,
kgm.defaultKsgStorageTSRootPath, defaultKsgGroupStorage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap)
kgm.cfg, true)
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
Expand Down
46 changes: 26 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
if !s.IsAPIServiceMode() {
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.ctx, false, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s.cfg.IsLocalTSOEnabled(),
s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetLeaderLease(), s.cfg.GetTLSConfig(),
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })
// Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully.
s.tsoAllocatorManager.SetUpGlobalAllocator(ctx, s.member.GetLeadership(), false)
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s, false)
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down Expand Up @@ -564,9 +559,6 @@ func (s *Server) startServerLoop(ctx context.Context) {
if s.IsAPIServiceMode() { // disable tso service in api server
s.serverLoopWg.Add(1)
go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName)
} else { // enable tso service
s.serverLoopWg.Add(1)
go s.tsoAllocatorLoop()
}
}

Expand All @@ -592,17 +584,6 @@ func (s *Server) serverMetricsLoop() {
}
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

// encryptionKeyManagerLoop is used to start monitor encryption key changes.
func (s *Server) encryptionKeyManagerLoop() {
defer logutil.LogPanic()
Expand Down Expand Up @@ -1813,3 +1794,28 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error {
s.GetRaftCluster().SetExternalTS(externalTS)
return nil
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (s *Server) IsLocalTSOEnabled() bool {
return s.cfg.IsLocalTSOEnabled()
}

// GetLeaderLease returns the leader lease.
func (s *Server) GetLeaderLease() int64 {
return s.cfg.GetLeaderLease()
}

// GetTSOSaveInterval returns TSO save interval.
func (s *Server) GetTSOSaveInterval() time.Duration {
return s.cfg.GetTSOSaveInterval()
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
return s.cfg.GetTSOUpdatePhysicalInterval()
}

// GetMaxResetTSGap gets the max gap to reset the tso.
func (s *Server) GetMaxResetTSGap() time.Duration {
return s.persistOptions.GetMaxResetTSGap()
}

0 comments on commit 33daef0

Please sign in to comment.