diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 3f662effc..c5b97634b 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -57,6 +57,8 @@ const (
 	// Limit is required to avoid memory spikes during cache initialization.
 	// The default limit of 50 is chosen based on experiments.
 	defaultListSemaphoreWeight = 50
+	// defaultEventProcessingInterval is the default interval for processing events
+	defaultEventProcessingInterval = 1 * time.Second
 )
 
 const (
@@ -75,6 +77,11 @@ type apiMeta struct {
 	watchCancel context.CancelFunc
 }
 
+type eventMeta struct {
+	event watch.EventType
+	un    *unstructured.Unstructured
+}
+
 // ClusterInfo holds cluster cache stats
 type ClusterInfo struct {
 	// Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
 // OnEventHandler is a function that handles Kubernetes event
 type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)
 
+// OnProcessEventsHandler is a function that is called after each batch of events has been processed
+type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)
+
 // OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
 type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)
 
@@ -137,6 +147,8 @@ type ClusterCache interface {
 	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
 	// OnEvent register event handler that is executed every time when new K8S event received
 	OnEvent(handler OnEventHandler) Unsubscribe
+	// OnProcessEventsHandler registers a handler that is executed after each batch of events is processed
+	OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
 }
 
 type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 	cache := &clusterCache{
 		settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
 		apisMeta:           make(map[schema.GroupKind]*apiMeta),
+		eventMetaCh:        nil,
 		listPageSize:       defaultListPageSize,
 		listPageBufferSize: defaultListPageBufferSize,
 		listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
 		},
 		watchResyncTimeout:      defaultWatchResyncTimeout,
 		clusterSyncRetryTimeout: ClusterRetryTimeout,
+		eventProcessingInterval: defaultEventProcessingInterval,
 		resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
 		eventHandlers:           map[uint64]OnEventHandler{},
+		processEventsHandlers:   map[uint64]OnProcessEventsHandler{},
 		log:                     log,
 		listRetryLimit:          1,
 		listRetryUseBackoff:     false,
@@ -186,6 +201,7 @@ type clusterCache struct {
 	syncStatus clusterCacheSync
 
 	apisMeta      map[schema.GroupKind]*apiMeta
+	eventMetaCh   chan eventMeta
 	serverVersion string
 	apiResources  []kube.APIResourceInfo
 	// namespacedResources is a simple map which indicates a groupKind is namespaced
@@ -195,6 +211,8 @@ type clusterCache struct {
 	watchResyncTimeout time.Duration
 	// sync retry timeout for cluster when sync error happens
 	clusterSyncRetryTimeout time.Duration
+	// ticker interval for events processing
+	eventProcessingInterval time.Duration
 	// size of a page for list operations pager.
 	listPageSize int64
@@ -224,6 +242,7 @@ type clusterCache struct {
 	populateResourceInfoHandler OnPopulateResourceInfoHandler
 	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
 	eventHandlers               map[uint64]OnEventHandler
+	processEventsHandlers       map[uint64]OnProcessEventsHandler
 	openAPISchema               openapi.Resources
 	gvkParser                   *managedfields.GvkParser
@@ -299,6 +318,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
 	return handlers
 }
 
+// OnProcessEventsHandler registers a handler that is executed after each batch of events is processed
+func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
+	c.handlersLock.Lock()
+	defer c.handlersLock.Unlock()
+	key := c.handlerKey
+	c.handlerKey++
+	c.processEventsHandlers[key] = handler
+	return func() {
+		c.handlersLock.Lock()
+		defer c.handlersLock.Unlock()
+		delete(c.processEventsHandlers, key)
+	}
+}
+func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
+	c.handlersLock.Lock()
+	defer c.handlersLock.Unlock()
+	handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
+	for _, h := range c.processEventsHandlers {
+		handlers = append(handlers, h)
+	}
+	return handlers
+}
+
 // GetServerVersion returns observed cluster version
 func (c *clusterCache) GetServerVersion() string {
 	return c.serverVersion
@@ -440,6 +482,8 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
 	for i := range opts {
 		opts[i](c)
 	}
+
+	c.invalidateEventMeta()
 	c.apisMeta = nil
 	c.namespacedResources = nil
 	c.log.Info("Invalidated cluster")
@@ -669,7 +713,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 					return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
 				}
 
-				c.processEvent(event.Type, obj)
+				c.recordEvent(event.Type, obj)
 				if kube.IsCRD(obj) {
 					var resources []kube.APIResourceInfo
 					crd := v1.CustomResourceDefinition{}
@@ -823,11 +867,13 @@ func (c *clusterCache) sync() error {
 	for i := range c.apisMeta {
 		c.apisMeta[i].watchCancel()
 	}
+	c.invalidateEventMeta()
 	c.apisMeta = make(map[schema.GroupKind]*apiMeta)
 	c.resources = make(map[kube.ResourceKey]*Resource)
 	c.namespacedResources = make(map[schema.GroupKind]bool)
 	config := c.config
 	version, err := c.kubectl.GetServerVersion(config)
+	c.eventMetaCh = make(chan eventMeta)
 
 	if err != nil {
 		return err
@@ -864,6 +910,8 @@ func (c *clusterCache) sync() error {
 		return err
 	}
 
+	go c.processEvents()
+
 	// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
 	lock := sync.Mutex{}
 	err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -926,6 +974,14 @@ func (c *clusterCache) sync() error {
 	return nil
 }
 
+// invalidateEventMeta closes the eventMeta channel if it is open
+func (c *clusterCache) invalidateEventMeta() {
+	if c.eventMetaCh != nil {
+		close(c.eventMetaCh)
+		c.eventMetaCh = nil
+	}
+}
+
 // EnsureSynced checks cache state and synchronizes it if necessary
 func (c *clusterCache) EnsureSynced() error {
 	syncStatus := &c.syncStatus
@@ -1231,7 +1287,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
 	return managedObjs, nil
 }
 
-func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) {
+func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
 	for _, h := range c.getEventHandlers() {
 		h(event, un)
 	}
@@ -1240,16 +1296,65 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
 		return
 	}
 
+	c.eventMetaCh <- eventMeta{event, un}
+}
+
+func (c *clusterCache) processEvents() {
+	log := c.log.WithValues("fn", "processEvents")
+	log.V(1).Info("Start processing events")
+
 	c.lock.Lock()
-	defer c.lock.Unlock()
-	existingNode, exists := c.resources[key]
-	if event == watch.Deleted {
-		if exists {
-			c.onNodeRemoved(key)
+	ch := c.eventMetaCh
+	c.lock.Unlock()
+
+	eventMetas := make([]eventMeta, 0)
+	ticker := time.NewTicker(c.eventProcessingInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case em, ok := <-ch:
+			if !ok {
+				log.V(1).Info("Event processing channel closed, finish processing")
+				return
+			}
+			eventMetas = append(eventMetas, em)
+		case <-ticker.C:
+			if len(eventMetas) > 0 {
+				c.processEventsBatch(eventMetas)
+				eventMetas = eventMetas[:0]
+			}
+		}
+	}
+}
+
+func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
+	log := c.log.WithValues("fn", "processEventsBatch")
+	start := time.Now()
+	c.lock.Lock()
+	log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
+	defer func() {
+		c.lock.Unlock()
+		duration := time.Since(start)
+		// Update the metric with the duration of the events processing
+		for _, handler := range c.getProcessEventsHandlers() {
+			handler(duration, len(eventMetas))
+		}
+	}()
+
+	for _, em := range eventMetas {
+		key := kube.GetResourceKey(em.un)
+		existingNode, exists := c.resources[key]
+		if em.event == watch.Deleted {
+			if exists {
+				c.onNodeRemoved(key)
+			}
+		} else {
+			c.onNodeUpdated(existingNode, c.newResource(em.un))
 		}
-	} else if event != watch.Deleted {
-		c.onNodeUpdated(existingNode, c.newResource(un))
 	}
+
+	log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
 }
 
 func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 26815c07e..721725f03 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -75,7 +75,11 @@ var (
 )
 
 func newCluster(t testing.TB, objs ...runtime.Object) *clusterCache {
-	cache := newClusterWithOptions(t, []UpdateSettingsFunc{}, objs...)
+	var opts []UpdateSettingsFunc
+	opts = append(opts, func(c *clusterCache) {
+		c.eventProcessingInterval = 1 * time.Millisecond
+	})
+	cache := newClusterWithOptions(t, opts, objs...)
 
 	t.Cleanup(func() {
 		cache.Invalidate()
@@ -205,34 +209,15 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 			ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
 		})
+		cluster.recordEvent(watch.Added, pvc)
 
-		cluster.processEvent(watch.Added, pvc)
-
-		cluster.lock.Lock()
-		defer cluster.lock.Unlock()
-
-		refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
-
-		assert.Len(t, refs, 0)
-	})
-
-	t.Run("STSTemplateNameNotMatching", func(t *testing.T) {
-		cluster := newCluster(t, sts)
-		err := cluster.EnsureSynced()
-		require.NoError(t, err)
-
-		pvc := mustToUnstructured(&v1.PersistentVolumeClaim{
-			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
-			ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
-		})
-		cluster.processEvent(watch.Added, pvc)
-
-		cluster.lock.Lock()
-		defer cluster.lock.Unlock()
-
-		refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
+		require.Eventually(t, func() bool {
+			cluster.lock.Lock()
+			defer cluster.lock.Unlock()
 
-		assert.Len(t, refs, 0)
+			refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
+			return len(refs) == 0
+		}, 1*time.Second, 10*time.Millisecond, "Expected PVC to not have owner reference")
 	})
 
 	t.Run("MatchingSTSExists", func(t *testing.T) {
@@ -244,14 +229,15 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
 			TypeMeta:   metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
 			ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
 		})
-		cluster.processEvent(watch.Added, pvc)
+		cluster.recordEvent(watch.Added, pvc)
 
-		cluster.lock.Lock()
-		defer cluster.lock.Unlock()
+		require.Eventually(t, func() bool {
+			cluster.lock.Lock()
+			defer cluster.lock.Unlock()
 
-		refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
-
-		assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}})
+			refs := cluster.resources[kube.GetResourceKey(pvc)].OwnerRefs
+			return assert.ElementsMatch(t, refs, []metav1.OwnerReference{{APIVersion: "apps/v1", Kind: kube.StatefulSetKind, Name: "web", UID: "123"}})
+		}, 1*time.Second, 10*time.Millisecond, "Expected PVC to have owner reference")
 	})
 }
 
@@ -596,10 +582,12 @@ func TestChildDeletedEvent(t *testing.T) {
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 
-	cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1()))
+	cluster.recordEvent(watch.Deleted, mustToUnstructured(testPod1()))
 
-	rsChildren := getChildren(cluster, mustToUnstructured(testRS()))
-	assert.Equal(t, []*Resource{}, rsChildren)
+	require.Eventually(t, func() bool {
+		rsChildren := getChildren(cluster, mustToUnstructured(testRS()))
+		return assert.Equal(t, []*Resource{}, rsChildren)
+	}, 1*time.Second, 10*time.Millisecond, "Expected no children for ReplicaSet")
 }
 
 func TestProcessNewChildEvent(t *testing.T) {
@@ -620,46 +608,49 @@ func TestProcessNewChildEvent(t *testing.T) {
       uid: "2"
     resourceVersion: "123"`)
 
-	cluster.processEvent(watch.Added, newPod)
+	cluster.recordEvent(watch.Added, newPod)
+
+	require.Eventually(t, func() bool {
+		rsChildren := getChildren(cluster, mustToUnstructured(testRS()))
+		sort.Slice(rsChildren, func(i, j int) bool {
+			return strings.Compare(rsChildren[i].Ref.Name, rsChildren[j].Ref.Name) < 0
+		})
+		return assert.Equal(t, []*Resource{{
+			Ref: corev1.ObjectReference{
+				Kind:       "Pod",
+				Namespace:  "default",
+				Name:       "helm-guestbook-pod-1",
+				APIVersion: "v1",
+				UID:        "1",
+			},
+			OwnerRefs: []metav1.OwnerReference{{
+				APIVersion: "apps/v1",
+				Kind:       "ReplicaSet",
+				Name:       "helm-guestbook-rs",
+				UID:        "2",
+			}},
+			ResourceVersion: "123",
+			CreationTimestamp: &metav1.Time{
+				Time: testCreationTime.Local(),
+			},
+		}, {
+			Ref: corev1.ObjectReference{
+				Kind:       "Pod",
+				Namespace:  "default",
+				Name:       "helm-guestbook-pod-1-new",
+				APIVersion: "v1",
+				UID:        "5",
+			},
+			OwnerRefs: []metav1.OwnerReference{{
+				APIVersion: "apps/v1",
+				Kind:       "ReplicaSet",
+				Name:       "helm-guestbook-rs",
+				UID:        "2",
+			}},
+			ResourceVersion: "123",
+		}}, rsChildren)
+	}, 1*time.Second, 10*time.Millisecond, "Expected new child to be added to ReplicaSet")
 
-	rsChildren := getChildren(cluster, mustToUnstructured(testRS()))
-	sort.Slice(rsChildren, func(i, j int) bool {
-		return strings.Compare(rsChildren[i].Ref.Name, rsChildren[j].Ref.Name) < 0
-	})
-	assert.Equal(t, []*Resource{{
-		Ref: corev1.ObjectReference{
-			Kind:       "Pod",
-			Namespace:  "default",
-			Name:       "helm-guestbook-pod-1",
-			APIVersion: "v1",
-			UID:        "1",
-		},
-		OwnerRefs: []metav1.OwnerReference{{
-			APIVersion: "apps/v1",
-			Kind:       "ReplicaSet",
-			Name:       "helm-guestbook-rs",
-			UID:        "2",
-		}},
-		ResourceVersion: "123",
-		CreationTimestamp: &metav1.Time{
-			Time: testCreationTime.Local(),
-		},
-	}, {
-		Ref: corev1.ObjectReference{
-			Kind:       "Pod",
-			Namespace:  "default",
-			Name:       "helm-guestbook-pod-1-new",
-			APIVersion: "v1",
-			UID:        "5",
-		},
-		OwnerRefs: []metav1.OwnerReference{{
-			APIVersion: "apps/v1",
-			Kind:       "ReplicaSet",
-			Name:       "helm-guestbook-rs",
-			UID:        "2",
-		}},
-		ResourceVersion: "123",
-	}}, rsChildren)
 }
 
 func TestWatchCacheUpdated(t *testing.T) {
diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go
index c9fbc8f9c..f997493ad 100644
--- a/pkg/cache/mocks/ClusterCache.go
+++ b/pkg/cache/mocks/ClusterCache.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.43.2. DO NOT EDIT.
+// Code generated by mockery v2.46.2. DO NOT EDIT.
 
 package mocks
 
@@ -262,6 +262,26 @@ func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe
 	return r0
 }
 
+// OnProcessEventsHandler provides a mock function with given fields: handler
+func (_m *ClusterCache) OnProcessEventsHandler(handler cache.OnProcessEventsHandler) cache.Unsubscribe {
+	ret := _m.Called(handler)
+
+	if len(ret) == 0 {
+		panic("no return value specified for OnProcessEventsHandler")
+	}
+
+	var r0 cache.Unsubscribe
+	if rf, ok := ret.Get(0).(func(cache.OnProcessEventsHandler) cache.Unsubscribe); ok {
+		r0 = rf(handler)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(cache.Unsubscribe)
+		}
+	}
+
+	return r0
+}
+
 // OnResourceUpdated provides a mock function with given fields: handler
 func (_m *ClusterCache) OnResourceUpdated(handler cache.OnResourceUpdatedHandler) cache.Unsubscribe {
 	ret := _m.Called(handler)
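
Usage sketch (not part of the diff): the snippet below shows how a consumer might register the new OnProcessEventsHandler hook to observe batch processing. The handler signature and NewClusterCache come from the change above; the import path, logger, and rest.Config wiring are assumptions for illustration only.

package main

import (
	"log"
	"time"

	"github.com/argoproj/gitops-engine/pkg/cache"
	"k8s.io/client-go/rest"
)

// registerBatchMetrics registers a hook that reports how long each batch of
// watch events took to process and how many events the batch contained.
func registerBatchMetrics(config *rest.Config) cache.ClusterCache {
	clusterCache := cache.NewClusterCache(config)
	unsubscribe := clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
		log.Printf("processed %d events in %v", processedEventsNumber, duration)
	})
	_ = unsubscribe // keep the returned Unsubscribe to stop receiving notifications later
	return clusterCache
}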