From 3f3bba5968eaa7c910e5e24e653e972112a0acd2 Mon Sep 17 00:00:00 2001 From: Andreas Kloeckner <inform@tiker.net> Date: Tue, 15 Sep 2015 04:32:18 -0500 Subject: [PATCH] Add support for using Celery for long-running operations --- course/flow.py | 34 +--- course/grades.py | 181 +++++--------------- course/tasks.py | 194 ++++++++++++++++++++++ course/templates/course/keypair.html | 2 - course/templates/course/task-monitor.html | 62 +++++++ course/views.py | 34 ++++ doc/misc.rst | 6 + impose-deadline | 28 ---- relate/__init__.py | 5 + relate/celery.py | 22 +++ relate/settings.py | 26 ++- relate/urls.py | 6 + requirements.txt | 4 + 13 files changed, 398 insertions(+), 206 deletions(-) create mode 100644 course/tasks.py create mode 100644 course/templates/course/task-monitor.html delete mode 100755 impose-deadline create mode 100644 relate/celery.py diff --git a/course/flow.py b/course/flow.py index 92ffd405..8457e09c 100644 --- a/course/flow.py +++ b/course/flow.py @@ -1578,18 +1578,6 @@ class RegradeFlowForm(StyledForm): Submit("regrade", _("Regrade"))) -@transaction.atomic -def _regrade_sessions(repo, course, sessions): - count = 0 - - from course.flow import regrade_session - for session in sessions: - regrade_session(repo, course, session) - count += 1 - - return count - - @course_view def regrade_not_for_credit_flows_view(pctx): if pctx.role != participation_role.instructor: @@ -1602,28 +1590,20 @@ def regrade_not_for_credit_flows_view(pctx): if request.method == "POST": form = RegradeFlowForm(flow_ids, request.POST, request.FILES) if form.is_valid(): - sessions = (FlowSession.objects - .filter( - course=pctx.course, - flow_id=form.cleaned_data["flow_id"])) - if form.cleaned_data["access_rules_tag"]: - sessions = sessions.filter( - access_rules_tag=form.cleaned_data["access_rules_tag"]) - inprog_value = { "any": None, "yes": True, "no": False, }[form.cleaned_data["regraded_session_in_progress"]] - if inprog_value is not None: - sessions = sessions.filter( - in_progress=inprog_value) - - count = _regrade_sessions(pctx.repo, pctx.course, sessions) + from course.tasks import regrade_flow_sessions + async_res = regrade_flow_sessions.delay( + pctx.course.id, + form.cleaned_data["flow_id"], + form.cleaned_data["access_rules_tag"], + inprog_value) - messages.add_message(request, messages.SUCCESS, - _("%d sessions regraded.") % count) + return redirect("relate-monitor_task", async_res.id) else: form = RegradeFlowForm(flow_ids) diff --git a/course/grades.py b/course/grades.py index bc4a0e7e..e77604bb 100644 --- a/course/grades.py +++ b/course/grades.py @@ -366,100 +366,6 @@ class ModifySessionsForm(StyledForm): Submit("recalculate", _("Recalculate grades of ended sessions"))) -@transaction.atomic -def expire_in_progress_sessions(repo, course, flow_id, rule_tag, now_datetime, - past_due_only, print_progress=False): - sessions = (FlowSession.objects - .filter( - course=course, - flow_id=flow_id, - participation__isnull=False, - access_rules_tag=rule_tag, - in_progress=True, - )) - - count = 0 - - from course.flow import expire_flow_session_standalone - - nsessions = sessions.count() - - for session in sessions: - if expire_flow_session_standalone(repo, course, session, now_datetime, - past_due_only=past_due_only): - count += 1 - - if print_progress: - print("%d/%d" % (count, nsessions)) - - return count - - -@transaction.atomic -def finish_in_progress_sessions(repo, course, flow_id, rule_tag, now_datetime, - past_due_only): - sessions = (FlowSession.objects - .filter( - course=course, - flow_id=flow_id, - participation__isnull=False, - access_rules_tag=rule_tag, - in_progress=True, - )) - - count = 0 - - from course.flow import finish_flow_session_standalone - for session in sessions: - if finish_flow_session_standalone(repo, course, session, - now_datetime=now_datetime, past_due_only=past_due_only): - count += 1 - - return count - - -@transaction.atomic -def regrade_ended_sessions(repo, course, flow_id, rule_tag): - sessions = (FlowSession.objects - .filter( - course=course, - flow_id=flow_id, - participation__isnull=False, - access_rules_tag=rule_tag, - in_progress=False, - )) - - count = 0 - - from course.flow import regrade_session - for session in sessions: - regrade_session(repo, course, session) - count += 1 - - return count - - -@transaction.atomic -def recalculate_ended_sessions(repo, course, flow_id, rule_tag): - sessions = (FlowSession.objects - .filter( - course=course, - flow_id=flow_id, - participation__isnull=False, - access_rules_tag=rule_tag, - in_progress=False, - )) - - count = 0 - - from course.flow import recalculate_session_grade - for session in sessions: - recalculate_session_grade(repo, course, session) - count += 1 - - return count - - RULE_TAG_NONE_STRING = "<<<NONE>>>" @@ -517,54 +423,45 @@ def view_grades_by_opportunity(pctx, opp_id): if rule_tag == RULE_TAG_NONE_STRING: rule_tag = None - try: - if op == "expire": - count = expire_in_progress_sessions( - pctx.repo, pctx.course, opportunity.flow_id, - rule_tag, now_datetime, - past_due_only=past_due_only) - - messages.add_message(pctx.request, messages.SUCCESS, - _("%d session(s) expired.") % count) - - elif op == "end": - count = finish_in_progress_sessions( - pctx.repo, pctx.course, opportunity.flow_id, - rule_tag, now_datetime, - past_due_only=past_due_only) - - messages.add_message(pctx.request, messages.SUCCESS, - _("%d session(s) ended.") % count) - - elif op == "regrade": - count = regrade_ended_sessions( - pctx.repo, pctx.course, opportunity.flow_id, - rule_tag) - - messages.add_message(pctx.request, messages.SUCCESS, - _("%d session(s) regraded.") % count) - - elif op == "recalculate": - count = recalculate_ended_sessions( - pctx.repo, pctx.course, opportunity.flow_id, - rule_tag) - - messages.add_message(pctx.request, messages.SUCCESS, - _("Grade recalculated for %d session(s).") - % count) - - else: - raise SuspiciousOperation("invalid operation") - except Exception as e: - messages.add_message(pctx.request, messages.ERROR, - string_concat( - pgettext_lazy("Starting of Error message", - "Error"), - ": %(err_type)s %(err_str)s") - % { - "err_type": type(e).__name__, - "err_str": str(e)}) - raise + + from course.tasks import ( + expire_in_progress_sessions, + finish_in_progress_sessions, + regrade_ended_sessions, + recalculate_ended_sessions) + + if op == "expire": + async_res = expire_in_progress_sessions.delay( + pctx.course.id, opportunity.flow_id, + rule_tag, now_datetime, + past_due_only=past_due_only) + + return redirect("relate-monitor_task", async_res.id) + + elif op == "end": + async_res = finish_in_progress_sessions.delay( + pctx.course.id, opportunity.flow_id, + rule_tag, now_datetime, + past_due_only=past_due_only) + + return redirect("relate-monitor_task", async_res.id) + + elif op == "regrade": + async_res = regrade_ended_sessions.delay( + pctx.course.id, opportunity.flow_id, + rule_tag) + + return redirect("relate-monitor_task", async_res.id) + + elif op == "recalculate": + async_res = recalculate_ended_sessions.delay( + pctx.course.id, opportunity.flow_id, + rule_tag) + + return redirect("relate-monitor_task", async_res.id) + + else: + raise SuspiciousOperation("invalid operation") else: batch_session_ops_form = ModifySessionsForm(session_rule_tags) diff --git a/course/tasks.py b/course/tasks.py new file mode 100644 index 00000000..5e0030f4 --- /dev/null +++ b/course/tasks.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- + +from __future__ import division, absolute_import + +__copyright__ = "Copyright (C) 2015 Andreas Kloeckner" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +from celery import shared_task + +from django.db import transaction + +from course.models import (Course, FlowSession) +from course.content import get_course_repo + + +@shared_task(bind=True) +@transaction.atomic +def expire_in_progress_sessions(self, course_id, flow_id, rule_tag, now_datetime, + past_due_only): + course = Course.objects.get(id=course_id) + repo = get_course_repo(course) + + sessions = (FlowSession.objects + .filter( + course=course, + flow_id=flow_id, + participation__isnull=False, + access_rules_tag=rule_tag, + in_progress=True, + )) + + count = 0 + nsessions = sessions.count() + + from course.flow import expire_flow_session_standalone + + for i, session in enumerate(sessions): + if expire_flow_session_standalone(repo, course, session, now_datetime, + past_due_only=past_due_only): + count += 1 + + self.update_state( + state='PROGRESS', + meta={'current': i, 'total': nsessions}) + + repo.close() + + +@shared_task(bind=True) +@transaction.atomic +def finish_in_progress_sessions(self, course_id, flow_id, rule_tag, now_datetime, + past_due_only): + course = Course.objects.get(id=course_id) + repo = get_course_repo(course) + + sessions = (FlowSession.objects + .filter( + course=course, + flow_id=flow_id, + participation__isnull=False, + access_rules_tag=rule_tag, + in_progress=True, + )) + + count = 0 + nsessions = sessions.count() + + from course.flow import finish_flow_session_standalone + for i, session in enumerate(sessions): + if finish_flow_session_standalone(repo, course, session, + now_datetime=now_datetime, past_due_only=past_due_only): + count += 1 + + self.update_state( + state='PROGRESS', + meta={'current': i, 'total': nsessions}) + + repo.close() + + return count + + +@shared_task(bind=True) +@transaction.atomic +def regrade_ended_sessions(self, course_id, flow_id, rule_tag): + course = Course.objects.get(id=course_id) + repo = get_course_repo(course) + + sessions = (FlowSession.objects + .filter( + course=course, + flow_id=flow_id, + participation__isnull=False, + access_rules_tag=rule_tag, + in_progress=False, + )) + + nsessions = sessions.count() + count = 0 + + from course.flow import regrade_session + for session in sessions: + regrade_session(repo, course, session) + count += 1 + + self.update_state( + state='PROGRESS', + meta={'current': count, 'total': nsessions}) + + repo.close() + + +@shared_task(bind=True) +@transaction.atomic +def recalculate_ended_sessions(self, course_id, flow_id, rule_tag): + course = Course.objects.get(id=course_id) + repo = get_course_repo(course) + + sessions = (FlowSession.objects + .filter( + course=course, + flow_id=flow_id, + participation__isnull=False, + access_rules_tag=rule_tag, + in_progress=False, + )) + + nsessions = sessions.count() + count = 0 + + from course.flow import recalculate_session_grade + for session in sessions: + recalculate_session_grade(repo, course, session) + count += 1 + + self.update_state( + state='PROGRESS', + meta={'current': count, 'total': nsessions}) + + repo.close() + + +@shared_task(bind=True) +@transaction.atomic +def regrade_flow_sessions(self, course_id, flow_id, access_rules_tag, inprog_value): + course = Course.objects.get(id=course_id) + repo = get_course_repo(course) + + sessions = (FlowSession.objects + .filter( + course=course, + flow_id=flow_id)) + + if access_rules_tag is not None: + sessions = sessions.filter(access_rules_tag=access_rules_tag) + + if inprog_value is not None: + sessions = sessions.filter(in_progress=inprog_value) + + nsessions = sessions.count() + count = 0 + + from course.flow import regrade_session + for session in sessions: + regrade_session(repo, course, session) + count += 1 + + self.update_state( + state='PROGRESS', + meta={'current': count, 'total': nsessions}) + + repo.close() + + +# vim: foldmethod=marker diff --git a/course/templates/course/keypair.html b/course/templates/course/keypair.html index 1e131521..33eee56b 100644 --- a/course/templates/course/keypair.html +++ b/course/templates/course/keypair.html @@ -1,8 +1,6 @@ {% extends "base.html" %} {% load i18n %} -{% load crispy_forms_tags %} - {% block content %} <h1>{% trans "SSH Key Pair" %}</h1> diff --git a/course/templates/course/task-monitor.html b/course/templates/course/task-monitor.html new file mode 100644 index 00000000..b8b3a33d --- /dev/null +++ b/course/templates/course/task-monitor.html @@ -0,0 +1,62 @@ +{% extends "base.html" %} +{% load i18n %} + +{% block header_extra %} + {% if state != "FAILURE" and state != "SUCCESS" %} + <meta http-equiv="refresh" content="2" > + {% endif %} +{% endblock %} + +{% block content %} + <h1> + {% trans "Task Progress" %} + </h1> + + <table class="table"> + <tr> + <th>{% trans "State" %}</th> + <td>{{ state }}</td> + </tr> + {% if progress_statement %} + <tr> + <th>{% trans "Progress" %}</th> + <td>{{ progress_statement }}</td> + </tr> + {% endif %} + </table> + + {% if progress_percent != None %} + <div class="progress"> + <div class="progress-bar" role="progressbar" + aria-valuenow="{{ progress_percentage }}" aria-valuemin="0" aria-valuemax="100" + style="width: {{ progress_percentage|stringformat:".9f" }}%;"> + {{ progress_percentage|floatformat:0 }}% + </div> + </div> + {% else %} + <div class="progress"> + <div class="progress-bar + {% if state == "FAILURE" %} + progress-bar-danger + {% elif state == "SUCCESS" %} + progress-bar-success + {% else %} + progress-bar-striped active + {% endif %}" + role="progressbar" + aria-valuenow="100" aria-valuemin="0" aria-valuemax="100" + style="width: 100%;"> + </div> + </div> + {% endif %} + + {% if traceback %} + {% blocktrans %} + The process failed and reported the following error: + {% endblocktrans %} + <pre>{{ traceback }}</pre> + {% endif %} + +{% endblock %} + + diff --git a/course/views.py b/course/views.py index d3900f92..01fe765f 100644 --- a/course/views.py +++ b/course/views.py @@ -1024,4 +1024,38 @@ def generate_ssh_keypair(request): # }}} +# {{{ celery task monitoring + +def monitor_task(request, task_id): + from celery.result import AsyncResult + async_res = AsyncResult(task_id) + + progress_percent = None + progress_statement = None + + if async_res.state == "PROGRESS": + meta = async_res.info + current = meta["current"] + total = meta["total"] + if total > 0: + progress_percent = current / total + + progress_statement = ( + _("%d out of %d items processed.") + % (current, total)) + + traceback = None + if request.user.is_staff and async_res.state == "FAILURE": + traceback = async_res.traceback + + return render(request, "course/task-monitor.html", { + "state": async_res.state, + "progress_percent": progress_percent, + "progress_statement": progress_statement, + "traceback": traceback, + }) + +# }}} + + # vim: foldmethod=marker diff --git a/doc/misc.rst b/doc/misc.rst index e0495fe9..7476c4d2 100644 --- a/doc/misc.rst +++ b/doc/misc.rst @@ -47,6 +47,12 @@ Open a browser to http://localhost:8000, sign in (your user name will be the same as your system user name, or whatever `whoami` returned above) and select "Set up new course". +As you play with the web interface, you may notice that some long-running tasks +just sit there: That is because RELATE relies on a task queue to process +those long-running tasks. Start a worker by running:: + + celery worker -A relate + Additional setup steps for Docker --------------------------------- diff --git a/impose-deadline b/impose-deadline deleted file mode 100755 index f345b08a..00000000 --- a/impose-deadline +++ /dev/null @@ -1,28 +0,0 @@ -#! /usr/bin/env python -import os -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "relate.settings") - -import django -django.setup() - -from course.models import Course -import sys - -course_identifier = sys.argv[1] -flow_id = sys.argv[2] -rule_tag = sys.argv[3] - -if rule_tag == "None": - rule_tag = None - -c = Course.objects.get(identifier=course_identifier) - -from course.grades import expire_in_progress_sessions -from course.content import get_course_repo - -repo = get_course_repo(c) - -from django.utils.timezone import now -now_datetime = now() -expire_in_progress_sessions(repo, c, flow_id, rule_tag, now_datetime, - past_due_only=True, print_progress=True) diff --git a/relate/__init__.py b/relate/__init__.py index e69de29b..d13e9513 100644 --- a/relate/__init__.py +++ b/relate/__init__.py @@ -0,0 +1,5 @@ +from __future__ import absolute_import + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app # noqa diff --git a/relate/celery.py b/relate/celery.py new file mode 100644 index 00000000..7f37f345 --- /dev/null +++ b/relate/celery.py @@ -0,0 +1,22 @@ +from __future__ import absolute_import + +import os + +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'relate.settings') + +from django.conf import settings + +app = Celery('relate') + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) + + +@app.task(bind=True) +def debug_task(self): + print('Request: {0!r}'.format(self.request)) diff --git a/relate/settings.py b/relate/settings.py index 83f23da0..bd06e97e 100644 --- a/relate/settings.py +++ b/relate/settings.py @@ -1,11 +1,7 @@ -""" -Django settings for relate project. - -For more information on this file, see -https://docs.djangoproject.com/en/dev/topics/settings/ +from __future__ import absolute_import -For the full list of settings and their values, see -https://docs.djangoproject.com/en/dev/ref/settings/ +""" +Django settings for RELATE. """ # Do not change this file. All these settings can be overridden in @@ -43,6 +39,10 @@ INSTALLED_APPS = ( "jsonfield", "bootstrap3_datetime", "djangobower", + + # message queue + "djcelery", + "kombu.transport.django" ) MIDDLEWARE_CLASSES = ( @@ -78,6 +78,18 @@ TEMPLATE_CONTEXT_PROCESSORS = ( + RELATE_EXTRA_CONTEXT_PROCESSORS ) +# {{{ celery config + +BROKER_URL = 'django://' + +CELERY_ACCEPT_CONTENT = ['pickle'] +CELERY_TASK_SERIALIZER = 'pickle' +CELERY_RESULT_SERIALIZER = 'pickle' +CELERY_TRACK_STARTED = True + +CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' + +# }}} # {{{ bower packages diff --git a/relate/urls.py b/relate/urls.py index 0c2340f3..cd9cb87b 100644 --- a/relate/urls.py +++ b/relate/urls.py @@ -78,6 +78,12 @@ urlpatterns = [ course.views.generate_ssh_keypair, name="relate-generate_ssh_keypair"), + url(r"^monitor-task" + "/(?P<task_id>[-0-9a-f]+)" + "$", + course.views.monitor_task, + name="relate-monitor_task"), + # {{{ troubleshooting url(r'^user/impersonate/$', diff --git a/requirements.txt b/requirements.txt index 60daf50a..1415e9d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,3 +63,7 @@ ipaddress # For localized datetime format babel + +# A task queue, used to execute long-running tasks +celery +django-celery -- GitLab