Skip to content

Commit

Permalink
Moves apply failure inventory calculation into inventory task
Browse files Browse the repository at this point in the history
  • Loading branch information
seans3 committed Jul 27, 2021
1 parent 8ccd6e6 commit 2f13357
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 204 deletions.
20 changes: 10 additions & 10 deletions pkg/apply/solver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,17 @@ func (t *TaskQueueBuilder) AppendInvAddTask(inv inventory.InventoryInfo, applyOb
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendInvSetTask(inv inventory.InventoryInfo, dryRun common.DryRunStrategy) *TaskQueueBuilder {
klog.V(2).Infoln("adding inventory set task")
prevInvIds, _ := t.InvClient.GetClusterObjs(inv, dryRun)
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvIds))
for _, prevInvID := range prevInvIds {
prevInventory[prevInvID] = true
}
t.tasks = append(t.tasks, &task.InvSetTask{
TaskName: fmt.Sprintf("inventory-set-%d", t.invSetCounter),
InvClient: t.InvClient,
InvInfo: inv,
DryRun: dryRun,
TaskName: fmt.Sprintf("inventory-set-%d", t.invSetCounter),
InvClient: t.InvClient,
InvInfo: inv,
PrevInventory: prevInventory,
DryRun: dryRun,
})
t.invSetCounter += 1
return t
Expand All @@ -147,15 +153,9 @@ func (t *TaskQueueBuilder) AppendDeleteInvTask(inv inventory.InventoryInfo, dryR
func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo,
applyObjs []*unstructured.Unstructured, o Options) *TaskQueueBuilder {
klog.V(2).Infof("adding apply task (%d objects)", len(applyObjs))
prevInvIds, _ := t.InvClient.GetClusterObjs(inv, o.DryRunStrategy)
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvIds))
for _, prevInvID := range prevInvIds {
prevInventory[prevInvID] = true
}
t.tasks = append(t.tasks, &task.ApplyTask{
TaskName: fmt.Sprintf("apply-%d", t.applyCounter),
Objects: applyObjs,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
InfoHelper: t.InfoHelper,
Expand Down
58 changes: 9 additions & 49 deletions pkg/apply/task/apply_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ type applyOptions interface {
type ApplyTask struct {
TaskName string

Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
// Used for determining inventory during errors
PrevInventory map[object.ObjMetadata]bool
Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
DryRunStrategy common.DryRunStrategy
ServerSideOptions common.ServerSideOptions
InventoryPolicy inventory.InventoryPolicy
Expand Down Expand Up @@ -102,10 +100,6 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
a.sendTaskResult(taskContext)
return
}

klog.V(4).Infof("attempting to apply %d remaining objects", len(objects))
// invInfos stores the objects which should be stored in the final inventory.
invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects))
for _, obj := range objects {
// Set the client and mapping fields on the provided
// info so they can be applied to the cluster.
Expand All @@ -121,28 +115,18 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
taskContext.CaptureResourceFailure(id)
continue
}

clusterObj, err := getClusterObj(dynamic, info)
if err != nil {
if !apierrors.IsNotFound(err) {
if klog.V(4).Enabled() {
klog.Errorf("error (%s) retrieving %s/%s from cluster--continue",
err, info.Namespace, info.Name)
}
if a.objInCluster(id) {
// Object in cluster stays in the inventory.
klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory",
info.Namespace, info.Name)
invInfos[id] = info
}
taskContext.EventChannel() <- createApplyFailedEvent(id, err)
taskContext.CaptureResourceFailure(id)
continue
}
}
// At this point the object was either 1) successfully retrieved from the cluster, or
// 2) returned "Not Found" error (meaning first-time creation). Add to final inventory.
invInfos[id] = info
canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
if !canApply {
klog.V(5).Infof("can not apply %s/%s--continue",
Expand Down Expand Up @@ -171,30 +155,16 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
if klog.V(4).Enabled() {
klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err)
}
// If apply failed and the object is not in the cluster, remove
// it from the final inventory.
if !a.objInCluster(id) {
klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory",
info.Namespace, info.Name)
delete(invInfos, id)
}
taskContext.EventChannel() <- createApplyFailedEvent(id,
applyerror.NewApplyRunError(err))
taskContext.CaptureResourceFailure(id)
}
}

