
[SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs #48079

Closed
wants to merge 10 commits into from
18 changes: 18 additions & 0 deletions docs/running-on-kubernetes.md
@@ -1182,6 +1182,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.label.[VolumeType].[VolumeName].[LabelName]</code></td>
Review comment (Member):

Well this looks wrong to me because label could be interpreted as [VolumeType] in the existing pattern.

Can we follow the options location like you did previously? I expected the following, specifically:

spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].label.[LabelName]
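The ambiguity the reviewer flags can be illustrated with a small standalone sketch (hypothetical names; Spark's real parsing lives in KubernetesVolumeUtils): the first token after the volumes prefix is taken as the volume type, so a label-first key would be read as a volume type named "label".

```scala
// Sketch only, not Spark's actual parser. The first token after the volumes
// prefix identifies the volume type, so a "label"-first key is misread as
// VolumeType = "label" under the existing pattern.
object KeyLayoutSketch {
  private val Prefix = "spark.kubernetes.driver.volumes."

  def firstToken(key: String): String =
    key.stripPrefix(Prefix).split('.').head

  def main(args: Array[String]): Unit = {
    // The PR's proposed layout: "label" lands where a volume type is expected.
    assert(firstToken("spark.kubernetes.driver.volumes.label.persistentVolumeClaim.pvc.foo") == "label")
    // The reviewer's layout keeps the existing [VolumeType].[VolumeName] shape.
    assert(firstToken("spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc.label.foo") == "persistentVolumeClaim")
  }
}
```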

<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, with <code>LabelName</code> as the key and the specified value; keys and values must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.driver.volumes.label.persistentVolumeClaim.checkpointpvc.foo=bar</code>.
Review comment (@dongjoon-hyun, Member, Sep 12, 2024):

Please update this too.

- spark.kubernetes.driver.volumes.label.persistentVolumeClaim.checkpointpvc.foo=bar
+ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.label.foo=bar

</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
@@ -1218,6 +1227,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.label.[VolumeType].[VolumeName].[LabelName]</code></td>
Review comment (Member):

ditto.

<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> labels passed to Kubernetes, with <code>LabelName</code> as the key and the specified value; keys and values must conform to the Kubernetes label format. For example,
<code>spark.kubernetes.executor.volumes.label.persistentVolumeClaim.checkpointpvc.foo=bar</code>.
Review comment (Member):

ditto.

</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
@@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
size: Option[String] = None,
labels: Map[String, String] = Map())
Review comment (Member):

Please follow the existing semantic of Option.

- labels: Map[String, String] = Map())
+ labels: Option[Map[String, String]] = None)
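A minimal standalone sketch of the Option-based field the reviewer suggests, with a call-site pattern for recovering an empty map (PVCVolumeConf and effectiveLabels are hypothetical stand-ins, not Spark source):

```scala
// Illustration of the reviewer's Option suggestion; a hypothetical stand-in
// for KubernetesPVCVolumeConf, following the existing Option semantics of the
// other optional fields.
case class PVCVolumeConf(
    claimName: String,
    storageClass: Option[String] = None,
    size: Option[String] = None,
    labels: Option[Map[String, String]] = None)

object OptionSemanticSketch {
  // Call sites recover an empty map when no labels were configured.
  def effectiveLabels(conf: PVCVolumeConf): Map[String, String] =
    conf.labels.getOrElse(Map.empty)
}
```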

extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
@@ -39,19 +39,31 @@ object KubernetesVolumeUtils {
*/
@Since("3.0.0")
def parseVolumesWithPrefix(sparkConf: SparkConf, prefix: String): Seq[KubernetesVolumeSpec] = {
// Since the volume config prefix coincides with the volume label prefix, filter
// out the label entries before resolving the volume-specific config
val properties = sparkConf.getAllWithPrefix(prefix).toMap
val volumeProperties = properties.filter(!_._1.startsWith("label"))
val labelsProperties = properties.filter(_._1.startsWith("label"))

getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
getVolumeTypesAndNames(volumeProperties).map { case (volumeType, volumeName) =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val labelsSubKey = s"label.$volumeType.$volumeName."

val volumeSpecificLabelsMap = labelsProperties
.filter(_._1.startsWith(labelsSubKey))
.map {
case (k, v) => k.replaceAll(labelsSubKey, "") -> v
}

KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = properties(pathKey),
mountSubPath = properties.getOrElse(subPathKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
mountPath = volumeProperties(pathKey),
mountSubPath = volumeProperties.getOrElse(subPathKey, ""),
mountReadOnly = volumeProperties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(volumeProperties,
volumeType, volumeName, volumeSpecificLabelsMap))
}.toSeq
}
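The label extraction in this hunk can be sketched standalone as follows (hypothetical object name; note this sketch uses stripPrefix rather than the PR's replaceAll, since the sub-key contains '.' characters that replaceAll would treat as regex wildcards):

```scala
// Standalone sketch of the split performed above: properties sharing one prefix
// are divided into volume options and "label."-prefixed entries, then the
// per-volume label sub-key is stripped to leave the bare label names.
object LabelParseSketch {
  def volumeLabels(
      properties: Map[String, String],
      volumeType: String,
      volumeName: String): Map[String, String] = {
    val labelsSubKey = s"label.$volumeType.$volumeName."
    properties.collect {
      // stripPrefix does a literal (non-regex) prefix removal.
      case (k, v) if k.startsWith(labelsSubKey) => k.stripPrefix(labelsSubKey) -> v
    }
  }
}
```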

@@ -74,7 +86,8 @@ object KubernetesVolumeUtils {
private def parseVolumeSpecificConf(
options: Map[String, String],
volumeType: String,
volumeName: String): KubernetesVolumeSpecificConf = {
volumeName: String,
labels: Map[String, String]): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
@@ -91,7 +104,8 @@
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
options.get(sizeLimitKey),
labels)

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
@@ -24,9 +24,10 @@ import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config.KUBERNETES_USE_LEGACY_PVC_ACCESS_MODE
import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
import org.apache.spark.internal.Logging

private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
extends KubernetesFeatureConfigStep with Logging {
Review comment (Member):

Let's remove this after removing logDebug.

import MountVolumesFeatureStep._

val additionalResources = ArrayBuffer.empty[HasMetadata]
@@ -74,7 +75,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
@@ -86,12 +87,15 @@
.replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
}
if (storageClass.isDefined && size.isDefined) {
val volumeLabels = (labels ++
Map(SPARK_APP_ID_LABEL -> conf.appId)).asJava
logDebug(s"Adding $volumeLabels to $claimName PVC ")
Review comment (@dongjoon-hyun, Member, Sep 12, 2024):

Let's remove this.

additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels(volumeLabels)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
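The merge order used when building the PVC metadata above can be sketched standalone (hypothetical object; the SPARK_APP_ID_LABEL value is assumed to be "spark-app-selector" as in Spark's Constants):

```scala
// Sketch of the label merge in this hunk: user labels on the left, the app-id
// map on the right. With ++, the right-hand map wins on duplicate keys, so a
// user label cannot override the Spark app-id label.
object LabelMergeSketch {
  // Assumed value; Spark defines SPARK_APP_ID_LABEL in its Constants object.
  val SparkAppIdLabel = "spark-app-selector"

  def merged(userLabels: Map[String, String], appId: String): Map[String, String] =
    userLabels ++ Map(SparkAppIdLabel -> appId)
}
```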
@@ -103,26 +103,30 @@ object KubernetesTestConf {
}
}

def key(vtype: String, vname: String, subkey: String, prefix: String): String = {
s"${prefix}$vtype.$vname.$subkey"
}

private def setVolumeSpecs(
conf: SparkConf,
prefix: String,
volumes: Seq[KubernetesVolumeSpec]): Unit = {
def key(vtype: String, vname: String, subkey: String): String = {
s"${prefix}$vtype.$vname.$subkey"
}

val labelSubKey = "label"
volumes.foreach { case spec =>
val (vtype, configs) = spec.volumeConf match {
case KubernetesHostPathVolumeConf(path) =>
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
val llabels = labels.map { case(k, v) => s"$labelSubKey.$k" -> v }
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
sconf ++ lconf ++ llabels)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
@@ -135,15 +139,23 @@ object KubernetesTestConf {
KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY -> server))
}

conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath)
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY,
prefix), spec.mountPath)
if (spec.mountSubPath.nonEmpty) {
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY, prefix),
spec.mountSubPath)
}
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY, prefix),
spec.mountReadOnly.toString)

Review comment (Member):

Please remove this.

configs.foreach { case (k, v) =>
conf.set(key(vtype, spec.volumeName, k), v)
if (k.startsWith(s"$labelSubKey.")) {
conf.set(key(vtype, spec.volumeName, k.replaceAll(s"$labelSubKey.",
""), prefix + s"$labelSubKey."), v)
}
else {
conf.set(key(vtype, spec.volumeName, k, prefix), v)
}
}
}
}
@@ -59,6 +59,37 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
KubernetesPVCVolumeConf("claimName"))
}

test("Parses persistentVolumeClaim volumes correctly with labels") {
Review comment (@dongjoon-hyun, Member, Sep 12, 2024):

Please add a test prefix.

- test("Parses persistentVolumeClaim volumes correctly with labels") {
+ test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") {

val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.label.persistentVolumeClaim.volumeName.env", "test")
sparkConf.set("test.label.persistentVolumeClaim.volumeName.foo", "bar")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Map("env" -> "test", "foo" -> "bar")))
}

test("Parses persistentVolumeClaim volumes & puts labels as empty Map if not provided") {
Review comment (Member):

Please add a test prefix.

val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Map()))
}

test("Parses emptyDir volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
@@ -120,7 +120,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"/tmp",
"",
true,
KubernetesPVCVolumeConf("OnDemand")
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
Review comment (Member):

Please don't touch irrelevant line.

)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
@@ -131,6 +131,78 @@
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("Create and mounts persistentVolumeClaims in driver with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND,
labels = Map("foo" -> "bar", "env" -> "test"),
Review comment (Member):

Although the language supports this, we recommend keeping the parameter order; labels is the last parameter.

size = Some("1Mi"),
storageClass = Some("gp"))
)

val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("Create and mounts persistentVolumeClaims in executors with labels") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND,
labels = Map("foo1" -> "bar1", "env" -> "exec-test"),
Review comment (Member):

ditto.

size = Some("1Mi"),
storageClass = Some("gp"))
)

val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("Mounts multiple volumes to executor with labels") {
val pvcVolumeConf1 = KubernetesVolumeSpec(
"checkpointVolume1",
"/checkpoints1",
"",
true,
KubernetesPVCVolumeConf("pvcClaim1",
labels = Map("foo1" -> "bar1", "env1" -> "exec-test-1"),
Review comment (Member):

ditto.

size = Some("1Mi"),
storageClass = Some("gp"))
)
val pvcVolumeConf2 = KubernetesVolumeSpec(
"checkpointVolume2",
"/checkpoints2",
"",
true,
KubernetesPVCVolumeConf("pvcClaim2",
labels = Map("foo2" -> "bar2", "env2" -> "exec-test-2"),
Review comment (Member):

ditto.

size = Some("1Mi"),
storageClass = Some("gp"))
)

val kubernetesConf = KubernetesTestConf.createExecutorConf(
volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Create and mount persistentVolumeClaims in executors") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",