diff --git a/pyproject.toml b/pyproject.toml index 5012958..9b67728 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ asgiref = "^3.6.0" blinker = "^1.5" croniter = "^1.3.8" usepy-plugin-rabbitmq = "^0.1.0" -usepy-plugin-redis = "^0.1.4" +usepy-plugin-redis = "^0.1.5" [tool.poetry.group.dev.dependencies] nacos-sdk-python = "^0.1.12" diff --git a/tests/test_redis_broker.py b/tests/test_redis_broker.py index 974699a..73ede67 100644 --- a/tests/test_redis_broker.py +++ b/tests/test_redis_broker.py @@ -1,77 +1,77 @@ +import time + +import pytest +from queue import Empty, Queue +from onestep.broker.redis import RedisStreamBroker, RedisStreamConsumer, Message + try: from collections import Iterable except ImportError: from collections.abc import Iterable -from queue import Queue -import pytest - -from onestep.broker.redis import RedisStreamBroker, RedisStreamConsumer @pytest.fixture def broker(): - return RedisStreamBroker( - stream='test_stream', - group='test_group', - consumer='test_consumer' - ) + return RedisStreamBroker(stream="test_stream", group="test_group") -# RedisStreamBroker测试用例 -def test_redis_stream_broker(broker): - assert broker.client.connection.ping() - - broker.publish('{"body": {"a": "b"}}') - - # assert broker.client.connection.xlen('test_stream') == 1 - - -def test_redis_stream_broker_consume(broker): - broker.publish('{"body": {"a": "b"}}') +def test_send_and_consume(broker): + broker.client.connection.flushall() # 清空测试数据 + + message_body = {"data": "Test message"} + broker.send(message_body) + consumer = broker.consume() - assert isinstance(consumer, RedisStreamConsumer) - result = next(consumer) - assert isinstance(result, Iterable) - message = next(result) - assert message.body == {'a': 'b'} + received_message = next(next(consumer)) # noqa + assert received_message.body == message_body broker.client.shutdown() -@pytest.fixture -def queue(): - return Queue() - - -def test_redis_consume_message(queue): - mock_message = [(b"1", {b"a": b"b"})] - queue.put(mock_message) - consumer = RedisStreamConsumer(queue) - result = next(consumer) - assert isinstance(result, Iterable) - message = next(result) # noqa - assert message.body == {'a': 'b'} - assert message.msg == mock_message[0] - - def test_redis_consume_multi_messages(broker): + broker.client.connection.flushall() # 清空测试数据 + broker.prefetch = 2 # mock prefetch - broker.publish('{"body": {"a1": "b1"}}') - broker.publish('{"body": {"a2": "b2"}}') + broker.send({"body": {"a1": "b1"}}) + broker.send({"body": {"a2": "b2"}}) + + consumer = broker.consume() + time.sleep(3) # 等待消息取到本地 + assert consumer.queue.qsize() == 2 # Ensure that 2 messages are received + broker.client.shutdown() + +def test_confirm_reject(broker): + broker.client.connection.flushall() # 清空测试数据 + + message_body = "Test message" + broker.send(message_body) + consumer = broker.consume() - data = consumer.queue.get() - assert len(data) == 2 # Ensure that 2 messages are received + received_message = next(next(consumer)) # noqa + + broker.confirm(received_message) + assert next(consumer) is None + + broker.send(message_body) + received_message = next(next(consumer)) # noqa + + broker.reject(received_message) + assert next(consumer) is None broker.client.shutdown() def test_requeue(broker): - broker.publish('{"body": {"a1": "b2"}}') + broker.client.connection.flushall() # 清空测试数据 + + message_body = "Test message" + broker.send(message_body) + consumer = broker.consume() - - assert isinstance(consumer, RedisStreamConsumer) - result = next(consumer) - assert isinstance(result, Iterable) - message = next(result) - broker.requeue(message, is_source=True) + received_message = next(next(consumer)) # noqa + + broker.requeue(received_message) + + requeued_message = next(next(consumer)) + assert requeued_message.body == message_body broker.client.shutdown()