Add snowflake to slack operator #9023

Merged (7 commits, Jun 3, 2020)
1 change: 1 addition & 0 deletions CONTRIBUTING.rst
@@ -468,6 +468,7 @@
opsgenie                   http
postgres                   amazon
sftp                       ssh
slack                      http
snowflake                  slack
========================== ===========================

.. END PACKAGE DEPENDENCIES HERE
3 changes: 3 additions & 0 deletions airflow/providers/dependencies.json
@@ -64,5 +64,8 @@
],
"slack": [
"http"
],
"snowflake": [
"slack"
]
}
32 changes: 21 additions & 11 deletions airflow/providers/snowflake/example_dags/example_snowflake.py
@@ -23,16 +23,24 @@
from airflow import DAG
from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransfer
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
from airflow.utils.dates import days_ago

SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')
SLACK_CONN_ID = os.environ.get('SLACK_CONN_ID', 'slack_default')
# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
SNOWFLAKE_SCHEMA = os.environ.get('SNOWFLAKE_SCHEMA', 'public')
SNOWFLAKE_STAGE = os.environ.get('SNOWFLAKE_STAGE', 'airflow')
SNOWFLAKE_SAMPLE_TABLE = os.environ.get('SNOWFLAKE_SAMPLE_TABLE', 'snowflake_sample_data.tpch_sf001.orders')
SNOWFLAKE_LOAD_TABLE = os.environ.get('SNOWFLAKE_LOAD_TABLE', 'airflow_example')
SNOWFLAKE_LOAD_JSON_PATH = os.environ.get('SNOWFLAKE_LOAD_PATH', 'example.json')

SNOWFLAKE_SELECT_SQL = f"SELECT * FROM {SNOWFLAKE_SAMPLE_TABLE} LIMIT 100;"
SNOWFLAKE_SLACK_SQL = f"SELECT O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS FROM {SNOWFLAKE_SAMPLE_TABLE} LIMIT 10;"
SNOWFLAKE_SLACK_MESSAGE = "Results in an ASCII table:\n" \
                          "```{{ results_df | tabulate(tablefmt='pretty', headers='keys') }}```"
SNOWFLAKE_CREATE_TABLE_SQL = f"CREATE TRANSIENT TABLE IF NOT EXISTS {SNOWFLAKE_LOAD_TABLE}(data VARIANT);"

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
@@ -47,22 +55,23 @@
select = SnowflakeOperator(
    task_id='select',
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="""
    SELECT *
    FROM {0}
    LIMIT 100;
    """.format(SNOWFLAKE_SAMPLE_TABLE),
    sql=SNOWFLAKE_SELECT_SQL,
    dag=dag,
)

slack_report = SnowflakeToSlackOperator(
    task_id="slack_report",
    sql=SNOWFLAKE_SLACK_SQL,
    slack_message=SNOWFLAKE_SLACK_MESSAGE,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    slack_conn_id=SLACK_CONN_ID,
    dag=dag
)

create_table = SnowflakeOperator(
    task_id='create_table',
    snowflake_conn_id='snowflake_conn_id',
    sql="""
    CREATE TRANSIENT TABLE IF NOT EXISTS {0} (
        data VARIANT
    );
    """.format(SNOWFLAKE_LOAD_TABLE),
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=SNOWFLAKE_CREATE_TABLE_SQL,
    schema=SNOWFLAKE_SCHEMA,
    dag=dag,
)
@@ -78,4 +87,5 @@
    dag=dag,
)

select >> slack_report
create_table >> copy_into_table
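
The example DAG above relies on working Snowflake and Slack connections ('snowflake_default' and 'slack_default' by default). The snippet below is a hypothetical setup sketch, not part of this PR: it shows one way the Slack webhook connection could be created programmatically. The conn_id, host and webhook token are placeholders; per the operator's docstring, the token can alternatively be passed via the 'slack_token' parameter instead of the connection's Extra field.

# Hypothetical setup snippet (not part of this change): create the Slack webhook
# connection that SLACK_CONN_ID points at. The operator reads either the
# 'slack_token' argument or the 'webhook_token' key from the connection Extra.
import json

from airflow import settings
from airflow.models import Connection

slack_conn = Connection(
    conn_id='slack_default',                     # matches SLACK_CONN_ID above
    conn_type='http',
    host='https://hooks.slack.com/services',     # placeholder webhook base URL
    extra=json.dumps({'webhook_token': 'T000/B000/XXXX'}),  # placeholder token path
)

session = settings.Session()
session.add(slack_conn)
session.commit()
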
152 changes: 152 additions & 0 deletions airflow/providers/snowflake/operators/snowflake_to_slack.py
@@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Iterable, Mapping, Optional, Union

from pandas import DataFrame
from tabulate import tabulate

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.decorators import apply_defaults


class SnowflakeToSlackOperator(BaseOperator):
"""
Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
results_df }}'. The 'results_df' variable name can be changed by specifing a different
'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.

simond marked this conversation as resolved.
Show resolved Hide resolved
:param sql: The SQL statement to execute on Snowflake (templated)
:type sql: str
:param slack_message: The templated Slack message to send with the data returned from Snowflake.
You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
SQL results
:type slack_message: str
:param snowflake_conn_id: The Snowflake connection id
:type snowflake_conn_id: str
:param slack_conn_id: The connection id for Slack
:type slack_conn_id: str
:param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
:type results_df_name: str
:param parameters: The parameters to pass to the SQL query
:type parameters: Optional[Union[Iterable, Mapping]]
:param warehouse: The Snowflake virtual warehouse to use to run the SQL query
:type warehouse: Optional[str]
:param database: The Snowflake database to use for the SQL query
:type database: Optional[str]
:param schema: The schema to run the SQL against in Snowflake
:type schema: Optional[str]
:param role: The role to use when connecting to Snowflake
:type role: Optional[str]
:param slack_token: The token to use to authenticate to Slack. If this is not provided, the
'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
:type slack_token: Optional[str]
"""
template_fields = ['sql', 'slack_message']
template_ext = ['.sql', '.jinja', '.j2']
times_rendered = 0

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
        self,
        sql: str,
        slack_message: str,
        snowflake_conn_id: str = 'snowflake_default',
        slack_conn_id: str = 'slack_default',
        results_df_name: str = 'results_df',
        parameters: Optional[Union[Iterable, Mapping]] = None,
        warehouse: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        role: Optional[str] = None,
        slack_token: Optional[str] = None,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        self.snowflake_conn_id = snowflake_conn_id
        self.sql = sql
        self.parameters = parameters
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self.role = role
        self.slack_conn_id = slack_conn_id
        self.slack_token = slack_token
        self.slack_message = slack_message
        self.results_df_name = results_df_name

    def _get_query_results(self) -> DataFrame:
        snowflake_hook = self._get_snowflake_hook()

        self.log.info('Running SQL query: %s', self.sql)
        df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)
        return df

    def _render_and_send_slack_message(self, context, df) -> None:
        # Put the dataframe into the context and render the JINJA template fields
        context[self.results_df_name] = df
        self.render_template_fields(context)

        slack_hook = self._get_slack_hook()
        self.log.info('Sending slack message: %s', self.slack_message)
        slack_hook.execute()

    def _get_snowflake_hook(self) -> SnowflakeHook:
        return SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id,
                             warehouse=self.warehouse, database=self.database,
                             role=self.role, schema=self.schema)

    def _get_slack_hook(self) -> SlackWebhookHook:
        return SlackWebhookHook(http_conn_id=self.slack_conn_id, message=self.slack_message,
                                webhook_token=self.slack_token)

    def render_template_fields(self, context, jinja_env=None) -> None:
        # If this is the first render of the template fields, exclude slack_message from rendering since
        # the snowflake results haven't been retrieved yet.
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
        else:
            fields_to_render = self.template_fields

        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters['tabulate'] = tabulate

        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
        self.times_rendered += 1

    def execute(self, context) -> None:
        if not isinstance(self.sql, str):
            raise AirflowException("Expected 'sql' parameter should be a string.")
        if self.sql is None or self.sql.strip() == "":
            raise AirflowException("Expected 'sql' parameter is missing.")
        if self.slack_message is None or self.slack_message.strip() == "":
            raise AirflowException("Expected 'slack_message' parameter is missing.")

        df = self._get_query_results()
        self._render_and_send_slack_message(context, df)

        self.log.debug('Finished sending Snowflake data to Slack')
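
To make the templating behaviour described in the docstring concrete, here is a minimal standalone sketch, not part of the PR, assuming jinja2, pandas and tabulate are installed. It mimics what the operator effectively does on its second render pass: the query results are exposed to Jinja under the 'results_df' name and 'tabulate' is registered as a filter, so a 'slack_message' template can print them as an ASCII table. Rendering of 'slack_message' itself is deferred via render_template_fields and 'times_rendered' until the results exist. The sample DataFrame columns below simply mirror the columns selected by SNOWFLAKE_SLACK_SQL in the example DAG.

# Standalone illustration only: mimic the operator's rendering of 'slack_message'.
import jinja2
import pandas as pd
from tabulate import tabulate

# Stand-in for the DataFrame that SnowflakeHook.get_pandas_df() would return.
results_df = pd.DataFrame({'O_ORDERKEY': [1, 2], 'O_ORDERSTATUS': ['F', 'O']})

env = jinja2.Environment()
env.filters['tabulate'] = tabulate  # same filter registration as render_template_fields()

message = env.from_string(
    "Results in an ASCII table:\n"
    "```{{ results_df | tabulate(tablefmt='pretty', headers='keys') }}```"
).render(results_df=results_df)

print(message)  # the operator would hand this string to SlackWebhookHook
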
3 changes: 2 additions & 1 deletion docs/operators-and-hooks-ref.rst
@@ -1214,7 +1214,8 @@ These integrations allow you to perform various operations within various servic
   * - `Snowflake <https://www.snowflake.com/>`__
     -
     - :mod:`airflow.providers.snowflake.hooks.snowflake`
     - :mod:`airflow.providers.snowflake.operators.snowflake`
     - :mod:`airflow.providers.snowflake.operators.snowflake`,
       :mod:`airflow.providers.snowflake.operators.snowflake_to_slack`
     -

   * - `Vertica <https://www.vertica.com/>`__
80 changes: 80 additions & 0 deletions tests/providers/snowflake/operators/test_snowflake_to_slack.py
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest import mock

from airflow.models import DAG
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
from airflow.utils import timezone

TEST_DAG_ID = 'snowflake_to_slack_unit_test'
DEFAULT_DATE = timezone.datetime(2017, 1, 1)


class TestSnowflakeToSlackOperator(unittest.TestCase):
    def setUp(self):
        self.example_dag = DAG('unit_test_dag_snowflake_to_slack', start_date=DEFAULT_DATE)

    @staticmethod
    def _construct_operator(**kwargs):
        operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
        return operator

    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SnowflakeHook')
    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SlackWebhookHook')
    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class):
        operator_args = {
            'snowflake_conn_id': 'snowflake_connection',
            'slack_conn_id': 'slack_connection',
            'sql': "sql {{ ds }}",
            'results_df_name': 'xxxx',
            'warehouse': 'test_warehouse',
            'database': 'test_database',
            'role': 'test_role',
            'schema': 'test_schema',
            'parameters': ['1', '2', '3'],
            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
            'slack_token': 'test_token',
            'dag': self.example_dag
        }
        snowflake_to_slack_operator = self._construct_operator(**operator_args)

        snowflake_hook = mock_snowflake_hook_class.return_value
        snowflake_hook.get_pandas_df.return_value = '1234'
        slack_webhook_hook = mock_slack_hook_class.return_value

        snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        # Test that the Snowflake hook is instantiated with the right parameters
        mock_snowflake_hook_class.assert_called_with(database='test_database',
                                                     role='test_role',
                                                     schema='test_schema',
                                                     snowflake_conn_id='snowflake_connection',
                                                     warehouse='test_warehouse')

        # Test that the get_pandas_df method is executed on the Snowflake hook with the rendered sql and
        # correct params
        snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3'])

        # Test that the Slack hook is instantiated with the right parameters
        mock_slack_hook_class.assert_called_with(http_conn_id='slack_connection',
                                                 message='message: 2017-01-01, 1234',
                                                 webhook_token='test_token')

        # Test that the Slack hook's execute method gets run once
        slack_webhook_hook.execute.assert_called_once()