diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 7236515f2b3..666fa746020 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error { // lease deletion needs to be in the same backend transaction with the // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. - le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID))) + buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)}) txn.End() @@ -768,18 +768,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() - tx.UnsafeCreateBucket(buckets.Lease) - _, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) - // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. - for i := range vs { - var lpb leasepb.Lease - err := lpb.Unmarshal(vs[i]) - if err != nil { - tx.Unlock() - panic("failed to unmarshal lease proto item") - } + tx.Lock() + buckets.UnsafeCreateLeaseBucket(tx) + lpbs := buckets.MustUnsafeGetAllLeases(tx) + tx.Unlock() + for _, lpb := range lpbs { ID := LeaseID(lpb.ID) if lpb.TTL < le.minLeaseTTL { lpb.TTL = le.minLeaseTTL @@ -796,7 +790,6 @@ func (le *lessor) initAndRecover() { } le.leaseExpiredNotifier.Init() heap.Init(&le.leaseCheckpointHeap) - tx.Unlock() le.b.ForceCommit() } @@ -821,17 +814,11 @@ func (l *Lease) expired() bool { } func (l *Lease) persistTo(b backend.Backend) { - key := int64ToBytes(int64(l.ID)) - lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} - val, err := lpb.Marshal() - if err != nil { - panic("failed to marshal lease proto item") - } - - b.BatchTx().Lock() - b.BatchTx().UnsafePut(buckets.Lease, key, val) - b.BatchTx().Unlock() + tx := b.BatchTx() + tx.Lock() + defer tx.Unlock() + buckets.MustUnsafePutLease(tx, &lpb) } // TTL returns the TTL of the Lease. diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 58a4ad29086..ef421aa395c 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -92,12 +92,13 @@ func TestLessorGrant(t *testing.T) { } } - be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0) - if len(vs) != 1 { - t.Errorf("len(vs) = %d, want 1", len(vs)) + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID)) + if lpb == nil { + t.Errorf("lpb = %d, want not nil", lpb) } - be.BatchTx().Unlock() } // TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded @@ -195,12 +196,13 @@ func TestLessorRevoke(t *testing.T) { t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted) } - be.BatchTx().Lock() - _, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0) - if len(vs) != 0 { - t.Errorf("len(vs) = %d, want 0", len(vs)) + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID)) + if lpb != nil { + t.Errorf("lpb = %d, want nil", lpb) } - be.BatchTx().Unlock() } // TestLessorRenew ensures Lessor can renew an existing lease. diff --git a/server/mvcc/buckets/lease.go b/server/mvcc/buckets/lease.go new file mode 100644 index 00000000000..6aa40fc73ac --- /dev/null +++ b/server/mvcc/buckets/lease.go @@ -0,0 +1,74 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "encoding/binary" + "math" + + "go.etcd.io/etcd/server/v3/lease/leasepb" + "go.etcd.io/etcd/server/v3/mvcc/backend" +) + +func UnsafeCreateLeaseBucket(tx backend.BatchTx) { + tx.UnsafeCreateBucket(Lease) +} + +func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease { + _, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0) + ls := make([]*leasepb.Lease, 0, len(vs)) + for i := range vs { + var lpb leasepb.Lease + err := lpb.Unmarshal(vs[i]) + if err != nil { + panic("failed to unmarshal lease proto item") + } + ls = append(ls, &lpb) + } + return ls +} + +func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) { + key := leaseIdToBytes(lpb.ID) + + val, err := lpb.Marshal() + if err != nil { + panic("failed to marshal lease proto item") + } + tx.UnsafePut(Lease, key, val) +} + +func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) { + tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID)) +} + +func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease { + _, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0) + if len(vs) != 1 { + return nil + } + var lpb leasepb.Lease + err := lpb.Unmarshal(vs[0]) + if err != nil { + panic("failed to unmarshal lease proto item") + } + return &lpb +} + +func leaseIdToBytes(n int64) []byte { + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, uint64(n)) + return bytes +}