Snowpark (Snowflake) dataset for kedro (kedro-org#104)
* Add Snowpark datasets

Signed-off-by: Vladimir Filimonov <vladimir_filimonov@mckinsey.com>
Signed-off-by: heber-urdaneta <heber_urdaneta@mckinsey.com>
Signed-off-by: Danny Farah <danny_farah@mckinsey.com>
Vladimir-Filimonov authored and dannyrfar committed Mar 21, 2023
1 parent e81e41a commit 2e21000
Showing 12 changed files with 476 additions and 3 deletions.
4 changes: 4 additions & 0 deletions Makefile
@@ -56,3 +56,7 @@ test-no-spark:

test-no-spark-sequential:
cd kedro-datasets && pytest tests --no-cov --ignore tests/spark

# kedro-datasets/snowflake tests skipped from default scope
test-snowflake-only:
cd kedro-datasets && pytest tests --no-cov --numprocesses 1 --dist loadfile -m snowflake
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -9,6 +9,7 @@
| Type | Description | Location |
| ------------------------------------ | -------------------------------------------------------------------------- | ----------------------------- |
| `polars.CSVDataSet` | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning-fast dataframe package built entirely using Rust. | `kedro_datasets.polars` |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |

## Bug fixes and other changes
* Add `mssql` backend to the `SQLQueryDataSet` DataSet using `pyodbc` library.
8 changes: 8 additions & 0 deletions kedro-datasets/kedro_datasets/snowflake/__init__.py
@@ -0,0 +1,8 @@
"""Provides I/O modules for Snowflake."""

__all__ = ["SnowparkTableDataSet"]

from contextlib import suppress

with suppress(ImportError):
from .snowpark_dataset import SnowparkTableDataSet
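Because the import above is wrapped in ``suppress(ImportError)``, importing ``kedro_datasets.snowflake`` succeeds even when ``snowflake-snowpark-python`` is not installed; the class is simply absent from the module. A minimal sketch of checking for it at runtime:

```python
# Minimal sketch: detect whether the optional Snowpark dependency is installed.
import kedro_datasets.snowflake as snowflake_datasets

if hasattr(snowflake_datasets, "SnowparkTableDataSet"):
    print("Snowpark support is available.")
else:
    print("Install snowflake-snowpark-python (Python 3.8) to enable SnowparkTableDataSet.")
```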
232 changes: 232 additions & 0 deletions kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
@@ -0,0 +1,232 @@
"""``AbstractDataSet`` implementation to access Snowflake using Snowpark dataframes
"""
import logging
from copy import deepcopy
from typing import Any, Dict

import snowflake.snowpark as sp
from kedro.io.core import AbstractDataSet, DataSetError

logger = logging.getLogger(__name__)


class SnowparkTableDataSet(AbstractDataSet):
"""``SnowparkTableDataSet`` loads and saves Snowpark dataframes.
As of Mar-2023, the snowpark connector only works with Python 3.8.
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
weather:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: db_credentials
save_args:
mode: overwrite
column_order: name
table_type: ''
You can skip everything but "table_name" if the database and
schema are provided via credentials. That way catalog entries can be shorter
if, for example, all the Snowflake tables used live in the same database/schema.
Values in the dataset definition take priority over those defined in credentials.
Example:
The credentials file provides all connection attributes; the catalog entry
"weather" reuses the credentials parameters, while the "polygons" catalog entry
reuses all credentials parameters except that it provides a different schema name.
The second example of the credentials file uses ``externalbrowser`` authentication.
catalog.yml
.. code-block:: yaml
weather:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: snowflake_client
save_args:
mode: overwrite
column_order: name
table_type: ''
polygons:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "geopolygons"
credentials: snowflake_client
schema: "geodata"
credentials.yml
.. code-block:: yaml
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "service_account_abc"
password: "supersecret"
credentials.yml (with externalbrowser authenticator)
.. code-block:: yaml
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "john_doe@wdomain.com"
authenticator: "externalbrowser"
"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
# for parallelism within a pipeline please consider
# ``ThreadRunner`` instead
_SINGLE_PROCESS = True
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]

def __init__( # pylint: disable=too-many-arguments
self,
table_name: str,
schema: str = None,
database: str = None,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
credentials: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``SnowparkTableDataSet``.
Args:
table_name: The table name to load or save data to.
schema: Name of the schema where ``table_name`` is.
Optional, as it can be provided as part of the ``credentials``
dictionary. The argument value takes priority over the one
provided in ``credentials``, if any.
database: Name of the database where ``schema`` is.
Optional, as it can be provided as part of the ``credentials``
dictionary. The argument value takes priority over the one
provided in ``credentials``, if any.
load_args: Currently not used
save_args: Provided to the underlying snowpark ``save_as_table``.
To find all supported arguments, see here:
https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameWriter.saveAsTable.html
credentials: A dictionary with a snowpark connection string.
To find all supported arguments, see here:
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect
"""

if not table_name:
raise DataSetError("'table_name' argument cannot be empty.")

if not credentials:
raise DataSetError("'credentials' argument cannot be empty.")

if not database:
if not ("database" in credentials and credentials["database"]):
raise DataSetError(
"'database' must be provided by credentials or dataset."
)
database = credentials["database"]

if not schema:
if not ("schema" in credentials and credentials["schema"]):
raise DataSetError(
"'schema' must be provided by credentials or dataset."
)
schema = credentials["schema"]
# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

self._table_name = table_name
self._database = database
self._schema = schema

connection_parameters = credentials
connection_parameters.update(
{"database": self._database, "schema": self._schema}
)
self._connection_parameters = connection_parameters
self._session = self._get_session(self._connection_parameters)

def _describe(self) -> Dict[str, Any]:
return {
"table_name": self._table_name,
"database": self._database,
"schema": self._schema,
}

@staticmethod
def _get_session(connection_parameters) -> sp.Session:
"""Given a connection string, create singleton connection
to be used across all instances of `SnowparkTableDataSet` that
need to connect to the same source.
connection_parameters is a dictionary of any values
supported by snowflake python connector:
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect
example:
connection_parameters = {
"account": "",
"user": "",
"password": "", (optional)
"role": "", (optional)
"warehouse": "", (optional)
"database": "", (optional)
"schema": "", (optional)
"authenticator: "" (optional)
}
"""
try:
logger.debug("Trying to reuse active snowpark session...")
session = sp.context.get_active_session()
except sp.exceptions.SnowparkSessionException:
logger.debug("No active snowpark session found. Creating")
session = sp.Session.builder.configs(connection_parameters).create()
return session

def _load(self) -> sp.DataFrame:
table_name = [
self._database,
self._schema,
self._table_name,
]

sp_df = self._session.table(".".join(table_name))
return sp_df

def _save(self, data: sp.DataFrame) -> None:
table_name = [
self._database,
self._schema,
self._table_name,
]

data.write.save_as_table(table_name, **self._save_args)

def _exists(self) -> bool:
session = self._session
query = "SELECT COUNT(*) FROM {database}.INFORMATION_SCHEMA.TABLES \
WHERE TABLE_SCHEMA = '{schema}' \
AND TABLE_NAME = '{table_name}'"
rows = session.sql(
query.format(
database=self._database,
schema=self._schema,
table_name=self._table_name,
)
).collect()
return rows[0][0] == 1
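For orientation, here is a minimal sketch of using the new dataset directly from Python rather than through a catalog entry; the connection values are placeholders modelled on the docstring examples above, and a reachable Snowflake account is assumed:

```python
# Minimal sketch, assuming snowflake-snowpark-python is installed (Python 3.8)
# and that the placeholder credentials point at a reachable Snowflake account.
from kedro_datasets.snowflake import SnowparkTableDataSet

credentials = {
    "account": "ab12345.eu-central-1",
    "user": "service_account_abc",
    "password": "supersecret",
    "warehouse": "datascience_wh",
    "database": "meteorology",
    "schema": "observations",
}

weather = SnowparkTableDataSet(
    table_name="weather_data",
    credentials=credentials,
    save_args={"mode": "overwrite"},
)

df = weather.load()          # snowflake.snowpark.DataFrame for meteorology.observations.weather_data
weather.save(df.limit(100))  # written back via DataFrameWriter.save_as_table
```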
1 change: 0 additions & 1 deletion kedro-datasets/kedro_datasets/video/video_dataset.py
@@ -258,7 +258,6 @@ class VideoDataSet(AbstractDataSet[AbstractVideo, AbstractVideo]):
"""

# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
2 changes: 1 addition & 1 deletion kedro-datasets/pyproject.toml
@@ -34,7 +34,7 @@ min-public-methods = 1
[tool.coverage.report]
fail_under = 100
show_missing = true
omit = ["tests/*", "kedro_datasets/holoviews/*"]
omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*"]
exclude_lines = ["pragma: no cover", "raise NotImplementedError"]

[tool.pytest.ini_options]
4 changes: 4 additions & 0 deletions kedro-datasets/setup.py
@@ -82,6 +82,9 @@ def _collect_requirements(requires):
"spark.SparkJDBCDataSet": [SPARK, HDFS, S3FS],
"spark.DeltaTableDataSet": [SPARK, HDFS, S3FS, "delta-spark~=1.0"],
}
snowpark_require = {
"snowflake.SnowparkTableDataSet": ["snowflake-snowpark-python~=1.0.0", "pyarrow~=8.0"]
}
svmlight_require = {"svmlight.SVMLightDataSet": ["scikit-learn~=1.0.2", "scipy~=1.7.3"]}
tensorflow_required = {
"tensorflow.TensorflowModelDataset": [
@@ -136,6 +139,7 @@ def _collect_requirements(requires):
**video_require,
**plotly_require,
**spark_require,
**snowpark_require,
**svmlight_require,
**tensorflow_required,
**yaml_require,
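Assuming the per-dataset extras naming shown above, the optional dependencies could be installed with something like the following (the extras name mirrors the key in ``snowpark_require``; Python 3.8 is required by the snowpark connector as of Mar-2023):

```bash
pip install "kedro-datasets[snowflake.SnowparkTableDataSet]"
```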
3 changes: 2 additions & 1 deletion kedro-datasets/test_requirements.txt
@@ -37,7 +37,7 @@ plotly>=4.8.0, <6.0
polars~=0.15.13
pre-commit>=2.9.2, <3.0 # The hook `mypy` requires pre-commit version 2.9.2.
psutil==5.8.0
pyarrow>=1.0, <7.0
pyarrow~=8.0
pylint>=2.5.2, <3.0
pyodbc~=4.0.35
pyproj~=3.0
@@ -52,6 +52,7 @@ requests~=2.20
s3fs>=0.3.0, <0.5 # Needs to be at least 0.3.0 to make use of `cachable` attribute on S3FileSystem.
scikit-learn~=1.0.2
scipy~=1.7.3
snowflake-snowpark-python~=1.0.0; python_version == '3.8'
SQLAlchemy~=1.2
tables~=3.6.0; platform_system == "Windows" and python_version<'3.9'
tables~=3.6; platform_system != "Windows"
34 changes: 34 additions & 0 deletions kedro-datasets/tests/snowflake/README.md
@@ -0,0 +1,34 @@
# Snowpark connector testing

Running the automated tests for the Snowpark connector requires access to a real Snowflake instance. Tests located in this folder are therefore **excluded** by default from the pytest execution scope via [conftest.py](conftest.py).

The [Makefile](/Makefile) provides a separate target, ``test-snowflake-only``, to run only the tests related to the Snowpark connector. To run the tests, provide Snowflake connection parameters via the following environment variables:
* SNOWSQL_ACCOUNT - Snowflake account name with region, e.g. `ab12345.eu-central-2`
* SNOWSQL_WAREHOUSE - Snowflake virtual warehouse to use
* SNOWSQL_DATABASE - Database to use
* SNOWSQL_SCHEMA - Schema to use when creating tables for tests
* SNOWSQL_ROLE - Role to use for connection
* SNOWSQL_USER - Username to use for connection
* SNOWSQL_PWD - Plain password to use for connection

All of these environment variables must be set for the tests to run.

Here is an example shell command to run the Snowpark tests via the make utility:
```bash
SNOWSQL_ACCOUNT='ab12345.eu-central-2' SNOWSQL_WAREHOUSE='DEV_WH' SNOWSQL_DATABASE='DEV_DB' SNOWSQL_ROLE='DEV_ROLE' SNOWSQL_USER='DEV_USER' SNOWSQL_SCHEMA='DATA' SNOWSQL_PWD='supersecret' make test-snowflake-only
```

Currently the tests support only simple username & password authentication, not SSO/MFA.

As of Mar-2023, the snowpark connector only works with Python 3.8.

## Snowflake permissions required
The credentials provided via environment variables need the following permissions granted for the tests to run successfully:
* Create tables in a given schema
* Drop tables in a given schema
* Insert rows into tables in a given schema
* Query tables in a given schema
* Query `INFORMATION_SCHEMA.TABLES` of respective database

## Extending tests
Contributors adding new tests should add the `@pytest.mark.snowflake` decorator to each test, as in the sketch below. The exclusion of Snowpark-related tests from the overall execution scope in [conftest.py](conftest.py) is based on these markers.
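A minimal sketch of such a test (the `sf_session` fixture name is hypothetical; use whatever session fixture the test module actually provides):

```python
import pytest


@pytest.mark.snowflake
def test_table_roundtrip(sf_session):
    # Write a tiny dataframe to a temporary table and read it back.
    df = sf_session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "name"])
    df.write.save_as_table("TEST_ROUNDTRIP", mode="overwrite", table_type="temporary")
    assert sf_session.table("TEST_ROUNDTRIP").count() == 2
```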
Empty file.
24 changes: 24 additions & 0 deletions kedro-datasets/tests/snowflake/conftest.py
@@ -0,0 +1,24 @@
"""
By default we disable execution of tests that require a real Snowflake
instance. Passing the -m snowflake option explicitly to pytest will
make these, and only these, tests run
"""
import pytest


def pytest_collection_modifyitems(config, items):
markers_arg = config.getoption("-m")

# Naive implementation to handle basic marker expressions.
# Will not work if someone (ever) runs pytest with complex marker
# expressions like "-m spark and not (snowflake or pandas)"
if (
"snowflake" in markers_arg.lower()
and "not snowflake" not in markers_arg.lower()
):
return

skip_snowflake = pytest.mark.skip(reason="need -m snowflake option to run")
for item in items:
if "snowflake" in item.keywords:
item.add_marker(skip_snowflake)