Skip to content

Commit

Permalink
Merge branch 'main' into nathan/static-targeting-default-flag
Browse files Browse the repository at this point in the history
  • Loading branch information
nathantournant authored Aug 24, 2022
2 parents ce3e0d1 + ca77fb2 commit 4aa861c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
34 changes: 31 additions & 3 deletions controllers/cache_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
k8scache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -516,11 +517,14 @@ func (r *DisruptionReconciler) manageInstanceSelectorCache(instance *chaosv1beta

// start the cache with a cancelable context and duration, and attach it to the controller as a watch source
ch := make(chan error)
cacheCtx, cacheCancelFunc := context.WithTimeout(context.Background(), instance.Spec.Duration.Duration()+*r.ExpiredDisruptionGCDelay*2)

go func() { ch <- cache.Start(cacheCtx) }()
cacheCtx, cacheCancelFunc := context.WithCancel(context.Background())
ctxTuple := CtxTuple{cacheCtx, cacheCancelFunc, disNamespacedName}

r.CacheContextStore[disCacheHash] = ctxTuple

r.CacheContextStore[disCacheHash] = CtxTuple{cacheCtx, cacheCancelFunc}
go func() { ch <- cache.Start(cacheCtx) }()
go r.cacheDeletionSafety(ctxTuple, disCacheHash)

var cacheSource source.SyncingSource
if instance.Spec.Level == chaostypes.DisruptionLevelNode {
Expand Down Expand Up @@ -567,10 +571,34 @@ func (r *DisruptionReconciler) clearExpiredCacheContexts() {
for key, contextTuple := range r.CacheContextStore {
if contextTuple.Ctx.Err() != nil {
deletionList = append(deletionList, key)
continue
}

if err := r.Get(contextTuple.Ctx, contextTuple.DisruptionNamespacedName, &chaosv1beta1.Disruption{}); err != nil {
if client.IgnoreNotFound(err) == nil {
contextTuple.CancelFunc()

deletionList = append(deletionList, key)
}
}
}

for _, key := range deletionList {
delete(r.CacheContextStore, key)
}
}

// cacheDeletionSafety is thought to be run in a goroutine to assert a cache is not running without its disruption
// the polling is living on the cache context, meaning if it's deleted elsewhere this function will return early.
func (r *DisruptionReconciler) cacheDeletionSafety(ctxTpl CtxTuple, disHash string) {
_ = wait.PollInfiniteWithContext(ctxTpl.Ctx, time.Minute, func(context.Context) (bool, error) {
if err := r.Get(ctxTpl.Ctx, ctxTpl.DisruptionNamespacedName, &chaosv1beta1.Disruption{}); err != nil {
if client.IgnoreNotFound(err) == nil {
defer ctxTpl.CancelFunc()
delete(r.CacheContextStore, disHash)
return true, nil
}
}
return false, nil
})
}
5 changes: 3 additions & 2 deletions controllers/disruption_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type DisruptionReconciler struct {
}

type CtxTuple struct {
Ctx context.Context
CancelFunc context.CancelFunc
Ctx context.Context
CancelFunc context.CancelFunc
DisruptionNamespacedName types.NamespacedName
}

//+kubebuilder:rbac:groups=chaos.datadoghq.com,resources=disruptions,verbs=get;list;watch;create;update;patch;delete
Expand Down

0 comments on commit 4aa861c

Please sign in to comment.