From 0f0ddc9d7408ef3a0a9e50354f0c228cf4919872 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 5 Jun 2022 18:49:17 +0200 Subject: [PATCH] Fix backwards-incompatibility introduced by fixing mypy problems There was a backwards-incompatibility introduced by #23716 in two providers by using the get_mandatory_value config method. This PR restores backwards compatibility and updates the 2.1 compatibility pre-commit to check for forbidden usage of get_mandatory_value. --- airflow/providers/apache/spark/hooks/spark_submit.py | 5 ++++- airflow/providers/qubole/hooks/qubole.py | 5 ++++- .../pre_commit/pre_commit_check_2_1_compatibility.py | 10 ++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index 0f5dc2f7307c..b3bebcb49546 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -632,7 +632,10 @@ def on_kill(self) -> None: # we still attempt to kill the yarn application renew_from_kt(self._principal, self._keytab, exit_on_fail=False) env = os.environ.copy() - env["KRB5CCNAME"] = airflow_conf.get_mandatory_value('kerberos', 'ccache') + ccacche = airflow_conf.get('kerberos', 'ccache') + if ccacche is None: + raise ValueError("The kerberos/ccache config should be set here!") + env["KRB5CCNAME"] = ccacche with subprocess.Popen( kill_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 7896fbd35280..3b0d4bdd1a5f 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -227,7 +227,10 @@ def get_results( """ if fp is None: iso = datetime.datetime.utcnow().isoformat() - logpath = os.path.expanduser(conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')) + base_log_folder = conf.get('logging', 'BASE_LOG_FOLDER') + if
base_log_folder is None: + raise ValueError("logging/BASE_LOG_FOLDER config value should be set") + logpath = os.path.expanduser(base_log_folder) resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + '/results' pathlib.Path(resultpath).mkdir(parents=True, exist_ok=True) fp = open(resultpath + '/' + iso, 'wb') diff --git a/scripts/ci/pre_commit/pre_commit_check_2_1_compatibility.py b/scripts/ci/pre_commit/pre_commit_check_2_1_compatibility.py index 6a5fddb24c8d..9c3d7628ab17 100755 --- a/scripts/ci/pre_commit/pre_commit_check_2_1_compatibility.py +++ b/scripts/ci/pre_commit/pre_commit_check_2_1_compatibility.py @@ -36,6 +36,7 @@ GET_ATTR_MATCHER = re.compile(r".*getattr\((ti|TI), ['\"]run_id['\"]\).*") TI_RUN_ID_MATCHER = re.compile(r".*(ti|TI)\.run_id.*") TRY_NUM_MATCHER = re.compile(r".*context.*\[[\"']try_number[\"']].*") +GET_MANDATORY_MATCHER = re.compile(r".*conf\.get_mandatory_value") def _check_file(_file: Path): @@ -91,6 +92,15 @@ def _check_file(_file: Path): f"as it is not available in Airflow 2.2[/]" ) + if GET_MANDATORY_MATCHER.match(line): + errors.append( + f"[red]In {_file}:{index} there is a forbidden construct " + f"(Airflow 2.3+ only):[/]\n\n" + f"{lines[index]}\n\n" + f"[yellow]You should not use conf.get_mandatory_value " + f"as it is not available in Airflow 2.2[/]" + ) + if __name__ == '__main__': for file in sys.argv[1:]: