Skip to content

Commit

Permalink
Implements depends-on object sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
seans3 committed Jul 2, 2021
1 parent 56b08e9 commit c55d009
Show file tree
Hide file tree
Showing 2 changed files with 519 additions and 195 deletions.
111 changes: 49 additions & 62 deletions pkg/apply/solver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/graph"
)

const defaultWaitTimeout = 1 * time.Minute
Expand Down Expand Up @@ -100,7 +101,7 @@ func (t *TaskQueueBuilder) Build() *TaskQueue {
// AppendInvAddTask appends an inventory add task to the task queue.
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendInvAddTask(inv inventory.InventoryInfo, applyObjs []*unstructured.Unstructured) *TaskQueueBuilder {
klog.V(5).Infoln("adding inventory add task")
klog.V(2).Infoln("adding inventory add task")
t.tasks = append(t.tasks, &task.InvAddTask{
TaskName: fmt.Sprintf("inventory-add-%d", t.invAddCounter),
InvClient: t.InvClient,
Expand All @@ -114,7 +115,7 @@ func (t *TaskQueueBuilder) AppendInvAddTask(inv inventory.InventoryInfo, applyOb
// AppendInvAddTask appends an inventory set task to the task queue.
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendInvSetTask(inv inventory.InventoryInfo) *TaskQueueBuilder {
klog.V(5).Infoln("adding inventory set task")
klog.V(2).Infoln("adding inventory set task")
t.tasks = append(t.tasks, &task.InvSetTask{
TaskName: fmt.Sprintf("inventory-set-%d", t.invSetCounter),
InvClient: t.InvClient,
Expand All @@ -127,7 +128,7 @@ func (t *TaskQueueBuilder) AppendInvSetTask(inv inventory.InventoryInfo) *TaskQu
// AppendInvAddTask appends to the task queue a task to delete the inventory object.
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendDeleteInvTask(inv inventory.InventoryInfo) *TaskQueueBuilder {
klog.V(5).Infoln("adding delete inventory task")
klog.V(2).Infoln("adding delete inventory task")
t.tasks = append(t.tasks, &task.DeleteInvTask{
TaskName: fmt.Sprintf("delete-inventory-%d", t.deleteInvCounter),
InvClient: t.InvClient,
Expand All @@ -139,9 +140,9 @@ func (t *TaskQueueBuilder) AppendDeleteInvTask(inv inventory.InventoryInfo) *Tas

// AppendInvAddTask appends a task to the task queue to apply the passed objects
// to the cluster. Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo, applyObjs []*unstructured.Unstructured,
crdSplitRes crdSplitResult, o Options) *TaskQueueBuilder {
klog.V(5).Infoln("adding apply task")
func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo,
applyObjs []*unstructured.Unstructured, o Options) *TaskQueueBuilder {
klog.V(2).Infof("adding apply task (%d objects)", len(applyObjs))
prevInvIds, _ := t.InvClient.GetClusterObjs(inv)
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvIds))
for _, prevInvID := range prevInvIds {
Expand All @@ -150,7 +151,6 @@ func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo, applyObj
t.tasks = append(t.tasks, &task.ApplyTask{
TaskName: fmt.Sprintf("apply-%d", t.applyCounter),
Objects: applyObjs,
CRDs: crdSplitRes.crds,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
Expand All @@ -166,13 +166,14 @@ func (t *TaskQueueBuilder) AppendApplyTask(inv inventory.InventoryInfo, applyObj

// AppendInvAddTask appends a task to wait on the passed objects to the task queue.
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendWaitTask(waitIds []object.ObjMetadata) *TaskQueueBuilder {
klog.V(5).Infoln("adding wait task")
func (t *TaskQueueBuilder) AppendWaitTask(waitIds []object.ObjMetadata, condition taskrunner.Condition,
waitTimeout time.Duration) *TaskQueueBuilder {
klog.V(2).Infoln("adding wait task")
t.tasks = append(t.tasks, taskrunner.NewWaitTask(
fmt.Sprintf("wait-%d", t.waitCounter),
waitIds,
taskrunner.AllCurrent,
defaultWaitTimeout,
condition,
waitTimeout,
t.Mapper),
)
t.waitCounter += 1
Expand All @@ -183,7 +184,7 @@ func (t *TaskQueueBuilder) AppendWaitTask(waitIds []object.ObjMetadata) *TaskQue
// Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendPruneTask(pruneObjs []*unstructured.Unstructured,
pruneFilters []filter.ValidationFilter, o Options) *TaskQueueBuilder {
klog.V(5).Infoln("adding prune task")
klog.V(2).Infof("adding prune task (%d objects)", len(pruneObjs))
t.tasks = append(t.tasks,
&task.PruneTask{
TaskName: fmt.Sprintf("prune-%d", t.pruneCounter),
Expand All @@ -202,20 +203,19 @@ func (t *TaskQueueBuilder) AppendPruneTask(pruneObjs []*unstructured.Unstructure
// AppendApplyWaitTasks adds apply and wait tasks to the task queue,
// depending on build variables (like dry-run) and resource types
// (like CRD's). Returns a pointer to the Builder to chain function calls.
func (t *TaskQueueBuilder) AppendApplyWaitTasks(inv inventory.InventoryInfo, applyObjs []*unstructured.Unstructured, o Options) *TaskQueueBuilder {
crdSplitRes, hasCRDs := splitAfterCRDs(applyObjs)
if hasCRDs {
t.AppendApplyTask(inv, append(crdSplitRes.before, crdSplitRes.crds...), crdSplitRes, o)
if !o.DryRunStrategy.ClientOrServerDryRun() {
waitIds := object.UnstructuredsToObjMetasOrDie(crdSplitRes.crds)
t.AppendWaitTask(waitIds)
func (t *TaskQueueBuilder) AppendApplyWaitTasks(inv inventory.InventoryInfo,
applyObjs []*unstructured.Unstructured, o Options) *TaskQueueBuilder {
// Use the "depends-on" annotation to create a graph, ands sort the
// objects to apply into sets using a topological sort.
applySets := graph.SortObjs(applyObjs)
addWaitTask, waitTimeout := waitTaskTimeout(o.DryRunStrategy.ClientOrServerDryRun(),
len(applySets), o.ReconcileTimeout)
for _, applySet := range applySets {
t.AppendApplyTask(inv, applySet, o)
if addWaitTask {
applyIds := object.UnstructuredsToObjMetas(applySet)
t.AppendWaitTask(applyIds, taskrunner.AllCurrent, waitTimeout)
}
applyObjs = crdSplitRes.after
}
t.AppendApplyTask(inv, applyObjs, crdSplitRes, o)
if !o.DryRunStrategy.ClientOrServerDryRun() && o.ReconcileTimeout != time.Duration(0) {
waitIds := object.UnstructuredsToObjMetasOrDie(applyObjs)
t.AppendWaitTask(waitIds)
}
return t
}
Expand All @@ -226,47 +226,34 @@ func (t *TaskQueueBuilder) AppendApplyWaitTasks(inv inventory.InventoryInfo, app
func (t *TaskQueueBuilder) AppendPruneWaitTasks(pruneObjs []*unstructured.Unstructured,
pruneFilters []filter.ValidationFilter, o Options) *TaskQueueBuilder {
if o.Prune {
t.AppendPruneTask(pruneObjs, pruneFilters, o)
if !o.DryRunStrategy.ClientOrServerDryRun() && o.PruneTimeout != time.Duration(0) {
pruneIds := object.UnstructuredsToObjMetasOrDie(pruneObjs)
t.AppendWaitTask(pruneIds)
// Use the "depends-on" annotation to create a graph, ands sort the
// objects to prune into sets using a (reverse) topological sort.
pruneSets := graph.ReverseSortObjs(pruneObjs)
addWaitTask, waitTimeout := waitTaskTimeout(o.DryRunStrategy.ClientOrServerDryRun(),
len(pruneSets), o.ReconcileTimeout)
for _, pruneSet := range pruneSets {
t.AppendPruneTask(pruneSet, pruneFilters, o)
if addWaitTask {
pruneIds := object.UnstructuredsToObjMetas(pruneSet)
t.AppendWaitTask(pruneIds, taskrunner.AllNotFound, waitTimeout)
}
}
}
return t
}

type crdSplitResult struct {
before []*unstructured.Unstructured
after []*unstructured.Unstructured
crds []*unstructured.Unstructured
}

// splitAfterCRDs takes a sorted slice of infos and splits it into
// three parts; resources before CRDs, the CRDs themselves, and finally
// all the resources after the CRDs.
// The function returns the three different sets of resources and
// a boolean that tells whether there were any CRDs in the set of
// resources.
func splitAfterCRDs(objs []*unstructured.Unstructured) (crdSplitResult, bool) {
var before []*unstructured.Unstructured
var after []*unstructured.Unstructured

var crds []*unstructured.Unstructured
for _, obj := range objs {
if object.IsCRD(obj) {
crds = append(crds, obj)
continue
}

if len(crds) > 0 {
after = append(after, obj)
} else {
before = append(before, obj)
}
// waitTaskTimeout returns true if the wait task should be added to the task queue;
// false otherwise. If true, also returns the duration within wait task before timeout.
func waitTaskTimeout(dryRun bool, numObjSets int, reconcileTimeout time.Duration) (bool, time.Duration) {
var zeroTimeout = time.Duration(0)
if dryRun {
return false, zeroTimeout
}
if reconcileTimeout != zeroTimeout {
return true, reconcileTimeout
}
if numObjSets > 1 {
return true, defaultWaitTimeout
}
return crdSplitResult{
before: before,
after: after,
crds: crds,
}, len(crds) > 0
return false, zeroTimeout
}
Loading

0 comments on commit c55d009

Please sign in to comment.