Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/schema versions #2767

Merged
merged 4 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735))
- Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736))
- Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767))

## dbt 0.18.1 (Release TBD)

Expand Down
6 changes: 4 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from dbt.contracts.files import SourceFile
from dbt.contracts.util import (
Readable, Writable, Replaceable, MacroKey, SourceKey
VersionedSchema, Replaceable, MacroKey, SourceKey, SchemaVersion
)
from dbt.exceptions import (
raise_duplicate_resource_name, raise_compiler_error, warn_or_error,
Expand Down Expand Up @@ -924,7 +924,9 @@ def __reduce_ex__(self, protocol):


@dataclass
class WritableManifest(JsonSchemaMixin, Writable, Readable):
class WritableManifest(VersionedSchema):
dbt_schema_version = SchemaVersion('manifest', 1)

nodes: Mapping[UniqueID, ManifestNode] = field(
metadata=dict(description=(
'The nodes defined in the dbt project and its dependencies'
Expand Down
13 changes: 9 additions & 4 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
Time, FreshnessStatus, FreshnessThreshold
)
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable, Replaceable
from dbt.contracts.util import (
Writable, VersionedSchema, Replaceable, SchemaVersion
)
from dbt.exceptions import InternalException
from dbt.logger import (
TimingProcessor,
Expand Down Expand Up @@ -86,7 +88,8 @@ def to_dict(self, *args, **kwargs):


@dataclass
class ExecutionResult(JsonSchemaMixin, Writable):
class ExecutionResult(VersionedSchema):
dbt_schema_version = SchemaVersion('run-results', 1)
results: List[Union[WritableRunModelResult, PartialResult]]
generated_at: datetime
elapsed_time: float
Expand Down Expand Up @@ -140,7 +143,8 @@ class FreshnessMetadata(JsonSchemaMixin):


@dataclass
class FreshnessExecutionResult(FreshnessMetadata):
class FreshnessExecutionResult(VersionedSchema, FreshnessMetadata):
dbt_schema_version = SchemaVersion('sources', 1)
results: List[Union[PartialResult, SourceFreshnessResult]]

def write(self, path, omit_none=True):
Expand Down Expand Up @@ -293,7 +297,8 @@ def key(self) -> CatalogKey:


@dataclass
class CatalogResults(JsonSchemaMixin, Writable):
class CatalogResults(VersionedSchema):
dbt_schema_version = SchemaVersion('catalog', 1)
nodes: Dict[str, CatalogTable]
sources: Dict[str, CatalogTable]
generated_at: datetime
Expand Down
38 changes: 28 additions & 10 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CatalogResults,
ExecutionResult,
)
from dbt.contracts.util import SchemaVersion, VersionedSchema
from dbt.exceptions import InternalException
from dbt.logger import LogMessage
from dbt.utils import restrict_to
Expand Down Expand Up @@ -96,7 +97,7 @@ class RPCCliParameters(RPCParameters):


@dataclass
class RPCNoParameters(RPCParameters):
class RPCDepsParameters(RPCParameters):
pass


Expand Down Expand Up @@ -170,22 +171,23 @@ class GetManifestParameters(RPCParameters):


@dataclass
class RemoteResult(JsonSchemaMixin):
class RemoteResult(VersionedSchema):
logs: List[LogMessage]


@dataclass
class RemoteEmptyResult(RemoteResult):
pass
class RemoteDepsResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-deps-result', 1)


@dataclass
class RemoteCatalogResults(CatalogResults, RemoteResult):
pass
dbt_schema_version = SchemaVersion('remote-catalog-result', 1)


@dataclass
class RemoteCompileResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-compile-result', 1)
raw_sql: str
compiled_sql: str
node: CompileResultNode
Expand All @@ -198,7 +200,7 @@ def error(self):

@dataclass
class RemoteExecutionResult(ExecutionResult, RemoteResult):
pass
dbt_schema_version = SchemaVersion('remote-execution-result', 1)


@dataclass
Expand All @@ -209,19 +211,21 @@ class ResultTable(JsonSchemaMixin):

@dataclass
class RemoteRunOperationResult(ExecutionResult, RemoteResult):
dbt_schema_version = SchemaVersion('remote-run-operation-result', 1)
success: bool


@dataclass
class RemoteRunResult(RemoteCompileResult):
dbt_schema_version = SchemaVersion('remote-run-result', 1)
table: ResultTable


RPCResult = Union[
RemoteCompileResult,
RemoteExecutionResult,
RemoteCatalogResults,
RemoteEmptyResult,
RemoteDepsResult,
RemoteRunOperationResult,
]

Expand All @@ -237,6 +241,7 @@ class GCResultState(StrEnum):

@dataclass
class GCResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-gc-result', 1)
logs: List[LogMessage] = field(default_factory=list)
deleted: List[TaskID] = field(default_factory=list)
missing: List[TaskID] = field(default_factory=list)
Expand Down Expand Up @@ -330,6 +335,7 @@ class TaskRow(TaskTiming):

@dataclass
class PSResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-ps-result', 1)
rows: List[TaskRow]


Expand All @@ -342,12 +348,14 @@ class KillResultStatus(StrEnum):

@dataclass
class KillResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-kill-result', 1)
state: KillResultStatus = KillResultStatus.Missing
logs: List[LogMessage] = field(default_factory=list)


@dataclass
class GetManifestResult(RemoteResult):
dbt_schema_version = SchemaVersion('remote-manifest-result', 1)
manifest: Optional[WritableManifest]


Expand All @@ -374,7 +382,8 @@ class PollResult(RemoteResult, TaskTiming):


@dataclass
class PollRemoteEmptyCompleteResult(PollResult, RemoteEmptyResult):
class PollRemoteEmptyCompleteResult(PollResult, RemoteDepsResult):
dbt_schema_version = SchemaVersion('poll-remote-deps-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand All @@ -383,7 +392,7 @@ class PollRemoteEmptyCompleteResult(PollResult, RemoteEmptyResult):
@classmethod
def from_result(
cls: Type['PollRemoteEmptyCompleteResult'],
base: RemoteEmptyResult,
base: RemoteDepsResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
Expand All @@ -400,13 +409,16 @@ def from_result(

@dataclass
class PollKilledResult(PollResult):
dbt_schema_version = SchemaVersion('poll-remote-killed-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Killed),
)


@dataclass
class PollExecuteCompleteResult(RemoteExecutionResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-execution-result', 1)

state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -435,6 +447,7 @@ def from_result(

@dataclass
class PollCompileCompleteResult(RemoteCompileResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-compile-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -464,6 +477,7 @@ def from_result(

@dataclass
class PollRunCompleteResult(RemoteRunResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-run-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -494,6 +508,7 @@ def from_result(

@dataclass
class PollRunOperationCompleteResult(RemoteRunOperationResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-run-operation-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -523,6 +538,7 @@ def from_result(

@dataclass
class PollCatalogCompleteResult(RemoteCatalogResults, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-catalog-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -553,11 +569,12 @@ def from_result(

@dataclass
class PollInProgressResult(PollResult):
pass
dbt_schema_version = SchemaVersion('poll-in-progress-result', 1)


@dataclass
class PollGetManifestResult(GetManifestResult, PollResult):
dbt_schema_version = SchemaVersion('poll-remote-get-manifest-result', 1)
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
Expand Down Expand Up @@ -593,6 +610,7 @@ class ManifestStatus(StrEnum):

@dataclass
class LastParse(RemoteResult):
dbt_schema_version = SchemaVersion('status-result', 1)
state: ManifestStatus = ManifestStatus.Init
logs: List[LogMessage] = field(default_factory=list)
error: Optional[Dict[str, Any]] = None
Expand Down
7 changes: 6 additions & 1 deletion core/dbt/contracts/state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from .graph.manifest import WritableManifest
from typing import Optional
from dbt.exceptions import IncompatibleSchemaException


class PreviousState:
Expand All @@ -10,4 +11,8 @@ def __init__(self, path: Path):

manifest_path = self.path / 'manifest.json'
if manifest_path.exists() and manifest_path.is_file():
self.manifest = WritableManifest.read(str(manifest_path))
try:
self.manifest = WritableManifest.read(str(manifest_path))
except IncompatibleSchemaException as exc:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

exc.add_filename(str(manifest_path))
raise
75 changes: 72 additions & 3 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import dataclasses
from typing import List, Tuple
from typing import List, Tuple, ClassVar, Type, TypeVar, Dict, Any

from dbt.clients.system import write_json, read_json
from dbt.exceptions import RuntimeException

from dbt.exceptions import RuntimeException, IncompatibleSchemaException
from dbt.version import __version__
from hologram import JsonSchemaMixin

MacroKey = Tuple[str, str]
SourceKey = Tuple[str, str]
Expand Down Expand Up @@ -94,3 +95,71 @@ def read(cls, path: str):
) from exc

return cls.from_dict(data) # type: ignore


T = TypeVar('T', bound='VersionedSchema')


BASE_SCHEMAS_URL = 'https://schemas.getdbt.com/dbt/{name}/v{version}.json'


@dataclasses.dataclass
class SchemaVersion:
name: str
version: int

def __str__(self) -> str:
return BASE_SCHEMAS_URL.format(
name=self.name,
version=self.version,
)


DBT_VERSION_KEY = 'dbt_version'
SCHEMA_VERSION_KEY = 'dbt_schema_version'


@dataclasses.dataclass
class VersionedSchema(JsonSchemaMixin, Readable, Writable):
dbt_schema_version: ClassVar[SchemaVersion]

def to_dict(
self, omit_none: bool = True, validate: bool = False
) -> Dict[str, Any]:
dct = super().to_dict(omit_none=omit_none, validate=validate)
dct[SCHEMA_VERSION_KEY] = str(self.dbt_schema_version)
dct[DBT_VERSION_KEY] = __version__
return dct

@classmethod
def from_dict(
cls: Type[T], data: Dict[str, Any], validate: bool = True
) -> T:
if validate:
expected = str(cls.dbt_schema_version)
found = data.get(SCHEMA_VERSION_KEY)
if found != expected:
raise IncompatibleSchemaException(expected, found)

return super().from_dict(data=data, validate=validate)

@classmethod
def _collect_json_schema(
cls, definitions: Dict[str, Any]
) -> Dict[str, Any]:
result = super()._collect_json_schema(definitions)
result['properties'][SCHEMA_VERSION_KEY] = {
'const': str(cls.dbt_schema_version)
}
result['properties'][DBT_VERSION_KEY] = {'type': 'string'}
result['required'].extend([SCHEMA_VERSION_KEY, DBT_VERSION_KEY])
return result

@classmethod
def json_schema(cls, embeddable: bool = False) -> Dict[str, Any]:
result = super().json_schema(embeddable=embeddable)
# it would be nice to do this in hologram!
# in the schema itself, include the version url as $id
if not embeddable:
result['$id'] = str(cls.dbt_schema_version)
return result
Loading