Merge branch 'main' into dparent1/iceberg
dparent1 committed Nov 7, 2022
2 parents 93d6b97 + b759267 commit 8d3984f
Showing 11 changed files with 82 additions and 18 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220926-123609.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Migrate dbt-utils current_timestamp macros into core + adapters
time: 2022-09-26T12:36:09.319981-07:00
custom:
Author: colin-rogers-dbt
Issue: "483"
PR: "480"
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220926-112857.yaml
@@ -0,0 +1,7 @@
kind: Fixes
body: Password doesn't pass to server using LDAP connection via thrift (#310)
time: 2022-09-26T11:28:57.306285-04:00
custom:
Author: VShkaberda
Issue: "310"
PR: "396"
10 changes: 5 additions & 5 deletions .github/workflows/main.yml
@@ -48,7 +48,7 @@ jobs:
persist-credentials: false

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: '3.8'

@@ -89,7 +89,7 @@ jobs:
uses: actions/checkout@v2

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: ${{ matrix.python-version }}

@@ -127,9 +127,9 @@ jobs:
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: 3.8
python-version: '3.8'

- name: Install python dependencies
run: |
@@ -179,7 +179,7 @@ jobs:

steps:
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4.3.0
with:
python-version: ${{ matrix.python-version }}

3 changes: 3 additions & 0 deletions dbt/adapters/spark/connections.py
@@ -65,6 +65,7 @@ class SparkCredentials(Credentials):
endpoint: Optional[str] = None
token: Optional[str] = None
user: Optional[str] = None
password: Optional[str] = None
port: int = 443
auth: Optional[str] = None
kerberos_service_name: Optional[str] = None
@@ -375,6 +376,7 @@ def open(cls, connection):
username=creds.user,
auth=creds.auth,
kerberos_service_name=creds.kerberos_service_name,
password=creds.password,
)
conn = hive.connect(thrift_transport=transport)
else:
@@ -384,6 +386,7 @@ def open(cls, connection):
username=creds.user,
auth=creds.auth,
kerberos_service_name=creds.kerberos_service_name,
password=creds.password,
) # noqa
handle = PyhiveConnectionWrapper(conn)
elif creds.method == SparkConnectionMethod.ODBC:
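For context, a minimal sketch (not the adapter's own code path) of the pyhive call that the forwarded password enables for LDAP-authenticated Thrift connections; the host, user, and password values are placeholders:

from pyhive import hive

# placeholder credentials; in the adapter these come from SparkCredentials
conn = hive.connect(
    host="myorg.sparkhost.com",
    port=10001,
    username="dbt",
    password="secret",  # previously never reached the server, now forwarded
    auth="LDAP",
)
cursor = conn.cursor()
cursor.execute("select current_timestamp()")
print(cursor.fetchall())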
16 changes: 13 additions & 3 deletions dbt/adapters/spark/python_submissions.py
@@ -31,7 +31,7 @@ def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:

@property
def cluster_id(self) -> str:
return self.parsed_model.get("cluster_id", self.credentials.cluster_id)
return self.parsed_model["config"].get("cluster_id", self.credentials.cluster_id)

def get_timeout(self) -> int:
timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT)
@@ -82,7 +82,17 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:
"notebook_path": path,
},
}
job_spec.update(cluster_spec)
job_spec.update(cluster_spec) # updates 'new_cluster' config
# PYPI packages
packages = self.parsed_model["config"].get("packages", [])
# additional libraries, provided directly in Databricks library spec format
additional_libs = self.parsed_model["config"].get("additional_libs", [])
libraries = []
for package in packages:
libraries.append({"pypi": {"package": package}})
for lib in additional_libs:
libraries.append(lib)
job_spec.update({"libraries": libraries}) # type: ignore
submit_response = requests.post(
f"https://{self.credentials.host}/api/2.1/jobs/runs/submit",
headers=self.auth_header,
@@ -96,7 +106,7 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:

def _submit_through_notebook(self, compiled_code: str, cluster_spec: dict) -> None:
# it is safe to call mkdirs even if the dir already exists and has content inside
work_dir = f"/dbt_python_model/{self.schema}/"
work_dir = f"/Shared/dbt_python_model/{self.schema}/"
self._create_work_dir(work_dir)
# add notebook
whole_file_path = f"{work_dir}{self.identifier}"
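As a hedged sketch of how the new packages and additional_libs model config keys map onto the libraries field of the Databricks runs/submit payload (the helper name and sample values below are illustrative, not part of the adapter):

def build_libraries(config: dict) -> list:
    # plain package names become PyPI library specs
    libraries = [{"pypi": {"package": pkg}} for pkg in config.get("packages", [])]
    # additional_libs entries are assumed to already be full Databricks library specs
    libraries.extend(config.get("additional_libs", []))
    return libraries

# Example:
# build_libraries({"packages": ["spacy", "torch"],
#                  "additional_libs": [{"maven": {"coordinates": "com.example:lib:1.0"}}]})
# -> [{"pypi": {"package": "spacy"}},
#     {"pypi": {"package": "torch"}},
#     {"maven": {"coordinates": "com.example:lib:1.0"}}]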
4 changes: 0 additions & 4 deletions dbt/include/spark/macros/adapters.sql
@@ -246,10 +246,6 @@
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro spark__current_timestamp() -%}
current_timestamp()
{%- endmacro %}

{% macro spark__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
{% if not from_relation.type %}
10 changes: 6 additions & 4 deletions dbt/include/spark/macros/materializations/table.sql
@@ -46,6 +46,8 @@
dbt = dbtObj(spark.table)
df = model(dbt, spark)

# make sure pyspark exists in the namespace; for 7.3.x-scala2.12 it does not exist
import pyspark
# make sure pandas exists before using it
try:
import pandas
@@ -56,9 +58,9 @@ except ImportError:
# make sure pyspark.pandas exists before using it
try:
import pyspark.pandas
pyspark_available = True
pyspark_pandas_api_available = True
except ImportError:
pyspark_available = False
pyspark_pandas_api_available = False

# make sure databricks.koalas exists before using it
try:
@@ -70,15 +72,15 @@ except ImportError:
# preferentially convert pandas DataFrames to pandas-on-Spark or Koalas DataFrames first
# since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
# and converting from pandas-on-Spark to Spark DataFrame has no overhead
if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
if pyspark_pandas_api_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
df = pyspark.pandas.frame.DataFrame(df)
elif koalas_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
df = databricks.koalas.frame.DataFrame(df)

# convert to pyspark.sql.dataframe.DataFrame
if isinstance(df, pyspark.sql.dataframe.DataFrame):
pass # since it is already a Spark DataFrame
elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
elif pyspark_pandas_api_available and isinstance(df, pyspark.pandas.frame.DataFrame):
df = df.to_spark()
elif koalas_available and isinstance(df, databricks.koalas.frame.DataFrame):
df = df.to_spark()
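A minimal sketch of the conversion chain the renamed pyspark_pandas_api_available flag guards, assuming an active SparkSession and a Python model that returns a plain pandas DataFrame (sample data is illustrative):

import pandas
import pyspark.pandas

pdf = pandas.DataFrame({"test": [1] * 10, "test2": [2] * 10})
# pandas -> pandas-on-Spark (requires an active SparkSession)
psdf = pyspark.pandas.frame.DataFrame(pdf)
# pandas-on-Spark -> pyspark.sql.dataframe.DataFrame
sdf = psdf.to_spark()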
3 changes: 3 additions & 0 deletions dbt/include/spark/macros/utils/timestamps.sql
@@ -0,0 +1,3 @@
{% macro spark__current_timestamp() -%}
current_timestamp()
{%- endmacro %}
16 changes: 16 additions & 0 deletions tests/functional/adapter/test_python_model.py
@@ -20,10 +20,26 @@ def project_config_update(self):

models__simple_python_model = """
import pandas
import torch
import spacy
def model(dbt, spark):
dbt.config(
materialized='table',
submission_method='job_cluster',
job_cluster_config={
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0,
"spark_conf": {
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*, 4]"
},
"custom_tags": {
"ResourceClass": "SingleNode"
}
},
packages=['spacy', 'torch']
)
data = [[1,2]] * 10
return spark.createDataFrame(data, schema=['test', 'test2'])
18 changes: 18 additions & 0 deletions tests/functional/adapter/utils/test_timestamps.py
@@ -0,0 +1,18 @@
import pytest
from dbt.tests.adapter.utils.test_timestamps import BaseCurrentTimestamps


class TestCurrentTimestampSpark(BaseCurrentTimestamps):
@pytest.fixture(scope="class")
def models(self):
return {"get_current_timestamp.sql": "select {{ current_timestamp() }} as current_timestamp"}

@pytest.fixture(scope="class")
def expected_schema(self):
return {
"current_timestamp": "timestamp"
}

@pytest.fixture(scope="class")
def expected_sql(self):
return """select current_timestamp() as current_timestamp"""
6 changes: 4 additions & 2 deletions tests/unit/test_adapter.py
@@ -154,12 +154,13 @@ def test_thrift_connection(self):
config = self._get_target_thrift(self.project_cfg)
adapter = SparkAdapter(config)

def hive_thrift_connect(host, port, username, auth, kerberos_service_name):
def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password):
self.assertEqual(host, 'myorg.sparkhost.com')
self.assertEqual(port, 10001)
self.assertEqual(username, 'dbt')
self.assertIsNone(auth)
self.assertIsNone(kerberos_service_name)
self.assertIsNone(password)

with mock.patch.object(hive, 'connect', new=hive_thrift_connect):
connection = adapter.acquire_connection('dummy')
@@ -193,12 +194,13 @@ def test_thrift_connection_kerberos(self):
config = self._get_target_thrift_kerberos(self.project_cfg)
adapter = SparkAdapter(config)

def hive_thrift_connect(host, port, username, auth, kerberos_service_name):
def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password):
self.assertEqual(host, 'myorg.sparkhost.com')
self.assertEqual(port, 10001)
self.assertEqual(username, 'dbt')
self.assertEqual(auth, 'KERBEROS')
self.assertEqual(kerberos_service_name, 'hive')
self.assertIsNone(password)

with mock.patch.object(hive, 'connect', new=hive_thrift_connect):
connection = adapter.acquire_connection('dummy')