diff --git a/multicluster/cmd/multicluster-controller/leader.go b/multicluster/cmd/multicluster-controller/leader.go index 83b80d2acc8..7842e4f0b2c 100644 --- a/multicluster/cmd/multicluster-controller/leader.go +++ b/multicluster/cmd/multicluster-controller/leader.go @@ -51,7 +51,8 @@ func newLeaderCommand() *cobra.Command { func runLeader(o *Options) error { // on the leader we want the reconciler to run for a given Namespace instead of cluster scope - o.options.Namespace = env.GetPodNamespace() + podNamespace := env.GetPodNamespace() + o.options.Namespace = podNamespace stopCh := signals.RegisterSignalHandlers() mgr, err := setupManagerAndCertControllerFunc(true, o) @@ -59,13 +60,15 @@ func runLeader(o *Options) error { return err } + mgrClient := mgr.GetClient() + mgrScheme := mgr.GetScheme() memberClusterStatusManager := leader.NewMemberClusterAnnounceReconciler( - mgr.GetClient(), mgr.GetScheme()) + mgrClient, mgrScheme) if err = memberClusterStatusManager.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating MemberClusterAnnounce controller: %v", err) } - noCachedClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()}) + noCachedClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgrScheme, Mapper: mgr.GetRESTMapper()}) if err != nil { return err } @@ -73,7 +76,7 @@ func runLeader(o *Options) error { hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-memberclusterannounce", &webhook.Admission{Handler: &memberClusterAnnounceValidator{ Client: noCachedClient, - namespace: env.GetPodNamespace()}}) + namespace: podNamespace}}) hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha2-clusterset", &webhook.Admission{Handler: &clusterSetValidator{ @@ -83,8 +86,8 @@ func runLeader(o *Options) error { }) clusterSetReconciler := &leader.LeaderClusterSetReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgrClient, + Scheme: mgrScheme, StatusManager: memberClusterStatusManager, ClusterCalimCRDAvailable: o.ClusterCalimCRDAvailable, } @@ -93,16 +96,16 @@ func runLeader(o *Options) error { } resExportReconciler := &leader.ResourceExportReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme()} + Client: mgrClient, + Scheme: mgrScheme} if err = resExportReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating ResourceExport controller: %v", err) } if o.EnableStretchedNetworkPolicy { labelExportReconciler := leader.NewLabelIdentityExportReconciler( - mgr.GetClient(), - mgr.GetScheme(), - env.GetPodNamespace()) + mgrClient, + mgrScheme, + podNamespace) if err = labelExportReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating LabelIdentityExport controller: %v", err) } diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index 896ded60020..274c093efa9 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -54,18 +54,20 @@ func runMember(o *Options) error { if err != nil { return err } - + mgrClient := mgr.GetClient() + mgrScheme := mgr.GetScheme() + podNamespace := env.GetPodNamespace() stopCh := signals.RegisterSignalHandlers() hookServer := mgr.GetWebhookServer() hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-gateway", &webhook.Admission{Handler: &gatewayValidator{ - Client: mgr.GetClient(), - namespace: env.GetPodNamespace()}}) + Client: mgrClient, + namespace: podNamespace}}) hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha2-clusterset", &webhook.Admission{Handler: &clusterSetValidator{ - Client: mgr.GetClient(), - namespace: env.GetPodNamespace(), + Client: mgrClient, + namespace: podNamespace, role: memberRole}, }) @@ -83,20 +85,22 @@ func runMember(o *Options) error { commonAreaGetter := clusterSetReconciler svcExportReconciler := member.NewServiceExportReconciler( - mgr.GetClient(), - mgr.GetScheme(), + mgrClient, + mgrScheme, commonAreaGetter, o.EndpointIPType, o.EnableEndpointSlice, + podNamespace, ) if err = svcExportReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating ServiceExport controller: %v", err) } if o.EnableStretchedNetworkPolicy { labelIdentityReconciler := member.NewLabelIdentityReconciler( - mgr.GetClient(), - mgr.GetScheme(), - commonAreaGetter) + mgrClient, + mgrScheme, + commonAreaGetter, + podNamespace) if err = labelIdentityReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("error creating LabelIdentity controller: %v", err) } @@ -104,9 +108,9 @@ func runMember(o *Options) error { } gwReconciler := member.NewGatewayReconciler( - mgr.GetClient(), - mgr.GetScheme(), - env.GetPodNamespace(), + mgrClient, + mgrScheme, + podNamespace, opts.PodCIDRs, commonAreaGetter) if err = gwReconciler.SetupWithManager(mgr); err != nil { @@ -114,9 +118,9 @@ func runMember(o *Options) error { } nodeReconciler := member.NewNodeReconciler( - mgr.GetClient(), - mgr.GetScheme(), - env.GetPodNamespace(), + mgrClient, + mgrScheme, + podNamespace, opts.ServiceCIDR, opts.GatewayIPPrecedence) if err = nodeReconciler.SetupWithManager(mgr); err != nil { diff --git a/multicluster/controllers/multicluster/member/gateway_controller.go b/multicluster/controllers/multicluster/member/gateway_controller.go index b80d4c19d68..2af87ec40c0 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller.go +++ b/multicluster/controllers/multicluster/member/gateway_controller.go @@ -18,18 +18,25 @@ package member import ( "context" + "sync/atomic" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -44,6 +51,7 @@ type ( localClusterID string podCIDRs []string leaderNamespace string + clusterSetReady *atomic.Value } ) @@ -55,12 +63,15 @@ func NewGatewayReconciler( namespace string, podCIDRs []string, commonAreaGetter commonarea.RemoteCommonAreaGetter) *GatewayReconciler { + clusterSetReady := &atomic.Value{} + clusterSetReady.Store(false) reconciler := &GatewayReconciler{ Client: client, Scheme: scheme, namespace: namespace, podCIDRs: podCIDRs, commonAreaGetter: commonAreaGetter, + clusterSetReady: clusterSetReady, } return reconciler } @@ -73,6 +84,11 @@ func NewGatewayReconciler( //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=clusterinfoimports/finalizers,verbs=update func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.checkClusterSetReady() { + klog.InfoS("Skip reconciling Gateway since there is no connection to the leader") + return ctrl.Result{}, nil + } + klog.V(2).InfoS("Reconciling Gateway", "gateway", req.NamespacedName) var err error var commonArea commonarea.RemoteCommonArea @@ -87,15 +103,15 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct Name: resExportName, Namespace: r.leaderNamespace, } - resExport := &mcsv1alpha1.ResourceExport{ + resExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: resExportName, Namespace: r.leaderNamespace, }, } - createOrUpdate := func(gateway *mcsv1alpha1.Gateway) error { - existingResExport := &mcsv1alpha1.ResourceExport{} + createOrUpdate := func(gateway *mcv1alpha1.Gateway) error { + existingResExport := &mcv1alpha1.ResourceExport{} err := commonArea.Get(ctx, resExportNamespacedName, existingResExport) if err != nil && !apierrors.IsNotFound(err) { return err @@ -114,7 +130,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return nil } - gw := &mcsv1alpha1.Gateway{} + gw := &mcv1alpha1.Gateway{} if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil { if !apierrors.IsNotFound(err) { return ctrl.Result{}, err @@ -132,8 +148,8 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request, - commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gw *mcsv1alpha1.Gateway) error { - resExportSpec := mcsv1alpha1.ResourceExportSpec{ + commonArea commonarea.RemoteCommonArea, existingResExport *mcv1alpha1.ResourceExport, gw *mcv1alpha1.Gateway) error { + resExportSpec := mcv1alpha1.ResourceExportSpec{ Kind: constants.ClusterInfoKind, ClusterID: r.localClusterID, Name: r.localClusterID, @@ -150,15 +166,15 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R } func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request, - commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error { - resExportSpec := mcsv1alpha1.ResourceExportSpec{ + commonArea commonarea.RemoteCommonArea, gateway *mcv1alpha1.Gateway) error { + resExportSpec := mcv1alpha1.ResourceExportSpec{ Kind: constants.ClusterInfoKind, ClusterID: r.localClusterID, Name: r.localClusterID, Namespace: r.namespace, } resExportSpec.ClusterInfo = r.getClusterInfo(gateway) - resExport := &mcsv1alpha1.ResourceExport{ + resExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.leaderNamespace, Name: common.NewClusterInfoResourceExportName(r.localClusterID), @@ -176,7 +192,9 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R // SetupWithManager sets up the controller with the Manager. func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&mcsv1alpha1.Gateway{}). + For(&mcv1alpha1.Gateway{}). + Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc), + builder.WithPredicates(statusReadyPredicate)). WithOptions(controller.Options{ // TODO: add a lock for r.serviceCIDR and r.localClusterID if // there is any plan to increase this concurrent number. @@ -185,19 +203,56 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *GatewayReconciler) getClusterInfo(gateway *mcsv1alpha1.Gateway) *mcsv1alpha1.ClusterInfo { - clusterInfo := &mcsv1alpha1.ClusterInfo{ +func (r *GatewayReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request { + clusterSet := &mcv1alpha2.ClusterSet{} + requests := []reconcile.Request{} + if a.GetNamespace() != r.namespace { + return requests + } + ctx := context.TODO() + err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clusterSet) + if err == nil { + if len(clusterSet.Status.Conditions) > 0 && clusterSet.Status.Conditions[0].Status == v1.ConditionTrue { + r.setClusterSetReady(true) + gwList := &mcv1alpha1.GatewayList{} + r.Client.List(ctx, gwList, &client.ListOptions{Namespace: r.namespace}) + requests = make([]reconcile.Request, len(gwList.Items)) + for i, gw := range gwList.Items { + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: gw.Namespace, + Name: gw.Name, + }, + } + } + } + } else if apierrors.IsNotFound(err) { + r.setClusterSetReady(false) + } + return requests +} + +func (r *GatewayReconciler) checkClusterSetReady() bool { + return r.clusterSetReady.Load().(bool) +} + +func (r *GatewayReconciler) setClusterSetReady(clusterSetReady bool) { + r.clusterSetReady.Store(clusterSetReady) +} + +func (r *GatewayReconciler) getClusterInfo(gateway *mcv1alpha1.Gateway) *mcv1alpha1.ClusterInfo { + clusterInfo := &mcv1alpha1.ClusterInfo{ ClusterID: r.localClusterID, ServiceCIDR: gateway.ServiceCIDR, PodCIDRs: r.podCIDRs, - GatewayInfos: []mcsv1alpha1.GatewayInfo{ + GatewayInfos: []mcv1alpha1.GatewayInfo{ { GatewayIP: gateway.GatewayIP, }, }, } if gateway.WireGuard != nil && gateway.WireGuard.PublicKey != "" { - clusterInfo.WireGuard = &mcsv1alpha1.WireGuardInfo{ + clusterInfo.WireGuard = &mcv1alpha1.WireGuardInfo{ PublicKey: gateway.WireGuard.PublicKey, } } diff --git a/multicluster/controllers/multicluster/member/gateway_controller_test.go b/multicluster/controllers/multicluster/member/gateway_controller_test.go index 433872e29af..a9ec69edeb8 100644 --- a/multicluster/controllers/multicluster/member/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/member/gateway_controller_test.go @@ -22,15 +22,18 @@ import ( "time" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -41,7 +44,7 @@ var ( gw1CreationTime = metav1.NewTime(time.Now()) - gwNode1 = mcsv1alpha1.Gateway{ + gwNode1 = mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Namespace: "default", @@ -51,19 +54,19 @@ var ( InternalIP: "172.11.10.1", } - existingResExport = &mcsv1alpha1.ResourceExport{ + existingResExport = &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-a-clusterinfo", Namespace: common.LeaderNamespace, }, - Spec: mcsv1alpha1.ResourceExportSpec{ + Spec: mcv1alpha1.ResourceExportSpec{ Name: clusterID, Namespace: "default", Kind: constants.ClusterInfoKind, - ClusterInfo: &mcsv1alpha1.ClusterInfo{ + ClusterInfo: &mcv1alpha1.ClusterInfo{ ServiceCIDR: serviceCIDR, ClusterID: clusterID, - GatewayInfos: []mcsv1alpha1.GatewayInfo{ + GatewayInfos: []mcv1alpha1.GatewayInfo{ { GatewayIP: "10.10.10.10", }, @@ -81,9 +84,9 @@ func TestGatewayReconciler(t *testing.T) { tests := []struct { name string namespacedName types.NamespacedName - gateway []mcsv1alpha1.Gateway - resExport *mcsv1alpha1.ResourceExport - expectedInfo []mcsv1alpha1.GatewayInfo + gateway []mcv1alpha1.Gateway + resExport *mcv1alpha1.ResourceExport + expectedInfo []mcv1alpha1.GatewayInfo expectedErr string isDelete bool }{ @@ -93,10 +96,10 @@ func TestGatewayReconciler(t *testing.T) { Namespace: "default", Name: "node-1", }, - gateway: []mcsv1alpha1.Gateway{ + gateway: []mcv1alpha1.Gateway{ gwNode1, }, - expectedInfo: []mcsv1alpha1.GatewayInfo{ + expectedInfo: []mcv1alpha1.GatewayInfo{ { GatewayIP: "10.10.10.10", }, @@ -108,7 +111,7 @@ func TestGatewayReconciler(t *testing.T) { Namespace: "default", Name: "node-1", }, - gateway: []mcsv1alpha1.Gateway{ + gateway: []mcv1alpha1.Gateway{ gwNode1, }, resExport: staleExistingResExport, @@ -120,11 +123,11 @@ func TestGatewayReconciler(t *testing.T) { Namespace: "default", Name: "node-1", }, - gateway: []mcsv1alpha1.Gateway{ + gateway: []mcv1alpha1.Gateway{ gwNode1New, }, resExport: existingResExport, - expectedInfo: []mcsv1alpha1.GatewayInfo{ + expectedInfo: []mcv1alpha1.GatewayInfo{ { GatewayIP: "10.10.10.12", }, @@ -157,6 +160,7 @@ func TestGatewayReconciler(t *testing.T) { mcReconciler.SetRemoteCommonArea(commonArea) commonAreaGatter := mcReconciler r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, commonAreaGatter) + r.setClusterSetReady(true) t.Run(tt.name, func(t *testing.T) { req := ctrl.Request{NamespacedName: tt.namespacedName} if _, err := r.Reconcile(common.TestCtx, req); err != nil { @@ -166,7 +170,7 @@ func TestGatewayReconciler(t *testing.T) { t.Errorf("Gateway Reconciler should handle ResourceExports events successfully but got error = %v", err) } } else { - ciExport := mcsv1alpha1.ResourceExport{} + ciExport := mcv1alpha1.ResourceExport{} ciExportName := types.NamespacedName{ Namespace: common.LeaderNamespace, Name: common.NewClusterInfoResourceExportName(common.LocalClusterID), @@ -189,29 +193,80 @@ func TestGatewayReconciler(t *testing.T) { func TestGetClusterInfo(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build() r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, nil) - gw := &mcsv1alpha1.Gateway{ + gw := &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "gw", }, ServiceCIDR: "10.100.0.0/16", GatewayIP: "10.10.1.1", InternalIP: "10.10.1.1", - WireGuard: &mcsv1alpha1.WireGuardInfo{ + WireGuard: &mcv1alpha1.WireGuardInfo{ PublicKey: "key", }, } - expectedClusterInfo := &mcsv1alpha1.ClusterInfo{ - GatewayInfos: []mcsv1alpha1.GatewayInfo{ + expectedClusterInfo := &mcv1alpha1.ClusterInfo{ + GatewayInfos: []mcv1alpha1.GatewayInfo{ { GatewayIP: "10.10.1.1", }, }, ServiceCIDR: "10.100.0.0/16", PodCIDRs: []string{"10.200.1.1/16"}, - WireGuard: &mcsv1alpha1.WireGuardInfo{ + WireGuard: &mcv1alpha1.WireGuardInfo{ PublicKey: "key", }, } assert.Equal(t, expectedClusterInfo, r.getClusterInfo(gw)) } + +func TestClusterSetMapFunc_Gateway(t *testing.T) { + clusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{ + { + Status: corev1.ConditionTrue, + Type: mcv1alpha2.ClusterSetReady, + }, + }, + }, + } + + deletedClusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test-deleted", + }, + } + gw1 := &mcv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gw-1", + Namespace: "default", + }, + } + expectedReqs := []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: gw1.GetName(), + Namespace: gw1.GetNamespace(), + }, + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(clusterSet, gw1).Build() + r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, nil) + r.setClusterSetReady(true) + requests := r.clusterSetMapFunc(clusterSet) + assert.Equal(t, expectedReqs, requests) + + requests = r.clusterSetMapFunc(deletedClusterSet) + assert.Equal(t, []reconcile.Request{}, requests) + assert.Equal(t, false, r.checkClusterSetReady()) + + r = NewGatewayReconciler(fakeClient, common.TestScheme, "mismatch_ns", []string{"10.200.1.1/16"}, nil) + requests = r.clusterSetMapFunc(clusterSet) + assert.Equal(t, []reconcile.Request{}, requests) +} diff --git a/multicluster/controllers/multicluster/member/labelidentity_controller.go b/multicluster/controllers/multicluster/member/labelidentity_controller.go index cda8263469f..39bd2a6c05f 100644 --- a/multicluster/controllers/multicluster/member/labelidentity_controller.go +++ b/multicluster/controllers/multicluster/member/labelidentity_controller.go @@ -19,6 +19,7 @@ package member import ( "context" "sync" + "sync/atomic" "time" v1 "k8s.io/api/core/v1" @@ -41,7 +42,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -56,6 +58,7 @@ type ( commonAreaMutex sync.Mutex commonAreaGetter commonarea.RemoteCommonAreaGetter remoteCommonArea commonarea.RemoteCommonArea + namespace string // labelMutex prevents concurrent access to labelToPodsCache and podLabelCache. // It also prevents concurrent updates to labelExportUpdatesInProgress. labelMutex sync.RWMutex @@ -65,22 +68,28 @@ type ( podLabelCache map[string]string // labelQueue maintains the normalized labels whose corresponding ResourceExport objects are // determined to be created/deleted by the reconciler. - labelQueue workqueue.RateLimitingInterface - localClusterID string + labelQueue workqueue.RateLimitingInterface + localClusterID string + clusterSetReady *atomic.Value } ) func NewLabelIdentityReconciler( client client.Client, scheme *runtime.Scheme, - commonAreaGetter commonarea.RemoteCommonAreaGetter) *LabelIdentityReconciler { + commonAreaGetter commonarea.RemoteCommonAreaGetter, + namespace string) *LabelIdentityReconciler { + clusterSetReady := &atomic.Value{} + clusterSetReady.Store(false) return &LabelIdentityReconciler{ Client: client, Scheme: scheme, + namespace: namespace, commonAreaGetter: commonAreaGetter, labelToPodsCache: map[string]sets.Set[string]{}, podLabelCache: map[string]string{}, labelQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()), + clusterSetReady: clusterSetReady, } } @@ -88,6 +97,11 @@ func NewLabelIdentityReconciler( // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch func (r *LabelIdentityReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.checkClusterSetReady() { + klog.V(2).InfoS("Skip reconciling Pod since there is no connection to the leader") + return ctrl.Result{}, nil + } + klog.V(2).InfoS("Reconciling Pod for label identity", "pod", req.NamespacedName) if requeue := r.checkRemoteCommonArea(); requeue { return ctrl.Result{Requeue: true}, nil @@ -130,17 +144,60 @@ func (r *LabelIdentityReconciler) checkRemoteCommonArea() bool { // SetupWithManager sets up the controller with the Manager. func (r *LabelIdentityReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&v1.Pod{}). - WithEventFilter(predicate.LabelChangedPredicate{}). + For(&v1.Pod{}, builder.WithPredicates(predicate.LabelChangedPredicate{})). Watches(&source.Kind{Type: &v1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(r.namespaceMapFunc), builder.WithPredicates(predicate.LabelChangedPredicate{})). + Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, + handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc), + builder.WithPredicates(statusReadyPredicate)). WithOptions(controller.Options{ MaxConcurrentReconciles: common.LabelIdentityWorkerCount, }). Complete(r) } +func (r *LabelIdentityReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request { + clusterSet := &mcv1alpha2.ClusterSet{} + requests := []reconcile.Request{} + if a.GetNamespace() != r.namespace { + return requests + } + ctx := context.TODO() + err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clusterSet) + if err == nil { + if len(clusterSet.Status.Conditions) > 0 && clusterSet.Status.Conditions[0].Status == v1.ConditionTrue { + r.setClusterSetReady(true) + podList := &v1.PodList{} + r.Client.List(ctx, podList) + requests = make([]reconcile.Request, len(podList.Items)) + for i, pod := range podList.Items { + podNamespacedName := types.NamespacedName{ + Name: pod.GetName(), + Namespace: pod.GetNamespace(), + } + requests[i] = reconcile.Request{ + NamespacedName: podNamespacedName, + } + } + } + } else if apierrors.IsNotFound(err) { + r.setClusterSetReady(false) + // Reset caches when a ClusterSet is deleted. + r.labelToPodsCache = map[string]sets.Set[string]{} + r.podLabelCache = map[string]string{} + } + return requests +} + +func (r *LabelIdentityReconciler) checkClusterSetReady() bool { + return r.clusterSetReady.Load().(bool) +} + +func (r *LabelIdentityReconciler) setClusterSetReady(clusterSetReady bool) { + r.clusterSetReady.Store(clusterSetReady) +} + // namespaceMapFunc handles Namespace update events (Namespace label change) by enqueuing // all Pods in the Namespace into the reconciler processing queue. func (r *LabelIdentityReconciler) namespaceMapFunc(ns client.Object) []reconcile.Request { @@ -288,7 +345,7 @@ func (r *LabelIdentityReconciler) createLabelIdentityResExport(ctx context.Conte // deleteLabelIdentityResExport deletes a ResourceExport for a stale label. func (r *LabelIdentityReconciler) deleteLabelIdentityResExport(ctx context.Context, labelToDelete string) error { - labelResExport := &mcsv1alpha1.ResourceExport{ + labelResExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: getResourceExportNameForLabelIdentity(r.localClusterID, labelToDelete), Namespace: r.remoteCommonArea.GetNamespace(), @@ -299,8 +356,8 @@ func (r *LabelIdentityReconciler) deleteLabelIdentityResExport(ctx context.Conte return client.IgnoreNotFound(err) } -func (r *LabelIdentityReconciler) getLabelIdentityResourceExport(name, normalizedLabel string) *mcsv1alpha1.ResourceExport { - return &mcsv1alpha1.ResourceExport{ +func (r *LabelIdentityReconciler) getLabelIdentityResourceExport(name, normalizedLabel string) *mcv1alpha1.ResourceExport { + return &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: r.remoteCommonArea.GetNamespace(), @@ -309,10 +366,10 @@ func (r *LabelIdentityReconciler) getLabelIdentityResourceExport(name, normalize constants.SourceClusterID: r.localClusterID, }, }, - Spec: mcsv1alpha1.ResourceExportSpec{ + Spec: mcv1alpha1.ResourceExportSpec{ ClusterID: r.localClusterID, Kind: constants.LabelIdentityKind, - LabelIdentity: &mcsv1alpha1.LabelIdentityExport{ + LabelIdentity: &mcv1alpha1.LabelIdentityExport{ NormalizedLabel: normalizedLabel, }, }, diff --git a/multicluster/controllers/multicluster/member/labelidentity_controller_test.go b/multicluster/controllers/multicluster/member/labelidentity_controller_test.go index a5e5480cae1..08ac5846f42 100644 --- a/multicluster/controllers/multicluster/member/labelidentity_controller_test.go +++ b/multicluster/controllers/multicluster/member/labelidentity_controller_test.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -176,7 +177,8 @@ func TestLabelIdentityReconciler(t *testing.T) { commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, common.LeaderNamespace, nil) mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler) + r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler, "default") + r.setClusterSetReady(true) go r.Run(stopCh) for _, p := range tt.existingPods.Items { @@ -244,7 +246,8 @@ func TestNamespaceMapFunc(t *testing.T) { mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", true, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler) + r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, mcReconciler, "default") + r.setClusterSetReady(true) actualReq := r.namespaceMapFunc(ns) assert.ElementsMatch(t, expReq, actualReq) } @@ -286,3 +289,76 @@ func TestGetNormalizedLabel(t *testing.T) { }) } } + +func TestClusterSetMapFunc_LabelIdentity(t *testing.T) { + clusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{ + { + Status: v1.ConditionTrue, + Type: mcv1alpha2.ClusterSetReady, + }, + }, + }, + } + clusterSet2 := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test-stale", + }, + } + pod1 := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + }, + } + pod2 := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "pod2", + }, + } + pods := &v1.PodList{ + Items: []v1.Pod{ + pod1, pod2, + }, + } + expectedReqs := []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: pod1.GetName(), + Namespace: pod1.GetNamespace(), + }, + }, + { + NamespacedName: types.NamespacedName{ + Name: pod2.GetName(), + Namespace: pod2.GetNamespace(), + }, + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(clusterSet).WithLists(pods).Build() + r := NewLabelIdentityReconciler(fakeClient, common.TestScheme, nil, clusterSet.Namespace) + r.setClusterSetReady(true) + requests := r.clusterSetMapFunc(clusterSet) + assert.Equal(t, expectedReqs, requests) + + r = NewLabelIdentityReconciler(fakeClient, common.TestScheme, nil, "mismatch_ns") + requests = r.clusterSetMapFunc(clusterSet) + assert.Equal(t, []reconcile.Request{}, requests) + + // non-existing ClusterSet + r = NewLabelIdentityReconciler(fakeClient, common.TestScheme, nil, "default") + r.setClusterSetReady(true) + r.labelToPodsCache["label"] = sets.New[string]("default/nginx") + r.podLabelCache["default/nginx"] = "label" + requests = r.clusterSetMapFunc(clusterSet2) + assert.Equal(t, []reconcile.Request{}, requests) + assert.Equal(t, 0, len(r.labelToPodsCache)) + assert.Equal(t, 0, len(r.labelToPodsCache)) +} diff --git a/multicluster/controllers/multicluster/member/node_controller.go b/multicluster/controllers/multicluster/member/node_controller.go index 2f710e1e82c..5b46b58235b 100644 --- a/multicluster/controllers/multicluster/member/node_controller.go +++ b/multicluster/controllers/multicluster/member/node_controller.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "net" + "sync" + "sync/atomic" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -28,28 +30,60 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" ) var ( ServiceCIDRDiscoverFn = common.DiscoverServiceCIDRByInvalidServiceCreation + + statusReadyPredicateFunc = func(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + oldClusterSet := e.ObjectOld.(*mcv1alpha2.ClusterSet) + newClusterSet := e.ObjectNew.(*mcv1alpha2.ClusterSet) + oldConditionSize := len(oldClusterSet.Status.Conditions) + newConditionSize := len(newClusterSet.Status.Conditions) + if oldConditionSize == 0 && newConditionSize > 0 && newClusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { + return true + } + if oldConditionSize > 0 && newConditionSize > 0 && + (oldClusterSet.Status.Conditions[0].Status == corev1.ConditionFalse || oldClusterSet.Status.Conditions[0].Status == corev1.ConditionUnknown) && + newClusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { + return true + } + return false + } + + statusReadyPredicate = predicate.Funcs{ + UpdateFunc: statusReadyPredicateFunc, + } ) type ( // NodeReconciler is for member cluster only. NodeReconciler struct { client.Client - Scheme *runtime.Scheme - namespace string - precedence mcsv1alpha1.Precedence - gatewayCandidates map[string]bool - activeGateway string - serviceCIDR string - initialized bool + Scheme *runtime.Scheme + namespace string + precedence mcv1alpha1.Precedence + gatewayCandidates map[string]bool + activeGatewayMutex sync.Mutex + activeGateway string + serviceCIDR string + initialized bool + clusterSetReady *atomic.Value } ) @@ -63,10 +97,12 @@ func NewNodeReconciler( scheme *runtime.Scheme, namespace string, serviceCIDR string, - precedence mcsv1alpha1.Precedence) *NodeReconciler { + precedence mcv1alpha1.Precedence) *NodeReconciler { if string(precedence) == "" { - precedence = mcsv1alpha1.PrecedenceInternal + precedence = mcv1alpha1.PrecedenceInternal } + clusterSetReady := &atomic.Value{} + clusterSetReady.Store(false) reconciler := &NodeReconciler{ Client: client, Scheme: scheme, @@ -74,6 +110,7 @@ func NewNodeReconciler( serviceCIDR: serviceCIDR, precedence: precedence, gatewayCandidates: make(map[string]bool), + clusterSetReady: clusterSetReady, } return reconciler } @@ -84,6 +121,11 @@ func NewNodeReconciler( //+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=gateways/finalizers,verbs=update func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.checkClusterSetReady() { + klog.V(2).InfoS("Skip reconciling Node since there is no connection to the leader") + return ctrl.Result{}, nil + } + klog.V(2).InfoS("Reconciling Node", "node", req.Name) if !r.initialized { if err := r.initialize(); err != nil { @@ -91,13 +133,15 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } r.initialized = true } - gw := &mcsv1alpha1.Gateway{ + gw := &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: req.Name, Namespace: r.namespace, }, } + r.activeGatewayMutex.Lock() + defer r.activeGatewayMutex.Unlock() noActiveGateway := r.activeGateway == "" isActiveGateway := r.activeGateway == req.Name stillGatewayNode := false @@ -161,7 +205,7 @@ func (r *NodeReconciler) initialize() error { return err } - gwList := &mcsv1alpha1.GatewayList{} + gwList := &mcv1alpha1.GatewayList{} if err := r.Client.List(ctx, gwList, &client.ListOptions{}); err != nil { return err } @@ -173,7 +217,7 @@ func (r *NodeReconciler) initialize() error { if !apierrors.IsNotFound(err) { return err } - staleGateway := &mcsv1alpha1.Gateway{ + staleGateway := &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.namespace, Name: existingGWName}, @@ -194,8 +238,8 @@ func (r *NodeReconciler) initialize() error { return nil } -func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mcsv1alpha1.Gateway) error { - existingGW := &mcsv1alpha1.Gateway{} +func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mcv1alpha1.Gateway) error { + existingGW := &mcv1alpha1.Gateway{} // TODO: cache might be stale. Need to revisit here and other reconcilers to // check if we can improve this with 'Owns' or other methods. if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil { @@ -222,7 +266,7 @@ func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mc // recreateActiveGateway will delete the existing Gateway CR and create a new Gateway // from the pool of Gateway candidates. -func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcsv1alpha1.Gateway) error { +func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcv1alpha1.Gateway) error { err := r.Client.Delete(ctx, gateway, &client.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err @@ -241,8 +285,8 @@ func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcs // getValidGatewayFromCandidates picks a valid Node from Gateway candidates and // creates a Gateway. It returns no error if no good Gateway candidate. -func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway, error) { - var activeGateway *mcsv1alpha1.Gateway +func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcv1alpha1.Gateway, error) { + var activeGateway *mcv1alpha1.Gateway var internalIP, gwIP string var err error @@ -257,7 +301,7 @@ func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway, continue } - activeGateway = &mcsv1alpha1.Gateway{ + activeGateway = &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: gatewayNode.Name, Namespace: r.namespace, @@ -276,8 +320,12 @@ func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway, return nil, nil } -func (r *NodeReconciler) createGateway(gateway *mcsv1alpha1.Gateway) error { +func (r *NodeReconciler) createGateway(gateway *mcv1alpha1.Gateway) error { if err := r.Client.Create(context.Background(), gateway, &client.CreateOptions{}); err != nil { + if apierrors.IsAlreadyExists(err) { + r.activeGateway = gateway.Name + return nil + } return err } r.activeGateway = gateway.Name @@ -288,12 +336,12 @@ func (r *NodeReconciler) getGatawayNodeIP(node *corev1.Node) (string, string, er var gatewayIP, internalIP string for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { - if r.precedence == mcsv1alpha1.PrecedencePrivate || r.precedence == mcsv1alpha1.PrecedenceInternal { + if r.precedence == mcv1alpha1.PrecedencePrivate || r.precedence == mcv1alpha1.PrecedenceInternal { gatewayIP = addr.Address } internalIP = addr.Address } - if (r.precedence == mcsv1alpha1.PrecedencePublic || r.precedence == mcsv1alpha1.PrecedenceExternal) && + if (r.precedence == mcv1alpha1.PrecedencePublic || r.precedence == mcv1alpha1.PrecedenceExternal) && addr.Type == corev1.NodeExternalIP { gatewayIP = addr.Address } @@ -324,12 +372,57 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&corev1.Node{}). + Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, + handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc), + builder.WithPredicates(statusReadyPredicate)). WithOptions(controller.Options{ MaxConcurrentReconciles: 1, }). Complete(r) } +func (r *NodeReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request { + clusterSet := &mcv1alpha2.ClusterSet{} + requests := []reconcile.Request{} + if a.GetNamespace() != r.namespace { + return requests + } + ctx := context.TODO() + err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clusterSet) + if err == nil { + if len(clusterSet.Status.Conditions) > 0 && clusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { + r.setClusterSetReady(true) + nodeList := &corev1.NodeList{} + r.Client.List(ctx, nodeList) + for _, n := range nodeList.Items { + if _, ok := n.Annotations[common.GatewayAnnotation]; ok { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: n.GetName(), + }, + }) + } + } + } + } else if apierrors.IsNotFound(err) { + r.setClusterSetReady(false) + r.activeGatewayMutex.Lock() + defer r.activeGatewayMutex.Unlock() + // The Gateway will be removed when a ClusterSet is deleted, so here we can set + // the activeGateway to empty directly. + r.activeGateway = "" + } + return requests +} + +func (r *NodeReconciler) checkClusterSetReady() bool { + return r.clusterSetReady.Load().(bool) +} + +func (r *NodeReconciler) setClusterSetReady(clusterSetReady bool) { + r.clusterSetReady.Store(clusterSetReady) +} + func isReadyNode(node *corev1.Node) bool { var nodeIsReady bool for _, s := range node.Status.Conditions { diff --git a/multicluster/controllers/multicluster/member/node_controller_test.go b/multicluster/controllers/multicluster/member/node_controller_test.go index 84db6786e71..c53051a7081 100644 --- a/multicluster/controllers/multicluster/member/node_controller_test.go +++ b/multicluster/controllers/multicluster/member/node_controller_test.go @@ -26,9 +26,11 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" ) @@ -37,8 +39,8 @@ var ( node2 *corev1.Node node3 *corev1.Node node4 *corev1.Node - updatedGateway2 *mcsv1alpha1.Gateway - gateway3 *mcsv1alpha1.Gateway + updatedGateway2 *mcv1alpha1.Gateway + gateway3 *mcv1alpha1.Gateway ) func initializeCommonData() { @@ -98,7 +100,7 @@ func initializeCommonData() { common.GatewayIPAnnotation: "invalid-gatewayip", } - updatedGateway2 = &mcsv1alpha1.Gateway{ + updatedGateway2 = &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "node-2", Namespace: "default", @@ -137,9 +139,9 @@ func TestNodeReconciler(t *testing.T) { name string nodes []*corev1.Node req reconcile.Request - precedence mcsv1alpha1.Precedence - existingGW *mcsv1alpha1.Gateway - expectedGW *mcsv1alpha1.Gateway + precedence mcv1alpha1.Precedence + existingGW *mcv1alpha1.Gateway + expectedGW *mcv1alpha1.Gateway activeGateway string candidates map[string]bool isDelete bool @@ -149,14 +151,14 @@ func TestNodeReconciler(t *testing.T) { nodes: []*corev1.Node{node1}, req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, expectedGW: &gwNode1, - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "update a Gateway successfully by changing GatewayIP", nodes: []*corev1.Node{&node1WithIPAnnotation}, req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, existingGW: &gwNode1, - expectedGW: &mcsv1alpha1.Gateway{ + expectedGW: &mcv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Namespace: "default", @@ -165,7 +167,7 @@ func TestNodeReconciler(t *testing.T) { InternalIP: "172.11.10.1", }, activeGateway: "node-1", - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "remove a Gateway Node to delete a Gateway successfully", @@ -174,7 +176,7 @@ func TestNodeReconciler(t *testing.T) { existingGW: &gwNode1, activeGateway: "node-1", isDelete: true, - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "remove a Gateway Node's annotation to delete a Gateway successfully", @@ -183,7 +185,7 @@ func TestNodeReconciler(t *testing.T) { existingGW: &gwNode1, activeGateway: "node-1", isDelete: true, - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "remote a Gateway due to no IPs", @@ -191,7 +193,7 @@ func TestNodeReconciler(t *testing.T) { req: reconcile.Request{NamespacedName: types.NamespacedName{Name: newNode1.Name}}, existingGW: &gwNode1, isDelete: true, - precedence: mcsv1alpha1.PrecedencePrivate, + precedence: mcv1alpha1.PrecedencePrivate, }, { name: "remove a Gateway Node to create a new Gateway from candidates successfully", @@ -200,7 +202,7 @@ func TestNodeReconciler(t *testing.T) { existingGW: &gwNode1, expectedGW: updatedGateway2, activeGateway: "node-1", - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "create a new Gateway successfully when active Gateway Node is not ready", @@ -209,7 +211,7 @@ func TestNodeReconciler(t *testing.T) { existingGW: gateway3, expectedGW: updatedGateway2, activeGateway: "node-3", - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, { name: "create a new Gateway successfully when active Gateway Node has no valid IP", @@ -218,7 +220,7 @@ func TestNodeReconciler(t *testing.T) { existingGW: gateway4, expectedGW: updatedGateway2, activeGateway: "node-4", - precedence: mcsv1alpha1.PrecedencePublic, + precedence: mcv1alpha1.PrecedencePublic, }, } for _, tt := range tests { @@ -233,10 +235,11 @@ func TestNodeReconciler(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(obj...).Build() r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.100.0.0/16", tt.precedence) r.activeGateway = tt.activeGateway + r.setClusterSetReady(true) if _, err := r.Reconcile(common.TestCtx, tt.req); err != nil { t.Errorf("Node Reconciler should handle Node events successfully but got error = %v", err) } else { - newGW := &mcsv1alpha1.Gateway{} + newGW := &mcv1alpha1.Gateway{} gwNamespcedName := types.NamespacedName{Name: tt.req.Name, Namespace: "default"} if tt.expectedGW != nil { gwNamespcedName = types.NamespacedName{Name: tt.expectedGW.Name, Namespace: "default"} @@ -269,7 +272,7 @@ func TestInitialize(t *testing.T) { name string nodes []*corev1.Node req reconcile.Request - existingGW *mcsv1alpha1.Gateway + existingGW *mcv1alpha1.Gateway expectedActiveGateway string isDelete bool candidatesSize int @@ -307,14 +310,15 @@ func TestInitialize(t *testing.T) { obj = append(obj, tt.existingGW) } fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(obj...).Build() - r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.100.0.0/16", mcsv1alpha1.PrecedencePublic) + r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.100.0.0/16", mcv1alpha1.PrecedencePublic) + r.setClusterSetReady(true) if err := r.initialize(); err != nil { t.Errorf("Expected initialize() successfully but got err: %v", err) } else { assert.Equal(t, tt.expectedActiveGateway, r.activeGateway) assert.Equal(t, tt.candidatesSize, len(r.gatewayCandidates)) if tt.isDelete { - deletedGW := &mcsv1alpha1.Gateway{} + deletedGW := &mcv1alpha1.Gateway{} gwNamespcedName := types.NamespacedName{Name: tt.existingGW.Name, Namespace: "default"} err := fakeClient.Get(common.TestCtx, gwNamespcedName, deletedGW) if !apierrors.IsNotFound(err) { @@ -325,3 +329,147 @@ func TestInitialize(t *testing.T) { }) } } + +func TestClusterSetMapFunc(t *testing.T) { + clusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{ + { + Status: corev1.ConditionTrue, + Type: mcv1alpha2.ClusterSetReady, + }, + }, + }, + } + + deletedClusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test-deleted", + }, + } + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + Annotations: map[string]string{ + common.GatewayAnnotation: "true", + }, + }, + } + expectedReqs := []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: node1.GetName(), + }, + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(clusterSet, node1).Build() + r := NewNodeReconciler(fakeClient, common.TestScheme, "default", "10.200.1.1/16", "") + r.setClusterSetReady(true) + requests := r.clusterSetMapFunc(clusterSet) + assert.Equal(t, expectedReqs, requests) + + requests = r.clusterSetMapFunc(deletedClusterSet) + assert.Equal(t, []reconcile.Request{}, requests) + assert.Equal(t, false, r.checkClusterSetReady()) + + r = NewNodeReconciler(fakeClient, common.TestScheme, "mismatch_ns", "10.200.1.1/16", "") + requests = r.clusterSetMapFunc(clusterSet) + assert.Equal(t, []reconcile.Request{}, requests) +} + +func Test_StatusPredicate(t *testing.T) { + tests := []struct { + name string + updateEvent event.UpdateEvent + expected bool + }{ + { + name: "status changed to ready", + updateEvent: event.UpdateEvent{ + ObjectOld: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + }, + ObjectNew: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{{ + Status: corev1.ConditionTrue, + }}, + }, + }, + }, + expected: true, + }, + { + name: "status is changed from unknown to ready", + updateEvent: event.UpdateEvent{ + ObjectOld: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{{ + Status: corev1.ConditionUnknown, + }}, + }, + }, + ObjectNew: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{{ + Status: corev1.ConditionTrue, + }}, + }, + }, + }, + expected: true, + }, + { + name: "status is ready but no change", + updateEvent: event.UpdateEvent{ + ObjectOld: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{{ + Status: corev1.ConditionTrue, + }}, + }, + }, + ObjectNew: &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-clusterset", + Namespace: "default", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{{ + Status: corev1.ConditionTrue, + }}, + }, + }, + }, + expected: false, + }, + } + for _, tt := range tests { + actual := statusReadyPredicateFunc(tt.updateEvent) + assert.Equal(t, tt.expected, actual) + } +} diff --git a/multicluster/controllers/multicluster/member/serviceexport_controller.go b/multicluster/controllers/multicluster/member/serviceexport_controller.go index ac2810a2f7a..0ae6e6e938a 100644 --- a/multicluster/controllers/multicluster/member/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/member/serviceexport_controller.go @@ -21,6 +21,7 @@ import ( "net" "reflect" "sync" + "sync/atomic" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -30,9 +31,11 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -42,7 +45,8 @@ import ( k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -71,11 +75,13 @@ type ( remoteCommonArea commonarea.RemoteCommonArea installedSvcs cache.Indexer installedEps cache.Indexer + namespace string leaderNamespace string leaderClusterID string localClusterID string endpointIPType string endpointSliceEnabled bool + clusterSetReady *atomic.Value } ) @@ -95,16 +101,22 @@ func NewServiceExportReconciler( scheme *runtime.Scheme, commonAreaGetter commonarea.RemoteCommonAreaGetter, endpointIPType string, - endpointSliceEnabled bool) *ServiceExportReconciler { + endpointSliceEnabled bool, + namespace string) *ServiceExportReconciler { + clusterSetReady := &atomic.Value{} + clusterSetReady.Store(false) reconciler := &ServiceExportReconciler{ Client: client, Scheme: scheme, + namespace: namespace, commonAreaGetter: commonAreaGetter, endpointIPType: endpointIPType, endpointSliceEnabled: endpointSliceEnabled, installedSvcs: cache.NewIndexer(svcInfoKeyFunc, cache.Indexers{}), installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{}), + clusterSetReady: clusterSetReady, } + return reconciler } @@ -132,6 +144,12 @@ func epInfoKeyFunc(obj interface{}) (string, error) { // and also Services/Endpoints resources. It will create/update/remove ResourceExport // in a leader cluster for corresponding ServiceExport from a member cluster. func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !r.checkClusterSetReady() { + klog.V(2).InfoS("Skip reconciling ServiceExports since there is no connection to the leader") + // Return immediately if there is no ClusterSet CR at all. + return ctrl.Result{}, nil + } + klog.V(2).InfoS("Reconciling ServiceExport", "serviceexport", req.NamespacedName) svcExportList := &k8smcsv1alpha1.ServiceExportList{} err := r.Client.List(ctx, svcExportList, &client.ListOptions{}) @@ -289,20 +307,20 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques if svcInstalled { installedSvc := svcObj.(*svcInfo) if apiequality.Semantic.DeepEqual(svc.Spec.Ports, installedSvc.ports) { - klog.InfoS("Service has been converted into ResourceExport and no change, skip it", "service", - req.String(), "resourceexport", svcExportNSName) skipUpdateSvcResourceExport = true + klog.V(2).InfoS("Service has been converted into ResourceExport and no change, skip it", "service", + req.String(), "resourceexport", svcExportNSName) } } if epsInstalled { installedEp := epsObj.(*epInfo) if apiequality.Semantic.DeepEqual(newSubsets, installedEp.subsets) { - klog.InfoS("Service's Endpoints (PodIP or ClusterIP) has been converted into ResourceExport and no change, skip it", "Service", - req.String(), "resourceexport", epExportNSName) // When the EndpointIPType is EndpointIPTypeClusterIP, skipUpdateEPResourceExport should be false only - // when there is a ClusterIP/Port change. + // when there is a ClusterIP/Port change or the recreation flag is true. skipUpdateEPResourceExport = true + klog.V(2).InfoS("Service's Endpoints (PodIP or ClusterIP) has been converted into ResourceExport and no change, skip it", "Service", + req.String(), "resourceexport", epExportNSName) } } @@ -310,7 +328,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } - re := mcsv1alpha1.ResourceExport{ + re := mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.leaderNamespace, Labels: map[string]string{ @@ -319,7 +337,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques constants.SourceClusterID: r.localClusterID, }, }, - Spec: mcsv1alpha1.ResourceExportSpec{ + Spec: mcv1alpha1.ResourceExportSpec{ ClusterID: r.localClusterID, Name: req.Name, Namespace: req.Namespace, @@ -376,7 +394,7 @@ func (r *ServiceExportReconciler) checkRemoteCommonArea() bool { func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea) error { svcResExportName := getResourceExportName(r.localClusterID, req, "service") - svcResExport := &mcsv1alpha1.ResourceExport{ + svcResExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: svcResExportName, Namespace: r.leaderNamespace, @@ -398,7 +416,7 @@ func (r *ServiceExportReconciler) handleServiceDeleteEvent(ctx context.Context, func (r *ServiceExportReconciler) handleEndpointDeleteEvent(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea) error { epResExportName := getResourceExportName(r.localClusterID, req, "endpoints") - epResExport := &mcsv1alpha1.ResourceExport{ + epResExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: epResExportName, Namespace: r.leaderNamespace, @@ -491,29 +509,69 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req // SetupWithManager sets up the controller with the Manager. func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error { // Watch events only when resource version changes - versionChange := predicate.ResourceVersionChangedPredicate{} + versionChangePredicates := builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}) if r.endpointSliceEnabled { return ctrl.NewControllerManagedBy(mgr). - For(&k8smcsv1alpha1.ServiceExport{}). - Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)). - Watches(&source.Kind{Type: &discovery.EndpointSlice{}}, handler.EnqueueRequestsFromMapFunc(endpointSliceMapFunc)). - WithEventFilter(versionChange). + For(&k8smcsv1alpha1.ServiceExport{}, versionChangePredicates). + Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc), versionChangePredicates). + Watches(&source.Kind{Type: &discovery.EndpointSlice{}}, handler.EnqueueRequestsFromMapFunc(endpointSliceMapFunc), versionChangePredicates). + Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc), + builder.WithPredicates(statusReadyPredicate)). WithOptions(controller.Options{ MaxConcurrentReconciles: common.DefaultWorkerCount, }). Complete(r) } return ctrl.NewControllerManagedBy(mgr). - For(&k8smcsv1alpha1.ServiceExport{}). - Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)). - Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc)). - WithEventFilter(versionChange). + For(&k8smcsv1alpha1.ServiceExport{}, versionChangePredicates). + Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc), versionChangePredicates). + Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objectMapFunc), versionChangePredicates). + Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc), + builder.WithPredicates(statusReadyPredicate)). WithOptions(controller.Options{ MaxConcurrentReconciles: common.DefaultWorkerCount, }). Complete(r) } +// clusterSetMapFunc handles ClusterSet events by enqueuing all ServiceExports +// into the reconciler processing queue. +func (r *ServiceExportReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request { + clusterSet := &mcv1alpha2.ClusterSet{} + requests := []reconcile.Request{} + if a.GetNamespace() != r.namespace { + return requests + } + + ctx := context.TODO() + err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clusterSet) + if err == nil { + if len(clusterSet.Status.Conditions) > 0 && clusterSet.Status.Conditions[0].Status == corev1.ConditionTrue { + r.SetClusterSetReady(true) + svcExports := &k8smcsv1alpha1.ServiceExportList{} + r.Client.List(ctx, svcExports) + existingSvcExports := sets.Set[string]{} + for _, svcExport := range svcExports.Items { + namespacedName := types.NamespacedName{ + Name: svcExport.GetName(), + Namespace: svcExport.GetNamespace(), + } + req := reconcile.Request{ + NamespacedName: namespacedName, + } + existingSvcExports.Insert(namespacedName.String()) + requests = append(requests, req) + } + } + } else if apierrors.IsNotFound(err) { + r.SetClusterSetReady(false) + // Reset caches when a ClusterSet is deleted. + r.installedSvcs = cache.NewIndexer(svcInfoKeyFunc, cache.Indexers{}) + r.installedEps = cache.NewIndexer(epInfoKeyFunc, cache.Indexers{}) + } + return requests +} + // objectMapFunc simply maps all Serivce and Endpoints events to ServiceExports. // When there are any Service or Endpoints changes, it might be reflected in ResourceExport // in leader cluster as well, so ServiceExportReconciler also needs to watch @@ -551,7 +609,7 @@ func (r *ServiceExportReconciler) serviceHandler( req ctrl.Request, svc *corev1.Service, resName string, - re mcsv1alpha1.ResourceExport, + re mcv1alpha1.ResourceExport, rc commonarea.RemoteCommonArea) error { kind := constants.ServiceKind sinfo := &svcInfo{ @@ -562,7 +620,7 @@ func (r *ServiceExportReconciler) serviceHandler( svcType: string(svc.Spec.Type), } r.resetResourceExport(resName, kind, svc, nil, &re) - existingResExport := &mcsv1alpha1.ResourceExport{} + existingResExport := &mcv1alpha1.ResourceExport{} resNamespaced := types.NamespacedName{Namespace: rc.GetNamespace(), Name: resName} err := rc.Get(ctx, resNamespaced, existingResExport) if err != nil { @@ -585,7 +643,7 @@ func (r *ServiceExportReconciler) endpointsHandler( req ctrl.Request, eps *corev1.Endpoints, resName string, - re mcsv1alpha1.ResourceExport, + re mcv1alpha1.ResourceExport, rc commonarea.RemoteCommonArea) error { kind := constants.EndpointsKind epInfo := &epInfo{ @@ -594,7 +652,7 @@ func (r *ServiceExportReconciler) endpointsHandler( subsets: eps.Subsets, } r.resetResourceExport(resName, kind, nil, eps, &re) - existingResExport := &mcsv1alpha1.ResourceExport{} + existingResExport := &mcv1alpha1.ResourceExport{} resNamespaced := types.NamespacedName{Namespace: rc.GetNamespace(), Name: resName} err := rc.Get(ctx, resNamespaced, existingResExport) if err != nil { @@ -613,12 +671,12 @@ func (r *ServiceExportReconciler) endpointsHandler( func (r *ServiceExportReconciler) resetResourceExport(resName, kind string, svc *corev1.Service, ep *corev1.Endpoints, - re *mcsv1alpha1.ResourceExport) mcsv1alpha1.ResourceExport { + re *mcv1alpha1.ResourceExport) mcv1alpha1.ResourceExport { re.Spec.Kind = kind switch kind { case constants.ServiceKind: re.ObjectMeta.Name = resName - re.Spec.Service = &mcsv1alpha1.ServiceExport{ + re.Spec.Service = &mcv1alpha1.ServiceExport{ ServiceSpec: corev1.ServiceSpec{ Ports: svc.Spec.Ports, }, @@ -626,7 +684,7 @@ func (r *ServiceExportReconciler) resetResourceExport(resName, kind string, re.Labels[constants.SourceKind] = constants.ServiceKind case constants.EndpointsKind: re.ObjectMeta.Name = resName - re.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{ + re.Spec.Endpoints = &mcv1alpha1.EndpointsExport{ Subsets: ep.Subsets, } re.Labels[constants.SourceKind] = constants.EndpointsKind @@ -637,10 +695,10 @@ func (r *ServiceExportReconciler) resetResourceExport(resName, kind string, func (r *ServiceExportReconciler) updateOrCreateResourceExport(resName string, ctx context.Context, req ctrl.Request, - newResExport *mcsv1alpha1.ResourceExport, - existingResExport *mcsv1alpha1.ResourceExport, + newResExport *mcv1alpha1.ResourceExport, + existingResExport *mcv1alpha1.ResourceExport, rc commonarea.RemoteCommonArea) error { - createResExport := reflect.DeepEqual(*existingResExport, mcsv1alpha1.ResourceExport{}) + createResExport := reflect.DeepEqual(*existingResExport, mcv1alpha1.ResourceExport{}) resNamespaced := types.NamespacedName{Namespace: rc.GetNamespace(), Name: resName} if createResExport { // We are using Finalizers to implement asynchronous pre-delete hooks. @@ -737,6 +795,14 @@ func (r *ServiceExportReconciler) checkSubsetsFromEndpoint(ctx context.Context, return nil, false, nil } +func (r *ServiceExportReconciler) checkClusterSetReady() bool { + return r.clusterSetReady.Load().(bool) +} + +func (r *ServiceExportReconciler) SetClusterSetReady(clusterSetReady bool) { + r.clusterSetReady.Store(clusterSetReady) +} + func convertEndpointPorts(ports []discovery.EndpointPort) []corev1.EndpointPort { var v1Ports []corev1.EndpointPort for _, port := range ports { diff --git a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go index 9a7013b3a06..74296a1c2a0 100644 --- a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go @@ -20,6 +20,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,10 +30,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" - k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + k8smcv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "antrea.io/antrea/multicluster/apis/multicluster/constants" - mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2" "antrea.io/antrea/multicluster/controllers/multicluster/common" "antrea.io/antrea/multicluster/controllers/multicluster/commonarea" ) @@ -51,7 +53,7 @@ var ( Name: "nginx", }} - existSvcExport = &k8smcsv1alpha1.ServiceExport{ + existSvcExport = &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "nginx", @@ -60,13 +62,13 @@ var ( ) func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { - existSvcResExport := &mcsv1alpha1.ResourceExport{ + existSvcResExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: common.LeaderNamespace, Name: getResourceExportName(common.LocalClusterID, nginxReq, "service"), }, } - existEpResExport := &mcsv1alpha1.ResourceExport{ + existEpResExport := &mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: common.LeaderNamespace, Name: getResourceExportName(common.LocalClusterID, nginxReq, "endpoints"), @@ -80,7 +82,8 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false) + r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false, "default") + r.SetClusterSetReady(true) r.installedSvcs.Add(&svcInfo{ name: common.SvcNginx.Name, namespace: common.SvcNginx.Namespace, @@ -88,7 +91,7 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { if _, err := r.Reconcile(common.TestCtx, nginxReq); err != nil { t.Errorf("ServiceExport Reconciler should handle delete event successfully but got error = %v", err) } else { - epResource := &mcsv1alpha1.ResourceExport{} + epResource := &mcv1alpha1.ResourceExport{} err := fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{ Namespace: "default", Name: "cluster-a-default-nginx-endpoints", @@ -96,7 +99,7 @@ func TestServiceExportReconciler_handleDeleteEvent(t *testing.T) { if !apierrors.IsNotFound(err) { t.Errorf("Expected not found error but got error = %v", err) } - svcResource := &mcsv1alpha1.ResourceExport{} + svcResource := &mcv1alpha1.ResourceExport{} err = fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{ Namespace: "default", Name: "cluster-a-default-nginx-service", @@ -111,7 +114,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { mcsSvc := common.SvcNginx.DeepCopy() mcsSvc.Name = "antrea-mc-nginx" mcsSvc.Annotations = map[string]string{common.AntreaMCServiceAnnotation: "true"} - mcsSvcExport := &k8smcsv1alpha1.ServiceExport{ + mcsSvcExport := &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "antrea-mc-nginx", @@ -120,7 +123,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { nginx0Svc := common.SvcNginx.DeepCopy() nginx0Svc.Name = "nginx0" - nginx0SvcExport := &k8smcsv1alpha1.ServiceExport{ + nginx0SvcExport := &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "nginx0", @@ -132,15 +135,15 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { now := metav1.Now() reason := "service_without_endpoints" message := "the Service has no Endpoints, failed to export" - nginx1SvcExportWithStatus := &k8smcsv1alpha1.ServiceExport{ + nginx1SvcExportWithStatus := &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "nginx1", }, - Status: k8smcsv1alpha1.ServiceExportStatus{ - Conditions: []k8smcsv1alpha1.ServiceExportCondition{ + Status: k8smcv1alpha1.ServiceExportStatus{ + Conditions: []k8smcv1alpha1.ServiceExportCondition{ { - Type: k8smcsv1alpha1.ServiceExportValid, + Type: k8smcv1alpha1.ServiceExportValid, Status: corev1.ConditionFalse, LastTransitionTime: &now, Reason: &reason, @@ -168,7 +171,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { nginx3Svc := common.SvcNginx.DeepCopy() nginx3Svc.Name = "nginx3" nginx3Svc.Spec.Type = corev1.ServiceTypeExternalName - nginx3SvcExport := &k8smcsv1alpha1.ServiceExport{ + nginx3SvcExport := &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "nginx3", @@ -196,7 +199,7 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { Subsets: common.EPNginxSubset, } - svcExpNoClusterIP := &k8smcsv1alpha1.ServiceExport{ + svcExpNoClusterIP := &k8smcv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "nginx-no-ip", @@ -275,13 +278,14 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false) + r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, "ClusterIP", false, "default") + r.SetClusterSetReady(true) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if _, err := r.Reconcile(common.TestCtx, tt.req); err != nil { t.Errorf("ServiceExport Reconciler should update ServiceExport status successfully but got error = %v", err) } else { - newSvcExport := &k8smcsv1alpha1.ServiceExport{} + newSvcExport := &k8smcv1alpha1.ServiceExport{} err := fakeClient.Get(common.TestCtx, types.NamespacedName{Namespace: tt.req.Namespace, Name: tt.req.Name}, newSvcExport) if err != nil { t.Errorf("ServiceExport Reconciler should get new ServiceExport successfully but got error = %v", err) @@ -351,21 +355,22 @@ func TestServiceExportReconciler_handleServiceExportCreateEvent(t *testing.T) { commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) mcReconciler := NewMemberClusterSetReconciler(tt.fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewServiceExportReconciler(tt.fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, tt.endpointSliceEnabled) + r := NewServiceExportReconciler(tt.fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, tt.endpointSliceEnabled, "default") + r.SetClusterSetReady(true) if _, err := r.Reconcile(common.TestCtx, nginxReq); err != nil { t.Errorf("ServiceExport Reconciler should create ResourceExports but got error = %v", err) } else { - svcResExport := &mcsv1alpha1.ResourceExport{} + svcResExport := &mcv1alpha1.ResourceExport{} err := fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{Namespace: "default", Name: "cluster-a-default-nginx-service"}, svcResExport) if err != nil { t.Errorf("ServiceExport Reconciler should get new Service kind of ResourceExport successfully but got error = %v", err) } - epResExport := &mcsv1alpha1.ResourceExport{} + epResExport := &mcv1alpha1.ResourceExport{} err = fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{Namespace: "default", Name: "cluster-a-default-nginx-endpoints"}, epResExport) if err != nil { t.Errorf("ServiceExport Reconciler should get new Endpoints kind of ResourceExport successfully but got error = %v", err) } - newSvcExport := &k8smcsv1alpha1.ServiceExport{} + newSvcExport := &k8smcv1alpha1.ServiceExport{} if err = tt.fakeClient.Get(common.TestCtx, types.NamespacedName{Namespace: "default", Name: "nginx"}, newSvcExport); err != nil { t.Errorf("Should get ServiceExport successfully but got error = %v", err) } else { @@ -423,7 +428,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { }, Subsets: common.EPNginxSubset, } - re := mcsv1alpha1.ResourceExport{ + re := mcv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Namespace: common.LeaderNamespace, Labels: map[string]string{ @@ -432,7 +437,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { constants.SourceClusterID: common.LocalClusterID, }, }, - Spec: mcsv1alpha1.ResourceExportSpec{ + Spec: mcv1alpha1.ResourceExportSpec{ ClusterID: common.LocalClusterID, Name: nginxReq.Name, Namespace: nginxReq.Namespace, @@ -440,12 +445,12 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { } existSvcRe := re.DeepCopy() existSvcRe.Name = "cluster-a-default-nginx-service" - existSvcRe.Spec.Service = &mcsv1alpha1.ServiceExport{ServiceSpec: corev1.ServiceSpec{}} + existSvcRe.Spec.Service = &mcv1alpha1.ServiceExport{ServiceSpec: corev1.ServiceSpec{}} existSvcRe.Spec.Service.ServiceSpec.Ports = []corev1.ServicePort{common.SvcPort80} existEpRe := re.DeepCopy() existEpRe.Name = "cluster-a-default-nginx-endpoints" - existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: filteredSubsets} + existEpRe.Spec.Endpoints = &mcv1alpha1.EndpointsExport{Subsets: filteredSubsets} tests := []struct { name string @@ -534,13 +539,14 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { commonArea := commonarea.NewFakeRemoteCommonArea(fakeRemoteClient, "leader-cluster", common.LocalClusterID, "default", nil) mcReconciler := NewMemberClusterSetReconciler(fakeClient, common.TestScheme, "default", false, false, make(chan struct{})) mcReconciler.SetRemoteCommonArea(commonArea) - r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, false) + r := NewServiceExportReconciler(fakeClient, common.TestScheme, mcReconciler, tt.endpointIPType, false, "default") + r.SetClusterSetReady(true) r.installedSvcs.Add(sinfo) r.installedEps.Add(epInfo) if _, err := r.Reconcile(common.TestCtx, nginxReq); err != nil { t.Errorf("ServiceExport Reconciler should update ResourceExports but got error = %v", err) } else { - svcResExport := &mcsv1alpha1.ResourceExport{} + svcResExport := &mcv1alpha1.ResourceExport{} err := fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{Namespace: "default", Name: "cluster-a-default-nginx-service"}, svcResExport) if err != nil { t.Errorf("ServiceExport Reconciler should get new Service kind of ResourceExport successfully but got error = %v", err) @@ -550,7 +556,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { t.Errorf("Expected Service ports are %v but got %v", tt.expectedPorts, ports) } } - epResExport := &mcsv1alpha1.ResourceExport{} + epResExport := &mcv1alpha1.ResourceExport{} err = fakeRemoteClient.Get(common.TestCtx, types.NamespacedName{Namespace: "default", Name: "cluster-a-default-nginx-endpoints"}, epResExport) if err != nil { t.Errorf("ServiceExport Reconciler should get new Endpoints kind of ResourceExport successfully but got error = %v", err) @@ -641,3 +647,77 @@ func Test_endpointSliceMapFunc(t *testing.T) { }) } } + +func TestClusterSetMapFunc_ServiceExport(t *testing.T) { + clusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test", + }, + Status: mcv1alpha2.ClusterSetStatus{ + Conditions: []mcv1alpha2.ClusterSetCondition{ + { + Status: corev1.ConditionTrue, + Type: mcv1alpha2.ClusterSetReady, + }, + }, + }, + } + clusterSet2 := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "clusterset-test-deleted", + }, + } + svcExport1 := k8smcv1alpha1.ServiceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "nginx", + }, + } + svcExport2 := k8smcv1alpha1.ServiceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "web", + }, + } + serviceExports := &k8smcv1alpha1.ServiceExportList{ + Items: []k8smcv1alpha1.ServiceExport{ + svcExport1, svcExport2, + }, + } + expectedReqs := []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: svcExport1.GetName(), + Namespace: svcExport1.GetNamespace(), + }, + }, + { + NamespacedName: types.NamespacedName{ + Name: svcExport2.GetName(), + Namespace: svcExport2.GetNamespace(), + }, + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(clusterSet).WithLists(serviceExports).Build() + r := NewServiceExportReconciler(fakeClient, common.TestScheme, nil, "PodIP", true, clusterSet.Namespace) + r.SetClusterSetReady(true) + requests := r.clusterSetMapFunc(clusterSet) + assert.Equal(t, expectedReqs, requests) + + r = NewServiceExportReconciler(fakeClient, common.TestScheme, nil, "PodIP", true, "mismatch_ns") + r.SetClusterSetReady(true) + requests = r.clusterSetMapFunc(clusterSet) + assert.Equal(t, []reconcile.Request{}, requests) + + // non-existing ClusterSet + r = NewServiceExportReconciler(fakeClient, common.TestScheme, nil, "PodIP", true, "default") + r.SetClusterSetReady(true) + r.installedSvcs.Add(&svcInfo{name: "nginx-stale", namespace: "default"}) + r.installedEps.Add(&epInfo{name: "nginx-stale", namespace: "default"}) + requests = r.clusterSetMapFunc(clusterSet2) + assert.Equal(t, []reconcile.Request{}, requests) + assert.Equal(t, 0, len(r.installedSvcs.List())) + assert.Equal(t, 0, len(r.installedEps.List())) +} diff --git a/multicluster/test/integration/suite_test.go b/multicluster/test/integration/suite_test.go index fc8296e2d8c..8be631ea028 100644 --- a/multicluster/test/integration/suite_test.go +++ b/multicluster/test/integration/suite_test.go @@ -155,8 +155,10 @@ var _ = BeforeSuite(func() { k8sManager.GetScheme(), clusterSetReconciler, "ClusterIP", - false) + false, + testNamespace) err = svcExportReconciler.SetupWithManager(k8sManager) + svcExportReconciler.SetClusterSetReady(true) Expect(err).ToNot(HaveOccurred()) // import reconciler will be started from RemoteCommonArea after @@ -189,10 +191,34 @@ var _ = BeforeSuite(func() { err = k8sManager.Start(ctrl.SetupSignalHandler()) Expect(err).ToNot(HaveOccurred()) }() - configureClusterSet() + configureMemberClusterSet() + configureLeaderClusterSet() }) -func configureClusterSet() { +func configureMemberClusterSet() { + clusterSet := &mcv1alpha2.ClusterSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: clusterSetID, + }, + Spec: mcv1alpha2.ClusterSetSpec{ + ClusterID: LocalClusterID, + Leaders: []mcv1alpha2.LeaderClusterInfo{ + { + ClusterID: LocalClusterID, + Secret: "access-token", + Server: k8sServerURL, + }, + }, + Namespace: LeaderNamespace, + }, + } + ctx := context.Background() + err := k8sClient.Create(ctx, clusterSet, &client.CreateOptions{}) + Expect(err == nil).Should(BeTrue()) +} + +func configureLeaderClusterSet() { clusterSet := &mcv1alpha2.ClusterSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: LeaderNamespace,