Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dag): avoid getting dataset next run info for unresolved dataset alias #41828

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3274,7 +3274,10 @@ def calculate_dagrun_date_fields(
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
if self.dataset_expression is None:
return None
return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]

# When a dataset alias does not resolve into datasets, get_dataset_triggered_next_run_info returns
# an empty dict as there's no dataset info to get. This method should thus return None.
return get_dataset_triggered_next_run_info([self.dag_id], session=session).get(self.dag_id, None)


# NOTE: Please keep the list of arguments in sync with DAG.__init__.
Expand Down
4 changes: 3 additions & 1 deletion airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
:meta private:
"""

UNRESOLVED_ALIAS_SUMMARY = "Unresolved DatasetAlias"

description: str = "Triggered by datasets"

def __init__(self, datasets: BaseDataset) -> None:
Expand All @@ -170,7 +172,7 @@ def __init__(self, datasets: BaseDataset) -> None:
self.dataset_condition = _DatasetAliasCondition(self.dataset_condition.name)

if not next(self.dataset_condition.iter_datasets(), False):
self._summary = "Unresolved DatasetAlias"
self._summary = DatasetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
else:
self._summary = "Dataset"

Expand Down
16 changes: 16 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3433,6 +3433,22 @@ def test_get_dataset_triggered_next_run_info(dag_maker, clear_datasets):
}


@pytest.mark.need_serialized_dag
def test_get_dataset_triggered_next_run_info_with_unresolved_dataset_alias(dag_maker, clear_datasets):
dataset_alias1 = DatasetAlias(name="alias")
with dag_maker(dag_id="dag-1", schedule=[dataset_alias1]):
pass
dag1 = dag_maker.dag
session = dag_maker.session
session.flush()

info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
assert info == {}

dag1_model = DagModel.get_dagmodel(dag1.dag_id)
assert dag1_model.get_dataset_triggered_next_run_info(session=session) is None


def test_dag_uses_timetable_for_run_id(session):
class CustomRunIdTimetable(Timetable):
def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str:
Expand Down