Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add bootstrap test #6347

Merged
merged 7 commits into from
Apr 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 87 additions & 31 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tso

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -30,8 +31,10 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/discovery"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -316,53 +319,106 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {
suite.NoError(err)
}

func TestAdvertiseAddr(t *testing.T) {
re := require.New(t)
type CommonTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
tsoPrimary *tso.Server
backendEndpoints string
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
defer cluster.Destroy()
re.NoError(err)
func TestCommonTestSuite(t *testing.T) {
suite.Run(t, new(CommonTestSuite))
}

err = cluster.RunInitialServers()
func (suite *CommonTestSuite) SetupSuite() {
var err error
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
re.NoError(err)

leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)
err = suite.cluster.RunInitialServers()
re.NoError(err)

u := tempurl.Alloc()
s, cleanup := mcs.StartSingleTSOTestServer(ctx, re, leader.GetAddr(), u)
defer cleanup()
leaderName := suite.cluster.WaitLeader()
suite.pdLeader = suite.cluster.GetServer(leaderName)
suite.backendEndpoints = suite.pdLeader.GetAddr()
suite.NoError(suite.pdLeader.BootstrapCluster())

tsoServerConf := s.GetConfig()
re.Equal(u, tsoServerConf.AdvertiseListenAddr)
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
suite.tsoCluster.WaitForDefaultPrimaryServing(re)
suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
}

func TestMetrics(t *testing.T) {
re := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
defer cluster.Destroy()
re.NoError(err)
func (suite *CommonTestSuite) TearDownSuite() {
suite.tsoCluster.Destroy()
etcdClient := suite.pdLeader.GetEtcdClient()
clusterID := strconv.FormatUint(suite.pdLeader.GetClusterID(), 10)
endpoints, err := discovery.Discover(etcdClient, clusterID, utils.TSOServiceName)
suite.NoError(err)
if len(endpoints) != 0 {
endpoints, err = discovery.Discover(etcdClient, clusterID, utils.TSOServiceName)
suite.NoError(err)
suite.Empty(endpoints)
}
suite.cluster.Destroy()
suite.cancel()
}

err = cluster.RunInitialServers()
re.NoError(err)
func (suite *CommonTestSuite) TestAdvertiseAddr() {
re := suite.Require()

leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)
conf := suite.tsoPrimary.GetConfig()
re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
}

u := tempurl.Alloc()
s, cleanup := mcs.StartSingleTSOTestServer(ctx, re, leader.GetAddr(), u)
defer cleanup()
func (suite *CommonTestSuite) TestMetrics() {
re := suite.Require()

resp, err := http.Get(s.GetConfig().GetAdvertiseListenAddr() + "/metrics")
resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
respBytes, err := io.ReadAll(resp.Body)
re.NoError(err)
re.Contains(string(respBytes), "tso_server_info")
}

func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
re := suite.Require()

// check the default keyspace group
check := func() {
resp, err := http.Get(suite.pdLeader.GetServer().GetConfig().AdvertiseClientUrls + "/pd/api/v2/tso/keyspace-groups")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This RESTful api returns keyspace groups by loading from etcd storage whenever it is invoked? If so, it might impact API server's performance when it's being processed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of range of this pr. if could, we just have short discussion about it in this thread, but please ignore it for this pr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, it is. An optional choice is to return the group cached in the group manager.

re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
respString, err := io.ReadAll(resp.Body)
re.NoError(err)
var kgs []*endpoint.KeyspaceGroup
re.NoError(json.Unmarshal(respString, &kgs))
re.Len(kgs, 1)
re.Equal(utils.DefaultKeyspaceGroupID, kgs[0].ID)
re.Equal(endpoint.Basic.String(), kgs[0].UserKind)
re.Empty(kgs[0].SplitState)
re.Empty(kgs[0].Members)
re.Empty(kgs[0].KeyspaceLookupTable)
}
check()

s, err := suite.cluster.JoinAPIServer(suite.ctx)
re.NoError(err)
re.NoError(s.Run())

// transfer leader to the new server
suite.pdLeader.ResignLeader()
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
check()
suite.pdLeader.ResignLeader()
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
}