diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 39fadae90e5b..9bcf46d0d4f5 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import re
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
@@ -83,7 +82,7 @@ def __init__(
         image: str | None = None,
         code_path: str | None = None,
         namespace: str = "default",
-        name: str = "default",
+        name: str | None = None,
         application_file: str | None = None,
         template_spec=None,
         get_logs: bool = True,
@@ -103,7 +102,6 @@ def __init__(
         self.code_path = code_path
         self.application_file = application_file
         self.template_spec = template_spec
-        self.name = self.create_job_name()
        self.kubernetes_conn_id = kubernetes_conn_id
         self.startup_timeout_seconds = startup_timeout_seconds
         self.reattach_on_restart = reattach_on_restart
@@ -161,8 +159,13 @@ def manage_template_specs(self):
         return template_body
 
     def create_job_name(self):
-        initial_name = add_unique_suffix(name=self.task_id, max_len=MAX_LABEL_LEN)
-        return re.sub(r"[^a-z0-9-]+", "-", initial_name.lower())
+        name = (
+            self.name or self.template_body.get("spark", {}).get("metadata", {}).get("name") or self.task_id
+        )
+
+        updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+
+        return self._set_name(updated_name)
 
     @staticmethod
     def _get_pod_identifying_label_string(labels) -> str:
@@ -282,6 +285,8 @@ def custom_obj_api(self) -> CustomObjectsApi:
         return CustomObjectsApi()
 
     def execute(self, context: Context):
+        self.name = self.create_job_name()
+
         self.log.info("Creating sparkApplication.")
         self.launcher = CustomObjectLauncher(
             name=self.name,
diff --git a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
new file mode 100644
index 000000000000..1504c40fbd1e
--- /dev/null
+++ b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
@@ -0,0 +1,57 @@
+{
+   "apiVersion":"sparkoperator.k8s.io/v1beta2",
+   "kind":"SparkApplication",
+   "metadata":{
+      "namespace":"default"
+   },
+   "spec":{
+      "type":"Scala",
+      "mode":"cluster",
+      "image":"gcr.io/spark-operator/spark:v2.4.5",
+      "imagePullPolicy":"Always",
+      "mainClass":"org.apache.spark.examples.SparkPi",
+      "mainApplicationFile":"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar",
+      "sparkVersion":"2.4.5",
+      "restartPolicy":{
+         "type":"Never"
+      },
+      "volumes":[
+         {
+            "name":"test-volume",
+            "hostPath":{
+               "path":"/tmp",
+               "type":"Directory"
+            }
+         }
+      ],
+      "driver":{
+         "cores":1,
+         "coreLimit":"1200m",
+         "memory":"512m",
+         "labels":{
+            "version":"2.4.5"
+         },
+         "serviceAccount":"spark",
+         "volumeMounts":[
+            {
+               "name":"test-volume",
+               "mountPath":"/tmp"
+            }
+         ]
+      },
+      "executor":{
+         "cores":1,
+         "instances":1,
+         "memory":"512m",
+         "labels":{
+            "version":"2.4.5"
+         },
+         "volumeMounts":[
+            {
+               "name":"test-volume",
+               "mountPath":"/tmp"
+            }
+         ]
+      }
+   }
+}
diff --git a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
new file mode 100644
index 000000000000..91723980954e
--- /dev/null
+++ b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: "gcr.io/spark-operator/spark:v2.4.5"
+  imagePullPolicy: Always
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
+  sparkVersion: "2.4.5"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index bc8404aa8560..9c8c40de6558 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -273,6 +273,149 @@ def test_create_application_from_yaml_json(
             version="v1beta2",
         )
 
+    def test_create_application_from_yaml_json_and_use_name_from_metadata(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_metadata",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("default_yaml")
+
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_metadata",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("default_json")
+
+    def test_create_application_from_yaml_json_and_use_name_from_operator_args(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="default_yaml",
+            name="test-spark",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("test-spark")
+
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="default_json",
+            name="test-spark",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("test-spark")
+
+    def test_create_application_from_yaml_json_and_use_name_task_id(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test_with_no_name_from_config.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test_with_no_name_from_config.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
     def test_new_template_from_yaml(
         self,
         mock_create_namespaced_crd,
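
For context, the name-resolution order the patch introduces (the operator's explicit name argument, then metadata.name from the application file, then the task_id, with a unique suffix appended) can be sketched standalone. This is a minimal illustration, not the provider's code: add_unique_suffix below is a simplified stand-in for the helper in airflow.providers.cncf.kubernetes.kubernetes_helper_functions, and resolve_job_name is a hypothetical free function mirroring the patched create_job_name():

    from __future__ import annotations

    import secrets

    MAX_LABEL_LEN = 63  # Kubernetes label values are capped at 63 characters


    def add_unique_suffix(name: str, max_len: int = MAX_LABEL_LEN) -> str:
        # Simplified stand-in: append a short random suffix and trim the base
        # name so the result stays within max_len.
        suffix = "-" + secrets.token_hex(4)
        return name[: max_len - len(suffix)] + suffix


    def resolve_job_name(arg_name: str | None, template_body: dict, task_id: str) -> str:
        # Same precedence as the patched create_job_name():
        # 1. the operator's name argument,
        # 2. metadata.name from the template body,
        # 3. the task_id as the fallback the new tests exercise.
        name = (
            arg_name
            or template_body.get("spark", {}).get("metadata", {}).get("name")
            or task_id
        )
        return add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)


    # e.g. resolve_job_name(None, {"spark": {"metadata": {}}}, "create_app_and_use_name_from_task_id")
    # falls through to the task_id, matching the assertions in the tests above.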