From 7ef8084b8ca8c278733ec1599957f21af342f459 Mon Sep 17 00:00:00 2001 From: Anthony LaRocca Date: Wed, 1 Mar 2023 10:23:06 -0500 Subject: [PATCH 1/9] Pass existing server_side_parameters to session connection wrapper and use to configure SparkSession. --- dbt/adapters/spark/session.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index 5e4bcc492..41d2d52bc 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -4,7 +4,7 @@ import datetime as dt from types import TracebackType -from typing import Any, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union from dbt.events import AdapterLogger from dbt.utils import DECIMALS @@ -84,7 +84,7 @@ def close(self) -> None: self._df = None self._rows = None - def execute(self, sql: str, *parameters: Any) -> None: + def execute(self, sql: str, server_side_parameters, *parameters: Any) -> None: """ Execute a sql statement. @@ -106,7 +106,12 @@ def execute(self, sql: str, *parameters: Any) -> None: """ if len(parameters) > 0: sql = sql % parameters - spark_session = SparkSession.builder.enableHiveSupport().getOrCreate() + builder = SparkSession.builder.enableHiveSupport() + + for k, v in server_side_parameters.items(): + builder = builder.config(k, v) + + spark_session = builder.getOrCreate() self._df = spark_session.sql(sql) def fetchall(self) -> Optional[List[Row]]: @@ -177,8 +182,9 @@ class SessionConnectionWrapper(object): handle: Connection _cursor: Optional[Cursor] - def __init__(self, handle: Connection) -> None: + def __init__(self, handle: Connection, server_side_parameters: Dict[str, Any]) -> None: self.handle = handle + self.server_side_parameters = server_side_parameters self._cursor = None def cursor(self) -> "SessionConnectionWrapper": @@ -205,10 +211,10 @@ def execute(self, sql: str, bindings: Optional[List[Any]] = None) -> None: assert self._cursor, "Cursor not available" if bindings is None: - self._cursor.execute(sql) + self._cursor.execute(sql, self.server_side_parameters) else: bindings = [self._fix_binding(binding) for binding in bindings] - self._cursor.execute(sql, *bindings) + self._cursor.execute(sql, self.server_side_parameters, *bindings) @property def description(self) -> List[Tuple[str, str, None, None, None, None, bool]]: From 6b5f46023166e167a9e86b1e46d15190cf3f24b4 Mon Sep 17 00:00:00 2001 From: Anthony LaRocca Date: Tue, 16 May 2023 18:02:47 -0400 Subject: [PATCH 2/9] Incorporating feedback. Moved server side parameters to Connection and pass to cursor from there. --- dbt/adapters/spark/connections.py | 2 +- dbt/adapters/spark/session.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 2a7f8188d..68cb45d73 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -460,7 +460,7 @@ def open(cls, connection: Connection) -> Connection: SessionConnectionWrapper, ) - handle = SessionConnectionWrapper(Connection()) # type: ignore + handle = SessionConnectionWrapper(Connection(creds.server_side_parameters)) else: raise dbt.exceptions.DbtProfileError( f"invalid credential method: {creds.method}" diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index 41d2d52bc..c04df2f73 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -24,9 +24,10 @@ class Cursor: https://github.com/mkleehammer/pyodbc/wiki/Cursor """ - def __init__(self) -> None: + def __init__(self, server_side_parameters) -> None: self._df: Optional[DataFrame] = None self._rows: Optional[List[Row]] = None + self.server_side_parameters = server_side_parameters def __enter__(self) -> Cursor: return self @@ -84,7 +85,7 @@ def close(self) -> None: self._df = None self._rows = None - def execute(self, sql: str, server_side_parameters, *parameters: Any) -> None: + def execute(self, sql: str, *parameters: Any) -> None: """ Execute a sql statement. @@ -108,7 +109,7 @@ def execute(self, sql: str, server_side_parameters, *parameters: Any) -> None: sql = sql % parameters builder = SparkSession.builder.enableHiveSupport() - for k, v in server_side_parameters.items(): + for k, v in self.server_side_parameters.items(): builder = builder.config(k, v) spark_session = builder.getOrCreate() @@ -164,6 +165,9 @@ class Connection: https://github.com/mkleehammer/pyodbc/wiki/Connection """ + def __init__(self, server_side_parameters) -> None: + self.server_side_parameters = server_side_parameters + def cursor(self) -> Cursor: """ Get a cursor. @@ -173,7 +177,7 @@ def cursor(self) -> Cursor: out : Cursor The cursor. """ - return Cursor() + return Cursor(self.server_side_parameters) class SessionConnectionWrapper(object): @@ -184,7 +188,6 @@ class SessionConnectionWrapper(object): def __init__(self, handle: Connection, server_side_parameters: Dict[str, Any]) -> None: self.handle = handle - self.server_side_parameters = server_side_parameters self._cursor = None def cursor(self) -> "SessionConnectionWrapper": @@ -211,10 +214,10 @@ def execute(self, sql: str, bindings: Optional[List[Any]] = None) -> None: assert self._cursor, "Cursor not available" if bindings is None: - self._cursor.execute(sql, self.server_side_parameters) + self._cursor.execute(sql) else: bindings = [self._fix_binding(binding) for binding in bindings] - self._cursor.execute(sql, self.server_side_parameters, *bindings) + self._cursor.execute(sql, *bindings) @property def description(self) -> List[Tuple[str, str, None, None, None, None, bool]]: From ecec5c722d141bb2d91b77257c03b0546f932e0d Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 10:42:01 +0200 Subject: [PATCH 3/9] Add changie --- .changes/unreleased/Features-20230707-104150.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20230707-104150.yaml diff --git a/.changes/unreleased/Features-20230707-104150.yaml b/.changes/unreleased/Features-20230707-104150.yaml new file mode 100644 index 000000000..183a37b45 --- /dev/null +++ b/.changes/unreleased/Features-20230707-104150.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Support server_side_parameters for Spark session connection method +time: 2023-07-07T10:41:50.01541+02:00 +custom: + Author: alarocca-apixio + Issue: "690" From 871696a7a20920cadde64c7c6da0007cecebe3cc Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 10:53:32 +0200 Subject: [PATCH 4/9] Add type hint --- dbt/adapters/spark/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index c04df2f73..accd96537 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -24,7 +24,7 @@ class Cursor: https://github.com/mkleehammer/pyodbc/wiki/Cursor """ - def __init__(self, server_side_parameters) -> None: + def __init__(self, server_side_parameters: Dict[str, Any]) -> None: self._df: Optional[DataFrame] = None self._rows: Optional[List[Row]] = None self.server_side_parameters = server_side_parameters From 7f95e3b64032855afc40b4ff65063351604d4997 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 10:54:07 +0200 Subject: [PATCH 5/9] Write out loop --- dbt/adapters/spark/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index accd96537..3caf595ff 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -109,8 +109,8 @@ def execute(self, sql: str, *parameters: Any) -> None: sql = sql % parameters builder = SparkSession.builder.enableHiveSupport() - for k, v in self.server_side_parameters.items(): - builder = builder.config(k, v) + for parameter, value in self.server_side_parameters.items(): + builder = builder.config(parameter, value) spark_session = builder.getOrCreate() self._df = spark_session.sql(sql) From 942bf468de48b979d721d0cf6345682dd86bab64 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 10:54:39 +0200 Subject: [PATCH 6/9] Add type hint --- dbt/adapters/spark/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index 3caf595ff..49e22c772 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -165,7 +165,7 @@ class Connection: https://github.com/mkleehammer/pyodbc/wiki/Connection """ - def __init__(self, server_side_parameters) -> None: + def __init__(self, server_side_parameters: Dict[Any, str]) -> None: self.server_side_parameters = server_side_parameters def cursor(self) -> Cursor: From 948e733d025ac4f36db006d0e5d5686849b4ae60 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 11:20:36 +0200 Subject: [PATCH 7/9] Remove server_side_parameters from connection wrapper --- dbt/adapters/spark/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index 49e22c772..b7d0ade10 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -186,7 +186,7 @@ class SessionConnectionWrapper(object): handle: Connection _cursor: Optional[Cursor] - def __init__(self, handle: Connection, server_side_parameters: Dict[str, Any]) -> None: + def __init__(self, handle: Connection) -> None: self.handle = handle self._cursor = None From 0c3a17225e9cf729bd67d9c392b877fa132d8811 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 7 Jul 2023 11:53:02 +0200 Subject: [PATCH 8/9] Add handle type hint --- dbt/adapters/spark/connections.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 68cb45d73..01e37b7c3 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -350,6 +350,7 @@ def open(cls, connection: Connection) -> Connection: creds = connection.credentials exc = None + handle: Any for i in range(1 + creds.connect_retries): try: From 0d0c543607e04b6d44fd5e90b879a92ff88ecc71 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 18 Jul 2023 18:12:16 +0200 Subject: [PATCH 9/9] Make server_side_parameters optional --- dbt/adapters/spark/connections.py | 4 +++- dbt/adapters/spark/session.py | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 01e37b7c3..5d3e99a64 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -461,7 +461,9 @@ def open(cls, connection: Connection) -> Connection: SessionConnectionWrapper, ) - handle = SessionConnectionWrapper(Connection(creds.server_side_parameters)) + handle = SessionConnectionWrapper( + Connection(server_side_parameters=creds.server_side_parameters) + ) else: raise dbt.exceptions.DbtProfileError( f"invalid credential method: {creds.method}" diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index b7d0ade10..0e3717172 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -24,10 +24,10 @@ class Cursor: https://github.com/mkleehammer/pyodbc/wiki/Cursor """ - def __init__(self, server_side_parameters: Dict[str, Any]) -> None: + def __init__(self, *, server_side_parameters: Optional[Dict[str, Any]] = None) -> None: self._df: Optional[DataFrame] = None self._rows: Optional[List[Row]] = None - self.server_side_parameters = server_side_parameters + self.server_side_parameters = server_side_parameters or {} def __enter__(self) -> Cursor: return self @@ -165,8 +165,8 @@ class Connection: https://github.com/mkleehammer/pyodbc/wiki/Connection """ - def __init__(self, server_side_parameters: Dict[Any, str]) -> None: - self.server_side_parameters = server_side_parameters + def __init__(self, *, server_side_parameters: Optional[Dict[Any, str]] = None) -> None: + self.server_side_parameters = server_side_parameters or {} def cursor(self) -> Cursor: """ @@ -177,7 +177,7 @@ def cursor(self) -> Cursor: out : Cursor The cursor. """ - return Cursor(self.server_side_parameters) + return Cursor(server_side_parameters=self.server_side_parameters) class SessionConnectionWrapper(object):