Skip to content

Commit

Permalink
refactor: remove use of k8s service
Browse files Browse the repository at this point in the history
  • Loading branch information
phillebaba committed May 23, 2024
1 parent 42e1bf9 commit 64642ab
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 217 deletions.
50 changes: 36 additions & 14 deletions src/pkg/cluster/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/google/go-containerregistry/pkg/crane"
"github.com/mholt/archiver/v3"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -156,7 +157,11 @@ func (c *Cluster) StopInjectionMadness(ctx context.Context) error {
}

// Remove the injector service
return c.DeleteService(ctx, ZarfNamespaceName, "zarf-injector")
err := c.Clientset.CoreV1().Services(ZarfNamespaceName).Delete(ctx, "zarf-injector", metav1.DeleteOptions{})
if err != nil {
return err
}
return nil
}

func (c *Cluster) loadSeedImages(imagesDir, seedImagesDir string, injectorSeedSrcs []string, spinner *message.Spinner) ([]transform.Image, error) {
Expand Down Expand Up @@ -306,20 +311,37 @@ func (c *Cluster) createInjectorConfigMap(ctx context.Context, binaryPath string
}

func (c *Cluster) createService(ctx context.Context) (*corev1.Service, error) {
service := c.GenerateService(ZarfNamespaceName, "zarf-injector")

service.Spec.Type = corev1.ServiceTypeNodePort
service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{
Port: int32(5000),
})
service.Spec.Selector = map[string]string{
"app": "zarf-injector",
svc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: "zarf-injector",
Namespace: ZarfNamespaceName,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Ports: []corev1.ServicePort{
{
Port: int32(5000),
},
},
Selector: map[string]string{
"app": "zarf-injector",
},
},
}

// Attempt to purse the service silently
_ = c.DeleteService(ctx, ZarfNamespaceName, "zarf-injector")

return c.CreateService(ctx, service)
// TODO: Replace with create or update
err := c.Clientset.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
if err != nil && !kerrors.IsNotFound(err) {
return nil, err
}
svc, err = c.Clientset.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return svc, nil
}

// buildInjectionPod return a pod for injection with the appropriate containers to perform the injection.
Expand Down
89 changes: 75 additions & 14 deletions src/pkg/cluster/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ package cluster
import (
"context"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/defenseunicorns/zarf/src/types"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/defenseunicorns/pkg/helpers"
"github.com/defenseunicorns/zarf/src/config"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/message"
v1 "k8s.io/api/core/v1"
"github.com/defenseunicorns/zarf/src/types"
)

// Zarf specific connect strings
Expand Down Expand Up @@ -56,25 +60,30 @@ func NewTunnelInfo(namespace, resourceType, resourceName, urlSuffix string, loca

// PrintConnectTable will print a table of all Zarf connect matches found in the cluster.
func (c *Cluster) PrintConnectTable(ctx context.Context) error {
list, err := c.GetServicesByLabelExists(ctx, v1.NamespaceAll, config.ZarfConnectLabelName)
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{{
Operator: metav1.LabelSelectorOpExists,
Key: config.ZarfConnectLabelName,
}},
})
if err != nil {
return err
}
serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}

connections := make(types.ConnectStrings)

for _, svc := range list.Items {
for _, svc := range serviceList.Items {
name := svc.Labels[config.ZarfConnectLabelName]

// Add the connectString for processing later in the deployment.
connections[name] = types.ConnectString{
Description: svc.Annotations[config.ZarfConnectAnnotationDescription],
URL: svc.Annotations[config.ZarfConnectAnnotationURL],
}
}

message.PrintConnectStringTable(connections)

return nil
}

Expand Down Expand Up @@ -151,11 +160,15 @@ func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInf
return "", tunnel, err
}
} else {
svcInfo, err := c.ServiceInfoFromNodePortURL(ctx, registryInfo.Address)
serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, metav1.ListOptions{})
if err != nil {
return "", nil, err
}
namespace, name, port, err := serviceInfoFromNodePortURL(serviceList.Items, registryInfo.Address)

// If this is a service (no error getting svcInfo), create a port-forward tunnel to that resource
if err == nil {
if tunnel, err = c.NewTunnel(svcInfo.Namespace, k8s.SvcResource, svcInfo.Name, "", 0, svcInfo.Port); err != nil {
if tunnel, err = c.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port); err != nil {
return "", tunnel, err
}
}
Expand All @@ -179,14 +192,23 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu

