-
Notifications
You must be signed in to change notification settings - Fork 40
/
relation.py
552 lines (460 loc) · 18.5 KB
/
relation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
from collections.abc import Hashable
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
Any,
Dict,
FrozenSet,
Iterator,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
from dbt_common.exceptions import CompilationError, DbtRuntimeError
from dbt_common.utils import deep_merge, filter_null_values
from dbt.adapters.contracts.relation import (
ComponentName,
HasQuoting,
FakeAPIObject,
Path,
Policy,
RelationConfig,
RelationType,
)
from dbt.adapters.exceptions import (
ApproximateMatchError,
MultipleDatabasesNotAllowedError,
)
from dbt.adapters.utils import classproperty
# Type variable bound to BaseRelation; used below to annotate methods and
# classmethods that return an instance of the calling (sub)class.
Self = TypeVar("Self", bound="BaseRelation")
# Annotation used by the `renameable_relations` / `replaceable_relations` fields of BaseRelation.
SerializableIterable = Union[Tuple, FrozenSet]
@dataclass
class EventTimeFilter(FakeAPIObject):
    """Optional lower/upper bounds on a named field.

    `BaseRelation._render_event_time_filtered` turns an instance into a
    `where` condition: `start` becomes `field_name >= '<start>'`, `end`
    becomes `field_name < '<end>'`, and a bound that is falsy is left out.
    """

    # Name of the field the bounds are compared against.
    field_name: str
    # Lower bound, rendered with `>=`; omitted from the condition when not set.
    start: Optional[datetime] = None
    # Upper bound, rendered with `<`; omitted from the condition when not set.
    end: Optional[datetime] = None
