Skip to content

Commit

Permalink
Add support for Galaxy v3 API (ansible-community#45)
Browse files Browse the repository at this point in the history
* Add support for Galaxy v3 API.

* Add integration tests for the Galaxy API.

The integration test for listing versions through the Galaxy v3 API is disabled
because that endpoint is extremely slow: fetching just the first page takes about
30 seconds (!), and fetching the complete listing takes more than 100 seconds.

* Reorganize imports.

* Improve docstrings; validate context.server == galaxy_server.

* Fix check, update docstring.

* According to the ansible-galaxy CLI code, the v3 API sometimes returns the list of results under the key 'results' instead of 'data', so accept both.
  • Loading branch information
felixfontein authored Apr 14, 2023
1 parent d910be2 commit af8f28d
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 28 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/45-galaxy-v3.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- "Allow Galaxy client to communicate with the Galaxy v3 API (https://github.com/ansible-community/antsibull-core/pull/45)."
192 changes: 164 additions & 28 deletions src/antsibull_core/galaxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from __future__ import annotations

import asyncio
import os.path
import shutil
import typing as t
from enum import Enum
from urllib.parse import urljoin

import aiofiles
Expand Down Expand Up @@ -49,47 +51,168 @@ class DownloadResults(t.NamedTuple):
download_path: str


class GalaxyVersion(Enum):
    """Version of the Galaxy REST API used to talk to a Galaxy server."""

    # Galaxy v2 API.
    V2 = 2
    # Galaxy v3 API.
    V3 = 3


class GalaxyContext:
    """
    Describes which REST API version a Galaxy server offers and where that API is rooted.

    Instances are normally obtained through :meth:`create`, which asks the server which API
    versions it provides.
    """

    # URL of the Galaxy server, exactly as passed in by the caller.
    server: str
    # API version selected for this server.
    version: GalaxyVersion
    # Root URL of the selected API version; endpoint paths are joined onto this.
    base_url: str

    def __init__(self, server: str, version: GalaxyVersion, base_url: str) -> None:
        """
        Create a context from already known values.

        :arg server: URL of the Galaxy server.
        :arg version: The API version to use with this server.
        :arg base_url: Root URL of that API version.
        """
        self.server = server
        self.version = version
        self.base_url = base_url

    @staticmethod
    def _api_base_url(galaxy_server: str, version_path: str) -> str:
        """
        Build the root URL of one API version.

        The result always ends with a slash.  The URL is later used as the base for
        ``urljoin(base_url, 'collections/...')``; without the trailing slash ``urljoin``
        would replace the last path segment (the version) instead of appending to it.

        :arg galaxy_server: URL of the Galaxy server.
        :arg version_path: Path of the API version relative to ``api/``, as reported by
            the server.
        :returns: The root URL of the API version.
        """
        base_url = urljoin(galaxy_server, 'api/' + version_path)
        if not base_url.endswith('/'):
            base_url += '/'
        return base_url

    @classmethod
    async def create(cls, aio_session: aiohttp.client.ClientSession, galaxy_server: str
                     ) -> GalaxyContext:
        """
        Query a Galaxy server for its available API versions and build a context from them.

        The v3 API is preferred over the v2 API if the server reports both.

        :arg aio_session: :obj:`aiohttp.ClientSession` with which to perform the request.
        :arg galaxy_server: URL of the Galaxy server.
        :returns: A ``GalaxyContext`` for the server.
        :raises RuntimeError: If the server reports neither the v2 nor the v3 API.
        """
        api_url = urljoin(galaxy_server, 'api/')
        async with retry_get(aio_session, api_url,
                             headers={'Accept': 'application/json'}) as response:
            galaxy_info = await response.json()
        available_versions: t.Mapping[str, str] = galaxy_info.get('available_versions') or {}

        # Order matters: the first version found in the server's answer wins.
        for key, version in (('v3', GalaxyVersion.V3), ('v2', GalaxyVersion.V2)):
            if key in available_versions:
                base_url = cls._api_base_url(galaxy_server, available_versions[key])
                return cls(galaxy_server, version, base_url)

        raise RuntimeError(
            f'Information retrieved from {api_url} seems to indicate'
            ' neither Galaxy v2 API nor Galaxy v3 API'
        )


_GALAXY_CONTEXT_CACHE: dict[str, t.Union[GalaxyContext, asyncio.Future]] = {}


async def _get_cached_galaxy_context(aio_session: aiohttp.client.ClientSession,
                                     galaxy_server: str) -> GalaxyContext:
    """
    Return the ``GalaxyContext`` for a Galaxy server, querying the server at most once at a time.

    The first caller for a given server performs the query and stores a placeholder future
    in ``_GALAXY_CONTEXT_CACHE`` while doing so.  Callers arriving in the meantime wait for
    that future.  Once the query succeeded, the context itself is stored and returned
    directly to all later callers.

    If the query fails or is cancelled, the placeholder is removed from the cache again so
    that a later call can retry instead of receiving the stale failure forever.

    :arg aio_session: :obj:`aiohttp.ClientSession` with which to perform the request.
    :arg galaxy_server: URL of the Galaxy server.
    :returns: The ``GalaxyContext`` for the server.
    """
    cached = _GALAXY_CONTEXT_CACHE.get(galaxy_server)
    if cached is not None:
        if asyncio.isfuture(cached):
            # Shield the shared future: cancelling this waiter must not cancel the
            # future that the other waiters depend on.
            return await asyncio.shield(cached)
        return t.cast(GalaxyContext, cached)

    pending = asyncio.get_running_loop().create_future()
    _GALAXY_CONTEXT_CACHE[galaxy_server] = pending
    try:
        context = await GalaxyContext.create(aio_session, galaxy_server)
    except BaseException as exc:  # pylint: disable=broad-exception-caught
        # Never leave a failed or cancelled placeholder behind in the cache.
        if _GALAXY_CONTEXT_CACHE.get(galaxy_server) is pending:
            del _GALAXY_CONTEXT_CACHE[galaxy_server]
        if not pending.done():
            if isinstance(exc, Exception):
                pending.set_exception(exc)
                # Mark the exception as retrieved so that asyncio does not log
                # "exception was never retrieved" when nobody is waiting.
                pending.exception()
            else:
                # Cancellation, KeyboardInterrupt, ...: wake up the waiters as well.
                pending.cancel()
        raise

    _GALAXY_CONTEXT_CACHE[galaxy_server] = context
    if not pending.done():
        pending.set_result(context)
    return context


class GalaxyClient:
"""Class for querying the Galaxy REST API."""

def __init__(self, aio_session: aiohttp.client.ClientSession,
galaxy_server: str = _GALAXY_SERVER_URL) -> None:
galaxy_server: t.Optional[str] = None,
context: t.Optional[GalaxyContext] = None) -> None:
"""
Create a GalaxyClient object to query the Galaxy Server.
:arg aio_session: :obj:`aiohttp.ClientSession` with which to perform all
requests to galaxy.
:kwarg galaxy_server: URL to the galaxy server.
:kwarg galaxy_server: URL to the galaxy server. ``context`` must be provided instead
in the future.
:kwarg context: A ``GalaxyContext`` instance. Must be provided in the future.
"""
if galaxy_server is None and context is None:
# TODO: deprecate
galaxy_server = _GALAXY_SERVER_URL
elif context is not None:
# TODO: deprecate
if galaxy_server is not None and galaxy_server != context.server:
raise ValueError(
f'galaxy_server ({galaxy_server}) does not coincide'
f' with context.server ({context.server})'
)
galaxy_server = context.server
self.galaxy_server = galaxy_server
self.context = context
self.aio_session = aio_session
self.params = {'format': 'json'}
self.headers: t.Dict[str, str] = {'Accept': 'application/json'}
self.params: t.Dict[str, str] = {}
if context:
self._update_from_context(context)

