Skip to content

Commit

Permalink
Recreate resources when a member cluster rejoin the ClusterSet
Browse files Browse the repository at this point in the history
Add ClusterSet events mapping in a few member cluster controllers to make sure
that when the ClusterSet CR is deleted and recreated in a member cluster,
the corresponding ResourceExports will be created again in the leader cluster.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Aug 21, 2023
1 parent acf1846 commit 4c79c34
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 113 deletions.
60 changes: 45 additions & 15 deletions multicluster/controllers/multicluster/member/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"antrea.io/antrea/multicluster/apis/multicluster/constants"
mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2"
"antrea.io/antrea/multicluster/controllers/multicluster/common"
"antrea.io/antrea/multicluster/controllers/multicluster/commonarea"
)
Expand Down Expand Up @@ -87,15 +93,15 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
Name: resExportName,
Namespace: r.leaderNamespace,
}
resExport := &mcsv1alpha1.ResourceExport{
resExport := &mcv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Name: resExportName,
Namespace: r.leaderNamespace,
},
}

createOrUpdate := func(gateway *mcsv1alpha1.Gateway) error {
existingResExport := &mcsv1alpha1.ResourceExport{}
createOrUpdate := func(gateway *mcv1alpha1.Gateway) error {
existingResExport := &mcv1alpha1.ResourceExport{}
err := commonArea.Get(ctx, resExportNamespacedName, existingResExport)
if err != nil && !apierrors.IsNotFound(err) {
return err
Expand All @@ -114,7 +120,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return nil
}

gw := &mcsv1alpha1.Gateway{}
gw := &mcv1alpha1.Gateway{}
if err := r.Client.Get(ctx, req.NamespacedName, gw); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
Expand All @@ -132,8 +138,8 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gw *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
commonArea commonarea.RemoteCommonArea, existingResExport *mcv1alpha1.ResourceExport, gw *mcv1alpha1.Gateway) error {
resExportSpec := mcv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Name: r.localClusterID,
Expand All @@ -150,15 +156,15 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
}

func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, gateway *mcsv1alpha1.Gateway) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
commonArea commonarea.RemoteCommonArea, gateway *mcv1alpha1.Gateway) error {
resExportSpec := mcv1alpha1.ResourceExportSpec{
Kind: constants.ClusterInfoKind,
ClusterID: r.localClusterID,
Name: r.localClusterID,
Namespace: r.namespace,
}
resExportSpec.ClusterInfo = r.getClusterInfo(gateway)
resExport := &mcsv1alpha1.ResourceExport{
resExport := &mcv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.leaderNamespace,
Name: common.NewClusterInfoResourceExportName(r.localClusterID),
Expand All @@ -176,7 +182,10 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
// SetupWithManager sets up the controller with the Manager.
func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&mcsv1alpha1.Gateway{}).
For(&mcv1alpha1.Gateway{}).
Watches(&source.Kind{Type: &mcv1alpha2.ClusterSet{}},
handler.EnqueueRequestsFromMapFunc(r.clusterSetMapFunc),
builder.WithPredicates(predicate.GenerationChangedPredicate{})).
WithOptions(controller.Options{
// TODO: add a lock for r.serviceCIDR and r.localClusterID if
// there is any plan to increase this concurrent number.
Expand All @@ -185,19 +194,40 @@ func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *GatewayReconciler) getClusterInfo(gateway *mcsv1alpha1.Gateway) *mcsv1alpha1.ClusterInfo {
clusterInfo := &mcsv1alpha1.ClusterInfo{
func (r *GatewayReconciler) clusterSetMapFunc(a client.Object) []reconcile.Request {
clientSet := &mcv1alpha2.ClusterSet{}
requests := []reconcile.Request{}
ctx := context.TODO()
err := r.Client.Get(ctx, types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()}, clientSet)
if err == nil {
GatewayList := &mcv1alpha1.GatewayList{}
r.Client.List(ctx, GatewayList, client.InNamespace(r.namespace))
requests = make([]reconcile.Request, len(GatewayList.Items))
for i, gw := range GatewayList.Items {
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Name: gw.GetName(),
Namespace: gw.GetNamespace(),
},
}
}
}
return requests
}

func (r *GatewayReconciler) getClusterInfo(gateway *mcv1alpha1.Gateway) *mcv1alpha1.ClusterInfo {
clusterInfo := &mcv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: gateway.ServiceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
GatewayInfos: []mcv1alpha1.GatewayInfo{
{
GatewayIP: gateway.GatewayIP,
},
},
}
if gateway.WireGuard != nil && gateway.WireGuard.PublicKey != "" {
clusterInfo.WireGuard = &mcsv1alpha1.WireGuardInfo{
clusterInfo.WireGuard = &mcv1alpha1.WireGuardInfo{
PublicKey: gateway.WireGuard.PublicKey,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"antrea.io/antrea/multicluster/apis/multicluster/constants"
mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
mcv1alpha2 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha2"
"antrea.io/antrea/multicluster/controllers/multicluster/common"
"antrea.io/antrea/multicluster/controllers/multicluster/commonarea"
)
Expand All @@ -41,7 +43,7 @@ var (

gw1CreationTime = metav1.NewTime(time.Now())

gwNode1 = mcsv1alpha1.Gateway{
gwNode1 = mcv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
Namespace: "default",
Expand All @@ -51,19 +53,19 @@ var (
InternalIP: "172.11.10.1",
}

existingResExport = &mcsv1alpha1.ResourceExport{
existingResExport = &mcv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a-clusterinfo",
Namespace: common.LeaderNamespace,
},
Spec: mcsv1alpha1.ResourceExportSpec{
Spec: mcv1alpha1.ResourceExportSpec{
Name: clusterID,
Namespace: "default",
Kind: constants.ClusterInfoKind,
ClusterInfo: &mcsv1alpha1.ClusterInfo{
ClusterInfo: &mcv1alpha1.ClusterInfo{
ServiceCIDR: serviceCIDR,
ClusterID: clusterID,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
GatewayInfos: []mcv1alpha1.GatewayInfo{
{
GatewayIP: "10.10.10.10",
},
Expand All @@ -81,9 +83,9 @@ func TestGatewayReconciler(t *testing.T) {
tests := []struct {
name string
namespacedName types.NamespacedName
gateway []mcsv1alpha1.Gateway
resExport *mcsv1alpha1.ResourceExport
expectedInfo []mcsv1alpha1.GatewayInfo
gateway []mcv1alpha1.Gateway
resExport *mcv1alpha1.ResourceExport
expectedInfo []mcv1alpha1.GatewayInfo
expectedErr string
isDelete bool
}{
Expand All @@ -93,10 +95,10 @@ func TestGatewayReconciler(t *testing.T) {
Namespace: "default",
Name: "node-1",
},
gateway: []mcsv1alpha1.Gateway{
gateway: []mcv1alpha1.Gateway{
gwNode1,
},
expectedInfo: []mcsv1alpha1.GatewayInfo{
expectedInfo: []mcv1alpha1.GatewayInfo{
{
GatewayIP: "10.10.10.10",
},
Expand All @@ -108,7 +110,7 @@ func TestGatewayReconciler(t *testing.T) {
Namespace: "default",
Name: "node-1",
},
gateway: []mcsv1alpha1.Gateway{
gateway: []mcv1alpha1.Gateway{
gwNode1,
},
resExport: staleExistingResExport,
Expand All @@ -120,11 +122,11 @@ func TestGatewayReconciler(t *testing.T) {
Namespace: "default",
Name: "node-1",
},
gateway: []mcsv1alpha1.Gateway{
gateway: []mcv1alpha1.Gateway{
gwNode1New,
},
resExport: existingResExport,
expectedInfo: []mcsv1alpha1.GatewayInfo{
expectedInfo: []mcv1alpha1.GatewayInfo{
{
GatewayIP: "10.10.10.12",
},
Expand Down Expand Up @@ -166,7 +168,7 @@ func TestGatewayReconciler(t *testing.T) {
t.Errorf("Gateway Reconciler should handle ResourceExports events successfully but got error = %v", err)
}
} else {
ciExport := mcsv1alpha1.ResourceExport{}
ciExport := mcv1alpha1.ResourceExport{}
ciExportName := types.NamespacedName{
Namespace: common.LeaderNamespace,
Name: common.NewClusterInfoResourceExportName(common.LocalClusterID),
Expand All @@ -189,29 +191,57 @@ func TestGatewayReconciler(t *testing.T) {
func TestGetClusterInfo(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects().Build()
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, nil)
gw := &mcsv1alpha1.Gateway{
gw := &mcv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: "gw",
},
ServiceCIDR: "10.100.0.0/16",
GatewayIP: "10.10.1.1",
InternalIP: "10.10.1.1",
WireGuard: &mcsv1alpha1.WireGuardInfo{
WireGuard: &mcv1alpha1.WireGuardInfo{
PublicKey: "key",
},
}
expectedClusterInfo := &mcsv1alpha1.ClusterInfo{
GatewayInfos: []mcsv1alpha1.GatewayInfo{
expectedClusterInfo := &mcv1alpha1.ClusterInfo{
GatewayInfos: []mcv1alpha1.GatewayInfo{
{
GatewayIP: "10.10.1.1",
},
},
ServiceCIDR: "10.100.0.0/16",
PodCIDRs: []string{"10.200.1.1/16"},
WireGuard: &mcsv1alpha1.WireGuardInfo{
WireGuard: &mcv1alpha1.WireGuardInfo{
PublicKey: "key",
},
}

assert.Equal(t, expectedClusterInfo, r.getClusterInfo(gw))
}

func TestClusterSetMapFunc(t *testing.T) {
clusterSet := &mcv1alpha2.ClusterSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "clusterset-test",
},
}
gateway := &mcv1alpha1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "node-1",
},
}
expectedReqs := []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: gateway.GetName(),
Namespace: gateway.GetNamespace(),
},
},
}
fakeClient := fake.NewClientBuilder().WithScheme(common.TestScheme).WithObjects(clusterSet, gateway).Build()
r := NewGatewayReconciler(fakeClient, common.TestScheme, "default", []string{"10.200.1.1/16"}, nil)
object := clusterSet
requests := r.clusterSetMapFunc(object)
assert.Equal(t, expectedReqs, requests)
}
Loading

0 comments on commit 4c79c34

Please sign in to comment.