Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default_deferrable config #31712

Merged
merged 51 commits into from
Jul 5, 2023

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented Jun 5, 2023

What's changed

  1. add deferrable attribute to base operator
  2. add DEFAULT_DEFERRABLE for toggling deferrable mode globally
  3. change the default value of deferrable attribute in existing operators to DEFAULT_DEFERRABLE

Why

Many operators [1][2] can run in deferrable mode by setting deferrable attribute to true. Therefore, we suggest adding this attribute to the base operator. Additionally, we want to enable users to easily activate the deferrable mode for all tasks. To achieve this, we introduce the DEFAULT_DEFERRABLE configuration, which can be loaded from operators.default_deferrable in airflow.cfg or set through the environment variable AIRFLOW__OPERATORS__DEFAULT_DERFERRABLE. The default value is set to false to maintain the original behavior.

Backward compatibility

For users not using the latest airflow (which have DEFAULT_DEFERRABLE in airflow.models.abstractoperator), the following code is used.

try:
    from airflow.models.abstractoperator import DEFAULT_DEFERRABLE
except ImportError:
    DEFAULT_DEFERRABLE = conf.getboolean("operators", "default_deferrable", fallback=False)

Another solution I'm thinking of is

    def __init__(
        self,
        *,
        ...,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)

[1] List of operators with deferrable attribute
  1. TriggerDagRunOperator
  2. BatchOperator
  3. GlueJobOperator
  4. GlueCrawlerOperator
  5. RedshiftCreateClusterOperator
  6. RedshiftCreateClusterSnapshotOperator
  7. RedshiftResumeClusterOperator
  8. RedshiftPauseClusterOperator
  9. RedshiftDeleteClusterOperator
  10. SageMakerProcessingOperator
  11. SageMakerTransformOperator
  12. SageMakerTrainingOperator
  13. LivyOperator
  14. KubernetesPodOperator
  15. DatabricksSubmitRunOperator
  16. DatabricksRunNowOperator
  17. DbtCloudRunJobOperator
  18. BigQueryCheckOperator
  19. BigQueryValueCheckOperator
  20. BigQueryIntervalCheckOperator
  21. BigQueryGetDataOperator
  22. BigQueryInsertJobOperator
  23. BigQueryDataTransferServiceStartTransferRunsOperator
  24. CloudBuildCreateBuildOperator
  25. CloudComposerCreateEnvironmentOperator
  26. CloudComposerDeleteEnvironmentOperator
  27. CloudComposerUpdateEnvironmentOperator
  28. DataflowTemplatedJobStartOperator
  29. DataflowStartFlexTemplateOperator
  30. DataprocCreateClusterOperator
  31. DataprocDeleteClusterOperator
  32. DataprocJobBaseOperator
  33. DataprocInstantiateWorkflowTemplateOperator
  34. DataprocInstantiateInlineWorkflowTemplateOperator
  35. DataprocSubmitJobOperator
  36. DataprocUpdateClusterOperator
  37. DataprocCreateBatchOperator
  38. GKEDeleteClusterOperator
  39. GKECreateClusterOperator
  40. MLEngineStartTrainingJobOperator
  41. BigQueryToGCSOperator
  42. GCSToBigQueryOperator
  43. AzureDataFactoryRunPipelineOperator
[2] List of pull requests that introduce deferrable mode to existing operators
  1. Deferrable mode for EksCreateFargateProfileOperator and EksDeleteFargateProfileOperator #31657
  2. Add deferrable option to EmrTerminateJobFlowOperator #31646
  3. Add Deferrable option to EmrCreateJobFlowOperator #31641
  4. Add Deferrable switch to SnowflakeSqlApiOperator #31596
  5. Add deferrable mode to BeamRunPythonPipelineOperator #31471
  6. Add deferrable mode to PubsubPullSensor #31284
  7. Add deferrable param in EmrStepSensor #31048
  8. Add deferrable param in EmrJobFlowSensor #31045
  9. Add deferrable param in EmrContainerOperator #31035
  10. Add deferrable mode for S3KeySensor #31018
  11. Add deferrable param in EmrContainerSensor #30945
  12. Add Deferrable mode to Emr Add Steps operator #30928
  13. Add deferrable mode to CloudSQLExportInstanceOperator #30852
  14. Add custom waiters to EMR Serverless  #30463
  15. Add deferrable mode to BatchSensor  #30279
  16. Deferrable mode for S3ToGCSOperator #29462

cc @dstandish


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@Lee-W Lee-W force-pushed the add-deferrable-attr-to-base-operator branch 4 times, most recently from d7d0c7d to 72b0b2e Compare June 6, 2023 09:43
@Lee-W Lee-W marked this pull request as ready for review June 6, 2023 10:26
@o-nikolas
Copy link
Contributor

CC @syedahsn

@Lee-W Lee-W force-pushed the add-deferrable-attr-to-base-operator branch from 2da9cd3 to c2bb2e7 Compare June 7, 2023 07:08
@uranusjr
Copy link
Member

