Skip to content

Commit

Permalink
etcdserver: add watchdog to detect stalled writes
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Mar 10, 2023
1 parent b3bb996 commit 8a50ca5
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 11 deletions.
18 changes: 18 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const (
DefaultDiscoveryKeepAliveTime = 2 * time.Second
DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second

DefaultWatchdogTolerateInactiveTimeout = 10 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"

Expand Down Expand Up @@ -380,6 +382,9 @@ type Config struct {
// Defaults to 0.
ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"`

EnableWatchDog bool `json:"experimental-enable-watchdog"`
WatchDogTolerateInactiveTimeout time.Duration `json:"experimental-watchdog-tolerate-inactive-timeout"`

// Logger is logger options: currently only supports "zap".
// "capnslog" is removed in v3.5.
Logger string `json:"logger"`
Expand Down Expand Up @@ -532,6 +537,8 @@ func NewConfig() *Config {
ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

WatchDogTolerateInactiveTimeout: DefaultWatchdogTolerateInactiveTimeout,

V2Deprecation: config.V2_DEPR_DEFAULT,

DiscoveryCfg: v3discovery.DiscoveryConfig{
Expand Down Expand Up @@ -750,6 +757,17 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
}

if cfg.EnableWatchDog {
if cfg.WatchDogTolerateInactiveTimeout == 0 {
return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout must be >0 (set to %s)", cfg.WatchDogTolerateInactiveTimeout)
}

if int64(2*cfg.ElectionMs) > cfg.WatchDogTolerateInactiveTimeout.Milliseconds() {
return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout[%s] should be at least as 2 times as --election-timeout[%dms]",
cfg.WatchDogTolerateInactiveTimeout, cfg.ElectionMs)
}
}

// check this last since proxying in etcdmain may make this OK
if cfg.LCUrls != nil && cfg.ACUrls == nil {
return ErrUnsetAdvertiseClientURLsFlag
Expand Down
37 changes: 29 additions & 8 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/watchdog"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/soheilhy/cmux"
Expand Down Expand Up @@ -263,6 +264,12 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
return e, err
}
}

if cfg.EnableWatchDog {
inactiveTimeoutMs := cfg.WatchDogTolerateInactiveTimeout.Milliseconds()
watchdog.Start(e.cfg.logger, e.Server.StoppingNotify(), e.Stop, inactiveTimeoutMs)
}

e.Server.Start()

if err = e.servePeers(); err != nil {
Expand Down Expand Up @@ -400,6 +407,28 @@ func (e *Etcd) Close() {
lg.Sync()
}()

e.Stop()

// close rafthttp transports
if e.Server != nil {
e.Server.Stop()
}

if e.errc != nil {
close(e.errc)
}
}

func (e *Etcd) Stop() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
zap.String("data-dir", e.cfg.Dir),
zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
}
lg := e.GetLogger()
lg.Info("stopping etcd server", fields...)

e.closeOnce.Do(func() {
close(e.stopc)
})
Expand Down Expand Up @@ -436,11 +465,6 @@ func (e *Etcd) Close() {
e.tracingExporterShutdown()
}

// close rafthttp transports
if e.Server != nil {
e.Server.Stop()
}

// close all idle connections in peer handler (wait up to 1-second)
for i := range e.Peers {
if e.Peers[i] != nil && e.Peers[i].close != nil {
Expand All @@ -449,9 +473,6 @@ func (e *Etcd) Close() {
cancel()
}
}
if e.errc != nil {
close(e.errc)
}
}

