Reduce deadlock in controller #2349

Merged 7 commits on Mar 10, 2023
New changelog fragment (32 additions):
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character description of the change.
summary: Fixes the agent not starting to monitor new pods until a restart

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
description: Modifies the synchronization and debounce mechanism in the controller.
Notifications are no longer blocking, so the lock held while a notification is delivered can be released and other events can be processed faster.

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 2349

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: 2269
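
A minimal sketch of the pattern this change describes (not the controller code itself; signal, notify, and the burst producer are illustrative names): providers leave at most one pending notification via a non-blocking send on a buffered channel, and the consumer debounces notifications for 100 ms before recomputing variable state, so a slow recompute no longer blocks the senders.

package main

import (
	"fmt"
	"time"
)

// signal performs a non-blocking send: at most one notification is ever
// pending, and a producer never blocks while delivering it, so it can keep
// holding its own lock safely.
func signal(notify chan bool) {
	select {
	case notify <- true:
	default: // a notification is already queued; coalesce with it
	}
}

func main() {
	notify := make(chan bool, 1)

	// Illustrative producer firing a burst of changes.
	go func() {
		for i := 0; i < 5; i++ {
			signal(notify)
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Consumer: wait for a signal, then debounce further signals until none
	// arrive for 100 ms, and only then recompute the variable state once.
	<-notify
	t := time.NewTimer(100 * time.Millisecond)
	for {
		select {
		case <-notify:
			t.Reset(100 * time.Millisecond)
		case <-t.C:
			fmt.Println("recomputing variable state once for the whole burst")
			return
		}
	}
}
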
115 changes: 78 additions & 37 deletions internal/pkg/composable/controller.go
@@ -94,7 +94,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error)

return &controller{
logger: l,
ch: make(chan []*transpiler.Vars),
ch: make(chan []*transpiler.Vars, 1),
errCh: make(chan error),
contextProviders: contextProviders,
dynamicProviders: dynamicProviders,
@@ -106,7 +106,7 @@ func (c *controller) Run(ctx context.Context) error {
c.logger.Debugf("Starting controller for composable inputs")
defer c.logger.Debugf("Stopped controller for composable inputs")

notify := make(chan bool)
notify := make(chan bool, 1) // buffered so one notification can be stored without blocking the sender
localCtx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -148,43 +148,56 @@ func (c *controller) Run(ctx context.Context) error {

c.logger.Debugf("Started controller for composable inputs")

// performs debounce of notifies; accumulates them into 100 millisecond chunks
t := time.NewTimer(100 * time.Millisecond)
cleanupFn := func() {
c.logger.Debugf("Stopping controller for composable inputs")
t.Stop()
cancel()

// wait for all providers to stop (but it's possible they still send notifications over the notify
// channel, and we must not block them from sending)
emptyChan, emptyCancel := context.WithCancel(context.Background())
Member:
I am not entirely sure what the purpose is of having a cancellable context when we call cancel upon leaving the function scope...
Is it to try to consume as many notifications as possible, right up to the last possible moment, once the providers signal that they are done?
If that is the idea, wouldn't a simple channel to stop the consuming goroutine suffice instead of creating a context?

(It was already present in the previous version, so it's not critical for the fix, just food for thought.)
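
For illustration only, a hypothetical variant of this drain logic along the lines the reviewer suggests, using a plain channel instead of the cancellable context (stopDraining is an invented name; this is not code from the PR):

stopDraining := make(chan struct{})
go func() {
	for {
		select {
		case <-stopDraining:
			return
		case <-notify: // keep consuming so providers never block on their sends
		}
	}
}()

close(c.ch)
wg.Wait()
close(stopDraining) // all providers have stopped; the drain goroutine can exit
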

defer emptyCancel()
go func() {
for {
select {
case <-emptyChan.Done():
return
case <-notify:
}
}
}()

close(c.ch)
wg.Wait()
}

// performs debounce of notifies; accumulates them into 100 millisecond chunks
for {
DEBOUNCE:
for {
select {
case <-ctx.Done():
c.logger.Debugf("Stopping controller for composable inputs")
t.Stop()
cancel()

// wait for all providers to stop (but its possible they still send notifications over notify
// channel, and we cannot block them sending)
emptyChan, emptyCancel := context.WithCancel(context.Background())
defer emptyCancel()
go func() {
for {
select {
case <-emptyChan.Done():
return
case <-notify:
}
}
}()

close(c.ch)
wg.Wait()
cleanupFn()
return ctx.Err()
case <-notify:
t.Reset(100 * time.Millisecond)
c.logger.Debugf("Variable state changed for composable inputs; debounce started")
drainChan(notify)
case <-t.C:
break DEBOUNCE
}
}

// notification received, wait for batch
select {
case <-ctx.Done():
cleanupFn()
return ctx.Err()
case <-t.C:
drainChan(notify)
// batching done, gather results
}

c.logger.Debugf("Computing new variable state for composable inputs")

// build the vars list of mappings
@@ -208,10 +221,22 @@ func (c *controller) Run(ctx context.Context) error {
}
}

select {
case c.ch <- vars:
case <-ctx.Done():
// coordinator is handling cancellation it won't drain the channel
UPDATEVARS:
for {
select {
case c.ch <- vars:
break UPDATEVARS
case <-ctx.Done():
// the coordinator is handling cancellation; it won't drain the channel
default:
// c.ch has capacity 1; nothing is reading and a value is already queued
select {
case <-c.ch:
Member:
This is a strange pattern: we are consuming from the same channel we are producing on because we didn't manage to write another value.
I understand why it's here, but it shows that we have some big sync issues if we have to solve them this way.

Contributor Author:
I agree with the refactoring option as a follow-up. I'd rather have notifications in the form of .NET-style events, where the producer does not need to consider readers.
As we talked about, I wanted this PR to be as unintrusive as possible so we can spend proper time on the refactor without time pressure from production issues.

Member:
Create an issue for the refactor so we don't forget to do it, please :)

Contributor:
What is the possibility that this results in a vars set being lost? I'm confused about why this is really needed. There should always be a reader, and if there is no reader, the controller should block to ensure that all vars are read.

Contributor Author:
The idea is that when the controller is handling something else, this send becomes blocking. While it is blocked, no new vars are processed and stored in the mappings because the loop is stuck here, so this helps the controller deliver the most recent mappings based on actual events.

Contributor:
I just worry that if we empty the channel like this and the reader of the channel then starts reading again, the set of variables will have been removed and it will not get the variables from the channel.

Contributor Author:
That should not happen: when we drain, we don't cancel the loop, and we will push the vars again in the next cycle.

// Vars not pushed, cleaning channel
default:
// already read
}
}
}
}
}
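
For context on the discussion above, a self-contained sketch of the keep-latest pattern this loop implements (publishLatest and the string values are illustrative; this is not the controller code): if the capacity-1 channel is already full, the producer drops the stale value and retries with the fresh one, so a slow reader always receives the most recent vars rather than an old set.

package main

import "fmt"

// publishLatest delivers v on a capacity-1 channel without blocking:
// if a stale value is still queued, it is dropped and replaced, so a
// slow reader always receives the most recent value.
func publishLatest(ch chan string, v string) {
	for {
		select {
		case ch <- v:
			return // delivered
		default:
			// Channel is full: discard the stale value (unless the
			// reader got there first) and retry with the fresh one.
			select {
			case <-ch:
			default:
			}
		}
	}
}

func main() {
	ch := make(chan string, 1)
	publishLatest(ch, "vars v1")
	publishLatest(ch, "vars v2") // v1 is dropped before it was read
	fmt.Println(<-ch)            // prints "vars v2"
}
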
@@ -256,7 +281,11 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error {
return nil
}
c.mapping = mapping
c.signal <- true

select {
case c.signal <- true:
default:
}
return nil
}

@@ -278,7 +307,7 @@ type dynamicProviderState struct {
context.Context

provider DynamicProvider
lock sync.RWMutex
lock sync.Mutex
mappings map[string]dynamicProviderMapping
signal chan bool
}
@@ -317,7 +346,11 @@ func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[
mapping: mapping,
processors: processors,
}
c.signal <- true

select {
case c.signal <- true:
default:
}
return nil
}

@@ -329,32 +362,40 @@ func (c *dynamicProviderState) Remove(id string) {
if exists {
// existed; remove and signal
delete(c.mappings, id)
c.signal <- true

select {
case c.signal <- true:
default:
}
}
}

// Mappings returns the current mappings.
func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
c.lock.RLock()
defer c.lock.RUnlock()
c.lock.Lock()
originalMapping := make(map[string]dynamicProviderMapping)
for k, v := range c.mappings {
originalMapping[k] = v
}
c.lock.Unlock()

// add the mappings sorted by (priority,id)
mappings := make([]dynamicProviderMapping, 0)
priorities := make([]int, 0)
for _, mapping := range c.mappings {
for _, mapping := range originalMapping {
priorities = addToSet(priorities, mapping.priority)
}
sort.Ints(priorities)
for _, priority := range priorities {
ids := make([]string, 0)
for name, mapping := range c.mappings {
for name, mapping := range originalMapping {
if mapping.priority == priority {
ids = append(ids, name)
}
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
mappings = append(mappings, originalMapping[name])
}
}
return mappings