-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#68 Updated the open_pyexasol_connection
- Loading branch information
Showing
1 changed file
with
46 additions
and
45 deletions.
There are no files selected for viewing
91 changes: 46 additions & 45 deletions
91
exasol/python_extension_common/connections/pyexasol_connection.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,67 +1,68 @@ | ||
from typing import Any | ||
import pyexasol # type: ignore | ||
import exasol.saas.client.api_access as saas_api # type: ignore | ||
|
||
from exasol.python_extension_common.cli.std_options import StdParams | ||
from exasol.python_extension_common.deployment.language_container_deployer import get_websocket_sslopt | ||
|
||
|
||
def open_pyexasol_connection( | ||
dsn: str | None = None, | ||
db_user: str | None = None, | ||
db_pass: str | None = None, | ||
saas_url: str | None = None, | ||
saas_account_id: str | None = None, | ||
saas_database_id: str | None = None, | ||
saas_database_name: str | None = None, | ||
saas_token: str | None = None, | ||
schema: str = '', | ||
use_ssl_cert_validation: bool = True, | ||
ssl_trusted_ca: str | None = None, | ||
ssl_client_certificate: str | None = None, | ||
ssl_private_key: str | None = None) -> pyexasol.ExaConnection: | ||
def check_params(std_params: list[StdParams | list[StdParams]], param_kwargs: dict[str, Any]) -> bool: | ||
def check_param(std_param: StdParams | list[StdParams]) -> bool: | ||
if isinstance(std_param, list): | ||
return any(check_param(std_param_i) for std_param_i in std_param) | ||
return (std_param.name in param_kwargs) and bool(param_kwargs[std_param.name]) | ||
|
||
return all(check_param(std_param) for std_param in std_params) | ||
|
||
|
||
def open_pyexasol_connection(**kwargs) -> pyexasol.ExaConnection: | ||
""" | ||
Creates a database connections object, either in an On-Prem or SaaS database, | ||
depending on the provided parameters. | ||
depending on the provided parameters. The provided parameters should correspond | ||
to the cli options defined in the cli/std_options.py. | ||
Raises a ValueError if the provided parameters are sufficient for neither On-Prem | ||
nor SaaS connections. | ||
Parameters: | ||
dsn - On-Prem database host address, including the port | ||
db_user - On-Prem database username | ||
db_pass - On-Prem database user password | ||
saas_url - SaaS service url | ||
saas_account_id - SaaS account id | ||
saas_database_id - SaaS database id | ||
saas_database_name - SaaS database name, to be used in case the id is unknown | ||
saas_token - SaaS Personal Access Token (PAT) | ||
schema - Schema where the scripts should be created | ||
use_ssl_cert_validation - Use SSL server certificate validation | ||
ssl_trusted_ca - Path to a file or directory with an SSL trusted CA bundle | ||
ssl_client_certificate - Path to a file with the SSL client certificate | ||
ssl_private_key - Path to a file with the SSL client private key | ||
""" | ||
|
||
# Fix the compatibility issue | ||
if ('db_pass' in kwargs) and not (StdParams.db_password.name in kwargs): | ||
kwargs[StdParams.db_password.name] = kwargs['db_pass'] | ||
|
||
# Infer where the database is - On-Prem or SaaS. | ||
if all((dsn, db_user, db_pass)): | ||
connection_params = {'dsn': dsn, 'user': db_user, 'password': db_pass} | ||
elif all((saas_url, saas_account_id, saas_token, | ||
any((saas_database_id, saas_database_name)))): | ||
connection_params = saas_api.get_connection_params(host=saas_url, | ||
account_id=saas_account_id, | ||
database_id=saas_database_id, | ||
database_name=saas_database_name, | ||
pat=saas_token) | ||
if check_params([StdParams.dsn, StdParams.db_user, StdParams.db_password], kwargs): | ||
connection_params = { | ||
'dsn': kwargs[StdParams.dsn.name], | ||
'user': kwargs[StdParams.db_user.name], | ||
'password': kwargs[StdParams.db_password.name] | ||
} | ||
elif check_params([StdParams.saas_url, StdParams.saas_account_id, StdParams.saas_token, | ||
[StdParams.saas_database_id, StdParams.saas_database_name]], kwargs): | ||
connection_params = saas_api.get_connection_params( | ||
host=kwargs[StdParams.saas_url.name], | ||
account_id=kwargs[StdParams.saas_account_id.name], | ||
database_id=kwargs.get(StdParams.saas_database_id.name), | ||
database_name=kwargs.get(StdParams.saas_database_name.name), | ||
pat=kwargs[StdParams.saas_token.name] | ||
) | ||
else: | ||
raise ValueError('Incomplete parameter list. ' | ||
'Please either provide the parameters [dsn, db_user, db_pass] ' | ||
'for an On-Prem database or [saas_url, saas_account_id, ' | ||
'saas_database_id or saas_database_name, saas_token] ' | ||
'for a SaaS database.') | ||
|
||
websocket_sslopt = get_websocket_sslopt(use_ssl_cert_validation, ssl_trusted_ca, | ||
ssl_client_certificate, ssl_private_key) | ||
websocket_sslopt = get_websocket_sslopt( | ||
use_ssl_cert_validation=kwargs.get(StdParams.use_ssl_cert_validation.name, True), | ||
ssl_trusted_ca=kwargs.get(StdParams.ssl_cert_path.name, ''), | ||
ssl_client_certificate=kwargs.get(StdParams.ssl_client_cert_path.name, ''), | ||
ssl_private_key=kwargs.get(StdParams.ssl_client_private_key.name, '') | ||
) | ||
|
||
return pyexasol.connect(**connection_params, schema=schema, | ||
encryption=True, | ||
websocket_sslopt=websocket_sslopt, | ||
compression=True) | ||
return pyexasol.connect( | ||
**connection_params, | ||
schema=kwargs.get(StdParams.schema.name, ''), | ||
encryption=True, | ||
websocket_sslopt=websocket_sslopt, | ||
compression=True | ||
) |