From f3a952421144553fef563ad0c3c7eee4b261a2d7 Mon Sep 17 00:00:00 2001 From: Lucas Roesler Date: Mon, 3 Jul 2023 17:13:33 +0200 Subject: [PATCH] chore: fix various linting issues Signed-off-by: Lucas Roesler --- .vscode/settings.json | 4 +- datahub_sap_hana/column_lineage_schema.py | 12 ++- datahub_sap_hana/ingestion.py | 100 +++++++++++++--------- datahub_sap_hana/inspector.py | 3 +- pyproject.toml | 5 ++ tests/integration/test_lineage.py | 36 ++++++-- 6 files changed, 105 insertions(+), 55 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 2f13e90..a0899e2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,7 +2,9 @@ "cSpell.words": [ "datahub", "Hana", - "sqlalchemy" + "sqlachemy", + "sqlalchemy", + "sqlglot" ], "[python]": { "editor.tabSize": 4, diff --git a/datahub_sap_hana/column_lineage_schema.py b/datahub_sap_hana/column_lineage_schema.py index 1d2bae3..26c3819 100644 --- a/datahub_sap_hana/column_lineage_schema.py +++ b/datahub_sap_hana/column_lineage_schema.py @@ -31,7 +31,9 @@ class View(Table): class ColumnField: """ ColumnField contains the metadata to describe a column in a table. - Use `ColumnField.from_node(lineage_node)` to create a ColumnField from a sqlglot node. + + Use `ColumnField.from_node(lineage_node)` to create a ColumnField + from a sqlglot node. """ name: str @@ -47,12 +49,16 @@ def from_node(cls, node: Node, schema: str): class UpstreamLineageField(ColumnField): - """UpstreamField contains the metadata to describe the upstream column (source) lineage of a DownstreamField.""" + """UpstreamField contains the metadata to describe the + upstream column (source) lineage of a DownstreamField. + """ pass class DownstreamLineageField(ColumnField): - """DownstreamField contains the metadata to describe the downstream column (target) lineage of a column in a table.""" + """DownstreamField contains the metadata to describe the + downstream column (target) lineage of a column in a table. 
+ """ pass diff --git a/datahub_sap_hana/ingestion.py b/datahub_sap_hana/ingestion.py index 068cba2..19d50cf 100644 --- a/datahub_sap_hana/ingestion.py +++ b/datahub_sap_hana/ingestion.py @@ -50,8 +50,9 @@ logger: logging.Logger = logging.getLogger(__name__) -# This query only takes 4 columns to match the fields in the ViewLineageEntry class. It also ignores schemas that contain SYS -# Object dependencies in SAP HANA https://help.sap.com/docs/SAP_HANA_PLATFORM/de2486ee947e43e684d39702027f8a94/5ce9a6584eb84f10afbbf2b133534932.html +# This query only takes 4 columns to match the fields in the ViewLineageEntry class. +# It also ignores schemas that contain SYS Object dependencies in SAP HANA +# https://help.sap.com/docs/SAP_HANA_PLATFORM/de2486ee947e43e684d39702027f8a94/5ce9a6584eb84f10afbbf2b133534932.html LINEAGE_QUERY = """ SELECT LOWER(BASE_OBJECT_NAME) as source_table, @@ -67,7 +68,8 @@ class ViewLineageEntry(BaseModel): - """Describes the upstream and downstream entities that will be assigned to the columns resulting from the LINEAGE_QUERY + """Describes the upstream and downstream entities that will be assigned to the + columns resulting from the LINEAGE_QUERY Attributes: source_table (str): base_object_name in sap hana object dependencies @@ -87,8 +89,7 @@ class HanaConfig(BasicSQLAlchemyConfig): """Represents the attributes needed to configure the SAP HANA DB connection""" scheme = "hana" - schema_pattern: AllowDenyPattern = Field( - default=AllowDenyPattern(deny=["*SYS*"])) + schema_pattern: AllowDenyPattern = Field(default=AllowDenyPattern(deny=["*SYS*"])) include_view_lineage: bool = Field( default=False, description="Include table lineage for views" ) @@ -110,8 +111,12 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str: @config_class(HanaConfig) # type: ignore class HanaSource(SQLAlchemySource): """Creates a datasource for the lineage of tables from a SAP HANA database. 
- It contains the connection to the db using sqlachemy to get the table metadata to construct the downstream-upstream lineage. - Returns an iterable work unit that gets emitted to Datahub.""" + + It contains the connection to the db using sqlachemy to get the table metadata + to construct the downstream-upstream lineage. + + Returns an iterable work unit that gets emitted to Datahub. + """ config: HanaConfig @@ -145,7 +150,9 @@ def get_db_connection(self) -> Connection: def _get_view_lineage_elements( self, conn: Connection ) -> Dict[Tuple[str, str], List[str]]: - """Connects to SAP HANA db to run the query statement. The results are then mapped to the ViewLineageEntry attributes. + """Connects to SAP HANA db to run the query statement. + + The results are then mapped to the ViewLineageEntry attributes. Returns a dictionary of downstream and upstream objects from the query results. """ @@ -162,33 +169,31 @@ def _get_view_lineage_elements( lineage_elements: Dict[Tuple[str, str], List[str]] = defaultdict(list) - for lineage in data: - if not self.config.view_pattern.allowed(lineage.dependent_view): + for item in data: + if not self.config.view_pattern.allowed(item.dependent_view): self.report.report_dropped( - f"{lineage.dependent_schema}.{lineage.dependent_view}" + f"{item.dependent_schema}.{item.dependent_view}" ) logger.debug( - f"View pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}" + f"View pattern is incompatible, dropping: {item.dependent_schema}.{item.dependent_view}" # noqa: E501 ) continue - if not self.config.schema_pattern.allowed(lineage.dependent_schema): + if not self.config.schema_pattern.allowed(item.dependent_schema): self.report.report_dropped( - f"{lineage.dependent_schema}.{lineage.dependent_view}" + f"{item.dependent_schema}.{item.dependent_view}" ) logger.debug( - f"Schema pattern is incompatible, dropping: {lineage.dependent_schema}.{lineage.dependent_view}" + f"Schema pattern is incompatible, dropping: 
{item.dependent_schema}.{item.dependent_view}" # noqa: E501 ) continue - key = (lineage.dependent_view, lineage.dependent_schema) + key = (item.dependent_view, item.dependent_schema) lineage_elements[key].append( mce_builder.make_dataset_urn( self.platform, - self.config.get_identifier( - lineage.source_schema, lineage.source_table - ), + self.config.get_identifier(item.source_schema, item.source_table), self.config.env, ) ) @@ -197,7 +202,9 @@ def _get_view_lineage_elements( def _get_view_lineage_workunits( self, conn: Connection ) -> Iterable[MetadataWorkUnit]: - """Creates MetadataWorkUnit objects for table lineage based on the downstream and downstream objects from the query results. + """Creates MetadataWorkUnit objects for table lineage based on the + upstream and downstream objects from the query results. + Returns an iterable MetadataWorkUnit that are emitted to Datahub. """ lineage_elements = self._get_view_lineage_elements(conn) @@ -235,8 +242,7 @@ def get_column_lineage_view_definitions( ) # returns a list for view_name in views: - view_sql: str = inspector.get_view_definition( - view_name, schema_name) + view_sql: str = inspector.get_view_definition(view_name, schema_name) if view_sql: yield View( @@ -246,7 +252,9 @@ def get_column_lineage_view_definitions( ) def _get_column_lineage_for_view(self, view_sql: str) -> List[Node]: - """Extracts the columns and the sql definitions of a downstream view to build a lineage graph.""" + """Extracts the columns and the sql definitions of a downstream view to build + a lineage graph. + """ expression: DerivedTable = parse_one(view_sql) # type: ignore selected_columns: List[str] = expression.named_selects @@ -263,9 +271,13 @@ def get_column_view_lineage_elements( ) -> Iterable[ Tuple[View, List[Tuple[DownstreamLineageField, List[UpstreamLineageField]]]] ]: - """This function returns an iterable of tuples containing information about the lineage of columns in a view.
- Each tuple contains a downstream field (a column in a view) and a list of upstream fields - (columns in other views or tables that are used to calculate/transform the downstream column). + """ + This function returns an iterable of tuples containing information about the + lineage of columns in a view. + + Each tuple contains a downstream field (a column in a view) and a list of + upstream fields (columns in other views or tables that are used to + calculate/transform the downstream column). """ for view in self.get_column_lineage_view_definitions(inspector): @@ -280,8 +292,9 @@ def get_column_view_lineage_elements( ) # lineage_node represents the lineage of 1 column in sqlglot - # lineage_node.downstream is the datahub upstream - # each element of lineage_node.downstream is a node that represents a column in the source table + # lineage_node.downstream is the datahub upstream each element + # of lineage_node.downstream is a node that represents a column + # in the source table for lineage_node in column_lineages: downstream = DownstreamLineageField( @@ -289,7 +302,7 @@ def get_column_view_lineage_elements( dataset=view, ) - # checks the casing for the downstream column based on what is in the db + # checks the casing for the downstream column based on the db value downstream_column_metadata = downstream_table_metadata[ lineage_node.name.lower() ] @@ -307,12 +320,11 @@ def get_column_view_lineage_elements( # from the inspector so that the Datahub URN we generate matches # the URN from the base SQLAlchemy source implementation. 
for column in upstream_fields_list: - # checks the casing for the upstream column based on what is in the db + # checks the casing for the upstream column based on the db value source_table_metadata = get_table_schema( inspector, column.dataset.name, column.dataset.schema ) - column_metadata = source_table_metadata[column.name.lower( - )] + column_metadata = source_table_metadata[column.name.lower()] column.name = column_metadata["name"] # we only have lineage information if there are "upstream" fields @@ -324,8 +336,11 @@ def get_column_view_lineage_elements( def build_fine_grained_lineage( self, inspector: Inspector ) -> Iterable[Tuple[List[FineGrainedLineage], Set[str], str]]: - """Returns an iterable of tuples, where each tuple contains a list of FineGrainedLineage objects, which represents - column-level lineage information and a set of strings representing the upstream dataset URNs created during lineage generation. + """ + Returns an iterable of tuples, where each tuple contains a list of + FineGrainedLineage objects, which represents column-level lineage + information and a set of strings representing the upstream dataset + URNs created during lineage generation. 
""" upstream_type = FineGrainedLineageUpstreamType.FIELD_SET @@ -333,7 +348,7 @@ def build_fine_grained_lineage( for ( view, - lineage, + lineage_items, ) in self.get_column_view_lineage_elements(inspector): column_lineages: List[FineGrainedLineage] = [] seen_upstream_datasets: Set[str] = set() @@ -347,8 +362,9 @@ def build_fine_grained_lineage( env=self.config.env, ) - for downstream_field, upstream_fields in lineage: - # upstream_column/s should be dependent on the existence of downstream_field attached to it + for downstream_field, upstream_fields in lineage_items: + # upstream_column/s should be dependent on the existence of + # downstream_field attached to it upstream_columns: List[Any] = [] for upstream_field in upstream_fields: @@ -385,9 +401,10 @@ def build_fine_grained_lineage( def _get_column_lineage_workunits( self, inspector: Inspector ) -> Iterable[MetadataWorkUnit]: - """Returns an iterable of MetadataChangeProposalWrapper object that contains column lineage information that is sent to Datahub - after each iteration of the loop. The object is built with column lineages, upstream datasets, and downstream dataset - URNs from the create_column_lineage method. + """Returns an iterable of MetadataChangeProposalWrapper object that contains + column lineage information that is sent to Datahub after each iteration of + the loop. The object is built with column lineages, upstream datasets, and + downstream dataset URNs from the create_column_lineage method. 
""" for ( column_lineages, @@ -397,8 +414,7 @@ def _get_column_lineage_workunits( fieldLineages = UpstreamLineage( fineGrainedLineages=column_lineages, upstreams=[ - Upstream(dataset=dataset_urn, - type=DatasetLineageType.TRANSFORMED) + Upstream(dataset=dataset_urn, type=DatasetLineageType.TRANSFORMED) for dataset_urn in list(upstream_datasets) ], ) diff --git a/datahub_sap_hana/inspector.py b/datahub_sap_hana/inspector.py index 3cd289f..d1f189f 100644 --- a/datahub_sap_hana/inspector.py +++ b/datahub_sap_hana/inspector.py @@ -15,7 +15,8 @@ class Inspector(Protocol): """ - A protocol describing the required methods from the sqlalchemy.engine.reflection.Inspector class. + A protocol describing the required methods from the + sqlalchemy.engine.reflection.Inspector class. """ def get_columns( diff --git a/pyproject.toml b/pyproject.toml index e091702..76b274a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,11 @@ typeCheckingMode = "basic" # See https://beta.ruff.rs/docs/configuration/ select = ["E", "F", "I001"] + +[tool.ruff.per-file-ignores] +"tests/test_helpers/mce_helpers.py" = ["E501"] +"tests/test_helpers/type_helpers.py" = ["E501"] + [tool.unimport] sources = ["datahub_sap_hana", "tests"] exclude = '__init__.py|.venv/' diff --git a/tests/integration/test_lineage.py b/tests/integration/test_lineage.py index 6bd6fee..1983879 100644 --- a/tests/integration/test_lineage.py +++ b/tests/integration/test_lineage.py @@ -63,9 +63,19 @@ def test_get_column_lineage(config, ctx): "total_rooms_price", ] - # SELECT "H"."HNO" , "H"."NAME" , "H"."ADDRESS" , "H"."CITY" , "H"."STATE" , "H"."ZIP" , "R"."TYPE" , "R"."PRICE" , "R"."FREE" - # FROM HOTEL.ROOM AS R\n - # LEFT JOIN HOTEL.HOTEL AS H \n ON H.HNO=R.HNO' + # SELECT + # "H"."HNO" , + # "H"."NAME" , + # "H"."ADDRESS" , + # "H"."CITY" , + # "H"."STATE" , + # "H"."ZIP" , + # "R"."TYPE" , + # "R"."PRICE" , + # "R"."FREE" + # FROM HOTEL.ROOM AS R + # LEFT JOIN HOTEL.HOTEL AS H + # ON H.HNO=R.HNO' flat_hotel_rooms = 
lineages[1] column_lineage = flat_hotel_rooms[1] @@ -96,11 +106,21 @@ def test_get_column_lineage(config, ctx): upstreams = [x[1][0].name for x in column_lineage] assert upstreams == upstream_field_names, f"{upstreams}" - # SELECT\n H.NAME,\n H.CITY,\n R.TYPE,\n COUNT(R.TYPE) * (R.PRICE) AS TOTAL_ROOM_PRICE\n - # FROM\n HOTEL.ROOM AS R\n - # LEFT JOIN\n HOTEL.HOTEL AS H\n - # ON H.HNO = R.HNO\n - # GROUP BY \n H.NAME,\n H.CITY,\n R.TYPE, \n R.PRICE' + # SELECT + # H.NAME, + # H.CITY, + # R.TYPE, + # COUNT(R.TYPE) * (R.PRICE) AS TOTAL_ROOM_PRICE + # FROM + # HOTEL.ROOM AS R + # LEFT JOIN + # HOTEL.HOTEL AS H + # ON H.HNO = R.HNO + # GROUP BY + # H.NAME, + # H.CITY, + # R.TYPE, + # R.PRICE' total_rooms_price = lineages[4] column_lineage = total_rooms_price[1]