Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed May 23, 2023
1 parent 74a63f6 commit e626140
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type MultiClusterConfig struct {
// ClusterSet and allow Antrea-native policies to select peers from other clusters
// in a ClusterSet.
EnableStretchedNetworkPolicy bool `json:"enableStretchedNetworkPolicy,omitempty"`
// Enable EndpointSlice to watch EndpointSlice changes only for exported Service, this is required
// when EndpointIPType is PodIP and CNI's EndpointSlice feature is enabled.
EnableEndpointSlice bool `json:"enableEndpointSlice,omitempty"`
}

func init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ data:
gatewayIPPrecedence: "private"
endpointIPType: "ClusterIP"
enableStretchedNetworkPolicy: false
enableEndpointSlice: false
kind: ConfigMap
metadata:
labels:
Expand Down Expand Up @@ -364,7 +363,7 @@ spec:
template:
metadata:
annotations:
checksum/config: bcadbc5c432be6653173ef54e7512b86aa52b5a4f911de3f9793843f3e0e26e7
checksum/config: 7eb0f1e65f7eb3e35b0739d6064b92b7621af0f4e41813c35bfdee71ceaefbe2
labels:
app: antrea
component: antrea-mc-controller
Expand Down
3 changes: 1 addition & 2 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,6 @@ data:
gatewayIPPrecedence: "private"
endpointIPType: "ClusterIP"
enableStretchedNetworkPolicy: false
enableEndpointSlice: false
kind: ConfigMap
metadata:
labels:
Expand Down Expand Up @@ -1113,7 +1112,7 @@ spec:
template:
metadata:
annotations:
checksum/config: bcadbc5c432be6653173ef54e7512b86aa52b5a4f911de3f9793843f3e0e26e7
checksum/config: 7eb0f1e65f7eb3e35b0739d6064b92b7621af0f4e41813c35bfdee71ceaefbe2
labels:
app: antrea
component: antrea-mc-controller
Expand Down
14 changes: 14 additions & 0 deletions multicluster/cmd/multicluster-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
antreacrd "antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/apiserver/certificate"
"antrea.io/antrea/pkg/util/env"
k8sutil "antrea.io/antrea/pkg/util/k8s"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -154,6 +155,19 @@ func setupManagerAndCertController(isLeader bool, o *Options) (manager.Manager,
o.options.CertDir = certDir
}

// EndpointSlice is enabled in AntreaProxy by default since v1.11, so Antrea MC
// will use EndpointSlice API by default to keep consistent with AntreaProxy.
endpointSliceAPIAvailable, err := k8sutil.EndpointSliceAPIAvailable(client)
if err != nil {
return nil, fmt.Errorf("error checking if EndpointSlice v1 API is available")
}
if !endpointSliceAPIAvailable {
klog.InfoS("The EndpointSlice v1 API is not available, falling back to the Endpoints API")
o.EnableEndpointSlice = false
} else {
o.EnableEndpointSlice = true
}

