Skip to content

Commit

Permalink
Add more test code
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Apr 19, 2023
1 parent ce68621 commit be55f59
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 76 deletions.
9 changes: 9 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func (s *Server) GetLeaderListenUrls() []string {
return member.GetLeaderListenUrls()
}

// GetMember returns the election member serving the given keyspace in the
// given keyspace group. The lookup is delegated to the keyspace group
// manager, and any lookup failure is propagated to the caller unchanged.
func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMember, error) {
	electionMember, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
	if err != nil {
		// Return a literal nil so callers never observe a typed-nil interface.
		return nil, err
	}
	return electionMember, nil
}

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
Expand Down
43 changes: 27 additions & 16 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"github.com/stretchr/testify/require"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestCluster is a test cluster for TSO.
type TestCluster struct {
// TestTSOCluster is a test cluster for TSO.
type TestTSOCluster struct {
ctx context.Context

backendEndpoints string
Expand All @@ -35,8 +36,8 @@ type TestCluster struct {
}

// NewTestTSOCluster creates a new TSO test cluster.
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) {
tc = &TestCluster{
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestTSOCluster, err error) {
tc = &TestTSOCluster{
ctx: ctx,
backendEndpoints: backendEndpoints,
servers: make(map[string]*tso.Server, initialServerCount),
Expand All @@ -52,7 +53,7 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestCluster) AddServer(addr string) error {
func (tc *TestTSOCluster) AddServer(addr string) error {
cfg := tso.NewConfig()
cfg.BackendEndpoints = tc.backendEndpoints
cfg.ListenAddr = addr
Expand All @@ -75,7 +76,7 @@ func (tc *TestCluster) AddServer(addr string) error {
}

// Destroy stops and destroys the test cluster.
func (tc *TestCluster) Destroy() {
func (tc *TestTSOCluster) Destroy() {
for _, cleanup := range tc.cleanupFuncs {
cleanup()
}
Expand All @@ -84,14 +85,14 @@ func (tc *TestCluster) Destroy() {
}

// DestroyServer stops and destroys the test server by the given address.
func (tc *TestCluster) DestroyServer(addr string) {
func (tc *TestTSOCluster) DestroyServer(addr string) {
tc.cleanupFuncs[addr]()
delete(tc.cleanupFuncs, addr)
delete(tc.servers, addr)
}

// GetPrimary returns the primary TSO server.
func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand All @@ -101,12 +102,12 @@ func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Serve
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace.
func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string {
var primary string
func (tc *TestTSOCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) *tso.Server {
var primary *tso.Server
testutil.Eventually(re, func() bool {
for name, s := range tc.servers {
if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = name
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = server
return true
}
}
Expand All @@ -117,12 +118,12 @@ func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID,
}

// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace.
func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string {
func (tc *TestTSOCluster) WaitForDefaultPrimaryServing(re *require.Assertions) *tso.Server {
return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// GetServer returns the TSO server by the given address.
func (tc *TestCluster) GetServer(addr string) *tso.Server {
func (tc *TestTSOCluster) GetServer(addr string) *tso.Server {
for srvAddr, server := range tc.servers {
if srvAddr == addr {
return server
Expand All @@ -132,6 +133,16 @@ func (tc *TestCluster) GetServer(addr string) *tso.Server {
}

// GetServers returns all TSO servers.
func (tc *TestCluster) GetServers() map[string]*tso.Server {
func (tc *TestTSOCluster) GetServers() map[string]*tso.Server {
return tc.servers
}

// GetKeyspaceGroupMember converts the TSO servers of the test cluster into a
// slice of endpoint.KeyspaceGroupMember, one entry per server carrying its
// listen address, and returns it. Note that the order of the returned members
// is not deterministic because it follows Go's map iteration order.
func (tc *TestTSOCluster) GetKeyspaceGroupMember() []endpoint.KeyspaceGroupMember {
	// Pre-size the slice to avoid repeated growth while appending.
	members := make([]endpoint.KeyspaceGroupMember, 0, len(tc.servers))
	for _, server := range tc.servers {
		members = append(members, endpoint.KeyspaceGroupMember{
			Address: server.GetAddr(),
		})
	}
	return members
}
158 changes: 98 additions & 60 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@ package tso

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/pkg/election"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
"google.golang.org/grpc"
)

type tsoKeyspaceGroupManagerTestSuite struct {
Expand All @@ -48,13 +46,8 @@ type tsoKeyspaceGroupManagerTestSuite struct {
cluster *tests.TestCluster
// pdLeaderServer is the leader server of the PD cluster.
pdLeaderServer *tests.TestServer
// tsoServer is the TSO service provider.
// TODO: use TSO cluster instead.
tsoServer *tso.Server
tsoServerCleanup func()
tsoClientConn *grpc.ClientConn

tsoClient tsopb.TSOClient
// tsoCluster is the TSO service cluster.
tsoCluster *mcs.TestTSOCluster
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
Expand All @@ -73,33 +66,35 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
re.NoError(suite.pdLeaderServer.BootstrapCluster())
backendEndpoints := suite.pdLeaderServer.GetAddr()
suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
re.NoError(err)
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() {
suite.cancel()
suite.tsoClientConn.Close()
suite.tsoServerCleanup()
suite.tsoCluster.Destroy()
suite.cluster.Destroy()
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
cleanupKeyspaceGroups(suite.Require(), suite.pdLeaderServer)
}

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID)
}
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
UserKind: endpoint.Standard.String(),
Members: []endpoint.KeyspaceGroupMember{{Address: suite.tsoServer.GetAddr()}},
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
Expand All @@ -109,15 +104,17 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Get a TSO from the keyspace group 1.
var ts *pdpb.Timestamp
var (
ts pdpb.Timestamp
err error
)
testutil.Eventually(re, func() bool {
resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 1)
ts = resp.GetTimestamp()
return err == nil && tsoutil.CompareTimestamp(ts, &pdpb.Timestamp{}) > 0
ts, err = suite.requestTSO(re, 1, 222, 1)
return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err := suite.tsoServer.GetHandler().ResetTS(tsoutil.GenerateTS(ts), false, true, 1)
err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
Expand All @@ -129,42 +126,25 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTarget())
// Check the split TSO from keyspace group 2.
var splitTS *pdpb.Timestamp
var splitTS pdpb.Timestamp
testutil.Eventually(re, func() bool {
resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 2)
splitTS = resp.GetTimestamp()
return err == nil && tsoutil.CompareTimestamp(splitTS, &pdpb.Timestamp{}) > 0
splitTS, err = suite.requestTSO(re, 1, 222, 2)
return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0
})
re.Greater(tsoutil.CompareTimestamp(splitTS, ts), 0)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
// Finish the split.
handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
}

func request(
func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO(
re *require.Assertions,
ctx context.Context, client tsopb.TSOClient, count uint32,
clusterID uint64, keyspaceID, keyspaceGroupID uint32,
) (ts *tsopb.TsoResponse, err error) {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
KeyspaceId: keyspaceID,
KeyspaceGroupId: keyspaceGroupID,
},
DcLocation: tsopkg.GlobalDCLocation,
Count: count,
}
tsoClient, err := client.Tso(ctx)
count, keyspaceID, keyspaceGroupID uint32,
) (pdpb.Timestamp, error) {
primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID)
tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID)
re.NoError(err)
defer tsoClient.CloseSend()
re.NoError(tsoClient.Send(req))
return tsoClient.Recv()
}

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID)
}
re.NotNil(tam)
return tam.HandleRequest(tsopkg.GlobalDCLocation, count)
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() {
Expand All @@ -175,7 +155,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection
{
ID: 1,
UserKind: endpoint.Standard.String(),
Members: []endpoint.KeyspaceGroupMember{{Address: suite.tsoServer.GetAddr()}},
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
Expand All @@ -194,15 +174,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTarget())
// Check the leadership.
tam1, err := suite.tsoServer.GetTSOAllocatorManager(1)
re.NoError(err)
re.NotNil(tam1)
tam2, err := suite.tsoServer.GetTSOAllocatorManager(2)
member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1)
re.NoError(err)
re.NotNil(tam2)
member1 := tam1.GetMember()
re.NotNil(member1)
member2 := tam2.GetMember()
member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2)
re.NoError(err)
re.NotNil(member2)
// Wait for the leader of the keyspace group 1 and 2 to be elected.
testutil.Eventually(re, func() bool {
Expand All @@ -228,3 +204,65 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection
// Finish the split.
handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
}

// TestTSOKeyspaceGroupSplitClient requests TSOs for keyspace 222 concurrently
// through the PD client while the owning keyspace group is split from group 1
// into group 2, asserting that the timestamps stay strictly monotonically
// increasing across the split.
func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() {
	// TODO: remove the skip after the client is able to support multi-keyspace-group.
	suite.T().SkipNow()

	re := suite.Require()
	// Create the keyspace group 1 with keyspaces [111, 222, 333].
	handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
		KeyspaceGroups: []*endpoint.KeyspaceGroup{
			{
				ID:        1,
				UserKind:  endpoint.Standard.String(),
				Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
				Keyspaces: []uint32{111, 222, 333},
			},
		},
	})
	kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
	re.Equal(uint32(1), kg1.ID)
	re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
	re.False(kg1.IsSplitting())
	// Prepare the client for keyspace 222.
	var tsoClient pd.TSOClient
	tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{})
	re.NoError(err)
	re.NotNil(tsoClient)
	// Request the TSO for keyspace 222 concurrently.
	// NOTE(review): the assertions below run on a non-test goroutine; testify's
	// require.FailNow is only fully reliable on the test goroutine — confirm
	// this matches the project's testing conventions.
	var (
		wg                        sync.WaitGroup
		ctx, cancel               = context.WithCancel(suite.ctx)
		lastPhysical, lastLogical int64
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			physical, logical, err := tsoClient.GetTS(ctx)
			if err != nil {
				// Once the context has been canceled, an in-flight request may
				// legitimately fail — stop quietly instead of flaking the test.
				select {
				case <-ctx.Done():
					return
				default:
				}
				re.NoError(err)
			}
			// The logical counter resets whenever the physical part advances,
			// so monotonicity must be checked on the (physical, logical) pair
			// rather than on each component independently.
			re.True(physical > lastPhysical || (physical == lastPhysical && logical > lastLogical),
				"TSO went backwards: (%d, %d) -> (%d, %d)", lastPhysical, lastLogical, physical, logical)
			lastPhysical, lastLogical = physical, logical
		}
	}()
	// Split the keyspace group 1 to 2.
	handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
		NewID:     2,
		Keyspaces: []uint32{222, 333},
	})
	kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
	re.Equal(uint32(2), kg2.ID)
	re.Equal([]uint32{222, 333}, kg2.Keyspaces)
	re.True(kg2.IsSplitTarget())
	// Finish the split.
	handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
	// Stop the TSO-requesting goroutine and wait for it to exit before the
	// suite teardown destroys the clusters it talks to.
	cancel()
	wg.Wait()
}

0 comments on commit be55f59

Please sign in to comment.