def _update_from_context(self, context: GalaxyContext) -> None:
    """
    Adjust the request parameters of this client to the API version of ``context``.

    :arg context: the ``GalaxyContext`` whose API version decides the parameters.
    """
    if context.version != GalaxyVersion.V2:
        return
    # Only requests to the v2 API get the ``format`` query parameter.
    self.params['format'] = 'json'

async def _get_galaxy_versions(self, versions_url: str) -> list[str]:
async def _ensure_context(self) -> GalaxyContext:
"""
Ensure that ``self.context`` is present.
"""
context = self.context
if context is not None:
return context
if self.galaxy_server is None:
raise RuntimeError('Unexpected None for GalaxyClient.galaxy_server')
context = await _get_cached_galaxy_context(self.aio_session, self.galaxy_server)
self.context = context
self._update_from_context(context)
return context

async def _get_galaxy_versions(self, context: GalaxyContext, versions_url: str,
                               add_params: bool = True) -> list[str]:
    """
    Retrieve the complete list of versions for a collection from a galaxy endpoint.

    This internal function retrieves versions for collections from a Galaxy endpoint. If the
    information is paged, it continues to retrieve linked pages until all of the information has
    been returned.

    :arg context: the ``GalaxyContext`` to use.
    :arg versions_url: url to the first page to retrieve.
    :arg add_params: whether to send this client's query parameters and a page size along
        with the request. Used internally. Do not specify when calling this.
    :returns: List of all the versions of the collection.
    :raises NoSuchCollection: if a requested page answers with HTTP status 404.
    """
    versions: list[str] = []
    page_url = versions_url
    while True:
        params: t.Optional[t.Dict[str, str]]
        if add_params:
            params = self.params.copy()
            if context.version == GalaxyVersion.V2:
                params['page_size'] = '100'
            else:
                params['limit'] = '50'
        else:
            params = None

        async with retry_get(self.aio_session, page_url, params=params,
                             headers=self.headers, acceptable_error_codes=[404]) as response:
            if response.status == 404:
                raise NoSuchCollection(f'No collection found at: {page_url}')
            collection_info = await response.json()

        if context.version == GalaxyVersion.V2:
            results = collection_info['results']
            next_link = collection_info['next']
        else:
            if 'data' in collection_info:
                # Apparently 'data' isn't always used...
                results = collection_info['data']
            else:
                results = collection_info['results']
            next_link = collection_info['links']['next']
            # For v3, follow-up pages are requested without adding parameters again.
            add_params = False

        versions.extend(version_record['version'] for version_record in results)

        if not next_link:
            return versions
        if next_link.startswith('/'):
            # Links starting with a slash are relative to the server, not to the current page.
            next_link = urljoin(context.server, next_link)
        page_url = next_link

