Merge pull request #1585 from edx/brian/retry-email-after-dbtimeout
Add retries when encountering DatabaseError when updating InstructorTask.
This commit is contained in:
@@ -11,7 +11,7 @@ from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from django.db import DatabaseError
|
||||
|
||||
from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
@@ -27,6 +27,7 @@ from instructor_task.subtasks import (
|
||||
check_subtask_is_valid,
|
||||
update_subtask_status,
|
||||
DuplicateTaskException,
|
||||
MAX_DATABASE_LOCK_RETRIES,
|
||||
)
|
||||
|
||||
|
||||
@@ -285,13 +286,29 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
def test_send_email_with_locked_instructor_task(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-locked-model"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
with patch('instructor_task.subtasks.InstructorTask.save') as mock_task_save:
|
||||
mock_task_save.side_effect = DatabaseError
|
||||
with self.assertRaises(DatabaseError):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())
|
||||
self.assertEquals(mock_task_save.call_count, MAX_DATABASE_LOCK_RETRIES)
|
||||
|
||||
def test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
subtask_id = "subtask-id-undefined-email"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
bogus_email_id = 1001
|
||||
|
||||
@@ -10,7 +10,7 @@ from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, READY_STATES, RETRY
|
||||
from dogapi import dog_stats_api
|
||||
|
||||
from django.db import transaction
|
||||
from django.db import transaction, DatabaseError
|
||||
from django.core.cache import cache
|
||||
|
||||
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
@@ -19,6 +19,9 @@ TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
# Lock expiration should be long enough to allow a subtask to complete.
|
||||
SUBTASK_LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
|
||||
# Number of times to retry if a subtask update encounters a lock on the InstructorTask.
|
||||
# (These are recursive retries, so don't make this number too large.)
|
||||
MAX_DATABASE_LOCK_RETRIES = 5
|
||||
|
||||
|
||||
class DuplicateTaskException(Exception):
|
||||
@@ -411,8 +414,42 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
|
||||
def update_subtask_status(entry_id, current_task_id, new_subtask_status, retry_count=0):
|
||||
"""
|
||||
Update the status of the subtask in the parent InstructorTask object tracking its progress.
|
||||
|
||||
Because select_for_update is used to lock the InstructorTask object while it is being updated,
|
||||
multiple subtasks updating at the same time may time out while waiting for the lock.
|
||||
The actual update operation is surrounded by a try/except/else that permits the update to be
|
||||
retried if the transaction times out.
|
||||
|
||||
The subtask lock acquired in the call to check_subtask_is_valid() is released here, only when
|
||||
the attempting of retries has concluded.
|
||||
"""
|
||||
try:
|
||||
_update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
except DatabaseError:
|
||||
# If we fail, try again recursively.
|
||||
retry_count += 1
|
||||
if retry_count < MAX_DATABASE_LOCK_RETRIES:
|
||||
TASK_LOG.info("Retrying to update status for subtask %s of instructor task %d with status %s: retry %d",
|
||||
current_task_id, entry_id, new_subtask_status, retry_count)
|
||||
dog_stats_api.increment('instructor_task.subtask.retry_after_failed_update')
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status, retry_count)
|
||||
else:
|
||||
TASK_LOG.info("Failed to update status after %d retries for subtask %s of instructor task %d with status %s",
|
||||
retry_count, current_task_id, entry_id, new_subtask_status)
|
||||
dog_stats_api.increment('instructor_task.subtask.failed_after_update_retries')
|
||||
raise
|
||||
finally:
|
||||
# Only release the lock on the subtask when we're done trying to update it.
|
||||
# Note that this will be called each time a recursive call to update_subtask_status()
|
||||
# returns. Fortunately, it's okay to release a lock that has already been released.
|
||||
_release_subtask_lock(current_task_id)
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
def _update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
"""
|
||||
Update the status of the subtask in the parent InstructorTask object tracking its progress.
|
||||
|
||||
@@ -493,17 +530,15 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
entry.subtasks = json.dumps(subtask_dict)
|
||||
entry.task_output = InstructorTask.create_output_for_success(task_progress)
|
||||
|
||||
TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
|
||||
entry.task_output, current_task_id, entry_id)
|
||||
TASK_LOG.debug("about to save....")
|
||||
entry.save()
|
||||
TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
|
||||
entry.task_output, current_task_id, entry_id)
|
||||
except Exception:
|
||||
TASK_LOG.exception("Unexpected error while updating InstructorTask.")
|
||||
transaction.rollback()
|
||||
dog_stats_api.increment('instructor_task.subtask.update_exception', tags=[entry.course_id])
|
||||
dog_stats_api.increment('instructor_task.subtask.update_exception')
|
||||
raise
|
||||
else:
|
||||
TASK_LOG.debug("about to commit....")
|
||||
transaction.commit()
|
||||
finally:
|
||||
_release_subtask_lock(current_task_id)
|
||||
|
||||
Reference in New Issue
Block a user