mgr, err := ctrl.NewManager(k8sConfig, o.options)
if err != nil {
return nil, fmt.Errorf("error starting manager: %v", err)
Expand Down
4 changes: 1 addition & 3 deletions multicluster/cmd/multicluster-controller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ type Options struct {
// Enable StretchedNetworkPolicy to exchange labelIdentities info among the whole
// ClusterSet.
EnableStretchedNetworkPolicy bool
// Enable EndpointSlice to watch EndpointSlice changes only for exported Service, this is required
// when EndpointIPType is PodIP and CNI's EndpointSlice feature is enabled.
// Watch EndpointSlice API for exported Service if EndpointSlice API is available.
EnableEndpointSlice bool
}

Expand Down Expand Up @@ -94,7 +93,6 @@ func (o *Options) complete(args []string) error {
o.EndpointIPType = ctrlConfig.EndpointIPType
}
o.EnableStretchedNetworkPolicy = ctrlConfig.EnableStretchedNetworkPolicy
o.EnableEndpointSlice = ctrlConfig.EnableEndpointSlice
klog.InfoS("Using config from file", "config", o.options)
} else {
klog.InfoS("Using default config", "config", o.options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ podCIDRs:
gatewayIPPrecedence: "private"
endpointIPType: "ClusterIP"
enableStretchedNetworkPolicy: false
enableEndpointSlice: false
21 changes: 0 additions & 21 deletions multicluster/controllers/multicluster/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,6 @@ func GetServiceEndpointPorts(ports []corev1.ServicePort) []corev1.EndpointPort {
return epPorts
}

// FilterEndpointSubsets keeps IPs only and removes other fields from EndpointSubset
// which are unnecessary information for other member clusters.
func FilterEndpointSubsets(subsets []corev1.EndpointSubset) []corev1.EndpointSubset {
var newSubsets []corev1.EndpointSubset
for _, s := range subsets {
subset := corev1.EndpointSubset{}
var newAddresses []corev1.EndpointAddress
for _, addr := range s.Addresses {
newAddresses = append(newAddresses, corev1.EndpointAddress{
IP: addr.IP,
})
}
if len(newAddresses) > 0 {
subset.Addresses = newAddresses
subset.Ports = s.Ports
newSubsets = append(newSubsets, subset)
}
}
return newSubsets
}

// HashLabelIdentity generates a hash value for label identity string.
func HashLabelIdentity(l string) string {
hash := sha1.New() // #nosec G401: not used for security purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,14 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

var hasReadyEndpoints bool
var newSubsets []corev1.EndpointSubset
if r.endpointSliceEnabled {
eps.Subsets, hasReadyEndpoints, err = r.getSubsetsFromEndpointSlice(ctx, req)
newSubsets, hasReadyEndpoints, err = r.getSubsetsFromEndpointSlice(ctx, req)
if err != nil {
return ctrl.Result{}, err
}
} else {
hasReadyEndpoints, err = r.checkSubsetsFromEndpoint(ctx, req, eps)
newSubsets, hasReadyEndpoints, err = r.checkSubsetsFromEndpoint(ctx, req, eps)
if err != nil {
klog.ErrorS(err, "Failed to get Endpoints", req.String())
return ctrl.Result{}, err
Expand All @@ -267,21 +268,6 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

// We also watch Service events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service.
var skipUpdateSvcResourceExport, skipUpdateEPResourceExport bool
svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName)
epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName)
if svcInstalled {
installedSvc := svcObj.(*svcInfo)
if apiequality.Semantic.DeepEqual(svc.Spec.Ports, installedSvc.ports) {
klog.InfoS("Service has been converted into ResourceExport and no change, skip it", "service",
req.String(), "resourceexport", svcExportNSName)
skipUpdateSvcResourceExport = true
}
}

var newSubsets []corev1.EndpointSubset
if r.endpointIPType == common.EndpointIPTypeClusterIP {
svcIPAsSubset := getClusterIPEndpointSubset(svc)
if len(svcIPAsSubset.Addresses) == 0 {
Expand All @@ -293,11 +279,19 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
} else {
newSubsets = []corev1.EndpointSubset{svcIPAsSubset}
}
} else {
if r.endpointSliceEnabled {
newSubsets = eps.Subsets
} else {
newSubsets = filterEndpointSubsets(eps.Subsets)
}

// We also watch Service events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service.
var skipUpdateSvcResourceExport, skipUpdateEPResourceExport bool
svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName)
epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName)
if svcInstalled {
installedSvc := svcObj.(*svcInfo)
if apiequality.Semantic.DeepEqual(svc.Spec.Ports, installedSvc.ports) {
klog.InfoS("Service has been converted into ResourceExport and no change, skip it", "service",
req.String(), "resourceexport", svcExportNSName)
skipUpdateSvcResourceExport = true
}
}

Expand Down Expand Up @@ -455,6 +449,8 @@ func (r *ServiceExportReconciler) updateSvcExportStatus(ctx context.Context, req
newCondition.Message = getStringPointer("The Service is imported, not allowed to export")
case serviceExported:
newCondition.Status = corev1.ConditionTrue
newCondition.Reason = getStringPointer("Succeed")
newCondition.Message = getStringPointer("The Service is exported successfully")
}

svcExportConditions := svcExport.Status.DeepCopy().Conditions
Expand Down Expand Up @@ -673,7 +669,7 @@ func (r *ServiceExportReconciler) updateOrCreateResourceExport(resName string,
}

// getSubsetsFromEndpointSlice will get all ready endpoints from all the EndpointSlices which will
// be merged to one Endpoints. In future, we should change to track and export individual
// be merged to one Endpoints. In the future, we should change to track and export individual
// EndpointSlices, rather than merge them to one Endpoints.
func (r *ServiceExportReconciler) getSubsetsFromEndpointSlice(ctx context.Context, req ctrl.Request) ([]corev1.EndpointSubset, bool, error) {
epSliceList := &discovery.EndpointSliceList{}
Expand All @@ -694,39 +690,51 @@ func (r *ServiceExportReconciler) getSubsetsFromEndpointSlice(ctx context.Contex
if r.endpointIPType == common.EndpointIPTypePodIP {
ports = convertEndpointPorts(eps.Ports)
}
subset := corev1.EndpointSubset{}
subset.Ports = ports
for _, ep := range eps.Endpoints {
if *ep.Conditions.Ready {
if ep.Conditions.Ready != nil && *ep.Conditions.Ready {
// We only cares if there is ready Endpoints for a Service when the endpointIPType is ClusterIP,
// so skip handling the EndpointSubset and stop the loop early if any ready address is found.
if r.endpointIPType == common.EndpointIPTypeClusterIP {
return nil, hasReadyEndpoints, nil
return nil, true, nil
}
subset := corev1.EndpointSubset{}
readyAddresses := ipsToEndpointAddresses(ep.Addresses)
subset.Addresses = append(subset.Addresses, readyAddresses...)
subset.Ports = ports
subsets = append(subsets, subset)
if len(readyAddresses) > 0 {
subset.Addresses = append(subset.Addresses, readyAddresses...)
subsets = append(subsets, subset)
}
}
}
}
}
return subsets, len(subsets) > 0, nil
}

func (r *ServiceExportReconciler) checkSubsetsFromEndpoint(ctx context.Context, req ctrl.Request, eps *corev1.Endpoints) (bool, error) {
func (r *ServiceExportReconciler) checkSubsetsFromEndpoint(ctx context.Context, req ctrl.Request, eps *corev1.Endpoints) ([]corev1.EndpointSubset, bool, error) {
var newSubsets []corev1.EndpointSubset
err := r.Client.Get(ctx, req.NamespacedName, eps)
if err == nil {
for _, subsets := range eps.Subsets {
if len(subsets.Addresses) > 0 {
return true, nil
for _, s := range eps.Subsets {
subset := corev1.EndpointSubset{}
var newAddresses []corev1.EndpointAddress
for _, addr := range s.Addresses {
newAddresses = append(newAddresses, corev1.EndpointAddress{
IP: addr.IP,
})
}
if len(newAddresses) > 0 {
subset.Addresses = newAddresses
subset.Ports = s.Ports
newSubsets = append(newSubsets, subset)
}
}
return false, nil
return newSubsets, len(newSubsets) > 0, nil
}
if !apierrors.IsNotFound(err) {
return false, err
return nil, false, err
}
return false, nil
return nil, false, nil
}

func convertEndpointPorts(ports []discovery.EndpointPort) []corev1.EndpointPort {
Expand Down Expand Up @@ -803,24 +811,3 @@ func getServiceEndpointPorts(ports []corev1.ServicePort) []corev1.EndpointPort {
}
return epPorts
}

// filterEndpointSubsets keeps IPs only and removes other fields from EndpointSubset
// which are unnecessary information for other member clusters.
func filterEndpointSubsets(subsets []corev1.EndpointSubset) []corev1.EndpointSubset {
var newSubsets []corev1.EndpointSubset
for _, s := range subsets {
subset := corev1.EndpointSubset{}
var newAddresses []corev1.EndpointAddress
for _, addr := range s.Addresses {
newAddresses = append(newAddresses, corev1.EndpointAddress{
IP: addr.IP,
})
}
if len(newAddresses) > 0 {
subset.Addresses = newAddresses
subset.Ports = s.Ports
newSubsets = append(newSubsets, subset)
}
}
return newSubsets
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ func TestServiceExportReconciler_CheckExportStatus(t *testing.T) {
}},
},
{
name: "export Service and update status successfully",
name: "export Service and update status successfully",
expectedReason: "Succeed",
expectedMessage: "The Service is exported successfully",
req: ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: "default",
Name: "nginx1",
Expand Down Expand Up @@ -386,10 +388,27 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) {
svcType: string(common.SvcNginx.Spec.Type),
}

filteredSubsets := []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
},
},
Ports: []corev1.EndpointPort{
{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
},
},
},
}

