[Etcd downgrade] Add http handler to enable downgrade info communication between each member #12099

Merged 2 commits on Oct 26, 2020
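In short, this change lets each member ask its peers, over their peer URLs, whether a cluster downgrade is currently enabled, and adds a configurable check interval. The sketch below is a simplified, stand-alone illustration of that peer-to-peer exchange, not code from this PR; the production client side is getDowngradeEnabled in etcdserver/cluster_util.go further down, and the peer URL used here is a placeholder.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
)

func main() {
	// Hypothetical peer URL of a remote member; the real caller iterates the
	// member's PeerURLs (see getDowngradeEnabled in etcdserver/cluster_util.go).
	peerURL := "http://127.0.0.1:2380"

	resp, err := http.Get(peerURL + "/downgrade/enabled")
	if err != nil {
		log.Fatalf("failed to reach the peer URL: %v", err)
	}
	defer resp.Body.Close()

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("failed to read body of response: %v", err)
	}
	enabled, err := strconv.ParseBool(string(b))
	if err != nil {
		log.Fatalf("failed to convert response: %v", err)
	}
	fmt.Printf("downgrade enabled on remote member: %v\n", enabled)
}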
5 changes: 5 additions & 0 deletions embed/config.go
@@ -56,6 +56,7 @@ const (
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
DefaultDowngradeCheckTime = 5 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
@@ -330,6 +331,8 @@ type Config struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`

ExperimentalDowngradeCheckTime time.Duration `json:"experimental-downgrade-check-time"`
}

// configYAML holds the config suitable for yaml parsing
@@ -413,6 +416,8 @@ func NewConfig() *Config {
LogOutputs: []string{DefaultLogOutput},
LogLevel: logutil.DefaultLogLevel,
EnableGRPCGateway: true,

ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
2 changes: 2 additions & 0 deletions embed/etcd.go
@@ -201,6 +201,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
@@ -303,6 +304,7 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),
zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
)
}

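For embedded-etcd users, the new ExperimentalDowngradeCheckTime field flows from embed.Config into etcdserver.ServerConfig as shown above. The following is a minimal usage sketch under stated assumptions: the import path is written for the go.etcd.io/etcd/v3 module layout of that era and may need adjusting, and everything other than the new field uses the pre-existing embed API.

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/v3/embed"
)

func main() {
	cfg := embed.NewConfig() // ExperimentalDowngradeCheckTime defaults to DefaultDowngradeCheckTime (5s)
	cfg.Dir = "default.etcd"
	// Check the cluster's downgrade status every 30 seconds instead of the default.
	cfg.ExperimentalDowngradeCheckTime = 30 * time.Second

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop()
		log.Println("embedded etcd took too long to start")
	}
	log.Fatal(<-e.Err())
}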
1 change: 1 addition & 0 deletions etcdmain/config.go
@@ -252,6 +252,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status checks.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
6 changes: 5 additions & 1 deletion etcdserver/api/etcdhttp/peer.go
@@ -38,7 +38,7 @@ const (

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler())
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(
@@ -47,6 +47,7 @@ func newPeerHandler(
raftHandler http.Handler,
leaseHandler http.Handler,
hashKVHandler http.Handler,
downgradeEnabledHandler http.Handler,
) http.Handler {
if lg == nil {
lg = zap.NewNop()
@@ -64,6 +65,9 @@ func newPeerHandler(
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}
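newPeerHandler now routes etcdserver.DowngradeEnabledPath to the handler returned by s.DowngradeEnabledHandler(), which is implemented outside the files in this diff. The following is a hypothetical sketch of such a handler: the type and the isDowngradeEnabled callback are illustrative names only, and the one contract it must honor, visible in getDowngradeEnabled below, is that the response body parses with strconv.ParseBool.

package main

import (
	"log"
	"net/http"
	"strconv"
)

// downgradeEnabledPath mirrors etcdserver.DowngradeEnabledPath ("/downgrade/enabled").
const downgradeEnabledPath = "/downgrade/enabled"

type downgradeEnabledHandler struct {
	// isDowngradeEnabled is an assumed callback returning the member's
	// linearized view of the cluster-wide downgrade flag.
	isDowngradeEnabled func() bool
}

func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "text/plain")
	// A bare "true"/"false" body is all the remote peer needs; it reads the
	// body and parses it with strconv.ParseBool.
	w.Write([]byte(strconv.FormatBool(h.isDowngradeEnabled())))
}

func main() {
	// Registration mirrors the mux.Handle call added to newPeerHandler above.
	mux := http.NewServeMux()
	mux.Handle(downgradeEnabledPath, &downgradeEnabledHandler{
		isDowngradeEnabled: func() bool { return false },
	})
	log.Fatal(http.ListenAndServe("127.0.0.1:2380", mux))
}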
4 changes: 2 additions & 2 deletions etcdserver/api/etcdhttp/peer_test.go
@@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

@@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {

// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

95 changes: 95 additions & 0 deletions etcdserver/cluster_util.go
@@ -21,6 +21,7 @@ import (
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"

@@ -369,9 +370,103 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R

// getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
members := cl.Members()

for _, m := range members {
if m.ID == local {
continue
}
enable, err := getDowngradeEnabled(lg, m, rt)
if err != nil {
lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
} else {
// Since the "/downgrade/enabled" serves linearized data,
// this function can return once it gets a non-error response from the endpoint.
return enable
}
}
return false
}

// getDowngradeEnabled returns the downgrade enabled status of the given member
// via its peerURLs. Returns the last error if it fails to get it.
func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
cc := &http.Client{
Transport: rt,
}
var (
err error
resp *http.Response
)

for _, u := range m.PeerURLs {
addr := u + DowngradeEnabledPath
resp, err = cc.Get(addr)
if err != nil {
lg.Warn(
"failed to reach the peer URL",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var b []byte
b, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
lg.Warn(
"failed to read body of response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
var enable bool
if enable, err = strconv.ParseBool(string(b)); err != nil {
lg.Warn(
"failed to convert response",
zap.String("address", addr),
zap.String("remote-member-id", m.ID.String()),
zap.Error(err),
)
continue
}
return enable, nil
}
return false, err
}

// isMatchedVersions returns true if all server versions are equal to the target version; otherwise it returns false.
// It can be used to decide whether the cluster has finished downgrading to the target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
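DowngradeCheckTime (the new ServerConfig field) and getDowngradeEnabledFromRemotePeers are presumably tied together by a periodic check loop on the server side; that loop is not part of the files shown here. The sketch below, with the hypothetical monitorDowngrade name and checkDowngrade callback, only illustrates how such a loop could consume the interval.

package main

import (
	"log"
	"time"
)

// monitorDowngrade runs checkDowngrade once per interval until stop is closed.
func monitorDowngrade(interval time.Duration, checkDowngrade func() bool, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if checkDowngrade() {
				log.Println("cluster downgrade is enabled; keep monitoring the target version")
			}
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	// In the server, the callback would wrap
	// getDowngradeEnabledFromRemotePeers(lg, cluster, localID, peerRoundTripper),
	// and the interval would come from ServerConfig.DowngradeCheckTime.
	go monitorDowngrade(5*time.Second, func() bool { return false }, stop)
	time.Sleep(11 * time.Second)
	close(stop)
}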
49 changes: 49 additions & 0 deletions etcdserver/cluster_util_test.go
@@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}

func TestIsMatchedVersions(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}
2 changes: 2 additions & 0 deletions etcdserver/config.go
@@ -162,6 +162,8 @@ type ServerConfig struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`

DowngradeCheckTime time.Duration
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
5 changes: 0 additions & 5 deletions etcdserver/corrupt.go
@@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return nil, ErrCorrupt
}

type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
}

const PeerHashKVPath = "/members/hashkv"

type hashKVHandler struct {
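The ServerPeerV2 interface removed from corrupt.go above is not deleted from the codebase: NewPeerHandler still takes an etcdserver.ServerPeerV2 and now calls s.DowngradeEnabledHandler(), so the interface presumably moves to another file in the etcdserver package and gains that method. A sketch of the assumed shape follows; the ServerPeer embedding is simplified here and is not etcd's exact definition.

package etcdserver

import "net/http"

// ServerPeer stands in, simplified, for etcd's existing peer-facing interface.
type ServerPeer interface {
	RaftHandler() http.Handler
	LeaseHandler() http.Handler
}

// ServerPeerV2 as it presumably looks after this PR: the existing HashKVHandler
// plus the DowngradeEnabledHandler consumed by NewPeerHandler.
type ServerPeerV2 interface {
	ServerPeer
	HashKVHandler() http.Handler
	DowngradeEnabledHandler() http.Handler
}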