From 8a50ca5be4147f9c1f60815b8a4e35e86733a4e6 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 10 Mar 2023 10:29:35 +0800 Subject: [PATCH] etcdserver: add watchdog to detect stalled writes Signed-off-by: Benjamin Wang --- server/embed/config.go | 18 ++++ server/embed/etcd.go | 37 +++++-- server/etcdmain/config.go | 3 + server/etcdserver/api/snap/snapshotter.go | 3 + server/storage/backend/batch_tx.go | 15 ++- server/storage/wal/wal.go | 18 +++- server/watchdog/watchdog.go | 125 ++++++++++++++++++++++ 7 files changed, 208 insertions(+), 11 deletions(-) create mode 100644 server/watchdog/watchdog.go diff --git a/server/embed/config.go b/server/embed/config.go index 75bcbc34152e..8e97ff2551e7 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -74,6 +74,8 @@ const ( DefaultDiscoveryKeepAliveTime = 2 * time.Second DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second + DefaultWatchdogTolerateInactiveTimeout = 10 * time.Second + DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -380,6 +382,9 @@ type Config struct { // Defaults to 0. ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"` + EnableWatchDog bool `json:"experimental-enable-watchdog"` + WatchDogTolerateInactiveTimeout time.Duration `json:"experimental-watchdog-tolerate-inactive-timeout"` + // Logger is logger options: currently only supports "zap". // "capnslog" is removed in v3.5. Logger string `json:"logger"` @@ -532,6 +537,8 @@ func NewConfig() *Config { ExperimentalCompactHashCheckEnabled: false, ExperimentalCompactHashCheckTime: time.Minute, + WatchDogTolerateInactiveTimeout: DefaultWatchdogTolerateInactiveTimeout, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -750,6 +757,17 @@ func (cfg *Config) Validate() error { return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs) } + if cfg.EnableWatchDog { + if cfg.WatchDogTolerateInactiveTimeout == 0 { + return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout must be >0 (set to %s)", cfg.WatchDogTolerateInactiveTimeout) + } + + if int64(2*cfg.ElectionMs) > cfg.WatchDogTolerateInactiveTimeout.Milliseconds() { + return fmt.Errorf("--experimental-watchdog-tolerate-inactive-timeout[%s] should be at least as 2 times as --election-timeout[%dms]", + cfg.WatchDogTolerateInactiveTimeout, cfg.ElectionMs) + } + } + // check this last since proxying in etcdmain may make this OK if cfg.LCUrls != nil && cfg.ACUrls == nil { return ErrUnsetAdvertiseClientURLsFlag diff --git a/server/embed/etcd.go b/server/embed/etcd.go index a03b4f1c9fe9..ce84d6cdd82e 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/verify" + "go.etcd.io/etcd/server/v3/watchdog" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" @@ -263,6 +264,12 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, err } } + + if cfg.EnableWatchDog { + inactiveTimeoutMs := cfg.WatchDogTolerateInactiveTimeout.Milliseconds() + watchdog.Start(e.cfg.logger, e.Server.StoppingNotify(), e.Stop, inactiveTimeoutMs) + } + e.Server.Start() if err = e.servePeers(); err != nil { @@ -400,6 +407,28 @@ func (e *Etcd) Close() { lg.Sync() }() + e.Stop() + + // close rafthttp transports + if e.Server != nil { + e.Server.Stop() + } + + if e.errc != nil { + close(e.errc) + } +} + +func (e *Etcd) Stop() { + fields := []zap.Field{ + zap.String("name", e.cfg.Name), + zap.String("data-dir", e.cfg.Dir), + zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()), + zap.Strings("advertise-client-urls", e.cfg.getACURLs()), + } + lg := e.GetLogger() + lg.Info("stopping etcd server", fields...) + e.closeOnce.Do(func() { close(e.stopc) }) @@ -436,11 +465,6 @@ func (e *Etcd) Close() { e.tracingExporterShutdown() } - // close rafthttp transports - if e.Server != nil { - e.Server.Stop() - } - // close all idle connections in peer handler (wait up to 1-second) for i := range e.Peers { if e.Peers[i] != nil && e.Peers[i].close != nil { @@ -449,9 +473,6 @@ func (e *Etcd) Close() { cancel() } } - if e.errc != nil { - close(e.errc) - } } func stopServers(ctx context.Context, ss *servers) { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 84763dd9a6db..1e204d9f55e3 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -284,6 +284,9 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.") fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.") + fs.BoolVar(&cfg.ec.EnableWatchDog, "experimental-enable-watchdog", cfg.ec.EnableWatchDog, "Enable watchdog to detect inactive activities.") + fs.DurationVar(&cfg.ec.WatchDogTolerateInactiveTimeout, "experimental-watchdog-tolerate-inactive-timeout", cfg.ec.WatchDogTolerateInactiveTimeout, "Maximum inactive duration of each activity that the watchdog tolerates.") + // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") diff --git a/server/etcdserver/api/snap/snapshotter.go b/server/etcdserver/api/snap/snapshotter.go index 093ab6bc9149..ffc299bb7dee 100644 --- a/server/etcdserver/api/snap/snapshotter.go +++ b/server/etcdserver/api/snap/snapshotter.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -88,7 +89,9 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { spath := filepath.Join(s.dir, fname) fsyncStart := time.Now() + cancel := watchdog.Register("save v2 snapshot") err = pioutil.WriteAndSyncFile(spath, d, 0666) + cancel() snapFsyncSec.Observe(time.Since(fsyncStart).Seconds()) if err != nil { diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d12a0868ddc..09e85b8b05e1 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/server/v3/watchdog" ) type BucketID int @@ -114,7 +115,9 @@ func (t *batchTx) RUnlock() { } func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx createBucket") _, err := t.tx.CreateBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketExists { t.backend.lg.Fatal( "failed to create a bucket", @@ -126,7 +129,9 @@ func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { } func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) { + cancel := watchdog.Register("batchTx deleteBucket") err := t.tx.DeleteBucket(bucket.Name()) + cancel() if err != nil && err != bolt.ErrBucketNotFound { t.backend.lg.Fatal( "failed to delete a bucket", @@ -161,7 +166,11 @@ func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq boo // this can delay the page split and reduce space usage. bucket.FillPercent = 0.9 } - if err := bucket.Put(key, value); err != nil { + + cancel := watchdog.Register("batchTx put") + err := bucket.Put(key, value) + cancel() + if err != nil { t.backend.lg.Fatal( "failed to write to a bucket", zap.Stringer("bucket-name", bucketType), @@ -216,7 +225,9 @@ func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) { zap.Stack("stack"), ) } + cancel := watchdog.Register("batchTx delete") err := bucket.Delete(key) + cancel() if err != nil { t.backend.lg.Fatal( "failed to delete a key", @@ -269,7 +280,9 @@ func (t *batchTx) commit(stop bool) { start := time.Now() // gofail: var beforeCommit struct{} + cancel := watchdog.Register("batchTx commit") err := t.tx.Commit() + cancel() // gofail: var afterCommit struct{} rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds()) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 7f8b25f5ddd2..6d47a9a15347 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/server/v3/watchdog" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -798,6 +799,8 @@ func (w *WAL) cut() error { } func (w *WAL) sync() error { + cancel := watchdog.Register("WAL sync") + defer cancel() if w.encoder != nil { if err := w.encoder.flush(); err != nil { return err @@ -935,12 +938,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { mustSync := raft.MustSync(st, w.state, len(ents)) // TODO(xiangli): no more reference operator + cancel := watchdog.Register("WAL saveEntry") for i := range ents { if err := w.saveEntry(&ents[i]); err != nil { + cancel() return err } } - if err := w.saveState(&st); err != nil { + cancel() + + cancel = watchdog.Register("WAL saveState") + err := w.saveState(&st) + cancel() + if err != nil { return err } @@ -972,9 +982,13 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { defer w.mu.Unlock() rec := &walpb.Record{Type: SnapshotType, Data: b} - if err := w.encoder.encode(rec); err != nil { + cancel := watchdog.Register("WAL saveSnapshot") + err := w.encoder.encode(rec) + cancel() + if err != nil { return err } + // update enti only when snapshot is ahead of last index if w.enti < e.Index { w.enti = e.Index diff --git a/server/watchdog/watchdog.go b/server/watchdog/watchdog.go new file mode 100644 index 000000000000..d73ce552277c --- /dev/null +++ b/server/watchdog/watchdog.go @@ -0,0 +1,125 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watchdog + +import ( + "sync" + "time" + + "go.uber.org/zap" +) + +const ( + tickMs = 100 +) + +var ( + wdInstance *watchdog + nextActivityId uint64 +) + +type activity struct { + id uint64 + name string + inactiveElapsed int +} + +type watchdog struct { + lg *zap.Logger + + ticker *time.Ticker + inactiveTimeoutTick int + + mu sync.Mutex + activities map[uint64]*activity + + stopC <-chan struct{} + cleanup func() +} + +func (wd *watchdog) run() { + inactiveTimeout := time.Duration(wd.inactiveTimeoutTick*tickMs) * time.Millisecond + wd.lg.Info("Watchdog is running", zap.Duration("inactiveTimeout", inactiveTimeout)) + for { + select { + case <-wd.ticker.C: + wd.mu.Lock() + for _, v := range wd.activities { + v.inactiveElapsed++ + if v.inactiveElapsed > wd.inactiveTimeoutTick/2 { + elapsedTime := time.Duration(v.inactiveElapsed*tickMs) * time.Millisecond + wd.lg.Warn("Slow activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime)) + if v.inactiveElapsed > wd.inactiveTimeoutTick { + wd.mu.Unlock() + wd.cleanup() + wd.lg.Panic("Inactive activity detected", zap.String("activity", v.name), zap.Duration("duration", elapsedTime)) + } + } + } + wd.mu.Unlock() + case <-wd.stopC: + wd.lg.Info("Watchdog stopped") + return + } + } +} + +func (wd *watchdog) register(name string) func() { + wd.mu.Lock() + defer wd.mu.Unlock() + + id := nextActivityId + wd.activities[nextActivityId] = &activity{ + id: id, + name: name, + inactiveElapsed: 0, + } + nextActivityId++ + + return func() { + wd.reset(id) + } +} + +func (wd *watchdog) reset(id uint64) { + wd.mu.Lock() + defer wd.mu.Unlock() + delete(wd.activities, id) +} + +func Start(lg *zap.Logger, stopC <-chan struct{}, cleanup func(), inactiveTimeoutMs int64) { + if wdInstance != nil { + wdInstance.lg.Warn("Watchdog has already been started") + return + } + wdInstance = &watchdog{ + lg: lg, + stopC: stopC, + cleanup: cleanup, + + inactiveTimeoutTick: int(inactiveTimeoutMs / tickMs), + activities: make(map[uint64]*activity), + ticker: time.NewTicker(tickMs * time.Millisecond), + } + go wdInstance.run() +} + +func Register(name string) func() { + if wdInstance == nil { + // watchdog not enabled + return func() {} + } + return wdInstance.register(name) +}