Skip to content

Commit

Permalink
RQScheduler django admin integration (#587)
Browse files Browse the repository at this point in the history
* wip

* Allow all job ids that do not contain forward slash

* Tests

* WIP

* Admin link, try to support multiple redis connections

* Bring it all together

* Scheduler name

* Pull scheduler stats into the main queues view

* Undo unneeded changes

* Undo more unneeded changes

* cleanup

* cleanup

* Add a test for non-cron scheduled jobs and fix the bug that was somewhat present

* Missing imports

* Install rq-scheduler

* Pull get_scheduler_statistics out
  • Loading branch information
dmclain authored Jun 2, 2023
1 parent cc3da38 commit df32713
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install django==${{ matrix.django-version }}
pip install redis django-redis rq sentry-sdk
pip install redis django-redis rq sentry-sdk rq-scheduler
- name: Run Test
run: |
Expand Down
9 changes: 9 additions & 0 deletions django_rq/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ def get_queue_by_index(index):
config['name'], connection=get_redis_connection(config['connection_config']), is_async=config.get('ASYNC', True)
)

def get_scheduler_by_index(index):
"""
Returns an rq-scheduler Scheduler using parameters defined in ``QUEUES_LIST``
"""
from .settings import QUEUES_LIST

config = QUEUES_LIST[int(index)]
return get_scheduler(config['name'])