uranusjr commented Jun 7, 2023

For users not using the latest airflow

Why is this needed? I think most operators have **kwargs and we can simply handle the default in BaseOperator.

@Lee-W
Copy link
Member Author

Lee-W commented Jun 7, 2023

For users not using the latest airflow

Why is this needed? I think most operators have **kwargs and we can simply handle the default in BaseOperator.

Because these operators already have their default value as False, which would overwrite the value handled in BaseOperator. Users passing this value as position arguments might be affected if we remove' deferrable' from these operators.

Take GlueCrawlerOperator as an example, if users call the op through

op = GlueCrawlerOperator(
    config,
    "default",
    None,
    5,
    True,
    True
)

this might not work.

@Lee-W
Copy link
Member Author

Lee-W commented Jun 7, 2023

I noticed the CI failure might not be related to this PR, and other PRs have similar errors. Wondering is this something I need to fix in this PR? Thanks

All Warning errors can be found in the /files/warnings-WWW-sqlite.txt file.
=========================== short test summary info ============================
FAILED tests/www/views/test_views.py::test_get_safe_url[ javascript:alert(1)-http://localhost:8080/ javascript:alert(1)] - AssertionError: assert '/home' == 'http://local...ript:alert(1)'
  - http://localhost:8080/ javascript:alert(1)
  + /home
=========== 1 failed, 609 passed, 895 warnings in 342.01s (0:05:42) ============
Number of warnings: 198 /files/warnings-WWW-sqlite.txt

@dstandish
Copy link
Contributor

I noticed the CI failure might not be related to this PR, and other PRs have similar errors. Wondering is this something I need to fix in this PR? Thanks

All Warning errors can be found in the /files/warnings-WWW-sqlite.txt file.
=========================== short test summary info ============================
FAILED tests/www/views/test_views.py::test_get_safe_url[ javascript:alert(1)-http://localhost:8080/ javascript:alert(1)] - AssertionError: assert '/home' == 'http://local...ript:alert(1)'
  - http://localhost:8080/ javascript:alert(1)
  + /home
=========== 1 failed, 609 passed, 895 warnings in 342.01s (0:05:42) ============
Number of warnings: 198 /files/warnings-WWW-sqlite.txt

i've seen this too. i think main is broken

@potiuk
Copy link
Member

potiuk commented Jun 7, 2023

Fixed in #31766 - it was caused by a (security) fix - backwards incompatible in Python 3.8.17 and 3.9.17 released last night. Very rare case to hit.

@potiuk
Copy link
Member

potiuk commented Jun 7, 2023

You can rebase.

airflow/models/abstractoperator.py Outdated Show resolved Hide resolved
airflow/serialization/schema.json Outdated Show resolved Hide resolved
airflow/providers/microsoft/azure/sensors/wasb.py Outdated Show resolved Hide resolved
@Lee-W Lee-W force-pushed the add-deferrable-attr-to-base-operator branch from c2bb2e7 to b65f66e Compare June 8, 2023 02:29
@Lee-W Lee-W requested a review from XD-DENG as a code owner June 8, 2023 08:58
Lee-W added 19 commits July 5, 2023 09:22
This reverts commit c38c84921ef240abe073fd38251a4aef570fcabf.
…r docstring"

This reverts commit 128ef7fe3bbabbc75769339c00d7842486c08146.
…lse captialization to False"

This reverts commit adaf5ca52eda4f8a7303dabd150c6a97839a53a9.
…to False for waiter and hooks"

This reverts commit bdac07b635b96ad1d6ebd2113512fb953d622a80.
This reverts commit 0129a0e9606c5e85bf6939386ce910b636b721e9.
…deferrable attribute in BaseDeferrableOperator is the same as DEFAULT_DEFERRABLE"

This reverts commit bb6ca2105659cd6a5107e2913599655f491e6de8.
…eDeferrableOperator with empty execute method implementation"

This reverts commit a2d358f9bd20d3c010d1fa8efdcb7d2bfd546e42.
…o baseoperator"

This reverts commit 3f8c72e27496bb87abb3fa7a6715c63fd3acb2ea.
…ferrableOperator"

This reverts commit 9f85338a4eb49289e896f5844fd0d67897dafbd5.
…r for TriggerDagRunOperator"

This reverts commit 838fa28e7888937169d938143ee57fbafa41abf8.
…erator as the default value of deferabl"

This reverts commit a17afa02e5b6fd9bf4046be4188e229e12c72060.
…or class for operator with deferrable attribute and load this value from config"

This reverts commit 26404c231d55efe4466090a29a37a6e85316d690.
…e", fallback=False)" as the default deferrable value
@Lee-W Lee-W force-pushed the add-deferrable-attr-to-base-operator branch from 03415b8 to b9f870e Compare July 5, 2023 01:24
@Lee-W
Copy link
Member Author

Lee-W commented Jul 5, 2023

Conflict resolved

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:new-feature Changelog: New Features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants