From d2d293e3fb57d6c9dea084b5fe6707d67c715af3 Mon Sep 17 00:00:00 2001
From: prathit06
Date: Thu, 12 Sep 2024 12:25:17 -0700
Subject: [PATCH] [SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs

### What changes were proposed in this pull request?

Currently, when a user sets `volumes.persistentVolumeClaim.[VolumeName].options.claimName=OnDemand`, PVCs are created with only one label, i.e. spark-app-selector = spark.app.id. The objective of this PR is to support custom labels for OnDemand PVCs.

### Why are the changes needed?

Changes are needed so users can set custom labels on PVCs.

### Does this PR introduce _any_ user-facing change?

It does not break any existing behaviour; it adds a new feature that lets users attach custom labels to OnDemand PVCs.

### How was this patch tested?

This was tested in an internal/production k8s cluster.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48079 from prathit06/ondemand-pvc-labels.

Lead-authored-by: prathit06
Co-authored-by: Prathit malik <53890994+prathit06@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun
---
 docs/running-on-kubernetes.md | 18 +++++
 .../org/apache/spark/deploy/k8s/Config.scala | 2 +-
 .../deploy/k8s/KubernetesVolumeSpec.scala | 3 +-
 .../deploy/k8s/KubernetesVolumeUtils.scala | 16 +++-
 .../features/MountVolumesFeatureStep.scala | 9 ++-
 .../spark/deploy/k8s/KubernetesTestConf.scala | 9 ++-
 .../k8s/KubernetesVolumeUtilsSuite.scala | 34 ++++++++-
 .../MountVolumesFeatureStepSuite.scala | 73 +++++++++++++++++++
 8 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c3c567e1b8224..d8be32e047717 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1182,6 +1182,15 @@ See the [configuration page](configuration.html) for information on Spark config 2.4.0 + + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].label.[LabelName] + (none) + + Configure Kubernetes Volume labels passed to Kubernetes, with LabelName as the key and the specified value; labels must conform to the Kubernetes label format. For example, + spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar. + + 4.0.0 + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path (none) @@ -1218,6 +1227,15 @@ See the [configuration page](configuration.html) for information on Spark config 2.4.0 + + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].label.[LabelName] + (none) + + Configure Kubernetes Volume labels passed to Kubernetes, with LabelName as the key and the specified value; labels must conform to the Kubernetes label format. For example, + spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar.
