Fix Snowflake Agent Bug #2605

Merged
merged 38 commits into from
Jul 31, 2024
Changes from 30 commits
Commits
52c79b6
fix snowflake agent bug
Future-Outlier Jul 24, 2024
2d5c1a6
a work version
Future-Outlier Jul 24, 2024
804052a
Snowflake work version
Future-Outlier Jul 25, 2024
90f08dc
fix secret encode
Future-Outlier Jul 25, 2024
c0f84f2
all works, I am so happy
Future-Outlier Jul 25, 2024
05adfd1
improve additional protocol
Future-Outlier Jul 25, 2024
89d633f
fix tests
Future-Outlier Jul 25, 2024
d1d8024
Fix Tests
Future-Outlier Jul 25, 2024
a17f28d
update agent
pingsutw Jul 25, 2024
547a801
Add snowflake test
pingsutw Jul 25, 2024
a6de45c
nit
pingsutw Jul 25, 2024
14c4318
sd
pingsutw Jul 25, 2024
76637e8
snowflake loglinks
Future-Outlier Jul 25, 2024
762ad0b
add metadata
Future-Outlier Jul 26, 2024
1fcd2de
secret
pingsutw Jul 29, 2024
4a8c8ba
nit
pingsutw Jul 29, 2024
3a7a9cd
remove table
Future-Outlier Jul 30, 2024
2704555
add comment for get private key
Future-Outlier Jul 30, 2024
469b86c
update comments:
Future-Outlier Jul 30, 2024
378327f
Fix Tests
Future-Outlier Jul 30, 2024
d71ef8f
update comments
Future-Outlier Jul 30, 2024
6a8cd9a
update comments
Future-Outlier Jul 30, 2024
5035063
Better Secrets
Future-Outlier Jul 30, 2024
aaff3d2
use union secret
Future-Outlier Jul 30, 2024
45a788d
Update Changes
Future-Outlier Jul 30, 2024
dfe6f97
use if not get_plugin().secret_requires_group()
Future-Outlier Jul 30, 2024
03e8b69
Use Union SDK
Future-Outlier Jul 30, 2024
41e2a19
Update
Future-Outlier Jul 30, 2024
af5a2f1
Fix Secrets
Future-Outlier Jul 30, 2024
c4b641e
Fix Secrets
Future-Outlier Jul 30, 2024
c8f472f
remove pacakge.json
Future-Outlier Jul 31, 2024
4b08fbe
lint
Future-Outlier Jul 31, 2024
de6ce1a
add snowflake-connector-python
Future-Outlier Jul 31, 2024
58a1106
fix test_snowflake
Future-Outlier Jul 31, 2024
4aa2411
Try to fix tests
Future-Outlier Jul 31, 2024
31e57c8
fix tests
Future-Outlier Jul 31, 2024
4a9e936
Try Fix snowflake Import
Future-Outlier Jul 31, 2024
1dd36b2
snowflake test passed
Future-Outlier Jul 31, 2024
6 changes: 6 additions & 0 deletions flytekit/core/context_manager.py
@@ -367,6 +367,12 @@
        Retrieves a secret using the resolution order -> Env followed by file. If not found raises a ValueError
        param encode_mode, defines the mode to open files, it can either be "r" to read file, or "rb" to read binary file
        """

        from flytekit.configuration.plugin import get_plugin

        if not get_plugin().secret_requires_group():
            group, group_version = None, None

        env_var = self.get_secrets_env_var(group, key, group_version)
        fpath = self.get_secrets_file(group, key, group_version)
        v = os.environ.get(env_var)
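
The docstring in this hunk describes a resolution order of environment variable first, then file, and the added lines drop `group` and `group_version` when the active plugin reports that secrets do not require a group. A minimal Python sketch of that lookup order follows. The function `resolve_secret`, the `_FSEC_` prefix, and the directory layout are illustrative assumptions, not flytekit's actual naming scheme.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_secret(
    key: str,
    group: Optional[str] = None,
    *,
    requires_group: bool = True,
    env_prefix: str = "_FSEC_",  # hypothetical prefix, chosen for illustration
    secrets_dir: str = "/etc/secrets",  # hypothetical default directory
) -> str:
    """Look up a secret in the environment first, then on disk."""
    if not requires_group:
        # Mirrors the added hunk: ignore the group when it is not required.
        group = None

    parts = [p for p in (group, key) if p is not None]
    env_var = env_prefix + "_".join(parts).upper()
    fpath = Path(secrets_dir).joinpath(*parts)

    value = os.environ.get(env_var)
    if value is not None:
        return value
    if fpath.is_file():
        return fpath.read_text().strip()
    raise ValueError(f"Secret not found: tried env var {env_var} and file {fpath}")
```

In this sketch, a caller that passes a group still gets a group-less lookup whenever `requires_group` is false, which is the behavior the six added lines appear to introduce.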
6 changes: 6 additions & 0 deletions flytekit/core/type_engine.py
@@ -983,6 +983,7 @@
            register_arrow_handlers,
            register_bigquery_handlers,
            register_pandas_handlers,
            register_snowflake_handlers,
        )
        from flytekit.types.structured.structured_dataset import DuplicateHandlerError

@@ -1015,6 +1016,11 @@
            from flytekit.types import numpy  # noqa: F401
        if is_imported("PIL"):
            from flytekit.types.file import image  # noqa: F401
        if is_imported("snowflake.connector"):
            try:
                register_snowflake_handlers()
            except DuplicateHandlerError:
                logger.debug("Transformer for snowflake is already registered.")

    @classmethod
    def to_literal_type(cls, python_type: Type) -> LiteralType:
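
The `type_engine.py` hunk registers the Snowflake handlers only when `snowflake.connector` has already been imported, and treats a duplicate registration as harmless. A rough sketch of that pattern with a toy registry follows. `HandlerRegistry`, `DuplicateHandlerError`, `is_imported`, and `lazy_register` here are simplified stand-ins written for illustration (the `sys.modules` check is an assumption about how such a helper could work), not flytekit's implementations.

```python
import logging
import sys
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


class DuplicateHandlerError(ValueError):
    """Raised when a handler is registered twice for the same protocol."""


class HandlerRegistry:
    """Toy registry keyed by protocol name (illustrative only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[str], str]] = {}

    def register(self, protocol: str, handler: Callable[[str], str]) -> None:
        if protocol in self._handlers:
            raise DuplicateHandlerError(f"Handler for {protocol!r} already registered")
        self._handlers[protocol] = handler

    def protocols(self) -> List[str]:
        return sorted(self._handlers)


def is_imported(module_name: str) -> bool:
    # Assumption: "imported" means the module is already present in sys.modules.
    return module_name in sys.modules


def lazy_register(registry: HandlerRegistry, module_name: str, protocol: str) -> bool:
    """Register a handler only if its backing module is loaded; tolerate repeats."""
    if not is_imported(module_name):
        return False
    try:
        registry.register(protocol, lambda uri: f"handled {uri}")
    except DuplicateHandlerError:
        logger.debug("Handler for %s is already registered.", protocol)
    return True
```

Checking for an already-loaded module, rather than importing it, means users who never touch the optional dependency pay no import cost for it.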
14 changes: 14 additions & 0 deletions flytekit/types/structured/__init__.py
@@ -68,3 +68,17 @@
            "We won't register bigquery handler for structured dataset because "
            "we can't find the packages google-cloud-bigquery-storage and google-cloud-bigquery"
        )


def register_snowflake_handlers():
    try:
        from .snowflake import PandasToSnowflakeEncodingHandlers, SnowflakeToPandasDecodingHandler

        StructuredDatasetTransformerEngine.register(SnowflakeToPandasDecodingHandler())
        StructuredDatasetTransformerEngine.register(PandasToSnowflakeEncodingHandlers())

    except ImportError:
        logger.info(
            "We won't register snowflake handler for structured dataset because "
            "we can't find package snowflake-connector-python"
        )
106 changes: 106 additions & 0 deletions flytekit/types/structured/snowflake.py
@@ -0,0 +1,106 @@
import re
import typing

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

import flytekit
from flytekit import FlyteContext
from flytekit.models import literals
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.structured_dataset import (
    StructuredDataset,
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetMetadata,
)

SNOWFLAKE = "snowflake"
PROTOCOL_SEP = "\\/|://|:"


def get_private_key() -> bytes:
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization

    pk_string = flytekit.current_context().secrets.get("private_key", "snowflake", encode_mode="r")

    # Cryptography needs the string to be stripped and converted to bytes
    pk_string = pk_string.strip().encode()
    p_key = serialization.load_pem_private_key(pk_string, password=None, backend=default_backend())

    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    return pkb


def _write_to_sf(structured_dataset: StructuredDataset):
    if structured_dataset.uri is None:
        raise ValueError("structured_dataset.uri cannot be None.")

    uri = structured_dataset.uri
    _, user, account, warehouse, database, schema, table = re.split(PROTOCOL_SEP, uri)
    df = structured_dataset.dataframe

    conn = snowflake.connector.connect(
        user=user, account=account, private_key=get_private_key(), database=database, schema=schema, warehouse=warehouse
    )

    write_pandas(conn, df, table)


def _read_from_sf(
    flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata
) -> pd.DataFrame:
    if flyte_value.uri is None:
        raise ValueError("structured_dataset.uri cannot be None.")

    uri = flyte_value.uri
    _, user, account, warehouse, database, schema, query_id = re.split(PROTOCOL_SEP, uri)

    conn = snowflake.connector.connect(
        user=user,
        account=account,
        private_key=get_private_key(),
        database=database,
        schema=schema,
        warehouse=warehouse,
    )

    cs = conn.cursor()
    cs.get_results_from_sfqid(query_id)
    return cs.fetch_pandas_all()
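
Both `_write_to_sf` and `_read_from_sf` unpack the URI by splitting on `PROTOCOL_SEP`, so the URI has to yield exactly seven fields. The sketch below shows how that split behaves on one URI shape consistent with the unpacking order. The `SnowflakeURI` tuple, the `parse_snowflake_uri` helper, the explicit scheme check, and the sample values are hypothetical; only the regex is taken from the diff.

```python
import re
from typing import NamedTuple

PROTOCOL_SEP = "\\/|://|:"  # same pattern as in the diff: "/", "://", or ":"


class SnowflakeURI(NamedTuple):
    user: str
    account: str
    warehouse: str
    database: str
    schema: str
    target: str  # table name when writing, query id when reading


def parse_snowflake_uri(uri: str) -> SnowflakeURI:
    parts = re.split(PROTOCOL_SEP, uri)
    # The scheme check is an addition for this sketch; the diff discards the first field.
    if len(parts) != 7 or parts[0] != "snowflake":
        raise ValueError(
            "Expected snowflake://<user>:<account>/<warehouse>/<database>/<schema>/<target>, "
            f"got {uri!r}"
        )
    return SnowflakeURI(*parts[1:])


parsed = parse_snowflake_uri("snowflake://my_user:my_account/my_wh/my_db/my_schema/my_table")
print(parsed.account, parsed.target)  # prints: my_account my_table
```

Because `:` and `/` are both separators, a field containing either character would change the number of fields, and the tuple unpacking in the diff would then raise a `ValueError`.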


class PandasToSnowflakeEncodingHandlers(StructuredDatasetEncoder):
    def __init__(self):
        super().__init__(python_type=pd.DataFrame, protocol=SNOWFLAKE, supported_format="")

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: StructuredDatasetType,
    ) -> literals.StructuredDataset:
        _write_to_sf(structured_dataset)
        return literals.StructuredDataset(
            uri=typing.cast(str, structured_dataset.uri), metadata=StructuredDatasetMetadata(structured_dataset_type)
        )


class SnowflakeToPandasDecodingHandler(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(pd.DataFrame, protocol=SNOWFLAKE, supported_format="")

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> pd.DataFrame:
        return _read_from_sf(flyte_value, current_task_metadata)
17 changes: 14 additions & 3 deletions flytekit/types/structured/structured_dataset.py
@@ -6,7 +6,7 @@
 import typing
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field, is_dataclass
-from typing import Dict, Generator, Optional, Type, Union
+from typing import Dict, Generator, List, Optional, Type, Union
 
 from dataclasses_json import config
 from fsspec.utils import get_protocol
@@ -222,7 +222,12 @@ def extract_cols_and_format(
 
 
 class StructuredDatasetEncoder(ABC):
-    def __init__(self, python_type: Type[T], protocol: Optional[str] = None, supported_format: Optional[str] = None):
+    def __init__(
+        self,
+        python_type: Type[T],
+        protocol: Optional[str] = None,
+        supported_format: Optional[str] = None,
+    ):
         """
         Extend this abstract class, implement the encode function, and register your concrete class with the
         StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
@@ -284,7 +289,13 @@ def encode(
 
 
 class StructuredDatasetDecoder(ABC):
-    def __init__(self, python_type: Type[DF], protocol: Optional[str] = None, supported_format: Optional[str] = None):
+    def __init__(
+        self,
+        python_type: Type[DF],
+        protocol: Optional[str] = None,
+        supported_format: Optional[str] = None,
+        additional_protocols: Optional[List[str]] = None,
+    ):
         """
         Extend this abstract class, implement the decode function, and register your concrete class with the
         StructuredDatasetTransformerEngine class in order for the core flytekit type engine to handle
154 changes: 154 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions package.json
@@ -0,0 +1,5 @@
{
  "dependencies": {
    "msgpack5": "^6.0.2"
  }
}
@@ -38,7 +38,7 @@ def __init__(
         self,
         name: str,
         query_template: str,
-        task_config: Optional[BigQueryConfig],
+        task_config: BigQueryConfig,
         inputs: Optional[Dict[str, Type]] = None,
         output_structured_dataset_type: Optional[Type[StructuredDataset]] = None,
         **kwargs,