Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove manifest from adapter.get_catalog signature #9212

Closed
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231205-120559.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usage of dbt.contracts in dbt/adapters
time: 2023-12-05T12:05:59.936775+09:00
custom:
Author: michelleark
Issue: "9208"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231205-170725.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Introduce RelationConfig Protocol, consolidate Relation.create_from
time: 2023-12-05T17:07:25.33861+09:00
custom:
Author: michelleark
Issue: "9215"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231206-000343.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: ' remove manifest from adapter catalog method signatures'
time: 2023-12-06T00:03:43.824252+09:00
custom:
Author: michelleark
Issue: "9218"
77 changes: 39 additions & 38 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from datetime import datetime
from enum import Enum
import time
from itertools import chain
from typing import (
Any,
Callable,
Expand All @@ -19,6 +18,7 @@
Type,
TypedDict,
Union,
FrozenSet,
)
from multiprocessing.context import SpawnContext

Expand Down Expand Up @@ -76,6 +76,7 @@
)
from dbt.common.utils import filter_null_values, executor, cast_to_str, AttrDict

from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.base.connections import Connection, AdapterResponse, BaseConnectionManager
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
Expand Down Expand Up @@ -110,11 +111,13 @@
return row[key]


def _catalog_filter_schemas(manifest: Manifest) -> Callable[[agate.Row], bool]:
def _catalog_filter_schemas(
used_schemas: FrozenSet[Tuple[str, str]]
) -> Callable[[agate.Row], bool]:
"""Return a function that takes a row and decides if the row should be
included in the catalog output.
"""
schemas = frozenset((d.lower(), s.lower()) for d, s in manifest.get_used_schemas())
schemas = frozenset((d.lower(), s.lower()) for d, s in used_schemas)

def test(row: agate.Row) -> bool:
table_database = _expect_row_value("table_database", row)
Expand Down Expand Up @@ -429,12 +432,12 @@
"""
# the cache only cares about executable nodes
return {
self.Relation.create_from(self.config, node).without_identifier()
self.Relation.create_from(self.config, node).without_identifier() # type: ignore[arg-type]
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model and not node.is_external_node)
}

def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
def _get_catalog_schemas(self, relation_configs: Iterable[RelationConfig]) -> SchemaSearchMap:
"""Get a mapping of each node's "information_schema" relations to a
set of all schemas expected in that information_schema.

Expand All @@ -444,7 +447,7 @@
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
relations = self._get_catalog_relations(manifest)
relations = self._get_catalog_relations(relation_configs)
for relation in relations:
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
Expand All @@ -465,18 +468,13 @@

return relations_by_info_schema

def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]:

nodes = chain(
[
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
],
manifest.sources.values(),
)

relations = [self.Relation.create_from(self.config, n) for n in nodes]
def _get_catalog_relations(
self, relation_configs: Iterable[RelationConfig]
) -> List[BaseRelation]:
relations = [

Check warning on line 474 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L474

Added line #L474 was not covered by tests
self.Relation.create_from(quoting=self.config, config=relation_config)
for relation_config in relation_configs
]
return relations

def _relations_cache_for_schemas(
Expand Down Expand Up @@ -1122,7 +1120,9 @@
return result

@classmethod
def _catalog_filter_table(cls, table: agate.Table, manifest: Manifest) -> agate.Table:
def _catalog_filter_table(
cls, table: agate.Table, used_schemas: FrozenSet[Tuple[str, str]]
) -> agate.Table:
"""Filter the table as appropriate for catalog entries. Subclasses can
override this to change filtering rules on a per-adapter basis.
"""
Expand All @@ -1132,31 +1132,28 @@
table.column_names,
text_only_columns=["table_database", "table_schema", "table_name"],
)
return table.where(_catalog_filter_schemas(manifest))
return table.where(_catalog_filter_schemas(used_schemas))

