From 715cbfd6168a8b4eb82f2430751b602ffe2d7dcc Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 10 Mar 2023 17:10:18 -0500 Subject: [PATCH] Reduce deadlock in controller (#2349) (#2368) --- ...78226-Reduced-deadlocks-in-controller.yaml | 32 +++++ internal/pkg/composable/controller.go | 115 ++++++++++++------ 2 files changed, 110 insertions(+), 37 deletions(-) create mode 100644 changelog/fragments/1678178226-Reduced-deadlocks-in-controller.yaml diff --git a/changelog/fragments/1678178226-Reduced-deadlocks-in-controller.yaml b/changelog/fragments/1678178226-Reduced-deadlocks-in-controller.yaml new file mode 100644 index 00000000000..880ddc00ea3 --- /dev/null +++ b/changelog/fragments/1678178226-Reduced-deadlocks-in-controller.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fixes a agent not starting monitoring new pods until the restart + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +description: Modifies synchronization and debounce mechanism in controller. +Notification is not blocking anymore so lock holded while notification is being performed can be released and other event processed faster. + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR number; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 2349 + +# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: 2269 diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 9fed9c14c7f..4c736bb7d0f 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -94,7 +94,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) return &controller{ logger: l, - ch: make(chan []*transpiler.Vars), + ch: make(chan []*transpiler.Vars, 1), errCh: make(chan error), contextProviders: contextProviders, dynamicProviders: dynamicProviders, @@ -106,7 +106,7 @@ func (c *controller) Run(ctx context.Context) error { c.logger.Debugf("Starting controller for composable inputs") defer c.logger.Debugf("Stopped controller for composable inputs") - notify := make(chan bool) + notify := make(chan bool, 1) // sized so we can store 1 notification or proceed localCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -148,43 +148,56 @@ func (c *controller) Run(ctx context.Context) error { c.logger.Debugf("Started controller for composable inputs") - // performs debounce of notifies; accumulates them into 100 millisecond chunks t := time.NewTimer(100 * time.Millisecond) + cleanupFn := func() { + c.logger.Debugf("Stopping controller for composable inputs") + t.Stop() + cancel() + + // wait for all providers to stop (but its possible they still send notifications over notify + // channel, and we cannot block them sending) + emptyChan, emptyCancel := context.WithCancel(context.Background()) + defer emptyCancel() + go func() { + for { + select { + case <-emptyChan.Done(): + return + case <-notify: + } + } + }() + + close(c.ch) + wg.Wait() + } + + // performs debounce of notifies; accumulates them into 100 millisecond chunks for { DEBOUNCE: for { select { case <-ctx.Done(): - c.logger.Debugf("Stopping controller for composable inputs") - t.Stop() - cancel() - - // wait for all providers to stop (but its possible they still send notifications over notify - // channel, and we cannot block them sending) - emptyChan, emptyCancel := context.WithCancel(context.Background()) - defer emptyCancel() - go func() { - for { - select { - case <-emptyChan.Done(): - return - case <-notify: - } - } - }() - - close(c.ch) - wg.Wait() + cleanupFn() return ctx.Err() case <-notify: t.Reset(100 * time.Millisecond) c.logger.Debugf("Variable state changed for composable inputs; debounce started") drainChan(notify) - case <-t.C: break DEBOUNCE } } + // notification received, wait for batch + select { + case <-ctx.Done(): + cleanupFn() + return ctx.Err() + case <-t.C: + drainChan(notify) + // batching done, gather results + } + c.logger.Debugf("Computing new variable state for composable inputs") // build the vars list of mappings @@ -208,10 +221,22 @@ func (c *controller) Run(ctx context.Context) error { } } - select { - case c.ch <- vars: - case <-ctx.Done(): - // coordinator is handling cancellation it won't drain the channel + UPDATEVARS: + for { + select { + case c.ch <- vars: + break UPDATEVARS + case <-ctx.Done(): + // coordinator is handling cancellation it won't drain the channel + default: + // c.ch is size of 1, nothing is reading and there's already a signal + select { + case <-c.ch: + // Vars not pushed, cleaning channel + default: + // already read + } + } } } } @@ -256,7 +281,11 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error { return nil } c.mapping = mapping - c.signal <- true + + select { + case c.signal <- true: + default: + } return nil } @@ -278,7 +307,7 @@ type dynamicProviderState struct { context.Context provider DynamicProvider - lock sync.RWMutex + lock sync.Mutex mappings map[string]dynamicProviderMapping signal chan bool } @@ -317,7 +346,11 @@ func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[ mapping: mapping, processors: processors, } - c.signal <- true + + select { + case c.signal <- true: + default: + } return nil } @@ -329,32 +362,40 @@ func (c *dynamicProviderState) Remove(id string) { if exists { // existed; remove and signal delete(c.mappings, id) - c.signal <- true + + select { + case c.signal <- true: + default: + } } } // Mappings returns the current mappings. func (c *dynamicProviderState) Mappings() []dynamicProviderMapping { - c.lock.RLock() - defer c.lock.RUnlock() + c.lock.Lock() + originalMapping := make(map[string]dynamicProviderMapping) + for k, v := range c.mappings { + originalMapping[k] = v + } + c.lock.Unlock() // add the mappings sorted by (priority,id) mappings := make([]dynamicProviderMapping, 0) priorities := make([]int, 0) - for _, mapping := range c.mappings { + for _, mapping := range originalMapping { priorities = addToSet(priorities, mapping.priority) } sort.Ints(priorities) for _, priority := range priorities { ids := make([]string, 0) - for name, mapping := range c.mappings { + for name, mapping := range originalMapping { if mapping.priority == priority { ids = append(ids, name) } } sort.Strings(ids) for _, name := range ids { - mappings = append(mappings, c.mappings[name]) + mappings = append(mappings, originalMapping[name]) } } return mappings