A platonic wrapper for Amazon SQS queue service. Abstracts away the implementation details of SQS, providing you with a clean and typed queue interface.
pip install platonic-sqs
from platonic.sqs.queue import SQSSender
numbers_out = SQSSender[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
)
numbers_out.send(15)
numbers_out.send_many([1, 1, 2, 3, 5, 8, 13])
from platonic.sqs.queue import SQSReceiver
from platonic.timeout import ConstantTimeout
from datetime import timedelta
incoming_numbers = SQSReceiver[int](
url='https://sqs.us-west-2.amazonaws.com/123456789012/queue-name',
# Thus we prevent the receiver from blocking forever if queue is empty
timeout=ConstantTimeout(period=timedelta(minutes=3)),
)
# If the queue is empty, this call with block until there is a message.
cmd = incoming_numbers.receive()
assert cmd.value == 15
# Do complicated stuff with the value
print(cmd.value * 1234 + 5767)
incoming_numbers.acknowledge(cmd)
- This project was generated with
wemake-python-package
. Current template version is: c9e9ea8b9be2464cacd00b9c2a438e821da9121b. See what is updated since then. - This project is partially sponsored by Recall Masters - look out for issues with
sponsored
tag.