Skip to content

Commit

Permalink
Merge pull request #5 from Baraldo/feat/cast-task-group-callbacks
Browse files Browse the repository at this point in the history
lint
  • Loading branch information
Baraldo authored Sep 25, 2023
2 parents 2bcc0c4 + 17c98ac commit 7b7128f
Showing 1 changed file with 50 additions and 55 deletions.
105 changes: 50 additions & 55 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,66 +637,61 @@ def make_task_groups(
task_group_conf["group_id"] = task_group_name
task_group_conf["dag"] = dag

if version.parse(AIRFLOW_VERSION) >= version.parse("2.2.0"):
if version.parse(AIRFLOW_VERSION) >= version.parse(
"2.2.0"
) and isinstance(task_group_conf.get("default_args"), dict):
# https://github.com/apache/airflow/pull/16557
if isinstance(task_group_conf.get("default_args"), dict):
if utils.check_dict_key(
task_group_conf["default_args"], "on_success_callback"
if utils.check_dict_key(
task_group_conf["default_args"], "on_success_callback"
):
if isinstance(
task_group_conf["default_args"]["on_success_callback"],
str,
):
if isinstance(
task_group_conf["default_args"]["on_success_callback"],
str,
):
task_group_conf["default_args"][
"on_success_callback"
]: Callable = import_string(
task_group_conf["default_args"][
"on_success_callback"
]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_execute_callback"
task_group_conf["default_args"][
"on_success_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_success_callback"]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_execute_callback"
):
if isinstance(
task_group_conf["default_args"]["on_execute_callback"],
str,
):
if isinstance(
task_group_conf["default_args"]["on_execute_callback"],
str,
):
task_group_conf["default_args"][
"on_execute_callback"
]: Callable = import_string(
task_group_conf["default_args"][
"on_execute_callback"
]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_failure_callback"
task_group_conf["default_args"][
"on_execute_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_execute_callback"]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_failure_callback"
):
if isinstance(
task_group_conf["default_args"]["on_failure_callback"],
str,
):
if isinstance(
task_group_conf["default_args"]["on_failure_callback"],
str,
):
task_group_conf["default_args"][
"on_failure_callback"
]: Callable = import_string(
task_group_conf["default_args"][
"on_failure_callback"
]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_retry_callback"
task_group_conf["default_args"][
"on_failure_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_failure_callback"]
)

if utils.check_dict_key(
task_group_conf["default_args"], "on_retry_callback"
):
if isinstance(
task_group_conf["default_args"]["on_retry_callback"],
str,
):
if isinstance(
task_group_conf["default_args"]["on_retry_callback"],
str,
):
task_group_conf["default_args"][
"on_retry_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_retry_callback"]
)
task_group_conf["default_args"][
"on_retry_callback"
]: Callable = import_string(
task_group_conf["default_args"]["on_retry_callback"]
)

task_group = TaskGroup(
**{
Expand Down

0 comments on commit 7b7128f

Please sign in to comment.