Skip to content

Commit

Permalink
Add a member resources clean up controller
Browse files Browse the repository at this point in the history
1. Modify the stale controller to periodically check MemberClusterAnnounces
in the leader cluster and delete any stale CR whose last update timestamp
is more than 24 hours old.
2. Add a new member resources cleanup controller in the leader that
cleans up all corresponding ResourceExports when a MemberClusterAnnounce
is deleted.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Aug 21, 2023
1 parent c421847 commit acf1846
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (

//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha2-clusterset,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=clustersets,verbs=create;update,versions=v1alpha2,name=vclusterset.kb.io,admissionReviewVersions={v1,v1beta1}

const (
mcControllerSAName = "antrea-mc-controller"
)

// ClusterSet validator
type clusterSetValidator struct {
Client client.Client
Expand Down
3 changes: 2 additions & 1 deletion multicluster/cmd/multicluster-controller/gateway_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
)

const (
antreaAgentSAName = "antrea-agent"
antreaAgentSAName = "antrea-agent"
mcControllerSAName = "antrea-mc-controller"
)

//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-gateway,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=gateways,verbs=create;update,versions=v1alpha1,name=vgateway.kb.io,admissionReviewVersions={v1,v1beta1}
Expand Down
7 changes: 7 additions & 0 deletions multicluster/cmd/multicluster-controller/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func runLeader(o *Options) error {
return fmt.Errorf("error creating ResourceExport webhook: %v", err)
}

memberClusterCleanupController := leader.NewMemberResourcesCleanupController(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
)
go memberClusterCleanupController.SetupWithManager(mgr, stopCh)

staleController := multiclustercontrollers.NewStaleResCleanupController(
mgr.GetClient(),
mgr.GetScheme(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
Copyright 2023 Antrea Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leader

import (
"context"
"fmt"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
)

// MemberResourcesCleanupController removes all ResourceExports belonging to a member
// cluster when the corresponding MemberClusterAnnounce CR is deleted. It also tries
// to clean up all stale ResourceExports once during setup.
type MemberResourcesCleanupController struct {
// Client is the embedded client used to get, list and delete objects.
client.Client
// Scheme is the runtime scheme supplied at construction time.
Scheme *runtime.Scheme
// namespace is the Namespace whose MemberClusterAnnounces and ResourceExports
// this controller handles.
namespace string
}

// NewMemberResourcesCleanupController builds a MemberResourcesCleanupController
// that uses the given client and scheme and operates on the given Namespace.
func NewMemberResourcesCleanupController(Client client.Client,
	Scheme *runtime.Scheme,
	namespace string) *MemberResourcesCleanupController {
	return &MemberResourcesCleanupController{
		Client:    Client,
		Scheme:    Scheme,
		namespace: namespace,
	}
}

// Reconcile handles a MemberClusterAnnounce request. Nothing is done while the
// object can still be fetched. Once it is reported as not found, every
// ResourceExport in the controller's Namespace whose ClusterID matches the ID
// derived from the request name is deleted. Any failure is returned as an error.
func (c *MemberResourcesCleanupController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	announce := &mcv1alpha1.MemberClusterAnnounce{}
	getErr := c.Get(ctx, req.NamespacedName, announce)
	if getErr == nil {
		// The MemberClusterAnnounce still exists, so there is nothing to clean up.
		return ctrl.Result{}, nil
	}
	if !apierrors.IsNotFound(getErr) {
		return ctrl.Result{}, getErr
	}

	// The MemberClusterAnnounce is gone: remove the ResourceExports of that member cluster.
	memberID := getClusterIDFromName(req.Name)
	exports, listErr := c.getAllResourceExports(ctx)
	if listErr != nil {
		klog.V(2).ErrorS(listErr, "Failed to get ResourceExports in the Namespace", "namespace", c.namespace)
		return ctrl.Result{}, listErr
	}
	if c.cleanupResources(ctx, memberID, exports) {
		return ctrl.Result{}, nil
	}
	return ctrl.Result{}, fmt.Errorf("failed to clean up all stale ResourceExports for the member cluster %s, retry later", memberID)
}

// getAllResourceExports lists the ResourceExports in the controller's Namespace
// and returns the listed items, or the error from the List call.
func (c *MemberResourcesCleanupController) getAllResourceExports(ctx context.Context) ([]mcv1alpha1.ResourceExport, error) {
	exportList := &mcv1alpha1.ResourceExportList{}
	listOpts := &client.ListOptions{Namespace: c.namespace}
	if listErr := c.Client.List(ctx, exportList, listOpts); listErr != nil {
		return nil, listErr
	}
	return exportList.Items, nil
}

// checkAndCleanup deletes the ResourceExports of every member cluster that no
// longer has a MemberClusterAnnounce in the controller's Namespace.
//
// It collects the ClusterIDs of the existing MemberClusterAnnounces (the valid
// set) and the ClusterIDs referenced by the existing ResourceExports (the full
// set). Every ID in the full set but not in the valid set is treated as stale
// and its ResourceExports are deleted. Errors are logged and not returned, so
// this is a best-effort pass.
func (c *MemberResourcesCleanupController) checkAndCleanup(ctx context.Context) {
	validMemberClusterIDs, err := c.getValidMemberClusterIDs(ctx)
	if err != nil {
		klog.ErrorS(err, "Failed to get valid member cluster's ClusterID in the Namespace", "namespace", c.namespace)
		return
	}

	allResExports, err := c.getAllResourceExports(ctx)
	if err != nil {
		klog.ErrorS(err, "Failed to get ResourceExports in the Namespace", "namespace", c.namespace)
		return
	}

	allMemberClusterIDs, err := c.getAllMemberClusterIDs(ctx, allResExports)
	if err != nil {
		klog.ErrorS(err, "Failed to get all member cluster's ClusterID in the Namespace", "namespace", c.namespace)
		return
	}

	staleMemberClusterIDs := allMemberClusterIDs.Difference(validMemberClusterIDs)
	for clusterID := range staleMemberClusterIDs {
		if !c.cleanupResources(ctx, clusterID, allResExports) {
			// Use the structured logger: klog.Error would concatenate the
			// key/value arguments into the message instead of emitting them
			// as fields. There is no error value to attach here, hence nil.
			klog.ErrorS(nil, "Failed to clean up the member cluster's stale resources", "clusterID", clusterID)
			continue
		}
		klog.InfoS("The member cluster's stale resources are cleaned up", "clusterID", clusterID)
	}
}

// getAllMemberClusterIDs returns the set of non-empty ClusterIDs found in the
// Spec of the given ResourceExports. The returned error is always nil.
func (c *MemberResourcesCleanupController) getAllMemberClusterIDs(ctx context.Context, resourceExports []mcv1alpha1.ResourceExport) (sets.Set[string], error) {
	clusterIDs := sets.New[string]()
	for i := range resourceExports {
		id := resourceExports[i].Spec.ClusterID
		if id == "" {
			// ResourceExports without a ClusterID are not attributed to any member.
			continue
		}
		clusterIDs.Insert(id)
	}
	return clusterIDs, nil
}

// getValidMemberClusterIDs lists the MemberClusterAnnounces in the controller's
// Namespace and returns the set of their ClusterIDs. If the List call fails,
// a nil set and the error are returned.
func (c *MemberResourcesCleanupController) getValidMemberClusterIDs(ctx context.Context) (sets.Set[string], error) {
	announceList := &mcv1alpha1.MemberClusterAnnounceList{}
	listOpts := &client.ListOptions{Namespace: c.namespace}
	if listErr := c.Client.List(ctx, announceList, listOpts); listErr != nil {
		return nil, listErr
	}

	clusterIDs := sets.New[string]()
	for i := range announceList.Items {
		clusterIDs.Insert(announceList.Items[i].ClusterID)
	}
	return clusterIDs, nil
}

// cleanupResources deletes every ResourceExport in resouceExports whose
// Spec.ClusterID equals clusterID and which is not already being deleted
// (zero DeletionTimestamp).
//
// It keeps going after a failed deletion so that one bad object does not block
// the rest. It returns true when every matching ResourceExport was deleted or
// was already gone (NotFound), and false if any other deletion error occurred.
// Each such error is logged with the affected object.
func (c *MemberResourcesCleanupController) cleanupResources(ctx context.Context, clusterID string, resouceExports []mcv1alpha1.ResourceExport) bool {
	cleanupSucceed := true
	for i := range resouceExports {
		// Work on a copy so the pointer handed to Delete never aliases the
		// caller's slice element.
		resExport := resouceExports[i]
		if !resExport.DeletionTimestamp.IsZero() || resExport.Spec.ClusterID != clusterID {
			continue
		}
		klog.V(2).InfoS("Clean up the stale ResourceExport from the member cluster", "resourceexport", klog.KObj(&resExport), "clusterID", clusterID)
		err := c.Client.Delete(ctx, &resExport, &client.DeleteOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			// Record the cause; the boolean result alone cannot tell callers
			// which object failed or why.
			klog.ErrorS(err, "Failed to delete the stale ResourceExport", "resourceexport", klog.KObj(&resExport), "clusterID", clusterID)
			cleanupSucceed = false
		}
	}
	return cleanupSucceed
}

// namespaceFilter reports whether object is a MemberClusterAnnounce located in
// the controller's Namespace. Objects of any other type are rejected.
func (c *MemberResourcesCleanupController) namespaceFilter(object client.Object) bool {
	announce, isAnnounce := object.(*mcv1alpha1.MemberClusterAnnounce)
	return isAnnounce && announce.Namespace == c.namespace
}

// SetupWithManager runs one synchronous stale-resource cleanup pass and then
// registers the controller with the Manager for MemberClusterAnnounce objects,
// filtered to the controller's Namespace and limited to one concurrent
// reconcile.
//
// stopCh bounds the initial cleanup: the context passed to it is derived from
// the channel. The error from building the controller is returned.
func (c *MemberResourcesCleanupController) SetupWithManager(mgr ctrl.Manager, stopCh <-chan struct{}) error {
	// Try to clean up all stale resources before start. The context is only
	// needed for this pass, so release it on return instead of discarding the
	// cancel func and keeping it alive until stopCh closes.
	ctx, cancel := wait.ContextForChannel(stopCh)
	defer cancel()
	c.checkAndCleanup(ctx)

	namespacePredicate := predicate.NewPredicateFuncs(c.namespaceFilter)
	return ctrl.NewControllerManagedBy(mgr).
		For(&mcv1alpha1.MemberClusterAnnounce{}).
		WithEventFilter(namespacePredicate).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: 1,
		}).
		Complete(c)
}

// getClusterIDFromName strips one leading "member-announce-from-" prefix from
// name and returns the remainder. A name without that prefix is returned as is.
func getClusterIDFromName(name string) string {
	const announcePrefix = "member-announce-from-"
	if strings.HasPrefix(name, announcePrefix) {
		return name[len(announcePrefix):]
	}
	return name
}
Loading

0 comments on commit acf1846

Please sign in to comment.