Commit b46e6ee

Add verification to ensure there are no duplicated keys in the bucket buffer

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
ahrtr committed Jan 19, 2024
1 parent e3c009e commit b46e6ee
Showing 5 changed files with 80 additions and 31 deletions.
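The new check is built on the verify framework in go.etcd.io/etcd/client/pkg/v3/verify, so it runs only when verification is explicitly enabled and is a no-op in normal operation. Below is a minimal sketch of that gating, assuming the package's usual enablement knob, the ETCD_VERIFY environment variable (the variable name and the "all" value are assumptions, not shown in this commit):

// Sketch only. Assumed invocation: ETCD_VERIFY=all go run main.go
package main

import (
    "fmt"

    "go.etcd.io/etcd/client/pkg/v3/verify"
)

func main() {
    // verify.Verify runs the closure only when verification is enabled;
    // otherwise it does nothing, which is why the commit can afford an
    // O(n) duplicate scan on a hot path.
    verify.Verify(func() {
        fmt.Println("invariant check executed")
    })
    fmt.Println("done")
}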
server/storage/backend/backend_test.go (1 addition, 6 deletions)

@@ -207,11 +207,6 @@ func TestBackendWriteback(t *testing.T) {
 	tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("1"))
 	tx.Unlock()
 
-	// overwrites should be propagated too
-	tx.Lock()
-	tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("2"))
-	tx.Unlock()
-
 	keys := []struct {
 		key []byte
 		end []byte
@@ -254,7 +249,7 @@ func TestBackendWriteback(t *testing.T) {
 			end: []byte("\xff"),
 
 			wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
-			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
+			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("1")},
 		},
 	}
 	rtx := b.ReadTx()
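A plausible reading of this test change (an assumption; the commit message does not spell it out): with this commit the Key bucket's buffer is only verified, not deduplicated, so writing the key "overwrite" twice would leave a genuine duplicate in the buffer and trip the new check. The test therefore drops the second overwrite and expects "1".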
server/storage/backend/batch_tx.go (17 additions, 0 deletions)

@@ -28,6 +28,23 @@ import (

 type BucketID int
 
+const (
+    BucketIdKey     BucketID = 1
+    BucketIdMeta    BucketID = 2
+    BucketIdLease   BucketID = 3
+    BucketIdAlarm   BucketID = 4
+    BucketIdCluster BucketID = 5
+
+    BucketIdMembers        BucketID = 10
+    BucketIdMembersRemoved BucketID = 11
+
+    BucketIdAuth      BucketID = 20
+    BucketIdAuthUsers BucketID = 21
+    BucketIdAuthRoles BucketID = 22
+
+    BucketIdTest BucketID = 100
+)
+
 type Bucket interface {
     // ID returns a unique identifier of a bucket.
     // The id must NOT be persisted and can be used as lightweight identificator
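These IDs mirror the buckets declared in server/storage/schema. They live here because schema already imports backend, so backend code such as tx_buffer.go cannot import schema back without creating an import cycle. A minimal sketch of the resulting in-package usage (isKeyBucket is a hypothetical helper, not part of the commit):

package backend

// isKeyBucket reports whether a buffer belongs to the Key bucket.
// Hypothetical helper: backend cannot import the schema package
// (schema imports backend), so the comparison uses the local constant,
// exactly as writeback does with k == BucketIdKey.
func isKeyBucket(id BucketID) bool {
    return id == BucketIdKey
}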
server/storage/backend/tx_buffer.go (49 additions, 13 deletions)

@@ -16,6 +16,8 @@ package backend

 import (
     "bytes"
+    "encoding/hex"
+    "fmt"
     "sort"
 
     "go.etcd.io/etcd/client/pkg/v3/verify"
@@ -83,13 +85,43 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
         if !ok {
             delete(txw.buckets, k)
             txr.buckets[k] = wb
-            continue
+            rb = wb
+        } else {
+            if seq, okSeq := txw.bucket2seq[k]; okSeq && !seq && wb.used > 1 {
+                // assume no duplicate keys
+                sort.Sort(wb)
+            }
+            rb.merge(wb)
         }
-        if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
-            // assume no duplicate keys
-            sort.Sort(wb)
+        // Only verify the Key bucket. Reasons:
+        // 1. The keys in the Key bucket are monotonically increasing
+        // revisions, so there will never be duplicated keys. Accordingly
+        // there is no need to remove duplicated keys from the Key
+        // bucket. The Key bucket is also the most performance-sensitive
+        // bucket, so skipping the deduplication should also improve
+        // performance (need to run benchmarks to double confirm this).
+        // 2. Currently the Meta bucket is the only bucket which might
+        // contain duplicated keys, but buckets added in the future
+        // might also break this invariant. The other buckets are not
+        // performance sensitive, so we keep deduplicating them for
+        // simplicity.
+        //
+        if k == BucketIdKey {
+            verifyNoDuplicatedKeys(rb)
+        } else {
+            if rb.used <= 1 {
+                continue
+            }
+            sort.Stable(rb)
+            widx := 0
+            for ridx := 1; ridx < rb.used; ridx++ {
+                if !bytes.Equal(rb.buf[ridx].key, rb.buf[widx].key) {
+                    widx++
+                }
+                rb.buf[widx] = rb.buf[ridx]
+            }
+            rb.used = widx + 1
         }
-        rb.merge(wb)
     }
     txw.reset()
     // increase the buffer version
@@ -205,16 +237,20 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
     }
 
     sort.Stable(bb)
+}
 
-    // remove duplicates, using only newest update
-    widx := 0
-    for ridx := 1; ridx < bb.used; ridx++ {
-        if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
-            widx++
+func verifyNoDuplicatedKeys(bb *bucketBuffer) {
+    verify.Verify(func() {
+        keyMaps := make(map[string]struct{})
+        for i := 0; i < bb.used; i++ {
+            data := bb.buf[i]
+            key := hex.EncodeToString(data.key)
+            if _, ok := keyMaps[key]; ok {
+                panic(fmt.Sprintf("found duplicated keys in the bucketBuffer, bb.used: %d, key: %s", bb.used, key))
+            }
+            keyMaps[key] = struct{}{}
         }
-        bb.buf[widx] = bb.buf[ridx]
-    }
-    bb.used = widx + 1
+    })
 }
 
 func (bb *bucketBuffer) Len() int { return bb.used }
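The deduplication that previously lived in merge now runs in writeback, and only for non-Key buckets. The key property is that a stable sort preserves insertion order among equal keys, so after sorting, the last occurrence of a key is the newest write, and the compaction loop keeps exactly that entry. A standalone sketch with simplified types (kv stands in for the buffer's key/value pairs; this is not the etcd code itself):

package main

import (
    "bytes"
    "fmt"
    "sort"
)

type kv struct{ key, val []byte }

// dedupNewestWins mirrors the sort.Stable + compaction idiom from writeback.
func dedupNewestWins(buf []kv) []kv {
    if len(buf) <= 1 {
        return buf
    }
    // Stable sort: equal keys stay in write order, newest last.
    sort.SliceStable(buf, func(i, j int) bool {
        return bytes.Compare(buf[i].key, buf[j].key) < 0
    })
    widx := 0
    for ridx := 1; ridx < len(buf); ridx++ {
        if !bytes.Equal(buf[ridx].key, buf[widx].key) {
            widx++
        }
        // For equal keys this overwrites in place, so the newest entry wins.
        buf[widx] = buf[ridx]
    }
    return buf[:widx+1]
}

func main() {
    buf := []kv{
        {[]byte("a"), []byte("1")},
        {[]byte("b"), []byte("1")},
        {[]byte("a"), []byte("2")}, // newer write to "a"
    }
    for _, e := range dedupNewestWins(buf) {
        fmt.Printf("%s=%s\n", e.key, e.val) // prints a=2 then b=1
    }
}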
server/storage/mvcc/kvstore_compaction_test.go (2 additions, 1 deletion)

@@ -76,8 +76,8 @@ func TestScheduleCompaction(t *testing.T) {
 	tx := s.b.BatchTx()
 
 	tx.Lock()
-	ibytes := NewRevBytes()
 	for _, rev := range revs {
+		ibytes := NewRevBytes()
 		ibytes = RevToBytes(rev, ibytes)
 		tx.UnsafePut(schema.Key, ibytes, []byte("bar"))
 	}
@@ -90,6 +90,7 @@

 	tx.Lock()
 	for _, rev := range tt.wrevs {
+		ibytes := NewRevBytes()
 		ibytes = RevToBytes(rev, ibytes)
 		keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0)
 		if len(keys) != 1 {
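A plausible motivation for moving the NewRevBytes allocation inside the loops (an assumption; the commit does not state it): RevToBytes encodes into the slice it is given, so reusing one buffer makes every buffered key alias the same backing array, and after the loop all of them read as the last revision, i.e. duplicated keys in the buffer. A standalone sketch of that aliasing hazard, with a simplified encoder in place of RevToBytes:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // Buggy pattern: one shared buffer, reused across iterations.
    var stored [][]byte
    buf := make([]byte, 8)
    for rev := uint64(1); rev <= 3; rev++ {
        binary.BigEndian.PutUint64(buf, rev)
        stored = append(stored, buf) // stores the slice header, not a copy
    }
    fmt.Println(stored[0], stored[1], stored[2]) // all three show rev 3

    // Fixed pattern, as in the updated test: a fresh buffer per iteration.
    stored = nil
    for rev := uint64(1); rev <= 3; rev++ {
        b := make([]byte, 8)
        binary.BigEndian.PutUint64(b, rev)
        stored = append(stored, b)
    }
    fmt.Println(stored[0], stored[1], stored[2]) // distinct revs 1, 2, 3
}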
server/storage/schema/bucket.go (11 additions, 11 deletions)

@@ -40,20 +40,20 @@ var (
 )
 
 var (
-    Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
-    Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
-    Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
-    Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
-    Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
+    Key     = backend.Bucket(bucket{id: backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true})
+    Meta    = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false})
+    Lease   = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false})
+    Alarm   = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false})
+    Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false})
 
-    Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
-    MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
+    Members        = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false})
+    MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false})
 
-    Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
-    AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
-    AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
+    Auth      = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false})
+    AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false})
+    AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, safeRangeBucket: false})
 
-    Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
+    Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false})
 )
 
 type bucket struct {
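This refactor is behavior-preserving: each bucket keeps its previous numeric ID and now merely refers to the named constant. An illustrative check (hypothetical, not part of the commit; it assumes Bucket.ID() returns the ID set at construction, per the interface above):

package main

import (
    "fmt"

    "go.etcd.io/etcd/server/v3/storage/backend"
    "go.etcd.io/etcd/server/v3/storage/schema"
)

func main() {
    // Both sides name the same value, so this always prints true.
    fmt.Println(schema.Key.ID() == backend.BucketIdKey)
}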
