Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include adapter response info in execution results #2961

Merged
merged 4 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Hourly, monthly and yearly partitions available in BigQuery ([#2476](https://github.com/fishtown-analytics/dbt/issues/2476), [#2903](https://github.com/fishtown-analytics/dbt/pull/2903))
- Allow BigQuery to default to the environment's default project ([#2828](https://github.com/fishtown-analytics/dbt/pull/2828), [#2908](https://github.com/fishtown-analytics/dbt/pull/2908))
- Rationalize run result status reporting and clean up artifact schema ([#2493](https://github.com/fishtown-analytics/dbt/issues/2493), [#2943](https://github.com/fishtown-analytics/dbt/pull/2943))
- Add adapter specific query execution info to run results and source freshness results artifacts ([#2747](https://github.com/fishtown-analytics/dbt/issues/2747), [#2961](https://github.com/fishtown-analytics/dbt/pull/2961))

### Fixes
- Respect --project-dir in dbt clean command ([#2840](https://github.com/fishtown-analytics/dbt/issues/2840), [#2841](https://github.com/fishtown-analytics/dbt/pull/2841))
Expand Down
9 changes: 5 additions & 4 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from multiprocessing.synchronize import RLock
from threading import get_ident
from typing import (
Dict, Tuple, Hashable, Optional, ContextManager, List
Dict, Tuple, Hashable, Optional, ContextManager, List, Union
)

import agate

import dbt.exceptions
from dbt.contracts.connection import (
Connection, Identifier, ConnectionState, AdapterRequiredConfig, LazyHandle
Connection, Identifier, ConnectionState,
AdapterRequiredConfig, LazyHandle, AdapterResponse
)
from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.base.query_headers import (
Expand Down Expand Up @@ -290,15 +291,15 @@ def _add_query_comment(self, sql: str) -> str:
@abc.abstractmethod
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
"""Execute the given SQL.

:param str sql: The sql to execute.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[str, agate.Table]
:rtype: Tuple[Union[str, AdapterResponse], agate.Table]
"""
raise dbt.exceptions.NotImplementedException(
'`execute` is not implemented for this adapter!'
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import Connection
from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
Expand Down Expand Up @@ -213,7 +213,7 @@ def connection_for(
@available.parse(lambda *a, **k: ('', empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
"""Execute the given SQL. This is a thin wrapper around
ConnectionManager.execute.

Expand All @@ -222,7 +222,7 @@ def execute(
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[str, agate.Table]
:rtype: Tuple[Union[str, AdapterResponse], agate.Table]
"""
return self.connections.execute(
sql=sql,
Expand Down
6 changes: 4 additions & 2 deletions core/dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import agate

from dbt.contracts.connection import Connection, AdapterRequiredConfig
from dbt.contracts.connection import (
Connection, AdapterRequiredConfig, AdapterResponse
)
from dbt.contracts.graph.compiled import (
CompiledNode, ManifestNode, NonSourceCompiledNode
)
Expand Down Expand Up @@ -154,7 +156,7 @@ def commit_if_has_connection(self) -> None:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
...

def get_compiler(self) -> Compiler_T:
Expand Down
21 changes: 11 additions & 10 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import abc
import time
from typing import List, Optional, Tuple, Any, Iterable, Dict
from typing import List, Optional, Tuple, Any, Iterable, Dict, Union

import agate

import dbt.clients.agate_helper
import dbt.exceptions
from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection, ConnectionState
from dbt.contracts.connection import (
Connection, ConnectionState, AdapterResponse
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt import flags

Expand All @@ -18,7 +20,7 @@ class SQLConnectionManager(BaseConnectionManager):
Methods to implement:
- exception_handler
- cancel
- get_status
- get_response
- open
"""
@abc.abstractmethod
Expand Down Expand Up @@ -76,20 +78,19 @@ def add_query(

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)

logger.debug(
"SQL status: {status} in {elapsed:0.2f} seconds",
status=self.get_status(cursor),
status=self.get_response(cursor),
elapsed=(time.time() - pre)
)

return connection, cursor

@abc.abstractclassmethod
def get_status(cls, cursor: Any) -> str:
def get_response(cls, cursor: Any) -> Union[AdapterResponse, str]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that other adapter plugins may need to account for; it might be a breaking change.

Example: this method is overridden in dbt-spark, which will need to be updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can call this out in the release notes. I'd like to reorganize + clean up the changelog before final release, anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! I'll add a breaking-changes section next time I break something :) Please tap me if you want a hand cleaning up the changelog.

"""Get the status of the cursor."""
raise dbt.exceptions.NotImplementedException(
'`get_status` is not implemented for this adapter!'
'`get_response` is not implemented for this adapter!'
)

@classmethod
Expand Down Expand Up @@ -118,15 +119,15 @@ def get_result_from_cursor(cls, cursor: Any) -> agate.Table:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[AdapterResponse, str], agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
status = self.get_status(cursor)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
else:
table = dbt.clients.agate_helper.empty_table()
return status, table
return response, table

def add_begin_query(self):
return self.add_query('BEGIN', auto_begin=False)
Expand Down
24 changes: 22 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .macros import MacroNamespaceBuilder, MacroNamespace
from .manifest import ManifestContext
from dbt.contracts.graph.manifest import Manifest, Disabled
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.compiled import (
CompiledResource,
CompiledSeedNode,
Expand Down Expand Up @@ -83,6 +84,7 @@ class BaseDatabaseWrapper:
Wrapper for runtime database interaction. Applies the runtime quote policy
via a relation proxy.
"""

def __init__(self, adapter, namespace: MacroNamespace):
self._adapter = adapter
self.Relation = RelationProxy(adapter)
Expand Down Expand Up @@ -379,6 +381,7 @@ class ParseDatabaseWrapper(BaseDatabaseWrapper):
"""The parser subclass of the database wrapper applies any explicit
parse-time overrides.
"""

def __getattr__(self, name):
override = (name in self._adapter._available_ and
name in self._adapter._parse_replacements_)
Expand All @@ -399,6 +402,7 @@ class RuntimeDatabaseWrapper(BaseDatabaseWrapper):
"""The runtime database wrapper exposes everything the adapter marks
available.
"""

def __getattr__(self, name):
if name in self._adapter._available_:
return getattr(self._adapter, name)
Expand Down Expand Up @@ -660,18 +664,33 @@ def load_result(self, name: str) -> Optional[AttrDict]:

@contextmember
def store_result(
self, name: str, status: Any, agate_table: Optional[agate.Table] = None
self, name: str,
response: Any,
agate_table: Optional[agate.Table] = None
) -> str:
if agate_table is None:
agate_table = agate_helper.empty_table()

self.sql_results[name] = AttrDict({
'status': status,
'response': response,
'data': agate_helper.as_matrix(agate_table),
'table': agate_table
})
return ''

@contextmember
def store_raw_result(
    self,
    name: str,
    message: Optional[str] = None,
    code: Optional[str] = None,
    rows_affected: Optional[int] = None,
    agate_table: Optional[agate.Table] = None
) -> str:
    """Build an AdapterResponse from raw parts and store it as a result.

    This is a convenience wrapper around ``store_result`` for callers that
    have the individual response fields rather than an ``AdapterResponse``
    object.

    :param name: The key under which the result is stored.
    :param message: The response message; ``None`` is stored as an empty
        string because ``AdapterResponse.__str__`` returns this value and
        must return a ``str``.
    :param code: Optional response code, passed through unchanged.
    :param rows_affected: Optional affected-row count, passed through
        unchanged.
    :param agate_table: Optional table of results, forwarded to
        ``store_result``.
    :return: Whatever ``store_result`` returns.
    """
    # The original signature wrote `message=Optional[str]` (and likewise for
    # `code` and `rows_affected`), which made the typing object the DEFAULT
    # VALUE instead of the annotation. Omitting an argument therefore put a
    # `typing.Optional[...]` object into the response. They are now real
    # annotations with a None default.
    response = AdapterResponse(
        _message='' if message is None else message,
        code=code,
        rows_affected=rows_affected,
    )
    return self.store_result(name, response, agate_table)

@contextproperty
def validation(self):
def validate_any(*args) -> Callable[[T], None]:
Expand Down Expand Up @@ -1179,6 +1198,7 @@ class MacroContext(ProviderContext):
- 'schema' does not use any 'model' information
- they can't be configured with config() directives
"""

def __init__(
self,
model: ParsedMacro,
Expand Down
11 changes: 11 additions & 0 deletions core/dbt/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
register_pattern(Identifier, r'^[A-Za-z_][A-Za-z0-9_]+$')


@dataclass
class AdapterResponse(JsonSchemaMixin):
# Structured alternative to a plain status string: the `execute` and
# `get_response` signatures in this change return either a str or an
# instance of this class.
# Message text; this is exactly what str() on the response yields.
_message: str
# Optional code for the response, None when not supplied.
# NOTE(review): the meaning of the code looks adapter-defined - confirm.
code: Optional[str] = None
# Optional number of rows affected, None when not supplied.
rows_affected: Optional[int] = None

def __str__(self):
# Render the response as its message so it can stand in for the
# plain status string used previously.
return self._message


class ConnectionState(StrEnum):
INIT = 'init'
OPEN = 'open'
Expand Down Expand Up @@ -85,6 +95,7 @@ class LazyHandle:
"""Opener must be a callable that takes a Connection object and opens the
connection, updating the handle on the Connection.
"""

def __init__(self, opener: Callable[[Connection], Connection]):
self.opener = opener

Expand Down
3 changes: 2 additions & 1 deletion core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Union, Any, NewType

PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa
PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa
DEFAULT_SEND_ANONYMOUS_USAGE_STATS = True


Expand Down Expand Up @@ -142,6 +142,7 @@ class RegistryPackageMetadata(
'sql',
'sql_now',
'store_result',
'store_raw_result',
'target',
'this',
'tojson',
Expand Down
35 changes: 13 additions & 22 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dbt.contracts.util import (
BaseArtifactMetadata,
ArtifactMixin,
Writable,
VersionedSchema,
Replaceable,
schema_version,
Expand Down Expand Up @@ -94,6 +93,7 @@ class BaseResult(JsonSchemaMixin):
thread_id: str
execution_time: float
message: Optional[Union[str, int]]
adapter_response: Dict[str, Any]


@dataclass
Expand All @@ -102,23 +102,9 @@ class NodeResult(BaseResult):


@dataclass
class PartialNodeResult(NodeResult):
# if the result got to the point where it could be skipped/failed, we would
# be returning a real result, not a partial.
@property
def skipped(self):
return False


@dataclass
class RunModelResult(NodeResult):
class RunResult(NodeResult):
agate_table: Optional[agate.Table] = None

def to_dict(self, *args, **kwargs):
dct = super().to_dict(*args, **kwargs)
dct.pop('agate_table', None)
return dct

@property
def skipped(self):
return self.status == RunStatus.Skipped
Expand All @@ -139,9 +125,6 @@ def __getitem__(self, idx):
return self.results[idx]


RunResult = Union[PartialNodeResult, RunModelResult]


@dataclass
class RunResultsMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
Expand All @@ -162,6 +145,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
thread_id=result.thread_id,
execution_time=result.execution_time,
message=result.message,
adapter_response=result.adapter_response
)


Expand Down Expand Up @@ -247,11 +231,12 @@ def from_success(
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult


@dataclass
class SourceFreshnessResult(NodeResult, Writable):
class SourceFreshnessResult(NodeResult):
node: ParsedSourceDefinition
status: FreshnessStatus
max_loaded_at: datetime
Expand Down Expand Up @@ -282,12 +267,17 @@ class SourceFreshnessOutput(JsonSchemaMixin):
max_loaded_at_time_ago_in_s: float
status: FreshnessStatus
criteria: FreshnessThreshold
adapter_response: Dict[str, Any]


@dataclass
class PartialSourceFreshnessResult(PartialNodeResult):
class PartialSourceFreshnessResult(NodeResult):
status: FreshnessStatus

@property
def skipped(self):
return False


FreshnessNodeResult = Union[PartialSourceFreshnessResult,
SourceFreshnessResult]
Expand Down Expand Up @@ -326,6 +316,7 @@ def process_freshness_result(
max_loaded_at_time_ago_in_s=result.age,
status=result.status,
criteria=criteria,
adapter_response=result.adapter_response
)


Expand Down
Loading