diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 6f1bfd005b..1bdb32d133 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -346,7 +346,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask # task that is resubmitted, but just in case we fail to do so there, we check here as well. # There is also a possibility that this task will be run twice by Celery, for the same reason. # To deal with that, we need to confirm that the task has not already been completed. - check_subtask_is_valid(entry_id, current_task_id) + check_subtask_is_valid(entry_id, current_task_id, subtask_status) send_exception = None new_subtask_status = None diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 93188755f0..22aabeaa02 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -5,7 +5,7 @@ from itertools import cycle from mock import patch from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError -from celery.states import SUCCESS +from celery.states import SUCCESS, RETRY from django.test.utils import override_settings from django.conf import settings @@ -258,13 +258,33 @@ class TestEmailErrors(ModuleStoreTestCase): initialize_subtask_info(entry, "emailed", 100, [subtask_id]) subtask_status = create_subtask_status(subtask_id) update_subtask_status(entry_id, subtask_id, subtask_status) - check_subtask_is_valid(entry_id, subtask_id) + check_subtask_is_valid(entry_id, subtask_id, subtask_status) bogus_email_id = 1001 to_list = ['test@test.com'] global_email_context = {'course_title': 'dummy course'} with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'): send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + def test_send_email_retried_subtask(self): + # test at a lower level, to ensure that the course gets checked down below too. + entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) + entry_id = entry.id # pylint: disable=E1101 + subtask_id = "subtask-id-value" + initialize_subtask_info(entry, "emailed", 100, [subtask_id]) + subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=2) + update_subtask_status(entry_id, subtask_id, subtask_status) + bogus_email_id = 1001 + to_list = ['test@test.com'] + global_email_context = {'course_title': 'dummy course'} + # try running with a clean subtask: + new_subtask_status = create_subtask_status(subtask_id) + with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'): + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + # try again, with a retried subtask with lower count: + new_subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=1) + with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'): + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + def dont_test_send_email_undefined_email(self): # test at a lower level, to ensure that the course gets checked down below too. entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 1e7e266599..68bd24a1f8 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -5,7 +5,7 @@ from time import time import json from celery.utils.log import get_task_logger -from celery.states import SUCCESS, READY_STATES +from celery.states import SUCCESS, READY_STATES, RETRY from django.db import transaction from django.core.cache import cache @@ -96,6 +96,16 @@ def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, r return new_result +def _get_retry_count(subtask_status): + """ + Calculate the total number of retries. + """ + count = 0 + for keyname in ['retried_nomax', 'retried_withmax']: + count += subtask_status.get(keyname, 0) + return count + + def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): """ Store initial subtask information to InstructorTask object. @@ -187,7 +197,7 @@ def _release_subtask_lock(task_id): cache.delete(key) -def check_subtask_is_valid(entry_id, current_task_id): +def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status): """ Confirms that the current subtask is known to the InstructorTask and hasn't already been completed. @@ -210,8 +220,8 @@ def check_subtask_is_valid(entry_id, current_task_id): # Confirm that the InstructorTask actually defines subtasks. entry = InstructorTask.objects.get(pk=entry_id) if len(entry.subtasks) == 0: - format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}'" - msg = format_str.format(current_task_id, entry) + format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}': rejecting task {}" + msg = format_str.format(current_task_id, entry, new_subtask_status) TASK_LOG.warning(msg) raise DuplicateTaskException(msg) @@ -219,8 +229,8 @@ def check_subtask_is_valid(entry_id, current_task_id): subtask_dict = json.loads(entry.subtasks) subtask_status_info = subtask_dict['status'] if current_task_id not in subtask_status_info: - format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}'" - msg = format_str.format(current_task_id, entry) + format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}': rejecting task {}" + msg = format_str.format(current_task_id, entry, new_subtask_status) TASK_LOG.warning(msg) raise DuplicateTaskException(msg) @@ -229,11 +239,24 @@ def check_subtask_is_valid(entry_id, current_task_id): subtask_status = subtask_status_info[current_task_id] subtask_state = subtask_status.get('state') if subtask_state in READY_STATES: - format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}'" - msg = format_str.format(current_task_id, subtask_status, entry) + format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}" + msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status) TASK_LOG.warning(msg) raise DuplicateTaskException(msg) + # Confirm that the InstructorTask doesn't think that this subtask is already being + # retried by another task. + if subtask_state == RETRY: + # Check to see if the input number of retries is less than the recorded number. + # If so, then this is an earlier version of the task, and a duplicate. + new_retry_count = _get_retry_count(new_subtask_status) + current_retry_count = _get_retry_count(subtask_status) + if new_retry_count < current_retry_count: + format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}" + msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status) + TASK_LOG.warning(msg) + raise DuplicateTaskException(msg) + # Now we are ready to start working on this. Try to lock it. # If it fails, then it means that another worker is already in the # middle of working on this.