Skip to content

Commit

Permalink
Reduce deadlock in controller (#2349) (#2368)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Mar 10, 2023
1 parent 79948b2 commit 715cbfd
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fixes a agent not starting monitoring new pods until the restart

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
description: Modifies synchronization and debounce mechanism in controller.
Notification is not blocking anymore so lock holded while notification is being performed can be released and other event processed faster.

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 2349

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2269
115 changes: 78 additions & 37 deletions internal/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error)

return &controller{
logger: l,
ch: make(chan []*transpiler.Vars),
ch: make(chan []*transpiler.Vars, 1),
errCh: make(chan error),
contextProviders: contextProviders,
dynamicProviders: dynamicProviders,
Expand All @@ -106,7 +106,7 @@ func (c *controller) Run(ctx context.Context) error {
c.logger.Debugf("Starting controller for composable inputs")
defer c.logger.Debugf("Stopped controller for composable inputs")

notify := make(chan bool)
notify := make(chan bool, 1) // sized so we can store 1 notification or proceed
localCtx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -148,43 +148,56 @@ func (c *controller) Run(ctx context.Context) error {

c.logger.Debugf("Started controller for composable inputs")

// performs debounce of notifies; accumulates them into 100 millisecond chunks
t := time.NewTimer(100 * time.Millisecond)
cleanupFn := func() {
c.logger.Debugf("Stopping controller for composable inputs")
t.Stop()
cancel()

// wait for all providers to stop (but its possible they still send notifications over notify
// channel, and we cannot block them sending)
emptyChan, emptyCancel := context.WithCancel(context.Background())
defer emptyCancel()
go func() {
for {
select {
case <-emptyChan.Done():
return
case <-notify:
}
}
}()

close(c.ch)
wg.Wait()
}

// performs debounce of notifies; accumulates them into 100 millisecond chunks
for {
DEBOUNCE:
for {
select {
case <-ctx.Done():
c.logger.Debugf("Stopping controller for composable inputs")
t.Stop()
cancel()

// wait for all providers to stop (but its possible they still send notifications over notify
// channel, and we cannot block them sending)
emptyChan, emptyCancel := context.WithCancel(context.Background())
defer emptyCancel()
go func() {
for {
select {
case <-emptyChan.Done():
return
case <-notify:
}
}
}()

close(c.ch)
wg.Wait()
cleanupFn()
return ctx.Err()
case <-notify:
t.Reset(100 * time.Millisecond)
c.logger.Debugf("Variable state changed for composable inputs; debounce started")
drainChan(notify)
case <-t.C:
break DEBOUNCE
}
}

// notification received, wait for batch
select {
case <-ctx.Done():
cleanupFn()
return ctx.Err()
case <-t.C:
drainChan(notify)
// batching done, gather results
}

c.logger.Debugf("Computing new variable state for composable inputs")

// build the vars list of mappings
Expand All @@ -208,10 +221,22 @@ func (c *controller) Run(ctx context.Context) error {
}
}

select {
case c.ch <- vars:
case <-ctx.Done():
// coordinator is handling cancellation it won't drain the channel
UPDATEVARS:
for {
select {
case c.ch <- vars:
break UPDATEVARS
case <-ctx.Done():
// coordinator is handling cancellation it won't drain the channel
default:
// c.ch is size of 1, nothing is reading and there's already a signal
select {
case <-c.ch:
// Vars not pushed, cleaning channel
default:
// already read
}
}
}
}
}
Expand Down Expand Up @@ -256,7 +281,11 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error {
return nil
}
c.mapping = mapping
c.signal <- true

select {
case c.signal <- true:
default:
}
return nil
}

Expand All @@ -278,7 +307,7 @@ type dynamicProviderState struct {
context.Context

provider DynamicProvider
lock sync.RWMutex
lock sync.Mutex
mappings map[string]dynamicProviderMapping
signal chan bool
}
Expand Down Expand Up @@ -317,7 +346,11 @@ func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[
mapping: mapping,
processors: processors,
}
c.signal <- true

select {
case c.signal <- true:
default:
}
return nil
}

Expand All @@ -329,32 +362,40 @@ func (c *dynamicProviderState) Remove(id string) {
if exists {
// existed; remove and signal
delete(c.mappings, id)
c.signal <- true

select {
case c.signal <- true:
default:
}
}
}

// Mappings returns the current mappings.
func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
c.lock.RLock()
defer c.lock.RUnlock()
c.lock.Lock()
originalMapping := make(map[string]dynamicProviderMapping)
for k, v := range c.mappings {
originalMapping[k] = v
}
c.lock.Unlock()

// add the mappings sorted by (priority,id)
mappings := make([]dynamicProviderMapping, 0)
priorities := make([]int, 0)
for _, mapping := range c.mappings {
for _, mapping := range originalMapping {
priorities = addToSet(priorities, mapping.priority)
}
sort.Ints(priorities)
for _, priority := range priorities {
ids := make([]string, 0)
for name, mapping := range c.mappings {
for name, mapping := range originalMapping {
if mapping.priority == priority {
ids = append(ids, name)
}
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
mappings = append(mappings, originalMapping[name])
}
}
return mappings
Expand Down

0 comments on commit 715cbfd

Please sign in to comment.