diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 872b8e0ad0b..1c42f9b46c0 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -575,6 +575,9 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() ticker := time.NewTicker(serviceModeUpdateInterval) + failpoint.Inject("fastUpdateServiceMode", func() { + ticker.Reset(10 * time.Millisecond) + }) defer ticker.Stop() for { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 46a525a3e09..7eb82c99b25 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -114,9 +114,6 @@ const ( heartbeatTaskRunner = "heartbeat-async" miscTaskRunner = "misc-async" logTaskRunner = "log-async" - - // TODO: make it configurable - IsTSODynamicSwitchingEnabled = false ) // Server is the interface for cluster. @@ -412,22 +409,24 @@ func (c *RaftCluster) checkSchedulingService() { // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { if c.isAPIServiceMode { - if IsTSODynamicSwitchingEnabled { + if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) if err != nil || len(servers) == 0 { if err := c.startTSOJobsIfNeeded(); err != nil { log.Error("failed to start TSO jobs", errs.ZapError(err)) return } - log.Info("TSO is provided by PD") - c.UnsetServiceIndependent(constant.TSOServiceName) + if c.IsServiceIndependent(constant.TSOServiceName) { + log.Info("TSO is provided by PD") + c.UnsetServiceIndependent(constant.TSOServiceName) + } } else { - if err := c.startTSOJobsIfNeeded(); err != nil { + if err := c.stopTSOJobsIfNeeded(); err != nil { log.Error("failed to stop TSO jobs", errs.ZapError(err)) return } - log.Info("TSO is provided by TSO server") if !c.IsServiceIndependent(constant.TSOServiceName) { + log.Info("TSO is provided by TSO server") c.SetServiceIndependent(constant.TSOServiceName) } } diff --git a/server/config/config.go b/server/config/config.go index c64ee3831b0..1ccafe7248a 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -252,7 +252,8 @@ const ( minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond - defaultEnableSchedulingFallback = true + defaultEnableSchedulingFallback = true + defaultEnableTSODynamicSwitching = false ) // Special keys for Labels @@ -854,13 +855,17 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { // MicroServiceConfig is the configuration for micro service. type MicroServiceConfig struct { - EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"` } func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("enable-scheduling-fallback") { c.EnableSchedulingFallback = defaultEnableSchedulingFallback } + if !meta.IsDefined("enable-tso-dynamic-switching") { + c.EnableTSODynamicSwitching = defaultEnableTSODynamicSwitching + } } // Clone returns a copy of micro service config. @@ -874,6 +879,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { return c.EnableSchedulingFallback } +// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching. +func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { + return c.EnableTSODynamicSwitching +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/forward.go b/server/forward.go index 48f8f84aaa4..66ac41347c9 100644 --- a/server/forward.go +++ b/server/forward.go @@ -419,7 +419,7 @@ func (s *GrpcServer) isLocalRequest(host string) bool { } func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { - if !s.IsAPIServiceMode() { + if !s.IsServiceIndependent(constant.TSOServiceName) { return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) } request := &tsopb.TsoRequest{ diff --git a/server/server.go b/server/server.go index c88871658dc..029c85694c3 100644 --- a/server/server.go +++ b/server/server.go @@ -1411,7 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { if s.mode == APIServiceMode && !s.IsClosed() { - if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled { + if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { return true } return s.cluster.IsServiceIndependent(name) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index bcd9059682d..97cd0c5c043 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -391,6 +391,7 @@ func TestTSOFollowerProxy(t *testing.T) { func TestTSOFollowerProxyWithTSOService(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster, err := tests.NewTestAPICluster(ctx, 1) @@ -405,12 +406,14 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints) re.NoError(err) defer tsoCluster.Destroy() + time.Sleep(100 * time.Millisecond) cli := mcs.SetupClientWithKeyspaceID(ctx, re, constant.DefaultKeyspaceID, strings.Split(backendEndpoints, ",")) re.NotNil(cli) defer cli.Close() // TSO service does not support the follower proxy, so enabling it should fail. err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true) re.Error(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) } // TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207 diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index aa767ecfbef..db07b72635f 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -35,11 +35,13 @@ import ( tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" + tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" clientv3 "go.etcd.io/etcd/client/v3" @@ -163,7 +165,9 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() if isAPIServiceMode { - cluster, err = tests.NewTestAPICluster(ctx, 1) + cluster, err = tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { + conf.MicroService.EnableTSODynamicSwitching = false + }) } else { cluster, err = tests.NewTestCluster(ctx, 1) } @@ -267,6 +271,10 @@ func TestForwardTSORelated(t *testing.T) { re := require.New(t) suite := NewAPIServerForward(re) defer suite.ShutDown() + leaderServer := suite.cluster.GetLeaderServer().GetServer() + cfg := leaderServer.GetMicroServiceConfig().Clone() + cfg.EnableTSODynamicSwitching = false + leaderServer.SetMicroServiceConfig(*cfg) // Unable to use the tso-related interface without tso server suite.checkUnavailableTSO(re) tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) @@ -575,3 +583,99 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { suite.pdLeader.ResignLeader() suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) } + +// TestTSOServiceSwitch tests the behavior of TSO service switching when `EnableTSODynamicSwitching` is enabled. +// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server. +// When the TSO server is stopped, the PD should resume providing the TSO service if `EnableTSODynamicSwitching` is enabled. +// If `EnableTSODynamicSwitching` is disabled, the PD should not provide TSO service after the TSO server is stopped. +func TestTSOServiceSwitch(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, err := tests.NewTestAPICluster(ctx, 1, + func(conf *config.Config, _ string) { + conf.MicroService.EnableTSODynamicSwitching = true + }, + ) + re.NoError(err) + defer tc.Destroy() + + err = tc.RunInitialServers() + re.NoError(err) + leaderName := tc.WaitLeader() + re.NotEmpty(leaderName) + pdLeader := tc.GetServer(leaderName) + backendEndpoints := pdLeader.GetAddr() + re.NoError(pdLeader.BootstrapCluster()) + pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(pdClient) + defer pdClient.Close() + + var globalLastTS uint64 + // Initially, TSO service should be provided by PD + re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) + + // Start TSO server + tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, pdLeader.GetAddr()) + re.NoError(err) + tsoCluster.WaitForDefaultPrimaryServing(re) + + // Verify PD is not providing TSO service + testutil.Eventually(re, func() bool { + allocator, err := pdLeader.GetServer().GetTSOAllocatorManager().GetAllocator(tsopkg.GlobalDCLocation) + if err != nil { + return false + } + return !allocator.IsInitialize() + }) + + err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10) + re.NoError(err) + + // Disable TSO switching + cfg := pdLeader.GetServer().GetMicroServiceConfig().Clone() + cfg.EnableTSODynamicSwitching = false + pdLeader.GetServer().SetMicroServiceConfig(*cfg) + + tsoCluster.Destroy() + + // Wait for the configuration change to take effect + time.Sleep(300 * time.Millisecond) + // Verify PD is not providing TSO service multiple times + for range 10 { + err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 1) + re.Error(err, "TSO service should not be available") + time.Sleep(10 * time.Millisecond) + } + + // Now enable TSO switching + cfg = pdLeader.GetServer().GetMicroServiceConfig().Clone() + + cfg.EnableTSODynamicSwitching = true + pdLeader.GetServer().SetMicroServiceConfig(*cfg) + + // Wait for PD to detect the change + time.Sleep(300 * time.Millisecond) + + // Verify PD is now providing TSO service and timestamps are monotonically increasing + re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) +} + +func checkTSOMonotonic(ctx context.Context, pdClient pd.Client, globalLastTS *uint64, count int) error { + for range count { + physical, logical, err := pdClient.GetTS(ctx) + if err != nil { + return err + } + ts := (uint64(physical) << 18) + uint64(logical) + if ts <= *globalLastTS { + return fmt.Errorf("TSO is not globally increasing: last %d, current %d", globalLastTS, ts) + } + *globalLastTS = ts + } + return nil +} diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 35a54a2a8ad..a2bc138fcf6 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" @@ -91,7 +92,9 @@ func (suite *tsoClientTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount, func(conf *config.Config, _ string) { + conf.MicroService.EnableTSODynamicSwitching = false + }) } re.NoError(err) err = suite.cluster.RunInitialServers()