Merge pull request #14345 from edx/beryl/rerun-failed-tasks

Create infrastructure for reapplying tasks
This commit is contained in:
Cliff Dyer
2017-01-26 09:46:03 -05:00
committed by GitHub
12 changed files with 334 additions and 14 deletions

View File

@@ -11,7 +11,7 @@ from logging import getLogger
from courseware.model_data import get_score
from lms.djangoapps.course_blocks.api import get_course_blocks
from openedx.core.djangoapps.celery_utils.task import PersistOnFailureTask
from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import CourseLocator
from submissions import api as sub_api
@@ -19,8 +19,6 @@ from student.models import anonymous_id_for_user
from track.event_transaction_utils import (
set_event_transaction_type,
set_event_transaction_id,
get_event_transaction_type,
get_event_transaction_id
)
from util.date_utils import from_timestamp
from xmodule.modulestore.django import modulestore

View File

@@ -0,0 +1,58 @@
"""
Clean up database records of failed tasks that have since been resolved.
"""
from datetime import timedelta
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from django.utils.timezone import now
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Delete records of FailedTasks that have been resolved
    """
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register command-line options on the argparse-based parser.

        See https://docs.python.org/3/library/argparse.html for syntax.
        """
        parser.add_argument(
            '--dry-run',
            action='store_true',
            default=False,
            help="Output what we're going to do, but don't actually do it."
        )
        parser.add_argument(
            '--task-name', '-t',
            default=None,
            help=u"Restrict cleanup to tasks matching the given task-name.",
        )
        parser.add_argument(
            '--age', '-a',
            type=int,
            default=30,
            help=u"Only delete tasks that have been resolved for at least the specified number of days",
        )

    def handle(self, *args, **options):
        """
        Delete (or, with --dry-run, list) resolved FailedTask records older
        than the configured age, optionally restricted to one task name.
        """
        cutoff = now() - timedelta(days=options['age'])
        stale = FailedTask.objects.filter(datetime_resolved__lt=cutoff)
        task_name = options['task_name']
        if task_name is not None:
            stale = stale.filter(task_name=task_name)
        log.info(u'Cleaning up {} tasks'.format(stale.count()))
        if not options['dry_run']:
            stale.delete()
        else:
            # Describe each candidate record instead of deleting anything.
            descriptions = u'\n '.join(
                u'{!r}, resolved {}'.format(record, record.datetime_resolved) for record in stale
            )
            log.info(u"Tasks to clean up:\n{}".format(descriptions))

View File

@@ -0,0 +1,45 @@
"""
Reapply tasks that failed previously and have not yet been resolved.
"""
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Reapply tasks that failed previously.
    """
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register command-line options on the argparse-based parser.

        See https://docs.python.org/3/library/argparse.html for syntax.
        """
        parser.add_argument(
            '--task-name', '-t',
            action='store',
            default=None,
            help=u"Restrict reapplied tasks to those matching the given task-name."
        )

    def handle(self, *args, **options):
        """
        Re-enqueue every unresolved FailedTask, at most once per task_id.
        """
        unresolved = FailedTask.objects.filter(datetime_resolved=None)
        task_name = options['task_name']
        if task_name is not None:
            unresolved = unresolved.filter(task_name=task_name)
        log.info(u'Reapplying {} tasks'.format(unresolved.count()))
        log.debug(u'Reapplied tasks: {}'.format(list(unresolved)))
        # Several FailedTask rows may share a task_id (repeated failures of
        # the same task); only re-enqueue each task_id once.
        reapplied_ids = set()
        for failed in unresolved:
            if failed.task_id not in reapplied_ids:
                reapplied_ids.add(failed.task_id)
                failed.reapply()

View File

