Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web hooks #1046

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions crowdsourcing/migrations/0014_auto_20180109_2115.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.3 on 2018-01-09 21:15
from __future__ import unicode_literals

import django.contrib.postgres.fields.jsonb
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('crowdsourcing', '0013_auto_20171212_0049'),
]

operations = [
migrations.CreateModel(
name='WebHook',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('name', models.CharField(blank=True, max_length=128, null=True)),
('url', models.CharField(max_length=256)),
('content_type', models.CharField(default=b'application/json', max_length=64)),
('event', models.CharField(db_index=True, max_length=16)),
('object', models.CharField(blank=True, db_index=True, max_length=16, null=True)),
('payload', django.contrib.postgres.fields.jsonb.JSONField(default={}, null=True)),
('filters', django.contrib.postgres.fields.jsonb.JSONField(default={}, null=True)),
('retry_count', models.SmallIntegerField(default=1)),
('is_active', models.BooleanField(default=True)),
('secret', models.CharField(blank=True, max_length=128, null=True)),
],
),
migrations.AlterIndexTogether(
name='webhook',
index_together=set([('event', 'object')]),
),
]
23 changes: 23 additions & 0 deletions crowdsourcing/migrations/0015_webhook_owner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.3 on 2018-01-09 21:35
from __future__ import unicode_literals

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('crowdsourcing', '0014_auto_20180109_2115'),
]

operations = [
migrations.AddField(
model_name='webhook',
name='owner',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='web_hooks', to=settings.AUTH_USER_MODEL),
),
]
49 changes: 48 additions & 1 deletion crowdsourcing/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def filter_by_boomerang(self, worker, sort_by='-boomerang'):
ELSE worker_rating + 0.1 * worker_avg_rating END worker_rating
FROM get_worker_ratings(%(worker_id)s)) worker_ratings
ON worker_ratings.requester_id = ratings.owner_id
AND (worker_ratings.worker_rating >= ratings.min_rating or p.enable_boomerang is FALSE
AND (worker_ratings.worker_rating >= ratings.min_rating or p.enable_boomerang is FALSE
or p.owner_id = %(worker_id)s)
WHERE coalesce(p.deadline, NOW() + INTERVAL '1 minute') > NOW() AND p.status = 3 AND deleted_at IS NULL
AND (requester.is_denied = FALSE OR p.enable_blacklist = FALSE)
Expand Down Expand Up @@ -981,3 +981,50 @@ class WorkerBonus(TimeStampable):
class ProjectPreview(TimeStampable):
project = models.ForeignKey('Project')
user = models.ForeignKey(User)


class WebHook(TimeStampable):
SPEC = [
{
"event": "completed",
"object": "project",
"description": "Project picked",
"fields": ["project_id", "project_hash_id", "project_name"],
"is_active": True
},
{
"event": "submitted",
"object": "assignment",
"description": "Assignment submitted",
"fields": ["assignment_id", "task_id", "project_id", "worker_handle"],
"is_active": True
},
{
"event": "accepted",
"object": "assignment",
"description": "Assignment accepted",
"fields": ["assignment_id", "task_id", "project_id", "worker_handle"],
"is_active": False
},
{
"event": "skipped",
"object": "assignment",
"description": "Assignment skipped",
"fields": ["assignment_id", "task_id", "project_id", "worker_handle"],
"is_active": False
},
]
name = models.CharField(max_length=128, null=True, blank=True)
url = models.CharField(max_length=256)
content_type = models.CharField(max_length=64, default='application/json')
event = models.CharField(max_length=16, db_index=True)
object = models.CharField(max_length=16, null=True, blank=True, db_index=True)
payload = JSONField(null=True, default={})
filters = JSONField(null=True, default={})
retry_count = models.SmallIntegerField(default=1)
is_active = models.BooleanField(default=True)
secret = models.CharField(max_length=128, null=True, blank=True)
owner = models.ForeignKey(User, related_name='web_hooks', null=True)

class Meta:
index_together = (('event', 'object'),)
13 changes: 13 additions & 0 deletions crowdsourcing/serializers/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from crowdsourcing import models
from crowdsourcing.serializers.dynamic import DynamicFieldsModelSerializer


class WebHookSerializer(DynamicFieldsModelSerializer):
class Meta:
model = models.WebHook
fields = ('id', 'payload', 'retry_count', 'url', 'event', 'filters',
'name', 'object', 'content_type', 'secret', 'is_active')

def create(self, validated_data, owner=None):
hook = self.Meta.model.objects.create(owner=owner, **validated_data)
return hook
71 changes: 69 additions & 2 deletions crowdsourcing/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import division

import json
import time
from collections import OrderedDict
from datetime import timedelta
from datetime import timedelta, datetime
from decimal import Decimal, ROUND_UP

import numpy as np
import requests
from django.conf import settings
from django.contrib.auth.models import User
from django.db import connection, transaction
Expand Down Expand Up @@ -624,10 +626,10 @@ def check_project_completed(project_id):
cursor = connection.cursor()
cursor.execute(query, params)
remaining_count = cursor.fetchall()[0][0] if cursor.rowcount > 0 else 0
print(remaining_count)
if remaining_count == 0:
with transaction.atomic():
project = models.Project.objects.select_for_update().get(id=project_id)
on_project_completed(project.id)
if project.is_prototype:
feedback = project.comments.all()
if feedback.count() > 0 and feedback.filter(ready_for_launch=True).count() / feedback.count() < 0.66:
Expand Down Expand Up @@ -733,3 +735,68 @@ def post_to_discourse(project_id):
except Exception as e:
print(e)
print 'failed to update post'


@celery_app.task(ignore_result=True)
def on_assignment_submitted(pk):
return on_assignment_event(pk, "submitted")


@celery_app.task(ignore_result=True)
def on_assignment_skipped(pk):
return on_assignment_event(pk, "skipped")


@celery_app.task(ignore_result=True)
def on_assignment_accepted(pk):
return on_assignment_event(pk, "accepted")


def on_assignment_event(pk, event):
task_worker = models.TaskWorker.objects.prefetch_related('task__project').filter(id=pk).first()
if task_worker is not None:
object_name = "assignment"
hooks = task_worker.task.project.owner.web_hooks.filter(event=event, object=object_name, is_active=True)
data = {
"at": datetime.utcnow().isoformat(),
"worker_handle": task_worker.worker.profile.handle,
"assignment_id": task_worker.id,
"project_id": task_worker.task.project.id
}
for h in hooks:
post_webhook(hook=h, data=data, event=event, object_name=object_name)


@celery_app.task(ignore_result=True)
def on_project_completed(pk):
project = models.Project.objects.filter(id=pk).first()
if project is not None:
event = "completed"
object_name = "project"
hooks = project.owner.web_hooks.filter(event=event, object=object_name, is_active=True)
data = {
"at": datetime.utcnow().isoformat(),
"project_id": project.id,
"project_name": project.name
}
for h in hooks:
post_webhook(hook=h, data=data, event=event, object_name=object_name)
return 'SUCCESS'


def post_webhook(hook, data, event, object_name, attempt=1):
headers = {
"X-Daemo-Event": "{}.{}".format(object_name, event),
"Content-Type": "application/json"
}
try:
requests.post(url=hook.url,
data=json.dumps(data),
headers=headers)
return 'SUCCESS'
except Exception as e:
print(e)
if attempt < hook.retry_count:
time.sleep(1)
post_webhook(hook, data, event, object_name, attempt + 1)
return 'FAILURE'
5 changes: 4 additions & 1 deletion crowdsourcing/viewsets/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from crowdsourcing.permissions.util import IsSandbox
from crowdsourcing.serializers.project import ProjectSerializer
from crowdsourcing.serializers.task import *
from crowdsourcing.tasks import update_worker_cache, refund_task, send_return_notification_email
from crowdsourcing.tasks import update_worker_cache, refund_task, send_return_notification_email, \
on_assignment_submitted
from crowdsourcing.utils import get_model_or_none, hash_as_set, \
get_review_redis_message, hash_task
from crowdsourcing.validators.project import validate_account_balance
Expand Down Expand Up @@ -917,6 +918,7 @@ def submit_results(self, request, mock=False, *args, **kwargs):
task_worker.submitted_at = timezone.now()
task_worker.save()
task_worker.sessions.all().filter(ended_at__isnull=True).update(ended_at=timezone.now())
on_assignment_submitted.delay(task_worker.id)
# check_project_completed.delay(project_id=task_worker.task.project_id)
# #send_project_completed_email.delay(project_id=task_worker.task.project_id)
if task_status == TaskWorker.STATUS_SUBMITTED:
Expand Down Expand Up @@ -1057,6 +1059,7 @@ def post(self, request, *args, **kwargs):
task_worker_result.result = request.data
task_worker_result.save()
update_worker_cache.delay([task_worker.worker_id], constants.TASK_SUBMITTED)
on_assignment_submitted.delay(task_worker.id)
# check_project_completed.delay(project_id=task_worker.task.project_id)
return Response(request.data, status=status.HTTP_200_OK)
else:
Expand Down
46 changes: 46 additions & 0 deletions crowdsourcing/viewsets/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from django.db import transaction
from rest_framework import mixins, viewsets, status
from rest_framework.decorators import list_route
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from crowdsourcing.models import WebHook
from crowdsourcing.permissions.util import IsOwnerOrReadOnly
from crowdsourcing.serializers.webhooks import WebHookSerializer


class WebHookViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin, mixins.ListModelMixin,
mixins.RetrieveModelMixin, viewsets.GenericViewSet):
queryset = WebHook.objects.all()
serializer_class = WebHookSerializer
permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]