epInfo := &epInfo{
name: common.EPNginx.Name,
namespace: common.EPNginx.Namespace,
subsets: filterEndpointSubsets(common.EPNginx.Subsets),
subsets: filteredSubsets,
}

newSvcNginx := common.SvcNginx.DeepCopy()
Expand Down Expand Up @@ -426,7 +445,7 @@ func TestServiceExportReconciler_handleUpdateEvent(t *testing.T) {

existEpRe := re.DeepCopy()
existEpRe.Name = "cluster-a-default-nginx-endpoints"
existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: filterEndpointSubsets(common.EPNginxSubset)}
existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: filteredSubsets}

tests := []struct {
name string
Expand Down
21 changes: 2 additions & 19 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"antrea.io/ofnet/ofctrl"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -46,6 +45,7 @@ import (
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/features"
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sutil "antrea.io/antrea/pkg/util/k8s"
k8sproxy "antrea.io/antrea/third_party/proxy"
"antrea.io/antrea/third_party/proxy/config"
"antrea.io/antrea/third_party/proxy/healthcheck"
Expand Down Expand Up @@ -1117,7 +1117,7 @@ func NewProxier(

endpointSliceEnabled := features.DefaultFeatureGate.Enabled(features.EndpointSlice)
if endpointSliceEnabled {
apiAvailable, err := endpointSliceAPIAvailable(k8sClient)
apiAvailable, err := k8sutil.EndpointSliceAPIAvailable(k8sClient)
if err != nil {
return nil, fmt.Errorf("error checking if EndpointSlice v1 API is available")
}
Expand Down Expand Up @@ -1184,23 +1184,6 @@ func NewProxier(
return p, nil
}

func endpointSliceAPIAvailable(k8sClient clientset.Interface) (bool, error) {
resources, err := k8sClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String())
if err != nil {
// The group version doesn't exist.
if errors.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("error getting server resources for GroupVersion %s: %v", discovery.SchemeGroupVersion.String(), err)
}
for _, resource := range resources.APIResources {
if resource.Kind == "EndpointSlice" {
return true, nil
}
}
return false, nil
}

// metaProxierWrapper wraps metaProxier, and implements the extra methods added
// in interface Proxier.
type metaProxierWrapper struct {
Expand Down
Loading

0 comments on commit e626140

Please sign in to comment.