diff --git a/crowdsourcing/migrations/0014_auto_20180109_2115.py b/crowdsourcing/migrations/0014_auto_20180109_2115.py new file mode 100644 index 000000000..5da3f3533 --- /dev/null +++ b/crowdsourcing/migrations/0014_auto_20180109_2115.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.3 on 2018-01-09 21:15 +from __future__ import unicode_literals + +import django.contrib.postgres.fields.jsonb +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('crowdsourcing', '0013_auto_20171212_0049'), + ] + + operations = [ + migrations.CreateModel( + name='WebHook', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('name', models.CharField(blank=True, max_length=128, null=True)), + ('url', models.CharField(max_length=256)), + ('content_type', models.CharField(default=b'application/json', max_length=64)), + ('event', models.CharField(db_index=True, max_length=16)), + ('object', models.CharField(blank=True, db_index=True, max_length=16, null=True)), + ('payload', django.contrib.postgres.fields.jsonb.JSONField(default={}, null=True)), + ('filters', django.contrib.postgres.fields.jsonb.JSONField(default={}, null=True)), + ('retry_count', models.SmallIntegerField(default=1)), + ('is_active', models.BooleanField(default=True)), + ('secret', models.CharField(blank=True, max_length=128, null=True)), + ], + ), + migrations.AlterIndexTogether( + name='webhook', + index_together=set([('event', 'object')]), + ), + ] diff --git a/crowdsourcing/migrations/0015_webhook_owner.py b/crowdsourcing/migrations/0015_webhook_owner.py new file mode 100644 index 000000000..e2255ffb9 --- /dev/null +++ b/crowdsourcing/migrations/0015_webhook_owner.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.3 on 2018-01-09 21:35 +from __future__ import unicode_literals + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('crowdsourcing', '0014_auto_20180109_2115'), + ] + + operations = [ + migrations.AddField( + model_name='webhook', + name='owner', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='web_hooks', to=settings.AUTH_USER_MODEL), + ), + ] diff --git a/crowdsourcing/models.py b/crowdsourcing/models.py index 29a8ef78e..8e483f814 100644 --- a/crowdsourcing/models.py +++ b/crowdsourcing/models.py @@ -406,7 +406,7 @@ def filter_by_boomerang(self, worker, sort_by='-boomerang'): ELSE worker_rating + 0.1 * worker_avg_rating END worker_rating FROM get_worker_ratings(%(worker_id)s)) worker_ratings ON worker_ratings.requester_id = ratings.owner_id - AND (worker_ratings.worker_rating >= ratings.min_rating or p.enable_boomerang is FALSE + AND (worker_ratings.worker_rating >= ratings.min_rating or p.enable_boomerang is FALSE or p.owner_id = %(worker_id)s) WHERE coalesce(p.deadline, NOW() + INTERVAL '1 minute') > NOW() AND p.status = 3 AND deleted_at IS NULL AND (requester.is_denied = FALSE OR p.enable_blacklist = FALSE) @@ -981,3 +981,50 @@ class WorkerBonus(TimeStampable): class ProjectPreview(TimeStampable): project = models.ForeignKey('Project') user = models.ForeignKey(User) + + +class WebHook(TimeStampable): + SPEC = [ + { + "event": "completed", + "object": "project", + "description": "Project picked", + "fields": ["project_id", "project_hash_id", "project_name"], + "is_active": True + }, + { + "event": "submitted", + "object": "assignment", + "description": "Assignment submitted", + "fields": ["assignment_id", "task_id", "project_id", "worker_handle"], + "is_active": True + }, + { + "event": "accepted", + "object": "assignment", + "description": "Assignment accepted", + "fields": ["assignment_id", "task_id", "project_id", "worker_handle"], + "is_active": False + }, + { + "event": "skipped", + "object": "assignment", + "description": "Assignment skipped", + "fields": ["assignment_id", "task_id", "project_id", "worker_handle"], + "is_active": False + }, + ] + name = models.CharField(max_length=128, null=True, blank=True) + url = models.CharField(max_length=256) + content_type = models.CharField(max_length=64, default='application/json') + event = models.CharField(max_length=16, db_index=True) + object = models.CharField(max_length=16, null=True, blank=True, db_index=True) + payload = JSONField(null=True, default={}) + filters = JSONField(null=True, default={}) + retry_count = models.SmallIntegerField(default=1) + is_active = models.BooleanField(default=True) + secret = models.CharField(max_length=128, null=True, blank=True) + owner = models.ForeignKey(User, related_name='web_hooks', null=True) + + class Meta: + index_together = (('event', 'object'),) diff --git a/crowdsourcing/serializers/webhooks.py b/crowdsourcing/serializers/webhooks.py new file mode 100644 index 000000000..76608175d --- /dev/null +++ b/crowdsourcing/serializers/webhooks.py @@ -0,0 +1,13 @@ +from crowdsourcing import models +from crowdsourcing.serializers.dynamic import DynamicFieldsModelSerializer + + +class WebHookSerializer(DynamicFieldsModelSerializer): + class Meta: + model = models.WebHook + fields = ('id', 'payload', 'retry_count', 'url', 'event', 'filters', + 'name', 'object', 'content_type', 'secret', 'is_active') + + def create(self, validated_data, owner=None): + hook = self.Meta.model.objects.create(owner=owner, **validated_data) + return hook diff --git a/crowdsourcing/tasks.py b/crowdsourcing/tasks.py index de95376ed..33baafad8 100644 --- a/crowdsourcing/tasks.py +++ b/crowdsourcing/tasks.py @@ -1,11 +1,13 @@ from __future__ import division import json +import time from collections import OrderedDict -from datetime import timedelta +from datetime import timedelta, datetime from decimal import Decimal, ROUND_UP import numpy as np +import requests from django.conf import settings from django.contrib.auth.models import User from django.db import connection, transaction @@ -624,10 +626,10 @@ def check_project_completed(project_id): cursor = connection.cursor() cursor.execute(query, params) remaining_count = cursor.fetchall()[0][0] if cursor.rowcount > 0 else 0 - print(remaining_count) if remaining_count == 0: with transaction.atomic(): project = models.Project.objects.select_for_update().get(id=project_id) + on_project_completed(project.id) if project.is_prototype: feedback = project.comments.all() if feedback.count() > 0 and feedback.filter(ready_for_launch=True).count() / feedback.count() < 0.66: @@ -733,3 +735,68 @@ def post_to_discourse(project_id): except Exception as e: print(e) print 'failed to update post' + + +@celery_app.task(ignore_result=True) +def on_assignment_submitted(pk): + return on_assignment_event(pk, "submitted") + + +@celery_app.task(ignore_result=True) +def on_assignment_skipped(pk): + return on_assignment_event(pk, "skipped") + + +@celery_app.task(ignore_result=True) +def on_assignment_accepted(pk): + return on_assignment_event(pk, "accepted") + + +def on_assignment_event(pk, event): + task_worker = models.TaskWorker.objects.prefetch_related('task__project').filter(id=pk).first() + if task_worker is not None: + object_name = "assignment" + hooks = task_worker.task.project.owner.web_hooks.filter(event=event, object=object_name, is_active=True) + data = { + "at": datetime.utcnow().isoformat(), + "worker_handle": task_worker.worker.profile.handle, + "assignment_id": task_worker.id, + "project_id": task_worker.task.project.id + } + for h in hooks: + post_webhook(hook=h, data=data, event=event, object_name=object_name) + + +@celery_app.task(ignore_result=True) +def on_project_completed(pk): + project = models.Project.objects.filter(id=pk).first() + if project is not None: + event = "completed" + object_name = "project" + hooks = project.owner.web_hooks.filter(event=event, object=object_name, is_active=True) + data = { + "at": datetime.utcnow().isoformat(), + "project_id": project.id, + "project_name": project.name + } + for h in hooks: + post_webhook(hook=h, data=data, event=event, object_name=object_name) + return 'SUCCESS' + + +def post_webhook(hook, data, event, object_name, attempt=1): + headers = { + "X-Daemo-Event": "{}.{}".format(object_name, event), + "Content-Type": "application/json" + } + try: + requests.post(url=hook.url, + data=json.dumps(data), + headers=headers) + return 'SUCCESS' + except Exception as e: + print(e) + if attempt < hook.retry_count: + time.sleep(1) + post_webhook(hook, data, event, object_name, attempt + 1) + return 'FAILURE' diff --git a/crowdsourcing/viewsets/task.py b/crowdsourcing/viewsets/task.py index b95965298..7428c6287 100644 --- a/crowdsourcing/viewsets/task.py +++ b/crowdsourcing/viewsets/task.py @@ -26,7 +26,8 @@ from crowdsourcing.permissions.util import IsSandbox from crowdsourcing.serializers.project import ProjectSerializer from crowdsourcing.serializers.task import * -from crowdsourcing.tasks import update_worker_cache, refund_task, send_return_notification_email +from crowdsourcing.tasks import update_worker_cache, refund_task, send_return_notification_email, \ + on_assignment_submitted from crowdsourcing.utils import get_model_or_none, hash_as_set, \ get_review_redis_message, hash_task from crowdsourcing.validators.project import validate_account_balance @@ -917,6 +918,7 @@ def submit_results(self, request, mock=False, *args, **kwargs): task_worker.submitted_at = timezone.now() task_worker.save() task_worker.sessions.all().filter(ended_at__isnull=True).update(ended_at=timezone.now()) + on_assignment_submitted.delay(task_worker.id) # check_project_completed.delay(project_id=task_worker.task.project_id) # #send_project_completed_email.delay(project_id=task_worker.task.project_id) if task_status == TaskWorker.STATUS_SUBMITTED: @@ -1057,6 +1059,7 @@ def post(self, request, *args, **kwargs): task_worker_result.result = request.data task_worker_result.save() update_worker_cache.delay([task_worker.worker_id], constants.TASK_SUBMITTED) + on_assignment_submitted.delay(task_worker.id) # check_project_completed.delay(project_id=task_worker.task.project_id) return Response(request.data, status=status.HTTP_200_OK) else: diff --git a/crowdsourcing/viewsets/webhooks.py b/crowdsourcing/viewsets/webhooks.py new file mode 100644 index 000000000..3fcb1e665 --- /dev/null +++ b/crowdsourcing/viewsets/webhooks.py @@ -0,0 +1,46 @@ +from django.db import transaction +from rest_framework import mixins, viewsets, status +from rest_framework.decorators import list_route +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from crowdsourcing.models import WebHook +from crowdsourcing.permissions.util import IsOwnerOrReadOnly +from crowdsourcing.serializers.webhooks import WebHookSerializer + + +class WebHookViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin, mixins.ListModelMixin, + mixins.RetrieveModelMixin, viewsets.GenericViewSet): + queryset = WebHook.objects.all() + serializer_class = WebHookSerializer + permission_classes = [IsAuthenticated, IsOwnerOrReadOnly] + + @list_route(methods=['get'], url_path='spec') + def spec(self, request, *args, **kwargs): + return Response([s for s in self.queryset.model.SPEC if s['is_active']]) + + def list(self, request, *args, **kwargs): + return Response(self.serializer_class(instance=request.user.web_hooks.all(), many=True).data) + + def create(self, request, *args, **kwargs): + serializer = self.serializer_class(data=request.data) + if serializer.is_valid(): + instance = serializer.create(serializer.validated_data, owner=request.user) + return Response({"id": instance.id}, status=status.HTTP_201_CREATED) + else: + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + def update(self, request, *args, **kwargs): + instance = self.get_object() + serializer = self.serializer_class(instance=instance, data=request.data, partial=True) + if serializer.is_valid(): + with transaction.atomic(): + serializer.update(instance, serializer.validated_data) + return Response({"id": instance.id}, status=status.HTTP_200_OK) + else: + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + instance.delete() + return Response({}, status=status.HTTP_204_NO_CONTENT) diff --git a/csp/urls.py b/csp/urls.py index 46d9ed2c2..63558b285 100644 --- a/csp/urls.py +++ b/csp/urls.py @@ -18,10 +18,12 @@ from crowdsourcing.viewsets.template import TemplateViewSet, TemplateItemViewSet, TemplateItemPropertiesViewSet from crowdsourcing.viewsets.user import UserViewSet, UserProfileViewSet, UserPreferencesViewSet, CountryViewSet, \ CityViewSet +from crowdsourcing.viewsets.webhooks import WebHookViewSet from mturk import views as mturk_views from mturk.viewsets import MTurkAssignmentViewSet, MTurkConfig, MTurkAccountViewSet router = SimpleRouter(trailing_slash=True) +router.register(r'web-hooks', WebHookViewSet) router.register(r'projects', ProjectViewSet) router.register(r'tasks', TaskViewSet) router.register(r'assignments', TaskWorkerViewSet) diff --git a/static/django_templates/index.html b/static/django_templates/index.html index e22722b90..42bc10beb 100644 --- a/static/django_templates/index.html +++ b/static/django_templates/index.html @@ -1,204 +1,211 @@ {% load staticfiles compress %}
-Field Name | +Data Type | +Description | +
---|---|---|
name | +string | +A name for this web hook. | +
url | +string | +A URL to post the event payload to. + | +
event | +string | +Event to be subscribed to, allowed values are "submitted", "completed". + | +
object | +string | +The object/resource the event is occurring for, allowed values are "project", "assignment". + | +
is_active | +boolean | +Daemo will post these events as long as this field is set to true. + | +