diff --git a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go index 7dc103d79aa..af0ed8c8c3c 100644 --- a/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go +++ b/multicluster/apis/multicluster/v1alpha1/multiclusterconfig_types.go @@ -57,9 +57,6 @@ type MultiClusterConfig struct { // ClusterSet and allow Antrea-native policies to select peers from other clusters // in a ClusterSet. EnableStretchedNetworkPolicy bool `json:"enableStretchedNetworkPolicy,omitempty"` - // Enable EndpointSlice to watch EndpointSlice changes only for exported Service, this is required - // when EndpointIPType is PodIP and CNI's EndpointSlice feature is enabled. - EnableEndpointSlice bool `json:"enableEndpointSlice,omitempty"` } func init() { diff --git a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml index 776bdd475af..ad04e65cc71 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml @@ -324,7 +324,6 @@ data: gatewayIPPrecedence: "private" endpointIPType: "ClusterIP" enableStretchedNetworkPolicy: false - enableEndpointSlice: false kind: ConfigMap metadata: labels: @@ -364,7 +363,7 @@ spec: template: metadata: annotations: - checksum/config: bcadbc5c432be6653173ef54e7512b86aa52b5a4f911de3f9793843f3e0e26e7 + checksum/config: 7eb0f1e65f7eb3e35b0739d6064b92b7621af0f4e41813c35bfdee71ceaefbe2 labels: app: antrea component: antrea-mc-controller diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 5d7c13a9f18..8615b550a87 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -1073,7 +1073,6 @@ data: gatewayIPPrecedence: "private" endpointIPType: "ClusterIP" enableStretchedNetworkPolicy: false - enableEndpointSlice: false kind: ConfigMap metadata: labels: @@ -1113,7 +1112,7 @@ spec: template: metadata: annotations: - checksum/config: bcadbc5c432be6653173ef54e7512b86aa52b5a4f911de3f9793843f3e0e26e7 + checksum/config: 7eb0f1e65f7eb3e35b0739d6064b92b7621af0f4e41813c35bfdee71ceaefbe2 labels: app: antrea component: antrea-mc-controller diff --git a/multicluster/cmd/multicluster-controller/controller.go b/multicluster/cmd/multicluster-controller/controller.go index 09694f6f5e8..f27f108c766 100644 --- a/multicluster/cmd/multicluster-controller/controller.go +++ b/multicluster/cmd/multicluster-controller/controller.go @@ -42,6 +42,7 @@ import ( antreacrd "antrea.io/antrea/pkg/apis/crd/v1alpha1" "antrea.io/antrea/pkg/apiserver/certificate" "antrea.io/antrea/pkg/util/env" + k8sutil "antrea.io/antrea/pkg/util/k8s" // +kubebuilder:scaffold:imports ) @@ -154,6 +155,19 @@ func setupManagerAndCertController(isLeader bool, o *Options) (manager.Manager, o.options.CertDir = certDir } + // EndpointSlice is enabled in AntreaProxy by default since v1.11, so Antrea MC + // will use EndpointSlice API by default to keep consistent with AntreaProxy. + endpointSliceAPIAvailable, err := k8sutil.EndpointSliceAPIAvailable(client) + if err != nil { + return nil, fmt.Errorf("error checking if EndpointSlice v1 API is available") + } + if !endpointSliceAPIAvailable { + klog.InfoS("The EndpointSlice v1 API is not available, falling back to the Endpoints API") + o.EnableEndpointSlice = false + } else { + o.EnableEndpointSlice = true + } + mgr, err := ctrl.NewManager(k8sConfig, o.options) if err != nil { return nil, fmt.Errorf("error starting manager: %v", err) diff --git a/multicluster/cmd/multicluster-controller/options.go b/multicluster/cmd/multicluster-controller/options.go index 4a4fc7b8450..a343962ed58 100644 --- a/multicluster/cmd/multicluster-controller/options.go +++ b/multicluster/cmd/multicluster-controller/options.go @@ -44,8 +44,7 @@ type Options struct { // Enable StretchedNetworkPolicy to exchange labelIdentities info among the whole // ClusterSet. EnableStretchedNetworkPolicy bool - // Enable EndpointSlice to watch EndpointSlice changes only for exported Service, this is required - // when EndpointIPType is PodIP and CNI's EndpointSlice feature is enabled. + // Watch EndpointSlice API for exported Service if EndpointSlice API is available. EnableEndpointSlice bool } @@ -94,7 +93,6 @@ func (o *Options) complete(args []string) error { o.EndpointIPType = ctrlConfig.EndpointIPType } o.EnableStretchedNetworkPolicy = ctrlConfig.EnableStretchedNetworkPolicy - o.EnableEndpointSlice = ctrlConfig.EnableEndpointSlice klog.InfoS("Using config from file", "config", o.options) } else { klog.InfoS("Using default config", "config", o.options) diff --git a/multicluster/config/default/configmap/controller_manager_config.yaml b/multicluster/config/default/configmap/controller_manager_config.yaml index 37ec6477fe9..853e3d117a2 100644 --- a/multicluster/config/default/configmap/controller_manager_config.yaml +++ b/multicluster/config/default/configmap/controller_manager_config.yaml @@ -14,4 +14,3 @@ podCIDRs: gatewayIPPrecedence: "private" endpointIPType: "ClusterIP" enableStretchedNetworkPolicy: false -enableEndpointSlice: false diff --git a/multicluster/controllers/multicluster/common/helper.go b/multicluster/controllers/multicluster/common/helper.go index 2dcca29e8df..7e6f1248f90 100644 --- a/multicluster/controllers/multicluster/common/helper.go +++ b/multicluster/controllers/multicluster/common/helper.go @@ -76,27 +76,6 @@ func GetServiceEndpointPorts(ports []corev1.ServicePort) []corev1.EndpointPort { return epPorts } -// FilterEndpointSubsets keeps IPs only and removes other fields from EndpointSubset -// which are unnecessary information for other member clusters. -func FilterEndpointSubsets(subsets []corev1.EndpointSubset) []corev1.EndpointSubset { - var newSubsets []corev1.EndpointSubset - for _, s := range subsets { - subset := corev1.EndpointSubset{} - var newAddresses []corev1.EndpointAddress - for _, addr := range s.Addresses { - newAddresses = append(newAddresses, corev1.EndpointAddress{ - IP: addr.IP, - }) - } - if len(newAddresses) > 0 { - subset.Addresses = newAddresses - subset.Ports = s.Ports - newSubsets = append(newSubsets, subset) - } - } - return newSubsets -} - // HashLabelIdentity generates a hash value for label identity string. func HashLabelIdentity(l string) string { hash := sha1.New() // #nosec G401: not used for security purposes diff --git a/multicluster/controllers/multicluster/member/serviceexport_controller.go b/multicluster/controllers/multicluster/member/serviceexport_controller.go index b5d3346d164..ac2810a2f7a 100644 --- a/multicluster/controllers/multicluster/member/serviceexport_controller.go +++ b/multicluster/controllers/multicluster/member/serviceexport_controller.go @@ -240,13 +240,14 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques } var hasReadyEndpoints bool + var newSubsets []corev1.EndpointSubset if r.endpointSliceEnabled { - eps.Subsets, hasReadyEndpoints, err = r.getSubsetsFromEndpointSlice(ctx, req) + newSubsets, hasReadyEndpoints, err = r.getSubsetsFromEndpointSlice(ctx, req) if err != nil { return ctrl.Result{}, err } } else { - hasReadyEndpoints, err = r.checkSubsetsFromEndpoint(ctx, req, eps) + newSubsets, hasReadyEndpoints, err = r.checkSubsetsFromEndpoint(ctx, req, eps) if err != nil { klog.ErrorS(err, "Failed to get Endpoints", req.String()) return ctrl.Result{}, err @@ -267,21 +268,6 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } - // We also watch Service events via events mapping function. - // Need to check cache and compare with cache if there is any change for Service. - var skipUpdateSvcResourceExport, skipUpdateEPResourceExport bool - svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName) - epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName) - if svcInstalled { - installedSvc := svcObj.(*svcInfo) - if apiequality.Semantic.DeepEqual(svc.Spec.Ports, installedSvc.ports) { - klog.InfoS("Service has been converted into ResourceExport and no change, skip it", "service", - req.String(), "resourceexport", svcExportNSName) - skipUpdateSvcResourceExport = true - } - } - - var newSubsets []corev1.EndpointSubset if r.endpointIPType == common.EndpointIPTypeClusterIP { svcIPAsSubset := getClusterIPEndpointSubset(svc) if len(svcIPAsSubset.Addresses) == 0 { @@ -293,11 +279,19 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques } else { newSubsets = []corev1.EndpointSubset{svcIPAsSubset} } - } else { - if r.endpointSliceEnabled { - newSubsets = eps.Subsets - } else { - newSubsets = filterEndpointSubsets(eps.Subsets) + } + + // We also watch Service events via events mapping function. + // Need to check cache and compare with cache if there is any change for Service. + var skipUpdateSvcResourceExport, skipUpdateEPResourceExport bool + svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName) + epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName) + if svcInstalled { + installedSvc := svcObj.(*svcInfo) + if apiequality.Semantic.DeepEqual(svc.Spec.Ports, installedSvc.ports) { + klog.InfoS("Service has been converted into ResourceExport and no change, skip it", "service", + req.String(), "resourceexport", svcExportNSName) + skipUpdateSvcResourceExport = true } } @@ -455,6 +449,8 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req newCondition.Message = getStringPointer("The Service is imported, not allowed to export") case serviceExported: newCondition.Status = corev1.ConditionTrue + newCondition.Reason = getStringPointer("Succeed") + newCondition.Message = getStringPointer("The Service is exported successfully") } svcExportConditions := svcExport.Status.DeepCopy().Conditions @@ -673,7 +669,7 @@ func (r *ServiceExportReconciler) updateOrCreateResourceExport(resName string, } // getSubsetsFromEndpointSlice will get all ready endpoints from all the EndpointSlices which will -// be merged to one Endpoints. In future, we should change to track and export individual +// be merged to one Endpoints. In the future, we should change to track and export individual // EndpointSlices, rather than merge them to one Endpoints. func (r *ServiceExportReconciler) getSubsetsFromEndpointSlice(ctx context.Context, req ctrl.Request) ([]corev1.EndpointSubset, bool, error) { epSliceList := &discovery.EndpointSliceList{} @@ -694,18 +690,20 @@ func (r *ServiceExportReconciler) getSubsetsFromEndpointSlice(ctx context.Contex if r.endpointIPType == common.EndpointIPTypePodIP { ports = convertEndpointPorts(eps.Ports) } + subset := corev1.EndpointSubset{} + subset.Ports = ports for _, ep := range eps.Endpoints { - if *ep.Conditions.Ready { + if ep.Conditions.Ready != nil && *ep.Conditions.Ready { // We only cares if there is ready Endpoints for a Service when the endpointIPType is ClusterIP, // so skip handling the EndpointSubset and stop the loop early if any ready address is found. if r.endpointIPType == common.EndpointIPTypeClusterIP { - return nil, hasReadyEndpoints, nil + return nil, true, nil } - subset := corev1.EndpointSubset{} readyAddresses := ipsToEndpointAddresses(ep.Addresses) - subset.Addresses = append(subset.Addresses, readyAddresses...) - subset.Ports = ports - subsets = append(subsets, subset) + if len(readyAddresses) > 0 { + subset.Addresses = append(subset.Addresses, readyAddresses...) + subsets = append(subsets, subset) + } } } } @@ -713,20 +711,30 @@ func (r *ServiceExportReconciler) getSubsetsFromEndpointSlice(ctx context.Contex return subsets, len(subsets) > 0, nil } -func (r *ServiceExportReconciler) checkSubsetsFromEndpoint(ctx context.Context, req ctrl.Request, eps *corev1.Endpoints) (bool, error) { +func (r *ServiceExportReconciler) checkSubsetsFromEndpoint(ctx context.Context, req ctrl.Request, eps *corev1.Endpoints) ([]corev1.EndpointSubset, bool, error) { + var newSubsets []corev1.EndpointSubset err := r.Client.Get(ctx, req.NamespacedName, eps) if err == nil { - for _, subsets := range eps.Subsets { - if len(subsets.Addresses) > 0 { - return true, nil + for _, s := range eps.Subsets { + subset := corev1.EndpointSubset{} + var newAddresses []corev1.EndpointAddress + for _, addr := range s.Addresses { + newAddresses = append(newAddresses, corev1.EndpointAddress{ + IP: addr.IP, + }) + } + if len(newAddresses) > 0 { + subset.Addresses = newAddresses + subset.Ports = s.Ports + newSubsets = append(newSubsets, subset) } } - return false, nil + return newSubsets, len(newSubsets) > 0, nil } if !apierrors.IsNotFound(err) { - return false, err + return nil, false, err } - return false, nil + return nil, false, nil } func convertEndpointPorts(ports []discovery.EndpointPort) []corev1.EndpointPort { @@ -803,24 +811,3 @@ func getServiceEndpointPorts(ports []corev1.ServicePort) []corev1.EndpointPort { } return epPorts } - -// filterEndpointSubsets keeps IPs only and removes other fields from EndpointSubset -// which are unnecessary information for other member clusters. -func filterEndpointSubsets(subsets []corev1.EndpointSubset) []corev1.EndpointSubset { - var newSubsets []corev1.EndpointSubset - for _, s := range subsets { - subset := corev1.EndpointSubset{} - var newAddresses []corev1.EndpointAddress - for _, addr := range s.Addresses { - newAddresses = append(newAddresses, corev1.EndpointAddress{ - IP: addr.IP, - }) - } - if len(newAddresses) > 0 { - subset.Addresses = newAddresses - subset.Ports = s.Ports - newSubsets = append(newSubsets, subset) - } - } - return newSubsets -} diff --git a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go index 55e5076f777..1be66be44a1 100644 --- a/multicluster/controllers/multicluster/member/serviceexport_controller_test.go +++ b/multicluster/controllers/multicluster/member/serviceexport_controller_test.go @@ -249,7 +249,9 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) { }}, }, { - name: "export Service and update status successfully", + name: "export Service and update status successfully", + expectedReason: "Succeed", + expectedMessage: "The Service is exported successfully", req: ctrl.Request{NamespacedName: types.NamespacedName{ Namespace: "default", Name: "nginx1", @@ -386,10 +388,27 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { svcType: string(common.SvcNginx.Spec.Type), } + filteredSubsets := []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "192.168.17.11", + }, + }, + Ports: []corev1.EndpointPort{ + { + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + } + epInfo := &epInfo{ name: common.EPNginx.Name, namespace: common.EPNginx.Namespace, - subsets: filterEndpointSubsets(common.EPNginx.Subsets), + subsets: filteredSubsets, } newSvcNginx := common.SvcNginx.DeepCopy() @@ -426,7 +445,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) { existEpRe := re.DeepCopy() existEpRe.Name = "cluster-a-default-nginx-endpoints" - existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: filterEndpointSubsets(common.EPNginxSubset)} + existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: filteredSubsets} tests := []struct { name string diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 6b83131f1b7..898f7d897c9 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -28,7 +28,6 @@ import ( "antrea.io/ofnet/ofctrl" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -46,6 +45,7 @@ import ( "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/features" binding "antrea.io/antrea/pkg/ovs/openflow" + k8sutil "antrea.io/antrea/pkg/util/k8s" k8sproxy "antrea.io/antrea/third_party/proxy" "antrea.io/antrea/third_party/proxy/config" "antrea.io/antrea/third_party/proxy/healthcheck" @@ -1117,7 +1117,7 @@ func NewProxier( endpointSliceEnabled := features.DefaultFeatureGate.Enabled(features.EndpointSlice) if endpointSliceEnabled { - apiAvailable, err := endpointSliceAPIAvailable(k8sClient) + apiAvailable, err := k8sutil.EndpointSliceAPIAvailable(k8sClient) if err != nil { return nil, fmt.Errorf("error checking if EndpointSlice v1 API is available") } @@ -1184,23 +1184,6 @@ func NewProxier( return p, nil } -func endpointSliceAPIAvailable(k8sClient clientset.Interface) (bool, error) { - resources, err := k8sClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()) - if err != nil { - // The group version doesn't exist. - if errors.IsNotFound(err) { - return false, nil - } - return false, fmt.Errorf("error getting server resources for GroupVersion %s: %v", discovery.SchemeGroupVersion.String(), err) - } - for _, resource := range resources.APIResources { - if resource.Kind == "EndpointSlice" { - return true, nil - } - } - return false, nil -} - // metaProxierWrapper wraps metaProxier, and implements the extra methods added // in interface Proxier. type metaProxierWrapper struct { diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 0693dfa0415..55b583d4caf 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -24,7 +24,6 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -2808,44 +2807,3 @@ func TestGetServiceFlowKeys(t *testing.T) { }) } } - -func TestEndpointSliceAPIAvailable(t *testing.T) { - testCases := []struct { - name string - resources []*metav1.APIResourceList - expectedAvailable bool - }{ - { - name: "empty", - expectedAvailable: false, - }, - { - name: "GroupVersion exists", - resources: []*metav1.APIResourceList{ - { - GroupVersion: discovery.SchemeGroupVersion.String(), - }, - }, - expectedAvailable: false, - }, - { - name: "API exists", - resources: []*metav1.APIResourceList{ - { - GroupVersion: discovery.SchemeGroupVersion.String(), - APIResources: []metav1.APIResource{{Kind: "EndpointSlice"}}, - }, - }, - expectedAvailable: true, - }, - } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - k8sClient := fake.NewSimpleClientset() - k8sClient.Resources = tt.resources - available, err := endpointSliceAPIAvailable(k8sClient) - require.NoError(t, err) - assert.Equal(t, tt.expectedAvailable, available) - }) - } -} diff --git a/pkg/util/k8s/client.go b/pkg/util/k8s/client.go index b9af5bfe0d8..fbc08b2669b 100644 --- a/pkg/util/k8s/client.go +++ b/pkg/util/k8s/client.go @@ -15,8 +15,12 @@ package k8s import ( + "fmt" + netdefclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1" + discovery "k8s.io/api/discovery/v1" apiextensionclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -108,3 +112,20 @@ func createRestConfig(config componentbaseconfig.ClientConnectionConfiguration, return kubeConfig, nil } + +func EndpointSliceAPIAvailable(k8sClient clientset.Interface) (bool, error) { + resources, err := k8sClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()) + if err != nil { + // The group version doesn't exist. + if errors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error getting server resources for GroupVersion %s: %v", discovery.SchemeGroupVersion.String(), err) + } + for _, resource := range resources.APIResources { + if resource.Kind == "EndpointSlice" { + return true, nil + } + } + return false, nil +} diff --git a/pkg/util/k8s/client_test.go b/pkg/util/k8s/client_test.go new file mode 100644 index 00000000000..fc106ac1f4b --- /dev/null +++ b/pkg/util/k8s/client_test.go @@ -0,0 +1,66 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestEndpointSliceAPIAvailable(t *testing.T) { + testCases := []struct { + name string + resources []*metav1.APIResourceList + expectedAvailable bool + }{ + { + name: "empty", + expectedAvailable: false, + }, + { + name: "GroupVersion exists", + resources: []*metav1.APIResourceList{ + { + GroupVersion: discovery.SchemeGroupVersion.String(), + }, + }, + expectedAvailable: false, + }, + { + name: "API exists", + resources: []*metav1.APIResourceList{ + { + GroupVersion: discovery.SchemeGroupVersion.String(), + APIResources: []metav1.APIResource{{Kind: "EndpointSlice"}}, + }, + }, + expectedAvailable: true, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + k8sClient := fake.NewSimpleClientset() + k8sClient.Resources = tt.resources + available, err := EndpointSliceAPIAvailable(k8sClient) + require.NoError(t, err) + assert.Equal(t, tt.expectedAvailable, available) + }) + } +}