From a6996740159068251b3b981cccdc071567620180 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Thu, 17 Oct 2013 13:32:44 -0400 Subject: [PATCH] Check that a subtask has not already completed before running. --- lms/djangoapps/bulk_email/tasks.py | 9 ++-- .../bulk_email/tests/test_err_handling.py | 28 +++++++++++-- lms/djangoapps/instructor_task/subtasks.py | 41 +++++++++++++++---- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index a2f376cdcc..2dd168aeb5 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -318,9 +318,12 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask # Check that the requested subtask is actually known to the current InstructorTask entry. # If this fails, it throws an exception, which should fail this subtask immediately. # This can happen when the parent task has been run twice, and results in duplicate - # subtasks being created for the same InstructorTask entry. We hope to catch this condition - # in perform_delegate_email_batches(), but just in case we fail to do so there, - # we check here as well. + # subtasks being created for the same InstructorTask entry. This can happen when Celery + # loses its connection to its broker, and any current tasks get requeued. + # We hope to catch this condition in perform_delegate_email_batches() when it's the parent + # task that is resubmitted, but just in case we fail to do so there, we check here as well. + # There is also a possibility that this task will be run twice by Celery, for the same reason. + # To deal with that, we need to confirm that the task has not already been completed. 
check_subtask_is_valid(entry_id, current_task_id) send_exception = None diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index a7ec72c56e..c2394a0ae0 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -5,6 +5,8 @@ from itertools import cycle from mock import patch from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError +from celery.states import SUCCESS + from django.test.utils import override_settings from django.conf import settings from django.core.management import call_command @@ -19,7 +21,12 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF from bulk_email.models import CourseEmail, SEND_TO_ALL from bulk_email.tasks import perform_delegate_email_batches, send_course_email from instructor_task.models import InstructorTask -from instructor_task.subtasks import create_subtask_status, initialize_subtask_info +from instructor_task.subtasks import ( + create_subtask_status, + initialize_subtask_info, + update_subtask_status, + DuplicateTaskException, +) class EmailTestException(Exception): @@ -210,7 +217,7 @@ class TestEmailErrors(ModuleStoreTestCase): subtask_id = "subtask-id-value" subtask_status = create_subtask_status(subtask_id) email_id = 1001 - with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'): + with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find email subtasks of instructor task'): send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status) def test_send_email_missing_subtask(self): @@ -224,9 +231,24 @@ class TestEmailErrors(ModuleStoreTestCase): different_subtask_id = "bogus-subtask-id-value" subtask_status = create_subtask_status(different_subtask_id) bogus_email_id = 1001 - with self.assertRaisesRegexp(ValueError, 'unable to find status for email subtask of instructor 
task'): + with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for email subtask of instructor task'): send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + def test_send_email_completed_subtask(self): + # test at a lower level, to ensure that a subtask already marked as completed is not run again. + entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) + entry_id = entry.id # pylint: disable=E1101 + subtask_id = "subtask-id-value" + initialize_subtask_info(entry, "emailed", 100, [subtask_id]) + subtask_status = create_subtask_status(subtask_id, state=SUCCESS) + update_subtask_status(entry_id, subtask_id, subtask_status) + bogus_email_id = 1001 + to_list = ['test@test.com'] + global_email_context = {'course_title': 'dummy course'} + new_subtask_status = create_subtask_status(subtask_id) + with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'): + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + def dont_test_send_email_undefined_email(self): # test at a lower level, to ensure that the course gets checked down below too. entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 175cf084a5..6b8b1e2046 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -14,6 +14,11 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING TASK_LOG = get_task_logger(__name__) +class DuplicateTaskException(Exception): + """Exception indicating that a task already exists or has already completed.""" + pass + + def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): """ Create and return a dict for tracking the status of a subtask. 
@@ -149,30 +154,48 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): def check_subtask_is_valid(entry_id, current_task_id): """ - Confirms that the current subtask is known to the InstructorTask. + Confirms that the current subtask is known to the InstructorTask and hasn't already been completed. - This may happen if a task that spawns subtasks is called twice with - the same task_id and InstructorTask entry_id. The set of subtasks - that are recorded in the InstructorTask from the first call get clobbered - by the the second set of subtasks. So when the first set of subtasks - actually run, they won't be found in the InstructorTask. + Problems can occur when the parent task has been run twice, and results in duplicate + subtasks being created for the same InstructorTask entry. This can happen when Celery + loses its connection to its broker, and any current tasks get requeued. - Raises a ValueError exception if not. + If a parent task gets requeued, then the same InstructorTask may have a different set of + subtasks defined (to do the same thing), so the subtasks from the first queuing would not + be known to the InstructorTask. We raise an exception in this case. + + If a subtask gets requeued, then the first time the subtask runs it should run fine to completion. + However, we want to prevent it from running again, so we check here to see what the existing + subtask's status is. If it is complete, we raise an exception. + + Raises a DuplicateTaskException if it's not a task that should be run. """ + # Confirm that the InstructorTask actually defines subtasks. 
entry = InstructorTask.objects.get(pk=entry_id) if len(entry.subtasks) == 0: format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'" msg = format_str.format(current_task_id, entry) TASK_LOG.warning(msg) - raise ValueError(msg) + raise DuplicateTaskException(msg) + # Confirm that the InstructorTask knows about this particular subtask. subtask_dict = json.loads(entry.subtasks) subtask_status_info = subtask_dict['status'] if current_task_id not in subtask_status_info: format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'" msg = format_str.format(current_task_id, entry) TASK_LOG.warning(msg) - raise ValueError(msg) + raise DuplicateTaskException(msg) + + # Confirm that the InstructorTask doesn't think that this subtask has already + # been completed (any state in READY_STATES, not only success). + subtask_status = subtask_status_info[current_task_id] + subtask_state = subtask_status.get('state') + if subtask_state in READY_STATES: + format_str = "Unexpected task_id '{}': already completed - status {} for email subtask of instructor task '{}'" + msg = format_str.format(current_task_id, subtask_status, entry) + TASK_LOG.warning(msg) + raise DuplicateTaskException(msg) @transaction.commit_manually