From a4c35ac4ff086110c3d14cdc18c3abd752d7b5e5 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Mon, 30 Sep 2013 13:19:45 -0400 Subject: [PATCH] Use separate retry count for calculating retry delay. --- lms/djangoapps/bulk_email/tasks.py | 204 +++++++++++------- .../bulk_email/tests/test_err_handling.py | 26 ++- lms/djangoapps/bulk_email/tests/test_tasks.py | 162 ++++++++++++++ lms/djangoapps/instructor/views/legacy.py | 16 +- lms/djangoapps/instructor_task/api.py | 12 +- lms/djangoapps/instructor_task/api_helper.py | 2 +- lms/djangoapps/instructor_task/subtasks.py | 144 ++++++++----- lms/djangoapps/instructor_task/tasks.py | 21 +- .../instructor_task/tasks_helper.py | 185 ++++++++-------- .../instructor_task/tests/test_api.py | 5 +- .../instructor_task/tests/test_tasks.py | 4 +- lms/envs/aws.py | 14 +- lms/envs/common.py | 11 + 13 files changed, 528 insertions(+), 278 deletions(-) create mode 100644 lms/djangoapps/bulk_email/tests/test_tasks.py diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 6c1d517b7d..07f1af05d2 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -12,8 +12,14 @@ from sys import exc_info from traceback import format_exc from dogapi import dog_stats_api -from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError -from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError +from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException +from boto.ses.exceptions import ( + SESDailyQuotaExceededError, + SESMaxSendingRateExceededError, + SESAddressBlacklistedError, + SESIllegalAddressError, + SESLocalAddressCharacterError, +) from boto.exception import AWSConnectionError from celery import task, current_task, group @@ -44,18 +50,25 @@ from instructor_task.subtasks import ( log = get_task_logger(__name__) +# Errors that an individual email is failing to be sent, and should just +# be treated as a 
fail. +SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressError, SESLocalAddressCharacterError) + # Exceptions that, if caught, should cause the task to be re-tried. -# These errors will be caught a maximum of 5 times before the task fails. -RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError) +# These errors will be caught a limited number of times before the task fails. +LIMITED_RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError) -# Errors that involve exceeding a quota of sent email -QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, ) - -# Errors that mail is being sent too quickly. When caught by a task, it -# triggers an exponential backoff and retry. Retries happen continuously until -# the email is sent. +# Errors that indicate that a mailing task should be retried without limit. +# An example is if email is being sent too quickly, but may succeed if sent +# more slowly. When caught by a task, it triggers an exponential backoff and retry. +# Retries happen continuously until the email is sent. INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, ) +# Errors that are known to indicate an inability to send any more emails, +# and should therefore not be retried. For example, exceeding a quota for emails. +# Also, any SMTP errors that are not explicitly enumerated above. +BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException) + def _get_recipient_queryset(user_id, to_option, course_id, course_location): """ @@ -118,12 +131,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) """ entry = InstructorTask.objects.get(pk=entry_id) # get inputs to use in this task from the entry: - #task_id = entry.task_id user_id = entry.requester.id task_id = entry.task_id - # TODO: check this against argument passed in? 
- # course_id = entry.course_id + # perfunctory check, since expansion is made for convenience of other task + # code that doesn't need the entry_id. + if course_id != entry.course_id: + format_msg = "Course id conflict: explicit value {0} does not match task value {1}" + raise ValueError(format_msg.format(course_id, entry.course_id)) email_id = task_input['email_id'] try: @@ -138,15 +153,16 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_option = email_obj.to_option - # TODO: instead of fetching from email object, compare instead to - # confirm that they match, and raise an exception if they don't. - # course_id = email_obj.course_id + # sanity check that course for email_obj matches that of the task referencing it: + if course_id != email_obj.course_id: + format_msg = "Course id conflict: explicit value {0} does not match email value {1}" + raise ValueError(format_msg.format(course_id, email_obj.course_id)) try: course = get_course_by_id(course_id, depth=1) except Http404 as exc: log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0]) - raise Exception("get_course_by_id failed: " + exc.args[0]) + raise ValueError("Course not found: " + exc.args[0]) global_email_context = _get_course_email_context(course) recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) @@ -173,23 +189,26 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) for i in range(num_tasks_this_query): if i == num_tasks_this_query - 1: # Avoid cutting off the very last email when chunking a task that divides perfectly - # (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100) + # (e.g. 
num_emails_this_query = 297 and EMAILS_PER_TASK is 100) to_list = recipient_sublist[i * chunk:] else: to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) - retry_progress = create_subtask_status() - task_list.append(send_course_email.subtask(( - entry_id, - email_id, - to_list, - global_email_context, - retry_progress, - ), - task_id=subtask_id, - routing_key=settings.HIGH_PRIORITY_QUEUE, - )) + subtask_status = create_subtask_status(subtask_id) + # create subtask, passing args and kwargs: + new_subtask = send_course_email.subtask( + ( + entry_id, + email_id, + to_list, + global_email_context, + subtask_status, + ), + task_id=subtask_id, + routing_key=settings.BULK_EMAIL_ROUTING_KEY, + ) + task_list.append(new_subtask) num_emails_queued += num_emails_this_query # Sanity check: we expect the chunking to be properly summing to the original count: @@ -208,7 +227,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # now group the subtasks, and start them running: task_group = group(task_list) - task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE) + task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. @@ -218,13 +237,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) return progress -# TODO: figure out if we really need this after all (for unit tests...) 
-def _get_current_task(): - """Stub to make it easier to test without actually running Celery""" - return current_task - - -@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102 +@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102 def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): """ Sends an email to a list of recipients. @@ -249,8 +262,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask # with it right away, but we also don't expect it to fail. InstructorTask.objects.get(pk=entry_id) - # Get information from current task's request: - current_task_id = _get_current_task().request.id + current_task_id = subtask_status['task_id'] num_to_send = len(to_list) log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s", email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status) @@ -295,6 +307,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask update_subtask_status(entry_id, current_task_id, new_subtask_status) raise send_exception + log.info("background task (%s) returning status %s", current_task_id, new_subtask_status) return new_subtask_status @@ -332,12 +345,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas 'failed' count above. 
""" # Get information from current task's request: - task_id = _get_current_task().request.id - retry_index = _get_current_task().request.retries + #task_id = _get_current_task().request.id + #retry_index = _get_current_task().request.retries + task_id = subtask_status['task_id'] # If this is a second attempt, then throttle the speed at which mail is sent: - throttle = retry_index > 0 + throttle = subtask_status['retried_nomax'] > 0 + # collect stats on progress: num_optout = 0 num_sent = 0 num_error = 0 @@ -354,7 +369,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # attempt. Anyone on the to_list on a retry has already passed the filter # that existed at that time, and we don't need to keep checking for changes # in the Optout list. - if retry_index == 0: + if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0: optouts = (Optout.objects.filter(course_id=course_email.course_id, user__in=[i['pk'] for i in to_list]) .values_list('user__email', flat=True)) @@ -403,7 +418,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas ) email_msg.attach_alternative(html_msg, 'text/html') - # Throttle if we tried a few times and got the rate limiter + # Throttle if we have gotten the rate limiter if throttle: sleep(0.2) @@ -413,11 +428,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]): connection.send_messages([email_msg]) - dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)]) - - log.info('Email with id %s sent to %s', email_id, email) - num_sent += 1 - except SMTPDataError as exc: # According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure. 
if exc.smtp_code >= 400 and exc.smtp_code < 500: @@ -429,52 +439,56 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) num_error += 1 + except SINGLE_EMAIL_FAILURE_ERRORS as exc: + # This will fall through and not retry the message, since it will be popped + log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc) + dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) + num_error += 1 + + else: + dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)]) + + log.info('Email with id %s sent to %s', email_id, email) + num_sent += 1 + # Pop the user that was emailed off the end of the list: to_list.pop() except INFINITE_RETRY_ERRORS as exc: + dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)]) subtask_progress = increment_subtask_status( subtask_status, succeeded=num_sent, failed=num_error, skipped=num_optout, - retriedA=1, + retried_nomax=1, state=RETRY ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True ) - except RETRY_ERRORS as exc: + except LIMITED_RETRY_ERRORS as exc: # Errors caught here cause the email to be retried. The entire task is actually retried # without popping the current recipient off of the existing list. # Errors caught are those that indicate a temporary condition that might succeed on retry. 
+ dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)]) subtask_progress = increment_subtask_status( subtask_status, succeeded=num_sent, failed=num_error, skipped=num_optout, - retriedB=1, + retried_withmax=1, state=RETRY ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False ) - except Exception as exc: - - # If we have a general exception for this request, we need to figure out what to do with it. - # If we're going to just mark it as failed - # And the log message below should indicate which task_id is failing, so we have a chance to - # reconstruct the problems. - if isinstance(exc, QUOTA_EXCEEDED_ERRORS): - log.exception('WARNING: Course "%s" exceeded quota!', course_title) - log.exception('Email with id %d not sent due to exceeding quota. To list: %s', - email_id, - [i['email'] for i in to_list]) - else: - log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', - task_id, email_id, [i['email'] for i in to_list]) + except BULK_EMAIL_FAILURE_ERRORS as exc: + dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) + log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception. To list: %s', + task_id, email_id, [i['email'] for i in to_list]) num_error += len(to_list) subtask_progress = increment_subtask_status( subtask_status, @@ -484,6 +498,27 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas state=FAILURE ) return subtask_progress, exc + + except Exception as exc: + # Errors caught here cause the email to be retried. The entire task is actually retried + # without popping the current recipient off of the existing list. + # These are unexpected errors. Since they might be due to a temporary condition that might + # succeed on retry, we give them a retry. 
+ dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)]) + log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception. Generating retry.', + task_id, email_id) + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + retried_withmax=1, + state=RETRY + ) + return _submit_for_retry( + entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False + ) + else: # Successful completion is marked by an exception value of None: subtask_progress = increment_subtask_status( @@ -499,13 +534,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas connection.close() -def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error): +def _get_current_task(): + """Stub to make it easier to test without actually running Celery""" + return current_task + + +def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, is_sending_rate_error): """ Helper function to requeue a task for retry, using the new version of arguments provided. Inputs are the same as for running a task, plus two extra indicating the state at the time of retry. These include the `current_exception` that the task encountered that is causing the retry attempt, - and the `subtask_progress` that is to be returned. + and the `subtask_status` that is to be returned. Returns a tuple of two values: * First value is a dict which represents current progress. Keys are: @@ -519,27 +559,29 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current successfully submitted, this value will be the RetryTaskError that retry() returns. Otherwise, it (ought to be) the current_exception passed in. 
""" - task_id = _get_current_task().request.id - retry_index = _get_current_task().request.retries - + # task_id = _get_current_task().request.id + task_id = subtask_status['task_id'] log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)", - current_task.request.id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped']) + task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped']) # Calculate time until we retry this task (in seconds): + max_retries = _get_current_task().max_retries + subtask_status['retried_nomax'] + base_delay = _get_current_task().default_retry_delay if is_sending_rate_error: + retry_index = subtask_status['retried_nomax'] exp = min(retry_index, 5) - countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25) + countdown = ((2 ** exp) * base_delay) * random.uniform(.5, 1.25) exception_type = 'sending-rate' else: - countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5) + retry_index = subtask_status['retried_withmax'] + countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.5) exception_type = 'transient' # max_retries is increased by the number of times an "infinite-retry" exception - # has been retried. We want the regular retries to trigger a retry, but not these + # has been retried. We want the regular retries to trigger max-retry checking, but not these # special retries. So we count them separately. 
- max_retries = _get_current_task().max_retries + subtask_progress['retriedA'] - log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)', - task_id, email_id, exception_type, current_exception, len(to_list), max_retries) + log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)', + task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries) try: send_course_email.retry( @@ -548,7 +590,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current email_id, to_list, global_email_context, - subtask_progress, + subtask_status, ], exc=current_exception, countdown=countdown, @@ -559,7 +601,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current # If retry call is successful, update with the current progress: log.exception('Task %s: email with id %d caused send_course_email task to retry.', task_id, email_id) - return subtask_progress, retry_error + return subtask_status, retry_error except Exception as retry_exc: # If there are no more retries, because the maximum has been reached, # we expect the original exception to be raised. We catch it here @@ -569,7 +611,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. 
To list: %s', task_id, email_id, [i['email'] for i in to_list]) num_failed = len(to_list) - new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE) + new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE) return new_subtask_progress, retry_exc diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 540b81baa3..b0e20ae532 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -4,7 +4,6 @@ Unit tests for handling email sending errors from itertools import cycle from mock import patch, Mock from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError -from unittest import skip from django.test.utils import override_settings from django.conf import settings @@ -93,9 +92,9 @@ class TestEmailErrors(ModuleStoreTestCase): # Test that after the rejected email, the rest still successfully send ((_initial_results), kwargs) = result.call_args self.assertEquals(kwargs['skipped'], 0) - expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0) - self.assertEquals(kwargs['failed'], expectedNumFails) - self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails) + expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0) + self.assertEquals(kwargs['failed'], expected_fails) + self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails) @patch('bulk_email.tasks.get_connection', autospec=True) @patch('bulk_email.tasks.send_course_email.retry') @@ -144,7 +143,7 @@ class TestEmailErrors(ModuleStoreTestCase): @patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException)) def test_general_exception(self, mock_log, retry, result): """ - Tests the if the error is not SMTP-related, we log and reraise + Tests that if the error is unexpected, we log and retry """ test_email = { 'action': 
'Send email', @@ -156,11 +155,10 @@ class TestEmailErrors(ModuleStoreTestCase): # so we assert on the arguments of log.exception self.client.post(self.url, test_email) self.assertTrue(mock_log.exception.called) - ((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args - self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str) + ((log_str, _task_id, email_id), _) = mock_log.exception.call_args + self.assertIn('caused send_course_email task to fail with unexpected exception.', log_str) self.assertEqual(email_id, 1) - self.assertEqual(to_list, [self.instructor.email]) - self.assertFalse(retry.called) + self.assertTrue(retry.called) # check the results being returned self.assertTrue(result.called) ((initial_results, ), kwargs) = result.call_args @@ -180,7 +178,7 @@ class TestEmailErrors(ModuleStoreTestCase): entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor) task_input = {"email_id": -1} with self.assertRaises(CourseEmail.DoesNotExist): - perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") + perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101 ((log_str, _, email_id), _) = mock_log.warning.call_args self.assertTrue(mock_log.warning.called) self.assertIn('Failed to get CourseEmail with id', log_str) @@ -196,9 +194,9 @@ class TestEmailErrors(ModuleStoreTestCase): email = CourseEmail(course_id=course_id) email.save() entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor) - task_input = {"email_id": email.id} + task_input = {"email_id": email.id} # pylint: disable=E1101 with self.assertRaises(Exception): - perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") + perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101 ((log_str, _, _), _) = mock_log.exception.call_args 
self.assertTrue(mock_log.exception.called) self.assertIn('get_course_by_id failed:', log_str) @@ -211,9 +209,9 @@ class TestEmailErrors(ModuleStoreTestCase): email = CourseEmail(course_id=self.course.id, to_option="IDONTEXIST") email.save() entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) - task_input = {"email_id": email.id} + task_input = {"email_id": email.id} # pylint: disable=E1101 with self.assertRaises(Exception): - perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") + perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101 ((log_str, opt_str), _) = mock_log.error.call_args self.assertTrue(mock_log.error.called) self.assertIn('Unexpected bulk email TO_OPTION found', log_str) diff --git a/lms/djangoapps/bulk_email/tests/test_tasks.py b/lms/djangoapps/bulk_email/tests/test_tasks.py new file mode 100644 index 0000000000..6ee8accda5 --- /dev/null +++ b/lms/djangoapps/bulk_email/tests/test_tasks.py @@ -0,0 +1,162 @@ +""" +Unit tests for LMS instructor-initiated background tasks. + +Runs tasks on answers to course problems to validate that code +paths actually work. 
+ +""" +import json +from uuid import uuid4 +from itertools import cycle +from mock import patch, Mock +from smtplib import SMTPDataError, SMTPServerDisconnected + +from celery.states import SUCCESS + +# from django.test.utils import override_settings +from django.conf import settings +from django.core.management import call_command + +from bulk_email.models import CourseEmail, SEND_TO_ALL + +# from instructor_task.tests.test_tasks import TestInstructorTasks +from instructor_task.tasks import send_bulk_course_email +from instructor_task.models import InstructorTask +from instructor_task.tests.test_base import InstructorTaskCourseTestCase +from instructor_task.tests.factories import InstructorTaskFactory + + +class TestTaskFailure(Exception): + """Dummy exception used for unit tests.""" + pass + + +class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): + """Tests instructor task that send bulk email.""" + + def setUp(self): + super(TestBulkEmailInstructorTask, self).setUp() + self.initialize_course() + self.instructor = self.create_instructor('instructor') + + # load initial content (since we don't run migrations as part of tests): + call_command("loaddata", "course_email_template.json") + + def _create_input_entry(self, course_id=None): + """ + Creates a InstructorTask entry for testing. + + Overrides the base class version in that this creates CourseEmail. + """ + to_option = SEND_TO_ALL + course_id = course_id or self.course.id + course_email = CourseEmail.create(course_id, self.instructor, to_option, "Test Subject", "

This is a test message

") + task_input = {'email_id': course_email.id} + task_id = str(uuid4()) + instructor_task = InstructorTaskFactory.create( + course_id=course_id, + requester=self.instructor, + task_input=json.dumps(task_input), + task_key='dummy value', + task_id=task_id, + ) + return instructor_task + + def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None): + """Submit a task and mock how celery provides a current_task.""" + self.current_task = Mock() + self.current_task.max_retries = settings.BULK_EMAIL_MAX_RETRIES + self.current_task.default_retry_delay = settings.BULK_EMAIL_DEFAULT_RETRY_DELAY + task_args = [entry_id, {}] + + with patch('bulk_email.tasks._get_current_task') as mock_get_task: + mock_get_task.return_value = self.current_task + return task_class.apply(task_args, task_id=task_id).get() + + def test_email_missing_current_task(self): + task_entry = self._create_input_entry() + with self.assertRaises(ValueError): + send_bulk_course_email(task_entry.id, {}) + + def test_email_undefined_course(self): + # Check that we fail when passing in a course that doesn't exist. 
+ task_entry = self._create_input_entry(course_id="bogus/course/id") + with self.assertRaises(ValueError): + self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id) + + def _create_students(self, num_students): + """Create `num_students` students for testing and return them as a list""" + students = [ + self.create_student('robot%d' % i) for i in xrange(num_students) + ] + return students + + def _test_run_with_task(self, task_class, action_name, total, succeeded, failed=0, skipped=0): + """Run a task and check the number of emails processed.""" + task_entry = self._create_input_entry() + parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id) + # check return value + self.assertEquals(parent_status.get('total'), total) + self.assertEquals(parent_status.get('action_name'), action_name) + # compare with entry in table: + entry = InstructorTask.objects.get(id=task_entry.id) + status = json.loads(entry.task_output) + self.assertEquals(status.get('attempted'), succeeded + failed) + self.assertEquals(status.get('succeeded'), succeeded) + self.assertEquals(status['skipped'], skipped) + self.assertEquals(status['failed'], failed) + self.assertEquals(status.get('total'), total) + self.assertEquals(status.get('action_name'), action_name) + self.assertGreater(status.get('duration_ms'), 0) + self.assertEquals(entry.task_state, SUCCESS) + + def test_successful(self): + num_students = settings.EMAILS_PER_TASK + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + get_conn.return_value.send_messages.side_effect = cycle([None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails) + + def test_data_err_fail(self): + # Test that celery handles permanent SMTPDataErrors by failing and not retrying. 
+ num_students = settings.EMAILS_PER_TASK + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + expected_fails = int((num_emails + 3) / 4.0) + expected_succeeds = num_emails - expected_fails + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # have every fourth email fail due to blacklisting: + get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"), + None, None, None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) + + def test_retry_after_limited_retry_error(self): + # Test that celery handles connection failures by retrying. + num_students = 1 + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + expected_fails = 0 + expected_succeeds = num_emails + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # have every other mail attempt fail due to disconnection: + get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting"), None]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) + + def test_max_retry(self): + # Test that celery can hit a maximum number of retries. + num_students = 1 + self._create_students(num_students) + # we also send email to the instructor: + num_emails = num_students + 1 + # This is an ugly hack: the failures that are reported by the EAGER version of retry + # are multiplied by the attempted number of retries (equals max plus one). 
+ expected_fails = num_emails * (settings.BULK_EMAIL_MAX_RETRIES + 1) + expected_succeeds = 0 + with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: + # always fail to connect, triggering repeated retries until limit is hit: + get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting")]) + self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails) diff --git a/lms/djangoapps/instructor/views/legacy.py b/lms/djangoapps/instructor/views/legacy.py index 6be07f1a5d..87964599ad 100644 --- a/lms/djangoapps/instructor/views/legacy.py +++ b/lms/djangoapps/instructor/views/legacy.py @@ -720,18 +720,13 @@ def instructor_dashboard(request, course_id): email_subject = request.POST.get("subject") html_message = request.POST.get("message") - # TODO: make sure this is committed before submitting it to the task. - # However, it should probably be enough to do the submit below, which - # will commit the transaction for the InstructorTask object. Both should - # therefore be committed. (Still, it might be clearer to do so here as well.) - # Actually, this should probably be moved out, so that all the validation logic - # we might want to add to it can be added. There might also be something - # that would permit validation of the email beforehand. + # Create the CourseEmail object. This is saved immediately, so that + # any transaction that has been pending up to this point will also be + # committed. 
email = CourseEmail.create(course_id, request.user, email_to_option, email_subject, html_message) - # TODO: make this into a task submission, so that the correct - # InstructorTask object gets created (for monitoring purposes) - submit_bulk_course_email(request, course_id, email.id) + # Submit the task, so that the correct InstructorTask object gets created (for monitoring purposes) + submit_bulk_course_email(request, course_id, email.id) # pylint: disable=E1101 if email_to_option == "all": email_msg = '

Your email was successfully queued for sending. Please note that for large public classes (~10k), it may take 1-2 hours to send all emails.

' @@ -1535,7 +1530,6 @@ def get_background_task_table(course_id, problem_url=None, student=None, task_ty # (note that we don't have to check that the arguments are valid; it # just won't find any entries.) if (history_entries.count()) == 0: - # TODO: figure out how to deal with task_type better here... if problem_url is None: msg += 'Failed to find any background tasks for course "{course}".'.format(course=course_id) elif student is not None: diff --git a/lms/djangoapps/instructor_task/api.py b/lms/djangoapps/instructor_task/api.py index c1e473f84b..7521a8eb3a 100644 --- a/lms/djangoapps/instructor_task/api.py +++ b/lms/djangoapps/instructor_task/api.py @@ -178,8 +178,8 @@ def submit_bulk_course_email(request, course_id, email_id): The specified CourseEmail object will be sent be updated for all students who have enrolled in a course. Parameters are the `course_id` and the `email_id`, the id of the CourseEmail object. - AlreadyRunningError is raised if the course's students are already being emailed. - TODO: is this the right behavior? Or should multiple emails be allowed in the pipeline at the same time? + AlreadyRunningError is raised if the same recipients are already being emailed with the same + CourseEmail object. This method makes sure the InstructorTask entry is committed. When called from any view that is wrapped by TransactionMiddleware, @@ -188,11 +188,9 @@ def submit_bulk_course_email(request, course_id, email_id): save here. Any future database operations will take place in a separate transaction. """ - # check arguments: make sure that the course is defined? - # TODO: what is the right test here? - - # This should also make sure that the email exists. - # We can also pull out the To argument here, so that is displayed in + # Assume that the course is defined, and that the user has already been verified to have + # appropriate access to the course. But make sure that the email exists. 
+ # We also pull out the To argument here, so that is displayed in # the InstructorTask status. email_obj = CourseEmail.objects.get(id=email_id) to_option = email_obj.to_option diff --git a/lms/djangoapps/instructor_task/api_helper.py b/lms/djangoapps/instructor_task/api_helper.py index 1451963693..d6d97a9e28 100644 --- a/lms/djangoapps/instructor_task/api_helper.py +++ b/lms/djangoapps/instructor_task/api_helper.py @@ -268,7 +268,7 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key) # submit task: task_id = instructor_task.task_id - task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)] + task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)] # pylint: disable=E1101 task_class.apply_async(task_args, task_id=task_id) return instructor_task diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index f8a0bd08f9..00c98e88f2 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -14,50 +14,75 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING TASK_LOG = get_task_logger(__name__) -def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None): +def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): """ - Create a dict for tracking the status of a subtask. + Create and return a dict for tracking the status of a subtask. + + Subtask status keys are: + + 'task_id' : id of subtask. This is used to pass task information across retries. + 'attempted' : number of attempts -- should equal succeeded plus failed + 'succeeded' : number that succeeded in processing + 'skipped' : number that were not processed. 
+ 'failed' : number that failed during processing + 'retried_nomax' : number of times the subtask has been retried for conditions that + should not have a maximum count applied + 'retried_withmax' : number of times the subtask has been retried for conditions that + should have a maximum count applied + 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) - Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'. -TODO: update Object must be JSON-serializable, so that it can be passed as an argument to tasks. - TODO: decide if in future we want to include specific error information + In future, we may want to include specific error information indicating the reason for failure. - Also, we should count up "not attempted" separately from - attempted/failed. + Also, we should count up "not attempted" separately from attempted/failed. """ attempted = succeeded + failed current_result = { + 'task_id': task_id, 'attempted': attempted, 'succeeded': succeeded, - 'pending': pending, 'skipped': skipped, 'failed': failed, - 'retriedA': retriedA, - 'retriedB': retriedB, + 'retried_nomax': retried_nomax, + 'retried_withmax': retried_withmax, 'state': state if state is not None else QUEUING, } return current_result -def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None): +def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): """ Update the result of a subtask with additional results. - Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'. + Create and return a dict for tracking the status of a subtask. + + Keys for input `subtask_result` and returned subtask_status are: + + 'task_id' : id of subtask. This is used to pass task information across retries. 
+ 'attempted' : number of attempts -- should equal succeeded plus failed + 'succeeded' : number that succeeded in processing + 'skipped' : number that were not processed. + 'failed' : number that failed during processing + 'retried_nomax' : number of times the subtask has been retried for conditions that + should not have a maximum count applied + 'retried_withmax' : number of times the subtask has been retried for conditions that + should have a maximum count applied + 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) + + Kwarg arguments are incremented to the corresponding key in `subtask_result`. + The exception is for `state`, which if specified is used to override the existing value. """ - # TODO: rewrite this if we have additional fields added to original subtask_result, - # that are not part of the increment. Tradeoff on duplicating the 'attempts' logic. - new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state) - for keyname in new_result: - if keyname == 'state': - # does not get incremented. If no new value, copy old value: - if state is None: - new_result[keyname] = subtask_result[keyname] - elif keyname in subtask_result: - new_result[keyname] += subtask_result[keyname] + new_result = dict(subtask_result) + new_result['attempted'] += (succeeded + failed) + new_result['succeeded'] += succeeded + new_result['failed'] += failed + new_result['skipped'] += skipped + new_result['retried_nomax'] += retried_nomax + new_result['retried_withmax'] += retried_withmax + if state is not None: + new_result['state'] = state return new_result @@ -70,7 +95,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero, as is the 'duration_ms' value. 
A 'start_time' is stored for later duration calculations, and the total number of "things to do" is set, so the user can be told how much needs to be - done overall. The `action_name` is also stored, to also help with constructing more readable + done overall. The `action_name` is also stored, to help with constructing more readable task_progress messages. The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict. @@ -80,8 +105,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i the InstructorTask's "status" will be changed to SUCCESS. The "subtasks" field also contains a 'status' key, that contains a dict that stores status - information for each subtask. At the moment, the value for each subtask (keyed by its task_id) - is the value of `status`, which is initialized here to QUEUING. + information for each subtask. The value for each subtask (keyed by its task_id) + is its subtask status, as defined by create_subtask_status(). This information needs to be set up in the InstructorTask before any of the subtasks start running. If not, there is a chance that the subtasks could complete before the parent task @@ -92,7 +117,6 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i rely on the status stored in the InstructorTask object, rather than status stored in the corresponding AsyncResult. """ - # TODO: also add 'pending' count here? (Even though it's total-attempted-skipped task_progress = { 'action_name': action_name, 'attempted': 0, @@ -108,12 +132,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i # Write out the subtasks information. num_subtasks = len(subtask_id_list) - # using fromkeys to initialize uses a single value. we need the value - # to be distinct, since it's now a dict: - # subtask_status = dict.fromkeys(subtask_id_list, QUEUING) - # TODO: may not be necessary to store initial value with all those zeroes! 
- # Instead, use a placemarker.... - subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list} + # Note that it may not be necessary to store initial value with all those zeroes! + subtask_status = {subtask_id: create_subtask_status(subtask_id) for subtask_id in subtask_id_list} subtask_dict = { 'total': num_subtasks, 'succeeded': 0, @@ -129,7 +149,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i @transaction.commit_manually -def update_subtask_status(entry_id, current_task_id, subtask_status): +def update_subtask_status(entry_id, current_task_id, new_subtask_status): """ Update the status of the subtask in the parent InstructorTask object tracking its progress. @@ -138,9 +158,11 @@ def update_subtask_status(entry_id, current_task_id, subtask_status): committed on completion, or rolled back on error. The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict. - Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_progress` + Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `new_subtask_status` into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms' - value with the current interval since the original InstructorTask started. + value with the current interval since the original InstructorTask started. Note that this + value is only approximate, since the subtask may be running on a different server than the + original task, so is subject to clock skew. The InstructorTask's "subtasks" field is also updated. This is also a JSON-serialized dict. Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of @@ -155,13 +177,13 @@ def update_subtask_status(entry_id, current_task_id, subtask_status): messages, progress made, etc.
""" TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s", - current_task_id, entry_id, subtask_status) + current_task_id, entry_id, new_subtask_status) try: entry = InstructorTask.objects.select_for_update().get(pk=entry_id) subtask_dict = json.loads(entry.subtasks) - subtask_status = subtask_dict['status'] - if current_task_id not in subtask_status: + subtask_status_info = subtask_dict['status'] + if current_task_id not in subtask_status_info: # unexpected error -- raise an exception format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'" msg = format_str.format(current_task_id, entry_id) @@ -173,39 +195,45 @@ def update_subtask_status(entry_id, current_task_id, subtask_status): # will be updating before the original call, and we don't want their # ultimate status to be clobbered by the "earlier" updates. This # should not be a problem in normal (non-eager) processing. - old_status = subtask_status[current_task_id] - # TODO: check this logic... - state = subtask_status['state'] -# if state != RETRY or old_status['status'] == QUEUING: - # instead replace the status only if it's 'newer' - # i.e. has fewer pending - if subtask_status['pending'] <= old_status['pending']: - subtask_status[current_task_id] = subtask_status + current_subtask_status = subtask_status_info[current_task_id] + current_state = current_subtask_status['state'] + new_state = new_subtask_status['state'] + if new_state != RETRY or current_state == QUEUING or current_state in READY_STATES: + subtask_status_info[current_task_id] = new_subtask_status # Update the parent task progress + # Set the estimate of duration, but only if it + # increases. Clock skew between time() returned by different machines + # may result in non-monotonic values for duration. 
task_progress = json.loads(entry.task_output) start_time = task_progress['start_time'] - task_progress['duration_ms'] = int((time() - start_time) * 1000) - # change behavior so we don't update on progress now: - # TODO: figure out if we can make this more responsive later, - # by figuring out how to handle retries better. - if subtask_status is not None and state in READY_STATES: + prev_duration = task_progress['duration_ms'] + new_duration = int((time() - start_time) * 1000) + task_progress['duration_ms'] = max(prev_duration, new_duration) + + # Update counts only when subtask is done. + # In future, we can make this more responsive by updating status + # between retries, by comparing counts that change from previous + # retry. + if new_subtask_status is not None and new_state in READY_STATES: for statname in ['attempted', 'succeeded', 'failed', 'skipped']: - task_progress[statname] += subtask_status[statname] + task_progress[statname] += new_subtask_status[statname] # Figure out if we're actually done (i.e. this is the last task to complete). # This is easier if we just maintain a counter, rather than scanning the - # entire subtask_status dict. - if state == SUCCESS: + # entire new_subtask_status dict. + if new_state == SUCCESS: subtask_dict['succeeded'] += 1 - elif state == RETRY: + elif new_state == RETRY: subtask_dict['retried'] += 1 else: subtask_dict['failed'] += 1 num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed'] - # If we're done with the last task, update the parent status to indicate that: - # TODO: see if there was a catastrophic failure that occurred, and figure out - # how to report that here. + + # If we're done with the last task, update the parent status to indicate that. + # At present, we mark the task as having succeeded. In future, we should see + # if there was a catastrophic failure that occurred, and figure out how to + # report that here. 
if num_remaining <= 0: entry.task_state = SUCCESS entry.subtasks = json.dumps(subtask_dict) diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py index a6a082f2b9..9291d7dd16 100644 --- a/lms/djangoapps/instructor_task/tasks.py +++ b/lms/djangoapps/instructor_task/tasks.py @@ -32,7 +32,7 @@ from instructor_task.tasks_helper import ( from bulk_email.tasks import perform_delegate_email_batches -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask) # pylint: disable=E1102 def rescore_problem(entry_id, xmodule_instance_args): """Rescores a problem in a course, for all students or one specific student. @@ -55,13 +55,14 @@ def rescore_problem(entry_id, xmodule_instance_args): update_fcn = partial(rescore_problem_module_state, xmodule_instance_args) def filter_fcn(modules_to_update): + """Filter that matches problems which are marked as being done""" return modules_to_update.filter(state__contains='"done": true') visit_fcn = partial(perform_module_state_update, update_fcn, filter_fcn) return run_main_task(entry_id, visit_fcn, action_name) -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask) # pylint: disable=E1102 def reset_problem_attempts(entry_id, xmodule_instance_args): """Resets problem attempts to zero for a particular problem for all students in a course. @@ -82,7 +83,7 @@ def reset_problem_attempts(entry_id, xmodule_instance_args): return run_main_task(entry_id, visit_fcn, action_name) -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask) # pylint: disable=E1102 def delete_problem_state(entry_id, xmodule_instance_args): """Deletes problem state entirely for all students on a particular problem in a course. @@ -103,18 +104,20 @@ def delete_problem_state(entry_id, xmodule_instance_args): return run_main_task(entry_id, visit_fcn, action_name) -@task(base=BaseInstructorTask) -def send_bulk_course_email(entry_id, xmodule_instance_args): - """Sends emails to in a course. 
+@task(base=BaseInstructorTask) # pylint: disable=E1102 +def send_bulk_course_email(entry_id, _xmodule_instance_args): + """Sends emails to recipients enrolled in a course. `entry_id` is the id value of the InstructorTask entry that corresponds to this task. The entry contains the `course_id` that identifies the course, as well as the `task_input`, which contains task-specific input. - The task_input should be a dict with no entries. + The task_input should be a dict with the following entries: - `xmodule_instance_args` provides information needed by _get_module_instance_for_task() - to instantiate an xmodule instance. + 'email_id': the id of the CourseEmail object to be sent. (required) + + `_xmodule_instance_args` provides information needed by _get_module_instance_for_task() + to instantiate an xmodule instance. This is unused here. """ action_name = 'emailed' visit_fcn = perform_delegate_email_batches diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py index 1ca444756d..a30d94862d 100644 --- a/lms/djangoapps/instructor_task/tasks_helper.py +++ b/lms/djangoapps/instructor_task/tasks_helper.py @@ -42,6 +42,12 @@ class BaseInstructorTask(Task): Permits updating information about task in corresponding InstructorTask for monitoring purposes. Assumes that the entry_id of the InstructorTask model is the first argument to the task. + + The `entry_id` is the primary key for the InstructorTask entry representing the task. This class + updates the entry on success and failure of the task it wraps. It is setting the entry's value + for task_state based on what Celery would set it to once the task returns to Celery: + FAILURE if an exception is encountered, and SUCCESS if it returns normally. + Other arguments are pass-throughs to perform_module_state_update, and documented there. """ abstract = True @@ -51,8 +57,22 @@ class BaseInstructorTask(Task): Updates task_output and task_state.
But it shouldn't actually do anything if the task is only creating subtasks to actually do the work. + + Assumes `task_progress` is a dict containing the task's result, with the following keys: + + 'attempted': number of attempts made + 'succeeded': number of attempts that "succeeded" + 'skipped': number of attempts that "skipped" + 'failed': number of attempts that "failed" + 'total': number of possible subtasks to attempt + 'action_name': user-visible verb to use in status messages. Should be past-tense. + Pass-through of input `action_name`. + 'duration_ms': how long the task has (or had) been running. + + This is JSON-serialized and stored in the task_output column of the InstructorTask entry. + """ - TASK_LOG.info('Task success returned: %r' % (self.request, )) + TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress) # We should be able to find the InstructorTask object to update # based on the task_id here, without having to dig into the # original args to the task. On the other hand, the entry_id @@ -72,9 +92,20 @@ class BaseInstructorTask(Task): """ Update InstructorTask object corresponding to this task with info about failure. - Fetches and updates exception and traceback information on failure. + Fetches and updates exception and traceback information on failure. + + If an exception is raised internal to the task, it is caught by celery and provided here. + The information is recorded in the InstructorTask object as a JSON-serialized dict + stored in the task_output column. It contains the following keys: + + 'exception': type of exception object + 'message': error message from exception object + 'traceback': traceback information (truncated if necessary) + + Note that there is no way to record progress made within the task (e.g. attempted, + succeeded, etc.) when such failures occur. 
""" - TASK_LOG.info('Task failure returned: %r' % (self.request, )) + TASK_LOG.debug('Task %s: failure returned', task_id) entry_id = args[0] try: entry = InstructorTask.objects.get(pk=entry_id) @@ -88,12 +119,6 @@ class BaseInstructorTask(Task): entry.task_state = FAILURE entry.save_now() - def on_retry(self, exc, task_id, args, kwargs, einfo): - # We don't expect this to be called for top-level tasks, at the moment.... - # If it were, not sure what kind of status to report for it. - # But it would be good to know that it's being called, so at least log it. - TASK_LOG.info('Task retry returned: %r' % (self.request, )) - class UpdateProblemModuleStateError(Exception): """ @@ -110,6 +135,67 @@ def _get_current_task(): return current_task +def run_main_task(entry_id, task_fcn, action_name): + """ + Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask. + + Arguments passed to `task_fcn` are: + + `entry_id` : the primary key for the InstructorTask entry representing the task. + `course_id` : the id for the course. + `task_input` : dict containing task-specific arguments, JSON-decoded from InstructorTask's task_input. + `action_name` : past-tense verb to use for constructing status messages. + + If no exceptions are raised, the `task_fcn` should return a dict containing + the task's result with the following keys: + + 'attempted': number of attempts made + 'succeeded': number of attempts that "succeeded" + 'skipped': number of attempts that "skipped" + 'failed': number of attempts that "failed" + 'total': number of possible subtasks to attempt + 'action_name': user-visible verb to use in status messages. + Should be past-tense. Pass-through of input `action_name`. + 'duration_ms': how long the task has (or had) been running. + + """ + + # get the InstructorTask to be updated. If this fails, then let the exception return to Celery. + # There's no point in catching it here. 
+ entry = InstructorTask.objects.get(pk=entry_id) + + # get inputs to use in this task from the entry: + task_id = entry.task_id + course_id = entry.course_id + task_input = json.loads(entry.task_input) + + # construct log message: + fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"' + task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input) + + TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string) + + # Check that the task_id submitted in the InstructorTask matches the current task + # that is running. + request_task_id = _get_current_task().request.id + if task_id != request_task_id: + fmt = 'Requested task did not match actual task "{actual_id}": {task_info}' + message = fmt.format(actual_id=request_task_id, task_info=task_info_string) + TASK_LOG.error(message) + raise ValueError(message) + + # Now do the work: + with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]): + task_progress = task_fcn(entry_id, course_id, task_input, action_name) + + # Release any queries that the connection has been hanging onto: + reset_queries() + + # log and exit, returning task_progress info as task result: + TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress) + return task_progress + + def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name): """ Performs generic update by visiting StudentModule instances with the update_fcn provided. @@ -220,92 +306,13 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta return task_progress -def run_main_task(entry_id, task_fcn, action_name): - """ - Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask. - - TODO: UPDATE THIS DOCSTRING - (IT's not just visiting StudentModule instances....) - - Performs generic update by visiting StudentModule instances with the update_fcn provided. 
- - The `entry_id` is the primary key for the InstructorTask entry representing the task. This function - updates the entry on success and failure of the perform_module_state_update function it - wraps. It is setting the entry's value for task_state based on what Celery would set it to once - the task returns to Celery: FAILURE if an exception is encountered, and SUCCESS if it returns normally. - Other arguments are pass-throughs to perform_module_state_update, and documented there. - - If no exceptions are raised, a dict containing the task's result is returned, with the following keys: - - 'attempted': number of attempts made - 'succeeded': number of attempts that "succeeded" - 'skipped': number of attempts that "skipped" - 'failed': number of attempts that "failed" - 'total': number of possible subtasks to attempt - 'action_name': user-visible verb to use in status messages. Should be past-tense. - Pass-through of input `action_name`. - 'duration_ms': how long the task has (or had) been running. - - Before returning, this is also JSON-serialized and stored in the task_output column of the InstructorTask entry. - - If an exception is raised internally, it is caught and recorded in the InstructorTask entry. - This is also a JSON-serialized dict, stored in the task_output column, containing the following keys: - - 'exception': type of exception object - 'message': error message from exception object - 'traceback': traceback information (truncated if necessary) - - Once the exception is caught, it is raised again and allowed to pass up to the - task-running level, so that it can also set the failure modes and capture the error trace in the - result object that Celery creates. - - """ - - # get the InstructorTask to be updated. If this fails, then let the exception return to Celery. - # There's no point in catching it here. 
- entry = InstructorTask.objects.get(pk=entry_id) - - # get inputs to use in this task from the entry: - task_id = entry.task_id - course_id = entry.course_id - task_input = json.loads(entry.task_input) - - # construct log message: - fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"' - task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input) - - TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string) - - # Check that the task_id submitted in the InstructorTask matches the current task - # that is running. - request_task_id = _get_current_task().request.id - if task_id != request_task_id: - fmt = 'Requested task did not match actual task "{actual_id}": {task_info}' - message = fmt.format(actual_id=request_task_id, task_info=task_info_string) - TASK_LOG.error(message) - raise UpdateProblemModuleStateError(message) - - # Now do the work: - with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]): - task_progress = task_fcn(entry_id, course_id, task_input, action_name) - - # Release any queries that the connection has been hanging onto: - reset_queries() - - # log and exit, returning task_progress info as task result: - TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress) - return task_progress - - def _get_task_id_from_xmodule_args(xmodule_instance_args): """Gets task_id from `xmodule_instance_args` dict, or returns default value if missing.""" return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID) if xmodule_instance_args is not None else UNKNOWN_TASK_ID def _get_xqueue_callback_url_prefix(xmodule_instance_args): - """ - - """ + """Gets prefix to use when constructing xqueue_callback_url.""" return xmodule_instance_args.get('xqueue_callback_url_prefix', '') if xmodule_instance_args is not None else '' diff --git a/lms/djangoapps/instructor_task/tests/test_api.py b/lms/djangoapps/instructor_task/tests/test_api.py 
index 5dc9a05d53..66926ad22c 100644 --- a/lms/djangoapps/instructor_task/tests/test_api.py +++ b/lms/djangoapps/instructor_task/tests/test_api.py @@ -152,15 +152,16 @@ class InstructorTaskCourseSubmitTest(InstructorTaskCourseTestCase): self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org") def _define_course_email(self): + """Create CourseEmail object for testing.""" course_email = CourseEmail.create(self.course.id, self.instructor, SEND_TO_ALL, "Test Subject", "

This is a test message

") - return course_email.id + return course_email.id # pylint: disable=E1101 def test_submit_bulk_email_all(self): email_id = self._define_course_email() instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor), self.course.id, email_id) # test resubmitting, by updating the existing record: - instructor_task = InstructorTask.objects.get(id=instructor_task.id) + instructor_task = InstructorTask.objects.get(id=instructor_task.id) # pylint: disable=E1101 instructor_task.task_state = PROGRESS instructor_task.save() diff --git a/lms/djangoapps/instructor_task/tests/test_tasks.py b/lms/djangoapps/instructor_task/tests/test_tasks.py index e1f89a6022..448054a13d 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks.py @@ -85,11 +85,11 @@ class TestInstructorTasks(InstructorTaskModuleTestCase): def _test_missing_current_task(self, task_class): """Check that a task_class fails when celery doesn't provide a current_task.""" task_entry = self._create_input_entry() - with self.assertRaises(UpdateProblemModuleStateError): + with self.assertRaises(ValueError): task_class(task_entry.id, self._get_xmodule_instance_args()) def _test_undefined_course(self, task_class): - # run with celery, but no course defined + """Run with celery, but with no course defined.""" task_entry = self._create_input_entry(course_id="bogus/course/id") with self.assertRaises(ItemNotFoundError): self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id) diff --git a/lms/envs/aws.py b/lms/envs/aws.py index 86d3d539bd..4dc928a208 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -93,6 +93,10 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } +# We want Bulk Email running on the high-priority queue, so we define the +# routing key that points to it. At the moment, the name is the same. 
+BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE + ########################## NON-SECURE ENV CONFIG ############################## # Things like server locations, ports, etc. @@ -128,7 +132,7 @@ LOG_DIR = ENV_TOKENS['LOG_DIR'] CACHES = ENV_TOKENS['CACHES'] -#Email overrides +# Email overrides DEFAULT_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_FROM_EMAIL', DEFAULT_FROM_EMAIL) DEFAULT_FEEDBACK_EMAIL = ENV_TOKENS.get('DEFAULT_FEEDBACK_EMAIL', DEFAULT_FEEDBACK_EMAIL) DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL) @@ -140,8 +144,10 @@ BUGS_EMAIL = ENV_TOKENS.get('BUGS_EMAIL', BUGS_EMAIL) PAYMENT_SUPPORT_EMAIL = ENV_TOKENS.get('PAYMENT_SUPPORT_EMAIL', PAYMENT_SUPPORT_EMAIL) PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CURRENCY', PAID_COURSE_REGISTRATION_CURRENCY) +BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY) +BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES) -#Theme overrides +# Theme overrides THEME_NAME = ENV_TOKENS.get('THEME_NAME', None) if not THEME_NAME is None: enable_theme(THEME_NAME) @@ -150,10 +156,10 @@ if not THEME_NAME is None: # Marketing link overrides MKTG_URL_LINK_MAP.update(ENV_TOKENS.get('MKTG_URL_LINK_MAP', {})) -#Timezone overrides +# Timezone overrides TIME_ZONE = ENV_TOKENS.get('TIME_ZONE', TIME_ZONE) -#Additional installed apps +# Additional installed apps for app in ENV_TOKENS.get('ADDL_INSTALLED_APPS', []): INSTALLED_APPS += (app,) diff --git a/lms/envs/common.py b/lms/envs/common.py index 96b304294d..0c12d088c9 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -795,6 +795,17 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } +# let logging work as configured: +CELERYD_HIJACK_ROOT_LOGGER = False + +################################ Bulk Email ################################### + +# We want Bulk Email running on the high-priority queue, so we define the +# 
routing key that points to it. At the moment, the name is the same. +BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +BULK_EMAIL_DEFAULT_RETRY_DELAY = 15 +BULK_EMAIL_MAX_RETRIES = 5 + ################################### APPS ###################################### INSTALLED_APPS = ( # Standard ones that are always installed...