@@ -0,0 +1,63 @@
"""
Test management command to clean up resolved tasks.
"""
from datetime import timedelta
import ddt
from django.test import TestCase
from django.core.management import call_command
from django.utils.timezone import now
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models
DAY = timedelta(days=1)
MONTH_AGO = now() - (30 * DAY)
@ddt.ddt
@skip_unless_lms
class TestCleanupResolvedTasksCommand(TestCase):
    """
    Test cleanup_resolved_tasks management command.
    """
    def setUp(self):
        # Fixture rows spanning the cases the command must distinguish:
        # resolved long ago, resolved recently, never resolved, and a
        # differently-named task resolved long ago.
        fixture_rows = [
            (u'task', MONTH_AGO - DAY, u'old'),
            (u'task', MONTH_AGO + DAY, u'new'),
            (u'task', None, u'unresolved'),
            (u'other', MONTH_AGO - DAY, u'other'),
        ]
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=name,
                datetime_resolved=resolved,
                task_id=task_id,
            )
            for name, resolved, task_id in fixture_rows
        ]
        super(TestCleanupResolvedTasksCommand, self).setUp()

    @ddt.data(
        ([], {u'new', u'unresolved'}),
        ([u'--task-name=task'], {u'new', u'unresolved', u'other'}),
        ([u'--age=0'], {u'unresolved'}),
        ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}),
        ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}),
    )
    @ddt.unpack
    def test_call_command(self, args, remaining_task_ids):
        """
        Run the command with the given args and check which rows survive.
        """
        call_command(u'cleanup_resolved_tasks', *args)
        results = set(models.FailedTask.objects.values_list('task_id', flat=True))
        self.assertEqual(remaining_task_ids, results)

View File

@@ -0,0 +1,118 @@
"""
Test management command to reapply failed tasks.
"""
from collections import Counter
from datetime import datetime
import celery
from django.test import TestCase
from django.core.management import call_command
import mock
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models, persist_on_failure
@skip_unless_lms
class TestReapplyTaskCommand(TestCase):
    """
    Test reapply_task management command.
    """
    # Fully-qualified celery task names; the tasks themselves are registered
    # in setUpClass below.
    fallible_task_name = (
        u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task'
    )
    passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task'

    @classmethod
    def setUpClass(cls):
        # Register two PersistOnFailureTask-based celery tasks: one that can
        # be made to fail on demand, and one that always succeeds.
        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.fallible_task_name)
        def fallible_task(error_message=None):
            """
            Simple task to let us test retry functionality.
            """
            if error_message:
                raise ValueError(error_message)
        cls.fallible_task = fallible_task

        @celery.task(base=persist_on_failure.PersistOnFailureTask, name=cls.passing_task_name)
        def passing_task():
            """
            This task always passes
            """
            return 5
        cls.passing_task = passing_task
        super(TestReapplyTaskCommand, cls).setUpClass()

    def setUp(self):
        # Three unresolved failure records: one whose kwargs make it fail
        # again on reapply, and two that will succeed when reapplied.
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=self.fallible_task_name,
                task_id=u'fail_again',
                args=[],
                kwargs={"error_message": "Err, yo!"},
                exc=u'UhOhError().',
            ),
            models.FailedTask.objects.create(
                task_name=self.fallible_task_name,
                task_id=u'will_succeed',
                args=[],
                kwargs={},
                exc=u'NetworkErrorMaybe?()',
            ),
            models.FailedTask.objects.create(
                task_name=self.passing_task_name,
                task_id=u'other_task',
                args=[],
                kwargs={},
                exc=u'RaceCondition()',
            ),
        ]
        super(TestReapplyTaskCommand, self).setUp()

    def _assert_resolved(self, task_object):
        """
        Raises an assertion error if the task failed to complete successfully
        and record its resolution in the failedtask record.
        """
        self.assertIsInstance(task_object.datetime_resolved, datetime)

    def _assert_unresolved(self, task_object):
        """
        Raises an assertion error if the task completed successfully.

        The resolved_datetime will still be None.
        """
        self.assertIsNone(task_object.datetime_resolved)

    def test_call_command(self):
        # With no filter, every unresolved task is reapplied; only the one
        # that raises again remains unresolved.
        call_command(u'reapply_tasks')
        self._assert_unresolved(models.FailedTask.objects.get(task_id=u'fail_again'))
        self._assert_resolved(models.FailedTask.objects.get(task_id=u'will_succeed'))
        self._assert_resolved(models.FailedTask.objects.get(task_id=u'other_task'))

    def test_call_command_with_specified_task(self):
        # Filtering by task name leaves tasks of other names untouched.
        call_command(u'reapply_tasks', u'--task-name={}'.format(self.fallible_task_name))
        self._assert_unresolved(models.FailedTask.objects.get(task_id=u'fail_again'))
        self._assert_resolved(models.FailedTask.objects.get(task_id=u'will_succeed'))
        self._assert_unresolved(models.FailedTask.objects.get(task_id=u'other_task'))

    def test_duplicate_tasks(self):
        # Create a second failure record sharing the 'will_succeed' task_id.
        models.FailedTask.objects.create(
            task_name=self.fallible_task_name,
            task_id=u'will_succeed',
            args=[],
            kwargs={},
            exc=u'AlsoThisOtherError()',
        )
        # Verify that only one task got run for this task_id.
        with mock.patch.object(self.fallible_task, u'apply_async', wraps=self.fallible_task.apply_async) as mock_apply:
            call_command(u'reapply_tasks')
            # call[2] is each recorded call's kwargs dict; count task_ids.
            task_id_counts = Counter(call[2][u'task_id'] for call in mock_apply.mock_calls)
            self.assertEqual(task_id_counts[u'will_succeed'], 1)
        # Verify that both tasks matching that task_id are resolved.
        will_succeed_tasks = models.FailedTask.objects.filter(task_id=u'will_succeed').all()
        self.assertEqual(len(will_succeed_tasks), 2)
        for task_object in will_succeed_tasks:
            self._assert_resolved(task_object)

