Skip to content

Commit

Permalink
chore: update pg8000 iam auth tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwotherspoon committed Sep 4, 2024
1 parent 1c7e285 commit 6cfdf9d
Showing 1 changed file with 87 additions and 43 deletions.
130 changes: 87 additions & 43 deletions tests/system/test_pg8000_iam_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,73 +14,117 @@
limitations under the License.
"""

"""
Copyright 2021 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from datetime import datetime
import os
from typing import Generator
from typing import Tuple

import pg8000
import pytest
import sqlalchemy

from google.cloud.sql.connector import Connector


# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'creator' argument to 'create_engine'
def init_connection_engine(connector: Connector) -> sqlalchemy.engine.Engine:
# initialize Connector object for connections to Cloud SQL
def create_sqlalchemy_engine(
instance_connection_name: str,
user: str,
db: str,
refresh_strategy: str = "background",
) -> Tuple[sqlalchemy.engine.Engine, Connector]:
"""Creates a connection pool for a Cloud SQL instance and returns the pool
and the connector. Callers are responsible for closing the pool and the
connector.
A sample invocation looks like:
engine, connector = create_sqlalchemy_engine(
instance_connection_name,
user,
db,
)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
# do something with query result
connector.close()
Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
connector = Connector(refresh_strategy=refresh_strategy)

def getconn() -> pg8000.dbapi.Connection:
conn: pg8000.dbapi.Connection = connector.connect(
os.environ["POSTGRES_IAM_CONNECTION_NAME"],
instance_connection_name,
"pg8000",
user=os.environ["POSTGRES_IAM_USER"],
db=os.environ["POSTGRES_DB"],
user=user,
db=db,
ip_type="public", # can also be "private" or "psc"
enable_iam_auth=True,
)
return conn

# create SQLAlchemy connection pool
pool = sqlalchemy.create_engine(
engine = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
execution_options={"isolation_level": "AUTOCOMMIT"},
)
pool.dialect.description_encoding = None
return pool
return engine, connector


@pytest.fixture
def pool() -> Generator:
connector = Connector()
pool = init_connection_engine(connector)

yield pool
def test_pg8000_iam_authn_connection() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_IAM_CONNECTION_NAME"]
user = os.environ["POSTGRES_IAM_USER"]
db = os.environ["POSTGRES_DB"]

engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
assert type(curr_time) is datetime
connector.close()


@pytest.fixture
def lazy_pool() -> Generator:
connector = Connector(refresh_strategy="lazy")
pool = init_connection_engine(connector)

yield pool
def test_lazy_pg8000_iam_authn_connection() -> None:
"""Basic test to get time from database."""
inst_conn_name = os.environ["POSTGRES_IAM_CONNECTION_NAME"]
user = os.environ["POSTGRES_IAM_USER"]
db = os.environ["POSTGRES_DB"]

engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db, "lazy")
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
assert type(curr_time) is datetime
connector.close()


def test_pooled_connection_with_pg8000_iam_auth(
pool: sqlalchemy.engine.Engine,
) -> None:
with pool.connect() as conn:
result = conn.execute(sqlalchemy.text("SELECT 1;")).fetchone()
assert isinstance(result[0], int)
assert result[0] == 1


def test_lazy_connection_with_pg8000_iam_auth(
lazy_pool: sqlalchemy.engine.Engine,
) -> None:
with lazy_pool.connect() as conn:
result = conn.execute(sqlalchemy.text("SELECT 1;")).fetchone()
assert isinstance(result[0], int)
assert result[0] == 1

0 comments on commit 6cfdf9d

Please sign in to comment.