Skip to content

Commit

Permalink
Remove Agent's dependency on proxy to access Antrea Service (#6361)
Browse files Browse the repository at this point in the history
We add Endpoint resolution to the AntreaClientProvider, so that when
running in-cluster, accessing the Antrea Service (i.e., accessing the
Antrea Controller API) no longer depends on the ClusterIP functionality
provided by the K8s proxy, whether it is kube-proxy or AntreaProxy.

This gives us more flexibility during Agent initialization. For example,
when kube-proxy is removed and ProxyAll is enable for AntreaProxy,
accessing the Antrea Service no longer requires any routes or OVS flows
installed by the Antrea Agent.

To implement this functionality, we add a controller (EndpointResolver),
to watch the Antrea Service and the corresponding Endpoints
resource. For every relevant update, the Endpoint is resolved and the
new URL is sent to the AntreaClientProvider. This is a similar model as
the one we already use for CA bundle updates.

Note that when the Service stops being available, we clear the Endpoint
URL and notify listeners. This means that GetAntreaClient() can now
return an error even if a previous call was successful.

We also update the NetworkPolicyController in the Agent, so that we
fallback to saved policies in case the Antrea client does not become
ready within 5s.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas authored May 31, 2024
1 parent c3103a9 commit f20bdb7
Show file tree
Hide file tree
Showing 11 changed files with 573 additions and 61 deletions.
7 changes: 5 additions & 2 deletions cmd/antrea-agent-simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/client"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/k8s"
Expand All @@ -49,7 +49,10 @@ func run() error {
}

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
antreaClientProvider, err := client.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
if err != nil {
return err
}

