Skip to content

Commit

Permalink
Deprecate *args and **kwargs in BaseOperator (and test)
Browse files Browse the repository at this point in the history
BaseOperator silently accepts any arguments. This deprecates the
behavior with a warning that says it will be forbidden in Airflow 2.0.

This PR also turns on DeprecationWarnings by default, which in turn
revealed that inspect.getargspec is deprecated. Here it is replaced by
`inspect.signature` (Python 3) or `funcsigs.signature` (Python 2).

Lastly, this brought to attention that example_http_operator was
passing an illegal argument.

Add unit test
  • Loading branch information
jlowin committed Apr 5, 2016
1 parent ab1c90b commit 5d959f6
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 13 deletions.
19 changes: 14 additions & 5 deletions UPDATING.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# Updating Airflow

This file aims to document the backwards-incompatible changes in Airflow and
assist people with migrating to a new version.
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.

## 1.7 to 1.8

### DAGs now don't start automatically when created
## Airflow 1.8

To retain the old behavior, add this to your configuration:
### Changes to Behavior

#### New DAGs are paused by default

Previously, new DAGs would be scheduled immediately. To retain the old behavior, add this to airflow.cfg:

```
[core]
dags_are_paused_at_creation = False
```

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0

#### Operators no longer accept arbitrary arguments
Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without complaint. Now, invalid arguments will be rejected.
4 changes: 4 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import subprocess
import warnings

from future import standard_library
standard_library.install_aliases()
Expand All @@ -16,6 +17,9 @@
from collections import OrderedDict
from configparser import ConfigParser

# show DeprecationWarning and PendingDeprecationWarning
warnings.simplefilter('default', DeprecationWarning)
warnings.simplefilter('default', PendingDeprecationWarning)

class AirflowConfigException(Exception):
pass
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_http_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

sensor = HttpSensor(
task_id='http_sensor_check',
conn_id='http_default',
http_conn_id='http_default',
endpoint='',
params={},
response_check=lambda response: True if "Google" in response.content else False,
Expand Down
12 changes: 12 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import socket
import sys
import traceback
import warnings
from urllib.parse import urlparse

from sqlalchemy import (
Expand Down Expand Up @@ -1597,6 +1598,17 @@ def __init__(
*args,
**kwargs):

if args or kwargs:
# TODO remove *args and **kwargs in Airflow 2.0
warnings.warn(
'Invalid arguments were passed to {c}. Support for '
'passing such arguments will be dropped in Airflow 2.0.'
'Invalid arguments were:'
'\n*args: {a}\n**kwargs: {k}'.format(
c=self.__class__.__name__, a=args, k=kwargs),
category=PendingDeprecationWarning
)

validate_key(task_id)
self.dag_id = dag.dag_id if dag else 'adhoc_' + owner
self.task_id = task_id
Expand Down
24 changes: 17 additions & 7 deletions airflow/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import os

# inspect.signature is only available in Python 3. funcsigs.signature is
# a backport.
try:
import inspect
signature = inspect.signature
except AttributeError:
import funcsigs
signature = funcsigs.signature

from copy import copy
from functools import wraps

Expand Down Expand Up @@ -57,12 +65,14 @@ def wrapper(*args, **kwargs):

dag_args.update(default_args)
default_args = dag_args
arg_spec = inspect.getargspec(func)
num_defaults = len(arg_spec.defaults) if arg_spec.defaults else 0
non_optional_args = arg_spec.args[:-num_defaults]
if 'self' in non_optional_args:
non_optional_args.remove('self')
for arg in func.__code__.co_varnames:

sig = signature(func)
non_optional_args = [
name for (name, param) in sig.parameters.items()
if param.default == param.empty and
param.name != 'self' and
param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)]
for arg in sig.parameters:
if arg in default_args and arg not in kwargs:
kwargs[arg] = default_args[arg]
missing_args = list(set(non_optional_args) - set(kwargs))
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def run(self):
'flask-cache>=0.13.1, <0.14',
'flask-login==0.2.11',
'future>=0.15.0, <0.16',
'funcsigs>=0.4, <1'
'gunicorn>=19.3.0, <19.4.0', # 19.4.? seemed to have issues
'jinja2>=2.7.3, <3.0',
'markdown>=2.5.2, <3.0',
Expand Down
17 changes: 17 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from email.mime.application import MIMEApplication
import signal
from time import sleep
import warnings

from dateutil.relativedelta import relativedelta

Expand Down Expand Up @@ -320,6 +321,22 @@ def test_clear_api(self):
ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.are_dependents_done()

def test_illegal_args(self):
"""
Tests that Operators reject illegal arguments
"""
with warnings.catch_warnings(record=True) as w:
t = operators.BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
illegal_argument_1234='hello?')
self.assertTrue(
issubclass(w[0].category, PendingDeprecationWarning))
self.assertIn(
'Invalid arguments were passed to BashOperator.',
w[0].message.args[0])

def test_bash_operator(self):
t = operators.BashOperator(
task_id='time_sensor_check',
Expand Down

0 comments on commit 5d959f6

Please sign in to comment.