Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1010216: Support more option for create or replace dynamic table #2110

Merged
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@

### Snowpark Python API Updates

#### Improvements

- Added support for specifying the following parameters to `DataFrame.create_or_replace_dynamic_table`:
- `mode`
- `refresh_mode`
- `initialize`
- `clustering_keys`
- `is_transient`
- `data_retention_time`
- `max_data_extension_time`

#### Bug Fixes

- Fixed a bug in `session.read.csv` that caused an error when setting `PARSE_HEADER = True` in an externally defined file format.
Expand Down
3 changes: 2 additions & 1 deletion docs/source/snowpark/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ DataFrame
DataFrame.corr
DataFrame.count
DataFrame.cov
DataFrame.create_or_replace_dynamic_table
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
Expand Down Expand Up @@ -114,7 +115,7 @@ DataFrame
DataFrameAnalyticsFunctions.compute_lag
DataFrameAnalyticsFunctions.compute_lead
DataFrameAnalyticsFunctions.time_series_agg




Expand Down
22 changes: 16 additions & 6 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,12 +1126,22 @@ def do_resolve_with_resolved_children(

if isinstance(logical_plan, CreateDynamicTableCommand):
return self.plan_builder.create_or_replace_dynamic_table(
logical_plan.name,
logical_plan.warehouse,
logical_plan.lag,
logical_plan.comment,
resolved_children[logical_plan.child],
logical_plan,
name=logical_plan.name,
warehouse=logical_plan.warehouse,
lag=logical_plan.lag,
comment=logical_plan.comment,
create_mode=logical_plan.create_mode,
refresh_mode=logical_plan.refresh_mode,
initialize=logical_plan.initialize,
clustering_keys=[
self.analyze(x, df_aliased_col_name_to_real_col_name)
for x in logical_plan.clustering_exprs
],
is_transient=logical_plan.is_transient,
data_retention_time=logical_plan.data_retention_time,
max_data_extension_time=logical_plan.max_data_extension_time,
child=resolved_children[logical_plan.child],
source_plan=logical_plan,
)

if isinstance(logical_plan, CopyIntoTableNode):
Expand Down
53 changes: 41 additions & 12 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
PARTITION_BY = " PARTITION BY "
ORDER_BY = " ORDER BY "
CLUSTER_BY = " CLUSTER BY "
REFRESH_MODE = " REFRESH_MODE "
INITIALIZE = " INITIALIZE "
DATA_RETENTION_TIME_IN_DAYS = " DATA_RETENTION_TIME_IN_DAYS "
MAX_DATA_EXTENSION_TIME_IN_DAYS = " MAX_DATA_EXTENSION_TIME_IN_DAYS "
OVER = " OVER "
SELECT = " SELECT "
FROM = " FROM "
Expand All @@ -94,6 +98,7 @@
LAG = " LAG "
WAREHOUSE = " WAREHOUSE "
TEMPORARY = " TEMPORARY "
TRANSIENT = " TRANSIENT "
IF = " If "
INSERT = " INSERT "
OVERWRITE = " OVERWRITE "
Expand Down Expand Up @@ -1077,21 +1082,45 @@ def create_or_replace_view_statement(


def create_or_replace_dynamic_table_statement(
name: str, warehouse: str, lag: str, comment: Optional[str], child: str
name: str,
warehouse: str,
lag: str,
comment: Optional[str],
replace: bool,
if_not_exists: bool,
refresh_mode: Optional[str],
initialize: Optional[str],
clustering_keys: Iterable[str],
is_transient: bool,
data_retention_time: Optional[int],
max_data_extension_time: Optional[int],
child: str,
) -> str:
cluster_by_sql = (
f"{CLUSTER_BY}{LEFT_PARENTHESIS}{COMMA.join(clustering_keys)}{RIGHT_PARENTHESIS}"
if clustering_keys
else EMPTY_STRING
)
comment_sql = get_comment_sql(comment)
refresh_and_initialize_options = get_options_statement(
{
REFRESH_MODE: refresh_mode,
INITIALIZE: initialize,
}
)
data_retention_options = get_options_statement(
{
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
}
)

return (
CREATE
+ OR
+ REPLACE
+ DYNAMIC
+ TABLE
+ name
+ f"{LAG + EQUALS + convert_value_to_sql_option(lag)}"
+ f"{WAREHOUSE + EQUALS + warehouse}"
+ comment_sql
+ AS
+ project_statement([], child)
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}"
f"{DYNAMIC}{TABLE}{IF + NOT + EXISTS if if_not_exists else EMPTY_STRING}{name}{LAG}{EQUALS}"
f"{convert_value_to_sql_option(lag)}{WAREHOUSE}{EQUALS}{warehouse}"
f"{refresh_and_initialize_options}{cluster_by_sql}{data_retention_options}"
f"{comment_sql}{AS}{project_statement([], child)}"
)


Expand Down
35 changes: 34 additions & 1 deletion src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import (
CopyIntoLocationNode,
CopyIntoTableNode,
DynamicTableCreateMode,
LogicalPlan,
SaveMode,
SnowflakeCreateTable,
Expand Down Expand Up @@ -1088,6 +1089,13 @@ def create_or_replace_dynamic_table(
warehouse: str,
lag: str,
comment: Optional[str],
create_mode: DynamicTableCreateMode,
refresh_mode: Optional[str],
initialize: Optional[str],
clustering_keys: Iterable[str],
is_transient: bool,
data_retention_time: Optional[int],
max_data_extension_time: Optional[int],
child: SnowflakePlan,
source_plan: Optional[LogicalPlan],
) -> SnowflakePlan:
Expand All @@ -1097,10 +1105,35 @@ def create_or_replace_dynamic_table(
if not is_sql_select_statement(child.queries[0].sql.lower().strip()):
raise SnowparkClientExceptionMessages.PLAN_CREATE_DYNAMIC_TABLE_FROM_SELECT_ONLY()

if create_mode == DynamicTableCreateMode.OVERWRITE:
replace = True
if_not_exists = False
elif create_mode == DynamicTableCreateMode.ERROR_IF_EXISTS:
replace = False
if_not_exists = False
elif create_mode == DynamicTableCreateMode.IGNORE:
replace = False
if_not_exists = True
else:
# should never reach here
raise ValueError(f"Unknown create mode: {create_mode}") # pragma: no cover
sfc-gh-jrose marked this conversation as resolved.
Show resolved Hide resolved

child = child.replace_repeated_subquery_with_cte()
return self.build(
lambda x: create_or_replace_dynamic_table_statement(
name, warehouse, lag, comment, x
name=name,
warehouse=warehouse,
lag=lag,
comment=comment,
replace=replace,
if_not_exists=if_not_exists,
refresh_mode=refresh_mode,
initialize=initialize,
clustering_keys=clustering_keys,
is_transient=is_transient,
data_retention_time=data_retention_time,
max_data_extension_time=max_data_extension_time,
child=x,
),
child,
source_plan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
}


class DynamicTableCreateMode(Enum):
OVERWRITE = "overwrite"
ERROR_IF_EXISTS = "errorifexists"
IGNORE = "ignore"


class SaveMode(Enum):
APPEND = "append"
OVERWRITE = "overwrite"
Expand Down
19 changes: 18 additions & 1 deletion src/snowflake/snowpark/_internal/analyzer/unary_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from typing import Dict, List, Optional, Union
from typing import Dict, Iterable, List, Optional, Union

from snowflake.snowpark._internal.analyzer.expression import (
Expression,
Expand All @@ -14,6 +14,9 @@
sum_node_complexities,
)
from snowflake.snowpark._internal.analyzer.snowflake_plan import LogicalPlan
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import (
DynamicTableCreateMode,
)
from snowflake.snowpark._internal.analyzer.sort_expression import SortOrder


Expand Down Expand Up @@ -270,10 +273,24 @@ def __init__(
warehouse: str,
lag: str,
comment: Optional[str],
create_mode: DynamicTableCreateMode,
refresh_mode: Optional[str],
initialize: Optional[str],
clustering_exprs: Iterable[Expression],
is_transient: bool,
data_retention_time: Optional[int],
max_data_extension_time: Optional[int],
child: LogicalPlan,
) -> None:
super().__init__(child)
self.name = name
self.warehouse = warehouse
self.lag = lag
self.comment = comment
self.create_mode = create_mode
self.refresh_mode = refresh_mode
self.initialize = initialize
self.clustering_exprs = clustering_exprs
self.is_transient = is_transient
self.data_retention_time = data_retention_time
self.max_data_extension_time = max_data_extension_time
88 changes: 78 additions & 10 deletions src/snowflake/snowpark/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from snowflake.snowpark._internal.analyzer.snowflake_plan import PlanQueryType
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import (
CopyIntoTableNode,
DynamicTableCreateMode,
Limit,
LogicalPlan,
SaveMode,
Expand Down Expand Up @@ -123,6 +124,7 @@
prepare_pivot_arguments,
quote_name,
random_name_for_temp_object,
str_to_enum,
validate_object_name,
)
from snowflake.snowpark.async_job import AsyncJob, _AsyncResultType
Expand Down Expand Up @@ -3418,6 +3420,13 @@ def create_or_replace_dynamic_table(
warehouse: str,
lag: str,
comment: Optional[str] = None,
mode: str = "overwrite",
refresh_mode: Optional[str] = None,
initialize: Optional[str] = None,
clustering_keys: Optional[Iterable[ColumnOrName]] = None,
is_transient: bool = False,
data_retention_time: Optional[int] = None,
max_data_extension_time: Optional[int] = None,
statement_params: Optional[Dict[str, str]] = None,
) -> List[Row]:
"""Creates a dynamic table that captures the computation expressed by this DataFrame.
Expand All @@ -3435,7 +3444,28 @@ def create_or_replace_dynamic_table(
lag: specifies the target data freshness
comment: Adds a comment for the created table. See
`COMMENT <https://docs.snowflake.com/en/sql-reference/sql/comment>`_.
mode: Specifies the behavior of create dynamic table. Allowed values are:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does a dynamic table have truncate? I guess no. Just to double confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not support it in the create or replace dynamic table api. Here we map
overwrite = CREATE OR REPLACE DYNAMIC TABLE
errorifexists = CREATE DYNAMIC TABLE
ignore = CREATE DYNAMIC TABLE IF NOT EXISTS

- "overwrite" (default): Overwrite the table by dropping the old table.
- "errorifexists": Throw and exception if the table already exists.
- "ignore": Ignore the operation if table already exists.
refresh_mode: Specifies the refresh mode of the dynamic table. The value can be "AUTO",
"FULL", or "INCREMENTAL".
initialize: Specifies the behavior of initial refresh. The value can be "ON_CREATE" or
"ON_SCHEDULE".
clustering_keys: Specifies one or more columns or column expressions in the table as the clustering key.
See `Clustering Keys & Clustered Tables <https://docs.snowflake.com/en/user-guide/tables-clustering-keys>`_
for more details.
is_transient: A boolean value that specifies whether the dynamic table is transient.
data_retention_time: Specifies the retention period for the dynamic table in days so that
Time Travel actions can be performed on historical data in the dynamic table.
max_data_extension_time: Specifies the maximum number of days for which Snowflake can extend
the data retention period of the dynamic table to prevent streams on the dynamic table
from becoming stale.
statement_params: Dictionary of statement level parameters to be set while executing this action.

Note:
See `understanding dynamic table refresh <https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh>`_.
for more details on refresh mode.
"""
if isinstance(name, str):
formatted_name = name
Expand All @@ -3456,11 +3486,20 @@ def create_or_replace_dynamic_table(
"The lag input of create_or_replace_dynamic_table() can only be a str."
)

create_mode = str_to_enum(mode.lower(), DynamicTableCreateMode, "`mode`")

return self._do_create_or_replace_dynamic_table(
formatted_name,
warehouse,
lag,
comment,
name=formatted_name,
warehouse=warehouse,
lag=lag,
create_mode=create_mode,
comment=comment,
refresh_mode=refresh_mode,
initialize=initialize,
clustering_keys=clustering_keys,
is_transient=is_transient,
data_retention_time=data_retention_time,
max_data_extension_time=max_data_extension_time,
_statement_params=create_or_update_statement_params_with_query_tag(
statement_params, self._session.query_tag, SKIP_LEVELS_TWO
),
Expand Down Expand Up @@ -3529,15 +3568,44 @@ def _do_create_or_replace_view(
)

def _do_create_or_replace_dynamic_table(
self, name: str, warehouse: str, lag: str, comment: Optional[str], **kwargs
self,
name: str,
warehouse: str,
lag: str,
create_mode: DynamicTableCreateMode,
comment: Optional[str] = None,
refresh_mode: Optional[str] = None,
initialize: Optional[str] = None,
clustering_keys: Optional[Iterable[ColumnOrName]] = None,
is_transient: bool = False,
data_retention_time: Optional[int] = None,
max_data_extension_time: Optional[int] = None,
**kwargs,
):
validate_object_name(name)
clustering_exprs = (
[
_to_col_if_str(
col, "DataFrame.create_or_replace_dynamic_table"
)._expression
for col in clustering_keys
]
if clustering_keys
else []
)
cmd = CreateDynamicTableCommand(
name,
warehouse,
lag,
comment,
self._plan,
name=name,
warehouse=warehouse,
lag=lag,
comment=comment,
create_mode=create_mode,
refresh_mode=refresh_mode,
initialize=initialize,
clustering_exprs=clustering_exprs,
is_transient=is_transient,
data_retention_time=data_retention_time,
max_data_extension_time=max_data_extension_time,
child=self._plan,
)

return self._session._conn.execute(
Expand Down
Loading
Loading