
Merge branch 'master' into fix_client_hang
ti-chi-bot authored Dec 17, 2020
2 parents 639ab0e + 6a10e75 commit 9bdc111
Showing 21 changed files with 97 additions and 74 deletions.
2 changes: 1 addition & 1 deletion pkg/autoscaling/prometheus_test.go
@@ -126,7 +126,7 @@ func (c *normalClient) buildCPUMockData(component ComponentType) {
 	cpuUsageQuery := fmt.Sprintf(cpuUsagePromQLTemplate[component], mockDuration)
 	cpuQuotaQuery := cpuQuotaPromQLTemplate[component]
 
-	results := make([]result, 0)
+	var results []result
 	for i := 0; i < instanceCount; i++ {
 		results = append(results, result{
 			Value: []interface{}{time.Now().Unix(), fmt.Sprintf("%f", mockResultValue)},
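A note on the idiom: this commit repeatedly swaps make([]T, 0) for a plain var declaration. A nil slice supports append without a prior allocation; the one observable difference is how a still-empty slice encodes to JSON. A minimal standalone sketch (not PD code):

	package main

	import (
		"encoding/json"
		"fmt"
	)

	func main() {
		var a []int         // nil slice: zero value, no allocation yet
		b := make([]int, 0) // empty but non-nil slice

		a = append(a, 1) // append treats both identically
		b = append(b, 1)
		fmt.Println(a, b) // [1] [1]

		// The difference only shows while the slices are still empty:
		aj, _ := json.Marshal([]int(nil))     // encodes as null
		bj, _ := json.Marshal(make([]int, 0)) // encodes as []
		fmt.Println(string(aj), string(bj))   // null []
	}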
17 changes: 9 additions & 8 deletions pkg/encryption/kms.go
@@ -122,14 +122,15 @@ func newAwsCredentials() (*credentials.Credentials, error) {
 		providers = append(providers, webIdentityProvider)
 	}
 
-	// Credentials from AWS environment variables.
-	providers = append(providers, &credentials.EnvProvider{})
-
-	// Credentials from default AWS credentials file.
-	providers = append(providers, &credentials.SharedCredentialsProvider{
-		Filename: "",
-		Profile:  "",
-	})
+	providers = append(providers,
+		// Credentials from AWS environment variables.
+		&credentials.EnvProvider{},
+		// Credentials from default AWS credentials file.
+		&credentials.SharedCredentialsProvider{
+			Filename: "",
+			Profile:  "",
+		},
+	)
 
 	credentials := credentials.NewChainCredentials(providers)
 	return credentials, nil
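For context, credentials.NewChainCredentials consults providers strictly in order and returns the first that resolves, so the rewrite above changes only layout, not precedence: environment variables still win over the shared credentials file. A hedged sketch of the same pattern, assuming the aws-sdk-go v1 credentials package:

	package main

	import (
		"fmt"

		"github.com/aws/aws-sdk-go/aws/credentials"
	)

	func main() {
		// Providers are tried in order; the first success wins.
		chain := credentials.NewChainCredentials([]credentials.Provider{
			&credentials.EnvProvider{},               // AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
			&credentials.SharedCredentialsProvider{}, // ~/.aws/credentials, default profile
		})
		v, err := chain.Get()
		if err != nil {
			fmt.Println("no credentials resolved:", err)
			return
		}
		fmt.Println("resolved by:", v.ProviderName)
	}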
8 changes: 3 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
@@ -43,8 +43,7 @@ type Cluster struct {
 	*core.BasicCluster
 	*mockid.IDAllocator
 	*placement.RuleManager
-	*statistics.HotCache
-	*statistics.StoresStats
+	*statistics.HotStat
 	*config.PersistOptions
 	ID             uint64
 	suspectRegions map[uint64]struct{}
@@ -56,8 +55,7 @@ func NewCluster(opts *config.PersistOptions) *Cluster {
 	clus := &Cluster{
 		BasicCluster:     core.NewBasicCluster(),
 		IDAllocator:      mockid.NewIDAllocator(),
-		HotCache:         statistics.NewHotCache(),
-		StoresStats:      statistics.NewStoresStats(),
+		HotStat:          statistics.NewHotStat(),
 		PersistOptions:   opts,
 		suspectRegions:   map[uint64]struct{}{},
 		disabledFeatures: make(map[versioninfo.Feature]struct{}),
@@ -92,7 +90,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
 
 // GetStoresStats gets stores statistics.
 func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
-	return mc.StoresStats
+	return mc.HotStat.StoresStats
 }
 
 // GetStoreRegionCount gets region count with a given store.
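The single HotStat field works because Go promotes the methods of embedded types: callers keep writing mc.IsRegionHot(...) or mc.GetOrCreateRollingStoreStats(...) even though the two underlying structs are now bundled behind one field. A standalone sketch of the shape, using stand-in types rather than the real statistics package:

	package main

	import "fmt"

	// Stand-ins for statistics.HotCache and statistics.StoresStats.
	type HotCache struct{}

	func (c *HotCache) IsRegionHot() bool { return true }

	type StoresStats struct{}

	func (s *StoresStats) TotalBytesWriteRate() float64 { return 42 }

	// HotStat embeds both, so one field replaces the former pair and
	// both method sets stay reachable through it.
	type HotStat struct {
		*HotCache
		*StoresStats
	}

	func NewHotStat() *HotStat {
		return &HotStat{HotCache: &HotCache{}, StoresStats: &StoresStats{}}
	}

	func main() {
		hs := NewHotStat()
		fmt.Println(hs.IsRegionHot(), hs.TotalBytesWriteRate()) // promoted methods
	}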
3 changes: 1 addition & 2 deletions plugin/scheduler_example/evict_leader.go
@@ -143,8 +143,7 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
 	defer conf.mu.RUnlock()
 	var res []string
 	for index := range conf.StoreIDWitRanges[id] {
-		res = append(res, (string)(conf.StoreIDWitRanges[id][index].StartKey))
-		res = append(res, (string)(conf.StoreIDWitRanges[id][index].EndKey))
+		res = append(res, (string)(conf.StoreIDWitRanges[id][index].StartKey), (string)(conf.StoreIDWitRanges[id][index].EndKey))
 	}
 	return res
 }
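Since append is variadic, consecutive appends to the same slice collapse into one call, which is the whole change here and in server/schedulers/evict_leader.go, server/schedulers/grant_leader.go, and server/core/region_test.go below. A standalone illustration:

	package main

	import "fmt"

	type keyRange struct{ startKey, endKey []byte }

	func main() {
		ranges := []keyRange{
			{[]byte("a"), []byte("b")},
			{[]byte("c"), []byte("d")},
		}

		var res []string
		for _, r := range ranges {
			// One variadic call per iteration instead of two.
			res = append(res, string(r.startKey), string(r.endKey))
		}
		fmt.Println(res) // [a b c d]
	}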
54 changes: 26 additions & 28 deletions server/cluster/cluster.go
@@ -99,8 +99,7 @@ type RaftCluster struct {
 
 	labelLevelStats *statistics.LabelStatistics
 	regionStats     *statistics.RegionStatistics
-	storesStats     *statistics.StoresStats
-	hotSpotCache    *statistics.HotCache
+	hotStat         *statistics.HotStat
 
 	coordinator    *coordinator
 	suspectRegions *cache.TTLUint64 // suspectRegions are regions that may need fix
@@ -201,10 +200,9 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
 	c.storage = storage
 	c.id = id
 	c.labelLevelStats = statistics.NewLabelStatistics()
-	c.storesStats = statistics.NewStoresStats()
+	c.hotStat = statistics.NewHotStat()
 	c.prepareChecker = newPrepareChecker()
 	c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
-	c.hotSpotCache = statistics.NewHotCache()
 	c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
 	c.suspectKeyRanges = cache.NewStringTTL(c.ctx, time.Minute, 3*time.Minute)
 	c.traceRegionFlow = opt.GetPDServerConfig().TraceRegionFlow
@@ -297,7 +295,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
 		zap.Duration("cost", time.Since(start)),
 	)
 	for _, store := range c.GetStores() {
-		c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
+		c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
 	}
 	return c, nil
 }
@@ -520,13 +518,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 		}
 	}
 	if store := c.core.GetStore(newStore.GetID()); store != nil {
-		c.storesStats.UpdateStoreHeartbeatMetrics(store)
+		c.hotStat.UpdateStoreHeartbeatMetrics(store)
 	}
 	c.core.PutStore(newStore)
-	c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
-	c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
-	c.storesStats.UpdateTotalKeysRate(c.core.GetStores)
-	c.storesStats.FilterUnhealthyStore(c)
+	c.hotStat.Observe(newStore.GetID(), newStore.GetStoreStats())
+	c.hotStat.UpdateTotalBytesRate(c.core.GetStores)
+	c.hotStat.UpdateTotalKeysRate(c.core.GetStores)
+	c.hotStat.FilterUnhealthyStore(c)
 
 	// c.limiter is nil before "start" is called
 	if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" {
@@ -680,10 +678,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	}
 
 	for _, writeItem := range writeItems {
-		c.hotSpotCache.Update(writeItem)
+		c.hotStat.Update(writeItem)
 	}
 	for _, readItem := range readItems {
-		c.hotSpotCache.Update(readItem)
+		c.hotStat.Update(readItem)
 	}
 	c.Unlock()
 
@@ -801,7 +799,7 @@ func (c *RaftCluster) RandLearnerRegion(storeID uint64, ranges []core.KeyRange,
 func (c *RaftCluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	r := c.hotSpotCache.RandHotRegionFromStore(store, kind, c.opt.GetHotRegionCacheHitsThreshold())
+	r := c.hotStat.RandHotRegionFromStore(store, kind, c.opt.GetHotRegionCacheHitsThreshold())
 	if r == nil {
 		return nil
 	}
@@ -850,7 +848,7 @@ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.Region
 func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
 	c.RLock()
 	defer c.RUnlock()
-	return c.storesStats
+	return c.hotStat.StoresStats
 }
 
 // DropCacheRegion removes a region from the cache.
@@ -888,7 +886,7 @@ func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
 func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.hotSpotCache.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
+	return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
 }
 
 // GetAdjacentRegions returns regions' information that are adjacent with the specific region ID.
@@ -1149,7 +1147,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
 		}
 	}
 	c.core.PutStore(store)
-	c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
+	c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
 	return nil
 }
 
@@ -1226,15 +1224,15 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error {
 		}
 	}
 	c.core.DeleteStore(store)
-	c.storesStats.RemoveRollingStoreStats(store.GetID())
+	c.hotStat.RemoveRollingStoreStats(store.GetID())
 	return nil
 }
 
 func (c *RaftCluster) collectMetrics() {
 	statsMap := statistics.NewStoreStatisticsMap(c.opt)
 	stores := c.GetStores()
 	for _, s := range stores {
-		statsMap.Observe(s, c.storesStats)
+		statsMap.Observe(s, c.hotStat.StoresStats)
 	}
 	statsMap.Collect()
 
@@ -1264,7 +1262,7 @@ func (c *RaftCluster) collectClusterMetrics() {
 	c.regionStats.Collect()
 	c.labelLevelStats.Collect()
 	// collect hot cache metrics
-	c.hotSpotCache.CollectMetrics()
+	c.hotStat.CollectMetrics()
 }
 
 func (c *RaftCluster) resetClusterMetrics() {
@@ -1276,7 +1274,7 @@ func (c *RaftCluster) resetClusterMetrics() {
 	c.regionStats.Reset()
 	c.labelLevelStats.Reset()
 	// reset hot cache metrics
-	c.hotSpotCache.ResetMetrics()
+	c.hotStat.ResetMetrics()
 }
 
 func (c *RaftCluster) collectHealthStatus() {
@@ -1431,50 +1429,50 @@ func (c *RaftCluster) isPrepared() bool {
 func (c *RaftCluster) GetStoresBytesWriteStat() map[uint64]float64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.storesStats.GetStoresBytesWriteStat()
+	return c.hotStat.GetStoresBytesWriteStat()
 }
 
 // GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
 func (c *RaftCluster) GetStoresBytesReadStat() map[uint64]float64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.storesStats.GetStoresBytesReadStat()
+	return c.hotStat.GetStoresBytesReadStat()
 }
 
 // GetStoresKeysWriteStat returns the bytes write stat of all StoreInfo.
 func (c *RaftCluster) GetStoresKeysWriteStat() map[uint64]float64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.storesStats.GetStoresKeysWriteStat()
+	return c.hotStat.GetStoresKeysWriteStat()
 }
 
 // GetStoresKeysReadStat returns the bytes read stat of all StoreInfo.
 func (c *RaftCluster) GetStoresKeysReadStat() map[uint64]float64 {
 	c.RLock()
 	defer c.RUnlock()
-	return c.storesStats.GetStoresKeysReadStat()
+	return c.hotStat.GetStoresKeysReadStat()
 }
 
 // RegionReadStats returns hot region's read stats.
 func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
 	// RegionStats is a thread-safe method
-	return c.hotSpotCache.RegionStats(statistics.ReadFlow)
+	return c.hotStat.RegionStats(statistics.ReadFlow)
 }
 
 // RegionWriteStats returns hot region's write stats.
 func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
 	// RegionStats is a thread-safe method
-	return c.hotSpotCache.RegionStats(statistics.WriteFlow)
+	return c.hotStat.RegionStats(statistics.WriteFlow)
 }
 
 // CheckWriteStatus checks the write status, returns whether need update statistics and item.
 func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
-	return c.hotSpotCache.CheckWrite(region)
+	return c.hotStat.CheckWrite(region)
 }
 
 // CheckReadStatus checks the read status, returns whether need update statistics and item.
 func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
-	return c.hotSpotCache.CheckRead(region)
+	return c.hotStat.CheckRead(region)
 }
 
 // TODO: remove me.
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
@@ -108,7 +108,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
 		}
 		c.Assert(cluster.putStoreLocked(store), IsNil)
 		c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
-		c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), NotNil)
+		c.Assert(cluster.hotStat.GetRollingStoreStats(store.GetID()), NotNil)
 	}
 
 	for _, store := range stores {
@@ -121,7 +121,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
 		newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
 		c.Assert(cluster.putStoreLocked(newStore), IsNil)
 		c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
-		c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), IsNil)
+		c.Assert(cluster.hotStat.GetRollingStoreStats(store.GetID()), IsNil)
 	}
 }
 
2 changes: 1 addition & 1 deletion server/cluster/coordinator.go
@@ -657,7 +657,7 @@ func (c *coordinator) pauseOrResumeScheduler(name string, t int64) error {
 	if c.cluster == nil {
 		return errs.ErrNotBootstrapped.FastGenByArgs()
 	}
-	s := make([]*scheduleController, 0)
+	var s []*scheduleController
 	if name != "all" {
 		sc, ok := c.schedulers[name]
 		if !ok {
3 changes: 1 addition & 2 deletions server/core/region_test.go
@@ -236,8 +236,7 @@ func (*testRegionKey) TestShouldRemoveFromSubTree(c *C) {
 	region.learners = append(region.learners, peer2)
 	c.Assert(regions.shouldRemoveFromSubTree(region, origin), Equals, true)
 
-	origin.learners = append(origin.learners, peer3)
-	origin.learners = append(origin.learners, peer2)
+	origin.learners = append(origin.learners, peer3, peer2)
 	region.learners = append(region.learners, peer4)
 	c.Assert(regions.shouldRemoveFromSubTree(region, origin), Equals, false)
 
2 changes: 1 addition & 1 deletion server/encryptionkm/key_manager.go
@@ -502,7 +502,7 @@ type keyManagerHelper struct {
 
 func defaultKeyManagerHelper() keyManagerHelper {
 	return keyManagerHelper{
-		now:                       func() time.Time { return time.Now() },
+		now:                       time.Now,
 		tick:                      func(ticker *time.Ticker) <-chan time.Time { return ticker.C },
 		newMasterKey:              encryption.NewMasterKey,
 		eventAfterReloadByWatcher: func() {},
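func() time.Time { return time.Now() } merely wraps a function in a closure of the identical signature, so the name time.Now can be assigned directly. A standalone sketch:

	package main

	import (
		"fmt"
		"time"
	)

	type clock struct {
		now func() time.Time
	}

	func main() {
		// Equivalent behavior; the second form drops the needless closure.
		a := clock{now: func() time.Time { return time.Now() }}
		b := clock{now: time.Now}
		fmt.Println(a.now().Year() == b.now().Year()) // true
	}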
2 changes: 1 addition & 1 deletion server/schedule/filter/filters.go
@@ -666,7 +666,7 @@ func NewIsolationFilter(scope, isolationLevel string, locationLabels []string, r
 	}
 	// Collect all constraints for given isolationLevel
 	for _, regionStore := range regionStores {
-		constraintList := make([]string, 0)
+		var constraintList []string
 		for i := 0; i <= isolationLevelIdx; i++ {
 			constraintList = append(constraintList, regionStore.GetLabelValue(locationLabels[i]))
 		}
2 changes: 1 addition & 1 deletion server/schedule/placement/rule_manager.go
@@ -587,7 +587,7 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error {
 		if err != nil {
 			return err
 		}
-		matchID = func(a string) bool { return r.MatchString(a) }
+		matchID = r.MatchString
 	}
 
 	p := m.beginPatch()
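r.MatchString is a method value: evaluating it binds the receiver r once, so the assignment behaves exactly like the deleted closure while reading more directly. A standalone sketch:

	package main

	import (
		"fmt"
		"regexp"
	)

	func main() {
		r := regexp.MustCompile(`^pd-`)

		// Method value: the receiver r is captured at this point.
		matchID := r.MatchString

		fmt.Println(matchID("pd-server")) // true
		fmt.Println(matchID("tikv-3"))    // false
	}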
2 changes: 1 addition & 1 deletion server/schedule/region_splitter.go
@@ -245,7 +245,7 @@ func (r *splitKeyResults) getSplitRegions() map[uint64][]byte {
 }
 
 func (r *splitKeyResults) getUnProcessedKeys(splitKeys [][]byte) [][]byte {
-	unProcessedKeys := make([][]byte, 0)
+	var unProcessedKeys [][]byte
 	for _, splitKey := range splitKeys {
 		processed := false
 		for _, regionStartKey := range r.newRegions {
3 changes: 1 addition & 2 deletions server/schedulers/evict_leader.go
@@ -132,8 +132,7 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
 	var res []string
 	ranges := conf.StoreIDWithRanges[id]
 	for index := range ranges {
-		res = append(res, (string)(ranges[index].StartKey))
-		res = append(res, (string)(ranges[index].EndKey))
+		res = append(res, (string)(ranges[index].StartKey), (string)(ranges[index].EndKey))
 	}
 	return res
 }
3 changes: 1 addition & 2 deletions server/schedulers/grant_leader.go
@@ -127,8 +127,7 @@ func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string {
 	var res []string
 	ranges := conf.StoreIDWithRanges[id]
 	for index := range ranges {
-		res = append(res, (string)(ranges[index].StartKey))
-		res = append(res, (string)(ranges[index].EndKey))
+		res = append(res, (string)(ranges[index].StartKey), (string)(ranges[index].EndKey))
 	}
 	return res
 }
4 changes: 2 additions & 2 deletions server/schedulers/hot_region.go
@@ -288,7 +288,7 @@ func summaryStoresLoad(
 		keyRate := storeKeyRate[id]
 
 		// Find all hot peers first
-		hotPeers := make([]*statistics.HotPeerStat, 0)
+		var hotPeers []*statistics.HotPeerStat
 		{
 			byteSum := 0.0
 			keySum := 0.0
@@ -365,7 +365,7 @@ func filterHotPeers(
 	minHotDegree int,
 	peers []*statistics.HotPeerStat,
 ) []*statistics.HotPeerStat {
-	ret := make([]*statistics.HotPeerStat, 0)
+	var ret []*statistics.HotPeerStat
 	for _, peer := range peers {
 		if (kind == core.LeaderKind && !peer.IsLeader()) ||
 			peer.HotDegree < minHotDegree {

