From f20bdb715950580b20ea56352ce481825de0fe21 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Thu, 30 May 2024 21:22:24 -0700 Subject: [PATCH] Remove Agent's dependency on proxy to access Antrea Service (#6361) We add Endpoint resolution to the AntreaClientProvider, so that when running in-cluster, accessing the Antrea Service (i.e., accessing the Antrea Controller API) no longer depends on the ClusterIP functionality provided by the K8s proxy, whether it is kube-proxy or AntreaProxy. This gives us more flexibility during Agent initialization. For example, when kube-proxy is removed and ProxyAll is enabled for AntreaProxy, accessing the Antrea Service no longer requires any routes or OVS flows installed by the Antrea Agent. To implement this functionality, we add a controller (EndpointResolver) to watch the Antrea Service and the corresponding Endpoints resource. For every relevant update, the Endpoint is resolved and the new URL is sent to the AntreaClientProvider. This is a model similar to the one we already use for CA bundle updates. Note that when the Service stops being available, we clear the Endpoint URL and notify listeners. This means that GetAntreaClient() can now return an error even if a previous call was successful. We also update the NetworkPolicyController in the Agent, so that we fall back to saved policies in case the Antrea client does not become ready within 5s. 
Signed-off-by: Antonin Bas --- cmd/antrea-agent-simulator/simulator.go | 7 +- cmd/antrea-agent/agent.go | 8 +- pkg/agent/{ => client}/client.go | 88 ++++-- pkg/agent/client/endpoint_resolver.go | 292 ++++++++++++++++++ pkg/agent/client/endpoint_resolver_test.go | 159 ++++++++++ .../controller/egress/egress_controller.go | 6 +- .../networkpolicy/networkpolicy_controller.go | 29 +- .../networkpolicy/status_controller.go | 6 +- pkg/agent/stats/collector.go | 6 +- .../support_bundle_controller.go | 6 +- test/e2e/networkpolicy_test.go | 27 +- 11 files changed, 573 insertions(+), 61 deletions(-) rename pkg/agent/{ => client}/client.go (58%) create mode 100644 pkg/agent/client/endpoint_resolver.go create mode 100644 pkg/agent/client/endpoint_resolver_test.go diff --git a/cmd/antrea-agent-simulator/simulator.go b/cmd/antrea-agent-simulator/simulator.go index 755642e6ac3..f9410edfc37 100644 --- a/cmd/antrea-agent-simulator/simulator.go +++ b/cmd/antrea-agent-simulator/simulator.go @@ -29,7 +29,7 @@ import ( componentbaseconfig "k8s.io/component-base/config" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/env" "antrea.io/antrea/pkg/util/k8s" @@ -49,7 +49,10 @@ func run() error { } // Create Antrea Clientset for the given config. 
- antreaClientProvider := agent.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient) + antreaClientProvider, err := client.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient) + if err != nil { + return err + } if err = antreaClientProvider.RunOnce(); err != nil { return err diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 246a1f19a0c..bf621481bde 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -34,6 +34,7 @@ import ( mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/agent" "antrea.io/antrea/pkg/agent/apiserver" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/config" @@ -127,7 +128,10 @@ func run(o *Options) error { nodeLatencyMonitorInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors() // Create Antrea Clientset for the given config. - antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) + antreaClientProvider, err := client.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) + if err != nil { + return fmt.Errorf("failed to create Antrea client provider: %w", err) + } // Register Antrea Agent metrics if EnablePrometheusMetrics is set if *o.config.EnablePrometheusMetrics { @@ -794,8 +798,6 @@ func run(o *Options) error { } } - // NetworkPolicyController and EgressController accesses the "antrea" Service via its ClusterIP. - // Run them after AntreaProxy is ready. 
go networkPolicyController.Run(stopCh) if o.enableEgress { go egressController.Run(stopCh) diff --git a/pkg/agent/client.go b/pkg/agent/client/client.go similarity index 58% rename from pkg/agent/client.go rename to pkg/agent/client/client.go index 57d87baa92e..63b13e78de0 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client/client.go @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package agent +package client import ( "context" "fmt" - "net" "os" + "strconv" "sync" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -40,7 +40,9 @@ type AntreaClientProvider interface { GetAntreaClient() (versioned.Interface, error) } -// antreaClientProvider provides an AntreaClientProvider that can dynamically react to ConfigMap changes. +// antreaClientProvider provides an AntreaClientProvider that can dynamically react to CA bundle +// ConfigMap changes, as well as directly resolve the Antrea Service Endpoint when running inside a K8s cluster. +// The consumers of antreaClientProvider are supposed to always call GetAntreaClient() to get a client and not cache it. type antreaClientProvider struct { config config.ClientConnectionConfiguration // mutex protects client. @@ -49,27 +51,62 @@ type antreaClientProvider struct { client versioned.Interface // caContentProvider provides the very latest content of the ca bundle. caContentProvider *dynamiccertificates.ConfigMapCAController + // endpointResolver provides a known Endpoint for the Antrea Service. There is usually a + // single Endpoint at any given time, given that the Antrea Controller runs as a + // single-replica Deployment. By resolving the Endpoint manually and accessing it directly, + // instead of depending on the ClusterIP functionality provided by the K8s proxy, we get + // more flexibility when initializing the Antrea Agent. For example, we can retrieve + // NetworkPolicies from the Controller even if the proxy is not (yet) available. 
+ // endpointResolver is only used when no kubeconfig is provided (otherwise we honor the + // provided config). + endpointResolver *EndpointResolver } +// antreaClientProvider must implement the dynamiccertificates.Listener interface to be notified of +// CA bundle updates. var _ dynamiccertificates.Listener = &antreaClientProvider{} -func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) *antreaClientProvider { - // The key "ca.crt" may not exist at the beginning, no need to fail as the CA provider will watch the ConfigMap - // and notify antreaClientProvider of any update. The consumers of antreaClientProvider are supposed to always - // call GetAntreaClient() to get a client and not cache it. - antreaCAProvider, _ := dynamiccertificates.NewDynamicCAFromConfigMapController( +// antreaClientProvider must implement the Listener interface to be notified of an Endpoint change +// for the Antrea Service. +var _ Listener = &antreaClientProvider{} + +func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) (*antreaClientProvider, error) { + antreaCAProvider, err := dynamiccertificates.NewDynamicCAFromConfigMapController( "antrea-ca", cert.GetCAConfigMapNamespace(), apis.AntreaCAConfigMapName, apis.CAConfigMapKey, kubeClient) + if err != nil { + return nil, err + } + + var endpointResolver *EndpointResolver + if len(config.Kubeconfig) == 0 { + klog.InfoS("No Antrea kubeconfig file was specified. 
Falling back to in-cluster config") + port := os.Getenv("ANTREA_SERVICE_PORT") + if len(port) == 0 { + return nil, fmt.Errorf("unable to create Endpoint resolver for Antrea Service, ANTREA_SERVICE_PORT must be defined for in-cluster config") + } + servicePort, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid port number stored in ANTREA_SERVICE_PORT: %w", err) + } + endpointResolver = NewEndpointResolver(kubeClient, env.GetAntreaNamespace(), apis.AntreaServiceName, int32(servicePort)) + } + antreaClientProvider := &antreaClientProvider{ config: config, caContentProvider: antreaCAProvider, + endpointResolver: endpointResolver, } antreaCAProvider.AddListener(antreaClientProvider) - return antreaClientProvider + if endpointResolver != nil { + endpointResolver.AddListener(antreaClientProvider) + } + + return antreaClientProvider, nil } // RunOnce runs the task a single time synchronously, ensuring client is initialized if kubeconfig is specified. @@ -80,14 +117,18 @@ func (p *antreaClientProvider) RunOnce() error { // Run starts the caContentProvider, which watches the ConfigMap and notifies changes // by calling Enqueue. func (p *antreaClientProvider) Run(ctx context.Context) { - p.caContentProvider.Run(ctx, 1) + go p.caContentProvider.Run(ctx, 1) + if p.endpointResolver != nil { + go p.endpointResolver.Run(ctx) + } + <-ctx.Done() } // Enqueue implements dynamiccertificates.Listener. It will be called by caContentProvider // when caBundle is updated. func (p *antreaClientProvider) Enqueue() { if err := p.updateAntreaClient(); err != nil { - klog.Errorf("Failed to update Antrea client: %v", err) + klog.ErrorS(err, "Failed to update Antrea client") } } @@ -105,13 +146,17 @@ func (p *antreaClientProvider) updateAntreaClient() error { var kubeConfig *rest.Config var err error if len(p.config.Kubeconfig) == 0 { - klog.Info("No antrea kubeconfig file was specified. 
Falling back to in-cluster config") caBundle := p.caContentProvider.CurrentCABundleContent() if caBundle == nil { - klog.Info("Didn't get CA certificate, skip updating Antrea Client") + klog.InfoS("Didn't get CA certificate, skip updating Antrea Client") return nil } - kubeConfig, err = inClusterConfig(caBundle) + endpointURL := p.endpointResolver.CurrentEndpointURL() + if endpointURL == nil { + klog.InfoS("Didn't get Endpoint URL for Antrea Service, skip updating Antrea Client") + return nil + } + kubeConfig, err = inClusterConfig(caBundle, endpointURL.String()) } else { kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig( &clientcmd.ClientConfigLoadingRules{ExplicitPath: p.config.Kubeconfig}, @@ -138,17 +183,12 @@ func (p *antreaClientProvider) updateAntreaClient() error { return nil } -// inClusterConfig returns a config object which uses the service account -// kubernetes gives to pods. It's intended for clients that expect to be -// running inside a pod running on kubernetes. It will return error -// if called from a process not running in a kubernetes environment. -func inClusterConfig(caBundle []byte) (*rest.Config, error) { +// inClusterConfig returns a config object which uses the service account Kubernetes gives to +// Pods. It's intended for clients that expect to be running inside a Pod running on Kubernetes. It +// will return error if called from a process not running in a Kubernetes environment. 
+func inClusterConfig(caBundle []byte, endpoint string) (*rest.Config, error) { // #nosec G101: false positive triggered by variable name which includes "token" const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" - host, port := os.Getenv("ANTREA_SERVICE_HOST"), os.Getenv("ANTREA_SERVICE_PORT") - if len(host) == 0 || len(port) == 0 { - return nil, fmt.Errorf("unable to load in-cluster configuration, ANTREA_SERVICE_HOST and ANTREA_SERVICE_PORT must be defined") - } token, err := os.ReadFile(tokenFile) if err != nil { @@ -161,7 +201,7 @@ func inClusterConfig(caBundle []byte) (*rest.Config, error) { } return &rest.Config{ - Host: "https://" + net.JoinHostPort(host, port), + Host: endpoint, TLSClientConfig: tlsClientConfig, BearerToken: string(token), BearerTokenFile: tokenFile, diff --git a/pkg/agent/client/endpoint_resolver.go b/pkg/agent/client/endpoint_resolver.go new file mode 100644 index 00000000000..9e4eef36fd7 --- /dev/null +++ b/pkg/agent/client/endpoint_resolver.go @@ -0,0 +1,292 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package client + +import ( + "context" + "fmt" + "net/url" + "reflect" + "sync/atomic" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/proxy" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const ( + // informerDefaultResync is the default resync period if a handler doesn't specify one. + // Use the same default value as kube-controller-manager: + // https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120 + informerDefaultResync = 12 * time.Hour + + minRetryDelay = 100 * time.Millisecond + maxRetryDelay = 30 * time.Second +) + +// Listener defines the interface which needs to be implemented by clients which want to subscribe +// to Endpoint updates. +type Listener interface { + Enqueue() +} + +// EndpointResolver is in charge of resolving a specific Service Endpoint, which can then be +// accessed directly instead of depending on the ClusterIP functionality provided by K8s proxies +// (whether it's kube-proxy or AntreaProxy). A new Endpoint is resolved every time the Service's +// Spec or the Endpoints' Subsets are updated, and registered listeners are notified. While this +// EndpointResolver is somewhat generic, at the moment it is only meant to be used for the Antrea +// Service. +type EndpointResolver struct { + // name is the name of the controller in charge of Endpoint resolution. + name string + namespace string + serviceName string + servicePort int32 + // informerFactory is stored here so it can be started in the Run() method. + informerFactory informers.SharedInformerFactory + // serviceLister is used to retrieve the Service when selecting an Endpoint. 
+ serviceLister corev1listers.ServiceLister + serviceListerSynced cache.InformerSynced + // endpointsLister is used to retrieve the Endpoints for the Service during Endpoint selection. + endpointsLister corev1listers.EndpointsLister + endpointsListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + // listeners need to implement the Listener interface and will get notified when the + // current Endpoint URL changes. + listeners []Listener + endpointURL atomic.Pointer[url.URL] +} + +func NewEndpointResolver(kubeClient kubernetes.Interface, namespace, serviceName string, servicePort int32) *EndpointResolver { + key := namespace + "/" + serviceName + controllerName := fmt.Sprintf("ServiceEndpointResolver:%s", key) + + // We only need a specific Service and corresponding Endpoints resource, so we create our + // own informer factory, and we filter by namespace and name. + informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, informerDefaultResync, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(listOptions *metav1.ListOptions) { + listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", serviceName).String() + })) + serviceInformer := informerFactory.Core().V1().Services() + endpointsInformer := informerFactory.Core().V1().Endpoints() + + resolver := &EndpointResolver{ + name: controllerName, + namespace: namespace, + serviceName: serviceName, + servicePort: servicePort, + informerFactory: informerFactory, + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + endpointsLister: endpointsInformer.Lister(), + endpointsListerSynced: endpointsInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), controllerName), + } + + serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + // FilterFunc ignores all Service events 
which do not relate to the named Service. + // It should be redundant given the filtering that we already do at the informer level. + FilterFunc: func(obj interface{}) bool { + if service, ok := obj.(*corev1.Service); ok { + return service.Namespace == namespace && service.Name == serviceName + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if service, ok := tombstone.Obj.(*corev1.Service); ok { + return service.Namespace == namespace && service.Name == serviceName + } + } + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + resolver.queue.Add(key) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // This should not happen: both objects should be Services in the + // update event handler. + oldSvc, ok := oldObj.(*corev1.Service) + if !ok { + return + } + newSvc, ok := newObj.(*corev1.Service) + if !ok { + return + } + // Ignore changes to metadata or status. + if reflect.DeepEqual(newSvc.Spec, oldSvc.Spec) { + return + } + resolver.queue.Add(key) + }, + DeleteFunc: func(obj interface{}) { + resolver.queue.Add(key) + }, + }, + }) + endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + // FilterFunc ignores all Endpoints events which do not relate to the named Service. + // It should be redundant given the filtering that we already do at the informer level. + FilterFunc: func(obj interface{}) bool { + // The Endpoints resource for a Service has the same name as the Service. 
+ if endpoints, ok := obj.(*corev1.Endpoints); ok { + return endpoints.Namespace == namespace && endpoints.Name == serviceName + } + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if endpoints, ok := tombstone.Obj.(*corev1.Endpoints); ok { + return endpoints.Namespace == namespace && endpoints.Name == serviceName + } + } + return false + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + resolver.queue.Add(key) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // This should not happen: both objects should be Endpoints in the + // update event handler. + oldEndpoints, ok := oldObj.(*corev1.Endpoints) + if !ok { + return + } + newEndpoints, ok := newObj.(*corev1.Endpoints) + if !ok { + return + } + // Ignore changes to metadata. + if reflect.DeepEqual(newEndpoints.Subsets, oldEndpoints.Subsets) { + return + } + resolver.queue.Add(key) + }, + DeleteFunc: func(obj interface{}) { + resolver.queue.Add(key) + }, + }, + }) + return resolver +} + +func (r *EndpointResolver) Run(ctx context.Context) { + defer r.queue.ShutDown() + + klog.InfoS("Starting controller", "name", r.name) + defer klog.InfoS("Shutting down controller", "name", r.name) + + r.informerFactory.Start(ctx.Done()) + defer r.informerFactory.Shutdown() + + if !cache.WaitForNamedCacheSync(r.name, ctx.Done(), r.serviceListerSynced, r.endpointsListerSynced) { + return + } + + // We only start one worker for this controller. 
+ go wait.Until(r.runWorker, time.Second, ctx.Done()) + + <-ctx.Done() +} + +func (r *EndpointResolver) runWorker() { + for r.processNextWorkItem() { + } +} + +func (r *EndpointResolver) processNextWorkItem() bool { + key, quit := r.queue.Get() + if quit { + return false + } + defer r.queue.Done(key) + + if err := r.resolveEndpoint(); err == nil { + r.queue.Forget(key) + } else { + klog.ErrorS(err, "Failed to resolve Service Endpoint, requeuing", "key", key) + r.queue.AddRateLimited(key) + } + + return true +} + +func (r *EndpointResolver) resolveEndpoint() error { + klog.V(2).InfoS("Resolving Endpoint", "service", klog.KRef(r.namespace, r.serviceName)) + endpointURL, err := proxy.ResolveEndpoint(r.serviceLister, r.endpointsLister, r.namespace, r.serviceName, r.servicePort) + // Typically we will get one of these 2 errors (unavailable or not found). + // In this case, it makes sense to reset the Endpoint URL to nil and notify listeners. + // There is also no need to retry, as we won't find a suitable Endpoint until the Service or + // the Endpoints resource is updated in a way that will cause this function to be called again. + if errors.IsServiceUnavailable(err) { + klog.ErrorS(err, "Cannot resolve endpoint because Service is unavailable", "service", klog.KRef(r.namespace, r.serviceName)) + r.updateEndpointIfNeeded(nil) + return nil + } + if errors.IsNotFound(err) { + klog.ErrorS(err, "Cannot resolve endpoint because of missing resource", "service", klog.KRef(r.namespace, r.serviceName)) + r.updateEndpointIfNeeded(nil) + return nil + } + if err != nil { + // Unknown error: we err on the side of caution. + // Do not reset the URL or notify listeners, and trigger a retry. 
+ return err + } + klog.V(2).InfoS("Resolved Endpoint", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL) + r.updateEndpointIfNeeded(endpointURL) + return nil +} + +func (r *EndpointResolver) updateEndpointIfNeeded(endpointURL *url.URL) { + // The separate Load and Store calls are safe because there is a single writer for r.endpointURL. + currentEndpointURL := r.endpointURL.Load() + updateNeeded := func() bool { + if endpointURL == nil && currentEndpointURL == nil { + return false + } + if endpointURL == nil || currentEndpointURL == nil { + return true + } + return endpointURL.String() != currentEndpointURL.String() + } + if !updateNeeded() { + klog.V(2).InfoS("No change to Endpoint for Service, no need to notify listeners", "service", klog.KRef(r.namespace, r.serviceName)) + return + } + if endpointURL != nil { + klog.InfoS("Selected a new Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName), "url", endpointURL) + } else { + klog.InfoS("Selected no Endpoint for Service, notifying listeners", "service", klog.KRef(r.namespace, r.serviceName)) + } + r.endpointURL.Store(endpointURL) + for _, listener := range r.listeners { + listener.Enqueue() + } +} + +func (r *EndpointResolver) AddListener(listener Listener) { + r.listeners = append(r.listeners, listener) +} + +func (r *EndpointResolver) CurrentEndpointURL() *url.URL { + return r.endpointURL.Load() +} diff --git a/pkg/agent/client/endpoint_resolver_test.go b/pkg/agent/client/endpoint_resolver_test.go new file mode 100644 index 00000000000..e3fc6017667 --- /dev/null +++ b/pkg/agent/client/endpoint_resolver_test.go @@ -0,0 +1,159 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "fmt" + "net" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" +) + +const ( + testNamespace = "kube-system" + testServiceName = "antrea" + testServicePort = 443 + testTargetPort = 10349 + testEndpointIP1 = "172.18.0.3" + testEndpointIP2 = "172.18.0.4" +) + +func getTestObjects() (*corev1.Service, *corev1.Endpoints) { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testServiceName, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "antrea", + "component": "antrea=controller", + }, + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{ + { + Port: 443, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt32(testTargetPort), + }, + }, + }, + } + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testServiceName, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: testEndpointIP1, + }, + }, + Ports: []corev1.EndpointPort{ + { + Port: testTargetPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + } + return svc, endpoints +} + +func getEndpointURL(ip string) *url.URL { + return &url.URL{ + Scheme: "https", + Host: net.JoinHostPort(ip, fmt.Sprint(testTargetPort)), + } +} + 
+func runTestEndpointResolver(ctx context.Context, objects ...runtime.Object) (*fake.Clientset, *EndpointResolver) { + k8sClient := fake.NewSimpleClientset(objects...) + resolver := NewEndpointResolver(k8sClient, testNamespace, testServiceName, testServicePort) + go resolver.Run(ctx) + return k8sClient, resolver +} + +func TestEndpointResolver(t *testing.T) { + t.Run("add Service and Endpoints", func(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + k8sClient, resolver := runTestEndpointResolver(ctx) + require.Nil(t, resolver.CurrentEndpointURL()) + svc, endpoints := getTestObjects() + k8sClient.CoreV1().Services(testNamespace).Create(ctx, svc, metav1.CreateOptions{}) + k8sClient.CoreV1().Endpoints(testNamespace).Create(ctx, endpoints, metav1.CreateOptions{}) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + }) + + t.Run("update Endpoint address", func(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + svc, endpoints := getTestObjects() + k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + endpoints.Subsets[0].Addresses[0].IP = testEndpointIP2 + k8sClient.CoreV1().Endpoints(testNamespace).Update(ctx, endpoints, metav1.UpdateOptions{}) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, getEndpointURL(testEndpointIP2), resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + }) + + t.Run("remove Endpoint address", func(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + svc, endpoints := getTestObjects() + k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) 
+ assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + endpoints.Subsets = nil + k8sClient.CoreV1().Endpoints(testNamespace).Update(ctx, endpoints, metav1.UpdateOptions{}) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Nil(t, resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + }) + + t.Run("delete Service", func(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + svc, endpoints := getTestObjects() + k8sClient, resolver := runTestEndpointResolver(ctx, svc, endpoints) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, getEndpointURL(testEndpointIP1), resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + k8sClient.CoreV1().Services(testNamespace).Delete(ctx, testServiceName, metav1.DeleteOptions{}) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Nil(t, resolver.CurrentEndpointURL()) + }, 2*time.Second, 50*time.Millisecond) + }) +} diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index a6213897d40..7d50da0bf45 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -41,7 +41,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/ipassigner" "antrea.io/antrea/pkg/agent/memberlist" @@ -155,7 +155,7 @@ type EgressController struct { routeClient route.Interface k8sClient kubernetes.Interface crdClient clientsetversioned.Interface - antreaClientProvider agent.AntreaClientProvider + antreaClientProvider client.AntreaClientProvider egressInformer cache.SharedIndexInformer egressLister crdlisters.EgressLister @@ -210,7 +210,7 @@ type 
EgressController struct { func NewEgressController( ofClient openflow.Client, k8sClient kubernetes.Interface, - antreaClientGetter agent.AntreaClientProvider, + antreaClientGetter client.AntreaClientProvider, crdClient clientsetversioned.Interface, ifaceStore interfacestore.InterfaceStore, routeClient route.Interface, diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 63f1be42f4a..b95a8e1ec0f 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -34,7 +34,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/networkpolicy/l7engine" "antrea.io/antrea/pkg/agent/flowexporter/connections" @@ -113,14 +113,17 @@ type Controller struct { multicastEnabled bool // nodeType indicates type of the Node where Antrea Agent is running on. nodeType config.NodeType - // antreaClientProvider provides interfaces to get antreaClient, which can be - // used to watch Antrea AddressGroups, AppliedToGroups, and NetworkPolicies. - // We need to get antreaClient dynamically because the apiserver cert can be - // rotated and we need a new client with the updated CA cert. + // antreaClientProvider provides interfaces to get antreaClient, which + // can be used to watch Antrea AddressGroups, AppliedToGroups, and + // NetworkPolicies. We need to get antreaClient dynamically because we + // are not relying on the ClusterIP to access the Antrea Service (we + // resolve the endpoint directly, and the endpoint can change if the + // antrea-controller Pod is rescheduled), and because the apiserver cert + // can be rotated and we need a new client with the updated CA cert. 
// Verifying server certificate only takes place for new requests and existing // watches won't be interrupted by rotating cert. The new client will be used // after the existing watches expire. - antreaClientProvider agent.AntreaClientProvider + antreaClientProvider client.AntreaClientProvider // queue maintains the NetworkPolicy ruleIDs that need to be synced. queue workqueue.RateLimitingInterface // ruleCache maintains the desired state of NetworkPolicy rules. @@ -167,7 +170,7 @@ type Controller struct { } // NewNetworkPolicyController returns a new *Controller. -func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, +func NewNetworkPolicyController(antreaClientGetter client.AntreaClientProvider, ofClient openflow.Client, routeClient route.Interface, ifaceStore interfacestore.InterfaceStore, @@ -594,7 +597,13 @@ func (c *Controller) SetDenyConnStore(denyConnStore *connections.DenyConnectionS // Run will not return until stopCh is closed. func (c *Controller) Run(stopCh <-chan struct{}) { attempts := 0 - if err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 200*time.Millisecond, true, func(ctx context.Context) (bool, error) { + // If Antrea client is not ready within 5s, we assume that the Antrea Controller is not + // available. We proceed with our watches, which are likely to fail. In turn, this will + // trigger the fallback mechanism. + // 5s should be more than enough if the Antrea Controller is running correctly. 
+ ctx, cancel := context.WithTimeout(wait.ContextForChannel(stopCh), 5*time.Second) + defer cancel() + if err := wait.PollUntilContextCancel(ctx, 200*time.Millisecond, true, func(ctx context.Context) (bool, error) { if attempts%10 == 0 { klog.Info("Waiting for Antrea client to be ready") } @@ -605,9 +614,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) { return true, nil }); err != nil { klog.Info("Stopped waiting for Antrea client") - return + } else { + klog.Info("Antrea client is ready") } - klog.Info("Antrea client is ready") // Use NonSlidingUntil so that normal reconnection (disconnected after // running a while) can reconnect immediately while abnormal reconnection diff --git a/pkg/agent/controller/networkpolicy/status_controller.go b/pkg/agent/controller/networkpolicy/status_controller.go index f062c41accc..9307048c69b 100644 --- a/pkg/agent/controller/networkpolicy/status_controller.go +++ b/pkg/agent/controller/networkpolicy/status_controller.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" ) @@ -82,7 +82,7 @@ func realizedRulePolicyIndexFunc(obj interface{}) ([]string, error) { return []string{string(rule.policyID)}, nil } -func newStatusController(antreaClientProvider agent.AntreaClientProvider, nodeName string, ruleCache *ruleCache) *StatusController { +func newStatusController(antreaClientProvider client.AntreaClientProvider, nodeName string, ruleCache *ruleCache) *StatusController { return &StatusController{ statusControlInterface: &networkPolicyStatusControl{antreaClientProvider: antreaClientProvider}, nodeName: nodeName, @@ -223,7 +223,7 @@ type networkPolicyStatusControlInterface interface { } type networkPolicyStatusControl struct { - antreaClientProvider agent.AntreaClientProvider + antreaClientProvider client.AntreaClientProvider } func (c *networkPolicyStatusControl) 
UpdateNetworkPolicyStatus(name string, status *v1beta2.NetworkPolicyStatus) error { diff --git a/pkg/agent/stats/collector.go b/pkg/agent/stats/collector.go index 2e1f19ffe8b..13543e271e6 100644 --- a/pkg/agent/stats/collector.go +++ b/pkg/agent/stats/collector.go @@ -23,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/multicast" "antrea.io/antrea/pkg/agent/openflow" agenttypes "antrea.io/antrea/pkg/agent/types" @@ -57,7 +57,7 @@ type Collector struct { nodeName string // antreaClientProvider provides interfaces to get antreaClient, which will be used to report the statistics to the // antrea-controller. - antreaClientProvider agent.AntreaClientProvider + antreaClientProvider client.AntreaClientProvider // ofClient is the Openflow interface that can fetch the statistic of the Openflow entries. ofClient openflow.Client networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier @@ -68,7 +68,7 @@ type Collector struct { multicastEnabled bool } -func NewCollector(antreaClientProvider agent.AntreaClientProvider, ofClient openflow.Client, npQuerier querier.AgentNetworkPolicyInfoQuerier, mcQuerier *multicast.Controller) *Collector { +func NewCollector(antreaClientProvider client.AntreaClientProvider, ofClient openflow.Client, npQuerier querier.AgentNetworkPolicyInfoQuerier, mcQuerier *multicast.Controller) *Collector { nodeName, _ := env.GetNodeName() manager := &Collector{ nodeName: nodeName, diff --git a/pkg/agent/supportbundlecollection/support_bundle_controller.go b/pkg/agent/supportbundlecollection/support_bundle_controller.go index 160c236afdf..1202448019e 100644 --- a/pkg/agent/supportbundlecollection/support_bundle_controller.go +++ b/pkg/agent/supportbundlecollection/support_bundle_controller.go @@ -35,7 +35,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/exec" - "antrea.io/antrea/pkg/agent" + "antrea.io/antrea/pkg/agent/client" agentquerier 
"antrea.io/antrea/pkg/agent/querier" "antrea.io/antrea/pkg/apis/controlplane" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -69,7 +69,7 @@ type SupportBundleController struct { nodeName string supportBundleNodeType controlplane.SupportBundleCollectionNodeType namespace string - antreaClientGetter agent.AntreaClientProvider + antreaClientGetter client.AntreaClientProvider queue workqueue.Interface supportBundleCollection *cpv1b2.SupportBundleCollection supportBundleCollectionMutex sync.RWMutex @@ -84,7 +84,7 @@ type SupportBundleController struct { func NewSupportBundleController(nodeName string, supportBundleNodeType controlplane.SupportBundleCollectionNodeType, namespace string, - antreaClientGetter agent.AntreaClientProvider, + antreaClientGetter client.AntreaClientProvider, ovsCtlClient ovsctl.OVSCtlClient, aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 080c879c1a9..bef78a76cb0 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" @@ -743,17 +744,17 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { require.NoError(t, err) t.Cleanup(func() { data.deleteNetworkpolicy(netpol) }) - checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) { + checkFunc := func(t assert.TestingT, testPod string, testPodIPs *PodIPs, expectErr bool) { var wg sync.WaitGroup checkOne := func(clientPod, serverPod string, serverIP *net.IP) { defer wg.Done() if serverIP != nil { cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"} _, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd) - if expectErr && err == nil { - t.Errorf("Pod %s should not be able to connect %s, but was 
able to connect", clientPod, serverPod) - } else if !expectErr && err != nil { - t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err) + if expectErr { + assert.Error(t, err, "Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) + } else { + assert.NoError(t, err, "Pod %s should be able to connect %s, but was not able to connect", clientPod, serverPod) } } } @@ -783,7 +784,7 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { // While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully. for i := 0; i < 5; i++ { - checkFunc(deniedPod, deniedPodIPs, true) + checkFunc(t, deniedPod, deniedPodIPs, true) } antreaPod, err := data.getAntreaPodOnNode(workerNode) @@ -792,15 +793,21 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse) waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue) // Even the new antrea-agent can't connect to antrea-controller, the previous policy should continue working. - checkFunc(deniedPod, deniedPodIPs, true) - checkFunc(allowedPod, allowedPodIPs, false) + checkFunc(t, deniedPod, deniedPodIPs, true) + // It may take some time for the antrea-agent to fall back to locally-saved policies. Until + // it happens, allowed traffic may be dropped. So we use polling to tolerate some delay. + // The important part is that traffic that should be denied is always denied, which we have + // already validated at that point. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + checkFunc(t, allowedPod, allowedPodIPs, false) + }, 10*time.Second, 1*time.Second) // Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller. scaleFunc(1) // Make sure antrea-agent connects to antrea-controller. 
waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue) - checkFunc(deniedPod, deniedPodIPs, true) - checkFunc(allowedPod, allowedPodIPs, false) + checkFunc(t, deniedPod, deniedPodIPs, true) + checkFunc(t, allowedPod, allowedPodIPs, false) } func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) {