diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9bed409312f..7ae6f061e65 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/component" @@ -538,6 +537,8 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { return nil } +var regionGuide = core.GenerateRegionGuideFunc(true) + // processRegionHeartbeat updates the region information. func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RLock() @@ -553,77 +554,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. - var saveKV, saveCache, isNew, needSync bool - if origin == nil { - log.Debug("insert new region", - zap.Uint64("region-id", region.GetID()), - logutil.ZapRedactStringer("meta-region", core.RegionToHexMeta(region.GetMeta()))) - saveKV, saveCache, isNew = true, true, true - } else { - r := region.GetRegionEpoch() - o := origin.GetRegionEpoch() - if r.GetVersion() > o.GetVersion() { - log.Info("region Version changed", - zap.Uint64("region-id", region.GetID()), - logutil.ZapRedactString("detail", core.DiffRegionKeyInfo(origin, region)), - zap.Uint64("old-version", o.GetVersion()), - zap.Uint64("new-version", r.GetVersion()), - ) - saveKV, saveCache = true, true - } - if r.GetConfVer() > o.GetConfVer() { - log.Info("region ConfVer changed", - zap.Uint64("region-id", region.GetID()), - zap.String("detail", core.DiffRegionPeersInfo(origin, region)), - zap.Uint64("old-confver", o.GetConfVer()), - zap.Uint64("new-confver", r.GetConfVer()), - ) - saveKV, saveCache = true, true - } - if region.GetLeader().GetId() != origin.GetLeader().GetId() { - if origin.GetLeader().GetId() == 0 { - isNew = true - } else { - log.Info("leader changed", - zap.Uint64("region-id", region.GetID()), - zap.Uint64("from", origin.GetLeader().GetStoreId()), - zap.Uint64("to", region.GetLeader().GetStoreId()), - ) - } - saveCache, needSync = true, true - } - if !core.SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { - log.Debug("down-peers changed", zap.Uint64("region-id", region.GetID())) - saveCache, needSync = true, true - } - if !core.SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) { - log.Debug("pending-peers changed", zap.Uint64("region-id", region.GetID())) - saveCache, needSync = true, true - } - if len(region.GetPeers()) != len(origin.GetPeers()) { - saveKV, saveCache = true, true - } - - if region.GetApproximateSize() != origin.GetApproximateSize() || - region.GetApproximateKeys() != origin.GetApproximateKeys() { - saveCache = true - } - - if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() || - region.GetBytesRead() != origin.GetBytesRead() || - region.GetKeysWritten() != origin.GetKeysWritten() || - region.GetKeysRead() != origin.GetKeysRead()) { - saveCache, needSync = true, true - } - - if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN && - (region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() || - region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { - saveCache = true - } - } - - if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew { + isNew, saveKV, saveCache, needSync := regionGuide(region, origin, c.traceRegionFlow) + if !saveKV && !saveCache && !isNew { return nil } diff --git a/server/core/region.go b/server/core/region.go index e7c1e1fd21a..745091c9626 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -27,6 +27,9 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/logutil" + "go.uber.org/zap" ) // errRegionIsStale is error info for region is stale. @@ -436,6 +439,95 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio return r.replicationStatus } +// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin +// and new region information. +type RegionGuideFunc func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool) + +// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. +// nil means do not print the log. +func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { + noLog := func(msg string, fields ...zap.Field) {} + debug, info := noLog, noLog + if enableLog { + debug = log.Debug + info = log.Info + } + // Save to storage if meta is updated. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + // Mark isNew if the region in cache does not have leader. + return func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool) { + if origin == nil { + debug("insert new region", + zap.Uint64("region-id", region.GetID()), + logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) + saveKV, saveCache, isNew = true, true, true + } else { + r := region.GetRegionEpoch() + o := origin.GetRegionEpoch() + if r.GetVersion() > o.GetVersion() { + info("region Version changed", + zap.Uint64("region-id", region.GetID()), + logutil.ZapRedactString("detail", DiffRegionKeyInfo(origin, region)), + zap.Uint64("old-version", o.GetVersion()), + zap.Uint64("new-version", r.GetVersion()), + ) + saveKV, saveCache = true, true + } + if r.GetConfVer() > o.GetConfVer() { + info("region ConfVer changed", + zap.Uint64("region-id", region.GetID()), + zap.String("detail", DiffRegionPeersInfo(origin, region)), + zap.Uint64("old-confver", o.GetConfVer()), + zap.Uint64("new-confver", r.GetConfVer()), + ) + saveKV, saveCache = true, true + } + if region.GetLeader().GetId() != origin.GetLeader().GetId() { + if origin.GetLeader().GetId() == 0 { + isNew = true + } else { + info("leader changed", + zap.Uint64("region-id", region.GetID()), + zap.Uint64("from", origin.GetLeader().GetStoreId()), + zap.Uint64("to", region.GetLeader().GetStoreId()), + ) + } + saveCache, needSync = true, true + } + if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { + debug("down-peers changed", zap.Uint64("region-id", region.GetID())) + saveCache, needSync = true, true + } + if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) { + debug("pending-peers changed", zap.Uint64("region-id", region.GetID())) + saveCache, needSync = true, true + } + if len(region.GetPeers()) != len(origin.GetPeers()) { + saveKV, saveCache = true, true + } + + if region.GetApproximateSize() != origin.GetApproximateSize() || + region.GetApproximateKeys() != origin.GetApproximateKeys() { + saveCache = true + } + + if traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() || + region.GetBytesRead() != origin.GetBytesRead() || + region.GetKeysWritten() != origin.GetKeysWritten() || + region.GetKeysRead() != origin.GetKeysRead()) { + saveCache, needSync = true, true + } + + if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN && + (region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() || + region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { + saveCache = true + } + } + return + } +} + // regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region. type regionMap struct { m map[uint64]*RegionInfo diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 590eceaab1c..2f425cf8f6b 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -123,6 +123,8 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) { return syncStream, nil } +var regionGuide = core.GenerateRegionGuideFunc(false) + // StartSyncWithLeader starts to sync with leader. func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) @@ -132,7 +134,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { go func() { defer s.wg.Done() // used to load region from kv storage to cache storage. - err := s.server.GetStorage().LoadRegionsOnce(s.server.GetBasicCluster().CheckAndPutRegion) + bc := s.server.GetBasicCluster() + storage := s.server.GetStorage() + err := storage.LoadRegionsOnce(bc.CheckAndPutRegion) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) } @@ -149,9 +153,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) continue } - defer conn.Close() break } + defer conn.Close() // Start syncing data. for { @@ -172,6 +176,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } + log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) for { resp, err := stream.Recv() @@ -201,7 +206,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region *core.RegionInfo regionLeader *metapb.Peer ) - if len(regionLeaders) > i && regionLeaders[i].Id != 0 { + if len(regionLeaders) > i && regionLeaders[i].GetId() != 0 { regionLeader = regionLeaders[i] } if hasStats { @@ -215,11 +220,23 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region = core.NewRegionInfo(r, regionLeader) } - s.server.GetBasicCluster().CheckAndPutRegion(region) - err = s.server.GetStorage().SaveRegion(r) + origin, err := bc.PreCheckPutRegion(region) + if err != nil { + log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) + continue + } + _, saveKV, _, _ := regionGuide(region, origin, true) + overlaps := bc.PutRegion(region) + + if saveKV { + err = storage.SaveRegion(r) + } if err == nil { s.history.Record(region) } + for _, old := range overlaps { + _ = storage.DeleteRegion(old.GetMeta()) + } } } } diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 02693ba334e..d0ab8f7f221 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -112,8 +112,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch return case first := <-regionNotifier: requests = append(requests, first.GetMeta()) - stats := append(stats, first.GetStat()) - leaders := append(leaders, first.GetLeader()) + stats = append(stats, first.GetStat()) + leaders = append(leaders, first.GetLeader()) startIndex := s.history.GetNextIndex() s.history.Record(first) pending := len(regionNotifier) @@ -140,6 +140,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch s.broadcast(alive) } requests = requests[:0] + stats = stats[:0] + leaders = leaders[:0] } } @@ -210,9 +212,11 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream lastIndex += len(metas) if err := stream.Send(resp); err != nil { log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) + return err } metas = metas[:0] stats = stats[:0] + leaders = leaders[:0] } log.Info("requested server has completed full synchronization with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))