Skip to content

Commit

Permalink
Merge Collector and ResourceCache
Browse files Browse the repository at this point in the history
- Make ResourceCache thread-safe
- Make ResourceCache store status and messagei
- Add ResourceCache to baseRunner and TaskContext
- Make Mutator compute resource status for uncached resources
- Share cache between StatusPoller and Mutator
- Move Condition and conditionMet() to its own file
- Simplify WaitTask.checkCondition
- Simplify baseRunner.amendTimeoutError
  • Loading branch information
karlkfi committed Sep 30, 2021
1 parent 73a24dc commit d964b03
Show file tree
Hide file tree
Showing 21 changed files with 564 additions and 409 deletions.
6 changes: 4 additions & 2 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,13 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
},
}
// Build list of apply mutators.
// Share a thread-safe cache with the status poller.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Mapper: mapper,
ResourceCache: cache.NewResourceCacheMap(),
ResourceCache: resourceCache,
},
}
// Build the task queue by appending tasks in the proper order.
Expand All @@ -221,7 +223,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller, resourceCache)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down
38 changes: 28 additions & 10 deletions pkg/apply/cache/resource_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,38 @@ package cache

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// ResourceCache stores unstructured resource objects in memory
// ResourceStatus wraps an unstructured resource object, combined with the
// computed status (whether the status matches the spec).
type ResourceStatus struct {
// Resource is the last known value retrieved from the cluster
Resource *unstructured.Unstructured
// Status of the resource
Status status.Status
// StatusMessage is the human readable reason for the status
StatusMessage string
}

// ResourceCache stores CachedResource objects
type ResourceCache interface {
// Put the resource into the cache, generating the ObjMetadata from the object.
Put(obj *unstructured.Unstructured) error
// Set the resource in the cache using the supplied key.
Set(objMeta object.ObjMetadata, obj *unstructured.Unstructured)
// Get the resource associated with the key from the cache.
// Returns (nil, true) if not found in the cache.
Get(objMeta object.ObjMetadata) (*unstructured.Unstructured, bool)
// Remove the resource associated with the key from the cache.
Remove(objMeta object.ObjMetadata)
ResourceCacheReader
// Load one or more resources into the cache, generating the ObjMetadata
// from the objects.
Load(...ResourceStatus) error
// Put the resource into the cache using the specified ID.
Put(object.ObjMetadata, ResourceStatus)
// Remove the resource associated with the ID from the cache.
Remove(object.ObjMetadata)
// Clear the cache.
Clear()
}

// ResourceCacheReader retrieves CachedResource objects
type ResourceCacheReader interface {
// Get the resource associated with the ID from the cache.
// If not cached, status will be Unknown and resource will be nil.
Get(object.ObjMetadata) ResourceStatus
}
84 changes: 49 additions & 35 deletions pkg/apply/cache/resource_cache_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,89 @@ package cache

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// ResourceCacheMap stores unstructured resource objects in a map.
// ResourceCacheMap is NOT thread-safe.
// ResourceCacheMap stores ResourceStatus objects in a map indexed by resource ID.
// ResourceCacheMap is thread-safe.
type ResourceCacheMap struct {
cache map[object.ObjMetadata]*unstructured.Unstructured
mu sync.RWMutex
cache map[object.ObjMetadata]ResourceStatus
}

// NewResourceCacheMap returns a new empty ResourceCacheMap
func NewResourceCacheMap() *ResourceCacheMap {
return &ResourceCacheMap{
cache: make(map[object.ObjMetadata]*unstructured.Unstructured),
cache: make(map[object.ObjMetadata]ResourceStatus),
}
}

