Skip to content

Commit

Permalink
region_syncer: reduce saveKV of client (#3808) (#3993)
Browse files Browse the repository at this point in the history
* region_syncer: reduce saveKV of client (#3808)

Signed-off-by: HunDunDM <hundundm@gmail.com>

* tiny fix

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: 混沌DM <hundundm@gmail.com>
  • Loading branch information
ti-chi-bot and HunDunDM authored Aug 27, 2021
1 parent 88726b2 commit f3c96e0
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 78 deletions.
74 changes: 3 additions & 71 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
Expand Down Expand Up @@ -538,6 +537,8 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
Expand All @@ -553,76 +554,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew, needSync bool
if origin == nil {
log.Debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", core.RegionToHexMeta(region.GetMeta())))
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactString("detail", core.DiffRegionKeyInfo(origin, region)),
zap.Uint64("old-version", o.GetVersion()),
zap.Uint64("new-version", r.GetVersion()),
)
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
log.Info("region ConfVer changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", core.DiffRegionPeersInfo(origin, region)),
zap.Uint64("old-confver", o.GetConfVer()),
zap.Uint64("new-confver", r.GetConfVer()),
)
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
log.Info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache, needSync = true, true
}
if !core.SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
log.Debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if !core.SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
log.Debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}

if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead()) {
saveCache, needSync = true, true
}

if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
}

isNew, saveKV, saveCache, needSync := regionGuide(region, origin, c.traceRegionFlow)
if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
return nil
}
Expand Down
92 changes: 92 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/logutil"
"go.uber.org/zap"
)

// errRegionIsStale is error info for region is stale.
Expand Down Expand Up @@ -436,6 +439,95 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
noLog := func(msg string, fields ...zap.Field) {}
debug, info := noLog, noLog
if enableLog {
debug = log.Debug
info = log.Info
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool) {
if origin == nil {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
info("region Version changed",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactString("detail", DiffRegionKeyInfo(origin, region)),
zap.Uint64("old-version", o.GetVersion()),
zap.Uint64("new-version", r.GetVersion()),
)
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
info("region ConfVer changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", DiffRegionPeersInfo(origin, region)),
zap.Uint64("old-confver", o.GetConfVer()),
zap.Uint64("new-confver", r.GetConfVer()),
)
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache, needSync = true, true
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}

if traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead()) {
saveCache, needSync = true, true
}

if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
}
return
}
}

// regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region.
type regionMap struct {
m map[uint64]*RegionInfo
Expand Down
27 changes: 22 additions & 5 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) {
return syncStream, nil
}

var regionGuide = core.GenerateRegionGuideFunc(false)

// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)
Expand All @@ -132,7 +134,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
go func() {
defer s.wg.Done()
// used to load region from kv storage to cache storage.
err := s.server.GetStorage().LoadRegionsOnce(s.server.GetBasicCluster().CheckAndPutRegion)
bc := s.server.GetBasicCluster()
storage := s.server.GetStorage()
err := storage.LoadRegionsOnce(bc.CheckAndPutRegion)
if err != nil {
log.Warn("failed to load regions.", errs.ZapError(err))
}
Expand All @@ -149,9 +153,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
continue
}
defer conn.Close()
break
}
defer conn.Close()

// Start syncing data.
for {
Expand All @@ -172,6 +176,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
time.Sleep(time.Second)
continue
}

log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := stream.Recv()
Expand Down Expand Up @@ -201,7 +206,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region *core.RegionInfo
regionLeader *metapb.Peer
)
if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
if len(regionLeaders) > i && regionLeaders[i].GetId() != 0 {
regionLeader = regionLeaders[i]
}
if hasStats {
Expand All @@ -215,11 +220,23 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region = core.NewRegionInfo(r, regionLeader)
}

s.server.GetBasicCluster().CheckAndPutRegion(region)
err = s.server.GetStorage().SaveRegion(r)
origin, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin, true)
overlaps := bc.PutRegion(region)

if saveKV {
err = storage.SaveRegion(r)
}
if err == nil {
s.history.Record(region)
}
for _, old := range overlaps {
_ = storage.DeleteRegion(old.GetMeta())
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
return
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
stats := append(stats, first.GetStat())
leaders := append(leaders, first.GetLeader())
stats = append(stats, first.GetStat())
leaders = append(leaders, first.GetLeader())
startIndex := s.history.GetNextIndex()
s.history.Record(first)
pending := len(regionNotifier)
Expand All @@ -140,6 +140,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
s.broadcast(alive)
}
requests = requests[:0]
stats = stats[:0]
leaders = leaders[:0]
}
}

Expand Down Expand Up @@ -210,9 +212,11 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
lastIndex += len(metas)
if err := stream.Send(resp); err != nil {
log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err))
return err
}
metas = metas[:0]
stats = stats[:0]
leaders = leaders[:0]
}
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
Expand Down

0 comments on commit f3c96e0

Please sign in to comment.