Skip to content

Commit

Permalink
Add verification on keys: should be always mononically increasing
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Jan 22, 2024
1 parent 15f95ec commit 6b9b7a8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 24 deletions.
17 changes: 17 additions & 0 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ import (

type BucketID int

const (
BucketIdKey BucketID = 1
BucketIdMeta BucketID = 2
BucketIdLease BucketID = 3
BucketIdAlarm BucketID = 4
BucketIdCluster BucketID = 5

BucketIdMembers BucketID = 10
BucketIdMembersRemoved BucketID = 11

BucketIdAuth BucketID = 20
BucketIdAuthUsers BucketID = 21
BucketIdAuthRoles BucketID = 22

BucketIdTest BucketID = 100
)

type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
Expand Down
55 changes: 42 additions & 13 deletions server/storage/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package backend

import (
"bytes"
"encoding/hex"
"fmt"
"sort"

"go.etcd.io/etcd/client/pkg/v3/verify"
Expand Down Expand Up @@ -52,7 +54,6 @@ func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
}

func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
// TODO: Add (in tests?) verification whether k>b[len(b)]
txw.putInternal(bucket, k, v)
}

Expand Down Expand Up @@ -83,13 +84,31 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
if !ok {
delete(txw.buckets, k)
txr.buckets[k] = wb
continue
rb = wb
} else {
rb.merge(wb)
}
if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
// assume no duplicate keys
sort.Sort(wb)

// Only verify the Key bucket. Reasons:
// 1. The keys in the Key bucket are monotonically increasing
// revisions, so there will never have duplicated keys, and
// all the keys should be already sorted. So no need to sort
// them again, also no need to perform the operation of removing
// duplicated keys from the buffer. The Key bucket is the most
// performance sensitive bucket, so it can also increase the
// performance (Need to run benchmark the double confirm this).
// 2. Currently, Meta bucket is the only case which might have
// duplicated keys. In case we add other buckets in the future,
// which may break the invariant property. Other buckets are
// also not performance sensitive, so we just keep them as they
// are for simplicity.
//
if k == BucketIdKey {
verifyMonotonicallyIncreasing(rb)
} else {
rb.dedupe()
}
rb.merge(wb)

}
txw.reset()
// increase the buffer version
Expand Down Expand Up @@ -197,16 +216,13 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
for i := 0; i < bbsrc.used; i++ {
bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
}
if bb.used == bbsrc.used {
return
}
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
}

func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}

sort.Stable(bb)

// remove duplicates, using only newest update
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
Expand All @@ -217,6 +233,19 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
bb.used = widx + 1
}

func verifyMonotonicallyIncreasing(bb *bucketBuffer) {
verify.Verify(func() {
for i := 1; i < bb.used; i++ {
prev := bb.buf[i-1]
cur := bb.buf[i]
if bytes.Compare(prev.key, cur.key) >= 0 {
panic(fmt.Sprintf("Broke the rule of monotonically increasing, key[%d]: %s, key[%d]: %s",
i-1, hex.EncodeToString(prev.key), i, hex.EncodeToString(cur.key)))
}
}
})
}

func (bb *bucketBuffer) Len() int { return bb.used }
func (bb *bucketBuffer) Less(i, j int) bool {
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
Expand Down
22 changes: 11 additions & 11 deletions server/storage/schema/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ var (
)

var (
Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
Key = backend.Bucket(bucket{id: backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true})
Meta = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false})
Lease = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false})
Alarm = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false})
Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false})

Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
Members = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false})
MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false})

Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
Auth = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false})
AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false})
AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, safeRangeBucket: false})

Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false})
)

type bucket struct {
Expand Down

0 comments on commit 6b9b7a8

Please sign in to comment.