Skip to content

Commit

Permalink
Fix dropmessage (#25)
Browse files Browse the repository at this point in the history
* 🦄 refactor: rename `RetryVia` -> `RetryIn`

* 🦄 refactor: BaseWorker

* 🦄 refactor: worker

* 🐞 fix: type hit
  • Loading branch information
mic1on authored Nov 23, 2023
1 parent 7587c69 commit fce1efa
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 57 deletions.
4 changes: 2 additions & 2 deletions example/example_advanced_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from onestep import step
from onestep.broker import MemoryBroker, RabbitMQBroker
from onestep.exception import RetryViaLocal, RetryViaQueue
from onestep.exception import RetryInLocal, RetryInQueue
from onestep.retry import AdvancedRetry

from loguru import logger
Expand Down Expand Up @@ -67,7 +67,7 @@ def do_something(message):
# if message.body.get("id") == 1:
# raise RetryViaLocal("Invalid id")
if message.body.get("id") == 2:
raise RetryViaQueue("Invalid id", times=2)
raise RetryInQueue("Invalid id", times=2)
# elif message.body.get("id") == 3:
# raise ValueError("Invalid id")
else:
Expand Down
6 changes: 3 additions & 3 deletions example/example_memory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from onestep import step
from onestep.broker import MemoryBroker
from onestep.exception import RetryViaQueue, DropMessage
from onestep.exception import RetryInQueue, DropMessage
from onestep.retry import AdvancedRetry

todo_broker = MemoryBroker()
Expand Down Expand Up @@ -32,14 +32,14 @@ def build_todo_list():


@step(from_broker=todo_broker, workers=1,
# retry=AdvancedRetry()
retry=AdvancedRetry()
)
def do_something(todo):
print(todo)
todo.body["status"] = "done"
if todo.body["id"] == 2:
todo.body["id"] = 21
raise RetryViaQueue("test requeue")
raise RetryInQueue("test requeue")
elif todo.body["id"] == 3:
raise DropMessage("test reject")
else:
Expand Down
6 changes: 3 additions & 3 deletions src/onestep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from .exception import (
StopMiddleware, DropMessage,
RetryException, RetryViaQueue, RetryViaLocal
RetryException, RetryInQueue, RetryInLocal
)

__all__ = [
Expand Down Expand Up @@ -54,6 +54,6 @@
'StopMiddleware',
'DropMessage',
'RetryException',
'RetryViaQueue',
'RetryViaLocal'
'RetryInQueue',
'RetryInLocal'
]
4 changes: 2 additions & 2 deletions src/onestep/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ def __init__(self, message=None, times=None, **kwargs):
self.kwargs = kwargs


class RetryViaQueue(RetryException):
class RetryInQueue(RetryException):
"""消息重试-通过重试队列
抛出此异常,消息将被重新放入队列,等待下次消费。
"""


class RetryViaLocal(RetryException):
class RetryInLocal(RetryException):
"""消息重试-本地
不经过队列,直接在本地重试,直到达到重试次数。
Expand Down
12 changes: 6 additions & 6 deletions src/onestep/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Optional, Tuple, Union, Type

from .exception import RetryViaLocal, RetryViaQueue
from .exception import RetryInLocal, RetryInQueue
from .message import Message


Expand Down Expand Up @@ -56,21 +56,21 @@ def __call__(self, message) -> Optional[RetryStatus]:
class AdvancedRetry(TimesRetry):
"""高级重试策略
1. 本地重试:如果异常是 RetryViaLocal 或 指定异常,且重试次数未达到上限,则本地重试,不回调
2. 队列重试:如果异常是 RetryViaQueue,且重试次数未达到上限,则入队重试,不回调
3. 其他异常:如果异常不是 RetryViaLocalRetryViaQueue 或 指定异常,则不重试,回调
1. 本地重试:如果异常是 RetryInLocal 或 指定异常,且重试次数未达到上限,则本地重试,不回调
2. 队列重试:如果异常是 RetryInQueue,且重试次数未达到上限,则入队重试,不回调
3. 其他异常:如果异常不是 RetryInLocalRetryInQueue 或 指定异常,则不重试,回调
注:待重试的异常若继承自 RetryException,则可单独指定重试次数,否则默认为 3 次
"""

def __init__(self, times: int = 3, exceptions: Optional[Tuple[Union[Exception, Type]]] = None):
super().__init__(times=times)
self.exceptions = (RetryViaLocal, RetryViaQueue) + (exceptions or ())
self.exceptions = (RetryInLocal, RetryInQueue) + (exceptions or ())

def __call__(self, message: Message) -> Optional[RetryStatus]:
if isinstance(message.exception.exc_value, self.exceptions):
max_retry_times = getattr(message.exception.exc_value, "times", None) or self.times
if message.failure_count < max_retry_times:
if isinstance(message.exception.exc_value, RetryViaQueue):
if isinstance(message.exception.exc_value, RetryInQueue):
return RetryStatus.END_IGNORE_CALLBACK
return RetryStatus.CONTINUE
else:
Expand Down
78 changes: 39 additions & 39 deletions src/onestep/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
将指定的函数放入线程中运行
"""
from concurrent.futures import ThreadPoolExecutor
from typing import Dict
from typing import Dict, Iterable

try:
from collections import Iterable
except ImportError:
from collections.abc import Iterable
import logging
import threading
from asyncio import iscoroutinefunction
from inspect import isasyncgenfunction

from asgiref.sync import async_to_sync

from .message import Message
from .retry import RetryStatus
from .broker import BaseBroker
from .exception import DropMessage
Expand Down Expand Up @@ -48,46 +45,43 @@ def shutdown(self):
"""关闭 Worker"""
raise NotImplementedError

def _receive_message(self):
def receive_messages(self) -> Iterable[Message]:
""" 从broker中获取消息 """
for result in self.broker.consume():
if self._shutdown:
break
if result is None:
continue
messages = (
result
if isinstance(result, Iterable)
else [result]
)
for message in messages:
message.broker = message.broker or self.broker
logger.debug(f"{self.instance.name} receive message<{message}> from {self.broker!r}")
message_received.send(self, message=message)
try:
self.instance.before_emit("consume", message=message)
self._run_instance(message)
self.instance.after_emit("consume", message=message)
except DropMessage as e:
message_drop.send(self, message=message, reason=e)
logger.warning(f"{self.instance.name} dropped <{type(e).__name__}: {str(e)}>")
message.reject()
finally:
# When message is triggered by cancel_consume, it will be shutdown
if self.broker.cancel_consume and self.broker.cancel_consume(message):
self.shutdown()
else:
if self.broker.once:
self.shutdown()

def _run_instance(self, message):
"""执行实例的逻辑"""
messages = result if isinstance(result, Iterable) else [result]
yield from messages
# when broker is once, it will shut down after receive a message
if self.broker.once:
self.shutdown()

def _run_real_instance(self, message: Message) -> None:
""" 执行实例的逻辑 """
if iscoroutinefunction(self.instance.fn) or isasyncgenfunction(self.instance.fn):
async_to_sync(self.instance)(message, *self.args, **self.kwargs)
else:
self.instance(message, *self.args, **self.kwargs)

def handle_message(self, message: Message):
""" 处理消息 """
message.broker = message.broker or self.broker
logger.debug(f"{self.instance.name} receive message<{message}> from {self.broker!r}")
message_received.send(self, message=message)
try:
if iscoroutinefunction(self.instance.fn) or isasyncgenfunction(self.instance.fn):
async_to_sync(self.instance)(message, *self.args, **self.kwargs)
else:
self.instance(message, *self.args, **self.kwargs)
self.instance.before_emit("consume", message=message)

self._run_real_instance(message)
message_consumed.send(self, message=message)
message.confirm()

self.instance.after_emit("consume", message=message)
except DropMessage as e:
message_drop.send(self, message=message, reason=e)
logger.warning(f"{self.instance.name} dropped <{type(e).__name__}: {str(e)}>")
message.reject()
except Exception as e:
message_error.send(self, message=message, error=e)
if self.instance.state.debug:
Expand All @@ -104,6 +98,10 @@ def _run_instance(self, message):
elif retry_status is RetryStatus.END_IGNORE_CALLBACK:
# 由于是队列内重试,不会触发错误回调
message.requeue()
finally:
# When message is triggered by cancel_consume, it will be shutdown
if self.broker.cancel_consume and self.broker.cancel_consume(message):
self.shutdown()


class ThreadWorker(BaseWorker):
Expand Down Expand Up @@ -134,7 +132,8 @@ def run(self):
if ThreadWorker.broker_exit.get(self.broker, False):
self.shutdown()
break
self._receive_message()
for message in self.receive_messages():
self.handle_message(message)

def shutdown(self):
ThreadWorker.broker_exit[self.broker] = True
Expand Down Expand Up @@ -164,7 +163,8 @@ def run(self):
if ThreadPoolWorker.broker_exit.get(self.broker, False):
self.shutdown()
break
self._receive_message()
for message in self.receive_messages():
self.handle_message(message)

def shutdown(self):
"""关闭线程池 Worker"""
Expand Down
4 changes: 2 additions & 2 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pytest import fixture
from onestep.message import Message
from onestep import NeverRetry, AlwaysRetry, TimesRetry, RetryIfException, RetryViaQueue
from onestep import NeverRetry, AlwaysRetry, TimesRetry, RetryIfException, RetryInQueue
from onestep.retry import RetryStatus, AdvancedRetry


Expand Down Expand Up @@ -56,7 +56,7 @@ def test_AdvancedRetry(message):
message.failure_count = 1
assert RetryStatus.CONTINUE is retry_class(message)
try:
raise RetryViaQueue()
raise RetryInQueue()
except Exception as e:
message.set_exception()
assert message.failure_count == 2
Expand Down
99 changes: 99 additions & 0 deletions tests/test_worker_receive_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from typing import Generator
import pytest

from onestep import MemoryBroker, BaseMiddleware, DropMessage
from onestep.message import Message
from onestep.onestep import BaseOneStep, SyncOneStep
from onestep.worker import BaseWorker, ThreadWorker


@pytest.fixture
def step():
return SyncOneStep(fn=lambda message: message)


@pytest.fixture
def broker():
return MemoryBroker()


@pytest.fixture
def worker(step, broker):
wt = BaseWorker(step, broker)
yield wt


@pytest.fixture
def thread_worker(step, broker):
wt = ThreadWorker(step, broker)
yield wt


def test_receive_messages_shutdown(worker, broker):
worker._shutdown = True
assert isinstance(worker.receive_messages(), Generator)


def test_receive_messages(worker, broker):
broker.publish({"a": "b"})
for message in worker.receive_messages():
assert isinstance(message, Message)
assert message.body == {"a": "b"}
break


def test_receive_messages_while_once(thread_worker, broker):
broker.publish({"a": "b"})
broker.once = True
for message in thread_worker.receive_messages():
assert message.body == {"a": "b"}
assert thread_worker._shutdown is True


def test_receive_messages_while_not_once(thread_worker, broker):
broker.publish({"a": "b"})
broker.once = False
for message in thread_worker.receive_messages():
assert message.body == {"a": "b"}
break
assert thread_worker._shutdown is False


def test_receive_messages_consume(thread_worker, broker):
broker.publish({"a": "b"})
for message in thread_worker.receive_messages():
assert message.body == {"a": "b"}
break


def test_handle_message_consume_exception(step, thread_worker, broker):
class test_middleware(BaseMiddleware):
def before_consume(self, step, message, *args, **kwargs):
message.drop = True
raise DropMessage('drop message')

step.middlewares = [test_middleware()]
message = Message(body={"a": "b"})
thread_worker.handle_message(message)
assert message.drop is True


def test_handle_message(step, thread_worker, broker):
class test_middleware(BaseMiddleware):
def before_consume(self, step, message, *args, **kwargs):
assert message.body == {"a": "b"}

def after_consume(self, step, message, *args, **kwargs):
assert message.body == {"a": "c"}

step.middlewares = [test_middleware()]
message = Message(body={"a": "b"})
step.fn = lambda msg: msg.body.update({"a": "c"})
thread_worker.handle_message(message)


def test__run_real_instance(step, thread_worker, broker):
step.fn = lambda msg: msg.body.update({"a": "c"})
message = Message(body={"a": "b"})
thread_worker._run_real_instance(message)
assert message.body == {"a": "c"}

0 comments on commit fce1efa

Please sign in to comment.