Skip to content

Commit

Permalink
Merge pull request #2767 from fishtown-analytics/feature/schema-versions
Browse files Browse the repository at this point in the history
Feature/schema versions
  • Loading branch information
beckjake authored Sep 21, 2020
2 parents daff0ba + 764c9b2 commit 4994cc0
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735))
- Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736))
- Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767))

## dbt 0.18.1 (Release TBD)

Expand Down
6 changes: 4 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from dbt.contracts.files import SourceFile
from dbt.contracts.util import (
Readable, Writable, Replaceable, MacroKey, SourceKey
VersionedSchema, Replaceable, MacroKey, SourceKey, SchemaVersion
)
from dbt.exceptions import (
raise_duplicate_resource_name, raise_compiler_error, warn_or_error,
Expand Down Expand Up @@ -924,7 +924,9 @@ def __reduce_ex__(self, protocol):


@dataclass
class WritableManifest(JsonSchemaMixin, Writable, Readable):
class WritableManifest(VersionedSchema):
dbt_schema_version = SchemaVersion('manifest', 1)

nodes: Mapping[UniqueID, ManifestNode] = field(
metadata=dict(description=(
'The nodes defined in the dbt project and its dependencies'
Expand Down
13 changes: 9 additions & 4 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
Time, FreshnessStatus, FreshnessThreshold
)
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable, Replaceable
from dbt.contracts.util import (
Writable, VersionedSchema, Replaceable, SchemaVersion
)
from dbt.exceptions import InternalException
from dbt.logger import (
TimingProcessor,
Expand Down Expand Up @@ -86,7 +88,8 @@ def to_dict(self, *args, **kwargs):


@dataclass
class ExecutionResult(JsonSchemaMixin, Writable):
class ExecutionResult(VersionedSchema):
dbt_schema_version = SchemaVersion('run-results', 1)
results: List[Union[WritableRunModelResult, PartialResult]]
generated_at: datetime
elapsed_time: float
Expand Down Expand Up @@ -140,7 +143,8 @@ class FreshnessMetadata(JsonSchemaMixin):


@dataclass
class FreshnessExecutionResult(FreshnessMetadata):
class FreshnessExecutionResult(VersionedSchema, FreshnessMetadata):
dbt_schema_version = SchemaVersion('sources', 1)
results: List[Union[PartialResult, SourceFreshnessResult]]

def write(self, path, omit_none=True):
Expand Down Expand Up @@ -293,7 +297,8 @@ def key(self) -> CatalogKey:


@dataclass
class CatalogResults(JsonSchemaMixin, Writable):
class CatalogResults(VersionedSchema):
dbt_schema_version = SchemaVersion('catalog', 1)
nodes: Dict[str, CatalogTable]
sources: Dict[str, CatalogTable]
generated_at: datetime
Expand Down
38 changes: 28 additions & 10 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CatalogResults,
ExecutionResult,
)
from dbt.contracts.util import SchemaVersion, VersionedSchema
from dbt.exceptions import InternalException
from dbt.logger import LogMessage
from dbt.utils import restrict_to
Expand Down Expand Up @@ -96,7 +97,7 @@ class RPCCliParameters(RPCParameters):


@dataclass
class RPCNoParameters(RPCParameters):
class RPCDepsParameters(RPCParameters):
pass


Expand Down Expand Up @@ -170,22 +171,23 @@ class GetManifestParameters(RPCParameters):


@dataclass
class RemoteResult(JsonSchemaMixin):
class RemoteResult(VersionedSchema):
logs: List[LogMessage]


@dataclass
class RemoteEmptyResult(RemoteResult):
pass
class RemoteDepsResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-deps-result', 1)


@dataclass
class RemoteCatalogResults(CatalogResults, RemoteResult):
pass
dbt_schema_version = SchemaVersion('remote-catalog-result', 1)


@dataclass
class RemoteCompileResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-compile-result', 1)
raw_sql: str
compiled_sql: str
node: CompileResultNode
Expand All @@ -198,7 +200,7 @@ def error(self):

@dataclass
class RemoteExecutionResult(ExecutionResult, RemoteResult):
pass
dbt_schema_version = SchemaVersion('remote-execution-result', 1)


@dataclass
Expand All @@ -209,19 +211,21 @@ class ResultTable(JsonSchemaMixin):

@dataclass
class RemoteRunOperationResult(ExecutionResult, RemoteResult):
dbt_schema_version = SchemaVersion('remote-run-operation-result', 1)
success: bool


@dataclass
class RemoteRunResult(RemoteCompileResult):
dbt_schema_version = SchemaVersion('remote-run-result', 1)
table: ResultTable


RPCResult = Union[
RemoteCompileResult,
RemoteExecutionResult,
RemoteCatalogResults,
RemoteEmptyResult,
RemoteDepsResult,
RemoteRunOperationResult,
]

Expand All @@ -237,6 +241,7 @@ class GCResultState(StrEnum):

@dataclass
class GCResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-gc-result', 1)
logs: List[LogMessage] = field(default_factory=list)
deleted: List[TaskID] = field(default_factory=list)
missing: List[TaskID] = field(default_factory=list)
Expand Down Expand Up @@ -330,6 +335,7 @@ class TaskRow(TaskTiming):

@dataclass
class PSResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-ps-result', 1)
rows: List[TaskRow]


Expand All @@ -342,12 +348,14 @@ class KillResultStatus(StrEnum):

@dataclass
class KillResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-kill-result', 1)
state: KillResultStatus = KillResultStatus.Missing
logs: List[LogMessage] = field(default_factory=list)


@dataclass
class GetManifestResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-manifest-result', 1)
manifest: Optional[WritableManifest]


Expand All @@ -374,7 +382,8 @@ class PollResult(RemoteResult, TaskTiming):


@dataclass
class PollRemoteEmptyCompleteResult(PollResult, RemoteEmptyResult):
class PollRemoteEmptyCompleteResult(PollResult, RemoteDepsResult):
dbt_schema_version = SchemaVersion('poll-remote-deps-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand All @@ -383,7 +392,7 @@ class PollRemoteEmptyCompleteResult(PollResult, RemoteEmptyResult):
@classmethod
def from_result(
cls: Type['PollRemoteEmptyCompleteResult'],
base: RemoteEmptyResult,
base: RemoteDepsResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
Expand All @@ -400,13 +409,16 @@ def from_result(

@dataclass
class PollKilledResult(PollResult):
dbt_schema_version = SchemaVersion('poll-remote-killed-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Killed),
)


@dataclass
class PollExecuteCompleteResult(RemoteExecutionResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-execution-result', 1)

state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -435,6 +447,7 @@ def from_result(

@dataclass
class PollCompileCompleteResult(RemoteCompileResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-compile-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -464,6 +477,7 @@ def from_result(

@dataclass
class PollRunCompleteResult(RemoteRunResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-run-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -494,6 +508,7 @@ def from_result(

@dataclass
class PollRunOperationCompleteResult(RemoteRunOperationResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-run-operation-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -523,6 +538,7 @@ def from_result(

@dataclass
class PollCatalogCompleteResult(RemoteCatalogResults, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-catalog-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -553,11 +569,12 @@ def from_result(

@dataclass
class PollInProgressResult(PollResult):
pass
dbt_schema_version = SchemaVersion('poll-in-progress-result', 1)


@dataclass
class PollGetManifestResult(GetManifestResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-get-manifest-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -593,6 +610,7 @@ class ManifestStatus(StrEnum):

@dataclass
class LastParse(RemoteResult):
dbt_schema_version = SchemaVersion('status-result', 1)
state: ManifestStatus = ManifestStatus.Init
logs: List[LogMessage] = field(default_factory=list)
error: Optional[Dict[str, Any]] = None
Expand Down
7 changes: 6 additions & 1 deletion core/dbt/contracts/state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from .graph.manifest import WritableManifest
from typing import Optional
from dbt.exceptions import IncompatibleSchemaException


class PreviousState:
Expand All @@ -10,4 +11,8 @@ def __init__(self, path: Path):

manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
self.manifest = WritableManifest.read(str(manifest_path))
try:
self.manifest = WritableManifest.read(str(manifest_path))
except IncompatibleSchemaException as exc:
exc.add_filename(str(manifest_path))
raise
75 changes: 72 additions & 3 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import dataclasses
from typing import List, Tuple
from typing import List, Tuple, ClassVar, Type, TypeVar, Dict, Any

from dbt.clients.system import write_json, read_json
from dbt.exceptions import RuntimeException

from dbt.exceptions import RuntimeException, IncompatibleSchemaException
from dbt.version import __version__
from hologram import JsonSchemaMixin

MacroKey = Tuple[str, str]
SourceKey = Tuple[str, str]
Expand Down Expand Up @@ -94,3 +95,71 @@ def read(cls, path: str):
) from exc

return cls.from_dict(data) # type: ignore


T = TypeVar('T', bound='VersionedSchema')


BASE_SCHEMAS_URL = 'https://schemas.getdbt.com/dbt/{name}/v{version}.json'


@dataclasses.dataclass
class SchemaVersion:
name: str
version: int

def __str__(self) -> str:
return BASE_SCHEMAS_URL.format(
name=self.name,
version=self.version,
)


DBT_VERSION_KEY = 'dbt_version'
SCHEMA_VERSION_KEY = 'dbt_schema_version'


@dataclasses.dataclass
class VersionedSchema(JsonSchemaMixin, Readable, Writable):
dbt_schema_version: ClassVar[SchemaVersion]

def to_dict(
self, omit_none: bool = True, validate: bool = False
) -> Dict[str, Any]:
dct = super().to_dict(omit_none=omit_none, validate=validate)
dct[SCHEMA_VERSION_KEY] = str(self.dbt_schema_version)
dct[DBT_VERSION_KEY] = __version__
return dct

@classmethod
def from_dict(
cls: Type[T], data: Dict[str, Any], validate: bool = True
) -> T:
if validate:
expected = str(cls.dbt_schema_version)
found = data.get(SCHEMA_VERSION_KEY)
if found != expected:
raise IncompatibleSchemaException(expected, found)

return super().from_dict(data=data, validate=validate)

@classmethod
def _collect_json_schema(
cls, definitions: Dict[str, Any]
) -> Dict[str, Any]:
result = super()._collect_json_schema(definitions)
result['properties'][SCHEMA_VERSION_KEY] = {
'const': str(cls.dbt_schema_version)
}
result['properties'][DBT_VERSION_KEY] = {'type': 'string'}
result['required'].extend([SCHEMA_VERSION_KEY, DBT_VERSION_KEY])
return result

@classmethod
def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]:
result = super().json_schema(embeddable=embeddable)
# it would be nice to do this in hologram!
# in the schema itself, include the version url as $id
if not embeddable:
result['$id'] = str(cls.dbt_schema_version)
return result
Loading

0 comments on commit 4994cc0

Please sign in to comment.