Commit 6aa0601

Merge branch 'master' into wait

ti-chi-bot[bot] authored May 9, 2023
2 parents c8c9068 + 0965d12 commit 6aa0601
Showing 22 changed files with 1,008 additions and 629 deletions.
go.mod: 2 changes (1 addition, 1 deletion)
@@ -32,7 +32,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
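Note: Go pseudo-versions embed a UTC timestamp and a commit hash prefix, so this bump moves github.com/pingcap/tidb-dashboard from commit a58fc2a7e924 (2023-02-09) to commit d6e0218addd5 (2023-05-08); the paired h1:/go.mod checksum lines in go.sum below change accordingly.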
go.sum: 4 changes (2 additions, 2 deletions)
@@ -429,8 +429,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924 h1:49x3JR5zEYqjVqONKV9r/nrv0Rh5QU8ivIhktoLvP4g=
github.com/pingcap/tidb-dashboard v0.0.0-20230209052558-a58fc2a7e924/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5 h1:adV4kUWI7v+v/6joR7lfjFngHhS4eiqwr4g3dHCjHtA=
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
pkg/keyspace/tso_keyspace_group.go: 167 changes (42 additions, 125 deletions)
@@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

@@ -40,10 +41,6 @@ const (
allocNodesToKeyspaceGroupsInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
retryInterval = 100 * time.Millisecond
)

const (
@@ -65,18 +62,14 @@ type GroupManager struct {
// store is the storage for keyspace group related information.
store endpoint.KeyspaceGroupStorage

client *clientv3.Client

// tsoServiceKey is the path of TSO service in etcd.
tsoServiceKey string
// tsoServiceEndKey is the end key of TSO service in etcd.
tsoServiceEndKey string

// TODO: add user kind with different balancer
// when we ensure where the correspondence between tso node and user kind will be found
// nodeBalancer is the balancer for tso nodes.
// TODO: add user kind with different balancer when we ensure where the correspondence between tso node and user kind will be found
nodesBalancer balancer.Balancer[string]
// serviceRegistryMap stores the mapping from the service registry key to the service address.
// Note: it is only used in tsoNodesWatcher.
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
@@ -87,7 +80,6 @@ func NewKeyspaceGroupManager(
clusterID uint64,
) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
key := discovery.TSOPath(clusterID)
groups := make(map[endpoint.UserKind]*indexedHeap)
for i := 0; i < int(endpoint.UserKindCount); i++ {
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
@@ -96,20 +88,18 @@
ctx: ctx,
cancel: cancel,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
groups: groups,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if m.client != nil {
log.Info("start the watch loop for tso service discovery")
m.wg.Add(1)
go m.startWatchLoop(ctx)
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
@@ -130,12 +120,6 @@ func (m *GroupManager) Bootstrap() error {
m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
@@ -200,109 +184,42 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
}
}

func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
var (
resp *clientv3.GetResponse
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision + 1
for _, item := range resp.Kvs {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(item.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry", zap.Error(err))
continue
}
m.nodesBalancer.Put(s.ServiceAddr)
m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr
}
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
m.nodesBalancer.Put(s.ServiceAddr)
m.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
return nil
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
}
for {
select {
case <-ctx.Done():
return
default:
}
nextRevision, err := m.watchServiceAddrs(ctx, revision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
revision = nextRevision
time.Sleep(watchEtcdChangeRetryInterval)
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
delete(m.serviceRegistryMap, key)
m.nodesBalancer.Delete(serviceAddr)
return nil
}
return errors.Errorf("failed to find the service address for key %s", key)
}
}

func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (int64, error) {
watcher := clientv3.NewWatcher(m.client)
defer watcher.Close()
for {
WatchChan:
watchChan := watcher.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision))
select {
case <-ctx.Done():
return revision, nil
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, the watcher will watch again with the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
goto WatchChan
}
if wresp.Err() != nil {
log.Error("watch is canceled or closed",
zap.Int64("required-revision", revision),
zap.Error(wresp.Err()))
return revision, wresp.Err()
}
for _, event := range wresp.Events {
switch event.Type {
case clientv3.EventTypePut:
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(event.Kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err))
break
}
m.nodesBalancer.Put(s.ServiceAddr)
m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr
case clientv3.EventTypeDelete:
key := string(event.Kv.Key)
if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
delete(m.serviceRegistryMap, key)
m.nodesBalancer.Delete(serviceAddr)
} else {
log.Warn("can't retrieve service addr from service registry map",
zap.String("event-kv-key", key))
}
}
}
revision = wresp.Header.Revision + 1
}
}
m.tsoNodesWatcher = etcdutil.NewLoopWatcher(
m.ctx,
&m.wg,
client,
"tso-nodes-watcher",
tsoServiceKey,
putFn,
deleteFn,
func() error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)
}

// CreateKeyspaceGroups creates keyspace groups.
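For orientation, the change above swaps the hand-rolled retry/watch loop for etcdutil.LoopWatcher, which owns the load-then-watch lifecycle and dispatches events to the putFn/deleteFn callbacks. The following is a minimal, self-contained sketch of that pattern using the same etcd client API, not PD's actual LoopWatcher; the endpoint address and registry prefix are made up for illustration, and retry/compaction handling is omitted:

```go
package main

import (
	"context"
	"log"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

// watchPrefix loads every key under prefix once, then watches for changes and
// dispatches events to the put/delete callbacks -- the load-then-watch pattern
// that LoopWatcher encapsulates.
func watchPrefix(ctx context.Context, cli *clientv3.Client, prefix string,
	putFn, deleteFn func(kv *mvccpb.KeyValue) error) error {
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		if err := putFn(kv); err != nil {
			log.Printf("put callback failed: %v", err)
		}
	}
	// Resume watching from the revision just after the initial snapshot so no
	// intervening event is missed.
	rev := resp.Header.Revision + 1
	for wresp := range cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) {
		if err := wresp.Err(); err != nil {
			return err
		}
		for _, ev := range wresp.Events {
			switch ev.Type {
			case clientv3.EventTypePut:
				if err := putFn(ev.Kv); err != nil {
					log.Printf("put callback failed: %v", err)
				}
			case clientv3.EventTypeDelete:
				if err := deleteFn(ev.Kv); err != nil {
					log.Printf("delete callback failed: %v", err)
				}
			}
		}
	}
	return nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	registry := make(map[string]string) // registry key -> service address
	putFn := func(kv *mvccpb.KeyValue) error {
		registry[string(kv.Key)] = string(kv.Value)
		return nil
	}
	deleteFn := func(kv *mvccpb.KeyValue) error {
		delete(registry, string(kv.Key))
		return nil
	}
	// "/example/tso/registry" is a hypothetical prefix; PD derives the real
	// one via discovery.TSOPath(clusterID).
	if err := watchPrefix(context.Background(), cli, "/example/tso/registry", putFn, deleteFn); err != nil {
		log.Fatal(err)
	}
}
```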
pkg/mcs/tso/server/apis/v1/api.go: 70 changes (55 additions, 15 deletions)
@@ -22,16 +22,22 @@ import (
"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
"go.uber.org/zap"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/tso/api/v1"
const (
// APIPathPrefix is the prefix of the API path.
APIPathPrefix = "/tso/api/v1"
)

var (
once sync.Once
@@ -46,14 +52,14 @@
func init() {
tsoserver.SetUpRestHandler = func(srv *tsoserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.handler(), apiServiceGroup
return s.apiHandlerEngine, apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
baseEndpoint *gin.RouterGroup
root *gin.RouterGroup

srv *tsoserver.Service
rd *render.Render
@@ -77,30 +83,64 @@ func NewService(srv *tsoserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set("service", srv)
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
endpoint := apiHandlerEngine.Group(APIPathPrefix)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
baseEndpoint: endpoint,
root: root,
rd: createIndentRender(),
}
s.RegisterRouter()
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
return s
}

// RegisterRouter registers the router of the service.
func (s *Service) RegisterRouter() {
// RegisterAdminRouter registers the router of the TSO admin handler.
func (s *Service) RegisterAdminRouter() {
router := s.root.Group("admin")
tsoAdminHandler := tso.NewAdminHandler(s.srv.GetHandler(), s.rd)
s.baseEndpoint.POST("/admin/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
router.POST("/reset-ts", gin.WrapF(tsoAdminHandler.ResetTS))
}

func (s *Service) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.apiHandlerEngine.ServeHTTP(w, r)
})
// RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler.
func (s *Service) RegisterKeyspaceGroupRouter() {
router := s.root.Group("keyspace-groups")
router.GET("/members", GetKeyspaceGroupMembers)
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Group *endpoint.KeyspaceGroup
Member *tsopb.Participant
IsPrimary bool `json:"is_primary"`
PrimaryID uint64 `json:"primary_id"`
}

// GetKeyspaceGroupMembers gets the keyspace group members that the TSO service is serving.
func GetKeyspaceGroupMembers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
kgm := svr.GetKeyspaceGroupManager()
keyspaceGroups := kgm.GetKeyspaceGroups()
members := make(map[uint32]*KeyspaceGroupMember, len(keyspaceGroups))
for id, group := range keyspaceGroups {
am, err := kgm.GetAllocatorManager(id)
if err != nil {
log.Error("failed to get allocator manager",
zap.Uint32("keyspace-group-id", id), zap.Error(err))
continue
}
member := am.GetMember()
members[id] = &KeyspaceGroupMember{
Group: group,
Member: member.GetMember().(*tsopb.Participant),
IsPrimary: member.IsLeader(),
PrimaryID: member.GetLeaderID(),
}
}
c.IndentedJSON(http.StatusOK, members)
}
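For a quick smoke test of the new route, any HTTP client works. A minimal sketch, assuming a TSO microservice listening at 127.0.0.1:3379 (the address is illustrative, and the response struct below is trimmed to the two json-tagged fields):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Member is a trimmed-down view of the KeyspaceGroupMember response above;
// the Group and Member fields are elided for brevity.
type Member struct {
	IsPrimary bool   `json:"is_primary"`
	PrimaryID uint64 `json:"primary_id"`
}

func main() {
	resp, err := http.Get("http://127.0.0.1:3379/tso/api/v1/keyspace-groups/members")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The handler returns a map keyed by keyspace group ID.
	members := make(map[uint32]Member)
	if err := json.NewDecoder(resp.Body).Decode(&members); err != nil {
		log.Fatal(err)
	}
	for id, m := range members {
		fmt.Printf("group %d: primary=%v primary_id=%d\n", id, m.IsPrimary, m.PrimaryID)
	}
}
```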
pkg/mcs/tso/server/server.go: 5 changes (5 additions, 0 deletions)
@@ -273,6 +273,11 @@ func (s *Server) IsClosed() bool {
return atomic.LoadInt64(&s.isRunning) == 0
}

// GetKeyspaceGroupManager returns the manager of keyspace group.
func (s *Server) GetKeyspaceGroupManager() *tso.KeyspaceGroupManager {
return s.keyspaceGroupManager
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
(Diff truncated: the remaining 17 of the 22 changed files are not shown.)