Skip to content

Commit

Permalink
Merge pull request #300 from tjungblu/bump_mark_force_new_c
Browse files Browse the repository at this point in the history
DOWNSTREAM: <carry>: ETCD-696: Add rev bumping to force-new-cluster
  • Loading branch information
openshift-merge-bot[bot] authored Nov 5, 2024
2 parents 3b0a2d2 + 74d968e commit ad72abe
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 4 deletions.
3 changes: 2 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ type ServerConfig struct {
// Logger logs server-side operations.
Logger *zap.Logger

ForceNewCluster bool
ForceNewCluster bool
ForceNewClusterBumpAmount uint64

// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool
Expand Down
3 changes: 2 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ type Config struct {
ExperimentalMaxLearners int `json:"experimental-max-learners"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
ForceNewCluster bool `json:"force-new-cluster"`
ForceNewClusterBumpAmount uint64 `json:"force-new-cluster-bump-amount"`

EnablePprof bool `json:"enable-pprof"`
Metrics string `json:"metrics"`
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
ForceNewClusterBumpAmount: cfg.ForceNewClusterBumpAmount,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func newConfig() *config {
// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
fs.Uint64Var(&cfg.ec.ForceNewClusterBumpAmount, "force-new-cluster-bump-amount", 0, "How much to increase the latest revision after --force-new-cluster.")

// ignored
for _, f := range cfg.ignored {
Expand Down
12 changes: 11 additions & 1 deletion server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/revbump"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -570,7 +572,8 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID,
return id, cl, n, s, w
}

func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot, be backend.Backend) (types.ID,
*membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
Expand Down Expand Up @@ -610,6 +613,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot)
st.Commit = ents[len(ents)-1].Index
}

if cfg.ForceNewClusterBumpAmount > 0 {
err = revbump.UnsafeModifyLastRevision(cfg.Logger, cfg.ForceNewClusterBumpAmount, be)
if err != nil {
cfg.Logger.Fatal("failed to modify last revision", zap.Error(err))
}
}

cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", cid.String()),
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
} else {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot, be)
}

cl.SetStore(st)
Expand Down
65 changes: 65 additions & 0 deletions server/revbump/revbump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package revbump

import (
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)

// revisionOverflowError reports that applying the requested bump would move
// the main revision past the largest value an int64 can represent.
type revisionOverflowError struct{}

// Error implements the error interface.
func (revisionOverflowError) Error() string {
	return "bump amount overflows the int64 revision space"
}

// UnsafeModifyLastRevision raises the latest revision found in the key bucket
// of be by bumpAmount and then marks the resulting revision as compacted.
//
// The work is done under the backend's batch transaction lock, and
// be.ForceCommit is called once the lock has been released, on both the
// success and the error path.
//
// It returns any error produced while scanning for the latest revision, or a
// revisionOverflowError when bumpAmount cannot be added to the latest main
// revision without overflowing int64. In the overflow case nothing is written.
func UnsafeModifyLastRevision(lg *zap.Logger, bumpAmount uint64, be backend.Backend) error {
	// Deferred first so that it runs last, i.e. after tx.Unlock below.
	defer be.ForceCommit()

	tx := be.BatchTx()
	tx.LockOutsideApply()
	defer tx.Unlock()

	latest, err := unsafeGetLatestRevision(tx)
	if err != nil {
		return err
	}

	// Revisions are stored as int64. A uint64 amount above MaxInt64 would wrap
	// to a negative number on conversion, and a large but valid amount could
	// still overflow the sum; both would move the revision backwards.
	const maxRevision int64 = 1<<63 - 1
	if bumpAmount > uint64(maxRevision) || latest.main > maxRevision-int64(bumpAmount) {
		return revisionOverflowError{}
	}

	latest = unsafeBumpRevision(lg, tx, latest, int64(bumpAmount))
	unsafeMarkRevisionCompacted(lg, tx, latest)
	return nil
}

// unsafeBumpRevision logs the bump, then writes an empty-valued entry into
// buckets.Key whose key encodes the revision {latest.main + amount, 0}.
// It returns that new revision. The caller is expected to hold the lock on tx.
func unsafeBumpRevision(lg *zap.Logger, tx backend.BatchTx, latest revision, amount int64) revision {
	bumped := revision{main: latest.main + amount, sub: 0}

	lg.Info(
		"bumping latest revision",
		zap.Int64("latest-revision", latest.main),
		zap.Int64("bump-amount", amount),
		zap.Int64("new-latest-revision", bumped.main),
	)

	// Encode the bumped revision as the bucket key; the value stays empty.
	key := make([]byte, revBytesLen)
	revToBytes(key, bumped)
	tx.UnsafePut(buckets.Key, key, []byte{})

	return bumped
}

// unsafeMarkRevisionCompacted logs the main revision of latest and hands it to
// mvcc.UnsafeSetScheduledCompact on tx. The sub revision is not used.
func unsafeMarkRevisionCompacted(lg *zap.Logger, tx backend.BatchTx, latest revision) {
	compactRev := latest.main

	lg.Info("marking revision compacted", zap.Int64("revision", compactRev))
	mvcc.UnsafeSetScheduledCompact(tx, compactRev)
}

// unsafeGetLatestRevision visits every key in buckets.Key, decodes each one as
// a revision, and returns the greatest revision seen. When the bucket holds no
// keys the zero revision is returned. Any error from the iteration is returned
// together with the greatest revision found up to that point.
func unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) {
	var newest revision

	// Values are ignored; only the keys carry revision information.
	visit := func(key, _ []byte) error {
		if candidate := bytesToRev(key); candidate.GreaterThan(newest) {
			newest = candidate
		}
		return nil
	}

	if err := tx.UnsafeForEach(buckets.Key, visit); err != nil {
		return newest, err
	}
	return newest, nil
}
46 changes: 46 additions & 0 deletions server/revbump/revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package revbump

import "encoding/binary"

// Byte lengths of encoded revisions.
const (
	// revBytesLen is the byte length of a normal revision. The first 8 bytes
	// are revision.main in big-endian format, the 9th byte is a '_', and the
	// last 8 bytes are revision.sub in big-endian format.
	revBytesLen = 8 + 1 + 8

	// markedRevBytesLen is one byte longer than revBytesLen.
	// NOTE(review): presumably the length of a revision key that carries a
	// one-byte marker suffix — confirm against the mvcc package.
	markedRevBytesLen = revBytesLen + 1
)

// A revision identifies a modification of the key-value space.
// All changes that share the same main revision are applied to the key-value
// space atomically.
type revision struct {
	// main is the main revision of a set of changes that happen atomically.
	main int64

	// sub distinguishes the individual changes inside one atomic set; each
	// change in the set has a different, increasing sub revision.
	sub int64
}

// GreaterThan reports whether a orders strictly after b. Revisions are
// compared by main first; sub only breaks ties between equal main values.
func (a revision) GreaterThan(b revision) bool {
	if a.main != b.main {
		return a.main > b.main
	}
	return a.sub > b.sub
}

// revToBytes encodes rev into bytes: main as 8 big-endian bytes, a '_'
// separator at index 8, then sub as 8 big-endian bytes starting at index 9.
// bytes must be at least 17 bytes long, otherwise the function panics.
//
// The encoding should be kept in sync with the function in the server:
// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go
func revToBytes(bytes []byte, rev revision) {
	mainPart := bytes[0:8]
	binary.BigEndian.PutUint64(mainPart, uint64(rev.main))

	bytes[8] = '_'

	subPart := bytes[9:]
	binary.BigEndian.PutUint64(subPart, uint64(rev.sub))
}

// bytesToRev decodes a revision from bytes: main from the first 8 bytes and
// sub from the 8 bytes starting at index 9, both big-endian. The byte at
// index 8 and anything past index 16 are ignored. The function panics when
// bytes is shorter than 17 bytes.
func bytesToRev(bytes []byte) revision {
	var rev revision
	rev.main = int64(binary.BigEndian.Uint64(bytes[0:8]))
	rev.sub = int64(binary.BigEndian.Uint64(bytes[9:]))
	return rev
}

0 comments on commit ad72abe

Please sign in to comment.