message.Debugf("Looking for a Zarf Connect Label in the cluster")

matches, err := c.GetServicesByLabel(ctx, "", config.ZarfConnectLabelName, name)
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
config.ZarfConnectLabelName: name,
},
})
if err != nil {
return zt, fmt.Errorf("unable to lookup the service: %w", err)
return TunnelInfo{}, err
}
listOpts := metav1.ListOptions{LabelSelector: selector.String()}
serviceList, err := c.Clientset.CoreV1().Services("").List(ctx, listOpts)
if err != nil {
return TunnelInfo{}, err
}

if len(matches.Items) > 0 {
if len(serviceList.Items) > 0 {
// If there is a match, use the first one as these are supposed to be unique.
svc := matches.Items[0]
svc := serviceList.Items[0]

// Reset based on the matched params.
zt.resourceType = k8s.SvcResource
Expand All @@ -209,3 +231,42 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu

return zt, nil
}

// TODO: Refactor to use netip.AddrPort instead of a string for nodePortURL.
func serviceInfoFromNodePortURL(services []corev1.Service, nodePortURL string) (string, string, int, error) {
// Attempt to parse as normal, if this fails add a scheme to the URL (docker registries don't use schemes)
parsedURL, err := url.Parse(nodePortURL)
if err != nil {
parsedURL, err = url.Parse("scheme://" + nodePortURL)
if err != nil {
return "", "", 0, err
}
}

// Match hostname against localhost ip/hostnames
hostname := parsedURL.Hostname()
if hostname != helpers.IPV4Localhost && hostname != "localhost" {
return "", "", 0, fmt.Errorf("node port services should be on localhost")
}

// Get the node port from the nodeportURL.
nodePort, err := strconv.Atoi(parsedURL.Port())
if err != nil {
return "", "", 0, err
}
if nodePort < 30000 || nodePort > 32767 {
return "", "", 0, fmt.Errorf("node port services should use the port range 30000-32767")
}

for _, svc := range services {
if svc.Spec.Type == "NodePort" {
for _, port := range svc.Spec.Ports {
if int(port.NodePort) == nodePort {
return svc.Namespace, svc.Name, int(port.Port), nil
}
}
}
}

return "", "", 0, fmt.Errorf("no matching node port services found")
}
127 changes: 127 additions & 0 deletions src/pkg/cluster/tunnel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors

package cluster

import (
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestServiceInfoFromNodePortURL(t *testing.T) {
t.Parallel()

tests := []struct {
name string
services []corev1.Service
nodePortURL string
expectedErr string
expectedNamespace string
expectedName string
expectedPort int
}{
{
name: "invalid node port",
nodePortURL: "example.com",
expectedErr: "node port services should be on localhost",
},
{
name: "invalid port range",
nodePortURL: "http://localhost:8080",
expectedErr: "node port services should use the port range 30000-32767",
},
{
name: "no services",
nodePortURL: "http://localhost:30001",
services: []corev1.Service{},
expectedErr: "no matching node port services found",
},
{
name: "found serivce",
nodePortURL: "http://localhost:30001",
services: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "wrong-type",
Namespace: "wrong-type",
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Port: 1111,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "wrong-node-port",
Namespace: "wrong-node-port",
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Ports: []corev1.ServicePort{
{
NodePort: 30002,
Port: 2222,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "good-service",
Namespace: "good-namespace",
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Ports: []corev1.ServicePort{
{
NodePort: 30001,
Port: 3333,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "too-late",
Namespace: "too-late",
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Ports: []corev1.ServicePort{
{
NodePort: 30001,
Port: 4444,
},
},
},
},
},
expectedNamespace: "good-namespace",
expectedName: "good-service",
expectedPort: 3333,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

namespace, name, port, err := serviceInfoFromNodePortURL(tt.services, tt.nodePortURL)
if tt.expectedErr != "" {
require.EqualError(t, err, tt.expectedErr)
return
}
require.NoError(t, err)
require.Equal(t, tt.expectedNamespace, namespace)
require.Equal(t, tt.expectedName, name)
require.Equal(t, tt.expectedPort, port)
})
}
}
Loading

0 comments on commit 64642ab

Please sign in to comment.