Skip to content

Commit

Permalink
PCP: Accelerate the speed of the load region (#2034)
Browse files Browse the repository at this point in the history
  • Loading branch information
sourcelliu authored and sre-bot committed Dec 24, 2019
1 parent 5c7fbe7 commit 9ae11e2
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 24 deletions.
18 changes: 5 additions & 13 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Server interface {
GetStorage() *core.Storage
GetHBStreams() opt.HeartbeatStreams
GetRaftCluster() *RaftCluster
GetBasicCluster() *core.BasicCluster
}

// RaftCluster is used for cluster config management.
Expand Down Expand Up @@ -168,8 +169,8 @@ func (c *RaftCluster) loadBootstrapTime() (time.Time, error) {
}

// InitCluster initializes the raft cluster.
func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage) {
c.core = core.NewBasicCluster()
func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) {
c.core = basicCluster
c.opt = opt
c.storage = storage
c.id = id
Expand All @@ -190,7 +191,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage())
c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage(), s.GetBasicCluster())
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
Expand Down Expand Up @@ -238,16 +239,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
putRegion := func(region *core.RegionInfo) []*core.RegionInfo {
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
// return the state region to delete.
return []*core.RegionInfo{region}
}
return c.core.PutRegion(region)
}
if err := c.storage.LoadRegions(putRegion); err != nil {
if err := c.storage.LoadRegionsOnce(c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down
18 changes: 9 additions & 9 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type testClusterInfoSuite struct{}
func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)
stores := newTestStores(n)
Expand Down Expand Up @@ -86,7 +86,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

n, np := uint64(3), uint64(3)

Expand Down Expand Up @@ -339,7 +339,7 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

// 1: [nil, nil)
region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
Expand Down Expand Up @@ -378,7 +378,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}

Expand Down Expand Up @@ -485,7 +485,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
regions := newTestRegions(n, np)
_, opts, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestRaftCluster(mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()))
tc := newTestRaftCluster(mockid.NewIDAllocator(), opts, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
cache := tc.core.Regions

for i := uint64(0); i < n; i++ {
Expand Down Expand Up @@ -595,7 +595,7 @@ type testGetStoresSuite struct {
func (s *testGetStoresSuite) SetUpSuite(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
s.cluster = cluster

stores := newTestStores(200)
Expand Down Expand Up @@ -629,13 +629,13 @@ func newTestScheduleConfig() (*config.ScheduleConfig, *config.ScheduleOption, er
}

func newTestCluster(opt *config.ScheduleOption) *testCluster {
rc := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()))
rc := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
return &testCluster{RaftCluster: rc}
}

func newTestRaftCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage) *RaftCluster {
func newTestRaftCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
rc := &RaftCluster{}
rc.InitCluster(id, opt, storage)
rc.InitCluster(id, opt, storage, basicCluster)
return rc
}

Expand Down
13 changes: 13 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"sync"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/slice"
"go.uber.org/zap"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
Expand Down Expand Up @@ -312,6 +314,17 @@ func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
return bc.Regions.SetRegion(region)
}

// CheckAndPutRegion runs PreCheckPutRegion on the given region and, when the
// check passes, stores the region through PutRegion and returns PutRegion's
// result. When the check fails, a warning is logged and the given region
// itself is returned as the only element, so the caller can delete it.
func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
	if origin, err := bc.PreCheckPutRegion(region); err != nil {
		log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
		// Hand the stale region back to the caller for deletion.
		return []*RegionInfo{region}
	}
	return bc.PutRegion(region)
}