func stopServers(ctx context.Context, ss *servers) {
Expand Down
3 changes: 3 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.")

fs.BoolVar(&cfg.ec.EnableWatchDog, "experimental-enable-watchdog", cfg.ec.EnableWatchDog, "Enable watchdog to detect inactive activities.")
fs.DurationVar(&cfg.ec.WatchDogTolerateInactiveTimeout, "experimental-watchdog-tolerate-inactive-timeout", cfg.ec.WatchDogTolerateInactiveTimeout, "Maximum inactive duration of each activity that the watchdog tolerates.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
Expand Down
3 changes: 3 additions & 0 deletions server/etcdserver/api/snap/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/server/v3/watchdog"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

Expand Down Expand Up @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
spath := filepath.Join(s.dir, fname)

fsyncStart := time.Now()
cancel := watchdog.Register("save v2 snapshot")
err = pioutil.WriteAndSyncFile(spath, d, 0666)
cancel()
snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/watchdog"
)

type BucketID int
Expand Down Expand Up @@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() {
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
cancel := watchdog.Register("batchTx createBucket")
_, err := t.tx.CreateBucket(bucket.Name())
cancel()
if err != nil && err != bolt.ErrBucketExists {
t.backend.lg.Fatal(
"failed to create a bucket",
Expand All @@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
cancel := watchdog.Register("batchTx deleteBucket")
err := t.tx.DeleteBucket(bucket.Name())
cancel()
if err != nil && err != bolt.ErrBucketNotFound {
t.backend.lg.Fatal(
"failed to delete a bucket",
Expand Down Expand Up @@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {

cancel := watchdog.Register("batchTx put")
err := bucket.Put(key, value)
cancel()
if err != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.Stringer("bucket-name", bucketType),
Expand Down Expand Up @@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
zap.Stack("stack"),
)
}
cancel := watchdog.Register("batchTx delete")
err := bucket.Delete(key)
cancel()
if err != nil {
t.backend.lg.Fatal(
"failed to delete a key",
Expand Down Expand Up @@ -269,7 +280,9 @@ func (t *batchTx) commit(stop bool) {
start := time.Now()

// gofail: var beforeCommit struct{}
cancel := watchdog.Register("batchTx commit")
err := t.tx.Commit()
cancel()
// gofail: var afterCommit struct{}

rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
Expand Down
18 changes: 16 additions & 2 deletions server/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/server/v3/watchdog"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

Expand Down Expand Up @@ -798,6 +799,8 @@ func (w *WAL) cut() error {
}

func (w *WAL) sync() error {
cancel := watchdog.Register("WAL sync")
defer cancel()
if w.encoder != nil {
if err := w.encoder.flush(); err != nil {
return err
Expand Down Expand Up @@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
mustSync := raft.MustSync(st, w.state, len(ents))

// TODO(xiangli): no more reference operator
cancel := watchdog.Register("WAL saveEntry")
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
cancel()
return err
}
}
if err := w.saveState(&st); err != nil {
cancel()

cancel = watchdog.Register("WAL saveState")
err := w.saveState(&st)
cancel()
if err != nil {
return err
}

Expand Down Expand Up @@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
defer w.mu.Unlock()

rec := &walpb.Record{Type: SnapshotType, Data: b}
if err := w.encoder.encode(rec); err != nil {
cancel := watchdog.Register("WAL saveSnapshot")
err := w.encoder.encode(rec)
cancel()
if err != nil {
return err
}

// update enti only when snapshot is ahead of last index
if w.enti < e.Index {
w.enti = e.Index
Expand Down
125 changes: 125 additions & 0 deletions server/watchdog/watchdog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchdog

import (
"sync"
"time"

"go.uber.org/zap"
)

const (
tickMs = 100
)

var (
wdInstance *watchdog
nextActivityId uint64
)

type activity struct {
id uint64
name string
inactiveElapsed int
}

type watchdog struct {
lg *zap.Logger

ticker *time.Ticker
inactiveTimeoutTick int

mu sync.Mutex
activities map[uint64]*activity

stopC <-chan struct{}
cleanup func()
}

func (wd *watchdog) run() {
inactiveTimeout := time.Duration(wd.inactiveTimeoutTick*tickMs) * time.Millisecond
wd.lg.Info("Watchdog is running", zap.Duration("inactiveTimeout", inactiveTimeout))
for {
select {
case <-wd.ticker.C:
wd.mu.Lock()
for _, v := range wd.activities {
v.inactiveElapsed++
if v.inactiveElapsed > wd.inactiveTimeoutTick/2 {
elapsedTime := time.Duration(v.inactiveElapsed*tickMs) * time.Millisecond
wd.lg.Warn("Slow activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime))
if v.inactiveElapsed > wd.inactiveTimeoutTick {
wd.mu.Unlock()
wd.cleanup()
wd.lg.Panic("Inactive activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime))
}
}
}
wd.mu.Unlock()
case <-wd.stopC:
wd.lg.Info("Watchdog stopped")
return
}
}
}

func (wd *watchdog) register(name string) func() {
wd.mu.Lock()
defer wd.mu.Unlock()

id := nextActivityId
wd.activities[nextActivityId] = &activity{
id: id,
name: name,
inactiveElapsed: 0,
}
nextActivityId++

return func() {
wd.reset(id)
}
}

func (wd *watchdog) reset(id uint64) {
wd.mu.Lock()
defer wd.mu.Unlock()
delete(wd.activities, id)
}

func Start(lg *zap.Logger, stopC <-chan struct{}, cleanup func(), inactiveTimeoutMs int64) {
if wdInstance != nil {
wdInstance.lg.Warn("Watchdog has already been started")
return
}
wdInstance = &watchdog{
lg: lg,
stopC: stopC,
cleanup: cleanup,

inactiveTimeoutTick: int(inactiveTimeoutMs / tickMs),
activities: make(map[uint64]*activity),
ticker: time.NewTicker(tickMs * time.Millisecond),
}
go wdInstance.run()
}

func Register(name string) func() {
if wdInstance == nil {
// watchdog not enabled
return func() {}
}
return wdInstance.register(name)
}

0 comments on commit 8a50ca5

Please sign in to comment.