Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQSPublishOperator should allow sending messages to a FIFO Queue #25171

Merged
merged 2 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions airflow/providers/amazon/aws/hooks/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def send_message(
message_body: str,
delay_seconds: int = 0,
message_attributes: Optional[Dict] = None,
message_group_id: Optional[str] = None,
) -> Dict:
"""
Send message to the queue
Expand All @@ -67,17 +68,23 @@ def send_message(
:param delay_seconds: seconds to delay the message
:param message_attributes: additional attributes for the message (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
:param message_group_id: This applies only to FIFO (first-in-first-out) queues. (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`

:return: dict with the information about the message sent
For details of the returned value see :py:meth:`botocore.client.SQS.send_message`
:rtype: dict
"""
return self.get_conn().send_message(
QueueUrl=queue_url,
MessageBody=message_body,
DelaySeconds=delay_seconds,
MessageAttributes=message_attributes or {},
)
params = {
'QueueUrl': queue_url,
'MessageBody': message_body,
'DelaySeconds': delay_seconds,
'MessageAttributes': message_attributes or {},
}
if message_group_id:
params['MessageGroupId'] = message_group_id

return self.get_conn().send_message(**params)


class SQSHook(SqsHook):
Expand Down
13 changes: 12 additions & 1 deletion airflow/providers/amazon/aws/operators/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,18 @@ class SqsPublishOperator(BaseOperator):
:param message_attributes: additional attributes for the message (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
:param delay_seconds: message delay (templated) (default: 1 second)
:param message_group_id: This parameter applies only to FIFO (first-in-first-out) queues. (default: None)
For details of the attributes parameter see :py:meth:`botocore.client.SQS.send_message`
:param aws_conn_id: AWS connection id (default: aws_default)
"""

template_fields: Sequence[str] = ('sqs_queue', 'message_content', 'delay_seconds', 'message_attributes')
template_fields: Sequence[str] = (
'sqs_queue',
'message_content',
'delay_seconds',
'message_attributes',
'message_group_id',
)
template_fields_renderers = {'message_attributes': 'json'}
ui_color = '#6ad3fa'

Expand All @@ -53,6 +61,7 @@ def __init__(
message_content: str,
message_attributes: Optional[dict] = None,
delay_seconds: int = 0,
message_group_id: Optional[str] = None,
aws_conn_id: str = 'aws_default',
**kwargs,
):
Expand All @@ -62,6 +71,7 @@ def __init__(
self.message_content = message_content
self.delay_seconds = delay_seconds
self.message_attributes = message_attributes or {}
self.message_group_id = message_group_id

def execute(self, context: 'Context'):
"""
Expand All @@ -79,6 +89,7 @@ def execute(self, context: 'Context'):
message_body=self.message_content,
delay_seconds=self.delay_seconds,
message_attributes=self.message_attributes,
message_group_id=self.message_group_id,
)

self.log.info('send_message result: %s', result)
Expand Down
33 changes: 33 additions & 0 deletions tests/providers/amazon/aws/operators/test_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import unittest
from unittest.mock import MagicMock

import pytest
from botocore.exceptions import ClientError
from moto import mock_sqs

from airflow.models.dag import DAG
Expand All @@ -32,6 +34,9 @@
QUEUE_NAME = 'test-queue'
QUEUE_URL = f'https://{QUEUE_NAME}'

FIFO_QUEUE_NAME = 'test-queue.fifo'
FIFO_QUEUE_URL = f'https://{FIFO_QUEUE_NAME}'


class TestSqsPublishOperator(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -66,3 +71,31 @@ def test_execute_success(self):
context_calls = []

assert self.mock_context['ti'].method_calls == context_calls, "context call should be same"

@mock_sqs
def test_execute_failure_fifo_queue(self):
self.operator.sqs_queue = FIFO_QUEUE_URL
self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
with pytest.raises(ClientError) as ctx:
self.operator.execute(self.mock_context)
err_msg = (
"An error occurred (MissingParameter) when calling the SendMessage operation: The request must "
"contain the parameter MessageGroupId."
)
assert err_msg == str(ctx.value)

@mock_sqs
def test_execute_success_fifo_queue(self):
self.operator.sqs_queue = FIFO_QUEUE_URL
self.operator.message_group_id = "abc"
self.sqs_hook.create_queue(FIFO_QUEUE_NAME, attributes={'FifoQueue': 'true'})
result = self.operator.execute(self.mock_context)
assert 'MD5OfMessageBody' in result
assert 'MessageId' in result
message = self.sqs_hook.get_conn().receive_message(
QueueUrl=FIFO_QUEUE_URL, AttributeNames=['MessageGroupId']
)
assert len(message['Messages']) == 1
assert message['Messages'][0]['MessageId'] == result['MessageId']
assert message['Messages'][0]['Body'] == 'hello'
assert message['Messages'][0]['Attributes']['MessageGroupId'] == 'abc'