From f5b1c386c1e95a68081aa2c2931d78d68f4cbd77 Mon Sep 17 00:00:00 2001 From: Alexander Bogdanowicz <55267935+akbog@users.noreply.github.com> Date: Tue, 6 Aug 2024 10:10:52 -0400 Subject: [PATCH] [dagster-sdf] Add additional metadata on materialize (#23358) ## Summary & Motivation This PR depends on updates made to the SDF CLI as of version 0.3.15. These updates enable more fine-grained control of the assets generated at compile and run time, via the `--save` and `--cache` parameters (see code for examples of usage). At run time (materialize), we ensure that the information schema is generated, and introduce a phase to the `SdfCliInvocation`, where we read additional metadata from the information schema (code-location, asset schema, materialized code snippets), once processing completes, yielding AssetObservations. ## How I Tested These Changes Updates to 0.3.15 should be covered by the corresponding updated tests. Tests have been introduced for code refs, new metadata, and updates to the sdf translator --- .../dagster-sdf/dagster_sdf/asset_utils.py | 72 ++++++++++- .../dagster_sdf/dagster_sdf_translator.py | 60 ++++++++- .../include/scaffold/assets.py.jinja | 2 +- .../dagster-sdf/dagster_sdf/resource.py | 5 +- .../dagster-sdf/dagster_sdf/sdf_cli_event.py | 9 +- .../dagster_sdf/sdf_cli_invocation.py | 22 +++- .../dagster_sdf/sdf_event_iterator.py | 8 +- .../dagster_sdf/sdf_information_schema.py | 121 ++++++++++++++++-- .../dagster-sdf/dagster_sdf/sdf_version.py | 2 +- .../dagster-sdf/dagster_sdf/sdf_workspace.py | 4 +- .../dagster_sdf_tests/cli/test_scaffold.py | 2 +- .../dagster-sdf/dagster_sdf_tests/conftest.py | 8 +- .../dagster_sdf_tests/test_asset_decorator.py | 116 ++++++++++++++++- .../dagster_sdf_tests/test_code_references.py | 83 ++++++++++++ .../dagster_sdf_tests/test_resource.py | 28 ++-- .../dagster_sdf_tests/test_table_metadata.py | 41 ++++++ python_modules/libraries/dagster-sdf/setup.py | 2 +- 17 files changed, 530 insertions(+), 55 deletions(-) create 
mode 100644 python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_code_references.py create mode 100644 python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_table_metadata.py diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py index 5d45dc9701bc2..7b88fc19809e6 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py @@ -1,4 +1,6 @@ -from typing import Mapping, Optional, Sequence +import textwrap +from pathlib import Path +from typing import Any, Dict, Mapping, Optional, Sequence from dagster import ( AssetKey, @@ -10,6 +12,8 @@ define_asset_job, ) +from .constants import SDF_TARGET_DIR + def dagster_name_fn(table_id: str) -> str: return table_id.replace(".", "_").replace("-", "_").replace("*", "_star") @@ -51,7 +55,7 @@ def build_schedule_from_sdf_selection( from dagster_sdf import sdf_assets, build_schedule_from_sdf_selection - @sdf_assets(manifest=...) + @sdf_assets(workspace=...) def all_sdf_assets(): ... @@ -93,7 +97,7 @@ def get_asset_key_for_table_id(sdf_assets: Sequence[AssetsDefinition], table_id: from dagster import asset from dagster_sdf import sdf_assets, get_asset_key_for_table_id - @sdf_assets(manifest=...) + @sdf_assets(workspace=...) def all_sdf_assets(): ... @@ -114,3 +118,65 @@ def upstream_python_asset(): f" {asset_keys_by_output_name.keys()}." 
) return next(iter(asset_keys_by_output_name.values())) + + +def get_output_dir(target_dir: Path, environment: str) -> Path: + return target_dir.joinpath(SDF_TARGET_DIR, environment) + + +def get_info_schema_dir(target_dir: Path, environment: str) -> Path: + return get_output_dir(target_dir, environment).joinpath( + "data", "system", "information_schema::sdf" + ) + + +def get_materialized_sql_dir(target_dir: Path, environment: str) -> Path: + return get_output_dir(target_dir, environment).joinpath("materialized") + + +def get_table_path_from_parts(catalog_name: str, schema_name: str, table_name: str) -> Path: + return Path(catalog_name, schema_name, table_name) + + +def _read_sql_file(path_to_file: Path) -> str: + with open(path_to_file, "r") as file: + return textwrap.indent(file.read(), " ") + + +def default_description_fn( + table_row: Dict[str, Any], + workspace_dir: Optional[Path] = None, + output_dir: Optional[Path] = None, + enable_raw_sql_description: bool = True, + enable_materialized_sql_description: bool = True, +) -> str: + description_sections = [ + table_row["description"] or f"sdf {table_row['materialization']} {table_row['table_id']}", + ] + if enable_raw_sql_description: + if workspace_dir is None: + raise ValueError("workspace_dir must be provided to enable raw SQL description.") + path_to_file = None + for source_location in table_row["source_locations"]: + if source_location.endswith(".sql"): + path_to_file = workspace_dir.joinpath(source_location) + break + if path_to_file is not None and path_to_file.exists(): + description_sections.append(f"#### Raw SQL:\n```\n{_read_sql_file(path_to_file)}\n```") + if enable_materialized_sql_description: + if output_dir is None: + raise ValueError("output_dir must be provided to enable materialized SQL description.") + path_to_file = ( + output_dir.joinpath("materialized") + .joinpath( + get_table_path_from_parts( + table_row["catalog_name"], table_row["schema_name"], table_row["table_name"] + ) + ) + 
.with_suffix(".sql") + ) + if path_to_file.exists(): + description_sections.append( + f"#### Materialized SQL:\n```\n{_read_sql_file(path_to_file)}\n```" + ) + return "\n\n".join(filter(None, description_sections)) diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/dagster_sdf_translator.py b/python_modules/libraries/dagster-sdf/dagster_sdf/dagster_sdf_translator.py index 142da5b235994..990762d4a38ca 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/dagster_sdf_translator.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/dagster_sdf_translator.py @@ -1,5 +1,6 @@ from dataclasses import dataclass -from typing import Optional +from pathlib import Path +from typing import Any, Dict, Optional from dagster import ( AssetKey, @@ -7,12 +8,29 @@ ) from dagster._annotations import public -from .asset_utils import default_asset_key_fn +from .asset_utils import default_asset_key_fn, default_description_fn @dataclass(frozen=True) class DagsterSdfTranslatorSettings: - """Settings to enable Dagster features for your sdf project.""" + """Settings to enable Dagster features for your sdf project. + + Args: + enable_asset_checks (bool): Whether to load sdf table tests as Dagster asset checks. + Defaults to True. + enable_code_references (bool): Whether to enable Dagster code references for sdf tables. + Defaults to False. + enable_raw_sql_description (bool): Whether to display sdf raw sql in Dagster descriptions. + Defaults to True. + enable_materialized_sql_description (bool): Whether to display sdf materialized sql in + Dagster descriptions. Defaults to True. 
+ + """ + + enable_asset_checks: bool = True + enable_code_references: bool = False + enable_raw_sql_description: bool = True + enable_materialized_sql_description: bool = True class DagsterSdfTranslator: @@ -42,6 +60,42 @@ def settings(self) -> DagsterSdfTranslatorSettings: def get_asset_key(self, fqn: str) -> AssetKey: return default_asset_key_fn(fqn) + @public + def get_description( + self, table_row: Dict[str, Any], workspace_dir: Optional[Path], output_dir: Optional[Path] + ) -> str: + """A function that takes a dictionary representing columns of an sdf table row in sdf's + information schema and returns the Dagster description for that table. + + This method can be overridden to provide a custom description for an sdf resource. + + Args: + table_row (Dict[str, Any]): A dictionary representing columns of an sdf table row. + workspace_dir (Optional[Path]): The path to the workspace directory. + + Returns: + str: The description for the sdf resource. + + Examples: + .. code-block:: python + + from typing import Any, Dict, Optional + + from dagster_sdf import DagsterSdfTranslator + + + class CustomDagsterSdfTranslator(DagsterSdfTranslator): + def get_description(self, table_row: Dict[str, Any], workspace_dir: Optional[Path], output_dir: Optional[Path]) -> str: + return "custom description" + """ + return default_description_fn( + table_row, + workspace_dir, + output_dir, + self.settings.enable_raw_sql_description, + self.settings.enable_materialized_sql_description, + ) + def validate_translator(dagster_sdf_translator: DagsterSdfTranslator) -> DagsterSdfTranslator: return check.inst_param( diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja index 4c2ccfcbecf58..72f957a8b3094 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja +++ 
b/python_modules/libraries/dagster-sdf/dagster_sdf/include/scaffold/assets.py.jinja @@ -10,4 +10,4 @@ workspace = SdfWorkspace(workspace_dir=sdf_workspace_dir, target_dir=target_dir, @sdf_assets(workspace=workspace) def {{ sdf_assets_name }}(context: AssetExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run", "--no-cache"], target_dir=target_dir, environment=environment, context=context).stream() + yield from sdf.cli(["run", "--save", "info-schema", "--cache", "write-only"], target_dir=target_dir, environment=environment, context=context).stream() diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py b/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py index 67ae42e59fcae..f893351e37d87 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/resource.py @@ -13,7 +13,6 @@ DEFAULT_SDF_WORKSPACE_ENVIRONMENT, SDF_DAGSTER_OUTPUT_DIR, SDF_EXECUTABLE, - SDF_TARGET_DIR, SDF_WORKSPACE_YML, ) from .dagster_sdf_translator import DagsterSdfTranslator, validate_opt_translator @@ -146,8 +145,6 @@ def cli( target_args = ["--target-dir", str(target_path)] log_level_args = ["--log-level", "info"] - output_dir = target_path.joinpath(SDF_TARGET_DIR, environment) - # Ensure that the target_dir exists target_path.mkdir(parents=True, exist_ok=True) @@ -165,7 +162,7 @@ def cli( env=env, workspace_dir=Path(self.workspace_dir), target_dir=target_path, - output_dir=output_dir, + environment=environment, dagster_sdf_translator=dagster_sdf_translator, raise_on_error=raise_on_error, context=context, diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py index 9bad539084c2d..efab83f3a4944 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_event.py @@ -22,9 +22,10 @@ class SdfCliEventMessage: 
@property def is_result_event(self) -> bool: return ( - self.raw_event["ev"] == "cmd.do.derived" + bool(self.raw_event.get("ev_table")) + and bool(self.raw_event.get("status_type")) + and bool(self.raw_event.get("status_code")) and self.raw_event["ev_type"] == "close" - and bool(self.raw_event.get("status")) ) @public @@ -51,11 +52,11 @@ def to_default_asset_events( if not self.is_result_event: return - is_success = self.raw_event["status"] == "succeeded" + is_success = self.raw_event["status_code"] == "succeeded" if not is_success: return - table_id = self.raw_event["table"] + table_id = self.raw_event["ev_table"] default_metadata = { "table_id": table_id, "Execution Duration": self.raw_event["ev_dur_ms"] / 1000, diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py index 0621d7389cf1a..ed9eb984d59d5 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_cli_invocation.py @@ -11,10 +11,12 @@ from dagster._core.errors import DagsterExecutionInterruptedError from typing_extensions import Literal +from .constants import SDF_TARGET_DIR from .dagster_sdf_translator import DagsterSdfTranslator from .errors import DagsterSdfCliRuntimeError from .sdf_cli_event import SdfCliEventMessage from .sdf_event_iterator import SdfDagsterEventType, SdfEventIterator +from .sdf_information_schema import SdfInformationSchema logger = get_dagster_logger() @@ -27,14 +29,16 @@ class SdfCliInvocation: Args: process (subprocess.Popen): The process running the sdf command. + workspace_dir (Path): The path to the workspace directory. target_dir (Path): The path to the target directory. - output_dir (Path): The path to the output directory. + environment (str): The environment to use. raise_on_error (bool): Whether to raise an exception if the sdf command fails. 
""" process: subprocess.Popen + workspace_dir: Path target_dir: Path - output_dir: Path + environment: str dagster_sdf_translator: DagsterSdfTranslator raise_on_error: bool context: Optional[OpExecutionContext] = field(default=None, repr=False) @@ -50,7 +54,7 @@ def run( env: Dict[str, str], workspace_dir: Path, target_dir: Path, - output_dir: Path, + environment: str, dagster_sdf_translator: DagsterSdfTranslator, raise_on_error: bool, context: Optional[OpExecutionContext], @@ -66,8 +70,9 @@ def run( sdf_cli_invocation = cls( process=process, + workspace_dir=workspace_dir, target_dir=target_dir, - output_dir=output_dir, + environment=environment, dagster_sdf_translator=dagster_sdf_translator, raise_on_error=raise_on_error, context=context, @@ -124,6 +129,11 @@ def _stream_asset_events( yield from event.to_default_asset_events( dagster_sdf_translator=self.dagster_sdf_translator, context=self.context ) + yield from SdfInformationSchema( + workspace_dir=self.workspace_dir, + target_dir=self.target_dir, + environment=self.environment, + ).stream_asset_observations(dagster_sdf_translator=self.dagster_sdf_translator) @public def stream( @@ -148,7 +158,7 @@ def stream( from dagster_sdf import SdfCliResource, sdf_assets - @sdf_assets(workspace_dir="/path/to/sdf/workspace") + @sdf_assets(workspace=SdfWorkspace(workspace_dir="/path/to/sdf/workspace")) def my_sdf_assets(context, sdf: SdfCliResource): yield from sdf.cli(["run"], context=context).stream() """ @@ -203,7 +213,7 @@ def get_artifact( # Retrieve the makefile-run.json artifact. 
run_results = sdf_cli_invocation.get_artifact("makefile-run.json") """ - artifact_path = self.output_dir.joinpath(artifact) + artifact_path = self.target_dir.joinpath(SDF_TARGET_DIR, self.environment, artifact) return orjson.loads(artifact_path.read_bytes()) diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_event_iterator.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_event_iterator.py index da28a47c1eb52..2b3d24a2db6fe 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_event_iterator.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_event_iterator.py @@ -1,13 +1,13 @@ from collections import abc from typing import TYPE_CHECKING, Generic, Iterator, Union -from dagster import AssetMaterialization, Output +from dagster import AssetMaterialization, AssetObservation, Output from typing_extensions import TypeVar if TYPE_CHECKING: from .sdf_cli_invocation import SdfCliInvocation -SdfDagsterEventType = Union[Output, AssetMaterialization] +SdfDagsterEventType = Union[Output, AssetMaterialization, AssetObservation] # We define SdfEventIterator as a generic type for the sake of type hinting. 
# This is so that users who inspect the type of the return value of `SdfCliInvocation.stream()` @@ -22,10 +22,10 @@ class SdfEventIterator(Generic[T], abc.Iterator): def __init__( self, - events: Iterator[T], + exec_events: Iterator[T], sdf_cli_invocation: "SdfCliInvocation", ) -> None: - self._inner_iterator = events + self._inner_iterator = exec_events self._sdf_cli_invocation = sdf_cli_invocation def __next__(self) -> T: diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py index 172779fca896d..dd0f05acda2d7 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_information_schema.py @@ -1,25 +1,28 @@ +import os from pathlib import Path -from typing import Dict, Literal, Optional, Sequence, Set, Tuple, Union +from typing import Any, Dict, Iterator, List, Literal, Optional, Sequence, Set, Tuple, Union import dagster._check as check import polars as pl -from dagster import AssetDep, AssetKey, AssetOut, Nothing +from dagster import AssetDep, AssetKey, AssetObservation, AssetOut, Nothing, TableColumn +from dagster._core.definitions.metadata import ( + CodeReferencesMetadataSet, + CodeReferencesMetadataValue, + LocalFileCodeReference, + TableColumnConstraints, + TableMetadataSet, + TableSchema, +) from dagster._record import IHaveNew, record_custom -from .asset_utils import dagster_name_fn +from .asset_utils import dagster_name_fn, get_info_schema_dir, get_output_dir from .constants import ( DEFAULT_SDF_WORKSPACE_ENVIRONMENT, SDF_INFORMATION_SCHEMA_TABLES_STAGE_COMPILE, SDF_INFORMATION_SCHEMA_TABLES_STAGE_PARSE, - SDF_TARGET_DIR, ) from .dagster_sdf_translator import DagsterSdfTranslator - - -def get_info_schema_dir(target_dir: Path, environment: str) -> Path: - return target_dir.joinpath( - SDF_TARGET_DIR, environment, "data", "system", "information_schema::sdf" - ) 
+from .sdf_event_iterator import SdfDagsterEventType @record_custom(checked=False) @@ -42,6 +45,9 @@ class SdfInformationSchema(IHaveNew): environment (str, optional): The environment to use. Defaults to "dbg". """ + workspace_dir: Path + target_dir: Path + environment: str information_schema_dir: Path information_schema: Dict[str, pl.DataFrame] @@ -66,6 +72,9 @@ def __new__( return super().__new__( cls, + workspace_dir=workspace_dir, + target_dir=target_dir, + environment=environment, information_schema_dir=information_schema_dir, information_schema={}, ) @@ -121,14 +130,23 @@ def build_sdf_multi_asset_args( for table_row in table_deps.rows(named=True): asset_key = dagster_sdf_translator.get_asset_key(table_row["table_id"]) output_name = dagster_name_fn(table_row["table_id"]) + code_references = None + if dagster_sdf_translator.settings.enable_code_references: + code_references = self._extract_code_ref(table_row) + metadata = {**(code_references if code_references else {})} # If the table is a annotated as a dependency, we don't need to create an output for it if table_row["table_id"] not in table_id_to_dep: outs[output_name] = AssetOut( key=asset_key, dagster_type=Nothing, io_manager_key=io_manager_key, - description=table_row["description"], + description=dagster_sdf_translator.get_description( + table_row, + self.workspace_dir, + get_output_dir(self.target_dir, self.environment), + ), is_required=False, + metadata=metadata, ) internal_asset_deps[output_name] = { table_id_to_dep[dep] @@ -139,9 +157,88 @@ def build_sdf_multi_asset_args( ) # Otherwise, use the translator to get the asset key for dep in table_row["depends_on"] }.union(table_id_to_upstream.get(table_row["table_id"], set())) - return deps, outs, internal_asset_deps + def get_columns(self) -> Dict[str, List[TableColumn]]: + columns = self.read_table("columns")[ + ["table_id", "column_id", "classifiers", "column_name", "datatype", "description"] + ] + table_columns: Dict[str, List[TableColumn]] = {} + 
for row in columns.rows(named=True): + if row["table_id"] not in table_columns: + table_columns[row["table_id"]] = [] + table_columns[row["table_id"]].append( + TableColumn( + name=row["column_name"], + type=row["datatype"], + description=row["description"], + constraints=TableColumnConstraints(other=row["classifiers"]), + ) + ) + return table_columns + + def _extract_code_ref( + self, table_row: Dict[str, Any] + ) -> Union[CodeReferencesMetadataSet, None]: + code_references = None + # Check if any of the source locations are .sql files, return the first one + loc = ( + next( + ( + source_location + for source_location in table_row["source_locations"] + if source_location.endswith(".sql") + ), + None, + ) + or next( + ( + source_location + for source_location in table_row["source_locations"] + if source_location.endswith(".sdf.yml") + ), + None, + ) + or "workspace.sdf.yml" + ) + code_references = CodeReferencesMetadataSet( + code_references=CodeReferencesMetadataValue( + code_references=[ + LocalFileCodeReference(file_path=os.fspath(self.workspace_dir.joinpath(loc))) + ] + ) + ) + return code_references + + def stream_asset_observations( + self, dagster_sdf_translator: DagsterSdfTranslator + ) -> Iterator[SdfDagsterEventType]: + table_columns = self.get_columns() + tables = self.read_table("tables").filter( + ~pl.col("purpose").is_in(["system", "external-system"]) + ) + for table_row in tables.rows(named=True): + asset_key = dagster_sdf_translator.get_asset_key(table_row["table_id"]) + code_references = None + if dagster_sdf_translator.settings.enable_code_references: + code_references = self._extract_code_ref(table_row) + metadata = { + **TableMetadataSet( + column_schema=TableSchema( + columns=table_columns.get(table_row["table_id"], []), + ), + relation_identifier=table_row["table_id"], + ), + **(code_references if code_references else {}), + } + yield AssetObservation( + asset_key=asset_key, + description=dagster_sdf_translator.get_description( + table_row, 
self.workspace_dir, get_output_dir(self.target_dir, self.environment) + ), + metadata=metadata, + ) + def is_compiled(self) -> bool: for table in SDF_INFORMATION_SCHEMA_TABLES_STAGE_COMPILE: if not any(self.information_schema_dir.joinpath(table).iterdir()): diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py index 20f8dda551931..327c9b89edb64 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_version.py @@ -1 +1 @@ -SDF_VERSION_UPPER_BOUND = "0.3.13" +SDF_VERSION_UPPER_BOUND = "0.3.17" diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py index 0bc48ef2df36a..e4e9c47dbbb2b 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf/sdf_workspace.py @@ -51,9 +51,9 @@ def __init__( Args: generate_cli_args (Sequence[str]): The arguments to pass to the sdf cli to prepare the workspace. 
- Default: ["compile", "--stage==parse"] + Default: ["compile", "--save", "table-deps"] """ - self._generate_cli_args = generate_cli_args or ["compile", "--stage=parse"] + self._generate_cli_args = generate_cli_args or ["compile", "--save", "table-deps"] def on_load(self, workspace: "SdfWorkspace"): if self.using_dagster_dev() or self.compile_on_load_opt_in(): diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/cli/test_scaffold.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/cli/test_scaffold.py index 4a3fe3cdd07bf..ee63214aa0523 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/cli/test_scaffold.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/cli/test_scaffold.py @@ -79,7 +79,7 @@ def test_project_scaffold_command( ) subprocess.run( - ["sdf", "compile", "--stage", "parse", "--target-dir", SDF_DAGSTER_OUTPUT_DIR], + ["sdf", "compile", "--save", "table-deps", "--target-dir", SDF_DAGSTER_OUTPUT_DIR], cwd=sdf_workspace_dir, check=True, ) diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py index 087643cfb73e5..aad54fc1c0587 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/conftest.py @@ -17,10 +17,14 @@ def _create_sdf_invocation( workspace_dir=os.fspath(workspace_dir), global_config_flags=["--log-form=nested"] ) - sdf_invocation = sdf.cli(["compile", "--stage=parse"], environment=environment).wait() + sdf_invocation = sdf.cli( + ["compile", "--save", "table-deps"], environment=environment, raise_on_error=False + ).wait() if run_workspace: - sdf.cli(["run"], environment=environment, raise_on_error=False).wait() + sdf.cli( + ["run", "--save", "info-schema"], environment=environment, raise_on_error=True + ).wait() return sdf_invocation diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py 
b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py index 246c9c1e319be..ec9d93f4815b3 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_asset_decorator.py @@ -3,7 +3,7 @@ from dagster import AssetExecutionContext, AssetKey, materialize from dagster_sdf.asset_decorator import sdf_assets from dagster_sdf.asset_utils import get_asset_key_for_table_id -from dagster_sdf.dagster_sdf_translator import DagsterSdfTranslator +from dagster_sdf.dagster_sdf_translator import DagsterSdfTranslator, DagsterSdfTranslatorSettings from dagster_sdf.resource import SdfCliResource from dagster_sdf.sdf_workspace import SdfWorkspace @@ -90,7 +90,11 @@ def test_sdf_with_materialize(moms_flower_shop_target_dir: Path) -> None: ) ) def my_sdf_assets(context: AssetExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run"], context=context).stream() + yield from sdf.cli( + ["run", "--save", "info-schema"], + target_dir=moms_flower_shop_target_dir, + context=context, + ).stream() result = materialize( [my_sdf_assets], @@ -196,3 +200,111 @@ def my_flower_shop_assets(): ... assert asset_key == AssetKey( ["pre-moms_flower_shop-suff", "pre-raw-suff", "pre-raw_addresses-suff"] ) + + +def test_asset_descriptions(moms_flower_shop_target_dir: str) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_raw_sql_description=False) + ), + ) + def my_flower_shop_assets(): ... + + # Currently, the expected output is duplicated, but this is a bug in the sdf cli and will be + # fixed in a future release. 
+ assert my_flower_shop_assets.descriptions_by_key == { + AssetKey( + ["moms_flower_shop", "staging", "app_installs"] + ): "This table is a staging table which adds campaign information to app install events\n\nThis table is a staging table which adds campaign information to app install events\n", + AssetKey( + ["moms_flower_shop", "analytics", "agg_installs_and_campaigns"] + ): "sdf view moms_flower_shop.analytics.agg_installs_and_campaigns", + AssetKey( + ["moms_flower_shop", "raw", "raw_inapp_events"] + ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.\nLogged actions (events) that users perform inside the mobile app of mom's flower shop.", + AssetKey( + ["moms_flower_shop", "analytics", "dim_marketing_campaigns"] + ): "sdf view moms_flower_shop.analytics.dim_marketing_campaigns", + AssetKey( + ["moms_flower_shop", "raw", "raw_marketing_campaign_events"] + ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n\nAn hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n", + AssetKey( + ["moms_flower_shop", "raw", "raw_customers"] + ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\nAll relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n", + AssetKey( + ["moms_flower_shop", "staging", "inapp_events"] + ): "sdf view moms_flower_shop.staging.inapp_events", + AssetKey( + ["moms_flower_shop", "raw", "raw_addresses"] + ): "All relevant information related to street addresses known to mom's flower shop. 
This information comes from the user input into the mobile app.\n\nAll relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n", + AssetKey( + ["moms_flower_shop", "staging", "stg_installs_per_campaign"] + ): "sdf view moms_flower_shop.staging.stg_installs_per_campaign", + AssetKey( + ["moms_flower_shop", "staging", "app_installs_v2"] + ): "sdf view moms_flower_shop.staging.app_installs_v2", + AssetKey( + ["moms_flower_shop", "staging", "customers"] + ): "sdf view moms_flower_shop.staging.customers", + AssetKey( + ["moms_flower_shop", "staging", "marketing_campaigns"] + ): "sdf view moms_flower_shop.staging.marketing_campaigns", + } + + +def test_asset_descriptions_with_raw_sql(moms_flower_shop_target_dir: str) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_raw_sql_description=True) + ), + ) + def my_flower_shop_assets(): ... + + # Currently, the expected output is duplicated, but this is a bug in the sdf cli and will be + # fixed in a future release. + assert my_flower_shop_assets.descriptions_by_key == { + AssetKey( + ["moms_flower_shop", "raw", "raw_marketing_campaign_events"] + ): "An hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. If no campaigns are running for a certain houe, no campaigns will be logged.\n\nAn hourly table logging marketing campaigns. If a campaign is running that hour, it will be logged in the table. 
If no campaigns are running for a certain houe, no campaigns will be logged.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_marketing_campaign_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/marketing_campaign_events.parquet');\n```", + AssetKey( + ["moms_flower_shop", "analytics", "dim_marketing_campaigns"] + ): "sdf view moms_flower_shop.analytics.dim_marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n -- marketing campaigns dimensions\n m.campaign_id,\n m.campaign_name,\n -- metrics\n i.total_num_installs,\n total_campaign_spent / \n NULLIF(i.total_num_installs, 0) AS avg_customer_acquisition_cost,\n campaign_duration / \n NULLIF(i.total_num_installs, 0) AS install_duration_ratio\n FROM staging.marketing_campaigns m\n LEFT OUTER JOIN staging.stg_installs_per_campaign i\n ON (m.campaign_id = i.campaign_id)\n ORDER BY total_num_installs DESC NULLS LAST\n```", + AssetKey( + ["moms_flower_shop", "analytics", "agg_installs_and_campaigns"] + ): "sdf view moms_flower_shop.analytics.agg_installs_and_campaigns\n\n#### Raw SQL:\n```\n SELECT \n -- install events data\n DATE_FORMAT(install_time, '%Y-%m-%d') AS install_date,\n campaign_name,\n platform,\n COUNT(DISTINCT customer_id) AS distinct_installs\n FROM staging.app_installs_v2\n GROUP BY 1,2,3\n```", + AssetKey( + ["moms_flower_shop", "raw", "raw_customers"] + ): "All relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\nAll relevant information related to customers known to mom's flower shop. This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_customers \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/customers.parquet');\n```", + AssetKey( + ["moms_flower_shop", "raw", "raw_addresses"] + ): "All relevant information related to street addresses known to mom's flower shop. 
This information comes from the user input into the mobile app.\n\nAll relevant information related to street addresses known to mom's flower shop. This information comes from the user input into the mobile app.\n\n\n#### Raw SQL:\n```\n CREATE TABLE raw_addresses \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/addresses.parquet');\n```", + AssetKey( + ["moms_flower_shop", "staging", "inapp_events"] + ): "sdf view moms_flower_shop.staging.inapp_events\n\n#### Raw SQL:\n```\n SELECT \n event_id,\n customer_id,\n FROM_UNIXTIME(event_time/1000) AS event_time, \n event_name,\n event_value,\n additional_details,\n platform,\n campaign_id\n FROM raw.raw_inapp_events\n```", + AssetKey( + ["moms_flower_shop", "raw", "raw_inapp_events"] + ): "Logged actions (events) that users perform inside the mobile app of mom's flower shop.\nLogged actions (events) that users perform inside the mobile app of mom's flower shop.\n\n#### Raw SQL:\n```\n CREATE TABLE raw_inapp_events \n WITH (FORMAT='PARQUET', LOCATION='seeds/parquet/inapp_events.parquet');\n```", + AssetKey( + ["moms_flower_shop", "staging", "app_installs"] + ): "This table is a staging table which adds campaign information to app install events\n\nThis table is a staging table which adds campaign information to app install events\n\n\n#### Raw SQL:\n```\n SELECT \n -- install events data\n COALESCE(m.event_id, i.event_id) AS event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.event_id = m.event_id) \n WHERE event_name = 'install'\n```", + AssetKey( + ["moms_flower_shop", "staging", "stg_installs_per_campaign"] + ): "sdf view moms_flower_shop.staging.stg_installs_per_campaign\n\n#### Raw SQL:\n```\n 
SELECT \n campaign_id,\n COUNT(event_id) AS total_num_installs\n FROM app_installs_v2\n GROUP BY 1\n\n```", + AssetKey( + ["moms_flower_shop", "staging", "customers"] + ): "sdf view moms_flower_shop.staging.customers\n\n#### Raw SQL:\n```\n SELECT \n c.id AS customer_id,\n c.first_name,\n c.last_name,\n c.first_name || ' ' || c.last_name AS full_name,\n c.email,\n c.gender,\n \n -- Marketing info\n i.campaign_id,\n i.campaign_name,\n i.campaign_type,\n\n -- Address info\n c.address_id,\n a.full_address,\n a.state\n FROM raw.raw_customers c \n\n LEFT OUTER JOIN app_installs_v2 i\n ON (c.id = i.customer_id)\n\n LEFT OUTER JOIN raw.raw_addresses a\n ON (c.address_id = a.address_id)\n\n```", + AssetKey( + ["moms_flower_shop", "staging", "app_installs_v2"] + ): "sdf view moms_flower_shop.staging.app_installs_v2\n\n#### Raw SQL:\n```\n SELECT \n DISTINCT\n -- install events data\n i.event_id,\n i.customer_id,\n i.event_time AS install_time,\n i.platform,\n\n -- marketing campaigns data - if doesn't exist than organic\n COALESCE(m.campaign_id, -1) AS campaign_id, \n COALESCE(m.campaign_name, 'organic') AS campaign_name,\n COALESCE(m.c_name, 'organic') AS campaign_type\n FROM inapp_events i \n LEFT OUTER JOIN raw.raw_marketing_campaign_events m\n ON (i.campaign_id = m.campaign_id) \n WHERE event_name = 'install'\n```", + AssetKey( + ["moms_flower_shop", "staging", "marketing_campaigns"] + ): "sdf view moms_flower_shop.staging.marketing_campaigns\n\n#### Raw SQL:\n```\n SELECT \n campaign_id,\n campaign_name,\n SUBSTR(c_name, 1, LENGTH(c_name)-1) AS campaign_type,\n MIN(\n FROM_UNIXTIME(event_time/1000) -- convert unixtime from milliseconds to seconds\n ) AS start_time,\n MAX(\n FROM_UNIXTIME(event_time/1000) -- convert unixtime from milliseconds to seconds\n ) AS end_time,\n COUNT(event_time) AS campaign_duration,\n SUM(cost) AS total_campaign_spent,\n ARRAY_AGG(event_id) AS event_ids\n FROM raw.raw_marketing_campaign_events\n GROUP BY \n campaign_id,\n campaign_name,\n 
campaign_type\n\n```", + } diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_code_references.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_code_references.py new file mode 100644 index 0000000000000..27628d58707e5 --- /dev/null +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_code_references.py @@ -0,0 +1,83 @@ +import os +from pathlib import Path + +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.metadata.source_code import ( + LocalFileCodeReference, + with_source_code_references, +) +from dagster_sdf.asset_decorator import sdf_assets +from dagster_sdf.dagster_sdf_translator import DagsterSdfTranslator, DagsterSdfTranslatorSettings +from dagster_sdf.sdf_workspace import SdfWorkspace + +from .sdf_workspaces import moms_flower_shop_path + + +def test_basic_attach_code_references(moms_flower_shop_target_dir: Path) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_code_references=True) + ), + ) + def my_flower_shop_assets(): ... 
+ + for asset_key, asset_metadata in my_flower_shop_assets.metadata_by_key.items(): + assert "dagster/code_references" in asset_metadata + + references = asset_metadata["dagster/code_references"].code_references + assert len(references) == 1 + + reference = references[0] + assert isinstance(reference, LocalFileCodeReference) + assert reference.file_path.endswith( + asset_key.path[-1] + ".sql" + ) or reference.file_path.endswith(asset_key.path[-1] + ".sdf.yml") + assert os.path.exists(reference.file_path), reference.file_path + + +def test_basic_attach_code_references_disabled(moms_flower_shop_target_dir: Path) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_code_references=False) + ), + ) + def my_flower_shop_assets(): ... + + for asset_metadata in my_flower_shop_assets.metadata_by_key.values(): + assert "dagster/code_references" not in asset_metadata + + +def test_with_source_code_references_wrapper(moms_flower_shop_target_dir: Path) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_code_references=True) + ), + ) + def my_flower_shop_assets(): ... 
+ + defs = Definitions(assets=with_source_code_references([my_flower_shop_assets])) + + assets = defs.get_asset_graph().all_asset_keys + + for asset_key in assets: + asset_metadata = defs.get_assets_def(asset_key).specs_by_key[asset_key].metadata + assert "dagster/code_references" in asset_metadata + + references = asset_metadata["dagster/code_references"].code_references + assert len(references) == 2 + + code_reference = references[1] + assert isinstance(code_reference, LocalFileCodeReference) + assert code_reference.file_path.endswith("test_code_references.py") diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py index cb7fdb8420bea..ae9673c83a5b1 100644 --- a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_resource.py @@ -8,7 +8,7 @@ from dagster import In, Nothing, Out, job, op from dagster._core.errors import DagsterExecutionInterruptedError from dagster._core.execution.context.compute import OpExecutionContext -from dagster_sdf.constants import SDF_DAGSTER_OUTPUT_DIR +from dagster_sdf.constants import SDF_DAGSTER_OUTPUT_DIR, SDF_TARGET_DIR from dagster_sdf.resource import SdfCliResource from pydantic import ValidationError from pytest_mock import MockerFixture @@ -29,6 +29,8 @@ def test_sdf_cli(global_config_flags: List[str]) -> None: "--log-level", "info", "compile", + "--save", + "table-deps", "--environment", "dbg", "--target-dir", @@ -37,7 +39,7 @@ def test_sdf_cli(global_config_flags: List[str]) -> None: sdf = SdfCliResource( workspace_dir=os.fspath(moms_flower_shop_path), global_config_flags=global_config_flags ) - sdf_cli_invocation = sdf.cli(["compile"]) + sdf_cli_invocation = sdf.cli(["compile", "--save", "table-deps"]) *_, target_dir = sdf_cli_invocation.process.args # type: ignore assert sdf_cli_invocation.process.args == [*expected_sdf_cli_args, 
target_dir] @@ -52,7 +54,7 @@ def test_sdf_cli_executable() -> None: sdf_executable = cast(str, shutil.which("sdf")) invocation = SdfCliResource( workspace_dir=os.fspath(moms_flower_shop_path), sdf_executable=sdf_executable - ).cli(["compile"]) + ).cli(["compile", "--save", "table-deps"]) assert invocation.is_successful() assert not invocation.get_error() @@ -65,7 +67,7 @@ def test_sdf_cli_workspace_dir_path() -> None: sdf = SdfCliResource(workspace_dir=os.fspath(moms_flower_shop_path)) assert Path(sdf.workspace_dir).is_absolute() - assert sdf.cli(["compile"]).is_successful() + assert sdf.cli(["compile", "--save", "table-deps"]).is_successful() # workspace directory must exist with pytest.raises(ValidationError, match="does not exist"): @@ -122,12 +124,20 @@ def test_sdf_cli_get_artifact(sdf: SdfCliResource) -> None: def test_sdf_cli_target_dir(tmp_path: Path, sdf: SdfCliResource) -> None: sdf_cli_invocation_1 = sdf.cli(["compile"], target_dir=tmp_path).wait() manifest_st_mtime_1 = ( - sdf_cli_invocation_1.output_dir.joinpath("makefile-compile.json").stat().st_mtime + sdf_cli_invocation_1.target_dir.joinpath( + SDF_TARGET_DIR, sdf_cli_invocation_1.environment, "makefile-compile.json" + ) + .stat() + .st_mtime ) sdf_cli_invocation_2 = sdf.cli(["compile"], target_dir=sdf_cli_invocation_1.target_dir).wait() manifest_st_mtime_2 = ( - sdf_cli_invocation_2.output_dir.joinpath("makefile-compile.json").stat().st_mtime + sdf_cli_invocation_2.target_dir.joinpath( + SDF_TARGET_DIR, sdf_cli_invocation_2.environment, "makefile-compile.json" + ) + .stat() + .st_mtime ) # The target path should be the same for both invocations @@ -158,11 +168,11 @@ def test_sdf_environment_configuration(sdf: SdfCliResource) -> None: def test_sdf_cli_op_execution(sdf: SdfCliResource) -> None: @op(out={}) def my_sdf_op_yield_events(context: OpExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run"], context=context).stream() + yield from sdf.cli(["run", "--save", "info-schema"], 
context=context).stream() @op(out=Out(Nothing)) def my_sdf_op_yield_events_with_downstream(context: OpExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run"], context=context).stream() + yield from sdf.cli(["run", "--save", "info-schema"], context=context).stream() @op(ins={"depends_on": In(dagster_type=Nothing)}) def my_downstream_op(): ... @@ -191,7 +201,7 @@ def test_custom_subclass(): def test_metadata(sdf: SdfCliResource) -> None: @op(out={}) def my_sdf_op_yield_events(context: OpExecutionContext, sdf: SdfCliResource): - yield from sdf.cli(["run"], context=context).stream() + yield from sdf.cli(["run", "--save", "info-schema"], context=context).stream() @job def my_sdf_job_yield_events(): diff --git a/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_table_metadata.py b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_table_metadata.py new file mode 100644 index 0000000000000..cfb42d9cf72b7 --- /dev/null +++ b/python_modules/libraries/dagster-sdf/dagster_sdf_tests/test_table_metadata.py @@ -0,0 +1,41 @@ +from pathlib import Path + +from dagster import AssetExecutionContext, materialize +from dagster._core.definitions.metadata import TableSchemaMetadataValue +from dagster_sdf.asset_decorator import sdf_assets +from dagster_sdf.dagster_sdf_translator import DagsterSdfTranslator, DagsterSdfTranslatorSettings +from dagster_sdf.resource import SdfCliResource +from dagster_sdf.sdf_workspace import SdfWorkspace + +from .sdf_workspaces import moms_flower_shop_path + + +def test_table_metadata_column_schema(moms_flower_shop_target_dir: Path) -> None: + @sdf_assets( + workspace=SdfWorkspace( + workspace_dir=moms_flower_shop_path, + target_dir=moms_flower_shop_target_dir, + ), + dagster_sdf_translator=DagsterSdfTranslator( + settings=DagsterSdfTranslatorSettings(enable_raw_sql_description=True) + ), + ) + def my_flower_shop_assets(context: AssetExecutionContext, sdf: SdfCliResource): + yield from sdf.cli( + ["run", "--save", 
"info-schema"], + target_dir=moms_flower_shop_target_dir, + context=context, + ).stream() + + result = materialize( + [my_flower_shop_assets], + resources={"sdf": SdfCliResource(workspace_dir=moms_flower_shop_path)}, + ) + + assert result.success + + for event in result.get_asset_observation_events(): + metadata = event.asset_observation_data.asset_observation.metadata + assert "dagster/column_schema" in metadata + assert isinstance(metadata["dagster/column_schema"], TableSchemaMetadataValue) + assert len(metadata["dagster/column_schema"].schema.columns) > 0 diff --git a/python_modules/libraries/dagster-sdf/setup.py b/python_modules/libraries/dagster-sdf/setup.py index f9e31f97facc2..85ca09d7b55f1 100644 --- a/python_modules/libraries/dagster-sdf/setup.py +++ b/python_modules/libraries/dagster-sdf/setup.py @@ -41,7 +41,7 @@ def get_version() -> Tuple[str, str]: python_requires=">=3.8,<3.13", install_requires=[ f"dagster{pin}", - f"sdf-cli>=0.3.12,<{SDF_VERSION_UPPER_BOUND}", + f"sdf-cli>=0.3.16,<{SDF_VERSION_UPPER_BOUND}", "orjson", "polars", "typer>=0.9.0",