From 9d6e810ba475830ecb8789b324126b26fee7367b Mon Sep 17 00:00:00 2001 From: Pach Date: Mon, 19 Dec 2022 22:46:59 -0600 Subject: [PATCH] Try catch tasks (#133) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * try await coro * Update sqs_tasks.py * test * fix tests * rebase * rebase * test connectionn error on receive_message * bump version Co-authored-by: rogeliolopez Co-authored-by: Felipe López --- fast_agave/tasks/sqs_tasks.py | 16 +++++++---- fast_agave/version.py | 2 +- tests/tasks/test_sqs_tasks.py | 52 ++++++++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/fast_agave/tasks/sqs_tasks.py b/fast_agave/tasks/sqs_tasks.py index 9e95ed8..0115e10 100644 --- a/fast_agave/tasks/sqs_tasks.py +++ b/fast_agave/tasks/sqs_tasks.py @@ -4,6 +4,7 @@ from itertools import count from typing import Callable, Coroutine +from aiobotocore.httpsession import HTTPClientError from aiobotocore.session import get_session from ..exc import RetryTask @@ -43,16 +44,19 @@ async def start_task(*args, **kwargs) -> None: session = get_session() async with session.create_client('sqs', region_name) as sqs: for _ in count(): - response = await sqs.receive_message( - QueueUrl=queue_url, - WaitTimeSeconds=wait_time_seconds, - VisibilityTimeout=visibility_timeout, - AttributeNames=['ApproximateReceiveCount'], - ) try: + response = await sqs.receive_message( + QueueUrl=queue_url, + WaitTimeSeconds=wait_time_seconds, + VisibilityTimeout=visibility_timeout, + AttributeNames=['ApproximateReceiveCount'], + ) messages = response['Messages'] except KeyError: continue + except HTTPClientError: + await asyncio.sleep(1) + continue for message in messages: body = json.loads(message['Body']) diff --git a/fast_agave/version.py b/fast_agave/version.py index a68d2bd..a71c5c7 100644 --- a/fast_agave/version.py +++ b/fast_agave/version.py @@ -1 +1 @@ -__version__ = '0.6.3' +__version__ = '0.7.0' diff --git a/tests/tasks/test_sqs_tasks.py b/tests/tasks/test_sqs_tasks.py index a38972a..0a175e2 100644 --- a/tests/tasks/test_sqs_tasks.py +++ b/tests/tasks/test_sqs_tasks.py @@ -1,7 +1,9 @@ import json -from unittest.mock import AsyncMock, call +from unittest.mock import AsyncMock, call, patch +import aiobotocore.client import pytest +from aiobotocore.httpsession import HTTPClientError from fast_agave.exc import RetryTask from fast_agave.tasks.sqs_tasks import task @@ -54,6 +56,54 @@ async def test_not_execute_tasks(sqs_client) -> None: assert 'Messages' not in resp +@pytest.mark.asyncio +async def test_http_client_error_tasks(sqs_client) -> None: + """ + Este test prueba el caso cuando hay un error de conexión al intentar + obtener recibir el mensaje del queue. Se maneja correctamente la + excepción `HTTPClientError` para evitar que el loop que consume mensajes + se rompe inesperadamente. + """ + + test_message = dict(id='abc123', name='fast-agave') + + await sqs_client.send_message( + MessageBody=json.dumps(test_message), + MessageGroupId='1234', + ) + + original_create_client = aiobotocore.client.AioClientCreator.create_client + + # Esta función hace un patch de la función `receive_message` para simular + # un error de conexión, la recuperación de la conexión y posteriores + # recepciones de mensajes sin body del queue. + async def mock_create_client(*args, **kwargs): + client = await original_create_client(*args, **kwargs) + client.receive_message = AsyncMock( + side_effect=[ + HTTPClientError(error='[Errno 104] Connection reset by peer'), + await sqs_client.receive_message(), + dict(), + dict(), + dict(), + ] + ) + return client + + async_mock_function = AsyncMock(return_value=None) + with patch( + 'aiobotocore.client.AioClientCreator.create_client', mock_create_client + ): + await task( + queue_url=sqs_client.queue_url, + region_name=CORE_QUEUE_REGION, + wait_time_seconds=1, + visibility_timeout=3, + max_retries=1, + )(async_mock_function)() + async_mock_function.assert_called_once() + + @pytest.mark.asyncio async def test_retry_tasks_default_max_retries(sqs_client) -> None: """