
Draft PR for issue 1946 - snowflake integration #2029

Closed · wants to merge 9 commits

Conversation

@heber-urdaneta commented Nov 15, 2022

NOTE: Kedro datasets are moving from kedro.extras.datasets to a separate kedro-datasets package in the kedro-plugins repository. Any changes to the dataset implementations should be made by opening a pull request in that repository.

Description

Issue 1946, kedro-org/kedro-plugins#108

Development notes

  • Created snowflake_dataset.py. Save and load have been tested with dummy datasets in a sandbox Snowflake environment.
  • Pending: integrate Snowflake session creation with the Snowflake Kedro starter.
  • Pending: create tests.

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the RELEASE.md file
  • Added tests to cover my changes

@datajoely (Contributor) left a comment


Really really nice work @heber-urdaneta 💪

    # in case module import error does not match our expected pattern
    # we have no recommendation
    if not res:
        return None
Contributor:

Do we want to raise an error here?

    if not credentials:
        raise DataSetError("Please configure expected credentials")

    # print(self._load_args)
Contributor:

Suggested change: delete this leftover debug line.

    # print(self._load_args)


    @classmethod
    def _get_session(cls, credentials: dict) -> None:
        """Given a connection string, create singleton connection
Contributor:

We did something similar in pandas.SQL*DataSet - is this the same pattern?

Author:

Yes, that's the same pattern as the SQLDataSet. This would change when implementing a session hook, similar to the SparkDataSet.
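
For reference, a minimal sketch of that singleton pattern as it might look here (the class name and caching attribute are assumptions; Session.builder.configs(...).create() is the standard Snowpark call for opening a session from a dict of connection parameters):

    from snowflake.snowpark import Session

    class SnowflakeDataSet:
        _session = None  # cached at class level, like the engine cache in pandas.SQLDataSet

        @classmethod
        def _get_session(cls, credentials: dict) -> Session:
            """Create the Snowpark session on first use and reuse it afterwards."""
            if cls._session is None:
                cls._session = Session.builder.configs(credentials).create()
            return cls._session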


    def _load(self) -> pd.DataFrame:
        sp_df = self._session.table(self._load_args["table_name"])
        return sp_df.to_pandas()
Contributor:

I don't think we want to return a pandas DataFrame; I would return a Snowpark DataFrame and let the user do the pandas casting themselves.

Contributor:

➕ to that, Snowpark DataFrames are lazy, so the user node could potentially leverage that.
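
A minimal sketch of the lazy alternative being suggested (the Snowpark return annotation is an assumption; Session.table only builds a query plan, so no rows are fetched until the consuming node runs an action such as to_pandas() or collect()):

    from snowflake.snowpark import DataFrame as SnowparkDataFrame

    def _load(self) -> SnowparkDataFrame:
        # Lazy: nothing is pulled here; downstream nodes can add filters
        # or aggregations that Snowflake then executes server-side.
        return self._session.table(self._load_args["table_name"])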

        sp_df = self._session.table(self._load_args["table_name"])
        return sp_df.to_pandas()

    def _save(self, data: pd.DataFrame) -> None:
Contributor:

Suggested change:

    -def _save(self, data: pd.DataFrame) -> None:
    +def _save(self, data: [pd.DataFrame, snowpark.DataFrame]) -> None:

I'm not actually sure of the type signature, but the push here is to accept either option and handle it gracefully.

Contributor:

The union should be written either as Union[pd.DataFrame, snowpark.DataFrame] or as pd.DataFrame | snowpark.DataFrame.
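
A sketch of a _save accepting either type, using the Union spelling above (the pandas-to-Snowpark conversion via Session.create_dataframe is an assumption about how the branch might be handled):

    from typing import Union

    import pandas as pd
    from snowflake.snowpark import DataFrame as SnowparkDataFrame

    def _save(self, data: Union[pd.DataFrame, SnowparkDataFrame]) -> None:
        if isinstance(data, pd.DataFrame):
            # Convert pandas input to a Snowpark DataFrame before writing.
            data = self._session.create_dataframe(data)
        data.write.mode(self._save_args["mode"]).save_as_table(
            self._save_args["table_name"]
        )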

    @@ -37,7 +37,7 @@ Pillow~=9.0
     plotly>=4.8.0, <6.0
     pre-commit>=2.9.2, <3.0 # The hook `mypy` requires pre-commit version 2.9.2.
     psutil==5.8.0
    -pyarrow>=1.0, <7.0
    +pyarrow>=1.0, <9.0
Contributor:

Hopefully this doesn't break any other bits of Kedro!

Author:

Hopefully not! But this dependency update was needed for the Snowpark df.to_pandas() method to work. It may no longer be required if _load() returns a Snowpark DataFrame, but it could still matter if the data gets converted to pandas within a pipeline.

    @@ -50,6 +50,7 @@ requests-mock~=1.6
     requests~=2.20
     s3fs>=0.3.0, <0.5 # Needs to be at least 0.3.0 to make use of `cachable` attribute on S3FileSystem.
     SQLAlchemy~=1.2
    +snowflake-snowpark-python~=0.12.0
Contributor:

1.0.0 came out on 1st November.



>>> "role": "",
>>> "warehouse": "",
>>> "database": "",
>>> "schema": ""
Contributor:

Why are database, schema, warehouse part of credentials?
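
For illustration, one way the split could look, keeping only secrets under credentials and moving the execution context to regular dataset arguments (all key names here are hypothetical, not the PR's final schema):

    # Secret connection details -> credentials
    credentials = {
        "account": "...",
        "user": "...",
        "password": "...",
    }

    # Non-secret execution context -> load/save args on the dataset entry
    load_args = {
        "role": "...",
        "warehouse": "...",
        "database": "...",
        "schema": "...",
        "table_name": "...",
    }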

    ]
    sp_df.write.mode(self._save_args["mode"]).save_as_table(
        table_name,
        column_order=self._save_args["column_order"],
Contributor:

Couldn't this be inferred from the passed dataframe, falling back to save_args only when it is set?
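
A sketch of that fallback: forward column_order only when the user set it in save_args, so save_as_table otherwise applies Snowpark's own default (a hedged pattern, not the PR's final code):

    # Only pass column_order when explicitly configured.
    save_kwargs = {}
    if "column_order" in self._save_args:
        save_kwargs["column_order"] = self._save_args["column_order"]
    sp_df.write.mode(self._save_args["mode"]).save_as_table(
        table_name, **save_kwargs
    )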

Updated snowpark test prerequisites
Draft implementation of SnowParkDataSet class
@datajoely (Contributor):

I'm confused between this and #2032. What's the difference?

Bumped python version to 3.8 as required by snowpark
@heber-urdaneta (Author):

We can ignore #2032; the latest commit on this PR now has snowpark_dataset.py, addressing the previous observations on snowflake_dataset.py.

@heber-urdaneta (Author):

Development of the Snowpark integration has moved to this PR: kedro-org/kedro-plugins#78
