Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: invalidate cache on bad connection info and failed IP lookup #389

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
19 changes: 17 additions & 2 deletions google/cloud/alloydb/connector/async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,14 @@ async def connect(
# if ip_type is str, convert to IPTypes enum
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
try:
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
except Exception:
# with an error from AlloyDB Admin API call or IP type, invalidate
rhatgadkar-goog marked this conversation as resolved.
Show resolved Hide resolved
# the cache and re-raise the error
await self._remove_cached(instance_uri)
raise
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# callable to be used for auto IAM authn
Expand All @@ -202,6 +208,15 @@ def get_authentication_token() -> str:
await cache.force_refresh()
raise

async def _remove_cached(self, instance_uri: str) -> None:
"""Stops all background refreshes and deletes the connection
info cache from the map of caches.
"""
logger.debug(f"['{instance_uri}']: Removing connection info from cache")
# remove cache from stored caches and close it
cache = self._cache.pop(instance_uri)
await cache.close()

async def __aenter__(self) -> Any:
"""Enter async context manager by returning Connector object"""
return self
Expand Down
19 changes: 17 additions & 2 deletions google/cloud/alloydb/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,14 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
# if ip_type is str, convert to IPTypes enum
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
try:
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
except Exception:
# with an error from AlloyDB Admin API call or IP type, invalidate
rhatgadkar-goog marked this conversation as resolved.
Show resolved Hide resolved
# the cache and re-raise the error
await self._remove_cached(instance_uri)
raise
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# synchronous drivers are blocking and run using executor
Expand Down Expand Up @@ -334,6 +340,15 @@ def metadata_exchange(

return sock

async def _remove_cached(self, instance_uri: str) -> None:
"""Stops all background refreshes and deletes the connection
info cache from the map of caches.
"""
logger.debug(f"['{instance_uri}']: Removing connection info from cache")
# remove cache from stored caches and close it
cache = self._cache.pop(instance_uri)
await cache.close()

def __enter__(self) -> "Connector":
"""Enter context manager by returning Connector object"""
return self
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import struct
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple

from aiohttp import ClientResponseError
from aiohttp import RequestInfo
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
Expand Down Expand Up @@ -206,7 +208,18 @@ def __init__(
self._user_agent = f"test-user-agent+{driver}"
self._credentials = FakeCredentials()

i = FakeInstance()
# The instances that currently exist and the client can send API requests to.
self.existing_instances = [
f"projects/{i.project}/locations/{i.region}/clusters/{i.cluster}/instances/{i.name}"
]
rhatgadkar-goog marked this conversation as resolved.
Show resolved Hide resolved

async def _get_metadata(self, *args: Any, **kwargs: Any) -> str:
instance_uri = f"projects/{self.instance.project}/locations/{self.instance.region}/clusters/{self.instance.cluster}/instances/{self.instance.name}"
if instance_uri not in self.existing_instances:
raise ClientResponseError(
RequestInfo(url=instance_uri, method="GET", headers=None), 404
)
rhatgadkar-goog marked this conversation as resolved.
Show resolved Hide resolved
return self.instance.ip_addrs

async def _get_client_certificate(
Expand All @@ -216,6 +229,11 @@ async def _get_client_certificate(
cluster: str,
pub_key: str,
) -> Tuple[str, List[str]]:
instance_uri = f"projects/{self.instance.project}/locations/{self.instance.region}/clusters/{self.instance.cluster}/instances/{self.instance.name}"
if instance_uri not in self.existing_instances:
raise ClientResponseError(
RequestInfo(url=instance_uri, method="POST", headers=None), 404
)
root_cert, intermediate_cert, server_cert = self.instance.get_pem_certs()
# encode public key to bytes
pub_key_bytes: rsa.RSAPublicKey = serialization.load_pem_public_key(
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/test_async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
import asyncio
from typing import Union

from aiohttp import ClientResponseError
from mock import patch
from mocks import FakeAlloyDBClient
from mocks import FakeConnectionInfo
from mocks import FakeCredentials
from mocks import FakeInstance
import pytest

from google.cloud.alloydb.connector import AsyncConnector
from google.cloud.alloydb.connector import IPTypes
from google.cloud.alloydb.connector.exceptions import IPTypeNotFoundError
from google.cloud.alloydb.connector.instance import RefreshAheadCache

ALLOYDB_API_ENDPOINT = "https://alloydb.googleapis.com"

Expand Down Expand Up @@ -294,3 +298,44 @@ async def test_async_connect_bad_ip_type(
exc_info.value.args[0]
== f"Incorrect value for ip_type, got '{bad_ip_type}'. Want one of: 'PUBLIC', 'PRIVATE', 'PSC'."
)


async def test_Connector_remove_cached_bad_instance(
credentials: FakeCredentials,
) -> None:
"""When a Connector attempts to retrieve connection info for a
non-existent instance, it should delete the instance from
the cache and ensure no background refresh happens (which would be
wasted cycles).
"""
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/bad-test-instance"
async with AsyncConnector(credentials=credentials) as connector:
connector._client = FakeAlloyDBClient(
instance=FakeInstance(name="bad-test-instance")
)
cache = RefreshAheadCache(instance_uri, connector._client, connector._keys)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not just use the fake_client fixture for these tests?

def fake_client(fake_instance: FakeInstance) -> FakeAlloyDBClient:
return FakeAlloyDBClient(fake_instance)

The connector should just use the cache's client, i don't think we need to override the connector._client

Suggested change
connector._client = FakeAlloyDBClient(
instance=FakeInstance(name="bad-test-instance")
)
cache = RefreshAheadCache(instance_uri, connector._client, connector._keys)
cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I'm observing: if I change a member variable of a fixture object, it affects other test cases that are run in the session.

For example, if test_Connector_remove_cached_bad_instance is run right before test_connect in a test session, and if I do something like fake_client.instance.name = "bad-test-instace" in test_Connector_remove_cached_bad_instance, this will also be propagated to test_connect.

So that's why I'm not using the fixture here. Because I was seeing similar behavior when running nox -s unit-3.11.

Copy link
Collaborator

@jackwotherspoon jackwotherspoon Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhatgadkar-goog you do not need to change any of the fake_client.instance attributes... since you do not need the bad-instance to be a FakeInstance it should give a 404 without needing it...

  instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/bad-test-instance"
  async with AsyncConnector(credentials=credentials) as connector:
      cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)
      connector._cache[instance_uri] = cache
      with pytest.raises(ClientResponseError):
          await connector.connect(instance_uri, "asyncpg")
      assert instance_uri not in connector._cache

Copy link
Collaborator Author

@rhatgadkar-goog rhatgadkar-goog Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I did this for test_Connector_remove_cached_bad_instance. I think I still need to initialize a FakeAlloyDBClient for test_Connector_remove_cached_no_ip_type to override the instance's IP address and to not send an actual API request.

connector._cache[instance_uri] = cache
with pytest.raises(ClientResponseError):
await connector.connect(instance_uri, "asyncpg")
assert instance_uri not in connector._cache


async def test_Connector_remove_cached_no_ip_type(credentials: FakeCredentials) -> None:
"""When a Connector attempts to connect and preferred IP type is not present,
it should delete the instance from the cache and ensure no background refresh
happens (which would be wasted cycles).
"""
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance"
# set instance to only have Public IP
fake_client = FakeAlloyDBClient()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here and below spots, can we not use the fake_client fixture?

fake_client.instance.ip_addrs = {"PUBLIC": "127.0.0.1"}
async with AsyncConnector(credentials=credentials) as connector:
connector._client = fake_client
# populate cache
cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)
connector._cache[instance_uri] = cache
# test instance does not have Private IP, thus should invalidate cache
with pytest.raises(IPTypeNotFoundError):
await connector.connect(instance_uri, "asyncpg", ip_type="private")
# check that cache has been removed from dict
assert instance_uri not in connector._cache
51 changes: 51 additions & 0 deletions tests/unit/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
from threading import Thread
from typing import Union

from aiohttp import ClientResponseError
from mock import patch
from mocks import FakeAlloyDBClient
from mocks import FakeCredentials
from mocks import FakeInstance
import pytest

from google.cloud.alloydb.connector import Connector
from google.cloud.alloydb.connector import IPTypes
from google.cloud.alloydb.connector.exceptions import IPTypeNotFoundError
from google.cloud.alloydb.connector.instance import RefreshAheadCache
from google.cloud.alloydb.connector.utils import generate_keys


def test_Connector_init(credentials: FakeCredentials) -> None:
Expand Down Expand Up @@ -203,3 +208,49 @@ def test_Connector_close_called_multiple_times(credentials: FakeCredentials) ->
assert connector._thread.is_alive() is False
# call connector.close a second time
connector.close()


async def test_Connector_remove_cached_bad_instance(
credentials: FakeCredentials,
) -> None:
"""When a Connector attempts to retrieve connection info for a
non-existent instance, it should delete the instance from
the cache and ensure no background refresh happens (which would be
wasted cycles).
"""
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/bad-test-instance"
with Connector(credentials) as connector:
connector._client = FakeAlloyDBClient(
instance=FakeInstance(name="bad-test-instance")
)
cache = RefreshAheadCache(instance_uri, connector._client, connector._keys)
connector._cache[instance_uri] = cache
with pytest.raises(ClientResponseError):
await connector.connect_async(instance_uri, "pg8000")
assert instance_uri not in connector._cache


async def test_Connector_remove_cached_no_ip_type(
credentials: FakeCredentials, fake_client: FakeAlloyDBClient
) -> None:
"""When a Connector attempts to connect and preferred IP type is not present,
it should delete the instance from the cache and ensure no background refresh
happens (which would be wasted cycles).
"""
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance"
# set instance to only have Public IP
fake_client = FakeAlloyDBClient()
fake_client.instance.ip_addrs = {"PUBLIC": "127.0.0.1"}
with Connector(credentials=credentials) as connector:
connector._client = fake_client
connector._keys = asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(generate_keys(), asyncio.get_event_loop()),
loop=asyncio.get_event_loop(),
)
cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)
connector._cache[instance_uri] = cache
# test instance does not have Private IP, thus should invalidate cache
with pytest.raises(IPTypeNotFoundError):
await connector.connect_async(instance_uri, "pg8000", ip_type="private")
# check that cache has been removed from dict
assert instance_uri not in connector._cache