@list_route(methods=['get'], url_path='spec')
def spec(self, request, *args, **kwargs):
return Response([s for s in self.queryset.model.SPEC if s['is_active']])

def list(self, request, *args, **kwargs):
return Response(self.serializer_class(instance=request.user.web_hooks.all(), many=True).data)

def create(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data)
if serializer.is_valid():
instance = serializer.create(serializer.validated_data, owner=request.user)
return Response({"id": instance.id}, status=status.HTTP_201_CREATED)
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.serializer_class(instance=instance, data=request.data, partial=True)
if serializer.is_valid():
with transaction.atomic():
serializer.update(instance, serializer.validated_data)
return Response({"id": instance.id}, status=status.HTTP_200_OK)
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def destroy(self, request, *args, **kwargs):
instance = self.get_object()
instance.delete()
return Response({}, status=status.HTTP_204_NO_CONTENT)
2 changes: 2 additions & 0 deletions csp/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
from crowdsourcing.viewsets.template import TemplateViewSet, TemplateItemViewSet, TemplateItemPropertiesViewSet
from crowdsourcing.viewsets.user import UserViewSet, UserProfileViewSet, UserPreferencesViewSet, CountryViewSet, \
CityViewSet
from crowdsourcing.viewsets.webhooks import WebHookViewSet
from mturk import views as mturk_views
from mturk.viewsets import MTurkAssignmentViewSet, MTurkConfig, MTurkAccountViewSet

router = SimpleRouter(trailing_slash=True)
router.register(r'web-hooks', WebHookViewSet)
router.register(r'projects', ProjectViewSet)
router.register(r'tasks', TaskViewSet)
router.register(r'assignments', TaskWorkerViewSet)
Expand Down
Loading