Skip to content

Commit

Permalink
Feat cli (#26)
Browse files Browse the repository at this point in the history
* ✨ feat: cli
  • Loading branch information
mic1on authored Nov 25, 2023
1 parent bb86ac1 commit 5942f00
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 8 deletions.
8 changes: 8 additions & 0 deletions example/cli-demo/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from onestep import step, MemoryBroker, CronBroker

cron_broker = CronBroker("* * * * * */3", body="hi cron")


@step(from_broker=cron_broker)
def cron_job(message):
print(message)
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ packages = [
{ include = 'onestep', from = 'src' }
]

[tool.poetry.scripts]
onestep = "onestep.cli:main"


[tool.poetry.dependencies]
python = "^3.8"
asgiref = "^3.6.0"
Expand Down
6 changes: 5 additions & 1 deletion src/onestep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,9 @@
'DropMessage',
'RetryException',
'RetryInQueue',
'RetryInLocal'
'RetryInLocal',

'__version__'
]

__version__ = '0.3.5'
58 changes: 58 additions & 0 deletions src/onestep/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import importlib
import logging
import sys
from onestep import step, __version__

LOGFORMAT = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"


def setup_logging():
logging.basicConfig(level=logging.DEBUG, format=LOGFORMAT, stream=sys.stdout)

# exclude amqpstorm logs
logging.getLogger("amqpstorm").setLevel(logging.CRITICAL)
return logging.getLogger("onestep")


logger = setup_logging()


def parse_args():
parser = argparse.ArgumentParser(
description='run onestep'
)
parser.add_argument(
"step",
help="the run step",
)
parser.add_argument(
"--group", "-G", default=None,
help="the run group",
type=str
)
parser.add_argument(
"--print",
action="store_true",
help="enable printing")
parser.add_argument(
"--path", "-P", default=".", nargs="*", type=str,
help="the step import path (default: current running directory)"
)
return parser.parse_args()


def main():
args = parse_args()
for path in args.path:
sys.path.insert(0, path)
logger.info(f"OneStep {__version__} is start up.")
try:
importlib.import_module(args.step)
step.start(group=args.group, block=True, print_jobs=args.print)
except KeyboardInterrupt:
step.shutdown()


if __name__ == '__main__':
sys.exit(main())
40 changes: 33 additions & 7 deletions src/onestep/onestep.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging

from inspect import isgenerator, iscoroutinefunction, isasyncgenfunction, isasyncgen
from itertools import groupby
from typing import Optional, List, Dict, Any, Callable, Union, Type

from .broker.base import BaseBroker
Expand All @@ -16,6 +17,7 @@

logger = logging.getLogger(__name__)

MAX_WORKERS = 20
DEFAULT_WORKERS = 1
DEFAULT_WORKER_CLASS = ThreadWorker

Expand All @@ -39,6 +41,9 @@ def __init__(self, fn,
self.name = name or fn.__name__
self.workers = workers or DEFAULT_WORKERS
self.worker_class = worker_class or DEFAULT_WORKER_CLASS
if self.workers > MAX_WORKERS:
logger.warning(f"workers[{self.workers}] litter than {MAX_WORKERS}")
self.workers = MAX_WORKERS
self.middlewares = middlewares or []

self.from_brokers = self._init_broker(from_broker)
Expand Down Expand Up @@ -84,15 +89,33 @@ def _find_consumers(cls, group: Optional[str] = None):
return consumers

@classmethod
def start(cls, group: Optional[str] = None):
logger.debug(f"start: {group=}")
for consumer in cls._find_consumers(group):
def print_jobs(cls, group):
print("Jobs:")
prints = []
_consumers = cls._find_consumers(group)
group_instance = groupby(_consumers, key=lambda x: x.instance)
for instance, _ in group_instance:
prints.append([instance.name, instance.group, instance.workers, str(instance.from_brokers)])
print("{:<15} {:<10} {:<10} {:<20}".format("Job", "Group", "Workers", "From Brokers"))
for v in prints:
print("{:<15} {:<10} {:<10} {:<20}".format(*v))

@classmethod
def start(cls, group: Optional[str] = None, print_jobs: bool = False):
logger.debug(f"start group [{group or 'all'}]")
_consumers = cls._find_consumers(group)
if not _consumers:
logger.debug(f"no consumer found in group [{group or 'all'}]")
return
if print_jobs:
cls.print_jobs(group)
for consumer in _consumers:
consumer.start()
logger.debug(f"started: {consumer=}")

@classmethod
def shutdown(cls, group: Optional[str] = None):
logger.debug(f"stop: {group=}")
logger.debug(f"stop group [{group or 'all'}]")
for consumer in cls._find_consumers(group):
consumer.shutdown()
logger.debug(f"stopped: {consumer=}")
Expand Down Expand Up @@ -145,7 +168,10 @@ def emit(self, signal, *args, **kwargs):
@classmethod
def is_shutdown(cls, group):
# check all broker
return all(broker._shutdown for broker in cls._find_consumers(group))
_consumers = cls._find_consumers(group)
if not _consumers:
return True
return all(broker._shutdown for broker in _consumers)


def decorator_func_proxy(func):
Expand Down Expand Up @@ -223,8 +249,8 @@ def __call__(self, func, *_args, **_kwargs):
return os.wraps(func)

@staticmethod
def start(group=None, block=None):
BaseOneStep.start(group=group)
def start(group=None, block=None, print_jobs=False):
BaseOneStep.start(group=group, print_jobs=print_jobs)
started.send()
if block:
import time
Expand Down
7 changes: 7 additions & 0 deletions src/onestep/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def __init__(self, onestep, broker: BaseBroker, *args, **kwargs):
self.kwargs = kwargs
self._shutdown = False

@property
def instance_name(self):
return self.instance.fn.__name__

def start(self):
"""启动 Worker"""
raise NotImplementedError
Expand Down Expand Up @@ -103,6 +107,9 @@ def handle_message(self, message: Message):
if self.broker.cancel_consume and self.broker.cancel_consume(message):
self.shutdown()

def __repr__(self):
return f"<{self.__class__.__name__} {self.instance.name}>"


class ThreadWorker(BaseWorker):

Expand Down

0 comments on commit 5942f00

Please sign in to comment.