Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor code to make place for downgrade logic #13391

Merged
merged 8 commits into from
Oct 8, 2021
33 changes: 21 additions & 12 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"context"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit, lets group these by etcd internal and external packages. It's what some editors will auto change so we will see changes for this in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, hate that. I don't know good solution in Go for that. Python has isort https://github.com/PyCQA/isort that allows defining custom import blocks.

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)

// serverVersionAdapter implements Server interface needed by serverversion.Monitor
Expand All @@ -47,24 +48,32 @@ func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}

func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
return s.linearizableReadNotify(ctx)
}

func (s *serverVersionAdapter) DowngradeEnable(ctx context.Context, targetVersion *semver.Version) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}

func (s *serverVersionAdapter) DowngradeCancel(ctx context.Context) error {
raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
return err
}

func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
return s.cluster.Version()
}

func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo {
return s.cluster.DowngradeInfo()
}

func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions {
return getMembersVersions(s.lg, s.cluster, s.id, s.peerRt)
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/api/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"sync"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.uber.org/zap"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -64,7 +64,7 @@ func UpdateCapability(lg *zap.Logger, v *semver.Version) {
return
}
enableMapMu.Lock()
if curVersion != nil && !membership.IsValidVersionChange(v, curVersion) {
if curVersion != nil && !serverversion.IsValidVersionChange(v, curVersion) {
enableMapMu.Unlock()
return
}
Expand Down
39 changes: 13 additions & 26 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"

"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -58,7 +59,7 @@ type RaftCluster struct {
// removed id cannot be reused.
removed map[types.ID]bool

downgradeInfo *DowngradeInfo
downgradeInfo *serverversion.DowngradeInfo
versionChanged *notify.Notifier
}

Expand Down Expand Up @@ -113,7 +114,7 @@ func NewCluster(lg *zap.Logger) *RaftCluster {
lg: lg,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
downgradeInfo: &DowngradeInfo{Enabled: false},
downgradeInfo: &serverversion.DowngradeInfo{Enabled: false},
}
}

Expand Down Expand Up @@ -268,11 +269,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
d := &DowngradeInfo{Enabled: false}
d := &serverversion.DowngradeInfo{Enabled: false}
if c.downgradeInfo != nil {
d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
}
mustDetectDowngrade(c.lg, c.version, d)
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
onSet(c.lg, c.version)

for _, m := range c.members {
Expand Down Expand Up @@ -540,7 +542,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
oldVer := c.version
c.version = ver
mustDetectDowngrade(c.lg, c.version, c.downgradeInfo)
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
Expand Down Expand Up @@ -715,22 +718,6 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
return nil
}

// IsValidVersionChange checks the two scenario when version is valid to change:
// 1. Downgrade: cluster version is 1 minor version higher than local version,
// cluster version should change.
// 2. Cluster start: when not all members version are available, cluster version
// is set to MinVersion(3.0), when all members are at higher version, cluster version
// is lower than local version, cluster version should change
func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool {
cv = &semver.Version{Major: cv.Major, Minor: cv.Minor}
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}

if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) {
return true
}
return false
}

// IsLocalMemberLearner returns if the local member is raft learner
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
Expand All @@ -747,17 +734,17 @@ func (c *RaftCluster) IsLocalMemberLearner() bool {
}

// DowngradeInfo returns the downgrade status of the cluster
func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
func (c *RaftCluster) DowngradeInfo() *serverversion.DowngradeInfo {
c.Lock()
defer c.Unlock()
if c.downgradeInfo == nil {
return &DowngradeInfo{Enabled: false}
return &serverversion.DowngradeInfo{Enabled: false}
}
d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d := &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
return d
}

func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()

Expand Down
73 changes: 0 additions & 73 deletions server/etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"reflect"
"testing"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/client/pkg/v3/testutil"
Expand Down Expand Up @@ -947,75 +946,3 @@ func TestIsReadyToPromoteMember(t *testing.T) {
}
}
}