@dataclass(frozen=True, eq=False, repr=False)
class BaseRelation(FakeAPIObject, Hashable):
    """A relation addressed by a `path`, with policies controlling how it renders.

    Equality compares the `to_dict(omit_none=True)` form of two relations,
    while `__hash__` and `__repr__` are derived from `render()`.
    """

    path: Path
    type: Optional[RelationType] = None
    quote_character: str = '"'
    # Python 3.11 requires that these use default_factory instead of simple default
    # ValueError: mutable default <class 'dbt.contracts.relation.Policy'> for field include_policy is not allowed: use default_factory
    include_policy: Policy = field(default_factory=lambda: Policy())
    quote_policy: Policy = field(default_factory=lambda: Policy())
    dbt_created: bool = False
    limit: Optional[int] = None
    event_time_filter: Optional[EventTimeFilter] = None
    require_alias: bool = (
        True  # used to govern whether to add an alias when render_limited is called
    )

    # register relation types that can be renamed for the purpose of replacing relations using stages and backups
    # adding a relation type here also requires defining the associated rename macro
    # e.g. adding RelationType.View in dbt-postgres requires that you define:
    # include/postgres/macros/relations/view/rename.sql::postgres__get_rename_view_sql()
    renameable_relations: SerializableIterable = field(default_factory=frozenset)

    # register relation types that are atomically replaceable, e.g. they have "create or replace" syntax
    # adding a relation type here also requires defining the associated replace macro
    # e.g. adding RelationType.View in dbt-postgres requires that you define:
    # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql()
    replaceable_relations: SerializableIterable = field(default_factory=frozenset)

    def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
        """Compare one path part against `value`.

        The comparison is case-insensitive only when this relation is
        `dbt_created` and the quote policy for `field` is exactly `False`;
        otherwise the stored part must equal `value` as-is.
        """
        if self.dbt_created and self.quote_policy.get_part(field) is False:
            return self.path.get_lowered_part(field) == value.lower()
        else:
            return self.path.get_part(field) == value

    @classmethod
    def _get_field_named(cls, field_name):
        """Return the field object from `cls._get_fields()` whose name is `field_name`.

        Raises:
            ValueError: if no field has that name.
        """
        for f, _ in cls._get_fields():
            if f.name == field_name:
                return f
        # this should be unreachable
        raise ValueError(f"BaseRelation has no {field_name} field!")

    def __eq__(self, other):
        """Two relations are equal when `other` is an instance of this class and both serialize identically."""
        if not isinstance(other, self.__class__):
            return False
        return self.to_dict(omit_none=True) == other.to_dict(omit_none=True)

    @classmethod
    def get_default_quote_policy(cls) -> Policy:
        """Return a fresh value from the `quote_policy` field's default factory."""
        return cls._get_field_named("quote_policy").default_factory()

    @classmethod
    def get_default_include_policy(cls) -> Policy:
        """Return a fresh value from the `include_policy` field's default factory."""
        return cls._get_field_named("include_policy").default_factory()

    def get(self, key, default=None):
        """Override `.get` to return a metadata object so we don't break
        dbt_utils.
        """
        if key == "metadata":
            return {"type": self.__class__.__name__}
        return super().get(key, default)

    def matches(
        self,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        identifier: Optional[str] = None,
    ) -> bool:
        """Report whether every supplied path part matches this relation.

        Only the arguments that are not filtered out by `filter_null_values`
        take part in the comparison.

        Returns:
            True when every supplied part passes `_is_exactish_match`.

        Raises:
            DbtRuntimeError: if no part was supplied.
            ApproximateMatchError: if at least one part fails the exact-ish
                comparison, yet every part is equal once lower-cased and
                stripped of `quote_character`.
        """
        search = filter_null_values(
            {
                ComponentName.Database: database,
                ComponentName.Schema: schema,
                ComponentName.Identifier: identifier,
            }
        )

        if not search:
            # nothing was passed in
            raise DbtRuntimeError("Tried to match relation, but no search path was passed!")

        exact_match = True
        approximate_match = True

        for k, v in search.items():
            if not self._is_exactish_match(k, v):
                exact_match = False
            # Looser comparison: ignore case and surrounding quote characters.
            if str(self.path.get_lowered_part(k)).strip(self.quote_character) != v.lower().strip(
                self.quote_character
            ):
                approximate_match = False  # type: ignore[union-attr]

        if approximate_match and not exact_match:
            target = self.create(database=database, schema=schema, identifier=identifier)
            raise ApproximateMatchError(target, self)

        return exact_match

    def replace_path(self, **kwargs):
        """Return a copy of this relation whose `path` has `kwargs` replaced."""
        return self.replace(path=self.path.replace(**kwargs))

    def quote(
        self: Self,
        database: Optional[bool] = None,
        schema: Optional[bool] = None,
        identifier: Optional[bool] = None,
    ) -> Self:
        """Return a copy with the quote policy updated for the supplied components.

        Arguments removed by `filter_null_values` leave the corresponding
        policy entry unchanged.
        """
        policy = filter_null_values(
            {
                ComponentName.Database: database,
                ComponentName.Schema: schema,
                ComponentName.Identifier: identifier,
            }
        )

        new_quote_policy = self.quote_policy.replace_dict(policy)
        return self.replace(quote_policy=new_quote_policy)

    def include(
        self: Self,
        database: Optional[bool] = None,
        schema: Optional[bool] = None,
        identifier: Optional[bool] = None,
    ) -> Self:
        """Return a copy with the include policy updated for the supplied components.

        Arguments removed by `filter_null_values` leave the corresponding
        policy entry unchanged.
        """
        policy = filter_null_values(
            {
                ComponentName.Database: database,
                ComponentName.Schema: schema,
                ComponentName.Identifier: identifier,
            }
        )

        new_include_policy = self.include_policy.replace_dict(policy)
        return self.replace(include_policy=new_include_policy)

    def information_schema(self, view_name=None) -> "InformationSchema":
        """Build the `InformationSchema` relation for this relation.

        `view_name` is discarded (treated as `None`) unless it is a `str`.
        """
        # some of our data comes from jinja, where things can be `Undefined`.
        if not isinstance(view_name, str):
            view_name = None

        # Kick the user-supplied schema out of the information schema relation
        # Instead address this as <database>.information_schema by default
        info_schema = InformationSchema.from_relation(self, view_name)
        return info_schema.incorporate(path={"schema": None})

    def information_schema_only(self) -> "InformationSchema":
        """Return `information_schema()` with no view name."""
        return self.information_schema()

    def without_identifier(self) -> "BaseRelation":
        """Return a form of this relation that only has the database and schema
        set to included. To get the appropriately-quoted form of the schema out
        of the result (for use as part of a query), use `.render()`. To get the
        raw database or schema name, use `.database` or `.schema`.

        The hash of the returned object is derived from the result of render().
        """
        return self.include(identifier=False).replace_path(identifier=None)

    def _render_iterator(
        self,
    ) -> Iterator[Tuple[Optional[ComponentName], Optional[str]]]:
        """Yield `(component, rendered_part)` for every `ComponentName`.

        The part is `None` when the include policy excludes the component or
        the path has no value for it; it is passed through `quoted()` when the
        quote policy for the component is truthy.
        """
        for key in ComponentName:  # type: ignore
            path_part: Optional[str] = None
            if self.include_policy.get_part(key):
                path_part = self.path.get_part(key)
                if path_part is not None and self.quote_policy.get_part(key):
                    path_part = self.quoted(path_part)
            yield key, path_part

    def render(self) -> str:
        """Join the non-`None` parts from `_render_iterator()` with `.`."""
        # if there is nothing set, this will return the empty string.
        return ".".join(part for _, part in self._render_iterator() if part is not None)

    def _render_subquery_alias(self, namespace: str) -> str:
        """Some databases require an alias for subqueries (postgres, mysql) for all others we want to avoid adding
        an alias as it has the potential to introduce issues with the query if the user also defines an alias.

        Returns the alias (with a leading space) when `require_alias` is set,
        otherwise the empty string.
        """
        if self.require_alias:
            return f" _dbt_{namespace}_subq_{self.table}"
        return ""

    def _render_limited_alias(
        self,
    ) -> str:
        """Return the subquery alias used by `render_limited()`."""
        return self._render_subquery_alias(namespace="limit")

    def render_limited(self) -> str:
        """Render the relation, wrapped in a limiting subquery when `limit` is set.

        With `limit` of `None` this is just `render()`. A limit of `0` adds a
        `where false` condition alongside `limit 0`.
        """
        rendered = self.render()
        if self.limit is None:
            return rendered
        elif self.limit == 0:
            return f"(select * from {rendered} where false limit 0){self._render_limited_alias()}"
        else:
            return f"(select * from {rendered} limit {self.limit}){self._render_limited_alias()}"

    def render_event_time_filtered(self, rendered: Optional[str] = None) -> str:
        """Wrap `rendered` in a subquery applying `event_time_filter`.

        Args:
            rendered: Already-rendered relation text; when falsy, `render()`
                is used instead.

        Returns:
            `rendered` unchanged when there is no filter or the filter
            produces no condition; otherwise a filtered subquery.
        """
        rendered = rendered or self.render()
        if self.event_time_filter is None:
            return rendered

        filter_clause = self._render_event_time_filtered(self.event_time_filter)
        if not filter_clause:
            return rendered

        return f"(select * from {rendered} where {filter_clause}){self._render_subquery_alias(namespace='et_filter')}"

    def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
        """
        Returns "" if start and end are both None
        """
        filter_clause = ""
        if event_time_filter.start and event_time_filter.end:
            filter_clause = f"{event_time_filter.field_name} >= '{event_time_filter.start}' and {event_time_filter.field_name} < '{event_time_filter.end}'"
        elif event_time_filter.start:
            filter_clause = f"{event_time_filter.field_name} >= '{event_time_filter.start}'"
        elif event_time_filter.end:
            filter_clause = f"{event_time_filter.field_name} < '{event_time_filter.end}'"

        return filter_clause

    def quoted(self, identifier):
        """Surround `identifier` with `quote_character` on both sides."""
        return "{quote_char}{identifier}{quote_char}".format(
            quote_char=self.quote_character,
            identifier=identifier,
        )

    @staticmethod
    def add_ephemeral_prefix(name: str):
        """Return `name` prefixed with `__dbt__cte__`."""
        return f"__dbt__cte__{name}"

    @classmethod
    def create_ephemeral_from(
        cls: Type[Self],
        relation_config: RelationConfig,
        limit: Optional[int] = None,
        event_time_filter: Optional[EventTimeFilter] = None,
    ) -> Self:
        """Create a CTE-typed relation with an unquoted, prefixed identifier."""
        # Note that ephemeral models are based on the identifier, which will
        # point to the model's alias if one exists and otherwise fall back to
        # the filename. This is intended to give the user more control over
        # the way that the CTE name is constructed
        identifier = cls.add_ephemeral_prefix(relation_config.identifier)
        return cls.create(
            type=cls.CTE,
            identifier=identifier,
            limit=limit,
            event_time_filter=event_time_filter,
        ).quote(identifier=False)

    @classmethod
    def create_from(
        cls: Type[Self],
        quoting: HasQuoting,
        relation_config: RelationConfig,
        **kwargs: Any,
    ) -> Self:
        """Create a relation from a relation config, merging quoting settings.

        Args:
            quoting: Object whose `quoting` attribute supplies base quoting.
            relation_config: Supplies database, schema, identifier and
                `quoting_dict`. It is not modified.
            **kwargs: Forwarded to `create()`; an optional `quote_policy`
                entry is removed and merged with highest precedence.
        """
        quote_policy = kwargs.pop("quote_policy", {})

        # Copy before dropping "column" so the dict owned by relation_config
        # is never mutated as a side effect of building a relation.
        config_quoting = dict(relation_config.quoting_dict)
        config_quoting.pop("column", None)

        # precedence: kwargs quoting > relation config quoting > base quoting > default quoting
        quote_policy = deep_merge(
            cls.get_default_quote_policy().to_dict(omit_none=True),
            quoting.quoting,
            config_quoting,
            quote_policy,
        )

        return cls.create(
            database=relation_config.database,
            schema=relation_config.schema,
            identifier=relation_config.identifier,
            quote_policy=quote_policy,
            **kwargs,
        )

    @classmethod
    def create(
        cls: Type[Self],
        database: Optional[str] = None,
        schema: Optional[str] = None,
        identifier: Optional[str] = None,
        type: Optional[RelationType] = None,
        **kwargs,
    ) -> Self:
        """Build a relation via `from_dict` from path parts, a type and extra fields.

        Any `path` or `type` entry passed through `kwargs` is overwritten by
        the values assembled from the named arguments.
        """
        kwargs.update(
            {
                "path": {
                    "database": database,
                    "schema": schema,
                    "identifier": identifier,
                },
                "type": type,
            }
        )
        return cls.from_dict(kwargs)

    @classmethod
    def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]:
        """Return the primary key column(s) followed by `updated_at` as one flat list."""
        scd_args = []
        if isinstance(primary_key, list):
            scd_args.extend(primary_key)
        else:
            scd_args.append(primary_key)
        scd_args.append(updated_at)
        return scd_args

    @property
    def can_be_renamed(self) -> bool:
        """Whether `type` is listed in `renameable_relations`."""
        return self.type in self.renameable_relations

    @property
    def can_be_replaced(self) -> bool:
        """Whether `type` is listed in `replaceable_relations`."""
        return self.type in self.replaceable_relations

    def __repr__(self) -> str:
        return "<{} {}>".format(self.__class__.__name__, self.render())

    def __hash__(self) -> int:
        return hash(self.render())

    def __str__(self) -> str:
        rendered = self.render() if self.limit is None else self.render_limited()

        # Limited subquery is wrapped by the event time filter subquery, and not the other way around.
        # This is because in the context of resolving limited refs, we care more about performance than reliably producing a sample of a certain size.
        if self.event_time_filter:
            rendered = self.render_event_time_filtered(rendered)

        return rendered

    @property
    def database(self) -> Optional[str]:
        return self.path.database

    @property
    def schema(self) -> Optional[str]:
        return self.path.schema

    @property
    def identifier(self) -> Optional[str]:
        return self.path.identifier

    @property
    def table(self) -> Optional[str]:
        # Same value as `identifier`.
        return self.path.identifier

    # Here for compatibility with old Relation interface
    @property
    def name(self) -> Optional[str]:
        return self.identifier

    @property
    def is_table(self) -> bool:
        return self.type == RelationType.Table

    @property
    def is_cte(self) -> bool:
        return self.type == RelationType.CTE

    @property
    def is_view(self) -> bool:
        return self.type == RelationType.View

    @property
    def is_materialized_view(self) -> bool:
        return self.type == RelationType.MaterializedView

    # The class properties below expose `str()` of each RelationType member.
    @classproperty
    def Table(cls) -> str:
        return str(RelationType.Table)

    @classproperty
    def CTE(cls) -> str:
        return str(RelationType.CTE)

    @classproperty
    def View(cls) -> str:
        return str(RelationType.View)

    @classproperty
    def External(cls) -> str:
        return str(RelationType.External)

    @classproperty
    def MaterializedView(cls) -> str:
        return str(RelationType.MaterializedView)

    @classproperty
    def get_relation_type(cls) -> Type[RelationType]:
        """Return the `RelationType` class itself."""
        return RelationType
