From 259c193fd227f3b8deae048e58c474e3352ac552 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Sun, 26 Jun 2022 10:13:03 +0300 Subject: [PATCH 01/22] adding SqlToSlackOperator --- .../presto/transfers/presto_to_slack.py | 88 +++------- airflow/providers/slack/provider.yaml | 6 + airflow/providers/slack/transfers/__init__.py | 16 ++ .../providers/slack/transfers/sql_to_slack.py | 166 ++++++++++++++++++ .../snowflake/transfers/snowflake_to_slack.py | 94 +++------- docs/apache-airflow-providers-slack/index.rst | 10 ++ .../operators/sql_to_slack.rst | 38 ++++ .../presto/transfers/test_presto_to_slack.py | 45 +++-- tests/providers/slack/transfers/__init__.py | 17 ++ .../slack/transfers/test_sql_to_slack.py | 113 ++++++++++++ .../transfers/test_snowflake_to_slack.py | 43 +++-- tests/system/providers/slack/__init__.py | 16 ++ .../providers/slack/example_sql_to_slack.py | 55 ++++++ 13 files changed, 547 insertions(+), 160 deletions(-) create mode 100644 airflow/providers/slack/transfers/__init__.py create mode 100644 airflow/providers/slack/transfers/sql_to_slack.py create mode 100644 docs/apache-airflow-providers-slack/operators/sql_to_slack.rst create mode 100644 tests/providers/slack/transfers/__init__.py create mode 100644 tests/providers/slack/transfers/test_sql_to_slack.py create mode 100644 tests/system/providers/slack/__init__.py create mode 100644 tests/system/providers/slack/example_sql_to_slack.py diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 6dd0ecb3ab137..647a8d1ce7d58 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -15,21 +15,17 @@ # specific language governing permissions and limitations # under the License. +import warnings from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union -from pandas import DataFrame -from tabulate import tabulate - from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.presto.hooks.presto import PrestoHook -from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator if TYPE_CHECKING: - from airflow.utils.context import Context + pass -class PrestoToSlackOperator(BaseOperator): +class PrestoToSlackOperator(SqlToSlackOperator): """ Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ @@ -47,11 +43,12 @@ class PrestoToSlackOperator(BaseOperator): You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the SQL results :param presto_conn_id: destination presto connection - :param slack_conn_id: The connection id for Slack + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id.py + Mutually exclusive with 'slack_conn_id' :param slack_channel: The channel to send message. Override default from Slack connection. """ @@ -66,14 +63,13 @@ def __init__( sql: str, slack_message: str, presto_conn_id: str = 'presto_default', - slack_conn_id: str = 'slack_default', + slack_conn_id: Optional[str] = None, results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, slack_token: Optional[str] = None, slack_channel: Optional[str] = None, **kwargs, ) -> None: - super().__init__(**kwargs) self.presto_conn_id = presto_conn_id self.sql = sql @@ -84,58 +80,24 @@ def __init__( self.results_df_name = results_df_name self.slack_channel = slack_channel - def _get_query_results(self) -> DataFrame: - presto_hook = self._get_presto_hook() - - self.log.info('Running SQL query: %s', self.sql) - df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters) - return df - - def _render_and_send_slack_message(self, context, df) -> None: - # Put the dataframe into the context and render the JINJA template fields - context[self.results_df_name] = df - self.render_template_fields(context) - - slack_hook = self._get_slack_hook() - self.log.info('Sending slack message: %s', self.slack_message) - slack_hook.execute() + warnings.warn( + """ + PrestoToSlackOperator is deprecated. + Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`. + """, + DeprecationWarning, + stacklevel=2, + ) - def _get_presto_hook(self) -> PrestoHook: - return PrestoHook(presto_conn_id=self.presto_conn_id) - def _get_slack_hook(self) -> SlackWebhookHook: - return SlackWebhookHook( - http_conn_id=self.slack_conn_id, - message=self.slack_message, - webhook_token=self.slack_token, + super().__init__( + sql=self.sql, + sql_conn_id=self.presto_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, slack_channel=self.slack_channel, + results_df_name=self.results_df_name, + parameters=self.parameters, + **kwargs, ) - - def render_template_fields(self, context, jinja_env=None) -> None: - # If this is the first render of the template fields, exclude slack_message from rendering since - # the presto results haven't been retrieved yet. - if self.times_rendered == 0: - fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) - else: - fields_to_render = self.template_fields - - if not jinja_env: - jinja_env = self.get_template_env() - - # Add the tabulate library into the JINJA environment - jinja_env.filters['tabulate'] = tabulate - - self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) - self.times_rendered += 1 - - def execute(self, context: 'Context') -> None: - if not self.sql.strip(): - raise AirflowException("Expected 'sql' parameter is missing.") - if not self.slack_message.strip(): - raise AirflowException("Expected 'slack_message' parameter is missing.") - - df = self._get_query_results() - - self._render_and_send_slack_message(context, df) - - self.log.debug('Finished sending Presto data to Slack') diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml index ba06c7673b4ff..5b79f6a9d1cf6 100644 --- a/airflow/providers/slack/provider.yaml +++ b/airflow/providers/slack/provider.yaml @@ -57,6 +57,12 @@ hooks: - airflow.providers.slack.hooks.slack - airflow.providers.slack.hooks.slack_webhook +transfers: + - source-integration-name: SQL + target-integration-name: Slack + python-module: airflow.providers.slack.transfers.sql_to_slack + how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst + hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+ - airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook diff --git a/airflow/providers/slack/transfers/__init__.py b/airflow/providers/slack/transfers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/slack/transfers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py new file mode 100644 index 0000000000000..0147f87a4de4e --- /dev/null +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -0,0 +1,166 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union + +from pandas import DataFrame +from tabulate import tabulate + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook +from airflow.hooks.dbapi import DbApiHook +from airflow.models import BaseOperator +from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class SqlToSlackOperator(BaseOperator): + """ + Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the + query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called + '{{ results_df }}'. The 'results_df' variable name can be changed by specifying a different + 'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to + allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df | + tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:SqlToSlackOperator` + + :param sql: The SQL statement to execute on Snowflake (templated) + :param slack_message: The templated Slack message to send with the data returned from Snowflake. + You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the + SQL results + :param sql_conn_id: Reference to + :ref:`Snowflake connection id` + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_webhook_token' + :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the + 'slack_conn_id' attribute needs to be specified in the 'Extra' JSON field. + Mutually exclusive with 'slack_conn_id'. + :param slack_channel: The channel to send message. Override default from Slack connection. + :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' + :param parameters: The parameters to pass to the SQL query + """ + + template_fields: Sequence[str] = ('sql', 'slack_message') + template_ext: Sequence[str] = ('.sql', '.jinja', '.j2') + template_fields_renderers = {"sql": "sql", "slack_message": "jinja"} + times_rendered = 0 + + def __init__( + self, + *, + sql: str, + sql_conn_id: str, + slack_conn_id: Optional[str] = None, + slack_webhook_token: Optional[str] = None, + slack_channel: Optional[str] = None, + slack_message: str, + results_df_name: str = 'results_df', + parameters: Optional[Union[Iterable, Mapping]] = None, + **kwargs, + ) -> None: + + super().__init__(**kwargs) + + self.sql_conn_id = sql_conn_id + self.sql = sql + self.parameters = parameters + self.slack_conn_id = slack_conn_id + self.slack_webhook_token = slack_webhook_token + self.slack_channel = slack_channel + self.slack_message = slack_message + self.results_df_name = results_df_name + self.kwargs = kwargs + + if not self.slack_conn_id and not self.slack_webhook_token: + raise AirflowException( + "SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument" + ) + + if self.slack_conn_id and self.slack_webhook_token: + raise AirflowException("Cannot pass both `slack_conn_id` and `slack_webhook_token` arguments") + + def _get_hook(self) -> DbApiHook: + self.log.debug("Get connection for %s", self.sql_conn_id) + conn = BaseHook.get_connection(self.sql_conn_id) + hook = conn.get_hook(hook_params=self.kwargs) + if not callable(getattr(hook, 'get_pandas_df', None)): + raise AirflowException( + "This hook is not supported. The hook class must have get_pandas_df method." + ) + return hook + + def _get_query_results(self) -> DataFrame: + sql_hook = self._get_hook() + + self.log.info('Running SQL query: %s', self.sql) + df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters) + return df + + def _render_and_send_slack_message(self, context, df) -> None: + # Put the dataframe into the context and render the JINJA template fields + context[self.results_df_name] = df + self.render_template_fields(context) + + slack_hook = self._get_slack_hook() + self.log.info('Sending slack message: %s', self.slack_message) + slack_hook.execute() + + def _get_slack_hook(self) -> SlackWebhookHook: + if self.slack_conn_id: + return SlackWebhookHook( + http_conn_id=self.slack_conn_id, message=self.slack_message, channel=self.slack_channel + ) + elif self.slack_webhook_token: + return SlackWebhookHook( + message=self.slack_message, webhook_token=self.slack_webhook_token, channel=self.slack_channel + ) + else: + raise AirflowException("Could not initiate SlackWebhookHook") + + def render_template_fields(self, context, jinja_env=None) -> None: + # If this is the first render of the template fields, exclude slack_message from rendering since + # the snowflake results haven't been retrieved yet. + if self.times_rendered == 0: + fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) + else: + fields_to_render = self.template_fields + + if not jinja_env: + jinja_env = self.get_template_env() + + # Add the tabulate library into the JINJA environment + jinja_env.filters['tabulate'] = tabulate + + self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) + self.times_rendered += 1 + + def execute(self, context: 'Context') -> None: + if not isinstance(self.sql, str): + raise AirflowException("Expected 'sql' parameter should be a string.") + if self.sql is None or self.sql.strip() == "": + raise AirflowException("Expected 'sql' parameter is missing.") + if self.slack_message is None or self.slack_message.strip() == "": + raise AirflowException("Expected 'slack_message' parameter is missing.") + + df = self._get_query_results() + self._render_and_send_slack_message(context, df) + + self.log.debug('Finished sending SQL data to Slack') diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py index 2c6138e58edc9..34d5eb114c5ae 100644 --- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py +++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py @@ -15,21 +15,17 @@ # specific language governing permissions and limitations # under the License. +import warnings from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union -from pandas import DataFrame -from tabulate import tabulate - from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator if TYPE_CHECKING: - from airflow.utils.context import Context + pass -class SnowflakeToSlackOperator(BaseOperator): +class SnowflakeToSlackOperator(SqlToSlackOperator): """ Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ @@ -48,7 +44,7 @@ class SnowflakeToSlackOperator(BaseOperator): SQL results :param snowflake_conn_id: Reference to :ref:`Snowflake connection id` - :param slack_conn_id: The connection id for Slack + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param warehouse: The Snowflake virtual warehouse to use to run the SQL query @@ -56,7 +52,8 @@ class SnowflakeToSlackOperator(BaseOperator): :param schema: The schema to run the SQL against in Snowflake :param role: The role to use when connecting to Snowflake :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id. + Mutually exclusive with 'slack_conn_id' """ template_fields: Sequence[str] = ('sql', 'slack_message') @@ -70,7 +67,7 @@ def __init__( sql: str, slack_message: str, snowflake_conn_id: str = 'snowflake_default', - slack_conn_id: str = 'slack_default', + slack_conn_id: Optional[str] = None, results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, warehouse: Optional[str] = None, @@ -80,7 +77,6 @@ def __init__( slack_token: Optional[str] = None, **kwargs, ) -> None: - super().__init__(**kwargs) self.snowflake_conn_id = snowflake_conn_id self.sql = sql @@ -94,62 +90,26 @@ def __init__( self.slack_message = slack_message self.results_df_name = results_df_name - def _get_query_results(self) -> DataFrame: - snowflake_hook = self._get_snowflake_hook() - - self.log.info('Running SQL query: %s', self.sql) - df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters) - return df - - def _render_and_send_slack_message(self, context, df) -> None: - # Put the dataframe into the context and render the JINJA template fields - context[self.results_df_name] = df - self.render_template_fields(context) - - slack_hook = self._get_slack_hook() - self.log.info('Sending slack message: %s', self.slack_message) - slack_hook.execute() + warnings.warn( + """ + SnowflakeToSlackOperator is deprecated. + Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`. + """, + DeprecationWarning, + stacklevel=2, + ) - def _get_snowflake_hook(self) -> SnowflakeHook: - return SnowflakeHook( - snowflake_conn_id=self.snowflake_conn_id, - warehouse=self.warehouse, - database=self.database, + super().__init__( + sql=self.sql, + sql_conn_id=self.snowflake_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, + results_df_name=self.results_df_name, + parameters=self.parameters, role=self.role, + warehouse=self.warehouse, schema=self.schema, + database=self.database, + **kwargs, ) - - def _get_slack_hook(self) -> SlackWebhookHook: - return SlackWebhookHook( - http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token - ) - - def render_template_fields(self, context, jinja_env=None) -> None: - # If this is the first render of the template fields, exclude slack_message from rendering since - # the snowflake results haven't been retrieved yet. - if self.times_rendered == 0: - fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) - else: - fields_to_render = self.template_fields - - if not jinja_env: - jinja_env = self.get_template_env() - - # Add the tabulate library into the JINJA environment - jinja_env.filters['tabulate'] = tabulate - - self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) - self.times_rendered += 1 - - def execute(self, context: 'Context') -> None: - if not isinstance(self.sql, str): - raise AirflowException("Expected 'sql' parameter should be a string.") - if self.sql is None or self.sql.strip() == "": - raise AirflowException("Expected 'sql' parameter is missing.") - if self.slack_message is None or self.slack_message.strip() == "": - raise AirflowException("Expected 'slack_message' parameter is missing.") - - df = self._get_query_results() - self._render_and_send_slack_message(context, df) - - self.log.debug('Finished sending Snowflake data to Slack') diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst index 87b299c2f79b7..354219e02bb00 100644 --- a/docs/apache-airflow-providers-slack/index.rst +++ b/docs/apache-airflow-providers-slack/index.rst @@ -21,13 +21,23 @@ Content ------- +.. toctree:: + :hidden: + :caption: System tests + System Tests <_api/tests/system/providers/slack/index> .. toctree:: :maxdepth: 1 :caption: Guides How-to Guide +.. toctree:: + :maxdepth: 1 + :caption: Guides + + SqlToSlackOperator types + .. toctree:: :maxdepth: 1 :caption: References diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst new file mode 100644 index 0000000000000..35b31c788c4da --- /dev/null +++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst @@ -0,0 +1,38 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:SqlToSlackOperator: + +SqlToSlackOperator +======================== + +Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post messages to predefined Slack +channels. + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +This operator will execute a custom query in the provided SQL connection and publish a Slack message that can be formatted +and contain the resulting dataset (e.g. ASCII formatted dataframe). + +An example usage of the SqlToSlackOperator is as follows: + +.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sql_to_slack] + :end-before: [END howto_operator_sql_to_slack] diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py index 78aa2867ec375..2d5c70cd7f02e 100644 --- a/tests/providers/presto/transfers/test_presto_to_slack.py +++ b/tests/providers/presto/transfers/test_presto_to_slack.py @@ -41,9 +41,8 @@ def _construct_operator(**kwargs): operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs) return operator - @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook') - @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook') - def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class): + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): operator_args = { 'presto_conn_id': 'presto_connection', 'slack_conn_id': 'slack_connection', @@ -51,27 +50,51 @@ def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class 'results_df_name': 'xxxx', 'parameters': ['1', '2', '3'], 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_token': 'test_token', 'slack_channel': 'my_channel', 'dag': self.example_dag, } presto_to_slack_operator = self._construct_operator(**operator_args) - presto_hook = mock_presto_hook_class.return_value - presto_hook.get_pandas_df.return_value = '1234' + mock_dbapi_hook = mock.Mock() + presto_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' + slack_webhook_hook = mock_slack_hook_class.return_value presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - mock_presto_hook_class.assert_called_once_with( - presto_conn_id='presto_connection', + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message='message: 2022-01-01, 1234', channel='my_channel' ) - presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3']) + slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering_with_slack_webhook(self, mock_slack_hook_class): + operator_args = { + 'presto_conn_id': 'presto_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'slack_channel': 'my_channel', + 'dag': self.example_dag, + } + presto_to_slack_operator = self._construct_operator(**operator_args) + mock_dbapi_hook = mock.Mock() + presto_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' + + slack_webhook_hook = mock_slack_hook_class.return_value + presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2022-01-01, 1234', webhook_token='test_token', - slack_channel='my_channel', + channel='my_channel', ) slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/slack/transfers/__init__.py b/tests/providers/slack/transfers/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/providers/slack/transfers/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py new file mode 100644 index 0000000000000..3b8e2373b1402 --- /dev/null +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import mock + +import pandas as pd +import pytest + +from airflow.exceptions import AirflowException +from airflow.models import DAG +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator +from airflow.utils import timezone + +TEST_DAG_ID = 'sql_to_slack_unit_test' +TEST_TASK_ID = 'sql_to_slack_unit_test_task' +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestSqlToSlackOperator: + def setup_method(self): + self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE) + + @staticmethod + def _construct_operator(**kwargs): + operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs) + return operator + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_rendering_and_message_execution(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ results_df }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_duplicated_slack_parameters_provided_exception_thrown(self, mock_slack_hook_class): + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'sql': "sql {{ ds }}", + 'slack_webhook_token': 'test_token', + } + with pytest.raises(AirflowException): + self._construct_operator(**operator_args) + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ testing }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'results_df_name': 'testing', + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py index f4a28bd7cc040..06dbfa082202a 100644 --- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py +++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py @@ -20,6 +20,7 @@ from airflow.models import DAG from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator from airflow.utils import timezone +from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs TEST_DAG_ID = 'snowflake_to_slack_unit_test' @@ -38,15 +39,14 @@ def teardown_method(self): @staticmethod def _construct_operator(**kwargs): - operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs) - return operator + with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}): + operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs) + return operator - @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SnowflakeHook') - @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SlackWebhookHook') - def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class): + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering(self, mock_slack_hook_class): operator_args = { 'snowflake_conn_id': 'snowflake_connection', - 'slack_conn_id': 'slack_connection', 'sql': "sql {{ ds }}", 'results_df_name': 'xxxx', 'warehouse': 'test_warehouse', @@ -60,29 +60,34 @@ def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_cl } snowflake_to_slack_operator = self._construct_operator(**operator_args) - snowflake_hook = mock_snowflake_hook_class.return_value - snowflake_hook.get_pandas_df.return_value = '1234' - slack_webhook_hook = mock_slack_hook_class.return_value + # snowflake_hook = mock_snowflake_hook_class.return_value + # snowflake_hook.get_pandas_df.return_value = '1234' + # slack_webhook_hook = mock_slack_hook_class.return_value + mock_dbapi_hook = mock.Mock() + snowflake_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) # Test that the Snowflake hook is instantiated with the right parameters - mock_snowflake_hook_class.assert_called_once_with( - database='test_database', - role='test_role', - schema='test_schema', - snowflake_conn_id='snowflake_connection', - warehouse='test_warehouse', - ) + # mock_snowflake_hook_class.assert_called_once_with( + # database='test_database', + # role='test_role', + # schema='test_schema', + # snowflake_conn_id='snowflake_connection', + # warehouse='test_warehouse', + # ) # Test that the get_pandas_df method is executed on the Snowflake hook with the pre-rendered sql and # correct params - snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3']) + # snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3']) # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2017-01-01, 1234', webhook_token='test_token' + message='message: 2017-01-01, 1234', webhook_token='test_token', channel=None ) # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() + # slack_webhook_hook.execute.assert_called_once() diff --git a/tests/system/providers/slack/__init__.py b/tests/system/providers/slack/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/slack/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/slack/example_sql_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py new file mode 100644 index 0000000000000..24a4c55ab1dbe --- /dev/null +++ b/tests/system/providers/slack/example_sql_to_slack.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example DAG using SqlToSlackOperator. +""" + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator + +PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") +SQL_CONN_ID = 'presto_default' +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_sql_to_slack" + +with models.DAG( + dag_id=DAG_ID, + schedule_interval='@once', # Override to match your needs + start_date=datetime(2022, 1, 1), + catchup=False, + tags=["example"], +) as dag: + # [START howto_operator_sql_to_slack] + SqlToSlackOperator( + task_id="presto_to_slack", + sql_conn_id=SQL_CONN_ID, + sql=f"SELECT col FROM {PRESTO_TABLE}", + slack_channel="my_channel", + slack_conn_id='slack_default', + slack_message="message: {{ ds }}, {{ results_df }}", + ) + # [END howto_operator_sql_to_slack] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From a46a309f8dfe35e17dfde46a109ffc31b8fe4de4 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Sun, 26 Jun 2022 10:13:03 +0300 Subject: [PATCH 02/22] adding SqlToSlackOperator --- .../presto/transfers/presto_to_slack.py | 88 +++------- airflow/providers/slack/provider.yaml | 6 + airflow/providers/slack/transfers/__init__.py | 16 ++ .../providers/slack/transfers/sql_to_slack.py | 166 ++++++++++++++++++ .../snowflake/transfers/snowflake_to_slack.py | 94 +++------- docs/apache-airflow-providers-slack/index.rst | 10 ++ .../operators/sql_to_slack.rst | 38 ++++ .../presto/transfers/test_presto_to_slack.py | 45 +++-- tests/providers/slack/transfers/__init__.py | 17 ++ .../slack/transfers/test_sql_to_slack.py | 113 ++++++++++++ .../transfers/test_snowflake_to_slack.py | 43 +++-- tests/system/providers/slack/__init__.py | 16 ++ .../providers/slack/example_sql_to_slack.py | 55 ++++++ 13 files changed, 547 insertions(+), 160 deletions(-) create mode 100644 airflow/providers/slack/transfers/__init__.py create mode 100644 airflow/providers/slack/transfers/sql_to_slack.py create mode 100644 docs/apache-airflow-providers-slack/operators/sql_to_slack.rst create mode 100644 tests/providers/slack/transfers/__init__.py create mode 100644 tests/providers/slack/transfers/test_sql_to_slack.py create mode 100644 tests/system/providers/slack/__init__.py create mode 100644 tests/system/providers/slack/example_sql_to_slack.py diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 6dd0ecb3ab137..647a8d1ce7d58 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -15,21 +15,17 @@ # specific language governing permissions and limitations # under the License. +import warnings from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union -from pandas import DataFrame -from tabulate import tabulate - from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.presto.hooks.presto import PrestoHook -from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator if TYPE_CHECKING: - from airflow.utils.context import Context + pass -class PrestoToSlackOperator(BaseOperator): +class PrestoToSlackOperator(SqlToSlackOperator): """ Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ @@ -47,11 +43,12 @@ class PrestoToSlackOperator(BaseOperator): You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the SQL results :param presto_conn_id: destination presto connection - :param slack_conn_id: The connection id for Slack + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id.py + Mutually exclusive with 'slack_conn_id' :param slack_channel: The channel to send message. Override default from Slack connection. """ @@ -66,14 +63,13 @@ def __init__( sql: str, slack_message: str, presto_conn_id: str = 'presto_default', - slack_conn_id: str = 'slack_default', + slack_conn_id: Optional[str] = None, results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, slack_token: Optional[str] = None, slack_channel: Optional[str] = None, **kwargs, ) -> None: - super().__init__(**kwargs) self.presto_conn_id = presto_conn_id self.sql = sql @@ -84,58 +80,24 @@ def __init__( self.results_df_name = results_df_name self.slack_channel = slack_channel - def _get_query_results(self) -> DataFrame: - presto_hook = self._get_presto_hook() - - self.log.info('Running SQL query: %s', self.sql) - df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters) - return df - - def _render_and_send_slack_message(self, context, df) -> None: - # Put the dataframe into the context and render the JINJA template fields - context[self.results_df_name] = df - self.render_template_fields(context) - - slack_hook = self._get_slack_hook() - self.log.info('Sending slack message: %s', self.slack_message) - slack_hook.execute() + warnings.warn( + """ + PrestoToSlackOperator is deprecated. + Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`. + """, + DeprecationWarning, + stacklevel=2, + ) - def _get_presto_hook(self) -> PrestoHook: - return PrestoHook(presto_conn_id=self.presto_conn_id) - def _get_slack_hook(self) -> SlackWebhookHook: - return SlackWebhookHook( - http_conn_id=self.slack_conn_id, - message=self.slack_message, - webhook_token=self.slack_token, + super().__init__( + sql=self.sql, + sql_conn_id=self.presto_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, slack_channel=self.slack_channel, + results_df_name=self.results_df_name, + parameters=self.parameters, + **kwargs, ) - - def render_template_fields(self, context, jinja_env=None) -> None: - # If this is the first render of the template fields, exclude slack_message from rendering since - # the presto results haven't been retrieved yet. - if self.times_rendered == 0: - fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) - else: - fields_to_render = self.template_fields - - if not jinja_env: - jinja_env = self.get_template_env() - - # Add the tabulate library into the JINJA environment - jinja_env.filters['tabulate'] = tabulate - - self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) - self.times_rendered += 1 - - def execute(self, context: 'Context') -> None: - if not self.sql.strip(): - raise AirflowException("Expected 'sql' parameter is missing.") - if not self.slack_message.strip(): - raise AirflowException("Expected 'slack_message' parameter is missing.") - - df = self._get_query_results() - - self._render_and_send_slack_message(context, df) - - self.log.debug('Finished sending Presto data to Slack') diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml index ba06c7673b4ff..5b79f6a9d1cf6 100644 --- a/airflow/providers/slack/provider.yaml +++ b/airflow/providers/slack/provider.yaml @@ -57,6 +57,12 @@ hooks: - airflow.providers.slack.hooks.slack - airflow.providers.slack.hooks.slack_webhook +transfers: + - source-integration-name: SQL + target-integration-name: Slack + python-module: airflow.providers.slack.transfers.sql_to_slack + how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst + hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+ - airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook diff --git a/airflow/providers/slack/transfers/__init__.py b/airflow/providers/slack/transfers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/slack/transfers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py new file mode 100644 index 0000000000000..0147f87a4de4e --- /dev/null +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -0,0 +1,166 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union + +from pandas import DataFrame +from tabulate import tabulate + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook +from airflow.hooks.dbapi import DbApiHook +from airflow.models import BaseOperator +from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class SqlToSlackOperator(BaseOperator): + """ + Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the + query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called + '{{ results_df }}'. The 'results_df' variable name can be changed by specifying a different + 'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to + allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df | + tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:SqlToSlackOperator` + + :param sql: The SQL statement to execute on Snowflake (templated) + :param slack_message: The templated Slack message to send with the data returned from Snowflake. + You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the + SQL results + :param sql_conn_id: Reference to + :ref:`Snowflake connection id` + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_webhook_token' + :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the + 'slack_conn_id' attribute needs to be specified in the 'Extra' JSON field. + Mutually exclusive with 'slack_conn_id'. + :param slack_channel: The channel to send message. Override default from Slack connection. + :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' + :param parameters: The parameters to pass to the SQL query + """ + + template_fields: Sequence[str] = ('sql', 'slack_message') + template_ext: Sequence[str] = ('.sql', '.jinja', '.j2') + template_fields_renderers = {"sql": "sql", "slack_message": "jinja"} + times_rendered = 0 + + def __init__( + self, + *, + sql: str, + sql_conn_id: str, + slack_conn_id: Optional[str] = None, + slack_webhook_token: Optional[str] = None, + slack_channel: Optional[str] = None, + slack_message: str, + results_df_name: str = 'results_df', + parameters: Optional[Union[Iterable, Mapping]] = None, + **kwargs, + ) -> None: + + super().__init__(**kwargs) + + self.sql_conn_id = sql_conn_id + self.sql = sql + self.parameters = parameters + self.slack_conn_id = slack_conn_id + self.slack_webhook_token = slack_webhook_token + self.slack_channel = slack_channel + self.slack_message = slack_message + self.results_df_name = results_df_name + self.kwargs = kwargs + + if not self.slack_conn_id and not self.slack_webhook_token: + raise AirflowException( + "SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument" + ) + + if self.slack_conn_id and self.slack_webhook_token: + raise AirflowException("Cannot pass both `slack_conn_id` and `slack_webhook_token` arguments") + + def _get_hook(self) -> DbApiHook: + self.log.debug("Get connection for %s", self.sql_conn_id) + conn = BaseHook.get_connection(self.sql_conn_id) + hook = conn.get_hook(hook_params=self.kwargs) + if not callable(getattr(hook, 'get_pandas_df', None)): + raise AirflowException( + "This hook is not supported. The hook class must have get_pandas_df method." + ) + return hook + + def _get_query_results(self) -> DataFrame: + sql_hook = self._get_hook() + + self.log.info('Running SQL query: %s', self.sql) + df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters) + return df + + def _render_and_send_slack_message(self, context, df) -> None: + # Put the dataframe into the context and render the JINJA template fields + context[self.results_df_name] = df + self.render_template_fields(context) + + slack_hook = self._get_slack_hook() + self.log.info('Sending slack message: %s', self.slack_message) + slack_hook.execute() + + def _get_slack_hook(self) -> SlackWebhookHook: + if self.slack_conn_id: + return SlackWebhookHook( + http_conn_id=self.slack_conn_id, message=self.slack_message, channel=self.slack_channel + ) + elif self.slack_webhook_token: + return SlackWebhookHook( + message=self.slack_message, webhook_token=self.slack_webhook_token, channel=self.slack_channel + ) + else: + raise AirflowException("Could not initiate SlackWebhookHook") + + def render_template_fields(self, context, jinja_env=None) -> None: + # If this is the first render of the template fields, exclude slack_message from rendering since + # the snowflake results haven't been retrieved yet. + if self.times_rendered == 0: + fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) + else: + fields_to_render = self.template_fields + + if not jinja_env: + jinja_env = self.get_template_env() + + # Add the tabulate library into the JINJA environment + jinja_env.filters['tabulate'] = tabulate + + self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) + self.times_rendered += 1 + + def execute(self, context: 'Context') -> None: + if not isinstance(self.sql, str): + raise AirflowException("Expected 'sql' parameter should be a string.") + if self.sql is None or self.sql.strip() == "": + raise AirflowException("Expected 'sql' parameter is missing.") + if self.slack_message is None or self.slack_message.strip() == "": + raise AirflowException("Expected 'slack_message' parameter is missing.") + + df = self._get_query_results() + self._render_and_send_slack_message(context, df) + + self.log.debug('Finished sending SQL data to Slack') diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py index 2c6138e58edc9..34d5eb114c5ae 100644 --- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py +++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py @@ -15,21 +15,17 @@ # specific language governing permissions and limitations # under the License. +import warnings from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union -from pandas import DataFrame -from tabulate import tabulate - from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator if TYPE_CHECKING: - from airflow.utils.context import Context + pass -class SnowflakeToSlackOperator(BaseOperator): +class SnowflakeToSlackOperator(SqlToSlackOperator): """ Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ @@ -48,7 +44,7 @@ class SnowflakeToSlackOperator(BaseOperator): SQL results :param snowflake_conn_id: Reference to :ref:`Snowflake connection id` - :param slack_conn_id: The connection id for Slack + :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param warehouse: The Snowflake virtual warehouse to use to run the SQL query @@ -56,7 +52,8 @@ class SnowflakeToSlackOperator(BaseOperator): :param schema: The schema to run the SQL against in Snowflake :param role: The role to use when connecting to Snowflake :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id. + Mutually exclusive with 'slack_conn_id' """ template_fields: Sequence[str] = ('sql', 'slack_message') @@ -70,7 +67,7 @@ def __init__( sql: str, slack_message: str, snowflake_conn_id: str = 'snowflake_default', - slack_conn_id: str = 'slack_default', + slack_conn_id: Optional[str] = None, results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, warehouse: Optional[str] = None, @@ -80,7 +77,6 @@ def __init__( slack_token: Optional[str] = None, **kwargs, ) -> None: - super().__init__(**kwargs) self.snowflake_conn_id = snowflake_conn_id self.sql = sql @@ -94,62 +90,26 @@ def __init__( self.slack_message = slack_message self.results_df_name = results_df_name - def _get_query_results(self) -> DataFrame: - snowflake_hook = self._get_snowflake_hook() - - self.log.info('Running SQL query: %s', self.sql) - df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters) - return df - - def _render_and_send_slack_message(self, context, df) -> None: - # Put the dataframe into the context and render the JINJA template fields - context[self.results_df_name] = df - self.render_template_fields(context) - - slack_hook = self._get_slack_hook() - self.log.info('Sending slack message: %s', self.slack_message) - slack_hook.execute() + warnings.warn( + """ + SnowflakeToSlackOperator is deprecated. + Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`. + """, + DeprecationWarning, + stacklevel=2, + ) - def _get_snowflake_hook(self) -> SnowflakeHook: - return SnowflakeHook( - snowflake_conn_id=self.snowflake_conn_id, - warehouse=self.warehouse, - database=self.database, + super().__init__( + sql=self.sql, + sql_conn_id=self.snowflake_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, + results_df_name=self.results_df_name, + parameters=self.parameters, role=self.role, + warehouse=self.warehouse, schema=self.schema, + database=self.database, + **kwargs, ) - - def _get_slack_hook(self) -> SlackWebhookHook: - return SlackWebhookHook( - http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token - ) - - def render_template_fields(self, context, jinja_env=None) -> None: - # If this is the first render of the template fields, exclude slack_message from rendering since - # the snowflake results haven't been retrieved yet. - if self.times_rendered == 0: - fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) - else: - fields_to_render = self.template_fields - - if not jinja_env: - jinja_env = self.get_template_env() - - # Add the tabulate library into the JINJA environment - jinja_env.filters['tabulate'] = tabulate - - self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) - self.times_rendered += 1 - - def execute(self, context: 'Context') -> None: - if not isinstance(self.sql, str): - raise AirflowException("Expected 'sql' parameter should be a string.") - if self.sql is None or self.sql.strip() == "": - raise AirflowException("Expected 'sql' parameter is missing.") - if self.slack_message is None or self.slack_message.strip() == "": - raise AirflowException("Expected 'slack_message' parameter is missing.") - - df = self._get_query_results() - self._render_and_send_slack_message(context, df) - - self.log.debug('Finished sending Snowflake data to Slack') diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst index 87b299c2f79b7..354219e02bb00 100644 --- a/docs/apache-airflow-providers-slack/index.rst +++ b/docs/apache-airflow-providers-slack/index.rst @@ -21,13 +21,23 @@ Content ------- +.. toctree:: + :hidden: + :caption: System tests + System Tests <_api/tests/system/providers/slack/index> .. toctree:: :maxdepth: 1 :caption: Guides How-to Guide +.. toctree:: + :maxdepth: 1 + :caption: Guides + + SqlToSlackOperator types + .. toctree:: :maxdepth: 1 :caption: References diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst new file mode 100644 index 0000000000000..35b31c788c4da --- /dev/null +++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst @@ -0,0 +1,38 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:SqlToSlackOperator: + +SqlToSlackOperator +======================== + +Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post messages to predefined Slack +channels. + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +This operator will execute a custom query in the provided SQL connection and publish a Slack message that can be formatted +and contain the resulting dataset (e.g. ASCII formatted dataframe). + +An example usage of the SqlToSlackOperator is as follows: + +.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sql_to_slack] + :end-before: [END howto_operator_sql_to_slack] diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py index 78aa2867ec375..2d5c70cd7f02e 100644 --- a/tests/providers/presto/transfers/test_presto_to_slack.py +++ b/tests/providers/presto/transfers/test_presto_to_slack.py @@ -41,9 +41,8 @@ def _construct_operator(**kwargs): operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs) return operator - @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook') - @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook') - def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class): + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): operator_args = { 'presto_conn_id': 'presto_connection', 'slack_conn_id': 'slack_connection', @@ -51,27 +50,51 @@ def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class 'results_df_name': 'xxxx', 'parameters': ['1', '2', '3'], 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_token': 'test_token', 'slack_channel': 'my_channel', 'dag': self.example_dag, } presto_to_slack_operator = self._construct_operator(**operator_args) - presto_hook = mock_presto_hook_class.return_value - presto_hook.get_pandas_df.return_value = '1234' + mock_dbapi_hook = mock.Mock() + presto_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' + slack_webhook_hook = mock_slack_hook_class.return_value presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - mock_presto_hook_class.assert_called_once_with( - presto_conn_id='presto_connection', + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message='message: 2022-01-01, 1234', channel='my_channel' ) - presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3']) + slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering_with_slack_webhook(self, mock_slack_hook_class): + operator_args = { + 'presto_conn_id': 'presto_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'slack_channel': 'my_channel', + 'dag': self.example_dag, + } + presto_to_slack_operator = self._construct_operator(**operator_args) + mock_dbapi_hook = mock.Mock() + presto_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' + + slack_webhook_hook = mock_slack_hook_class.return_value + presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2022-01-01, 1234', webhook_token='test_token', - slack_channel='my_channel', + channel='my_channel', ) slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/slack/transfers/__init__.py b/tests/providers/slack/transfers/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/providers/slack/transfers/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py new file mode 100644 index 0000000000000..3b8e2373b1402 --- /dev/null +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import mock + +import pandas as pd +import pytest + +from airflow.exceptions import AirflowException +from airflow.models import DAG +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator +from airflow.utils import timezone + +TEST_DAG_ID = 'sql_to_slack_unit_test' +TEST_TASK_ID = 'sql_to_slack_unit_test_task' +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestSqlToSlackOperator: + def setup_method(self): + self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE) + + @staticmethod + def _construct_operator(**kwargs): + operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs) + return operator + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_rendering_and_message_execution(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ results_df }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_duplicated_slack_parameters_provided_exception_thrown(self, mock_slack_hook_class): + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'sql': "sql {{ ds }}", + 'slack_webhook_token': 'test_token', + } + with pytest.raises(AirflowException): + self._construct_operator(**operator_args) + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_conn_id': 'slack_connection', + 'slack_message': 'message: {{ ds }}, {{ testing }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'results_df_name': 'testing', + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py index f4a28bd7cc040..06dbfa082202a 100644 --- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py +++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py @@ -20,6 +20,7 @@ from airflow.models import DAG from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator from airflow.utils import timezone +from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs TEST_DAG_ID = 'snowflake_to_slack_unit_test' @@ -38,15 +39,14 @@ def teardown_method(self): @staticmethod def _construct_operator(**kwargs): - operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs) - return operator + with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}): + operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs) + return operator - @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SnowflakeHook') - @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SlackWebhookHook') - def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class): + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering(self, mock_slack_hook_class): operator_args = { 'snowflake_conn_id': 'snowflake_connection', - 'slack_conn_id': 'slack_connection', 'sql': "sql {{ ds }}", 'results_df_name': 'xxxx', 'warehouse': 'test_warehouse', @@ -60,29 +60,34 @@ def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_cl } snowflake_to_slack_operator = self._construct_operator(**operator_args) - snowflake_hook = mock_snowflake_hook_class.return_value - snowflake_hook.get_pandas_df.return_value = '1234' - slack_webhook_hook = mock_slack_hook_class.return_value + # snowflake_hook = mock_snowflake_hook_class.return_value + # snowflake_hook.get_pandas_df.return_value = '1234' + # slack_webhook_hook = mock_slack_hook_class.return_value + mock_dbapi_hook = mock.Mock() + snowflake_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) # Test that the Snowflake hook is instantiated with the right parameters - mock_snowflake_hook_class.assert_called_once_with( - database='test_database', - role='test_role', - schema='test_schema', - snowflake_conn_id='snowflake_connection', - warehouse='test_warehouse', - ) + # mock_snowflake_hook_class.assert_called_once_with( + # database='test_database', + # role='test_role', + # schema='test_schema', + # snowflake_conn_id='snowflake_connection', + # warehouse='test_warehouse', + # ) # Test that the get_pandas_df method is executed on the Snowflake hook with the pre-rendered sql and # correct params - snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3']) + # snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3']) # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2017-01-01, 1234', webhook_token='test_token' + message='message: 2017-01-01, 1234', webhook_token='test_token', channel=None ) # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() + # slack_webhook_hook.execute.assert_called_once() diff --git a/tests/system/providers/slack/__init__.py b/tests/system/providers/slack/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/slack/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/slack/example_sql_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py new file mode 100644 index 0000000000000..24a4c55ab1dbe --- /dev/null +++ b/tests/system/providers/slack/example_sql_to_slack.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example DAG using SqlToSlackOperator. +""" + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator + +PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") +SQL_CONN_ID = 'presto_default' +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_sql_to_slack" + +with models.DAG( + dag_id=DAG_ID, + schedule_interval='@once', # Override to match your needs + start_date=datetime(2022, 1, 1), + catchup=False, + tags=["example"], +) as dag: + # [START howto_operator_sql_to_slack] + SqlToSlackOperator( + task_id="presto_to_slack", + sql_conn_id=SQL_CONN_ID, + sql=f"SELECT col FROM {PRESTO_TABLE}", + slack_channel="my_channel", + slack_conn_id='slack_default', + slack_message="message: {{ ds }}, {{ results_df }}", + ) + # [END howto_operator_sql_to_slack] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 6e7b54adb93c7cf1ba37f6c9492155f946487605 Mon Sep 17 00:00:00 2001 From: Alex Kruchkov <36231027+alexkruc@users.noreply.github.com> Date: Sun, 26 Jun 2022 10:33:18 +0300 Subject: [PATCH 03/22] Update tests/system/providers/slack/example_sql_to_slack.py Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com> --- tests/system/providers/slack/example_sql_to_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/providers/slack/example_sql_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py index 24a4c55ab1dbe..d7a64fc5a1c98 100644 --- a/tests/system/providers/slack/example_sql_to_slack.py +++ b/tests/system/providers/slack/example_sql_to_slack.py @@ -25,7 +25,7 @@ from airflow import models from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator -PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") +SQL_TABLE = os.environ.get("SQL_TABLE", "test_table") SQL_CONN_ID = 'presto_default' ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_sql_to_slack" From 4f67dc19b841e57001ef75a232f7e8e1e221ea33 Mon Sep 17 00:00:00 2001 From: Alex Kruchkov <36231027+alexkruc@users.noreply.github.com> Date: Sun, 26 Jun 2022 10:33:24 +0300 Subject: [PATCH 04/22] Update tests/system/providers/slack/example_sql_to_slack.py Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com> --- tests/system/providers/slack/example_sql_to_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/providers/slack/example_sql_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py index d7a64fc5a1c98..afdf3bcf62baa 100644 --- a/tests/system/providers/slack/example_sql_to_slack.py +++ b/tests/system/providers/slack/example_sql_to_slack.py @@ -41,7 +41,7 @@ SqlToSlackOperator( task_id="presto_to_slack", sql_conn_id=SQL_CONN_ID, - sql=f"SELECT col FROM {PRESTO_TABLE}", + sql=f"SELECT col FROM {SQL_TABLE}", slack_channel="my_channel", slack_conn_id='slack_default', slack_message="message: {{ ds }}, {{ results_df }}", From dc1acf64d0d85960bbddc6dd74abb1705ed81f8e Mon Sep 17 00:00:00 2001 From: alexkruc Date: Mon, 27 Jun 2022 15:10:44 +0300 Subject: [PATCH 05/22] Changes based on CR --- .../presto/transfers/presto_to_slack.py | 51 ++++++++----- .../providers/slack/transfers/sql_to_slack.py | 8 +- .../snowflake/transfers/snowflake_to_slack.py | 20 ++--- .../operators/sql_to_slack.rst | 2 +- .../presto/transfers/test_presto_to_slack.py | 49 +++++++++++- .../transfers/test_snowflake_to_slack.py | 74 ++++++++++++++----- .../presto/example_presto_to_slack.py | 2 + 7 files changed, 155 insertions(+), 51 deletions(-) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 647a8d1ce7d58..0b66405aa2100 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -15,14 +15,15 @@ # specific language governing permissions and limitations # under the License. +import logging import warnings -from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union +from typing import Iterable, Mapping, Optional, Sequence, Union -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowNotFoundException +from airflow.hooks.base import BaseHook from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator -if TYPE_CHECKING: - pass +log = logging.getLogger(__name__) class PrestoToSlackOperator(SqlToSlackOperator): @@ -63,14 +64,13 @@ def __init__( sql: str, slack_message: str, presto_conn_id: str = 'presto_default', - slack_conn_id: Optional[str] = None, + slack_conn_id: str = 'slack_default', results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, slack_token: Optional[str] = None, slack_channel: Optional[str] = None, **kwargs, ) -> None: - self.presto_conn_id = presto_conn_id self.sql = sql self.parameters = parameters @@ -89,15 +89,30 @@ def __init__( stacklevel=2, ) - - super().__init__( - sql=self.sql, - sql_conn_id=self.presto_conn_id, - slack_conn_id=self.slack_conn_id, - slack_webhook_token=self.slack_token, - slack_message=self.slack_message, - slack_channel=self.slack_channel, - results_df_name=self.results_df_name, - parameters=self.parameters, - **kwargs, - ) + try: + # Passing the connection in case it exists + BaseHook.get_connection(self.slack_conn_id) + super().__init__( + sql=self.sql, + sql_conn_id=self.presto_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=None, + slack_message=self.slack_message, + slack_channel=self.slack_channel, + results_df_name=self.results_df_name, + parameters=self.parameters, + **kwargs, + ) + except AirflowNotFoundException: + log.info(f"Didn't find the Slack connection {self.slack_conn_id}. Using the webhook instead") + super().__init__( + sql=self.sql, + sql_conn_id=self.presto_conn_id, + slack_conn_id=None, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, + slack_channel=self.slack_channel, + results_df_name=self.results_df_name, + parameters=self.parameters, + **kwargs, + ) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 0147f87a4de4e..2737907557606 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -49,9 +49,11 @@ class SqlToSlackOperator(BaseOperator): SQL results :param sql_conn_id: Reference to :ref:`Snowflake connection id` + :param sql_hook_params: Extra config params to be passed to the underlying hook. + Should match the desired hook constructor params. :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_webhook_token' :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the - 'slack_conn_id' attribute needs to be specified in the 'Extra' JSON field. + 'slack_conn_id' attribute needs to be specified in the 'password' field. Mutually exclusive with 'slack_conn_id'. :param slack_channel: The channel to send message. Override default from Slack connection. :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' @@ -68,6 +70,7 @@ def __init__( *, sql: str, sql_conn_id: str, + sql_hook_params: Optional[dict] = None, slack_conn_id: Optional[str] = None, slack_webhook_token: Optional[str] = None, slack_channel: Optional[str] = None, @@ -80,6 +83,7 @@ def __init__( super().__init__(**kwargs) self.sql_conn_id = sql_conn_id + self.sql_hook_params = sql_hook_params self.sql = sql self.parameters = parameters self.slack_conn_id = slack_conn_id @@ -100,7 +104,7 @@ def __init__( def _get_hook(self) -> DbApiHook: self.log.debug("Get connection for %s", self.sql_conn_id) conn = BaseHook.get_connection(self.sql_conn_id) - hook = conn.get_hook(hook_params=self.kwargs) + hook = conn.get_hook(hook_params=self.sql_hook_params) if not callable(getattr(hook, 'get_pandas_df', None)): raise AirflowException( "This hook is not supported. The hook class must have get_pandas_df method." diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py index 34d5eb114c5ae..c0d43e8821e73 100644 --- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py +++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py @@ -16,14 +16,10 @@ # under the License. import warnings -from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union +from typing import Iterable, Mapping, Optional, Sequence, Union -from airflow.exceptions import AirflowException from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator -if TYPE_CHECKING: - pass - class SnowflakeToSlackOperator(SqlToSlackOperator): """ @@ -77,7 +73,6 @@ def __init__( slack_token: Optional[str] = None, **kwargs, ) -> None: - self.snowflake_conn_id = snowflake_conn_id self.sql = sql self.parameters = parameters @@ -99,6 +94,14 @@ def __init__( stacklevel=2, ) + hook_params = { + "schema": self.schema, + "role": self.role, + "database": self.database, + "warehouse": self.warehouse, + } + cleaned_hook_params = {k: v for k, v in hook_params.items() if v is not None} + super().__init__( sql=self.sql, sql_conn_id=self.snowflake_conn_id, @@ -107,9 +110,6 @@ def __init__( slack_message=self.slack_message, results_df_name=self.results_df_name, parameters=self.parameters, - role=self.role, - warehouse=self.warehouse, - schema=self.schema, - database=self.database, + sql_hook_params=cleaned_hook_params, **kwargs, ) diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst index 35b31c788c4da..18f62f7616f37 100644 --- a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst +++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst @@ -21,7 +21,7 @@ SqlToSlackOperator ======================== Use the :class:`~airflow.providers.slack.transfers.sql_to_slack` to post messages to predefined Slack -channels. +channel. Using the Operator ^^^^^^^^^^^^^^^^^^ diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py index 2d5c70cd7f02e..4e0c2c01545d5 100644 --- a/tests/providers/presto/transfers/test_presto_to_slack.py +++ b/tests/providers/presto/transfers/test_presto_to_slack.py @@ -17,9 +17,9 @@ from unittest import mock -from airflow.models import DAG +from airflow.models import DAG, Connection from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator -from airflow.utils import timezone +from airflow.utils import db, timezone from tests.test_utils.db import clear_db_runs TEST_DAG_ID = 'presto_to_slack_unit_test' @@ -43,6 +43,14 @@ def _construct_operator(**kwargs): @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): + db.merge_conn( + Connection( + conn_id='slack_connection', + conn_type='slackwebhook', + extra='{"webhook_token": "your_token_here"}', + ) + ) + operator_args = { 'presto_conn_id': 'presto_connection', 'slack_conn_id': 'slack_connection', @@ -98,3 +106,40 @@ def test_hooks_and_rendering_with_slack_webhook(self, mock_slack_hook_class): ) slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_class): + db.merge_conn( + Connection( + conn_id='slack_connection', + conn_type='slackwebhook', + extra='{"webhook_token": "your_token_here"}', + ) + ) + + operator_args = { + 'presto_conn_id': 'presto_connection', + 'slack_conn_id': 'slack_connection', + 'slack_token': 'test_token', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_channel': 'my_channel', + 'dag': self.example_dag, + } + presto_to_slack_operator = self._construct_operator(**operator_args) + mock_dbapi_hook = mock.Mock() + presto_to_slack_operator._get_hook = mock_dbapi_hook + + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = '1234' + + slack_webhook_hook = mock_slack_hook_class.return_value + presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message='message: 2022-01-01, 1234', channel='my_channel' + ) + + slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py index 06dbfa082202a..3e3d7b95389d8 100644 --- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py +++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py @@ -60,9 +60,6 @@ def test_hooks_and_rendering(self, mock_slack_hook_class): } snowflake_to_slack_operator = self._construct_operator(**operator_args) - # snowflake_hook = mock_snowflake_hook_class.return_value - # snowflake_hook.get_pandas_df.return_value = '1234' - # slack_webhook_hook = mock_slack_hook_class.return_value mock_dbapi_hook = mock.Mock() snowflake_to_slack_operator._get_hook = mock_dbapi_hook @@ -71,23 +68,64 @@ def test_hooks_and_rendering(self, mock_slack_hook_class): snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - # Test that the Snowflake hook is instantiated with the right parameters - # mock_snowflake_hook_class.assert_called_once_with( - # database='test_database', - # role='test_role', - # schema='test_schema', - # snowflake_conn_id='snowflake_connection', - # warehouse='test_warehouse', - # ) - - # Test that the get_pandas_df method is executed on the Snowflake hook with the pre-rendered sql and - # correct params - # snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3']) - # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( message='message: 2017-01-01, 1234', webhook_token='test_token', channel=None ) - # Test that the Slack hook's execute method gets run once - # slack_webhook_hook.execute.assert_called_once() + def test_hook_params_building(self): + hook_params = { + 'schema': 'test_schema', + 'role': 'test_role', + 'database': 'test_database', + 'warehouse': 'test_warehouse', + } + operator_args = { + 'snowflake_conn_id': 'snowflake_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'warehouse': hook_params['warehouse'], + 'database': hook_params['database'], + 'role': hook_params['role'], + 'schema': hook_params['schema'], + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'dag': self.example_dag, + } + snowflake_operator = self._construct_operator(**operator_args) + + assert snowflake_operator.sql_hook_params == hook_params + + def test_partial_hook_params_building(self): + hook_params = {'role': 'test_role', 'database': 'test_database', 'warehouse': 'test_warehouse'} + operator_args = { + 'snowflake_conn_id': 'snowflake_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'warehouse': hook_params['warehouse'], + 'database': hook_params['database'], + 'role': hook_params['role'], + 'schema': None, + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'dag': self.example_dag, + } + snowflake_operator = self._construct_operator(**operator_args) + + assert snowflake_operator.sql_hook_params == hook_params + + def test_no_hook_params_building(self): + operator_args = { + 'snowflake_conn_id': 'snowflake_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'dag': self.example_dag, + } + snowflake_operator = self._construct_operator(**operator_args) + + assert snowflake_operator.sql_hook_params == {} diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py index 91ab9c42e6f99..b888f6bb93bd3 100644 --- a/tests/system/providers/presto/example_presto_to_slack.py +++ b/tests/system/providers/presto/example_presto_to_slack.py @@ -28,6 +28,7 @@ PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_presto_to_slack" +SLACK_CONN_ID = 'my_slack_conn' with models.DAG( dag_id=DAG_ID, @@ -39,6 +40,7 @@ # [START howto_operator_presto_to_slack] PrestoToSlackOperator( task_id="presto_to_slack", + slack_conn_id=SLACK_CONN_ID, sql=f"SELECT col FROM {PRESTO_TABLE}", slack_channel="my_channel", slack_message="message: {{ ds }}, {{ results_df }}", From 65896826c2c5a762ed94e34d835b40dda81a44b2 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Mon, 27 Jun 2022 15:23:29 +0300 Subject: [PATCH 06/22] removing the logger initialization, using `self.log` --- airflow/providers/presto/transfers/presto_to_slack.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 0b66405aa2100..8295964cbce41 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. -import logging import warnings from typing import Iterable, Mapping, Optional, Sequence, Union @@ -23,8 +22,6 @@ from airflow.hooks.base import BaseHook from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator -log = logging.getLogger(__name__) - class PrestoToSlackOperator(SqlToSlackOperator): """ @@ -104,7 +101,8 @@ def __init__( **kwargs, ) except AirflowNotFoundException: - log.info(f"Didn't find the Slack connection {self.slack_conn_id}. Using the webhook instead") + self.log.info( + f"Didn't find the Slack connection '{self.slack_conn_id}'. Using the webhook instead") super().__init__( sql=self.sql, sql_conn_id=self.presto_conn_id, From aac9fe10092b3037151e87cd7e719a6ce4e652ef Mon Sep 17 00:00:00 2001 From: alexkruc Date: Mon, 27 Jun 2022 15:39:26 +0300 Subject: [PATCH 07/22] removing typo in `PrestoToSlackOperator` documentation --- airflow/providers/presto/transfers/presto_to_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 8295964cbce41..96986977bbc4c 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -45,7 +45,7 @@ class PrestoToSlackOperator(SqlToSlackOperator): :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id.py + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id Mutually exclusive with 'slack_conn_id' :param slack_channel: The channel to send message. Override default from Slack connection. """ From 4bb47dba08208aaf5ab528e4704bf85d472e0384 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Mon, 27 Jun 2022 15:56:54 +0300 Subject: [PATCH 08/22] removing mutual exclusive comment in `PrestoToSlackOperator` Slack params --- airflow/providers/presto/transfers/presto_to_slack.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 96986977bbc4c..6ead036eee358 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -41,12 +41,11 @@ class PrestoToSlackOperator(SqlToSlackOperator): You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the SQL results :param presto_conn_id: destination presto connection - :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' + :param slack_conn_id: The connection id for Slack :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param slack_token: The token to use to authenticate to Slack. If this is not provided, the 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id - Mutually exclusive with 'slack_conn_id' :param slack_channel: The channel to send message. Override default from Slack connection. """ From 0cbf0d274688d5befcde263d70677a1b25645a2d Mon Sep 17 00:00:00 2001 From: alexkruc Date: Mon, 27 Jun 2022 17:50:20 +0300 Subject: [PATCH 09/22] Fixing CI tests --- airflow/providers/presto/transfers/presto_to_slack.py | 3 ++- airflow/providers/slack/transfers/sql_to_slack.py | 2 +- .../operators/{ => transfer}/sql_to_slack.rst | 0 tests/system/providers/presto/example_presto_to_slack.py | 4 ++-- 4 files changed, 5 insertions(+), 4 deletions(-) rename docs/apache-airflow-providers-slack/operators/{ => transfer}/sql_to_slack.rst (100%) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index 6ead036eee358..ad9c66de2637d 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -101,7 +101,8 @@ def __init__( ) except AirflowNotFoundException: self.log.info( - f"Didn't find the Slack connection '{self.slack_conn_id}'. Using the webhook instead") + f"Didn't find the Slack connection '{self.slack_conn_id}'. Using the webhook instead" + ) super().__init__( sql=self.sql, sql_conn_id=self.presto_conn_id, diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 2737907557606..f1c555c4fadf7 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -54,7 +54,7 @@ class SqlToSlackOperator(BaseOperator): :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_webhook_token' :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the 'slack_conn_id' attribute needs to be specified in the 'password' field. - Mutually exclusive with 'slack_conn_id'. + Mutually exclusive with 'slack_conn_id'. :param slack_channel: The channel to send message. Override default from Slack connection. :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/transfer/sql_to_slack.rst similarity index 100% rename from docs/apache-airflow-providers-slack/operators/sql_to_slack.rst rename to docs/apache-airflow-providers-slack/operators/transfer/sql_to_slack.rst diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py index b888f6bb93bd3..dc87c831b4983 100644 --- a/tests/system/providers/presto/example_presto_to_slack.py +++ b/tests/system/providers/presto/example_presto_to_slack.py @@ -28,7 +28,7 @@ PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_presto_to_slack" -SLACK_CONN_ID = 'my_slack_conn' +SLACK_CONN_WEBHOOK = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' with models.DAG( dag_id=DAG_ID, @@ -40,7 +40,7 @@ # [START howto_operator_presto_to_slack] PrestoToSlackOperator( task_id="presto_to_slack", - slack_conn_id=SLACK_CONN_ID, + slack_token=SLACK_CONN_WEBHOOK, sql=f"SELECT col FROM {PRESTO_TABLE}", slack_channel="my_channel", slack_message="message: {{ ds }}, {{ results_df }}", From e0c242111b923814d8efbe65cc40d5a70b6ce2d6 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 11:07:27 +0300 Subject: [PATCH 10/22] changed slackhook related behaviour --- .../presto/transfers/presto_to_slack.py | 42 ++--- .../providers/slack/transfers/sql_to_slack.py | 24 +-- .../snowflake/transfers/snowflake_to_slack.py | 2 +- .../presto/transfers/test_presto_to_slack.py | 56 +------ .../slack/transfers/test_sql_to_slack.py | 144 +++++++++++++++++- .../transfers/test_snowflake_to_slack.py | 5 +- 6 files changed, 171 insertions(+), 102 deletions(-) diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py index ad9c66de2637d..bcf2c3177a6a5 100644 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -18,8 +18,6 @@ import warnings from typing import Iterable, Mapping, Optional, Sequence, Union -from airflow.exceptions import AirflowNotFoundException -from airflow.hooks.base import BaseHook from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator @@ -85,32 +83,14 @@ def __init__( stacklevel=2, ) - try: - # Passing the connection in case it exists - BaseHook.get_connection(self.slack_conn_id) - super().__init__( - sql=self.sql, - sql_conn_id=self.presto_conn_id, - slack_conn_id=self.slack_conn_id, - slack_webhook_token=None, - slack_message=self.slack_message, - slack_channel=self.slack_channel, - results_df_name=self.results_df_name, - parameters=self.parameters, - **kwargs, - ) - except AirflowNotFoundException: - self.log.info( - f"Didn't find the Slack connection '{self.slack_conn_id}'. Using the webhook instead" - ) - super().__init__( - sql=self.sql, - sql_conn_id=self.presto_conn_id, - slack_conn_id=None, - slack_webhook_token=self.slack_token, - slack_message=self.slack_message, - slack_channel=self.slack_channel, - results_df_name=self.results_df_name, - parameters=self.parameters, - **kwargs, - ) + super().__init__( + sql=self.sql, + sql_conn_id=self.presto_conn_id, + slack_conn_id=self.slack_conn_id, + slack_webhook_token=self.slack_token, + slack_message=self.slack_message, + slack_channel=self.slack_channel, + results_df_name=self.results_df_name, + parameters=self.parameters, + **kwargs, + ) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index f1c555c4fadf7..f436e0fe2c0b6 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -51,10 +51,9 @@ class SqlToSlackOperator(BaseOperator): :ref:`Snowflake connection id` :param sql_hook_params: Extra config params to be passed to the underlying hook. Should match the desired hook constructor params. - :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_webhook_token' + :param slack_conn_id: The connection id for Slack. :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the 'slack_conn_id' attribute needs to be specified in the 'password' field. - Mutually exclusive with 'slack_conn_id'. :param slack_channel: The channel to send message. Override default from Slack connection. :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query @@ -71,7 +70,7 @@ def __init__( sql: str, sql_conn_id: str, sql_hook_params: Optional[dict] = None, - slack_conn_id: Optional[str] = None, + slack_conn_id: str = 'slack_default', slack_webhook_token: Optional[str] = None, slack_channel: Optional[str] = None, slack_message: str, @@ -98,9 +97,6 @@ def __init__( "SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument" ) - if self.slack_conn_id and self.slack_webhook_token: - raise AirflowException("Cannot pass both `slack_conn_id` and `slack_webhook_token` arguments") - def _get_hook(self) -> DbApiHook: self.log.debug("Get connection for %s", self.sql_conn_id) conn = BaseHook.get_connection(self.sql_conn_id) @@ -128,16 +124,12 @@ def _render_and_send_slack_message(self, context, df) -> None: slack_hook.execute() def _get_slack_hook(self) -> SlackWebhookHook: - if self.slack_conn_id: - return SlackWebhookHook( - http_conn_id=self.slack_conn_id, message=self.slack_message, channel=self.slack_channel - ) - elif self.slack_webhook_token: - return SlackWebhookHook( - message=self.slack_message, webhook_token=self.slack_webhook_token, channel=self.slack_channel - ) - else: - raise AirflowException("Could not initiate SlackWebhookHook") + return SlackWebhookHook( + http_conn_id=self.slack_conn_id, + message=self.slack_message, + channel=self.slack_channel, + webhook_token=self.slack_webhook_token, + ) def render_template_fields(self, context, jinja_env=None) -> None: # If this is the first render of the template fields, exclude slack_message from rendering since diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py index c0d43e8821e73..ebda1dfb146e8 100644 --- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py +++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py @@ -63,7 +63,7 @@ def __init__( sql: str, slack_message: str, snowflake_conn_id: str = 'snowflake_default', - slack_conn_id: Optional[str] = None, + slack_conn_id: str = 'slack_default', results_df_name: str = 'results_df', parameters: Optional[Union[Iterable, Mapping]] = None, warehouse: Optional[str] = None, diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py index 4e0c2c01545d5..bcc5e82a8a9d2 100644 --- a/tests/providers/presto/transfers/test_presto_to_slack.py +++ b/tests/providers/presto/transfers/test_presto_to_slack.py @@ -17,9 +17,9 @@ from unittest import mock -from airflow.models import DAG, Connection +from airflow.models import DAG from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator -from airflow.utils import db, timezone +from airflow.utils import timezone from tests.test_utils.db import clear_db_runs TEST_DAG_ID = 'presto_to_slack_unit_test' @@ -43,14 +43,6 @@ def _construct_operator(**kwargs): @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): - db.merge_conn( - Connection( - conn_id='slack_connection', - conn_type='slackwebhook', - extra='{"webhook_token": "your_token_here"}', - ) - ) - operator_args = { 'presto_conn_id': 'presto_connection', 'slack_conn_id': 'slack_connection', @@ -72,51 +64,16 @@ def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2022-01-01, 1234', channel='my_channel' - ) - - slack_webhook_hook.execute.assert_called_once() - - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') - def test_hooks_and_rendering_with_slack_webhook(self, mock_slack_hook_class): - operator_args = { - 'presto_conn_id': 'presto_connection', - 'sql': "sql {{ ds }}", - 'results_df_name': 'xxxx', - 'parameters': ['1', '2', '3'], - 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_token': 'test_token', - 'slack_channel': 'my_channel', - 'dag': self.example_dag, - } - presto_to_slack_operator = self._construct_operator(**operator_args) - mock_dbapi_hook = mock.Mock() - presto_to_slack_operator._get_hook = mock_dbapi_hook - - get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df - get_pandas_df_mock.return_value = '1234' - - slack_webhook_hook = mock_slack_hook_class.return_value - presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', message='message: 2022-01-01, 1234', - webhook_token='test_token', channel='my_channel', + webhook_token=None, ) slack_webhook_hook.execute.assert_called_once() @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_class): - db.merge_conn( - Connection( - conn_id='slack_connection', - conn_type='slackwebhook', - extra='{"webhook_token": "your_token_here"}', - ) - ) - operator_args = { 'presto_conn_id': 'presto_connection', 'slack_conn_id': 'slack_connection', @@ -139,7 +96,10 @@ def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_c presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message='message: 2022-01-01, 1234', channel='my_channel' + http_conn_id='slack_connection', + message='message: 2022-01-01, 1234', + channel='my_channel', + webhook_token='test_token', ) slack_webhook_hook.execute.assert_called_once() diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 3b8e2373b1402..2048009efe763 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -21,7 +21,7 @@ import pytest from airflow.exceptions import AirflowException -from airflow.models import DAG +from airflow.models import DAG, Connection from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator from airflow.utils import timezone @@ -63,20 +63,86 @@ def test_rendering_and_message_execution(self, mock_slack_hook_class): # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + http_conn_id='slack_connection', + message=f'message: 2017-01-01, {test_df}', + channel='#test', + webhook_token=None, ) # Test that the Slack hook's execute method gets run once slack_webhook_hook.execute.assert_called_once() @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') - def test_duplicated_slack_parameters_provided_exception_thrown(self, mock_slack_hook_class): + def test_rendering_and_message_execution_with_default_slack(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_message': 'message: {{ ds }}, {{ results_df }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_default', + message=f'message: 2017-01-01, {test_df}', + channel='#test', + webhook_token=None, + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') + def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class): + mock_dbapi_hook = mock.Mock() + + test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) + get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df + get_pandas_df_mock.return_value = test_df + operator_args = { 'sql_conn_id': 'snowflake_connection', 'slack_conn_id': 'slack_connection', + 'slack_webhook_token': 'test_token', + 'slack_message': 'message: {{ ds }}, {{ results_df }}', + 'slack_channel': '#test', + 'sql': "sql {{ ds }}", + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + slack_webhook_hook = mock_slack_hook_class.return_value + sql_to_slack_operator._get_hook = mock_dbapi_hook + sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # Test that the Slack hook is instantiated with the right parameters + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', + message=f'message: 2017-01-01, {test_df}', + channel='#test', + webhook_token='test_token', + ) + + # Test that the Slack hook's execute method gets run once + slack_webhook_hook.execute.assert_called_once() + + def test_non_existing_slack_parameters_provided_exception_thrown(self): + operator_args = { + 'sql_conn_id': 'snowflake_connection', 'slack_message': 'message: {{ ds }}, {{ xxxx }}', 'sql': "sql {{ ds }}", - 'slack_webhook_token': 'test_token', } with pytest.raises(AirflowException): self._construct_operator(**operator_args) @@ -106,8 +172,76 @@ def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class) # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', message=f'message: 2017-01-01, {test_df}', channel='#test' + http_conn_id='slack_connection', + message=f'message: 2017-01-01, {test_df}', + channel='#test', + webhook_token=None, ) # Test that the Slack hook's execute method gets run once slack_webhook_hook.execute.assert_called_once() + + @mock.patch('airflow.operators.sql.BaseHook.get_connection') + def test_hook_params_building(self, mock_get_conn): + mock_get_conn.return_value = Connection(conn_id='snowflake_connection', conn_type='snowflake') + hook_params = { + 'schema': 'test_schema', + 'role': 'test_role', + 'database': 'test_database', + 'warehouse': 'test_warehouse', + } + operator_args = { + 'sql_conn_id': 'dummy_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'sql_hook_params': hook_params, + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_webhook_token': 'test_token', + 'dag': self.example_dag, + } + sql_to_slack_operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **operator_args) + + assert sql_to_slack_operator.sql_hook_params == hook_params + + @mock.patch('airflow.operators.sql.BaseHook.get_connection') + def test_hook_params(self, mock_get_conn): + mock_get_conn.return_value = Connection(conn_id='postgres_test', conn_type='postgres') + op = SqlToSlackOperator( + task_id='sql_sensor_hook_params', + sql_conn_id='postgres_test', + sql="SELECT 1", + slack_message='message: {{ ds }}, {{ xxxx }}', + sql_hook_params={ + 'schema': 'public', + }, + ) + hook = op._get_hook() + assert hook.schema == 'public' + + @mock.patch('airflow.operators.sql.BaseHook.get_connection') + def test_hook_params_snowflake(self, mock_get_conn): + mock_get_conn.return_value = Connection(conn_id='snowflake_default', conn_type='snowflake') + hook_params = { + 'warehouse': 'warehouse', + 'database': 'database', + 'role': 'role', + 'schema': 'schema', + } + operator_args = { + 'sql_conn_id': 'dummy_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', + 'sql_hook_params': hook_params, + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_webhook_token': 'test_token', + 'dag': self.example_dag, + } + sql_to_slack_operator = self._construct_operator(**operator_args) + + assert sql_to_slack_operator._get_hook.conn_type == 'snowflake' + assert sql_to_slack_operator._get_hook.warehouse == 'warehouse' + assert sql_to_slack_operator._get_hook.database == 'database' + assert sql_to_slack_operator._get_hook.role == 'role' + assert sql_to_slack_operator._get_hook.schema == 'schema' diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py index 3e3d7b95389d8..8973d1701f149 100644 --- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py +++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py @@ -70,7 +70,10 @@ def test_hooks_and_rendering(self, mock_slack_hook_class): # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - message='message: 2017-01-01, 1234', webhook_token='test_token', channel=None + message='message: 2017-01-01, 1234', + webhook_token='test_token', + channel=None, + http_conn_id='slack_default', ) def test_hook_params_building(self): From 9607fcde3edfc5575c597c99eb3245c73aa090b1 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 12:17:49 +0300 Subject: [PATCH 11/22] fixing ci tests --- docs/apache-airflow-providers-slack/index.rst | 2 +- .../operators/{transfer => }/sql_to_slack.rst | 0 tests/providers/slack/transfers/test_sql_to_slack.py | 10 ---------- 3 files changed, 1 insertion(+), 11 deletions(-) rename docs/apache-airflow-providers-slack/operators/{transfer => }/sql_to_slack.rst (100%) diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst index 354219e02bb00..eb833733a8fef 100644 --- a/docs/apache-airflow-providers-slack/index.rst +++ b/docs/apache-airflow-providers-slack/index.rst @@ -36,7 +36,7 @@ Content :maxdepth: 1 :caption: Guides - SqlToSlackOperator types + SqlToSlackOperator types .. toctree:: :maxdepth: 1 diff --git a/docs/apache-airflow-providers-slack/operators/transfer/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst similarity index 100% rename from docs/apache-airflow-providers-slack/operators/transfer/sql_to_slack.rst rename to docs/apache-airflow-providers-slack/operators/sql_to_slack.rst diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 2048009efe763..9324a394261ce 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -138,15 +138,6 @@ def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_c # Test that the Slack hook's execute method gets run once slack_webhook_hook.execute.assert_called_once() - def test_non_existing_slack_parameters_provided_exception_thrown(self): - operator_args = { - 'sql_conn_id': 'snowflake_connection', - 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'sql': "sql {{ ds }}", - } - with pytest.raises(AirflowException): - self._construct_operator(**operator_args) - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class): mock_dbapi_hook = mock.Mock() @@ -240,7 +231,6 @@ def test_hook_params_snowflake(self, mock_get_conn): } sql_to_slack_operator = self._construct_operator(**operator_args) - assert sql_to_slack_operator._get_hook.conn_type == 'snowflake' assert sql_to_slack_operator._get_hook.warehouse == 'warehouse' assert sql_to_slack_operator._get_hook.database == 'database' assert sql_to_slack_operator._get_hook.role == 'role' From 3247f09116769c9a3c7413a2a975f394e8f9d541 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 13:13:34 +0300 Subject: [PATCH 12/22] removing unused imports and fixing conn_type in tests --- tests/providers/slack/transfers/test_sql_to_slack.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 9324a394261ce..a45a36828d560 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -18,9 +18,7 @@ from unittest import mock import pandas as pd -import pytest -from airflow.exceptions import AirflowException from airflow.models import DAG, Connection from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator from airflow.utils import timezone @@ -220,7 +218,7 @@ def test_hook_params_snowflake(self, mock_get_conn): 'schema': 'schema', } operator_args = { - 'sql_conn_id': 'dummy_connection', + 'sql_conn_id': 'snowflake_default', 'sql': "sql {{ ds }}", 'results_df_name': 'xxxx', 'sql_hook_params': hook_params, From a8031d0f3558191a9b1a1f4184e2088141e84b36 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 15:49:13 +0300 Subject: [PATCH 13/22] fix ci operator params --- .../slack/transfers/test_sql_to_slack.py | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index a45a36828d560..7ec56b3082fe4 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -197,7 +197,7 @@ def test_hook_params_building(self, mock_get_conn): def test_hook_params(self, mock_get_conn): mock_get_conn.return_value = Connection(conn_id='postgres_test', conn_type='postgres') op = SqlToSlackOperator( - task_id='sql_sensor_hook_params', + task_id='sql_hook_params', sql_conn_id='postgres_test', sql="SELECT 1", slack_message='message: {{ ds }}, {{ xxxx }}', @@ -211,25 +211,22 @@ def test_hook_params(self, mock_get_conn): @mock.patch('airflow.operators.sql.BaseHook.get_connection') def test_hook_params_snowflake(self, mock_get_conn): mock_get_conn.return_value = Connection(conn_id='snowflake_default', conn_type='snowflake') - hook_params = { - 'warehouse': 'warehouse', - 'database': 'database', - 'role': 'role', - 'schema': 'schema', - } - operator_args = { - 'sql_conn_id': 'snowflake_default', - 'sql': "sql {{ ds }}", - 'results_df_name': 'xxxx', - 'sql_hook_params': hook_params, - 'parameters': ['1', '2', '3'], - 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_webhook_token': 'test_token', - 'dag': self.example_dag, - } - sql_to_slack_operator = self._construct_operator(**operator_args) + op = SqlToSlackOperator( + task_id='snowflake_hook_params', + sql_conn_id='snowflake_default', + results_df_name='xxxx', + sql="SELECT 1", + slack_message='message: {{ ds }}, {{ xxxx }}', + sql_hook_params={ + 'warehouse': 'warehouse', + 'database': 'database', + 'role': 'role', + 'schema': 'schema', + }, + ) + hook = op._get_hook() - assert sql_to_slack_operator._get_hook.warehouse == 'warehouse' - assert sql_to_slack_operator._get_hook.database == 'database' - assert sql_to_slack_operator._get_hook.role == 'role' - assert sql_to_slack_operator._get_hook.schema == 'schema' + assert hook.warehouse == 'warehouse' + assert hook.database == 'database' + assert hook.role == 'role' + assert hook.schema == 'schema' From de63ee0614731e8a379e50183a14d86ddc8be6f8 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 16:58:52 +0300 Subject: [PATCH 14/22] removing mutually exclusive comments for slack connection --- airflow/providers/snowflake/transfers/snowflake_to_slack.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py index ebda1dfb146e8..29e0ccbd23b14 100644 --- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py +++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py @@ -40,7 +40,7 @@ class SnowflakeToSlackOperator(SqlToSlackOperator): SQL results :param snowflake_conn_id: Reference to :ref:`Snowflake connection id` - :param slack_conn_id: The connection id for Slack. Mutually exclusive with 'slack_token' + :param slack_conn_id: The connection id for Slack. :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' :param parameters: The parameters to pass to the SQL query :param warehouse: The Snowflake virtual warehouse to use to run the SQL query @@ -49,7 +49,6 @@ class SnowflakeToSlackOperator(SqlToSlackOperator): :param role: The role to use when connecting to Snowflake :param slack_token: The token to use to authenticate to Slack. If this is not provided, the 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id. - Mutually exclusive with 'slack_conn_id' """ template_fields: Sequence[str] = ('sql', 'slack_message') From 387f4789e9b58d56407f0feb6d37d403fff38bc1 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 22:32:46 +0300 Subject: [PATCH 15/22] backporting connection.get_hook method and removing default slack_conn for `SqlToS3Operator` --- .../providers/slack/transfers/sql_to_slack.py | 41 +++++++++++++++++-- .../slack/transfers/test_sql_to_slack.py | 32 --------------- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index f436e0fe2c0b6..c94bd3d799add 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +import warnings from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union from pandas import DataFrame @@ -25,11 +25,39 @@ from airflow.hooks.dbapi import DbApiHook from airflow.models import BaseOperator from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook +from airflow.providers_manager import ProvidersManager +from airflow.utils.module_loading import import_string +from airflow.version import version if TYPE_CHECKING: from airflow.utils.context import Context +def _backported_get_hook(connection, *, hook_params=None): + """Return hook based on conn_type + For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed + when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. Git reference: + https://github.com/apache/airflow/blob/main/airflow/providers/slack/provider.yaml#L38 + """ + hook = ProvidersManager().hooks.get(connection.conn_type, None) + + if hook is None: + raise AirflowException(f'Unknown hook type "{connection.conn_type}"') + try: + hook_class = import_string(hook.hook_class_name) + except ImportError: + warnings.warn( + "Could not import %s when discovering %s %s", + hook.hook_class_name, + hook.hook_name, + hook.package_name, + ) + raise + if hook_params is None: + hook_params = {} + return hook_class(**{hook.connection_id_attribute_name: connection.conn_id}, **hook_params) + + class SqlToSlackOperator(BaseOperator): """ Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the @@ -70,7 +98,7 @@ def __init__( sql: str, sql_conn_id: str, sql_hook_params: Optional[dict] = None, - slack_conn_id: str = 'slack_default', + slack_conn_id: Optional[str] = None, slack_webhook_token: Optional[str] = None, slack_channel: Optional[str] = None, slack_message: str, @@ -100,7 +128,14 @@ def __init__( def _get_hook(self) -> DbApiHook: self.log.debug("Get connection for %s", self.sql_conn_id) conn = BaseHook.get_connection(self.sql_conn_id) - hook = conn.get_hook(hook_params=self.sql_hook_params) + if version >= '2.3': + # "hook_params" were introduced to into "get_hook()" only in Airflow 2.3. + hook = conn.get_hook(hook_params=self.sql_hook_params) + else: + # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed + # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. Git reference: + # https://github.com/apache/airflow/blob/main/airflow/providers/slack/provider.yaml#L38 + hook = _backported_get_hook(conn) if not callable(getattr(hook, 'get_pandas_df', None)): raise AirflowException( "This hook is not supported. The hook class must have get_pandas_df method." diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 7ec56b3082fe4..4dec58225889c 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -70,38 +70,6 @@ def test_rendering_and_message_execution(self, mock_slack_hook_class): # Test that the Slack hook's execute method gets run once slack_webhook_hook.execute.assert_called_once() - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') - def test_rendering_and_message_execution_with_default_slack(self, mock_slack_hook_class): - mock_dbapi_hook = mock.Mock() - - test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1]) - get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df - get_pandas_df_mock.return_value = test_df - - operator_args = { - 'sql_conn_id': 'snowflake_connection', - 'slack_message': 'message: {{ ds }}, {{ results_df }}', - 'slack_channel': '#test', - 'sql': "sql {{ ds }}", - 'dag': self.example_dag, - } - sql_to_slack_operator = self._construct_operator(**operator_args) - - slack_webhook_hook = mock_slack_hook_class.return_value - sql_to_slack_operator._get_hook = mock_dbapi_hook - sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - # Test that the Slack hook is instantiated with the right parameters - mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_default', - message=f'message: 2017-01-01, {test_df}', - channel='#test', - webhook_token=None, - ) - - # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class): mock_dbapi_hook = mock.Mock() From fe59554a2c2670badf7768f4073aa9c8c2651783 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Tue, 28 Jun 2022 23:00:20 +0300 Subject: [PATCH 16/22] adding sql_hook_params to the backport get_hook method --- airflow/providers/slack/transfers/sql_to_slack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index c94bd3d799add..6249d784d2222 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -135,7 +135,7 @@ def _get_hook(self) -> DbApiHook: # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. Git reference: # https://github.com/apache/airflow/blob/main/airflow/providers/slack/provider.yaml#L38 - hook = _backported_get_hook(conn) + hook = _backported_get_hook(conn, hook_params=self.sql_hook_params) if not callable(getattr(hook, 'get_pandas_df', None)): raise AirflowException( "This hook is not supported. The hook class must have get_pandas_df method." From d6cc88a3e4ecd2f0da679bf66fb6986d975a6f2c Mon Sep 17 00:00:00 2001 From: Alex Kruchkov <36231027+alexkruc@users.noreply.github.com> Date: Tue, 28 Jun 2022 23:02:10 +0300 Subject: [PATCH 17/22] Update airflow/providers/slack/transfers/sql_to_slack.py Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com> --- airflow/providers/slack/transfers/sql_to_slack.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 6249d784d2222..57137cc85a642 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -36,8 +36,7 @@ def _backported_get_hook(connection, *, hook_params=None): """Return hook based on conn_type For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed - when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. Git reference: - https://github.com/apache/airflow/blob/main/airflow/providers/slack/provider.yaml#L38 + when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. """ hook = ProvidersManager().hooks.get(connection.conn_type, None) From 748eabcf9692e476e8a2b93c6ce4388eebc2877c Mon Sep 17 00:00:00 2001 From: Alex Kruchkov <36231027+alexkruc@users.noreply.github.com> Date: Tue, 28 Jun 2022 23:02:27 +0300 Subject: [PATCH 18/22] Update airflow/providers/slack/transfers/sql_to_slack.py Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com> --- airflow/providers/slack/transfers/sql_to_slack.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 57137cc85a642..0dc3057e5c309 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -132,8 +132,7 @@ def _get_hook(self) -> DbApiHook: hook = conn.get_hook(hook_params=self.sql_hook_params) else: # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed - # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. Git reference: - # https://github.com/apache/airflow/blob/main/airflow/providers/slack/provider.yaml#L38 + # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. hook = _backported_get_hook(conn, hook_params=self.sql_hook_params) if not callable(getattr(hook, 'get_pandas_df', None)): raise AirflowException( From bf522b3ca4944f9a1ab1b1e66ada6c8dca0cbfbc Mon Sep 17 00:00:00 2001 From: alexkruc Date: Wed, 29 Jun 2022 09:14:50 +0300 Subject: [PATCH 19/22] testing exception thrown --- tests/providers/slack/transfers/test_sql_to_slack.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 4dec58225889c..f02e9e54a48e2 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -18,7 +18,9 @@ from unittest import mock import pandas as pd +import pytest +from airflow import AirflowException from airflow.models import DAG, Connection from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator from airflow.utils import timezone @@ -104,6 +106,15 @@ def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_c # Test that the Slack hook's execute method gets run once slack_webhook_hook.execute.assert_called_once() + def test_non_existing_slack_parameters_provided_exception_thrown(self): + operator_args = { + 'sql_conn_id': 'snowflake_connection', + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'sql': "sql {{ ds }}", + } + with pytest.raises(AirflowException): + self._construct_operator(**operator_args) + @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class): mock_dbapi_hook = mock.Mock() From f33699c1e905169384319f77070f5474682719d8 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Wed, 29 Jun 2022 10:34:11 +0300 Subject: [PATCH 20/22] fix indentation and ci tests --- airflow/providers/slack/transfers/sql_to_slack.py | 4 ++-- tests/providers/slack/transfers/test_sql_to_slack.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 0dc3057e5c309..5735f70e6c15a 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -35,8 +35,8 @@ def _backported_get_hook(connection, *, hook_params=None): """Return hook based on conn_type - For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed - when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. + For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed + when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. """ hook = ProvidersManager().hooks.get(connection.conn_type, None) diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index f02e9e54a48e2..0390a56b18cdb 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -178,6 +178,7 @@ def test_hook_params(self, mock_get_conn): op = SqlToSlackOperator( task_id='sql_hook_params', sql_conn_id='postgres_test', + slack_webhook_token='slack_token', sql="SELECT 1", slack_message='message: {{ ds }}, {{ xxxx }}', sql_hook_params={ @@ -193,6 +194,7 @@ def test_hook_params_snowflake(self, mock_get_conn): op = SqlToSlackOperator( task_id='snowflake_hook_params', sql_conn_id='snowflake_default', + slack_conn_id='slack_default', results_df_name='xxxx', sql="SELECT 1", slack_message='message: {{ ds }}, {{ xxxx }}', From f3a29fb07754e6edaea94ff3e7f81817933958a1 Mon Sep 17 00:00:00 2001 From: alexkruc Date: Wed, 29 Jun 2022 11:39:11 +0300 Subject: [PATCH 21/22] adding skip tests for pre-commit hooks on 2.2 comp --- airflow/providers/slack/transfers/sql_to_slack.py | 2 +- scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 5735f70e6c15a..602f92b70c8c8 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -129,7 +129,7 @@ def _get_hook(self) -> DbApiHook: conn = BaseHook.get_connection(self.sql_conn_id) if version >= '2.3': # "hook_params" were introduced to into "get_hook()" only in Airflow 2.3. - hook = conn.get_hook(hook_params=self.sql_hook_params) + hook = conn.get_hook(hook_params=self.sql_hook_params) # ignore 2.2 comp check else: # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. diff --git a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py index 5c3c6dd89be47..54c8b1c7b2f2a 100755 --- a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py +++ b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py @@ -28,11 +28,11 @@ f"To run this script, run the ./{__file__} command [FILE] ..." ) - console = Console(color_system="standard", width=200) errors: List[str] = [] +SKIP_COMP_CHECK = "# ignore 2.2 comp check" TRY_NUM_MATCHER = re.compile(r".*context.*\[[\"']try_number[\"']].*") GET_MANDATORY_MATCHER = re.compile(r".*conf\.get_mandatory_value") GET_AIRFLOW_APP_MATCHER = re.compile(r".*get_airflow_app\(\)") @@ -43,6 +43,9 @@ def _check_file(_file: Path): lines = _file.read_text().splitlines() for index, line in enumerate(lines): + if SKIP_COMP_CHECK in line: + continue + if "XCom.get_value(" in line: if "if ti_key is not None:" not in lines[index - 1]: errors.append( From efd07a59159b4be94988b174b99af93557fad8ba Mon Sep 17 00:00:00 2001 From: alexkruc Date: Wed, 29 Jun 2022 14:08:06 +0300 Subject: [PATCH 22/22] changed ignore compatibility check comment name --- airflow/providers/slack/transfers/sql_to_slack.py | 2 +- scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 602f92b70c8c8..24a3ed93c672d 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -129,7 +129,7 @@ def _get_hook(self) -> DbApiHook: conn = BaseHook.get_connection(self.sql_conn_id) if version >= '2.3': # "hook_params" were introduced to into "get_hook()" only in Airflow 2.3. - hook = conn.get_hook(hook_params=self.sql_hook_params) # ignore 2.2 comp check + hook = conn.get_hook(hook_params=self.sql_hook_params) # ignore airflow compat check else: # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3. diff --git a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py index 54c8b1c7b2f2a..d4fe0ea78d24a 100755 --- a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py +++ b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py @@ -32,7 +32,7 @@ errors: List[str] = [] -SKIP_COMP_CHECK = "# ignore 2.2 comp check" +SKIP_COMP_CHECK = "# ignore airflow compat check" TRY_NUM_MATCHER = re.compile(r".*context.*\[[\"']try_number[\"']].*") GET_MANDATORY_MATCHER = re.compile(r".*conf\.get_mandatory_value") GET_AIRFLOW_APP_MATCHER = re.compile(r".*get_airflow_app\(\)")