Skip to content

Commit

Permalink
Point cilium to talk to the local apiserver or apiserver-proxy (#697)
Browse files Browse the repository at this point in the history
  • Loading branch information
berkayoz committed Sep 27, 2024
1 parent d3c4a36 commit 9e0a43c
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 81 deletions.
4 changes: 2 additions & 2 deletions src/k8s/pkg/k8sd/api/certificates_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,10 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo
}

// Kubeconfigs
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), fmt.Sprintf("%s:6443", localhostAddress), certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil {
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), fmt.Sprintf("%s:%d", localhostAddress, clusterConfig.APIServer.GetSecurePort()), certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil {
return response.InternalError(fmt.Errorf("failed to generate kubelet kubeconfig: %w", err))
}
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), fmt.Sprintf("%s:6443", localhostAddress), certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil {
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), fmt.Sprintf("%s:%d", localhostAddress, clusterConfig.APIServer.GetSecurePort()), certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil {
return response.InternalError(fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err))
}

Expand Down
26 changes: 22 additions & 4 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"

apiv1 "github.com/canonical/k8s-snap-api/api/v1"
Expand Down Expand Up @@ -184,11 +186,22 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
localhostAddress = "127.0.0.1"
}

port := "6443"
if len(response.APIServers) == 0 {
return fmt.Errorf("no APIServers found in worker node info")
}
// Get the secure port from the first APIServer since they should all be the same.
port = response.APIServers[0][strings.LastIndex(response.APIServers[0], ":")+1:]
securePort, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("failed to parse apiserver secure port: %w", err)
}

// Kubeconfigs
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), fmt.Sprintf("%s:6443", localhostAddress), certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil {
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "kubelet.conf"), fmt.Sprintf("%s:%d", localhostAddress, securePort), certificates.CACert, certificates.KubeletClientCert, certificates.KubeletClientKey); err != nil {
return fmt.Errorf("failed to generate kubelet kubeconfig: %w", err)
}
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), fmt.Sprintf("%s:6443", localhostAddress), certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil {
if err := setup.Kubeconfig(filepath.Join(snap.KubernetesConfigDir(), "proxy.conf"), fmt.Sprintf("%s:%d", localhostAddress, securePort), certificates.CACert, certificates.KubeProxyClientCert, certificates.KubeProxyClientKey); err != nil {
return fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err)
}

