Skip to content

Commit

Permalink
Unify DAG schedule args and change default to None (#41453)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored Aug 26, 2024
1 parent ef8f349 commit e8a5996
Show file tree
Hide file tree
Showing 50 changed files with 356 additions and 771 deletions.
25 changes: 9 additions & 16 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3196,14 +3196,20 @@ components:
description: >
User-provided DAG description, which can consist of several sentences or paragraphs that
describe DAG contents.
schedule_interval:
$ref: "#/components/schemas/ScheduleInterval"
timetable_summary:
type: string
readOnly: true
nullable: true
description: |
Timetable summary.
*New in version 3.0.0*
timetable_description:
type: string
readOnly: true
nullable: true
description: |
Timetable/Schedule Interval description.
Timetable description.
*New in version 2.3.0*
tags:
Expand Down Expand Up @@ -5190,19 +5196,6 @@ components:
*New in version 3.0.0*
# Common data type
ScheduleInterval:
description: |
Schedule interval. Defines how often DAG runs, this object gets added to your latest task instance's
execution_date to figure out the next schedule.
nullable: true
readOnly: true
anyOf:
- $ref: "#/components/schemas/TimeDelta"
- $ref: "#/components/schemas/RelativeDelta"
- $ref: "#/components/schemas/CronExpression"
discriminator:
propertyName: __type

TimeDelta:
description: Time delta
type: object
Expand Down
37 changes: 0 additions & 37 deletions airflow/api_connexion/schemas/common_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import marshmallow
from dateutil import relativedelta
from marshmallow import Schema, fields, validate
from marshmallow_oneofschema import OneOfSchema

from airflow.models.mappedoperator import MappedOperator
from airflow.serialization.serialized_objects import SerializedBaseOperator
Expand Down Expand Up @@ -90,42 +89,6 @@ def make_cron_expression(self, data, **kwargs):
return CronExpression(data["value"])


class ScheduleIntervalSchema(OneOfSchema):
    """
    Schema for a schedule interval value.

    One of the following concrete types is handled:

    * TimeDelta
    * RelativeDelta
    * CronExpression
    """

    type_field = "__type"
    type_schemas = {
        "TimeDelta": TimeDeltaSchema,
        "RelativeDelta": RelativeDeltaSchema,
        "CronExpression": CronExpressionSchema,
    }

    def _dump(self, obj, update_fields=True, **kwargs):
        # A bare string is wrapped in CronExpression before being handed to the parent dump.
        value = CronExpression(obj) if isinstance(obj, str) else obj
        return super()._dump(value, update_fields=update_fields, **kwargs)

    def get_obj_type(self, obj):
        """Return the ``type_schemas`` key that matches the runtime type of ``obj``."""
        # Checked in order; the first matching type wins.
        candidates = (
            (datetime.timedelta, "TimeDelta"),
            (relativedelta.relativedelta, "RelativeDelta"),
            (CronExpression, "CronExpression"),
        )
        for python_type, schema_key in candidates:
            if isinstance(obj, python_type):
                return schema_key
        raise TypeError(f"Unknown object type: {obj.__class__.__name__}")


class ColorField(fields.String):
"""Schema for color property."""

Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.api_connexion.schemas.common_schema import ScheduleIntervalSchema, TimeDeltaSchema, TimezoneField
from airflow.api_connexion.schemas.common_schema import TimeDeltaSchema, TimezoneField
from airflow.configuration import conf
from airflow.models.dag import DagModel, DagTag

Expand Down Expand Up @@ -63,7 +63,7 @@ class Meta:
file_token = fields.Method("get_token", dump_only=True)
owners = fields.Method("get_owners", dump_only=True)
description = auto_field(dump_only=True)
schedule_interval = fields.Nested(ScheduleIntervalSchema)
timetable_summary = auto_field(dump_only=True)
timetable_description = auto_field(dump_only=True)
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"file_token": None,
"owners": dag.owner,
"description": dag.description,
"schedule_interval": dag.schedule_interval,
"timetable_summary": dag.timetable.summary,
"timetable_description": dag.timetable.description,
"tags": dag.tags,
"max_active_tasks": dag.max_active_tasks,
Expand Down
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@ scheduler:
allow_trigger_in_future:
description: |
Allow externally triggered DagRuns for Execution Dates in the future
Only has effect if schedule_interval is set to None in DAG
Only has effect if schedule is set to None in DAG
version_added: 1.10.8
type: boolean
example: ~
Expand Down
4 changes: 0 additions & 4 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,6 @@ def __str__(self) -> str:
return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"


class AirflowDagInconsistent(AirflowException):
"""Raise when a DAG has inconsistent attributes."""


class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in DAG definition."""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Rename DagModel schedule_interval to timetable_summary.
Revision ID: 0bfc26bc256e
Revises: d0f1c55954fa
Create Date: 2024-08-15 06:24:14.363316
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0bfc26bc256e"
down_revision = "d0f1c55954fa"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
    """Rename the ``schedule_interval`` column of the ``dag`` table to ``timetable_summary``."""
    # The column is declared as nullable Text under its new name.
    rename_spec = {
        "new_column_name": "timetable_summary",
        "type_": sa.Text,
        "nullable": True,
    }
    with op.batch_alter_table("dag", schema=None) as dag_table:
        dag_table.alter_column("schedule_interval", **rename_spec)


def downgrade():
    """Rename the ``timetable_summary`` column of the ``dag`` table back to ``schedule_interval``."""
    # Mirror of upgrade(): same nullable Text declaration, original column name restored.
    rename_spec = {
        "new_column_name": "schedule_interval",
        "type_": sa.Text,
        "nullable": True,
    }
    with op.batch_alter_table("dag", schema=None) as dag_table:
        dag_table.alter_column("timetable_summary", **rename_spec)
7 changes: 2 additions & 5 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

import attr
import pendulum
from dateutil.relativedelta import relativedelta
from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound

Expand Down Expand Up @@ -120,8 +119,6 @@
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import ArgNotSet

ScheduleInterval = Union[str, timedelta, relativedelta]

TaskPreExecuteHook = Callable[[Context], None]
TaskPostExecuteHook = Callable[[Context, Any], None]

Expand Down Expand Up @@ -612,10 +609,10 @@ class derived from this one results in the creation of a task object,
:param start_date: The ``start_date`` for the task, determines
the ``execution_date`` for the first task instance. The best practice
is to have the start_date rounded
to your DAG's ``schedule_interval``. Daily jobs have their start_date
to your DAG's schedule. Daily jobs have their start_date
some day at 00:00:00, hourly jobs have their start_date at 00:00
of a specific hour. Note that Airflow simply looks at the latest
``execution_date`` and adds the ``schedule_interval`` to determine
``execution_date`` and adds the schedule to determine
the next ``execution_date``. It is also very important
to note that different tasks' dependencies
need to line up in time. If task A depends on task B and their
Expand Down
Loading

0 comments on commit e8a5996

Please sign in to comment.