Skip to content

Commit

Permalink
add RedisPubSubBroker (#27)
Browse files Browse the repository at this point in the history
* ✨ feat: add `RedisPubSubBroker`
  • Loading branch information
mic1on authored Nov 26, 2023
1 parent f01855e commit b6e5818
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 8 deletions.
13 changes: 13 additions & 0 deletions example/example_rds_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from onestep import step, RedisPubSubBroker

broker = RedisPubSubBroker(channel="test")


@step(from_broker=broker)
def job_rds(message):
print(message)


if __name__ == '__main__':
step.set_debugging()
step.start(block=True)
3 changes: 2 additions & 1 deletion src/onestep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
)
from .broker import (
BaseBroker, BaseConsumer, BaseLocalBroker, BaseLocalConsumer,
MemoryBroker, RabbitMQBroker, WebHookBroker, CronBroker, RedisStreamBroker
MemoryBroker, RabbitMQBroker, WebHookBroker, CronBroker, RedisStreamBroker, RedisPubSubBroker
)
from .middleware import (
BaseMiddleware, BaseConfigMiddleware,
Expand All @@ -30,6 +30,7 @@
'WebHookBroker',
'CronBroker',
'RedisStreamBroker',
'RedisPubSubBroker',

# retry
'BaseRetry',
Expand Down
2 changes: 1 addition & 1 deletion src/onestep/broker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
from .memory import MemoryBroker
from .webhook import WebHookBroker
from .rabbitmq import RabbitMQBroker
from .redis import RedisStreamBroker
from .redis import RedisStreamBroker, RedisPubSubBroker
from .cron import CronBroker
9 changes: 9 additions & 0 deletions src/onestep/broker/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .stream import RedisStreamBroker, RedisStreamConsumer
from .pubsub import RedisPubSubBroker, RedisPubSubConsumer

__all__ = [
"RedisStreamBroker",
"RedisStreamConsumer",
"RedisPubSubBroker",
"RedisPubSubConsumer"
]
86 changes: 86 additions & 0 deletions src/onestep/broker/redis/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import json
import threading
from queue import Queue
from typing import Any

try:
from usepy_plugin_redis import useRedis
except ImportError:
...

from ..base import BaseBroker, BaseConsumer, Message


class RedisPubSubBroker(BaseBroker):
""" Redis PubSub Broker """

def __init__(self, channel: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel = channel
self.queue = Queue()

self.threads = []

self.client = useRedis(**kwargs).connection

def _consume(self):
def callback(message: dict):
if message.get('type') != 'message':
return
self.queue.put(message)

ps = self.client.pubsub()
ps.subscribe(self.channel)
for message in ps.listen():
callback(message)

def consume(self, *args, **kwargs):
daemon = kwargs.pop('daemon', True)
thread = threading.Thread(target=self._consume, *args, **kwargs)
thread.daemon = daemon
thread.start()
self.threads.append(thread)
return RedisPubSubConsumer(self.queue)

def send(self, message: Any):
"""Publish message to the Redis channel"""
if not isinstance(message, Message):
message = Message(body=message)

print(self.client.publish(self.channel, message.to_json()))

publish = send

def confirm(self, message: Message):
pass

def reject(self, message: Message):
pass

def requeue(self, message: Message, is_source=False):
"""
重发消息:先拒绝 再 重入
:param message: 消息
:param is_source: 是否是原始消息消息,True: 使用原始消息重入当前队列,False: 使用消息的最新数据重入当前队列
"""
self.reject(message)

if is_source:
self.client.publish(self.channel, message.msg['data'])
else:
self.send(message)


class RedisPubSubConsumer(BaseConsumer):
def _to_message(self, data):
if "channel" in data:
try:
message = json.loads(data.get("data")) # 已转换的 message
except (json.JSONDecodeError, TypeError):
message = {"body": data.get("data")} # 未转换的 message
else:
# 来自 外部的消息 直接认为都是 message.body
message = {"body": data.body}

yield Message(body=message.get("body"), extra=message.get("extra"), msg=data)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
except ImportError:
...

from .base import BaseBroker, BaseConsumer, Message
from ..base import BaseBroker, BaseConsumer, Message


class RedisStreamBroker(BaseBroker):
Expand Down Expand Up @@ -76,7 +76,7 @@ def reject(self, message: Message):
def requeue(self, message: Message, is_source=False):
"""
重发消息:先拒绝 再 重入
:param message: 消息
:param is_source: 是否是原始消息消息,True: 使用原始消息重入当前队列,False: 使用消息的最新数据重入当前队列
"""
Expand Down
4 changes: 0 additions & 4 deletions tests/test_broker_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

from onestep.message import Message

try:
from collections import Iterable
except ImportError:
from collections.abc import Iterable
from queue import Queue
import pytest

Expand Down
55 changes: 55 additions & 0 deletions tests/test_broker_redis_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest
import threading
import time

from onestep.broker.redis.pubsub import RedisPubSubConsumer

from onestep import RedisPubSubBroker
from onestep.message import Message


# Assuming `your_module` is the module where your broker implementation resides.

@pytest.fixture
def broker():
# Assuming you have a Redis server running locally on the default port.
broker = RedisPubSubBroker(channel="test_channel")
yield broker
broker.client.close() # Cleanup after the test


def test_publish_and_consume(broker):
consumer = broker.consume(daemon=True)

message_to_publish = "Test Message"
broker.publish(message_to_publish)

time.sleep(3)

received_message = next(next(consumer))
assert isinstance(received_message, Message)
assert received_message.body == message_to_publish


def test_redis_consume_multi_messages(broker):
broker.prefetch = 2 # mock prefetch
consumer = broker.consume() # must consume before send, because the message will be lost

broker.send({"body": {"a1": "b1"}})
broker.send({"body": {"a2": "b2"}})

time.sleep(3) # 等待消息取到本地
assert consumer.queue.qsize() == 2 # Ensure that 2 messages are received


def test_requeue(broker):
consumer = broker.consume()
message_body = "Test message"
broker.send(message_body)

received_message = next(next(consumer)) # noqa

broker.requeue(received_message)
time.sleep(2) # 等待消息取到本地
requeued_message = next(next(consumer))
assert requeued_message.body == message_body

0 comments on commit b6e5818

Please sign in to comment.