Added Databricks CLI installer PoC #19

Closed · wants to merge 11 commits
2 changes: 2 additions & 0 deletions .coveragerc
@@ -5,6 +5,8 @@ include = src/*.py
omit =
    */site-packages/*
    tests/*
    src/install.py
    src/config.py

[report]
exclude_lines =
5 changes: 4 additions & 1 deletion .gitignore
@@ -151,4 +151,7 @@ deployment-merged.yaml
.vscode/

# ignore integration test onboarding file.
integration-tests/conf/dlt-meta/onboarding.json
integration-tests/conf/dlt-meta/onboarding.json

.databricks
.databricks-login.json
6 changes: 6 additions & 0 deletions Makefile
@@ -0,0 +1,6 @@
clean:
	rm -fr build .databricks dlt_meta.egg-info

dev:
	python3 -m venv .databricks
	.databricks/bin/python -m pip install -e .
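
Together these two targets give a minimal dev loop: make dev provisions a throwaway virtual environment in .databricks and installs the package into it in editable mode, and make clean removes that environment along with the build artifacts.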
31 changes: 31 additions & 0 deletions labs.yml
@@ -0,0 +1,31 @@
---
name: dlt-meta
description: Metadata-driven framework based on Databricks Delta Live Tables
install:
  script: src/install.py
entrypoint: src/__main__.py
min_python: 3.10
commands:
  - name: onboard
    description: Onboards DLT-Meta dataflow specs using the given onboarding file
    flags:
      - name: onboarding_file_path
        description: onboarding file path
      - name: onboard_layer
        description: onboard layer
      - name: database
        description: database
      - name: env
        description: env
      - name: bronze_dataflowspec_table
        description: bronze dataflow spec table
      - name: bronze_dataflowspec_path
        description: bronze dataflow spec path
      - name: silver_dataflowspec_table
        description: silver dataflow spec table
      - name: silver_dataflowspec_path
        description: silver dataflow spec path
      - name: import_author
        description: import author
      - name: version
        description: overwrite
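
labs.yml is the manifest the Databricks Labs CLI reads to wire up the project. The entrypoint src/__main__.py is not part of this diff, so the following is only a hypothetical sketch of how the declared onboard command and its flags might be dispatched; argparse here is an assumption for illustration, not the actual mechanism:

# Hypothetical sketch only: src/__main__.py is not shown in this diff, and
# the flag-passing convention of the Labs CLI is assumed, not confirmed.
import argparse


def main() -> None:
    parser = argparse.ArgumentParser(prog="dlt-meta")
    subparsers = parser.add_subparsers(dest="command", required=True)
    onboard = subparsers.add_parser("onboard", help="onboard dataflow specs")
    # Register one optional string argument per flag declared in labs.yml.
    for flag in (
        "onboarding_file_path", "onboard_layer", "database", "env",
        "bronze_dataflowspec_table", "bronze_dataflowspec_path",
        "silver_dataflowspec_table", "silver_dataflowspec_path",
        "import_author", "version",
    ):
        onboard.add_argument(f"--{flag}")
    args = parser.parse_args()
    print(f"would run {args.command} with {vars(args)}")


if __name__ == "__main__":
    main()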
2 changes: 1 addition & 1 deletion setup.py
@@ -4,7 +4,7 @@
with open("README.md", "r") as fh:
    long_description = fh.read()

INSTALL_REQUIRES = ["setuptools"]
INSTALL_REQUIRES = ["setuptools", "databricks-sdk"]

DEV_REQUIREMENTS = [
    "flake8==6.0",
1 change: 1 addition & 0 deletions src/__about__.py
@@ -0,0 +1 @@
__version__ = '0.0.4'
2 changes: 1 addition & 1 deletion src/__init__.py
@@ -1,3 +1,3 @@
"""Init."""
__version__ = "0.0.1"
__version__ = "0.0.4"
__author__ = "ravi@databricks.com"
161 changes: 161 additions & 0 deletions src/config.py
@@ -0,0 +1,161 @@
from abc import abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Generic, Optional, TypeVar

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import Config

from .__about__ import __version__


@dataclass
class ConnectConfig:
    # Keep all the fields in sync with databricks.sdk.core.Config
    host: Optional[str] = None
    account_id: Optional[str] = None
    token: Optional[str] = None
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    azure_client_id: Optional[str] = None
    azure_tenant_id: Optional[str] = None
    azure_client_secret: Optional[str] = None
    azure_environment: Optional[str] = None
    cluster_id: Optional[str] = None
    profile: Optional[str] = None
    debug_headers: bool = False
    rate_limit: Optional[int] = None
    max_connections_per_pool: Optional[int] = None
    max_connection_pools: Optional[int] = None

    @staticmethod
    def from_databricks_config(cfg: Config) -> "ConnectConfig":
        return ConnectConfig(
            host=cfg.host,
            token=cfg.token,
            client_id=cfg.client_id,
            client_secret=cfg.client_secret,
            azure_client_id=cfg.azure_client_id,
            azure_tenant_id=cfg.azure_tenant_id,
            azure_client_secret=cfg.azure_client_secret,
            azure_environment=cfg.azure_environment,
            cluster_id=cfg.cluster_id,
            profile=cfg.profile,
            debug_headers=cfg.debug_headers,
            rate_limit=cfg.rate_limit,
            max_connection_pools=cfg.max_connection_pools,
            max_connections_per_pool=cfg.max_connections_per_pool,
        )

    def to_databricks_config(self):
        return Config(
            host=self.host,
            account_id=self.account_id,
            token=self.token,
            client_id=self.client_id,
            client_secret=self.client_secret,
            azure_client_id=self.azure_client_id,
            azure_tenant_id=self.azure_tenant_id,
            azure_client_secret=self.azure_client_secret,
            azure_environment=self.azure_environment,
            cluster_id=self.cluster_id,
            profile=self.profile,
            debug_headers=self.debug_headers,
            rate_limit=self.rate_limit,
            max_connection_pools=self.max_connection_pools,
            max_connections_per_pool=self.max_connections_per_pool,
            product="dlt-meta",
            product_version=__version__,
        )

    @classmethod
    def from_dict(cls, raw: dict):
        return cls(**raw)
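

# Illustrative usage (not part of this diff): round-tripping between the SDK
# Config and ConnectConfig; the host and token below are placeholders.
#
#   cfg = Config(host="https://example.cloud.databricks.com", token="dapi-example")
#   connect = ConnectConfig.from_databricks_config(cfg)
#   assert connect.to_databricks_config().host == cfg.host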


# Used to set the right expectation about configuration file schema
_CONFIG_VERSION = 1

T = TypeVar("T")


class _Config(Generic[T]):
    connect: Optional[ConnectConfig] = None

    @classmethod
    @abstractmethod
    def from_dict(cls, raw: dict) -> T:
        ...

    @classmethod
    def from_bytes(cls, raw: str) -> T:
        import json

        parsed = json.loads(raw)
        return cls.from_dict({} if not parsed else parsed)

    @classmethod
    def from_file(cls, config_file: Path) -> T:
        return cls.from_bytes(config_file.read_text())

    @classmethod
    def _verify_version(cls, raw):
        stored_version = raw.pop("version", None)
        if stored_version != _CONFIG_VERSION:
            msg = (
                f"Unsupported config version: {stored_version}. "
                f"v{__version__} expects config version to be {_CONFIG_VERSION}"
            )
            raise ValueError(msg)

    def __post_init__(self):
        if self.connect is None:
            self.connect = ConnectConfig()

    def to_databricks_config(self) -> Config:
        connect = self.connect
        if connect is None:
            # default empty config
            connect = ConnectConfig()
        return connect.to_databricks_config()

    def as_dict(self) -> dict[str, Any]:
        from dataclasses import fields, is_dataclass

        def inner(x):
            if is_dataclass(x):
                result = []
                for f in fields(x):
                    value = inner(getattr(x, f.name))
                    # skip falsy values (None, empty strings/dicts, False)
                    # so unset fields are not serialized
                    if not value:
                        continue
                    result.append((f.name, value))
                return dict(result)
            return x

        serialized = inner(self)
        serialized["version"] = _CONFIG_VERSION
        return serialized


@dataclass
class WorkspaceConfig(_Config["WorkspaceConfig"]):
    dbr_version: str
    dbfs_path: str
    source: str
    eventhub_name: Optional[str] = None
    eventhub_producer_accesskey_name: Optional[str] = None
    eventhub_consumer_accesskey_name: Optional[str] = None
    eventhub_producer_accesskey_secret_name: Optional[str] = None
    eventhub_consumer_accesskey_secret_name: Optional[str] = None
    eventhub_secrets_scope_name: Optional[str] = None
    eventhub_namespace: Optional[str] = None
    eventhub_port: Optional[str] = None
    kafka_topic_name: Optional[str] = None
    kafka_broker: Optional[str] = None
    # Redeclared here so @dataclass treats it as a field; _Config is not a
    # dataclass, so its class-level annotation alone would make the
    # cls(connect=...) call in from_dict raise a TypeError.
    connect: Optional[ConnectConfig] = None

    @classmethod
    def from_dict(cls, raw: dict):
        cls._verify_version(raw)
        connect = ConnectConfig.from_dict(raw.pop("connect", {}))
        return cls(connect=connect, **raw)

    def to_workspace_client(self) -> WorkspaceClient:
        return WorkspaceClient(config=self.to_databricks_config())
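
Taken together, a hedged sketch of how src/install.py might use these classes; install.py itself is not part of this diff, and every value below is a placeholder:

# All values are placeholders chosen for illustration.
raw = {
    "version": 1,  # must match _CONFIG_VERSION
    "dbr_version": "13.3.x-scala2.12",
    "dbfs_path": "dbfs:/dlt-meta",
    "source": "cloudfiles",
    "connect": {"host": "https://example.cloud.databricks.com", "token": "dapi-example"},
}

config = WorkspaceConfig.from_dict(raw)  # pops "version" and "connect" from raw
ws = config.to_workspace_client()        # WorkspaceClient built from the ConnectConfig

# as_dict() drops unset fields and re-injects the schema version:
assert config.as_dict()["version"] == 1

# A mismatched schema version fails fast in _verify_version:
try:
    WorkspaceConfig.from_dict({"version": 2, "dbr_version": "x", "dbfs_path": "y", "source": "z"})
except ValueError as err:
    print(err)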