From 08a08448eef53141af99ce383b2c903fb6c5cca4 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Wed, 25 Sep 2013 18:12:26 -0400 Subject: [PATCH] Add some handling for SES exceptions. --- lms/djangoapps/bulk_email/tasks.py | 79 ++++++++++++++---- lms/djangoapps/instructor_task/api_helper.py | 2 +- lms/djangoapps/instructor_task/subtasks.py | 83 +++++++++++++++---- .../instructor_task/tasks_helper.py | 7 +- 4 files changed, 135 insertions(+), 36 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 8a51f75816..3e62eadb65 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -4,6 +4,7 @@ to a course. """ import math import re +import random from uuid import uuid4 from time import sleep @@ -12,6 +13,8 @@ from traceback import format_exc from dogapi import dog_stats_api from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError +from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError +from boto.exception import AWSConnectionError from celery import task, current_task, group from celery.utils.log import get_task_logger @@ -34,12 +37,26 @@ from instructor_task.models import InstructorTask from instructor_task.subtasks import ( update_subtask_status, create_subtask_result, + increment_subtask_result, update_instructor_task_for_subtasks, ) log = get_task_logger(__name__) +# Exceptions that, if caught, should cause the task to be re-tried. +# These errors will be caught a maximum of 5 times before the task fails. +RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError) + +# Errors that involve exceeding a quota of sent email +QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, ) + +# Errors that mail is being sent too quickly. When caught by a task, it +# triggers an exponential backoff and retry. Retries happen continuously until +# the email is sent. +SENDING_RATE_ERRORS = (SESMaxSendingRateExceededError, ) + + def _get_recipient_queryset(user_id, to_option, course_id, course_location): """ Generates a query set corresponding to the requested category. @@ -154,7 +171,12 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.EMAILS_PER_TASK))) chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query))) for i in range(num_tasks_this_query): - to_list = recipient_sublist[i * chunk:i * chunk + chunk] + if i == num_tasks_this_query - 1: + # Avoid cutting off the very last email when chunking a task that divides perfectly + # (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100) + to_list = recipient_sublist[i * chunk:] + else: + to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) task_list.append(send_course_email.subtask(( @@ -165,7 +187,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ), task_id=subtask_id, routing_key=settings.HIGH_PRIORITY_QUEUE, - queue=settings.HIGH_PRIORITY_QUEUE, )) num_workers += num_tasks_this_query @@ -177,7 +198,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # now group the subtasks, and start them running: task_group = group(task_list) - task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE, queue=settings.HIGH_PRIORITY_QUEUE) + task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. @@ -220,8 +241,9 @@ def send_course_email(entry_id, email_id, to_list, global_email_context): # Get information from current task's request: current_task_id = _get_current_task().request.id - log.info("Preparing to send email as subtask %s for instructor task %d: request = %s", - current_task_id, entry_id, _get_current_task().request) + num_to_send = len(to_list) + log.info("Preparing to send %s emails as subtask %s for instructor task %d: request = %s", + num_to_send, current_task_id, entry_id, _get_current_task().request) send_exception = None course_email_result_value = None @@ -239,9 +261,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context): _, send_exception, traceback = exc_info() traceback_string = format_exc(traceback) if traceback is not None else '' log.error("background task (%s) failed unexpectedly: %s %s", current_task_id, send_exception, traceback_string) - # consider all emails to not be sent, and update stats: - num_error = len(to_list) - course_email_result_value = create_subtask_result(0, num_error, 0) + # We got here for really unexpected reasons. Since we don't know how far + # the task got in emailing, we count all recipients as having failed. + # It at least keeps the counts consistent. + course_email_result_value = create_subtask_result(0, num_to_send, 0) if send_exception is None: # Update the InstructorTask object that is storing its progress. @@ -391,13 +414,19 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): # Pop the user that was emailed off the end of the list: to_list.pop() - except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc: + except SENDING_RATE_ERRORS as exc: + subtask_progress = create_subtask_result(num_sent, num_error, num_optout) + return _submit_for_retry( + entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True + ) + + except RETRY_ERRORS as exc: # Errors caught here cause the email to be retried. The entire task is actually retried # without popping the current recipient off of the existing list. # Errors caught are those that indicate a temporary condition that might succeed on retry. subtask_progress = create_subtask_result(num_sent, num_error, num_optout) return _submit_for_retry( - entry_id, email_id, to_list, global_email_context, exc, subtask_progress + entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False ) except Exception as exc: @@ -406,8 +435,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): # If we're going to just mark it as failed # And the log message below should indicate which task_id is failing, so we have a chance to # reconstruct the problems. - log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', - task_id, email_id, [i['email'] for i in to_list]) + if isinstance(exc, QUOTA_EXCEEDED_ERRORS): + log.exception('WARNING: Course "%s" exceeded quota!', course_title) + log.exception('Email with id %d not sent due to exceeding quota. To list: %s', + email_id, + [i['email'] for i in to_list]) + else: + log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', + task_id, email_id, [i['email'] for i in to_list]) num_error += len(to_list) return create_subtask_result(num_sent, num_error, num_optout), exc else: @@ -418,7 +453,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): connection.close() -def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress): +def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error): """ Helper function to requeue a task for retry, using the new version of arguments provided. @@ -443,6 +478,15 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients', task_id, email_id, current_exception, len(to_list)) + + # Don't resend emails that have already succeeded. + # Retry the email at increasing exponential backoff. + + if is_sending_rate_error: + countdown = ((2 ** retry_index) * 15) * random.uniform(.5, 1.5) + else: + countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5) + try: send_course_email.retry( args=[ @@ -452,7 +496,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current global_email_context, ], exc=current_exception, - countdown=(2 ** retry_index) * 15, + countdown=countdown, throw=True, ) except RetryTaskError as retry_error: @@ -464,10 +508,13 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current # If there are no more retries, because the maximum has been reached, # we expect the original exception to be raised. We catch it here # (and put it in retry_exc just in case it's different, but it shouldn't be), - # and update status as if it were any other failure. + # and update status as if it were any other failure. That means that + # the recipients still in the to_list are counted as failures. log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s', task_id, email_id, [i['email'] for i in to_list]) - return subtask_progress, retry_exc + num_failed = len(to_list) + new_subtask_progress = increment_subtask_result(subtask_progress, 0, num_failed, 0) + return new_subtask_progress, retry_exc def _statsd_tag(course_title): diff --git a/lms/djangoapps/instructor_task/api_helper.py b/lms/djangoapps/instructor_task/api_helper.py index 0e9a91263e..1451963693 100644 --- a/lms/djangoapps/instructor_task/api_helper.py +++ b/lms/djangoapps/instructor_task/api_helper.py @@ -115,7 +115,7 @@ def _update_instructor_task(instructor_task, task_result): task_output = None entry_needs_updating = True - if result_state == SUCCESS and instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0: + if instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0: # This happens when running subtasks: the result object is marked with SUCCESS, # meaning that the subtasks have successfully been defined. However, the InstructorTask # will be marked as in PROGRESS, until the last subtask completes and marks it as SUCCESS. diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 179fc13cfd..f303b1ce6e 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -14,27 +14,61 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING TASK_LOG = get_task_logger(__name__) -def create_subtask_result(new_num_sent, new_num_error, new_num_optout): - """Return the result of course_email sending as a dict (not a string).""" - attempted = new_num_sent + new_num_error - current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error} +def create_subtask_result(num_sent, num_error, num_optout): + """ + Create a result of a subtask. + + Keys are: 'attempted', 'succeeded', 'skipped', 'failed'. + + Object must be JSON-serializable. + """ + attempted = num_sent + num_error + current_result = {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error} return current_result +def increment_subtask_result(subtask_result, new_num_sent, new_num_error, new_num_optout): + """ + Update the result of a subtask with additional results. + + Keys are: 'attempted', 'succeeded', 'skipped', 'failed'. + """ + new_result = create_subtask_result(new_num_sent, new_num_error, new_num_optout) + for keyname in new_result: + if keyname in subtask_result: + new_result[keyname] += subtask_result[keyname] + return new_result + + def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list): """ Store initial subtask information to InstructorTask object. - # Before we actually start running the tasks we've defined, - # the InstructorTask needs to be updated with their information. - # So we update the InstructorTask object here, not in the return. - # The monitoring code knows that it shouldn't go to the InstructorTask's task's - # Result for its progress when there are subtasks. So we accumulate - # the results of each subtask as it completes into the InstructorTask. - # At this point, we have some status that we can report, as to the magnitude of the overall - # task. That is, we know the total. Set that, and our subtasks should work towards that goal. - # Note that we add start_time in here, so that it can be used - # by subtasks to calculate duration_ms values: + The InstructorTask's "task_output" field is initialized. This is a JSON-serialized dict. + Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero, + as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations, + and the total number of "things to do" is set, so the user can be told how much needs to be + done overall. The `action_name` is also stored, to also help with constructing more readable + progress messages. + + The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict. + Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of + subtasks. 'Total' is set here to the total number, while the other three are initialized to zero. + Once the counters for 'succeeded' and 'failed' match the 'total', the subtasks are done and + the InstructorTask's "status" will be changed to SUCCESS. + + The "subtasks" field also contains a 'status' key, that contains a dict that stores status + information for each subtask. At the moment, the value for each subtask (keyed by its task_id) + is the value of `status`, which is initialized here to QUEUING. + + This information needs to be set up in the InstructorTask before any of the subtasks start + running. If not, there is a chance that the subtasks could complete before the parent task + is done creating subtasks. Doing so also simplifies the save() here, as it avoids the need + for locking. + + Monitoring code should assume that if an InstructorTask has subtask information, that it should + rely on the status stored in the InstructorTask object, rather than status stored in the + corresponding AsyncResult. """ progress = { 'action_name': action_name, @@ -64,6 +98,27 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i def update_subtask_status(entry_id, current_task_id, status, subtask_result): """ Update the status of the subtask in the parent InstructorTask object tracking its progress. + + Uses select_for_update to lock the InstructorTask object while it is being updated. + The operation is surrounded by a try/except/else that permit the manual transaction to be + committed on completion, or rolled back on error. + + The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict. + Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_result` + into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms' + value with the current interval since the original InstructorTask started. + + The InstructorTask's "subtasks" field is also updated. This is also a JSON-serialized dict. + Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of + subtasks. 'Total' is expected to have been set at the time the subtasks were created. + The other three counters are incremented depending on the value of `status`. Once the counters + for 'succeeded' and 'failed' match the 'total', the subtasks are done and the InstructorTask's + "status" is changed to SUCCESS. + + The "subtasks" field also contains a 'status' key, that contains a dict that stores status + information for each subtask. At the moment, the value for each subtask (keyed by its task_id) + is the value of `status`, but could be expanded in future to store information about failure + messages, progress made, etc. """ TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s", current_task_id, entry_id, subtask_result) diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py index 786ea2b91f..1ca444756d 100644 --- a/lms/djangoapps/instructor_task/tasks_helper.py +++ b/lms/djangoapps/instructor_task/tasks_helper.py @@ -271,11 +271,8 @@ def run_main_task(entry_id, task_fcn, action_name): task_input = json.loads(entry.task_input) # construct log message: - # TODO: generalize this beyond just problem and student, so it includes email_id and to_option. - # Can we just loop over all keys and output them all? Just print the task_input dict itself? - module_state_key = task_input.get('problem_url') - fmt = 'task "{task_id}": course "{course_id}" problem "{state_key}"' - task_info_string = fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key) + fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"' + task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input) TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)