diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index 45f5fa385d..354c21c860 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -11,7 +11,7 @@ from logging import getLogger from courseware.model_data import get_score from lms.djangoapps.course_blocks.api import get_course_blocks -from openedx.core.djangoapps.celery_utils.task import PersistOnFailureTask +from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask from opaque_keys.edx.keys import UsageKey from opaque_keys.edx.locator import CourseLocator from submissions import api as sub_api @@ -19,8 +19,6 @@ from student.models import anonymous_id_for_user from track.event_transaction_utils import ( set_event_transaction_type, set_event_transaction_id, - get_event_transaction_type, - get_event_transaction_id ) from util.date_utils import from_timestamp from xmodule.modulestore.django import modulestore diff --git a/openedx/core/djangoapps/celery_utils/management/__init__.py b/openedx/core/djangoapps/celery_utils/management/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/celery_utils/management/commands/__init__.py b/openedx/core/djangoapps/celery_utils/management/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py b/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py new file mode 100644 index 0000000000..e67c0b319e --- /dev/null +++ b/openedx/core/djangoapps/celery_utils/management/commands/cleanup_resolved_tasks.py @@ -0,0 +1,58 @@ +""" +Reset persistent grades for learners. 
class Command(BaseCommand):
    """
    Delete records of FailedTasks that have been resolved
    """
    # Inside a class body ``__doc__`` resolves to the class docstring above,
    # so that sentence is also the command's help text.  Reword with care.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register this command's options on the argparse ``parser``.

        Options:
            --dry-run: only log what would be deleted.
            --task-name / -t: limit the cleanup to one task name.
            --age / -a: minimum number of days since resolution (default 30).
        """
        parser.add_argument(
            '--dry-run',
            default=False,
            action='store_true',
            help="Output what we're going to do, but don't actually do it."
        )
        parser.add_argument(
            '--task-name', '-t',
            help=u"Restrict cleanup to tasks matching the given task-name.",
            default=None,
        )
        parser.add_argument(
            '--age', '-a',
            help=u"Only delete tasks that have been resolved for at least the specified number of days",
            default=30,
            type=int,
        )

    def handle(self, *args, **options):
        """
        Delete (or, with --dry-run, merely list) old resolved FailedTask records.

        A record qualifies when its ``datetime_resolved`` is earlier than
        ``--age`` days before the current time; ``--task-name`` narrows the
        selection further.  The number of matching records is always logged.
        """
        cutoff = now() - timedelta(days=options['age'])
        stale = FailedTask.objects.filter(datetime_resolved__lt=cutoff)

        name_filter = options['task_name']
        if name_filter is not None:
            stale = stale.filter(task_name=name_filter)

        log.info(u'Cleaning up {} tasks'.format(stale.count()))

        if not options['dry_run']:
            stale.delete()
            return

        # Dry run: describe each record instead of touching the database.
        descriptions = [
            u'{!r}, resolved {}'.format(record, record.datetime_resolved)
            for record in stale
        ]
        log.info(u"Tasks to clean up:\n{}".format(u'\n '.join(descriptions)))
