From 2f1335795dcd2ea8721063cd3ef4ffcd4f0f1de6 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Mon, 26 Jul 2021 20:34:52 -0700 Subject: [PATCH] Moves apply failure inventory calculation into inventory task --- pkg/apply/solver/solver.go | 20 ++-- pkg/apply/task/apply_task.go | 58 ++--------- pkg/apply/task/apply_task_test.go | 148 +++------------------------- pkg/apply/task/inv_set_task.go | 23 ++++- pkg/apply/task/inv_set_task_test.go | 65 +++++++++++- pkg/apply/taskrunner/context.go | 8 ++ 6 files changed, 118 insertions(+), 204 deletions(-) diff --git a/pkg/apply/solver/solver.go b/pkg/apply/solver/solver.go index 9cc93e67..1319ca79 100644 --- a/pkg/apply/solver/solver.go +++ b/pkg/apply/solver/solver.go @@ -118,11 +118,17 @@ func (t *TaskQueueBuilder) AppendInvAddTask(inv inventory.InventoryInfo, applyOb // Returns a pointer to the Builder to chain function calls. func (t *TaskQueueBuilder) AppendInvSetTask(inv inventory.InventoryInfo, dryRun common.DryRunStrategy) *TaskQueueBuilder { klog.V(2).Infoln("adding inventory set task") + prevInvIds, _ := t.InvClient.GetClusterObjs(inv, dryRun) + prevInventory := make(map[object.ObjMetadata]bool, len(prevInvIds)) + for _, prevInvID := range prevInvIds { + prevInventory[prevInvID] = true + } t.tasks = append(t.tasks, &task.InvSetTask{ - TaskName: fmt.Sprintf("inventory-set-%d", t.invSetCounter), - InvClient: t.InvClient, - InvInfo: inv, - DryRun: dryRun, + TaskName: fmt.Sprintf("inventory-set-%d", t.invSetCounter), + InvClient: t.InvClient, + InvInfo: inv, + PrevInventory: prevInventory, + DryRun: dryRun, }) t.invSetCounter += 1 return t @@ -147,15 +153,9 @@ func (t *TaskQueueBuilder) AppendDeleteInvTask(inv inventory.InventoryInfo, dryR func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo, applyObjs []*unstructured.Unstructured, o Options) *TaskQueueBuilder { klog.V(2).Infof("adding apply task (%d objects)", len(applyObjs)) - prevInvIds, _ := t.InvClient.GetClusterObjs(inv, o.DryRunStrategy) - prevInventory := make(map[object.ObjMetadata]bool, len(prevInvIds)) - for _, prevInvID := range prevInvIds { - prevInventory[prevInvID] = true - } t.tasks = append(t.tasks, &task.ApplyTask{ TaskName: fmt.Sprintf("apply-%d", t.applyCounter), Objects: applyObjs, - PrevInventory: prevInventory, ServerSideOptions: o.ServerSideOptions, DryRunStrategy: o.DryRunStrategy, InfoHelper: t.InfoHelper, diff --git a/pkg/apply/task/apply_task.go b/pkg/apply/task/apply_task.go index 6fa65734..cb021f07 100644 --- a/pkg/apply/task/apply_task.go +++ b/pkg/apply/task/apply_task.go @@ -47,12 +47,10 @@ type applyOptions interface { type ApplyTask struct { TaskName string - Factory util.Factory - InfoHelper info.InfoHelper - Mapper meta.RESTMapper - Objects []*unstructured.Unstructured - // Used for determining inventory during errors - PrevInventory map[object.ObjMetadata]bool + Factory util.Factory + InfoHelper info.InfoHelper + Mapper meta.RESTMapper + Objects []*unstructured.Unstructured DryRunStrategy common.DryRunStrategy ServerSideOptions common.ServerSideOptions InventoryPolicy inventory.InventoryPolicy @@ -102,10 +100,6 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) { a.sendTaskResult(taskContext) return } - - klog.V(4).Infof("attempting to apply %d remaining objects", len(objects)) - // invInfos stores the objects which should be stored in the final inventory. - invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects)) for _, obj := range objects { // Set the client and mapping fields on the provided // info so they can be applied to the cluster. @@ -121,7 +115,6 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) { taskContext.CaptureResourceFailure(id) continue } - clusterObj, err := getClusterObj(dynamic, info) if err != nil { if !apierrors.IsNotFound(err) { @@ -129,20 +122,11 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) { klog.Errorf("error (%s) retrieving %s/%s from cluster--continue", err, info.Namespace, info.Name) } - if a.objInCluster(id) { - // Object in cluster stays in the inventory. - klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory", - info.Namespace, info.Name) - invInfos[id] = info - } taskContext.EventChannel() <- createApplyFailedEvent(id, err) taskContext.CaptureResourceFailure(id) continue } } - // At this point the object was either 1) successfully retrieved from the cluster, or - // 2) returned "Not Found" error (meaning first-time creation). Add to final inventory. - invInfos[id] = info canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy) if !canApply { klog.V(5).Infof("can not apply %s/%s--continue", @@ -171,30 +155,16 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) { if klog.V(4).Enabled() { klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err) } - // If apply failed and the object is not in the cluster, remove - // it from the final inventory. - if !a.objInCluster(id) { - klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory", - info.Namespace, info.Name) - delete(invInfos, id) - } taskContext.EventChannel() <- createApplyFailedEvent(id, applyerror.NewApplyRunError(err)) taskContext.CaptureResourceFailure(id) - } - } - - // Store objects (and some obj metadata) in the task context - // for the final inventory. - for id, info := range invInfos { - if info.Object != nil { + } else if info.Object != nil { acc, err := meta.Accessor(info.Object) - if err != nil { - continue + if err == nil { + uid := acc.GetUID() + gen := acc.GetGeneration() + taskContext.ResourceApplied(id, uid, gen) } - uid := acc.GetUID() - gen := acc.GetGeneration() - taskContext.ResourceApplied(id, uid, gen) } } a.sendTaskResult(taskContext) @@ -253,16 +223,6 @@ func (a *ApplyTask) sendTaskResult(taskContext *taskrunner.TaskContext) { taskContext.TaskChannel() <- taskrunner.TaskResult{} } -// objInCluster returns true if the passed object is in the slice of -// previous inventory, because an object in the previous inventory -// exists in the cluster. -func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool { - if _, found := a.PrevInventory[obj]; found { - return true - } - return false -} - // ClearTimeout is not supported by the ApplyTask. func (a *ApplyTask) ClearTimeout() {} diff --git a/pkg/apply/task/apply_task_test.go b/pkg/apply/task/apply_task_test.go index 5ef04fea..0eb3a955 100644 --- a/pkg/apply/task/apply_task_test.go +++ b/pkg/apply/task/apply_task_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -128,140 +127,6 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) { } } -// Checks the inventory stored in the task context applied -// resources is correct, given a retrieval error and -// a specific previous inventory. Also, an apply failure -// for an object in the previous inventory should remain -// in the inventory, while an apply failure that is not -// in the previous inventory (creation) should not be -// in the final inventory. -func TestApplyTask_ApplyFailuresAndInventory(t *testing.T) { - resInfo := resourceInfo{ - group: "apps", - apiVersion: "apps/v1", - kind: "Deployment", - name: "foo", - namespace: "default", - uid: types.UID("my-uid"), - generation: int64(42), - } - resID, _ := object.CreateObjMetadata("default", "foo", - schema.GroupKind{Group: "apps", Kind: "Deployment"}) - applyFailInfo := resourceInfo{ - group: "apps", - apiVersion: "apps/v1", - kind: "Deployment", - name: "failure", - namespace: "default", - uid: types.UID("my-uid"), - generation: int64(42), - } - applyFailID, _ := object.CreateObjMetadata("default", "failure", - schema.GroupKind{Group: "apps", Kind: "Deployment"}) - - testCases := map[string]struct { - applied []resourceInfo - prevInventory []object.ObjMetadata - expected []object.ObjMetadata - err error - }{ - "not found error with successful apply is in final inventory": { - applied: []resourceInfo{resInfo}, - prevInventory: []object.ObjMetadata{}, - expected: []object.ObjMetadata{resID}, - err: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pod"}, "fake"), - }, - "unknown error, but in previous inventory: object is in final inventory": { - applied: []resourceInfo{resInfo}, - prevInventory: []object.ObjMetadata{resID}, - expected: []object.ObjMetadata{resID}, - err: apierrors.NewUnauthorized("not authorized"), - }, - "unknown error, not in previous inventory: object is NOT in final inventory": { - applied: []resourceInfo{resInfo}, - prevInventory: []object.ObjMetadata{}, - expected: []object.ObjMetadata{}, - err: apierrors.NewUnauthorized("not authorized"), - }, - "apply failure, in previous inventory: object is in final inventory": { - applied: []resourceInfo{applyFailInfo}, - prevInventory: []object.ObjMetadata{applyFailID}, - expected: []object.ObjMetadata{applyFailID}, - err: nil, - }, - "apply failure, not in previous inventory: object is NOT in final inventory": { - applied: []resourceInfo{applyFailInfo}, - prevInventory: []object.ObjMetadata{}, - expected: []object.ObjMetadata{}, - err: nil, - }, - } - - for tn, tc := range testCases { - t.Run(tn, func(t *testing.T) { - eventChannel := make(chan event.Event) - taskContext := taskrunner.NewTaskContext(eventChannel) - - objs := toUnstructureds(tc.applied) - - oldAO := applyOptionsFactoryFunc - applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) { - return &fakeApplyOptions{}, nil, nil - } - defer func() { applyOptionsFactoryFunc = oldAO }() - - restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{ - Group: "apps", - Version: "v1", - Kind: "Deployment", - }) - - prevInv := map[object.ObjMetadata]bool{} - for _, id := range tc.prevInventory { - prevInv[id] = true - } - applyTask := &ApplyTask{ - Objects: objs, - PrevInventory: prevInv, - Mapper: restMapper, - InfoHelper: &fakeInfoHelper{}, - InvInfo: &fakeInventoryInfo{}, - } - - getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) { - return objs[0], nil - } - if tc.err != nil { - getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) { - return nil, tc.err - } - } - - var events []event.Event - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for msg := range eventChannel { - events = append(events, msg) - } - }() - - applyTask.Start(taskContext) - <-taskContext.TaskChannel() - close(eventChannel) - wg.Wait() - - // The applied resources should be stored in the TaskContext - // for the final inventory. - actual := taskContext.AppliedResources() - if !object.SetEquals(tc.expected, actual) { - t.Errorf("expected (%s) inventory resources, got (%s)", tc.expected, actual) - } - }) - } -} - func TestApplyTask_FetchGeneration(t *testing.T) { testCases := map[string]struct { rss []resourceInfo @@ -321,7 +186,12 @@ func TestApplyTask_FetchGeneration(t *testing.T) { } getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) { - return objs[0], nil + for _, obj := range objs { + if info.Name == obj.GetName() && info.Namespace == obj.GetNamespace() { + return obj, nil + } + } + return nil, nil } applyTask.Start(taskContext) @@ -801,7 +671,11 @@ func TestApplyTaskWithDifferentInventoryAnnotation(t *testing.T) { assert.Equal(t, tc.expectedEvents[i].ApplyEvent.Error.Error(), e.ApplyEvent.Error.Error()) } actualUids := taskContext.AppliedResourceUIDs() - assert.Equal(t, len(actualUids), 1) + assert.Equal(t, len(tc.expectedObjects), len(actualUids)) + actualObjs := taskContext.AppliedResources() + if !object.SetEquals(tc.expectedObjects, actualObjs) { + t.Errorf("expected applied objects (%v), got (%v)", tc.expectedObjects, actualObjs) + } }) } } diff --git a/pkg/apply/task/inv_set_task.go b/pkg/apply/task/inv_set_task.go index 2edbd88d..35e93362 100644 --- a/pkg/apply/task/inv_set_task.go +++ b/pkg/apply/task/inv_set_task.go @@ -15,10 +15,11 @@ import ( // InvSetTask encapsulates structures necessary to set the // inventory references at the end of the apply/prune. type InvSetTask struct { - TaskName string - InvClient inventory.InventoryClient - InvInfo inventory.InventoryInfo - DryRun common.DryRunStrategy + TaskName string + InvClient inventory.InventoryClient + InvInfo inventory.InventoryInfo + PrevInventory map[object.ObjMetadata]bool + DryRun common.DryRunStrategy } func (i *InvSetTask) Name() string { @@ -41,9 +42,21 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) { klog.V(2).Infoln("starting inventory replace task") appliedObjs := taskContext.AppliedResources() klog.V(4).Infof("set inventory %d applied objects", len(appliedObjs)) + // If an object failed to apply, but it was previously stored in + // the inventory, then keep it in the inventory so we don't lose + // track of it for next apply/prune. An object not found in the cluster + // is NOT stored as an apply failure (so it is properly removed from the inventory). + applyFailures := []object.ObjMetadata{} + for _, failure := range taskContext.ResourceFailures() { + if _, exists := i.PrevInventory[failure]; exists { + applyFailures = append(applyFailures, failure) + } + } + klog.V(4).Infof("keep in inventory %d applied failures", len(applyFailures)) pruneFailures := taskContext.PruneFailures() klog.V(4).Infof("set inventory %d prune failures", len(pruneFailures)) - invObjs := object.Union(appliedObjs, pruneFailures) + allApplyObjs := object.Union(appliedObjs, applyFailures) + invObjs := object.Union(allApplyObjs, pruneFailures) klog.V(4).Infof("set inventory %d total objects", len(invObjs)) err := i.InvClient.Replace(i.InvInfo, invObjs, i.DryRun) taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err} diff --git a/pkg/apply/task/inv_set_task_test.go b/pkg/apply/task/inv_set_task_test.go index ad1f39ef..c365ac09 100644 --- a/pkg/apply/task/inv_set_task_test.go +++ b/pkg/apply/task/inv_set_task_test.go @@ -20,6 +20,8 @@ func TestInvSetTask(t *testing.T) { tests := map[string]struct { appliedObjs []object.ObjMetadata + applyFailures []object.ObjMetadata + prevInventory []object.ObjMetadata pruneFailures []object.ObjMetadata expectedObjs []object.ObjMetadata }{ @@ -48,6 +50,55 @@ func TestInvSetTask(t *testing.T) { pruneFailures: []object.ObjMetadata{id2, id3}, expectedObjs: []object.ObjMetadata{id1, id2, id3}, }, + "no apply objs, no apply failures, no prune failures; no inventory": { + appliedObjs: []object.ObjMetadata{}, + applyFailures: []object.ObjMetadata{id3}, + prevInventory: []object.ObjMetadata{}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{}, + }, + "one apply failure not in prev inventory; no inventory": { + appliedObjs: []object.ObjMetadata{}, + applyFailures: []object.ObjMetadata{id3}, + prevInventory: []object.ObjMetadata{}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{}, + }, + "one apply obj, one apply failure not in prev inventory; one inventory": { + appliedObjs: []object.ObjMetadata{id2}, + applyFailures: []object.ObjMetadata{id3}, + prevInventory: []object.ObjMetadata{}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{id2}, + }, + "one apply obj, one apply failure in prev inventory; one inventory": { + appliedObjs: []object.ObjMetadata{id2}, + applyFailures: []object.ObjMetadata{id3}, + prevInventory: []object.ObjMetadata{id3}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{id2, id3}, + }, + "one apply obj, two apply failures with one in prev inventory; two inventory": { + appliedObjs: []object.ObjMetadata{id2}, + applyFailures: []object.ObjMetadata{id1, id3}, + prevInventory: []object.ObjMetadata{id3}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{id2, id3}, + }, + "three apply failures with two in prev inventory; two inventory": { + appliedObjs: []object.ObjMetadata{}, + applyFailures: []object.ObjMetadata{id1, id2, id3}, + prevInventory: []object.ObjMetadata{id2, id3}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{id2, id3}, + }, + "three apply failures with three in prev inventory; three inventory": { + appliedObjs: []object.ObjMetadata{}, + applyFailures: []object.ObjMetadata{id1, id2, id3}, + prevInventory: []object.ObjMetadata{id2, id3, id1}, + pruneFailures: []object.ObjMetadata{}, + expectedObjs: []object.ObjMetadata{id2, id1, id3}, + }, } for name, tc := range tests { @@ -55,14 +106,22 @@ func TestInvSetTask(t *testing.T) { client := inventory.NewFakeInventoryClient([]object.ObjMetadata{}) eventChannel := make(chan event.Event) context := taskrunner.NewTaskContext(eventChannel) + prevInventory := make(map[object.ObjMetadata]bool, len(tc.prevInventory)) + for _, prevInvID := range tc.prevInventory { + prevInventory[prevInvID] = true + } task := InvSetTask{ - TaskName: taskName, - InvClient: client, - InvInfo: nil, + TaskName: taskName, + InvClient: client, + InvInfo: nil, + PrevInventory: prevInventory, } for _, applyObj := range tc.appliedObjs { context.ResourceApplied(applyObj, "unusued-uid", int64(0)) } + for _, applyFailure := range tc.applyFailures { + context.CaptureResourceFailure(applyFailure) + } for _, pruneObj := range tc.pruneFailures { context.CapturePruneFailure(pruneObj) } diff --git a/pkg/apply/taskrunner/context.go b/pkg/apply/taskrunner/context.go index f8774330..184cf70c 100644 --- a/pkg/apply/taskrunner/context.go +++ b/pkg/apply/taskrunner/context.go @@ -108,6 +108,14 @@ func (tc *TaskContext) CaptureResourceFailure(id object.ObjMetadata) { tc.failedResources[id] = struct{}{} } +func (tc *TaskContext) ResourceFailures() []object.ObjMetadata { + failures := make([]object.ObjMetadata, 0, len(tc.failedResources)) + for f := range tc.failedResources { + failures = append(failures, f) + } + return failures +} + func (tc *TaskContext) CapturePruneFailure(id object.ObjMetadata) { tc.pruneFailures[id] = struct{}{} }