Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PB-7476-sync: Merging commits from 1.2.15 to master and updating node label as per the best practices #398

Merged
merged 4 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,11 @@ func startTransferJob(
psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey)
psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.Rsync:
return drv.StartJob(
Expand Down Expand Up @@ -1947,6 +1952,7 @@ func startTransferJob(
drivers.WithExcludeFileList(excludeFileList),
drivers.WithPodDatapathType(podDataPath),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
Expand All @@ -1969,6 +1975,7 @@ func startTransferJob(
drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
drivers.WithPodUserId(psaJobUid),
Expand Down Expand Up @@ -2397,6 +2404,11 @@ func startNfsCSIRestoreVolumeJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}
nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSCSIRestore:
return drv.StartJob(
Expand All @@ -2411,6 +2423,7 @@ func startNfsCSIRestoreVolumeJob(
drivers.WithNfsSubPath(bl.Location.Path),
drivers.WithPodUserId(psaJobUid),
drivers.WithPodGroupId(psaJobGid),
drivers.WithNodeAffinity(nodeLabel),
)
}
return "", fmt.Errorf("unknown driver for nfs csi volume restore: %s", drv.Name())
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ func startNfsResourceJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}

nodeLabel, err := utils.GetNodeLabelFromDeployment(jobConfigMap, jobConfigMapNs, drivers.PxbJobNodeLabelKey)
if err != nil {
return "", err
}

switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand All @@ -427,6 +433,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand All @@ -445,6 +452,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (

var (
// ErrJobFailed is a known error for a data transfer job failure.
ErrJobFailed = fmt.Errorf("data transfer job failed")
ErrJobFailed = fmt.Errorf("data transfer job failed")
PxbJobNodeLabelKey = "pxb_job_node_affinity_label"
)

// Interface defines a data export driver behaviour.
Expand Down
60 changes: 36 additions & 24 deletions pkg/drivers/kopiabackup/kopiabackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func jobFor(
jobName string,
resources corev1.ResourceRequirements,
nodeName string,
live bool,
) (*batchv1.Job, error) {
backupName := jobName

Expand Down Expand Up @@ -401,15 +402,41 @@ func jobFor(
}
}

if len(nodeName) != 0 {
job.Spec.Template.Spec.NodeName = nodeName
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
if !live {
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}
} else {
accessModes, err := utils.GetAccessModeFromPvc(jobOption.SourcePVCName, jobOption.SourcePVCNamespace)
if err != nil {
return nil, err
}

var singleNodeMount bool
for _, val := range accessModes {
if val == "ReadWriteOnce" || val == "ReadWriteOncePod" {
singleNodeMount = true
break
}
}

if singleNodeMount && len(nodeName) != 0 {
job.Spec.Template.Spec.NodeName = nodeName
} else if !singleNodeMount {
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}
}
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down Expand Up @@ -493,8 +520,8 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
return nil, fmt.Errorf(errMsg)
}
var resourceNamespace string
var live bool
var nodeName string
var live bool
// filter out the pods that are created by us
for _, pod := range pods {
labels := pod.ObjectMeta.Labels
Expand All @@ -505,11 +532,12 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
// get the nodeName if the pod is in Running state, so that we can schedule
// the kopia job on the same node.
nodeName = pod.Spec.NodeName
live = true
break
}
}
resourceNamespace = jobOptions.Namespace
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor(live)); err != nil {
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor()); err != nil {
errMsg := fmt.Sprintf("error creating service account %s/%s: %v", resourceNamespace, jobName, err)
logrus.Errorf("%s: %v", fn, errMsg)
return nil, fmt.Errorf(errMsg)
Expand All @@ -519,10 +547,11 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
jobName,
resources,
nodeName,
live,
)
}

func roleFor(live bool) *rbacv1.Role {
func roleFor() *rbacv1.Role {
role := &rbacv1.Role{
Rules: []rbacv1.PolicyRule{
{
Expand All @@ -532,22 +561,5 @@ func roleFor(live bool) *rbacv1.Role {
},
},
}
// Only live backup, we will add the hostaccess and privilege option.
if live {
hostAccessRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"hostaccess"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, hostAccessRule)
PrivilegedRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"privileged"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, PrivilegedRule)
}
return role
}
6 changes: 6 additions & 0 deletions pkg/drivers/kopiarestore/kopiarestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

