Skip to content

Commit

Permalink
Recreate resources when a member cluster rejoin the ClusterSet
Browse files Browse the repository at this point in the history
1. Add ClusterSet event mapping to several member cluster controllers
to ensure that when a ClusterSet CR is recreated in a member cluster,
the corresponding ResourceExports will be created again in the leader
cluster.
2. Skip reconciling resources when there is no ClusterSet CR in the
member cluster.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Oct 23, 2023
1 parent cfff914 commit bbe0c0c
Show file tree
Hide file tree
Showing 11 changed files with 843 additions and 180 deletions.
25 changes: 14 additions & 11 deletions multicluster/cmd/multicluster-controller/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,32 @@ func newLeaderCommand() *cobra.Command {

func runLeader(o *Options) error {
// on the leader we want the reconciler to run for a given Namespace instead of cluster scope
o.options.Namespace = env.GetPodNamespace()
podNamespace := env.GetPodNamespace()
o.options.Namespace = podNamespace
stopCh := signals.RegisterSignalHandlers()

mgr, err := setupManagerAndCertControllerFunc(true, o)
if err != nil {
return err
}

mgrClient := mgr.GetClient()
mgrScheme := mgr.GetScheme()
memberClusterStatusManager := leader.NewMemberClusterAnnounceReconciler(
mgr.GetClient(), mgr.GetScheme())
mgrClient, mgrScheme)
if err = memberClusterStatusManager.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating MemberClusterAnnounce controller: %v", err)
}

noCachedClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
noCachedClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgrScheme, Mapper: mgr.GetRESTMapper()})
if err != nil {
return err
}
hookServer := mgr.GetWebhookServer()
hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-memberclusterannounce",
&webhook.Admission{Handler: &memberClusterAnnounceValidator{
Client: noCachedClient,
namespace: env.GetPodNamespace()}})
namespace: podNamespace}})

hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha2-clusterset",
&webhook.Admission{Handler: &clusterSetValidator{
Expand All @@ -83,8 +86,8 @@ func runLeader(o *Options) error {
})

clusterSetReconciler := &leader.LeaderClusterSetReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgrClient,
Scheme: mgrScheme,
StatusManager: memberClusterStatusManager,
ClusterCalimCRDAvailable: o.ClusterCalimCRDAvailable,
}
Expand All @@ -93,16 +96,16 @@ func runLeader(o *Options) error {
}

resExportReconciler := &leader.ResourceExportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme()}
Client: mgrClient,
Scheme: mgrScheme}
if err = resExportReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating ResourceExport controller: %v", err)
}
if o.EnableStretchedNetworkPolicy {
labelExportReconciler := leader.NewLabelIdentityExportReconciler(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace())
mgrClient,
mgrScheme,
podNamespace)
if err = labelExportReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating LabelIdentityExport controller: %v", err)
}
Expand Down
36 changes: 20 additions & 16 deletions multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,20 @@ func runMember(o *Options) error {
if err != nil {
return err
}

mgrClient := mgr.GetClient()
mgrScheme := mgr.GetScheme()
podNamespace := env.GetPodNamespace()
stopCh := signals.RegisterSignalHandlers()
hookServer := mgr.GetWebhookServer()
hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-gateway",
&webhook.Admission{Handler: &gatewayValidator{
Client: mgr.GetClient(),
namespace: env.GetPodNamespace()}})
Client: mgrClient,
namespace: podNamespace}})

hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha2-clusterset",
&webhook.Admission{Handler: &clusterSetValidator{
Client: mgr.GetClient(),
namespace: env.GetPodNamespace(),
Client: mgrClient,
namespace: podNamespace,
role: memberRole},
})

Expand All @@ -83,40 +85,42 @@ func runMember(o *Options) error {

commonAreaGetter := clusterSetReconciler
svcExportReconciler := member.NewServiceExportReconciler(
mgr.GetClient(),
mgr.GetScheme(),
mgrClient,
mgrScheme,
commonAreaGetter,
o.EndpointIPType,
o.EnableEndpointSlice,
podNamespace,
)
if err = svcExportReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating ServiceExport controller: %v", err)
}
if o.EnableStretchedNetworkPolicy {
labelIdentityReconciler := member.NewLabelIdentityReconciler(
mgr.GetClient(),
mgr.GetScheme(),
commonAreaGetter)
mgrClient,
mgrScheme,
commonAreaGetter,
podNamespace)
if err = labelIdentityReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating LabelIdentity controller: %v", err)
}
go labelIdentityReconciler.Run(stopCh)
}

gwReconciler := member.NewGatewayReconciler(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
mgrClient,
mgrScheme,
podNamespace,
opts.PodCIDRs,
commonAreaGetter)
if err = gwReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error creating Gateway controller: %v", err)
}

nodeReconciler := member.NewNodeReconciler(
mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
mgrClient,
mgrScheme,
podNamespace,
opts.ServiceCIDR,
opts.GatewayIPPrecedence)
if err = nodeReconciler.SetupWithManager(mgr); err != nil {
Expand Down
85 changes: 70 additions & 15 deletions multicluster/controllers/multicluster/member/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@ package member

import (
"context"
"sync/atomic"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"antrea.io/antrea/multicluster/apis/multicluster/constants"
mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2"
"antrea.io/antrea/multicluster/controllers/multicluster/common"
"antrea.io/antrea/multicluster/controllers/multicluster/commonarea"
)
Expand All @@ -44,6 +51,7 @@ type (
localClusterID string
podCIDRs []string
leaderNamespace string
clusterSetReady *atomic.Value
}
)

