Skip to content

Commit

Permalink
Merge pull request #23 from mic1on/fix_CronBroker
Browse files Browse the repository at this point in the history
Fix cron broker
  • Loading branch information
mic1on authored Nov 22, 2023
2 parents 3a09cd6 + 9e8b868 commit 3406163
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
9 changes: 5 additions & 4 deletions example/example_cron.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from onestep import step, CronBroker

a = CronBroker("* * * * * */3", a=1)
once_broker = CronBroker("* * * * * */3", once=True)
cron_broker = CronBroker("* * * * * */3", body="hi cron")


@step(from_broker=a, workers=3)
@step(from_broker=[once_broker, cron_broker], workers=3)
def cron_task(message):
print(message)
return message


if __name__ == '__main__':
step.set_debugging()
step.start()
step.shutdown()
step.start(block=True)
# step.shutdown()
9 changes: 5 additions & 4 deletions src/onestep/broker/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import threading
from datetime import datetime
from typing import Any

from croniter import croniter

Expand All @@ -15,17 +16,17 @@
class CronBroker(BaseLocalBroker):
_thread = None

def __init__(self, cron, name=None, middlewares=None, **kwargs):
super().__init__(name=name, middlewares=middlewares)
def __init__(self, cron, name=None, middlewares=None, body: Any = None, *args, **kwargs):
super().__init__(name=name, middlewares=middlewares, *args, **kwargs)
self.cron = cron
self.itr = croniter(cron, datetime.now())
self.next_fire_time = self.itr.get_next(datetime)
self.kwargs = kwargs
self.body = body

def _scheduler(self):
if self.next_fire_time <= datetime.now():
self.next_fire_time = self.itr.get_next(datetime)
self.publish(self.kwargs)
self.publish(self.body)

self._thread = threading.Timer(interval=1, function=self._scheduler)
self._thread.start()
Expand Down
12 changes: 8 additions & 4 deletions src/onestep/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
将指定的函数放入线程中运行
"""
from typing import Dict

try:
from collections import Iterable
Expand All @@ -22,6 +23,7 @@


class WorkerThread(threading.Thread):
broker_exit: Dict[BaseBroker, bool] = {}

def __init__(self, onestep, broker: BaseBroker, *args, **kwargs):
"""
Expand All @@ -46,12 +48,12 @@ def run(self):
"""

while not self._shutdown:
# TODO:consume应当传入一些配置参数
if WorkerThread.broker_exit.get(self.broker, False):
self.shutdown()
break
for result in self.broker.consume():
if self._shutdown:
if self._shutdown or result is None:
break
if result is None:
continue
messages = (
result
if isinstance(result, Iterable)
Expand All @@ -70,13 +72,15 @@ def run(self):
logger.warning(f"{self.instance.name} dropped <{type(e).__name__}: {str(e)}>")
message.reject()
finally:
# When message is triggered by cancel_consume, it will be shutdown
if self.broker.cancel_consume and self.broker.cancel_consume(message):
self.shutdown()
else:
if self.broker.once:
self.shutdown()

def shutdown(self):
WorkerThread.broker_exit[self.broker] = True
self.broker.shutdown()
self._shutdown = True

Expand Down

0 comments on commit 3406163

Please sign in to comment.