Skip to content

Commit

Permalink
feat: adding async client code via switch to httpx (#167) (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe authored Dec 11, 2023
1 parent d84db77 commit 7b76770
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 148 deletions.
304 changes: 217 additions & 87 deletions clinvar_api/client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""REST API client code for communicating with server endpoints."""

import asyncio
import contextlib
import json
import typing

import httpx
from jsonschema import ValidationError
from logzero import logger
from pydantic import BaseModel, SecretStr
from pydantic.config import ConfigDict
import requests

from clinvar_api import common, exceptions, models, msg, schemas

Expand Down Expand Up @@ -42,63 +44,98 @@ class Config(BaseModel):
verify_ssl: bool = True


class _SubmitData:
    """Helper class to reduce redundancy between sync/async `submit_data`.

    The class does everything except the HTTP round trip itself:
    ``before_post`` builds the request and ``after_post`` interprets the
    response, so the sync and async entry points only differ in how they
    send the request.
    """

    def __init__(self, submission_container: models.SubmissionContainer, config: Config):
        self.submission_container = submission_container
        self.config = config

        # Kept for backward compatibility; ``before_post`` returns these
        # values to the caller instead of storing them on the instance.
        self.url: str = ""
        self.headers: typing.Dict[str, str] = {}
        self.post_data: typing.Dict[str, typing.Any] = {}

    def before_post(self) -> typing.Tuple[str, typing.Dict[str, str], typing.Dict[str, typing.Any]]:
        """Build URL, headers, and JSON body for the submission request.

        :return: Tuple ``(url, headers, post_data)`` to pass to the HTTP client.
        """
        logger.info("Submitting with config %s", self.config)

        url_prefix = ENDPOINT_URL_TEST if self.config.use_testing else ENDPOINT_URL_PROD
        url_suffix = SUFFIX_DRYRUN if self.config.use_dryrun else ""
        url = f"{url_prefix}{url_suffix}"
        logger.debug("Will submit to URL %s", url)
        headers = {
            "SP-API-KEY": self.config.auth_token.get_secret_value(),
        }

        payload = self.submission_container.to_msg().model_dump(mode="json")
        logger.debug("Payload data is %s", json.dumps(payload, indent=2))
        cleaned_payload = common.clean_for_json(payload)
        logger.debug("Cleaned payload data is %s", json.dumps(cleaned_payload, indent=2))
        if self.config.presubmission_validation:
            logger.info("Validating payload...")
            schemas.validate_submission_payload(cleaned_payload)
            logger.info("... done validating payload")
        else:
            logger.info("Configured to NOT validate payload before submission")

        post_data = {
            "actions": [
                {"type": "AddData", "targetDb": "clinvar", "data": {"content": cleaned_payload}}
            ]
        }
        logger.debug("Overall POST payload is %s", post_data)
        return url, headers, post_data

    def after_post(self, response: httpx.Response) -> models.Created:
        """Convert the server response into a result or an exception.

        :param response: The response to the submission request.
        :return: The information about the created submission.
        :raises exceptions.SubmissionFailed: if the server did not answer
            with a success status code.
        """
        if httpx.codes.is_success(response.status_code):
            logger.info("API returned OK - %s", response.status_code)
            if response.status_code == 204:  # no content, on dry-run
                logger.info("Server returned '204: No Content', constructing fake created message.")
                return models.Created(id="--NONE--dry-run-result--")
            created_msg = msg.Created.model_validate_json(response.content)
            return models.Created.from_msg(created_msg)

        logger.warning("API returned an error - %s", response.status_code)
        # Log the raw body *before* parsing so it is available even when the
        # body is not the expected JSON error document.
        logger.debug("Full server response is %s", response.text)
        try:
            error_msg = msg.Error.model_validate_json(response.content)
        except ValueError as exc:  # pydantic's ValidationError derives from ValueError
            raise exceptions.SubmissionFailed(
                f"ClinVar submission failed: HTTP {response.status_code}: {response.text}"
            ) from exc
        error_obj = models.Error.from_msg(error_msg)
        if hasattr(error_obj, "errors"):
            raise exceptions.SubmissionFailed(
                f"ClinVar submission failed: {error_obj.message}, errors: {error_obj.errors}"
            )
        raise exceptions.SubmissionFailed(f"ClinVar submission failed: {error_obj.message}")


def submit_data(submission_container: models.SubmissionContainer, config: Config) -> models.Created:
    """Submit new data to ClinVar API (sync).

    Request construction and response handling are delegated to
    ``_SubmitData`` so that this function and ``async_submit_data`` only
    differ in how the request is sent.

    :param submission_container: The submission data.
    :param config: The configuration to use.
    :return: The information about the created submission.
    :raises exceptions.SubmissionFailed: on problems with the submission.
    """
    helper = _SubmitData(submission_container, config)
    url, headers, post_data = helper.before_post()
    response = httpx.post(url, headers=headers, json=post_data, verify=config.verify_ssl)
    return helper.after_post(response)


async def async_submit_data(
    submission_container: models.SubmissionContainer, config: Config
) -> models.Created:
    """Submit new data to ClinVar API (async).

    Mirrors ``submit_data`` but sends the request through an
    ``httpx.AsyncClient``.

    :param submission_container: The submission data.
    :param config: The configuration to use.
    :return: The information about the created submission.
    :raises exceptions.SubmissionFailed: on problems with the submission.
    """
    submitter = _SubmitData(submission_container, config)
    target_url, request_headers, body = submitter.before_post()
    async with httpx.AsyncClient(verify=config.verify_ssl) as http:
        reply = await http.post(target_url, headers=request_headers, json=body)
    return submitter.after_post(reply)


class RetrieveStatusResult(BaseModel):
Expand All @@ -112,12 +149,11 @@ class RetrieveStatusResult(BaseModel):
summaries: typing.Dict[str, models.SummaryResponse]


def _retrieve_status_summary(
url: str, validate_response_json: bool = True
def _handle_retrieved_status_summaries(
response: httpx.Response, validate_response_json: bool = True
) -> models.SummaryResponse:
"""Retrieve status summary from the given URL."""
response = requests.get(url)
if response.ok:
"""Handle retrieved status summary from the given URL."""
if httpx.codes.is_success(response.status_code):
response_json = response.json()
if validate_response_json:
logger.debug("Validating status summary response ...")
Expand All @@ -129,32 +165,35 @@ def _retrieve_status_summary(
sr_msg = msg.SummaryResponse.model_validate_json(response.content)
return models.SummaryResponse.from_msg(sr_msg)
else:
raise exceptions.QueryFailed(
f"Could not perform query: {response.status_code} {response.reason}"
)
raise exceptions.QueryFailed(f"Could not perform query: {response.status_code}")


def retrieve_status(
submission_id: str,
config: Config,
) -> RetrieveStatusResult:
"""Retrieve submission status from API.
class _RetrieveStatus:
"""Helper class to reduce redundancy betwee sync/async `retrieve_status`."""

:param submission_id: The identifier of the submission as returned earlier from API.
:param config: The connfiguration to use.
:return: The information about the created submission.
:raises exceptions.QueryFailed: on problems with the communication to the server.
"""
url_prefix = ENDPOINT_URL_TEST if config.use_testing else ENDPOINT_URL_PROD
url_suffix = SUFFIX_DRYRUN if config.use_dryrun else ""
url = f"{url_prefix}{submission_id}/actions/{url_suffix}"
headers = {
"SP-API-KEY": config.auth_token.get_secret_value(),
}
logger.debug("Will query URL %s", url)
response = requests.get(url, headers=headers)
if response.ok:
logger.info("API returned OK - %s: %s", response.status_code, response.reason)
def __init__(self, submission_id: str, config: Config):
    """Store the submission identifier and the configuration to query with."""
    self.config = config
    self.submission_id = submission_id

def before_first_get(self) -> typing.Tuple[str, typing.Dict[str, str]]:
    """Build URL and headers for the initial status request.

    :return: Tuple ``(url, headers)`` to pass to the HTTP client.
    """
    if self.config.use_testing:
        base_url = ENDPOINT_URL_TEST
    else:
        base_url = ENDPOINT_URL_PROD
    dryrun_part = SUFFIX_DRYRUN if self.config.use_dryrun else ""
    status_url = f"{base_url}{self.submission_id}/actions/{dryrun_part}"
    auth_headers = {"SP-API-KEY": self.config.auth_token.get_secret_value()}
    logger.debug("Will query URL %s", status_url)
    return status_url, auth_headers

def after_first_get_failure(self, response: httpx.Response) -> typing.NoReturn:
    """Report a failed status request; always raises.

    :param response: The non-success response to the status request.
    :raises exceptions.QueryFailed: always, carrying the server's answer.
    """
    # One placeholder per argument; a surplus argument makes the logging
    # module fail to format the record and the message is lost.
    logger.info("API returned an error %s: %s", response.status_code, response.reason_phrase)
    try:
        detail = response.json()
    except ValueError:  # body is not JSON (JSON decode errors derive from ValueError)
        detail = response.text
    raise exceptions.QueryFailed(f"ClinVar query failed: {detail}")

def after_first_get_success(
self, response: httpx.Response
) -> typing.Tuple[typing.List[str], models.SubmissionStatus]:
logger.info("API returned OK - %s", response.status_code)
logger.debug("Structuring response ...")
status_msg = msg.SubmissionStatus.model_validate_json(response.content)
logger.debug(
Expand All @@ -174,22 +213,88 @@ def retrieve_status(
]
),
)
summaries = {}
more_urls: typing.List[str] = []
for action in status_obj.actions:
for action_response in action.responses:
for file_ in action_response.files:
logger.info(" - fetching %s", file_.url)
summaries[file_.url] = _retrieve_status_summary(file_.url)
more_urls.append(file_.url)
return more_urls, status_obj

def after_get_more_urls(
    self, status_obj: models.SubmissionStatus, more_results: typing.Dict[str, httpx.Response]
) -> RetrieveStatusResult:
    """Combine the status object with the fetched summary files.

    :param status_obj: The submission status from the initial request.
    :param more_results: Responses of the summary file requests, keyed by URL.
    :return: The status together with one parsed summary per URL.
    """
    parsed_by_url = {
        file_url: _handle_retrieved_status_summaries(file_response)
        for file_url, file_response in more_results.items()
    }
    logger.info("... done fetching status summary files")
    return RetrieveStatusResult(status=status_obj, summaries=parsed_by_url)


def retrieve_status(
    submission_id: str,
    config: Config,
) -> RetrieveStatusResult:
    """Retrieve submission status from API (sync).

    First queries the status of the submission, then fetches every summary
    file that the status response refers to.

    :param submission_id: The identifier of the submission as returned earlier from API.
    :param config: The configuration to use.
    :return: The submission status together with the fetched summary files.
    :raises exceptions.QueryFailed: on problems with the communication to the server.
    """
    helper = _RetrieveStatus(submission_id, config)
    url, headers = helper.before_first_get()
    # One client for all requests so that ``verify_ssl`` is honoured (as in
    # ``async_retrieve_status``) and connections can be reused.
    with httpx.Client(verify=config.verify_ssl) as client:
        response = client.get(url, headers=headers)
        if not httpx.codes.is_success(response.status_code):
            return helper.after_first_get_failure(response)

        more_urls, status_obj = helper.after_first_get_success(response)

        more_results: typing.Dict[str, httpx.Response] = {}
        for more_url in more_urls:
            logger.info(" - fetching %s", more_url)
            # Summary files are fetched without the API key header.
            more_results[more_url] = client.get(more_url)

    return helper.after_get_more_urls(status_obj, more_results)


async def async_retrieve_status(
    submission_id: str,
    config: Config,
) -> RetrieveStatusResult:
    """Retrieve submission status from API (async).

    First queries the status of the submission, then fetches all summary
    files that the status response refers to concurrently.

    :param submission_id: The identifier of the submission as returned earlier from API.
    :param config: The configuration to use.
    :return: The submission status together with the fetched summary files.
    :raises exceptions.QueryFailed: on problems with the communication to the server.
    """
    helper = _RetrieveStatus(submission_id, config)
    url, headers = helper.before_first_get()
    async with httpx.AsyncClient(verify=config.verify_ssl) as client:
        response = await client.get(url, headers=headers)
        if not httpx.codes.is_success(response.status_code):
            return helper.after_first_get_failure(response)

        more_urls, status_obj = helper.after_first_get_success(response)

        # De-duplicate while keeping order: results are keyed by URL, and a
        # coroutine that gets replaced by a duplicate would never be awaited.
        unique_urls = list(dict.fromkeys(more_urls))
        for more_url in unique_urls:
            logger.info(" - fetching %s", more_url)
        # Reuse the single client; it can run the requests concurrently
        # without a separate client per URL.
        responses = await asyncio.gather(*(client.get(more_url) for more_url in unique_urls))

    more_results: typing.Dict[str, httpx.Response] = dict(zip(unique_urls, responses))
    return helper.after_get_more_urls(status_obj, more_results)


class Client:
"""NCBI ClinVar REST API client."""
"""NCBI ClinVar REST API client (sync)."""

def __init__(self, config: Config):
self.config = config
Expand All @@ -211,3 +316,28 @@ def retrieve_status(self, submission_id: str) -> RetrieveStatusResult:
:raises exceptions.QueryFailed: on problems with the communication to the server.
"""
return retrieve_status(submission_id, self.config)


class AsyncClient:
    """NCBI ClinVar REST API client (async).

    Thin object-oriented wrapper that binds a ``Config`` to the module-level
    ``async_submit_data`` and ``async_retrieve_status`` coroutines.
    """

    def __init__(self, config: Config):
        self.config = config

    async def submit_data(self, payload: models.SubmissionContainer) -> models.Created:
        """Submit new data to ClinVar API.

        :param payload: The submission data.
        :return: The information about the created submission.
        :raises exceptions.SubmissionFailed: on problems with the submission.
        """
        created = await async_submit_data(payload, self.config)
        return created

    async def retrieve_status(self, submission_id: str) -> RetrieveStatusResult:
        """Retrieve submission status from API.

        :param submission_id: The identifier of the submission as returned earlier from API.
        :return: The submission status together with the fetched summary files.
        :raises exceptions.QueryFailed: on problems with the communication to the server.
        """
        status_result = await async_retrieve_status(submission_id, self.config)
        return status_result
2 changes: 1 addition & 1 deletion docs/api_vs_cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ A Python module ``clinvar_api`` that you can use for making calls to the ClinVar
**If you want to integrate ClinVar API submission into your Python software, this is for you.**

The module provides a "Pythonic" API based on `pydantic <https://pydantic.dev/>`__ with ``snake_case`` syntax that has full Python type annotations.
Of course, you could just roll your own JSON based submission based on Python requests but using the module has some advantages:
Of course, you could just roll your own JSON based submission based on ``requests`` or ``httpx`` but using the module has some advantages:

* ``clinvar_api`` is fully typed so you can work with Python data types and all advantages that come with this (linting, editor completion, ...)
* the module authors monitor the ClinVar API docs and are on the relevant mailing lists and will adjust the library in case of API changes
Expand Down
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
logzero >=1.7.0, <2.0
requests >=2.28.1, <3.0
httpx >=0.24
pydantic >=2.5, <3.0
python-dateutil >=2.8.2, <3.0
click >=8.1.3, <9.0
Expand Down
Loading

0 comments on commit 7b76770

Please sign in to comment.