Add logging to recalculate grades base task.

TNL-6294

* Use external celery_utils in edx-platform (TNL-6454)
* Remove old openedx.core celery_utils implementation.
This commit is contained in:
J. Cliff Dyer
2017-02-02 13:22:39 -05:00
parent 4926a6b263
commit be8a898e68
18 changed files with 13 additions and 515 deletions

View File

@@ -9,11 +9,12 @@ from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.db.utils import DatabaseError
from logging import getLogger
import newrelic.agent
from celery_utils.logged_task import LoggedTask
from celery_utils.persist_on_failure import PersistOnFailureTask
from courseware.model_data import get_score
from lms.djangoapps.course_blocks.api import get_course_blocks
import newrelic.agent
from openedx.core.djangoapps.celery_utils.persist_on_failure import PersistOnFailureTask
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import CourseLocator
from submissions import api as sub_api
@@ -36,7 +37,14 @@ KNOWN_RETRY_ERRORS = (DatabaseError, ValidationError) # Errors we expect occasi
RECALCULATE_GRADE_DELAY = 2 # in seconds, to prevent excessive _has_db_updated failures. See TNL-6424.
@task(bind=True, base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
class _BaseTask(PersistOnFailureTask, LoggedTask):  # pylint: disable=abstract-method
    """
    Include persistence features, as well as logging of task invocation.

    Combines the PersistOnFailureTask and LoggedTask base classes so that a
    task declared with ``base=_BaseTask`` inherits from both.
    """
    # NOTE(review): presumably keeps celery from registering this base class
    # as a runnable task of its own -- confirm against the celery docs.
    abstract = True
@task(bind=True, base=_BaseTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
def recalculate_subsection_grade_v3(self, **kwargs):
"""
Latest version of the recalculate_subsection_grade task. See docstring

View File

@@ -2161,7 +2161,7 @@ INSTALLED_APPS = (
# Customized celery tasks, including persisting failed tasks so they can
# be retried
'openedx.core.djangoapps.celery_utils',
'celery_utils',
# Ability to detect and special-case crawler behavior
'openedx.core.djangoapps.crawlers',

View File

@@ -1,58 +0,0 @@
"""
Management command to delete records of failed tasks that have been resolved.
"""
from datetime import timedelta
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from django.utils.timezone import now
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Delete records of FailedTasks that have been resolved
    """
    # The class docstring above doubles as the command's help text.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register the options this command accepts.

        The parser uses argparse syntax; see
        https://docs.python.org/3/library/argparse.html.
        """
        parser.add_argument(
            '--dry-run',
            action='store_true',
            default=False,
            help="Output what we're going to do, but don't actually do it."
        )
        parser.add_argument(
            '--task-name', '-t',
            default=None,
            help=u"Restrict cleanup to tasks matching the given task-name.",
        )
        parser.add_argument(
            '--age', '-a',
            type=int,
            default=30,
            help=u"Only delete tasks that have been resolved for at least the specified number of days",
        )

    def handle(self, *args, **options):
        """
        Delete (or, with --dry-run, only list) sufficiently old resolved tasks.

        Tasks are selected when their ``datetime_resolved`` is earlier than
        ``--age`` days ago, optionally narrowed to a single ``--task-name``.
        """
        cutoff = now() - timedelta(days=options['age'])
        resolved_tasks = FailedTask.objects.filter(datetime_resolved__lt=cutoff)

        task_name = options['task_name']
        if task_name is not None:
            resolved_tasks = resolved_tasks.filter(task_name=task_name)

        log.info(u'Cleaning up {} tasks'.format(resolved_tasks.count()))

        if not options['dry_run']:
            resolved_tasks.delete()
            return

        # Dry run: report what would have been removed, one task per line.
        descriptions = [
            u'{!r}, resolved {}'.format(task, task.datetime_resolved)
            for task in resolved_tasks
        ]
        log.info(u"Tasks to clean up:\n{}".format(u'\n '.join(descriptions)))

View File

@@ -1,45 +0,0 @@
"""
Management command to reapply tasks that failed previously.
"""
import logging
from textwrap import dedent
from django.core.management.base import BaseCommand
from ...models import FailedTask
log = logging.getLogger(__name__)
class Command(BaseCommand):
    """
    Reapply tasks that failed previously.
    """
    # The class docstring above doubles as the command's help text.
    help = dedent(__doc__).strip()

    def add_arguments(self, parser):
        """
        Register the options this command accepts.

        The parser uses argparse syntax; see
        https://docs.python.org/3/library/argparse.html.
        """
        parser.add_argument(
            '--task-name', '-t',
            action='store',
            default=None,
            help=u"Restrict reapplied tasks to those matching the given task-name."
        )

    def handle(self, *args, **options):
        """
        Reapply every unresolved FailedTask, at most once per task_id.

        When ``--task-name`` is given, only failed tasks with that task name
        are considered.
        """
        unresolved = FailedTask.objects.filter(datetime_resolved=None)

        task_name = options['task_name']
        if task_name is not None:
            unresolved = unresolved.filter(task_name=task_name)

        log.info(u'Reapplying {} tasks'.format(unresolved.count()))
        log.debug(u'Reapplied tasks: {}'.format(list(unresolved)))

        # Several records can share a task_id; only the first one seen is
        # reapplied.
        reapplied_ids = set()
        for failed_task in unresolved:
            if failed_task.task_id not in reapplied_ids:
                reapplied_ids.add(failed_task.task_id)
                failed_task.reapply()

View File

@@ -1,63 +0,0 @@
"""
Test management command to cleanup resolved tasks.
"""
from datetime import timedelta
import ddt
from django.test import TestCase
from django.core.management import call_command
from django.utils.timezone import now
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models
# One day; the unit used to offset the fixture timestamps in the tests below.
DAY = timedelta(days=1)
# Thirty days before this module was imported (evaluated once, at import time).
MONTH_AGO = now() - (30 * DAY)
@ddt.ddt
@skip_unless_lms
class TestCleanupResolvedTasksCommand(TestCase):
    """
    Exercise the cleanup_resolved_tasks management command.
    """

    def setUp(self):
        """
        Create four FailedTask records: old, new, unresolved, and one with a
        different task name.
        """
        # (task_name, datetime_resolved, task_id), created in this order.
        fixture_rows = (
            (u'task', MONTH_AGO - DAY, u'old'),
            (u'task', MONTH_AGO + DAY, u'new'),
            (u'task', None, u'unresolved'),
            (u'other', MONTH_AGO - DAY, u'other'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                datetime_resolved=resolved_at,
                task_id=task_id,
            )
            for task_name, resolved_at, task_id in fixture_rows
        ]
        super(TestCleanupResolvedTasksCommand, self).setUp()

    @ddt.data(
        ([], {u'new', u'unresolved'}),
        ([u'--task-name=task'], {u'new', u'unresolved', u'other'}),
        ([u'--age=0'], {u'unresolved'}),
        ([u'--age=0', u'--task-name=task'], {u'unresolved', u'other'}),
        ([u'--dry-run'], {u'old', u'new', u'unresolved', u'other'}),
    )
    @ddt.unpack
    def test_call_command(self, args, remaining_task_ids):
        """
        Run the command with ``args`` and check which task_ids survive.
        """
        call_command(u'cleanup_resolved_tasks', *args)
        surviving_ids = models.FailedTask.objects.values_list('task_id', flat=True)
        self.assertEqual(remaining_task_ids, set(surviving_ids))

View File

@@ -1,118 +0,0 @@
"""
Test management command to reapply failed tasks.
"""
from collections import Counter
from datetime import datetime
import celery
from django.test import TestCase
from django.core.management import call_command
import mock
from openedx.core.djangolib.testing.utils import skip_unless_lms
from .... import models, persist_on_failure
@skip_unless_lms
class TestReapplyTaskCommand(TestCase):
    """
    Exercise the reapply_tasks management command.
    """

    fallible_task_name = (
        u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.fallible_task'
    )
    passing_task_name = u'openedx.core.djangoapps.celery_utils.management.commands.tests.test_reapply_tasks.other_task'

    @classmethod
    def setUpClass(cls):
        """
        Register the two celery tasks that the FailedTask fixtures refer to.
        """
        persistent_base = persist_on_failure.PersistOnFailureTask

        @celery.task(base=persistent_base, name=cls.fallible_task_name)
        def fallible_task(error_message=None):
            """
            Simple task to let us test retry functionality.
            """
            if error_message:
                raise ValueError(error_message)

        cls.fallible_task = fallible_task

        @celery.task(base=persistent_base, name=cls.passing_task_name)
        def passing_task():
            """
            This task always passes
            """
            return 5

        cls.passing_task = passing_task
        super(TestReapplyTaskCommand, cls).setUpClass()

    def setUp(self):
        """
        Create three unresolved FailedTask records, one of which fails again
        when reapplied.
        """
        # (task_name, task_id, kwargs, exc), created in this order.
        fixture_rows = (
            (self.fallible_task_name, u'fail_again', {"error_message": "Err, yo!"}, u'UhOhError().'),
            (self.fallible_task_name, u'will_succeed', {}, u'NetworkErrorMaybe?()'),
            (self.passing_task_name, u'other_task', {}, u'RaceCondition()'),
        )
        self.failed_tasks = [
            models.FailedTask.objects.create(
                task_name=task_name,
                task_id=task_id,
                args=[],
                kwargs=task_kwargs,
                exc=exc_repr,
            )
            for task_name, task_id, task_kwargs, exc_repr in fixture_rows
        ]
        super(TestReapplyTaskCommand, self).setUp()

    def _assert_resolved(self, task_object):
        """
        Fail unless the record carries a resolution timestamp, i.e. the task
        completed and its resolution was recorded.
        """
        self.assertIsInstance(task_object.datetime_resolved, datetime)

    def _assert_unresolved(self, task_object):
        """
        Fail unless the record's datetime_resolved is still None, i.e. the
        task did not complete successfully.
        """
        self.assertIsNone(task_object.datetime_resolved)

    def test_call_command(self):
        call_command(u'reapply_tasks')
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_resolved(fetch(task_id=u'other_task'))

    def test_call_command_with_specified_task(self):
        task_name_option = u'--task-name={}'.format(self.fallible_task_name)
        call_command(u'reapply_tasks', task_name_option)
        fetch = models.FailedTask.objects.get
        self._assert_unresolved(fetch(task_id=u'fail_again'))
        self._assert_resolved(fetch(task_id=u'will_succeed'))
        self._assert_unresolved(fetch(task_id=u'other_task'))

    def test_duplicate_tasks(self):
        # A second failure record sharing the task_id of an existing one.
        models.FailedTask.objects.create(
            task_name=self.fallible_task_name,
            task_id=u'will_succeed',
            args=[],
            kwargs={},
            exc=u'AlsoThisOtherError()',
        )

        # Verify that only one task got run for this task_id.
        real_apply_async = self.fallible_task.apply_async
        with mock.patch.object(self.fallible_task, u'apply_async', wraps=real_apply_async) as mock_apply:
            call_command(u'reapply_tasks')
        runs_per_task_id = Counter(call[2][u'task_id'] for call in mock_apply.mock_calls)
        self.assertEqual(runs_per_task_id[u'will_succeed'], 1)

        # Verify that both tasks matching that task_id are resolved.
        duplicate_records = models.FailedTask.objects.filter(task_id=u'will_succeed').all()
        self.assertEqual(len(duplicate_records), 2)
        for record in duplicate_records:
            self._assert_resolved(record)

View File

@@ -1,34 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
import jsonfield.fields
import model_utils.fields
class Migration(migrations.Migration):
    """
    Create the FailedTask model and index its (task_name, exc) columns together.
    """

    # This migration declares no dependencies on other migrations.
    dependencies = [
    ]

    operations = [
        migrations.CreateModel(
            name='FailedTask',
            fields=[
                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
                # Creation / modification timestamps (model_utils fields).
                ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, verbose_name='created', editable=False)),
                ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, verbose_name='modified', editable=False)),
                ('task_name', models.CharField(max_length=255)),
                ('task_id', models.CharField(max_length=255, db_index=True)),
                # Positional and keyword arguments of the task, stored as JSON.
                ('args', jsonfield.fields.JSONField(blank=True)),
                ('kwargs', jsonfield.fields.JSONField(blank=True)),
                ('exc', models.CharField(max_length=255)),
                # Nullable, defaulting to None; indexed for lookups.
                ('datetime_resolved', models.DateTimeField(default=None, null=True, db_index=True, blank=True)),
            ],
        ),
        # Composite index over (task_name, exc).
        migrations.AlterIndexTogether(
            name='failedtask',
            index_together=set([('task_name', 'exc')]),
        ),
    ]

View File

@@ -1,54 +0,0 @@
"""
Models to support persistent tasks.
"""
import logging
from celery import current_app
from django.db import models
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from . import tasks
log = logging.getLogger(__name__)
class FailedTask(TimeStampedModel):
    """
    Record of a celery task invocation that failed.

    ``datetime_resolved`` stays None until the record is marked resolved.
    """
    task_name = models.CharField(max_length=255)
    task_id = models.CharField(max_length=255, db_index=True)
    args = JSONField(blank=True)
    kwargs = JSONField(blank=True)
    exc = models.CharField(max_length=255)
    datetime_resolved = models.DateTimeField(blank=True, null=True, default=None, db_index=True)

    class Meta(object):
        index_together = [
            (u'task_name', u'exc'),
        ]

    def reapply(self):
        """
        Queue the failed task again with its original args, kwargs and task_id.

        A ``mark_resolved`` signature is linked to the new invocation.

        Raises:
            TypeError: if this record has already been resolved.
        """
        if self.datetime_resolved is not None:
            raise TypeError(u'Cannot reapply a resolved task: {}'.format(self))

        log.info(u'Reapplying failed task: {}'.format(self))
        task_to_rerun = current_app.tasks[self.task_name]
        resolve_callback = tasks.mark_resolved.si(self.task_id)
        task_to_rerun.apply_async(
            self.args,
            self.kwargs,
            task_id=self.task_id,
            link=resolve_callback,
        )

    def __unicode__(self):
        if self.datetime_resolved is None:
            resolution = u"not resolved"
        else:
            resolution = "resolved"
        template = u'FailedTask: {task_name}, args={args}, kwargs={kwargs} ({resolution})'
        return template.format(
            task_name=self.task_name,
            args=self.args,
            kwargs=self.kwargs,
            resolution=resolution,
        )

View File

@@ -1,44 +0,0 @@
"""
Celery utility code for persistent tasks.
"""
from celery import Task
from .models import FailedTask
class PersistOnFailureTask(Task):  # pylint: disable=abstract-method
    """
    Celery Task base class that stores a FailedTask record when a task fails.
    """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Record the failure, unless an unresolved record for ``task_id``
        already exists, then defer to the parent class's handler.
        """
        unresolved_records = FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
        if not unresolved_records.exists():
            # task_id is a fixed-length UUID, so only the task name and the
            # exception repr need shortening to fit their columns.
            stored_name = _truncate_to_field(FailedTask, 'task_name', self.name)
            stored_exc = _truncate_to_field(FailedTask, 'exc', repr(exc))
            FailedTask.objects.create(
                task_name=stored_name,
                task_id=task_id,
                args=args,
                kwargs=kwargs,
                exc=stored_exc,
            )
        super(PersistOnFailureTask, self).on_failure(exc, task_id, args, kwargs, einfo)
def _truncate_to_field(model, field_name, value):
"""
If data is too big for the field, it would cause a failure to
insert, so we shorten it, truncating in the middle (because
valuable information often shows up at the end.
"""
field = model._meta.get_field(field_name) # pylint: disable=protected-access
if len(value) > field.max_length:
midpoint = field.max_length // 2
len_after_midpoint = field.max_length - midpoint
first = value[:midpoint]
sep = u'...'
last = value[len(value) - len_after_midpoint + len(sep):]
value = sep.join([first, last])
return value

View File

@@ -1,16 +0,0 @@
"""
Celery tasks that support the utils in this module.
"""
from celery import task
from django.utils.timezone import now
@task
def mark_resolved(task_id):
    """
    Stamp every unresolved FailedTask record for ``task_id`` with the
    current time as its resolution time.
    """
    from . import models  # Imported inside the task to resolve circular imports.

    unresolved_records = models.FailedTask.objects.filter(task_id=task_id, datetime_resolved=None)
    unresolved_records.update(datetime_resolved=now())

View File

@@ -1,79 +0,0 @@
u"""
Testing persistent tasks
"""
from __future__ import print_function
from celery import task
from django.test import TestCase
import six
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import FailedTask
from ..persist_on_failure import PersistOnFailureTask
@skip_unless_lms
class PersistOnFailureTaskTestCase(TestCase):
    """
    Check that PersistOnFailureTask stores the expected FailedTask data.
    """

    @classmethod
    def setUpClass(cls):
        """
        Define the example task shared by all tests in this class.
        """
        @task(base=PersistOnFailureTask)
        def exampletask(message=None):
            u"""
            A simple task for testing persistence
            """
            if message:
                raise ValueError(message)

        cls.exampletask = exampletask
        super(PersistOnFailureTaskTestCase, cls).setUpClass()

    def test_exampletask_without_failure(self):
        async_result = self.exampletask.delay()
        async_result.wait()
        self.assertEqual(async_result.status, u'SUCCESS')
        self.assertFalse(FailedTask.objects.exists())

    def test_exampletask_with_failure(self):
        async_result = self.exampletask.delay(message=u'The example task failed')
        with self.assertRaises(ValueError):
            async_result.wait()
        self.assertEqual(async_result.status, u'FAILURE')
        record = FailedTask.objects.get()

        # Assert that we get the kind of data we expect
        expected_task_name = u'openedx.core.djangoapps.celery_utils.tests.test_persist_on_failure.exampletask'
        self.assertEqual(record.task_name, expected_task_name)
        self.assertEqual(record.args, [])
        self.assertEqual(record.kwargs, {u'message': u'The example task failed'})
        self.assertEqual(record.exc, u"ValueError(u'The example task failed',)")
        self.assertIsNone(record.datetime_resolved)

    def test_persists_when_called_with_wrong_args(self):
        async_result = self.exampletask.delay(15, u'2001-03-04', err=True)
        with self.assertRaises(TypeError):
            async_result.wait()
        self.assertEqual(async_result.status, u'FAILURE')
        record = FailedTask.objects.get()
        self.assertEqual(record.args, [15, u'2001-03-04'])
        self.assertEqual(record.kwargs, {u'err': True})

    def test_persists_with_overlength_field(self):
        # 100 zero-padded three-digit numbers: a 300-character message.
        digit_groups = [u'%03d' % number for number in six.moves.range(100)]
        overlong_message = u''.join(digit_groups)
        async_result = self.exampletask.delay(message=overlong_message)
        with self.assertRaises(ValueError):
            async_result.wait()
        stored_exc = FailedTask.objects.get().exc
        # Length is max field length
        self.assertEqual(len(stored_exc), 255)
        # Ellipses are put in the middle
        self.assertEqual(u'037...590', stored_exc[124:133])
        # The beginning of the input is captured
        self.assertEqual(stored_exc[:11], u"ValueError(")
        # The end of the input is captured
        self.assertEqual(stored_exc[-9:], u"098099',)")

View File

@@ -91,6 +91,7 @@ git+https://github.com/edx/xblock-utils.git@v1.0.3#egg=xblock-utils==1.0.3
git+https://github.com/edx/edx-user-state-client.git@1.0.1#egg=edx-user-state-client==1.0.1
git+https://github.com/edx/xblock-lti-consumer.git@v1.1.2#egg=lti_consumer-xblock==1.1.2
git+https://github.com/edx/edx-proctoring.git@0.17.0#egg=edx-proctoring==0.17.0
git+https://github.com/edx/edx-celeryutils.git@v0.1.0#egg=edx-celeryutils==0.1.0
# Third Party XBlocks
-e git+https://github.com/mitodl/edx-sga@172a90fd2738f8142c10478356b2d9ed3e55334a#egg=edx-sga