def filter_connection_params(queue_params):
"""
Expand Down
2 changes: 2 additions & 0 deletions django_rq/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

# All queues in list format so we can get them by index, includes failed queues
QUEUES_LIST = []
QUEUES_MAP = {}
for key, value in sorted(QUEUES.items(), key=itemgetter(0)):
QUEUES_LIST.append({'name': key, 'connection_config': value})
QUEUES_MAP[key] = len(QUEUES_LIST) - 1

# Get exception handlers
EXCEPTION_HANDLERS = getattr(settings, 'RQ_EXCEPTION_HANDLERS', [])
Expand Down
119 changes: 119 additions & 0 deletions django_rq/templates/django_rq/scheduler.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
{% extends "admin/base_site.html" %}

{% load static jquery_path django_rq %}

{% block title %}Scheduler Jobs in {{ scheduler.name }} {{ block.super }}{% endblock %}

{% block extrastyle %}
{{ block.super }}
<link rel="stylesheet" type="text/css" href="{% static "admin/css/changelists.css" %}">
{% endblock %}

{% block extrahead %}
{{ block.super }}
<script type="text/javascript" src="{% get_jquery_path as jquery_path %}{% static jquery_path %}"></script>
<script type="text/javascript" src="{% static "admin/js/jquery.init.js" %}"></script>
<script type="text/javascript" src="{% static "admin/js/actions.js" %}"></script>
<script type="text/javascript">
(function($) {
$(document).ready(function($) {
$("tr input.action-select").actions();
});
})(django.jQuery);
</script>
{% endblock %}


{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">Home</a> &rsaquo;
<a href="{% url 'rq_home' %}">Django RQ</a> &rsaquo;
</div>
{% endblock %}

{% block content_title %}<h1>Scheduler Managed Jobs</h1>{% endblock %}

{% block content %}

<div id="content-main">
<ul class="object-tools">
</ul>
<div class="module" id="changelist">
<form id="changelist-form" action="" method="post">
{% csrf_token %}
<div class="actions">
<label>Actions:
<select name="action">
<option value="" selected="selected">---------</option>
<option value="delete">Delete</option>
{% if job_status == 'Failed' %}
<option value="requeue">Requeue</option>
{% endif %}
</select>
</label>
<button type="submit" class="button" title="Execute selected action" name="index" value="0">Go</button>
</div>
<div class="results">
<table id="result_list">
<thead>
<tr>
<th scope="col" class="action-checkbox-column">
<div class="text">
<span><input type="checkbox" id="action-toggle" style="display: inline-block;"></span>
</div>
<div class="clear"></div>
</th>
<th><div class = 'text'><span>ID</span></div></th>
<th><div class = 'text'><span>Schedule</span></div></th>
<th><div class = 'text'><span>Next Run</span></div></th>
<th><div class = 'text'><span>Last Ended</span></div></th>
<th><div class = 'text'><span>Last Status</span></div></th>
<th><div class = 'text'><span>Callable</span></div></th>
</tr>
</thead>
<tbody>
{% for job in jobs %}
<tr class = "{% cycle 'row1' 'row2' %}">
<td class="action-checkbox">
<input class="action-select" name="_selected_action" type="checkbox" value="{{ job.id }}">
</td>
<td>
<a href = "{% url 'rq_job_detail' job.queue_index job.id %}">
{{ job.id }}
</a>
</td>
<td>{{ job.schedule }}</td>
<td>
{% if job.next_run %}
{{ job.next_run|to_localtime|date:"Y-m-d, H:i:s" }}
{% endif %}
</td>
<td>
{% if job.ended_at %}
{{ job.ended_at|to_localtime|date:"Y-m-d, H:i:s" }}
{% endif %}
</td>
<td>{{ job.get_status }}</td>
<td>{{ job|show_func_name }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<p class="paginator">
{% for p in page_range %}
{% if p == page %}
<span class="this-page">{{ p }}</span>
{% elif forloop.last %}
<a href="?page={{ p }}" class="end">{{ p }}</a>
{% else %}
<a href="?page={{ p }}">{{ p }}</a>
{% endif %}
{% endfor %}
{{ num_jobs }} jobs
</p>
</form>
</div>
</div>

{% endblock %}
20 changes: 20 additions & 0 deletions django_rq/templates/django_rq/stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@
<br />
<a href="{% url 'rq_home_json' %}">View as JSON</a>
</div>

{% if schedulers %}
<h2>RQ Scheduler</h2>
<table>
<thead>
<tr>
<th>Redis Connection</th>
<th>Recurring Jobs</th>
</tr>
</thead>
{% for connection, scheduler in schedulers.items %}
<tr class = "{% cycle 'row1' 'row2' %}">
<td><a href = "{% url 'rq_scheduler_jobs' scheduler.index %}">{{ connection }}</a></td>
<td><a href = "{% url 'rq_scheduler_jobs' scheduler.index %}">{{ scheduler.count }}</a></td>
</tr>
{% endfor %}
</table>
{% endif %}

</div>


{% endblock %}
55 changes: 54 additions & 1 deletion django_rq/tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from datetime import datetime
from datetime import datetime, timedelta, timezone
from unittest.mock import PropertyMock, patch


Expand All @@ -17,6 +17,7 @@
)

from django_rq import get_queue
from django_rq.queues import get_scheduler
from django_rq.workers import get_worker

from .fixtures import access_self, failing_job
Expand Down Expand Up @@ -395,3 +396,55 @@ def test_action_stop_jobs(self):

for job_id in job_ids:
self.assertIn(job_id, canceled_job_registry)

def test_scheduler_jobs(self):
# Override testing RQ_QUEUES
queues = [
{
"connection_config": {
"DB": 0,
"HOST": "localhost",
"PORT": 6379,
},
"name": "default",
}
]
with patch(
"django_rq.utils.QUEUES_LIST",
new_callable=PropertyMock(return_value=queues),
):
scheduler = get_scheduler("default")
scheduler_index = get_queue_index("default")

# Enqueue some jobs
cron_job = scheduler.cron("10 9 * * *", func=access_self, id="cron-job")
forever_job = scheduler.schedule(
scheduled_time=datetime.now() + timedelta(minutes=10),
interval=600,
func=access_self,
id="forever-repeat",
)
repeat_job = scheduler.schedule(
scheduled_time=datetime.now() + timedelta(minutes=30),
repeat=30,
func=access_self,
interval=600,
id="thirty-repeat",
)

response = self.client.get(
reverse("rq_scheduler_jobs", args=[scheduler_index])
)
self.assertEqual(response.context["num_jobs"], 3)
context_jobs = {job.id: job for job in response.context["jobs"]}
self.assertEqual(context_jobs["cron-job"].schedule, "cron: '10 9 * * *'")
self.assertEqual(context_jobs["forever-repeat"].schedule, "interval: 600")
self.assertEqual(
context_jobs["thirty-repeat"].schedule, "interval: 600 repeat: 30"
)

index_response = self.client.get(reverse("rq_home"))
self.assertEqual(
index_response.context["schedulers"],
{"localhost:6379/1": {"count": 3, "index": 0}},
)
1 change: 1 addition & 0 deletions django_rq/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
re_path(
r'^queues/(?P<queue_index>[\d]+)/(?P<job_id>[^/]+)/stop/$', views.stop_job, name='rq_stop_job'
),
re_path(r'^schedulers/(?P<scheduler_index>[\d]+)/$', views.scheduler_jobs, name='rq_scheduler_jobs'),
]
32 changes: 28 additions & 4 deletions django_rq/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from django.core.exceptions import ImproperlyConfigured
from rq.command import send_stop_job_command
from rq.job import Job
from rq.registry import (
DeferredJobRegistry,
Expand All @@ -7,15 +9,13 @@
StartedJobRegistry,
clean_registries,
)
from rq.command import send_stop_job_command
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry


from .queues import get_connection, get_queue_by_index, get_scheduler
from .settings import QUEUES_LIST
from .templatetags.django_rq import to_localtime
from django.core.exceptions import ImproperlyConfigured


def get_scheduler_pid(queue):
'''Checks whether there's a scheduler-lock on a particular queue, and returns the PID.
Expand All @@ -30,6 +30,7 @@ def get_scheduler_pid(queue):
return False # Not possible to give useful information without creating a performance issue (redis.keys())
except ImproperlyConfigured:
from rq.scheduler import RQScheduler

# When a scheduler acquires a lock it adds an expiring key: (e.g: rq:scheduler-lock:<queue.name>)
#TODO: (RQ>= 1.13) return queue.scheduler_pid
pid = queue.connection.get(RQScheduler.get_locking_key(queue.name))
Expand Down Expand Up @@ -89,9 +90,31 @@ def get_statistics(run_maintenance_tasks=False):
queue_data['scheduled_jobs'] = len(scheduled_job_registry)

queues.append(queue_data)

return {'queues': queues}


def get_scheduler_statistics():
schedulers = {}
for index, config in enumerate(QUEUES_LIST):
# there is only one scheduler per redis connection, so we use the connection as key
# to handle the possibility of a configuration with multiple redis connections and scheduled
# jobs in more than one of them
queue = get_queue_by_index(index)
connection_kwargs = queue.connection.connection_pool.connection_kwargs
conn_key = f"{connection_kwargs['host']}:{connection_kwargs['port']}/{connection_kwargs['db']}"
if conn_key not in schedulers:
try:
scheduler = get_scheduler(config['name'])
schedulers[conn_key] ={
'count': scheduler.count(),
'index': index,
}
except ImproperlyConfigured:
pass
return {'schedulers': schedulers}


def get_jobs(queue, job_ids, registry=None):
"""Fetch jobs in bulk from Redis.
1. If job data is not present in Redis, discard the result
Expand All @@ -108,6 +131,7 @@ def get_jobs(queue, job_ids, registry=None):

return valid_jobs


def stop_jobs(queue, job_ids):
job_ids = job_ids if isinstance(job_ids, (list, tuple)) else [job_ids]
stopped_job_ids = []
Expand All @@ -119,4 +143,4 @@ def stop_jobs(queue, job_ids):
failed_to_stop_job_ids.append(job_id)
continue
stopped_job_ids.append(job_id)
return stopped_job_ids, failed_to_stop_job_ids
return stopped_job_ids, failed_to_stop_job_ids
Loading

0 comments on commit df32713

Please sign in to comment.