class Command(BaseCommand):
    """
    Reapply tasks that failed previously.
    """
    # Inside a class body ``__doc__`` resolves to the class docstring above,
    # so that sentence is also the command's help text.  Reword with care.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register this command's options on the argparse ``parser``.

        Options:
            --task-name / -t: only reapply failed tasks with this task name.
        """
        parser.add_argument(
            '--task-name', '-t',
            default=None,
            action='store',
            help=u"Restrict reapplied tasks to those matching the given task-name."
        )

    def handle(self, *args, **options):
        """
        Call ``reapply()`` once per distinct task_id among unresolved FailedTasks.

        Only records whose ``datetime_resolved`` is None are considered, and
        ``--task-name`` narrows the selection further.
        """
        unresolved = FailedTask.objects.filter(datetime_resolved=None)

        wanted_name = options['task_name']
        if wanted_name is not None:
            unresolved = unresolved.filter(task_name=wanted_name)

        log.info(u'Reapplying {} tasks'.format(unresolved.count()))
        log.debug(u'Reapplied tasks: {}'.format(list(unresolved)))

        # Several records may share a task_id; reapply each id only once.
        already_reapplied = set()
        for failed_task in unresolved:
            task_id = failed_task.task_id
            if task_id not in already_reapplied:
                already_reapplied.add(task_id)
                failed_task.reapply()
@ddt.ddt
@skip_unless_lms
class TestCleanupResolvedTasksCommand(TestCase):
    """
    Exercise the cleanup_resolved_tasks management command.
    """

    def setUp(self):
        """
        Create four FailedTask records: one resolved just over a month ago,
        one resolved just under a month ago, one unresolved, and one old
        resolved record with a different task name.
        """
        fixtures = (
            # (task_name, datetime_resolved, task_id)
            (u'task', MONTH_AGO - DAY, u'old'),
            (u'task', MONTH_AGO + DAY, u'new'),
            (u'task', None, u'unresolved'),
            (u'other', MONTH_AGO - DAY, u'other'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                datetime_resolved=resolved_at,
                task_id=task_id,
            )
            for task_name, resolved_at, task_id in fixtures
        ]
        super(TestCleanupResolvedTasksCommand, self).setUp()

    @ddt.data(
        ([], {u'new', u'unresolved'}),
        ([u'--task-name=task'], {u'new', u'unresolved', u'other'}),
        ([u'--age=0'], {u'unresolved'}),
        ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}),
        ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}),
    )
    @ddt.unpack
    def test_call_command(self, args, remaining_task_ids):
        """
        Run the command with ``args`` and check which task_ids are still stored.
        """
        call_command(u'cleanup_resolved_tasks', *args)
        surviving_ids = set(
            models.FailedTask.objects.values_list('task_id', flat=True)
        )
        self.assertEqual(remaining_task_ids, surviving_ids)
@skip_unless_lms
class TestReapplyTaskCommand(TestCase):
    """
    Test the reapply_tasks management command.
    """

    fallible_task_name = (
        u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task'
    )
    passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task'

    @classmethod
    def setUpClass(cls):
        """
        Define two PersistOnFailureTask-based celery tasks under the names
        above and expose them as class attributes.
        """
        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.fallible_task_name)
        def fallible_task(error_message=None):
            """
            Raise ValueError when given an error message; otherwise do nothing.
            """
            if error_message:
                raise ValueError(error_message)

        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.passing_task_name)
        def passing_task():
            """
            Always succeed, returning 5.
            """
            return 5

        cls.fallible_task = fallible_task
        cls.passing_task = passing_task
        super(TestReapplyTaskCommand, cls).setUpClass()

    def setUp(self):
        """
        Store three unresolved FailedTask records: one whose kwargs make the
        fallible task raise again, one whose kwargs let it pass, and one for
        the always-passing task.
        """
        fixtures = (
            # (task_name, task_id, kwargs, exc)
            (self.fallible_task_name, u'fail_again', {"error_message": "Err, yo!"}, u'UhOhError().'),
            (self.fallible_task_name, u'will_succeed', {}, u'NetworkErrorMaybe?()'),
            (self.passing_task_name, u'other_task', {}, u'RaceCondition()'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                task_id=task_id,
                args=[],
                kwargs=task_kwargs,
                exc=exc,
            )
            for task_name, task_id, task_kwargs, exc in fixtures
        ]
        super(TestReapplyTaskCommand, self).setUp()

    def _assert_resolved(self, task_object):
        """
        Fail unless the record carries a resolution timestamp, i.e. its
        ``datetime_resolved`` is a datetime instance.
        """
        resolved_at = task_object.datetime_resolved
        self.assertIsInstance(resolved_at, datetime)

    def _assert_unresolved(self, task_object):
        """
        Fail unless the record is still unresolved, i.e. its
        ``datetime_resolved`` is None.
        """
        resolved_at = task_object.datetime_resolved
        self.assertIsNone(resolved_at)

    def test_call_command(self):
        """
        Without options every unresolved task is reapplied.
        """
        call_command(u'reapply_tasks')
        expectations = (
            (u'fail_again', self._assert_unresolved),
            (u'will_succeed', self._assert_resolved),
            (u'other_task', self._assert_resolved),
        )
        for task_id, check in expectations:
            check(models.FailedTask.objects.get(task_id=task_id))

    def test_call_command_with_specified_task(self):
        """
        With --task-name only records for that task name are reapplied.
        """
        task_name_option = u'--task-name={}'.format(self.fallible_task_name)
        call_command(u'reapply_tasks', task_name_option)
        expectations = (
            (u'fail_again', self._assert_unresolved),
            (u'will_succeed', self._assert_resolved),
            (u'other_task', self._assert_unresolved),
        )
        for task_id, check in expectations:
            check(models.FailedTask.objects.get(task_id=task_id))

    def test_duplicate_tasks(self):
        """
        Two records sharing a task_id trigger one run but are both resolved.
        """
        models.FailedTask.objects.create(
            task_name=self.fallible_task_name,
            task_id=u'will_succeed',
            args=[],
            kwargs={},
            exc=u'AlsoThisOtherError()',
        )
        # Verify that only one task got run for this task_id.
        real_apply_async = self.fallible_task.apply_async
        with mock.patch.object(self.fallible_task, u'apply_async', wraps=real_apply_async) as mock_apply:
            call_command(u'reapply_tasks')
        # Each recorded call is a (name, args, kwargs) triple; index 2 is kwargs.
        runs_per_task_id = Counter()
        for recorded_call in mock_apply.mock_calls:
            runs_per_task_id[recorded_call[2][u'task_id']] += 1
        self.assertEqual(runs_per_task_id[u'will_succeed'], 1)
        # Verify that both tasks matching that task_id are resolved.
        duplicates = list(models.FailedTask.objects.filter(task_id=u'will_succeed'))
        self.assertEqual(len(duplicates), 2)
        for duplicate in duplicates:
            self._assert_resolved(duplicate)
def reapply(self):
    """
    Enqueue a new celery task with the same arguments as the failed task.

    The task registered under ``self.task_name`` in the current celery app
    is sent again with the stored ``args``, ``kwargs`` and ``task_id``.  A
    ``mark_resolved`` signature for the same ``task_id`` is passed as
    ``link``, which celery runs when the task succeeds.

    Raises:
        TypeError: if this record already has a ``datetime_resolved`` value.
    """
    already_resolved = self.datetime_resolved is not None
    if already_resolved:
        raise TypeError(u'Cannot reapply a resolved task: {}'.format(self))

    log.info(u'Reapplying failed task: {}'.format(self))

    task_to_run = current_app.tasks[self.task_name]
    # ``.si`` builds an immutable signature, so the callback receives only
    # the task_id and not the reapplied task's return value.
    on_success = tasks.mark_resolved.si(self.task_id)
    task_to_run.apply_async(
        self.args,
        self.kwargs,
        link=on_success,
        task_id=self.task_id,
    )
@task
def mark_resolved(task_id):
    """
    Mark every unresolved FailedTask record for ``task_id`` as resolved.

    All records with this ``task_id`` whose ``datetime_resolved`` is None
    get ``datetime_resolved`` set to the current time in one update.
    """
    from . import models  # Imported inside the task to resolve circular imports.

    unresolved_records = models.FailedTask.objects.filter(
        task_id=task_id,
        datetime_resolved=None,
    )
    unresolved_records.update(datetime_resolved=now())