def _get_one_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
used_schemas: FrozenSet[Tuple[str, str]],
) -> agate.Table:
kwargs = {"information_schema": information_schema, "schemas": schemas}
table = self.execute_macro(
GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest so we get any local project
# overrides
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manifest=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
results = self._catalog_filter_table(table, used_schemas) # type: ignore[arg-type]
return results

def _get_one_catalog_by_relations(
self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
used_schemas: FrozenSet[Tuple[str, str]],
) -> agate.Table:

kwargs = {
Expand All @@ -1166,16 +1163,16 @@
table = self.execute_macro(
GET_CATALOG_RELATIONS_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is quite old, and I believe there is testing in place for this (will dig it up shortly) and that it is safe to remove at this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manifest=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
results = self._catalog_filter_table(table, used_schemas) # type: ignore[arg-type]
return results

def get_filtered_catalog(
self, manifest: Manifest, relations: Optional[Set[BaseRelation]] = None
self,
relation_configs: Iterable[RelationConfig],
used_schemas: FrozenSet[Tuple[str, str]],
relations: Optional[Set[BaseRelation]] = None,
):
catalogs: agate.Table
if (
Expand All @@ -1184,11 +1181,11 @@
or not self.supports(Capability.SchemaMetadataByRelations)
):
# Do it the traditional way. We get the full catalog.
catalogs, exceptions = self.get_catalog(manifest)
catalogs, exceptions = self.get_catalog(relation_configs, used_schemas)

Check warning on line 1184 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1184

Added line #L1184 was not covered by tests
else:
# Do it the new way. We try to save time by selecting information
# only for the exact set of relations we are interested in.
catalogs, exceptions = self.get_catalog_by_relations(manifest, relations)
catalogs, exceptions = self.get_catalog_by_relations(used_schemas, relations)

if relations and catalogs:
relation_map = {
Expand Down Expand Up @@ -1216,24 +1213,28 @@
def row_matches_relation(self, row: agate.Row, relations: Set[BaseRelation]):
pass

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
def get_catalog(
self,
relation_configs: Iterable[RelationConfig],
used_schemas: FrozenSet[Tuple[str, str]],
) -> Tuple[agate.Table, List[Exception]]:
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
schema_map: SchemaSearchMap = self._get_catalog_schemas(relation_configs)
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])
fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
self, name, self._get_one_catalog, info, schemas, used_schemas
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def get_catalog_by_relations(
self, manifest: Manifest, relations: Set[BaseRelation]
self, used_schemas: FrozenSet[Tuple[str, str]], relations: Set[BaseRelation]
) -> Tuple[agate.Table, List[Exception]]:
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
Expand All @@ -1247,7 +1248,7 @@
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
used_schemas,
)
futures.append(fut)

Expand Down
77 changes: 21 additions & 56 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@
from dataclasses import dataclass, field
from typing import Optional, TypeVar, Any, Type, Dict, Iterator, Tuple, Set, Union, FrozenSet

from dbt.contracts.graph.nodes import SourceDefinition, ManifestNode, ResultNode, ParsedNode
from dbt.contracts.relation import (
from dbt.adapters.contracts.relation import (
RelationConfig,
RelationType,
ComponentName,
HasQuoting,
FakeAPIObject,
Policy,
Path,
)
from dbt.common.exceptions import DbtInternalError
from dbt.adapters.exceptions import MultipleDatabasesNotAllowedError, ApproximateMatchError
from dbt.node_types import NodeType
from dbt.common.utils import filter_null_values, deep_merge
from dbt.adapters.utils import classproperty

Expand Down Expand Up @@ -198,33 +196,14 @@ def quoted(self, identifier):
identifier=identifier,
)

@classmethod
def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
source_quoting.pop("column", None)
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
source_quoting,
kwargs.get("quote_policy", {}),
)

return cls.create(
database=source.database,
schema=source.schema,
identifier=source.identifier,
quote_policy=quote_policy,
**kwargs,
)

@staticmethod
def add_ephemeral_prefix(name: str):
return f"__dbt__cte__{name}"

@classmethod
def create_ephemeral_from_node(
def create_ephemeral_from(
cls: Type[Self],
config: HasQuoting,
node: ManifestNode,
node: RelationConfig,
) -> Self:
# Note that ephemeral models are based on the name.
identifier = cls.add_ephemeral_prefix(node.name)
Expand All @@ -234,47 +213,33 @@ def create_ephemeral_from_node(
).quote(identifier=False)

@classmethod
def create_from_node(
def create_from(
cls: Type[Self],
config: HasQuoting,
node,
quote_policy: Optional[Dict[str, bool]] = None,
quoting: HasQuoting,
config: RelationConfig,
**kwargs: Any,
) -> Self:
if quote_policy is None:
quote_policy = {}
quote_policy = kwargs.pop("quote_policy", {})

config_quoting = config.quoting_dict
config_quoting.pop("column", None)

quote_policy = dbt.common.utils.merge(config.quoting, quote_policy)
# precedence: kwargs quoting > config quoting > base quoting > default quoting
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
quoting.quoting,
config_quoting,
quote_policy,
)

return cls.create(
database=node.database,
schema=node.schema,
identifier=node.alias,
database=config.database,
schema=config.schema,
identifier=config.identifier,
quote_policy=quote_policy,
**kwargs,
)

@classmethod
def create_from(
cls: Type[Self],
config: HasQuoting,
node: ResultNode,
**kwargs: Any,
) -> Self:
if node.resource_type == NodeType.Source:
if not isinstance(node, SourceDefinition):
raise DbtInternalError(
"type mismatch, expected SourceDefinition but got {}".format(type(node))
)
return cls.create_from_source(node, **kwargs)
else:
# Can't use ManifestNode here because of parameterized generics
if not isinstance(node, (ParsedNode)):
raise DbtInternalError(
f"type mismatch, expected ManifestNode but got {type(node)}"
)
return cls.create_from_node(config, node, **kwargs)

@classmethod
def create(
cls: Type[Self],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

from dbt.common.dataclass_schema import dbtClassMixin, StrEnum

from dbt.contracts.util import Replaceable
from dbt.common.exceptions import CompilationError
from dbt.exceptions import DataclassNotDictError
from dbt.common.contracts.util import Replaceable
from dbt.common.exceptions import CompilationError, DataclassNotDictError
from dbt.common.utils import deep_merge


Expand All @@ -23,6 +22,14 @@ class RelationType(StrEnum):
Ephemeral = "ephemeral"


class RelationConfig(Protocol):
name: str
database: str
schema: str
identifier: str
quoting_dict: Dict[str, bool]


class ComponentName(StrEnum):
Database = "database"
Schema = "schema"
Expand Down
Loading