Expand All @@ -100,9 +223,11 @@ async def get_versions(self, collection: str) -> list[str]:
:arg collection: Name of the collection to get version info for.
:returns: List of all the versions of this collection on galaxy.
"""
context = await self._ensure_context()

collection = collection.replace('.', '/')
galaxy_url = urljoin(self.galaxy_server, f'api/v2/collections/{collection}/versions/')
retval = await self._get_galaxy_versions(galaxy_url)
galaxy_url = urljoin(context.base_url, f'collections/{collection}/versions/')
retval = await self._get_galaxy_versions(context, galaxy_url)
return retval

async def get_info(self, collection: str) -> dict[str, t.Any]:
Expand All @@ -112,18 +237,23 @@ async def get_info(self, collection: str) -> dict[str, t.Any]:
:arg collection: Namespace.collection to retrieve information about.
:returns: Dictionary of information about the collection.
Please see the Galaxy REST API documentation for information on the structure of the
returned data.
Please see the Galaxy v2 and v3 REST API documentation for information on the
structure of the returned data.
.. seealso::
An example return value from the
`Galaxy REST API <https://galaxy.ansible.com/api/v2/collections/community/general/>`_
`Galaxy v2 REST API
<https://galaxy.ansible.com/api/v2/collections/community/general/>`_
and the `Galaxy v3 REST API
<https://beta-galaxy.ansible.com/api/v3/plugin/ansible/content/published/collections/index/community/general/>`_
"""
context = await self._ensure_context()

collection = collection.replace('.', '/')
galaxy_url = urljoin(self.galaxy_server, f'api/v2/collections/{collection}/')
galaxy_url = urljoin(context.base_url, f'collections/{collection}/')

async with retry_get(self.aio_session, galaxy_url, params=self.params,
acceptable_error_codes=[404]) as response:
headers=self.headers, acceptable_error_codes=[404]) as response:
if response.status == 404:
raise NoSuchCollection(f'No collection found at: {galaxy_url}')
collection_info = await response.json()
Expand All @@ -139,20 +269,23 @@ async def get_release_info(self, collection: str,
:arg version: Version of the collection.
:returns: Dictionary of information about the release.
Please see the Galaxy REST API documentation for information on the structure of the
returned data.
Please see the Galaxy v2 and v3 REST API documentation for information on the
structure of the returned data.
.. seealso::
An example return value from the
`Galaxy REST API
`Galaxy v2 REST API
<https://galaxy.ansible.com/api/v2/collections/community/general/versions/0.1.1>`_
and the `Galaxy v3 REST API
<https://beta-galaxy.ansible.com/api/v3/plugin/ansible/content/published/collections/index/community/general/versions/0.1.1/>`_
"""
context = await self._ensure_context()

collection = collection.replace('.', '/')
galaxy_url = urljoin(self.galaxy_server,
f'api/v2/collections/{collection}/versions/{version}/')
galaxy_url = urljoin(context.base_url, f'collections/{collection}/versions/{version}/')

async with retry_get(self.aio_session, galaxy_url, params=self.params,
acceptable_error_codes=[404]) as response:
headers=self.headers, acceptable_error_codes=[404]) as response:
if response.status == 404:
raise NoSuchCollection(f'No collection found at: {galaxy_url}')
collection_info = await response.json()
Expand Down Expand Up @@ -212,20 +345,23 @@ class CollectionDownloader(GalaxyClient):

def __init__(self, aio_session: aiohttp.client.ClientSession,
             download_dir: str,
             galaxy_server: t.Optional[str] = None,
             collection_cache: str | None = None,
             context: t.Optional[GalaxyContext] = None) -> None:
    """
    Create an object to download collections from galaxy.

    :arg aio_session: :obj:`aiohttp.ClientSession` with which to perform all
        requests to galaxy.
    :arg download_dir: Directory to download into.
    :kwarg galaxy_server: URL to the galaxy server. ``context`` must be provided instead
        in the future.
    :kwarg collection_cache: If given, a path to a directory containing collection tarballs.
        These tarballs will be used instead of downloading new tarballs provided that the
        versions match the criteria (latest compatible version known to galaxy).
    :kwarg context: A ``GalaxyContext`` instance. Must be provided in the future.
    """
    # Passed by keyword so that galaxy_server and context cannot be mixed up.
    super().__init__(aio_session, galaxy_server=galaxy_server, context=context)
    self.download_dir = download_dir
    self.collection_cache: t.Final[str | None] = collection_cache

Expand Down
Loading

0 comments on commit af8f28d

Please sign in to comment.