Skip to content

Commit

Permalink
chore: fix various linting issues
Browse files Browse the repository at this point in the history
Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
  • Loading branch information
LucasRoesler committed Jul 3, 2023
1 parent 7f07d6e commit f3a9524
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 55 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
"cSpell.words": [
"datahub",
"Hana",
"sqlalchemy"
"sqlachemy",
"sqlalchemy",
"sqlglot"
],
"[python]": {
"editor.tabSize": 4,
Expand Down
12 changes: 9 additions & 3 deletions datahub_sap_hana/column_lineage_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class View(Table):
class ColumnField:
"""
ColumnField contains the metadata to describe a column in a table.
Use `ColumnField.from_node(lineage_node)` to create a ColumnField from a sqlglot node.
Use `ColumnField.from_node(lineage_node)` to create a ColumnField
from a sqlglot node.
"""

name: str
Expand All @@ -47,12 +49,16 @@ def from_node(cls, node: Node, schema: str):


class UpstreamLineageField(ColumnField):
"""UpstreamField contains the metadata to describe the upstream column (source) lineage of a DownstreamField."""
"""UpstreamField contains the metadata to describe the
upstream column (source) lineage of a DownstreamField.
"""

pass


class DownstreamLineageField(ColumnField):
"""DownstreamField contains the metadata to describe the downstream column (target) lineage of a column in a table."""
"""DownstreamField contains the metadata to describe the
downstream column (target) lineage of a column in a table.
"""

pass
100 changes: 58 additions & 42 deletions datahub_sap_hana/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@

logger: logging.Logger = logging.getLogger(__name__)

# This query only takes 4 columns to match the fields in the ViewLineageEntry class. It also ignores schemas that contain SYS
# Object dependencies in SAP HANA https://help.sap.com/docs/SAP_HANA_PLATFORM/de2486ee947e43e684d39702027f8a94/5ce9a6584eb84f10afbbf2b133534932.html
# This query only takes 4 columns to match the fields in the ViewLineageEntry class.
# It also ignores schemas that contain SYS.
# Reference: object dependencies in SAP HANA
# https://help.sap.com/docs/SAP_HANA_PLATFORM/de2486ee947e43e684d39702027f8a94/5ce9a6584eb84f10afbbf2b133534932.html
LINEAGE_QUERY = """
SELECT
LOWER(BASE_OBJECT_NAME) as source_table,
Expand All @@ -67,7 +68,8 @@


class ViewLineageEntry(BaseModel):
"""Describes the upstream and downstream entities that will be assigned to the columns resulting from the LINEAGE_QUERY
"""Describes the upstream and downstream entities that will be assigned to the
columns resulting from the LINEAGE_QUERY
Attributes:
source_table (str): base_object_name in sap hana object dependencies
Expand All @@ -87,8 +89,7 @@ class HanaConfig(BasicSQLAlchemyConfig):
"""Represents the attributes needed to configure the SAP HANA DB connection"""

scheme = "hana"
schema_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern(deny=["*SYS*"]))
schema_pattern: AllowDenyPattern = Field(default=AllowDenyPattern(deny=["*SYS*"]))
include_view_lineage: bool = Field(
default=False, description="Include table lineage for views"
)
Expand All @@ -110,8 +111,12 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
@config_class(HanaConfig) # type: ignore
class HanaSource(SQLAlchemySource):
"""Creates a datasource for the lineage of tables from a SAP HANA database.
It contains the connection to the db using sqlachemy to get the table metadata to construct the downstream-upstream lineage.
Returns an iterable work unit that gets emitted to Datahub."""
It contains the connection to the db using sqlalchemy to get the table metadata
to construct the downstream-upstream lineage.
Returns an iterable work unit that gets emitted to Datahub.
"""

config: HanaConfig

Expand Down Expand Up @@ -145,7 +150,9 @@ def get_db_connection(self) -> Connection:
def _get_view_lineage_elements(
self, conn: Connection
) -> Dict[Tuple[str, str], List[str]]:
"""Connects to SAP HANA db to run the query statement. The results are then mapped to the ViewLineageEntry attributes.
"""Connects to SAP HANA db to run the query statement.
The results are then mapped to the ViewLineageEntry attributes.
Returns a dictionary of downstream and upstream objects from the query results.
"""

Expand All @@ -162,33 +169,31 @@ def _get_view_lineage_elements(

lineage_elements: Dict[Tuple[str, str], List[str]] = defaultdict(list)

for lineage in data:
if not self.config.view_pattern.allowed(lineage.dependent_view):
for item in data:
if not self.config.view_pattern.allowed(item.dependent_view):
self.report.report_dropped(
f"{lineage.dependent_schema}.{lineage.dependent_view}"
f"{item.dependent_schema}.{item.dependent_view}"
)
logger.debug(
f"View pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}"
f"View pattern is incompatible, dropping: {item.dependent_schema}.{item.dependent_view}" # noqa: E501
)
continue

if not self.config.schema_pattern.allowed(lineage.dependent_schema):
if not self.config.schema_pattern.allowed(item.dependent_schema):
self.report.report_dropped(
f"{lineage.dependent_schema}.{lineage.dependent_view}"
f"{item.dependent_schema}.{item.dependent_view}"
)
logger.debug(
f"Schema pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}"
f"Schema pattern is incompatible, dropping: {item.dependent_schema}.{item.dependent_view}" # noqa: E501
)
continue

key = (lineage.dependent_view, lineage.dependent_schema)
key = (item.dependent_view, item.dependent_schema)

lineage_elements[key].append(
mce_builder.make_dataset_urn(
self.platform,
self.config.get_identifier(
lineage.source_schema, lineage.source_table
),
self.config.get_identifier(item.source_schema, item.source_table),
self.config.env,
)
)
Expand All @@ -197,7 +202,9 @@ def _get_view_lineage_elements(
def _get_view_lineage_workunits(
self, conn: Connection
) -> Iterable[MetadataWorkUnit]:
"""Creates MetadataWorkUnit objects for table lineage based on the downstream and downstream objects from the query results.
"""Creates MetadataWorkUnit objects for table lineage based on the
upstream and downstream objects from the query results.
Returns an iterable MetadataWorkUnit that are emitted to Datahub.
"""
lineage_elements = self._get_view_lineage_elements(conn)
Expand Down Expand Up @@ -235,8 +242,7 @@ def get_column_lineage_view_definitions(
) # returns a list

for view_name in views:
view_sql: str = inspector.get_view_definition(
view_name, schema_name)
view_sql: str = inspector.get_view_definition(view_name, schema_name)

if view_sql:
yield View(
Expand All @@ -246,7 +252,9 @@ def get_column_lineage_view_definitions(
)

def _get_column_lineage_for_view(self, view_sql: str) -> List[Node]:
"""Extracts the columns and the sql definitions of a downstream view to build a lineage graph."""
"""Extracts the columns and the sql definitions of a downstream view to build
a lineage graph.
"""

expression: DerivedTable = parse_one(view_sql) # type: ignore
selected_columns: List[str] = expression.named_selects
Expand All @@ -263,9 +271,13 @@ def get_column_view_lineage_elements(
) -> Iterable[
Tuple[View, List[Tuple[DownstreamLineageField, List[UpstreamLineageField]]]]
]:
"""This function returns an iterable of tuples containing information about the lineage of columns in a view.
Each tuple contains a downstream field (a column in a view) and a list of upstream fields
(columns in other views or tables that are used to calculate/transform the downstream column).
"""
This function returns an iterable of tuples containing information about the
lineage of columns in a view.
Each tuple contains a downstream field (a column in a view) and a list of
upstream fields (columns in other views or tables that are used to
calculate/transform the downstream column).
"""

for view in self.get_column_lineage_view_definitions(inspector):
Expand All @@ -280,16 +292,17 @@ def get_column_view_lineage_elements(
)

# lineage_node represents the lineage of 1 column in sqlglot
# lineage_node.downstream is the datahub upstream
# each element of lineage_node.downstream is a node that represents a column in the source table
# lineage_node.downstream is the datahub upstream.
# Each element of lineage_node.downstream is a node that represents
# a column in the source table.

for lineage_node in column_lineages:
downstream = DownstreamLineageField(
name=lineage_node.name,
dataset=view,
)

# checks the casing for the downstream column based on what is in the db
# checks the casing for the downstream column based on the db value
downstream_column_metadata = downstream_table_metadata[
lineage_node.name.lower()
]
Expand All @@ -307,12 +320,11 @@ def get_column_view_lineage_elements(
# from the inspector so that the Datahub URN we generate matches
# the URN from the base SQLAlchemy source implementation.
for column in upstream_fields_list:
# checks the casing for the upstream column based on what is in the db
# checks the casing for the upstream column based on the db value
source_table_metadata = get_table_schema(
inspector, column.dataset.name, column.dataset.schema
)
column_metadata = source_table_metadata[column.name.lower(
)]
column_metadata = source_table_metadata[column.name.lower()]
column.name = column_metadata["name"]

# we only have lineage information if there are "upstream" fields
Expand All @@ -324,16 +336,19 @@ def get_column_view_lineage_elements(
def build_fine_grained_lineage(
self, inspector: Inspector
) -> Iterable[Tuple[List[FineGrainedLineage], Set[str], str]]:
"""Returns an iterable of tuples, where each tuple contains a list of FineGrainedLineage objects, which represents
column-level lineage information and a set of strings representing the upstream dataset URNs created during lineage generation.
"""
Returns an iterable of tuples, where each tuple contains a list of
FineGrainedLineage objects, which represents column-level lineage
information and a set of strings representing the upstream dataset
URNs created during lineage generation.
"""

upstream_type = FineGrainedLineageUpstreamType.FIELD_SET
downstream_type = FineGrainedLineageDownstreamType.FIELD

for (
view,
lineage,
lineage_items,
) in self.get_column_view_lineage_elements(inspector):
column_lineages: List[FineGrainedLineage] = []
seen_upstream_datasets: Set[str] = set()
Expand All @@ -347,8 +362,9 @@ def build_fine_grained_lineage(
env=self.config.env,
)

for downstream_field, upstream_fields in lineage:
# upstream_column/s should be dependent on the existence of downstream_field attached to it
for downstream_field, upstream_fields in lineage_items:
# upstream_column/s should be dependent on the existence of
# downstream_field attached to it
upstream_columns: List[Any] = []

for upstream_field in upstream_fields:
Expand Down Expand Up @@ -385,9 +401,10 @@ def build_fine_grained_lineage(
def _get_column_lineage_workunits(
self, inspector: Inspector
) -> Iterable[MetadataWorkUnit]:
"""Returns an iterable of MetadataChangeProposalWrapper object that contains column lineage information that is sent to Datahub
after each iteration of the loop. The object is built with column lineages, upstream datasets, and downstream dataset
URNs from the create_column_lineage method.
"""Returns an iterable of MetadataChangeProposalWrapper object that contains
column lineage information that is sent to Datahub after each iteration of
the loop. The object is built with column lineages, upstream datasets, and
downstream dataset URNs from the create_column_lineage method.
"""
for (
column_lineages,
Expand All @@ -397,8 +414,7 @@ def _get_column_lineage_workunits(
fieldLineages = UpstreamLineage(
fineGrainedLineages=column_lineages,
upstreams=[
Upstream(dataset=dataset_urn,
type=DatasetLineageType.TRANSFORMED)
Upstream(dataset=dataset_urn, type=DatasetLineageType.TRANSFORMED)
for dataset_urn in list(upstream_datasets)
],
)
Expand Down
3 changes: 2 additions & 1 deletion datahub_sap_hana/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

class Inspector(Protocol):
"""
A protocol describing the required methods from the sqlalchemy.engine.reflection.Inspector class.
A protocol describing the required methods from the
sqlalchemy.engine.reflection.Inspector class.
"""

def get_columns(
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ typeCheckingMode = "basic"
# See https://beta.ruff.rs/docs/configuration/
select = ["E", "F", "I001"]


[tool.ruff.per-file-ignores]
"tests/test_helpers/mce_helpers.py" = ["E501"]
"tests/test_helpers/type_helpers.py" = ["E501"]

[tool.unimport]
sources = ["datahub_sap_hana", "tests"]
exclude = '__init__.py|.venv/'
36 changes: 28 additions & 8 deletions tests/integration/test_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,19 @@ def test_get_column_lineage(config, ctx):
"total_rooms_price",
]

# SELECT "H"."HNO" , "H"."NAME" , "H"."ADDRESS" , "H"."CITY" , "H"."STATE" , "H"."ZIP" , "R"."TYPE" , "R"."PRICE" , "R"."FREE"
# FROM HOTEL.ROOM AS R\n
# LEFT JOIN HOTEL.HOTEL AS H \n ON H.HNO=R.HNO'
# SELECT
# "H"."HNO" ,
# "H"."NAME" ,
# "H"."ADDRESS" ,
# "H"."CITY" ,
# "H"."STATE" ,
# "H"."ZIP" ,
# "R"."TYPE" ,
# "R"."PRICE" ,
# "R"."FREE"
# FROM HOTEL.ROOM AS R
# LEFT JOIN HOTEL.HOTEL AS H
# ON H.HNO=R.HNO'

flat_hotel_rooms = lineages[1]
column_lineage = flat_hotel_rooms[1]
Expand Down Expand Up @@ -96,11 +106,21 @@ def test_get_column_lineage(config, ctx):
upstreams = [x[1][0].name for x in column_lineage]
assert upstreams == upstream_field_names, f"{upstreams}"

# SELECT\n H.NAME,\n H.CITY,\n R.TYPE,\n COUNT(R.TYPE) * (R.PRICE) AS TOTAL_ROOM_PRICE\n
# FROM\n HOTEL.ROOM AS R\n
# LEFT JOIN\n HOTEL.HOTEL AS H\n
# ON H.HNO = R.HNO\n
# GROUP BY \n H.NAME,\n H.CITY,\n R.TYPE, \n R.PRICE'
# SELECT
# H.NAME,
# H.CITY,
# R.TYPE,
# COUNT(R.TYPE) * (R.PRICE) AS TOTAL_ROOM_PRICE
# FROM
# HOTEL.ROOM AS R
# LEFT JOIN
# HOTEL.HOTEL AS H
# ON H.HNO = R.HNO
# GROUP BY
# H.NAME,
# H.CITY,
# R.TYPE,
# R.PRICE'

total_rooms_price = lineages[4]
column_lineage = total_rooms_price[1]
Expand Down

0 comments on commit f3a9524

Please sign in to comment.