Skip to content

Commit

Permalink
Try catch tasks (#133)
Browse files Browse the repository at this point in the history
* try await coro

* Update sqs_tasks.py

* test

* fix tests

* rebase

* rebase

* test connectionn error on receive_message

* bump version

Co-authored-by: rogeliolopez <rogelio.lpz94@gmail.com>
Co-authored-by: Felipe López <flh.1989@gmail.com>
  • Loading branch information
3 people authored Dec 20, 2022
1 parent 2961953 commit 9d6e810
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
16 changes: 10 additions & 6 deletions fast_agave/tasks/sqs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from itertools import count
from typing import Callable, Coroutine

from aiobotocore.httpsession import HTTPClientError
from aiobotocore.session import get_session

from ..exc import RetryTask
Expand Down Expand Up @@ -43,16 +44,19 @@ async def start_task(*args, **kwargs) -> None:
session = get_session()
async with session.create_client('sqs', region_name) as sqs:
for _ in count():
response = await sqs.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=wait_time_seconds,
VisibilityTimeout=visibility_timeout,
AttributeNames=['ApproximateReceiveCount'],
)
try:
response = await sqs.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=wait_time_seconds,
VisibilityTimeout=visibility_timeout,
AttributeNames=['ApproximateReceiveCount'],
)
messages = response['Messages']
except KeyError:
continue
except HTTPClientError:
await asyncio.sleep(1)
continue

for message in messages:
body = json.loads(message['Body'])
Expand Down
2 changes: 1 addition & 1 deletion fast_agave/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.6.3'
__version__ = '0.7.0'
52 changes: 51 additions & 1 deletion tests/tasks/test_sqs_tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
from unittest.mock import AsyncMock, call
from unittest.mock import AsyncMock, call, patch

import aiobotocore.client
import pytest
from aiobotocore.httpsession import HTTPClientError

from fast_agave.exc import RetryTask
from fast_agave.tasks.sqs_tasks import task
Expand Down Expand Up @@ -54,6 +56,54 @@ async def test_not_execute_tasks(sqs_client) -> None:
assert 'Messages' not in resp


@pytest.mark.asyncio
async def test_http_client_error_tasks(sqs_client) -> None:
"""
Este test prueba el caso cuando hay un error de conexión al intentar
obtener recibir el mensaje del queue. Se maneja correctamente la
excepción `HTTPClientError` para evitar que el loop que consume mensajes
se rompe inesperadamente.
"""

test_message = dict(id='abc123', name='fast-agave')

await sqs_client.send_message(
MessageBody=json.dumps(test_message),
MessageGroupId='1234',
)

original_create_client = aiobotocore.client.AioClientCreator.create_client

# Esta función hace un patch de la función `receive_message` para simular
# un error de conexión, la recuperación de la conexión y posteriores
# recepciones de mensajes sin body del queue.
async def mock_create_client(*args, **kwargs):
client = await original_create_client(*args, **kwargs)
client.receive_message = AsyncMock(
side_effect=[
HTTPClientError(error='[Errno 104] Connection reset by peer'),
await sqs_client.receive_message(),
dict(),
dict(),
dict(),
]
)
return client

async_mock_function = AsyncMock(return_value=None)
with patch(
'aiobotocore.client.AioClientCreator.create_client', mock_create_client
):
await task(
queue_url=sqs_client.queue_url,
region_name=CORE_QUEUE_REGION,
wait_time_seconds=1,
visibility_timeout=3,
max_retries=1,
)(async_mock_function)()
async_mock_function.assert_called_once()


@pytest.mark.asyncio
async def test_retry_tasks_default_max_retries(sqs_client) -> None:
"""
Expand Down

0 comments on commit 9d6e810

Please sign in to comment.