diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index 20716ddca2..b6325af654 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -9,11 +9,12 @@ from django.contrib.auth.models import User from django.core.exceptions import ValidationError from django.db.utils import DatabaseError from logging import getLogger +import newrelic.agent +from celery_utils.logged_task import LoggedTask +from celery_utils.persist_on_failure import PersistOnFailureTask from courseware.model_data import get_score from lms.djangoapps.course_blocks.api import get_course_blocks -import newrelic.agent -from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask from opaque_keys.edx.keys import UsageKey from opaque_keys.edx.locator import CourseLocator from submissions import api as sub_api @@ -36,7 +37,14 @@ KNOWN_RETRY_ERRORS = (DatabaseError, ValidationError) # Errors we expect occasi RECALCULATE_GRADE_DELAY = 2 # in seconds, to prevent excessive _has_db_updated failures. See TNL-6424. -@task(bind=True, base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY) +class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-method + """ + Include persistence features, as well as logging of task invocation. + """ + abstract = True + + +@task(bind=True, base=_BaseTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY) def recalculate_subsection_grade_v3(self, **kwargs): """ Latest version of the recalculate_subsection_grade task. See docstring diff --git a/lms/envs/common.py b/lms/envs/common.py index 1f0eb7caa6..d220de7e2d 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -2161,7 +2161,7 @@ INSTALLED_APPS = ( # Customized celery tasks, including persisting failed tasks so they can # be retried - 'openedx.core.djangoapps.celery_utils', + 'celery_utils', # Ability to detect and special-case crawler behavior 'openedx.core.djangoapps.crawlers', diff --git a/openedx/core/djangoapps/celery_utils/__init__.py b/openedx/core/djangoapps/celery_utils/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/management/__init__.py b/openedx/core/djangoapps/celery_utils/management/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/management/commands/__init__.py b/openedx/core/djangoapps/celery_utils/management/commands/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py b/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py deleted file mode 100644 index e67c0b319e..0000000000 --- a/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Reset persistent grades for learners. -""" -from datetime import timedelta -import logging -from textwrap import dedent - -from django.core.management.base import BaseCommand -from django.utils.timezone import now - -from ...models import FailedTask - - -log = logging.getLogger(__name__) - - -class Command(BaseCommand): - """ - Delete records of FailedTasks that have been resolved - """ - help = dedent(__doc__).strip() - - def add_arguments(self, parser): - """ - Add arguments to the command parser. - - Uses argparse syntax. See documentation at - https://docs.python.org/3/library/argparse.html. - """ - parser.add_argument( - '--dry-run', - action='store_true', - default=False, - help="Output what we're going to do, but don't actually do it." - ) - parser.add_argument( - '--task-name', '-t', - default=None, - help=u"Restrict cleanup to tasks matching the given task-name.", - ) - parser.add_argument( - '--age', '-a', - type=int, - default=30, - help=u"Only delete tasks that have been resolved for at least the specified number of days", - ) - - def handle(self, *args, **options): - tasks = FailedTask.objects.filter(datetime_resolved__lt=now() - timedelta(days=options['age'])) - if options['task_name'] is not None: - tasks = tasks.filter(task_name=options['task_name']) - log.info(u'Cleaning up {} tasks'.format(tasks.count())) - if options['dry_run']: - log.info(u"Tasks to clean up:\n{}".format( - u'\n '.join(u'{!r}, resolved {}'.format(task, task.datetime_resolved) for task in tasks) - )) - else: - tasks.delete() diff --git a/openedx/core/djangoapps/celery_utils/management/commands/reapply_tasks.py b/openedx/core/djangoapps/celery_utils/management/commands/reapply_tasks.py deleted file mode 100644 index 407027517b..0000000000 --- a/openedx/core/djangoapps/celery_utils/management/commands/reapply_tasks.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Reset persistent grades for learners. -""" -import logging -from textwrap import dedent - -from django.core.management.base import BaseCommand - -from ...models import FailedTask - -log = logging.getLogger(__name__) - - -class Command(BaseCommand): - """ - Reapply tasks that failed previously. - """ - help = dedent(__doc__).strip() - - def add_arguments(self, parser): - """ - Add arguments to the command parser. - - Uses argparse syntax. See documentation at - https://docs.python.org/3/library/argparse.html. - """ - parser.add_argument( - '--task-name', '-t', - action='store', - default=None, - help=u"Restrict reapplied tasks to those matching the given task-name." - ) - - def handle(self, *args, **options): - tasks = FailedTask.objects.filter(datetime_resolved=None) - if options['task_name'] is not None: - tasks = tasks.filter(task_name=options['task_name']) - log.info(u'Reapplying {} tasks'.format(tasks.count())) - log.debug(u'Reapplied tasks: {}'.format(list(tasks))) - seen_tasks = set() - for task in tasks: - if task.task_id in seen_tasks: - continue - seen_tasks.add(task.task_id) - task.reapply() diff --git a/openedx/core/djangoapps/celery_utils/management/commands/tests/__init__.py b/openedx/core/djangoapps/celery_utils/management/commands/tests/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/management/commands/tests/test_cleanup_resolved_tasks.py b/openedx/core/djangoapps/celery_utils/management/commands/tests/test_cleanup_resolved_tasks.py deleted file mode 100644 index ac8e6792de..0000000000 --- a/openedx/core/djangoapps/celery_utils/management/commands/tests/test_cleanup_resolved_tasks.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -Test management command to cleanup resolved tasks. -""" - -from datetime import timedelta - -import ddt -from django.test import TestCase -from django.core.management import call_command -from django.utils.timezone import now - -from openedx.core.djangolib.testing.utils import skip_unless_lms - -from .... import models - -DAY = timedelta(days=1) -MONTH_AGO = now() - (30 * DAY) - - -@ddt.ddt -@skip_unless_lms -class TestCleanupResolvedTasksCommand(TestCase): - """ - Test cleanup_resolved_tasks management command. - """ - - def setUp(self): - self.failed_tasks = [ - models.FailedTask.objects.create( - task_name=u'task', - datetime_resolved=MONTH_AGO - DAY, - task_id=u'old', - ), - models.FailedTask.objects.create( - task_name=u'task', - datetime_resolved=MONTH_AGO + DAY, - task_id=u'new', - ), - models.FailedTask.objects.create( - task_name=u'task', - datetime_resolved=None, - task_id=u'unresolved', - ), - models.FailedTask.objects.create( - task_name=u'other', - datetime_resolved=MONTH_AGO - DAY, - task_id=u'other', - ), - ] - super(TestCleanupResolvedTasksCommand, self).setUp() - - @ddt.data( - ([], {u'new', u'unresolved'}), - ([u'--task-name=task'], {u'new', u'unresolved', u'other'}), - ([u'--age=0'], {u'unresolved'}), - ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}), - ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}), - ) - @ddt.unpack - def test_call_command(self, args, remaining_task_ids): - call_command(u'cleanup_resolved_tasks', *args) - results = set(models.FailedTask.objects.values_list('task_id', flat=True)) - self.assertEqual(remaining_task_ids, results) diff --git a/openedx/core/djangoapps/celery_utils/management/commands/tests/test_reapply_tasks.py b/openedx/core/djangoapps/celery_utils/management/commands/tests/test_reapply_tasks.py deleted file mode 100644 index 8947586c9a..0000000000 --- a/openedx/core/djangoapps/celery_utils/management/commands/tests/test_reapply_tasks.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Test management command to reapply failed tasks. -""" -from collections import Counter -from datetime import datetime - -import celery -from django.test import TestCase -from django.core.management import call_command -import mock - -from openedx.core.djangolib.testing.utils import skip_unless_lms - -from .... import models, persist_on_failure - - -@skip_unless_lms -class TestReapplyTaskCommand(TestCase): - """ - Test reapply_task management command. - """ - - fallible_task_name = ( - u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task' - ) - passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task' - - @classmethod - def setUpClass(cls): - @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.fallible_task_name) - def fallible_task(error_message=None): - """ - Simple task to let us test retry functionality. - """ - if error_message: - raise ValueError(error_message) - - cls.fallible_task = fallible_task - - @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.passing_task_name) - def passing_task(): - """ - This task always passes - """ - return 5 - cls.passing_task = passing_task - super(TestReapplyTaskCommand, cls).setUpClass() - - def setUp(self): - self.failed_tasks = [ - models.FailedTask.objects.create( - task_name=self.fallible_task_name, - task_id=u'fail_again', - args=[], - kwargs={"error_message": "Err, yo!"}, - exc=u'UhOhError().', - ), - models.FailedTask.objects.create( - task_name=self.fallible_task_name, - task_id=u'will_succeed', - args=[], - kwargs={}, - exc=u'NetworkErrorMaybe?()', - ), - models.FailedTask.objects.create( - task_name=self.passing_task_name, - task_id=u'other_task', - args=[], - kwargs={}, - exc=u'RaceCondition()', - ), - ] - super(TestReapplyTaskCommand, self).setUp() - - def _assert_resolved(self, task_object): - """ - Raises an assertion error if the task failed to complete successfully - and record its resolution in the failedtask record. - """ - self.assertIsInstance(task_object.datetime_resolved, datetime) - - def _assert_unresolved(self, task_object): - """ - Raises an assertion error if the task completed successfully. - The resolved_datetime will still be None. - """ - self.assertIsNone(task_object.datetime_resolved) - - def test_call_command(self): - call_command(u'reapply_tasks') - self._assert_unresolved(models.FailedTask.objects.get(task_id=u'fail_again')) - self._assert_resolved(models.FailedTask.objects.get(task_id=u'will_succeed')) - self._assert_resolved(models.FailedTask.objects.get(task_id=u'other_task')) - - def test_call_command_with_specified_task(self): - call_command(u'reapply_tasks', u'--task-name={}'.format(self.fallible_task_name)) - self._assert_unresolved(models.FailedTask.objects.get(task_id=u'fail_again')) - self._assert_resolved(models.FailedTask.objects.get(task_id=u'will_succeed')) - self._assert_unresolved(models.FailedTask.objects.get(task_id=u'other_task')) - - def test_duplicate_tasks(self): - models.FailedTask.objects.create( - task_name=self.fallible_task_name, - task_id=u'will_succeed', - args=[], - kwargs={}, - exc=u'AlsoThisOtherError()', - ) - # Verify that only one task got run for this task_id. - with mock.patch.object(self.fallible_task, u'apply_async', wraps=self.fallible_task.apply_async) as mock_apply: - call_command(u'reapply_tasks') - task_id_counts = Counter(call[2][u'task_id'] for call in mock_apply.mock_calls) - self.assertEqual(task_id_counts[u'will_succeed'], 1) - # Verify that both tasks matching that task_id are resolved. - will_succeed_tasks = models.FailedTask.objects.filter(task_id=u'will_succeed').all() - self.assertEqual(len(will_succeed_tasks), 2) - for task_object in will_succeed_tasks: - self._assert_resolved(task_object) diff --git a/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py b/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py deleted file mode 100644 index b85dcc1801..0000000000 --- a/openedx/core/djangoapps/celery_utils/migrations/0001_initial.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - -from django.db import migrations, models -import django.utils.timezone -import jsonfield.fields -import model_utils.fields - - -class Migration(migrations.Migration): - - dependencies = [ - ] - - operations = [ - migrations.CreateModel( - name='FailedTask', - fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, verbose_name='created', editable=False)), - ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, verbose_name='modified', editable=False)), - ('task_name', models.CharField(max_length=255)), - ('task_id', models.CharField(max_length=255, db_index=True)), - ('args', jsonfield.fields.JSONField(blank=True)), - ('kwargs', jsonfield.fields.JSONField(blank=True)), - ('exc', models.CharField(max_length=255)), - ('datetime_resolved', models.DateTimeField(default=None, null=True, db_index=True, blank=True)), - ], - ), - migrations.AlterIndexTogether( - name='failedtask', - index_together=set([('task_name', 'exc')]), - ), - ] diff --git a/openedx/core/djangoapps/celery_utils/migrations/__init__.py b/openedx/core/djangoapps/celery_utils/migrations/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/models.py b/openedx/core/djangoapps/celery_utils/models.py deleted file mode 100644 index 07874ebd35..0000000000 --- a/openedx/core/djangoapps/celery_utils/models.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Models to support persistent tasks. -""" - -import logging - -from celery import current_app -from django.db import models -from jsonfield import JSONField -from model_utils.models import TimeStampedModel - -from . import tasks - -log = logging.getLogger(__name__) - - -class FailedTask(TimeStampedModel): - """ - Representation of tasks that have failed - """ - task_name = models.CharField(max_length=255) - task_id = models.CharField(max_length=255, db_index=True) - args = JSONField(blank=True) - kwargs = JSONField(blank=True) - exc = models.CharField(max_length=255) - datetime_resolved = models.DateTimeField(blank=True, null=True, default=None, db_index=True) - - class Meta(object): - index_together = [ - (u'task_name', u'exc'), - ] - - def reapply(self): - """ - Enqueue new celery task with the same arguments as the failed task. - """ - if self.datetime_resolved is not None: - raise TypeError(u'Cannot reapply a resolved task: {}'.format(self)) - log.info(u'Reapplying failed task: {}'.format(self)) - original_task = current_app.tasks[self.task_name] - original_task.apply_async( - self.args, - self.kwargs, - task_id=self.task_id, - link=tasks.mark_resolved.si(self.task_id) - ) - - def __unicode__(self): - return u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'.format( - task_name=self.task_name, - args=self.args, - kwargs=self.kwargs, - resolution=u"not resolved" if self.datetime_resolved is None else "resolved" - ) diff --git a/openedx/core/djangoapps/celery_utils/persist_on_failure.py b/openedx/core/djangoapps/celery_utils/persist_on_failure.py deleted file mode 100644 index 227fca8015..0000000000 --- a/openedx/core/djangoapps/celery_utils/persist_on_failure.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -Celery utility code for persistent tasks. -""" - -from celery import Task - -from .models import FailedTask - - -class PersistOnFailureTask(Task): # pylint: disable=abstract-method - """ - Custom Celery Task base class that persists task data on failure. - """ - - def on_failure(self, exc, task_id, args, kwargs, einfo): - """ - If the task fails, persist a record of the task. - """ - if not FailedTask.objects.filter(task_id=task_id, datetime_resolved=None).exists(): - FailedTask.objects.create( - task_name=_truncate_to_field(FailedTask, 'task_name', self.name), - task_id=task_id, # Fixed length UUID: No need to truncate - args=args, - kwargs=kwargs, - exc=_truncate_to_field(FailedTask, 'exc', repr(exc)), - ) - super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo) - - -def _truncate_to_field(model, field_name, value): - """ - If data is too big for the field, it would cause a failure to - insert, so we shorten it, truncating in the middle (because - valuable information often shows up at the end. - """ - field = model._meta.get_field(field_name) # pylint: disable=protected-access - if len(value) > field.max_length: - midpoint = field.max_length // 2 - len_after_midpoint = field.max_length - midpoint - first = value[:midpoint] - sep = u'...' - last = value[len(value) - len_after_midpoint + len(sep):] - value = sep.join([first, last]) - return value diff --git a/openedx/core/djangoapps/celery_utils/tasks.py b/openedx/core/djangoapps/celery_utils/tasks.py deleted file mode 100644 index da57f87369..0000000000 --- a/openedx/core/djangoapps/celery_utils/tasks.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -Celery tasks that support the utils in this module. -""" - -from celery import task -from django.utils.timezone import now - - -@task -def mark_resolved(task_id): - """ - Given a task_id, mark all records of that task as resolved in the - FailedTask table - """ - from . import models # Imported inside the task to resolve circular imports. - models.FailedTask.objects.filter(task_id=task_id, datetime_resolved=None).update(datetime_resolved=now()) diff --git a/openedx/core/djangoapps/celery_utils/tests/__init__.py b/openedx/core/djangoapps/celery_utils/tests/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/openedx/core/djangoapps/celery_utils/tests/test_persist_on_failure.py b/openedx/core/djangoapps/celery_utils/tests/test_persist_on_failure.py deleted file mode 100644 index a8f0f9eb47..0000000000 --- a/openedx/core/djangoapps/celery_utils/tests/test_persist_on_failure.py +++ /dev/null @@ -1,79 +0,0 @@ -u""" -Testing persistent tasks -""" - -from __future__ import print_function - -from celery import task -from django.test import TestCase -import six - -from openedx.core.djangolib.testing.utils import skip_unless_lms -from ..models import FailedTask -from ..persist_on_failure import PersistOnFailureTask - - -@skip_unless_lms -class PersistOnFailureTaskTestCase(TestCase): - """ - Test that persistent tasks save the appropriate values when needed. - """ - - @classmethod - def setUpClass(cls): - @task(base=PersistOnFailureTask) - def exampletask(message=None): - u""" - A simple task for testing persistence - """ - if message: - raise ValueError(message) - return - cls.exampletask = exampletask - super(PersistOnFailureTaskTestCase, cls).setUpClass() - - def test_exampletask_without_failure(self): - result = self.exampletask.delay() - result.wait() - self.assertEqual(result.status, u'SUCCESS') - self.assertFalse(FailedTask.objects.exists()) - - def test_exampletask_with_failure(self): - result = self.exampletask.delay(message=u'The example task failed') - with self.assertRaises(ValueError): - result.wait() - self.assertEqual(result.status, u'FAILURE') - failed_task_object = FailedTask.objects.get() - # Assert that we get the kind of data we expect - self.assertEqual( - failed_task_object.task_name, - u'openedx.core.djangoapps.celery_utils.tests.test_persist_on_failure.exampletask' - ) - self.assertEqual(failed_task_object.args, []) - self.assertEqual(failed_task_object.kwargs, {u'message': u'The example task failed'}) - self.assertEqual(failed_task_object.exc, u"ValueError(u'The example task failed',)") - self.assertIsNone(failed_task_object.datetime_resolved) - - def test_persists_when_called_with_wrong_args(self): - result = self.exampletask.delay(15, u'2001-03-04', err=True) - with self.assertRaises(TypeError): - result.wait() - self.assertEqual(result.status, u'FAILURE') - failed_task_object = FailedTask.objects.get() - self.assertEqual(failed_task_object.args, [15, u'2001-03-04']) - self.assertEqual(failed_task_object.kwargs, {u'err': True}) - - def test_persists_with_overlength_field(self): - overlong_message = u''.join(u'%03d' % x for x in six.moves.range(100)) - result = self.exampletask.delay(message=overlong_message) - with self.assertRaises(ValueError): - result.wait() - failed_task_object = FailedTask.objects.get() - # Length is max field length - self.assertEqual(len(failed_task_object.exc), 255) - # Ellipses are put in the middle - self.assertEqual(u'037...590', failed_task_object.exc[124:133]) - # The beginning of the input is captured - self.assertEqual(failed_task_object.exc[:11], u"ValueError(") - # The end of the input is captured - self.assertEqual(failed_task_object.exc[-9:], u"098099',)") diff --git a/requirements/edx/github.txt b/requirements/edx/github.txt index 2d8dc42131..6ab60a41c5 100644 --- a/requirements/edx/github.txt +++ b/requirements/edx/github.txt @@ -91,6 +91,7 @@ git+https://github.com/edx/xblock-utils.git@v1.0.3#egg=xblock-utils==1.0.3 git+https://github.com/edx/edx-user-state-client.git@1.0.1#egg=edx-user-state-client==1.0.1 git+https://github.com/edx/xblock-lti-consumer.git@v1.1.2#egg=lti_consumer-xblock==1.1.2 git+https://github.com/edx/edx-proctoring.git@0.17.0#egg=edx-proctoring==0.17.0 +git+https://github.com/edx/edx-celeryutils.git@v0.1.0#egg=edx-celeryutils==0.1.0 # Third Party XBlocks -e git+https://github.com/mitodl/edx-sga@172a90fd2738f8142c10478356b2d9ed3e55334a#egg=edx-sga