Expand All @@ -55,12 +63,15 @@ func NewGatewayReconciler(
namespace string,
podCIDRs []string,
commonAreaGetter commonarea.RemoteCommonAreaGetter) *GatewayReconciler {
clusterSetReady := &atomic.Value{}
clusterSetReady.Store(false)
reconciler := &GatewayReconciler{
Client: client,
Scheme: scheme,
namespace: namespace,
podCIDRs: podCIDRs,
commonAreaGetter: commonAreaGetter,
clusterSetReady: clusterSetReady,
}
return reconciler
}
Expand All @@ -73,6 +84,11 @@ func NewGatewayReconciler(
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=clusterinfoimports/finalizers,verbs=update

func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if !r.checkClusterSetReady() {
klog.InfoS("Skip reconciling Gateway since there is no connection to the leader")
return ctrl.Result{}, nil
}

klog.V(2).InfoS("Reconciling Gateway", "gateway", req.NamespacedName)
var err error
var commonArea commonarea.RemoteCommonArea
Expand All @@ -87,15 +103,15 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
Name: resExportName,
Namespace: r.leaderNamespace,
}
resExport := &mcsv1alpha1.ResourceExport{
resExport := &mcv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Name: resExportName,
Namespace: r.leaderNamespace,
},
}

createOrUpdate := func(gateway *mcsv1alpha1.Gateway) error {
existingResExport := &mcsv1alpha1.ResourceExport{}
createOrUpdate := func(gateway *mcv1alpha1.Gateway) error {
existingResExport := &mcv1alpha1.ResourceExport{}
err := commonArea.Get(ctx, resExportNamespacedName, existingResExport)
if err != nil && !apierrors.IsNotFound(err) {
return err
Expand All @@ -114,7 +130,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return nil
}

gw := &mcsv1alpha1.Gateway{}
gw := &mcv1alpha1.Gateway{}
if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
Expand All @@ -132,8 +148,8 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gw *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
commonArea commonarea.RemoteCommonArea, existingResExport *mcv1alpha1.ResourceExport, gw *mcv1alpha1.Gateway) error {
resExportSpec := mcv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Name: r.localClusterID,
Expand All @@ -150,15 +166,15 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}

func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
commonArea commonarea.RemoteCommonArea, gateway *mcv1alpha1.Gateway) error {
resExportSpec := mcv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Name: r.localClusterID,
Namespace: r.namespace,
}
resExportSpec.ClusterInfo = r.getClusterInfo(gateway)
resExport := &mcsv1alpha1.ResourceExport{
resExport := &mcv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.leaderNamespace,
Name: common.NewClusterInfoResourceExportName(r.localClusterID),
Expand All @@ -176,7 +192,9 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
// SetupWithManager sets up the controller with the Manager.
func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mcsv1alpha1.Gateway{}).
For(&mcv1alpha1.Gateway{}).
Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}}, handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc),
builder.WithPredicates(statusReadyPredicate)).
WithOptions(controller.Options{
// TODO: add a lock for r.serviceCIDR and r.localClusterID if
// there is any plan to increase this concurrent number.
Expand All @@ -185,19 +203,56 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *GatewayReconciler) getClusterInfo(gateway *mcsv1alpha1.Gateway) *mcsv1alpha1.ClusterInfo {
clusterInfo := &mcsv1alpha1.ClusterInfo{
func (r *GatewayReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request {
clusterSet := &mcv1alpha2.ClusterSet{}
requests := []reconcile.Request{}
if a.GetNamespace() != r.namespace {
return requests
}
ctx := context.TODO()
err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clusterSet)
if err == nil {
if len(clusterSet.Status.Conditions) > 0 && clusterSet.Status.Conditions[0].Status == v1.ConditionTrue {
r.setClusterSetReady(true)
gwList := &mcv1alpha1.GatewayList{}
r.Client.List(ctx, gwList, &client.ListOptions{Namespace: r.namespace})
requests = make([]reconcile.Request, len(gwList.Items))
for i, gw := range gwList.Items {
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: gw.Namespace,
Name: gw.Name,
},
}
}
}
} else if apierrors.IsNotFound(err) {
r.setClusterSetReady(false)
}
return requests
}

func (r *GatewayReconciler) checkClusterSetReady() bool {
return r.clusterSetReady.Load().(bool)
}

func (r *GatewayReconciler) setClusterSetReady(clusterSetReady bool) {
r.clusterSetReady.Store(clusterSetReady)
}

func (r *GatewayReconciler) getClusterInfo(gateway *mcv1alpha1.Gateway) *mcv1alpha1.ClusterInfo {
clusterInfo := &mcv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: gateway.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
GatewayInfos: []mcv1alpha1.GatewayInfo{
{
GatewayIP: gateway.GatewayIP,
},
},
}
if gateway.WireGuard != nil && gateway.WireGuard.PublicKey != "" {
clusterInfo.WireGuard = &mcsv1alpha1.WireGuardInfo{
clusterInfo.WireGuard = &mcv1alpha1.WireGuardInfo{
PublicKey: gateway.WireGuard.PublicKey,
}
}
Expand Down
Loading

0 comments on commit bbe0c0c

Please sign in to comment.