Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs, tso: add API interface to obtain the TSO keyspace group member info #6373

Merged
merged 4 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ import (
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
"go.uber.org/zap"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/tso/api/v1"
const (
// APIPathPrefix is the prefix of the API path.
APIPathPrefix = "/tso/api/v1"
)

var (
once sync.Once
Expand All @@ -46,14 +52,14 @@ var (
func init() {
tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.handler(), apiServiceGroup
return s.apiHandlerEngine, apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

srv *tsoserver.Service
rd *render.Render
Expand All @@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set("service", srv)
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: root,
rd: createIndentRender(),
}
s.RegisterRouter()
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
return s
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd)
s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
router.POST("/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
}

func (s *Service) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.apiHandlerEngine.ServeHTTP(w, r)
})
// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
router.GET("/members", GetKeyspaceGroupMembers)
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Copy link
Contributor

@binshi-bing binshi-bing Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The information that I plan to get from every tso server includes:

  1. TSO nodes.
  2. All keyspace groups; the full member/replica (distribution) list of every keyspace group; the primary of every keyspace group.

Every tso node can return the same result (global) picture with this API, which can't achieved by this pr, because of two reasons:

  1. A tso node mightn't allocate AllocatorManager for every keyspace group, especially if there are more than 2 tso nodes
  2. A member only knows itself and the primary, i.e., if it is the primary, it doesn't know the secondary.

To return the required information, every tso node needs to watch tso primary addresses of all keyspace groups and watch all registered tso nodes/pods, which we can add after Haoyang completes the refactor work of the initial loading of the range and dynamic watch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss about it so that we are on the same page.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to return its keyspaces?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endpoint.KeyspaceGroup contains keyspace list, which shall be enough.

Group *endpoint.KeyspaceGroup
Member *tsopb.Participant
IsPrimary bool `json:"is_primary"`
PrimaryID uint64 `json:"primary_id"`
}

// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving.
func GetKeyspaceGroupMembers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
kgm := svr.GetKeyspaceGroupManager()
keyspaceGroups := kgm.GetKeyspaceGroups()
members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups))
for id, group := range keyspaceGroups {
am, err := kgm.GetAllocatorManager(id)
if err != nil {
log.Error("failed to get allocator manager",
zap.Uint32("keyspace-group-id", id), zap.Error(err))
continue
}
member := am.GetMember()
members[id] = &KeyspaceGroupMember{
Group: group,
Member: member.GetMember().(*tsopb.Participant),
IsPrimary: member.IsLeader(),
PrimaryID: member.GetLeaderID(),
}
}
c.IndentedJSON(http.StatusOK, members)
}
5 changes: 5 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (s *Server) IsClosed() bool {
return atomic.LoadInt64(&s.isRunning) == 0
}

// GetKeyspaceGroupManager returns the manager of keyspace group.
func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager {
return s.keyspaceGroupManager
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
Expand Down
14 changes: 14 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
return am.GetMember(), nil
}

// GetKeyspaceGroups returns all keyspace groups managed by the current keyspace group manager.
func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.KeyspaceGroup {
kgm.RLock()
defer kgm.RUnlock()
keyspaceGroups := make(map[uint32]*endpoint.KeyspaceGroup)
for _, keyspaceGroupID := range kgm.keyspaceLookupTable {
if _, ok := keyspaceGroups[keyspaceGroupID]; ok {
continue
}
keyspaceGroups[keyspaceGroupID] = kgm.kgs[keyspaceGroupID]
}
return keyspaceGroups
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
func (kgm *KeyspaceGroupManager) HandleTSORequest(
keyspaceID, keyspaceGroupID uint32,
Expand Down
9 changes: 6 additions & 3 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"go.uber.org/zap"
)

// HTTP headers.
const (
// ServiceAllowDirectHandle is the header key to allow direct handle.
ServiceAllowDirectHandle = "service-allow-direct-handle"
ServiceRedirectorHeader = "service-redirector"
// ServiceRedirectorHeader is the header key to indicate the request is redirected.
ServiceRedirectorHeader = "service-redirector"
// ServiceContextKey is the key to get service server from gin.Context.
ServiceContextKey = "service"
)

// ServiceRedirector is a middleware to redirect the request to the right place.
func ServiceRedirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet("service").(bs.Server)
svr := c.MustGet(ServiceContextKey).(bs.Server)
allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0
isServing := svr.IsServing()
if allowDirectHandle || isServing {
Expand Down
106 changes: 106 additions & 0 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"encoding/json"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
tsoKeyspaceGroupsPrefix = "/tso/api/v1/keyspace-groups"
)

// dialClient used to dial http request.
var dialClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}

type tsoAPITestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
}

func TestTSOAPI(t *testing.T) {
suite.Run(t, new(tsoAPITestSuite))
}

func (suite *tsoAPITestSuite) SetupTest() {
re := suite.Require()

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.pdCluster, err = tests.NewTestAPICluster(suite.ctx, 1)
re.NoError(err)
err = suite.pdCluster.RunInitialServers()
re.NoError(err)
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
re.NoError(err)
}

func (suite *tsoAPITestSuite) TearDownTest() {
suite.cancel()
suite.tsoCluster.Destroy()
suite.pdCluster.Destroy()
}

func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() {
re := suite.Require()

primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
members := mustGetKeyspaceGroupMembers(re, primary)
re.Len(members, 1)
defaultGroupMember := members[mcsutils.DefaultKeyspaceGroupID]
re.NotNil(defaultGroupMember)
re.Equal(mcsutils.DefaultKeyspaceGroupID, defaultGroupMember.Group.ID)
re.True(defaultGroupMember.IsPrimary)
primaryMember, err := primary.GetMember(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID)
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil)
re.NoError(err)
httpResp, err := dialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
re.Equal(http.StatusOK, httpResp.StatusCode, string(data))
var resp map[uint32]*apis.KeyspaceGroupMember
re.NoError(json.Unmarshal(data, &resp))
return resp
}
1 change: 1 addition & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
// Ignore the errors caused by the split and context cancellation.
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") {
continue
}
Expand Down