Add some handling for SES exceptions.
This commit is contained in:
@@ -4,6 +4,7 @@ to a course.
|
||||
"""
|
||||
import math
|
||||
import re
|
||||
import random
|
||||
from uuid import uuid4
|
||||
from time import sleep
|
||||
|
||||
@@ -12,6 +13,8 @@ from traceback import format_exc
|
||||
|
||||
from dogapi import dog_stats_api
|
||||
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError
|
||||
from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError
|
||||
from boto.exception import AWSConnectionError
|
||||
|
||||
from celery import task, current_task, group
|
||||
from celery.utils.log import get_task_logger
|
||||
@@ -34,12 +37,26 @@ from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import (
|
||||
update_subtask_status,
|
||||
create_subtask_result,
|
||||
increment_subtask_result,
|
||||
update_instructor_task_for_subtasks,
|
||||
)
|
||||
|
||||
log = get_task_logger(__name__)
|
||||
|
||||
|
||||
# Exceptions that, if caught, should cause the task to be re-tried.
|
||||
# These errors will be caught a maximum of 5 times before the task fails.
|
||||
RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError)
|
||||
|
||||
# Errors that involve exceeding a quota of sent email
|
||||
QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, )
|
||||
|
||||
# Errors that mail is being sent too quickly. When caught by a task, it
|
||||
# triggers an exponential backoff and retry. Retries happen continuously until
|
||||
# the email is sent.
|
||||
SENDING_RATE_ERRORS = (SESMaxSendingRateExceededError, )
|
||||
|
||||
|
||||
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
"""
|
||||
Generates a query set corresponding to the requested category.
|
||||
@@ -154,7 +171,12 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.EMAILS_PER_TASK)))
|
||||
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
|
||||
for i in range(num_tasks_this_query):
|
||||
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
|
||||
if i == num_tasks_this_query - 1:
|
||||
# Avoid cutting off the very last email when chunking a task that divides perfectly
|
||||
# (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100)
|
||||
to_list = recipient_sublist[i * chunk:]
|
||||
else:
|
||||
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
|
||||
subtask_id = str(uuid4())
|
||||
subtask_id_list.append(subtask_id)
|
||||
task_list.append(send_course_email.subtask((
|
||||
@@ -165,7 +187,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
),
|
||||
task_id=subtask_id,
|
||||
routing_key=settings.HIGH_PRIORITY_QUEUE,
|
||||
queue=settings.HIGH_PRIORITY_QUEUE,
|
||||
))
|
||||
num_workers += num_tasks_this_query
|
||||
|
||||
@@ -177,7 +198,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# now group the subtasks, and start them running:
|
||||
task_group = group(task_list)
|
||||
task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE, queue=settings.HIGH_PRIORITY_QUEUE)
|
||||
task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE)
|
||||
|
||||
# We want to return progress here, as this is what will be stored in the
|
||||
# AsyncResult for the parent task as its return value.
|
||||
@@ -220,8 +241,9 @@ def send_course_email(entry_id, email_id, to_list, global_email_context):
|
||||
|
||||
# Get information from current task's request:
|
||||
current_task_id = _get_current_task().request.id
|
||||
log.info("Preparing to send email as subtask %s for instructor task %d: request = %s",
|
||||
current_task_id, entry_id, _get_current_task().request)
|
||||
num_to_send = len(to_list)
|
||||
log.info("Preparing to send %s emails as subtask %s for instructor task %d: request = %s",
|
||||
num_to_send, current_task_id, entry_id, _get_current_task().request)
|
||||
|
||||
send_exception = None
|
||||
course_email_result_value = None
|
||||
@@ -239,9 +261,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context):
|
||||
_, send_exception, traceback = exc_info()
|
||||
traceback_string = format_exc(traceback) if traceback is not None else ''
|
||||
log.error("background task (%s) failed unexpectedly: %s %s", current_task_id, send_exception, traceback_string)
|
||||
# consider all emails to not be sent, and update stats:
|
||||
num_error = len(to_list)
|
||||
course_email_result_value = create_subtask_result(0, num_error, 0)
|
||||
# We got here for really unexpected reasons. Since we don't know how far
|
||||
# the task got in emailing, we count all recipients as having failed.
|
||||
# It at least keeps the counts consistent.
|
||||
course_email_result_value = create_subtask_result(0, num_to_send, 0)
|
||||
|
||||
if send_exception is None:
|
||||
# Update the InstructorTask object that is storing its progress.
|
||||
@@ -391,13 +414,19 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
|
||||
# Pop the user that was emailed off the end of the list:
|
||||
to_list.pop()
|
||||
|
||||
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
|
||||
except SENDING_RATE_ERRORS as exc:
|
||||
subtask_progress = create_subtask_result(num_sent, num_error, num_optout)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True
|
||||
)
|
||||
|
||||
except RETRY_ERRORS as exc:
|
||||
# Errors caught here cause the email to be retried. The entire task is actually retried
|
||||
# without popping the current recipient off of the existing list.
|
||||
# Errors caught are those that indicate a temporary condition that might succeed on retry.
|
||||
subtask_progress = create_subtask_result(num_sent, num_error, num_optout)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
@@ -406,8 +435,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
|
||||
# If we're going to just mark it as failed
|
||||
# And the log message below should indicate which task_id is failing, so we have a chance to
|
||||
# reconstruct the problems.
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
if isinstance(exc, QUOTA_EXCEEDED_ERRORS):
|
||||
log.exception('WARNING: Course "%s" exceeded quota!', course_title)
|
||||
log.exception('Email with id %d not sent due to exceeding quota. To list: %s',
|
||||
email_id,
|
||||
[i['email'] for i in to_list])
|
||||
else:
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
num_error += len(to_list)
|
||||
return create_subtask_result(num_sent, num_error, num_optout), exc
|
||||
else:
|
||||
@@ -418,7 +453,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
|
||||
connection.close()
|
||||
|
||||
|
||||
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress):
|
||||
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error):
|
||||
"""
|
||||
Helper function to requeue a task for retry, using the new version of arguments provided.
|
||||
|
||||
@@ -443,6 +478,15 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
|
||||
log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
|
||||
task_id, email_id, current_exception, len(to_list))
|
||||
|
||||
# Don't resend emails that have already succeeded.
|
||||
# Retry the email at increasing exponential backoff.
|
||||
|
||||
if is_sending_rate_error:
|
||||
countdown = ((2 ** retry_index) * 15) * random.uniform(.5, 1.5)
|
||||
else:
|
||||
countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5)
|
||||
|
||||
try:
|
||||
send_course_email.retry(
|
||||
args=[
|
||||
@@ -452,7 +496,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
global_email_context,
|
||||
],
|
||||
exc=current_exception,
|
||||
countdown=(2 ** retry_index) * 15,
|
||||
countdown=countdown,
|
||||
throw=True,
|
||||
)
|
||||
except RetryTaskError as retry_error:
|
||||
@@ -464,10 +508,13 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
# If there are no more retries, because the maximum has been reached,
|
||||
# we expect the original exception to be raised. We catch it here
|
||||
# (and put it in retry_exc just in case it's different, but it shouldn't be),
|
||||
# and update status as if it were any other failure.
|
||||
# and update status as if it were any other failure. That means that
|
||||
# the recipients still in the to_list are counted as failures.
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
return subtask_progress, retry_exc
|
||||
num_failed = len(to_list)
|
||||
new_subtask_progress = increment_subtask_result(subtask_progress, 0, num_failed, 0)
|
||||
return new_subtask_progress, retry_exc
|
||||
|
||||
|
||||
def _statsd_tag(course_title):
|
||||
|
||||
@@ -115,7 +115,7 @@ def _update_instructor_task(instructor_task, task_result):
|
||||
task_output = None
|
||||
entry_needs_updating = True
|
||||
|
||||
if result_state == SUCCESS and instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0:
|
||||
if instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0:
|
||||
# This happens when running subtasks: the result object is marked with SUCCESS,
|
||||
# meaning that the subtasks have successfully been defined. However, the InstructorTask
|
||||
# will be marked as in PROGRESS, until the last subtask completes and marks it as SUCCESS.
|
||||
|
||||
@@ -14,27 +14,61 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
|
||||
def create_subtask_result(new_num_sent, new_num_error, new_num_optout):
|
||||
"""Return the result of course_email sending as a dict (not a string)."""
|
||||
attempted = new_num_sent + new_num_error
|
||||
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
|
||||
def create_subtask_result(num_sent, num_error, num_optout):
|
||||
"""
|
||||
Create a result of a subtask.
|
||||
|
||||
Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
|
||||
|
||||
Object must be JSON-serializable.
|
||||
"""
|
||||
attempted = num_sent + num_error
|
||||
current_result = {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
|
||||
return current_result
|
||||
|
||||
|
||||
def increment_subtask_result(subtask_result, new_num_sent, new_num_error, new_num_optout):
|
||||
"""
|
||||
Update the result of a subtask with additional results.
|
||||
|
||||
Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
|
||||
"""
|
||||
new_result = create_subtask_result(new_num_sent, new_num_error, new_num_optout)
|
||||
for keyname in new_result:
|
||||
if keyname in subtask_result:
|
||||
new_result[keyname] += subtask_result[keyname]
|
||||
return new_result
|
||||
|
||||
|
||||
def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
|
||||
"""
|
||||
Store initial subtask information to InstructorTask object.
|
||||
|
||||
# Before we actually start running the tasks we've defined,
|
||||
# the InstructorTask needs to be updated with their information.
|
||||
# So we update the InstructorTask object here, not in the return.
|
||||
# The monitoring code knows that it shouldn't go to the InstructorTask's task's
|
||||
# Result for its progress when there are subtasks. So we accumulate
|
||||
# the results of each subtask as it completes into the InstructorTask.
|
||||
# At this point, we have some status that we can report, as to the magnitude of the overall
|
||||
# task. That is, we know the total. Set that, and our subtasks should work towards that goal.
|
||||
# Note that we add start_time in here, so that it can be used
|
||||
# by subtasks to calculate duration_ms values:
|
||||
The InstructorTask's "task_output" field is initialized. This is a JSON-serialized dict.
|
||||
Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
|
||||
as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations,
|
||||
and the total number of "things to do" is set, so the user can be told how much needs to be
|
||||
done overall. The `action_name` is also stored, to also help with constructing more readable
|
||||
progress messages.
|
||||
|
||||
The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict.
|
||||
Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
|
||||
subtasks. 'Total' is set here to the total number, while the other three are initialized to zero.
|
||||
Once the counters for 'succeeded' and 'failed' match the 'total', the subtasks are done and
|
||||
the InstructorTask's "status" will be changed to SUCCESS.
|
||||
|
||||
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
|
||||
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
|
||||
is the value of `status`, which is initialized here to QUEUING.
|
||||
|
||||
This information needs to be set up in the InstructorTask before any of the subtasks start
|
||||
running. If not, there is a chance that the subtasks could complete before the parent task
|
||||
is done creating subtasks. Doing so also simplifies the save() here, as it avoids the need
|
||||
for locking.
|
||||
|
||||
Monitoring code should assume that if an InstructorTask has subtask information, that it should
|
||||
rely on the status stored in the InstructorTask object, rather than status stored in the
|
||||
corresponding AsyncResult.
|
||||
"""
|
||||
progress = {
|
||||
'action_name': action_name,
|
||||
@@ -64,6 +98,27 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
def update_subtask_status(entry_id, current_task_id, status, subtask_result):
|
||||
"""
|
||||
Update the status of the subtask in the parent InstructorTask object tracking its progress.
|
||||
|
||||
Uses select_for_update to lock the InstructorTask object while it is being updated.
|
||||
The operation is surrounded by a try/except/else that permit the manual transaction to be
|
||||
committed on completion, or rolled back on error.
|
||||
|
||||
The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict.
|
||||
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_result`
|
||||
into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms'
|
||||
value with the current interval since the original InstructorTask started.
|
||||
|
||||
The InstructorTask's "subtasks" field is also updated. This is also a JSON-serialized dict.
|
||||
Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
|
||||
subtasks. 'Total' is expected to have been set at the time the subtasks were created.
|
||||
The other three counters are incremented depending on the value of `status`. Once the counters
|
||||
for 'succeeded' and 'failed' match the 'total', the subtasks are done and the InstructorTask's
|
||||
"status" is changed to SUCCESS.
|
||||
|
||||
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
|
||||
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
|
||||
is the value of `status`, but could be expanded in future to store information about failure
|
||||
messages, progress made, etc.
|
||||
"""
|
||||
TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
|
||||
current_task_id, entry_id, subtask_result)
|
||||
|
||||
@@ -271,11 +271,8 @@ def run_main_task(entry_id, task_fcn, action_name):
|
||||
task_input = json.loads(entry.task_input)
|
||||
|
||||
# construct log message:
|
||||
# TODO: generalize this beyond just problem and student, so it includes email_id and to_option.
|
||||
# Can we just loop over all keys and output them all? Just print the task_input dict itself?
|
||||
module_state_key = task_input.get('problem_url')
|
||||
fmt = 'task "{task_id}": course "{course_id}" problem "{state_key}"'
|
||||
task_info_string = fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key)
|
||||
fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"'
|
||||
task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)
|
||||
|
||||
TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user