Skip to content

Commit

Permalink
Fix agent-injection instability for workloads with inject annotation.
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Hallgren <thomas@tada.se>
  • Loading branch information
thallgren committed Dec 21, 2024
1 parent cc2c2c5 commit 340c290
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
18 changes: 10 additions & 8 deletions cmd/traffic/cmd/manager/mutator/agent_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ
if err != nil {
return nil, err
}

a.agentConfigs.BlacklistPrefix(pod.Name, pod.Namespace)
defer a.agentConfigs.WhitelistPrefix(pod.Name, pod.Namespace)

if isDelete {
a.agentConfigs.Blacklist(pod.Name, pod.Namespace)
return nil, nil
Expand Down Expand Up @@ -165,15 +169,13 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ
switch {
case err != nil:
return nil, err
case scx == nil && ia == "enabled":
// A race condition may occur when a workload with "enabled" is applied.
// The workload event handler will create the agent config, but the webhook injection call may arrive before
// that agent config has been stored.
// Returning an error here will make the webhook call again, and hopefully we're the agent config is ready
// by then.
dlog.Debugf(ctx, "No agent config has been generated for annotation enabled %s.%s", pod.Name, pod.Namespace)
return nil, errors.New("agent-config is not yet generated")
case scx == nil:
if ia == "enabled" {
// A race condition may occur when a workload with "enabled" is applied.
// The workload event handler will create the agent config, but the webhook injection call may arrive before
// that agent config has been stored.
dlog.Debugf(ctx, "No agent config has been generated for annotation enabled %s.%s", pod.Name, pod.Namespace)
}
return nil, nil
case scx.AgentConfig().Manual:
dlog.Debugf(ctx, "Skipping webhook where agent is manually injected %s.%s", pod.Name, pod.Namespace)
Expand Down
66 changes: 44 additions & 22 deletions cmd/traffic/cmd/manager/mutator/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Map interface {
OnDelete(context.Context, string, string) error
DeleteMapsAndRolloutAll(ctx context.Context)
Blacklist(podName, namespace string)
BlacklistPrefix(podNamePrefix, namespace string)
WhitelistPrefix(podNamePrefix, namespace string)
Whitelist(podName, namespace string)
IsBlacklisted(podName, namespace string) bool
DisableRollouts()
Expand Down Expand Up @@ -98,15 +100,6 @@ func (c *configWatcher) isRolloutNeeded(ctx context.Context, wl k8sapi.Workload,
if wl.GetDeletionTimestamp() != nil {
return false
}
if ia, ok := podMeta.GetAnnotations()[agentconfig.InjectAnnotation]; ok {
// Annotation controls injection, so no explicit rollout is needed unless the deployment was added before the traffic-manager.
// If the annotation changes, there will be an implicit rollout anyway.
if wl.GetCreationTimestamp().After(c.startedAt) {
dlog.Debugf(ctx, "Rollout of %s.%s is not necessary. Pod template has inject annotation %s",
wl.GetName(), wl.GetNamespace(), ia)
return false
}
}
podLabels := podMeta.GetLabels()
if len(podLabels) == 0 {
// Have never seen this, but if it happens, then rollout only if an agent is desired
Expand Down Expand Up @@ -419,12 +412,13 @@ type workloadKey struct {
}

type configWatcher struct {
cancel context.CancelFunc
rolloutLocks *xsync.MapOf[workloadKey, *sync.Mutex]
nsLocks *xsync.MapOf[string, *sync.RWMutex]
blacklistedPods *xsync.MapOf[string, time.Time]
startedAt time.Time
rolloutDisabled bool
cancel context.CancelFunc
rolloutLocks *xsync.MapOf[workloadKey, *sync.Mutex]
nsLocks *xsync.MapOf[string, *sync.RWMutex]
blacklistedPods *xsync.MapOf[string, time.Time]
blacklistedPrefixes *xsync.MapOf[string, time.Time]
startedAt time.Time
rolloutDisabled bool

cms []cache.SharedIndexInformer
svs []cache.SharedIndexInformer
Expand All @@ -441,20 +435,47 @@ type configWatcher struct {
// time when a pod is deleted and its agent announces its departure during which the pod must be
// considered inactive.
func (c *configWatcher) Blacklist(podName, namespace string) {
c.blacklistedPods.Store(podName+"."+namespace, time.Now())
c.blacklistedPods.Store(namespace+"."+podName, time.Now())
}

func (c *configWatcher) Whitelist(podName, namespace string) {
c.blacklistedPods.Delete(podName + "." + namespace)
c.blacklistedPods.Delete(namespace + "." + podName)
}

func (c *configWatcher) BlacklistPrefix(podNamePrefix, namespace string) {
c.blacklistedPrefixes.Store(namespace+"."+podNamePrefix, time.Now())
}

func (c *configWatcher) WhitelistPrefix(podNamePrefix, namespace string) {
const elapsed = time.Second
time.AfterFunc(elapsed, func() {
// Don't delete entries that might have been added after this entry was whitelisted.
c.blacklistedPrefixes.Compute(namespace+"."+podNamePrefix, func(v time.Time, loaded bool) (time.Time, bool) {
if loaded {
return v, time.Since(v) >= elapsed
}
return v, true
})
})
}

func (c *configWatcher) DisableRollouts() {
c.rolloutDisabled = true
}

func (c *configWatcher) IsBlacklisted(podName, namespace string) bool {
_, ok := c.blacklistedPods.Load(podName + "." + namespace)
return ok
key := namespace + "." + podName
_, yes := c.blacklistedPods.Load(key)
if !yes {
c.blacklistedPrefixes.Range(func(pfx string, when time.Time) bool {
if strings.HasPrefix(key, pfx) {
yes = time.Since(when) < time.Second
return false
}
return true
})
}
return yes
}

func (c *configWatcher) Delete(ctx context.Context, name, namespace string) error {
Expand Down Expand Up @@ -513,9 +534,10 @@ func (c *configWatcher) Update(ctx context.Context, namespace string, updater fu

func NewWatcher(namespaces ...string) Map {
w := &configWatcher{
nsLocks: xsync.NewMapOf[string, *sync.RWMutex](),
rolloutLocks: xsync.NewMapOf[workloadKey, *sync.Mutex](),
blacklistedPods: xsync.NewMapOf[string, time.Time](),
nsLocks: xsync.NewMapOf[string, *sync.RWMutex](),
rolloutLocks: xsync.NewMapOf[workloadKey, *sync.Mutex](),
blacklistedPods: xsync.NewMapOf[string, time.Time](),
blacklistedPrefixes: xsync.NewMapOf[string, time.Time](),
}
if len(namespaces) > 0 {
for _, ns := range namespaces {
Expand Down

0 comments on commit 340c290

Please sign in to comment.