Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INTERNAL] Move remaining celery tasks to tasks.py #139

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions promgen/management/commands/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

from django.conf import settings
from django.core.management.base import BaseCommand

from promgen import prometheus
from promgen import prometheus, tasks

logger = logging.getLogger(__name__)

Expand All @@ -26,7 +25,7 @@ def add_arguments(self, parser):

def handle(self, **kwargs):
if kwargs['out']:
prometheus.write_rules(
tasks.write_rules(
path=kwargs['out'],
reload=kwargs['reload'],
version=kwargs['version']
Expand Down
5 changes: 2 additions & 3 deletions promgen/management/commands/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import logging

from django.core.management.base import BaseCommand

from promgen import prometheus
from promgen import prometheus, tasks

logger = logging.getLogger(__name__)

Expand All @@ -22,6 +21,6 @@ def add_arguments(self, parser):

def handle(self, **kwargs):
if kwargs['out']:
prometheus.write_config(kwargs['out'], kwargs['reload'], kwargs['mode'])
tasks.write_config(kwargs['out'], kwargs['reload'], kwargs['mode'])
else:
self.stdout.write(prometheus.render_config())
6 changes: 2 additions & 4 deletions promgen/management/commands/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import logging

from django.core.management.base import BaseCommand

from promgen import models
from promgen import prometheus
from promgen import models, prometheus, tasks

logger = logging.getLogger(__name__)

Expand All @@ -23,6 +21,6 @@ def add_arguments(self, parser):
def handle(self, **kwargs):
prometheus.check_rules(models.Rule.objects.all())
if kwargs['out']:
prometheus.write_rules(kwargs['out'], kwargs['reload'])
tasks.write_rules(kwargs['out'], kwargs['reload'])
else:
self.stdout.write(prometheus.render_urls())
51 changes: 2 additions & 49 deletions promgen/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,21 @@
import datetime
import json
import logging
import os
import re
import subprocess
import tempfile
from urllib.parse import urljoin

import promgen.templatetags.promgen as macro
import pytz
import yaml
from atomicwrites import atomic_write
from dateutil import parser
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db.models import prefetch_related_objects
from django.template.loader import render_to_string
from django.utils import timezone
from django.core.exceptions import ValidationError
import promgen.templatetags.promgen as macro
from promgen import models, util
from promgen.celery import app as celery

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -127,18 +124,6 @@ def render_urls():
return json.dumps(data, indent=2, sort_keys=True)


@celery.task
def write_urls(path=None, reload=True, chmod=0o644):
if path is None:
path = settings.PROMGEN['url_writer']['path']
with atomic_write(path, overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(render_urls())
if reload:
reload_prometheus()


def render_config(service=None, project=None):
data = []
for exporter in models.Exporter.objects.\
Expand Down Expand Up @@ -180,38 +165,6 @@ def render_config(service=None, project=None):
return json.dumps(data, indent=2, sort_keys=True)


@celery.task
def write_config(path=None, reload=True, chmod=0o644):
if path is None:
path = settings.PROMGEN['config_writer']['path']
with atomic_write(path, overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(render_config())
if reload:
reload_prometheus()


@celery.task
def write_rules(path=None, reload=True, chmod=0o644, version=None):
if path is None:
path = settings.PROMGEN['prometheus']['rules']
with atomic_write(path, mode='wb', overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(render_rules(version=version))
if reload:
reload_prometheus()


@celery.task
def reload_prometheus():
from promgen.signals import post_reload
target = urljoin(settings.PROMGEN['prometheus']['url'], '/-/reload')
response = util.post(target)
post_reload.send(response)


def import_rules_v2(config, content_object=None):
'''
Loop through a dictionary and add rules to the database
Expand Down
10 changes: 4 additions & 6 deletions promgen/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from django.db.models.signals import (post_delete, post_save, pre_delete,
pre_save)
from django.dispatch import Signal, receiver
from promgen import models, prometheus
from promgen import models, prometheus, tasks

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,7 +63,7 @@ def _trigger_write_config(signal, **kwargs):
targets = [server.host for server in models.Prometheus.objects.all()]
for target in targets:
logger.info('Queueing write_config on %s', target)
prometheus.write_config.apply_async(queue=target)
tasks.write_config.apply_async(queue=target)
if 'request' in kwargs:
messages.info(kwargs['request'], 'Updating config on {}'.format(targets))
return True
Expand All @@ -74,7 +74,7 @@ def _trigger_write_rules(signal, **kwargs):
targets = [server.host for server in models.Prometheus.objects.all()]
for target in targets:
logger.info('Queueing write_rules on %s', target)
prometheus.write_rules.apply_async(queue=target)
tasks.write_rules.apply_async(queue=target)
if 'request' in kwargs:
messages.info(kwargs['request'], 'Updating rules on {}'.format(targets))
return True
Expand All @@ -85,7 +85,7 @@ def _trigger_write_urls(signal, **kwargs):
targets = [server.host for server in models.Prometheus.objects.all()]
for target in targets:
logger.info('Queueing write_urls on %s', target)
prometheus.write_urls.apply_async(queue=target)
tasks.write_urls.apply_async(queue=target)
if 'request' in kwargs:
messages.info(kwargs['request'], 'Updating urls on {}'.format(targets))
return True
Expand Down Expand Up @@ -278,7 +278,6 @@ def add_default_service_subscription(instance, created, **kwargs):
sender="promgen.notification.user",
value=instance.owner.username,
)
print(sender)


@receiver(post_save, sender=models.Project)
Expand All @@ -289,4 +288,3 @@ def add_default_project_subscription(instance, created, **kwargs):
sender="promgen.notification.user",
value=instance.owner.username,
)
print(sender)
77 changes: 62 additions & 15 deletions promgen/tasks.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
# Copyright (c) 2017 LINE Corporation
# These sources are released under the terms of the MIT license: see LICENSE

import collections
import logging
from django.conf import settings
import os
from urllib.parse import urljoin

from atomicwrites import atomic_write
from celery import shared_task
from promgen import models, plugins, prometheus, signals # NOQA
from django.conf import settings
from promgen import models, plugins, prometheus, util

logger = logging.getLogger(__name__)


@shared_task
def process_alert(alert_pk):
'''
"""
Process alert for routing and notifications

We load our Alert from the database and expand it to determine which labels are routable

Next we loop through all senders configured and de-duplicate sender:target pairs before
queing the notification to actually be sent
'''
"""
alert = models.Alert.objects.get(pk=alert_pk)
routable, data = alert.expand()

# For any blacklisted label patterns, we delete them from the queue
# and consider it done (do not send any notification)
blacklist = settings.PROMGEN.get('alert_blacklist', {})
blacklist = settings.PROMGEN.get("alert_blacklist", {})
for key in blacklist:
logger.debug('Checking key %s', key)
if key in data['commonLabels']:
if data['commonLabels'][key] in blacklist[key]:
logger.debug('Blacklisted label %s', blacklist[key])
logger.debug("Checking key %s", key)
if key in data["commonLabels"]:
if data["commonLabels"][key] in blacklist[key]:
logger.debug("Blacklisted label %s", blacklist[key])
alert.delete()
return

# Now that we have our routable items, we want to check which senders are
# configured and expand those as needed
senders = collections.defaultdict(set)
for label, obj in routable.items():
logger.debug('Processing %s %s', label, obj)
logger.debug("Processing %s %s", label, obj)
for sender in models.Sender.objects.filter(obj=obj):
if sender.filtered(data):
logger.debug("Filtered out sender %s", sender)
continue
if hasattr(sender.driver, 'splay'):
if hasattr(sender.driver, "splay"):
for splay in sender.driver.splay(sender.value):
senders[splay.sender].add(splay.value)
else:
Expand All @@ -57,13 +59,58 @@ def process_alert(alert_pk):

@shared_task
def send_alert(sender, target, data, alert_pk=None):
'''
"""
Send alert to specific target

alert_pk is used for debugging purposes
'''
logger.debug('Sending %s %s', sender, target)
"""
logger.debug("Sending %s %s", sender, target)
for plugin in plugins.notifications():
if sender == plugin.module_name:
instance = plugin.load()()
instance._send(target, data)


@shared_task
def reload_prometheus():
from promgen import signals

target = urljoin(settings.PROMGEN["prometheus"]["url"], "/-/reload")
response = util.post(target)
signals.post_reload.send(response)


@shared_task
def write_urls(path=None, reload=True, chmod=0o644):
if path is None:
path = settings.PROMGEN["url_writer"]["path"]
with atomic_write(path, overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(prometheus.render_urls())
if reload:
reload_prometheus()


@shared_task
def write_config(path=None, reload=True, chmod=0o644):
if path is None:
path = settings.PROMGEN["config_writer"]["path"]
with atomic_write(path, overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(prometheus.render_config())
if reload:
reload_prometheus()


@shared_task
def write_rules(path=None, reload=True, chmod=0o644, version=None):
if path is None:
path = settings.PROMGEN["prometheus"]["rules"]
with atomic_write(path, mode="wb", overwrite=True) as fp:
# Set mode on our temporary file before we write and move it
os.chmod(fp.name, chmod)
fp.write(prometheus.render_rules(version=version))
if reload:
reload_prometheus()
4 changes: 2 additions & 2 deletions promgen/tests/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_alert(self):
@override_settings(PROMGEN=TEST_SETTINGS)
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@mock.patch('promgen.signals._trigger_write_config')
@mock.patch('promgen.prometheus.reload_prometheus')
@mock.patch('promgen.tasks.reload_prometheus')
def test_import(self, mock_write, mock_reload):
self.user.user_permissions.add(
Permission.objects.get(codename='change_rule'),
Expand All @@ -55,7 +55,7 @@ def test_import(self, mock_write, mock_reload):
@override_settings(PROMGEN=TEST_SETTINGS)
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@mock.patch('promgen.signals._trigger_write_config')
@mock.patch('promgen.prometheus.reload_prometheus')
@mock.patch('promgen.tasks.reload_prometheus')
def test_replace(self, mock_write, mock_reload):
# Set the required permissions
self.user.user_permissions.add(
Expand Down
2 changes: 1 addition & 1 deletion promgen/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ def get(self, request):
return HttpResponse(prometheus.render_urls(), content_type='application/json')

def post(self, request):
prometheus.write_urls()
tasks.write_urls()
return HttpResponse('OK', status=202)


Expand Down