Merge pull request #13151 from serathius/buckets
etcdserver: Move all named keys to buckets module
ptabor authored Jun 29, 2021
2 parents 2a0f8f0 + f79d09d commit 6ef7629
Showing 8 changed files with 43 additions and 50 deletions.
19 changes: 8 additions & 11 deletions server/auth/store.go
@@ -40,11 +40,8 @@ import (
)

var (
- enableFlagKey = []byte("authEnabled")
- authEnabled = []byte{1}
- authDisabled = []byte{0}
-
- revisionKey = []byte("authRevision")
+ authEnabled = []byte{1}
+ authDisabled = []byte{0}

rootPerm = authpb.Permission{PermType: authpb.READWRITE, Key: []byte{}, RangeEnd: []byte{0}}

@@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error {
return ErrRootRoleNotExist
}

- tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled)
+ tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authEnabled)

as.enabled = true
as.tokenProvider.enable()
@@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() {
b := as.be
tx := b.BatchTx()
tx.Lock()
- tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
+ tx.UnsafePut(buckets.Auth, buckets.AuthEnabledKeyName, authDisabled)
as.commitRevision(tx)
tx.Unlock()
b.ForceCommit()
@@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) {
as.be = be
tx := be.BatchTx()
tx.Lock()
- _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
+ _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
@@ -1041,7 +1038,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
tx.UnsafeCreateBucket(buckets.AuthRoles)

enabled := false
- _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
+ _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
@@ -1084,11 +1081,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
atomic.AddUint64(&as.revision, 1)
revBytes := make([]byte, revBytesLen)
binary.BigEndian.PutUint64(revBytes, as.Revision())
- tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
+ tx.UnsafePut(buckets.Auth, buckets.AuthRevisionKeyName, revBytes)
}

func getRevision(tx backend.BatchTx) uint64 {
- _, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
+ _, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthRevisionKeyName, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0
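The server/auth/store.go changes above all follow one substitution: the package-local enableFlagKey and revisionKey byte slices are dropped, and every read and write of those keys goes through the exported names in the buckets package. A minimal sketch of a reader written in that style follows; the helper name authEnabledFromBackend and its standalone example package are illustrative assumptions, not code from this commit.

// Sketch only: authEnabledFromBackend is a hypothetical helper that mirrors
// how Recover and NewAuthStore read the flag in the diff above.
package example

import (
	"bytes"

	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.etcd.io/etcd/server/v3/mvcc/buckets"
)

// authEnabledFromBackend reports whether auth is enabled by reading the
// shared buckets.AuthEnabledKeyName key from the auth bucket.
func authEnabledFromBackend(be backend.Backend) bool {
	tx := be.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	_, vs := tx.UnsafeRange(buckets.Auth, buckets.AuthEnabledKeyName, nil, 0)
	return len(vs) == 1 && bytes.Equal(vs[0], []byte{1})
}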
4 changes: 2 additions & 2 deletions server/etcdserver/api/membership/cluster.go
@@ -697,7 +697,7 @@ func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {

// The field is populated since etcd v3.5.
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
- ckey := backendClusterVersionKey()
+ ckey := buckets.ClusterClusterVersionKeyName
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
@@ -716,7 +716,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi

// The field is populated since etcd v3.5.
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
- dkey := backendDowngradeKey()
+ dkey := buckets.ClusterDowngradeKeyName
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
20 changes: 4 additions & 16 deletions server/etcdserver/api/membership/store.go
@@ -42,7 +42,7 @@ var (
)

func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
- mkey := backendMemberKey(m.ID)
+ mkey := buckets.BackendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
lg.Panic("failed to marshal member", zap.Error(err))
@@ -65,7 +65,7 @@ func TrimClusterFromBackend(be backend.Backend) error {
}

func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
- mkey := backendMemberKey(id)
+ mkey := buckets.BackendMemberKey(id)

tx := be.BatchTx()
tx.Lock()
@@ -160,7 +160,7 @@ func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {

// The field is populated since etcd v3.5.
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
- ckey := backendClusterVersionKey()
+ ckey := buckets.ClusterClusterVersionKeyName

tx := be.BatchTx()
tx.Lock()
@@ -170,7 +170,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {

// The field is populated since etcd v3.5.
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
- dkey := backendDowngradeKey()
+ dkey := buckets.ClusterDowngradeKeyName
dvalue, err := json.Marshal(downgrade)
if err != nil {
lg.Panic("failed to marshal downgrade information", zap.Error(err))
@@ -281,18 +281,6 @@ func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
return m, nil
}

- func backendMemberKey(id types.ID) []byte {
- return []byte(id.String())
- }
-
- func backendClusterVersionKey() []byte {
- return []byte("clusterVersion")
- }
-
- func backendDowngradeKey() []byte {
- return []byte("downgrade")
- }
-
func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
17 changes: 14 additions & 3 deletions server/mvcc/buckets/bucket.go
@@ -17,6 +17,7 @@ package buckets
import (
"bytes"

"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)

@@ -67,11 +68,17 @@ func (b bucket) String() string { return string(b.Name()) }
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }

var (
- // Since v3.0
+ // Pre v3.5
+ ScheduledCompactKeyName = []byte("scheduledCompactRev")
+ FinishedCompactKeyName = []byte("finishedCompactRev")
MetaConsistentIndexKeyName = []byte("consistent_index")
+ AuthEnabledKeyName = []byte("authEnabled")
+ AuthRevisionKeyName = []byte("authRevision")
// Since v3.5
- MetaTermKeyName = []byte("term")
- MetaConfStateName = []byte("confState")
+ MetaTermKeyName = []byte("term")
+ MetaConfStateName = []byte("confState")
+ ClusterClusterVersionKeyName = []byte("clusterVersion")
+ ClusterDowngradeKeyName = []byte("downgrade")
// Since v3.6
MetaStorageVersionName = []byte("storageVersion")
// Before adding new meta key please update server/etcdserver/version
@@ -84,3 +91,7 @@ func DefaultIgnores(bucket, key []byte) bool {
return bytes.Compare(bucket, Meta.Name()) == 0 &&
(bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0)
}
+
+ func BackendMemberKey(id types.ID) []byte {
+ return []byte(id.String())
+ }
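The bucket.go hunks above export the key names and the member-key helper, so other packages reference them as buckets.* instead of keeping private copies. A small illustrative caller of the new BackendMemberKey is sketched below; the main package and the example member ID are assumptions for demonstration, not part of this change.

package main

import (
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/server/v3/mvcc/buckets"
)

func main() {
	// Hypothetical member ID; BackendMemberKey turns it into the byte key
	// used for that member's record in the members bucket (the ID's string form).
	id := types.ID(0x8e9e05c52164694d)
	key := buckets.BackendMemberKey(id)
	fmt.Printf("members bucket key: %s\n", key)
}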
13 changes: 5 additions & 8 deletions server/mvcc/kvstore.go
@@ -34,9 +34,6 @@ import (
)

var (
- scheduledCompactKeyName = []byte("scheduledCompactRev")
- finishedCompactKeyName = []byte("finishedCompactRev")
-
ErrCompacted = errors.New("mvcc: required revision has been compacted")
ErrFutureRev = errors.New("mvcc: required revision is a future revision")
)
@@ -244,7 +241,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {

tx := s.b.BatchTx()
tx.Lock()
- tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
+ tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
tx.Unlock()
// ensure that desired compaction is persisted
s.b.ForceCommit()
@@ -342,20 +339,20 @@ func (s *store) restore() error {
tx := s.b.BatchTx()
tx.Lock()

- _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
+ _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.revMu.Lock()
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

s.lg.Info(
"restored last compact revision",
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.String("meta-bucket-name-key", string(buckets.FinishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
s.revMu.Unlock()
}
- _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
+ _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
@@ -427,7 +424,7 @@ func (s *store) restore() error {
s.lg.Info(
"resume scheduled compaction",
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.String("meta-bucket-name-key", string(buckets.ScheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)
}
2 changes: 1 addition & 1 deletion server/mvcc/kvstore_compaction.go
@@ -55,7 +55,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
if len(keys) < batchNum {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
- tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
+ tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes)
tx.Unlock()
s.lg.Info(
"finished scheduled compaction",
4 changes: 2 additions & 2 deletions server/mvcc/kvstore_compaction_test.go
@@ -89,10 +89,10 @@ func TestScheduleCompaction(t *testing.T) {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
}
}
- _, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
+ _, vals := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
revToBytes(revision{main: tt.rev}, ibytes)
if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) {
t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w)
t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, w)
}
tx.Unlock()

14 changes: 7 additions & 7 deletions server/mvcc/kvstore_test.go
@@ -343,10 +343,10 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
{Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@@ -384,8 +384,8 @@ func TestStoreRestore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
- b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
+ b.tx.rangeRespc <- rangeResp{[][]byte{buckets.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
+ b.tx.rangeRespc <- rangeResp{[][]byte{buckets.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}

b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil}
@@ -399,8 +399,8 @@
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
@@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
- tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
+ tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
tx.Unlock()

s0.Close()
