Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs #48079

Closed
wants to merge 10 commits into from
18 changes: 18 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, with <code>LabelName</code> as the key and the specified value as the label value. Labels must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
Expand Down Expand Up @@ -1218,6 +1227,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, with <code>LabelName</code> as the key and the specified value as the label value. Labels must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
size: Option[String] = None,
labels: Option[Map[String, String]] = None)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@ object KubernetesVolumeUtils {
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val labelsKey = s"$volumeType.$volumeName.label."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Change the variable name from labelsKey to labelKey to match the singular naming of the neighboring keys (pathKey, readOnlyKey, subPathKey).
  • Define and use KUBERNETES_VOLUMES_LABEL_KEY in Config.scala, like:

val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"


val volumeSpecificLabelsMap = properties
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volumeSpecificLabelsMap -> volumeLabelsMap

.filter(_._1.startsWith(labelsKey))
.map {
case (k, v) => k.replaceAll(labelsKey, "") -> v
}

KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeSpecificLabelsMap)))
}.toSeq
}

Expand All @@ -74,7 +82,8 @@ object KubernetesVolumeUtils {
private def parseVolumeSpecificConf(
options: Map[String, String],
volumeType: String,
volumeName: String): KubernetesVolumeSpecificConf = {
volumeName: String,
labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
Expand All @@ -91,7 +100,8 @@ object KubernetesVolumeUtils {
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
options.get(sizeLimitKey),
labels)

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
Expand All @@ -86,12 +86,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
}
if (storageClass.isDefined && size.isDefined) {
val defaultVolumeLabels = Map(SPARK_APP_ID_LABEL -> conf.appId)
val volumeLabels = labels match {
case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava
case None => defaultVolumeLabels.asJava
}
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(volumeLabels)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,17 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
val llabels = labels match {
case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
case None => Map()
}
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
sconf ++ lconf ++ llabels)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
Expand All @@ -142,6 +147,7 @@ object KubernetesTestConf {
}
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
spec.mountReadOnly.toString)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this.

configs.foreach { case (k, v) =>
conf.set(key(vtype, spec.volumeName, k), v)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,38 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
KubernetesPVCVolumeConf("claimName"))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") {
  // Volume settings under the "test." prefix: mount path, read-only flag, claim name,
  // plus two user-defined labels supplied through `label.<name>` keys.
  val conf = new SparkConf(false)
    .set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
    .set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
    .set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
    .set("test.persistentVolumeClaim.volumeName.label.env", "test")
    .set("test.persistentVolumeClaim.volumeName.label.foo", "bar")

  // Both labels are expected to be carried on the parsed PVC conf.
  val expectedPvcConf = KubernetesPVCVolumeConf(
    claimName = "claimName",
    labels = Some(Map("env" -> "test", "foo" -> "bar")))

  val parsedSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(conf, "test.").head
  assert(parsedSpec.volumeName === "volumeName")
  assert(parsedSpec.mountPath === "/path")
  assert(parsedSpec.mountReadOnly)
  assert(parsedSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === expectedPvcConf)
}

test("SPARK-49598: Parses persistentVolumeClaim volumes & " +
  "puts labels as empty Map if not provided") {
  // Same volume settings as the labelled case, but with no `label.<name>` keys set.
  val conf = new SparkConf(false)
    .set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
    .set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
    .set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")

  // The expected conf carries an empty label map wrapped in Some, not None.
  val expectedPvcConf = KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()))

  val parsedSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(conf, "test.").head
  assert(parsedSpec.volumeName === "volumeName")
  assert(parsedSpec.mountPath === "/path")
  assert(parsedSpec.mountReadOnly)
  assert(parsedSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === expectedPvcConf)
}

test("Parses emptyDir volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"/tmp",
"",
true,
KubernetesPVCVolumeConf("OnDemand")
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't touch irrelevant lines.

)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
Expand All @@ -131,6 +131,79 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in driver with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. Let's use the latest storage class; "gp" -> "gp3"

size = Some("1Mi"),
labels = Some(Map("foo" -> "bar", "env" -> "test")))
)

val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in executors with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gp -> gp3

size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env" -> "exec-test")))
)

val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-49598: Mount multiple volumes to executor with labels") {
val pvcVolumeConf1 = KubernetesVolumeSpec(
"checkpointVolume1",
"/checkpoints1",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env1" -> "exec-test-1")))
)

val pvcVolumeConf2 = KubernetesVolumeSpec(
"checkpointVolume2",
"/checkpoints2",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

size = Some("1Mi"),
labels = Some(Map("foo2" -> "bar2", "env2" -> "exec-test-2")))
)

val kubernetesConf = KubernetesTestConf.createExecutorConf(
volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Create and mount persistentVolumeClaims in executors") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
Expand Down