# Type variable bound to InformationSchema; used to annotate `InformationSchema.from_relation`.
Info = TypeVar("Info", bound="InformationSchema")
@dataclass(frozen=True, eq=False, repr=False)
class InformationSchema(BaseRelation):
    """Relation pointing at an `INFORMATION_SCHEMA`, optionally at one view in it.

    When rendered, the view name is emitted as an extra trailing part after
    the parts produced by `BaseRelation._render_iterator`.
    """

    information_schema_view: Optional[str] = None

    def __post_init__(self):
        """Reject a view name that is neither `None` nor a `str`."""
        view = self.information_schema_view
        if view is None or isinstance(view, str):
            return
        raise CompilationError("Got an invalid name: {}".format(view))

    @classmethod
    def get_path(cls, relation: BaseRelation, information_schema_view: Optional[str]) -> Path:
        """Path with the relation's database and schema and a fixed identifier."""
        database, schema = relation.database, relation.schema
        return Path(database=database, schema=schema, identifier="INFORMATION_SCHEMA")

    @classmethod
    def get_include_policy(
        cls,
        relation,
        information_schema_view: Optional[str],
    ) -> Policy:
        """Include the identifier, never the schema, and the database only if one is set."""
        has_database = relation.database is not None
        return relation.include_policy.replace(
            database=has_database,
            schema=False,
            identifier=True,
        )

    @classmethod
    def get_quote_policy(
        cls,
        relation,
        information_schema_view: Optional[str],
    ) -> Policy:
        """The relation's quote policy with identifier quoting switched off."""
        return relation.quote_policy.replace(identifier=False)

    @classmethod
    def from_relation(
        cls: Type[Info],
        relation: BaseRelation,
        information_schema_view: Optional[str],
    ) -> Info:
        """Build an instance of `cls` (typed as a view) describing `relation`."""
        # Keyword arguments are evaluated left to right, so the policies are
        # computed before the path.
        return cls(
            include_policy=cls.get_include_policy(relation, information_schema_view),
            quote_policy=cls.get_quote_policy(relation, information_schema_view),
            path=cls.get_path(relation, information_schema_view),
            type=RelationType.View,  # type: ignore
            information_schema_view=information_schema_view,
        )

    def _render_iterator(self):
        """Yield the base parts, then `(None, information_schema_view)`."""
        yield from super()._render_iterator()
        yield None, self.information_schema_view
