Skip to content

Commit

Permalink
cluster: intervally persistent store meta (#2143)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Feb 21, 2020
1 parent 7018734 commit 33155fe
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2
github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2 h1:2ei+1u0CAqj4NLuh6U5lD7t/aAYRcFMJ8dPan3647nA=
github.com/pingcap/kvproto v0.0.0-20200217122707-dba161a41fd2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8 h1:dq2pqQZMlXd9nUN6P0WFkhdzPiGPGsVAXrAw1z3BsJY=
github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
2 changes: 1 addition & 1 deletion server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
}

if store.GetStoreStats() != nil {
startTS := store.GetStartTS()
startTS := store.GetStartTime()
s.Status.StartTS = &startTS
}
if lastHeartbeat := store.GetLastHeartbeatTS(); !lastHeartbeat.IsZero() {
Expand Down
7 changes: 6 additions & 1 deletion server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/docker/go-units"
"github.com/gogo/protobuf/proto"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -112,7 +113,11 @@ func checkStoresInfo(c *C, ss []*StoreInfo, want []*metapb.Store) {
}
}
for _, s := range ss {
c.Assert(s.Store.Store, DeepEquals, mapWant[s.Store.Store.Id])
obtained := proto.Clone(s.Store.Store).(*metapb.Store)
expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store)
// Ignore lastHeartbeat
obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0
c.Assert(obtained, DeepEquals, expected)
}
}

Expand Down
7 changes: 7 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
zap.Uint64("capacity", newStore.GetCapacity()),
zap.Uint64("available", newStore.GetAvailable()))
}
if newStore.NeedPersist() && c.storage != nil {
if err := c.storage.SaveStore(store.GetMeta()); err != nil {
log.Error("failed to persist store", zap.Uint64("store-id", newStore.GetID()))
} else {
newStore = newStore.Clone(core.SetLastPersistTime(time.Now()))
}
}
c.core.PutStore(newStore)
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
c.Assert(cluster.putStoreLocked(store), IsNil)
c.Assert(cluster.GetStoreCount(), Equals, i+1)

c.Assert(store.GetLastHeartbeatTS().IsZero(), IsTrue)
c.Assert(store.GetLastHeartbeatTS().UnixNano(), Equals, int64(0))

c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)

s := cluster.GetStore(store.GetID())
c.Assert(s.GetLastHeartbeatTS().IsZero(), IsFalse)
c.Assert(s.GetLastHeartbeatTS().UnixNano(), Not(Equals), int64(0))
c.Assert(s.GetStoreStats(), DeepEquals, storeStats)
}

Expand Down
27 changes: 15 additions & 12 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"go.uber.org/zap"
)

// Interval to save store meta (including heartbeat ts) to etcd.
const storePersistInterval = 5 * time.Minute

// StoreInfo contains information about a store.
type StoreInfo struct {
meta *metapb.Store
Expand All @@ -38,7 +41,7 @@ type StoreInfo struct {
leaderSize int64
regionSize int64
pendingPeerCount int
lastHeartbeatTS time.Time
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
available func() bool
Expand Down Expand Up @@ -70,7 +73,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderSize: s.leaderSize,
regionSize: s.regionSize,
pendingPeerCount: s.pendingPeerCount,
lastHeartbeatTS: s.lastHeartbeatTS,
lastPersistTime: s.lastPersistTime,
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
available: s.available,
Expand Down Expand Up @@ -205,11 +208,6 @@ func (s *StoreInfo) GetApplyingSnapCount() uint32 {
return s.stats.GetApplyingSnapCount()
}

// GetStartTime returns the start time of the store.
func (s *StoreInfo) GetStartTime() uint32 {
return s.stats.GetStartTime()
}

// GetLeaderCount returns the leader count of the store.
func (s *StoreInfo) GetLeaderCount() int {
return s.leaderCount
Expand Down Expand Up @@ -247,7 +245,12 @@ func (s *StoreInfo) GetRegionWeight() float64 {

// GetLastHeartbeatTS returns the last heartbeat timestamp of the store.
func (s *StoreInfo) GetLastHeartbeatTS() time.Time {
return s.lastHeartbeatTS
return time.Unix(0, s.meta.GetLastHeartbeat())
}

// NeedPersist returns if it needs to save to etcd.
func (s *StoreInfo) NeedPersist() bool {
return s.GetLastHeartbeatTS().Sub(s.lastPersistTime) > storePersistInterval
}

const minWeight = 1e-6
Expand Down Expand Up @@ -385,14 +388,14 @@ func (s *StoreInfo) ResourceWeight(kind ResourceKind) float64 {
}
}

// GetStartTS returns the start timestamp.
func (s *StoreInfo) GetStartTS() time.Time {
return time.Unix(int64(s.GetStartTime()), 0)
// GetStartTime returns the start timestamp.
func (s *StoreInfo) GetStartTime() time.Time {
return time.Unix(s.meta.GetStartTimestamp(), 0)
}

// GetUptime returns the uptime.
func (s *StoreInfo) GetUptime() time.Duration {
uptime := s.GetLastHeartbeatTS().Sub(s.GetStartTS())
uptime := s.GetLastHeartbeatTS().Sub(s.GetStartTime())
if uptime > 0 {
return uptime
}
Expand Down
9 changes: 8 additions & 1 deletion server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ func SetRegionWeight(regionWeight float64) StoreCreateOption {
// SetLastHeartbeatTS sets the time of last heartbeat for the store.
func SetLastHeartbeatTS(lastHeartbeatTS time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.lastHeartbeatTS = lastHeartbeatTS
store.meta.LastHeartbeat = lastHeartbeatTS.UnixNano()
}
}

// SetLastPersistTime updates the time of last persistent.
func SetLastPersistTime(lastPersist time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.lastPersistTime = lastPersist
}
}

Expand Down
7 changes: 6 additions & 1 deletion tests/pdctl/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"path/filepath"
"sort"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -85,7 +86,11 @@ func CheckStoresInfo(c *check.C, stores []*api.StoreInfo, want []*metapb.Store)
}
}
for _, s := range stores {
c.Assert(s.Store.Store, check.DeepEquals, mapWant[s.Store.Store.Id])
obtained := proto.Clone(s.Store.Store).(*metapb.Store)
expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store)
// Ignore lastHeartbeat
obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0
c.Assert(obtained, check.DeepEquals, expected)
}
}

Expand Down

0 comments on commit 33155fe

Please sign in to comment.