diff --git a/lms/djangoapps/grades/signals/handlers.py b/lms/djangoapps/grades/signals/handlers.py index fa05873c45..74bd721acc 100644 --- a/lms/djangoapps/grades/signals/handlers.py +++ b/lms/djangoapps/grades/signals/handlers.py @@ -4,15 +4,14 @@ Grades related signals. from logging import getLogger -from courseware.model_data import get_score, set_score from django.dispatch import receiver -from openedx.core.lib.grade_utils import is_score_higher from submissions.models import score_set, score_reset -from util.date_utils import to_timestamp from courseware.model_data import get_score, set_score from eventtracking import tracker +from openedx.core.lib.grade_utils import is_score_higher from student.models import user_by_anonymous_id +from util.date_utils import to_timestamp from track.event_transaction_utils import ( get_event_transaction_type, get_event_transaction_id, diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index c58647adc4..6a5a235839 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -11,6 +11,7 @@ from logging import getLogger from courseware.model_data import get_score from lms.djangoapps.course_blocks.api import get_course_blocks +from openedx.core.djangoapps.celery_utils.task import PersistOnFailureTask from opaque_keys.edx.keys import UsageKey from opaque_keys.edx.locator import CourseLocator from submissions import api as sub_api @@ -54,7 +55,7 @@ def recalculate_subsection_grade( ) -@task(default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY) +@task(base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY) def recalculate_subsection_grade_v2(**kwargs): """ Updates a saved subsection grade. diff --git a/lms/envs/common.py b/lms/envs/common.py index c826fd1b9f..cc2145e9e9 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -2153,6 +2153,10 @@ INSTALLED_APPS = ( # additional release utilities to ease automation 'release_util', + + # Customized celery tasks, including persisting failed tasks so they can + # be retried + 'openedx.core.djangoapps.celery_utils', ) # Migrations which are not in the standard module "migrations" diff --git a/openedx/core/djangoapps/celery_utils/__init__.py b/openedx/core/djangoapps/celery_utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py b/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py new file mode 100644 index 0000000000..b85dcc1801 --- /dev/null +++ b/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models +import django.utils.timezone +import jsonfield.fields +import model_utils.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='FailedTask', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, verbose_name='created', editable=False)), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, verbose_name='modified', editable=False)), + ('task_name', models.CharField(max_length=255)), + ('task_id', models.CharField(max_length=255, db_index=True)), + ('args', jsonfield.fields.JSONField(blank=True)), + ('kwargs', jsonfield.fields.JSONField(blank=True)), + ('exc', models.CharField(max_length=255)), + ('datetime_resolved', models.DateTimeField(default=None, null=True, db_index=True, blank=True)), + ], + ), + migrations.AlterIndexTogether( + name='failedtask', + index_together=set([('task_name', 'exc')]), + ), + ] diff --git a/openedx/core/djangoapps/celery_utils/migrations/__init__.py b/openedx/core/djangoapps/celery_utils/migrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/celery_utils/models.py b/openedx/core/djangoapps/celery_utils/models.py new file mode 100644 index 0000000000..b96dd0aabc --- /dev/null +++ b/openedx/core/djangoapps/celery_utils/models.py @@ -0,0 +1,32 @@ +""" +Models to support persistent tasks. +""" + +from django.db import models +from jsonfield import JSONField +from model_utils.models import TimeStampedModel + + +class FailedTask(TimeStampedModel): + """ + Representation of tasks that have failed + """ + task_name = models.CharField(max_length=255) + task_id = models.CharField(max_length=255, db_index=True) + args = JSONField(blank=True) + kwargs = JSONField(blank=True) + exc = models.CharField(max_length=255) + datetime_resolved = models.DateTimeField(blank=True, null=True, default=None, db_index=True) + + class Meta(object): + index_together = [ + (u'task_name', u'exc'), + ] + + def __unicode__(self): + return u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'.format( + task_name=self.task_name, + args=self.args, + kwargs=self.kwargs, + resolution=u"not resolved" if self.datetime_resolved is None else "resolved" + ) diff --git a/openedx/core/djangoapps/celery_utils/task.py b/openedx/core/djangoapps/celery_utils/task.py new file mode 100644 index 0000000000..2170368db6 --- /dev/null +++ b/openedx/core/djangoapps/celery_utils/task.py @@ -0,0 +1,44 @@ +""" +Celery utility code for persistent tasks. +""" + +from celery import Task + +from .models import FailedTask + + +# pylint: disable=abstract-method +class PersistOnFailureTask(Task): + """ + Custom Celery Task base class that persists task data on failure. + """ + + def on_failure(self, exc, task_id, args, kwargs, einfo): + """ + If the task fails, persist a record of the task. + """ + FailedTask.objects.create( + task_name=_truncate_to_field(FailedTask, 'task_name', self.name), + task_id=task_id, # Fixed length UUID: No need to truncate + args=args, + kwargs=kwargs, + exc=_truncate_to_field(FailedTask, 'exc', repr(exc)), + ) + super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo) + + +def _truncate_to_field(model, field_name, value): + """ + If data is too big for the field, it would cause a failure to + insert, so we shorten it, truncating in the middle (because + valuable information often shows up at the end. + """ + field = model._meta.get_field(field_name) # pylint: disable=protected-access + if len(value) > field.max_length: + midpoint = field.max_length // 2 + len_after_midpoint = field.max_length - midpoint + first = value[:midpoint] + sep = u'...' + last = value[len(value) - len_after_midpoint + len(sep):] + value = sep.join([first, last]) + return value diff --git a/openedx/core/djangoapps/celery_utils/tests/__init__.py b/openedx/core/djangoapps/celery_utils/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/celery_utils/tests/test_task.py b/openedx/core/djangoapps/celery_utils/tests/test_task.py new file mode 100644 index 0000000000..a579f7480e --- /dev/null +++ b/openedx/core/djangoapps/celery_utils/tests/test_task.py @@ -0,0 +1,84 @@ +u""" +Testing persistent tasks +""" + +from __future__ import print_function + +from celery import task +from django.conf import settings +from django.test import TestCase +from unittest import skipUnless +import six + +from ..models import FailedTask +from ..task import PersistOnFailureTask + + +class PersistOnFailureTaskTestCase(TestCase): + """ + Test that persistent tasks save the appropriate values when needed. + """ + + @classmethod + def setUpClass(cls): + @task(base=PersistOnFailureTask) + def exampletask(message=None): + u""" + A simple task for testing persistence + """ + if message: + raise ValueError(message) + return + cls.exampletask = exampletask + super(PersistOnFailureTaskTestCase, cls).setUpClass() + + + @skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') + def test_exampletask_without_failure(self): + result = self.exampletask.delay() + result.wait() + self.assertEqual(result.status, u'SUCCESS') + self.assertFalse(FailedTask.objects.exists()) + + @skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') + def test_exampletask_with_failure(self): + result = self.exampletask.delay(message=u'The example task failed') + with self.assertRaises(ValueError): + result.wait() + self.assertEqual(result.status, u'FAILURE') + failed_task_object = FailedTask.objects.get() + # Assert that we get the kind of data we expect + self.assertEqual( + failed_task_object.task_name, + u'openedx.core.djangoapps.celery_utils.tests.test_task.exampletask' + ) + self.assertEqual(failed_task_object.args, []) + self.assertEqual(failed_task_object.kwargs, {u'message': u'The example task failed'}) + self.assertEqual(failed_task_object.exc, u"ValueError(u'The example task failed',)") + self.assertIsNone(failed_task_object.datetime_resolved) + + @skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') + def test_persists_when_called_with_wrong_args(self): + result = self.exampletask.delay(15, u'2001-03-04', err=True) + with self.assertRaises(TypeError): + result.wait() + self.assertEqual(result.status, u'FAILURE') + failed_task_object = FailedTask.objects.get() + self.assertEqual(failed_task_object.args, [15, u'2001-03-04']) + self.assertEqual(failed_task_object.kwargs, {u'err': True}) + + @skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') + def test_persists_with_overlength_field(self): + overlong_message = u''.join(u'%03d' % x for x in six.moves.range(100)) + result = self.exampletask.delay(message=overlong_message) + with self.assertRaises(ValueError): + result.wait() + failed_task_object = FailedTask.objects.get() + # Length is max field length + self.assertEqual(len(failed_task_object.exc), 255) + # Ellipses are put in the middle + self.assertEqual(u'037...590', failed_task_object.exc[124:133]) + # The beginning of the input is captured + self.assertEqual(failed_task_object.exc[:11], u"ValueError(") + # The end of the input is captured + self.assertEqual(failed_task_object.exc[-9:], u"098099',)")