Merge pull request #1388 from edx/brian/fix-email-subtask-dupes
Check that a subtask has not already completed before running.
This commit is contained in:
@@ -318,9 +318,12 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
# Check that the requested subtask is actually known to the current InstructorTask entry.
|
||||
# If this fails, it throws an exception, which should fail this subtask immediately.
|
||||
# This can happen when the parent task has been run twice, and results in duplicate
|
||||
# subtasks being created for the same InstructorTask entry. We hope to catch this condition
|
||||
# in perform_delegate_email_batches(), but just in case we fail to do so there,
|
||||
# we check here as well.
|
||||
# subtasks being created for the same InstructorTask entry. This can happen when Celery
|
||||
# loses its connection to its broker, and any current tasks get requeued.
|
||||
# We hope to catch this condition in perform_delegate_email_batches() when it's the parent
|
||||
# task that is resubmitted, but just in case we fail to do so there, we check here as well.
|
||||
# There is also a possibility that this task will be run twice by Celery, for the same reason.
|
||||
# To deal with that, we need to confirm that the task has not already been completed.
|
||||
check_subtask_is_valid(entry_id, current_task_id)
|
||||
|
||||
send_exception = None
|
||||
|
||||
@@ -5,6 +5,8 @@ from itertools import cycle
|
||||
from mock import patch
|
||||
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
|
||||
|
||||
from celery.states import SUCCESS
|
||||
|
||||
from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
@@ -19,7 +21,12 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF
|
||||
from bulk_email.models import CourseEmail, SEND_TO_ALL
|
||||
from bulk_email.tasks import perform_delegate_email_batches, send_course_email
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import create_subtask_status, initialize_subtask_info
|
||||
from instructor_task.subtasks import (
|
||||
create_subtask_status,
|
||||
initialize_subtask_info,
|
||||
update_subtask_status,
|
||||
DuplicateTaskException,
|
||||
)
|
||||
|
||||
|
||||
class EmailTestException(Exception):
|
||||
@@ -210,7 +217,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
subtask_id = "subtask-id-value"
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
email_id = 1001
|
||||
with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find email subtasks of instructor task'):
|
||||
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_missing_subtask(self):
|
||||
@@ -224,9 +231,24 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
different_subtask_id = "bogus-subtask-id-value"
|
||||
subtask_status = create_subtask_status(different_subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaisesRegexp(ValueError, 'unable to find status for email subtask of instructor task'):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for email subtask of instructor task'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_completed_subtask(self):
|
||||
# test at a lower level, to ensure that a subtask's completed status gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id, state=SUCCESS)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
new_subtask_status = create_subtask_status(subtask_id)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
|
||||
@@ -14,6 +14,11 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
|
||||
class DuplicateTaskException(Exception):
|
||||
"""Exception indicating that a task already exists or has already completed."""
|
||||
pass
|
||||
|
||||
|
||||
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""
|
||||
Create and return a dict for tracking the status of a subtask.
|
||||
@@ -149,30 +154,48 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
|
||||
def check_subtask_is_valid(entry_id, current_task_id):
|
||||
"""
|
||||
Confirms that the current subtask is known to the InstructorTask.
|
||||
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
|
||||
|
||||
This may happen if a task that spawns subtasks is called twice with
|
||||
the same task_id and InstructorTask entry_id. The set of subtasks
|
||||
that are recorded in the InstructorTask from the first call get clobbered
|
||||
by the second set of subtasks. So when the first set of subtasks
|
||||
actually run, they won't be found in the InstructorTask.
|
||||
Problems can occur when the parent task has been run twice, and results in duplicate
|
||||
subtasks being created for the same InstructorTask entry. This can happen when Celery
|
||||
loses its connection to its broker, and any current tasks get requeued.
|
||||
|
||||
Raises a ValueError exception if not.
|
||||
If a parent task gets requeued, then the same InstructorTask may have a different set of
|
||||
subtasks defined (to do the same thing), so the subtasks from the first queuing would not
|
||||
be known to the InstructorTask. We raise an exception in this case.
|
||||
|
||||
If a subtask gets requeued, then the first time the subtask runs it should run fine to completion.
|
||||
However, we want to prevent it from running again, so we check here to see what the existing
|
||||
subtask's status is. If it is complete, we raise an exception.
|
||||
|
||||
Raises a DuplicateTaskException exception if it's not a task that should be run.
|
||||
"""
|
||||
# Confirm that the InstructorTask actually defines subtasks.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
if len(entry.subtasks) == 0:
|
||||
format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
# Confirm that the InstructorTask knows about this particular subtask.
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
# Confirm that the InstructorTask doesn't think that this subtask has already been
|
||||
# performed successfully.
|
||||
subtask_status = subtask_status_info[current_task_id]
|
||||
subtask_state = subtask_status.get('state')
|
||||
if subtask_state in READY_STATES:
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for email subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
|
||||
Reference in New Issue
Block a user