Snowpark (Snowflake) dataset for kedro #104

Merged: 21 commits, merged on Mar 9, 2023
Changes from 14 commits

Commits (21)
2a861cd
Add Snowpark datasets
Vladimir-Filimonov Jan 23, 2023
4469b5f
Add snowpark tests
heber-urdaneta Jan 23, 2023
d21fce4
Update tests requirements and config
heber-urdaneta Jan 23, 2023
05b5059
Snowpark dataset implementation
Vladimir-Filimonov Jan 23, 2023
ec6de72
Update snowpark class name and docs formatting
heber-urdaneta Feb 3, 2023
3cc53d7
Adjustments for lint check
heber-urdaneta Feb 3, 2023
de89ed7
Merge branch 'kedro-org:main' into main
heber-urdaneta Feb 8, 2023
c4a1fd9
Change describe to remove dict call
heber-urdaneta Feb 8, 2023
4168e64
Remove pylint too many args suppression
heber-urdaneta Feb 8, 2023
5fcd1b9
Update SnowparkTableDataSet and env vars
heber-urdaneta Feb 9, 2023
8f61ab7
Remove pd interactions and add docs
heber-urdaneta Feb 23, 2023
5bc11e1
Fix docs example
heber-urdaneta Feb 23, 2023
e968eec
Merge pull request #2 from Vladimir-Filimonov/remove_pd
heber-urdaneta Feb 24, 2023
386a1b9
Fix sp for other py versions
heber-urdaneta Feb 24, 2023
4898c5e
Remove leftover TODO
heber-urdaneta Feb 28, 2023
ca7de93
Adjust documentation wording
heber-urdaneta Mar 7, 2023
47d9afc
Add SnowparkTableDataSet to release notes
heber-urdaneta Mar 7, 2023
1904104
Revert Add SnowparkTableDataSet to release notes
heber-urdaneta Mar 8, 2023
c16b778
Merge branch 'kedro-org:main' into update_branch
heber-urdaneta Mar 8, 2023
c399e7c
Correct RELEASE.md conflict
heber-urdaneta Mar 8, 2023
2716be6
Merge pull request #3 from Vladimir-Filimonov/update_branch
heber-urdaneta Mar 8, 2023
4 changes: 4 additions & 0 deletions Makefile
@@ -56,3 +56,7 @@ test-no-spark:

test-no-spark-sequential:
cd kedro-datasets && pytest tests --no-cov --ignore tests/spark

# kedro-datasets/snowflake tests skipped from default scope
test-snowflake-only:
cd kedro-datasets && pytest tests --no-cov --numprocesses 1 --dist loadfile -m snowflake
8 changes: 8 additions & 0 deletions kedro-datasets/kedro_datasets/snowflake/__init__.py
@@ -0,0 +1,8 @@
"""Provides I/O modules for Snowflake."""

__all__ = ["SnowparkTableDataSet"]

from contextlib import suppress

with suppress(ImportError):
from .snowpark_dataset import SnowparkTableDataSet
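For context, a minimal sketch (not part of the diff) of what this import guard means for users; the extras name comes from the `setup.py` change further down, and the behaviour shown is an assumption about the optional-dependency pattern:

```python
# Sketch only: the snowflake subpackage always imports, but the dataset class
# is only present when snowflake-snowpark-python is installed
# (e.g. via `pip install "kedro-datasets[snowflake.SnowparkTableDataSet]"`).
import kedro_datasets.snowflake as snowflake_datasets

if hasattr(snowflake_datasets, "SnowparkTableDataSet"):
    print("Snowpark dependency found; SnowparkTableDataSet is available")
else:
    print("Snowpark dependency missing; install the snowflake extra to enable it")
```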
233 changes: 233 additions & 0 deletions kedro-datasets/kedro_datasets/snowflake/snowpark_dataset.py
@@ -0,0 +1,233 @@
"""``AbstractDataSet`` implementation to access Snowflake using Snowpark dataframes
"""
import logging
from copy import deepcopy
from typing import Any, Dict

import snowflake.snowpark as sp
from kedro.io.core import AbstractDataSet, DataSetError

logger = logging.getLogger(__name__)


class SnowparkTableDataSet(AbstractDataSet):
"""``SnowparkTableDataSet`` loads and saves Snowpark dataframes.

Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

.. code-block:: yaml
weather:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: db_credentials
save_args:
mode: overwrite
column_order: name
table_type: ''

One can skip everything but "table_name" if database and
schema provided via credentials. Therefore catalog entries can be shorter
if ex. all used Snowflake tables live in same database/schema.
Values in dataset definition take priority over ones defined in credentials
Suggested change (review):
You can skip everything but "table_name" if the database and
schema are provided via credentials. That way catalog entries can be shorter
if, for example, all used Snowflake tables live in same database/schema.
Values in the dataset definition take priority over those defined in credentials.


Example:
Credentials file provides all connection attributes, catalog entry
"weather" reuse credentials parameters, "polygons" catalog entry reuse
all credentials parameters except providing different schema name.
Second example of credentials file uses externalbrowser authentication
Suggested change (review):
Credentials file provides all connection attributes, catalog entry
"weather" reuses credentials parameters, "polygons" catalog entry reuses
all credentials parameters except providing a different schema name.
Second example of credentials file uses ``externalbrowser`` authentication


catalog.yml

.. code-block:: yaml
weather:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "weather_data"
database: "meteorology"
schema: "observations"
credentials: snowflake_client
save_args:
mode: overwrite
column_order: name
table_type: ''

polygons:
type: kedro_datasets.snowflake.SnowparkTableDataSet
table_name: "geopolygons"
credentials: snowflake_client
schema: "geodata"

credentials.yml

.. code-block:: yaml
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "service_account_abc"
password: "supersecret"

credentials.yml (with externalbrowser authenticator)

.. code-block:: yaml
snowflake_client:
account: 'ab12345.eu-central-1'
port: 443
warehouse: "datascience_wh"
database: "detailed_data"
schema: "observations"
user: "john_doe@wdomain.com"
authenticator: "externalbrowser"

As of Jan-2023, the snowpark connector only works with Python 3.8
Review comment: I think it's worth putting this all the way at the top of the class docstring. I can imagine a lot of users would just skip reading the examples.

"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
# for parallelism within a pipeline please consider
# ``ThreadRunner`` instead
_SINGLE_PROCESS = True
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]

# TODO: Update docstring
def __init__( # pylint: disable=too-many-arguments
self,
table_name: str,
schema: str = None,
database: str = None,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
credentials: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``SnowparkTableDataSet``.

Args:
table_name: The table name to load or save data to.
schema: Name of the schema where ``table_name`` is.
Optional as can be provided as part of ``credentials``
dictionary. Argument value takes priority over one provided
in ``credentials`` if any.
database: Name of the database where ``schema`` is.
Optional as can be provided as part of ``credentials``
dictionary. Argument value takes priority over one provided
in ``credentials`` if any.
load_args: Currently not used
save_args: Provided to underlying snowpark ``save_as_table``
To find all supported arguments, see here:
https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameWriter.saveAsTable.html
credentials: A dictionary with a snowpark connection string.
To find all supported arguments, see here:
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect
"""

if not table_name:
raise DataSetError("'table_name' argument cannot be empty.")

if not credentials:
raise DataSetError("'credentials' argument cannot be empty.")

if not database:
if not ("database" in credentials and credentials["database"]):
raise DataSetError(
"'database' must be provided by credentials or dataset."
)
database = credentials["database"]

if not schema:
if not ("schema" in credentials and credentials["schema"]):
raise DataSetError(
"'schema' must be provided by credentials or dataset."
)
schema = credentials["schema"]
# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

self._table_name = table_name
self._database = database
self._schema = schema

connection_parameters = credentials
connection_parameters.update(
{"database": self._database, "schema": self._schema}
)
self._connection_parameters = connection_parameters
self._session = self._get_session(self._connection_parameters)

def _describe(self) -> Dict[str, Any]:
return {
"table_name": self._table_name,
"database": self._database,
"schema": self._schema,
}

@staticmethod
def _get_session(connection_parameters) -> sp.Session:
"""Given a connection string, create singleton connection
to be used across all instances of `SnowparkTableDataSet` that
need to connect to the same source.
connection_parameters is a dictionary of any values
supported by snowflake python connector:
https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect
example:
connection_parameters = {
"account": "",
"user": "",
"password": "", (optional)
"role": "", (optional)
"warehouse": "", (optional)
"database": "", (optional)
"schema": "", (optional)
"authenticator: "" (optional)
}
"""
try:
logger.debug("Trying to reuse active snowpark session...")
session = sp.context.get_active_session()
except sp.exceptions.SnowparkSessionException:
logger.debug("No active snowpark session found. Creating")
session = sp.Session.builder.configs(connection_parameters).create()
return session

def _load(self) -> sp.DataFrame:
table_name = [
self._database,
self._schema,
self._table_name,
]

sp_df = self._session.table(".".join(table_name))
return sp_df

def _save(self, data: sp.DataFrame) -> None:
table_name = [
self._database,
self._schema,
self._table_name,
]

data.write.save_as_table(table_name, **self._save_args)

def _exists(self) -> bool:
session = self._session
query = "SELECT COUNT(*) FROM {database}.INFORMATION_SCHEMA.TABLES \
Vladimir-Filimonov marked this conversation as resolved.
Show resolved Hide resolved
WHERE TABLE_SCHEMA = '{schema}' \
AND TABLE_NAME = '{table_name}'"
rows = session.sql(
query.format(
database=self._database,
schema=self._schema,
table_name=self._table_name,
)
).collect()
return rows[0][0] == 1
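
To make the new dataset's API concrete, here is a hedged usage sketch (not part of the diff); all connection values are placeholders and the snippet assumes the Snowpark extra is installed:

```python
# Sketch only: using SnowparkTableDataSet directly, outside a YAML catalog.
# The credentials below are placeholders, not real connection details.
from kedro_datasets.snowflake import SnowparkTableDataSet

credentials = {
    "account": "ab12345.eu-central-1",
    "user": "service_account_abc",
    "password": "supersecret",
    "warehouse": "datascience_wh",
    "database": "meteorology",
    "schema": "observations",
}

weather = SnowparkTableDataSet(
    table_name="weather_data",
    credentials=credentials,
    save_args={"mode": "overwrite", "table_type": ""},
)

df = weather.load()          # snowflake.snowpark.DataFrame backed by the table
weather.save(df.limit(100))  # written via DataFrameWriter.save_as_table
```

Because the class sets `_SINGLE_PROCESS = True`, pipelines that need parallelism around this dataset should use `ThreadRunner` rather than `ParallelRunner`, as noted in the class comments.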
1 change: 0 additions & 1 deletion kedro-datasets/kedro_datasets/video/video_dataset.py
@@ -258,7 +258,6 @@ class VideoDataSet(AbstractDataSet[AbstractVideo, AbstractVideo]):
"""

# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
2 changes: 1 addition & 1 deletion kedro-datasets/pyproject.toml
@@ -34,7 +34,7 @@ min-public-methods = 1
[tool.coverage.report]
fail_under = 100
show_missing = true
omit = ["tests/*", "kedro_datasets/holoviews/*"]
omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*"]
exclude_lines = ["pragma: no cover", "raise NotImplementedError"]

[tool.pytest.ini_options]
4 changes: 4 additions & 0 deletions kedro-datasets/setup.py
@@ -76,6 +76,9 @@ def _collect_requirements(requires):
"spark.SparkJDBCDataSet": [SPARK, HDFS, S3FS],
"spark.DeltaTableDataSet": [SPARK, HDFS, S3FS, "delta-spark~=1.0"],
}
snowpark_require = {
"snowflake.SnowparkTableDataSet": ["snowflake-snowpark-python~=1.0.0", "pyarrow~=8.0"]
}
svmlight_require = {"svmlight.SVMLightDataSet": ["scikit-learn~=1.0.2", "scipy~=1.7.3"]}
tensorflow_required = {
"tensorflow.TensorflowModelDataset": [
@@ -126,6 +129,7 @@ def _collect_requirements(requires):
**video_require,
**plotly_require,
**spark_require,
**snowpark_require,
**svmlight_require,
**tensorflow_required,
**yaml_require,
3 changes: 2 additions & 1 deletion kedro-datasets/test_requirements.txt
@@ -35,7 +35,7 @@ Pillow~=9.0
plotly>=4.8.0, <6.0
pre-commit>=2.9.2, <3.0 # The hook `mypy` requires pre-commit version 2.9.2.
psutil==5.8.0
pyarrow>=1.0, <7.0
pyarrow~=8.0
pylint>=2.5.2, <3.0
pyproj~=3.0
pyspark>=2.2, <4.0
@@ -49,6 +49,7 @@ requests~=2.20
s3fs>=0.3.0, <0.5 # Needs to be at least 0.3.0 to make use of `cachable` attribute on S3FileSystem.
scikit-learn~=1.0.2
scipy~=1.7.3
snowflake-snowpark-python~=1.0.0; python_version == '3.8'
SQLAlchemy~=1.2
tables~=3.6.0; platform_system == "Windows" and python_version<'3.9'
tables~=3.6; platform_system != "Windows"
34 changes: 34 additions & 0 deletions kedro-datasets/tests/snowflake/README.md
@@ -0,0 +1,34 @@
# Snowpark connector testing

Execution of automated tests for the Snowpark connector requires access to a real Snowflake instance. Therefore, the tests located in this folder are **disabled** by default from the pytest execution scope using [conftest.py](conftest.py).

[Makefile](/Makefile) provides a separate target ``test-snowflake-only`` to run only the tests related to the Snowpark connector. To run the tests, one needs to provide Snowflake connection parameters via environment variables:
* SNOWSQL_ACCOUNT - Snowflake account name with region, e.g. `ab12345.eu-central-2`
* SNOWSQL_WAREHOUSE - Snowflake virtual warehouse to use
* SNOWSQL_DATABASE - Database to use
* SNOWSQL_SCHEMA - Schema to use when creating tables for tests
* SNOWSQL_ROLE - Role to use for connection
* SNOWSQL_USER - Username to use for connection
* SNOWSQL_PWD - Plain password to use for connection

All environment variables need to be provided for the tests to run.
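
For orientation, here is a hedged sketch (an assumption about how a test session might consume these variables, not the actual test code in this PR) of assembling them into Snowpark connection parameters:

```python
import os

# Sketch only: map the SNOWSQL_* environment variables listed above into a
# Snowpark connection-parameters dictionary. The real test fixtures may differ.
connection_parameters = {
    "account": os.environ["SNOWSQL_ACCOUNT"],
    "warehouse": os.environ["SNOWSQL_WAREHOUSE"],
    "database": os.environ["SNOWSQL_DATABASE"],
    "schema": os.environ["SNOWSQL_SCHEMA"],
    "role": os.environ["SNOWSQL_ROLE"],
    "user": os.environ["SNOWSQL_USER"],
    "password": os.environ["SNOWSQL_PWD"],
}
```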

Here is an example shell command to run the Snowpark tests via the make utility:
```bash
SNOWSQL_ACCOUNT='ab12345.eu-central-2' SNOWSQL_WAREHOUSE='DEV_WH' SNOWSQL_DATABASE='DEV_DB' SNOWSQL_ROLE='DEV_ROLE' SNOWSQL_USER='DEV_USER' SNOWSQL_SCHEMA='DATA' SNOWSQL_PWD='supersecret' make test-snowflake-only
```

Currently, running the tests supports only simple username & password authentication, not SSO/MFA.

As of Jan-2023, the snowpark connector only works with Python 3.8.

## Snowflake permissions required
Credentials provided via environment variables should have the following permissions granted for the tests to run successfully:
* Create tables in a given schema
* Drop tables in a given schema
* Insert rows into tables in a given schema
* Query tables in a given schema
* Query `INFORMATION_SCHEMA.TABLES` of respective database

## Extending tests
Contributors adding new tests should add the `@pytest.mark.snowflake` decorator to each test, as in the sketch below. The exclusion of Snowpark-related tests from the overall execution scope in [conftest.py](conftest.py) is based on these markers.
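
A minimal sketch of such a test (the `sample_sp_df` fixture name is hypothetical and used only for illustration):

```python
import pytest


@pytest.mark.snowflake
def test_new_snowpark_behaviour(sample_sp_df):
    # Marked tests only run when pytest is invoked with `-m snowflake`;
    # otherwise conftest.py adds a skip marker to them.
    assert sample_sp_df.count() > 0
```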
Empty file.
24 changes: 24 additions & 0 deletions kedro-datasets/tests/snowflake/conftest.py
@@ -0,0 +1,24 @@
"""
By default, we disable execution of tests that require a real Snowflake
instance. Providing the -m snowflake option explicitly to
pytest will make these, and only these, tests run
"""
import pytest


def pytest_collection_modifyitems(config, items):
markers_arg = config.getoption("-m")

# Naive implementation to handle basic marker expressions
# Will not work if someone (ever) runs pytest with complex marker
# expressions like "-m spark and not (snowflake or pandas)"
if (
"snowflake" in markers_arg.lower()
and "not snowflake" not in markers_arg.lower()
):
return

skip_snowflake = pytest.mark.skip(reason="need -m snowflake option to run")
for item in items:
if "snowflake" in item.keywords:
item.add_marker(skip_snowflake)