Skip to content

Commit

Permalink
Introduce TSO TestCluster in the test
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Apr 18, 2023
1 parent e75eef1 commit 8869b54
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 63 deletions.
9 changes: 7 additions & 2 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,19 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
// TODO: support multiple keyspace groups
func (s *Server) IsServing() bool {
return s.IsKeyspaceServing(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// IsKeyspaceServing returns whether the server is the primary of the given keyspace.
// TODO: update basicserver interface to support keyspace.
func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
if atomic.LoadInt64(&s.isRunning) == 0 {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
keyspaceID, keyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
Expand Down
26 changes: 0 additions & 26 deletions pkg/mcs/tso/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,14 @@
package server

import (
"context"
"os"
"strings"

"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"google.golang.org/grpc"
)

// NewTSOTestServer creates a tso server for testing.
func NewTSOTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// MustNewGrpcClient must create a new TSO grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
Expand Down
137 changes: 137 additions & 0 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mcs

import (
"context"
"time"

"github.com/stretchr/testify/require"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestCluster is a test cluster for TSO.
type TestCluster struct {
ctx context.Context

backendEndpoints string
servers map[string]*tso.Server
cleanupFuncs map[string]testutil.CleanupFunc
}

// NewTestTSOCluster creates a new TSO test cluster.
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) {
tc = &TestCluster{
ctx: ctx,
backendEndpoints: backendEndpoints,
servers: make(map[string]*tso.Server, initialServerCount),
cleanupFuncs: make(map[string]testutil.CleanupFunc, initialServerCount),
}
for i := 0; i < initialServerCount; i++ {
err = tc.AddServer(tempurl.Alloc())
if err != nil {
return nil, err
}
}
return tc, nil
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestCluster) AddServer(addr string) error {
cfg := tso.NewConfig()
cfg.BackendEndpoints = tc.backendEndpoints
cfg.ListenAddr = addr
cfg.Name = cfg.ListenAddr
generatedCfg, err := tso.GenerateConfig(cfg)
if err != nil {
return err
}
err = initLogger(generatedCfg)
if err != nil {
return err
}
server, cleanup, err := newTSOTestServer(tc.ctx, generatedCfg)
if err != nil {
return err
}
tc.servers[generatedCfg.GetListenAddr()] = server
tc.cleanupFuncs[generatedCfg.GetListenAddr()] = cleanup
return nil
}

// Destroy stops and destroy the test cluster.
func (tc *TestCluster) Destroy() {
for _, cleanup := range tc.cleanupFuncs {
cleanup()
}
tc.cleanupFuncs = nil
tc.servers = nil
}

// DestroyServer stops and destroy the test server by the given address.
func (tc *TestCluster) DestroyServer(addr string) {
tc.cleanupFuncs[addr]()
delete(tc.cleanupFuncs, addr)
delete(tc.servers, addr)
}

// GetPrimary returns the primary TSO server.
func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
}
}
return nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace.
func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string {
var primary string
testutil.Eventually(re, func() bool {
for name, s := range tc.servers {
if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = name
return true
}
}
return false
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return primary
}

// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace.
func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string {
return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// GetServer returns the TSO server by the given address.
func (tc *TestCluster) GetServer(addr string) *tso.Server {
for srvAddr, server := range tc.servers {
if srvAddr == addr {
return server
}
}
return nil
}

// GetServers returns all TSO servers.
func (tc *TestCluster) GetServers() map[string]*tso.Server {
return tc.servers
}
2 changes: 1 addition & 1 deletion tests/integrations/mcs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -116,7 +117,6 @@ require (
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
Expand Down
45 changes: 40 additions & 5 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,38 @@ package mcs

import (
"context"
"os"
"sync"
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"

"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
)

var once sync.Once

func initLogger(cfg *tso.Config) (err error) {
once.Do(func() {
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return
}
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries.
log.Sync()
})
return err
}

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...)
Expand Down Expand Up @@ -68,7 +87,11 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := tso.NewTSOTestServer(ctx, re, cfg)
// Setup the logger.
err = initLogger(cfg)
re.NoError(err)

s, cleanup, err := newTSOTestServer(ctx, cfg)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
Expand All @@ -77,6 +100,18 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
return s, cleanup
}

func newTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
s := tso.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader
func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string {
var primary string
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,6 @@ func request(
re.NoError(tsoClient.Send(req))
return tsoClient.Recv()
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupPrimary() {
}
50 changes: 21 additions & 29 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/discovery"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
Expand Down Expand Up @@ -235,52 +234,45 @@ func (suite *APIServerForwardTestSuite) TearDownSuite() {
func (suite *APIServerForwardTestSuite) TestForwardTSORelated() {
// Unable to use the tso-related interface without tso server
suite.checkUnavailableTSO()
// can use the tso-related interface with tso server
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
serverMap := make(map[string]bs.Server)
serverMap[s.GetAddr()] = s
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc, err := mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(suite.Require())
suite.checkAvailableTSO()
cleanup()
}

func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
serverMap := make(map[string]bs.Server)
for i := 0; i < 3; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
serverMap[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
re := suite.Require()

tc, err := mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)

// can use the tso-related interface with new primary
oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
serverMap[oldPrimary].Close()
delete(serverMap, oldPrimary)
re.True(exist)
tc.DestroyServer(oldPrimary)
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc.WaitForDefaultPrimaryServing(re)
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.NotEqual(oldPrimary, primary)
re.True(exist)
re.NotEqual(oldPrimary, primary)
suite.checkAvailableTSO()

// can use the tso-related interface with old primary again
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, oldPrimary)
defer cleanup()
serverMap[oldPrimary] = s
tc.AddServer(oldPrimary)
suite.checkAvailableTSO()
for addr, s := range serverMap {
for addr := range tc.GetServers() {
if addr != oldPrimary {
s.Close()
delete(serverMap, addr)
tc.DestroyServer(addr)
}
}
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc.WaitForDefaultPrimaryServing(re)
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.Equal(oldPrimary, primary)
re.True(exist)
re.Equal(oldPrimary, primary)
suite.checkAvailableTSO()
}

Expand Down

0 comments on commit 8869b54

Please sign in to comment.