From e45f588c9e87b0e90a8ba73283ac6bd1228b5327 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 24 Apr 2023 18:03:27 +0800 Subject: [PATCH] Add API interface to obtain the TSO keyspace group member info Signed-off-by: JmPotato --- pkg/mcs/tso/server/apis/v1/api.go | 70 +++++++++--- pkg/mcs/tso/server/server.go | 5 + pkg/tso/keyspace_group_manager.go | 14 +++ .../apiutil/multiservicesapi/middleware.go | 9 +- tests/integrations/mcs/tso/api_test.go | 106 ++++++++++++++++++ 5 files changed, 186 insertions(+), 18 deletions(-) create mode 100644 tests/integrations/mcs/tso/api_test.go diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index af269cc95da7..87ce20b8e37f 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -22,16 +22,22 @@ import ( "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/joho/godotenv" + "github.com/pingcap/kvproto/pkg/tsopb" + "github.com/pingcap/log" tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/unrolled/render" + "go.uber.org/zap" ) -// APIPathPrefix is the prefix of the API path. -const APIPathPrefix = "/tso/api/v1" +const ( + // APIPathPrefix is the prefix of the API path. + APIPathPrefix = "/tso/api/v1" +) var ( once sync.Once @@ -46,14 +52,14 @@ var ( func init() { tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) { s := NewService(srv) - return s.handler(), apiServiceGroup + return s.apiHandlerEngine, apiServiceGroup } } // Service is the tso service. type Service struct { apiHandlerEngine *gin.Engine - baseEndpoint *gin.RouterGroup + root *gin.RouterGroup srv *tsoserver.Service rd *render.Render @@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service { apiHandlerEngine.Use(cors.Default()) apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { - c.Set("service", srv) + c.Set(multiservicesapi.ServiceContextKey, srv) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) apiHandlerEngine.GET("metrics", utils.PromHandler()) - endpoint := apiHandlerEngine.Group(APIPathPrefix) + root := apiHandlerEngine.Group(APIPathPrefix) s := &Service{ srv: srv, apiHandlerEngine: apiHandlerEngine, - baseEndpoint: endpoint, + root: root, rd: createIndentRender(), } - s.RegisterRouter() + s.RegisterAdminRouter() + s.RegisterKeyspaceGroupRouter() return s } -// RegisterRouter registers the router of the service. -func (s *Service) RegisterRouter() { +// RegisterAdminRouter registers the router of the TSO admin handler. +func (s *Service) RegisterAdminRouter() { + router := s.root.Group("admin") tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd) - s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS)) + router.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS)) } -func (s *Service) handler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.apiHandlerEngine.ServeHTTP(w, r) - }) +// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler. +func (s *Service) RegisterKeyspaceGroupRouter() { + router := s.root.Group("keyspace-groups") + router.GET("/members", GetKeyspaceGroupMembers) +} + +// KeyspaceGroupMember contains the keyspace group and its member information. +type KeyspaceGroupMember struct { + Group *endpoint.KeyspaceGroup + Member *tsopb.Participant + IsPrimary bool `json:"is_primary"` + PrimaryID uint64 `json:"primary_id"` +} + +// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving. +func GetKeyspaceGroupMembers(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + kgm := svr.GetKeyspaceGroupManager() + keyspaceGroups := kgm.GetKeyspaceGroups() + members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups)) + for id, group := range keyspaceGroups { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + log.Error("failed to get allocator manager", + zap.Uint32("keyspace-group-id", id), zap.Error(err)) + continue + } + member := am.GetMember() + members[id] = &KeyspaceGroupMember{ + Group: group, + Member: member.GetMember().(*tsopb.Participant), + IsPrimary: member.IsLeader(), + PrimaryID: member.GetLeaderID(), + } + } + c.IndentedJSON(http.StatusOK, members) } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 36207ebf4e0e..801602815596 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -274,6 +274,11 @@ func (s *Server) IsClosed() bool { return atomic.LoadInt64(&s.isRunning) == 0 } +// GetKeyspaceGroupManager returns the manager of keyspace group. +func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager { + return s.keyspaceGroupManager +} + // GetTSOAllocatorManager returns the manager of TSO Allocator. func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) { return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 9370e8664d1e..14347e6214ab 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -742,6 +742,20 @@ func (kgm *KeyspaceGroupManager) GetElectionMember( return am.GetMember(), nil } +// GetKeyspaceGroups returns all keyspace groups managed by the current keyspace group manager. +func (kgm *KeyspaceGroupManager) GetKeyspaceGroups() map[uint32]*endpoint.KeyspaceGroup { + kgm.RLock() + defer kgm.RUnlock() + keyspaceGroups := make(map[uint32]*endpoint.KeyspaceGroup) + for _, keyspaceGroupID := range kgm.keyspaceLookupTable { + if _, ok := keyspaceGroups[keyspaceGroupID]; ok { + continue + } + keyspaceGroups[keyspaceGroupID] = kgm.kgs[keyspaceGroupID] + } + return keyspaceGroups +} + // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group. func (kgm *KeyspaceGroupManager) HandleTSORequest( keyspaceID, keyspaceGroupID uint32, diff --git a/pkg/utils/apiutil/multiservicesapi/middleware.go b/pkg/utils/apiutil/multiservicesapi/middleware.go index 4f889b52573a..ed34ecc6afb3 100644 --- a/pkg/utils/apiutil/multiservicesapi/middleware.go +++ b/pkg/utils/apiutil/multiservicesapi/middleware.go @@ -26,16 +26,19 @@ import ( "go.uber.org/zap" ) -// HTTP headers. const ( + // ServiceAllowDirectHandle is the header key to allow direct handle. ServiceAllowDirectHandle = "service-allow-direct-handle" - ServiceRedirectorHeader = "service-redirector" + // ServiceRedirectorHeader is the header key to indicate the request is redirected. + ServiceRedirectorHeader = "service-redirector" + // ServiceContextKey is the key to get service server from gin.Context. + ServiceContextKey = "service" ) // ServiceRedirector is a middleware to redirect the request to the right place. func ServiceRedirector() gin.HandlerFunc { return func(c *gin.Context) { - svr := c.MustGet("service").(bs.Server) + svr := c.MustGet(ServiceContextKey).(bs.Server) allowDirectHandle := len(c.Request.Header.Get(ServiceAllowDirectHandle)) > 0 isServing := svr.IsServing() if allowDirectHandle || isServing { diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go new file mode 100644 index 000000000000..e5dbcd7998c5 --- /dev/null +++ b/tests/integrations/mcs/tso/api_test.go @@ -0,0 +1,106 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "context" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" +) + +const ( + tsoKeyspaceGroupsPrefix = "/tso/api/v1/keyspace-groups" +) + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +type tsoAPITestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + pdCluster *tests.TestCluster + tsoCluster *mcs.TestTSOCluster +} + +func TestTSOAPI(t *testing.T) { + suite.Run(t, new(tsoAPITestSuite)) +} + +func (suite *tsoAPITestSuite) SetupTest() { + re := suite.Require() + + var err error + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.pdCluster, err = tests.NewTestAPICluster(suite.ctx, 1) + re.NoError(err) + err = suite.pdCluster.RunInitialServers() + re.NoError(err) + leaderName := suite.pdCluster.WaitLeader() + pdLeaderServer := suite.pdCluster.GetServer(leaderName) + re.NoError(pdLeaderServer.BootstrapCluster()) + suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr()) + re.NoError(err) +} + +func (suite *tsoAPITestSuite) TearDownTest() { + suite.cancel() + suite.tsoCluster.Destroy() + suite.pdCluster.Destroy() +} + +func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() { + re := suite.Require() + + primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(primary) + members := mustGetKeyspaceGroupMembers(re, primary) + re.Len(members, 1) + defaultGroupMember := members[mcsutils.DefaultKeyspaceGroupID] + re.NotNil(defaultGroupMember) + re.Equal(mcsutils.DefaultKeyspaceGroupID, defaultGroupMember.Group.ID) + re.True(defaultGroupMember.IsPrimary) + primaryMember, err := primary.GetMember(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(primaryMember.GetLeaderID(), defaultGroupMember.PrimaryID) +} + +func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember { + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+tsoKeyspaceGroupsPrefix+"/members", nil) + re.NoError(err) + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + re.Equal(http.StatusOK, httpResp.StatusCode, string(data)) + var resp map[uint32]*apis.KeyspaceGroupMember + re.NoError(json.Unmarshal(data, &resp)) + return resp +}