From 3216cd47e4f0e438682ea9f69fe7134a2089ca6a Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 26 May 2020 14:21:14 -0600 Subject: [PATCH 1/4] Fix the catalog, making use of rc3 fixes --- dbt/adapters/spark/impl.py | 144 +++++++------------------- dbt/include/spark/macros/adapters.sql | 5 + 2 files changed, 40 insertions(+), 109 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 7e1e3910c..2e94c106f 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,25 +1,25 @@ from dataclasses import dataclass -from typing import Optional, List, Dict, Any, Union +from typing import Optional, List, Dict, Any, Union, Iterable import agate -import dbt.exceptions + import dbt +import dbt.exceptions + from dbt.adapters.base import AdapterConfig from dbt.adapters.sql import SQLAdapter - from dbt.adapters.spark import SparkConnectionManager from dbt.adapters.spark import SparkRelation from dbt.adapters.spark import SparkColumn - - from dbt.adapters.base import BaseRelation - +from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER from dbt.logger import GLOBAL_LOGGER as logger GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation' LIST_SCHEMAS_MACRO_NAME = 'list_schemas' LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching' DROP_RELATION_MACRO_NAME = 'drop_relation' +FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties' KEY_TABLE_OWNER = 'Owner' KEY_TABLE_STATISTICS = 'Statistics' @@ -171,7 +171,7 @@ def parse_describe_extended( raw_table_stats = metadata.get(KEY_TABLE_STATISTICS) table_stats = SparkColumn.convert_table_stats(raw_table_stats) return [SparkColumn( - table_database=relation.database, + table_database=None, table_schema=relation.schema, table_name=relation.name, table_type=relation.type, @@ -195,102 +195,29 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: rows: List[agate.Row] = super().get_columns_in_relation(relation) return self.parse_describe_extended(relation, rows) - @staticmethod - def _parse_relation(relation: Relation, - table_columns: List[Column], - rel_type: str, - properties: Dict[str, str] = None) -> List[dict]: - properties = properties or {} - statistics = {} - table_owner_key = 'Owner' - - # First check if it is present in the properties - table_owner = properties.get(table_owner_key) - - found_detailed_table_marker = False - for column in table_columns: - if column.name == '# Detailed Table Information': - found_detailed_table_marker = True - - # In case there is another column with the name Owner - if not found_detailed_table_marker: - continue - - if not table_owner and column.name == table_owner_key: - table_owner = column.data_type - - if column.name == 'Statistics': - # format: 1109049927 bytes, 14093476 rows - statistics = {stats.split(" ")[1]: int(stats.split(" ")[0]) for - stats in column.data_type.split(', ')} - - columns = [] - for column_index, column in enumerate(table_columns): - # Fixes for pseudo-columns with no type - if column.name in { - '# Partition Information', - '# col_name', - '' - }: - continue - elif column.name == '# Detailed Table Information': - # Loop until the detailed table information - break - elif column.data_type is None: - continue - - column_data = ( - relation.database, - relation.schema, - relation.name, - rel_type, - None, - table_owner, - column.name, - column_index, - column.data_type, - None, - - # Table level stats - 'Table size', - statistics.get("bytes"), - "The size of the table in bytes", - statistics.get("bytes") is not None, - - # Column level stats - 'Number of rows', - statistics.get("rows"), - "The number of rows in the table", - statistics.get("rows") is not None - ) - - column_dict = dict(zip(SparkAdapter.COLUMN_NAMES, column_data)) - columns.append(column_dict) - - return columns - - def _massage_column_for_catalog( - self, column: SparkColumn - ) -> Dict[str, Any]: - dct = column.to_dict() - # different expectations here - Column.column is the name - dct['column_name'] = dct.pop('column') - dct['column_type'] = dct.pop('dtype') - # table_database can't be None in core. - if dct['table_database'] is None: - dct['table_database'] = dct['table_schema'] - return dct - - def _get_catalog_for_relations(self, database: str, schema: str): - with self.connection_named(f'{database}.{schema}'): - columns = [] - for relation in self.list_relations(database, schema): - logger.debug("Getting table schema for relation {}", relation) - columns.extend( - self._massage_column_for_catalog(col) - for col in self.get_columns_in_relation(relation) - ) - return agate.Table.from_object(columns) + def _get_columns_for_catalog( + self, relation: SparkRelation + ) -> Iterable[Dict[str, Any]]: + properties = self.get_properties(relation) + columns = self.get_columns_in_relation(relation) + owner = properties.get(KEY_TABLE_OWNER) + + for column in columns: + if owner: + column.table_owner = owner + # convert SparkColumns into catalog dicts + as_dict = column.to_dict() + as_dict['column_name'] = as_dict.pop('column', None) + as_dict['column_type'] = as_dict.pop('dtype') + as_dict['table_database'] = None + yield as_dict + + def get_properties(self, relation: Relation) -> Dict[str, str]: + properties = self.execute_macro( + FETCH_TBL_PROPERTIES_MACRO_NAME, + kwargs={'relation': relation} + ) + return dict(properties) def _get_one_catalog( self, information_schema, schemas, manifest, @@ -306,14 +233,13 @@ def _get_one_catalog( schema = list(schemas)[0] with self.connection_named(name): - columns = [] + columns: List[Dict[str, Any]] = [] for relation in self.list_relations(database, schema): logger.debug("Getting table schema for relation {}", relation) - columns.extend( - self._massage_column_for_catalog(col) - for col in self.get_columns_in_relation(relation) - ) - return agate.Table.from_object(columns) + columns.extend(self._get_columns_for_catalog(relation)) + return agate.Table.from_object( + columns, column_types=DEFAULT_TYPE_TESTER + ) def check_schema_exists(self, database, schema): results = self.execute_macro( diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index d0e11fca6..02253fe5e 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -153,3 +153,8 @@ drop {{ relation.type }} if exists {{ relation }} {%- endcall %} {% endmacro %} + + +{% macro spark__generate_database_name(custom_database_name=none, node=none) -%} + {% do return(None) %} +{%- endmacro %} From 78f2ae176669e914bbb79fcf157b0dec69439df7 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 26 May 2020 14:53:24 -0600 Subject: [PATCH 2/4] =?UTF-8?q?Bump=20dbt=20version:=200.17.0rc2=20?= =?UTF-8?q?=E2=86=92=200.17.0rc3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion-dbt.cfg | 2 +- .bumpversion.cfg | 2 +- dbt/adapters/spark/__version__.py | 2 +- requirements.txt | 2 +- setup.py | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.bumpversion-dbt.cfg b/.bumpversion-dbt.cfg index a169d8179..d9b0cdd5b 100644 --- a/.bumpversion-dbt.cfg +++ b/.bumpversion-dbt.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.17.0rc2 +current_version = 0.17.0rc3 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 5c3ccfa24..fcce6914f 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.17.0rc2 +current_version = 0.17.0rc3 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index ef664b00e..ef097f1b3 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "0.17.0rc2" +version = "0.17.0rc3" diff --git a/requirements.txt b/requirements.txt index 219e52ff3..c14a957d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -dbt-core==0.17.0rc2 +dbt-core==0.17.0rc3 PyHive[hive]>=0.6.0,<0.7.0 thrift>=0.11.0,<0.12.0 diff --git a/setup.py b/setup.py index 00c26e511..e067e75b1 100644 --- a/setup.py +++ b/setup.py @@ -28,9 +28,9 @@ def _dbt_spark_version(): package_version = _dbt_spark_version() description = """The SparkSQL plugin for dbt (data build tool)""" -dbt_version = '0.17.0rc2' +dbt_version = '0.17.0rc3' # the package version should be the dbt version, with maybe some things on the -# ends of it. (0.17.0rc2 vs 0.17.0rc2a1, 0.17.0rc2.1, ...) +# ends of it. (0.17.0rc3 vs 0.17.0rc3a1, 0.17.0rc3.1, ...) if not package_version.startswith(dbt_version): raise ValueError( f'Invalid setup.py: package_version={package_version} must start with ' From 6eb854facffc4de88bb0823b37cb04e831d04440 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Wed, 27 May 2020 08:02:15 -0600 Subject: [PATCH 3/4] Stop monkeying around with database --- dbt/adapters/spark/relation.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index 2106e5cba..4aa06f820 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -25,16 +25,8 @@ class SparkRelation(BaseRelation): quote_character: str = '`' def __post_init__(self): - # some core things set database='', which we should ignore. - if self.database and self.database != self.schema: - raise RuntimeException( - f'Error while parsing relation {self.name}: \n' - f' identifier: {self.identifier} \n' - f' schema: {self.schema} \n' - f' database: {self.database} \n' - f'On Spark, database should not be set. Use the schema ' - f'config to set a custom schema/database for this relation.' - ) + if self.database != self.schema and self.database: + raise RuntimeException('Cannot set database in spark!') def render(self): if self.include_policy.database and self.include_policy.schema: From 76c23dbae8b4f200bea4e07eab893378f7f1848e Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Fri, 29 May 2020 13:38:20 -0600 Subject: [PATCH 4/4] Default the database to None instead of schema in credentials Fix catalog generation to properly handle 1 database (None!) with many schemas Update unit tests --- dbt/adapters/spark/connections.py | 2 +- dbt/adapters/spark/impl.py | 25 +++++++++++++++++++++++-- test/unit/test_adapter.py | 22 ++++++++++++---------- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 2c152fc82..4b2d89c37 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -53,7 +53,7 @@ def __post_init__(self): f'On Spark, database must be omitted or have the same value as' f' schema.' ) - self.database = self.schema + self.database = None @property def type(self): diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 2e94c106f..5e2b3447d 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,12 +1,13 @@ +from concurrent.futures import Future from dataclasses import dataclass from typing import Optional, List, Dict, Any, Union, Iterable - import agate import dbt import dbt.exceptions from dbt.adapters.base import AdapterConfig +from dbt.adapters.base.impl import catch_as_completed from dbt.adapters.sql import SQLAdapter from dbt.adapters.spark import SparkConnectionManager from dbt.adapters.spark import SparkRelation @@ -14,6 +15,7 @@ from dbt.adapters.base import BaseRelation from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER from dbt.logger import GLOBAL_LOGGER as logger +from dbt.utils import executor GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation' LIST_SCHEMAS_MACRO_NAME = 'list_schemas' @@ -219,6 +221,24 @@ def get_properties(self, relation: Relation) -> Dict[str, str]: ) return dict(properties) + def get_catalog(self, manifest): + schema_map = self._get_catalog_schemas(manifest) + if len(schema_map) != 1: + dbt.exceptions.raise_compiler_error( + f'Expected only one database in get_catalog, found ' + f'{list(schema_map)}' + ) + + with executor(self.config) as tpe: + futures: List[Future[agate.Table]] = [] + for info, schemas in schema_map.items(): + for schema in schemas: + futures.append(tpe.submit( + self._get_one_catalog, info, [schema], manifest + )) + catalogs, exceptions = catch_as_completed(futures) + return catalogs, exceptions + def _get_one_catalog( self, information_schema, schemas, manifest, ) -> agate.Table: @@ -226,7 +246,8 @@ def _get_one_catalog( if len(schemas) != 1: dbt.exceptions.raise_compiler_error( - 'Expected only one schema in spark _get_one_catalog' + f'Expected only one schema in spark _get_one_catalog, found ' + f'{schemas}' ) database = information_schema.database diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index e453c12b1..74f9790a7 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -73,11 +73,11 @@ def hive_http_connect(thrift_transport): connection.handle # trigger lazy-load self.assertEqual(connection.state, 'open') - self.assertNotEqual(connection.handle, None) + self.assertIsNotNone(connection.handle) self.assertEqual(connection.credentials.cluster, '01234-23423-coffeetime') self.assertEqual(connection.credentials.token, 'abc123') self.assertEqual(connection.credentials.schema, 'analytics') - self.assertEqual(connection.credentials.database, 'analytics') + self.assertIsNone(connection.credentials.database) def test_thrift_connection(self): config = self._get_target_thrift(self.project_cfg) @@ -93,9 +93,9 @@ def hive_thrift_connect(host, port, username): connection.handle # trigger lazy-load self.assertEqual(connection.state, 'open') - self.assertNotEqual(connection.handle, None) + self.assertIsNotNone(connection.handle) self.assertEqual(connection.credentials.schema, 'analytics') - self.assertEqual(connection.credentials.database, 'analytics') + self.assertIsNone(connection.credentials.database) def test_parse_relation(self): self.maxDiff = None @@ -106,6 +106,7 @@ def test_parse_relation(self): identifier='mytable', type=rel_type ) + assert relation.database is None # Mimics the output of Spark with a DESCRIBE TABLE EXTENDED plain_rows = [ @@ -117,7 +118,7 @@ def test_parse_relation(self): ('dt', 'date'), (None, None), ('# Detailed Table Information', None), - ('Database', relation.database), + ('Database', None), ('Owner', 'root'), ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), ('Last Access', 'Wed May 20 19:25:00 UTC 1925'), @@ -136,7 +137,7 @@ def test_parse_relation(self): rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) self.assertEqual(len(rows), 3) self.assertEqual(rows[0].to_dict(omit_none=False), { - 'table_database': relation.database, + 'table_database': None, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, @@ -150,7 +151,7 @@ def test_parse_relation(self): }) self.assertEqual(rows[1].to_dict(omit_none=False), { - 'table_database': relation.database, + 'table_database': None, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, @@ -164,7 +165,7 @@ def test_parse_relation(self): }) self.assertEqual(rows[2].to_dict(omit_none=False), { - 'table_database': relation.database, + 'table_database': None, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, @@ -186,6 +187,7 @@ def test_parse_relation_with_statistics(self): identifier='mytable', type=rel_type ) + assert relation.database is None # Mimics the output of Spark with a DESCRIBE TABLE EXTENDED plain_rows = [ @@ -193,7 +195,7 @@ def test_parse_relation_with_statistics(self): ('# Partition Information', 'data_type'), (None, None), ('# Detailed Table Information', None), - ('Database', relation.database), + ('Database', None), ('Owner', 'root'), ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), ('Last Access', 'Wed May 20 19:25:00 UTC 1925'), @@ -213,7 +215,7 @@ def test_parse_relation_with_statistics(self): rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) self.assertEqual(len(rows), 1) self.assertEqual(rows[0].to_dict(omit_none=False), { - 'table_database': relation.database, + 'table_database': None, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type,