View File

@@ -2,10 +2,17 @@
Models to support persistent tasks.
"""
import logging
from celery import current_app
from django.db import models
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from . import tasks
log = logging.getLogger(__name__)
class FailedTask(TimeStampedModel):
"""
@@ -23,6 +30,21 @@ class FailedTask(TimeStampedModel):
(u'task_name', u'exc'),
]
def reapply(self):
    """
    Enqueue new celery task with the same arguments as the failed task.

    Raises:
        TypeError: if this record has already been marked resolved.
    """
    if self.datetime_resolved is not None:
        raise TypeError(u'Cannot reapply a resolved task: {}'.format(self))
    log.info(u'Reapplying failed task: {}'.format(self))
    # Look up the registered task by its stored name in the current celery app.
    original_task = current_app.tasks[self.task_name]
    # Re-enqueue with the original args/kwargs and the same task_id, linking
    # a follow-up task that marks matching FailedTask rows resolved on success.
    original_task.apply_async(
        self.args,
        self.kwargs,
        task_id=self.task_id,
        link=tasks.mark_resolved.si(self.task_id)
    )
def __unicode__(self):
return u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'.format(
task_name=self.task_name,

View File

@@ -7,8 +7,7 @@ from celery import Task
from .models import FailedTask
# pylint: disable=abstract-method
class PersistOnFailureTask(Task):
class PersistOnFailureTask(Task): # pylint: disable=abstract-method
"""
Custom Celery Task base class that persists task data on failure.
"""
@@ -17,13 +16,14 @@ class PersistOnFailureTask(Task):
"""
If the task fails, persist a record of the task.
"""
FailedTask.objects.create(
task_name=_truncate_to_field(FailedTask, 'task_name', self.name),
task_id=task_id, # Fixed length UUID: No need to truncate
args=args,
kwargs=kwargs,
exc=_truncate_to_field(FailedTask, 'exc', repr(exc)),
)
if not FailedTask.objects.filter(task_id=task_id, datetime_resolved=None).exists():
FailedTask.objects.create(
task_name=_truncate_to_field(FailedTask, 'task_name', self.name),
task_id=task_id, # Fixed length UUID: No need to truncate
args=args,
kwargs=kwargs,
exc=_truncate_to_field(FailedTask, 'exc', repr(exc)),
)
super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo)

View File

@@ -0,0 +1,16 @@
"""
Celery tasks that support the utils in this module.
"""
from celery import task
from django.utils.timezone import now
@task
def mark_resolved(task_id):
    """
    Given a task_id, mark all records of that task as resolved in the
    FailedTask table
    """
    from . import models  # Imported inside the task to resolve circular imports.
    unresolved = models.FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
    unresolved.update(datetime_resolved=now())

View File

@@ -10,7 +10,7 @@ import six
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import FailedTask
from ..task import PersistOnFailureTask
from ..persist_on_failure import PersistOnFailureTask
@skip_unless_lms
@@ -47,7 +47,7 @@ class PersistOnFailureTaskTestCase(TestCase):
# Assert that we get the kind of data we expect
self.assertEqual(
failed_task_object.task_name,
u'openedx.core.djangoapps.celery_utils.tests.test_task.exampletask'
u'openedx.core.djangoapps.celery_utils.tests.test_persist_on_failure.exampletask'
)
self.assertEqual(failed_task_object.args, [])
self.assertEqual(failed_task_object.kwargs, {u'message': u'The example task failed'})