class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
    """Mapping from an information schema relation to the set of schema
    names to look up in it. Schema names are stored lowercased so that
    differently-cased spellings collapse into one entry.
    """

    def add(self, relation: BaseRelation):
        """Record `relation`'s schema under its information schema key."""
        info_schema = relation.information_schema_only()
        schema_names = self.setdefault(info_schema, set())
        raw_schema = relation.schema
        lowered: Optional[str] = None if raw_schema is None else raw_schema.lower()
        schema_names.add(lowered)

    def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
        """Yield one `(information_schema, schema)` pair per stored schema name."""
        for info_schema, schema_names in self.items():
            yield from ((info_schema, schema_name) for schema_name in schema_names)

    def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap":
        """Return a new map whose keys neither include nor quote the database.

        Raises:
            MultipleDatabasesNotAllowedError: if `allow_multiple_databases`
                is false and the keys span more than one distinct
                (lowercased) database name.
        """
        flattened = self.__class__()

        # make sure we don't have multiple databases if allow_multiple_databases is set to False
        if not allow_multiple_databases:
            databases = set()
            for info_schema in self:
                if info_schema.database:
                    databases.add(info_schema.database.lower())
            if len(databases) > 1:
                raise MultipleDatabasesNotAllowedError(databases)

        for info_schema, schema_name in self.search():
            flattened.add(
                info_schema.incorporate(
                    path={"database": info_schema.database, "schema": schema_name},
                    quote_policy={"database": False},
                    include_policy={"database": False},
                )
            )

        return flattened
@dataclass(frozen=True, eq=False, repr=False)
class AdapterTrackingRelationInfo(FakeAPIObject, Hashable):
    """Frozen record of adapter name/version details.

    NOTE(review): nothing in this file constructs or reads this class, so the
    exact meaning of each field should be confirmed against its callers.
    """

    adapter_name: str
    # presumably the version of the base adapter package -- verify against caller
    base_adapter_version: str
    # presumably the version of the concrete adapter -- verify against caller
    adapter_version: str
    # Untyped payload (`Any`); its structure is not visible from this file.
    model_adapter_details: Any