Support new error messages. (#226)
### Description

Supports new error messages.

In `SparkAdapter.get_columns_in_relation`, the error message is inspected to detect that the specified table or view doesn't exist:

https://github.com/dbt-labs/dbt-spark/blob/c87b6b2c48bcefb0ce52cd64984d3129d6f14ea0/dbt/adapters/spark/impl.py#L223

but Spark will change this error message in a future release (apache/spark#37887), which would cause the function to raise `dbt.exceptions.RuntimeException` instead of returning an empty list.

The function should therefore also check whether the error message contains `[TABLE_OR_VIEW_NOT_FOUND]`.

This will be reverted once dbt-labs/dbt-spark#515 is resolved.
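
For illustration, a minimal standalone sketch of the substring check described above (the marker strings mirror the diff below; `NOT_FOUND_MARKERS` and `is_missing_relation_error` are hypothetical names, not part of the commit):

```python
# Sketch of the normalized "missing table or view" detection.
NOT_FOUND_MARKERS = (
    "[TABLE_OR_VIEW_NOT_FOUND]",  # new Spark error class (apache/spark#37887)
    "Table or view not found",    # pre-change Spark message
    "NoSuchTableException",       # exception name that may appear in the message
)


def is_missing_relation_error(errmsg: str) -> bool:
    """Return True if the error message indicates a missing table or view."""
    return any(marker in errmsg for marker in NOT_FOUND_MARKERS)


# Both the old and the new Spark messages are recognized.
assert is_missing_relation_error("Table or view not found: foo")
assert is_missing_relation_error(
    "[TABLE_OR_VIEW_NOT_FOUND] The table or view `foo` cannot be found."
)
```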
ueshin committed Nov 15, 2022
1 parent 4ea9bce commit df6abc0
Showing 1 changed file with 43 additions and 0 deletions.
dbt/adapters/databricks/impl.py
@@ -11,6 +11,7 @@
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.spark.impl import (
SparkAdapter,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
KEY_TABLE_OWNER,
KEY_TABLE_STATISTICS,
LIST_RELATIONS_MACRO_NAME,
@@ -162,6 +163,48 @@ def parse_describe_extended(
for idx, column in enumerate(rows)
]

def get_columns_in_relation(self, relation: DatabricksRelation) -> List[DatabricksColumn]:
cached_relations = self.cache.get_relations(relation.database, relation.schema)
cached_relation = next(
(
cached_relation
for cached_relation in cached_relations
if str(cached_relation) == str(relation)
),
None,
)
columns = []
if cached_relation and cached_relation.information:
columns = self.parse_columns_from_information(cached_relation)
if not columns:
            # In open source Delta, the 'show table extended' query output doesn't
            # return the relation's schema. If columns are empty from the cache,
            # use the get_columns_in_relation Spark macro, which executes a
            # 'describe extended <tablename>' query.
try:
rows: List[Row] = self.execute_macro(
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
)
columns = self.parse_describe_extended(relation, rows)
except dbt.exceptions.RuntimeException as e:
                # Spark throws an error when the table doesn't exist, whereas other
                # CDWs just return an empty list; normalize the behavior here.
errmsg = getattr(e, "msg", "")
if any(
msg in errmsg
for msg in (
"[TABLE_OR_VIEW_NOT_FOUND]",
"Table or view not found",
"NoSuchTableException",
)
):
pass
else:
raise e

# strip hudi metadata columns.
return [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]

def parse_columns_from_information(
self, relation: DatabricksRelation
) -> List[DatabricksColumn]:
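
A self-contained sketch of the behavior the `except` branch above normalizes, using a stand-in exception class (`FakeRuntimeException` and `columns_or_empty` are hypothetical; the real code catches `dbt.exceptions.RuntimeException` raised by the macro call):

```python
class FakeRuntimeException(Exception):
    """Stand-in for dbt.exceptions.RuntimeException, which carries a .msg."""

    def __init__(self, msg: str):
        super().__init__(msg)
        self.msg = msg


def columns_or_empty(raise_msg: str):
    """Simulate the macro call failing, mirroring the handler above."""
    try:
        raise FakeRuntimeException(raise_msg)
    except FakeRuntimeException as e:
        errmsg = getattr(e, "msg", "")
        if any(
            msg in errmsg
            for msg in (
                "[TABLE_OR_VIEW_NOT_FOUND]",
                "Table or view not found",
                "NoSuchTableException",
            )
        ):
            return []  # missing relation: normalize to an empty column list
        raise


# Old and new "missing table" messages both yield an empty list ...
assert columns_or_empty("Table or view not found: db.tbl") == []
assert columns_or_empty("[TABLE_OR_VIEW_NOT_FOUND] `db`.`tbl` cannot be found.") == []

# ... while unrelated errors still propagate.
try:
    columns_or_empty("SOME_OTHER_ERROR")
except FakeRuntimeException:
    pass
```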
