Move subtask update logic that was only needed for tests into the tests that needed it.
This commit is contained in:
@@ -21,14 +21,13 @@ from boto.exception import AWSConnectionError
|
||||
|
||||
from celery.states import SUCCESS, FAILURE
|
||||
|
||||
# from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
|
||||
from bulk_email.models import CourseEmail, Optout, SEND_TO_ALL
|
||||
|
||||
# from instructor_task.tests.test_tasks import TestInstructorTasks
|
||||
from instructor_task.tasks import send_bulk_course_email
|
||||
from instructor_task.subtasks import update_subtask_status
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
|
||||
from instructor_task.tests.factories import InstructorTaskFactory
|
||||
@@ -39,6 +38,41 @@ class TestTaskFailure(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def my_update_subtask_status(entry_id, current_task_id, new_subtask_status):
    """
    Conditionally forward a subtask status update, dropping stale retries.

    When tasks run in celery's "eager" mode (as in tests), a retry runs to
    completion before control returns to the code that invoked it.  If the
    retries eventually end in failure (e.g. because the maximum number of
    retries was attempted), each frame on the "eager" call stack reports its
    own status afterwards, so updates from *earlier* retries can arrive
    *after* the final one.  Only pass an update through when it reflects at
    least as many retries as what is already recorded, ignoring the later
    updates made on behalf of the earlier retries.

    This should not be an issue in production, where status is updated
    before a task is retried, and is then updated afterwards if the retry
    fails.
    """
    task_entry = InstructorTask.objects.get(pk=entry_id)
    status_by_task_id = json.loads(task_entry.subtasks)['status']
    recorded_status = status_by_task_id[current_task_id]

    def _count_retries(subtask_result):
        """Total retries (with and without a max) recorded for a subtask."""
        return (subtask_result.get('retried_nomax', 0)
                + subtask_result.get('retried_withmax', 0))

    # Skip the write entirely when the stored status already reflects a
    # later retry than the one reporting in.
    if _count_retries(recorded_status) <= _count_retries(new_subtask_status):
        update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
|
||||
|
||||
class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
|
||||
"""Tests instructor task that send bulk email."""
|
||||
|
||||
@@ -244,14 +278,15 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
# always fail to connect, triggering repeated retries until limit is hit:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([exception])
|
||||
self._test_run_with_task(
|
||||
send_bulk_course_email,
|
||||
'emailed',
|
||||
num_emails,
|
||||
expected_succeeds,
|
||||
failed=expected_fails,
|
||||
retried_withmax=(settings.BULK_EMAIL_MAX_RETRIES + 1)
|
||||
)
|
||||
with patch('bulk_email.tasks.update_subtask_status', my_update_subtask_status):
|
||||
self._test_run_with_task(
|
||||
send_bulk_course_email,
|
||||
'emailed',
|
||||
num_emails,
|
||||
expected_succeeds,
|
||||
failed=expected_fails,
|
||||
retried_withmax=(settings.BULK_EMAIL_MAX_RETRIES + 1)
|
||||
)
|
||||
|
||||
def test_retry_after_smtp_disconnect(self):
    """Retry handling when the SMTP server disconnects during a bulk-email send."""
    self._test_retry_after_limited_retry_error(SMTPServerDisconnected(425, "Disconnecting"))
|
||||
|
||||
@@ -87,11 +87,11 @@ def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, r
|
||||
return new_result
|
||||
|
||||
|
||||
def _get_retry_count(subtask_result):
|
||||
"""Return the number of retries counted for the given subtask."""
|
||||
retry_count = subtask_result.get('retried_nomax', 0)
|
||||
retry_count += subtask_result.get('retried_withmax', 0)
|
||||
return retry_count
|
||||
# def _get_retry_count(subtask_result):
|
||||
# """Return the number of retries counted for the given subtask."""
|
||||
# retry_count = subtask_result.get('retried_nomax', 0)
|
||||
# retry_count += subtask_result.get('retried_withmax', 0)
|
||||
# return retry_count
|
||||
|
||||
|
||||
def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
|
||||
@@ -196,34 +196,8 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
# Check for race condition where a subtask which has been retried
|
||||
# has the retry already write its results here before the code
|
||||
# that was invoking the retry has had a chance to update this status.
|
||||
# While we think this is highly unlikely in production code, it is
|
||||
# the norm in "eager" mode (used by tests) where the retry is called
|
||||
# and run to completion before control is returned to the code that
|
||||
# invoked the retry.
|
||||
current_subtask_status = subtask_status_info[current_task_id]
|
||||
current_retry_count = _get_retry_count(current_subtask_status)
|
||||
new_retry_count = _get_retry_count(new_subtask_status)
|
||||
if current_retry_count > new_retry_count:
|
||||
TASK_LOG.warning("Task id %s: Retry %s has already updated InstructorTask -- skipping update for retry %s.",
|
||||
current_task_id, current_retry_count, new_retry_count)
|
||||
transaction.rollback()
|
||||
return
|
||||
elif new_retry_count > 0:
|
||||
TASK_LOG.debug("Task id %s: previous retry %s is not newer -- applying update for retry %s.",
|
||||
current_task_id, current_retry_count, new_retry_count)
|
||||
|
||||
# Update status unless it has already been set. This can happen
|
||||
# when a task is retried and running in eager mode -- the retries
|
||||
# will be updating before the original call, and we don't want their
|
||||
# ultimate status to be clobbered by the "earlier" updates. This
|
||||
# should not be a problem in normal (non-eager) processing.
|
||||
current_state = current_subtask_status['state']
|
||||
new_state = new_subtask_status['state']
|
||||
if new_state != RETRY or current_state not in READY_STATES:
|
||||
subtask_status_info[current_task_id] = new_subtask_status
|
||||
# Update status:
|
||||
subtask_status_info[current_task_id] = new_subtask_status
|
||||
|
||||
# Update the parent task progress.
|
||||
# Set the estimate of duration, but only if it
|
||||
@@ -239,6 +213,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
# In future, we can make this more responsive by updating status
|
||||
# between retries, by comparing counts that change from previous
|
||||
# retry.
|
||||
new_state = new_subtask_status['state']
|
||||
if new_subtask_status is not None and new_state in READY_STATES:
|
||||
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
|
||||
task_progress[statname] += new_subtask_status[statname]
|
||||
|
||||
Reference in New Issue
Block a user