feat: enforce revisionHistoryLimit for resource snapshots (#426)
zhiying-lin authored Jul 14, 2023
1 parent a5d7500 commit 3f37a3a
Showing 4 changed files with 707 additions and 106 deletions.
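For context before the diff: this change makes the controller honor spec.revisionHistoryLimit when pruning clusterSchedulingPolicySnapshots and clusterResourceSnapshots. A minimal sketch of opting into a custom limit (hedged: it assumes the field is a *int32, as the int(...) conversions below suggest, and that the spec type is named ClusterResourcePlacementSpec):

package main

import (
	"fmt"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

func main() {
	limit := int32(5) // keep at most 5 snapshot revisions per snapshot kind
	crp := fleetv1beta1.ClusterResourcePlacement{
		Spec: fleetv1beta1.ClusterResourcePlacementSpec{
			RevisionHistoryLimit: &limit, // nil or <= 0 falls back to the default
		},
	}
	fmt.Println(*crp.Spec.RevisionHistoryLimit) // 5
}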
216 changes: 167 additions & 49 deletions pkg/controllers/clusterresourceplacement/controller.go
@@ -1,3 +1,9 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

// Package clusterresourceplacement features a controller to reconcile the clusterResourcePlacement changes.
package clusterresourceplacement

import (
@@ -17,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/controller"
)

@@ -86,7 +93,18 @@ func (r *Reconciler) deleteClusterResourceSnapshots(ctx context.Context, crp *fl
// clusterSchedulingPolicySnapshot status and work status.
// If the error type is ErrUnexpectedBehavior, the controller will skip the reconciling.
func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (ctrl.Result, error) {
	revisionLimit := fleetv1beta1.RevisionHistoryLimitDefaultValue
	if crp.Spec.RevisionHistoryLimit != nil {
		revisionLimit = *crp.Spec.RevisionHistoryLimit
		if revisionLimit <= 0 {
			err := fmt.Errorf("invalid clusterResourcePlacement %s: invalid revisionHistoryLimit %d", crp.Name, revisionLimit)
			klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Invalid revisionHistoryLimit value, using the default value instead", "clusterResourcePlacement", klog.KObj(crp))
			// use the default value instead
			revisionLimit = fleetv1beta1.RevisionHistoryLimitDefaultValue
		}
	}

	_, err := r.getOrCreateClusterSchedulingPolicySnapshot(ctx, crp, int(revisionLimit))
if err != nil {
return ctrl.Result{}, err
}
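The validate-and-default logic above, restated as a standalone sketch (a hypothetical helper for illustration, not part of this commit):

// normalizedRevisionLimit mirrors handleUpdate's defaulting: a nil or
// non-positive revisionHistoryLimit falls back to the package default.
func normalizedRevisionLimit(limit *int32, defaultLimit int32) int32 {
	if limit == nil || *limit <= 0 {
		return defaultLimit
	}
	return *limit
}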
@@ -97,7 +115,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
resourceSnapshotSpec := fleetv1beta1.ResourceSnapshotSpec{
SelectedResources: selectedResources,
}
_, err = r.getOrCreateClusterResourceSnapshot(ctx, crp, &resourceSnapshotSpec, int(revisionLimit))
if err != nil {
return ctrl.Result{}, err
}
@@ -107,11 +125,13 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
return ctrl.Result{}, nil
}

func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, error) {
	crpKObj := klog.KObj(crp)
	schedulingPolicy := crp.Spec.Policy.DeepCopy()
	if schedulingPolicy != nil {
		schedulingPolicy.NumberOfClusters = nil // will exclude the numberOfClusters
	}
	policyHash, err := generatePolicyHash(schedulingPolicy)
if err != nil {
klog.ErrorS(err, "Failed to generate policy hash of crp", "clusterResourcePlacement", crpKObj)
return nil, controller.NewUnexpectedBehaviorError(err)
@@ -145,7 +165,7 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont

// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantSchedulingPolicySnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err
}

@@ -161,7 +181,7 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont
},
},
Spec: fleetv1beta1.SchedulingPolicySnapshotSpec{
Policy: schedulingPolicy,
PolicyHash: []byte(policyHash),
},
}
@@ -187,38 +207,83 @@ func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Cont
return latestPolicySnapshot, nil
}
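A note on the hunk above: the old code dereferenced crp.Spec.Policy unconditionally (schedulingPolicy := *crp.Spec.Policy), which panics when no policy is set. Generated DeepCopy methods are nil-safe (a nil receiver yields nil), so the nil-policy case now hashes cleanly. The pattern in isolation:

	schedulingPolicy := crp.Spec.Policy.DeepCopy() // nil-safe: returns nil for a nil receiver
	if schedulingPolicy != nil {
		schedulingPolicy.NumberOfClusters = nil // excluded from the policy hash
	}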

func (r *Reconciler) deleteRedundantSchedulingPolicySnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) error {
	sortedList, err := r.listSortedClusterSchedulingPolicySnapshots(ctx, crp)
	if err != nil {
		return err
	}
	if len(sortedList.Items) < revisionHistoryLimit {
		return nil
	}

	if len(sortedList.Items)-revisionHistoryLimit > 0 {
		// We always delete before creating a new snapshot, so the number of snapshots should never exceed the
		// limit: no finalizer is added and the objects should be deleted immediately.
		klog.Warningf("The number of clusterSchedulingPolicySnapshots %d of clusterResourcePlacement %v exceeds the revisionHistoryLimit %d, which should never happen", len(sortedList.Items), klog.KObj(crp), revisionHistoryLimit)
	}

	// In the normal case, len(sortedList.Items) is at most revisionHistoryLimit, so we only need to delete one
	// policySnapshot before creating a new one. As defensive programming, the loop deletes any redundant
	// snapshots, which could be more than one.
	for i := 0; i <= len(sortedList.Items)-revisionHistoryLimit; i++ { // need to reserve one slot for the new snapshot
		if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) {
			klog.ErrorS(err, "Failed to delete clusterSchedulingPolicySnapshot", "clusterResourcePlacement", klog.KObj(crp), "clusterSchedulingPolicySnapshot", klog.KObj(&sortedList.Items[i]))
			return controller.NewAPIServerError(false, err)
		}
	}
	return nil
}

// deleteRedundantResourceSnapshots handles multiple snapshots in a group.
func (r *Reconciler) deleteRedundantResourceSnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) error {
	sortedList, err := r.listSortedResourceSnapshots(ctx, crp)
	if err != nil {
		return err
	}

	if len(sortedList.Items) < revisionHistoryLimit {
		// If the number of existing snapshots is less than the limit, no snapshots need to be deleted,
		// no matter how many snapshots are in a group. Skip the checking and deleting.
		return nil
	}

	crpKObj := klog.KObj(crp)
	lastGroupIndex := -1
	groupCounter := 0

	// Delete the snapshots from the end, since there can be multiple snapshots in a group, in order to keep
	// the latest snapshots.
	for i := len(sortedList.Items) - 1; i >= 0; i-- {
		snapshotKObj := klog.KObj(&sortedList.Items[i])
		ii, err := parseResourceIndexFromLabel(&sortedList.Items[i])
		if err != nil {
			klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", snapshotKObj)
			return controller.NewUnexpectedBehaviorError(err)
		}
		if ii != lastGroupIndex {
			groupCounter++
			lastGroupIndex = ii
		}
		if groupCounter < revisionHistoryLimit { // need to reserve one slot for the new snapshot
			// When the number of groups seen so far is less than the revision limit, skip deleting the snapshot.
			continue
		}
		if err := r.Client.Delete(ctx, &sortedList.Items[i]); err != nil && !errors.IsNotFound(err) {
			klog.ErrorS(err, "Failed to delete clusterResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", snapshotKObj)
			return controller.NewAPIServerError(false, err)
		}
	}
	if groupCounter-revisionHistoryLimit > 0 {
		// We always delete before creating a new snapshot, so the number of snapshot groups should never
		// exceed the limit: no finalizer is added and the objects should be deleted immediately.
		klog.Warningf("The number of clusterResourceSnapshot groups %d of clusterResourcePlacement %v exceeds the revisionHistoryLimit %d, which should never happen", groupCounter, crpKObj, revisionHistoryLimit)
	}
	return nil
}

// TODO: handle the case where the resources selected by the placement exceed the 1MB size limit of k8s objects.
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec, revisionHistoryLimit int) (*fleetv1beta1.ClusterResourceSnapshot, error) {
resourceHash, err := generateResourceHash(resourceSnapshotSpec)
if err != nil {
klog.ErrorS(err, "Failed to generate resource hash of crp", "clusterResourcePlacement", klog.KObj(crp))
@@ -259,6 +324,11 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
return nil, controller.NewAPIServerError(false, err)
}
}
// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantResourceSnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err
}

// create a new resource snapshot
latestResourceSnapshotIndex++
@@ -272,6 +342,8 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
},
Annotations: map[string]string{
fleetv1beta1.ResourceGroupHashAnnotation: resourceHash,
// TODO: need to update this once we support multiple snapshots
fleetv1beta1.NumberOfResourceSnapshotsAnnotation: "1",
},
},
Spec: *resourceSnapshotSpec,
@@ -307,7 +379,7 @@ func (r *Reconciler) ensureLatestPolicySnapshot(ctx context.Context, crp *fleetv
if crp.Spec.Policy != nil &&
crp.Spec.Policy.PlacementType == fleetv1beta1.PickNPlacementType &&
crp.Spec.Policy.NumberOfClusters != nil {
oldCount, err := utils.ExtractNumOfClustersFromPolicySnapshot(latest)
if err != nil {
klog.ErrorS(err, "Failed to parse the numberOfClusterAnnotation", "clusterSchedulingPolicySnapshot", klog.KObj(latest))
return controller.NewUnexpectedBehaviorError(err)
@@ -463,29 +535,85 @@ func (r *Reconciler) lookupLatestResourceSnapshot(ctx context.Context, crp *flee
klog.ErrorS(err, "Invalid clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
return nil, -1, controller.NewUnexpectedBehaviorError(err)
}
	// When there are no active snapshots, find the snapshot with the largest resource index.
	// This should be rare, happening only when the CRP controller crashed before creating the new active snapshot.
	sortedList, err := r.listSortedResourceSnapshots(ctx, crp)
	if err != nil {
		return nil, -1, err
	}
	if len(sortedList.Items) == 0 {
		// The resource index of the first snapshot will start from 0.
		return nil, -1, nil
	}
	latestSnapshot := &sortedList.Items[len(sortedList.Items)-1]
	resourceIndex, err := parseResourceIndexFromLabel(latestSnapshot)
	if err != nil {
		klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", klog.KObj(latestSnapshot))
		return nil, -1, controller.NewUnexpectedBehaviorError(err)
	}
	return latestSnapshot, resourceIndex, nil
}

// listSortedResourceSnapshots returns the resource snapshots sorted by resource index and subindex.
// A resourceSnapshot is less than another when its resourceIndex is smaller; when the resourceIndexes are equal,
// the snapshots are ordered by subindex.
// Note: a snapshot without a subindex is the largest of its group, and there should be only one such snapshot per group.
func (r *Reconciler) listSortedResourceSnapshots(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement) (*fleetv1beta1.ClusterResourceSnapshotList, error) {
	snapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
	crpKObj := klog.KObj(crp)
	if err := r.Client.List(ctx, snapshotList, client.MatchingLabels{fleetv1beta1.CRPTrackingLabel: crp.Name}); err != nil {
		klog.ErrorS(err, "Failed to list all clusterResourceSnapshots", "clusterResourcePlacement", crpKObj)
		return nil, controller.NewAPIServerError(false, err)
	}
	var errs []error
	sort.Slice(snapshotList.Items, func(i, j int) bool {
		iKObj := klog.KObj(&snapshotList.Items[i])
		jKObj := klog.KObj(&snapshotList.Items[j])
		ii, err := parseResourceIndexFromLabel(&snapshotList.Items[i])
		if err != nil {
			klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj)
			errs = append(errs, err)
		}
		ji, err := parseResourceIndexFromLabel(&snapshotList.Items[j])
		if err != nil {
			klog.ErrorS(err, "Failed to parse the resource index label", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", jKObj)
			errs = append(errs, err)
		}
		if ii != ji {
			return ii < ji
		}

		iDoesExist, iSubindex, err := utils.ExtractSubindexFromClusterResourceSnapshot(&snapshotList.Items[i])
		if err != nil {
			klog.ErrorS(err, "Failed to parse the resource subindex", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj)
			errs = append(errs, err)
		}
		jDoesExist, jSubindex, err := utils.ExtractSubindexFromClusterResourceSnapshot(&snapshotList.Items[j])
		if err != nil {
			klog.ErrorS(err, "Failed to parse the resource subindex", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", jKObj)
			errs = append(errs, err)
		}

		if !iDoesExist && !jDoesExist {
			// Both snapshots lack a subindex, which should never happen: each group has exactly one such snapshot.
			err := fmt.Errorf("more than one resource snapshot in group %d has no subindex", ii)
			klog.ErrorS(err, "There is more than one resource snapshot without a subindex in a group", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", iKObj, "clusterResourceSnapshot", jKObj)
			errs = append(errs, err)
		}

		if !iDoesExist { // i is the master snapshot of its group (no subindex) and sorts last within the group
			return false
		}
		if !jDoesExist { // j is the master snapshot of its group and sorts last within the group
			return true
		}
		return iSubindex < jSubindex
	})

	if len(errs) > 0 {
		return nil, controller.NewUnexpectedBehaviorError(utilerrors.NewAggregate(errs))
	}

	return snapshotList, nil
}
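The ordering that the comparator above implements, restated on plain data (a hypothetical struct; the real code reads the index from a label and extracts the subindex via a utils helper):

package main

import (
	"fmt"
	"sort"
)

type snap struct {
	index  int  // resource index
	sub    int  // subindex within the group
	hasSub bool // the group's master snapshot carries no subindex
}

func main() {
	snaps := []snap{
		{index: 1, hasSub: false},
		{index: 0, sub: 1, hasSub: true},
		{index: 1, sub: 0, hasSub: true},
		{index: 0, hasSub: false},
		{index: 0, sub: 0, hasSub: true},
	}
	sort.Slice(snaps, func(i, j int) bool {
		a, b := snaps[i], snaps[j]
		if a.index != b.index {
			return a.index < b.index
		}
		if !a.hasSub {
			return false // a is its group's master snapshot: it sorts last
		}
		if !b.hasSub {
			return true
		}
		return a.sub < b.sub
	})
	fmt.Println(snaps) // [{0 0 true} {0 1 true} {0 0 false} {1 0 true} {1 0 false}]
}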

// parsePolicyIndexFromLabel returns an error when parsing the label fails, which should never happen in production.
@@ -508,16 +636,6 @@ func parseResourceIndexFromLabel(s *fleetv1beta1.ClusterResourceSnapshot) (int,
return v, nil
}

// parseNumberOfClustersFromAnnotation returns error when parsing the annotation which should never return error in production.
func parseNumberOfClustersFromAnnotation(s *fleetv1beta1.ClusterSchedulingPolicySnapshot) (int, error) {
n := s.Annotations[fleetv1beta1.NumberOfClustersAnnotation]
v, err := strconv.Atoi(n)
if err != nil || v < 0 {
return -1, fmt.Errorf("invalid numberOfCluster %q, error: %w", n, err)
}
return v, nil
}

func generatePolicyHash(policy *fleetv1beta1.PlacementPolicy) (string, error) {
jsonBytes, err := json.Marshal(policy)
if err != nil {
Expand Down
Loading

0 comments on commit 3f37a3a

Please sign in to comment.