Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs #48079

Closed
wants to merge 10 commits into from
18 changes: 18 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to the Kubernetes with <code>LabelName</code> as key having specified value, must conform with Kubernetes label format. For example,
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
Expand Down Expand Up @@ -1218,6 +1227,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].label.[LabelName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to the Kubernetes with <code>LabelName</code> as key having specified value, must conform with Kubernetes label format. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"

val KUBERNETES_VOLUMES_LABEL_KEY = "label."
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."

val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
size: Option[String] = None,
labels: Option[Map[String, String]] = None)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@ object KubernetesVolumeUtils {
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"

val volumeLabelsMap = properties
.filter(_._1.startsWith(labelKey))
.map {
case (k, v) => k.replaceAll(labelKey, "") -> v
}

KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
}.toSeq
}

Expand All @@ -74,7 +82,8 @@ object KubernetesVolumeUtils {
private def parseVolumeSpecificConf(
options: Map[String, String],
volumeType: String,
volumeName: String): KubernetesVolumeSpecificConf = {
volumeName: String,
labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
Expand All @@ -91,7 +100,8 @@ object KubernetesVolumeUtils {
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
options.get(sizeLimitKey),
labels)

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
Expand All @@ -86,12 +86,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
}
if (storageClass.isDefined && size.isDefined) {
val defaultVolumeLabels = Map(SPARK_APP_ID_LABEL -> conf.appId)
val volumeLabels = labels match {
case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava
case None => defaultVolumeLabels.asJava
}
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(volumeLabels)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,17 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
val llabels = labels match {
case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
case None => Map()
}
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
sconf ++ lconf ++ llabels)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,39 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf("claimName"))
KubernetesPVCVolumeConf("claimName", labels = Some(Map())))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.persistentVolumeClaim.volumeName.label.env", "test")
sparkConf.set("test.persistentVolumeClaim.volumeName.label.foo", "bar")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Some(Map("env" -> "test", "foo" -> "bar"))))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " +
"labels as empty Map if not provided") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map())))
}

test("Parses emptyDir volumes correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,79 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in driver with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo" -> "bar", "env" -> "test")))
)

val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49598: Create and mounts persistentVolumeClaims in executors with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env" -> "exec-test")))
)

val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-49598: Mount multiple volumes to executor with labels") {
val pvcVolumeConf1 = KubernetesVolumeSpec(
"checkpointVolume1",
"/checkpoints1",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo1" -> "bar1", "env1" -> "exec-test-1")))
)

val pvcVolumeConf2 = KubernetesVolumeSpec(
"checkpointVolume2",
"/checkpoints2",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp3"),
size = Some("1Mi"),
labels = Some(Map("foo2" -> "bar2", "env2" -> "exec-test-2")))
)

val kubernetesConf = KubernetesTestConf.createExecutorConf(
volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Create and mount persistentVolumeClaims in executors") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
Expand Down