fix: avoid resources lock contention utilizing channel
Signed-off-by: Mykola Pelekh <mpelekh@demonware.net>
mpelekh committed Oct 10, 2024
1 parent 72bcdda commit db0f61d
Showing 3 changed files with 201 additions and 85 deletions.
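At its core, the change moves per-event cache updates off the watch goroutines: events are handed to a single consumer over a channel, and the consumer applies them in timed batches, so the cluster-wide lock is taken once per batch rather than once per event. A minimal, self-contained sketch of that pattern (all names here are illustrative, nothing is taken from the diff):

package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct{ key, value string }

type store struct {
	mu   sync.Mutex // guards data, contended by readers
	data map[string]string
	ch   chan event // unbuffered hand-off from producers to the single consumer
}

func newStore(interval time.Duration) *store {
	s := &store{data: map[string]string{}, ch: make(chan event)}
	go s.process(interval)
	return s
}

// record blocks until the consumer picks the event up; producers
// never touch the mutex directly.
func (s *store) record(e event) { s.ch <- e }

// process accumulates events and applies them once per tick, so the
// mutex is taken once per batch instead of once per event.
func (s *store) process(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]event, 0)
	for {
		select {
		case e, ok := <-s.ch:
			if !ok {
				return // channel closed: stop the consumer
			}
			batch = append(batch, e)
		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
			s.mu.Lock()
			for _, e := range batch {
				s.data[e.key] = e.value
			}
			s.mu.Unlock()
			batch = batch[:0]
		}
	}
}

func main() {
	s := newStore(50 * time.Millisecond)
	s.record(event{"pod/a", "Running"})
	time.Sleep(100 * time.Millisecond) // let one tick fire
	s.mu.Lock()
	fmt.Println(s.data["pod/a"]) // Running
	s.mu.Unlock()
}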
123 changes: 114 additions & 9 deletions pkg/cache/cluster.go
@@ -57,6 +57,8 @@ const (
// Limit is required to avoid memory spikes during cache initialization.
// The default limit of 50 is chosen based on experiments.
defaultListSemaphoreWeight = 50
// defaultEventProcessingInterval is the default interval for processing events
defaultEventProcessingInterval = 1 * time.Second
)

const (
@@ -75,6 +77,11 @@ type apiMeta struct {
watchCancel context.CancelFunc
}

type eventMeta struct {
event watch.EventType
un *unstructured.Unstructured
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
// Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
// OnEventHandler is a function that handles a Kubernetes event
type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

// OnProcessEventsHandler is a function that is invoked after a batch of events has been processed
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)

// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)

@@ -137,6 +147,8 @@ type ClusterCache interface {
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
// OnEvent registers an event handler that is executed every time a new K8S event is received
OnEvent(handler OnEventHandler) Unsubscribe
// OnProcessEventsHandler registers a handler that is executed after each batch of events has been processed
OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
}

type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
cache := &clusterCache{
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
eventMetaCh: nil,
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
@@ -169,8 +182,10 @@
},
watchResyncTimeout: defaultWatchResyncTimeout,
clusterSyncRetryTimeout: ClusterRetryTimeout,
eventProcessingInterval: defaultEventProcessingInterval,
resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
eventHandlers: map[uint64]OnEventHandler{},
processEventsHandlers: map[uint64]OnProcessEventsHandler{},
log: log,
listRetryLimit: 1,
listRetryUseBackoff: false,
@@ -186,6 +201,7 @@ type clusterCache struct {
syncStatus clusterCacheSync

apisMeta map[schema.GroupKind]*apiMeta
eventMetaCh chan eventMeta
serverVersion string
apiResources []kube.APIResourceInfo
// namespacedResources is a simple map which indicates whether a groupKind is namespaced
@@ -195,6 +211,8 @@
watchResyncTimeout time.Duration
// sync retry timeout for cluster when sync error happens
clusterSyncRetryTimeout time.Duration
// ticker interval for event processing
eventProcessingInterval time.Duration

// size of a page for list operations pager.
listPageSize int64
@@ -224,6 +242,7 @@
populateResourceInfoHandler OnPopulateResourceInfoHandler
resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler
eventHandlers map[uint64]OnEventHandler
processEventsHandlers map[uint64]OnProcessEventsHandler
openAPISchema openapi.Resources
gvkParser *managedfields.GvkParser

@@ -299,6 +318,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
return handlers
}

// OnProcessEventsHandler registers a handler that is executed after each batch of events has been processed
func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
key := c.handlerKey
c.handlerKey++
c.processEventsHandlers[key] = handler
return func() {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
delete(c.processEventsHandlers, key)
}
}

func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
for _, h := range c.processEventsHandlers {
handlers = append(handlers, h)
}
return handlers
}
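
For consumers of the new hook, a subscription might look like the following sketch; the OnProcessEventsHandler signature comes from this commit, while batchDuration and batchSize are hypothetical metrics collectors, not part of this diff:

unsubscribe := clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
	// batchDuration / batchSize are assumed Prometheus-style collectors
	batchDuration.Observe(duration.Seconds())
	batchSize.Observe(float64(processedEventsNumber))
})
defer unsubscribe() // removes the handler when the caller is done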

// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
return c.serverVersion
@@ -440,6 +482,8 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
for i := range opts {
opts[i](c)
}

c.invalidateEventMeta()
c.apisMeta = nil
c.namespacedResources = nil
c.log.Info("Invalidated cluster")
@@ -669,7 +713,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}

c.recordEvent(event.Type, obj)
if kube.IsCRD(obj) {
var resources []kube.APIResourceInfo
crd := v1.CustomResourceDefinition{}
@@ -823,11 +867,13 @@ func (c *clusterCache) sync() error {
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}
c.invalidateEventMeta()
c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
config := c.config
version, err := c.kubectl.GetServerVersion(config)
c.eventMetaCh = make(chan eventMeta)

if err != nil {
return err
@@ -864,6 +910,8 @@ func (c *clusterCache) sync() error {
return err
}

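// start the single consumer goroutine that drains eventMetaCh and applies events in timed batches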
go c.processEvents()

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -926,6 +974,14 @@ func (c *clusterCache) sync() error {
return nil
}

// invalidateEventMeta closes the eventMeta channel if it is open, which stops the event processing goroutine
func (c *clusterCache) invalidateEventMeta() {
if c.eventMetaCh != nil {
close(c.eventMetaCh)
c.eventMetaCh = nil
}
}

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
syncStatus := &c.syncStatus
@@ -1231,7 +1287,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
return managedObjs, nil
}

func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
for _, h := range c.getEventHandlers() {
h(event, un)
}
@@ -1240,16 +1296,65 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
return
}

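// hand the event off to the processing goroutine; eventMetaCh is unbuffered (see sync), so this send blocks until the consumer receives it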
c.eventMetaCh <- eventMeta{event, un}
}

func (c *clusterCache) processEvents() {
log := c.log.WithValues("fn", "processItems")
log.V(1).Info("Start processing events")

c.lock.Lock()
defer c.lock.Unlock()
existingNode, exists := c.resources[key]
if event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
ch := c.eventMetaCh
c.lock.Unlock()

eventMetas := make([]eventMeta, 0)
ticker := time.NewTicker(c.eventProcessingInterval)
defer ticker.Stop()

for {
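// accumulate events until the ticker fires, then apply the whole batch under a single lock acquisition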
select {
case em, ok := <-ch:
if !ok {
log.V(1).Info("Event processing channel closed, finish processing")
return
}
eventMetas = append(eventMetas, em)
case <-ticker.C:
if len(eventMetas) > 0 {
c.processEventsBatch(eventMetas)
eventMetas = eventMetas[:0]
}
}
}
}

func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
log := c.log.WithValues("fn", "processEventsBatch")
start := time.Now()
c.lock.Lock()
log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
defer func() {
c.lock.Unlock()
duration := time.Since(start)
// notify subscribed handlers (e.g., metrics recorders) of the batch duration and size
for _, handler := range c.getProcessEventsHandlers() {
handler(duration, len(eventMetas))
}
}()

for _, em := range eventMetas {
key := kube.GetResourceKey(em.un)
existingNode, exists := c.resources[key]
if em.event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
}
} else {
c.onNodeUpdated(existingNode, c.newResource(em.un))
}
}

log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
}

func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {