From 7bef11a7f153d2534d1614533f681b44c80b7adf Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Mon, 4 Nov 2013 13:14:53 -0500
Subject: [PATCH] Add retries when encountering DatabaseError when updating InstructorTask.

---
 .../bulk_email/tests/test_err_handling.py     | 23 +++++++--
 lms/djangoapps/instructor_task/subtasks.py    | 49 ++++++++++++++++---
 2 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py
index d2c3a394f5..9fda6fc46f 100644
--- a/lms/djangoapps/bulk_email/tests/test_err_handling.py
+++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py
@@ -11,7 +11,7 @@ from django.test.utils import override_settings
 from django.conf import settings
 from django.core.management import call_command
 from django.core.urlresolvers import reverse
-
+from django.db import DatabaseError
 
 from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
 from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
@@ -27,6 +27,7 @@ from instructor_task.subtasks import (
     check_subtask_is_valid,
     update_subtask_status,
     DuplicateTaskException,
+    MAX_DATABASE_LOCK_RETRIES,
 )
 
 
@@ -285,13 +286,29 @@ class TestEmailErrors(ModuleStoreTestCase):
         with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
             send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
 
-    def dont_test_send_email_undefined_email(self):
+    def test_send_email_with_locked_instructor_task(self):
+        # test that a DatabaseError raised on every save of the InstructorTask is retried up to the maximum, then re-raised.
+        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
+        entry_id = entry.id  # pylint: disable=E1101
+        subtask_id = "subtask-id-locked-model"
+        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
+        subtask_status = SubtaskStatus.create(subtask_id)
+        bogus_email_id = 1001
+        to_list = ['test@test.com']
+        global_email_context = {'course_title': 'dummy course'}
+        with patch('instructor_task.subtasks.InstructorTask.save') as mock_task_save:
+            mock_task_save.side_effect = DatabaseError
+            with self.assertRaises(DatabaseError):
+                send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())
+            self.assertEquals(mock_task_save.call_count, MAX_DATABASE_LOCK_RETRIES)
+
+    def test_send_email_undefined_email(self):
         # test at a lower level, to ensure that the course gets checked down below too.
         entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
         entry_id = entry.id  # pylint: disable=E1101
         to_list = ['test@test.com']
         global_email_context = {'course_title': 'dummy course'}
-        subtask_id = "subtask-id-value"
+        subtask_id = "subtask-id-undefined-email"
         initialize_subtask_info(entry, "emailed", 100, [subtask_id])
         subtask_status = SubtaskStatus.create(subtask_id)
         bogus_email_id = 1001
diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py
index 7297ed7e0e..e4d94cd671 100644
--- a/lms/djangoapps/instructor_task/subtasks.py
+++ b/lms/djangoapps/instructor_task/subtasks.py
@@ -10,7 +10,7 @@ from celery.utils.log import get_task_logger
 from celery.states import SUCCESS, READY_STATES, RETRY
 from dogapi import dog_stats_api
 
-from django.db import transaction
+from django.db import transaction, DatabaseError
 from django.core.cache import cache
 
 from instructor_task.models import InstructorTask, PROGRESS, QUEUING
@@ -19,6 +19,9 @@ TASK_LOG = get_task_logger(__name__)
 
 # Lock expiration should be long enough to allow a subtask to complete.
 SUBTASK_LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes
+# Maximum number of attempts to make when a subtask update encounters a lock on the InstructorTask.
+# (This counts the initial attempt as well as the retries that follow it.)
+MAX_DATABASE_LOCK_RETRIES = 5
 
 
 class DuplicateTaskException(Exception):
@@ -411,8 +414,42 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
         raise DuplicateTaskException(msg)
 
 
+def update_subtask_status(entry_id, current_task_id, new_subtask_status, retry_count=0):
+    """
+    Update the status of the subtask in the parent InstructorTask object tracking its progress.
+
+    Because select_for_update is used to lock the InstructorTask object while it is being updated,
+    multiple subtasks updating at the same time may time out while waiting for the lock.
+    The update is therefore attempted up to MAX_DATABASE_LOCK_RETRIES times in total; `retry_count`
+    is the number of failed attempts already made.  The last DatabaseError is re-raised if all fail.
+
+    The subtask lock acquired in the call to check_subtask_is_valid() is released here, exactly
+    once, when the attempting of retries has concluded.
+    """
+    try:
+        while True:
+            try:
+                _update_subtask_status(entry_id, current_task_id, new_subtask_status)
+                return
+            except DatabaseError:
+                retry_count += 1
+                if retry_count >= MAX_DATABASE_LOCK_RETRIES:
+                    TASK_LOG.info("Failed to update status after %d retries for subtask %s of instructor task %d with status %s",
+                                  retry_count, current_task_id, entry_id, new_subtask_status)
+                    dog_stats_api.increment('instructor_task.subtask.failed_after_update_retries')
+                    raise
+                TASK_LOG.info("Retrying to update status for subtask %s of instructor task %d with status %s: retry %d",
+                              current_task_id, entry_id, new_subtask_status, retry_count)
+                dog_stats_api.increment('instructor_task.subtask.retry_after_failed_update')
+    finally:
+        # Only release the lock on the subtask when we're done trying to update it.
+        # Because the retries happen in a loop rather than through recursion, the lock is
+        # released exactly once, whether the update succeeded or the retries were exhausted.
+        _release_subtask_lock(current_task_id)
+
+
 @transaction.commit_manually
-def update_subtask_status(entry_id, current_task_id, new_subtask_status):
+def _update_subtask_status(entry_id, current_task_id, new_subtask_status):
     """
     Update the status of the subtask in the parent InstructorTask object tracking its progress.
 
@@ -493,17 +530,15 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
         entry.subtasks = json.dumps(subtask_dict)
         entry.task_output = InstructorTask.create_output_for_success(task_progress)
 
-        TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
-                      entry.task_output, current_task_id, entry_id)
         TASK_LOG.debug("about to save....")
         entry.save()
+        TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
+                      entry.task_output, current_task_id, entry_id)
     except Exception:
         TASK_LOG.exception("Unexpected error while updating InstructorTask.")
         transaction.rollback()
-        dog_stats_api.increment('instructor_task.subtask.update_exception', tags=[entry.course_id])
+        dog_stats_api.increment('instructor_task.subtask.update_exception')
         raise
     else:
         TASK_LOG.debug("about to commit....")
         transaction.commit()
-    finally:
-        _release_subtask_lock(current_task_id)