Skip to content

Commit

Permalink
fix: Verify the existence of Registry tables in snowflake before call…
Browse files Browse the repository at this point in the history
…ing CREATE sql command. Allow read-only user to call feast apply. (#3851)

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>
  • Loading branch information
shuchu authored Jan 24, 2024
1 parent 86d6221 commit 9a3590e
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 14 deletions.
71 changes: 62 additions & 9 deletions sdk/python/feast/infra/registry/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,19 @@ def __init__(
f'"{self.registry_config.database}"."{self.registry_config.schema_}"'
)

with GetSnowflakeConnection(self.registry_config) as conn:
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
with open(sql_function_file, "r") as file:
sqlFile = file.read()

sqlCommands = sqlFile.split(";")
for command in sqlCommands:
query = command.replace("REGISTRY_PATH", f"{self.registry_path}")
execute_snowflake_statement(conn, query)
if not self._verify_registry_database():
            # Verify the existing registry database schema in Snowflake. If any table
            # name or column type is wrong, run the table re-creation SQL.
with GetSnowflakeConnection(self.registry_config) as conn:
sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
with open(sql_function_file, "r") as file:
sqlFile = file.read()

sqlCommands = sqlFile.split(";")
for command in sqlCommands:
query = command.replace(
"REGISTRY_PATH", f"{self.registry_path}"
)
execute_snowflake_statement(conn, query)

self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
Expand All @@ -145,6 +149,55 @@ def __init__(
)
self.project = project

def _verify_registry_database(
self,
) -> bool:
"""Verify the records in registry database. To check:
1, the 11 tables are existed.
2, the column types are correct.
Example return from snowflake's cursor.describe("SELECT * FROM a_table") command:
[ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False),
ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False),
ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)]
Returns:
True if the necessary 11 tables are existed in Snowflake and schema of each table is correct.
False if failure happens.
"""

from feast.infra.utils.snowflake.registry.snowflake_registry_table import (
snowflake_registry_table_names_and_column_types as expect_tables,
)

res = True

try:
with GetSnowflakeConnection(self.registry_config) as conn:
for table_name in expect_tables:
result_metadata_list = conn.cursor().describe(
f"SELECT * FROM {table_name}"
)
for col in result_metadata_list:
if (
expect_tables[table_name][col.name]["type_code"]
!= col.type_code
):
res = False
break
except Exception as e:
res = False # Set to False for all errors.
logger.debug(
f"Failed to verify Registry tables and columns types with exception: {e}."
)
finally:
# The implementation in snowflake_utils.py will cache the established connection without re-connection logic.
# conn.close()
pass

return res

def refresh(self, project: Optional[str] = None):
if project:
project_metadata = proto_registry_utils.get_project_metadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-

"""
The table names and column types are following the creation detail listed
in "snowflake_table_creation.sql".
Snowflake Reference:
1, ResultMetadata: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-resultmetadata-object
2, Type Codes: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-python-connector-type-codes
----------------------------------------------
type_code String Representation Data Type
0 FIXED NUMBER/INT
1 REAL REAL
2 TEXT VARCHAR/STRING
3 DATE DATE
4 TIMESTAMP TIMESTAMP
5 VARIANT VARIANT
6 TIMESTAMP_LTZ TIMESTAMP_LTZ
7 TIMESTAMP_TZ TIMESTAMP_TZ
8 TIMESTAMP_NTZ TIMESTAMP_NTZ
9 OBJECT OBJECT
10 ARRAY ARRAY
11 BINARY BINARY
12 TIME TIME
13 BOOLEAN BOOLEAN
----------------------------------------------
(last update: 2023-11-30)
"""

# Shared Snowflake column descriptors (see the type-code table in the module
# docstring): 2 = VARCHAR, 6 = TIMESTAMP_LTZ, 11 = BINARY.
_VARCHAR = {"type_code": 2, "type": "VARCHAR"}
_TIMESTAMP_LTZ = {"type_code": 6, "type": "TIMESTAMP_LTZ"}
_BINARY = {"type_code": 11, "type": "BINARY"}


def _standard_columns(name_column, proto_column, *extra_binary_columns):
    """Build the column map shared by every registry table except FEAST_METADATA.

    Each such table has a VARCHAR name column, a VARCHAR PROJECT_ID, a
    TIMESTAMP_LTZ LAST_UPDATED_TIMESTAMP, a BINARY serialized-proto column,
    and optionally further BINARY columns.
    """
    columns = {
        name_column: dict(_VARCHAR),
        "PROJECT_ID": dict(_VARCHAR),
        "LAST_UPDATED_TIMESTAMP": dict(_TIMESTAMP_LTZ),
        proto_column: dict(_BINARY),
    }
    for extra in extra_binary_columns:
        columns[extra] = dict(_BINARY)
    return columns


# Expected schema of the 11 Feast registry tables, keyed by table name.
snowflake_registry_table_names_and_column_types = {
    "DATA_SOURCES": _standard_columns("DATA_SOURCE_NAME", "DATA_SOURCE_PROTO"),
    "ENTITIES": _standard_columns("ENTITY_NAME", "ENTITY_PROTO"),
    # FEAST_METADATA is a plain key/value table and does not follow the
    # name/proto column pattern of the other tables.
    "FEAST_METADATA": {
        "PROJECT_ID": dict(_VARCHAR),
        "METADATA_KEY": dict(_VARCHAR),
        "METADATA_VALUE": dict(_VARCHAR),
        "LAST_UPDATED_TIMESTAMP": dict(_TIMESTAMP_LTZ),
    },
    "FEATURE_SERVICES": _standard_columns(
        "FEATURE_SERVICE_NAME", "FEATURE_SERVICE_PROTO"
    ),
    "FEATURE_VIEWS": _standard_columns(
        "FEATURE_VIEW_NAME",
        "FEATURE_VIEW_PROTO",
        "MATERIALIZED_INTERVALS",
        "USER_METADATA",
    ),
    "MANAGED_INFRA": _standard_columns("INFRA_NAME", "INFRA_PROTO"),
    "ON_DEMAND_FEATURE_VIEWS": _standard_columns(
        "ON_DEMAND_FEATURE_VIEW_NAME",
        "ON_DEMAND_FEATURE_VIEW_PROTO",
        "USER_METADATA",
    ),
    "REQUEST_FEATURE_VIEWS": _standard_columns(
        "REQUEST_FEATURE_VIEW_NAME",
        "REQUEST_FEATURE_VIEW_PROTO",
        "USER_METADATA",
    ),
    "SAVED_DATASETS": _standard_columns("SAVED_DATASET_NAME", "SAVED_DATASET_PROTO"),
    "STREAM_FEATURE_VIEWS": _standard_columns(
        "STREAM_FEATURE_VIEW_NAME",
        "STREAM_FEATURE_VIEW_PROTO",
        "USER_METADATA",
    ),
    "VALIDATION_REFERENCES": _standard_columns(
        "VALIDATION_REFERENCE_NAME", "VALIDATION_REFERENCE_PROTO"
    ),
}
10 changes: 5 additions & 5 deletions sdk/python/feast/infra/utils/snowflake/snowflake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ def __init__(self, config: str, autocommit=True):

def __enter__(self):

assert self.config.type in [
assert self.config.type in {
"snowflake.registry",
"snowflake.offline",
"snowflake.engine",
"snowflake.online",
]
}

if self.config.type not in _cache:
if self.config.type == "snowflake.registry":
config_header = "connections.feast_registry"
elif self.config.type == "snowflake.offline":
config_header = "connections.feast_offline_store"
if self.config.type == "snowflake.engine":
elif self.config.type == "snowflake.engine":
config_header = "connections.feast_batch_engine"
elif self.config.type == "snowflake.online":
config_header = "connections.feast_online_store"
Expand Down Expand Up @@ -113,11 +113,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def assert_snowflake_feature_names(feature_view: "FeatureView") -> None:
    """Validate that no feature in *feature_view* uses a reserved column name.

    The names "entity_key", "feature_name" and "feature_value" are used by the
    generated Snowflake queries, so user-defined features must not shadow them.

    Args:
        feature_view: The feature view whose feature names are validated.

    Raises:
        AssertionError: If any feature name collides with a protected name.
            NOTE(review): ``assert`` is stripped under ``python -O``; kept as
            ``assert`` because callers rely on AssertionError here.
    """
    # Hoisted out of the loop: one set, O(1) membership test per feature.
    protected_names = {"entity_key", "feature_name", "feature_value"}
    for feature in feature_view.features:
        assert (
            feature.name not in protected_names
        ), f"Feature Name: {feature.name} is a protected name to ensure query stability"
    return None


Expand Down

0 comments on commit 9a3590e

Please sign in to comment.