func TestIsVersionChangable(t *testing.T) {
v0 := semver.Must(semver.NewVersion("2.4.0"))
v1 := semver.Must(semver.NewVersion("3.4.0"))
v2 := semver.Must(semver.NewVersion("3.5.0"))
v3 := semver.Must(semver.NewVersion("3.5.1"))
v4 := semver.Must(semver.NewVersion("3.6.0"))

tests := []struct {
name string
currentVersion *semver.Version
localVersion *semver.Version
expectedResult bool
}{
{
name: "When local version is one minor lower than cluster version",
currentVersion: v2,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor and one patch lower than cluster version",
currentVersion: v3,
localVersion: v1,
expectedResult: true,
},
{
name: "When local version is one minor higher than cluster version",
currentVersion: v1,
localVersion: v2,
expectedResult: true,
},
{
name: "When local version is two minor higher than cluster version",
currentVersion: v1,
localVersion: v4,
expectedResult: true,
},
{
name: "When local version is one major higher than cluster version",
currentVersion: v0,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is equal to cluster version",
currentVersion: v1,
localVersion: v1,
expectedResult: false,
},
{
name: "When local version is one patch higher than cluster version",
currentVersion: v2,
localVersion: v3,
expectedResult: false,
},
{
name: "When local version is two minor lower than cluster version",
currentVersion: v4,
localVersion: v1,
expectedResult: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if ret := IsValidVersionChange(tt.currentVersion, tt.localVersion); ret != tt.expectedResult {
t.Errorf("Expected %v; Got %v", tt.expectedResult, ret)
}
})
}
}
5 changes: 3 additions & 2 deletions server/etcdserver/api/membership/membership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/version"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -50,5 +51,5 @@ func (b *backendMock) MustSaveMemberToBackend(*Member) {}
func (b *backendMock) TrimMembershipFromBackend() error { return nil }
func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {}

func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {}
func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil }
func (b *backendMock) MustSaveDowngradeToBackend(*version.DowngradeInfo) {}
func (b *backendMock) DowngradeInfoFromBackend() *version.DowngradeInfo { return nil }
5 changes: 3 additions & 2 deletions server/etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"path"

"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/version"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
Expand All @@ -43,8 +44,8 @@ type MemberBackend interface {
}

type DowngradeInfoBackend interface {
MustSaveDowngradeToBackend(*DowngradeInfo)
DowngradeInfoFromBackend() *DowngradeInfo
MustSaveDowngradeToBackend(*version.DowngradeInfo)
DowngradeInfoFromBackend() *version.DowngradeInfo
}

func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
Expand Down
11 changes: 6 additions & 5 deletions server/etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"

Expand Down Expand Up @@ -58,11 +59,11 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,

etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
etcdserver.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongDowngradeVersionFormat: rpctypes.ErrGRPCWrongDowngradeVersionFormat,
version.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
version.ErrDowngradeInProcess: rpctypes.ErrGRPCDowngradeInProcess,
version.ErrNoInflightDowngrade: rpctypes.ErrGRPCNoInflightDowngrade,

lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/mvcc"
Expand Down Expand Up @@ -946,9 +947,9 @@ func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAtt
}

func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
d := membership.DowngradeInfo{Enabled: false}
d := version.DowngradeInfo{Enabled: false}
if r.Enabled {
d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
}
a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m
Node: n,
heartbeat: b.heartbeat,
raftStorage: b.storage,
storage: NewStorage(wal, ss),
storage: serverstorage.NewStorage(b.lg, wal, ss),
},
)
}
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
return us
}

// getVersions returns the versions of the members in the given cluster.
// getMembersVersions returns the versions of the members in the given cluster.
// The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil.
func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
func getMembersVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members()
vers := make(map[string]*version.Versions)
for _, m := range members {
Expand Down Expand Up @@ -184,7 +184,7 @@ func allowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *sem
// out of the range.
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(lg, cl, local, rt)
vers := getMembersVersions(lg, cl, local, rt)
minV, maxV := allowedVersionRange(getDowngradeEnabledFromRemotePeers(lg, cl, local, rt))
return isCompatibleWithVers(lg, vers, local, minV, maxV)
}
Expand Down
Loading