Skip to content

Commit

Permalink
Merge branch 'master' into fix-5788
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 19, 2022
2 parents 8bb7c99 + 544c86a commit 3066c18
Showing 1 changed file with 70 additions and 22 deletions.
92 changes: 70 additions & 22 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ import (
)

const (
bytesUnit = 1 << 23 // 8MB
keysUint = 1 << 13 // 8K
intervalUint = 60 // 60s
bytesUnit = 8 * units.MiB
keysUint = 8 * units.KiB
queryUnit = 1 * units.KiB
regionReportInterval = 60 // 60s
storeReportInterval = 10 // 10s
capacity = 2 * units.TiB
)

var clusterID uint64
Expand Down Expand Up @@ -121,7 +124,8 @@ func bootstrap(ctx context.Context, cli pdpb.PDClient) {
log.Info("bootstrapped")
}

func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient) {
func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, regions *Regions) {
storesStats := regions.collectStoresStats(cfg.StoreCount)
for i := uint64(1); i <= uint64(cfg.StoreCount); i++ {
store := &metapb.Store{
Id: i,
Expand All @@ -144,11 +148,7 @@ func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient) {
select {
case <-heartbeatTicker.C:
cctx, cancel := context.WithCancel(ctx)
cli.StoreHeartbeat(cctx, &pdpb.StoreHeartbeatRequest{Header: header(), Stats: &pdpb.StoreStats{
StoreId: storeID,
Capacity: 2 * units.TiB,
Available: 1.5 * units.TiB,
}})
cli.StoreHeartbeat(cctx, &pdpb.StoreHeartbeatRequest{Header: header(), Stats: storesStats[storeID]})
cancel()
case <-ctx.Done():
return
Expand Down Expand Up @@ -203,8 +203,9 @@ func (rs *Regions) init(cfg *config.Config) {
ApproximateSize: bytesUnit,
Interval: &pdpb.TimeInterval{
StartTimestamp: now,
EndTimestamp: now + intervalUint,
EndTimestamp: now + regionReportInterval,
},
QueryStats: &pdpb.QueryStats{},
ApproximateKeys: keysUint,
Term: 1,
}
Expand Down Expand Up @@ -263,21 +264,25 @@ func (rs *Regions) update(replica int) {
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize += bytesUnit
region.ApproximateKeys += keysUint
region.ApproximateSize = uint64(bytesUnit * rand.Float64())
region.ApproximateKeys = uint64(keysUint * rand.Float64())
}
// update flow
for _, i := range rs.updateFlow {
region := rs.regions[i]
region.BytesWritten += bytesUnit
region.BytesRead += bytesUnit
region.KeysWritten += keysUint
region.KeysRead += keysUint
region.BytesWritten = uint64(bytesUnit * rand.Float64())
region.BytesRead = uint64(bytesUnit * rand.Float64())
region.KeysWritten = uint64(keysUint * rand.Float64())
region.KeysRead = uint64(keysUint * rand.Float64())
region.QueryStats = &pdpb.QueryStats{
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
}
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + intervalUint
region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval
}
}

Expand Down Expand Up @@ -328,6 +333,50 @@ func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_Regi
log.Info("store finish one round region heartbeat", zap.Uint64("store-id", storeID), zap.Duration("cost-time", time.Since(start)))
}

// collectStoresStats builds per-store statistics derived from the current
// region set, so store heartbeats can report realistic usage instead of
// fixed placeholder values.
//
// The returned slice is indexed by store ID: index 0 is deliberately left
// nil (store IDs start at 1), so its length is storeCount+1. Each store
// starts with the full configured capacity available; every region peer
// then charges its approximate size against the hosting store's used and
// available space and bumps its region count. Flow statistics (bytes/keys
// read and written, query stats, per-peer stats) are accumulated on the
// leader's store only, and only for regions that have written bytes.
//
// NOTE(review): stats[peer.StoreId] will panic if a peer references a
// store ID outside [1, storeCount] — presumably init() guarantees this;
// confirm against the region generation code.
func (rs *Regions) collectStoresStats(storeCount int) []*pdpb.StoreStats {
	stats := make([]*pdpb.StoreStats, storeCount+1)
	now := uint64(time.Now().Unix())
	for id := 1; id <= storeCount; id++ {
		stats[id] = &pdpb.StoreStats{
			StoreId:    uint64(id),
			Capacity:   capacity,
			Available:  capacity,
			QueryStats: &pdpb.QueryStats{},
			PeerStats:  make([]*pdpb.PeerStat, 0),
			Interval: &pdpb.TimeInterval{
				StartTimestamp: now,
				EndTimestamp:   now + storeReportInterval,
			},
		}
	}
	for _, region := range rs.regions {
		// Space usage is charged to every store that holds a peer.
		for _, peer := range region.Region.Peers {
			store := stats[peer.StoreId]
			store.UsedSize += region.ApproximateSize
			store.Available -= region.ApproximateSize
			store.RegionCount++
		}
		// Flow statistics are attributed to the leader's store only.
		leader := stats[region.Leader.StoreId]
		if region.BytesWritten != 0 {
			leader.BytesWritten += region.BytesWritten
			leader.BytesRead += region.BytesRead
			leader.KeysWritten += region.KeysWritten
			leader.KeysRead += region.KeysRead
			leader.QueryStats.Get += region.QueryStats.Get
			leader.QueryStats.Put += region.QueryStats.Put
			leader.PeerStats = append(leader.PeerStats, &pdpb.PeerStat{
				RegionId:     region.Region.Id,
				ReadKeys:     region.KeysRead,
				ReadBytes:    region.BytesRead,
				WrittenKeys:  region.KeysWritten,
				WrittenBytes: region.BytesWritten,
				QueryStats:   region.QueryStats,
			})
		}
	}
	return stats
}

func main() {
cfg := config.NewConfig()
err := cfg.Parse(os.Args[1:])
Expand Down Expand Up @@ -366,18 +415,17 @@ func main() {
}()
cli := newClient(cfg)
initClusterID(ctx, cli)
bootstrap(ctx, cli)
putStores(ctx, cfg, cli)
log.Info("finish put stores")
regions := new(Regions)
regions.init(cfg)
log.Info("finish init regions")

bootstrap(ctx, cli)
putStores(ctx, cfg, cli, regions)
log.Info("finish put stores")
streams := make(map[uint64]pdpb.PD_RegionHeartbeatClient, cfg.StoreCount)
for i := 1; i <= cfg.StoreCount; i++ {
streams[uint64(i)] = createHeartbeatStream(ctx, cfg)
}
var heartbeatTicker = time.NewTicker(60 * time.Second)
var heartbeatTicker = time.NewTicker(regionReportInterval * time.Second)
defer heartbeatTicker.Stop()
for {
select {
Expand Down

0 comments on commit 3066c18

Please sign in to comment.