Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ending the task directly from the triggerer without going into the worker. #40084

Merged
merged 6 commits into from
Jul 25, 2024

Conversation

sunank200
Copy link
Collaborator

@sunank200 sunank200 commented Jun 6, 2024

Why

For some operators such as DateTimeSensorAsync , TimeDeltaSensorAsync and TimeSensorAsync it would be better if we could terminate the task without needing a worker.

What

Introduce BaseTaskEndEvent to end the task without resuming on the worker.

Triggers have two options: they can either send execution back to the task or end the task instance directly. If the trigger ends the task instance, the method_name should be None. Otherwise, provide the method_name to be used when resuming the execution in the task.

Previously, we only paid attention to failure requests in _execute_task_callbacks. Now, we also execute any given callback type directly. In this scenario, the callback is submitted remotely, so we assume there's no need to change the state; we simply run the callback.

Note: Exiting from the trigger works only when listeners are not integrated for the deferrable operator. To use this feature, unregister the listeners, and set end_from_trigger=True on the given operator.

Introduce end_from_trigger in each Async operator which end the task directly from the triggerer without going into the worker. For example:

with DAG(
    "async_dag",
) as dag:
    async_date_time_sensor = DateTimeSensorAsync(
        task_id="async_date_time_sensor",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=1) }}""",
        end_from_trigger=True,
    )

    async_time_delta = TimeDeltaSensorAsync(
        task_id="async_time_delta",
        delta=datetime.timedelta(minutes=1),
        end_from_trigger=False,
    )

    async_time_sensor = TimeSensorAsync(
        task_id="async_time_sensor",
        timeout=1,
        soft_fail=True,
        target_time=(datetime.datetime.now(tz=dt.timezone.utc) + datetime.timedelta(minutes=1)).time(),
        end_from_trigger=True,
    )

By default end_from_trigger is False.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:core-operators Operators, Sensors and hooks within Core Airflow area:Scheduler including HA (high availability) scheduler area:Triggerer labels Jun 6, 2024
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/models/taskinstance.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/triggers/temporal.py Outdated Show resolved Hide resolved
@sunank200 sunank200 force-pushed the terminal_state_from_triggerer branch from 1f22222 to 2015062 Compare June 7, 2024 06:37
@uranusjr uranusjr changed the title Make task to go to terminal state from triggerer without needing a worker Make task go to terminal state from triggerer without needing a worker Jun 7, 2024
@sunank200 sunank200 force-pushed the terminal_state_from_triggerer branch from 2015062 to e5c34b8 Compare June 12, 2024 05:37
@sunank200 sunank200 marked this pull request as ready for review June 12, 2024 06:16
@sunank200 sunank200 requested review from utkarsharma2, potiuk, pankajkoti, pankajastro, phanikumv and kaxil and removed request for kaxil June 12, 2024 06:19
@jedcunningham
Copy link
Member

Does this work with listeners?

airflow/sensors/date_time.py Outdated Show resolved Hide resolved
airflow/sensors/time_sensor.py Outdated Show resolved Hide resolved
@sunank200 sunank200 requested a review from Lee-W June 13, 2024 07:59
airflow/dag_processing/manager.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/dag_processing/processor.py Outdated Show resolved Hide resolved
airflow/models/baseoperator.py Outdated Show resolved Hide resolved
airflow/triggers/base.py Outdated Show resolved Hide resolved
airflow/triggers/base.py Outdated Show resolved Hide resolved
airflow/triggers/base.py Outdated Show resolved Hide resolved
airflow/triggers/temporal.py Outdated Show resolved Hide resolved
tests/sensors/test_time_sensor.py Show resolved Hide resolved
airflow/triggers/base.py Show resolved Hide resolved
airflow/triggers/base.py Show resolved Hide resolved
@tirkarthi
Copy link
Contributor

Related : #31718

@Lee-W
Copy link
Member

Lee-W commented Jul 24, 2024

158 files change does seem right 🤔 we probably need to rebase again

@sunank200
Copy link
Collaborator Author

158 files change does seem right 🤔 we probably need to rebase again

Yes rebasing again

@sunank200 sunank200 force-pushed the terminal_state_from_triggerer branch from 8b86d00 to d42afd2 Compare July 24, 2024 11:48
@sunank200 sunank200 requested a review from Lee-W July 24, 2024 12:25
@Lee-W
Copy link
Member

Lee-W commented Jul 25, 2024

As this is approved by many, I'm planning on merging it within the next hour. Let me know if anyone still want to take a look

@Lee-W Lee-W merged commit 0fba616 into apache:main Jul 25, 2024
48 checks passed
@Lee-W Lee-W deleted the terminal_state_from_triggerer branch July 25, 2024 03:27
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Jul 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:Scheduler including HA (high availability) scheduler area:Triggerer type:new-feature Changelog: New Features
Projects
None yet
Development

Successfully merging this pull request may close these issues.