-
Notifications
You must be signed in to change notification settings - Fork 235
/
column.py
67 lines (55 loc) · 2.2 KB
/
column.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from dataclasses import dataclass
from typing import TypeVar, Optional, Dict, Any
from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from hologram import JsonDict
# Type variable bound (by forward reference) to SparkColumn; used to annotate
# methods whose `self` and peer arguments are the same SparkColumn subtype.
Self = TypeVar("Self", bound="SparkColumn")
@dataclass
class SparkColumn(dbtClassMixin, Column):
    """Column record that also carries metadata about its parent table.

    The attributes read by the methods below but not declared here
    (``column``, ``dtype``, ``name``, ``is_string``) are presumably supplied
    by the ``Column`` base class -- confirm against dbt's base adapter.
    """

    # Table-level metadata; every field is optional and defaults to None.
    table_database: Optional[str] = None
    table_schema: Optional[str] = None
    table_name: Optional[str] = None
    table_type: Optional[str] = None
    table_owner: Optional[str] = None
    # Flat mapping in the shape produced by `convert_table_stats`.
    table_stats: Optional[Dict[str, Any]] = None
    column_index: Optional[int] = None

    @classmethod
    def translate_type(cls, dtype: str) -> str:
        """Return ``dtype`` unchanged."""
        return dtype

    def can_expand_to(self: Self, other_column: Self) -> bool:
        """Return True if both this column and ``other_column`` are strings."""
        return self.is_string() and other_column.is_string()

    def literal(self, value):
        """Return a SQL fragment casting ``value`` to this column's dtype.

        ``value`` is interpolated as-is; no quoting or escaping is applied.
        """
        return f"cast({value} as {self.dtype})"

    @property
    def quoted(self) -> str:
        """The column name wrapped in backticks."""
        return f"`{self.column}`"

    @property
    def data_type(self) -> str:
        """The column's ``dtype``, returned unchanged."""
        return self.dtype

    def __repr__(self) -> str:
        """Return ``<SparkColumn NAME (DATA_TYPE)>``."""
        return f"<SparkColumn {self.name} ({self.data_type})>"

    @staticmethod
    def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
        """Convert a raw statistics string into a flat ``stats:*`` mapping.

        ``raw_stats`` is a comma-separated list of ``<integer> <label>``
        entries, e.g. ``"1109049927 bytes, 14093476 rows"``. For every entry
        four keys are produced: ``stats:<label>:label``,
        ``stats:<label>:value``, ``stats:<label>:description`` (always the
        empty string) and ``stats:<label>:include`` (always True).

        Entries are split on commas and tokenised on arbitrary whitespace, so
        a missing space after a comma, repeated spaces, or surrounding
        whitespace do not corrupt the labels. Blank segments are skipped.

        Args:
            raw_stats: The statistics string, or None / empty for "no stats".

        Returns:
            The flattened statistics; empty when ``raw_stats`` is falsy.

        Raises:
            IndexError: If a non-blank entry has no label after the count.
            ValueError: If an entry's count is not an integer literal.
        """
        table_stats: Dict[str, Any] = {}
        if not raw_stats:
            return table_stats

        # format: 1109049927 bytes, 14093476 rows
        for entry in raw_stats.split(","):
            # split() with no separator collapses runs of whitespace and
            # ignores leading/trailing whitespace.
            tokens = entry.split()
            if not tokens:
                continue
            label, value = tokens[1], int(tokens[0])
            table_stats[f"stats:{label}:label"] = label
            table_stats[f"stats:{label}:value"] = value
            table_stats[f"stats:{label}:description"] = ""
            table_stats[f"stats:{label}:include"] = True
        return table_stats

    def to_column_dict(
        self, omit_none: bool = True, validate: bool = False
    ) -> JsonDict:
        """Serialize the column, hoisting table stats to the top level.

        Args:
            omit_none: Forwarded to ``to_dict``.
            validate: Accepted but not used by this method.

        Returns:
            The ``to_dict`` result with the ``table_stats`` key removed and,
            when it held a non-empty mapping, its entries merged into the
            root of the dict.
        """
        original_dict = self.to_dict(omit_none=omit_none)
        # If there are stats, merge them into the root of the dict
        original_stats = original_dict.pop("table_stats", None)
        if original_stats:
            original_dict.update(original_stats)
        return original_dict