Check that email subtasks are known to the InstructorTask before executing.
This commit is contained in:
@@ -5,6 +5,7 @@ to a course.
|
||||
import math
|
||||
import re
|
||||
import random
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from time import sleep
|
||||
|
||||
@@ -41,6 +42,7 @@ from instructor_task.subtasks import (
|
||||
create_subtask_status,
|
||||
increment_subtask_status,
|
||||
initialize_subtask_info,
|
||||
check_subtask_is_valid,
|
||||
)
|
||||
|
||||
log = get_task_logger(__name__)
|
||||
@@ -204,6 +206,18 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
|
||||
raise
|
||||
|
||||
# Check to see if email batches have already been defined. This seems to
|
||||
# happen sometimes when there is a loss of connection while a task is being
|
||||
# queued. When this happens, the same task gets called again, and a whole
|
||||
# new raft of subtasks gets queued up. We will assume that if subtasks
|
||||
# have already been defined, there is no need to redefine them below.
|
||||
# So we just return right away. We don't raise an exception, because we want
|
||||
# the current task to be marked with whatever it had been marked with before.
|
||||
if len(entry.subtasks) > 0 and len(entry.task_output) > 0:
|
||||
log.warning("Task %s has already been processed for email %s! InstructorTask = %s", task_id, email_id, entry)
|
||||
progress = json.loads(entry.task_output)
|
||||
return progress
|
||||
|
||||
# Sanity check that course for email_obj matches that of the task referencing it.
|
||||
if course_id != email_obj.course_id:
|
||||
format_msg = "Course id conflict: explicit value {} does not match email value {}"
|
||||
@@ -296,15 +310,19 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
|
||||
with status information (sends, failures, skips) and updates number of subtasks completed.
|
||||
"""
|
||||
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
|
||||
# with it right away, but we also don't expect it to fail.
|
||||
InstructorTask.objects.get(pk=entry_id)
|
||||
|
||||
current_task_id = subtask_status['task_id']
|
||||
num_to_send = len(to_list)
|
||||
log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
|
||||
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
|
||||
|
||||
# Check that the requested subtask is actually known to the current InstructorTask entry.
|
||||
# If this fails, it throws an exception, which should fail this subtask immediately.
|
||||
# This can happen when the parent task has been run twice, and results in duplicate
|
||||
# subtasks being created for the same InstructorTask entry. We hope to catch this condition
|
||||
# in perform_delegate_email_batches(), but just in case we fail to do so there,
|
||||
# we check here as well.
|
||||
check_subtask_is_valid(entry_id, current_task_id)
|
||||
|
||||
send_exception = None
|
||||
new_subtask_status = None
|
||||
try:
|
||||
|
||||
@@ -19,7 +19,7 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF
|
||||
from bulk_email.models import CourseEmail, SEND_TO_ALL
|
||||
from bulk_email.tasks import perform_delegate_email_batches, send_course_email
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import create_subtask_status
|
||||
from instructor_task.subtasks import create_subtask_status, initialize_subtask_info
|
||||
|
||||
|
||||
class EmailTestException(Exception):
|
||||
@@ -201,7 +201,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
with self.assertRaisesRegexp(ValueError, 'does not match email value'):
|
||||
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101
|
||||
|
||||
def test_send_email_undefined_email(self):
|
||||
def test_send_email_undefined_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
@@ -209,6 +209,33 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
email_id = 1001
|
||||
with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'):
|
||||
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_missing_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
different_subtask_id = "bogus-subtask-id-value"
|
||||
subtask_status = create_subtask_status(different_subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaisesRegexp(ValueError, 'unable to find status for email subtask of instructor task'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaises(CourseEmail.DoesNotExist):
|
||||
# we skip the call that updates subtask status, since we've not set up the InstructorTask
|
||||
|
||||
@@ -186,6 +186,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
|
||||
self.assertGreater(status.get('duration_ms'), 0)
|
||||
self.assertEquals(entry.task_state, SUCCESS)
|
||||
self._assert_single_subtask_status(entry, succeeded, failed, skipped, retried_nomax, retried_withmax)
|
||||
return entry
|
||||
|
||||
def test_successful(self):
|
||||
# Select number of emails to fit into a single subtask.
|
||||
@@ -196,6 +197,23 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
|
||||
get_conn.return_value.send_messages.side_effect = cycle([None])
|
||||
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)
|
||||
|
||||
def test_successful_twice(self):
|
||||
# Select number of emails to fit into a single subtask.
|
||||
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
|
||||
# We also send email to the instructor:
|
||||
self._create_students(num_emails - 1)
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([None])
|
||||
task_entry = self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)
|
||||
|
||||
# submit the same task a second time, and confirm that it is not run again.
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([Exception("This should not happen!")])
|
||||
parent_status = self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)
|
||||
self.assertEquals(parent_status.get('total'), num_emails)
|
||||
self.assertEquals(parent_status.get('succeeded'), num_emails)
|
||||
self.assertEquals(parent_status.get('failed'), 0)
|
||||
|
||||
def test_unactivated_user(self):
|
||||
# Select number of emails to fit into a single subtask.
|
||||
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
|
||||
|
||||
@@ -147,6 +147,34 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
return task_progress
|
||||
|
||||
|
||||
def check_subtask_is_valid(entry_id, current_task_id):
|
||||
"""
|
||||
Confirms that the current subtask is known to the InstructorTask.
|
||||
|
||||
This may happen if a task that spawns subtasks is called twice with
|
||||
the same task_id and InstructorTask entry_id. The set of subtasks
|
||||
that are recorded in the InstructorTask from the first call get clobbered
|
||||
by the the second set of subtasks. So when the first set of subtasks
|
||||
actually run, they won't be found in the InstructorTask.
|
||||
|
||||
Raises a ValueError exception if not.
|
||||
"""
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
if len(entry.subtasks) == 0:
|
||||
format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user