diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 0fde8362..856b841c 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -640,32 +640,58 @@ def make_task_groups( if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"): # https://github.com/apache/airflow/pull/16557 if isinstance(task_group_conf.get("default_args"), dict): - if utils.check_dict_key(task_group_conf["default_args"], "on_success_callback"): - if isinstance(task_group_conf["default_args"]["on_success_callback"], str): + if utils.check_dict_key( + task_group_conf["default_args"], "on_success_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_success_callback"], + str, + ): task_group_conf["default_args"][ "on_success_callback" ]: Callable = import_string( - task_group_conf["default_args"]["on_success_callback"] + task_group_conf["default_args"][ + "on_success_callback" + ] ) - if utils.check_dict_key(task_group_conf["default_args"], "on_execute_callback"): - if isinstance(task_group_conf["default_args"]["on_execute_callback"], str): + if utils.check_dict_key( + task_group_conf["default_args"], "on_execute_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_execute_callback"], + str, + ): task_group_conf["default_args"][ "on_execute_callback" ]: Callable = import_string( - task_group_conf["default_args"]["on_execute_callback"] + task_group_conf["default_args"][ + "on_execute_callback" + ] ) - if utils.check_dict_key(task_group_conf["default_args"], "on_failure_callback"): - if isinstance(task_group_conf["default_args"]["on_failure_callback"], str): + if utils.check_dict_key( + task_group_conf["default_args"], "on_failure_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_failure_callback"], + str, + ): task_group_conf["default_args"][ "on_failure_callback" ]: Callable = import_string( - task_group_conf["default_args"]["on_failure_callback"] + task_group_conf["default_args"][ + "on_failure_callback" + ] ) - if utils.check_dict_key(task_group_conf["default_args"], "on_retry_callback"): - if isinstance(task_group_conf["default_args"]["on_retry_callback"], str): + if utils.check_dict_key( + task_group_conf["default_args"], "on_retry_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_retry_callback"], + str, + ): task_group_conf["default_args"][ "on_retry_callback" ]: Callable = import_string(