Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEP-0135: implement per-pipelinerun coscheduling #6819

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 100 additions & 59 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -39,92 +40,132 @@ import (
)

const (
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet.
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet"
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"

featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
)

// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that
// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that
// share the workspace volume and make it possible for the tasks to execute parallel while sharing volume.
func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1.WorkspaceBinding, pr *v1.PipelineRun, namespace string) error {
logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)

// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior.
// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for
// every taskrun in the pipelinerun that use the same PVC based volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
// it creates one Affinity Assistant for the pipelinerun.
// Other AffinityAssitantBehaviors are invalid.
func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error {
var errs []error
var unschedulableNodes sets.Set[string] = nil
for _, w := range wb {

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
claimToWorkspace := map[*corev1.PersistentVolumeClaimVolumeSource]string{}
claimTemplatesToWorkspace := map[*corev1.PersistentVolumeClaim]string{}

for _, w := range pr.Spec.Workspaces {
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
continue
}

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
if w.PersistentVolumeClaim != nil {
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
claim := w.PersistentVolumeClaim.DeepCopy()
claims = append(claims, *claim)
claimToWorkspace[claim] = w.Name
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
claimTemplatesToWorkspace[claimTemplate] = w.Name
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
aaName := getAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
errs = append(errs, err...)
}
for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
aaName := getAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes)
errs = append(errs, err...)
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if claims != nil || claimTemplates != nil {
aaName := getAffinityAssistantName("", pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
errs = append(errs, err...)
}
case aa.AffinityAssistantDisabled:
return fmt.Errorf("unexpected Affinity Assistant behavior %v", aa.AffinityAssistantDisabled)
}

return errorutils.NewAggregate(errs)
}

// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
// The VolumeClaimTemplates and Volumes of StatefulSet reference the resolved claimTemplates and claims respectively.
// It maintains a set of unschedulableNodes to detect and recreate Affinity Assistant in case of the node is cordoned to avoid pipelinerun deadlock.
func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error {
logger := logging.FromContext(ctx)
cfg := config.FromContextOrDefaults(ctx)

affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
var errs []error
a, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, pr.Namespace)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
return errorutils.NewAggregate(errs)

return errs
QuanZhang-William marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO(#6740)(WIP) implement cleanupAffinityAssistants for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation affinity assistant modes
func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error {
// omit cleanup if the feature is disabled
if c.isAffinityAssistantDisabled(ctx) {
Expand Down
Loading