diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 856b841c..44336a10 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -637,66 +637,61 @@ def make_task_groups( task_group_conf["group_id"] = task_group_name task_group_conf["dag"] = dag - if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"): + if version.parse(AIRFLOW_VERSION) >= version.parse( + "2.2.0" + ) and isinstance(task_group_conf.get("default_args"), dict): # https://github.com/apache/airflow/pull/16557 - if isinstance(task_group_conf.get("default_args"), dict): - if utils.check_dict_key( - task_group_conf["default_args"], "on_success_callback" + if utils.check_dict_key( + task_group_conf["default_args"], "on_success_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_success_callback"], + str, ): - if isinstance( - task_group_conf["default_args"]["on_success_callback"], - str, - ): - task_group_conf["default_args"][ - "on_success_callback" - ]: Callable = import_string( - task_group_conf["default_args"][ - "on_success_callback" - ] - ) - - if utils.check_dict_key( - task_group_conf["default_args"], "on_execute_callback" + task_group_conf["default_args"][ + "on_success_callback" + ]: Callable = import_string( + task_group_conf["default_args"]["on_success_callback"] + ) + + if utils.check_dict_key( + task_group_conf["default_args"], "on_execute_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_execute_callback"], + str, ): - if isinstance( - task_group_conf["default_args"]["on_execute_callback"], - str, - ): - task_group_conf["default_args"][ - "on_execute_callback" - ]: Callable = import_string( - task_group_conf["default_args"][ - "on_execute_callback" - ] - ) - - if utils.check_dict_key( - task_group_conf["default_args"], "on_failure_callback" + task_group_conf["default_args"][ + "on_execute_callback" + ]: Callable = import_string( + task_group_conf["default_args"]["on_execute_callback"] + ) + + if utils.check_dict_key( + task_group_conf["default_args"], "on_failure_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_failure_callback"], + str, ): - if isinstance( - task_group_conf["default_args"]["on_failure_callback"], - str, - ): - task_group_conf["default_args"][ - "on_failure_callback" - ]: Callable = import_string( - task_group_conf["default_args"][ - "on_failure_callback" - ] - ) - - if utils.check_dict_key( - task_group_conf["default_args"], "on_retry_callback" + task_group_conf["default_args"][ + "on_failure_callback" + ]: Callable = import_string( + task_group_conf["default_args"]["on_failure_callback"] + ) + + if utils.check_dict_key( + task_group_conf["default_args"], "on_retry_callback" + ): + if isinstance( + task_group_conf["default_args"]["on_retry_callback"], + str, ): - if isinstance( - task_group_conf["default_args"]["on_retry_callback"], - str, - ): - task_group_conf["default_args"][ - "on_retry_callback" - ]: Callable = import_string( - task_group_conf["default_args"]["on_retry_callback"] - ) + task_group_conf["default_args"][ + "on_retry_callback" + ]: Callable = import_string( + task_group_conf["default_args"]["on_retry_callback"] + ) task_group = TaskGroup( **{