From cfff91473d4081dcbd47bfc7f6cf272ac84d301e Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Thu, 19 Oct 2023 16:07:36 +0800 Subject: [PATCH] address comments --- .../cmd/multicluster-controller/leader.go | 6 +- .../cmd/multicluster-controller/member.go | 10 +- .../multicluster/common/cleanup.go | 172 ----------- .../multicluster/common/cleanup_test.go | 187 ------------ .../controllers/multicluster/common/helper.go | 11 + .../leader/clusterset_controller.go | 31 +- .../leader/clusterset_controller_test.go | 31 -- .../memberclusterannounce_controller.go | 6 - .../multicluster/leader/stale_controller.go | 187 +++++------- .../leader/stale_controller_test.go | 48 +-- .../member/clusterset_controller.go | 132 ++++---- .../member/clusterset_controller_test.go | 22 +- .../member/gateway_controller_test.go | 2 +- .../member/labelidentity_controller_test.go | 4 +- .../multicluster/member/node_controller.go | 3 +- .../member/resourceimport_controller_test.go | 7 +- .../member/serviceexport_controller_test.go | 8 +- .../multicluster/member/stale_controller.go | 283 ++++++++++-------- .../member/stale_controller_test.go | 214 +++++++++---- multicluster/test/integration/suite_test.go | 5 +- pkg/config/controller/config.go | 2 +- 21 files changed, 540 insertions(+), 831 deletions(-) delete mode 100644 multicluster/controllers/multicluster/common/cleanup.go delete mode 100644 multicluster/controllers/multicluster/common/cleanup_test.go diff --git a/multicluster/cmd/multicluster-controller/leader.go b/multicluster/cmd/multicluster-controller/leader.go index e115cb95450..83b80d2acc8 100644 --- a/multicluster/cmd/multicluster-controller/leader.go +++ b/multicluster/cmd/multicluster-controller/leader.go @@ -113,16 +113,14 @@ func runLeader(o *Options) error { return fmt.Errorf("error creating ResourceExport webhook: %v", err) } - staleController := leader.NewLeaderStaleResCleanupController( + staleController := leader.NewStaleResCleanupController( mgr.GetClient(), mgr.GetScheme(), - env.GetPodNamespace(), ) if err = staleController.SetupWithManager(mgr, stopCh); err != nil { - return fmt.Errorf("error creating LeaderStaleResCleanupController: %v", err) + return fmt.Errorf("error creating StaleResCleanupController: %v", err) } go staleController.RunPeriodically(stopCh) - go staleController.RunOnce(stopCh) klog.InfoS("Leader MC Controller Starting Manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index edc8965ed10..896ded60020 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -69,11 +69,13 @@ func runMember(o *Options) error { role: memberRole}, }) + commonAreaCreationCh := make(chan struct{}) clusterSetReconciler := member.NewMemberClusterSetReconciler(mgr.GetClient(), mgr.GetScheme(), env.GetPodNamespace(), o.EnableStretchedNetworkPolicy, o.ClusterCalimCRDAvailable, + commonAreaCreationCh, ) if err = clusterSetReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating ClusterSet controller: %v", err) @@ -121,16 +123,16 @@ func runMember(o *Options) error { return fmt.Errorf("error creating Node controller: %v", err) } - staleController := member.NewMemberStaleResCleanupController( + staleController := member.NewStaleResCleanupController( mgr.GetClient(), mgr.GetScheme(), + commonAreaCreationCh, env.GetPodNamespace(), commonAreaGetter, ) - if err = staleController.SetupWithManager(mgr); err != nil { - return fmt.Errorf("error creating MemberStaleResCleanupController: %v", err) - } + go staleController.Run(stopCh) + // Member runs ResourceImportReconciler from RemoteCommonArea only klog.InfoS("Member MC Controller Starting Manager") diff --git a/multicluster/controllers/multicluster/common/cleanup.go b/multicluster/controllers/multicluster/common/cleanup.go deleted file mode 100644 index bef4df1e3e3..00000000000 --- a/multicluster/controllers/multicluster/common/cleanup.go +++ /dev/null @@ -1,172 +0,0 @@ -/* -Copyright 2023 Antrea Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -import ( - "context" - "reflect" - - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - k8smcv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" - - mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" - mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" - crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" -) - -var DeleteEventPredicate = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return false - }, - UpdateFunc: func(e event.UpdateEvent) bool { - return false - }, -} - -var StatusReadyPredicate = func(e event.UpdateEvent) bool { - if e.ObjectOld == nil || e.ObjectNew == nil { - return false - } - oldClusterSet := e.ObjectOld.(*mcv1alpha2.ClusterSet) - newClusterSet := e.ObjectNew.(*mcv1alpha2.ClusterSet) - if !reflect.DeepEqual(oldClusterSet.Status.Conditions, newClusterSet.Status.Conditions) { - oldConditionSize := len(oldClusterSet.Status.Conditions) - newConditionSize := len(newClusterSet.Status.Conditions) - if oldConditionSize == 0 && newConditionSize > 0 && newClusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { - return true - } - if oldConditionSize > 0 && newConditionSize > 0 && - (oldClusterSet.Status.Conditions[0].Status == corev1.ConditionFalse || oldClusterSet.Status.Conditions[0].Status == corev1.ConditionUnknown) && - newClusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { - return true - } - } - return false -} - -func CleanUpResourcesCreatedByMC(ctx context.Context, mgrClient client.Client) error { - var err error - if err = cleanUpMCServicesAndServiceImports(ctx, mgrClient); err != nil { - return err - } - if err = cleanUpReplicatedACNPs(ctx, mgrClient); err != nil { - return err - } - if err = cleanUpLabelIdentities(ctx, mgrClient); err != nil { - return err - } - if err = cleanUpClusterInfoImports(ctx, mgrClient); err != nil { - return err - } - if err = cleanUpGateways(ctx, mgrClient); err != nil { - return err - } - return nil -} - -func cleanUpMCServicesAndServiceImports(ctx context.Context, mgrClient client.Client) error { - svcImpList := &k8smcv1alpha1.ServiceImportList{} - err := mgrClient.List(ctx, svcImpList, &client.ListOptions{}) - if err != nil { - return err - } - for _, svcImp := range svcImpList.Items { - svcImpTmp := svcImp - mcsvc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: svcImp.Namespace, - Name: ToMCResourceName(svcImp.Name), - }, - } - err = mgrClient.Delete(ctx, mcsvc, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - err = mgrClient.Delete(ctx, &svcImpTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - return nil -} - -func cleanUpReplicatedACNPs(ctx context.Context, mgrClient client.Client) error { - acnpList := &crdv1beta1.ClusterNetworkPolicyList{} - if err := mgrClient.List(ctx, acnpList, &client.ListOptions{}); err != nil { - return err - } - for _, acnp := range acnpList.Items { - acnpTmp := acnp - if metav1.HasAnnotation(acnp.ObjectMeta, AntreaMCACNPAnnotation) { - err := mgrClient.Delete(ctx, &acnpTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - } - return nil -} - -func cleanUpLabelIdentities(ctx context.Context, mgrClient client.Client) error { - labelIdentityList := &mcv1alpha1.LabelIdentityList{} - if err := mgrClient.List(ctx, labelIdentityList, &client.ListOptions{}); err != nil { - return err - } - for _, labelIdt := range labelIdentityList.Items { - labelIdtTmp := labelIdt - err := mgrClient.Delete(ctx, &labelIdtTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - return nil -} - -func cleanUpClusterInfoImports(ctx context.Context, mgrClient client.Client) error { - ciImpList := &mcv1alpha1.ClusterInfoImportList{} - if err := mgrClient.List(ctx, ciImpList, &client.ListOptions{}); err != nil { - return err - } - for _, ciImp := range ciImpList.Items { - ciImpTmp := ciImp - err := mgrClient.Delete(ctx, &ciImpTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - return nil -} - -func cleanUpGateways(ctx context.Context, mgrClient client.Client) error { - gwList := &mcv1alpha1.GatewayList{} - if err := mgrClient.List(ctx, gwList, &client.ListOptions{}); err != nil { - return err - } - for _, gw := range gwList.Items { - gwTmp := gw - err := mgrClient.Delete(ctx, &gwTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - return nil -} diff --git a/multicluster/controllers/multicluster/common/cleanup_test.go b/multicluster/controllers/multicluster/common/cleanup_test.go deleted file mode 100644 index 6692d332004..00000000000 --- a/multicluster/controllers/multicluster/common/cleanup_test.go +++ /dev/null @@ -1,187 +0,0 @@ -/* -Copyright 2023 Antrea Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" - - mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" - "antrea.io/antrea/pkg/apis/crd/v1beta1" -) - -func TestCleanUpMCServiceAndServiceImport(t *testing.T) { - existingSVCs := &v1.ServiceList{ - Items: []v1.Service{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "svc-a", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "antrea-mc-svc-b", - }, - }, - }, - } - existingSVCImports := &k8smcsv1alpha1.ServiceImportList{ - Items: []k8smcsv1alpha1.ServiceImport{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "svc-b", - }, - }, - }, - } - - fakeClient := fake.NewClientBuilder().WithScheme(TestScheme).WithLists(existingSVCImports, existingSVCs).Build() - ctx := context.Background() - err := cleanUpMCServicesAndServiceImports(ctx, fakeClient) - require.NoError(t, err) - actualSvcList := &v1.ServiceList{} - err = fakeClient.List(ctx, actualSvcList) - require.NoError(t, err) - assert.Equal(t, 1, len(actualSvcList.Items)) - - actualSvcImpList := &k8smcsv1alpha1.ServiceImportList{} - err = fakeClient.List(ctx, actualSvcImpList) - require.NoError(t, err) - assert.Equal(t, 0, len(actualSvcImpList.Items)) -} - -func TestCleanUpReplicatedACNP(t *testing.T) { - acnpList := &v1beta1.ClusterNetworkPolicyList{ - Items: []v1beta1.ClusterNetworkPolicy{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "acnp-1", - Annotations: map[string]string{ - AntreaMCACNPAnnotation: "true", - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "acnp-2", - }, - }, - }, - } - - fakeClient := fake.NewClientBuilder().WithScheme(TestScheme).WithLists(acnpList).Build() - ctx := context.Background() - err := cleanUpReplicatedACNPs(ctx, fakeClient) - require.NoError(t, err) - - actualACNPList := &v1beta1.ClusterNetworkPolicyList{} - err = fakeClient.List(ctx, actualACNPList, &client.ListOptions{}) - require.NoError(t, err) - assert.Equal(t, 1, len(actualACNPList.Items)) -} - -func TestCleanUpLabelIdentities(t *testing.T) { - labelIdentityList := &mcv1alpha1.LabelIdentityList{ - Items: []mcv1alpha1.LabelIdentity{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "labelidt-1", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "labelidt-2", - }, - }, - }, - } - - fakeClient := fake.NewClientBuilder().WithScheme(TestScheme).WithLists(labelIdentityList).Build() - ctx := context.Background() - err := cleanUpLabelIdentities(ctx, fakeClient) - require.NoError(t, err) - - actualIdtList := &mcv1alpha1.LabelIdentityList{} - err = fakeClient.List(ctx, actualIdtList, &client.ListOptions{}) - require.NoError(t, err) - assert.Equal(t, 0, len(actualIdtList.Items)) -} - -func TestCleanUpClusterInfoImport(t *testing.T) { - ciImpList := &mcv1alpha1.ClusterInfoImportList{ - Items: []mcv1alpha1.ClusterInfoImport{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "cluster-1-import", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "cluster-2-import", - }, - }, - }, - } - - fakeClient := fake.NewClientBuilder().WithScheme(TestScheme).WithLists(ciImpList).Build() - ctx := context.Background() - err := cleanUpClusterInfoImports(ctx, fakeClient) - require.NoError(t, err) - - actualCIImpList := &mcv1alpha1.ClusterInfoImportList{} - err = fakeClient.List(ctx, actualCIImpList, &client.ListOptions{}) - require.NoError(t, err) - assert.Equal(t, 0, len(actualCIImpList.Items)) -} - -func TestCleanUpGateway(t *testing.T) { - gwList := &mcv1alpha1.GatewayList{ - Items: []mcv1alpha1.Gateway{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "gw-1", - }, - }, - }, - } - - fakeClient := fake.NewClientBuilder().WithScheme(TestScheme).WithLists(gwList).Build() - ctx := context.Background() - err := cleanUpGateways(ctx, fakeClient) - require.NoError(t, err) - - actualGWList := &mcv1alpha1.ClusterInfoImportList{} - err = fakeClient.List(ctx, actualGWList, &client.ListOptions{}) - require.NoError(t, err) - assert.Equal(t, 0, len(actualGWList.Items)) -} diff --git a/multicluster/controllers/multicluster/common/helper.go b/multicluster/controllers/multicluster/common/helper.go index 7e6f1248f90..d36a98a4337 100644 --- a/multicluster/controllers/multicluster/common/helper.go +++ b/multicluster/controllers/multicluster/common/helper.go @@ -16,12 +16,23 @@ package common import ( "crypto/sha1" // #nosec G505: not used for security purposes "encoding/hex" + "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" ) const labelIdentityHashLength = 16 +// CleanUpRetry is the retry when the clean up method +// failed to clean up all stale resources. +var CleanUpRetry = wait.Backoff{ + Steps: 5, + Duration: 500 * time.Millisecond, + Factor: 1.0, + Jitter: 1, +} + // TODO: Use NamespacedName stringer method instead of this. e.g. nsName.String() func NamespacedName(namespace, name string) string { return namespace + "/" + name diff --git a/multicluster/controllers/multicluster/leader/clusterset_controller.go b/multicluster/controllers/multicluster/leader/clusterset_controller.go index 1944bcf4530..65cfb905e59 100644 --- a/multicluster/controllers/multicluster/leader/clusterset_controller.go +++ b/multicluster/controllers/multicluster/leader/clusterset_controller.go @@ -33,8 +33,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" - "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" ) @@ -80,12 +78,6 @@ func (r *LeaderClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Req r.clusterSetConfig = nil r.clusterID = common.InvalidClusterID r.clusterSetID = common.InvalidClusterSetID - if err := cleanUpAllMemberClusterAnnounces(ctx, r.Client); err != nil { - return ctrl.Result{}, err - } - if err := cleanUpResourceExports(ctx, r.Client); err != nil { - return ctrl.Result{}, err - } return ctrl.Result{}, nil } @@ -130,7 +122,7 @@ func (r *LeaderClusterSetReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&mcv1alpha2.ClusterSet{}). WithEventFilter(instance). WithOptions(controller.Options{ - MaxConcurrentReconciles: common.DefaultWorkerCount, + MaxConcurrentReconciles: 1, }). Complete(r) } @@ -242,24 +234,3 @@ func validateMemberClusterExists(clusterID common.ClusterID, clusters []mcv1alph } return } - -func cleanUpResourceExports(ctx context.Context, mgrClient client.Client) error { - resExports := mcv1alpha1.ResourceExportList{} - err := mgrClient.List(ctx, &resExports, &client.ListOptions{}) - if err != nil { - return err - } - for _, resExport := range resExports.Items { - // AntreaClusterNetworkPolicy type of ResourceExports are created by the - // user, we should skip deleting this kind of ResourceExports. - if resExport.Spec.Kind == constants.AntreaClusterNetworkPolicyKind { - continue - } - resExpTmp := resExport - err := mgrClient.Delete(ctx, &resExpTmp, &client.DeleteOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - return nil -} diff --git a/multicluster/controllers/multicluster/leader/clusterset_controller_test.go b/multicluster/controllers/multicluster/leader/clusterset_controller_test.go index 02e7b0159bd..8a4ab933f2f 100644 --- a/multicluster/controllers/multicluster/leader/clusterset_controller_test.go +++ b/multicluster/controllers/multicluster/leader/clusterset_controller_test.go @@ -20,7 +20,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -260,33 +259,3 @@ func TestLeaderClusterStatus(t *testing.T) { assert.Equal(t, expectedStatus.Conditions[0].Status, actualStatus.Conditions[0].Status) assert.Equal(t, expectedStatus.Conditions[0].Message, actualStatus.Conditions[0].Message) } - -func TestCleanUpResourceExports(t *testing.T) { - resExports := &mcv1alpha1.ResourceExportList{ - Items: []mcv1alpha1.ResourceExport{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "acnp", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "svc-1", - }, - }, - }, - } - scheme := runtime.NewScheme() - mcv1alpha1.AddToScheme(scheme) - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(resExports).Build() - ctx := context.Background() - err := cleanUpResourceExports(ctx, fakeClient) - require.NoError(t, err) - - resExportList := &mcv1alpha1.ResourceExportList{} - err = fakeClient.List(ctx, resExportList, &client.ListOptions{}) - require.NoError(t, err) - assert.Equal(t, 0, len(resExportList.Items)) -} diff --git a/multicluster/controllers/multicluster/leader/memberclusterannounce_controller.go b/multicluster/controllers/multicluster/leader/memberclusterannounce_controller.go index 9542d74f013..2a6786da0fd 100644 --- a/multicluster/controllers/multicluster/leader/memberclusterannounce_controller.go +++ b/multicluster/controllers/multicluster/leader/memberclusterannounce_controller.go @@ -89,12 +89,6 @@ func (r *MemberClusterAnnounceReconciler) Reconcile(ctx context.Context, req ctr finalizer := fmt.Sprintf("%s/%s", MemberClusterAnnounceFinalizer, memberAnnounce.ClusterID) if !memberAnnounce.DeletionTimestamp.IsZero() { r.removeMemberStatus(memberID) - memberAnnounce.Finalizers = common.RemoveStringFromSlice(memberAnnounce.Finalizers, finalizer) - if err := r.Update(context.TODO(), memberAnnounce); err != nil { - klog.ErrorS(err, "Failed to update MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) - return ctrl.Result{}, err - } - return ctrl.Result{}, nil } diff --git a/multicluster/controllers/multicluster/leader/stale_controller.go b/multicluster/controllers/multicluster/leader/stale_controller.go index 5472757fe1e..f8f0e186551 100644 --- a/multicluster/controllers/multicluster/leader/stale_controller.go +++ b/multicluster/controllers/multicluster/leader/stale_controller.go @@ -26,14 +26,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/retry" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "antrea.io/antrea/multicluster/apis/multicluster/constants" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" - mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -43,63 +42,35 @@ const ( memberClusterAnnounceStaleTime = 24 * time.Hour ) -var ( - getResourceExportsByClusterIDFunc = getResourceExportsByClusterID +var getResourceExportsByClusterIDFunc = getResourceExportsByClusterID - // cleanUpRetry is the retry when the cleanUpStaleResourceExports - // failed to clean up all stale ResourceExports. - cleanUpRetry = wait.Backoff{ - Steps: 5, - Duration: 500 * time.Millisecond, - Factor: 1.0, - Jitter: 1, - } -) - -// LeaderStaleResCleanupController will run periodically (memberClusterAnnounceStaleTime / 2 ) (12 hours) +// StaleResCleanupController will run periodically (memberClusterAnnounceStaleTime / 2 ) (12 Hour) // to clean up stale MemberClusterAnnounce resources in the leader cluster if the MemberClusterAnnounce -// timestamp annotation has not been updated for memberClusterAnnounceStaleTime (24 hours). +// timestamp annotation has not been updated for memberClusterAnnounceStaleTime (24 Hours). // It will remove all ResourceExports belong to a member cluster when the corresponding MemberClusterAnnounce // CR is deleted. It will also try to clean up all stale ResourceExports during start. -type LeaderStaleResCleanupController struct { +type StaleResCleanupController struct { client.Client - Scheme *runtime.Scheme - namespace string + Scheme *runtime.Scheme } -func NewLeaderStaleResCleanupController( +func NewStaleResCleanupController( Client client.Client, Scheme *runtime.Scheme, - namespace string, -) *LeaderStaleResCleanupController { - reconciler := &LeaderStaleResCleanupController{ - Client: Client, - Scheme: Scheme, - namespace: namespace, +) *StaleResCleanupController { + reconciler := &StaleResCleanupController{ + Client: Client, + Scheme: Scheme, } return reconciler } -func cleanUpAllMemberClusterAnnounces(ctx context.Context, mgrClient client.Client) error { - memberClusterAnnounceList := &mcv1alpha1.MemberClusterAnnounceList{} - if err := mgrClient.List(ctx, memberClusterAnnounceList, &client.ListOptions{}); err != nil { - klog.ErrorS(err, "Fail to get MemberClusterAnnounce") - return err - } - for _, mca := range memberClusterAnnounceList.Items { - if err := mgrClient.Delete(ctx, &mca, &client.DeleteOptions{}); err != nil { - return err - } - } - return nil -} - -// cleanUpStaleMemberClusterAnnounces will delete any MemberClusterAnnounce if its +// cleanUpExpiredMemberClusterAnnounces will delete any MemberClusterAnnounce if its // last update timestamp is over 24 hours. -func (c *LeaderStaleResCleanupController) cleanUpStaleMemberClusterAnnounces(ctx context.Context) { +func (c *StaleResCleanupController) cleanUpExpiredMemberClusterAnnounces(ctx context.Context) { memberClusterAnnounceList := &mcv1alpha1.MemberClusterAnnounceList{} - if err := c.List(ctx, memberClusterAnnounceList, &client.ListOptions{Namespace: c.namespace}); err != nil { - klog.ErrorS(err, "Fail to get MemberClusterAnnounce in the Namespace", "namespace", c.namespace) + if err := c.List(ctx, memberClusterAnnounceList, &client.ListOptions{}); err != nil { + klog.ErrorS(err, "Fail to get MemberClusterAnnounces") return } @@ -110,9 +81,11 @@ func (c *LeaderStaleResCleanupController) cleanUpStaleMemberClusterAnnounces(ctx continue } if err == nil { - klog.InfoS("Cleaning up stale MemberClusterAnnounce. It has not been updated within the agreed period", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce), "agreedPeriod", memberClusterAnnounceStaleTime) + klog.InfoS("Cleaning up stale MemberClusterAnnounce. It has not been updated within the agreed period", + "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce), "agreedPeriod", memberClusterAnnounceStaleTime) } else { - klog.InfoS("Cleaning up stale MemberClusterAnnounce. The latest update time is not in RFC3339 format", "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce)) + klog.InfoS("Cleaning up stale MemberClusterAnnounce. The latest update time is not in RFC3339 format", + "MemberClusterAnnounce", klog.KObj(&memberClusterAnnounce)) } if err := c.Client.Delete(ctx, &memberClusterAnnounce, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { @@ -122,23 +95,24 @@ func (c *LeaderStaleResCleanupController) cleanUpStaleMemberClusterAnnounces(ctx } } -func (c *LeaderStaleResCleanupController) RunPeriodically(stopCh <-chan struct{}) { - klog.InfoS("Starting LeaderStaleResCleanupController") - defer klog.InfoS("Shutting down LeaderStaleResCleanupController") +func (c *StaleResCleanupController) RunPeriodically(stopCh <-chan struct{}) { + klog.InfoS("Starting StaleResCleanupController") + defer klog.InfoS("Shutting down StaleResCleanupController") ctx, _ := wait.ContextForChannel(stopCh) - go wait.UntilWithContext(ctx, c.cleanUpStaleMemberClusterAnnounces, memberClusterAnnounceStaleTime/2) + go wait.UntilWithContext(ctx, c.cleanUpExpiredMemberClusterAnnounces, memberClusterAnnounceStaleTime/2) <-stopCh } -func (c *LeaderStaleResCleanupController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (c *StaleResCleanupController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { memberAnnounce := &mcv1alpha1.MemberClusterAnnounce{} err := c.Get(ctx, req.NamespacedName, memberAnnounce) - if err != nil && !apierrors.IsNotFound(err) { - return ctrl.Result{}, err + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) } - if err == nil { + if err == nil && memberAnnounce.DeletionTimestamp.IsZero() { + // Ignore the event if it's not with non-zero DeletionTimestamp return ctrl.Result{}, nil } @@ -147,27 +121,53 @@ func (c *LeaderStaleResCleanupController) Reconcile(ctx context.Context, req ctr clusterID := getClusterIDFromName(req.Name) staleResExports, err := getResourceExportsByClusterIDFunc(c, ctx, clusterID) if err != nil { - klog.ErrorS(err, "Failed to get ResourceExports in the Namespace by ClusterID", "namespace", c.namespace, "clusterID", clusterID) + klog.ErrorS(err, "Failed to get ResourceExports by ClusterID", "clusterID", clusterID) return ctrl.Result{}, err } - cleanUpSucceed := c.deleteResourceExports(ctx, staleResExports) - if !cleanUpSucceed { + if !deleteResourceExports(ctx, c.Client, staleResExports) { return ctrl.Result{}, fmt.Errorf("failed to clean up all stale ResourceExports for the member cluster %s, retry later", clusterID) } + + // When cleanup is done, remove the Finalizer of this MemberClusterAnnounce. + finalizer := fmt.Sprintf("%s/%s", MemberClusterAnnounceFinalizer, memberAnnounce.ClusterID) + memberAnnounce.Finalizers = common.RemoveStringFromSlice(memberAnnounce.Finalizers, finalizer) + if err := c.Update(context.TODO(), memberAnnounce); err != nil { + klog.ErrorS(err, "Failed to update MemberClusterAnnounce", "MemberClusterAnnounce", klog.KObj(memberAnnounce)) + return ctrl.Result{}, err + } return ctrl.Result{}, nil } -func (c *LeaderStaleResCleanupController) cleanUpStaleResourceExports(ctx context.Context) error { - existingMemberClusterIDs, err := c.getExistingMemberClusterIDs(ctx) +// SetupWithManager sets up the controller with the Manager. +func (c *StaleResCleanupController) SetupWithManager(mgr ctrl.Manager, stopCh <-chan struct{}) error { + // Add an Indexer for ResourceExport, so it can be filtered by the ClusterID. + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &mcv1alpha1.ResourceExport{}, indexKey, func(rawObj client.Object) []string { + resExport := rawObj.(*mcv1alpha1.ResourceExport) + return []string{resExport.Spec.ClusterID} + }); err != nil { + klog.ErrorS(err, "Failed to create the index") + return err + } + + return ctrl.NewControllerManagedBy(mgr). + For(&mcv1alpha1.MemberClusterAnnounce{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(c) +} + +func cleanUpStaleResourceExports(ctx context.Context, mgrClient client.Client) error { + existingMemberClusterIDs, err := getExistingMemberClusterIDs(ctx, mgrClient) if err != nil { - klog.ErrorS(err, "Failed to get existing member cluster's ClusterID in the Namespace", "namespace", c.namespace) + klog.ErrorS(err, "Failed to get existing member cluster's ClusterID") return err } resourceExports := &mcv1alpha1.ResourceExportList{} - err = c.Client.List(ctx, resourceExports, &client.ListOptions{Namespace: c.namespace}) + err = mgrClient.List(ctx, resourceExports, &client.ListOptions{}) if err != nil { - klog.ErrorS(err, "Failed to get ResourceExports in the Namespace", "namespace", c.namespace) + klog.ErrorS(err, "Failed to get ResourceExports") return err } @@ -175,22 +175,22 @@ func (c *LeaderStaleResCleanupController) cleanUpStaleResourceExports(ctx contex for _, resExport := range resourceExports.Items { // The AntreaClusterNetworkPolicy kind of ResourceExport is created in the leader directly // without a ClusterID info. It's not owned by any member cluster. - if resExport.Spec.ClusterID != "" && !existingMemberClusterIDs.Has(resExport.Spec.ClusterID) { + if resExport.Spec.Kind != constants.AntreaClusterNetworkPolicyKind && !existingMemberClusterIDs.Has(resExport.Spec.ClusterID) { staleResExports = append(staleResExports, resExport) } } - cleanUpSucceed := c.deleteResourceExports(ctx, staleResExports) + cleanUpSucceed := deleteResourceExports(ctx, mgrClient, staleResExports) if !cleanUpSucceed { return fmt.Errorf("stale ResourceExports are not fully cleaned up, retry later") } return nil } -func (c *LeaderStaleResCleanupController) getExistingMemberClusterIDs(ctx context.Context) (sets.Set[string], error) { +func getExistingMemberClusterIDs(ctx context.Context, mgrClient client.Client) (sets.Set[string], error) { validMemberClusterIDs := sets.Set[string]{} memberClusterAnnounces := &mcv1alpha1.MemberClusterAnnounceList{} - err := c.Client.List(ctx, memberClusterAnnounces, &client.ListOptions{Namespace: c.namespace}) + err := mgrClient.List(ctx, memberClusterAnnounces, &client.ListOptions{}) if err != nil { return nil, err } @@ -201,15 +201,17 @@ func (c *LeaderStaleResCleanupController) getExistingMemberClusterIDs(ctx contex return validMemberClusterIDs, nil } -func (c *LeaderStaleResCleanupController) deleteResourceExports(ctx context.Context, resouceExports []mcv1alpha1.ResourceExport) bool { +func deleteResourceExports(ctx context.Context, mgrClient client.Client, resouceExports []mcv1alpha1.ResourceExport) bool { cleanupSucceed := true for _, resourceExport := range resouceExports { tmpResExp := resourceExport if resourceExport.DeletionTimestamp.IsZero() { - klog.V(2).InfoS("Clean up the stale ResourceExport from the member cluster", "resourceexport", klog.KObj(&tmpResExp), "clusterID", tmpResExp.Spec.ClusterID) - err := c.Client.Delete(ctx, &tmpResExp, &client.DeleteOptions{}) + klog.V(2).InfoS("Clean up the stale ResourceExport from the member cluster", + "resourceexport", klog.KObj(&tmpResExp), "clusterID", tmpResExp.Spec.ClusterID) + err := mgrClient.Delete(ctx, &tmpResExp, &client.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { - klog.ErrorS(err, "Failed to clean up the stale ResourceExport from the member cluster", "resourceexport", klog.KObj(&tmpResExp), "clusterID", tmpResExp.Spec.ClusterID) + klog.ErrorS(err, "Failed to clean up the stale ResourceExport from the member cluster", + "resourceexport", klog.KObj(&tmpResExp), "clusterID", tmpResExp.Spec.ClusterID) cleanupSucceed = false } } @@ -217,54 +219,13 @@ func (c *LeaderStaleResCleanupController) deleteResourceExports(ctx context.Cont return cleanupSucceed } -func (c *LeaderStaleResCleanupController) RunOnce(stopCh <-chan struct{}) { - // Try to clean up all stale ResourceExports before start in case - // there was a MemberClusterAnnounce deleted but the event was not handled - // properly if the Antrea Multi-cluster Controller is not healthy. - // Or clean up all resources when there is no ClusterSet CR at all. - retry.OnError(cleanUpRetry, func(err error) bool { return true }, - func() error { - ctx, _ := wait.ContextForChannel(stopCh) - clusterSets := &mcv1alpha2.ClusterSetList{} - if err := c.Client.List(ctx, clusterSets, &client.ListOptions{Namespace: c.namespace}); err != nil { - return err - } - if len(clusterSets.Items) == 0 { - if err := cleanUpAllMemberClusterAnnounces(ctx, c.Client); err != nil { - return err - } - } - return c.cleanUpStaleResourceExports(ctx) - }) -} - -// SetupWithManager sets up the controller with the Manager. -func (c *LeaderStaleResCleanupController) SetupWithManager(mgr ctrl.Manager, stopCh <-chan struct{}) error { - // Add an Indexer for ResourceExport, so it can be filtered by the ClusterID. - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &mcv1alpha1.ResourceExport{}, indexKey, func(rawObj client.Object) []string { - resExport := rawObj.(*mcv1alpha1.ResourceExport) - return []string{resExport.Spec.ClusterID} - }); err != nil { - klog.ErrorS(err, "Failed to create the index") - return err - } - - return ctrl.NewControllerManagedBy(mgr). - For(&mcv1alpha1.MemberClusterAnnounce{}). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 1, - }). - WithEventFilter(common.DeleteEventPredicate). - Complete(c) -} - func getClusterIDFromName(name string) string { return strings.TrimPrefix(name, "member-announce-from-") } -func getResourceExportsByClusterID(c *LeaderStaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) { +func getResourceExportsByClusterID(c *StaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) { resourceExports := &mcv1alpha1.ResourceExportList{} - err := c.Client.List(ctx, resourceExports, &client.ListOptions{Namespace: c.namespace}, client.MatchingFields{indexKey: clusterID}) + err := c.Client.List(ctx, resourceExports, &client.ListOptions{}, client.MatchingFields{indexKey: clusterID}) if err != nil { klog.ErrorS(err, "Failed to get ResourceExports by ClusterID", "clusterID", clusterID) return nil, err diff --git a/multicluster/controllers/multicluster/leader/stale_controller_test.go b/multicluster/controllers/multicluster/leader/stale_controller_test.go index 764e9c9d2f7..576b6570310 100644 --- a/multicluster/controllers/multicluster/leader/stale_controller_test.go +++ b/multicluster/controllers/multicluster/leader/stale_controller_test.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "antrea.io/antrea/multicluster/apis/multicluster/constants" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" @@ -98,7 +99,9 @@ func TestCleanUpStaleResourceExports(t *testing.T) { Namespace: "default", Name: "leader-acnp", }, - Spec: mcv1alpha1.ResourceExportSpec{}, + Spec: mcv1alpha1.ResourceExportSpec{ + Kind: constants.AntreaClusterNetworkPolicyKind, + }, }, }, } @@ -106,9 +109,9 @@ func TestCleanUpStaleResourceExports(t *testing.T) { scheme := runtime.NewScheme() mcv1alpha1.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(mcaList, resExports).Build() - c := NewLeaderStaleResCleanupController(fakeClient, scheme, "default") + c := NewStaleResCleanupController(fakeClient, scheme) ctx := context.Background() - c.cleanUpStaleResourceExports(ctx) + cleanUpStaleResourceExports(ctx, c.Client) latestResExports := &mcv1alpha1.ResourceExportList{} err := fakeClient.List(ctx, latestResExports) require.NoError(t, err) @@ -152,13 +155,22 @@ func TestReconcile(t *testing.T) { } scheme := runtime.NewScheme() mcv1alpha1.AddToScheme(scheme) + now := metav1.Now() memberClusterAnnounce1 := &mcv1alpha1.MemberClusterAnnounce{ ObjectMeta: metav1.ObjectMeta{ - Name: "member-announce-from-cluster-1", - Namespace: "default", + Name: "member-announce-from-cluster-1", + Namespace: "default", + DeletionTimestamp: &now, }, ClusterID: "cluster-1", } + memberClusterAnnounce2 := &mcv1alpha1.MemberClusterAnnounce{ + ObjectMeta: metav1.ObjectMeta{ + Name: "member-announce-from-cluster-2", + Namespace: "default", + }, + ClusterID: "cluster-2", + } tests := []struct { name string @@ -167,25 +179,25 @@ func TestReconcile(t *testing.T) { expectedErr error existingMemberAnnounce *mcv1alpha1.MemberClusterAnnounce existingResExports *mcv1alpha1.ResourceExportList - getResourceExportsByClusterID func(c *LeaderStaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) + getResourceExportsByClusterID func(c *StaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) }{ - { - name: "MemberClusterAnnounce exists", - memberAnnounceName: memberClusterAnnounce1.Name, - existingMemberAnnounce: memberClusterAnnounce1, - existingResExports: resExportsList, - expectedResExportsSize: 3, - }, { name: "MemberClusterAnnounce deleted", memberAnnounceName: memberClusterAnnounce1.Name, - existingMemberAnnounce: &mcv1alpha1.MemberClusterAnnounce{}, + existingMemberAnnounce: memberClusterAnnounce1, existingResExports: resExportsList, - getResourceExportsByClusterID: func(c *LeaderStaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) { + getResourceExportsByClusterID: func(c *StaleResCleanupController, ctx context.Context, clusterID string) ([]mcv1alpha1.ResourceExport, error) { return []mcv1alpha1.ResourceExport{resExport1, resExport2}, nil }, expectedResExportsSize: 1, }, + { + name: "MemberClusterAnnounce exists", + memberAnnounceName: memberClusterAnnounce2.Name, + existingMemberAnnounce: memberClusterAnnounce2, + existingResExports: resExportsList, + expectedResExportsSize: 3, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -194,7 +206,7 @@ func TestReconcile(t *testing.T) { getResourceExportsByClusterIDFunc = getResourceExportsByClusterID }() fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithLists(tt.existingResExports).WithObjects(tt.existingMemberAnnounce).Build() - c := NewLeaderStaleResCleanupController(fakeClient, scheme, "default") + c := NewStaleResCleanupController(fakeClient, scheme) ctx := context.Background() _, err := c.Reconcile(ctx, ctrl.Request{ NamespacedName: types.NamespacedName{ @@ -301,8 +313,8 @@ func TestStaleController_CleanUpMemberClusterAnnounces(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.memberClusterAnnounceList).WithLists(tt.clusterSet).Build() - c := NewLeaderStaleResCleanupController(fakeClient, common.TestScheme, "default") - c.cleanUpStaleMemberClusterAnnounces(ctx) + c := NewStaleResCleanupController(fakeClient, common.TestScheme) + c.cleanUpExpiredMemberClusterAnnounces(ctx) memberClusterAnnounceList := &mcv1alpha1.MemberClusterAnnounceList{} if err := fakeClient.List(context.TODO(), memberClusterAnnounceList, &client.ListOptions{}); err != nil { diff --git a/multicluster/controllers/multicluster/member/clusterset_controller.go b/multicluster/controllers/multicluster/member/clusterset_controller.go index 567dcb67df2..d7c7e30e2dd 100644 --- a/multicluster/controllers/multicluster/member/clusterset_controller.go +++ b/multicluster/controllers/multicluster/member/clusterset_controller.go @@ -56,7 +56,8 @@ type MemberClusterSetReconciler struct { ClusterCalimCRDAvailable bool // commonAreaLock protects the access to RemoteCommonArea. - commonAreaLock sync.RWMutex + commonAreaLock sync.RWMutex + commonAreaCreationCh chan struct{} clusterSetConfig *mcv1alpha2.ClusterSet clusterSetID common.ClusterSetID @@ -72,6 +73,7 @@ func NewMemberClusterSetReconciler(client client.Client, namespace string, enableStretchedNetworkPolicy bool, clusterCalimCRDAvailable bool, + commonAreaCreationCh chan struct{}, ) *MemberClusterSetReconciler { return &MemberClusterSetReconciler{ Client: client, @@ -79,6 +81,7 @@ func NewMemberClusterSetReconciler(client client.Client, Namespace: namespace, enableStretchedNetworkPolicy: enableStretchedNetworkPolicy, ClusterCalimCRDAvailable: clusterCalimCRDAvailable, + commonAreaCreationCh: commonAreaCreationCh, } } @@ -90,83 +93,72 @@ func NewMemberClusterSetReconciler(client client.Client, func (r *MemberClusterSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { clusterSet := &mcv1alpha2.ClusterSet{} err := r.Get(ctx, req.NamespacedName, clusterSet) - r.commonAreaLock.Lock() - defer r.commonAreaLock.Unlock() - if err != nil { - if !apierrors.IsNotFound(err) { - return ctrl.Result{}, err - } - klog.InfoS("Received ClusterSet delete", "clusterset", req.NamespacedName) - if r.remoteCommonArea != nil { - klog.InfoS("Clean up all exported resources created by Antrea Multi-cluster controller", "clusterset", req.NamespacedName) - if err := r.deleteResourceExports(ctx); err != nil { - return ctrl.Result{}, err + processClusterSet := func() error { + r.commonAreaLock.Lock() + defer r.commonAreaLock.Unlock() + if err != nil { + if !apierrors.IsNotFound(err) { + return err } - if err := r.deleteMemberClusterAnnounce(ctx); err != nil { - // MemberClusterAnnounce could be kept in the leader cluster, if antrea-mc-controller crashes after the failure. - // Leader cluster will delete the stale MemberClusterAnnounce with a garbage collection mechanism in this case. - return ctrl.Result{}, fmt.Errorf("failed to delete MemberClusterAnnounce in the leader cluster: %v", err) + klog.InfoS("Received ClusterSet delete", "clusterset", req.NamespacedName) + if r.remoteCommonArea != nil { + // Any ResourceExports belongs to this member cluster will be cleaned up by the leader cluster + // when its MemberClusterAnnounce is deleted. + if err := r.deleteMemberClusterAnnounce(ctx); err != nil { + // MemberClusterAnnounce could be kept in the leader cluster, if antrea-mc-controller crashes after the failure. + // Leader cluster will delete the stale MemberClusterAnnounce with a garbage collection mechanism in this case. + return fmt.Errorf("failed to delete MemberClusterAnnounce in the leader cluster: %v", err) + } + r.remoteCommonArea.Stop() + r.remoteCommonArea = nil + r.clusterSetConfig = nil + r.clusterID = common.InvalidClusterID + r.clusterSetID = common.InvalidClusterSetID } - r.remoteCommonArea.Stop() - r.remoteCommonArea = nil - r.clusterSetConfig = nil - r.clusterID = common.InvalidClusterID - r.clusterSetID = common.InvalidClusterSetID - } - klog.InfoS("Clean up all stale resources created by Antrea Multi-cluster controller", "clusterset", req.NamespacedName) - if err := common.CleanUpResourcesCreatedByMC(ctx, r.Client); err != nil { - return ctrl.Result{}, err + klog.InfoS("Clean up all resources created by Antrea Multi-cluster Controller", "clusterset", req.NamespacedName) + if err := cleanUpResourcesCreatedByMC(ctx, r.Client); err != nil { + return err + } + return nil } - return ctrl.Result{}, nil - } - klog.InfoS("Received ClusterSet add/update", "clusterset", klog.KObj(clusterSet)) + klog.InfoS("Received ClusterSet add/update", "clusterset", klog.KObj(clusterSet)) - // Handle create or update - if r.clusterSetConfig == nil { - r.clusterID, err = common.GetClusterID(r.ClusterCalimCRDAvailable, req, r.Client, clusterSet) - if err != nil { - return ctrl.Result{}, err - } - r.clusterSetID = common.ClusterSetID(clusterSet.Name) - if clusterSet.Spec.ClusterID == "" { - // ClusterID is a required feild, and the empty value case should only happen - // when Antrea Multi-cluster is upgraded from an old version prior to v1.13. - // Here we try to update the ClusterSet's ClusterID when it's configured in an - // existing ClusterClaim. - clusterSet.Spec.ClusterID = string(r.clusterID) - err = r.Update(context.TODO(), clusterSet) + // Handle create or update + if r.clusterSetConfig == nil { + r.clusterID, err = common.GetClusterID(r.ClusterCalimCRDAvailable, req, r.Client, clusterSet) if err != nil { - klog.ErrorS(err, "Failed to update ClusterSet's ClusterID", "clusterset", req.NamespacedName) - return ctrl.Result{}, err + return err + } + r.clusterSetID = common.ClusterSetID(clusterSet.Name) + if clusterSet.Spec.ClusterID == "" { + // ClusterID is a required feild, and the empty value case should only happen + // when Antrea Multi-cluster is upgraded from an old version prior to v1.13. + // Here we try to update the ClusterSet's ClusterID when it's configured in an + // existing ClusterClaim. + clusterSet.Spec.ClusterID = string(r.clusterID) + err = r.Update(context.TODO(), clusterSet) + if err != nil { + klog.ErrorS(err, "Failed to update ClusterSet's ClusterID", "clusterset", req.NamespacedName) + return err + } } } - } - r.clusterSetConfig = clusterSet.DeepCopy() + r.clusterSetConfig = clusterSet.DeepCopy() - err = r.createOrUpdateRemoteCommonArea(clusterSet) - if err != nil { - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - -func (r *MemberClusterSetReconciler) deleteResourceExports(ctx context.Context) error { - resExports := &mcv1alpha1.ResourceExportList{} - if err := r.remoteCommonArea.List(ctx, resExports, &client.ListOptions{Namespace: r.remoteCommonArea.GetNamespace()}); err != nil { - return err - } - for _, resExp := range resExports.Items { - if resExp.Spec.ClusterID == string(r.clusterID) { - err := r.remoteCommonArea.Delete(ctx, &resExp, &client.DeleteOptions{}) - if err == nil || apierrors.IsNotFound(err) { - continue - } + err = r.createOrUpdateRemoteCommonArea(clusterSet) + if err != nil { return err } + return nil } - return nil + if err := processClusterSet(); err != nil { + return ctrl.Result{}, err + } + if r.clusterSetConfig != nil { + r.commonAreaCreationCh <- struct{}{} + } + return ctrl.Result{}, nil } func (r *MemberClusterSetReconciler) deleteMemberClusterAnnounce(ctx context.Context) error { @@ -187,7 +179,7 @@ func (r *MemberClusterSetReconciler) SetupWithManager(mgr ctrl.Manager) error { // Update status periodically go func() { for { - <-time.After(time.Second * 15) + <-time.After(time.Second * 30) r.updateStatus() } }() @@ -198,7 +190,7 @@ func (r *MemberClusterSetReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&mcv1alpha2.ClusterSet{}). WithEventFilter(generationPredicate). WithOptions(controller.Options{ - MaxConcurrentReconciles: common.DefaultWorkerCount, + MaxConcurrentReconciles: 1, }). Complete(r) } @@ -372,7 +364,9 @@ func (r *MemberClusterSetReconciler) updateStatus() { clusterSet := &mcv1alpha2.ClusterSet{} err := r.Get(context.TODO(), namespacedName, clusterSet) if err != nil { - klog.ErrorS(err, "Failed to read ClusterSet", "name", namespacedName) + if !apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to read ClusterSet", "name", namespacedName) + } return } status.Conditions = clusterSet.Status.Conditions diff --git a/multicluster/controllers/multicluster/member/clusterset_controller_test.go b/multicluster/controllers/multicluster/member/clusterset_controller_test.go index 49dba2426cf..466f2e4d980 100644 --- a/multicluster/controllers/multicluster/member/clusterset_controller_test.go +++ b/multicluster/controllers/multicluster/member/clusterset_controller_test.go @@ -238,17 +238,17 @@ func TestMemberCreateOrUpdateRemoteCommonArea(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(existingClusterSet, existingSecret).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader1", common.LocalClusterID, "mcs1", nil) reconciler := MemberClusterSetReconciler{ - Client: fakeClient, - remoteCommonArea: commonArea, - clusterSetConfig: existingClusterSet, - clusterSetID: "clusterset1", - clusterID: "east", + Client: fakeClient, + remoteCommonArea: commonArea, + clusterSetConfig: existingClusterSet, + clusterSetID: "clusterset1", + clusterID: "east", + enableStretchedNetworkPolicy: true, } - mockCtrl := gomock.NewController(t) mockManager := mocks.NewMockManager(mockCtrl) - mockManager.EXPECT().GetClient() - mockManager.EXPECT().GetScheme() + mockManager.EXPECT().GetClient().Times(2) + mockManager.EXPECT().GetScheme().Times(2) getRemoteConfigAndClient = commonarea.FuncGetFakeRemoteConfigAndClient(mockManager) @@ -257,7 +257,7 @@ func TestMemberCreateOrUpdateRemoteCommonArea(t *testing.T) { assert.Equal(t, expectedInstalledLeader, reconciler.installedLeader) } -func TestLeaderClusterSetAddWithoutClusterID(t *testing.T) { +func TestMemberClusterSetAddWithoutClusterID(t *testing.T) { existingSecret := &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: "mcs1", @@ -307,7 +307,11 @@ func TestLeaderClusterSetAddWithoutClusterID(t *testing.T) { reconciler := MemberClusterSetReconciler{ Client: fakeClient, ClusterCalimCRDAvailable: true, + commonAreaCreationCh: make(chan struct{}), } + go func() { + <-reconciler.commonAreaCreationCh + }() if _, err := reconciler.Reconcile(common.TestCtx, ctrl.Request{ NamespacedName: types.NamespacedName{ Namespace: "mcs1", diff --git a/multicluster/controllers/multicluster/member/gateway_controller_test.go b/multicluster/controllers/multicluster/member/gateway_controller_test.go index 1c48eff722b..433872e29af 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/member/gateway_controller_test.go @@ -153,7 +153,7 @@ func TestGatewayReconciler(t *testing.T) { fakeRemoteClient = fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(tt.resExport).Build() } commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, common.LeaderNamespace, nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) commonAreaGatter := mcReconciler r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, commonAreaGatter) diff --git a/multicluster/controllers/multicluster/member/labelidentity_controller_test.go b/multicluster/controllers/multicluster/member/labelidentity_controller_test.go index 183817b757c..a5e5480cae1 100644 --- a/multicluster/controllers/multicluster/member/labelidentity_controller_test.go +++ b/multicluster/controllers/multicluster/member/labelidentity_controller_test.go @@ -174,7 +174,7 @@ func TestLabelIdentityReconciler(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existingPods).WithObjects(ns).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, common.LeaderNamespace, nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler) go r.Run(stopCh) @@ -241,7 +241,7 @@ func TestNamespaceMapFunc(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(podA, podC, ns).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, common.LeaderNamespace, nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler) diff --git a/multicluster/controllers/multicluster/member/node_controller.go b/multicluster/controllers/multicluster/member/node_controller.go index 6d3d152600b..2f710e1e82c 100644 --- a/multicluster/controllers/multicluster/member/node_controller.go +++ b/multicluster/controllers/multicluster/member/node_controller.go @@ -201,8 +201,9 @@ func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mc if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil { if apierrors.IsNotFound(err) { r.activeGateway = "" + return nil } - return client.IgnoreNotFound(err) + return err } if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP && existingGW.ServiceCIDR == newGateway.ServiceCIDR { diff --git a/multicluster/controllers/multicluster/member/resourceimport_controller_test.go b/multicluster/controllers/multicluster/member/resourceimport_controller_test.go index 9b2dcbba40b..52f0904a3ae 100644 --- a/multicluster/controllers/multicluster/member/resourceimport_controller_test.go +++ b/multicluster/controllers/multicluster/member/resourceimport_controller_test.go @@ -565,9 +565,12 @@ func TestStaleControllerNoRaceWithResourceImportReconciler(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists().Build() ca := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "antrea-mcs", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(ca) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + go func() { + c.commonAreaCreationCh <- struct{}{} + }() r := newLabelIdentityResourceImportReconciler(fakeClient, scheme, fakeClient, localClusterID, "default", ca) stopCh := make(chan struct{}) diff --git a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go index 6da358953db..9a7013b3a06 100644 --- a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go @@ -78,7 +78,7 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(existSvcResExport, existEpResExport).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false) r.installedSvcs.Add(&svcInfo{ @@ -273,7 +273,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false) for _, tt := range tests { @@ -349,7 +349,7 @@ func TestServiceExportReconciler_handleServiceExportCreateEvent(t *testing.T) { t.Run(tt.name, func(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(tt.fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(tt.fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(tt.fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, tt.endpointSliceEnabled) if _, err := r.Reconcile(common.TestCtx, nginxReq); err != nil { @@ -532,7 +532,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(existSvcRe, existEpRe).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, false) r.installedSvcs.Add(sinfo) diff --git a/multicluster/controllers/multicluster/member/stale_controller.go b/multicluster/controllers/multicluster/member/stale_controller.go index b77d2b229b0..972d2af4ba7 100644 --- a/multicluster/controllers/multicluster/member/stale_controller.go +++ b/multicluster/controllers/multicluster/member/stale_controller.go @@ -18,7 +18,6 @@ package member import ( "context" - "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -26,13 +25,9 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/workqueue" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" k8smcv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "antrea.io/antrea/multicluster/apis/multicluster/constants" @@ -43,34 +38,35 @@ import ( crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" ) -// MemberStaleResCleanupController will clean up ServiceImport, MC Service, ACNP, ClusterInfoImport and LabelIdentity +// StaleResCleanupController will clean up ServiceImport, MC Service, ACNP, ClusterInfoImport and LabelIdentity // resources if no corresponding ResourceImports in the leader cluster and remove stale ResourceExports // in the leader cluster if no corresponding ServiceExport or Gateway in the member cluster when it runs in -// the member cluster. MemberStaleResCleanupController one-time runner will run only once in the member cluster +// the member cluster. StaleResCleanupController one-time runner will run only once in the member cluster // during Multi-cluster Controller starts, and it will retry only if there is an error. -// MemberStaleResCleanupController's reconciler will handle ClusterSet deletion event to clean up any stale resources. -type MemberStaleResCleanupController struct { +// StaleResCleanupController's reconciler will handle ClusterSet deletion event to clean up all +// automatically created resources for the ClusterSet. +type StaleResCleanupController struct { client.Client - Scheme *runtime.Scheme - localClusterID string - commonAreaGetter commonarea.RemoteCommonAreaGetter - namespace string - // queue only ever has one item, but it has nice error handling backoff/retry semantics - queue workqueue.RateLimitingInterface + Scheme *runtime.Scheme + commonAreaCreationCh chan struct{} + localClusterID string + commonAreaGetter commonarea.RemoteCommonAreaGetter + namespace string } -func NewMemberStaleResCleanupController( +func NewStaleResCleanupController( Client client.Client, Scheme *runtime.Scheme, + commonAreaCreationCh chan struct{}, namespace string, commonAreaGetter commonarea.RemoteCommonAreaGetter, -) *MemberStaleResCleanupController { - reconciler := &MemberStaleResCleanupController{ - Client: Client, - Scheme: Scheme, - namespace: namespace, - commonAreaGetter: commonAreaGetter, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MemberStaleResCleanupController"), +) *StaleResCleanupController { + reconciler := &StaleResCleanupController{ + Client: Client, + Scheme: Scheme, + commonAreaCreationCh: commonAreaCreationCh, + namespace: namespace, + commonAreaGetter: commonAreaGetter, } return reconciler } @@ -80,7 +76,7 @@ func NewMemberStaleResCleanupController( // +kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceimports,verbs=get;list;watch; // +kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;delete -func (c *MemberStaleResCleanupController) CleanUp(ctx context.Context) error { +func (c *StaleResCleanupController) CleanUp(ctx context.Context) error { var err error clusterSets := &mcv1alpha2.ClusterSetList{} if err = c.Client.List(ctx, clusterSets, &client.ListOptions{}); err != nil { @@ -89,29 +85,15 @@ func (c *MemberStaleResCleanupController) CleanUp(ctx context.Context) error { if len(clusterSets.Items) == 0 { klog.InfoS("There is no existing ClusterSet, try to clean up all auto-generated resources by Antrea Multi-cluster") - if err = common.CleanUpResourcesCreatedByMC(ctx, c.Client); err != nil { + if err = cleanUpResourcesCreatedByMC(ctx, c.Client); err != nil { return err } - return nil } - var commonArea commonarea.RemoteCommonArea - commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID() - if err != nil { - return err - } - - if err = c.cleanUpStaleResourcesOnMember(ctx, commonArea); err != nil { - return err - } - // Clean up stale ResourceExports in the leader cluster for a member cluster. - if err := c.cleanUpStaleResourceExportsOnLeader(ctx, commonArea); err != nil { - return err - } return nil } -func (c *MemberStaleResCleanupController) cleanUpStaleResourcesOnMember(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { +func (c *StaleResCleanupController) cleanUpStaleResourcesOnMember(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { svcImpList := &k8smcv1alpha1.ServiceImportList{} if err := c.List(ctx, svcImpList, &client.ListOptions{}); err != nil { return err @@ -163,7 +145,7 @@ func (c *MemberStaleResCleanupController) cleanUpStaleResourcesOnMember(ctx cont } // Clean up stale ResourceExports in the leader cluster for a member cluster. -func (c *MemberStaleResCleanupController) cleanUpStaleResourceExportsOnLeader(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { +func (c *StaleResCleanupController) cleanUpStaleResourceExportsOnLeader(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { if err := c.cleanUpClusterInfoResourceExports(ctx, commonArea); err != nil { return err } @@ -183,7 +165,7 @@ func (c *MemberStaleResCleanupController) cleanUpStaleResourceExportsOnLeader(ct return nil } -func (c *MemberStaleResCleanupController) cleanUpStaleServiceResources(ctx context.Context, svcImpList *k8smcv1alpha1.ServiceImportList, +func (c *StaleResCleanupController) cleanUpStaleServiceResources(ctx context.Context, svcImpList *k8smcv1alpha1.ServiceImportList, svcList *corev1.ServiceList, resImpList *mcv1alpha1.ResourceImportList) error { svcImpItems := map[string]k8smcv1alpha1.ServiceImport{} for _, svcImp := range svcImpList.Items { @@ -220,7 +202,7 @@ func (c *MemberStaleResCleanupController) cleanUpStaleServiceResources(ctx conte return nil } -func (c *MemberStaleResCleanupController) cleanUpACNPResources(ctx context.Context, acnpList *crdv1beta1.ClusterNetworkPolicyList, +func (c *StaleResCleanupController) cleanUpACNPResources(ctx context.Context, acnpList *crdv1beta1.ClusterNetworkPolicyList, resImpList *mcv1alpha1.ResourceImportList) error { staleMCACNPItems := map[string]crdv1beta1.ClusterNetworkPolicy{} for _, acnp := range acnpList.Items { @@ -244,7 +226,7 @@ func (c *MemberStaleResCleanupController) cleanUpACNPResources(ctx context.Conte return nil } -func (c *MemberStaleResCleanupController) cleanUpClusterInfoImports(ctx context.Context, ciImpList *mcv1alpha1.ClusterInfoImportList, +func (c *StaleResCleanupController) cleanUpClusterInfoImports(ctx context.Context, ciImpList *mcv1alpha1.ClusterInfoImportList, resImpList *mcv1alpha1.ResourceImportList) error { staleCIImps := map[string]mcv1alpha1.ClusterInfoImport{} for _, item := range ciImpList.Items { @@ -265,7 +247,7 @@ func (c *MemberStaleResCleanupController) cleanUpClusterInfoImports(ctx context. return nil } -func (c *MemberStaleResCleanupController) cleanUpLabelIdentities(ctx context.Context, labelIdentityList *mcv1alpha1.LabelIdentityList, +func (c *StaleResCleanupController) cleanUpLabelIdentities(ctx context.Context, labelIdentityList *mcv1alpha1.LabelIdentityList, resImpList *mcv1alpha1.ResourceImportList) error { staleLabelIdentities := map[string]mcv1alpha1.LabelIdentity{} for _, labelIdentityObj := range labelIdentityList.Items { @@ -286,7 +268,7 @@ func (c *MemberStaleResCleanupController) cleanUpLabelIdentities(ctx context.Con // cleanUpServiceResourceExports removes any Service/Endpoint kind of ResourceExports when there is no // corresponding ServiceExport in the local cluster. -func (c *MemberStaleResCleanupController) cleanUpServiceResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea, resExpList *mcv1alpha1.ResourceExportList) error { +func (c *StaleResCleanupController) cleanUpServiceResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea, resExpList *mcv1alpha1.ResourceExportList) error { svcExpList := &k8smcv1alpha1.ServiceExportList{} if err := c.List(ctx, svcExpList, &client.ListOptions{}); err != nil { return err @@ -319,7 +301,7 @@ func (c *MemberStaleResCleanupController) cleanUpServiceResourceExports(ctx cont return nil } -func (c *MemberStaleResCleanupController) cleanUpLabelIdentityResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea, resExpList *mcv1alpha1.ResourceExportList) error { +func (c *StaleResCleanupController) cleanUpLabelIdentityResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea, resExpList *mcv1alpha1.ResourceExportList) error { podList, nsList := &corev1.PodList{}, &corev1.NamespaceList{} if err := c.List(ctx, podList, &client.ListOptions{}); err != nil { return err @@ -363,7 +345,7 @@ func (c *MemberStaleResCleanupController) cleanUpLabelIdentityResourceExports(ct // cleanUpClusterInfoResourceExports removes any ClusterInfo kind of ResourceExports when there is no // Gateway in the local cluster. -func (c *MemberStaleResCleanupController) cleanUpClusterInfoResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { +func (c *StaleResCleanupController) cleanUpClusterInfoResourceExports(ctx context.Context, commonArea commonarea.RemoteCommonArea) error { var gws mcv1alpha1.GatewayList if err := c.Client.List(ctx, &gws, &client.ListOptions{}); err != nil { return err @@ -384,96 +366,155 @@ func (c *MemberStaleResCleanupController) cleanUpClusterInfoResourceExports(ctx return nil } -// Enqueue will be called after MemberStaleResCleanupController is initialized. -func (c *MemberStaleResCleanupController) Enqueue() { - // The key can be anything as we only have single item. - c.queue.Add("key") +// Run starts the StaleResCleanupController and blocks until stopCh is closed. +func (c *StaleResCleanupController) Run(stopCh <-chan struct{}) { + klog.InfoS("Starting StaleResCleanupController") + defer klog.InfoS("Shutting down StaleResCleanupController") + + ctx, _ := wait.ContextForChannel(stopCh) + retry.OnError(common.CleanUpRetry, func(err error) bool { return true }, + func() error { + return c.CleanUp(ctx) + }) + + go func() { + for range c.commonAreaCreationCh { + retry.OnError(common.CleanUpRetry, func(err error) bool { return true }, + func() error { + if err := c.cleanUpStaleResources(ctx); err != nil { + klog.ErrorS(err, "Failed to clean up stale resources after a ClusterSet is created, will retry later") + return err + } + return nil + }) + } + }() + <-stopCh } -// Run starts the MemberStaleResCleanupController and blocks until stopCh is closed. -// it will run only once to clean up stale resources if no error happens. -func (c *MemberStaleResCleanupController) Run(stopCh <-chan struct{}) { - defer c.queue.ShutDown() - - klog.InfoS("Starting MemberStaleResCleanupController") - defer klog.InfoS("Shutting down MemberStaleResCleanupController") - - ctx, cancel := wait.ContextForChannel(stopCh) +func (c *StaleResCleanupController) cleanUpStaleResources(ctx context.Context) error { + var err error + var commonArea commonarea.RemoteCommonArea + commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID() + if err != nil { + return err + } - if err := c.CleanUp(ctx); err != nil { - c.Enqueue() - go wait.UntilWithContext(ctx, func(ctx context.Context) { - c.runWorker(ctx, cancel) - }, 5*time.Second) + klog.InfoS("Clean up all stale imported and exported resources created by Antrea Multi-cluster Controller") + if err = c.cleanUpStaleResourcesOnMember(ctx, commonArea); err != nil { + return err } - <-stopCh + // Clean up stale ResourceExports in the leader cluster for a member cluster. + if err := c.cleanUpStaleResourceExportsOnLeader(ctx, commonArea); err != nil { + return err + } + return nil } -func (c *MemberStaleResCleanupController) runWorker(ctx context.Context, cancel context.CancelFunc) { - for c.processNextWorkItem(ctx, cancel) { +func cleanUpResourcesCreatedByMC(ctx context.Context, mgrClient client.Client) error { + var err error + if err = cleanUpMCServicesAndServiceImports(ctx, mgrClient); err != nil { + return err + } + if err = cleanUpReplicatedACNPs(ctx, mgrClient); err != nil { + return err } + if err = cleanUpLabelIdentities(ctx, mgrClient); err != nil { + return err + } + if err = cleanUpClusterInfoImports(ctx, mgrClient); err != nil { + return err + } + if err = cleanUpGateways(ctx, mgrClient); err != nil { + return err + } + return nil } -func (c *MemberStaleResCleanupController) processNextWorkItem(ctx context.Context, cancel context.CancelFunc) bool { - key, quit := c.queue.Get() - if quit { - return false +func cleanUpMCServicesAndServiceImports(ctx context.Context, mgrClient client.Client) error { + svcImpList := &k8smcv1alpha1.ServiceImportList{} + err := mgrClient.List(ctx, svcImpList, &client.ListOptions{}) + if err != nil { + return err } - defer c.queue.Done(key) - err := c.CleanUp(ctx) - if err == nil { - c.queue.Forget(key) - cancel() - return false + for _, svcImp := range svcImpList.Items { + svcImpTmp := svcImp + mcsvc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: svcImp.Namespace, + Name: common.ToMCResourceName(svcImp.Name), + }, + } + err = mgrClient.Delete(ctx, mcsvc, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + err = mgrClient.Delete(ctx, &svcImpTmp, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } } - - klog.ErrorS(err, "Error removing stale resources, re-queuing it") - c.queue.AddRateLimited(key) - return true + return nil } -// Reconcile ClusterSet changes -func (c *MemberStaleResCleanupController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - var err error - var commonArea commonarea.RemoteCommonArea - commonArea, c.localClusterID, err = c.commonAreaGetter.GetRemoteCommonAreaAndLocalID() - if err != nil { - return ctrl.Result{}, err - } - // When the commonArea is not nil, the ClusterSet is ready since we ignore creation event - // and filter other status from update event. - if commonArea != nil { - klog.InfoS("Clean up all stale imported and exported resources created by Antrea Multi-cluster Controller", - "clusterset", req.NamespacedName) - if err = c.cleanUpStaleResourcesOnMember(ctx, commonArea); err != nil { - return ctrl.Result{}, err +func cleanUpReplicatedACNPs(ctx context.Context, mgrClient client.Client) error { + acnpList := &crdv1beta1.ClusterNetworkPolicyList{} + if err := mgrClient.List(ctx, acnpList, &client.ListOptions{}); err != nil { + return err + } + for _, acnp := range acnpList.Items { + acnpTmp := acnp + if metav1.HasAnnotation(acnp.ObjectMeta, common.AntreaMCACNPAnnotation) { + err := mgrClient.Delete(ctx, &acnpTmp, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } } - if err = c.cleanUpStaleResourceExportsOnLeader(ctx, commonArea); err != nil { - return ctrl.Result{}, err + } + return nil +} + +func cleanUpLabelIdentities(ctx context.Context, mgrClient client.Client) error { + labelIdentityList := &mcv1alpha1.LabelIdentityList{} + if err := mgrClient.List(ctx, labelIdentityList, &client.ListOptions{}); err != nil { + return err + } + for _, labelIdt := range labelIdentityList.Items { + labelIdtTmp := labelIdt + err := mgrClient.Delete(ctx, &labelIdtTmp, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err } } - return ctrl.Result{}, nil + return nil } -var ( - statusReadyPredicate = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - return false - }, - UpdateFunc: common.StatusReadyPredicate, - DeleteFunc: func(e event.DeleteEvent) bool { - return false - }, +func cleanUpClusterInfoImports(ctx context.Context, mgrClient client.Client) error { + ciImpList := &mcv1alpha1.ClusterInfoImportList{} + if err := mgrClient.List(ctx, ciImpList, &client.ListOptions{}); err != nil { + return err } -) + for _, ciImp := range ciImpList.Items { + ciImpTmp := ciImp + err := mgrClient.Delete(ctx, &ciImpTmp, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + return nil +} -// SetupWithManager sets up the controller with the Manager. -func (r *MemberStaleResCleanupController) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&mcv1alpha2.ClusterSet{}). - WithEventFilter(statusReadyPredicate). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 1, - }). - Complete(r) +func cleanUpGateways(ctx context.Context, mgrClient client.Client) error { + gwList := &mcv1alpha1.GatewayList{} + if err := mgrClient.List(ctx, gwList, &client.ListOptions{}); err != nil { + return err + } + for _, gw := range gwList.Items { + gwTmp := gw + err := mgrClient.Delete(ctx, &gwTmp, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + return nil } diff --git a/multicluster/controllers/multicluster/member/stale_controller_test.go b/multicluster/controllers/multicluster/member/stale_controller_test.go index ecb3659f48b..e9df72aab6a 100644 --- a/multicluster/controllers/multicluster/member/stale_controller_test.go +++ b/multicluster/controllers/multicluster/member/stale_controller_test.go @@ -21,13 +21,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/reconcile" k8smcv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "antrea.io/antrea/multicluster/apis/multicluster/constants" @@ -105,10 +104,10 @@ func TestStaleController_CleanUpService(t *testing.T) { WithLists(tt.existSvcList, tt.existSvcImpList).Build() fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existingResImpList).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) - if err := c.CleanUp(ctx); err != nil { + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + if err := c.cleanUpStaleResources(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale Service and ServiceImport but got err = %v", err) } ctx := context.TODO() @@ -200,10 +199,10 @@ func TestStaleController_CleanUpACNP(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existingResImpList).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) - if err := c.CleanUp(ctx); err != nil { + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + if err := c.cleanUpStaleResources(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ACNPs but got err = %v", err) } ctx := context.TODO() @@ -438,10 +437,10 @@ func TestStaleController_CleanUpResourceExports(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existResExpList).Build() commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) - if err := c.CleanUp(ctx); err != nil { + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + if err := c.cleanUpStaleResources(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ResourceExports but got err = %v", err) } resExpList := &mcv1alpha1.ResourceExportList{} @@ -515,10 +514,10 @@ func TestStaleController_CleanUpClusterInfoImports(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existingResImpList).Build() commonarea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "antrea-mcs", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonarea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) - if err := c.CleanUp(ctx); err != nil { + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + if err := c.cleanUpStaleResources(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale ClusterInfoImport but got err = %v", err) } ctx := context.TODO() @@ -598,10 +597,10 @@ func TestStaleController_CleanUpLabelIdentites(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(tt.existingResImpList).Build() ca := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "antrea-mcs", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(ca) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) - if err := c.CleanUp(ctx); err != nil { + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) + if err := c.cleanUpStaleResources(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale LabelIdentities but got err = %v", err) } ctx := context.TODO() @@ -624,59 +623,164 @@ func TestStaleController_CleanupAllWithEmptyClusterSet(t *testing.T) { fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).Build() commonarea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "antrea-mcs", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) + mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonarea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) + c := NewStaleResCleanupController(fakeClient, common.TestScheme, make(chan struct{}), "default", mcReconciler) if err := c.CleanUp(ctx); err != nil { t.Errorf("StaleController.cleanup() should clean up all stale resources but got err = %v", err) } } -func TestStaleController_Reconcile(t *testing.T) { - toDeleteSvcResExport := &mcv1alpha1.ResourceExport{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "antrea-mcs", - Name: "cluster-a-default-nginx-service", - Labels: map[string]string{ - constants.SourceClusterID: "cluster-a", +func TestCleanUpMCServiceAndServiceImport(t *testing.T) { + existingSVCs := &corev1.ServiceList{ + Items: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "svc-a", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "antrea-mc-svc-b", + }, }, }, - Spec: mcv1alpha1.ResourceExportSpec{ - Name: "nginx", - Namespace: "default", - Kind: constants.ServiceKind, - ClusterID: "cluster-a", + } + existingSVCImports := &k8smcv1alpha1.ServiceImportList{ + Items: []k8smcv1alpha1.ServiceImport{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "svc-b", + }, + }, }, } - acnp1 := &v1beta1.ClusterNetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: common.AntreaMCSPrefix + acnpImportName, - Annotations: map[string]string{common.AntreaMCACNPAnnotation: "true"}, + + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(existingSVCImports, existingSVCs).Build() + ctx := context.Background() + err := cleanUpMCServicesAndServiceImports(ctx, fakeClient) + require.NoError(t, err) + actualSvcList := &corev1.ServiceList{} + err = fakeClient.List(ctx, actualSvcList) + require.NoError(t, err) + assert.Equal(t, 1, len(actualSvcList.Items)) + + actualSvcImpList := &k8smcv1alpha1.ServiceImportList{} + err = fakeClient.List(ctx, actualSvcImpList) + require.NoError(t, err) + assert.Equal(t, 0, len(actualSvcImpList.Items)) +} + +func TestCleanUpReplicatedACNP(t *testing.T) { + acnpList := &v1beta1.ClusterNetworkPolicyList{ + Items: []v1beta1.ClusterNetworkPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "acnp-1", + Annotations: map[string]string{ + common.AntreaMCACNPAnnotation: "true", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "acnp-2", + }, + }, }, } - fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(acnp1).Build() - fakeRemoteClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(toDeleteSvcResExport).Build() - commonarea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "antrea-mcs", nil) - mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false) - mcReconciler.SetRemoteCommonArea(commonarea) - c := NewMemberStaleResCleanupController(fakeClient, common.TestScheme, "default", mcReconciler) + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(acnpList).Build() ctx := context.Background() - c.Reconcile(ctx, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: "default", - Name: "test-clusterset", - }}) + err := cleanUpReplicatedACNPs(ctx, fakeClient) + require.NoError(t, err) + + actualACNPList := &v1beta1.ClusterNetworkPolicyList{} + err = fakeClient.List(ctx, actualACNPList, &client.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, 1, len(actualACNPList.Items)) +} + +func TestCleanUpLabelIdentities(t *testing.T) { + labelIdentityList := &mcv1alpha1.LabelIdentityList{ + Items: []mcv1alpha1.LabelIdentity{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "labelidt-1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "labelidt-2", + }, + }, + }, + } + + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(labelIdentityList).Build() + ctx := context.Background() + err := cleanUpLabelIdentities(ctx, fakeClient) + require.NoError(t, err) + + actualIdtList := &mcv1alpha1.LabelIdentityList{} + err = fakeClient.List(ctx, actualIdtList, &client.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, 0, len(actualIdtList.Items)) +} - acnpList := &v1beta1.ClusterNetworkPolicyList{} - if err := fakeClient.List(ctx, acnpList, &client.ListOptions{}); err != nil { - t.Errorf("Error when listing the ACNPs after cleanup") +func TestCleanUpClusterInfoImport(t *testing.T) { + ciImpList := &mcv1alpha1.ClusterInfoImportList{ + Items: []mcv1alpha1.ClusterInfoImport{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "cluster-1-import", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "cluster-2-import", + }, + }, + }, } - resExpList := &mcv1alpha1.ResourceExportList{} - err := fakeRemoteClient.List(context.TODO(), resExpList, &client.ListOptions{}) - if err != nil { - t.Errorf("Error when listing the ResourceExports after cleanup") + + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(ciImpList).Build() + ctx := context.Background() + err := cleanUpClusterInfoImports(ctx, fakeClient) + require.NoError(t, err) + + actualCIImpList := &mcv1alpha1.ClusterInfoImportList{} + err = fakeClient.List(ctx, actualCIImpList, &client.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, 0, len(actualCIImpList.Items)) +} + +func TestCleanUpGateway(t *testing.T) { + gwList := &mcv1alpha1.GatewayList{ + Items: []mcv1alpha1.Gateway{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "gw-1", + }, + }, + }, } - assert.Equal(t, 0, len(acnpList.Items), "failed to clean up stale ACNP") - assert.Equal(t, 0, len(resExpList.Items), "failed to clean up stale ResourceExport") + + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithLists(gwList).Build() + ctx := context.Background() + err := cleanUpGateways(ctx, fakeClient) + require.NoError(t, err) + + actualGWList := &mcv1alpha1.ClusterInfoImportList{} + err = fakeClient.List(ctx, actualGWList, &client.ListOptions{}) + require.NoError(t, err) + assert.Equal(t, 0, len(actualGWList.Items)) } diff --git a/multicluster/test/integration/suite_test.go b/multicluster/test/integration/suite_test.go index 16c050a82e3..fc8296e2d8c 100644 --- a/multicluster/test/integration/suite_test.go +++ b/multicluster/test/integration/suite_test.go @@ -137,12 +137,14 @@ var _ = BeforeSuite(func() { k8sClient.Create(ctx, leaderNS) k8sClient.Create(ctx, testNS) k8sClient.Create(ctx, testNSStale) + commonAreaCreationCh := make(chan struct{}) clusterSetReconciler := member.NewMemberClusterSetReconciler( k8sManager.GetClient(), k8sManager.GetScheme(), LeaderNamespace, false, false, + commonAreaCreationCh, ) err = clusterSetReconciler.SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) @@ -161,9 +163,10 @@ var _ = BeforeSuite(func() { // configureClusterSet finishes By("Creating StaleController") - staleController := member.NewMemberStaleResCleanupController( + staleController := member.NewStaleResCleanupController( k8sManager.GetClient(), k8sManager.GetScheme(), + commonAreaCreationCh, "default", clusterSetReconciler, ) diff --git a/pkg/config/controller/config.go b/pkg/config/controller/config.go index 7ccf43b26cd..a2f0a4c343f 100644 --- a/pkg/config/controller/config.go +++ b/pkg/config/controller/config.go @@ -83,7 +83,7 @@ type MulticlusterConfig struct { // Enable Multi-cluster NetworkPolicy, including ingress rules that select peers from all // clusters in a ClusterSet, and egress rules that select Multi-cluster Services. EnableStretchedNetworkPolicy bool `yaml:"enableStretchedNetworkPolicy,omitempty"` - // The Namespace where the Antrea Multi-cluster controller is running. + // The Namespace where the Antrea Multi-cluster Controller is running. // The default is antrea-agent's Namespace. Namespace string `yaml:"namespace,omitempty"` }