Skip to content

Commit

Permalink
Merge pull request #13726 from chaochn47/backport_13676_to_3_5
Browse files Browse the repository at this point in the history
backport 3.5: #13676 load all leases from backend
  • Loading branch information
serathius authored Mar 8, 2022
2 parents 541635e + f634b44 commit 39baf36
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 10 deletions.
38 changes: 28 additions & 10 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -799,15 +800,9 @@ func (le *lessor) initAndRecover() {
tx.Lock()

tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
lpbs := unsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
Expand All @@ -825,7 +820,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()

le.b.ForceCommit()
}
Expand Down Expand Up @@ -923,6 +917,30 @@ func int64ToBytes(n int64) []byte {
return bytes
}

func bytesToLeaseID(bytes []byte) int64 {
if len(bytes) != 8 {
panic(fmt.Errorf("lease ID must be 8-byte"))
}
return int64(binary.BigEndian.Uint64(bytes))
}

func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
ls := make([]*leasepb.Lease, 0)
err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error {
var lpb leasepb.Lease
err := lpb.Unmarshal(v)
if err != nil {
return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
}
ls = append(ls, &lpb)
return nil
})
if err != nil {
panic(err)
}
return ls
}

// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
type FakeLessor struct{}
Expand Down
91 changes: 91 additions & 0 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"reflect"
Expand All @@ -27,9 +28,12 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -649,6 +653,93 @@ func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
}
}

func TestLeaseBackend(t *testing.T) {
tcs := []struct {
name string
setup func(tx backend.BatchTx)
want []*leasepb.Lease
}{
{
name: "Empty by default",
setup: func(tx backend.BatchTx) {},
want: []*leasepb.Lease{},
},
{
name: "Returns data put before",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
},
want: []*leasepb.Lease{
{
ID: -1,
TTL: 2,
},
},
},
{
name: "Skips deleted",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MinInt64,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MaxInt64,
TTL: 3,
})
tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1))
},
want: []*leasepb.Lease{
{
ID: math.MaxInt64,
TTL: 3,
},
{
ID: math.MinInt64, // bytes bigger than MaxInt64
TTL: 2,
},
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
tc.setup(tx)
tx.Unlock()

be.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
leases := unsafeGetAllLeases(be2.ReadTx())

assert.Equal(t, tc.want, leases)
})
}
}

func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := int64ToBytes(lpb.ID)

val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(buckets.Lease, key, val)
}

type fakeDeleter struct {
deleted []string
tx backend.BatchTx
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type ClusterConfig struct {
LeaseCheckpointPersist bool

WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}

type cluster struct {
Expand Down Expand Up @@ -332,6 +333,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
CorruptCheckTime: c.cfg.CorruptCheckTime,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
Expand Down Expand Up @@ -635,6 +637,7 @@ type memberConfig struct {
leaseCheckpointInterval time.Duration
leaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}

// mustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -737,6 +740,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

m.InitialCorruptCheck = true
if mcfg.CorruptCheckTime > time.Duration(0) {
m.CorruptCheckTime = mcfg.CorruptCheckTime
}
m.WarningApplyDuration = embed.DefaultWarningApplyDuration

m.V2Deprecation = config.V2_DEPR_DEFAULT
Expand Down
119 changes: 119 additions & 0 deletions tests/integration/v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package integration

import (
"context"
"encoding/binary"
"os"
"path/filepath"
"sync"
Expand All @@ -25,8 +26,10 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -228,3 +231,119 @@ func TestV3CorruptAlarm(t *testing.T) {
}
t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
}

func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
BeforeTest(t)
clus := NewClusterV3(t, &ClusterConfig{
CorruptCheckTime: time.Second,
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
})
defer clus.Terminate(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
if err != nil {
t.Errorf("could not create lease 1 (%v)", err)
}
if lresp.ID != 1 {
t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
}

putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
// Trigger snapshot from the leader to new member
for i := 0; i < 15; i++ {
_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", i, err)
}
}

clus.RemoveMember(t, uint64(clus.Members[2].ID()))
oldMemberClient := clus.Client(2)
if err := oldMemberClient.Close(); err != nil {
t.Fatal(err)
}

clus.AddMember(t)
// Wait for new member to catch up
newMemberClient, err := clus.NewClientV3(2)
if err != nil {
t.Fatal(err)
}
WaitClientV3(t, newMemberClient)
clus.clients[2] = newMemberClient

// Corrupt member 2 by modifying backend lease bucket offline.
clus.Members[2].Stop(t)
fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
bcfg := backend.DefaultBackendConfig()
bcfg.Path = fp
bcfg.Logger = zaptest.NewLogger(t)
be := backend.New(bcfg)

tx := be.BatchTx()
tx.UnsafeDelete(buckets.Lease, leaseIdToBytes(1))
lpb := leasepb.Lease{ID: int64(2), TTL: 60}
mustUnsafePutLease(tx, &lpb)
tx.Commit()

if err := be.Close(); err != nil {
t.Fatal(err)
}

if err := clus.Members[2].Restart(t); err != nil {
t.Fatal(err)
}

clus.Members[1].WaitOK(t)
clus.Members[2].WaitOK(t)

// Revoke lease should remove key except the member with corruption
_, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
if err != nil {
t.Fatal(err)
}
resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo")
if err0 != nil {
t.Fatal(err0)
}
resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo")
if err1 != nil {
t.Fatal(err1)
}

if resp0.Header.Revision == resp1.Header.Revision {
t.Fatalf("matching Revision values")
}

// Wait for CorruptCheckTime
time.Sleep(time.Second)
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
if perr != nil {
if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
} else {
return
}
}
}

func leaseIdToBytes(n int64) []byte {
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, uint64(n))
return bytes
}

func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := leaseIdToBytes(lpb.ID)

val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(buckets.Lease, key, val)
}
Loading

0 comments on commit 39baf36

Please sign in to comment.