Skip to content

Commit

Permalink
update naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Wigley committed Dec 21, 2020
1 parent dddf1bc commit 122bd07
Show file tree
Hide file tree
Showing 18 changed files with 118 additions and 109 deletions.
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import dbt.exceptions
from dbt.contracts.connection import (
Connection, Identifier, ConnectionState,
AdapterRequiredConfig, LazyHandle, ExecutionStatus
AdapterRequiredConfig, LazyHandle, AdapterResponse
)
from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.base.query_headers import (
Expand Down Expand Up @@ -291,15 +291,15 @@ def _add_query_comment(self, sql: str) -> str:
@abc.abstractmethod
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
"""Execute the given SQL.
:param str sql: The sql to execute.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[Union[str, ExecutionStatus], agate.Table]
:rtype: Tuple[Union[str, AdapterResponse], agate.Table]
"""
raise dbt.exceptions.NotImplementedException(
'`execute` is not implemented for this adapter!'
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import Connection, ExecutionStatus
from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
Expand Down Expand Up @@ -213,7 +213,7 @@ def connection_for(
@available.parse(lambda *a, **k: ('', empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
"""Execute the given SQL. This is a thin wrapper around
ConnectionManager.execute.
Expand All @@ -222,7 +222,7 @@ def execute(
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[Union[str, ExecutionStatus], agate.Table]
:rtype: Tuple[Union[str, AdapterResponse], agate.Table]
"""
return self.connections.execute(
sql=sql,
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import agate

from dbt.contracts.connection import (
Connection, AdapterRequiredConfig, ExecutionStatus
Connection, AdapterRequiredConfig, AdapterResponse
)
from dbt.contracts.graph.compiled import (
CompiledNode, ManifestNode, NonSourceCompiledNode
Expand Down Expand Up @@ -156,7 +156,7 @@ def commit_if_has_connection(self) -> None:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
) -> Tuple[Union[str, AdapterResponse], agate.Table]:
...

def get_compiler(self) -> Compiler_T:
Expand Down
16 changes: 8 additions & 8 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import dbt.exceptions
from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import (
Connection, ConnectionState, ExecutionStatus
Connection, ConnectionState, AdapterResponse
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt import flags
Expand All @@ -20,7 +20,7 @@ class SQLConnectionManager(BaseConnectionManager):
Methods to implement:
- exception_handler
- cancel
- get_status
- get_response
- open
"""
@abc.abstractmethod
Expand Down Expand Up @@ -80,17 +80,17 @@ def add_query(
cursor.execute(sql, bindings)
logger.debug(
"SQL status: {status} in {elapsed:0.2f} seconds",
status=self.get_status(cursor),
status=self.get_response(cursor),
elapsed=(time.time() - pre)
)

return connection, cursor

@abc.abstractclassmethod
def get_status(cls, cursor: Any) -> Union[str, ExecutionStatus]:
def get_response(cls, cursor: Any) -> Union[AdapterResponse, str]:
"""Get the status of the cursor."""
raise dbt.exceptions.NotImplementedException(
'`get_status` is not implemented for this adapter!'
'`get_response` is not implemented for this adapter!'
)

@classmethod
Expand Down Expand Up @@ -119,15 +119,15 @@ def get_result_from_cursor(cls, cursor: Any) -> agate.Table:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
) -> Tuple[Union[AdapterResponse, str], agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
status = self.get_status(cursor)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
else:
table = dbt.clients.agate_helper.empty_table()
return status, table
return response, table

def add_begin_query(self):
return self.add_query('BEGIN', auto_begin=False)
Expand Down
17 changes: 9 additions & 8 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from dbt import deprecations
from dbt.adapters.base.column import Column
from dbt.adapters.factory import get_adapter, get_adapter_package_names
from dbt.adapters.factory import Adapter, get_adapter, get_adapter_package_names
from dbt.clients import agate_helper
from dbt.clients.jinja import get_rendered, MacroGenerator
from dbt.config import RuntimeConfig, Project
Expand All @@ -18,7 +18,7 @@
from .macros import MacroNamespaceBuilder, MacroNamespace
from .manifest import ManifestContext
from dbt.contracts.graph.manifest import Manifest, Disabled
from dbt.contracts.connection import ExecutionStatus
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.compiled import (
CompiledResource,
CompiledSeedNode,
Expand Down Expand Up @@ -664,13 +664,13 @@ def load_result(self, name: str) -> Optional[AttrDict]:

@contextmember
def store_result(
self, name: str, status: Any, agate_table: Optional[agate.Table] = None
self, name: str, response: Any, agate_table: Optional[agate.Table] = None
) -> str:
if agate_table is None:
agate_table = agate_helper.empty_table()

self.sql_results[name] = AttrDict({
'status': status,
'response': response,
'data': agate_helper.as_matrix(agate_table),
'table': agate_table
})
Expand All @@ -681,12 +681,13 @@ def store_raw_result(
self,
name: str,
message=Optional[str],
state=Optional[str],
rows=Optional[str],
code=Optional[str],
rows_affected=Optional[str],
agate_table: Optional[agate.Table] = None
) -> str:
status = ExecutionStatus(message=message, state=state, rows=rows)
return self.store_result(name, status, agate_table)
response = AdapterResponse(
message=message, code=code, rows_affected=rows_affected)
return self.store_result(name, response, agate_table)

@contextproperty
def validation(self):
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@


@dataclass
class ExecutionStatus(JsonSchemaMixin):
class AdapterResponse(JsonSchemaMixin):
message: str
state: Optional[str] = None
rows: Optional[str] = None
code: Optional[str] = None
rows_affected: Optional[int] = None

def __str__(self):
return self.message
Expand Down
7 changes: 3 additions & 4 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from dbt.utils import lowercase
from hologram.helpers import StrEnum
from hologram import JsonDict, JsonSchemaMixin
from hologram import JsonSchemaMixin

import agate

Expand Down Expand Up @@ -94,6 +94,7 @@ class BaseResult(JsonSchemaMixin):
thread_id: str
execution_time: float
message: Optional[Union[str, int]]
adapter_response: Dict[str, Any]


@dataclass
Expand All @@ -104,7 +105,6 @@ class NodeResult(BaseResult):
@dataclass
class RunResult(NodeResult):
agate_table: Optional[agate.Table] = None
adapter_query_status: JsonDict = field(default_factory=dict)

@property
def skipped(self):
Expand Down Expand Up @@ -136,7 +136,6 @@ class RunResultsMetadata(BaseArtifactMetadata):
@dataclass
class RunResultOutput(BaseResult):
unique_id: str
adapter_query_status: JsonDict = field(default_factory=dict)


def process_run_result(result: RunResult) -> RunResultOutput:
Expand All @@ -147,7 +146,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
thread_id=result.thread_id,
execution_time=result.execution_time,
message=result.message,
adapter_query_status=result.adapter_query_status
adapter_response=result.adapter_response
)


Expand Down
8 changes: 4 additions & 4 deletions core/dbt/include/global_project/macros/core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
{{ write(sql) }}
{%- endif -%}

{%- set status, res = adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- set res, table = adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- if name is not none -%}
{{ store_result(name, status=status, agate_table=res) }}
{{ store_result(name, response=res, agate_table=table) }}
{%- endif -%}

{%- endif -%}
{%- endmacro %}

{% macro noop_statement(name=None, message=None, state=None, rows=None, res=None) -%}
{% macro noop_statement(name=None, message=None, code=None, rows_affected=None, res=None) -%}
{%- set sql = caller() -%}

{%- if name == 'main' -%}
Expand All @@ -24,7 +24,7 @@
{%- endif -%}

{%- if name is not none -%}
{{ store_raw_result(name, message=message, state=state, rows=rows, agate_table=res) }}
{{ store_raw_result(name, message=message, code=code, rows_affected=rows_affected, agate_table=res) }}
{%- endif -%}

{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

{%- set agate_table = load_agate_table() -%}
{%- do store_result('agate_table', status='OK', agate_table=agate_table) -%}
{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

Expand All @@ -129,11 +129,11 @@
{% set create_table_sql = create_csv_table(model, agate_table) %}
{% endif %}

{% set state = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set num_rows = (agate_table.rows | length) %}
{% set code = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set rows_affected = (agate_table.rows | length) %}
{% set sql = load_csv_rows(model, agate_table) %}

{% call noop_statement('main', state ~ ' ' ~ num_rows, state, num_rows) %}
{% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %}
{{ create_table_sql }};
-- dbt seed --
{{ sql }}
Expand Down
11 changes: 6 additions & 5 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ def run_with_hooks(self, manifest):
return result

def _build_run_result(self, node, start_time, status, timing_info, message,
agate_table=None, adapter_query_status=None):
agate_table=None, adapter_response=None):
execution_time = time.time() - start_time
thread_id = threading.current_thread().name
if adapter_query_status is None:
adapter_query_status = {}
if adapter_response is None:
adapter_response = {}
return RunResult(
status=status,
thread_id=thread_id,
Expand All @@ -227,7 +227,7 @@ def _build_run_result(self, node, start_time, status, timing_info, message,
message=message,
node=node,
agate_table=agate_table,
adapter_query_status=adapter_query_status
adapter_response=adapter_response
)

def error_result(self, node, message, start_time, timing_info):
Expand Down Expand Up @@ -256,7 +256,7 @@ def from_run_result(self, result, start_time, timing_info):
timing_info=timing_info,
message=result.message,
agate_table=result.agate_table,
adapter_query_status=result.adapter_query_status
adapter_response=result.adapter_response
)

def skip_result(self, node, message):
Expand All @@ -268,6 +268,7 @@ def skip_result(self, node, message):
timing=[],
message=message,
node=node,
adapter_response={}
)

def compile_and_execute(self, manifest, ctx):
Expand Down
12 changes: 6 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,17 @@ def after_execute(self, result):

def _build_run_model_result(self, model, context):
result = context['load_result']('main')
adapter_query_status = {}
if isinstance(result.status, JsonSchemaMixin):
adapter_query_status = result.status.to_dict()
adapter_response = {}
if isinstance(result.response, JsonSchemaMixin):
adapter_response = result.response.to_dict()
return RunResult(
node=model,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message=str(result.status),
adapter_query_status=adapter_query_status
message=str(result.response),
adapter_response=adapter_response
)

def _materialization_relations(
Expand Down Expand Up @@ -336,7 +336,7 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):

with finishctx, DbtModelState({'node_status': 'passed'}):
print_hook_end_line(
hook_text, status, idx, num_hooks, timer.elapsed
hook_text, str(status), idx, num_hooks, timer.elapsed
)

self._total_executed += len(ordered_hooks)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def execute(self, test: CompiledTestNode, manifest: Manifest):
thread_id=thread_id,
execution_time=0,
message=failed_rows,
adapter_response={}
)

def after_execute(self, result):
Expand Down
Loading

0 comments on commit 122bd07

Please sign in to comment.