Expand All @@ -203,6 +216,9 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
// TODO(neoaggelos): We should be explicit here and try to avoid having worker nodes use
// or set other cluster configuration keys by accident.
cfg := types.ClusterConfig{
APIServer: types.APIServer{
SecurePort: utils.Pointer(securePort),
},
Network: types.Network{
PodCIDR: utils.Pointer(response.PodCIDR),
ServiceCIDR: utils.Pointer(response.ServiceCIDR),
Expand Down Expand Up @@ -239,7 +255,7 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
if err := setup.KubeProxy(ctx, snap, s.Name(), response.PodCIDR, localhostAddress, joinConfig.ExtraNodeKubeProxyArgs); err != nil {
return fmt.Errorf("failed to configure kube-proxy: %w", err)
}
if err := setup.K8sAPIServerProxy(snap, response.APIServers, localhostAddress, joinConfig.ExtraNodeK8sAPIServerProxyArgs); err != nil {
if err := setup.K8sAPIServerProxy(snap, response.APIServers, securePort, joinConfig.ExtraNodeK8sAPIServerProxyArgs); err != nil {
return fmt.Errorf("failed to configure k8s-apiserver-proxy: %w", err)
}
if err := setup.ExtraNodeConfigFiles(snap, joinConfig.ExtraNodeConfigFiles); err != nil {
Expand Down Expand Up @@ -291,6 +307,8 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
localhostAddress = "127.0.0.1"
}

cfg.Network.LocalhostAddress = utils.Pointer(localhostAddress)

// Create directories
if err := setup.EnsureAllDirectories(snap); err != nil {
return fmt.Errorf("failed to create directories: %w", err)
Expand Down Expand Up @@ -441,7 +459,7 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
if err := setup.KubeScheduler(snap, bootstrapConfig.ExtraNodeKubeSchedulerArgs); err != nil {
return fmt.Errorf("failed to configure kube-scheduler: %w", err)
}
if err := setup.KubeAPIServer(snap, nodeIP, cfg.Network.GetServiceCIDR(), s.Address().Path("1.0", "kubernetes", "auth", "webhook").String(), true, cfg.Datastore, cfg.APIServer.GetAuthorizationMode(), bootstrapConfig.ExtraNodeKubeAPIServerArgs); err != nil {
if err := setup.KubeAPIServer(snap, cfg.APIServer.GetSecurePort(), nodeIP, cfg.Network.GetServiceCIDR(), s.Address().Path("1.0", "kubernetes", "auth", "webhook").String(), true, cfg.Datastore, cfg.APIServer.GetAuthorizationMode(), bootstrapConfig.ExtraNodeKubeAPIServerArgs); err != nil {
return fmt.Errorf("failed to configure kube-apiserver: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
if err := setup.KubeScheduler(snap, joinConfig.ExtraNodeKubeSchedulerArgs); err != nil {
return fmt.Errorf("failed to configure kube-scheduler: %w", err)
}
if err := setup.KubeAPIServer(snap, nodeIP, cfg.Network.GetServiceCIDR(), s.Address().Path("1.0", "kubernetes", "auth", "webhook").String(), true, cfg.Datastore, cfg.APIServer.GetAuthorizationMode(), joinConfig.ExtraNodeKubeAPIServerArgs); err != nil {
if err := setup.KubeAPIServer(snap, cfg.APIServer.GetSecurePort(), nodeIP, cfg.Network.GetServiceCIDR(), s.Address().Path("1.0", "kubernetes", "auth", "webhook").String(), true, cfg.Datastore, cfg.APIServer.GetAuthorizationMode(), joinConfig.ExtraNodeKubeAPIServerArgs); err != nil {
return fmt.Errorf("failed to configure kube-apiserver: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/controllers/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *FeatureController) Run(
ctx = log.NewContext(ctx, log.FromContext(ctx).WithValues("controller", "feature"))

go c.reconcileLoop(ctx, getClusterConfig, setFeatureStatus, features.Network, c.triggerNetworkCh, c.reconciledNetworkCh, func(cfg types.ClusterConfig) (types.FeatureStatus, error) {
return features.Implementation.ApplyNetwork(ctx, c.snap, cfg.Network, cfg.Annotations)
return features.Implementation.ApplyNetwork(ctx, c.snap, cfg.APIServer, cfg.Network, cfg.Annotations)
})

go c.reconcileLoop(ctx, getClusterConfig, setFeatureStatus, features.Gateway, c.triggerGatewayCh, c.reconciledGatewayCh, func(cfg types.ClusterConfig) (types.FeatureStatus, error) {
Expand Down
12 changes: 6 additions & 6 deletions src/k8s/pkg/k8sd/features/calico/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ const (
deleteFailedMsgTmpl = "Failed to delete Calico, the error was: %v"
)

// ApplyNetwork will deploy Calico when cfg.Enabled is true.
// ApplyNetwork will remove Calico when cfg.Enabled is false.
// ApplyNetwork will deploy Calico when network.Enabled is true.
// ApplyNetwork will remove Calico when network.Enabled is false.
// ApplyNetwork will always return a FeatureStatus indicating the current status of the
// deployment.
// ApplyNetwork returns an error if anything fails. The error is also wrapped in the .Message field of the
// returned FeatureStatus.
func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
func ApplyNetwork(ctx context.Context, snap snap.Snap, apiserver types.APIServer, network types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
m := snap.HelmClient()

if !cfg.GetEnabled() {
if !network.GetEnabled() {
if _, err := m.Apply(ctx, ChartCalico, helm.StateDeleted, nil); err != nil {
err = fmt.Errorf("failed to uninstall network: %w", err)
return types.FeatureStatus{
Expand Down Expand Up @@ -54,7 +54,7 @@ func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, annota
}

podIpPools := []map[string]any{}
ipv4PodCIDR, ipv6PodCIDR, err := utils.ParseCIDRs(cfg.GetPodCIDR())
ipv4PodCIDR, ipv6PodCIDR, err := utils.ParseCIDRs(network.GetPodCIDR())
if err != nil {
err = fmt.Errorf("invalid pod cidr: %w", err)
return types.FeatureStatus{
Expand All @@ -79,7 +79,7 @@ func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, annota
}

serviceCIDRs := []string{}
ipv4ServiceCIDR, ipv6ServiceCIDR, err := utils.ParseCIDRs(cfg.GetServiceCIDR())
ipv4ServiceCIDR, ipv6ServiceCIDR, err := utils.ParseCIDRs(network.GetServiceCIDR())
if err != nil {
err = fmt.Errorf("invalid service cidr: %v", err)
return types.FeatureStatus{
Expand Down
52 changes: 35 additions & 17 deletions src/k8s/pkg/k8sd/features/calico/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ func TestDisabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(false),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, nil)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, nil)

g.Expect(err).To(MatchError(applyErr))
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -65,11 +68,14 @@ func TestDisabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(false),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, nil)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, nil)

g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -94,12 +100,15 @@ func TestEnabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(true),
PodCIDR: ptr.To("invalid-cidr"),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)

g.Expect(err).To(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -115,13 +124,16 @@ func TestEnabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(true),
PodCIDR: ptr.To("192.0.2.0/24,2001:db8::/32"),
ServiceCIDR: ptr.To("invalid-cidr"),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)

g.Expect(err).To(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -140,13 +152,16 @@ func TestEnabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(true),
PodCIDR: ptr.To("192.0.2.0/24,2001:db8::/32"),
ServiceCIDR: ptr.To("192.0.2.0/24,2001:db8::/32"),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)

g.Expect(err).To(MatchError(applyErr))
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -157,7 +172,7 @@ func TestEnabled(t *testing.T) {
callArgs := helmM.ApplyCalledWith[0]
g.Expect(callArgs.Chart).To(Equal(calico.ChartCalico))
g.Expect(callArgs.State).To(Equal(helm.StatePresent))
validateValues(t, callArgs.Values, cfg)
validateValues(t, callArgs.Values, network)
})
t.Run("Success", func(t *testing.T) {
g := NewWithT(t)
Expand All @@ -168,13 +183,16 @@ func TestEnabled(t *testing.T) {
HelmClient: helmM,
},
}
cfg := types.Network{
network := types.Network{
Enabled: ptr.To(true),
PodCIDR: ptr.To("192.0.2.0/24,2001:db8::/32"),
ServiceCIDR: ptr.To("192.0.2.0/24,2001:db8::/32"),
}
apiserver := types.APIServer{
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, cfg, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)

g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Enabled).To(BeTrue())
Expand All @@ -185,17 +203,17 @@ func TestEnabled(t *testing.T) {
callArgs := helmM.ApplyCalledWith[0]
g.Expect(callArgs.Chart).To(Equal(calico.ChartCalico))
g.Expect(callArgs.State).To(Equal(helm.StatePresent))
validateValues(t, callArgs.Values, cfg)
validateValues(t, callArgs.Values, network)
})
}

func validateValues(t *testing.T, values map[string]any, cfg types.Network) {
func validateValues(t *testing.T, values map[string]any, network types.Network) {
g := NewWithT(t)

podIPv4CIDR, podIPv6CIDR, err := utils.ParseCIDRs(cfg.GetPodCIDR())
podIPv4CIDR, podIPv6CIDR, err := utils.ParseCIDRs(network.GetPodCIDR())
g.Expect(err).ToNot(HaveOccurred())

svcIPv4CIDR, svcIPv6CIDR, err := utils.ParseCIDRs(cfg.GetServiceCIDR())
svcIPv4CIDR, svcIPv6CIDR, err := utils.ParseCIDRs(network.GetServiceCIDR())
g.Expect(err).ToNot(HaveOccurred())

// calico network
Expand Down
17 changes: 12 additions & 5 deletions src/k8s/pkg/k8sd/features/cilium/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ const (
networkDeployFailedMsgTmpl = "Failed to deploy Cilium Network, the error was: %v"
)

// ApplyNetwork will deploy Cilium when cfg.Enabled is true.
// ApplyNetwork will remove Cilium when cfg.Enabled is false.
// ApplyNetwork will deploy Cilium when network.Enabled is true.
// ApplyNetwork will remove Cilium when network.Enabled is false.
// ApplyNetwork requires that bpf and cgroups2 are already mounted and available when running under strict snap confinement. If they are not, it will fail (since Cilium will not have the required permissions to mount them).
// ApplyNetwork requires that `/sys` is mounted as a shared mount when running under classic snap confinement. This is to ensure that Cilium will be able to automatically mount bpf and cgroups2 on the pods.
// ApplyNetwork will always return a FeatureStatus indicating the current status of the
// deployment.
// ApplyNetwork returns an error if anything fails. The error is also wrapped in the .Message field of the
// returned FeatureStatus.
func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, _ types.Annotations) (types.FeatureStatus, error) {
func ApplyNetwork(ctx context.Context, snap snap.Snap, apiserver types.APIServer, network types.Network, _ types.Annotations) (types.FeatureStatus, error) {
m := snap.HelmClient()

if !cfg.GetEnabled() {
if !network.GetEnabled() {
if _, err := m.Apply(ctx, ChartCilium, helm.StateDeleted, nil); err != nil {
err = fmt.Errorf("failed to uninstall network: %w", err)
return types.FeatureStatus{
Expand All @@ -44,7 +44,7 @@ func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, _ type
}, nil
}

ipv4CIDR, ipv6CIDR, err := utils.ParseCIDRs(cfg.GetPodCIDR())
ipv4CIDR, ipv6CIDR, err := utils.ParseCIDRs(network.GetPodCIDR())
if err != nil {
err = fmt.Errorf("invalid kube-proxy --cluster-cidr value: %v", err)
return types.FeatureStatus{
Expand Down Expand Up @@ -87,10 +87,17 @@ func ApplyNetwork(ctx context.Context, snap snap.Snap, cfg types.Network, _ type
"clusterPoolIPv6PodCIDRList": ipv6CIDR,
},
},
// https://docs.cilium.io/en/v1.15/network/kubernetes/kubeproxy-free/#kube-proxy-hybrid-modes
"nodePort": map[string]any{
"enabled": true,
// kube-proxy also binds to the same port for health checks so we need to disable it
"enableHealthCheck": false,
},
"disableEnvoyVersionCheck": true,
// socketLB requires an endpoint to the apiserver that's not managed by the kube-proxy
// so we point to the localhost:secureport to talk to either the kube-apiserver or the kube-apiserver-proxy
"k8sServiceHost": network.GetLocalhostAddress(),
"k8sServicePort": apiserver.GetSecurePort(),
}

if snap.Strict() {
Expand Down
Loading

0 comments on commit 9e0a43c

Please sign in to comment.