From 0ce394462619ef03141151e6934bcb64deaa0c57 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 18 Apr 2023 16:47:04 +0800 Subject: [PATCH] Introduce TSO TestCluster in the test Signed-off-by: JmPotato --- pkg/mcs/tso/server/server.go | 9 +- pkg/mcs/tso/server/testutil.go | 26 ---- tests/integrations/mcs/cluster.go | 137 ++++++++++++++++++++++ tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/testutil.go | 45 ++++++- tests/integrations/mcs/tso/server_test.go | 50 ++++---- 6 files changed, 206 insertions(+), 63 deletions(-) create mode 100644 tests/integrations/mcs/cluster.go diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 14365aa9650..54e6266df5d 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -202,14 +202,19 @@ func (s *Server) AddStartCallback(callbacks ...func()) { // IsServing implements basicserver. It returns whether the server is the leader // if there is embedded etcd, or the primary otherwise. -// TODO: support multiple keyspace groups func (s *Server) IsServing() bool { + return s.IsKeyspaceServing(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) +} + +// IsKeyspaceServing returns whether the server is the primary of the given keyspace. +// TODO: update basicserver interface to support keyspace. +func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool { if atomic.LoadInt64(&s.isRunning) == 0 { return false } member, err := s.keyspaceGroupManager.GetElectionMember( - mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + keyspaceID, keyspaceGroupID) if err != nil { log.Error("failed to get election member", errs.ZapError(err)) return false diff --git a/pkg/mcs/tso/server/testutil.go b/pkg/mcs/tso/server/testutil.go index 9495a711900..626d1474673 100644 --- a/pkg/mcs/tso/server/testutil.go +++ b/pkg/mcs/tso/server/testutil.go @@ -15,40 +15,14 @@ package server import ( - "context" - "os" "strings" "github.com/pingcap/kvproto/pkg/tsopb" - "github.com/pingcap/log" "github.com/spf13/pflag" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/pkg/utils/testutil" "google.golang.org/grpc" ) -// NewTSOTestServer creates a tso server for testing. -func NewTSOTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error) { - // New zap logger - err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) - re.NoError(err) - log.ReplaceGlobals(cfg.Logger, cfg.LogProps) - // Flushing any buffered log entries - defer log.Sync() - - s := CreateServer(ctx, cfg) - if err = s.Run(); err != nil { - return nil, nil, err - } - - cleanup := func() { - s.Close() - os.RemoveAll(cfg.DataDir) - } - return s, cleanup, nil -} - // MustNewGrpcClient must create a new TSO grpc client. func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) { conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go new file mode 100644 index 00000000000..fd44fd4d439 --- /dev/null +++ b/tests/integrations/mcs/cluster.go @@ -0,0 +1,137 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mcs + +import ( + "context" + "time" + + "github.com/stretchr/testify/require" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" +) + +// TestCluster is a test cluster for TSO. +type TestCluster struct { + ctx context.Context + + backendEndpoints string + servers map[string]*tso.Server + cleanupFuncs map[string]testutil.CleanupFunc +} + +// NewTestTSOCluster creates a new TSO test cluster. +func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) { + tc = &TestCluster{ + ctx: ctx, + backendEndpoints: backendEndpoints, + servers: make(map[string]*tso.Server, initialServerCount), + cleanupFuncs: make(map[string]testutil.CleanupFunc, initialServerCount), + } + for i := 0; i < initialServerCount; i++ { + err = tc.AddServer(tempurl.Alloc()) + if err != nil { + return nil, err + } + } + return tc, nil +} + +// AddServer adds a new TSO server to the test cluster. +func (tc *TestCluster) AddServer(addr string) error { + cfg := tso.NewConfig() + cfg.BackendEndpoints = tc.backendEndpoints + cfg.ListenAddr = addr + cfg.Name = cfg.ListenAddr + generatedCfg, err := tso.GenerateConfig(cfg) + if err != nil { + return err + } + err = initLogger(generatedCfg) + if err != nil { + return err + } + server, cleanup, err := newTSOTestServer(tc.ctx, generatedCfg) + if err != nil { + return err + } + tc.servers[generatedCfg.GetListenAddr()] = server + tc.cleanupFuncs[generatedCfg.GetListenAddr()] = cleanup + return nil +} + +// Destroy stops and destroy the test cluster. +func (tc *TestCluster) Destroy() { + for _, cleanup := range tc.cleanupFuncs { + cleanup() + } + tc.cleanupFuncs = nil + tc.servers = nil +} + +// DestroyServer stops and destroy the test server by the given address. +func (tc *TestCluster) DestroyServer(addr string) { + tc.cleanupFuncs[addr]() + delete(tc.cleanupFuncs, addr) + delete(tc.servers, addr) +} + +// GetPrimary returns the primary TSO server. +func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server { + for _, server := range tc.servers { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { + return server + } + } + return nil +} + +// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace. +func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string { + var primary string + testutil.Eventually(re, func() bool { + for name, s := range tc.servers { + if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { + primary = name + return true + } + } + return false + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + return primary +} + +// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace. +func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string { + return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) +} + +// GetServer returns the TSO server by the given address. +func (tc *TestCluster) GetServer(addr string) *tso.Server { + for srvAddr, server := range tc.servers { + if srvAddr == addr { + return server + } + } + return nil +} + +// GetServers returns all TSO servers. +func (tc *TestCluster) GetServers() map[string]*tso.Server { + return tc.servers +} diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 60adb1ea0f9..7d135779def 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -13,6 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a + github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 @@ -116,7 +117,6 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect - github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index b277b9158cb..7a006289fec 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -16,19 +16,38 @@ package mcs import ( "context" + "os" + "sync" "time" + "github.com/pingcap/log" "github.com/pkg/errors" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" + bs "github.com/tikv/pd/pkg/basicserver" rm "github.com/tikv/pd/pkg/mcs/resource_manager/server" tso "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" - - "github.com/stretchr/testify/require" - pd "github.com/tikv/pd/client" - bs "github.com/tikv/pd/pkg/basicserver" ) +var once sync.Once + +func initLogger(cfg *tso.Config) (err error) { + once.Do(func() { + // Setup the logger. + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err != nil { + return + } + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + // Flushing any buffered log entries. + log.Sync() + }) + return err +} + // SetupClientWithKeyspace creates a TSO client for test. func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...) @@ -68,7 +87,11 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe cfg, err := tso.GenerateConfig(cfg) re.NoError(err) - s, cleanup, err := tso.NewTSOTestServer(ctx, re, cfg) + // Setup the logger. + err = initLogger(cfg) + re.NoError(err) + + s, cleanup, err := newTSOTestServer(ctx, cfg) re.NoError(err) testutil.Eventually(re, func() bool { return !s.IsClosed() @@ -77,6 +100,18 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe return s, cleanup } +func newTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) { + s := tso.CreateServer(ctx, cfg) + if err := s.Run(); err != nil { + return nil, nil, err + } + cleanup := func() { + s.Close() + os.RemoveAll(cfg.DataDir) + } + return s, cleanup, nil +} + // WaitForPrimaryServing waits for one of servers being elected to be the primary/leader func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string { var primary string diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index b1f0945d44d..483f71992ef 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" - bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/discovery" tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" @@ -235,52 +234,45 @@ func (suite *APIServerForwardTestSuite) TearDownSuite() { func (suite *APIServerForwardTestSuite) TestForwardTSORelated() { // Unable to use the tso-related interface without tso server suite.checkUnavailableTSO() - // can use the tso-related interface with tso server - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) - serverMap := make(map[string]bs.Server) - serverMap[s.GetAddr()] = s - mcs.WaitForPrimaryServing(suite.Require(), serverMap) + tc, err := mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) + suite.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(suite.Require()) suite.checkAvailableTSO() - cleanup() } func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { - serverMap := make(map[string]bs.Server) - for i := 0; i < 3; i++ { - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) - defer cleanup() - serverMap[s.GetAddr()] = s - } - mcs.WaitForPrimaryServing(suite.Require(), serverMap) + re := suite.Require() + + tc, err := mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) // can use the tso-related interface with new primary oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) - suite.True(exist) - serverMap[oldPrimary].Close() - delete(serverMap, oldPrimary) + re.True(exist) + tc.DestroyServer(oldPrimary) time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout - mcs.WaitForPrimaryServing(suite.Require(), serverMap) + tc.WaitForDefaultPrimaryServing(re) primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) - suite.True(exist) - suite.NotEqual(oldPrimary, primary) + re.True(exist) + re.NotEqual(oldPrimary, primary) suite.checkAvailableTSO() // can use the tso-related interface with old primary again - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, oldPrimary) - defer cleanup() - serverMap[oldPrimary] = s + tc.AddServer(oldPrimary) suite.checkAvailableTSO() - for addr, s := range serverMap { + for addr := range tc.GetServers() { if addr != oldPrimary { - s.Close() - delete(serverMap, addr) + tc.DestroyServer(addr) } } - mcs.WaitForPrimaryServing(suite.Require(), serverMap) + tc.WaitForDefaultPrimaryServing(re) time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) - suite.True(exist) - suite.Equal(oldPrimary, primary) + re.True(exist) + re.Equal(oldPrimary, primary) suite.checkAvailableTSO() }