diff --git a/pkg/apply/applier.go b/pkg/apply/applier.go
index d738a864..b169f405 100644
--- a/pkg/apply/applier.go
+++ b/pkg/apply/applier.go
@@ -193,11 +193,13 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
 		},
 	}
 	// Build list of apply mutators.
+	// Share a thread-safe cache with the status poller.
+	resourceCache := cache.NewResourceCacheMap()
 	applyMutators := []mutator.Interface{
 		&mutator.ApplyTimeMutator{
 			Client: client,
 			Mapper: mapper,
-			ResourceCache: cache.NewResourceCacheMap(),
+			ResourceCache: resourceCache,
 		},
 	}
 	// Build the task queue by appending tasks in the proper order.
@@ -221,7 +223,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
 	// Create a new TaskStatusRunner to execute the taskQueue.
 	klog.V(4).Infoln("applier building TaskStatusRunner...")
 	allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
-	runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
+	runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller, resourceCache)
 	klog.V(4).Infoln("applier running TaskStatusRunner...")
 	err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
 		PollInterval: options.PollInterval,
diff --git a/pkg/apply/cache/resource_cache.go b/pkg/apply/cache/resource_cache.go
index 94045a38..d34fb5f5 100644
--- a/pkg/apply/cache/resource_cache.go
+++ b/pkg/apply/cache/resource_cache.go
@@ -5,20 +5,38 @@ package cache
 import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
 	"sigs.k8s.io/cli-utils/pkg/object"
 )
 
-// ResourceCache stores unstructured resource objects in memory
+// ResourceStatus wraps an unstructured resource object, combined with the
+// computed status (whether the status matches the spec).
+type ResourceStatus struct {
+	// Resource is the last known value retrieved from the cluster
+	Resource *unstructured.Unstructured
+	// Status of the resource
+	Status status.Status
+	// StatusMessage is the human-readable reason for the status
+	StatusMessage string
+}
+
+// ResourceCache stores ResourceStatus objects
 type ResourceCache interface {
-	// Put the resource into the cache, generating the ObjMetadata from the object.
-	Put(obj *unstructured.Unstructured) error
-	// Set the resource in the cache using the supplied key.
-	Set(objMeta object.ObjMetadata, obj *unstructured.Unstructured)
-	// Get the resource associated with the key from the cache.
-	// Returns (nil, true) if not found in the cache.
-	Get(objMeta object.ObjMetadata) (*unstructured.Unstructured, bool)
-	// Remove the resource associated with the key from the cache.
-	Remove(objMeta object.ObjMetadata)
+	ResourceCacheReader
+	// Load one or more resources into the cache, generating the ObjMetadata
+	// from the objects.
+	Load(...ResourceStatus) error
+	// Put the resource into the cache using the specified ID.
+	Put(object.ObjMetadata, ResourceStatus)
+	// Remove the resource associated with the ID from the cache.
+	Remove(object.ObjMetadata)
 	// Clear the cache.
 	Clear()
 }
+
+// ResourceCacheReader retrieves ResourceStatus objects
+type ResourceCacheReader interface {
+	// Get the resource associated with the ID from the cache.
+	// If not cached, status will be Unknown and resource will be nil.
+	Get(object.ObjMetadata) ResourceStatus
+}
diff --git a/pkg/apply/cache/resource_cache_map.go b/pkg/apply/cache/resource_cache_map.go
index e12a46b0..052aedd1 100644
--- a/pkg/apply/cache/resource_cache_map.go
+++ b/pkg/apply/cache/resource_cache_map.go
@@ -5,75 +5,89 @@ package cache
 import (
 	"fmt"
+	"sync"
 
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/klog/v2"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
 	"sigs.k8s.io/cli-utils/pkg/object"
 )
 
-// ResourceCacheMap stores unstructured resource objects in a map.
-// ResourceCacheMap is NOT thread-safe.
+// ResourceCacheMap stores ResourceStatus objects in a map indexed by resource ID.
+// ResourceCacheMap is thread-safe.
 type ResourceCacheMap struct {
-	cache map[object.ObjMetadata]*unstructured.Unstructured
+	mu    sync.RWMutex
+	cache map[object.ObjMetadata]ResourceStatus
 }
 
 // NewResourceCacheMap returns a new empty ResourceCacheMap
 func NewResourceCacheMap() *ResourceCacheMap {
 	return &ResourceCacheMap{
-		cache: make(map[object.ObjMetadata]*unstructured.Unstructured),
+		cache: make(map[object.ObjMetadata]ResourceStatus),
 	}
 }
 
-// Load adds the resources into the cache, replacing any existing resource with
-// the same ID. Returns an error if any resource is invalid.
-func (rc *ResourceCacheMap) Load(values ...*unstructured.Unstructured) error {
+// Load resources into the cache, generating the ID from the resource itself.
+// Existing resources with the same ID will be replaced.
+// Returns an error if any resource ID cannot be generated.
+func (rc *ResourceCacheMap) Load(values ...ResourceStatus) error {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+
 	for _, value := range values {
-		key, err := object.UnstructuredToObjMeta(value)
+		id, err := object.UnstructuredToObjMeta(value.Resource)
 		if err != nil {
-			return fmt.Errorf("failed to create resource cache key: %w", err)
+			return fmt.Errorf("failed to generate resource ID: %w", err)
 		}
-		rc.cache[key] = value
+		rc.cache[id] = value
 	}
 	return nil
}
 
-// Put adds the resource into the cache, replacing any existing resource with
-// the same ID. Returns an error if resource is invalid.
-func (rc *ResourceCacheMap) Put(value *unstructured.Unstructured) error {
-	key, err := object.UnstructuredToObjMeta(value)
-	if err != nil {
-		return fmt.Errorf("failed to create resource cache key: %w", err)
-	}
-	rc.Set(key, value)
-	return nil
-}
+// Put the resource into the cache using the supplied ID, replacing any
+// existing resource with the same ID.
+func (rc *ResourceCacheMap) Put(id object.ObjMetadata, value ResourceStatus) {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
 
-// Set the resource in the cache using the supplied key, and replacing any
-// existing resource with the same key.
-func (rc *ResourceCacheMap) Set(key object.ObjMetadata, value *unstructured.Unstructured) {
-	rc.cache[key] = value
+	rc.cache[id] = value
 }
 
-// Get retrieves the resource associated with the key from the cache.
-// Returns (nil, true) if not found in the cache.
+// Get retrieves the resource associated with the ID from the cache.
+// If not cached, the returned status will be Unknown and the resource nil.
-func (rc *ResourceCacheMap) Get(key object.ObjMetadata) (*unstructured.Unstructured, bool) {
-	obj, found := rc.cache[key]
+func (rc *ResourceCacheMap) Get(id object.ObjMetadata) ResourceStatus {
+	rc.mu.RLock()
+	defer rc.mu.RUnlock()
+
+	obj, found := rc.cache[id]
 	if klog.V(4).Enabled() {
 		if found {
-			klog.Infof("resource cache hit: %s", key)
+			klog.Infof("resource cache hit: %s", id)
 		} else {
-			klog.Infof("resource cache miss: %s", key)
+			klog.Infof("resource cache miss: %s", id)
 		}
 	}
-	return obj, found
+	if !found {
+		return ResourceStatus{
+			Resource:      nil,
+			Status:        status.UnknownStatus,
+			StatusMessage: "resource not cached",
+		}
+	}
+	return obj
 }
 
-// Remove the resource associated with the key from the cache.
-func (rc *ResourceCacheMap) Remove(key object.ObjMetadata) {
-	delete(rc.cache, key)
+// Remove the resource associated with the ID from the cache.
+func (rc *ResourceCacheMap) Remove(id object.ObjMetadata) {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+
+	delete(rc.cache, id)
 }
 
 // Clear the cache.
 func (rc *ResourceCacheMap) Clear() {
-	rc.cache = make(map[object.ObjMetadata]*unstructured.Unstructured)
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+
+	rc.cache = make(map[object.ObjMetadata]ResourceStatus)
 }
diff --git a/pkg/apply/destroyer.go b/pkg/apply/destroyer.go
index b0b5b62a..59be7e6e 100644
--- a/pkg/apply/destroyer.go
+++ b/pkg/apply/destroyer.go
@@ -12,6 +12,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/klog/v2"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/filter"
 	"sigs.k8s.io/cli-utils/pkg/apply/poller"
@@ -149,7 +150,8 @@ func (d *Destroyer) Run(inv inventory.InventoryInfo, options DestroyerOptions) <
 	// Create a new TaskStatusRunner to execute the taskQueue.
 	klog.V(4).Infoln("destroyer building TaskStatusRunner...")
 	deleteIds := object.UnstructuredsToObjMetasOrDie(deleteObjs)
-	runner := taskrunner.NewTaskStatusRunner(deleteIds, d.statusPoller)
+	resourceCache := cache.NewResourceCacheMap()
+	runner := taskrunner.NewTaskStatusRunner(deleteIds, d.statusPoller, resourceCache)
 	klog.V(4).Infoln("destroyer running TaskStatusRunner...")
 	// TODO(seans): Make the poll interval configurable like the applier.
 	err = runner.Run(context.Background(), taskQueue.ToChannel(), eventChannel, taskrunner.Options{
diff --git a/pkg/apply/mutator/apply_time_mutator.go b/pkg/apply/mutator/apply_time_mutator.go
index b09b2cc9..ee8e20ef 100644
--- a/pkg/apply/mutator/apply_time_mutator.go
+++ b/pkg/apply/mutator/apply_time_mutator.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"strings"
 
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -17,6 +18,7 @@ import (
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/jsonpath"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
 	"sigs.k8s.io/cli-utils/pkg/object"
 	"sigs.k8s.io/cli-utils/pkg/object/mutation"
 )
@@ -168,35 +170,77 @@ func (atm *ApplyTimeMutator) getMapping(ref mutation.ResourceReference) (*meta.R
 // getObject returns a cached resource, if cached and cache exists, otherwise
 // the resource is retrieved from the cluster.
 func (atm *ApplyTimeMutator) getObject(ctx context.Context, mapping *meta.RESTMapping, ref mutation.ResourceReference) (*unstructured.Unstructured, error) {
-	// validate resource reference
-	id, err := mutation.ResourceReferenceToObjMeta(ref)
-	if err != nil {
-		return nil, fmt.Errorf("failed to validate resource reference: %w", err)
+	// validate source reference
+	if ref.Name == "" {
+		return nil, fmt.Errorf("invalid source reference: empty name")
+	}
+	if ref.Kind == "" {
+		return nil, fmt.Errorf("invalid source reference: empty kind")
 	}
+	id := ref.ObjMetadata()
 
 	// get resource from cache
 	if atm.ResourceCache != nil {
-		obj, found := atm.ResourceCache.Get(id)
-		if found && obj != nil {
-			return obj, nil
+		result := atm.ResourceCache.Get(id)
+		// Use the cached version, if current/reconciled.
+		// Otherwise, get it from the cluster.
+		if result.Resource != nil && result.Status == status.CurrentStatus {
+			return result.Resource, nil
 		}
 	}
 
 	// get resource from cluster
 	namespacedClient := atm.Client.Resource(mapping.Resource).Namespace(ref.Namespace)
 	obj, err := namespacedClient.Get(ctx, ref.Name, metav1.GetOptions{})
-	if err != nil {
+	// Tolerate NotFound here so the cache below still gets updated.
+	if err != nil && !apierrors.IsNotFound(err) {
 		return nil, fmt.Errorf("failed to retrieve resource from cluster: %w", err)
 	}
 
 	// add resource to cache
 	if atm.ResourceCache != nil {
-		atm.ResourceCache.Set(id, obj)
+		// If it's not cached or not current, update the cache.
+		// This will add external resources to the cache,
+		// but the user won't get status events for them.
+		atm.ResourceCache.Put(id, computeStatus(obj))
+	}
+
+	if err != nil {
+		// NotFound
+		return nil, fmt.Errorf("resource not found: %w", err)
 	}
 	return obj, nil
 }
 
+// computeStatus derives the status of the resource (whether its observed status matches its spec).
+func computeStatus(obj *unstructured.Unstructured) cache.ResourceStatus {
+	if obj == nil {
+		return cache.ResourceStatus{
+			Resource:      obj,
+			Status:        status.NotFoundStatus,
+			StatusMessage: "Resource not found",
+		}
+	}
+	result, err := status.Compute(obj)
+	if err != nil {
+		if klog.V(3).Enabled() {
+			ref := mutation.NewResourceReference(obj)
+			klog.Infof("failed to compute resource status (%s): %v", ref, err)
+		}
+		return cache.ResourceStatus{
+			Resource:      obj,
+			Status:        status.UnknownStatus,
+			StatusMessage: fmt.Sprintf("Failed to compute status: %s", err),
+		}
+	}
+	return cache.ResourceStatus{
+		Resource:      obj,
+		Status:        result.Status,
+		StatusMessage: result.Message,
+	}
+}
+
 func readFieldValue(obj *unstructured.Unstructured, path string) (interface{}, bool, error) {
 	if path == "" {
 		return nil, false, errors.New("empty path expression")
diff --git a/pkg/apply/mutator/apply_time_mutator_test.go b/pkg/apply/mutator/apply_time_mutator_test.go
index 2a2b3fc2..0ce316fa 100644
--- a/pkg/apply/mutator/apply_time_mutator_test.go
+++ b/pkg/apply/mutator/apply_time_mutator_test.go
@@ -428,7 +428,7 @@ func TestMutate(t *testing.T) {
 			reason: "",
 			// exact error message isn't very important. Feel free to update if the error text changes.
 			errMsg: `failed to get source resource (networking.k8s.io/namespaces/ingress-namespace/Ingress/ingress1-name): ` +
-				`failed to retrieve resource from cluster: ` +
+				`resource not found: ` +
 				`ingresses.networking.k8s.io "ingress1-name" not found`,
 		},
 		"pod env var string from ingress port int": {
diff --git a/pkg/apply/prune/prune_test.go b/pkg/apply/prune/prune_test.go
index 2b619ab6..ed4ce98b 100644
--- a/pkg/apply/prune/prune_test.go
+++ b/pkg/apply/prune/prune_test.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/dynamic/fake"
 	"k8s.io/kubectl/pkg/scheme"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/filter"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
@@ -395,7 +396,8 @@ func TestPrune(t *testing.T) {
 			// The event channel can not block; make sure its bigger than all
 			// the events that can be put on it.
 			eventChannel := make(chan event.Event, len(tc.pruneObjs)+1)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 			err = func() error {
 				defer close(eventChannel)
 				// Run the prune and validate.
@@ -485,7 +487,8 @@ func TestPruneWithErrors(t *testing.T) {
 			// The event channel can not block; make sure its bigger than all
 			// the events that can be put on it.
 			eventChannel := make(chan event.Event, len(tc.pruneObjs))
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 			err = func() error {
 				defer close(eventChannel)
 				var opts Options
@@ -621,7 +624,8 @@ func TestPrune_PropagationPolicy(t *testing.T) {
 	}
 
 	eventChannel := make(chan event.Event, 1)
-	taskContext := taskrunner.NewTaskContext(eventChannel)
+	resourceCache := cache.NewResourceCacheMap()
+	taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 	err := po.Prune([]*unstructured.Unstructured{pdb}, []filter.ValidationFilter{}, taskContext, Options{
 		PropagationPolicy: tc.propagationPolicy,
 	})
diff --git a/pkg/apply/task/apply_task_test.go b/pkg/apply/task/apply_task_test.go
index fae754c4..67818c38 100644
--- a/pkg/apply/task/apply_task_test.go
+++ b/pkg/apply/task/apply_task_test.go
@@ -16,6 +16,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/kubectl/pkg/cmd/util"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/common"
@@ -79,7 +80,8 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
 		t.Run(tn, func(t *testing.T) {
 			eventChannel := make(chan event.Event)
 			defer close(eventChannel)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 
 			objs := toUnstructureds(tc.applied)
 
@@ -164,7 +166,8 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
 		t.Run(tn, func(t *testing.T) {
 			eventChannel := make(chan event.Event)
 			defer close(eventChannel)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 
 			objs := toUnstructureds(tc.rss)
 
@@ -193,7 +196,7 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
 				uid, _ := taskContext.ResourceUID(id)
 				assert.Equal(t, info.uid, uid)
 
-				gen, _ := taskContext.ResourceGeneration(id)
+				gen, _ := taskContext.AppliedGeneration(id)
 				assert.Equal(t, info.generation, gen)
 			}
 		})
@@ -275,7 +278,8 @@ func TestApplyTask_DryRun(t *testing.T) {
 		drs := common.Strategies[i]
 		t.Run(tn, func(t *testing.T) {
 			eventChannel := make(chan event.Event)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 
 			restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
 				Group: "apps",
@@ -409,7 +413,8 @@ func TestApplyTaskWithError(t *testing.T) {
 		drs := common.DryRunNone
 		t.Run(tn, func(t *testing.T) {
 			eventChannel := make(chan event.Event)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 
 			restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
 				Group: "apps",
diff --git a/pkg/apply/task/delete_inv_task_test.go b/pkg/apply/task/delete_inv_task_test.go
index ab34fb61..b336f1c9 100644
--- a/pkg/apply/task/delete_inv_task_test.go
+++ b/pkg/apply/task/delete_inv_task_test.go
@@ -8,6 +8,7 @@ import (
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/common"
@@ -39,7 +40,9 @@ func TestDeleteInvTask(t *testing.T) {
 			client := inventory.NewFakeInventoryClient([]object.ObjMetadata{})
 			client.Err = tc.err
 			eventChannel := make(chan event.Event)
-			context := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			context := taskrunner.NewTaskContext(eventChannel, resourceCache)
+
 			task := DeleteInvTask{
 				TaskName:  taskName,
 				InvClient: client,
diff --git a/pkg/apply/task/inv_add_task_test.go b/pkg/apply/task/inv_add_task_test.go
index c7753825..38036be3 100644
--- a/pkg/apply/task/inv_add_task_test.go
+++ b/pkg/apply/task/inv_add_task_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/stretchr/testify/require"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/common"
@@ -109,7 +110,9 @@ func TestInvAddTask(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			client := inventory.NewFakeInventoryClient(tc.initialObjs)
 			eventChannel := make(chan event.Event)
-			context := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			context := taskrunner.NewTaskContext(eventChannel, resourceCache)
+
 			task := InvAddTask{
 				TaskName:  taskName,
 				InvClient: client,
diff --git a/pkg/apply/task/inv_set_task_test.go b/pkg/apply/task/inv_set_task_test.go
index c365ac09..6ecbdbed 100644
--- a/pkg/apply/task/inv_set_task_test.go
+++ b/pkg/apply/task/inv_set_task_test.go
@@ -6,6 +6,7 @@ package task
 import (
 	"testing"
 
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/common"
@@ -105,7 +106,9 @@ func TestInvSetTask(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			client := inventory.NewFakeInventoryClient([]object.ObjMetadata{})
 			eventChannel := make(chan event.Event)
-			context := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			context := taskrunner.NewTaskContext(eventChannel, resourceCache)
+
 			prevInventory := make(map[object.ObjMetadata]bool, len(tc.prevInventory))
 			for _, prevInvID := range tc.prevInventory {
 				prevInventory[prevInvID] = true
diff --git a/pkg/apply/task/resetmapper_task_test.go b/pkg/apply/task/resetmapper_task_test.go
index 5eb68100..f18d2ef5 100644
--- a/pkg/apply/task/resetmapper_task_test.go
+++ b/pkg/apply/task/resetmapper_task_test.go
@@ -10,6 +10,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/restmapper"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
 	"sigs.k8s.io/cli-utils/pkg/testutil"
@@ -42,7 +43,8 @@ func TestResetRESTMapperTask(t *testing.T) {
 		t.Run(tn, func(t *testing.T) {
 			eventChannel := make(chan event.Event)
 			defer close(eventChannel)
-			taskContext := taskrunner.NewTaskContext(eventChannel)
+			resourceCache := cache.NewResourceCacheMap()
+			taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
 
 			mapper, discoveryClient := tc.toRESTMapper()
diff --git a/pkg/apply/taskrunner/collector.go b/pkg/apply/taskrunner/collector.go
deleted file mode 100644
index f608f914..00000000
--- a/pkg/apply/taskrunner/collector.go
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright 2020 The Kubernetes Authors.
-// SPDX-License-Identifier: Apache-2.0
-
-package taskrunner
-
-import (
-	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
-	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
-	"sigs.k8s.io/cli-utils/pkg/object"
-)
-
-// newResourceStatusCollector returns a new resourceStatusCollector
-// that will keep track of the status of the provided resources.
-func newResourceStatusCollector(identifiers []object.ObjMetadata) *resourceStatusCollector {
-	rm := make(map[object.ObjMetadata]resourceStatus)
-
-	for _, obj := range identifiers {
-		rm[obj] = resourceStatus{
-			Identifier:    obj,
-			CurrentStatus: status.UnknownStatus,
-		}
-	}
-	return &resourceStatusCollector{
-		resourceMap: rm,
-	}
-}
-
-// resourceStatusCollector keeps track of the latest seen status for all the
-// resources that is of interest during the operation.
-type resourceStatusCollector struct {
-	resourceMap map[object.ObjMetadata]resourceStatus
-}
-
-// resoureStatus contains the latest status for a given
-// resource as identified by the Identifier.
-type resourceStatus struct {
-	Identifier    object.ObjMetadata
-	CurrentStatus status.Status
-	Message       string
-	Generation    int64
-}
-
-// resourceStatus updates the collector with the latest
-// seen status for the given resource.
-func (a *resourceStatusCollector) resourceStatus(r *event.ResourceStatus) {
-	if ri, found := a.resourceMap[r.Identifier]; found {
-		ri.CurrentStatus = r.Status
-		ri.Message = r.Message
-		ri.Generation = getGeneration(r)
-		a.resourceMap[r.Identifier] = ri
-	}
-}
-
-// getGeneration looks up the value of the generation field in the
-// provided resource status. If the resource information is not available,
-// this will return 0.
-func getGeneration(r *event.ResourceStatus) int64 {
-	if r.Resource == nil {
-		return 0
-	}
-	return r.Resource.GetGeneration()
-}
-
-// conditionMet tests whether the provided Condition holds true for
-// all resources given by the list of Ids.
-func (a *resourceStatusCollector) conditionMet(rwd []resourceWaitData, c Condition) bool {
-	switch c {
-	case AllCurrent:
-		return a.allMatchStatus(rwd, status.CurrentStatus)
-	case AllNotFound:
-		return a.allMatchStatus(rwd, status.NotFoundStatus)
-	default:
-		return a.noneMatchStatus(rwd, status.UnknownStatus)
-	}
-}
-
-// allMatchStatus checks whether all resources given by the
-// Ids parameter has the provided status.
-func (a *resourceStatusCollector) allMatchStatus(rwd []resourceWaitData, s status.Status) bool {
-	for _, wd := range rwd {
-		ri, found := a.resourceMap[wd.identifier]
-		if !found {
-			return false
-		}
-		if ri.Generation < wd.generation || ri.CurrentStatus != s {
-			return false
-		}
-	}
-	return true
-}
-
-// noneMatchStatus checks whether none of the resources given
-// by the Ids parameters has the provided status.
-func (a *resourceStatusCollector) noneMatchStatus(rwd []resourceWaitData, s status.Status) bool {
-	for _, wd := range rwd {
-		ri, found := a.resourceMap[wd.identifier]
-		if !found {
-			return false
-		}
-		if ri.Generation < wd.generation || ri.CurrentStatus == s {
-			return false
-		}
-	}
-	return true
-}
diff --git a/pkg/apply/taskrunner/collector_test.go b/pkg/apply/taskrunner/collector_test.go
deleted file mode 100644
index 28df349b..00000000
--- a/pkg/apply/taskrunner/collector_test.go
+++ /dev/null
@@ -1,138 +0,0 @@
-// Copyright 2020 The Kubernetes Authors.
-// SPDX-License-Identifier: Apache-2.0
-
-package taskrunner
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
-	"sigs.k8s.io/cli-utils/pkg/object"
-)
-
-func TestCollector_ConditionMet(t *testing.T) {
-	identifiers := map[string]object.ObjMetadata{
-		"dep": {
-			GroupKind: schema.GroupKind{
-				Group: "apps",
-				Kind:  "Deployment",
-			},
-			Name:      "Foo",
-			Namespace: "default",
-		},
-		"custom": {
-			GroupKind: schema.GroupKind{
-				Group: "custom.io",
-				Kind:  "Custom",
-			},
-			Name: "Foo",
-		},
-	}
-
-	testCases := map[string]struct {
-		collectorState map[object.ObjMetadata]resourceStatus
-		waitTaskData   []resourceWaitData
-		condition      Condition
-		expectedResult bool
-	}{
-		"single resource with current status": {
-			collectorState: map[object.ObjMetadata]resourceStatus{
-				identifiers["dep"]: {
-					Identifier:    identifiers["dep"],
-					CurrentStatus: status.CurrentStatus,
-					Generation:    int64(42),
-				},
-			},
-			waitTaskData: []resourceWaitData{
-				{
-					identifier: identifiers["dep"],
-					generation: int64(42),
-				},
-			},
-			condition:      AllCurrent,
-			expectedResult: true,
-		},
-		"single resource with current status and old generation": {
-			collectorState: map[object.ObjMetadata]resourceStatus{
-				identifiers["dep"]: {
-					Identifier:    identifiers["dep"],
-					CurrentStatus: status.CurrentStatus,
-					Generation:    int64(41),
-				},
-			},
-			waitTaskData: []resourceWaitData{
-				{
-					identifier: identifiers["dep"],
-					generation: int64(42),
-				},
-			},
-			condition:      AllCurrent,
-			expectedResult: false,
-		},
-		"multiple resources not all current": {
-			collectorState: map[object.ObjMetadata]resourceStatus{
-				identifiers["dep"]: {
-					Identifier:    identifiers["dep"],
-					CurrentStatus: status.CurrentStatus,
-					Generation:    int64(41),
-				},
-				identifiers["custom"]: {
-					Identifier:    identifiers["custom"],
-					CurrentStatus: status.InProgressStatus,
-					Generation:    int64(0),
-				},
-			},
-			waitTaskData: []resourceWaitData{
-				{
-					identifier: identifiers["dep"],
-					generation: int64(42),
-				},
-				{
-					identifier: identifiers["custom"],
-					generation: int64(0),
-				},
-			},
-			condition:      AllCurrent,
-			expectedResult: false,
-		},
-		"multiple resources single with old generation": {
-			collectorState: map[object.ObjMetadata]resourceStatus{
-				identifiers["dep"]: {
-					Identifier:    identifiers["dep"],
-					CurrentStatus: status.CurrentStatus,
-					Generation:    int64(42),
-				},
-				identifiers["custom"]: {
-					Identifier:    identifiers["custom"],
-					CurrentStatus: status.CurrentStatus,
-					Generation:    int64(4),
-				},
-			},
-			waitTaskData: []resourceWaitData{
-				{
-					identifier: identifiers["dep"],
-					generation: int64(42),
-				},
-				{
-					identifier: identifiers["custom"],
-					generation: int64(5),
-				},
-			},
-			condition:      AllCurrent,
-			expectedResult: false,
-		},
-	}
-
-	for tn, tc := range testCases {
-		t.Run(tn, func(t *testing.T) {
-			rsc := newResourceStatusCollector([]object.ObjMetadata{})
-			rsc.resourceMap = tc.collectorState
-
-			res := rsc.conditionMet(tc.waitTaskData, tc.condition)
-
-			assert.Equal(t, tc.expectedResult, res)
-		})
-	}
-}
diff --git a/pkg/apply/taskrunner/condition.go b/pkg/apply/taskrunner/condition.go
new file mode 100644
index 00000000..1a7d6e1c
--- /dev/null
+++ b/pkg/apply/taskrunner/condition.go
@@ -0,0 +1,95 @@
+// Copyright 2020 The Kubernetes Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package taskrunner
+
+import (
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
+	"sigs.k8s.io/cli-utils/pkg/object"
+)
+
+// Condition is a type that defines the types of conditions
+// which a WaitTask can use.
+type Condition string
+
+const (
+	// AllCurrent Condition means all the provided resources
+	// have reached (and remain in) the Current status.
+	AllCurrent Condition = "AllCurrent"
+
+	// AllNotFound Condition means all the provided resources
+	// have reached the NotFound status, i.e. they are all deleted
+	// from the cluster.
+	AllNotFound Condition = "AllNotFound"
+)
+
+// Meets returns true if the provided status meets the condition and
+// false if it does not.
+func (c Condition) Meets(s status.Status) bool {
+	switch c {
+	case AllCurrent:
+		return s == status.CurrentStatus
+	case AllNotFound:
+		return s == status.NotFoundStatus
+	default:
+		return false
+	}
+}
+
+// conditionMet tests whether the provided Condition holds true for
+// all resources in the list, according to the ResourceCache.
+// Resources in the cache older than the applied generation are non-matches.
+func conditionMet(taskContext *TaskContext, ids []object.ObjMetadata, c Condition) bool {
+	switch c {
+	case AllCurrent:
+		return allMatchStatus(taskContext, ids, status.CurrentStatus)
+	case AllNotFound:
+		return allMatchStatus(taskContext, ids, status.NotFoundStatus)
+	default:
+		return noneMatchStatus(taskContext, ids, status.UnknownStatus)
+	}
+}
+
+// allMatchStatus checks whether all of the resources provided have the provided status.
+// Resources with older generations are considered non-matching.
+func allMatchStatus(taskContext *TaskContext, ids []object.ObjMetadata, s status.Status) bool {
+	for _, id := range ids {
+		cached := taskContext.ResourceCache().Get(id)
+		if cached.Status != s {
+			return false
+		}
+
+		applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
+		cachedGen := int64(0)
+		if cached.Resource != nil {
+			cachedGen = cached.Resource.GetGeneration()
+		}
+		if cachedGen < applyGen {
+			// cache too old
+			return false
+		}
+	}
+	return true
+}
+
+// noneMatchStatus checks whether none of the resources provided have the provided status.
+// Resources with older generations are considered matching.
+func noneMatchStatus(taskContext *TaskContext, ids []object.ObjMetadata, s status.Status) bool {
+	for _, id := range ids {
+		cached := taskContext.ResourceCache().Get(id)
+		if cached.Status == s {
+			return false
+		}
+
+		applyGen, _ := taskContext.AppliedGeneration(id) // generation at apply time
+		cachedGen := int64(0)
+		if cached.Resource != nil {
+			cachedGen = cached.Resource.GetGeneration()
+		}
+		if cachedGen < applyGen {
+			// cache too old
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/apply/taskrunner/condition_test.go b/pkg/apply/taskrunner/condition_test.go
new file mode 100644
index 00000000..a8cd0d7e
--- /dev/null
+++ b/pkg/apply/taskrunner/condition_test.go
@@ -0,0 +1,170 @@
+// Copyright 2020 The Kubernetes Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package taskrunner
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
+	ktestutil "sigs.k8s.io/cli-utils/pkg/kstatus/polling/testutil"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
+	"sigs.k8s.io/cli-utils/pkg/object"
+)
+
+var deployment1y = `
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: Foo
+  namespace: default
+spec:
+  replicas: 1
+status:
+  replicas: 1
+  readyReplicas: 1
+  updatedReplicas: 1
+  availableReplicas: 1
+  conditions:
+  - status: "True"
+    type: Available
+  - status: "True"
+    type: Ready
+`
+
+var custom1y = `
+apiVersion: custom.io/v1alpha1
+kind: Custom
+metadata:
+  name: Foo
+  namespace: default
+spec: {}
+status:
+  conditions:
+  - status: "False"
+    type: Ready
+`
+
+// withGeneration returns a DeepCopy with .metadata.generation set.
+func withGeneration(obj *unstructured.Unstructured, gen int64) *unstructured.Unstructured {
+	obj = obj.DeepCopy()
+	obj.SetGeneration(gen)
+	return obj
+}
+
+func TestCollector_ConditionMet(t *testing.T) {
+	deployment1 := ktestutil.YamlToUnstructured(t, deployment1y)
+	deployment1Meta := object.UnstructuredToObjMetaOrDie(deployment1)
+	custom1 := ktestutil.YamlToUnstructured(t, custom1y)
+	custom1Meta := object.UnstructuredToObjMetaOrDie(custom1)
+
+	testCases := map[string]struct {
+		cacheContents  []cache.ResourceStatus
+		appliedGen     map[object.ObjMetadata]int64
+		ids            []object.ObjMetadata
+		condition      Condition
+		expectedResult bool
+	}{
+		"single resource with current status": {
+			cacheContents: []cache.ResourceStatus{
+				{
+					Resource: withGeneration(deployment1, 42),
+					Status:   status.CurrentStatus,
+				},
+			},
+			appliedGen: map[object.ObjMetadata]int64{
+				deployment1Meta: 42,
+			},
+			ids: []object.ObjMetadata{
+				deployment1Meta,
+			},
+			condition:      AllCurrent,
+			expectedResult: true,
+		},
+		"single resource with current status and old generation": {
+			cacheContents: []cache.ResourceStatus{
+				{
+					Resource: withGeneration(deployment1, 41),
+					Status:   status.CurrentStatus,
+				},
+			},
+			appliedGen: map[object.ObjMetadata]int64{
+				deployment1Meta: 42,
+			},
+			ids: []object.ObjMetadata{
+				deployment1Meta,
+			},
+			condition:      AllCurrent,
+			expectedResult: false,
+		},
+		"multiple resources not all current": {
+			cacheContents: []cache.ResourceStatus{
+				{
+					Resource: withGeneration(deployment1, 42),
+					Status:   status.InProgressStatus,
+				},
+				{
+					Resource: withGeneration(custom1, 0),
+					Status:   status.CurrentStatus,
+				},
+			},
+			appliedGen: map[object.ObjMetadata]int64{
+				deployment1Meta: 42,
+				custom1Meta:     0,
+			},
+			ids: []object.ObjMetadata{
+				deployment1Meta,
+				custom1Meta,
+			},
+			condition:      AllCurrent,
+			expectedResult: false,
+		},
"multiple resources single with old generation": { + cacheContents: []cache.ResourceStatus{ + { + Resource: withGeneration(deployment1, 42), + Status: status.CurrentStatus, + }, + { + Resource: withGeneration(custom1, 4), + Status: status.CurrentStatus, + }, + }, + appliedGen: map[object.ObjMetadata]int64{ + deployment1Meta: 42, + custom1Meta: 5, + }, + ids: []object.ObjMetadata{ + deployment1Meta, + custom1Meta, + }, + condition: AllCurrent, + expectedResult: false, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + resourceCache := cache.NewResourceCacheMap() + if tc.cacheContents != nil { + err := resourceCache.Load(tc.cacheContents...) + assert.NoError(t, err) + } + + taskContext := NewTaskContext(nil, resourceCache) + + if tc.appliedGen != nil { + for id, gen := range tc.appliedGen { + taskContext.ResourceApplied(id, types.UID("unused"), gen) + } + } + + res := conditionMet(taskContext, tc.ids, tc.condition) + + assert.Equal(t, tc.expectedResult, res) + }) + } +} diff --git a/pkg/apply/taskrunner/context.go b/pkg/apply/taskrunner/context.go index df2dfef6..177099af 100644 --- a/pkg/apply/taskrunner/context.go +++ b/pkg/apply/taskrunner/context.go @@ -6,15 +6,18 @@ package taskrunner import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "sigs.k8s.io/cli-utils/pkg/apply/cache" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/object" ) // NewTaskContext returns a new TaskContext -func NewTaskContext(eventChannel chan event.Event) *TaskContext { +func NewTaskContext(eventChannel chan event.Event, resourceCache cache.ResourceCache) *TaskContext { return &TaskContext{ taskChannel: make(chan TaskResult), eventChannel: eventChannel, + resourceCache: resourceCache, appliedResources: make(map[object.ObjMetadata]applyInfo), failedResources: make(map[object.ObjMetadata]struct{}), pruneFailures: make(map[object.ObjMetadata]struct{}), @@ -28,6 +31,8 @@ type TaskContext struct { eventChannel chan event.Event + resourceCache cache.ResourceCache + appliedResources map[object.ObjMetadata]applyInfo // failedResources records the IDs of resources that are failed during applying. @@ -45,6 +50,10 @@ func (tc *TaskContext) EventChannel() chan event.Event { return tc.eventChannel } +func (tc *TaskContext) ResourceCache() cache.ResourceCache { + return tc.resourceCache +} + // ResourceApplied updates the context with information about the // resource identified by the provided id. Currently, we keep information // about the generation of the resource after the apply operation completed. @@ -58,6 +67,13 @@ func (tc *TaskContext) ResourceApplied(id object.ObjMetadata, uid types.UID, gen // ResourceUID looks up the UID of the given resource func (tc *TaskContext) ResourceUID(id object.ObjMetadata) (types.UID, bool) { ai, found := tc.appliedResources[id] + if klog.V(4).Enabled() { + if found { + klog.Infof("resource applied UID cache hit (%s): %d", id, ai.uid) + } else { + klog.Infof("resource applied UID cache miss: (%s): %d", id, ai.uid) + } + } if !found { return "", false } @@ -87,10 +103,17 @@ func (tc *TaskContext) AppliedResourceUIDs() sets.String { return uids } -// ResourceGeneration looks up the generation of the given resource +// AppliedGeneration looks up the generation of the given resource // after it was applied. 
-func (tc *TaskContext) ResourceGeneration(id object.ObjMetadata) (int64, bool) {
+func (tc *TaskContext) AppliedGeneration(id object.ObjMetadata) (int64, bool) {
 	ai, found := tc.appliedResources[id]
+	if klog.V(4).Enabled() {
+		if found {
+			klog.Infof("resource applied generation cache hit (%s): %d", id, ai.generation)
+		} else {
+			klog.Infof("resource applied generation cache miss (%s)", id)
+		}
+	}
 	if !found {
 		return 0, false
 	}
diff --git a/pkg/apply/taskrunner/runner.go b/pkg/apply/taskrunner/runner.go
index c5bb99d0..915f2334 100644
--- a/pkg/apply/taskrunner/runner.go
+++ b/pkg/apply/taskrunner/runner.go
@@ -9,6 +9,7 @@ import (
 	"sort"
 	"time"
 
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/poller"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
@@ -18,12 +19,11 @@ import (
 )
 
 // NewTaskStatusRunner returns a new TaskStatusRunner.
-func NewTaskStatusRunner(identifiers []object.ObjMetadata, statusPoller poller.Poller) *taskStatusRunner {
+func NewTaskStatusRunner(identifiers []object.ObjMetadata, statusPoller poller.Poller, cache cache.ResourceCache) *taskStatusRunner {
 	return &taskStatusRunner{
 		identifiers:  identifiers,
 		statusPoller: statusPoller,
-
-		baseRunner: newBaseRunner(newResourceStatusCollector(identifiers)),
+		baseRunner:   newBaseRunner(cache),
 	}
 }
 
@@ -71,10 +71,10 @@ func (tsr *taskStatusRunner) Run(ctx context.Context, taskQueue chan Task,
 
 // NewTaskRunner returns a new taskRunner. It can process taskqueues
 // that does not contain any wait tasks.
+// TODO: Do we need this abstraction layer now that baseRunner doesn't need a collector?
 func NewTaskRunner() *taskRunner {
-	collector := newResourceStatusCollector([]object.ObjMetadata{})
 	return &taskRunner{
-		baseRunner: newBaseRunner(collector),
+		baseRunner: newBaseRunner(cache.NewResourceCacheMap()),
 	}
 }
 
@@ -99,37 +99,44 @@ func (tr *taskRunner) Run(ctx context.Context, taskQueue chan Task,
 	return tr.baseRunner.run(ctx, taskQueue, nilStatusChannel, eventChannel, o)
 }
 
-// newBaseRunner returns a new baseRunner using the given collector.
-func newBaseRunner(collector *resourceStatusCollector) *baseRunner {
+// newBaseRunner returns a new baseRunner using the provided cache.
+func newBaseRunner(cache cache.ResourceCache) *baseRunner {
 	return &baseRunner{
-		collector: collector,
+		cache: cache,
 	}
 }
 
-// baseRunner provides the basic task runner functionality. It needs
-// a channel that provides resource status updates in order to support
-// wait tasks, but it can also be used with a nil statusChannel for
-// cases where polling and waiting for status is not needed.
-// This is not meant to be used directly. It is used by the
-// taskRunner and the taskStatusRunner.
+// baseRunner provides the basic task runner functionality.
+//
+// The cache can be used by tasks to retrieve the last known resource state.
+//
+// This is not meant to be used directly. It is used by the taskRunner and
+// taskStatusRunner.
 type baseRunner struct {
-	collector *resourceStatusCollector
+	cache cache.ResourceCache
 }
 
 type baseOptions struct {
+	// emitStatusEvents enables emitting events on the eventChannel
 	emitStatusEvents bool
 }
 
-// run is the main function that implements the processing of
-// tasks in the taskqueue. It sets up a loop where a single goroutine
-// will process events from three different channels.
+// run executes the tasks in the taskqueue.
+//
+// The tasks run in a loop where a single goroutine will process events from
+// three different channels.
+// - taskQueue is read to allow updating the task queue at runtime.
+// - statusChannel is read to update the resource cache and to trigger
+//   validation of wait conditions.
+// - eventChannel is written to with events based on status updates, if
+//   emitStatusEvents is true.
 func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
 	statusChannel <-chan pollevent.Event, eventChannel chan event.Event,
 	o baseOptions) error {
 	// taskContext is passed into all tasks when they are started. It
 	// provides access to the eventChannel and the taskChannel, and
 	// also provides a way to pass data between tasks.
-	taskContext := NewTaskContext(eventChannel)
+	taskContext := NewTaskContext(eventChannel, b.cache)
 
 	// Find and start the first task in the queue.
 	currentTask, done := b.nextTask(taskQueue, taskContext)
@@ -192,14 +199,22 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
 				}
 			}
 
-			// The collector needs to keep track of the latest status
-			// for all resources so we can check whether wait task conditions
-			// has been met.
-			b.collector.resourceStatus(statusEvent.Resource)
+			// Update the cache to track the latest resource spec & status.
+			// The status and message come pre-computed from the status poller.
+			// Warning: Resource may be nil!
+			taskContext.ResourceCache().Put(
+				statusEvent.Resource.Identifier,
+				cache.ResourceStatus{
+					Resource:      statusEvent.Resource.Resource,
+					Status:        statusEvent.Resource.Status,
+					StatusMessage: statusEvent.Resource.Message,
+				},
+			)
+
 			// If the current task is a wait task, we check whether
 			// the condition has been met. If so, we complete the task.
 			if wt, ok := currentTask.(*WaitTask); ok {
-				if wt.checkCondition(taskContext, b.collector) {
+				if wt.checkCondition(taskContext) {
 					completeIfWaitTask(currentTask, taskContext)
 				}
 			}
@@ -220,7 +235,7 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
 				},
 			}
 			if msg.Err != nil {
-				b.amendTimeoutError(msg.Err)
+				b.amendTimeoutError(taskContext, msg.Err)
 				return msg.Err
 			}
 			if abort {
@@ -243,21 +258,18 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
 	}
 }
 
-func (b *baseRunner) amendTimeoutError(err error) {
+func (b *baseRunner) amendTimeoutError(taskContext *TaskContext, err error) {
 	if timeoutErr, ok := err.(*TimeoutError); ok {
 		var timedOutResources []TimedOutResource
 		for _, id := range timeoutErr.Identifiers {
-			ls, found := b.collector.resourceMap[id]
-			if !found {
-				continue
-			}
-			if timeoutErr.Condition.Meets(ls.CurrentStatus) {
+			result := taskContext.ResourceCache().Get(id)
+			if timeoutErr.Condition.Meets(result.Status) {
 				continue
 			}
 			timedOutResources = append(timedOutResources, TimedOutResource{
 				Identifier: id,
-				Status:     ls.CurrentStatus,
-				Message:    ls.Message,
+				Status:     result.Status,
+				Message:    result.StatusMessage,
 			})
 		}
 		timeoutErr.TimedOutResources = timedOutResources
@@ -303,7 +315,7 @@ func (b *baseRunner) nextTask(taskQueue chan Task,
 	// starting a new wait task, we check if the condition is already
 	// met. Without this check, a task might end up waiting for
 	// status events when the condition is in fact already met.
-			if st.checkCondition(taskContext, b.collector) {
+			if st.checkCondition(taskContext) {
 				st.startAndComplete(taskContext)
 			} else {
 				st.Start(taskContext)
diff --git a/pkg/apply/taskrunner/runner_test.go b/pkg/apply/taskrunner/runner_test.go
index 4f99fb15..ed1572e6 100644
--- a/pkg/apply/taskrunner/runner_test.go
+++ b/pkg/apply/taskrunner/runner_test.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
@@ -97,7 +98,7 @@ func TestBaseRunner(t *testing.T) {
 				event.ActionGroupType,
 			},
 		},
-		"wait task times out eventually": {
+		"wait task times out eventually (Unknown)": {
 			identifiers: []object.ObjMetadata{depID, cmID},
 			tasks: []Task{
 				NewWaitTask("wait", []object.ObjMetadata{depID, cmID}, AllCurrent,
@@ -121,6 +122,42 @@ func TestBaseRunner(t *testing.T) {
 				{
 					Identifier: depID,
 					Status:     status.UnknownStatus,
+					Message:    "resource not cached",
+				},
+			},
+			expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
+		},
+		"wait task times out eventually (InProgress)": {
+			identifiers: []object.ObjMetadata{depID, cmID},
+			tasks: []Task{
+				NewWaitTask("wait", []object.ObjMetadata{depID, cmID}, AllCurrent,
+					2*time.Second, testutil.NewFakeRESTMapper()),
+			},
+			statusEventsDelay: time.Second,
+			statusEvents: []pollevent.Event{
+				{
+					EventType: pollevent.ResourceUpdateEvent,
+					Resource: &pollevent.ResourceStatus{
+						Identifier: cmID,
+						Status:     status.CurrentStatus,
+					},
+				},
+				{
+					EventType: pollevent.ResourceUpdateEvent,
+					Resource: &pollevent.ResourceStatus{
+						Identifier: depID,
+						Status:     status.InProgressStatus,
+					},
+				},
+			},
+			expectedEventTypes: []event.Type{
+				event.StatusType,
+			},
+			expectedError: &TimeoutError{},
+			expectedTimedOutResources: []TimedOutResource{
+				{
+					Identifier: depID,
+					Status:     status.InProgressStatus,
 				},
 			},
 			expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
@@ -174,7 +211,7 @@ func TestBaseRunner(t *testing.T) {
 
 	for tn, tc := range testCases {
 		t.Run(tn, func(t *testing.T) {
-			runner := newBaseRunner(newResourceStatusCollector(tc.identifiers))
+			runner := newBaseRunner(cache.NewResourceCacheMap())
 			eventChannel := make(chan event.Event)
 			taskQueue := make(chan Task, len(tc.tasks))
 			for _, tsk := range tc.tasks {
@@ -346,7 +383,7 @@ func TestBaseRunnerCancellation(t *testing.T) {
 
 	for tn, tc := range testCases {
 		t.Run(tn, func(t *testing.T) {
-			runner := newBaseRunner(newResourceStatusCollector(tc.identifiers))
+			runner := newBaseRunner(cache.NewResourceCacheMap())
 			eventChannel := make(chan event.Event)
 			taskQueue := make(chan Task, len(tc.tasks))
 
diff --git a/pkg/apply/taskrunner/task.go b/pkg/apply/taskrunner/task.go
index f8f06ff0..f627bf47 100644
--- a/pkg/apply/taskrunner/task.go
+++ b/pkg/apply/taskrunner/task.go
@@ -14,7 +14,6 @@ import (
 	"k8s.io/client-go/restmapper"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
-	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
 	"sigs.k8s.io/cli-utils/pkg/object"
 )
 
@@ -129,33 +128,24 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
 }
 
 // checkCondition checks whether the condition set in the task
-// is currently met given the status of resources in the collector.
-func (w *WaitTask) checkCondition(taskContext *TaskContext, coll *resourceStatusCollector) bool {
-	rwd := w.computeResourceWaitData(taskContext)
-	return coll.conditionMet(rwd, w.Condition)
+// is currently met given the status of resources in the cache.
+func (w *WaitTask) checkCondition(taskContext *TaskContext) bool {
+	return conditionMet(taskContext, w.pending(taskContext), w.Condition)
 }
 
-// computeResourceWaitData creates a slice of resourceWaitData for
-// the resources that is relevant to this wait task. The objective is
-// to match each resource with the generation seen after the resource
-// was applied.
-func (w *WaitTask) computeResourceWaitData(taskContext *TaskContext) []resourceWaitData {
-	var rwd []resourceWaitData
+// pending returns the set of resources being waited on, excluding apply/delete
+// failures. The excluded failures also cover resources that were skipped
+// because of filtering.
+func (w *WaitTask) pending(taskContext *TaskContext) []object.ObjMetadata {
+	var ids []object.ObjMetadata
 	for _, id := range w.Ids {
-		// Skip checking condition for resources which have failed
-		// to apply or failed to prune/delete (depending on wait condition).
-		// This includes resources which are skipped because of filtering.
 		if (w.Condition == AllCurrent && taskContext.ResourceFailed(id)) ||
 			(w.Condition == AllNotFound && taskContext.PruneFailed(id)) {
 			continue
 		}
-		gen, _ := taskContext.ResourceGeneration(id)
-		rwd = append(rwd, resourceWaitData{
-			identifier: id,
-			generation: gen,
-		})
+		ids = append(ids, id)
 	}
-	return rwd
+	return ids
 }
 
 // startAndComplete is invoked when the condition is already
@@ -203,39 +193,6 @@ func (w *WaitTask) ClearTimeout() {
 	w.cancelFunc()
 }
 
-type resourceWaitData struct {
-	identifier object.ObjMetadata
-	generation int64
-}
-
-// Condition is a type that defines the types of conditions
-// which a WaitTask can use.
-type Condition string
-
-const (
-	// AllCurrent Condition means all the provided resources
-	// has reached (and remains in) the Current status.
-	AllCurrent Condition = "AllCurrent"
-
-	// AllNotFound Condition means all the provided resources
-	// has reached the NotFound status, i.e. they are all deleted
-	// from the cluster.
-	AllNotFound Condition = "AllNotFound"
-)
-
-// Meets returns true if the provided status meets the condition and
-// false if it does not.
-func (c Condition) Meets(s status.Status) bool {
-	switch c {
-	case AllCurrent:
-		return s == status.CurrentStatus
-	case AllNotFound:
-		return s == status.NotFoundStatus
-	default:
-		return false
-	}
-}
-
 // extractDeferredDiscoveryRESTMapper unwraps the provided RESTMapper
 // interface to get access to the underlying DeferredDiscoveryRESTMapper
 // that can be reset.
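The staleness rule shared by allMatchStatus and noneMatchStatus above comes down to a small generation fence: a cached observation only counts once the poller has seen at least the generation recorded at apply time, so a pre-apply observation can neither satisfy nor rule out a wait condition. A minimal standalone sketch of that rule, with a hypothetical helper name and package that are not part of this change:

package main

import "fmt"

// cacheIsStale reports whether a cached observation predates the generation
// recorded at apply time; stale observations must not be trusted when
// evaluating wait conditions.
func cacheIsStale(appliedGen, cachedGen int64) bool {
	return cachedGen < appliedGen
}

func main() {
	fmt.Println(cacheIsStale(42, 41)) // true: the poller has not seen the applied spec yet
	fmt.Println(cacheIsStale(42, 42)) // false: the observation is recent enough
}
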
diff --git a/pkg/apply/taskrunner/task_test.go b/pkg/apply/taskrunner/task_test.go
index bdde8504..4cf0a6b1 100644
--- a/pkg/apply/taskrunner/task_test.go
+++ b/pkg/apply/taskrunner/task_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"sigs.k8s.io/cli-utils/pkg/apply/cache"
 	"sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/object"
 	"sigs.k8s.io/cli-utils/pkg/testutil"
@@ -18,7 +19,8 @@ func TestWaitTask_TimeoutTriggered(t *testing.T) {
 		2*time.Second, testutil.NewFakeRESTMapper())
 
 	eventChannel := make(chan event.Event)
-	taskContext := NewTaskContext(eventChannel)
+	resourceCache := cache.NewResourceCacheMap()
+	taskContext := NewTaskContext(eventChannel, resourceCache)
 	defer close(eventChannel)
 
 	task.Start(taskContext)
@@ -41,7 +43,8 @@ func TestWaitTask_TimeoutCancelled(t *testing.T) {
 		2*time.Second, testutil.NewFakeRESTMapper())
 
 	eventChannel := make(chan event.Event)
-	taskContext := NewTaskContext(eventChannel)
+	resourceCache := cache.NewResourceCacheMap()
+	taskContext := NewTaskContext(eventChannel, resourceCache)
 	defer close(eventChannel)
 
 	task.Start(taskContext)
@@ -61,7 +64,8 @@ func TestWaitTask_SingleTaskResult(t *testing.T) {
 		2*time.Second, testutil.NewFakeRESTMapper())
 
 	eventChannel := make(chan event.Event)
-	taskContext := NewTaskContext(eventChannel)
+	resourceCache := cache.NewResourceCacheMap()
+	taskContext := NewTaskContext(eventChannel, resourceCache)
 	taskContext.taskChannel = make(chan TaskResult, 10)
 	defer close(eventChannel)
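
Taken together, the new cache package gives every consumer (mutators, the task runner, and wait-condition checks) a shared read/write map of last-known resource state keyed by ObjMetadata. A minimal usage sketch of the contract introduced in this diff, using only types defined above; the main function and example Deployment object are illustrative assumptions, not code from the change:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/cli-utils/pkg/apply/cache"
	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
	"sigs.k8s.io/cli-utils/pkg/object"
)

func main() {
	rc := cache.NewResourceCacheMap()

	obj := &unstructured.Unstructured{}
	obj.SetAPIVersion("apps/v1")
	obj.SetKind("Deployment")
	obj.SetNamespace("default")
	obj.SetName("dep")

	id, err := object.UnstructuredToObjMeta(obj)
	if err != nil {
		panic(err)
	}

	// A miss is no longer (nil, false): Get returns a ResourceStatus
	// with UnknownStatus and a nil Resource.
	miss := rc.Get(id)
	fmt.Println(miss.Status, miss.Resource == nil) // Unknown true

	// Put records the last known state together with its computed status,
	// which is what the status poller feeds into TaskContext.ResourceCache().
	rc.Put(id, cache.ResourceStatus{
		Resource:      obj,
		Status:        status.CurrentStatus,
		StatusMessage: "Resource is Current",
	})
	fmt.Println(rc.Get(id).Status) // Current
}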