// Store objects (and some obj metadata) in the task context
// for the final inventory.
for id, info := range invInfos {
if info.Object != nil {
} else if info.Object != nil {
acc, err := meta.Accessor(info.Object)
if err != nil {
continue
if err == nil {
uid := acc.GetUID()
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
uid := acc.GetUID()
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
}
a.sendTaskResult(taskContext)
Expand Down Expand Up @@ -253,16 +223,6 @@ func (a *ApplyTask) sendTaskResult(taskContext *taskrunner.TaskContext) {
taskContext.TaskChannel() <- taskrunner.TaskResult{}
}

// objInCluster returns true if the passed object is in the slice of
// previous inventory, because an object in the previous inventory
// exists in the cluster.
func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool {
if _, found := a.PrevInventory[obj]; found {
return true
}
return false
}

// ClearTimeout is not supported by the ApplyTask.
func (a *ApplyTask) ClearTimeout() {}

Expand Down
148 changes: 11 additions & 137 deletions pkg/apply/task/apply_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -128,140 +127,6 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
}
}

// Checks the inventory stored in the task context applied
// resources is correct, given a retrieval error and
// a specific previous inventory. Also, an apply failure
// for an object in the previous inventory should remain
// in the inventory, while an apply failure that is not
// in the previous inventory (creation) should not be
// in the final inventory.
func TestApplyTask_ApplyFailuresAndInventory(t *testing.T) {
resInfo := resourceInfo{
group: "apps",
apiVersion: "apps/v1",
kind: "Deployment",
name: "foo",
namespace: "default",
uid: types.UID("my-uid"),
generation: int64(42),
}
resID, _ := object.CreateObjMetadata("default", "foo",
schema.GroupKind{Group: "apps", Kind: "Deployment"})
applyFailInfo := resourceInfo{
group: "apps",
apiVersion: "apps/v1",
kind: "Deployment",
name: "failure",
namespace: "default",
uid: types.UID("my-uid"),
generation: int64(42),
}
applyFailID, _ := object.CreateObjMetadata("default", "failure",
schema.GroupKind{Group: "apps", Kind: "Deployment"})

testCases := map[string]struct {
applied []resourceInfo
prevInventory []object.ObjMetadata
expected []object.ObjMetadata
err error
}{
"not found error with successful apply is in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{resID},
err: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pod"}, "fake"),
},
"unknown error, but in previous inventory: object is in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{resID},
expected: []object.ObjMetadata{resID},
err: apierrors.NewUnauthorized("not authorized"),
},
"unknown error, not in previous inventory: object is NOT in final inventory": {
applied: []resourceInfo{resInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{},
err: apierrors.NewUnauthorized("not authorized"),
},
"apply failure, in previous inventory: object is in final inventory": {
applied: []resourceInfo{applyFailInfo},
prevInventory: []object.ObjMetadata{applyFailID},
expected: []object.ObjMetadata{applyFailID},
err: nil,
},
"apply failure, not in previous inventory: object is NOT in final inventory": {
applied: []resourceInfo{applyFailInfo},
prevInventory: []object.ObjMetadata{},
expected: []object.ObjMetadata{},
err: nil,
},
}

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
eventChannel := make(chan event.Event)
taskContext := taskrunner.NewTaskContext(eventChannel)

objs := toUnstructureds(tc.applied)

oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, dynamic.Interface, error) {
return &fakeApplyOptions{}, nil, nil
}
defer func() { applyOptionsFactoryFunc = oldAO }()

restMapper := testutil.NewFakeRESTMapper(schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
})