if err = antreaClientProvider.RunOnce(); err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/apiserver"
"antrea.io/antrea/pkg/agent/client"
"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/config"
Expand Down Expand Up @@ -127,7 +128,10 @@ func run(o *Options) error {
nodeLatencyMonitorInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors()

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
antreaClientProvider, err := client.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
if err != nil {
return fmt.Errorf("failed to create Antrea client provider: %w", err)
}

// Register Antrea Agent metrics if EnablePrometheusMetrics is set
if *o.config.EnablePrometheusMetrics {
Expand Down Expand Up @@ -794,8 +798,6 @@ func run(o *Options) error {
}
}

// NetworkPolicyController and EgressController accesses the "antrea" Service via its ClusterIP.
// Run them after AntreaProxy is ready.
go networkPolicyController.Run(stopCh)
if o.enableEgress {
go egressController.Run(stopCh)
Expand Down
88 changes: 64 additions & 24 deletions pkg/agent/client.go → pkg/agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package agent
package client

import (
"context"
"fmt"
"net"
"os"
"strconv"
"sync"

"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -40,7 +40,9 @@ type AntreaClientProvider interface {
GetAntreaClient() (versioned.Interface, error)
}

// antreaClientProvider provides an AntreaClientProvider that can dynamically react to ConfigMap changes.
// antreaClientProvider provides an AntreaClientProvider that can dynamically react to CA bundle
// ConfigMap changes, as well as directly resolve the Antrea Service Endpoint when running inside a K8s cluster.
// The consumers of antreaClientProvider are supposed to always call GetAntreaClient() to get a client and not cache it.
type antreaClientProvider struct {
config config.ClientConnectionConfiguration
// mutex protects client.
Expand All @@ -49,27 +51,62 @@ type antreaClientProvider struct {
client versioned.Interface
// caContentProvider provides the very latest content of the ca bundle.
caContentProvider *dynamiccertificates.ConfigMapCAController
// endpointResolver provides a known Endpoint for the Antrea Service. There is usually a
// single Endpoint at any given time, given that the Antrea Controller runs as a
// single-replica Deployment. By resolving the Endpoint manually and accessing it directly,
// instead of depending on the ClusterIP functionality provided by the K8s proxy, we get
// more flexibility when initializing the Antrea Agent. For example, we can retrieve
// NetworkPolicies from the Controller even if the proxy is not (yet) available.
// endpointResolver is only used when no kubeconfig is provided (otherwise we honor the
// provided config).
endpointResolver *EndpointResolver
}

// antreaClientProvider must implement the dynamiccertificates.Listener interface to be notified of
// CA bundle updates.
var _ dynamiccertificates.Listener = &antreaClientProvider{}

func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) *antreaClientProvider {
// The key "ca.crt" may not exist at the beginning, no need to fail as the CA provider will watch the ConfigMap
// and notify antreaClientProvider of any update. The consumers of antreaClientProvider are supposed to always
// call GetAntreaClient() to get a client and not cache it.
antreaCAProvider, _ := dynamiccertificates.NewDynamicCAFromConfigMapController(
// antreaClientProvider must implement the Listener interface to be notified of an Endpoint change
// for the Antrea Service.
var _ Listener = &antreaClientProvider{}

func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) (*antreaClientProvider, error) {
antreaCAProvider, err := dynamiccertificates.NewDynamicCAFromConfigMapController(
"antrea-ca",
cert.GetCAConfigMapNamespace(),
apis.AntreaCAConfigMapName,
apis.CAConfigMapKey,
kubeClient)
if err != nil {
return nil, err
}

var endpointResolver *EndpointResolver
if len(config.Kubeconfig) == 0 {
klog.InfoS("No Antrea kubeconfig file was specified. Falling back to in-cluster config")
port := os.Getenv("ANTREA_SERVICE_PORT")
if len(port) == 0 {
return nil, fmt.Errorf("unable to create Endpoint resolver for Antrea Service, ANTREA_SERVICE_PORT must be defined for in-cluster config")
}
servicePort, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid port number stored in ANTREA_SERVICE_PORT: %w", err)
}
endpointResolver = NewEndpointResolver(kubeClient, env.GetAntreaNamespace(), apis.AntreaServiceName, int32(servicePort))
}

antreaClientProvider := &antreaClientProvider{
config: config,
caContentProvider: antreaCAProvider,
endpointResolver: endpointResolver,
}

antreaCAProvider.AddListener(antreaClientProvider)
return antreaClientProvider
if endpointResolver != nil {
endpointResolver.AddListener(antreaClientProvider)
}

return antreaClientProvider, nil
}

// RunOnce runs the task a single time synchronously, ensuring client is initialized if kubeconfig is specified.
Expand All @@ -80,14 +117,18 @@ func (p *antreaClientProvider) RunOnce() error {
// Run starts the caContentProvider, which watches the ConfigMap and notifies changes
// by calling Enqueue.
func (p *antreaClientProvider) Run(ctx context.Context) {
p.caContentProvider.Run(ctx, 1)
go p.caContentProvider.Run(ctx, 1)
if p.endpointResolver != nil {
go p.endpointResolver.Run(ctx)
}
<-ctx.Done()
}

// Enqueue implements dynamiccertificates.Listener. It will be called by caContentProvider
// when caBundle is updated.
func (p *antreaClientProvider) Enqueue() {
if err := p.updateAntreaClient(); err != nil {
klog.Errorf("Failed to update Antrea client: %v", err)
klog.ErrorS(err, "Failed to update Antrea client")
}
}

Expand All @@ -105,13 +146,17 @@ func (p *antreaClientProvider) updateAntreaClient() error {
var kubeConfig *rest.Config
var err error
if len(p.config.Kubeconfig) == 0 {
klog.Info("No antrea kubeconfig file was specified. Falling back to in-cluster config")
caBundle := p.caContentProvider.CurrentCABundleContent()
if caBundle == nil {
klog.Info("Didn't get CA certificate, skip updating Antrea Client")
klog.InfoS("Didn't get CA certificate, skip updating Antrea Client")
return nil
}
kubeConfig, err = inClusterConfig(caBundle)
endpointURL := p.endpointResolver.CurrentEndpointURL()
if endpointURL == nil {
klog.InfoS("Didn't get Endpoint URL for Antrea Service, skip updating Antrea Client")
return nil
}
kubeConfig, err = inClusterConfig(caBundle, endpointURL.String())
} else {
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: p.config.Kubeconfig},
Expand All @@ -138,17 +183,12 @@ func (p *antreaClientProvider) updateAntreaClient() error {
return nil
}

// inClusterConfig returns a config object which uses the service account
// kubernetes gives to pods. It's intended for clients that expect to be
// running inside a pod running on kubernetes. It will return error
// if called from a process not running in a kubernetes environment.
func inClusterConfig(caBundle []byte) (*rest.Config, error) {
// inClusterConfig returns a config object which uses the service account Kubernetes gives to
// Pods. It's intended for clients that expect to be running inside a Pod running on Kubernetes. It
// will return error if called from a process not running in a Kubernetes environment.
func inClusterConfig(caBundle []byte, endpoint string) (*rest.Config, error) {
// #nosec G101: false positive triggered by variable name which includes "token"
const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
host, port := os.Getenv("ANTREA_SERVICE_HOST"), os.Getenv("ANTREA_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, fmt.Errorf("unable to load in-cluster configuration, ANTREA_SERVICE_HOST and ANTREA_SERVICE_PORT must be defined")
}

token, err := os.ReadFile(tokenFile)
if err != nil {
Expand All @@ -161,7 +201,7 @@ func inClusterConfig(caBundle []byte) (*rest.Config, error) {
}

return &rest.Config{
Host: "https://" + net.JoinHostPort(host, port),
Host: endpoint,
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
BearerTokenFile: tokenFile,
Expand Down
Loading

0 comments on commit f20bdb7

Please sign in to comment.