if drivers.CertFilePath != "" {
volumeMount := corev1.VolumeMount{
Name: utils.TLSCertMountVol,
Expand Down
6 changes: 6 additions & 0 deletions pkg/drivers/nfsbackup/nfsbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ func jobForBackupResource(
}
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfscsirestore/nfscsirestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func jobForRestoreCSISnapshot(
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down
7 changes: 7 additions & 0 deletions pkg/drivers/nfsrestore/nfsrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ func jobForRestoreResource(
if err != nil {
return nil, err
}

// Add node affinity to the job spec
job, err = utils.AddNodeAffinityToJob(job, jobOption)
if err != nil {
return nil, err
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
51 changes: 51 additions & 0 deletions pkg/drivers/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,20 @@ func GetNodeAffinityFromDeployment(name, namespace string) (*corev1.NodeAffinity
return deploy.Spec.Template.Spec.Affinity.NodeAffinity, nil
}

// GetNodeLabelFromDeployment returns the node label stored under key in the
// data of the ConfigMap identified by name and namespace. Despite its name,
// the lookup is done against a ConfigMap, not a Deployment.
//
// The result is a map holding the single key/value pair when the key is
// present with a non-empty value, and an empty (non-nil) map otherwise.
// If the ConfigMap cannot be fetched, it returns nil and that error.
func GetNodeLabelFromDeployment(name, namespace, key string) (map[string]string, error) {
	configMap, err := core.Instance().GetConfigMap(name, namespace)
	if err != nil {
		return nil, err
	}

	labels := map[string]string{}
	// A missing key reads back as "", so one emptiness check covers both
	// the "absent" and the "present but empty" cases.
	if labelValue := configMap.Data[key]; labelValue != "" {
		labels[key] = labelValue
	}
	return labels, nil
}

// IsJobPodMountFailed - checks for mount failure in a Job pod
func IsJobPodMountFailed(job *batchv1.Job, namespace string) bool {
fn := "IsJobPodMountFailed"
Expand Down Expand Up @@ -1127,3 +1141,40 @@ func PauseCleanupResource() (time.Duration, error) {
}
return pauseCleanupVal, nil
}

// AddNodeAffinityToJob turns the labels in jobOption.NodeAffinity into a
// required node affinity on the job's pod template and returns the same job.
//
// Each label becomes an "In" requirement with that label's value as its only
// allowed value. All requirements are placed in one node selector term, so a
// node has to satisfy every label to be eligible. The requirements are built
// while ranging over a map, so their order is unspecified.
//
// When jobOption.NodeAffinity is empty the job is returned untouched.
// Otherwise the pod template's Affinity field is replaced as a whole, which
// drops any affinity settings that were already present on the job.
// The returned error is currently always nil.
func AddNodeAffinityToJob(job *batchv1.Job, jobOption drivers.JobOpts) (*batchv1.Job, error) {
	nodeLabels := jobOption.NodeAffinity
	if len(nodeLabels) == 0 {
		return job, nil
	}

	requirements := make([]corev1.NodeSelectorRequirement, 0, len(nodeLabels))
	for labelKey, labelValue := range nodeLabels {
		requirements = append(requirements, corev1.NodeSelectorRequirement{
			Key:      labelKey,
			Operator: corev1.NodeSelectorOpIn,
			Values:   []string{labelValue},
		})
	}

	selector := &corev1.NodeSelector{
		NodeSelectorTerms: []corev1.NodeSelectorTerm{
			{MatchExpressions: requirements},
		},
	}
	job.Spec.Template.Spec.Affinity = &corev1.Affinity{
		NodeAffinity: &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: selector,
		},
	}
	return job, nil
}

// GetAccessModeFromPvc fetches the PersistentVolumeClaim srcPvcName in
// namespace srcPvcNameSpace and returns the access modes recorded in its
// status (Status.AccessModes, not the requested Spec.AccessModes).
// If the claim cannot be fetched, it returns nil and that error.
func GetAccessModeFromPvc(srcPvcName, srcPvcNameSpace string) ([]corev1.PersistentVolumeAccessMode, error) {
	claim, err := core.Instance().GetPersistentVolumeClaim(srcPvcName, srcPvcNameSpace)
	if err != nil {
		return nil, err
	}
	return claim.Status.AccessModes, nil
}
Loading