From 74cfeef89dc23bbcde5d1da44d341ad04bf07f8d Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 12:25:41 +0800 Subject: [PATCH 1/8] allow cancel load region Signed-off-by: disksing --- server/cluster/cluster.go | 2 +- server/core/region_storage.go | 11 +++++++++++ server/core/storage.go | 13 +++++++------ server/core/storage_test.go | 9 +++++---- server/region_syncer/client.go | 12 ++++++------ server/region_syncer/server.go | 5 +++-- tests/server/cluster/cluster_test.go | 2 +- 7 files changed, 34 insertions(+), 20 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c99bc43671f..b0ef3b3d63b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -290,7 +290,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - if err := c.storage.LoadRegionsOnce(c.core.CheckAndPutRegion); err != nil { + if err := c.storage.LoadRegionsOnce(c.ctx, c.core.CheckAndPutRegion); err != nil { return nil, err } log.Info("load regions", diff --git a/server/core/region_storage.go b/server/core/region_storage.go index d5ee5d546bb..8566e92d295 100644 --- a/server/core/region_storage.go +++ b/server/core/region_storage.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/encryption" @@ -131,6 +132,7 @@ func deleteRegion(kv kv.Base, region *metapb.Region) error { } func loadRegions( + ctx context.Context, kv kv.Base, encryptionKeyManager *encryptionkm.KeyManager, f func(region *RegionInfo) []*RegionInfo, @@ -143,6 +145,10 @@ func loadRegions( // a variable rangeLimit to work around. rangeLimit := maxKVRangeLimit for { + failpoint.Inject("slowLoadRegion", func() { + rangeLimit = 1 + time.Sleep(time.Second) + }) startKey := regionPath(nextID) _, res, err := kv.LoadRange(startKey, endKey, rangeLimit) if err != nil { @@ -151,6 +157,11 @@ func loadRegions( } return err } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } for _, s := range res { region := &metapb.Region{} diff --git a/server/core/storage.go b/server/core/storage.go index 1fc5f6fc212..d65c6610a69 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -14,6 +14,7 @@ package core import ( + "context" "encoding/json" "fmt" "math" @@ -193,22 +194,22 @@ func (s *Storage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, e } // LoadRegions loads all regions from storage to RegionsInfo. -func (s *Storage) LoadRegions(f func(region *RegionInfo) []*RegionInfo) error { +func (s *Storage) LoadRegions(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error { if atomic.LoadInt32(&s.useRegionStorage) > 0 { - return loadRegions(s.regionStorage, s.encryptionKeyManager, f) + return loadRegions(ctx, s.regionStorage, s.encryptionKeyManager, f) } - return loadRegions(s.Base, s.encryptionKeyManager, f) + return loadRegions(ctx, s.Base, s.encryptionKeyManager, f) } // LoadRegionsOnce loads all regions from storage to RegionsInfo.Only load one time from regionStorage. 
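// The ctx added to these signatures is the same one checked between
// LoadRange batches in loadRegions (region_storage.go above), so a caller
// can abort a long scan as soon as its context is canceled.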
-func (s *Storage) LoadRegionsOnce(f func(region *RegionInfo) []*RegionInfo) error { +func (s *Storage) LoadRegionsOnce(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error { if atomic.LoadInt32(&s.useRegionStorage) == 0 { - return loadRegions(s.Base, s.encryptionKeyManager, f) + return loadRegions(ctx, s.Base, s.encryptionKeyManager, f) } s.mu.Lock() defer s.mu.Unlock() if s.regionLoaded == 0 { - if err := loadRegions(s.regionStorage, s.encryptionKeyManager, f); err != nil { + if err := loadRegions(ctx, s.regionStorage, s.encryptionKeyManager, f); err != nil { return err } s.regionLoaded = 1 diff --git a/server/core/storage_test.go b/server/core/storage_test.go index d513da0d037..bc6855c423c 100644 --- a/server/core/storage_test.go +++ b/server/core/storage_test.go @@ -14,6 +14,7 @@ package core import ( + "context" "encoding/json" "fmt" "math" @@ -143,7 +144,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) { n := 10 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegions(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -157,7 +158,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) { n := 10 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -166,7 +167,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) { n = 20 mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) } @@ -176,7 +177,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { n := 1000 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegions(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { c.Assert(region, DeepEquals, regions[region.GetId()]) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 4eed5f18972..72a865ae8a4 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -41,8 +41,8 @@ const ( func (s *RegionSyncer) StopSyncWithLeader() { s.reset() s.mu.Lock() - close(s.mu.closed) - s.mu.closed = make(chan struct{}) + s.mu.clientCancel() + s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(context.Background()) s.mu.Unlock() s.wg.Wait() } @@ -129,14 +129,14 @@ var regionGuide = core.GenerateRegionGuideFunc(false) func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) s.mu.RLock() - closed := s.mu.closed + ctx := s.mu.clientCtx s.mu.RUnlock() go func() { defer s.wg.Done() // used to load region from kv storage to cache storage. 
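// Loading all regions can take a long time on a large cluster; the ctx
// captured above lets StopSyncWithLeader cancel this initial load mid-scan
// instead of waiting for the full scan to finish.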
bc := s.server.GetBasicCluster() storage := s.server.GetStorage() - err := storage.LoadRegionsOnce(bc.CheckAndPutRegion) + err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) } @@ -144,7 +144,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { var conn *grpc.ClientConn for { select { - case <-closed: + case <-ctx.Done(): return default: } @@ -160,7 +160,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { // Start syncing data. for { select { - case <-closed: + case <-ctx.Done(): return default: } diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index d0ab8f7f221..44436f56b4c 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -72,7 +72,8 @@ type RegionSyncer struct { streams map[string]ServerStream regionSyncerCtx context.Context regionSyncerCancel context.CancelFunc - closed chan struct{} + clientCtx context.Context + clientCancel context.CancelFunc } server Server wg sync.WaitGroup @@ -94,7 +95,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.closed = make(chan struct{}) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 2d62cec22ef..48dd36444f8 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -726,7 +726,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { for _, region := range regions { c.Assert(storage.SaveRegion(region), IsNil) } - raftCluster.GetStorage().LoadRegionsOnce(raftCluster.GetCacheCluster().PutRegion) + raftCluster.GetStorage().LoadRegionsOnce(s.ctx, raftCluster.GetCacheCluster().PutRegion) c.Assert(raftCluster.GetRegionCount(), Equals, n) } From 2294f09d75b7b9b94b34681c41a33d2d03e9072e Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 15:26:23 +0800 Subject: [PATCH 2/8] add test Signed-off-by: disksing --- server/region_syncer/client_test.go | 104 ++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 server/region_syncer/client_test.go diff --git a/server/region_syncer/client_test.go b/server/region_syncer/client_test.go new file mode 100644 index 00000000000..c8c698bc3aa --- /dev/null +++ b/server/region_syncer/client_test.go @@ -0,0 +1,104 @@ +// Copyright 2018 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "context" + "os" + "time" + + . 
"github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/grpcutil" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/kv" +) + +var _ = Suite(&testClientSuite{}) + +type testClientSuite struct{} + +// For issue https://github.com/tikv/pd/issues/3936 +func (t *testClientSuite) TestLoadRegion(c *C) { + tempDir, err := os.MkdirTemp(os.TempDir(), "region_syncer_load_region") + c.Assert(err, IsNil) + defer os.RemoveAll(tempDir) + rs, err := core.NewRegionStorage(context.Background(), tempDir, nil) + c.Assert(err, IsNil) + + server := &mockServer{ + ctx: context.Background(), + storage: core.NewStorage(kv.NewMemoryKV(), core.WithRegionStorage(rs)), + bc: core.NewBasicCluster(), + } + for i := 0; i < 30; i++ { + rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1}) + } + c.Assert(failpoint.Enable("github.com/tikv/pd/server/core/slowLoadRegion", "return(true)"), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/tikv/pd/server/core/slowLoadRegion"), IsNil) }() + + rc := NewRegionSyncer(server) + start := time.Now() + rc.StartSyncWithLeader("") + time.Sleep(time.Second) + rc.StopSyncWithLeader() + c.Assert(time.Since(start), Greater, time.Second) // make sure failpoint is injected + c.Assert(time.Since(start), Less, time.Second*2) +} + +type mockServer struct { + ctx context.Context + member, leader *pdpb.Member + storage *core.Storage + bc *core.BasicCluster +} + +func (s *mockServer) LoopContext() context.Context { + return s.ctx +} + +func (s *mockServer) ClusterID() uint64 { + return 1 +} + +func (s *mockServer) GetMemberInfo() *pdpb.Member { + return s.member +} + +func (s *mockServer) GetLeader() *pdpb.Member { + return s.leader +} + +func (s *mockServer) GetStorage() *core.Storage { + return s.storage +} + +func (s *mockServer) Name() string { + return "mock-server" +} + +func (s *mockServer) GetRegions() []*core.RegionInfo { + return s.bc.GetRegions() +} + +func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig { + return &grpcutil.TLSConfig{} +} + +func (s *mockServer) GetBasicCluster() *core.BasicCluster { + return s.bc +} From 49af7bc6f99f13e0ba54738b969a2541598b58eb Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 15:27:08 +0800 Subject: [PATCH 3/8] minor update Signed-off-by: disksing --- server/region_syncer/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/client_test.go b/server/region_syncer/client_test.go index c8c698bc3aa..e0268bf52a6 100644 --- a/server/region_syncer/client_test.go +++ b/server/region_syncer/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 TiKV Project Authors. +// Copyright 2021 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
From 540b1f67f4e5fe33a9fccbb144d54d78f22e999a Mon Sep 17 00:00:00 2001 From: disksing Date: Sun, 10 Oct 2021 10:54:32 +0800 Subject: [PATCH 4/8] address comment Signed-off-by: disksing --- server/region_syncer/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 44436f56b4c..4a2e62d9ace 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -95,7 +95,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(s.LoopContext()) return syncer } From 7283e42ed321a84705d5a6fa9f5a1a666a4c08cc Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 11 Oct 2021 13:32:13 +0800 Subject: [PATCH 5/8] Revert "address comment" This reverts commit 5b69c4a2928aebcebbfffb5f422d91c266ab7add. Signed-off-by: disksing --- server/region_syncer/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 4a2e62d9ace..44436f56b4c 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -95,7 +95,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(s.LoopContext()) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } From 2474307028955e00a2063025ac04baf73c7e512a Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 12 Oct 2021 15:23:14 +0800 Subject: [PATCH 6/8] add log Signed-off-by: disksing --- server/region_syncer/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 72a865ae8a4..99c49ea4e79 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -136,7 +136,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() storage := s.server.GetStorage() + log.Info("region syncer start load region") + start := time.Now() err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) + log.Info("region syncer finished load region", zap.Duration("time_cost", time.Since(start))) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) } From b11368dff9b5420e40b3f2f3306349f734b8a43d Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Tue, 12 Oct 2021 16:48:19 +0800 Subject: [PATCH 7/8] merge clientCtx and regionSyncerCtx Signed-off-by: HunDunDM --- server/region_syncer/client.go | 45 +++++++++++----------------------- server/region_syncer/server.go | 9 +++---- 2 files changed, 17 insertions(+), 37 deletions(-) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 99c49ea4e79..7c21c767e54 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -40,10 +40,6 @@ const ( // StopSyncWithLeader stop to sync the region with leader. 
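// With this patch the cancelable context is created by StartSyncWithLeader
// and torn down via reset() below; canceling it aborts both the sync stream
// and any in-flight LoadRegionsOnce before wg.Wait blocks.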
func (s *RegionSyncer) StopSyncWithLeader() { s.reset() - s.mu.Lock() - s.mu.clientCancel() - s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(context.Background()) - s.mu.Unlock() s.wg.Wait() } @@ -51,19 +47,15 @@ func (s *RegionSyncer) reset() { s.mu.Lock() defer s.mu.Unlock() - if s.mu.regionSyncerCancel == nil { - return + if s.mu.clientCancel != nil { + s.mu.clientCancel() } - s.mu.regionSyncerCancel() - s.mu.regionSyncerCancel, s.mu.regionSyncerCtx = nil, nil + s.mu.clientCancel, s.mu.clientCtx = nil, nil } -func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) { - s.reset() - ctx, cancel := context.WithCancel(s.server.LoopContext()) +func (s *RegionSyncer) establish(ctx context.Context, addr string) (*grpc.ClientConn, error) { tlsCfg, err := s.tlsConfig.ToTLSConfig() if err != nil { - cancel() return nil, err } cc, err := grpcutil.GetClientConn( @@ -88,28 +80,16 @@ func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) { grpc.WithBlock(), ) if err != nil { - cancel() return nil, errors.WithStack(err) } - - s.mu.Lock() - s.mu.regionSyncerCtx, s.mu.regionSyncerCancel = ctx, cancel - s.mu.Unlock() return cc, nil } -func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) { +func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) { cli := pdpb.NewPDClient(conn) - var ctx context.Context - s.mu.RLock() - ctx = s.mu.regionSyncerCtx - s.mu.RUnlock() - if ctx == nil { - return nil, errors.New("syncRegion failed due to regionSyncerCtx is nil") - } syncStream, err := cli.SyncRegions(ctx) if err != nil { - return syncStream, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause() + return nil, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause() } err = syncStream.Send(&pdpb.SyncRegionRequest{ Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}, @@ -117,7 +97,7 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) { StartIndex: s.history.GetNextIndex(), }) if err != nil { - return syncStream, errs.ErrGRPCSend.Wrap(err).FastGenWithCause() + return nil, errs.ErrGRPCSend.Wrap(err).FastGenWithCause() } return syncStream, nil @@ -128,9 +108,12 @@ var regionGuide = core.GenerateRegionGuideFunc(false) // StartSyncWithLeader starts to sync with leader. func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) - s.mu.RLock() + + s.mu.Lock() + defer s.mu.Unlock() + s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(s.server.LoopContext()) ctx := s.mu.clientCtx - s.mu.RUnlock() + go func() { defer s.wg.Done() // used to load region from kv storage to cache storage. 
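// The ctx created above is handed to establish and syncRegion at their call
// sites in the next hunk, so dialing, the gRPC stream, and the region load
// all share one cancellation point instead of the per-connection context
// that establish used to create and stash in s.mu.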
@@ -151,7 +134,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { return default: } - conn, err = s.establish(addr) + conn, err = s.establish(ctx, addr) if err != nil { log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) continue @@ -168,7 +151,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { default: } - stream, err := s.syncRegion(conn) + stream, err := s.syncRegion(ctx, conn) if err != nil { if ev, ok := status.FromError(err); ok { if ev.Code() == codes.Canceled { diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 44436f56b4c..6569b8863de 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -69,11 +69,9 @@ type Server interface { type RegionSyncer struct { mu struct { sync.RWMutex - streams map[string]ServerStream - regionSyncerCtx context.Context - regionSyncerCancel context.CancelFunc - clientCtx context.Context - clientCancel context.CancelFunc + streams map[string]ServerStream + clientCtx context.Context + clientCancel context.CancelFunc } server Server wg sync.WaitGroup @@ -95,7 +93,6 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } From 6d05f1de9700c592d13081a808f1461fff729032 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 12 Oct 2021 21:46:48 +0800 Subject: [PATCH 8/8] Update server/region_syncer/client.go Signed-off-by: disksing Co-authored-by: Ryan Leung --- server/region_syncer/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 7c21c767e54..f76bc0a6525 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -122,7 +122,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Info("region syncer start load region") start := time.Now() err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) - log.Info("region syncer finished load region", zap.Duration("time_cost", time.Since(start))) + log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start))) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) }
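The end state of the series is a single ownership rule: StartSyncWithLeader creates the cancelable context (derived from the server's loop context) under s.mu, and reset — called by StopSyncWithLeader — cancels and clears it before wg.Wait blocks. A compressed, self-contained sketch of that lifecycle, with illustrative names rather than the real RegionSyncer API:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    type syncer struct {
        mu struct {
            sync.RWMutex
            cancel context.CancelFunc
        }
        wg sync.WaitGroup
    }

    // Start mirrors StartSyncWithLeader: the context is created under the
    // lock, so a concurrent Stop either sees it and cancels it, or runs
    // before it exists and cancels nothing.
    func (s *syncer) Start() {
        s.wg.Add(1)
        s.mu.Lock()
        ctx, cancel := context.WithCancel(context.Background())
        s.mu.cancel = cancel
        s.mu.Unlock()
        go func() {
            defer s.wg.Done()
            <-ctx.Done() // stand-in for the load-regions + sync-stream loop
        }()
    }

    // Stop mirrors StopSyncWithLeader/reset: cancel, clear, then wait for
    // the worker goroutine to observe the cancellation and exit.
    func (s *syncer) Stop() {
        s.mu.Lock()
        if s.mu.cancel != nil {
            s.mu.cancel()
        }
        s.mu.cancel = nil
        s.mu.Unlock()
        s.wg.Wait()
    }

    func main() {
        var s syncer
        s.Start()
        time.Sleep(10 * time.Millisecond)
        s.Stop() // returns promptly even if the worker was mid-load
        fmt.Println("stopped cleanly")
    }

Deriving the real context from s.server.LoopContext() (patch 7) means server shutdown cancels a follower's load and stream for free, which is what allows the series to delete the separate closed channel the syncer previously maintained.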