Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add create method to fix tests #57

Merged
merged 1 commit into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions datahub_sap_hana/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class HanaSource(SQLAlchemySource):
def __init__(self, config: HanaConfig, ctx: PipelineContext):
super().__init__(config, ctx, "hana")

@classmethod
def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "HanaSource":
config = HanaConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
conn = self.get_db_connection()
try:
Expand Down Expand Up @@ -163,15 +168,17 @@ def _get_view_lineage_elements(
f"{lineage.dependent_schema}.{lineage.dependent_view}"
)
logger.debug(
f"View pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}")
f"View pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}"
)
continue

if not self.config.schema_pattern.allowed(lineage.dependent_schema):
self.report.report_dropped(
f"{lineage.dependent_schema}.{lineage.dependent_view}"
)
logger.debug(
f"Schema pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}")
f"Schema pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}"
)
continue

key = (lineage.dependent_view, lineage.dependent_schema)
Expand Down Expand Up @@ -241,7 +248,7 @@ def get_column_lineage_view_definitions(
def _get_column_lineage_for_view(self, view_sql: str) -> List[Node]:
"""Extracts the columns and the sql definitions of a downstream view to build a lineage graph."""

expression: DerivedTable = parse_one(view_sql)
expression: DerivedTable = parse_one(view_sql) # type: ignore
selected_columns: List[str] = expression.named_selects

view_sql = view_sql.lower()
Expand All @@ -266,11 +273,12 @@ def get_column_view_lineage_elements(
Tuple[DownstreamLineageField, List[UpstreamLineageField]]
] = []

column_lineages = self._get_column_lineage_for_view(view.sql)
column_lineages = self._get_column_lineage_for_view(view.sql)

downstream_table_metadata = get_table_schema(
inspector, view.name, view.schema)

inspector, view.name, view.schema
)

# lineage_node represents the lineage of 1 column in sqlglot
# lineage_node.downstream is the datahub upstream
# each element of lineage_node.downstream is a node that represents a column in the source table
Expand All @@ -282,8 +290,9 @@ def get_column_view_lineage_elements(
)

# checks the casing for the downstream column based on what is in the db
downstream_column_metadata = downstream_table_metadata[lineage_node.name.lower(
)]
downstream_column_metadata = downstream_table_metadata[
lineage_node.name.lower()
]
downstream.name = downstream_column_metadata["name"]

upstream_fields_list = [
Expand All @@ -298,7 +307,6 @@ def get_column_view_lineage_elements(
# from the inspector so that the Datahub URN we generate matches
# the URN from the base SQLAlchemy source implementation.
for column in upstream_fields_list:

# checks the casing for the upstream column based on what is in the db
source_table_metadata = get_table_schema(
inspector, column.dataset.name, column.dataset.schema
Expand Down Expand Up @@ -408,4 +416,4 @@ def get_table_schema(inspector: Inspector, table_name: str, schema_name: str) ->
return {
column["name"].lower(): column
for column in inspector.get_columns(table_name, schema_name)
}
}
7 changes: 5 additions & 2 deletions tests/integration/test_lineage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from datahub.ingestion.api.common import PipelineContext
from serde.json import to_json
from sqlalchemy import inspect

from datahub_sap_hana.ingestion import HanaSource

Expand Down Expand Up @@ -28,8 +29,9 @@ def config():
def test_get_view_definitions(config, ctx):
source = HanaSource.create(config, ctx)
conn = source.get_db_connection()
inspector = inspect(conn)

view_definitions = list(source.get_column_lineage_view_definitions(conn))
view_definitions = list(source.get_column_lineage_view_definitions(inspector))

assert len(view_definitions) == 5
assert [view.name for view in view_definitions] == [
Expand All @@ -48,8 +50,9 @@ def test_get_view_definitions(config, ctx):
def test_get_column_lineage(config, ctx):
source = HanaSource.create(config, ctx)
conn = source.get_db_connection()
inspector = inspect(conn)

lineages = list(source.get_column_view_lineage_elements(conn))
lineages = list(source.get_column_view_lineage_elements(inspector))

assert len(lineages) == 5
assert [x[0].name for x in lineages] == [
Expand Down