diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index df0dcb5ee..95a8b776e 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -449,7 +449,7 @@ def open(cls, connection): SessionConnectionWrapper, ) - handle = SessionConnectionWrapper(Connection()) + handle = SessionConnectionWrapper(Connection(creds.server_side_parameters)) else: raise dbt.exceptions.DbtProfileError( f"invalid credential method: {creds.method}" diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py index d275c73c5..2d87f19c4 100644 --- a/dbt/adapters/spark/session.py +++ b/dbt/adapters/spark/session.py @@ -24,9 +24,10 @@ class Cursor: https://github.com/mkleehammer/pyodbc/wiki/Cursor """ - def __init__(self) -> None: + def __init__(self, server_side_parameters) -> None: self._df: Optional[DataFrame] = None self._rows: Optional[List[Row]] = None + self.server_side_parameters = server_side_parameters def __enter__(self) -> Cursor: return self @@ -106,7 +107,12 @@ def execute(self, sql: str, *parameters: Any) -> None: """ if len(parameters) > 0: sql = sql % parameters - spark_session = SparkSession.builder.enableHiveSupport().getOrCreate() + builder = SparkSession.builder.enableHiveSupport() + + for k, v in self.server_side_parameters.items(): + builder = builder.config(k, v) + + spark_session = builder.getOrCreate() self._df = spark_session.sql(sql) def fetchall(self) -> Optional[List[Row]]: @@ -159,6 +165,9 @@ class Connection: https://github.com/mkleehammer/pyodbc/wiki/Connection """ + def __init__(self, server_side_parameters) -> None: + self.server_side_parameters = server_side_parameters + def cursor(self) -> Cursor: """ Get a cursor. @@ -168,11 +177,11 @@ def cursor(self) -> Cursor: out : Cursor The cursor. """ - return Cursor() + return Cursor(self.server_side_parameters) class SessionConnectionWrapper(object): - """Connection wrapper for the sessoin connection method.""" + """Connection wrapper for the session connection method.""" def __init__(self, handle): self.handle = handle