prevInv := map[object.ObjMetadata]bool{}
for _, id := range tc.prevInventory {
prevInv[id] = true
}
applyTask := &ApplyTask{
Objects: objs,
PrevInventory: prevInv,
Mapper: restMapper,
InfoHelper: &fakeInfoHelper{},
InvInfo: &fakeInventoryInfo{},
}

getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return objs[0], nil
}
if tc.err != nil {
getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return nil, tc.err
}
}

var events []event.Event
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for msg := range eventChannel {
events = append(events, msg)
}
}()

applyTask.Start(taskContext)
<-taskContext.TaskChannel()
close(eventChannel)
wg.Wait()

// The applied resources should be stored in the TaskContext
// for the final inventory.
actual := taskContext.AppliedResources()
if !object.SetEquals(tc.expected, actual) {
t.Errorf("expected (%s) inventory resources, got (%s)", tc.expected, actual)
}
})
}
}

func TestApplyTask_FetchGeneration(t *testing.T) {
testCases := map[string]struct {
rss []resourceInfo
Expand Down Expand Up @@ -321,7 +186,12 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
}

getClusterObj = func(d dynamic.Interface, info *resource.Info) (*unstructured.Unstructured, error) {
return objs[0], nil
for _, obj := range objs {
if info.Name == obj.GetName() && info.Namespace == obj.GetNamespace() {
return obj, nil
}
}
return nil, nil
}
applyTask.Start(taskContext)

Expand Down Expand Up @@ -801,7 +671,11 @@ func TestApplyTaskWithDifferentInventoryAnnotation(t *testing.T) {
assert.Equal(t, tc.expectedEvents[i].ApplyEvent.Error.Error(), e.ApplyEvent.Error.Error())
}
actualUids := taskContext.AppliedResourceUIDs()
assert.Equal(t, len(actualUids), 1)
assert.Equal(t, len(tc.expectedObjects), len(actualUids))
actualObjs := taskContext.AppliedResources()
if !object.SetEquals(tc.expectedObjects, actualObjs) {
t.Errorf("expected applied objects (%v), got (%v)", tc.expectedObjects, actualObjs)
}
})
}
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/apply/task/inv_set_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
// InvSetTask encapsulates structures necessary to set the
// inventory references at the end of the apply/prune.
type InvSetTask struct {
TaskName string
InvClient inventory.InventoryClient
InvInfo inventory.InventoryInfo
DryRun common.DryRunStrategy
TaskName string
InvClient inventory.InventoryClient
InvInfo inventory.InventoryInfo
PrevInventory map[object.ObjMetadata]bool
DryRun common.DryRunStrategy
}

func (i *InvSetTask) Name() string {
Expand All @@ -41,9 +42,21 @@ func (i *InvSetTask) Start(taskContext *taskrunner.TaskContext) {
klog.V(2).Infoln("starting inventory replace task")
appliedObjs := taskContext.AppliedResources()
klog.V(4).Infof("set inventory %d applied objects", len(appliedObjs))
// If an object failed to apply, but it was previously stored in
// the inventory, then keep it in the inventory so we don't lose
// track of it for next apply/prune. An object not found in the cluster
// is NOT stored as an apply failure (so it is properly removed from the inventory).
applyFailures := []object.ObjMetadata{}
for _, failure := range taskContext.ResourceFailures() {
if _, exists := i.PrevInventory[failure]; exists {
applyFailures = append(applyFailures, failure)
}
}
klog.V(4).Infof("keep in inventory %d applied failures", len(applyFailures))
pruneFailures := taskContext.PruneFailures()
klog.V(4).Infof("set inventory %d prune failures", len(pruneFailures))
invObjs := object.Union(appliedObjs, pruneFailures)
allApplyObjs := object.Union(appliedObjs, applyFailures)
invObjs := object.Union(allApplyObjs, pruneFailures)
klog.V(4).Infof("set inventory %d total objects", len(invObjs))
err := i.InvClient.Replace(i.InvInfo, invObjs, i.DryRun)
taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err}
Expand Down
Loading

0 comments on commit 2f13357

Please sign in to comment.