diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d63f558449..527088d4b04 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -28,6 +28,23 @@ import ( type BucketID int +const ( + BucketIdKey BucketID = 1 + BucketIdMeta BucketID = 2 + BucketIdLease BucketID = 3 + BucketIdAlarm BucketID = 4 + BucketIdCluster BucketID = 5 + + BucketIdMembers BucketID = 10 + BucketIdMembersRemoved BucketID = 11 + + BucketIdAuth BucketID = 20 + BucketIdAuthUsers BucketID = 21 + BucketIdAuthRoles BucketID = 22 + + BucketIdTest BucketID = 100 +) + type Bucket interface { // ID returns a unique identifier of a bucket. // The id must NOT be persisted and can be used as lightweight identificator diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index 7c2f9d63ac4..d01094b4c1a 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -16,6 +16,8 @@ package backend import ( "bytes" + "encoding/hex" + "fmt" "sort" "go.etcd.io/etcd/client/pkg/v3/verify" @@ -52,7 +54,6 @@ func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) { } func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) { - // TODO: Add (in tests?) verification whether k>b[len(b)] txw.putInternal(bucket, k, v) } @@ -83,13 +84,31 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { if !ok { delete(txw.buckets, k) txr.buckets[k] = wb - continue + rb = wb + } else { + rb.merge(wb) } - if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { - // assume no duplicate keys - sort.Sort(wb) + + // Only verify the Key bucket. Reasons: + // 1. The keys in the Key bucket are monotonically increasing + // revisions, so there will never be duplicated keys, and + // all the keys should be already sorted. So no need to sort + // them again, also no need to perform the operation of removing + // duplicated keys from the buffer. 
The Key bucket is the most + performance sensitive bucket, so it can also increase the + performance (Need to run benchmarks to double confirm this). + 2. Currently, Meta bucket is the only case which might have + duplicated keys. In case we add other buckets in the future, + they may break the invariant property. Other buckets are + also not performance sensitive, so we just keep them as they + are for simplicity. + // + if k == BucketIdKey { + verifyMonotonicallyIncreasing(rb) + } else { + rb.dedupe() } - rb.merge(wb) + } txw.reset() // increase the buffer version @@ -197,16 +216,13 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { for i := 0; i < bbsrc.used; i++ { bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val) } - if bb.used == bbsrc.used { - return - } - if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { +} + +func (bb *bucketBuffer) dedupe() { + if bb.used <= 1 { return } - sort.Stable(bb) - - // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { widx++ } bb.buf[widx] = bb.buf[ridx] } bb.used = widx + 1 } +func verifyMonotonicallyIncreasing(bb *bucketBuffer) { + verify.Verify(func() { + for i := 1; i < bb.used; i++ { + prev := bb.buf[i-1] + cur := bb.buf[i] + if bytes.Compare(prev.key, cur.key) >= 0 { + panic(fmt.Sprintf("Broke the rule of monotonically increasing, key[%d]: %s, key[%d]: %s", + i-1, hex.EncodeToString(prev.key), i, hex.EncodeToString(cur.key))) + } + } + }) +} + func (bb *bucketBuffer) Len() int { return bb.used } func (bb *bucketBuffer) Less(i, j int) bool { return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0 diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index 5472af3c3b4..5c8d4712aa6 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -40,20 +40,20 @@ var ( ) var ( - Key = 
backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true}) - Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false}) - Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false}) - Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false}) - Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false}) + Key = backend.Bucket(bucket{id: backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true}) + Meta = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false}) + Lease = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false}) + Alarm = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false}) + Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false}) - Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false}) - MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false}) + Members = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false}) + MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false}) - Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false}) - AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false}) - AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false}) + Auth = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false}) + AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false}) + AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, 
safeRangeBucket: false}) - Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false}) + Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false}) ) type bucket struct {