From b46e6ee9fff23c53d39cc8a0a486f2349461fb31 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Fri, 19 Jan 2024 10:32:45 +0000
Subject: [PATCH] Add verification to ensure there are no duplicated keys in
 the bucket buffer

Signed-off-by: Benjamin Wang
---
 server/storage/backend/backend_test.go       |  7 +--
 server/storage/backend/batch_tx.go           | 17 +++++
 server/storage/backend/tx_buffer.go          | 62 +++++++++++++++----
 .../storage/mvcc/kvstore_compaction_test.go  |  3 +-
 server/storage/schema/bucket.go              | 22 +++----
 5 files changed, 80 insertions(+), 31 deletions(-)

diff --git a/server/storage/backend/backend_test.go b/server/storage/backend/backend_test.go
index 57f31a7b4d06..1ac162d3bf51 100644
--- a/server/storage/backend/backend_test.go
+++ b/server/storage/backend/backend_test.go
@@ -207,11 +207,6 @@ func TestBackendWriteback(t *testing.T) {
 	tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("1"))
 	tx.Unlock()
 
-	// overwrites should be propagated too
-	tx.Lock()
-	tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("2"))
-	tx.Unlock()
-
 	keys := []struct {
 		key []byte
 		end []byte
@@ -254,7 +249,7 @@ func TestBackendWriteback(t *testing.T) {
 			end: []byte("\xff"),
 
 			wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
-			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
+			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("1")},
 		},
 	}
 	rtx := b.ReadTx()
diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go
index 0d63f558449b..527088d4b04e 100644
--- a/server/storage/backend/batch_tx.go
+++ b/server/storage/backend/batch_tx.go
@@ -28,6 +28,23 @@ import (
 
 type BucketID int
 
+const (
+	BucketIdKey     BucketID = 1
+	BucketIdMeta    BucketID = 2
+	BucketIdLease   BucketID = 3
+	BucketIdAlarm   BucketID = 4
+	BucketIdCluster BucketID = 5
+
+	BucketIdMembers        BucketID = 10
+	BucketIdMembersRemoved BucketID = 11
+
+	BucketIdAuth      BucketID = 20
+	BucketIdAuthUsers BucketID = 21
+	BucketIdAuthRoles BucketID = 22
+
+	BucketIdTest BucketID = 100
+)
+
 type Bucket interface {
 	// ID returns a unique identifier of a bucket.
 	// The id must NOT be persisted and can be used as lightweight identificator
diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go
index 7c2f9d63ac4a..ee46ede8b5ed 100644
--- a/server/storage/backend/tx_buffer.go
+++ b/server/storage/backend/tx_buffer.go
@@ -16,6 +16,8 @@ package backend
 
 import (
 	"bytes"
+	"encoding/hex"
+	"fmt"
 	"sort"
 
 	"go.etcd.io/etcd/client/pkg/v3/verify"
@@ -83,13 +85,43 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
 		if !ok {
 			delete(txw.buckets, k)
 			txr.buckets[k] = wb
-			continue
+			rb = wb
+		} else {
+			if seq, okSeq := txw.bucket2seq[k]; okSeq && !seq && wb.used > 1 {
+				// assume no duplicate keys
+				sort.Sort(wb)
+			}
+			rb.merge(wb)
 		}
-		if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
-			// assume no duplicate keys
-			sort.Sort(wb)
-		}
-		rb.merge(wb)
+		// Only verify the Key bucket. Reasons:
+		// 1. The keys in the Key bucket are monotonically increasing
+		// revisions, so there will never be duplicated keys. Accordingly,
+		// there is no need to remove duplicated keys from the Key bucket,
+		// and skipping that work should also improve performance, since
+		// the Key bucket is the most performance sensitive bucket (this
+		// still needs to be confirmed with benchmarks).
+		// 2. Currently, the Meta bucket is the only bucket which might
+		// contain duplicated keys, although buckets added in the future
+		// may break this invariant as well. The other buckets are not
+		// performance sensitive, so we just keep deduplicating them for
+		// simplicity.
+		//
+		if k == BucketIdKey {
+			verifyNoDuplicatedKeys(rb)
+		} else {
+			if rb.used <= 1 {
+				continue
+			}
+			sort.Stable(rb)
+			widx := 0
+			for ridx := 1; ridx < rb.used; ridx++ {
+				if !bytes.Equal(rb.buf[ridx].key, rb.buf[widx].key) {
+					widx++
+				}
+				rb.buf[widx] = rb.buf[ridx]
+			}
+			rb.used = widx + 1
 		}
-		rb.merge(wb)
 	}
 	txw.reset()
 	// increase the buffer version
@@ -205,16 +237,20 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
 	}
 
 	sort.Stable(bb)
+}
 
-	// remove duplicates, using only newest update
-	widx := 0
-	for ridx := 1; ridx < bb.used; ridx++ {
-		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
-			widx++
-		}
-		bb.buf[widx] = bb.buf[ridx]
-	}
-	bb.used = widx + 1
+func verifyNoDuplicatedKeys(bb *bucketBuffer) {
+	verify.Verify(func() {
+		keyMaps := make(map[string]struct{})
+		for i := 0; i < bb.used; i++ {
+			data := bb.buf[i]
+			key := hex.EncodeToString(data.key)
+			if _, ok := keyMaps[key]; ok {
+				panic(fmt.Sprintf("found duplicated keys in the bucketBuffer, bb.used: %d, key: %s", bb.used, key))
+			}
+			keyMaps[key] = struct{}{}
+		}
+	})
 }
 
 func (bb *bucketBuffer) Len() int { return bb.used }
diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go
index fdab56b419bb..8e98dae3bb95 100644
--- a/server/storage/mvcc/kvstore_compaction_test.go
+++ b/server/storage/mvcc/kvstore_compaction_test.go
@@ -76,8 +76,8 @@ func TestScheduleCompaction(t *testing.T) {
 			tx := s.b.BatchTx()
 
 			tx.Lock()
-			ibytes := NewRevBytes()
 			for _, rev := range revs {
+				ibytes := NewRevBytes()
 				ibytes = RevToBytes(rev, ibytes)
 				tx.UnsafePut(schema.Key, ibytes, []byte("bar"))
 			}
@@ -90,6 +90,7 @@ func TestScheduleCompaction(t *testing.T) {
 
 			tx.Lock()
 			for _, rev := range tt.wrevs {
+				ibytes := NewRevBytes()
 				ibytes = RevToBytes(rev, ibytes)
 				keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0)
 				if len(keys) != 1 {
diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go
index 5472af3c3b47..5c8d4712aa6d 100644
--- a/server/storage/schema/bucket.go
+++ b/server/storage/schema/bucket.go
@@ -40,20 +40,20 @@ var (
 )
 
 var (
-	Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
-	Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
-	Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
-	Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
-	Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
+	Key     = backend.Bucket(bucket{id: backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true})
+	Meta    = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false})
+	Lease   = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false})
+	Alarm   = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false})
+	Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false})
 
-	Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
-	MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
+	Members        = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false})
+	MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false})
 
-	Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
-	AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
-	AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
+	Auth      = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false})
+	AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false})
+	AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, safeRangeBucket: false})
 
-	Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
+	Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false})
 )
 
 type bucket struct {
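
Note: the deduplication pass that writeback now applies to every bucket
except the Key bucket is the same "newest update wins" collapse that used
to live in merge: stable-sort the entries by key, then overwrite older
entries in place. Below is a minimal, self-contained Go sketch of that
logic. The kv type and the name dedupNewestWins are hypothetical stand-ins
for the unexported buffer types in tx_buffer.go, not the actual etcd code:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// kv is a stand-in for the buffer's key/value entry type.
type kv struct {
	key, val []byte
}

// dedupNewestWins stable-sorts entries by key (so the newest update for a
// key ends up last among its duplicates) and then collapses duplicates in
// place, keeping only the newest value per key. This mirrors the
// widx/ridx loop in writeback above.
func dedupNewestWins(buf []kv) []kv {
	if len(buf) <= 1 {
		return buf
	}
	sort.SliceStable(buf, func(i, j int) bool {
		return bytes.Compare(buf[i].key, buf[j].key) < 0
	})
	widx := 0
	for ridx := 1; ridx < len(buf); ridx++ {
		if !bytes.Equal(buf[ridx].key, buf[widx].key) {
			widx++
		}
		// For equal keys this overwrites the older entry; buf[ridx] is
		// the newer one because the stable sort preserves insertion order.
		buf[widx] = buf[ridx]
	}
	return buf[:widx+1]
}

func main() {
	buf := []kv{
		{[]byte("a"), []byte("1")},
		{[]byte("b"), []byte("2")},
		{[]byte("a"), []byte("3")}, // duplicate key, newer value
	}
	for _, e := range dedupNewestWins(buf) {
		fmt.Printf("%s=%s\n", e.key, e.val) // prints a=3, then b=2
	}
}

The new verifyNoDuplicatedKeys check, by contrast, is an assertion rather
than a production code path: verify.Verify only runs its closure when
etcd's verification framework is enabled (controlled by the ETCD_VERIFY
environment variable), so the panic cannot fire in a normal build.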