Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49598][K8S] Support user-defined labels for OnDemand PVCs #48079

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
val KUBERNETES_VOLUMES_OPTIONS_LABELS_KEY = "options.labels"

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
size: Option[String] = None,
labels: Map[String, String] = Map())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the existing semantic of Option.

- labels: Map[String, String] = Map())
+ labels: Option[Map[String, String]] = None)

extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ object KubernetesVolumeUtils {
val storageClassKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
val labelsKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_LABELS_KEY"
verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
verifySize(options.get(sizeLimitKey))
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
options.get(sizeLimitKey),
convertStringLabelsToMap(options.get(labelsKey)))

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
Expand Down Expand Up @@ -127,4 +129,20 @@ object KubernetesVolumeUtils {
}
}
}

/**
* Converts string of labels to consumable java Map
*
* @param labels labels in format : k1=v1,k2=v2
* @return Map[k1->v1, k2->v2]
*/
private def convertStringLabelsToMap(labels: Option[String]): Map[String, String] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we don't need this helper.

labels match {
case Some(value) =>
value.split(",").map(_.split("=")).collect {
case Array(k, v) => k -> v
}.toMap
case None => Map()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
Expand All @@ -91,7 +91,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(SPARK_APP_ID_LABEL, conf.appId)
.addToLabels((labels ++ Map(SPARK_APP_ID_LABEL -> conf.appId)).asJava)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,30 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.labels", "env=test,name=pvc-name")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf("claimName"))
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Map("env" -> "test", "name" -> "pvc-name")))
}

test("Parses persistentVolumeClaim volumes & puts labels as empty Map if not in correct format") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.labels", "env:test,name:pvc-name")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Map()))
}

test("Parses emptyDir volumes correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"/tmp",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
KubernetesPVCVolumeConf(claimName = "pvcClaim",
labels = Map("env" -> "test", "name" -> "pvc-name"))
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
Expand Down Expand Up @@ -120,7 +121,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"/tmp",
"",
true,
KubernetesPVCVolumeConf("OnDemand")
KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't touch irrelevant line.

)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
Expand Down