Skip to content

Commit

Permalink
Update test_redis_broker
Browse files Browse the repository at this point in the history
  • Loading branch information
nowanti committed Sep 9, 2023
1 parent 59d7036 commit 7646e71
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ asgiref = "^3.6.0"
blinker = "^1.5"
croniter = "^1.3.8"
usepy-plugin-rabbitmq = "^0.1.0"
usepy-plugin-redis = "^0.1.4"
usepy-plugin-redis = "^0.1.5"

[tool.poetry.group.dev.dependencies]
nacos-sdk-python = "^0.1.12"
Expand Down
104 changes: 52 additions & 52 deletions tests/test_redis_broker.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,77 @@
import time

import pytest
from queue import Empty, Queue
from onestep.broker.redis import RedisStreamBroker, RedisStreamConsumer, Message

try:
from collections import Iterable
except ImportError:
from collections.abc import Iterable
from queue import Queue
import pytest

from onestep.broker.redis import RedisStreamBroker, RedisStreamConsumer


@pytest.fixture
def broker():
return RedisStreamBroker(
stream='test_stream',
group='test_group',
consumer='test_consumer'
)
return RedisStreamBroker(stream="test_stream", group="test_group")


# RedisStreamBroker测试用例
def test_redis_stream_broker(broker):
assert broker.client.connection.ping()

broker.publish('{"body": {"a": "b"}}')

# assert broker.client.connection.xlen('test_stream') == 1


def test_redis_stream_broker_consume(broker):
broker.publish('{"body": {"a": "b"}}')
def test_send_and_consume(broker):
broker.client.connection.flushall() # 清空测试数据

message_body = {"data": "Test message"}
broker.send(message_body)

consumer = broker.consume()

assert isinstance(consumer, RedisStreamConsumer)
result = next(consumer)
assert isinstance(result, Iterable)
message = next(result)
assert message.body == {'a': 'b'}
received_message = next(next(consumer)) # noqa
assert received_message.body == message_body
broker.client.shutdown()


@pytest.fixture
def queue():
return Queue()


def test_redis_consume_message(queue):
mock_message = [(b"1", {b"a": b"b"})]
queue.put(mock_message)
consumer = RedisStreamConsumer(queue)
result = next(consumer)
assert isinstance(result, Iterable)
message = next(result) # noqa
assert message.body == {'a': 'b'}
assert message.msg == mock_message[0]


def test_redis_consume_multi_messages(broker):
broker.client.connection.flushall() # 清空测试数据

broker.prefetch = 2 # mock prefetch
broker.publish('{"body": {"a1": "b1"}}')
broker.publish('{"body": {"a2": "b2"}}')
broker.send({"body": {"a1": "b1"}})
broker.send({"body": {"a2": "b2"}})

consumer = broker.consume()
time.sleep(3) # 等待消息取到本地
assert consumer.queue.qsize() == 2 # Ensure that 2 messages are received
broker.client.shutdown()


def test_confirm_reject(broker):
broker.client.connection.flushall() # 清空测试数据

message_body = "Test message"
broker.send(message_body)

consumer = broker.consume()
data = consumer.queue.get()
assert len(data) == 2 # Ensure that 2 messages are received
received_message = next(next(consumer)) # noqa

broker.confirm(received_message)
assert next(consumer) is None

broker.send(message_body)
received_message = next(next(consumer)) # noqa

broker.reject(received_message)
assert next(consumer) is None
broker.client.shutdown()


def test_requeue(broker):
broker.publish('{"body": {"a1": "b2"}}')
broker.client.connection.flushall() # 清空测试数据

message_body = "Test message"
broker.send(message_body)

consumer = broker.consume()

assert isinstance(consumer, RedisStreamConsumer)
result = next(consumer)
assert isinstance(result, Iterable)
message = next(result)
broker.requeue(message, is_source=True)
received_message = next(next(consumer)) # noqa

broker.requeue(received_message)

requeued_message = next(next(consumer))
assert requeued_message.body == message_body
broker.client.shutdown()

0 comments on commit 7646e71

Please sign in to comment.