// Load adds the resources into the cache, replacing any existing resource with
// the same ID. Returns an error if any resource is invalid.
func (rc *ResourceCacheMap) Load(values ...*unstructured.Unstructured) error {
// Load resources into the cache, generating the ID from the resource itself.
// Existing resources with the same ID will be replaced.
// Returns an error if any resource ID cannot be generated.
func (rc *ResourceCacheMap) Load(values ...ResourceStatus) error {
rc.mu.Lock()
defer rc.mu.Unlock()

for _, value := range values {
key, err := object.UnstructuredToObjMeta(value)
id, err := object.UnstructuredToObjMeta(value.Resource)
if err != nil {
return fmt.Errorf("failed to create resource cache key: %w", err)
return fmt.Errorf("failed to generate resource ID: %w", err)
}
rc.cache[key] = value
rc.cache[id] = value
}
return nil
}

// Put adds the resource into the cache, replacing any existing resource with
// the same ID. Returns an error if resource is invalid.
func (rc *ResourceCacheMap) Put(value *unstructured.Unstructured) error {
key, err := object.UnstructuredToObjMeta(value)
if err != nil {
return fmt.Errorf("failed to create resource cache key: %w", err)
}
rc.Set(key, value)
return nil
}
// Put the resource into the cache using the supplied ID, replacing any
// existing resource with the same ID.
func (rc *ResourceCacheMap) Put(id object.ObjMetadata, value ResourceStatus) {
rc.mu.Lock()
defer rc.mu.Unlock()

// Set the resource in the cache using the supplied key, and replacing any
// existing resource with the same key.
func (rc *ResourceCacheMap) Set(key object.ObjMetadata, value *unstructured.Unstructured) {
rc.cache[key] = value
rc.cache[id] = value
}

// Get retrieves the resource associated with the key from the cache.
// Get retrieves the resource associated with the ID from the cache.
// Returns (nil, true) if not found in the cache.
func (rc *ResourceCacheMap) Get(key object.ObjMetadata) (*unstructured.Unstructured, bool) {
obj, found := rc.cache[key]
func (rc *ResourceCacheMap) Get(id object.ObjMetadata) ResourceStatus {
rc.mu.RLock()
defer rc.mu.RUnlock()

obj, found := rc.cache[id]
if klog.V(4).Enabled() {
if found {
klog.Infof("resource cache hit: %s", key)
klog.Infof("resource cache hit: %s", id)
} else {
klog.Infof("resource cache miss: %s", key)
klog.Infof("resource cache miss: %s", id)
}
}
return obj, found
if !found {
return ResourceStatus{
Resource: nil,
Status: status.UnknownStatus,
StatusMessage: "resource not cached",
}
}
return obj
}

// Remove the resource associated with the key from the cache.
func (rc *ResourceCacheMap) Remove(key object.ObjMetadata) {
delete(rc.cache, key)
// Remove the resource associated with the ID from the cache.
func (rc *ResourceCacheMap) Remove(id object.ObjMetadata) {
rc.mu.Lock()
defer rc.mu.Unlock()

delete(rc.cache, id)
}

// Clear the cache.
func (rc *ResourceCacheMap) Clear() {
rc.cache = make(map[object.ObjMetadata]*unstructured.Unstructured)
rc.mu.Lock()
defer rc.mu.Unlock()

rc.cache = make(map[object.ObjMetadata]ResourceStatus)
}
4 changes: 3 additions & 1 deletion pkg/apply/destroyer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
Expand Down Expand Up @@ -149,7 +150,8 @@ func (d *Destroyer) Run(inv inventory.InventoryInfo, options DestroyerOptions) <
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("destroyer building TaskStatusRunner...")
deleteIds := object.UnstructuredsToObjMetasOrDie(deleteObjs)
runner := taskrunner.NewTaskStatusRunner(deleteIds, d.statusPoller)
resourceCache := cache.NewResourceCacheMap()
runner := taskrunner.NewTaskStatusRunner(deleteIds, d.statusPoller, resourceCache)
klog.V(4).Infoln("destroyer running TaskStatusRunner...")
// TODO(seans): Make the poll interval configurable like the applier.
err = runner.Run(context.Background(), taskQueue.ToChannel(), eventChannel, taskrunner.Options{
Expand Down
62 changes: 53 additions & 9 deletions pkg/apply/mutator/apply_time_mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/jsonpath"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
)
Expand Down Expand Up @@ -168,35 +170,77 @@ func (atm *ApplyTimeMutator) getMapping(ref mutation.ResourceReference) (*meta.R
// getObject returns a cached resource, if cached and cache exists, otherwise
// the resource is retrieved from the cluster.
func (atm *ApplyTimeMutator) getObject(ctx context.Context, mapping *meta.RESTMapping, ref mutation.ResourceReference) (*unstructured.Unstructured, error) {
// validate resource reference
id, err := mutation.ResourceReferenceToObjMeta(ref)
if err != nil {
return nil, fmt.Errorf("failed to validate resource reference: %w", err)
// validate source reference
if ref.Name == "" {
return nil, fmt.Errorf("invalid source reference: empty name")
}
if ref.Kind == "" {
return nil, fmt.Errorf("invalid source reference: empty kind")
}
id := ref.ObjMetadata()

// get resource from cache
if atm.ResourceCache != nil {
obj, found := atm.ResourceCache.Get(id)
if found && obj != nil {
return obj, nil
result := atm.ResourceCache.Get(id)
// Use the cached version, if current/reconciled.
// Otherwise, get it from the cluster.
if result.Resource != nil && result.Status == status.CurrentStatus {
return result.Resource, nil
}
}

// get resource from cluster
namespacedClient := atm.Client.Resource(mapping.Resource).Namespace(ref.Namespace)
obj, err := namespacedClient.Get(ctx, ref.Name, metav1.GetOptions{})
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
// Skip NotFound so the cache gets updated.
return nil, fmt.Errorf("failed to retrieve resource from cluster: %w", err)
}

// add resource to cache
if atm.ResourceCache != nil {
atm.ResourceCache.Set(id, obj)
// If it's not cached or not current, update the cache.
// This will add external resources to the cache,
// but the user won't get status events for them.
atm.ResourceCache.Put(id, computeStatus(obj))
}

if err != nil {
// NotFound
return nil, fmt.Errorf("resource not found: %w", err)
}

return obj, nil
}

// computeStatus compares the spec to the status and returns the result.
func computeStatus(obj *unstructured.Unstructured) cache.ResourceStatus {
if obj == nil {
return cache.ResourceStatus{
Resource: obj,
Status: status.NotFoundStatus,
StatusMessage: "Resource not found",
}
}
result, err := status.Compute(obj)
if err != nil {
if klog.V(3).Enabled() {
ref := mutation.NewResourceReference(obj)
klog.Info("failed to compute resource status (%s): %d", ref, err)
}
return cache.ResourceStatus{
Resource: obj,
Status: status.UnknownStatus,
//StatusMessage: fmt.Sprintf("Failed to compute status: %s", err),
}
}
return cache.ResourceStatus{
Resource: obj,
Status: result.Status,
StatusMessage: result.Message,
}
}

func readFieldValue(obj *unstructured.Unstructured, path string) (interface{}, bool, error) {
if path == "" {
return nil, false, errors.New("empty path expression")
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/mutator/apply_time_mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestMutate(t *testing.T) {
reason: "",
// exact error message isn't very important. Feel free to update if the error text changes.
errMsg: `failed to get source resource (networking.k8s.io/namespaces/ingress-namespace/Ingress/ingress1-name): ` +
`failed to retrieve resource from cluster: ` +
`resource not found: ` +
`ingresses.networking.k8s.io "ingress1-name" not found`,
},
"pod env var string from ingress port int": {
Expand Down
10 changes: 7 additions & 3 deletions pkg/apply/prune/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
Expand Down Expand Up @@ -395,7 +396,8 @@ func TestPrune(t *testing.T) {
// The event channel can not block; make sure its bigger than all
// the events that can be put on it.
eventChannel := make(chan event.Event, len(tc.pruneObjs)+1)
taskContext := taskrunner.NewTaskContext(eventChannel)
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
err = func() error {
defer close(eventChannel)
// Run the prune and validate.
Expand Down Expand Up @@ -485,7 +487,8 @@ func TestPruneWithErrors(t *testing.T) {
// The event channel can not block; make sure its bigger than all
// the events that can be put on it.
eventChannel := make(chan event.Event, len(tc.pruneObjs))
taskContext := taskrunner.NewTaskContext(eventChannel)
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
err = func() error {
defer close(eventChannel)
var opts Options
Expand Down Expand Up @@ -621,7 +624,8 @@ func TestPrune_PropagationPolicy(t *testing.T) {
}

eventChannel := make(chan event.Event, 1)
taskContext := taskrunner.NewTaskContext(eventChannel)
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
err := po.Prune([]*unstructured.Unstructured{pdb}, []filter.ValidationFilter{}, taskContext, Options{
PropagationPolicy: tc.propagationPolicy,
})
Expand Down
Loading

0 comments on commit d964b03

Please sign in to comment.