Skip to content

Commit

Permalink
Merge pull request #3 from Baraldo/feat/cast-task-group-callbacks
Browse files Browse the repository at this point in the history
fmt
  • Loading branch information
Baraldo authored Sep 22, 2023
2 parents 464cd54 + f292883 commit 9413d69
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,32 +640,58 @@ def make_task_groups(
if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"):
# https://github.com/apache/airflow/pull/16557
if isinstance(task_group_conf.get("default_args"), dict):
if utils.check_dict_key(task_group_conf["default_args"], "on_success_callback"):
if isinstance(task_group_conf["default_args"]["on_success_callback"], str):
if utils.check_dict_key(
task_group_conf["default_args"], "on_success_callback"
):
if isinstance(
task_group_conf["default_args"]["on_success_callback"],
str,
):
task_group_conf["default_args"][
"on_success_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_success_callback"]
task_group_conf["default_args"][
"on_success_callback"
]
)

if utils.check_dict_key(task_group_conf["default_args"], "on_execute_callback"):
if isinstance(task_group_conf["default_args"]["on_execute_callback"], str):
if utils.check_dict_key(
task_group_conf["default_args"], "on_execute_callback"
):
if isinstance(
task_group_conf["default_args"]["on_execute_callback"],
str,
):
task_group_conf["default_args"][
"on_execute_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_execute_callback"]
task_group_conf["default_args"][
"on_execute_callback"
]
)

if utils.check_dict_key(task_group_conf["default_args"], "on_failure_callback"):
if isinstance(task_group_conf["default_args"]["on_failure_callback"], str):
if utils.check_dict_key(
task_group_conf["default_args"], "on_failure_callback"
):
if isinstance(
task_group_conf["default_args"]["on_failure_callback"],
str,
):
task_group_conf["default_args"][
"on_failure_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_failure_callback"]
task_group_conf["default_args"][
"on_failure_callback"
]
)

if utils.check_dict_key(task_group_conf["default_args"], "on_retry_callback"):
if isinstance(task_group_conf["default_args"]["on_retry_callback"], str):
if utils.check_dict_key(
task_group_conf["default_args"], "on_retry_callback"
):
if isinstance(
task_group_conf["default_args"]["on_retry_callback"],
str,
):
task_group_conf["default_args"][
"on_retry_callback"
]: Callable = import_string(
Expand Down

0 comments on commit 9413d69

Please sign in to comment.