From 288ccb1e36f7dc462f5bf0d74f5344f4b0db0a02 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Tue, 15 Oct 2013 14:22:20 -0400 Subject: [PATCH] Check that email subtasks are known to the InstructorTask before executing. --- lms/djangoapps/bulk_email/tasks.py | 26 +++++++++++++--- .../bulk_email/tests/test_err_handling.py | 31 +++++++++++++++++-- lms/djangoapps/bulk_email/tests/test_tasks.py | 18 +++++++++++ lms/djangoapps/instructor_task/subtasks.py | 28 +++++++++++++++++ 4 files changed, 97 insertions(+), 6 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 648e175ee1..a2f376cdcc 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -5,6 +5,7 @@ to a course. import math import re import random +import json from uuid import uuid4 from time import sleep @@ -41,6 +42,7 @@ from instructor_task.subtasks import ( create_subtask_status, increment_subtask_status, initialize_subtask_info, + check_subtask_is_valid, ) log = get_task_logger(__name__) @@ -204,6 +206,18 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id) raise + # Check to see if email batches have already been defined. This seems to + # happen sometimes when there is a loss of connection while a task is being + # queued. When this happens, the same task gets called again, and a whole + # new raft of subtasks gets queued up. We will assume that if subtasks + # have already been defined, there is no need to redefine them below. + # So we just return right away. We don't raise an exception, because we want + # the current task to be marked with whatever it had been marked with before. + if len(entry.subtasks) > 0 and len(entry.task_output) > 0: + log.warning("Task %s has already been processed for email %s! InstructorTask = %s", task_id, email_id, entry) + progress = json.loads(entry.task_output) + return progress + # Sanity check that course for email_obj matches that of the task referencing it. if course_id != email_obj.course_id: format_msg = "Course id conflict: explicit value {} does not match email value {}" @@ -296,15 +310,19 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask Emails are sent multi-part, in both plain text and html. Updates InstructorTask object with status information (sends, failures, skips) and updates number of subtasks completed. """ - # Get entry here, as a sanity check that it actually exists. We won't actually do anything - # with it right away, but we also don't expect it to fail. - InstructorTask.objects.get(pk=entry_id) - current_task_id = subtask_status['task_id'] num_to_send = len(to_list) log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s", email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status) + # Check that the requested subtask is actually known to the current InstructorTask entry. + # If this fails, it throws an exception, which should fail this subtask immediately. + # This can happen when the parent task has been run twice, and results in duplicate + # subtasks being created for the same InstructorTask entry. We hope to catch this condition + # in perform_delegate_email_batches(), but just in case we fail to do so there, + # we check here as well. + check_subtask_is_valid(entry_id, current_task_id) + send_exception = None new_subtask_status = None try: diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 9d03c020e6..dfe098306c 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -19,7 +19,7 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF from bulk_email.models import CourseEmail, SEND_TO_ALL from bulk_email.tasks import perform_delegate_email_batches, send_course_email from instructor_task.models import InstructorTask -from instructor_task.subtasks import create_subtask_status +from instructor_task.subtasks import create_subtask_status, initialize_subtask_info class EmailTestException(Exception): @@ -201,7 +201,7 @@ class TestEmailErrors(ModuleStoreTestCase): with self.assertRaisesRegexp(ValueError, 'does not match email value'): perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101 - def test_send_email_undefined_email(self): + def test_send_email_undefined_subtask(self): # test at a lower level, to ensure that the course gets checked down below too. entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) entry_id = entry.id # pylint: disable=E1101 @@ -209,6 +209,33 @@ class TestEmailErrors(ModuleStoreTestCase): global_email_context = {'course_title': 'dummy course'} subtask_id = "subtask-id-value" subtask_status = create_subtask_status(subtask_id) + email_id = 1001 + with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'): + send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status) + + def test_send_email_missing_subtask(self): + # test at a lower level, to ensure that the course gets checked down below too. + entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) + entry_id = entry.id # pylint: disable=E1101 + to_list = ['test@test.com'] + global_email_context = {'course_title': 'dummy course'} + subtask_id = "subtask-id-value" + initialize_subtask_info(entry, "emailed", 100, [subtask_id]) + different_subtask_id = "bogus-subtask-id-value" + subtask_status = create_subtask_status(different_subtask_id) + bogus_email_id = 1001 + with self.assertRaisesRegexp(ValueError, 'unable to find status for email subtask of instructor task'): + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + + def dont_test_send_email_undefined_email(self): + # test at a lower level, to ensure that the course gets checked down below too. + entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) + entry_id = entry.id # pylint: disable=E1101 + to_list = ['test@test.com'] + global_email_context = {'course_title': 'dummy course'} + subtask_id = "subtask-id-value" + initialize_subtask_info(entry, "emailed", 100, [subtask_id]) + subtask_status = create_subtask_status(subtask_id) bogus_email_id = 1001 with self.assertRaises(CourseEmail.DoesNotExist): # we skip the call that updates subtask status, since we've not set up the InstructorTask diff --git a/lms/djangoapps/bulk_email/tests/test_tasks.py b/lms/djangoapps/bulk_email/tests/test_tasks.py index fadb4122b5..e49a04147a 100644 --- a/lms/djangoapps/bulk_email/tests/test_tasks.py +++ b/lms/djangoapps/bulk_email/tests/test_tasks.py @@ -186,6 +186,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): self.assertGreater(status.get('duration_ms'), 0) self.assertEquals(entry.task_state, SUCCESS) self._assert_single_subtask_status(entry, succeeded, failed, skipped, retried_nomax, retried_withmax) + return entry def test_successful(self): # Select number of emails to fit into a single subtask. @@ -196,6 +197,23 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): get_conn.return_value.send_messages.side_effect = cycle([None]) self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails) + def test_successful_twice(self): + # Select number of emails to fit into a single subtask. + num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK + # We also send email to the instructor: + self._create_students(num_emails - 1) + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + get_conn.return_value.send_messages.side_effect = cycle([None]) + task_entry = self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails) + + # submit the same task a second time, and confirm that it is not run again. + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + get_conn.return_value.send_messages.side_effect = cycle([Exception("This should not happen!")]) + parent_status = self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id) + self.assertEquals(parent_status.get('total'), num_emails) + self.assertEquals(parent_status.get('succeeded'), num_emails) + self.assertEquals(parent_status.get('failed'), 0) + def test_unactivated_user(self): # Select number of emails to fit into a single subtask. num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 17da1b9ed6..175cf084a5 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -147,6 +147,34 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): return task_progress +def check_subtask_is_valid(entry_id, current_task_id): + """ + Confirms that the current subtask is known to the InstructorTask. + + This may happen if a task that spawns subtasks is called twice with + the same task_id and InstructorTask entry_id. The set of subtasks + that are recorded in the InstructorTask from the first call get clobbered + by the the second set of subtasks. So when the first set of subtasks + actually run, they won't be found in the InstructorTask. + + Raises a ValueError exception if not. + """ + entry = InstructorTask.objects.get(pk=entry_id) + if len(entry.subtasks) == 0: + format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'" + msg = format_str.format(current_task_id, entry) + TASK_LOG.warning(msg) + raise ValueError(msg) + + subtask_dict = json.loads(entry.subtasks) + subtask_status_info = subtask_dict['status'] + if current_task_id not in subtask_status_info: + format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'" + msg = format_str.format(current_task_id, entry) + TASK_LOG.warning(msg) + raise ValueError(msg) + + @transaction.commit_manually def update_subtask_status(entry_id, current_task_id, new_subtask_status): """