Skip to content

Commit

Permalink
[dagster-sdf] Add additional metadata on materialize (#23358)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR depends on updates made to the SDF CLI as of version 0.3.15.
These updates enable more fine-grained control of the assets generated
at compile and run time, via the `--save` and `--cache` parameters (see
code for examples of usage).

At run time (materialize), we ensure that the information schema is
generated, and introduce a phase to the `SdfCliInvocation`, where we
read additional metadata from the information schema (code-location,
asset schema, materialized code snippets), once processing completes,
yielding AssetObservations.

## How I Tested These Changes

Updates to 0.3.15 should be covered by the corresponding updated tests.

Tests have been introduced for code refs, new metadata, and updates to
the dbt translator
  • Loading branch information
akbog authored Aug 6, 2024
1 parent c137304 commit f5b1c38
Show file tree
Hide file tree
Showing 17 changed files with 530 additions and 55 deletions.
72 changes: 69 additions & 3 deletions python_modules/libraries/dagster-sdf/dagster_sdf/asset_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Mapping, Optional, Sequence
import textwrap
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Sequence

from dagster import (
AssetKey,
Expand All @@ -10,6 +12,8 @@
define_asset_job,
)

from .constants import SDF_TARGET_DIR


def dagster_name_fn(table_id: str) -> str:
return table_id.replace(".", "_").replace("-", "_").replace("*", "_star")
Expand Down Expand Up @@ -51,7 +55,7 @@ def build_schedule_from_sdf_selection(
from dagster_sdf import sdf_assets, build_schedule_from_sdf_selection
@sdf_assets(manifest=...)
@sdf_assets(workspace=...)
def all_sdf_assets():
...
Expand Down Expand Up @@ -93,7 +97,7 @@ def get_asset_key_for_table_id(sdf_assets: Sequence[AssetsDefinition], table_id:
from dagster import asset
from dagster_sdf import sdf_assets, get_asset_key_for_table_id
@sdf_assets(manifest=...)
@sdf_assets(workspace=...)
def all_sdf_assets():
...
Expand All @@ -114,3 +118,65 @@ def upstream_python_asset():
f" {asset_keys_by_output_name.keys()}."
)
return next(iter(asset_keys_by_output_name.values()))


def get_output_dir(target_dir: Path, environment: str) -> Path:
return target_dir.joinpath(SDF_TARGET_DIR, environment)


def get_info_schema_dir(target_dir: Path, environment: str) -> Path:
return get_output_dir(target_dir, environment).joinpath(
"data", "system", "information_schema::sdf"
)


def get_materialized_sql_dir(target_dir: Path, environment: str) -> Path:
return get_output_dir(target_dir, environment).joinpath("materialized")


def get_table_path_from_parts(catalog_name: str, schema_name: str, table_name: str) -> Path:
return Path(catalog_name, schema_name, table_name)


def _read_sql_file(path_to_file: Path) -> str:
with open(path_to_file, "r") as file:
return textwrap.indent(file.read(), " ")


def default_description_fn(
table_row: Dict[str, Any],
workspace_dir: Optional[Path] = None,
output_dir: Optional[Path] = None,
enable_raw_sql_description: bool = True,
enable_materialized_sql_description: bool = True,
) -> str:
description_sections = [
table_row["description"] or f"sdf {table_row['materialization']} {table_row['table_id']}",
]
if enable_raw_sql_description:
if workspace_dir is None:
raise ValueError("workspace_dir must be provided to enable raw SQL description.")
path_to_file = None
for source_location in table_row["source_locations"]:
if source_location.endswith(".sql"):
path_to_file = workspace_dir.joinpath(source_location)
break
if path_to_file is not None and path_to_file.exists():
description_sections.append(f"#### Raw SQL:\n```\n{_read_sql_file(path_to_file)}\n```")
if enable_materialized_sql_description:
if output_dir is None:
raise ValueError("output_dir must be provided to enable materialized SQL description.")
path_to_file = (
output_dir.joinpath("materialized")
.joinpath(
get_table_path_from_parts(
table_row["catalog_name"], table_row["schema_name"], table_row["table_name"]
)
)
.with_suffix(".sql")
)
if path_to_file.exists():
description_sections.append(
f"#### Materialized SQL:\n```\n{_read_sql_file(path_to_file)}\n```"
)
return "\n\n".join(filter(None, description_sections))
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
from dataclasses import dataclass
from typing import Optional
from pathlib import Path
from typing import Any, Dict, Optional

from dagster import (
AssetKey,
_check as check,
)
from dagster._annotations import public

from .asset_utils import default_asset_key_fn
from .asset_utils import default_asset_key_fn, default_description_fn


@dataclass(frozen=True)
class DagsterSdfTranslatorSettings:
"""Settings to enable Dagster features for your sdf project."""
"""Settings to enable Dagster features for your sdf project.
Args:
enable_asset_checks (bool): Whether to load sdf table tests as Dagster asset checks.
Defaults to True.
enable_code_references (bool): Whether to enable Dagster code references for sdf tables.
Defaults to False.
enable_raw_sql_description (bool): Whether to display sdf raw sql in Dagster descriptions.
Defaults to True.
enable_materialized_sql_description (bool): Whether to display sdf materialized sql in
Dagster descriptions. Defaults to True.
"""

enable_asset_checks: bool = True
enable_code_references: bool = False
enable_raw_sql_description: bool = True
enable_materialized_sql_description: bool = True


class DagsterSdfTranslator:
Expand Down Expand Up @@ -42,6 +60,42 @@ def settings(self) -> DagsterSdfTranslatorSettings:
def get_asset_key(self, fqn: str) -> AssetKey:
return default_asset_key_fn(fqn)

@public
def get_description(
self, table_row: Dict[str, Any], workspace_dir: Optional[Path], output_dir: Optional[Path]
) -> str:
"""A function that takes a dictionary representing columns of an sdf table row in sdf's
information schema and returns the Dagster description for that table.
This method can be overridden to provide a custom description for an sdf resource.
Args:
table_row (Dict[str, Any]): A dictionary representing columns of an sdf table row.
workspace_dir (Optional[Path]): The path to the workspace directory.
Returns:
str: The description for the dbt resource.
Examples:
.. code-block:: python
from typing import Any, Mapping
from dagster_sdf import DagsterSdfTranslator
class CustomDagsterSdfTranslator(DagsterSdfTranslator):
def get_description(self, table_row: Dict[str, Any], workspace_dir: Optiona[Path], output_dir: Optional[Path]) -> str:
return "custom description"
"""
return default_description_fn(
table_row,
workspace_dir,
output_dir,
self.settings.enable_raw_sql_description,
self.settings.enable_materialized_sql_description,
)


def validate_translator(dagster_sdf_translator: DagsterSdfTranslator) -> DagsterSdfTranslator:
return check.inst_param(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ workspace = SdfWorkspace(workspace_dir=sdf_workspace_dir, target_dir=target_dir,

@sdf_assets(workspace=workspace)
def {{ sdf_assets_name }}(context: AssetExecutionContext, sdf: SdfCliResource):
yield from sdf.cli(["run", "--no-cache"], target_dir=target_dir, environment=environment, context=context).stream()
yield from sdf.cli(["run", "--save", "info-schema", "--cache", "write-only"], target_dir=target_dir, environment=environment, context=context).stream()
5 changes: 1 addition & 4 deletions python_modules/libraries/dagster-sdf/dagster_sdf/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
DEFAULT_SDF_WORKSPACE_ENVIRONMENT,
SDF_DAGSTER_OUTPUT_DIR,
SDF_EXECUTABLE,
SDF_TARGET_DIR,
SDF_WORKSPACE_YML,
)
from .dagster_sdf_translator import DagsterSdfTranslator, validate_opt_translator
Expand Down Expand Up @@ -146,8 +145,6 @@ def cli(
target_args = ["--target-dir", str(target_path)]
log_level_args = ["--log-level", "info"]

output_dir = target_path.joinpath(SDF_TARGET_DIR, environment)

# Ensure that the target_dir exists
target_path.mkdir(parents=True, exist_ok=True)

Expand All @@ -165,7 +162,7 @@ def cli(
env=env,
workspace_dir=Path(self.workspace_dir),
target_dir=target_path,
output_dir=output_dir,
environment=environment,
dagster_sdf_translator=dagster_sdf_translator,
raise_on_error=raise_on_error,
context=context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ class SdfCliEventMessage:
@property
def is_result_event(self) -> bool:
return (
self.raw_event["ev"] == "cmd.do.derived"
bool(self.raw_event.get("ev_table"))
and bool(self.raw_event.get("status_type"))
and bool(self.raw_event.get("status_code"))
and self.raw_event["ev_type"] == "close"
and bool(self.raw_event.get("status"))
)

@public
Expand All @@ -51,11 +52,11 @@ def to_default_asset_events(
if not self.is_result_event:
return

is_success = self.raw_event["status"] == "succeeded"
is_success = self.raw_event["status_code"] == "succeeded"
if not is_success:
return

table_id = self.raw_event["table"]
table_id = self.raw_event["ev_table"]
default_metadata = {
"table_id": table_id,
"Execution Duration": self.raw_event["ev_dur_ms"] / 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from dagster._core.errors import DagsterExecutionInterruptedError
from typing_extensions import Literal

from .constants import SDF_TARGET_DIR
from .dagster_sdf_translator import DagsterSdfTranslator
from .errors import DagsterSdfCliRuntimeError
from .sdf_cli_event import SdfCliEventMessage
from .sdf_event_iterator import SdfDagsterEventType, SdfEventIterator
from .sdf_information_schema import SdfInformationSchema

logger = get_dagster_logger()

Expand All @@ -27,14 +29,16 @@ class SdfCliInvocation:
Args:
process (subprocess.Popen): The process running the sdf command.
workspace_dir (Path): The path to the workspace directory.
target_dir (Path): The path to the target directory.
output_dir (Path): The path to the output directory.
enviornment (str): The environment to use.
raise_on_error (bool): Whether to raise an exception if the sdf command fails.
"""

process: subprocess.Popen
workspace_dir: Path
target_dir: Path
output_dir: Path
environment: str
dagster_sdf_translator: DagsterSdfTranslator
raise_on_error: bool
context: Optional[OpExecutionContext] = field(default=None, repr=False)
Expand All @@ -50,7 +54,7 @@ def run(
env: Dict[str, str],
workspace_dir: Path,
target_dir: Path,
output_dir: Path,
environment: str,
dagster_sdf_translator: DagsterSdfTranslator,
raise_on_error: bool,
context: Optional[OpExecutionContext],
Expand All @@ -66,8 +70,9 @@ def run(

sdf_cli_invocation = cls(
process=process,
workspace_dir=workspace_dir,
target_dir=target_dir,
output_dir=output_dir,
environment=environment,
dagster_sdf_translator=dagster_sdf_translator,
raise_on_error=raise_on_error,
context=context,
Expand Down Expand Up @@ -124,6 +129,11 @@ def _stream_asset_events(
yield from event.to_default_asset_events(
dagster_sdf_translator=self.dagster_sdf_translator, context=self.context
)
yield from SdfInformationSchema(
workspace_dir=self.workspace_dir,
target_dir=self.target_dir,
environment=self.environment,
).stream_asset_observations(dagster_sdf_translator=self.dagster_sdf_translator)

@public
def stream(
Expand All @@ -148,7 +158,7 @@ def stream(
from dagster_sdf import SdfCliResource, sdf_assets
@sdf_assets(workspace_dir="/path/to/sdf/workspace")
@sdf_assets(workspace=SdfWorkspace(workspace_dir="/path/to/sdf/workspace"))
def my_sdf_assets(context, sdf: SdfCliResource):
yield from sdf.cli(["run"], context=context).stream()
"""
Expand Down Expand Up @@ -203,7 +213,7 @@ def get_artifact(
# Retrieve the makefile-run.json artifact.
run_results = sdf_cli_invocation.get_artifact("makefile-run.json")
"""
artifact_path = self.output_dir.joinpath(artifact)
artifact_path = self.target_dir.joinpath(SDF_TARGET_DIR, self.environment, artifact)

return orjson.loads(artifact_path.read_bytes())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from collections import abc
from typing import TYPE_CHECKING, Generic, Iterator, Union

from dagster import AssetMaterialization, Output
from dagster import AssetMaterialization, AssetObservation, Output
from typing_extensions import TypeVar

if TYPE_CHECKING:
from .sdf_cli_invocation import SdfCliInvocation

SdfDagsterEventType = Union[Output, AssetMaterialization]
SdfDagsterEventType = Union[Output, AssetMaterialization, AssetObservation]

# We define SdfEventIterator as a generic type for the sake of type hinting.
# This is so that users who inspect the type of the return value of `SdfCliInvocation.stream()`
Expand All @@ -22,10 +22,10 @@ class SdfEventIterator(Generic[T], abc.Iterator):

def __init__(
self,
events: Iterator[T],
exec_events: Iterator[T],
sdf_cli_invocation: "SdfCliInvocation",
) -> None:
self._inner_iterator = events
self._inner_iterator = exec_events
self._sdf_cli_invocation = sdf_cli_invocation

def __next__(self) -> T:
Expand Down
Loading

0 comments on commit f5b1c38

Please sign in to comment.