Skip to content

Commit

Permalink
Allow one job queue per user (#421)
Browse files Browse the repository at this point in the history
* allow one job queue per user

* update config comments

* simplify

* lint

* update readme
  • Loading branch information
mmacata authored May 3, 2023
1 parent 48d59e4 commit 5235458
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
9 changes: 8 additions & 1 deletion redis_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ queue_type = redis
`cd ~/repos/actinia` + press F5
- Start Container for worker
```
docker-compose -f actinia-docker/docker-compose-dev-rq.yml run --rm --service-ports --entrypoint sh actinia-worker
MY_ACTINIA_DATA=$HOME/actinia
docker run --rm -it --entrypoint sh \
-v $HOME/repos/actinia/actinia-docker/actinia-dev/actinia.cfg:/etc/default/actinia \
-v $MY_ACTINIA_DATA/workspace:/actinia_core/workspace \
-v $MY_ACTINIA_DATA/resources:/actinia_core/resources \
-v $MY_ACTINIA_DATA/grassdb:/actinia_core/grassdb \
-v $MY_ACTINIA_DATA/grassdb_user:/actinia_core/userdata \
--network actinia-docker_actinia-dev mundialis/actinia:2.5.6
```
- inside container, start worker listening to specified queue
```
Expand Down
8 changes: 6 additions & 2 deletions src/actinia_core/core/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ def __init__(self):
# It will be extended by a numerical suffix that represents
# the worker id/number database to re-queue it, usually this is not
# necessary. If QUEUE_TYPE = per_job, it is extended by the
# resource_id of the job.
# resource_id of the job, if QUEUE_TYPE = per_user, it is extended
# by the user_id
self.WORKER_QUEUE_PREFIX = "job_queue"
# Type of queue.
# "local": Single queue for all jobs, processed by same actinia
Expand All @@ -396,8 +397,11 @@ def __init__(self):
# is ignored. Processed by different actinia instance
# (actinia worker). Resource_id will be added to above
# WORKER_QUEUE_PREFIX.
# "per_user": Separate queue for each user, config for NUMBER_OF_WORKERS
# is ignored. Processed by different actinia instance
# (actinia worker). User_id will be added to above
# WORKER_QUEUE_PREFIX.
# future ideas
# - redis separate queue per user
# - redis separate queue per process type
# - redis separate queue per resource consumption
self.QUEUE_TYPE = "local"
Expand Down
38 changes: 25 additions & 13 deletions src/actinia_core/core/common/redis_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,22 @@ def __create_job_queue(queue_name):
# Redis work queue and connection
global job_queues, redis_conn

host = global_config.REDIS_QUEUE_SERVER_URL
port = global_config.REDIS_QUEUE_SERVER_PORT
password = global_config.REDIS_QUEUE_SERVER_PASSWORD
if not any(q.name == queue_name for q in job_queues):
host = global_config.REDIS_QUEUE_SERVER_URL
port = global_config.REDIS_QUEUE_SERVER_PORT
password = global_config.REDIS_QUEUE_SERVER_PASSWORD

kwargs = dict()
kwargs["host"] = host
kwargs["port"] = port
if password and password is not None:
kwargs["password"] = password
redis_conn = Redis(**kwargs)
kwargs = dict()
kwargs["host"] = host
kwargs["port"] = port
if password and password is not None:
kwargs["password"] = password
redis_conn = Redis(**kwargs)

string = "Create queue %s with server %s:%s" % (queue_name, host, port)
log.info(string)
queue = rq.Queue(queue_name, connection=redis_conn)
job_queues.append(queue)
string = "Create queue %s with server %s:%s" % (queue_name, host, port)
log.info(string)
queue = rq.Queue(queue_name, connection=redis_conn)
job_queues.append(queue)


def __enqueue_job_redis(queue, timeout, func, *args):
Expand Down Expand Up @@ -140,6 +141,17 @@ def enqueue_job(timeout, func, *args, queue_type_overwrite=None):
args[0].set_queue_name(queue_name)
__enqueue_job_redis(i, timeout, func, *args)

elif queue_type == "per_user":
user_id = args[0].user_id
queue_name = "%s_%s" % (global_config.WORKER_QUEUE_PREFIX, user_id)
# Run __create_job_queue everytime.
# If queue already exists, it does nothing.
__create_job_queue(queue_name)
for i in job_queues:
if i.name == queue_name:
args[0].set_queue_name(queue_name)
__enqueue_job_redis(i, timeout, func, *args)

elif queue_type == "redis":
if job_queues == []:
for i in range(num_queues):
Expand Down
5 changes: 5 additions & 0 deletions src/actinia_core/rest/base/resource_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ def __init__(self, resource_id=None, iteration=None, post_url=None):
global_config.WORKER_QUEUE_PREFIX,
self.resource_id,
)
elif global_config.QUEUE_TYPE == "per_user":
self.queue = "%s_%s" % (
global_config.WORKER_QUEUE_PREFIX,
self.user_id,
)
elif global_config.QUEUE_TYPE == "redis":
self.queue = "%s_%s" % (global_config.WORKER_QUEUE_PREFIX, "count")
else:
Expand Down

0 comments on commit 5235458

Please sign in to comment.