// RemoveRegion removes RegionInfo from regionTree and regionMap.
func (bc *BasicCluster) RemoveRegion(region *RegionInfo) {
bc.Lock()
Expand Down
19 changes: 19 additions & 0 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -52,6 +53,8 @@ type Storage struct {
kv.Base
regionStorage *RegionStorage
useRegionStorage int32
regionLoaded int32
mu sync.Mutex
}

// NewStorage creates Storage instance with Base.
Expand Down Expand Up @@ -162,6 +165,22 @@ func (s *Storage) LoadRegions(f func(region *RegionInfo) []*RegionInfo) error {
return loadRegions(s.Base, f)
}

// LoadRegionsOnce loads regions from storage, passing each one to f.
//
// When the region storage is not in use, regions are loaded from the base
// storage on every call. When it is in use, regions are loaded from the
// region storage only until the first load that succeeds; later calls return
// nil without loading anything. A failed load is not recorded, so the next
// call tries again.
func (s *Storage) LoadRegionsOnce(f func(region *RegionInfo) []*RegionInfo) error {
	if atomic.LoadInt32(&s.useRegionStorage) == 0 {
		return loadRegions(s.Base, f)
	}

	// The mutex guards regionLoaded and serializes loads from the region storage.
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.regionLoaded != 0 {
		return nil
	}
	err := loadRegions(s.regionStorage, f)
	if err == nil {
		s.regionLoaded = 1
	}
	return err
}

// SaveRegion saves one region to storage.
func (s *Storage) SaveRegion(region *metapb.Region) error {
if atomic.LoadInt32(&s.useRegionStorage) > 0 {
Expand Down
19 changes: 19 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func (s *testKVSuite) TestLoadRegions(c *C) {
}
}

// TestLoadRegionsToCache checks that LoadRegionsOnce fills a RegionsInfo cache
// with the regions saved in storage, and that a second call made after saving
// more regions brings the cache up to the new region count.
func (s *testKVSuite) TestLoadRegionsToCache(c *C) {
	cache := NewRegionsInfo()
	storage := NewStorage(kv.NewMemoryKV())

	// First round: save a batch and make sure every region reaches the cache.
	initial := 10
	saved := mustSaveRegions(c, storage, initial)
	err := storage.LoadRegionsOnce(cache.SetRegion)
	c.Assert(err, IsNil)
	c.Assert(cache.GetRegionCount(), Equals, initial)
	for _, meta := range cache.GetMetaRegions() {
		c.Assert(meta, DeepEquals, saved[meta.GetId()])
	}

	// Second round: save a larger batch and load again.
	total := 20
	mustSaveRegions(c, storage, total)
	err = storage.LoadRegionsOnce(cache.SetRegion)
	c.Assert(err, IsNil)
	c.Assert(cache.GetRegionCount(), Equals, total)
}

func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) {
storage := NewStorage(&KVWithMaxRangeLimit{Base: kv.NewMemoryKV(), rangeLimit: 500})
cache := NewRegionsInfo()
Expand Down
6 changes: 6 additions & 0 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.RUnlock()
go func() {
defer s.wg.Done()
// used to load region from kv storage to cache storage.
err := s.server.GetStorage().LoadRegionsOnce(s.server.GetBasicCluster().CheckAndPutRegion)
if err != nil {
log.Warn("failed to load regions.", zap.Error(err))
}
for {
select {
case <-closed:
Expand Down Expand Up @@ -128,6 +133,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.history.ResetWithIndex(resp.GetStartIndex())
}
for _, r := range resp.GetRegions() {
s.server.GetBasicCluster().CheckAndPutRegion(core.NewRegionInfo(r, nil))
err = s.server.GetStorage().SaveRegion(r)
if err == nil {
s.history.Record(core.NewRegionInfo(r, nil))
Expand Down
1 change: 1 addition & 0 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Server interface {
Name() string
GetMetaRegions() []*metapb.Region
GetSecurityConfig() *config.SecurityConfig
GetBasicCluster() *core.BasicCluster
}

// RegionSyncer is used to sync the region information without raft.
Expand Down
8 changes: 8 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Server struct {
idAllocator *id.AllocatorImpl
// for storage operation.
storage *core.Storage
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
tso *tso.TimestampOracle
// for raft cluster
Expand Down Expand Up @@ -307,6 +309,7 @@ func (s *Server) startServer(ctx context.Context) error {
return err
}
s.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.GetClusterRootPath(), s.clusterID, syncer.NewRegionSyncer(s), s.client)
s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster)
// Server has started.
Expand Down Expand Up @@ -561,6 +564,11 @@ func (s *Server) SetStorage(storage *core.Storage) {
s.storage = storage
}

// GetBasicCluster returns the server's basicCluster field, the
// *core.BasicCluster held by this Server.
func (s *Server) GetBasicCluster() *core.BasicCluster {
return s.basicCluster
}

// GetScheduleOption returns the schedule option.
func (s *Server) GetScheduleOption() *config.ScheduleOption {
return s.scheduleOpt
Expand Down
10 changes: 10 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,16 @@ func (c *TestCluster) GetLeader() string {
return ""
}

// GetFollower returns the name of a server in the cluster that is not closed
// and is not the leader. It returns an empty string when no such server exists.
// NOTE(review): if c.servers is a map, which follower is returned is not
// deterministic when several qualify — confirm callers do not rely on a
// particular one.
func (c *TestCluster) GetFollower() string {
	for name, s := range c.servers {
		// Skip servers that are shut down or currently hold leadership.
		if s.server.IsClosed() || s.server.GetMember().IsLeader() {
			continue
		}
		return name
	}
	return ""
}

// WaitLeader is used to get leader.
// If it exceeds the maximum number of loops, it will return an empty string.
func (c *TestCluster) WaitLeader() string {
Expand Down
23 changes: 21 additions & 2 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,13 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
rc := cluster.NewRaftCluster(s.ctx, svr.GetClusterRootPath(), svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage())
rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage(), svr.GetBasicCluster())
raftCluster, err := rc.LoadClusterInfo()
c.Assert(err, IsNil)
c.Assert(raftCluster, IsNil)

storage := rc.GetStorage()
basicCluster := rc.GetCacheCluster()
opt := rc.GetOpt()
// Save meta, stores and regions.
n := 10
Expand Down Expand Up @@ -662,7 +663,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
c.Assert(storage.Flush(), IsNil)

raftCluster = &cluster.RaftCluster{}
raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage)
raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster)
raftCluster, err = raftCluster.LoadClusterInfo()
c.Assert(err, IsNil)
c.Assert(raftCluster, NotNil)
Expand All @@ -677,6 +678,24 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
for _, region := range raftCluster.GetMetaRegions() {
c.Assert(region, DeepEquals, regions[region.GetId()])
}

m := 20
regions = make([]*metapb.Region, 0, n)
for i := uint64(0); i < uint64(m); i++ {
region := &metapb.Region{
Id: i,
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1},
}
regions = append(regions, region)
}

for _, region := range regions {
c.Assert(storage.SaveRegion(region), IsNil)
}
raftCluster.GetStorage().LoadRegionsOnce(raftCluster.GetCacheCluster().PutRegion)
c.Assert(raftCluster.GetRegionCount(), Equals, n)
}

func newIsBootstrapRequest(clusterID uint64) *pdpb.IsBootstrappedRequest {
Expand Down
10 changes: 10 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
// ensure flush to region storage, we use a duration larger than the
// region storage flush rate limit (3s).
time.Sleep(4 * time.Second)

// Test that all regions have been synchronized to the cache of followerServer.
followerServer := cluster.GetServer(cluster.GetFollower())
c.Assert(followerServer, NotNil)
cacheRegions := followerServer.GetServer().GetBasicCluster().GetRegions()
c.Assert(cacheRegions, HasLen, regionLen)
for _, region := range cacheRegions {
c.Assert(followerServer.GetServer().GetBasicCluster().GetRegion(region.GetID()).GetMeta(), DeepEquals, region.GetMeta())
}

err = leaderServer.Stop()
c.Assert(err, IsNil)
cluster.WaitLeader()
Expand Down

0 comments on commit 9ae11e2

Please sign in to comment.