From aae410f16f8a16d873a8aad87acee3559db5e321 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Sun, 28 Apr 2024 15:44:57 +0800 Subject: [PATCH 1/3] client/tso: improve the switching of TSO stream (#8123) close tikv/pd#7997, ref tikv/pd#8047 Previously, without enabling the TSO Follower Proxy, we only passively update its stream when a TSO request fails. This means that we cannot automatically and gradually complete the TSO stream update after a service switch. This PR strengthens this logic, which can improve the success rate of TSO requests during service switching. Signed-off-by: JmPotato --- client/tso_client.go | 68 +++++---- client/tso_dispatcher.go | 181 ++++++++++++----------- tests/integrations/client/client_test.go | 35 +++++ 3 files changed, 169 insertions(+), 115 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 347d1f6ec0a..e3bdb835901 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe return req } +func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok || dispatcher == nil { + return nil, false + } + return dispatcher.(*tsoDispatcher), true +} + // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map func (c *tsoClient) GetTSOAllocators() *sync.Map { return &c.tsoAllocators @@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error { log.Info("[tso] switch dc tso global allocator serving url", zap.String("dc-location", globalDCLocation), zap.String("new-url", url)) + c.scheduleUpdateTSOConnectionCtxs() c.scheduleCheckTSODispatcher() return nil } @@ -333,40 +342,41 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc strin // while a new daemon will be created also to switch back to a normal leader connection ASAP the // connection comes back to normal. func (c *tsoClient) tryConnectToTSO( - dispatcherCtx context.Context, + ctx context.Context, dc string, connectionCtxs *sync.Map, ) error { var ( - networkErrNum uint64 - err error - stream tsoStream - url string - cc *grpc.ClientConn - ) - updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { - // If the previous connection still exists, we should close it first. - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Store(newURL, connectionCtx) + networkErrNum uint64 + err error + stream tsoStream + url string + cc *grpc.ClientConn + updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { + // Only store the `connectionCtx` if it does not exist before. + connectionCtxs.LoadOrStore(newURL, connectionCtx) + // Remove all other `connectionCtx`s. 
+ connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(url) + } + return true + }) } - connectionCtxs.Range(func(url, cc any) bool { - if url.(string) != newURL { - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(url) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens + ) ticker := time.NewTicker(retryInterval) defer ticker.Stop() + // Retry several times before falling back to the follower when the network problem happens for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if _, ok := connectionCtxs.Load(url); ok { + return nil + } if cc != nil { - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) failpoint.Inject("unreachableNetwork", func() { stream = nil @@ -392,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO( networkErrNum++ } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return err case <-ticker.C: } @@ -409,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO( } // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) if err == nil { forwardedHostTrim := trimHTTPPrefix(forwardedHost) addr := trimHTTPPrefix(backupURL) // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) return nil @@ -429,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO( // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as // a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { +func (c *tsoClient) tryConnectToTSOWithProxy( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() leaderAddr := c.svcDiscovery.GetServingURL() forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) @@ -455,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s } log.Info("[tso] try to create tso stream", zap.String("dc", dc), zap.String("addr", addr)) - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) // Do not proxy the leader client. 
if addr != leaderAddr { log.Info("[tso] use follower to forward tso stream to do the proxy", diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 7528293a733..c82ec777eca 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -44,8 +44,17 @@ type tsoDispatcher struct { tsoBatchController *tsoBatchController } +func (td *tsoDispatcher) close() { + td.dispatcherCancel() + td.tsoBatchController.clear() +} + +func (td *tsoDispatcher) push(request *tsoRequest) { + td.tsoBatchController.tsoRequestCh <- request +} + func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { - dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation) + dispatcher, ok := c.getTSODispatcher(request.dcLocation) if !ok { err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation)) log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err)) @@ -70,7 +79,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { failpoint.Inject("delayDispatchTSORequest", func() { time.Sleep(time.Second) }) - dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + dispatcher.push(request) } // Check the contexts again to make sure the request is not been sent to a closed dispatcher. // Never retry on these conditions to prevent unexpected data race. @@ -89,9 +98,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { func (c *tsoClient) closeTSODispatcher() { c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - dispatcher.dispatcherCancel() - dispatcher.tsoBatchController.clear() + dispatcherInterface.(*tsoDispatcher).close() } return true }) @@ -101,7 +108,7 @@ func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. 
c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { dcLocation := dcLocationKey.(string) - if !c.checkTSODispatcher(dcLocation) { + if _, ok := c.getTSODispatcher(dcLocation); !ok { c.createTSODispatcher(dcLocation) } return true @@ -115,8 +122,8 @@ func (c *tsoClient) updateTSODispatcher() { } if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - dispatcher.(*tsoDispatcher).dispatcherCancel() c.tsoDispatcher.Delete(dcLocation) + dispatcher.(*tsoDispatcher).close() } return true }) @@ -215,7 +222,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() { } func (c *tsoClient) checkAllocator( - dispatcherCtx context.Context, + ctx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addr, url string, updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) { @@ -238,7 +245,7 @@ func (c *tsoClient) checkAllocator( healthCli = healthpb.NewHealthClient(cc) } if healthCli != nil { - healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout) resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) failpoint.Inject("unreachableNetwork", func() { resp.Status = healthpb.HealthCheckResponse_UNKNOWN @@ -246,7 +253,7 @@ func (c *tsoClient) checkAllocator( healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { // create a stream of the original allocator - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) if err == nil && stream != nil { log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) @@ -256,7 +263,7 @@ func (c *tsoClient) checkAllocator( } } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return case <-ticker.C: // To ensure we can get the latest allocator leader @@ -266,30 +273,19 @@ func (c *tsoClient) checkAllocator( } } -func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok || dispatcher == nil { - return false - } - return true -} - func (c *tsoClient) createTSODispatcher(dcLocation string) { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) - dispatcher := &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, defaultMaxTSOBatchSize*2), - defaultMaxTSOBatchSize), - } + tsoBatchController := newTSOBatchController( + make(chan *tsoRequest, defaultMaxTSOBatchSize*2), + defaultMaxTSOBatchSize, + ) failpoint.Inject("shortDispatcherChannel", func() { - dispatcher = &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, 1), - defaultMaxTSOBatchSize), - } + tsoBatchController = newTSOBatchController( + make(chan *tsoRequest, 1), + defaultMaxTSOBatchSize, + ) }) + dispatcher := &tsoDispatcher{dispatcherCancel, tsoBatchController} if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { // Successfully stored the value. Start the following goroutine. 
@@ -306,7 +302,7 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { } func (c *tsoClient) handleDispatcher( - dispatcherCtx context.Context, + ctx context.Context, dc string, tbc *tsoBatchController, ) { @@ -319,6 +315,7 @@ func (c *tsoClient) handleDispatcher( // url -> connectionContext connectionCtxs sync.Map ) + // Clean up the connectionCtxs when the dispatcher exits. defer func() { log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) // Cancel all connections. @@ -330,51 +327,8 @@ func (c *tsoClient) handleDispatcher( tbc.clear() c.wg.Done() }() - // Call updateTSOConnectionCtxs once to init the connectionCtxs first. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - // Only the Global TSO needs to watch the updateTSOConnectionCtxsCh to sense the - // change of the cluster when TSO Follower Proxy is enabled. - // TODO: support TSO Follower Proxy for the Local TSO. - if dc == globalDCLocation { - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker - } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) - - for { - select { - case <-dispatcherCtx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), - zap.Bool("enable", enableTSOFollowerProxy)) - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - case <-c.updateTSOConnectionCtxsCh: - } - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - } - }() - } + // Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event. + go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) @@ -383,7 +337,7 @@ func (c *tsoClient) handleDispatcher( tsoBatchLoop: for { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -391,7 +345,7 @@ tsoBatchLoop: maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, // otherwise the upper caller may get blocked on waiting for the results. - if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + if err = tbc.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil { // Finish the collected requests if the fetch failed. tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { @@ -427,14 +381,14 @@ tsoBatchLoop: // Check stream and retry if necessary. 
if stream == nil { log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) - if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { + if c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) { continue streamChoosingLoop } timer := time.NewTimer(retryInterval) select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) timer.Stop() return case <-streamLoopTimer.C: @@ -471,9 +425,9 @@ tsoBatchLoop: tsDeadlineCh, ok = c.tsDeadline.Load(dc) } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } @@ -483,7 +437,7 @@ tsoBatchLoop: // If error happens during tso stream handling, reset stream and run the next trial. if err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -498,9 +452,9 @@ tsoBatchLoop: stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil { + if err := bo.Exec(ctx, c.svcDiscovery.CheckMemberChanged); err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -510,8 +464,59 @@ tsoBatchLoop: // will cancel the current stream, then the EOF error caused by cancel() // should not trigger the updateTSOConnectionCtxs here. // So we should only call it when the leader changes. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) + } + } + } +} + +// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +// TODO: implement support for the Local TSO. +func (c *tsoClient) connectionCtxsUpdater( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) { + if dc != globalDCLocation { + return + } + log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs) + select { + case <-ctx.Done(): + log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) + return + case <-c.option.enableTSOFollowerProxyCh: + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. 
+ setNewUpdateTicker(&time.Ticker{}) + } else { + continue } + case <-updateTicker.C: + // Triggered periodically when the TSO Follower Proxy is enabled. + case <-c.updateTSOConnectionCtxsCh: + // Triggered by the leader/follower change. } } } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 10be418c029..dfe7a6980c7 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -248,6 +249,40 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) { wg.Wait() } +func TestGetTSAfterTransferLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 2) + re.NoError(err) + endpoints := runServer(re, cluster) + leader := cluster.WaitLeader() + re.NotEmpty(leader) + defer cluster.Destroy() + + cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(10*time.Second)) + defer cli.Close() + + var leaderSwitched atomic.Bool + cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func() { + leaderSwitched.Store(true) + }) + err = cluster.GetServer(leader).ResignLeader() + re.NoError(err) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(leader, newLeader) + leader = cluster.WaitLeader() + re.NotEmpty(leader) + err = cli.GetServiceDiscovery().CheckMemberChanged() + re.NoError(err) + + testutil.Eventually(re, leaderSwitched.Load) + // The leader stream must be updated after the leader switch is sensed by the client. + _, _, err = cli.GetTS(context.TODO()) + re.NoError(err) +} + func TestTSOAllocatorLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) From 6e3cac7f95279f08ff31fe15e948afd0237881b8 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Tue, 30 Apr 2024 10:45:27 +0800 Subject: [PATCH 2/3] server/api: add the api to show the regions in subtree by type (#8101) close tikv/pd#8100 server/api: add the api to show the regions in subtree by type Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 54 ++++++++++++++++++++ pkg/response/store.go | 2 + server/api/region.go | 12 ++++- server/cluster/cluster.go | 5 ++ tools/pd-ctl/pdctl/command/region_command.go | 3 ++ tools/pd-ctl/tests/region/region_test.go | 12 ++++- 6 files changed, 86 insertions(+), 2 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 713e82cc36d..8e3d4e5dec8 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1541,6 +1541,60 @@ func (r *RegionsInfo) GetStoreRegions(storeID uint64) []*RegionInfo { return regions } +// SubTreeRegionType is the type of sub tree region. +type SubTreeRegionType string + +const ( + // AllInSubTree is all sub trees. + AllInSubTree SubTreeRegionType = "all" + // LeaderInSubTree is the leader sub tree. + LeaderInSubTree SubTreeRegionType = "leader" + // FollowerInSubTree is the follower sub tree. + FollowerInSubTree SubTreeRegionType = "follower" + // LearnerInSubTree is the learner sub tree. + LearnerInSubTree SubTreeRegionType = "learner" + // WitnessInSubTree is the witness sub tree. + WitnessInSubTree SubTreeRegionType = "witness" + // PendingPeerInSubTree is the pending peer sub tree. 
+ PendingPeerInSubTree SubTreeRegionType = "pending" +) + +// GetStoreRegions gets all RegionInfo with a given storeID +func (r *RegionsInfo) GetStoreRegionsByTypeInSubTree(storeID uint64, typ SubTreeRegionType) ([]*RegionInfo, error) { + r.st.RLock() + var regions []*RegionInfo + switch typ { + case LeaderInSubTree: + if leaders, ok := r.leaders[storeID]; ok { + regions = leaders.scanRanges() + } + case FollowerInSubTree: + if followers, ok := r.followers[storeID]; ok { + regions = followers.scanRanges() + } + case LearnerInSubTree: + if learners, ok := r.learners[storeID]; ok { + regions = learners.scanRanges() + } + case WitnessInSubTree: + if witnesses, ok := r.witnesses[storeID]; ok { + regions = witnesses.scanRanges() + } + case PendingPeerInSubTree: + if pendingPeers, ok := r.pendingPeers[storeID]; ok { + regions = pendingPeers.scanRanges() + } + case AllInSubTree: + r.st.RUnlock() + return r.GetStoreRegions(storeID), nil + default: + return nil, errors.Errorf("unknown sub tree region type %v", typ) + } + + r.st.RUnlock() + return regions, nil +} + // GetStoreLeaderRegionSize get total size of store's leader regions func (r *RegionsInfo) GetStoreLeaderRegionSize(storeID uint64) int64 { r.st.RLock() diff --git a/pkg/response/store.go b/pkg/response/store.go index 1efe11bfb39..8bff1e75e42 100644 --- a/pkg/response/store.go +++ b/pkg/response/store.go @@ -64,6 +64,7 @@ type StoreStatus struct { RegionSize int64 `json:"region_size"` LearnerCount int `json:"learner_count,omitempty"` WitnessCount int `json:"witness_count,omitempty"` + PendingPeerCount int `json:"pending_peer_count,omitempty"` SlowScore uint64 `json:"slow_score,omitempty"` SlowTrend *SlowTrend `json:"slow_trend,omitempty"` SendingSnapCount uint32 `json:"sending_snap_count,omitempty"` @@ -117,6 +118,7 @@ func BuildStoreInfo(opt *sc.ScheduleConfig, store *core.StoreInfo) *StoreInfo { SlowTrend: slowTrend, SendingSnapCount: store.GetSendingSnapCount(), ReceivingSnapCount: store.GetReceivingSnapCount(), + PendingPeerCount: store.GetPendingPeerCount(), IsBusy: store.IsBusy(), }, } diff --git a/server/api/region.go b/server/api/region.go index dac92f247ca..974b5e4fa12 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -218,7 +218,17 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - regions := rc.GetStoreRegions(uint64(id)) + // get type from query + typ := r.URL.Query().Get("type") + if len(typ) == 0 { + typ = string(core.AllInSubTree) + } + + regions, err := rc.GetStoreRegionsByTypeInSubTree(uint64(id), core.SubTreeRegionType(typ)) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } b, err := response.MarshalRegionsInfoJSON(r.Context(), regions) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dbc6a6cadf3..75301664b50 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1198,6 +1198,11 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo { return c.core.GetStoreRegions(storeID) } +// GetStoreRegions returns all regions' information with a given storeID. +func (c *RaftCluster) GetStoreRegionsByType(storeID uint64) []*core.RegionInfo { + return c.core.GetStoreRegions(storeID) +} + // RandLeaderRegions returns some random regions that has leader on the store. 
func (c *RaftCluster) RandLeaderRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { return c.core.RandLeaderRegions(storeID, ranges) diff --git a/tools/pd-ctl/pdctl/command/region_command.go b/tools/pd-ctl/pdctl/command/region_command.go index d7e19967c7a..3536b01a606 100644 --- a/tools/pd-ctl/pdctl/command/region_command.go +++ b/tools/pd-ctl/pdctl/command/region_command.go @@ -486,6 +486,7 @@ func NewRegionWithStoreCommand() *cobra.Command { Short: "show the regions of a specific store", Run: showRegionWithStoreCommandFunc, } + r.Flags().String("type", "all", "the type of the regions, could be 'all', 'leader', 'learner' or 'pending'") return r } @@ -496,6 +497,8 @@ func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) { } storeID := args[0] prefix := regionsStorePrefix + "/" + storeID + flagType := cmd.Flag("type") + prefix += "?type=" + flagType.Value.String() r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{}) if err != nil { cmd.Printf("Failed to get regions with the given storeID: %s\n", err) diff --git a/tools/pd-ctl/tests/region/region_test.go b/tools/pd-ctl/tests/region/region_test.go index b328fd88286..2952e137f3b 100644 --- a/tools/pd-ctl/tests/region/region_test.go +++ b/tools/pd-ctl/tests/region/region_test.go @@ -108,6 +108,11 @@ func TestRegion(t *testing.T) { ) defer cluster.Destroy() + getRegionsByType := func(storeID uint64, regionType core.SubTreeRegionType) []*core.RegionInfo { + regions, _ := leaderServer.GetRaftCluster().GetStoreRegionsByTypeInSubTree(storeID, regionType) + return regions + } + var testRegionsCases = []struct { args []string expect []*core.RegionInfo @@ -118,7 +123,12 @@ func TestRegion(t *testing.T) { {[]string{"region", "sibling", "2"}, leaderServer.GetAdjacentRegions(leaderServer.GetRegionInfoByID(2))}, // region store command {[]string{"region", "store", "1"}, leaderServer.GetStoreRegions(1)}, - {[]string{"region", "store", "1"}, []*core.RegionInfo{r1, r2, r3, r4}}, + {[]string{"region", "store", "1", "--type=leader"}, getRegionsByType(1, core.LeaderInSubTree)}, + {[]string{"region", "store", "1", "--type=follower"}, getRegionsByType(1, core.FollowerInSubTree)}, + {[]string{"region", "store", "1", "--type=learner"}, getRegionsByType(1, core.LearnerInSubTree)}, + {[]string{"region", "store", "1", "--type=witness"}, getRegionsByType(1, core.WitnessInSubTree)}, + {[]string{"region", "store", "1", "--type=pending"}, getRegionsByType(1, core.PendingPeerInSubTree)}, + {[]string{"region", "store", "1", "--type=all"}, []*core.RegionInfo{r1, r2, r3, r4}}, // region check extra-peer command {[]string{"region", "check", "extra-peer"}, []*core.RegionInfo{r1}}, // region check miss-peer command From 8ee18a4b498669d0a23067e643e5fcfbfc521803 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 30 Apr 2024 11:46:57 +0800 Subject: [PATCH 3/3] pkg: fix `RegionStatsNeedUpdate` condition (#8133) ref tikv/pd#7897 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/statistics/region_collection.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index cb0de6f601b..565597b4efb 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -158,14 +158,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) { return 
true } - // expected to be zero for below type - if r.IsRegionStatsType(regionID, PendingPeer) && len(region.GetPendingPeers()) == 0 { + + if r.IsRegionStatsType(regionID, PendingPeer) != (len(region.GetPendingPeers()) != 0) { return true } - if r.IsRegionStatsType(regionID, DownPeer) && len(region.GetDownPeers()) == 0 { + if r.IsRegionStatsType(regionID, DownPeer) != (len(region.GetDownPeers()) != 0) { return true } - if r.IsRegionStatsType(regionID, LearnerPeer) && len(region.GetLearners()) == 0 { + if r.IsRegionStatsType(regionID, LearnerPeer) != (len(region.GetLearners()) != 0) { return true }
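
Note on the hunk above: the `!=` comparisons act as a boolean XOR between the label currently tracked for the region and the state recomputed from the latest heartbeat, so an update is scheduled both when a label needs to be added and when it needs to be cleared, whereas the previous `&& len(...) == 0` form only caught the clearing direction. A minimal standalone sketch of that predicate follows; the `needUpdate`, `tracked`, and `actual` names are illustrative only and are not part of the PD codebase.

package main

import "fmt"

// needUpdate mirrors the XOR-style check in RegionStatsNeedUpdate: it reports
// whether the label recorded in the statistics (tracked) has drifted from the
// state recomputed from the region's latest heartbeat (actual), in either direction.
func needUpdate(tracked, actual bool) bool {
	return tracked != actual
}

func main() {
	fmt.Println(needUpdate(true, false)) // true: a stale label (e.g. PendingPeer) should be cleared
	fmt.Println(needUpdate(false, true)) // true: a new label should be added; missed by the old `&& len(...) == 0` form
	fmt.Println(needUpdate(true, true))  // false: tracked state already matches, no update needed
}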