diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 6c1d517b7d..07f1af05d2 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -12,8 +12,14 @@ from sys import exc_info from traceback import format_exc from dogapi import dog_stats_api -from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError -from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError +from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException +from boto.ses.exceptions import ( + SESDailyQuotaExceededError, + SESMaxSendingRateExceededError, + SESAddressBlacklistedError, + SESIllegalAddressError, + SESLocalAddressCharacterError, +) from boto.exception import AWSConnectionError from celery import task, current_task, group @@ -44,18 +50,25 @@ from instructor_task.subtasks import ( log = get_task_logger(__name__) +# Errors that an individual email is failing to be sent, and should just +# be treated as a fail. +SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressError, SESLocalAddressCharacterError) + # Exceptions that, if caught, should cause the task to be re-tried. -# These errors will be caught a maximum of 5 times before the task fails. -RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError) +# These errors will be caught a limited number of times before the task fails. +LIMITED_RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError) -# Errors that involve exceeding a quota of sent email -QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, ) - -# Errors that mail is being sent too quickly. When caught by a task, it -# triggers an exponential backoff and retry. Retries happen continuously until -# the email is sent. +# Errors that indicate that a mailing task should be retried without limit. 
+# An example is if email is being sent too quickly, but may succeed if sent +# more slowly. When caught by a task, it triggers an exponential backoff and retry. +# Retries happen continuously until the email is sent. INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, ) +# Errors that are known to indicate an inability to send any more emails, +# and should therefore not be retried. For example, exceeding a quota for emails. +# Also, any SMTP errors that are not explicitly enumerated above. +BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException) + def _get_recipient_queryset(user_id, to_option, course_id, course_location): """ @@ -118,12 +131,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) """ entry = InstructorTask.objects.get(pk=entry_id) # get inputs to use in this task from the entry: - #task_id = entry.task_id user_id = entry.requester.id task_id = entry.task_id - # TODO: check this against argument passed in? - # course_id = entry.course_id + # perfunctory check, since expansion is made for convenience of other task + # code that doesn't need the entry_id. + if course_id != entry.course_id: + format_msg = "Course id conflict: explicit value %s does not match task value %s" + raise ValueError(format_msg % (course_id, entry.course_id)) email_id = task_input['email_id'] try: @@ -138,15 +153,16 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_option = email_obj.to_option - # TODO: instead of fetching from email object, compare instead to - # confirm that they match, and raise an exception if they don't. 
- # course_id = email_obj.course_id + # sanity check that course for email_obj matches that of the task referencing it: + if course_id != email_obj.course_id: + format_msg = "Course id conflict: explicit value %s does not match email value %s" + raise ValueError(format_msg % (course_id, email_obj.course_id)) try: course = get_course_by_id(course_id, depth=1) except Http404 as exc: log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0]) - raise Exception("get_course_by_id failed: " + exc.args[0]) + raise ValueError("Course not found: " + exc.args[0]) global_email_context = _get_course_email_context(course) recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) @@ -173,23 +189,26 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) for i in range(num_tasks_this_query): if i == num_tasks_this_query - 1: # Avoid cutting off the very last email when chunking a task that divides perfectly - # (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100) + # (e.g. 
num_emails_this_query = 297 and EMAILS_PER_TASK is 100) to_list = recipient_sublist[i * chunk:] else: to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) - retry_progress = create_subtask_status() - task_list.append(send_course_email.subtask(( - entry_id, - email_id, - to_list, - global_email_context, - retry_progress, - ), - task_id=subtask_id, - routing_key=settings.HIGH_PRIORITY_QUEUE, - )) + subtask_status = create_subtask_status(subtask_id) + # create subtask, passing args and kwargs: + new_subtask = send_course_email.subtask( + ( + entry_id, + email_id, + to_list, + global_email_context, + subtask_status, + ), + task_id=subtask_id, + routing_key=settings.BULK_EMAIL_ROUTING_KEY, + ) + task_list.append(new_subtask) num_emails_queued += num_emails_this_query # Sanity check: we expect the chunking to be properly summing to the original count: @@ -208,7 +227,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # now group the subtasks, and start them running: task_group = group(task_list) - task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE) + task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. @@ -218,13 +237,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) return progress -# TODO: figure out if we really need this after all (for unit tests...) 
-def _get_current_task(): - """Stub to make it easier to test without actually running Celery""" - return current_task - - -@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102 +@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102 def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): """ Sends an email to a list of recipients. @@ -249,8 +262,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask # with it right away, but we also don't expect it to fail. InstructorTask.objects.get(pk=entry_id) - # Get information from current task's request: - current_task_id = _get_current_task().request.id + current_task_id = subtask_status['task_id'] num_to_send = len(to_list) log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s", email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status) @@ -295,6 +307,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask update_subtask_status(entry_id, current_task_id, new_subtask_status) raise send_exception + log.info("background task (%s) returning status %s", current_task_id, new_subtask_status) return new_subtask_status @@ -332,12 +345,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas 'failed' count above. 
""" # Get information from current task's request: - task_id = _get_current_task().request.id - retry_index = _get_current_task().request.retries + #task_id = _get_current_task().request.id + #retry_index = _get_current_task().request.retries + task_id = subtask_status['task_id'] # If this is a second attempt, then throttle the speed at which mail is sent: - throttle = retry_index > 0 + throttle = subtask_status['retried_nomax'] > 0 + # collect stats on progress: num_optout = 0 num_sent = 0 num_error = 0 @@ -354,7 +369,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # attempt. Anyone on the to_list on a retry has already passed the filter # that existed at that time, and we don't need to keep checking for changes # in the Optout list. - if retry_index == 0: + if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0: optouts = (Optout.objects.filter(course_id=course_email.course_id, user__in=[i['pk'] for i in to_list]) .values_list('user__email', flat=True)) @@ -403,7 +418,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas ) email_msg.attach_alternative(html_msg, 'text/html') - # Throttle if we tried a few times and got the rate limiter + # Throttle if we have gotten the rate limiter if throttle: sleep(0.2) @@ -413,11 +428,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]): connection.send_messages([email_msg]) - dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)]) - - log.info('Email with id %s sent to %s', email_id, email) - num_sent += 1 - except SMTPDataError as exc: # According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure. 
if exc.smtp_code >= 400 and exc.smtp_code < 500: @@ -429,52 +439,56 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) num_error += 1 + except SINGLE_EMAIL_FAILURE_ERRORS as exc: + # This will fall through and not retry the message, since it will be popped + log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc) + dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) + num_error += 1 + + else: + dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)]) + + log.info('Email with id %s sent to %s', email_id, email) + num_sent += 1 + # Pop the user that was emailed off the end of the list: to_list.pop() except INFINITE_RETRY_ERRORS as exc: + dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)]) subtask_progress = increment_subtask_status( subtask_status, succeeded=num_sent, failed=num_error, skipped=num_optout, - retriedA=1, + retried_nomax=1, state=RETRY ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True ) - except RETRY_ERRORS as exc: + except LIMITED_RETRY_ERRORS as exc: # Errors caught here cause the email to be retried. The entire task is actually retried # without popping the current recipient off of the existing list. # Errors caught are those that indicate a temporary condition that might succeed on retry. 
+ dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)]) subtask_progress = increment_subtask_status( subtask_status, succeeded=num_sent, failed=num_error, skipped=num_optout, - retriedB=1, + retried_withmax=1, state=RETRY ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False ) - except Exception as exc: - - # If we have a general exception for this request, we need to figure out what to do with it. - # If we're going to just mark it as failed - # And the log message below should indicate which task_id is failing, so we have a chance to - # reconstruct the problems. - if isinstance(exc, QUOTA_EXCEEDED_ERRORS): - log.exception('WARNING: Course "%s" exceeded quota!', course_title) - log.exception('Email with id %d not sent due to exceeding quota. To list: %s', - email_id, - [i['email'] for i in to_list]) - else: - log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', - task_id, email_id, [i['email'] for i in to_list]) + except BULK_EMAIL_FAILURE_ERRORS as exc: + dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) + log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception. To list: %s', + task_id, email_id, [i['email'] for i in to_list]) num_error += len(to_list) subtask_progress = increment_subtask_status( subtask_status, @@ -484,6 +498,27 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas state=FAILURE ) return subtask_progress, exc + + except Exception as exc: + # Errors caught here cause the email to be retried. The entire task is actually retried + # without popping the current recipient off of the existing list. + # These are unexpected errors. Since they might be due to a temporary condition that might + # succeed on retry, we give them a retry. 
+ dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)]) + log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception. Generating retry.', + task_id, email_id) + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + retried_withmax=1, + state=RETRY + ) + return _submit_for_retry( + entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False + ) + else: # Successful completion is marked by an exception value of None: subtask_progress = increment_subtask_status( @@ -499,13 +534,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas connection.close() -def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error): +def _get_current_task(): + """Stub to make it easier to test without actually running Celery""" + return current_task + + +def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, is_sending_rate_error): """ Helper function to requeue a task for retry, using the new version of arguments provided. Inputs are the same as for running a task, plus two extra indicating the state at the time of retry. These include the `current_exception` that the task encountered that is causing the retry attempt, - and the `subtask_progress` that is to be returned. + and the `subtask_status` that is to be returned. Returns a tuple of two values: * First value is a dict which represents current progress. Keys are: @@ -519,27 +559,29 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current successfully submitted, this value will be the RetryTaskError that retry() returns. Otherwise, it (ought to be) the current_exception passed in. 
""" - task_id = _get_current_task().request.id - retry_index = _get_current_task().request.retries - + # task_id = _get_current_task().request.id + task_id = subtask_status['task_id'] log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)", - current_task.request.id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped']) + task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped']) # Calculate time until we retry this task (in seconds): + max_retries = _get_current_task().max_retries + subtask_status['retried_nomax'] + base_delay = _get_current_task().default_retry_delay if is_sending_rate_error: + retry_index = subtask_status['retried_nomax'] exp = min(retry_index, 5) - countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25) + countdown = ((2 ** exp) * base_delay) * random.uniform(.5, 1.25) exception_type = 'sending-rate' else: - countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5) + retry_index = subtask_status['retried_withmax'] + countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.5) exception_type = 'transient' # max_retries is increased by the number of times an "infinite-retry" exception - # has been retried. We want the regular retries to trigger a retry, but not these + # has been retried. We want the regular retries to trigger max-retry checking, but not these # special retries. So we count them separately. 
- max_retries = _get_current_task().max_retries + subtask_progress['retriedA'] - log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)', - task_id, email_id, exception_type, current_exception, len(to_list), max_retries) + log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)', + task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries) try: send_course_email.retry( @@ -548,7 +590,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current email_id, to_list, global_email_context, - subtask_progress, + subtask_status, ], exc=current_exception, countdown=countdown, @@ -559,7 +601,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current # If retry call is successful, update with the current progress: log.exception('Task %s: email with id %d caused send_course_email task to retry.', task_id, email_id) - return subtask_progress, retry_error + return subtask_status, retry_error except Exception as retry_exc: # If there are no more retries, because the maximum has been reached, # we expect the original exception to be raised. We catch it here @@ -569,7 +611,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. 
To list: %s', task_id, email_id, [i['email'] for i in to_list]) num_failed = len(to_list) - new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE) + new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE) return new_subtask_progress, retry_exc diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 540b81baa3..b0e20ae532 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -4,7 +4,6 @@ Unit tests for handling email sending errors from itertools import cycle from mock import patch, Mock from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError -from unittest import skip from django.test.utils import override_settings from django.conf import settings @@ -93,9 +92,9 @@ class TestEmailErrors(ModuleStoreTestCase): # Test that after the rejected email, the rest still successfully send ((_initial_results), kwargs) = result.call_args self.assertEquals(kwargs['skipped'], 0) - expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0) - self.assertEquals(kwargs['failed'], expectedNumFails) - self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails) + expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0) + self.assertEquals(kwargs['failed'], expected_fails) + self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails) @patch('bulk_email.tasks.get_connection', autospec=True) @patch('bulk_email.tasks.send_course_email.retry') @@ -144,7 +143,7 @@ class TestEmailErrors(ModuleStoreTestCase): @patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException)) def test_general_exception(self, mock_log, retry, result): """ - Tests the if the error is not SMTP-related, we log and reraise + Tests that if the error is unexpected, we log and retry """ test_email = { 'action': 
'Send email', @@ -156,11 +155,10 @@ class TestEmailErrors(ModuleStoreTestCase): # so we assert on the arguments of log.exception self.client.post(self.url, test_email) self.assertTrue(mock_log.exception.called) - ((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args - self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str) + ((log_str, _task_id, email_id), _) = mock_log.exception.call_args + self.assertIn('caused send_course_email task to fail with unexpected exception.', log_str) self.assertEqual(email_id, 1) - self.assertEqual(to_list, [self.instructor.email]) - self.assertFalse(retry.called) + self.assertTrue(retry.called) # check the results being returned self.assertTrue(result.called) ((initial_results, ), kwargs) = result.call_args @@ -180,7 +178,7 @@ class TestEmailErrors(ModuleStoreTestCase): entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor) task_input = {"email_id": -1} with self.assertRaises(CourseEmail.DoesNotExist): - perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") + perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101 ((log_str, _, email_id), _) = mock_log.warning.call_args self.assertTrue(mock_log.warning.called) self.assertIn('Failed to get CourseEmail with id', log_str) @@ -196,9 +194,9 @@ class TestEmailErrors(ModuleStoreTestCase): email = CourseEmail(course_id=course_id) email.save() entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor) - task_input = {"email_id": email.id} + task_input = {"email_id": email.id} # pylint: disable=E1101 with self.assertRaises(Exception): - perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") + perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101 ((log_str, _, _), _) = mock_log.exception.call_args 
self.assertTrue(mock_log.exception.called) self.assertIn('get_course_by_id failed:', log_str) @@ -211,9 +209,9 @@ class TestEmailErrors(ModuleStoreTestCase): email = CourseEmail(course_id=self.course.id, to_option="IDONTEXIST") email.save() entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) - task_input = {"email_id": email.id} + task_input = {"email_id": email.id} # pylint: disable=E1101 with self.assertRaises(Exception): - perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") + perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101 ((log_str, opt_str), _) = mock_log.error.call_args self.assertTrue(mock_log.error.called) self.assertIn('Unexpected bulk email TO_OPTION found', log_str) diff --git a/lms/djangoapps/bulk_email/tests/test_tasks.py b/lms/djangoapps/bulk_email/tests/test_tasks.py new file mode 100644 index 0000000000..6ee8accda5 --- /dev/null +++ b/lms/djangoapps/bulk_email/tests/test_tasks.py @@ -0,0 +1,162 @@ +""" +Unit tests for LMS instructor-initiated background tasks. + +Runs tasks on answers to course problems to validate that code +paths actually work. 
+ +""" +import json +from uuid import uuid4 +from itertools import cycle +from mock import patch, Mock +from smtplib import SMTPDataError, SMTPServerDisconnected + +from celery.states import SUCCESS + +# from django.test.utils import override_settings +from django.conf import settings +from django.core.management import call_command + +from bulk_email.models import CourseEmail, SEND_TO_ALL + +# from instructor_task.tests.test_tasks import TestInstructorTasks +from instructor_task.tasks import send_bulk_course_email +from instructor_task.models import InstructorTask +from instructor_task.tests.test_base import InstructorTaskCourseTestCase +from instructor_task.tests.factories import InstructorTaskFactory + + +class TestTaskFailure(Exception): + """Dummy exception used for unit tests.""" + pass + + +class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): + """Tests instructor task that send bulk email.""" + + def setUp(self): + super(TestBulkEmailInstructorTask, self).setUp() + self.initialize_course() + self.instructor = self.create_instructor('instructor') + + # load initial content (since we don't run migrations as part of tests): + call_command("loaddata", "course_email_template.json") + + def _create_input_entry(self, course_id=None): + """ + Creates a InstructorTask entry for testing. + + Overrides the base class version in that this creates CourseEmail. + """ + to_option = SEND_TO_ALL + course_id = course_id or self.course.id + course_email = CourseEmail.create(course_id, self.instructor, to_option, "Test Subject", "
This is a test message
") + task_input = {'email_id': course_email.id} + task_id = str(uuid4()) + instructor_task = InstructorTaskFactory.create( + course_id=course_id, + requester=self.instructor, + task_input=json.dumps(task_input), + task_key='dummy value', + task_id=task_id, + ) + return instructor_task + + def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None): + """Submit a task and mock how celery provides a current_task.""" + self.current_task = Mock() + self.current_task.max_retries = settings.BULK_EMAIL_MAX_RETRIES + self.current_task.default_retry_delay = settings.BULK_EMAIL_DEFAULT_RETRY_DELAY + task_args = [entry_id, {}] + + with patch('bulk_email.tasks._get_current_task') as mock_get_task: + mock_get_task.return_value = self.current_task + return task_class.apply(task_args, task_id=task_id).get() + + def test_email_missing_current_task(self): + task_entry = self._create_input_entry() + with self.assertRaises(ValueError): + send_bulk_course_email(task_entry.id, {}) + + def test_email_undefined_course(self): + # Check that we fail when passing in a course that doesn't exist. 
+ task_entry = self._create_input_entry(course_id="bogus/course/id") + with self.assertRaises(ValueError): + self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id) + + def _create_students(self, num_students): + """Create students, a problem, and StudentModule objects for testing""" + students = [ + self.create_student('robot%d' % i) for i in xrange(num_students) + ] + return students + + def _test_run_with_task(self, task_class, action_name, total, succeeded, failed=0, skipped=0): + """Run a task and check the number of emails processed.""" + task_entry = self._create_input_entry() + parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id) + # check return value + self.assertEquals(parent_status.get('total'), total) + self.assertEquals(parent_status.get('action_name'), action_name) + # compare with entry in table: + entry = InstructorTask.objects.get(id=task_entry.id) + status = json.loads(entry.task_output) + self.assertEquals(status.get('attempted'), succeeded + failed) + self.assertEquals(status.get('succeeded'), succeeded) + self.assertEquals(status['skipped'], skipped) + self.assertEquals(status['failed'], failed) + self.assertEquals(status.get('total'), total) + self.assertEquals(status.get('action_name'), action_name) + self.assertGreater(status.get('duration_ms'), 0) + self.assertEquals(entry.task_state, SUCCESS) + + def test_successful(self): + num_students = settings.EMAILS_PER_TASK + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + get_conn.return_value.send_messages.side_effect = cycle([None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails) + + def test_data_err_fail(self): + # Test that celery handles permanent SMTPDataErrors by failing and not retrying. 
+ num_students = settings.EMAILS_PER_TASK + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + expected_fails = int((num_emails + 3) / 4.0) + expected_succeeds = num_emails - expected_fails + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # have every fourth email fail due to blacklisting: + get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"), + None, None, None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) + + def test_retry_after_limited_retry_error(self): + # Test that celery handles connection failures by retrying. + num_students = 1 + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + expected_fails = 0 + expected_succeeds = num_emails + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # have every other mail attempt fail due to disconnection: + get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting"), None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) + + def test_max_retry(self): + # Test that celery can hit a maximum number of retries. + num_students = 1 + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + # This is an ugly hack: the failures that are reported by the EAGER version of retry + # are multiplied by the attempted number of retries (equals max plus one). 
+ expected_fails = num_emails * (settings.BULK_EMAIL_MAX_RETRIES + 1) + expected_succeeds = 0 + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # always fail to connect, triggering repeated retries until limit is hit: + get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting")]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) diff --git a/lms/djangoapps/instructor/views/legacy.py b/lms/djangoapps/instructor/views/legacy.py index 6be07f1a5d..87964599ad 100644 --- a/lms/djangoapps/instructor/views/legacy.py +++ b/lms/djangoapps/instructor/views/legacy.py @@ -720,18 +720,13 @@ def instructor_dashboard(request, course_id): email_subject = request.POST.get("subject") html_message = request.POST.get("message") - # TODO: make sure this is committed before submitting it to the task. - # However, it should probably be enough to do the submit below, which - # will commit the transaction for the InstructorTask object. Both should - # therefore be committed. (Still, it might be clearer to do so here as well.) - # Actually, this should probably be moved out, so that all the validation logic - # we might want to add to it can be added. There might also be something - # that would permit validation of the email beforehand. + # Create the CourseEmail object. This is saved immediately, so that + # any transaction that has been pending up to this point will also be + # committed. 
email = CourseEmail.create(course_id, request.user, email_to_option, email_subject, html_message) - # TODO: make this into a task submission, so that the correct - # InstructorTask object gets created (for monitoring purposes) - submit_bulk_course_email(request, course_id, email.id) + # Submit the task, so that the correct InstructorTask object gets created (for monitoring purposes) + submit_bulk_course_email(request, course_id, email.id) # pylint: disable=E1101 if email_to_option == "all": email_msg = 'Your email was successfully queued for sending. Please note that for large public classes (~10k), it may take 1-2 hours to send all emails.
This is a test message
") - return course_email.id + return course_email.id # pylint: disable=E1101 def test_submit_bulk_email_all(self): email_id = self._define_course_email() instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor), self.course.id, email_id) # test resubmitting, by updating the existing record: - instructor_task = InstructorTask.objects.get(id=instructor_task.id) + instructor_task = InstructorTask.objects.get(id=instructor_task.id) # pylint: disable=E1101 instructor_task.task_state = PROGRESS instructor_task.save() diff --git a/lms/djangoapps/instructor_task/tests/test_tasks.py b/lms/djangoapps/instructor_task/tests/test_tasks.py index e1f89a6022..448054a13d 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks.py @@ -85,11 +85,11 @@ class TestInstructorTasks(InstructorTaskModuleTestCase): def _test_missing_current_task(self, task_class): """Check that a task_class fails when celery doesn't provide a current_task.""" task_entry = self._create_input_entry() - with self.assertRaises(UpdateProblemModuleStateError): + with self.assertRaises(ValueError): task_class(task_entry.id, self._get_xmodule_instance_args()) def _test_undefined_course(self, task_class): - # run with celery, but no course defined + """Run with celery, but with no course defined.""" task_entry = self._create_input_entry(course_id="bogus/course/id") with self.assertRaises(ItemNotFoundError): self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id) diff --git a/lms/envs/aws.py b/lms/envs/aws.py index 86d3d539bd..4dc928a208 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -93,6 +93,10 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } +# We want Bulk Email running on the high-priority queue, so we define the +# routing key that points to it. At the moment, the name is the same. 
+BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE + ########################## NON-SECURE ENV CONFIG ############################## # Things like server locations, ports, etc. @@ -128,7 +132,7 @@ LOG_DIR = ENV_TOKENS['LOG_DIR'] CACHES = ENV_TOKENS['CACHES'] -#Email overrides +# Email overrides DEFAULT_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_FROM_EMAIL', DEFAULT_FROM_EMAIL) DEFAULT_FEEDBACK_EMAIL = ENV_TOKENS.get('DEFAULT_FEEDBACK_EMAIL', DEFAULT_FEEDBACK_EMAIL) DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL) @@ -140,8 +144,10 @@ BUGS_EMAIL = ENV_TOKENS.get('BUGS_EMAIL', BUGS_EMAIL) PAYMENT_SUPPORT_EMAIL = ENV_TOKENS.get('PAYMENT_SUPPORT_EMAIL', PAYMENT_SUPPORT_EMAIL) PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CURRENCY', PAID_COURSE_REGISTRATION_CURRENCY) +BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY) +BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES) -#Theme overrides +# Theme overrides THEME_NAME = ENV_TOKENS.get('THEME_NAME', None) if not THEME_NAME is None: enable_theme(THEME_NAME) @@ -150,10 +156,10 @@ if not THEME_NAME is None: # Marketing link overrides MKTG_URL_LINK_MAP.update(ENV_TOKENS.get('MKTG_URL_LINK_MAP', {})) -#Timezone overrides +# Timezone overrides TIME_ZONE = ENV_TOKENS.get('TIME_ZONE', TIME_ZONE) -#Additional installed apps +# Additional installed apps for app in ENV_TOKENS.get('ADDL_INSTALLED_APPS', []): INSTALLED_APPS += (app,) diff --git a/lms/envs/common.py b/lms/envs/common.py index 96b304294d..0c12d088c9 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -795,6 +795,17 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } +# let logging work as configured: +CELERYD_HIJACK_ROOT_LOGGER = False + +################################ Bulk Email ################################### + +# We want Bulk Email running on the high-priority queue, so we define the +# 
routing key that points to it. At the moment, the name is the same. +BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +BULK_EMAIL_DEFAULT_RETRY_DELAY = 15 +BULK_EMAIL_MAX_RETRIES = 5 + ################################### APPS ###################################### INSTALLED_APPS = ( # Standard ones that are always installed...