+ + 4.0.0 + spark.kubernetes.local.dirs.tmpfs false diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 393ffc5674011..3a4d68c19014d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -776,7 +776,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium" val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit" val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server" - + val KUBERNETES_VOLUMES_LABEL_KEY = "label." val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index 3f7355de18911..9dfd40a773eb1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String) private[spark] case class KubernetesPVCVolumeConf( claimName: String, storageClass: Option[String] = None, - size: Option[String] = None) + size: Option[String] = None, + labels: Option[Map[String, String]] = None) extends KubernetesVolumeSpecificConf private[spark] case class KubernetesEmptyDirVolumeConf( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index ee2108e8234d3..6463512c0114b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -45,13 +45,21 @@ object KubernetesVolumeUtils { val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY" val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY" val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY" + val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY" + + val volumeLabelsMap = properties + .filter(_._1.startsWith(labelKey)) + .map { + case (k, v) => k.replaceAll(labelKey, "") -> v + } KubernetesVolumeSpec( volumeName = volumeName, mountPath = properties(pathKey), mountSubPath = properties.getOrElse(subPathKey, ""), mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), - volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName)) + volumeConf = parseVolumeSpecificConf(properties, + volumeType, volumeName, Option(volumeLabelsMap))) }.toSeq } @@ -74,7 +82,8 @@ object KubernetesVolumeUtils { private def parseVolumeSpecificConf( options: Map[String, String], volumeType: String, - volumeName: String): KubernetesVolumeSpecificConf = { + volumeName: String, + labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = { volumeType match { case KUBERNETES_VOLUMES_HOSTPATH_TYPE => val pathKey = 
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" @@ -91,7 +100,8 @@ object KubernetesVolumeUtils { KubernetesPVCVolumeConf( options(claimNameKey), options.get(storageClassKey), - options.get(sizeLimitKey)) + options.get(sizeLimitKey), + labels) case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 72cc012a6bdd0..5cc61c746b0e0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath, "")) - case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) => + case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) => val claimName = conf match { case c: KubernetesExecutorConf => claimNameTemplate @@ -86,12 +86,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) .replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i") } if (storageClass.isDefined && size.isDefined) { + val defaultVolumeLabels = Map(SPARK_APP_ID_LABEL -> conf.appId) + val volumeLabels = labels match { + case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava + case None => defaultVolumeLabels.asJava + } additionalResources.append(new PersistentVolumeClaimBuilder() .withKind(PVC) .withApiVersion("v1") .withNewMetadata() .withName(claimName) - .addToLabels(SPARK_APP_ID_LABEL, conf.appId) + .addToLabels(volumeLabels) .endMetadata() .withNewSpec() .withStorageClassName(storageClass.get) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index b70b9348d23b4..7e0a65bcdda90 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -117,12 +117,17 @@ object KubernetesTestConf { (KUBERNETES_VOLUMES_HOSTPATH_TYPE, Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path)) - case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) => + case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) => val sconf = storageClass .map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap + val llabels = labels match { + case Some(value) => value.map { case(k, v) => s"label.$k" -> v } + case None => Map() + } (KUBERNETES_VOLUMES_PVC_TYPE, - Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf) + Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ + sconf ++ lconf ++ llabels) case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index fdc1aae0d4109..5c103739d3082 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -56,7 +56,39 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly) assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === - KubernetesPVCVolumeConf("claimName")) + KubernetesPVCVolumeConf("claimName", labels = Some(Map()))) + } + + test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName") + sparkConf.set("test.persistentVolumeClaim.volumeName.label.env", "test") + sparkConf.set("test.persistentVolumeClaim.volumeName.label.foo", "bar") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === + KubernetesPVCVolumeConf(claimName = "claimName", + labels = Some(Map("env" -> "test", "foo" -> "bar")))) + } + + test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " + + "labels as empty Map if not provided") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === + KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()))) } test("Parses emptyDir volumes correctly") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 54796def95e53..6a68898c5f61c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -131,6 +131,79 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0")) } + test("SPARK-49598: Create and mounts persistentVolumeClaims in driver with labels") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + true, + KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND, + storageClass = Some("gp3"), + size = Some("1Mi"), + labels = 
Some(Map("foo" -> "bar", "env" -> "test"))) + ) + + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0")) + } + + test("SPARK-49598: Create and mounts persistentVolumeClaims in executors with labels") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + true, + KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND, + storageClass = Some("gp3"), + size = Some("1Mi"), + labels = Some(Map("foo1" -> "bar1", "env" -> "exec-test"))) + ) + + val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf)) + val executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0")) + } + + test("SPARK-49598: Mount multiple volumes to executor with labels") { + val pvcVolumeConf1 = KubernetesVolumeSpec( + "checkpointVolume1", + "/checkpoints1", + "", + true, + KubernetesPVCVolumeConf(claimName = "pvcClaim1", + storageClass = Some("gp3"), + size = Some("1Mi"), + labels = Some(Map("foo1" -> "bar1", "env1" -> "exec-test-1"))) + ) + + val pvcVolumeConf2 = KubernetesVolumeSpec( + "checkpointVolume2", + "/checkpoints2", + "", + true, + KubernetesPVCVolumeConf(claimName = "pvcClaim2", + storageClass = Some("gp3"), + size = Some("1Mi"), + labels = Some(Map("foo2" -> "bar2", "env2" -> "exec-test-2"))) + ) + + val kubernetesConf = KubernetesTestConf.createExecutorConf( + volumes = Seq(pvcVolumeConf1, pvcVolumeConf2)) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 2) + assert(configuredPod.container.getVolumeMounts.size() === 2) + } + test("Create and mount persistentVolumeClaims in executors") { val volumeConf = KubernetesVolumeSpec( "testVolume",