From 74d968e1f489e7d01fef0e2813bd6b266a6f8488 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Mon, 4 Nov 2024 15:44:54 +0100 Subject: [PATCH] DOWNSTREAM: : ETCD-696: Add rev bumping to force-new-cluster force-new-cluster seems to have similar watch cache issues as the ordinary snapshot restore. This PR brings the already existing etcdutl logic into the server-side code as a separate package. It only introduces a single rev-bump flag, but under the hood it implements both revision bumping and compaction marking. Signed-off-by: Thomas Jungblut --- server/config/config.go | 3 +- server/embed/config.go | 3 +- server/embed/etcd.go | 1 + server/etcdmain/config.go | 1 + server/etcdserver/raft.go | 12 ++++++- server/etcdserver/server.go | 2 +- server/revbump/revbump.go | 65 +++++++++++++++++++++++++++++++++++++ server/revbump/revision.go | 46 ++++++++++++++++++++++++++ 8 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 server/revbump/revbump.go create mode 100644 server/revbump/revision.go diff --git a/server/config/config.go b/server/config/config.go index 3d341b1abb9..ca964b7169d 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -152,7 +152,8 @@ type ServerConfig struct { // Logger logs server-side operations. Logger *zap.Logger - ForceNewCluster bool + ForceNewCluster bool + ForceNewClusterBumpAmount uint64 // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. EnableLeaseCheckpoint bool diff --git a/server/embed/config.go b/server/embed/config.go index 30c0d073029..5e0fe252901 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -350,7 +350,8 @@ type Config struct { ExperimentalMaxLearners int `json:"experimental-max-learners"` // ForceNewCluster starts a new cluster even if previously started; unsafe. 
- ForceNewCluster bool `json:"force-new-cluster"` + ForceNewCluster bool `json:"force-new-cluster"` + ForceNewClusterBumpAmount uint64 `json:"force-new-cluster-bump-amount"` EnablePprof bool `json:"enable-pprof"` Metrics string `json:"metrics"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index c98e32f67b1..08fe6606d29 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -212,6 +212,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, + ForceNewClusterBumpAmount: cfg.ForceNewClusterBumpAmount, EnableGRPCGateway: cfg.EnableGRPCGateway, ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing, UnsafeNoFsync: cfg.UnsafeNoFsync, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 8292ac96878..8bd2eebc995 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -311,6 +311,7 @@ func newConfig() *config { // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") + fs.Uint64Var(&cfg.ec.ForceNewClusterBumpAmount, "force-new-cluster-bump-amount", 0, "How much to increase the latest revision after --force-new-cluster.") // ignored for _, f := range cfg.ignored { diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 8338af99d38..b022c68fb04 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -33,6 +33,8 @@ import ( "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/revbump" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" @@ -570,7 +572,8 @@ func restartNode(cfg config.ServerConfig, snapshot 
*raftpb.Snapshot) (types.ID, return id, cl, n, s, w } -func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot, be backend.Backend) (types.ID, + *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -610,6 +613,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) st.Commit = ents[len(ents)-1].Index } + if cfg.ForceNewClusterBumpAmount > 0 { + err = revbump.UnsafeModifyLastRevision(cfg.Logger, cfg.ForceNewClusterBumpAmount, be) + if err != nil { + cfg.Logger.Fatal("failed to modify last revision", zap.Error(err)) + } + } + cfg.Logger.Info( "forcing restart member", zap.String("cluster-id", cid.String()), diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 871cce817b8..a87f4266f7b 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -538,7 +538,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if !cfg.ForceNewCluster { id, cl, n, s, w = restartNode(cfg, snapshot) } else { - id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) + id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot, be) } cl.SetStore(st) diff --git a/server/revbump/revbump.go b/server/revbump/revbump.go new file mode 100644 index 00000000000..ce4ff823b34 --- /dev/null +++ b/server/revbump/revbump.go @@ -0,0 +1,65 @@ +package revbump + +import ( + "go.etcd.io/etcd/server/v3/mvcc" + "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" + "go.uber.org/zap" +) + +func UnsafeModifyLastRevision(lg *zap.Logger, bumpAmount uint64, be backend.Backend) error { + defer be.ForceCommit() + + tx := be.BatchTx() + tx.LockOutsideApply() + defer 
tx.Unlock() + + latest, err := unsafeGetLatestRevision(tx) + if err != nil { + return err + } + + latest = unsafeBumpRevision(lg, tx, latest, int64(bumpAmount)) + unsafeMarkRevisionCompacted(lg, tx, latest) + return nil +} + +func unsafeBumpRevision(lg *zap.Logger, tx backend.BatchTx, latest revision, amount int64) revision { + lg.Info( + "bumping latest revision", + zap.Int64("latest-revision", latest.main), + zap.Int64("bump-amount", amount), + zap.Int64("new-latest-revision", latest.main+amount), + ) + + latest.main += amount + latest.sub = 0 + k := make([]byte, revBytesLen) + revToBytes(k, latest) + tx.UnsafePut(buckets.Key, k, []byte{}) + + return latest +} + +func unsafeMarkRevisionCompacted(lg *zap.Logger, tx backend.BatchTx, latest revision) { + lg.Info( + "marking revision compacted", + zap.Int64("revision", latest.main), + ) + + mvcc.UnsafeSetScheduledCompact(tx, latest.main) +} + +func unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) { + var latest revision + err := tx.UnsafeForEach(buckets.Key, func(k, _ []byte) (err error) { + rev := bytesToRev(k) + + if rev.GreaterThan(latest) { + latest = rev + } + + return nil + }) + return latest, err +} diff --git a/server/revbump/revision.go b/server/revbump/revision.go new file mode 100644 index 00000000000..875ce537227 --- /dev/null +++ b/server/revbump/revision.go @@ -0,0 +1,46 @@ +package revbump + +import "encoding/binary" + +// revBytesLen is the byte length of a normal revision. +// First 8 bytes is the revision.main in big-endian format. The 9th byte +// is a '_'. The last 8 bytes is the revision.sub in big-endian format. +const revBytesLen = 8 + 1 + 8 +const markedRevBytesLen = revBytesLen + 1 + +// A revision indicates modification of the key-value space. +// The set of changes that share same main revision changes the key-value space atomically. +type revision struct { + // main is the main revision of a set of changes that happen atomically. 
+ main int64 + + // sub is the sub revision of a change in a set of changes that happen + // atomically. Each change has different increasing sub revision in that + // set. + sub int64 +} + +func (a revision) GreaterThan(b revision) bool { + if a.main > b.main { + return true + } + if a.main < b.main { + return false + } + return a.sub > b.sub +} + +// revToBytes should be synced with function in server +// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go +func revToBytes(bytes []byte, rev revision) { + binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main)) + bytes[8] = '_' + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) +} + +func bytesToRev(bytes []byte) revision { + return revision{ + main: int64(binary.BigEndian.Uint64(bytes[0:8])), + sub: int64(binary.BigEndian.Uint64(bytes[9:])), + } +}