From 027fbea6e3fc515079c71ddca16faa2613803ea4 Mon Sep 17 00:00:00 2001
From: Pavan Kumar
Date: Mon, 23 Sep 2024 23:20:16 +0100
Subject: [PATCH 1/5] use name from the operator argument or from the spark
 YAML config

---
 .../kubernetes/operators/spark_kubernetes.py  | 26 ++++-
 .../operators/test_spark_kubernetes.py        | 94 +++++++++++++++++++
 2 files changed, 116 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 39fadae90e5b..e599eb1e4b9c 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -83,7 +83,7 @@ def __init__(
         image: str | None = None,
         code_path: str | None = None,
         namespace: str = "default",
-        name: str = "default",
+        name: str | None = None,
         application_file: str | None = None,
         template_spec=None,
         get_logs: bool = True,
@@ -103,7 +103,6 @@ def __init__(
         self.code_path = code_path
         self.application_file = application_file
         self.template_spec = template_spec
-        self.name = self.create_job_name()
         self.kubernetes_conn_id = kubernetes_conn_id
         self.startup_timeout_seconds = startup_timeout_seconds
         self.reattach_on_restart = reattach_on_restart
@@ -161,8 +160,25 @@ def manage_template_specs(self):
         return template_body
 
     def create_job_name(self):
-        initial_name = add_unique_suffix(name=self.task_id, max_len=MAX_LABEL_LEN)
-        return re.sub(r"[^a-z0-9-]+", "-", initial_name.lower())
+        """
+        Create the Spark job name using the following order of precedence:
+
+        1. The ``name`` argument passed to the operator.
+        2. The name specified in the application YAML file.
+        3. The task ID, if neither of the above is set.
+
+        A random suffix is appended to the chosen name to keep it unique.
+ """ + name = self.template_body.get("spark", {}).get("metadata", {}).get("name") + + if self.name: + name = self.name + elif not name: + name = self.task_id + + updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN) + + return re.sub(r"[^a-z0-9-]+", "-", updated_name.lower()) @staticmethod def _get_pod_identifying_label_string(labels) -> str: @@ -282,6 +298,8 @@ def custom_obj_api(self) -> CustomObjectsApi: return CustomObjectsApi() def execute(self, context: Context): + self.name = self.create_job_name() + self.log.info("Creating sparkApplication.") self.launcher = CustomObjectLauncher( name=self.name, diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py index bc8404aa8560..e81b9065e62c 100644 --- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -273,6 +273,100 @@ def test_create_application_from_yaml_json( version="v1beta2", ) + def test_create_application_from_yaml_json_and_use_name_from_metadata( + self, + mock_create_namespaced_crd, + mock_get_namespaced_custom_object_status, + mock_cleanup, + mock_create_job_name, + mock_get_kube_client, + mock_create_pod, + mock_await_pod_start, + mock_await_pod_completion, + mock_fetch_requested_container_logs, + data_file, + ): + op = SparkKubernetesOperator( + application_file=data_file("spark/application_test.yaml").as_posix(), + kubernetes_conn_id="kubernetes_default_kube_config", + task_id="default_yaml", + ) + context = create_context(op) + op.execute(context) + TEST_APPLICATION_DICT["metadata"]["name"] = op.name + mock_create_namespaced_crd.assert_called_with( + body=TEST_APPLICATION_DICT, + group="sparkoperator.k8s.io", + namespace="default", + plural="sparkapplications", + version="v1beta2", + ) + + op = SparkKubernetesOperator( + application_file=data_file("spark/application_test.json").as_posix(), + kubernetes_conn_id="kubernetes_default_kube_config", + task_id="default_json", + ) + context = create_context(op) + op.execute(context) + TEST_APPLICATION_DICT["metadata"]["name"] = op.name + mock_create_namespaced_crd.assert_called_with( + body=TEST_APPLICATION_DICT, + group="sparkoperator.k8s.io", + namespace="default", + plural="sparkapplications", + version="v1beta2", + ) + + def test_create_application_from_yaml_json_and_use_name_from_operator_args( + self, + mock_create_namespaced_crd, + mock_get_namespaced_custom_object_status, + mock_cleanup, + mock_create_job_name, + mock_get_kube_client, + mock_create_pod, + mock_await_pod_start, + mock_await_pod_completion, + mock_fetch_requested_container_logs, + data_file, + ): + op = SparkKubernetesOperator( + application_file=data_file("spark/application_test.yaml").as_posix(), + kubernetes_conn_id="kubernetes_default_kube_config", + task_id="default_yaml", + name="test-spark", + ) + context = create_context(op) + op.execute(context) + TEST_APPLICATION_DICT["metadata"]["name"] = op.name + mock_create_namespaced_crd.assert_called_with( + body=TEST_APPLICATION_DICT, + group="sparkoperator.k8s.io", + namespace="default", + plural="sparkapplications", + version="v1beta2", + ) + assert op.name.startswith("test-spark") + + op = SparkKubernetesOperator( + application_file=data_file("spark/application_test.json").as_posix(), + kubernetes_conn_id="kubernetes_default_kube_config", + task_id="default_json", + name="test-spark", + ) + context = create_context(op) + op.execute(context) + 
TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("test-spark")
+
     def test_new_template_from_yaml(
         self,
         mock_create_namespaced_crd,

From 4f0646309519fc272866f7129f2b7faaf502b722 Mon Sep 17 00:00:00 2001
From: Pavan Kumar
Date: Tue, 24 Sep 2024 18:11:03 +0100
Subject: [PATCH 2/5] update tests and simplify the name precedence check

---
 .../kubernetes/operators/spark_kubernetes.py   | 18 +++---------------
 .../operators/test_spark_kubernetes.py         |  6 ++++--
 2 files changed, 7 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index e599eb1e4b9c..e836c8b062ac 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -160,21 +160,9 @@ def manage_template_specs(self):
         return template_body
 
     def create_job_name(self):
-        """
-        Create the Spark job name using the following order of precedence:
-
-        1. The ``name`` argument passed to the operator.
-        2. The name specified in the application YAML file.
-        3. The task ID, if neither of the above is set.
-
-        A random suffix is appended to the chosen name to keep it unique.
-        """
-        name = self.template_body.get("spark", {}).get("metadata", {}).get("name")
-
-        if self.name:
-            name = self.name
-        elif not name:
-            name = self.task_id
+        name = (
+            self.name or self.template_body.get("spark", {}).get("metadata", {}).get("name") or self.task_id
+        )
 
         updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index e81b9065e62c..9fb617ca0d42 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -289,7 +289,7 @@ def test_create_application_from_yaml_json_and_use_name_from_metadata(
         op = SparkKubernetesOperator(
             application_file=data_file("spark/application_test.yaml").as_posix(),
             kubernetes_conn_id="kubernetes_default_kube_config",
-            task_id="default_yaml",
+            task_id="create_app_and_use_name_from_metadata",
         )
         context = create_context(op)
         op.execute(context)
@@ -301,11 +301,12 @@ def test_create_application_from_yaml_json_and_use_name_from_metadata(
             plural="sparkapplications",
             version="v1beta2",
         )
+        assert op.name.startswith("default_yaml")
 
         op = SparkKubernetesOperator(
             application_file=data_file("spark/application_test.json").as_posix(),
             kubernetes_conn_id="kubernetes_default_kube_config",
-            task_id="default_json",
+            task_id="create_app_and_use_name_from_metadata",
         )
         context = create_context(op)
         op.execute(context)
@@ -317,6 +318,7 @@ def test_create_application_from_yaml_json_and_use_name_from_metadata(
             plural="sparkapplications",
             version="v1beta2",
         )
+        assert op.name.startswith("default_json")
 
     def test_create_application_from_yaml_json_and_use_name_from_operator_args(
         self,

From a3aa6bbd6e006ef3cdb4a76d65bb5979c5745e6b Mon Sep 17 00:00:00 2001
From: Pavan Kumar
Date: Tue, 24 Sep 2024 22:34:18 +0100
Subject: [PATCH 3/5] add test to check that the spark name starts with the
 task_id

---
 ...ication_test_with_no_name_from_config.json | 57 +++++++++++++++++++
 ...ication_test_with_no_name_from_config.yaml | 55 
++++++++++++++++++ .../operators/test_spark_kubernetes.py | 47 +++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json create mode 100644 tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml diff --git a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json new file mode 100644 index 000000000000..1504c40fbd1e --- /dev/null +++ b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json @@ -0,0 +1,57 @@ +{ + "apiVersion":"sparkoperator.k8s.io/v1beta2", + "kind":"SparkApplication", + "metadata":{ + "namespace":"default" + }, + "spec":{ + "type":"Scala", + "mode":"cluster", + "image":"gcr.io/spark-operator/spark:v2.4.5", + "imagePullPolicy":"Always", + "mainClass":"org.apache.spark.examples.SparkPi", + "mainApplicationFile":"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar", + "sparkVersion":"2.4.5", + "restartPolicy":{ + "type":"Never" + }, + "volumes":[ + { + "name":"test-volume", + "hostPath":{ + "path":"/tmp", + "type":"Directory" + } + } + ], + "driver":{ + "cores":1, + "coreLimit":"1200m", + "memory":"512m", + "labels":{ + "version":"2.4.5" + }, + "serviceAccount":"spark", + "volumeMounts":[ + { + "name":"test-volume", + "mountPath":"/tmp" + } + ] + }, + "executor":{ + "cores":1, + "instances":1, + "memory":"512m", + "labels":{ + "version":"2.4.5" + }, + "volumeMounts":[ + { + "name":"test-volume", + "mountPath":"/tmp" + } + ] + } + } +} diff --git a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml new file mode 100644 index 000000000000..91723980954e --- /dev/null +++ b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+---
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: "gcr.io/spark-operator/spark:v2.4.5"
+  imagePullPolicy: Always
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
+  sparkVersion: "2.4.5"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index 9fb617ca0d42..9c8c40de6558 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -369,6 +369,53 @@ def test_create_application_from_yaml_json_and_use_name_from_operator_args(
         )
         assert op.name.startswith("test-spark")
 
+    def test_create_application_from_yaml_json_and_use_name_task_id(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test_with_no_name_from_config.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
+        op = SparkKubernetesOperator(
+            application_file=data_file("spark/application_test_with_no_name_from_config.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
     def test_new_template_from_yaml(
         self,
         mock_create_namespaced_crd,

From d4c9a8537cd76e54f0362762b57587166bd941c9 Mon Sep 17 00:00:00 2001
From: Pavan Kumar
Date: Fri, 27 Sep 2024 10:33:44 +0100
Subject: [PATCH 4/5] use the _set_name method in create_job_name

---
 .../providers/cncf/kubernetes/operators/spark_kubernetes.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index e836c8b062ac..e67fe63e5f0b 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -17,7 +17,6 @@
 # under the License.
from __future__ import annotations -import re from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, Any @@ -166,7 +165,7 @@ def create_job_name(self): updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN) - return re.sub(r"[^a-z0-9-]+", "-", updated_name.lower()) + return self._set_name(updated_name.lower()) @staticmethod def _get_pod_identifying_label_string(labels) -> str: From 1630d9fad703f8bcb34b724551553055de365ae9 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 27 Sep 2024 13:10:51 +0100 Subject: [PATCH 5/5] remove lower --- airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index e67fe63e5f0b..9bcf46d0d4f5 100644 --- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -165,7 +165,7 @@ def create_job_name(self): updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN) - return self._set_name(updated_name.lower()) + return self._set_name(updated_name) @staticmethod def _get_pod_identifying_label_string(labels) -> str:
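
Reviewer note: below is a minimal sketch of how the naming precedence introduced by this series behaves after PATCH 5/5. The DAG id, application file names, and metadata values are illustrative assumptions, not files shipped with these patches; since PATCH 1/5, the name is resolved in execute() rather than in the constructor, so the suffix only exists once the task runs.

    # Sketch only: dag_id and application_file paths below are assumed for
    # illustration; they are not part of this patch series.
    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

    with DAG(dag_id="spark_naming_example"):
        # 1. An explicit `name` argument takes precedence: the
        #    SparkApplication is submitted as "my-app-<random-suffix>".
        SparkKubernetesOperator(
            task_id="explicit_name",
            name="my-app",
            application_file="app_with_name.yaml",
        )

        # 2. Without `name`, metadata.name from the application file
        #    (assumed present in app_with_name.yaml) is used instead.
        SparkKubernetesOperator(
            task_id="name_from_file",
            application_file="app_with_name.yaml",
        )

        # 3. With neither set, the task_id is the fallback, so the
        #    application is submitted as "name_from_task_id-<random-suffix>".
        SparkKubernetesOperator(
            task_id="name_from_task_id",
            application_file="app_without_name.yaml",
        )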