Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes refactor #2794

Merged
merged 15 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/gorilla/securecookie v1.1.2
github.com/jellydator/ttlcache/v3 v3.1.1
github.com/joho/godotenv v1.5.1
github.com/kinbiko/jsonassert v1.1.1
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.19
github.com/moby/moby v24.0.7+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kinbiko/jsonassert v1.1.1 h1:DB12divY+YB+cVpHULLuKePSi6+ui4M/shHSzJISkSE=
github.com/kinbiko/jsonassert v1.1.1/go.mod h1:NO4lzrogohtIdNUNzx8sdzB55M4R4Q1bsrWVdqQ7C+A=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down
120 changes: 27 additions & 93 deletions pipeline/backend/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/urfave/cli/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -46,16 +45,16 @@ const (
EngineName = "kubernetes"
)

var noContext = context.Background()
var defaultDeleteOptions = newDefaultDeleteOptions()

type kube struct {
ctx context.Context
client kubernetes.Interface
config *Config
config *config
goos string
}

type Config struct {
type config struct {
Namespace string
StorageClass string
VolumeSize string
Expand All @@ -68,10 +67,20 @@ type SecurityContextConfig struct {
RunAsNonRoot bool
}

func configFromCliContext(ctx context.Context) (*Config, error) {
func newDefaultDeleteOptions() metav1.DeleteOptions {
gracePeriodSeconds := int64(0) // immediately
propagationPolicy := metav1.DeletePropagationBackground

return metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &propagationPolicy,
}
}

func configFromCliContext(ctx context.Context) (*config, error) {
if ctx != nil {
if c, ok := ctx.Value(types.CliContext).(*cli.Context); ok {
config := Config{
config := config{
Namespace: c.String("backend-k8s-namespace"),
StorageClass: c.String("backend-k8s-storage-class"),
VolumeSize: c.String("backend-k8s-volume-size"),
Expand Down Expand Up @@ -151,12 +160,7 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")

for _, vol := range conf.Volumes {
pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx)
if err != nil {
return err
}

_, err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
_, err := startVolume(ctx, e, vol.Name)
if err != nil {
return err
}
Expand All @@ -167,21 +171,10 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
for _, stage := range conf.Stages {
if stage.Alias == "services" {
for _, step := range stage.Steps {
stepName, err := dnsName(step.Name)
svc, err := startService(ctx, e, step)
if err != nil {
return err
}
log.Trace().Str("pod-name", stepName).Msgf("Creating service: %s", step.Name)
svc, err := Service(e.config.Namespace, step.Name, step.Ports)
if err != nil {
return err
}

svc, err = e.client.CoreV1().Services(e.config.Namespace).Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
return err
}

extraHosts = append(extraHosts, step.Networks[0].Aliases[0]+":"+svc.Spec.ClusterIP)
}
}
Expand All @@ -199,13 +192,8 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s

// Start the pipeline step.
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
pod, err := Pod(e.config.Namespace, step, e.config.PodLabels, e.config.PodAnnotations, e.goos, e.config.SecurityContext)
if err != nil {
return err
}

log.Trace().Str("taskUUID", taskUUID).Msgf("Creating pod: %s", pod.Name)
_, err = e.client.CoreV1().Pods(e.config.Namespace).Create(ctx, pod, metav1.CreateOptions{})
log.Trace().Str("taskUUID", taskUUID).Msgf("Starting step: %s", step.Name)
_, err := startPod(ctx, e, step)
return err
}

Expand Down Expand Up @@ -341,92 +329,38 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
// return rc, nil
}

func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
podName, err := dnsName(step.Name)
if err != nil {
return err
}

log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping pod: %s", podName)

gracePeriodSeconds := int64(0) // immediately
dpb := metav1.DeletePropagationBackground

deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &dpb,
}

if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(ctx, podName, deleteOpts); err != nil && !errors.IsNotFound(err) {
return err
}

return nil
func (e *kube) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
err := stopPod(e.ctx, e, step, defaultDeleteOptions)
return err
}

// Destroy the pipeline environment.
func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives")

gracePeriodSeconds := int64(0) // immediately
dpb := metav1.DeletePropagationBackground

deleteOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &dpb,
}

// Use noContext because the ctx sent to this function will be canceled/done in case of error or canceled by user.
// Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps.
// Trace log them in case the info could be useful when troubleshooting.

for _, stage := range conf.Stages {
for _, step := range stage.Steps {
stepName, err := dnsName(step.Name)
err := stopPod(e.ctx, e, step, defaultDeleteOptions)
if err != nil {
return err
}
log.Trace().Msgf("Deleting pod: %s", stepName)
if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(noContext, stepName, deleteOpts); err != nil {
if !errors.IsNotFound(err) {
return err
}
}
}
}

for _, stage := range conf.Stages {
if stage.Alias == "services" {
for _, step := range stage.Steps {
log.Trace().Msgf("Deleting service: %s", step.Name)
svc, err := Service(e.config.Namespace, step.Name, step.Ports)
if step.Type == types.StepTypeService {
err := stopService(e.ctx, e, step, defaultDeleteOptions)
if err != nil {
return err
}
if err := e.client.CoreV1().Services(e.config.Namespace).Delete(noContext, svc.Name, deleteOpts); err != nil {
if errors.IsNotFound(err) {
log.Trace().Err(err).Msgf("Unable to delete service %s", svc.Name)
} else {
return err
}
}
}
}
}

for _, vol := range conf.Volumes {
pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx)
err := stopVolume(e.ctx, e, vol.Name, defaultDeleteOptions)
if err != nil {
return err
}
err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Delete(noContext, pvc.Name, deleteOpts)
if err != nil {
if errors.IsNotFound(err) {
log.Trace().Err(err).Msgf("Unable to delete pvc %s", pvc.Name)
} else {
return err
}
}
}

return nil
Expand Down
Loading