diff --git a/pkg/kvstore/etcd_lease.go b/pkg/kvstore/etcd_lease.go
new file mode 100644
index 0000000000000..40caf80571301
--- /dev/null
+++ b/pkg/kvstore/etcd_lease.go
@@ -0,0 +1,270 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package kvstore
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	v3rpcErrors "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+	client "go.etcd.io/etcd/client/v3"
+
+	"github.com/cilium/cilium/pkg/lock"
+)
+
+// etcdLeaseClient represents the subset of the etcd client methods used to handle the leases lifecycle.
+type etcdLeaseClient interface {
+	Grant(ctx context.Context, ttl int64) (*client.LeaseGrantResponse, error)
+	KeepAlive(ctx context.Context, id client.LeaseID) (<-chan *client.LeaseKeepAliveResponse, error)
+	Ctx() context.Context
+}
+
+// leaseInfo is the bookkeeping kept for each managed lease.
+type leaseInfo struct {
+	// count is the number of keys currently attached to the lease.
+	count uint32
+	// cancel aborts the keepalive process of the lease.
+	cancel context.CancelFunc
+}
+
+// etcdLeaseManager manages the acquisition of the leases, and keeps track of
+// which lease is attached to which etcd key.
+type etcdLeaseManager struct {
+	client etcdLeaseClient
+	log    *logrus.Entry
+
+	// ttl is the time-to-live requested for every new lease.
+	ttl time.Duration
+	// limit is the maximum number of keys attached to a single lease.
+	limit uint32
+	// expired, if not nil, is invoked for every key attached to a lease
+	// which has expired.
+	expired func(key string)
+
+	// mu protects leases, keys, current and acquiring.
+	mu      lock.RWMutex
+	leases  map[client.LeaseID]*leaseInfo
+	keys    map[string]client.LeaseID
+	current client.LeaseID
+
+	// acquiring is not nil while a new lease is being acquired, and it is
+	// closed once that acquisition completes (successfully or not).
+	acquiring chan struct{}
+	// wg tracks the keepalive goroutines.
+	wg sync.WaitGroup
+}
+
+// newEtcdLeaseManager builds and returns a new lease manager instance.
+//
+// ttl is the time-to-live of the acquired leases, limit is the maximum number
+// of keys attached to each lease (zero is treated as one), and expired is an
+// optional callback invoked for every key whose lease has expired.
+func newEtcdLeaseManager(cl etcdLeaseClient, ttl time.Duration, limit uint32, expired func(key string), log *logrus.Entry) *etcdLeaseManager {
+	// A limit of zero would make every lease look full as soon as it is
+	// acquired, and GetLeaseID would keep acquiring new leases forever.
+	// Fall back to a single key per lease in that case.
+	if limit == 0 {
+		limit = 1
+	}
+
+	return &etcdLeaseManager{
+		client: cl,
+		log:    log,
+
+		ttl:     ttl,
+		limit:   limit,
+		expired: expired,
+
+		current: client.NoLease,
+		leases:  make(map[client.LeaseID]*leaseInfo),
+		keys:    make(map[string]client.LeaseID),
+	}
+}
+
+// GetLeaseID returns a lease ID, and associates it to the given key. It leverages
+// one of the already acquired leases if they are not already attached to too many
+// keys, otherwise a new one is acquired.
+//
+// It returns client.NoLease and an error if the acquisition of a new lease
+// fails, or if ctx is done while waiting for an acquisition started by
+// another caller.
+func (elm *etcdLeaseManager) GetLeaseID(ctx context.Context, key string) (client.LeaseID, error) {
+	elm.mu.Lock()
+
+	// This key is already attached to a lease, hence just return it.
+	if leaseID := elm.keys[key]; leaseID != client.NoLease {
+		elm.mu.Unlock()
+		return leaseID, nil
+	}
+
+	// Return the current lease if it has not been used more than limit times
+	if info := elm.leases[elm.current]; info != nil && info.count < elm.limit {
+		info.count++
+		elm.keys[key] = elm.current
+		elm.mu.Unlock()
+
+		return elm.current, nil
+	}
+
+	// Otherwise, loop through the other known leases to see if any has been released
+	for lease, info := range elm.leases {
+		if info.count < elm.limit {
+			elm.current = lease
+			info.count++
+			elm.keys[key] = elm.current
+			elm.mu.Unlock()
+
+			return elm.current, nil
+		}
+	}
+
+	// If none is found, we need to acquire a new lease. acquiring is a channel
+	// used to detect whether we are already in the process of acquiring a new
+	// lease, to prevent multiple acquisitions in parallel.
+	acquiring := elm.acquiring
+	if acquiring == nil {
+		elm.acquiring = make(chan struct{})
+	}
+
+	// Unlock, so that we don't block other parallel operations (e.g., releases)
+	// while acquiring a new lease, since it might be a slow operation.
+	elm.mu.Unlock()
+
+	// Someone else is already acquiring a new lease. Wait until
+	// it completes, and then retry again.
+	if acquiring != nil {
+		select {
+		case <-acquiring:
+			return elm.GetLeaseID(ctx, key)
+		case <-ctx.Done():
+			return client.NoLease, ctx.Err()
+		}
+	}
+
+	// Otherwise, we can proceed to acquire a new lease.
+	leaseID, cancel, err := elm.newLease(ctx)
+
+	elm.mu.Lock()
+
+	// Signal that the acquisition process has completed.
+	close(elm.acquiring)
+	elm.acquiring = nil
+
+	if err != nil {
+		elm.mu.Unlock()
+		return client.NoLease, err
+	}
+
+	elm.current = leaseID
+	elm.leases[leaseID] = &leaseInfo{cancel: cancel}
+	elm.mu.Unlock()
+
+	// Retry, so that the key gets attached to the lease just acquired.
+	return elm.GetLeaseID(ctx, key)
+}
+
+// Release decrements the counter of the lease attached to the given key.
+func (elm *etcdLeaseManager) Release(key string) {
+	elm.mu.Lock()
+	defer elm.mu.Unlock()
+
+	elm.releaseUnlocked(key)
+}
+
+// ReleasePrefix decrements the counter of the leases attached to the keys
+// starting with the given prefix.
+func (elm *etcdLeaseManager) ReleasePrefix(prefix string) {
+	elm.mu.Lock()
+	defer elm.mu.Unlock()
+
+	// Deleting entries from a map while ranging over it is safe in Go.
+	for key := range elm.keys {
+		if strings.HasPrefix(key, prefix) {
+			elm.releaseUnlocked(key)
+		}
+	}
+}
+
+// KeyHasLease returns whether the given key is associated with the specified lease.
+func (elm *etcdLeaseManager) KeyHasLease(key string, leaseID client.LeaseID) bool {
+	elm.mu.RLock()
+	defer elm.mu.RUnlock()
+
+	return elm.keys[key] == leaseID
+}
+
+// CancelIfExpired verifies whether the error reports that the given lease has
+// expired, and in that case aborts the corresponding keepalive process.
+// It does nothing for any other error (including nil), or for unknown leases.
+func (elm *etcdLeaseManager) CancelIfExpired(err error, leaseID client.LeaseID) {
+	if errors.Is(err, v3rpcErrors.ErrLeaseNotFound) {
+		elm.mu.Lock()
+		if info := elm.leases[leaseID]; info != nil {
+			info.cancel()
+		}
+		elm.mu.Unlock()
+	}
+}
+
+// TotalLeases returns the number of managed leases.
+func (elm *etcdLeaseManager) TotalLeases() uint32 {
+	elm.mu.RLock()
+	defer elm.mu.RUnlock()
+
+	return uint32(len(elm.leases))
+}
+
+// Wait waits until all child goroutines have terminated.
+func (elm *etcdLeaseManager) Wait() {
+	elm.wg.Wait()
+}
+
+// newLease grants a new lease with the configured TTL, and starts the
+// goroutine consuming its keepalive channel. It returns the ID of the lease,
+// and the function to abort its keepalive process.
+func (elm *etcdLeaseManager) newLease(ctx context.Context) (client.LeaseID, context.CancelFunc, error) {
+	resp, err := elm.client.Grant(ctx, int64(elm.ttl.Seconds()))
+	if err != nil {
+		return client.NoLease, nil, err
+	}
+	leaseID := resp.ID
+
+	// The keepalive process is detached from the caller's context, since it
+	// needs to outlive the request which triggered the acquisition.
+	kctx, cancel := context.WithCancel(context.Background())
+	keepalive, err := elm.client.KeepAlive(kctx, leaseID)
+	if err != nil {
+		// NOTE(review): the lease just granted is not revoked here;
+		// presumably it is left to expire on its own - confirm.
+		cancel()
+		return client.NoLease, nil, err
+	}
+
+	elm.wg.Add(1)
+	go elm.keepalive(kctx, leaseID, keepalive)
+
+	elm.log.WithFields(logrus.Fields{
+		"LeaseID": leaseID,
+		"TTL":     elm.ttl,
+	}).Info("New lease successfully acquired")
+	return leaseID, cancel, nil
+}
+
+// keepalive consumes the keepalive channel of the given lease until it is
+// closed. Unless the context of the etcd client is done, the lease is then
+// treated as expired: it is removed from the manager together with all the
+// keys attached to it, and the expired callback (if any) is invoked for each
+// of those keys.
+//
+// The keepalive context is not consulted: whether the channel was closed
+// because that context was canceled or for any other reason, the lease is
+// handled as expired.
+func (elm *etcdLeaseManager) keepalive(_ context.Context, leaseID client.LeaseID,
+	keepalive <-chan *client.LeaseKeepAliveResponse) {
+	defer elm.wg.Done()
+
+	for range keepalive {
+		// Consume the keepalive messages until the channel is closed
+	}
+
+	// Check the client context on its own, rather than in the same select as
+	// the keepalive one: when several cases are ready, select picks one at
+	// random, and a shutdown could be reported as a lease expiration.
+	if elm.client.Ctx().Err() != nil {
+		// The context of the etcd client was closed
+		return
+	}
+
+	elm.log.WithField("LeaseID", leaseID).Warning("Lease expired")
+
+	elm.mu.Lock()
+	delete(elm.leases, leaseID)
+
+	var keys []string
+	for key, id := range elm.keys {
+		if id == leaseID {
+			keys = append(keys, key)
+			delete(elm.keys, key)
+		}
+	}
+	elm.mu.Unlock()
+
+	// Invoke the callback after releasing the lock.
+	if elm.expired != nil {
+		for _, key := range keys {
+			elm.expired(key)
+		}
+	}
+}
+
+// releaseUnlocked detaches the given key from its lease, if any, and
+// decrements the counter of that lease. The caller must hold elm.mu.
+func (elm *etcdLeaseManager) releaseUnlocked(key string) {
+	leaseID := elm.keys[key]
+	if leaseID != client.NoLease {
+		if info := elm.leases[leaseID]; info != nil && info.count > 0 {
+			info.count--
+		}
+		delete(elm.keys, key)
+	}
+}
diff --git a/pkg/kvstore/etcd_lease_test.go b/pkg/kvstore/etcd_lease_test.go
new file mode 100644
index 0000000000000..9b787f9e36fe0
--- /dev/null
+++ b/pkg/kvstore/etcd_lease_test.go
@@ -0,0 +1,250 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package kvstore
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	v3rpcErrors "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+	client "go.etcd.io/etcd/client/v3"
+)
+
+// fakeEtcdLeaseClient is a fake implementation of etcdLeaseClient, which
+// grants monotonically increasing lease IDs, and records the context
+// associated with each keepalive request.
+type fakeEtcdLeaseClient struct {
+	ctx                context.Context
+	expectedTTLSeconds int64
+	// grantDelay is slept at the beginning of every Grant call.
+	grantDelay time.Duration
+
+	// lease is the ID of the last granted lease.
+	lease client.LeaseID
+	// contexts maps each lease to the context passed to KeepAlive.
+	contexts map[client.LeaseID]context.Context
+}
+
+func newFakeEtcdLeaseClient(ctx context.Context, expectedTTLSeconds int64) fakeEtcdLeaseClient {
+	return fakeEtcdLeaseClient{
+		ctx:                ctx,
+		expectedTTLSeconds: expectedTTLSeconds,
+		contexts:           make(map[client.LeaseID]context.Context),
+	}
+}
+
+// Grant returns a new lease ID, or an error if the TTL is not the expected one.
+func (f *fakeEtcdLeaseClient) Grant(ctx context.Context, ttl int64) (*client.LeaseGrantResponse, error) {
+	time.Sleep(f.grantDelay)
+
+	f.lease++
+	if ttl != f.expectedTTLSeconds {
+		return nil, fmt.Errorf("incorrect TTL, expected: %v, found: %v", f.expectedTTLSeconds, ttl)
+	}
+
+	return &client.LeaseGrantResponse{ID: f.lease}, nil
+}
+
+// KeepAlive returns a channel which is closed as soon as either the client
+// context or the given context is done. It fails if the ID does not match
+// the last granted lease.
+func (f *fakeEtcdLeaseClient) KeepAlive(ctx context.Context, id client.LeaseID) (<-chan *client.LeaseKeepAliveResponse, error) {
+	if id != f.lease {
+		return nil, fmt.Errorf("incorrect lease ID, expected: %v, found: %v", f.lease, id)
+	}
+
+	ch := make(chan *client.LeaseKeepAliveResponse)
+	go func() {
+		select {
+		case <-f.ctx.Done():
+		case <-ctx.Done():
+		}
+
+		close(ch)
+	}()
+
+	f.contexts[id] = ctx
+	return ch, nil
+}
+
+func (f *fakeEtcdLeaseClient) Ctx() context.Context {
+	return f.ctx
+}
+
+func TestLeaseManager(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	cl := newFakeEtcdLeaseClient(ctx, 10)
+	mgr := newEtcdLeaseManager(&cl, 10*time.Second, 5, nil, log)
+
+	t.Cleanup(func() {
+		cancel()
+		mgr.Wait()
+	})
+
+	// Get the lease ID five times, and assert that the same ID is always returned
+	for i := 0; i < 5; i++ {
+		leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d", i))
+		require.NoError(t, err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(1), leaseID)
+	}
+
+	// Get the lease ID five more times, and assert that the same ID is always returned
+	for i := 0; i < 5; i++ {
+		leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d", i+5))
+		require.NoError(t, err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(2), leaseID)
+	}
+
+	// Release a few IDs and acquire them back
+	mgr.Release("key2")
+	mgr.Release("key4")
+
+	leaseID, err := mgr.GetLeaseID(ctx, "key11")
+	require.NoError(t, err, "GetLeaseID should succeed")
+	require.Equal(t, client.LeaseID(1), leaseID)
+
+	leaseID, err = mgr.GetLeaseID(ctx, "key12")
+	require.NoError(t, err, "GetLeaseID should succeed")
+	require.Equal(t, client.LeaseID(1), leaseID)
+
+	// Getting yet another ID, which should be different
+	leaseID, err = mgr.GetLeaseID(ctx, "key13")
+	require.NoError(t, err, "GetLeaseID should succeed")
+	require.Equal(t, client.LeaseID(3), leaseID)
+
+	// Getting an ID for an already known key should return the same lease
+	leaseID, err = mgr.GetLeaseID(ctx, "key1")
+	require.NoError(t, err, "GetLeaseID should succeed")
+	require.Equal(t, client.LeaseID(1), leaseID)
+
+	require.Equal(t, uint32(3), mgr.TotalLeases())
+}
+
+func TestLeaseManagerParallel(t *testing.T) {
+	const requests = 4
+
+	type result struct {
+		leaseID client.LeaseID
+		err     error
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cl := newFakeEtcdLeaseClient(ctx, 10)
+	mgr := newEtcdLeaseManager(&cl, 10*time.Second, 5, nil, log)
+
+	t.Cleanup(func() {
+		cancel()
+		mgr.Wait()
+	})
+
+	// Buffered, so that the goroutines never block on send, even if the
+	// test fails before all the results have been received.
+	ch := make(chan result, requests)
+
+	// Perform multiple requests in parallel, simulating a slow client, and
+	// assert that they all return the same lease ID
+	cl.grantDelay = 500 * time.Millisecond
+
+	for i := 0; i < requests; i++ {
+		go func(idx int) {
+			// The outcome is asserted by the test goroutine, since the
+			// require functions call t.FailNow, which must not be called
+			// from other goroutines.
+			leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d", idx))
+			ch <- result{leaseID: leaseID, err: err}
+		}(i)
+	}
+
+	for i := 0; i < requests; i++ {
+		res := <-ch
+		require.NoError(t, res.err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(1), res.leaseID)
+	}
+}
+
+func TestLeaseManagerReleasePrefix(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	cl := newFakeEtcdLeaseClient(ctx, 10)
+	mgr := newEtcdLeaseManager(&cl, 10*time.Second, 5, nil, log)
+
+	t.Cleanup(func() {
+		cancel()
+		mgr.Wait()
+	})
+
+	for i := 0; i < 9; i++ {
+		leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d%d", i/3, i))
+		require.NoError(t, err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(1+i/5), leaseID)
+	}
+
+	// Delete the prefix which includes keys attached to both leases
+	mgr.ReleasePrefix("key1")
+
+	for i := 0; i < 9; i++ {
+		// Verify that the leases for the keys matching the prefix have been
+		// released, and that the others are still in place.
+		require.Equal(t, i/3 != 1, mgr.KeyHasLease(fmt.Sprintf("key%d%d", i/3, i), client.LeaseID(1+i/5)))
+	}
+}
+
+func TestLeaseManagerCancelIfExpired(t *testing.T) {
+	expiredCH := make(chan string)
+	observer := func(key string) {
+		expiredCH <- key
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cl := newFakeEtcdLeaseClient(ctx, 10)
+	mgr := newEtcdLeaseManager(&cl, 10*time.Second, 5, observer, log)
+
+	t.Cleanup(func() {
+		close(expiredCH)
+		cancel()
+		mgr.Wait()
+	})
+
+	for i := 0; i < 15; i++ {
+		leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d", i))
+		require.NoError(t, err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(1+i/5), leaseID)
+	}
+
+	mgr.CancelIfExpired(nil, client.LeaseID(2))
+	mgr.CancelIfExpired(fmt.Errorf("something else"), client.LeaseID(2))
+	mgr.CancelIfExpired(v3rpcErrors.ErrLeaseNotFound, client.LeaseID(10))
+
+	// The keepalive context should not have been closed
+	require.NoError(t, cl.contexts[client.LeaseID(1)].Err())
+	require.NoError(t, cl.contexts[client.LeaseID(2)].Err())
+	require.NoError(t, cl.contexts[client.LeaseID(3)].Err())
+
+	mgr.CancelIfExpired(v3rpcErrors.ErrLeaseNotFound, client.LeaseID(2))
+
+	// The keepalive context for the second lease should have been closed
+	require.NoError(t, cl.contexts[client.LeaseID(1)].Err())
+	require.Error(t, cl.contexts[client.LeaseID(2)].Err())
+	require.NoError(t, cl.contexts[client.LeaseID(3)].Err())
+
+	// Ensure consistent ordering since the expired entries are retrieved from a map.
+	var expired []string
+	for i := 0; i < 5; i++ {
+		expired = append(expired, <-expiredCH)
+	}
+	sort.Strings(expired)
+	require.Equal(t, []string{"key5", "key6", "key7", "key8", "key9"}, expired)
+
+	// Get the lease for one of the expired keys, and check that it is a different one.
+	leaseID, err := mgr.GetLeaseID(ctx, "key7")
+	require.NoError(t, err, "GetLeaseID should succeed")
+	require.Equal(t, client.LeaseID(4), leaseID)
+}
+
+func TestLeaseManagerKeyHasLease(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	cl := newFakeEtcdLeaseClient(ctx, 10)
+	mgr := newEtcdLeaseManager(&cl, 10*time.Second, 5, nil, log)
+
+	t.Cleanup(func() {
+		cancel()
+		mgr.Wait()
+	})
+
+	for i := 0; i < 8; i++ {
+		leaseID, err := mgr.GetLeaseID(ctx, fmt.Sprintf("key%d", i))
+		require.NoError(t, err, "GetLeaseID should succeed")
+		require.Equal(t, client.LeaseID(1+i/5), leaseID)
+	}
+
+	// Correct lease ID
+	require.True(t, mgr.KeyHasLease("key3", client.LeaseID(1)))
+	require.True(t, mgr.KeyHasLease("key7", client.LeaseID(2)))
+
+	// Incorrect lease ID
+	require.False(t, mgr.KeyHasLease("key7", client.LeaseID(1)))
+
+	// Non existing key
+	require.False(t, mgr.KeyHasLease("key99", client.LeaseID(1)))
+}