Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: introduce TSO TestCluster in the test #6333

Merged
merged 2 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,19 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
// TODO: support multiple keyspace groups
func (s *Server) IsServing() bool {
return s.IsKeyspaceServing(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// IsKeyspaceServing returns whether the server is the primary of the given keyspace.
// TODO: update basicserver interface to support keyspace.
func (s *Server) IsKeyspaceServing(keyspaceID, keyspaceGroupID uint32) bool {
if atomic.LoadInt64(&s.isRunning) == 0 {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
keyspaceID, keyspaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
Expand Down
26 changes: 0 additions & 26 deletions pkg/mcs/tso/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,14 @@
package server

import (
"context"
"os"
"strings"

"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"google.golang.org/grpc"
)

// NewTSOTestServer creates a tso server for testing.
func NewTSOTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// MustNewGrpcClient must create a new TSO grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
Expand Down
137 changes: 137 additions & 0 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mcs

import (
"context"
"time"

"github.com/stretchr/testify/require"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestCluster is a test cluster for TSO.
type TestCluster struct {
ctx context.Context

backendEndpoints string
servers map[string]*tso.Server
cleanupFuncs map[string]testutil.CleanupFunc
}

// NewTestTSOCluster creates a new TSO test cluster.
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) {
tc = &TestCluster{
ctx: ctx,
backendEndpoints: backendEndpoints,
servers: make(map[string]*tso.Server, initialServerCount),
cleanupFuncs: make(map[string]testutil.CleanupFunc, initialServerCount),
}
for i := 0; i < initialServerCount; i++ {
err = tc.AddServer(tempurl.Alloc())
if err != nil {
return nil, err
}
}
return tc, nil
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestCluster) AddServer(addr string) error {
cfg := tso.NewConfig()
cfg.BackendEndpoints = tc.backendEndpoints
cfg.ListenAddr = addr
cfg.Name = cfg.ListenAddr
generatedCfg, err := tso.GenerateConfig(cfg)
if err != nil {
return err
}
err = initLogger(generatedCfg)
if err != nil {
return err
}
server, cleanup, err := newTSOTestServer(tc.ctx, generatedCfg)
if err != nil {
return err
}
tc.servers[generatedCfg.GetListenAddr()] = server
tc.cleanupFuncs[generatedCfg.GetListenAddr()] = cleanup
return nil
}

// Destroy stops and destroy the test cluster.
func (tc *TestCluster) Destroy() {
for _, cleanup := range tc.cleanupFuncs {
cleanup()
}
tc.cleanupFuncs = nil
tc.servers = nil
}

// DestroyServer stops and destroy the test server by the given address.
func (tc *TestCluster) DestroyServer(addr string) {
tc.cleanupFuncs[addr]()
delete(tc.cleanupFuncs, addr)
delete(tc.servers, addr)
}

// GetPrimary returns the primary TSO server.
func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
}
}
return nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace.
func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string {
var primary string
testutil.Eventually(re, func() bool {
for name, s := range tc.servers {
if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = name
return true
}
}
return false
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return primary
}

// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace.
func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string {
return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// GetServer returns the TSO server by the given address.
func (tc *TestCluster) GetServer(addr string) *tso.Server {
for srvAddr, server := range tc.servers {
if srvAddr == addr {
return server
}
}
return nil
}

// GetServers returns all TSO servers.
func (tc *TestCluster) GetServers() map[string]*tso.Server {
return tc.servers
}
2 changes: 1 addition & 1 deletion tests/integrations/mcs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.2
github.com/tikv/pd v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -116,7 +117,6 @@ require (
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
Expand Down
45 changes: 40 additions & 5 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,38 @@ package mcs

import (
"context"
"os"
"sync"
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"

"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
)

var once sync.Once

func initLogger(cfg *tso.Config) (err error) {
once.Do(func() {
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return
}
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries.
log.Sync()
})
return err
}

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...)
Expand Down Expand Up @@ -68,7 +87,11 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := tso.NewTSOTestServer(ctx, re, cfg)
// Setup the logger.
err = initLogger(cfg)
re.NoError(err)

s, cleanup, err := newTSOTestServer(ctx, cfg)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
Expand All @@ -77,6 +100,18 @@ func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backe
return s, cleanup
}

func newTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
s := tso.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader
func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string {
var primary string
Expand Down
50 changes: 21 additions & 29 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/discovery"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
Expand Down Expand Up @@ -235,52 +234,45 @@ func (suite *APIServerForwardTestSuite) TearDownSuite() {
func (suite *APIServerForwardTestSuite) TestForwardTSORelated() {
// Unable to use the tso-related interface without tso server
suite.checkUnavailableTSO()
// can use the tso-related interface with tso server
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
serverMap := make(map[string]bs.Server)
serverMap[s.GetAddr()] = s
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc, err := mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(suite.Require())
suite.checkAvailableTSO()
cleanup()
}

func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
serverMap := make(map[string]bs.Server)
for i := 0; i < 3; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
serverMap[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
re := suite.Require()

tc, err := mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)

// can use the tso-related interface with new primary
oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
serverMap[oldPrimary].Close()
delete(serverMap, oldPrimary)
re.True(exist)
tc.DestroyServer(oldPrimary)
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc.WaitForDefaultPrimaryServing(re)
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.NotEqual(oldPrimary, primary)
re.True(exist)
re.NotEqual(oldPrimary, primary)
suite.checkAvailableTSO()

// can use the tso-related interface with old primary again
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, oldPrimary)
defer cleanup()
serverMap[oldPrimary] = s
tc.AddServer(oldPrimary)
suite.checkAvailableTSO()
for addr, s := range serverMap {
for addr := range tc.GetServers() {
if addr != oldPrimary {
s.Close()
delete(serverMap, addr)
tc.DestroyServer(addr)
}
}
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
tc.WaitForDefaultPrimaryServing(re)
time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName)
suite.True(exist)
suite.Equal(oldPrimary, primary)
re.True(exist)
re.Equal(oldPrimary, primary)
suite.checkAvailableTSO()
}

Expand Down