Fix the catalog, making use of rc3 fixes #92

Merged: 4 commits, May 29, 2020
Changes from all commits
2 changes: 1 addition & 1 deletion .bumpversion-dbt.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.17.0rc2
current_version = 0.17.0rc3
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.17.0rc2
current_version = 0.17.0rc3
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
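To illustrate the `parse` setting, a minimal sketch of what the named groups in the visible part of the pattern capture. The prerelease portion of the pattern (the part that matches the `rc3` suffix) is cut off in this diff, so only the numeric prefix is matched here:

```python
import re

# Only the part of the bumpversion `parse` pattern shown above; the
# prerelease component that matches "rc3" is truncated in the diff.
PARSE = re.compile(
    r"(?P<major>\d+)"
    r"\.(?P<minor>\d+)"
    r"\.(?P<patch>\d+)"
)

match = PARSE.match("0.17.0rc3")
assert match is not None
print(match.groupdict())  # {'major': '0', 'minor': '17', 'patch': '0'}
```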
2 changes: 1 addition & 1 deletion dbt/adapters/spark/__version__.py
@@ -1 +1 @@
version = "0.17.0rc2"
version = "0.17.0rc3"
2 changes: 1 addition & 1 deletion dbt/adapters/spark/connections.py
@@ -53,7 +53,7 @@ def __post_init__(self):
f'On Spark, database must be omitted or have the same value as'
f' schema.'
)
self.database = self.schema
self.database = None

@property
def type(self):
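A minimal standalone sketch of the behavior this hunk implies, using an illustrative dataclass rather than the adapter's real credentials class: a `database` differing from `schema` is still rejected, but a matching one is now cleared to `None` instead of being copied from the schema.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CredentialsSketch:
    """Illustrative stand-in for the Spark credentials dataclass."""
    schema: str
    database: Optional[str] = None

    def __post_init__(self):
        if self.database is not None and self.database != self.schema:
            raise ValueError(
                'On Spark, database must be omitted or have the same value'
                ' as schema.'
            )
        # rc3 behavior: never carry a database value forward
        self.database = None


creds = CredentialsSketch(schema='analytics', database='analytics')
assert creds.database is None  # matches the updated expectation in test_adapter.py
```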
165 changes: 56 additions & 109 deletions dbt/adapters/spark/impl.py
@@ -1,25 +1,27 @@
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, Union

from typing import Optional, List, Dict, Any, Union, Iterable
import agate
import dbt.exceptions

import dbt
import dbt.exceptions

from dbt.adapters.base import AdapterConfig
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.sql import SQLAdapter

from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
from dbt.adapters.spark import SparkColumn


from dbt.adapters.base import BaseRelation

from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import executor

GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
DROP_RELATION_MACRO_NAME = 'drop_relation'
FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties'

KEY_TABLE_OWNER = 'Owner'
KEY_TABLE_STATISTICS = 'Statistics'
@@ -171,7 +173,7 @@ def parse_describe_extended(
raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
table_stats = SparkColumn.convert_table_stats(raw_table_stats)
return [SparkColumn(
table_database=relation.database,
table_database=None,
table_schema=relation.schema,
table_name=relation.name,
table_type=relation.type,
@@ -195,102 +197,47 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
rows: List[agate.Row] = super().get_columns_in_relation(relation)
return self.parse_describe_extended(relation, rows)

@staticmethod
def _parse_relation(relation: Relation,
table_columns: List[Column],
rel_type: str,
properties: Dict[str, str] = None) -> List[dict]:
properties = properties or {}
statistics = {}
table_owner_key = 'Owner'

# First check if it is present in the properties
table_owner = properties.get(table_owner_key)

found_detailed_table_marker = False
for column in table_columns:
if column.name == '# Detailed Table Information':
found_detailed_table_marker = True

# In case there is another column with the name Owner
if not found_detailed_table_marker:
continue

if not table_owner and column.name == table_owner_key:
table_owner = column.data_type

if column.name == 'Statistics':
# format: 1109049927 bytes, 14093476 rows
statistics = {stats.split(" ")[1]: int(stats.split(" ")[0]) for
stats in column.data_type.split(', ')}

columns = []
for column_index, column in enumerate(table_columns):
# Fixes for pseudo-columns with no type
if column.name in {
'# Partition Information',
'# col_name',
''
}:
continue
elif column.name == '# Detailed Table Information':
# Loop until the detailed table information
break
elif column.data_type is None:
continue

column_data = (
relation.database,
relation.schema,
relation.name,
rel_type,
None,
table_owner,
column.name,
column_index,
column.data_type,
None,

# Table level stats
'Table size',
statistics.get("bytes"),
"The size of the table in bytes",
statistics.get("bytes") is not None,

# Column level stats
'Number of rows',
statistics.get("rows"),
"The number of rows in the table",
statistics.get("rows") is not None
def _get_columns_for_catalog(
self, relation: SparkRelation
) -> Iterable[Dict[str, Any]]:
properties = self.get_properties(relation)
columns = self.get_columns_in_relation(relation)
owner = properties.get(KEY_TABLE_OWNER)

for column in columns:
if owner:
column.table_owner = owner
# convert SparkColumns into catalog dicts
as_dict = column.to_dict()
as_dict['column_name'] = as_dict.pop('column', None)
as_dict['column_type'] = as_dict.pop('dtype')
as_dict['table_database'] = None
yield as_dict

def get_properties(self, relation: Relation) -> Dict[str, str]:
properties = self.execute_macro(
FETCH_TBL_PROPERTIES_MACRO_NAME,
kwargs={'relation': relation}
)
return dict(properties)

def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)
if len(schema_map) != 1:
dbt.exceptions.raise_compiler_error(
f'Expected only one database in get_catalog, found '
f'{list(schema_map)}'
)

column_dict = dict(zip(SparkAdapter.COLUMN_NAMES, column_data))
columns.append(column_dict)

return columns

def _massage_column_for_catalog(
self, column: SparkColumn
) -> Dict[str, Any]:
dct = column.to_dict()
# different expectations here - Column.column is the name
dct['column_name'] = dct.pop('column')
dct['column_type'] = dct.pop('dtype')
# table_database can't be None in core.
if dct['table_database'] is None:
dct['table_database'] = dct['table_schema']
return dct

def _get_catalog_for_relations(self, database: str, schema: str):
with self.connection_named(f'{database}.{schema}'):
columns = []
for relation in self.list_relations(database, schema):
logger.debug("Getting table schema for relation {}", relation)
columns.extend(
self._massage_column_for_catalog(col)
for col in self.get_columns_in_relation(relation)
)
return agate.Table.from_object(columns)
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(tpe.submit(
self._get_one_catalog, info, [schema], manifest
))
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
self, information_schema, schemas, manifest,
@@ -299,21 +246,21 @@ def _get_one_catalog(

if len(schemas) != 1:
dbt.exceptions.raise_compiler_error(
'Expected only one schema in spark _get_one_catalog'
f'Expected only one schema in spark _get_one_catalog, found '
f'{schemas}'
)

database = information_schema.database
schema = list(schemas)[0]

with self.connection_named(name):
columns = []
columns: List[Dict[str, Any]] = []
for relation in self.list_relations(database, schema):
logger.debug("Getting table schema for relation {}", relation)
columns.extend(
self._massage_column_for_catalog(col)
for col in self.get_columns_in_relation(relation)
)
return agate.Table.from_object(columns)
columns.extend(self._get_columns_for_catalog(relation))
return agate.Table.from_object(
columns, column_types=DEFAULT_TYPE_TESTER
)

def check_schema_exists(self, database, schema):
results = self.execute_macro(
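The heart of the catalog fix is `_get_columns_for_catalog` above: each column is converted into a catalog row keyed by `column_name`/`column_type`, with `table_owner` filled from the table properties and `table_database` forced to `None`. A rough standalone sketch using plain dicts in place of `SparkColumn` (function and variable names here are illustrative):

```python
from typing import Any, Dict, Iterable, List, Optional


def columns_to_catalog_rows(
    columns: List[Dict[str, Any]],
    owner: Optional[str] = None,
) -> Iterable[Dict[str, Any]]:
    """Mimic _get_columns_for_catalog with plain dicts instead of SparkColumn."""
    for as_dict in columns:
        if owner:
            as_dict['table_owner'] = owner
        # catalog rows expect column_name/column_type keys and no database
        as_dict['column_name'] = as_dict.pop('column', None)
        as_dict['column_type'] = as_dict.pop('dtype')
        as_dict['table_database'] = None
        yield as_dict


rows = list(columns_to_catalog_rows(
    [{'table_schema': 'analytics', 'table_name': 'mytable',
      'column': 'col1', 'dtype': 'decimal(22,0)'}],
    owner='root',
))
print(rows[0]['column_name'], rows[0]['column_type'])  # col1 decimal(22,0)
```

`get_catalog` then builds one `agate.Table` per schema in parallel, submitting `_get_one_catalog` calls to dbt's `executor` and collecting the futures with `catch_as_completed`, as shown in the hunk above.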
12 changes: 2 additions & 10 deletions dbt/adapters/spark/relation.py
@@ -25,16 +25,8 @@ class SparkRelation(BaseRelation):
quote_character: str = '`'

def __post_init__(self):
# some core things set database='', which we should ignore.
if self.database and self.database != self.schema:
raise RuntimeException(
f'Error while parsing relation {self.name}: \n'
f' identifier: {self.identifier} \n'
f' schema: {self.schema} \n'
f' database: {self.database} \n'
f'On Spark, database should not be set. Use the schema '
f'config to set a custom schema/database for this relation.'
)
if self.database != self.schema and self.database:
raise RuntimeException('Cannot set database in spark!')

def render(self):
if self.include_policy.database and self.include_policy.schema:
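The relation-level check follows the same rule. A compact sketch (illustrative dataclass, not the real `SparkRelation`, which subclasses dbt's `BaseRelation`) showing which `database` values pass:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RelationSketch:
    """Illustrative stand-in for SparkRelation's __post_init__ check."""
    schema: str
    identifier: str
    database: Optional[str] = None

    def __post_init__(self):
        # Falsy values (None, or the '' that some core code passes) are
        # ignored; only a real, mismatched database name is an error.
        if self.database != self.schema and self.database:
            raise RuntimeError('Cannot set database in spark!')


RelationSketch(schema='analytics', identifier='mytable')                          # ok: None
RelationSketch(schema='analytics', identifier='mytable', database='')            # ok: ignored
RelationSketch(schema='analytics', identifier='mytable', database='analytics')   # ok: matches schema
# database='other_db' would raise RuntimeError
```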
5 changes: 5 additions & 0 deletions dbt/include/spark/macros/adapters.sql
@@ -153,3 +153,8 @@
drop {{ relation.type }} if exists {{ relation }}
{%- endcall %}
{% endmacro %}


{% macro spark__generate_database_name(custom_database_name=none, node=none) -%}
{% do return(None) %}
{%- endmacro %}
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
dbt-core==0.17.0rc2
dbt-core==0.17.0rc3
PyHive[hive]>=0.6.0,<0.7.0
thrift>=0.11.0,<0.12.0
4 changes: 2 additions & 2 deletions setup.py
@@ -28,9 +28,9 @@ def _dbt_spark_version():
package_version = _dbt_spark_version()
description = """The SparkSQL plugin for dbt (data build tool)"""

dbt_version = '0.17.0rc2'
dbt_version = '0.17.0rc3'
# the package version should be the dbt version, with maybe some things on the
# ends of it. (0.17.0rc2 vs 0.17.0rc2a1, 0.17.0rc2.1, ...)
# ends of it. (0.17.0rc3 vs 0.17.0rc3a1, 0.17.0rc3.1, ...)
if not package_version.startswith(dbt_version):
raise ValueError(
f'Invalid setup.py: package_version={package_version} must start with '
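A quick illustration of the compatibility check described by that comment: the plugin's package version must start with the pinned `dbt_version` string, so suffixed builds are accepted but a different prerelease is not.

```python
dbt_version = '0.17.0rc3'

# Versions like 0.17.0rc3, 0.17.0rc3a1 or 0.17.0rc3.1 pass the check;
# a package still pinned to rc2 would be rejected by setup.py.
for package_version in ('0.17.0rc3', '0.17.0rc3a1', '0.17.0rc3.1', '0.17.0rc2'):
    ok = package_version.startswith(dbt_version)
    print(f'{package_version}: {"ok" if ok else "rejected"}')
```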
22 changes: 12 additions & 10 deletions test/unit/test_adapter.py
@@ -73,11 +73,11 @@ def hive_http_connect(thrift_transport):
connection.handle # trigger lazy-load

self.assertEqual(connection.state, 'open')
self.assertNotEqual(connection.handle, None)
self.assertIsNotNone(connection.handle)
self.assertEqual(connection.credentials.cluster, '01234-23423-coffeetime')
self.assertEqual(connection.credentials.token, 'abc123')
self.assertEqual(connection.credentials.schema, 'analytics')
self.assertEqual(connection.credentials.database, 'analytics')
self.assertIsNone(connection.credentials.database)

def test_thrift_connection(self):
config = self._get_target_thrift(self.project_cfg)
@@ -93,9 +93,9 @@ def hive_thrift_connect(host, port, username):
connection.handle # trigger lazy-load

self.assertEqual(connection.state, 'open')
self.assertNotEqual(connection.handle, None)
self.assertIsNotNone(connection.handle)
self.assertEqual(connection.credentials.schema, 'analytics')
self.assertEqual(connection.credentials.database, 'analytics')
self.assertIsNone(connection.credentials.database)

def test_parse_relation(self):
self.maxDiff = None
@@ -106,6 +106,7 @@ def test_parse_relation(self):
identifier='mytable',
type=rel_type
)
assert relation.database is None

# Mimics the output of Spark with a DESCRIBE TABLE EXTENDED
plain_rows = [
@@ -117,7 +118,7 @@
('dt', 'date'),
(None, None),
('# Detailed Table Information', None),
('Database', relation.database),
('Database', None),
('Owner', 'root'),
('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'),
('Last Access', 'Wed May 20 19:25:00 UTC 1925'),
@@ -136,7 +137,7 @@
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols)
self.assertEqual(len(rows), 3)
self.assertEqual(rows[0].to_dict(omit_none=False), {
'table_database': relation.database,
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
@@ -150,7 +151,7 @@
})

self.assertEqual(rows[1].to_dict(omit_none=False), {
'table_database': relation.database,
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
@@ -164,7 +165,7 @@
})

self.assertEqual(rows[2].to_dict(omit_none=False), {
'table_database': relation.database,
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
@@ -186,14 +187,15 @@ def test_parse_relation_with_statistics(self):
identifier='mytable',
type=rel_type
)
assert relation.database is None

# Mimics the output of Spark with a DESCRIBE TABLE EXTENDED
plain_rows = [
('col1', 'decimal(22,0)'),
('# Partition Information', 'data_type'),
(None, None),
('# Detailed Table Information', None),
('Database', relation.database),
('Database', None),
('Owner', 'root'),
('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'),
('Last Access', 'Wed May 20 19:25:00 UTC 1925'),
@@ -213,7 +215,7 @@
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].to_dict(omit_none=False), {
'table_database': relation.database,
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
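For context on the `Statistics` value these tests exercise: per the comment in the removed `_parse_relation`, Spark reports it as `'1109049927 bytes, 14093476 rows'`. A standalone sketch of that parsing (the adapter now appears to do the equivalent inside `SparkColumn.convert_table_stats`, which is not shown in this diff):

```python
# Format of the raw Statistics value: "1109049927 bytes, 14093476 rows"
raw_stats = '1109049927 bytes, 14093476 rows'

# Split into "<value> <unit>" pairs and key the numbers by their unit.
statistics = {
    stats.split(' ')[1]: int(stats.split(' ')[0])
    for stats in raw_stats.split(', ')
}
print(statistics)  # {'bytes': 1109049927, 'rows': 14093476}
```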