[SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs
### What changes were proposed in this pull request?
Currently, when a user sets
`volumes.persistentVolumeClaim.[VolumeName].options.claimName=OnDemand`,
the resulting PVCs are created with only one label, i.e. `spark-app-selector` = spark.app.id.
The objective of this PR is to support user-defined (custom) labels for OnDemand PVCs.
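
For illustration, here is a minimal sketch of how the new settings could be combined with an OnDemand claim on a `SparkConf` (the volume name `data`, the label keys `team` and `env`, and the storage class/size values below are hypothetical):

```scala
import org.apache.spark.SparkConf

// Hypothetical driver-side OnDemand PVC named "data" with two user-defined labels;
// Spark still adds the default spark-app-selector label on top of these.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", "/data")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.storageClass", "gp3")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.sizeLimit", "1Gi")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.label.team", "data-eng")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.label.env", "prod")
```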

### Why are the changes needed?
The changes are needed so that users can attach custom labels to PVCs.

### Does this PR introduce _any_ user-facing change?
It does not change any existing behaviour; it adds a new feature that lets users attach custom labels to OnDemand PVCs.

### How was this patch tested?
This was tested on an internal/production Kubernetes cluster.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48079 from prathit06/ondemand-pvc-labels.

Lead-authored-by: prathit06 <malik.prathit@gmail.com>
Co-authored-by: Prathit malik <53890994+prathit06@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2 people authored and dongjoon-hyun committed Sep 12, 2024
1 parent 317eddb commit d2d293e
Showing 8 changed files with 154 additions and 10 deletions.
18 changes: 18 additions & 0 deletions docs/running-on-kubernetes.md
@@ -1182,6 +1182,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, using <code>LabelName</code> as the key and the specified value; labels must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
@@ -1218,6 +1227,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, using <code>LabelName</code> as the key and the specified value; labels must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
Config.scala
@@ -776,7 +776,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"

val KUBERNETES_VOLUMES_LABEL_KEY = "label."
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."

val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
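
As a point of reference, a small sketch of how the new `label.` prefix composes into the full, user-facing property name, mirroring the existing `options.*` keys (the volume and label names below are made up):

```scala
// Sketch only: composing a full label property name from its parts.
val KUBERNETES_VOLUMES_LABEL_KEY = "label."
val volumeType = "persistentVolumeClaim"
val volumeName = "checkpointpvc" // hypothetical volume name
val labelName = "foo"            // hypothetical label name
val fullKey =
  s"spark.kubernetes.driver.volumes.$volumeType.$volumeName.${KUBERNETES_VOLUMES_LABEL_KEY}$labelName"
// => spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo
```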
KubernetesVolumeSpec.scala
@@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
size: Option[String] = None,
labels: Option[Map[String, String]] = None)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
KubernetesVolumeUtils.scala
@@ -45,13 +45,21 @@ object KubernetesVolumeUtils {
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"

val volumeLabelsMap = properties
.filter(_._1.startsWith(labelKey))
.map {
case (k, v) => k.replaceAll(labelKey, "") -> v
}

KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
}.toSeq
}

@@ -74,7 +82,8 @@ object KubernetesVolumeUtils {
private def parseVolumeSpecificConf(
options: Map[String, String],
volumeType: String,
volumeName: String): KubernetesVolumeSpecificConf = {
volumeName: String,
labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
@@ -91,7 +100,8 @@ object KubernetesVolumeUtils {
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
options.get(sizeLimitKey),
labels)

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
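
A self-contained sketch of the label parsing added in this file (the property map is invented for illustration): keys under the volume-scoped `label.` prefix are collected and the prefix is stripped to recover the label names.

```scala
// Minimal sketch of the label extraction; stripPrefix is used here for clarity,
// while the actual code strips the prefix computed per volume type and name.
val properties = Map(
  "persistentVolumeClaim.checkpointpvc.options.claimName" -> "OnDemand",
  "persistentVolumeClaim.checkpointpvc.label.foo" -> "bar",
  "persistentVolumeClaim.checkpointpvc.label.env" -> "test")

val labelKey = "persistentVolumeClaim.checkpointpvc.label."
val volumeLabelsMap = properties
  .filter { case (k, _) => k.startsWith(labelKey) }
  .map { case (k, v) => k.stripPrefix(labelKey) -> v }
// volumeLabelsMap == Map("foo" -> "bar", "env" -> "test")
```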
MountVolumesFeatureStep.scala
@@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
@@ -86,12 +86,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
}
if (storageClass.isDefined && size.isDefined) {
val defaultVolumeLabels = Map(SPARK_APP_ID_LABEL -> conf.appId)
val volumeLabels = labels match {
case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava
case None => defaultVolumeLabels.asJava
}
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(volumeLabels)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
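
One small sketch of the merge semantics above (the values are hypothetical): because the default labels are appended last, a user-supplied `spark-app-selector` label cannot override the value derived from the application id.

```scala
// Sketch of the label merge used when building the PVC metadata.
val defaultVolumeLabels = Map("spark-app-selector" -> "spark-app-id-1234") // hypothetical app id
val customLabels = Map("foo" -> "bar", "spark-app-selector" -> "user-override")

// On key collision, the right-hand operand of ++ wins,
// so the default spark-app-selector value is kept.
val merged = customLabels ++ defaultVolumeLabels
// merged == Map("foo" -> "bar", "spark-app-selector" -> "spark-app-id-1234")
```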
KubernetesTestConf.scala
@@ -117,12 +117,17 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
val llabels = labels match {
case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
case None => Map()
}
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
sconf ++ lconf ++ llabels)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
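
A quick sketch of the label round-trip performed by this test helper (the input map is invented): each entry of the labels map becomes a `label.<name>` property in the generated configuration.

```scala
// Sketch: volume-conf labels are rendered back into "label.<name>" property entries.
val labels: Option[Map[String, String]] = Some(Map("foo" -> "bar", "env" -> "test"))
val llabels = labels match {
  case Some(value) => value.map { case (k, v) => s"label.$k" -> v }
  case None => Map.empty[String, String]
}
// llabels == Map("label.foo" -> "bar", "label.env" -> "test")
```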
KubernetesVolumeUtilsSuite.scala
@@ -56,7 +56,39 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf("claimName"))
KubernetesPVCVolumeConf("claimName", labels = Some(Map())))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.persistentVolumeClaim.volumeName.label.env", "test")
sparkConf.set("test.persistentVolumeClaim.volumeName.label.foo", "bar")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Some(Map("env" -> "test", "foo" -> "bar"))))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " +
"labels as empty Map if not provided") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map())))
}

test("Parses emptyDir volumes correctly") {
MountVolumesFeatureStepSuite.scala
@@ -131,6 +131,79 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in driver with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo" -> "bar", "env" -> "test")))
)

val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in executors with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env" -> "exec-test")))
)

val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-49598: Mount multiple volumes to executor with labels") {
val pvcVolumeConf1 = KubernetesVolumeSpec(
"checkpointVolume1",
"/checkpoints1",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env1" -> "exec-test-1")))
)

val pvcVolumeConf2 = KubernetesVolumeSpec(
"checkpointVolume2",
"/checkpoints2",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo2" -> "bar2", "env2" -> "exec-test-2")))
)

val kubernetesConf = KubernetesTestConf.createExecutorConf(
volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Create and mount persistentVolumeClaims in executors") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
