diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index a56eb25cc88..af09e0eb65b 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -25,6 +25,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" appsv1 "k8s.io/api/apps/v1" @@ -39,92 +40,132 @@ import ( ) const ( - // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim + // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim // as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet. - ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet" + ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace" featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant" ) -// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that -// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that -// share the workspace volume and make it possible for the tasks to execute parallel while sharing volume. -func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1.WorkspaceBinding, pr *v1.PipelineRun, namespace string) error { - logger := logging.FromContext(ctx) - cfg := config.FromContextOrDefaults(ctx) - +// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior. +// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume. +// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for +// every taskrun in the pipelinerun that use the same PVC based volume. +// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation, +// it creates one Affinity Assistant for the pipelinerun. +// Other AffinityAssitantBehaviors are invalid. +func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error { var errs []error var unschedulableNodes sets.Set[string] = nil - for _, w := range wb { + + var claimTemplates []corev1.PersistentVolumeClaim + var claims []corev1.PersistentVolumeClaimVolumeSource + claimToWorkspace := map[*corev1.PersistentVolumeClaimVolumeSource]string{} + claimTemplatesToWorkspace := map[*corev1.PersistentVolumeClaim]string{} + + for _, w := range pr.Spec.Workspaces { if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil { continue } - - var claimTemplates []corev1.PersistentVolumeClaim - var claims []corev1.PersistentVolumeClaimVolumeSource if w.PersistentVolumeClaim != nil { - claims = append(claims, *w.PersistentVolumeClaim.DeepCopy()) + claim := w.PersistentVolumeClaim.DeepCopy() + claims = append(claims, *claim) + claimToWorkspace[claim] = w.Name } else if w.VolumeClaimTemplate != nil { claimTemplate := w.VolumeClaimTemplate.DeepCopy() - // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `--0` claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr)) claimTemplates = append(claimTemplates, *claimTemplate) + claimTemplatesToWorkspace[claimTemplate] = w.Name } + } + + switch aaBehavior { + case aa.AffinityAssistantPerWorkspace: + for claim, workspaceName := range claimToWorkspace { + aaName := getAffinityAssistantName(workspaceName, pr.Name) + err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes) + errs = append(errs, err...) + } + for claimTemplate, workspaceName := range claimTemplatesToWorkspace { + aaName := getAffinityAssistantName(workspaceName, pr.Name) + err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes) + errs = append(errs, err...) + } + case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: + if claims != nil || claimTemplates != nil { + aaName := getAffinityAssistantName("", pr.Name) + err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes) + errs = append(errs, err...) + } + case aa.AffinityAssistantDisabled: + return fmt.Errorf("unexpected Affinity Assistant behavior %v", aa.AffinityAssistantDisabled) + } + + return errorutils.NewAggregate(errs) +} + +// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information. +// The VolumeClaimTemplates and Volumes of StatefulSet reference the resolved claimTemplates and claims respectively. +// It maintains a set of unschedulableNodes to detect and recreate Affinity Assistant in case of the node is cordoned to avoid pipelinerun deadlock. +func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error { + logger := logging.FromContext(ctx) + cfg := config.FromContextOrDefaults(ctx) - affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) - switch { - // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist - case apierrors.IsNotFound(err): - affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + var errs []error + a, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + switch { + // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist + case apierrors.IsNotFound(err): + affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) + _, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + } + if err == nil { + logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, pr.Namespace) + } + // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created + // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation + // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation + // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + if unschedulableNodes == nil { + ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + FieldSelector: "spec.unschedulable=true", + }) if err != nil { - errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) + errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) } - if err == nil { - logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) + unschedulableNodes = sets.Set[string]{} + // maintain the list of nodes which are unschedulable + for _, n := range ns.Items { + unschedulableNodes.Insert(n.Name) } - // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created - // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation - // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation - // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 - case err == nil && a != nil && a.Status.ReadyReplicas == 1: - if unschedulableNodes == nil { - ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{ - FieldSelector: "spec.unschedulable=true", - }) - if err != nil { - errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err)) - } - unschedulableNodes = sets.Set[string]{} - // maintain the list of nodes which are unschedulable - for _, n := range ns.Items { - unschedulableNodes.Insert(n.Name) - } + } + if unschedulableNodes.Len() > 0 { + // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 + p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) + // ignore instead of failing if the affinity assistant pod was not found + if err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) } - if unschedulableNodes.Len() > 0 { - // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1 - p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{}) - // ignore instead of failing if the affinity assistant pod was not found - if err != nil && !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)) - } - // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned - if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { - // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node - err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) - if err != nil { - errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) - } + // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned + if p != nil && unschedulableNodes.Has(p.Spec.NodeName) { + // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{}) + if err != nil { + errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err)) } } - case err != nil: - errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } + case err != nil: + errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } - return errorutils.NewAggregate(errs) + + return errs } +// TODO(#6740)(WIP) implement cleanupAffinityAssistants for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation affinity assistant modes func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error { // omit cleanup if the feature is disabled if c.isAffinityAssistantDisabled(ctx) { diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 839c442b177..1bb9f37d963 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -19,6 +19,7 @@ package pipelinerun import ( "context" "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -27,6 +28,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" @@ -45,43 +47,151 @@ import ( _ "knative.dev/pkg/system/testing" // Setup system.Namespace() ) -var workspaceName = "test-workspace" +var podSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") +var statefulSetSpecFilter cmp.Option = cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") +var podTemplateSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") -var testPipelineRun = &v1.PipelineRun{ +var workspacePVCName = "test-workspace-pvc" +var workspaceVolumeClaimTemplateName = "test-workspace-vct" + +var testPRWithPVC = &v1.PipelineRun{ TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, ObjectMeta: metav1.ObjectMeta{ Name: "test-pipelinerun", }, Spec: v1.PipelineRunSpec{ Workspaces: []v1.WorkspaceBinding{{ - Name: workspaceName, + Name: workspacePVCName, PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ ClaimName: "myclaim", }, }}, }, } +var testPRWithVolumeClaimTemplate = &v1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pipelinerun-with-volumeClaimTemplate", + }, + Spec: v1.PipelineRunSpec{ + Workspaces: []v1.WorkspaceBinding{{ + Name: workspaceVolumeClaimTemplateName, + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }}, + }, +} +var testPRWithVolumeClaimTemplateAndPVC = &v1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "pipelinerun-with-volumeClaimTemplate-and-pvc", + }, + Spec: v1.PipelineRunSpec{ + Workspaces: []v1.WorkspaceBinding{{ + Name: workspaceVolumeClaimTemplateName, + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }, { + Name: workspacePVCName, + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }}, + }, + }, +} +var testPRWithEmptyDir = &v1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-emptyDir"}, + Spec: v1.PipelineRunSpec{ + Workspaces: []v1.WorkspaceBinding{{ + Name: "EmptyDir Workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }}, + }, +} -// TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant -// for a given PipelineRun -func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { +// TestCreateAndDeleteOfAffinityAssistantPerPipelineRun tests to create and delete an Affinity Assistant +// per pipelinerun for a given PipelineRun +func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { tests := []struct { name string pr *v1.PipelineRun - expectStatefulSetSpec []*appsv1.StatefulSetSpec + expectStatefulSetSpec *appsv1.StatefulSetSpec }{{ name: "PersistentVolumeClaim Workspace type", - pr: &v1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-pvc"}, - Spec: v1.PipelineRunSpec{ - Workspaces: []v1.WorkspaceBinding{{ - Name: "PersistentVolumeClaim Workspace", - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "myclaim", - }, - }}, + pr: testPRWithPVC, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "myclaim"}, + }, + }}, + }, + }, + }, + }, { + name: "VolumeClaimTemplate Workspace type", + pr: testPRWithVolumeClaimTemplate, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, + }}, + }, + }, { + name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", + pr: testPRWithVolumeClaimTemplateAndPVC, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, + }}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "myclaim"}, + }, + }}, + }, }, }, + }, { + name: "other Workspace type", + pr: testPRWithEmptyDir, + expectStatefulSetSpec: nil, + }} + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + } + + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun) + if err != nil { + t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerPipelineRun: %v", err) + } + + // validate StatefulSets from Affinity Assistant + expectAAName := getAffinityAssistantName("", tc.pr.Name) + validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) + + // TODO(#6740)(WIP): test cleanupAffinityAssistants for coscheduling-pipelinerun mode when fully implemented + }) + } +} + +// TestCreateAndDeleteOfAffinityAssistantPerWorkspace tests to create and delete an Affinity Assistant +// per workspace for a given PipelineRun +func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { + tests := []struct { + name string + pr *v1.PipelineRun + expectStatefulSetSpec []*appsv1.StatefulSetSpec + }{{ + name: "PersistentVolumeClaim Workspace type", + pr: testPRWithPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -96,39 +206,18 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { }}, }, { name: "VolumeClaimTemplate Workspace type", - pr: &v1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-volumeClaimTemplate"}, - Spec: v1.PipelineRunSpec{ - Workspaces: []v1.WorkspaceBinding{{ - Name: "VolumeClaimTemplate Workspace", - VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, - }}, - }, - }, + pr: testPRWithVolumeClaimTemplate, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-f0680e1c9c"}, + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, }}, }, { name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", - pr: &v1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-volumeClaimTemplate-and-pvc"}, - Spec: v1.PipelineRunSpec{ - Workspaces: []v1.WorkspaceBinding{{ - Name: "VolumeClaimTemplate Workspace", - VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, - }, { - Name: "PersistentVolumeClaim Workspace", - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "myclaim", - }}, - }, - }, - }, + pr: testPRWithVolumeClaimTemplateAndPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-f0680e1c9c"}, + ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}}, { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -142,16 +231,8 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { }, }}, }, { - name: "other Workspace type", - pr: &v1.PipelineRun{ - ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-with-emptyDir"}, - Spec: v1.PipelineRunSpec{ - Workspaces: []v1.WorkspaceBinding{{ - Name: "EmptyDir Workspace", - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }}, - }, - }, + name: "other Workspace type", + pr: testPRWithEmptyDir, expectStatefulSetSpec: nil, }} @@ -159,35 +240,20 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - c := Reconciler{ KubeClientSet: fakek8s.NewSimpleClientset(), } - err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace) + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, aa.AffinityAssistantPerWorkspace) if err != nil { - t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) + t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerWorkspace: %v", err) } // validate StatefulSets from Affinity Assistant for i, w := range tc.pr.Spec.Workspaces { - expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) - aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) if tc.expectStatefulSetSpec != nil { - if err != nil { - t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) - } - - podSpecFilter := cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") - statefulSetSpecFilter := cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") - podTemplateSpecFilter := cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") - if d := cmp.Diff(tc.expectStatefulSetSpec[i], &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { - t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) - } - } else if !apierrors.IsNotFound(err) { - t.Errorf("unexpected error when retrieving StatefulSet which expects nil: %v", err) + expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) + validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec[i]) } } @@ -211,19 +277,37 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { } } +func TestCreateAndDeleteOfAffinityAssistantDisabled_Failure(t *testing.T) { + ctx := context.Background() + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + } + + wantErr := fmt.Errorf("unexpected Affinity Assistant behavior %s", aa.AffinityAssistantDisabled) + + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, testPRWithPVC, aa.AffinityAssistantDisabled) + if err == nil { + t.Fatalf("expecting error: %v, but got nil", wantErr) + } + + if diff := cmp.Diff(wantErr.Error(), err.Error()); diff != "" { + t.Errorf("expected error mismatch: %v", diff) + } +} + // TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and // can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { - expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) + expectedAffinityAssistantName := getAffinityAssistantName(workspacePVCName, testPRWithPVC.Name) - aa := []*appsv1.StatefulSet{{ + ss := []*appsv1.StatefulSet{{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: expectedAffinityAssistantName, - Labels: getStatefulSetLabels(testPipelineRun, expectedAffinityAssistantName), + Labels: getStatefulSetLabels(testPRWithPVC, expectedAffinityAssistantName), }, Status: appsv1.StatefulSetStatus{ ReadyReplicas: 1, @@ -253,41 +337,41 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { data Data validatePodDeletion, expectedError bool }{{ - name: "createOrUpdateAffinityAssistants must ignore missing affinity assistant pod, this could be interim and must not fail the entire pipelineRun", + name: "createOrUpdateAffinityAssistantsPerWorkspace must ignore missing affinity assistant pod, this could be interim and must not fail the entire pipelineRun", data: Data{ - StatefulSets: aa, + StatefulSets: ss, Nodes: nodes, }, }, { - name: "createOrUpdateAffinityAssistants must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable", + name: "createOrUpdateAffinityAssistantsPerWorkspace must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable", data: Data{ - StatefulSets: aa, + StatefulSets: ss, Nodes: nodes, Pods: p, }, validatePodDeletion: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while listing nodes", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while listing nodes", data: Data{ - StatefulSets: aa, + StatefulSets: ss, Nodes: nodes, }, verb: "list", resource: "nodes", expectedError: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while getting pods", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while getting pods", data: Data{ - StatefulSets: aa, + StatefulSets: ss, Nodes: nodes, }, verb: "get", resource: "pods", expectedError: true, }, { - name: "createOrUpdateAffinityAssistants must catch an error while deleting pods", + name: "createOrUpdateAffinityAssistantsPerWorkspace must catch an error while deleting pods", data: Data{ - StatefulSets: aa, + StatefulSets: ss, Nodes: nodes, Pods: p, }, @@ -314,20 +398,19 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { return true, &corev1.Pod{}, errors.New("error listing/deleting pod") }) } - - err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, testPRWithPVC, aa.AffinityAssistantPerWorkspace) if !tt.expectedError && err != nil { - t.Errorf("expected no error from createOrUpdateAffinityAssistants for the test \"%s\", but got: %v", tt.name, err) + t.Errorf("expected no error from createOrUpdateAffinityAssistantsPerWorkspace for the test \"%s\", but got: %v", tt.name, err) } // the affinity assistant pod must have been deleted when it was running on a cordoned node if tt.validatePodDeletion { - _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) + _, err = c.KubeClientSet.CoreV1().Pods(testPRWithPVC.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) if !apierrors.IsNotFound(err) { t.Errorf("expected a NotFound response, got: %v", err) } } if tt.expectedError && err == nil { - t.Errorf("expected error from createOrUpdateAffinityAssistants, but got no error") + t.Errorf("expected error from createOrUpdateAffinityAssistantsPerWorkspace, but got no error") } }) } @@ -650,7 +733,7 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) { store := config.NewStore(logtesting.TestLogger(t)) store.OnConfigChanged(configMap) - _ = c.cleanupAffinityAssistants(store.ToContext(context.Background()), testPipelineRun) + _ = c.cleanupAffinityAssistants(store.ToContext(context.Background()), testPRWithPVC) if len(fakeClientSet.Actions()) != 0 { t.Errorf("Expected 0 k8s client requests, did %d request", len(fakeClientSet.Actions())) @@ -878,3 +961,18 @@ func seedTestData(d Data) (context.Context, Reconciler, func()) { } return ctx, c, cancel } + +func validateStatefulSetSpec(t *testing.T, ctx context.Context, c Reconciler, expectAAName string, expectStatefulSetSpec *appsv1.StatefulSetSpec) { + t.Helper() + aa, err := c.KubeClientSet.AppsV1().StatefulSets("").Get(ctx, expectAAName, metav1.GetOptions{}) + if expectStatefulSetSpec != nil { + if err != nil { + t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) + } + if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { + t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) + } + } else if !apierrors.IsNotFound(err) { + t.Errorf("unexpected error when retrieving StatefulSet which expects nil: %v", err) + } +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 1c4700cb2f1..fac1e3c468e 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -36,6 +36,7 @@ import ( listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1" alpha1listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" beta1listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution" "github.com/tektoncd/pipeline/pkg/pipelinerunmetrics" tknreconciler "github.com/tektoncd/pipeline/pkg/reconciler" @@ -608,9 +609,10 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod if !c.isAffinityAssistantDisabled(ctx) { // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + // TODO(#6740)(WIP): We only support AffinityAssistantPerWorkspace at the point. Implement different AffinityAssitantBehaviors based on `coscheduling` feature flag when adding end-to-end support. + if err = c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, pr, affinityassistant.AffinityAssistantPerWorkspace); err != nil { logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", pr.Namespace, pr.Name, err) return controller.NewPermanentError(err)