Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove use of k8s pods #2553

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/internal/packager/helm/zarf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/defenseunicorns/zarf/src/internal/packager/template"
"github.com/defenseunicorns/zarf/src/pkg/cluster"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/transform"
"github.com/defenseunicorns/zarf/src/pkg/utils"
Expand Down Expand Up @@ -79,18 +80,21 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
}

// Get the current agent image from one of its pods.
pods := h.cluster.WaitForPodsAndContainers(
ctx,
k8s.PodLookup{
Namespace: cluster.ZarfNamespaceName,
Selector: "app=agent-hook",
},
nil,
)
phillebaba marked this conversation as resolved.
Show resolved Hide resolved
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
if err != nil {
return err
}
listOpts := metav1.ListOptions{
LabelSelector: selector.String(),
}
podList, err := h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).List(ctx, listOpts)
if err != nil {
return err
}

var currentAgentImage transform.Image
if len(pods) > 0 && len(pods[0].Spec.Containers) > 0 {
currentAgentImage, err = transform.ParseImageRef(pods[0].Spec.Containers[0].Image)
if len(podList.Items) > 0 && len(podList.Items[0].Spec.Containers) > 0 {
currentAgentImage, err = transform.ParseImageRef(podList.Items[0].Spec.Containers[0].Image)
if err != nil {
return fmt.Errorf("unable to parse current agent image reference: %w", err)
}
Expand Down Expand Up @@ -142,13 +146,21 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
defer spinner.Stop()

// Force pods to be recreated to get the updated secret.
err = h.cluster.DeletePods(
ctx,
k8s.PodLookup{
Namespace: cluster.ZarfNamespaceName,
Selector: "app=agent-hook",
},
)
// TODO: Explain why no grace period is given.
deleteGracePeriod := int64(0)
phillebaba marked this conversation as resolved.
Show resolved Hide resolved
deletePolicy := metav1.DeletePropagationForeground
deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &deleteGracePeriod,
PropagationPolicy: &deletePolicy,
}
selector, err = metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
if err != nil {
return err
}
listOpts = metav1.ListOptions{
LabelSelector: selector.String(),
}
err = h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).DeleteCollection(ctx, deleteOpts, listOpts)
if err != nil {
return fmt.Errorf("error recycling pods for the Zarf Agent: %w", err)
}
Expand Down
111 changes: 104 additions & 7 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/defenseunicorns/zarf/src/config"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/layout"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/utils"
"github.com/defenseunicorns/zarf/src/pkg/utils/exec"
"github.com/defenseunicorns/zarf/src/types"
corev1 "k8s.io/api/core/v1"
)

// HandleDataInjection waits for the target pod(s) to come up and inject the data into them
// todo: this currently requires kubectl but we should have enough k8s work to make this native now.
func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
defer wg.Done()

injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
message.WarnErrf(err, "Unable to create the data injection completion marker")
Expand Down Expand Up @@ -68,14 +72,14 @@ iterator:
}
}

target := k8s.PodLookup{
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods := c.WaitForPodsAndContainers(ctx, target, podFilterByInitContainer)
pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
Expand Down Expand Up @@ -132,15 +136,15 @@ iterator:
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := k8s.PodLookup{
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = c.WaitForPodsAndContainers(ctx, podOnlyTarget, podFilterByInitContainer)
_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
Expand All @@ -149,3 +153,96 @@ iterator:
return
}
}

// podLookup is a struct for specifying a pod to target for data injection or lookups.
type podLookup struct {
Namespace string
Selector string
Container string
}

// podFilter is a function that returns true if the pod should be targeted for data injection or lookups.
type podFilter func(pod corev1.Pod) bool

// WaitForPodsAndContainers attempts to find pods matching the given selector and optional inclusion filter
// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-waitCtx.Done():
message.Debug("Pod lookup failed: %v", ctx.Err())
return nil
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
message.Debug("Unable to find matching pods: %w", err)
return nil
}

message.Debug("Found %d pods for target %#v", len(podList.Items), target)

var readyPods = []corev1.Pod{}

// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})

for _, pod := range podList.Items {
message.Debug("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
message.Debug("Testing pod %q for container %q", pod.Name, target.Container)

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
}

// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debug("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
readyPods = append(readyPods, pod)
break
}
}
}
if len(readyPods) > 0 {
return readyPods
}
timer.Reset(3 * time.Second)
}
}
}
Loading
Loading