Merge pull request #1458 from edx/brian/email-dupe-retry
Check for requeued subtasks when in RETRY state.
This commit is contained in:
@@ -346,7 +346,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
# task that is resubmitted, but just in case we fail to do so there, we check here as well.
|
||||
# There is also a possibility that this task will be run twice by Celery, for the same reason.
|
||||
# To deal with that, we need to confirm that the task has not already been completed.
|
||||
check_subtask_is_valid(entry_id, current_task_id)
|
||||
check_subtask_is_valid(entry_id, current_task_id, subtask_status)
|
||||
|
||||
send_exception = None
|
||||
new_subtask_status = None
|
||||
|
||||
@@ -5,7 +5,7 @@ from itertools import cycle
|
||||
from mock import patch
|
||||
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
|
||||
|
||||
from celery.states import SUCCESS
|
||||
from celery.states import SUCCESS, RETRY
|
||||
|
||||
from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
@@ -258,13 +258,33 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
check_subtask_is_valid(entry_id, subtask_id)
|
||||
check_subtask_is_valid(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_retried_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=2)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
# try running with a clean subtask:
|
||||
new_subtask_status = create_subtask_status(subtask_id)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
# try again, with a retried subtask with lower count:
|
||||
new_subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=1)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
|
||||
@@ -5,7 +5,7 @@ from time import time
|
||||
import json
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, READY_STATES
|
||||
from celery.states import SUCCESS, READY_STATES, RETRY
|
||||
|
||||
from django.db import transaction
|
||||
from django.core.cache import cache
|
||||
@@ -96,6 +96,16 @@ def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, r
|
||||
return new_result
|
||||
|
||||
|
||||
def _get_retry_count(subtask_status):
|
||||
"""
|
||||
Calculate the total number of retries.
|
||||
"""
|
||||
count = 0
|
||||
for keyname in ['retried_nomax', 'retried_withmax']:
|
||||
count += subtask_status.get(keyname, 0)
|
||||
return count
|
||||
|
||||
|
||||
def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
"""
|
||||
Store initial subtask information to InstructorTask object.
|
||||
@@ -187,7 +197,7 @@ def _release_subtask_lock(task_id):
|
||||
cache.delete(key)
|
||||
|
||||
|
||||
def check_subtask_is_valid(entry_id, current_task_id):
|
||||
def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
"""
|
||||
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
|
||||
|
||||
@@ -210,8 +220,8 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
# Confirm that the InstructorTask actually defines subtasks.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
if len(entry.subtasks) == 0:
|
||||
format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, entry, new_subtask_status)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
@@ -219,8 +229,8 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, entry, new_subtask_status)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
@@ -229,11 +239,24 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
subtask_status = subtask_status_info[current_task_id]
|
||||
subtask_state = subtask_status.get('state')
|
||||
if subtask_state in READY_STATES:
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry)
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
# Confirm that the InstructorTask doesn't think that this subtask is already being
|
||||
# retried by another task.
|
||||
if subtask_state == RETRY:
|
||||
# Check to see if the input number of retries is less than the recorded number.
|
||||
# If so, then this is an earlier version of the task, and a duplicate.
|
||||
new_retry_count = _get_retry_count(new_subtask_status)
|
||||
current_retry_count = _get_retry_count(subtask_status)
|
||||
if new_retry_count < current_retry_count:
|
||||
format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
# Now we are ready to start working on this. Try to lock it.
|
||||
# If it fails, then it means that another worker is already in the
|
||||
# middle of working on this.
|
||||
|
||||
Reference in New Issue
Block a user