From 33155fe0d571c0aad9412653d9d39d635afd6c46 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 21 Feb 2020 15:21:13 +0800 Subject: [PATCH] cluster: intervally persistent store meta (#2143) Signed-off-by: disksing --- go.mod | 2 +- go.sum | 4 ++-- server/api/store.go | 2 +- server/api/store_test.go | 7 ++++++- server/cluster/cluster.go | 7 +++++++ server/cluster/cluster_test.go | 4 ++-- server/core/store.go | 27 +++++++++++++++------------ server/core/store_option.go | 9 ++++++++- tests/pdctl/helper.go | 7 ++++++- 9 files changed, 48 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 49f87658678..97440c93d6f 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d - github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2 + github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8 github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 93b7e00b1b5..2340f4e9151 100644 --- a/go.sum +++ b/go.sum @@ -249,8 +249,8 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2 h1:2ei+1u0CAqj4NLuh6U5lD7t/aAYRcFMJ8dPan3647nA= -github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8 h1:dq2pqQZMlXd9nUN6P0WFkhdzPiGPGsVAXrAw1z3BsJY= +github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= diff --git a/server/api/store.go b/server/api/store.go index a5d5883cfca..11cc24c9d7d 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -97,7 +97,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo } if store.GetStoreStats() != nil { - startTS := store.GetStartTS() + startTS := store.GetStartTime() s.Status.StartTS = &startTS } if lastHeartbeat := store.GetLastHeartbeatTS(); !lastHeartbeat.IsZero() { diff --git a/server/api/store_test.go b/server/api/store_test.go index 209ea7505ff..2c590570d87 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/docker/go-units" + "github.com/gogo/protobuf/proto" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -112,7 +113,11 @@ func checkStoresInfo(c *C, ss []*StoreInfo, want []*metapb.Store) { } } for _, s := range ss { - c.Assert(s.Store.Store, DeepEquals, mapWant[s.Store.Store.Id]) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) + expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) + // Ignore lastHeartbeat + obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 + c.Assert(obtained, DeepEquals, expected) } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 04d2f54c536..7ccfcea5596 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -392,6 +392,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { zap.Uint64("capacity", newStore.GetCapacity()), zap.Uint64("available", newStore.GetAvailable())) } + if newStore.NeedPersist() && c.storage != nil { + if err := c.storage.SaveStore(store.GetMeta()); err != nil { + log.Error("failed to persist store", zap.Uint64("store-id", newStore.GetID())) + } else { + newStore = newStore.Clone(core.SetLastPersistTime(time.Now())) + } + } c.core.PutStore(newStore) c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats()) c.storesStats.UpdateTotalBytesRate(c.core.GetStores) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 882d0c78882..109aefff21c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -63,12 +63,12 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { c.Assert(cluster.putStoreLocked(store), IsNil) c.Assert(cluster.GetStoreCount(), Equals, i+1) - c.Assert(store.GetLastHeartbeatTS().IsZero(), IsTrue) + c.Assert(store.GetLastHeartbeatTS().UnixNano(), Equals, int64(0)) c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil) s := cluster.GetStore(store.GetID()) - c.Assert(s.GetLastHeartbeatTS().IsZero(), IsFalse) + c.Assert(s.GetLastHeartbeatTS().UnixNano(), Not(Equals), int64(0)) c.Assert(s.GetStoreStats(), DeepEquals, storeStats) } diff --git a/server/core/store.go b/server/core/store.go index d3dd46cdb62..ef59ec8d9ee 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -27,6 +27,9 @@ import ( "go.uber.org/zap" ) +// Interval to save store meta (including heartbeat ts) to etcd. +const storePersistInterval = 5 * time.Minute + // StoreInfo contains information about a store. type StoreInfo struct { meta *metapb.Store @@ -38,7 +41,7 @@ type StoreInfo struct { leaderSize int64 regionSize int64 pendingPeerCount int - lastHeartbeatTS time.Time + lastPersistTime time.Time leaderWeight float64 regionWeight float64 available func() bool @@ -70,7 +73,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { leaderSize: s.leaderSize, regionSize: s.regionSize, pendingPeerCount: s.pendingPeerCount, - lastHeartbeatTS: s.lastHeartbeatTS, + lastPersistTime: s.lastPersistTime, leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, available: s.available, @@ -205,11 +208,6 @@ func (s *StoreInfo) GetApplyingSnapCount() uint32 { return s.stats.GetApplyingSnapCount() } -// GetStartTime returns the start time of the store. -func (s *StoreInfo) GetStartTime() uint32 { - return s.stats.GetStartTime() -} - // GetLeaderCount returns the leader count of the store. func (s *StoreInfo) GetLeaderCount() int { return s.leaderCount @@ -247,7 +245,12 @@ func (s *StoreInfo) GetRegionWeight() float64 { // GetLastHeartbeatTS returns the last heartbeat timestamp of the store. func (s *StoreInfo) GetLastHeartbeatTS() time.Time { - return s.lastHeartbeatTS + return time.Unix(0, s.meta.GetLastHeartbeat()) +} + +// NeedPersist returns if it needs to save to etcd. +func (s *StoreInfo) NeedPersist() bool { + return s.GetLastHeartbeatTS().Sub(s.lastPersistTime) > storePersistInterval } const minWeight = 1e-6 @@ -385,14 +388,14 @@ func (s *StoreInfo) ResourceWeight(kind ResourceKind) float64 { } } -// GetStartTS returns the start timestamp. -func (s *StoreInfo) GetStartTS() time.Time { - return time.Unix(int64(s.GetStartTime()), 0) +// GetStartTime returns the start timestamp. +func (s *StoreInfo) GetStartTime() time.Time { + return time.Unix(s.meta.GetStartTimestamp(), 0) } // GetUptime returns the uptime. func (s *StoreInfo) GetUptime() time.Duration { - uptime := s.GetLastHeartbeatTS().Sub(s.GetStartTS()) + uptime := s.GetLastHeartbeatTS().Sub(s.GetStartTime()) if uptime > 0 { return uptime } diff --git a/server/core/store_option.go b/server/core/store_option.go index 1aa679c65e8..378fc2a8578 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -138,7 +138,14 @@ func SetRegionWeight(regionWeight float64) StoreCreateOption { // SetLastHeartbeatTS sets the time of last heartbeat for the store. func SetLastHeartbeatTS(lastHeartbeatTS time.Time) StoreCreateOption { return func(store *StoreInfo) { - store.lastHeartbeatTS = lastHeartbeatTS + store.meta.LastHeartbeat = lastHeartbeatTS.UnixNano() + } +} + +// SetLastPersistTime updates the time of last persistent. +func SetLastPersistTime(lastPersist time.Time) StoreCreateOption { + return func(store *StoreInfo) { + store.lastPersistTime = lastPersist } } diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 143283559fa..03249460a9b 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -22,6 +22,7 @@ import ( "path/filepath" "sort" + "github.com/gogo/protobuf/proto" "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -85,7 +86,11 @@ func CheckStoresInfo(c *check.C, stores []*api.StoreInfo, want []*metapb.Store) } } for _, s := range stores { - c.Assert(s.Store.Store, check.DeepEquals, mapWant[s.Store.Store.Id]) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) + expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) + // Ignore lastHeartbeat + obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 + c.Assert(obtained, check.DeepEquals, expected) } }