|
|
|
|
@@ -52,8 +52,10 @@ def get_recipient_queryset(user_id, to_option, course_id, course_location):
|
|
|
|
|
instructor_qset = instructor_group.user_set.all()
|
|
|
|
|
recipient_qset = staff_qset | instructor_qset
|
|
|
|
|
if to_option == SEND_TO_ALL:
|
|
|
|
|
enrollment_qset = User.objects.filter(courseenrollment__course_id=course_id,
|
|
|
|
|
courseenrollment__is_active=True)
|
|
|
|
|
enrollment_qset = User.objects.filter(
|
|
|
|
|
courseenrollment__course_id=course_id,
|
|
|
|
|
courseenrollment__is_active=True
|
|
|
|
|
)
|
|
|
|
|
recipient_qset = recipient_qset | enrollment_qset
|
|
|
|
|
recipient_qset = recipient_qset.distinct()
|
|
|
|
|
else:
|
|
|
|
|
@@ -164,12 +166,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
|
|
|
|
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
|
|
|
|
|
subtask_id = str(uuid4())
|
|
|
|
|
subtask_id_list.append(subtask_id)
|
|
|
|
|
subtask_progress = _course_email_result(None, 0, 0, 0)
|
|
|
|
|
task_list.append(send_course_email.subtask((
|
|
|
|
|
entry_id,
|
|
|
|
|
email_id,
|
|
|
|
|
to_list,
|
|
|
|
|
global_email_context,
|
|
|
|
|
False
|
|
|
|
|
subtask_progress,
|
|
|
|
|
), task_id=subtask_id
|
|
|
|
|
))
|
|
|
|
|
num_workers += num_tasks_this_query
|
|
|
|
|
@@ -206,6 +209,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
|
|
|
|
return progress
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: figure out if we really need this after all (for unit tests...)
|
|
|
|
|
def _get_current_task():
|
|
|
|
|
"""Stub to make it easier to test without actually running Celery"""
|
|
|
|
|
return current_task
|
|
|
|
|
@@ -224,47 +228,51 @@ def _update_subtask_status(entry_id, current_task_id, status, subtask_result):
|
|
|
|
|
subtask_dict = json.loads(entry.subtasks)
|
|
|
|
|
subtask_status = subtask_dict['status']
|
|
|
|
|
if current_task_id not in subtask_status:
|
|
|
|
|
# unexpected error -- raise an exception?
|
|
|
|
|
log.warning("Unexpected task_id '%s': unable to update status for email subtask of instructor task %d",
|
|
|
|
|
current_task_id, entry_id)
|
|
|
|
|
pass
|
|
|
|
|
# unexpected error -- raise an exception
|
|
|
|
|
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
|
|
|
|
|
msg = format_str.format(current_task_id, entry_id)
|
|
|
|
|
log.warning(msg)
|
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
subtask_status[current_task_id] = status
|
|
|
|
|
# now update the parent task progress
|
|
|
|
|
|
|
|
|
|
# Update the parent task progress
|
|
|
|
|
task_progress = json.loads(entry.task_output)
|
|
|
|
|
start_time = task_progress['start_time']
|
|
|
|
|
task_progress['duration_ms'] = int((time() - start_time) * 1000)
|
|
|
|
|
if subtask_result is not None:
|
|
|
|
|
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
|
|
|
|
|
task_progress[statname] += subtask_result[statname]
|
|
|
|
|
# now figure out if we're actually done (i.e. this is the last task to complete)
|
|
|
|
|
# (This might be easier by just maintaining a counter, rather than scanning the
|
|
|
|
|
# entire subtask_status dict.)
|
|
|
|
|
|
|
|
|
|
# Figure out if we're actually done (i.e. this is the last task to complete).
|
|
|
|
|
# This is easier if we just maintain a counter, rather than scanning the
|
|
|
|
|
# entire subtask_status dict.
|
|
|
|
|
if status == SUCCESS:
|
|
|
|
|
subtask_dict['succeeded'] += 1
|
|
|
|
|
else:
|
|
|
|
|
subtask_dict['failed'] += 1
|
|
|
|
|
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
|
|
|
|
|
# If we're done with the last task, update the parent status to indicate that:
|
|
|
|
|
if num_remaining <= 0:
|
|
|
|
|
# we're done with the last task: update the parent status to indicate that:
|
|
|
|
|
entry.task_state = SUCCESS
|
|
|
|
|
entry.subtasks = json.dumps(subtask_dict)
|
|
|
|
|
entry.task_output = InstructorTask.create_output_for_success(task_progress)
|
|
|
|
|
|
|
|
|
|
log.info("Task output updated to %s for email subtask %s of instructor task %d",
|
|
|
|
|
entry.task_output, current_task_id, entry_id)
|
|
|
|
|
|
|
|
|
|
# TODO: temporary -- switch to debug
|
|
|
|
|
log.info("about to save....")
|
|
|
|
|
entry.save()
|
|
|
|
|
except:
|
|
|
|
|
log.exception("Unexpected error while updating InstructorTask.")
|
|
|
|
|
transaction.rollback()
|
|
|
|
|
else:
|
|
|
|
|
# TODO: temporary -- switch to debug
|
|
|
|
|
log.info("about to commit....")
|
|
|
|
|
transaction.commit()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
|
|
|
|
|
def send_course_email(entry_id, email_id, to_list, global_email_context, throttle=False):
|
|
|
|
|
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress):
|
|
|
|
|
"""
|
|
|
|
|
Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are
|
|
|
|
|
'profile__name', 'email' (address), and 'pk' (in the user table).
|
|
|
|
|
@@ -276,49 +284,64 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, throttl
|
|
|
|
|
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
|
|
|
|
|
# with it right away.
|
|
|
|
|
InstructorTask.objects.get(pk=entry_id)
|
|
|
|
|
|
|
|
|
|
# Get information from current task's request:
|
|
|
|
|
current_task_id = _get_current_task().request.id
|
|
|
|
|
retry_index = _get_current_task().request.retries
|
|
|
|
|
|
|
|
|
|
log.info("Preparing to send email as subtask %s for instructor task %d",
|
|
|
|
|
current_task_id, entry_id)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
course_title = global_email_context['course_title']
|
|
|
|
|
course_email_result_value = None
|
|
|
|
|
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
|
|
|
|
|
course_email_result = _send_course_email(email_id, to_list, global_email_context, throttle)
|
|
|
|
|
course_email_result_value = _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index)
|
|
|
|
|
# Assume that if we get here without a raise, the task was successful.
|
|
|
|
|
# Update the InstructorTask object that is storing its progress.
|
|
|
|
|
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result)
|
|
|
|
|
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
# try to write out the failure to the entry before failing
|
|
|
|
|
_, exception, traceback = exc_info()
|
|
|
|
|
traceback_string = format_exc(traceback) if traceback is not None else ''
|
|
|
|
|
log.warning("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
|
|
|
|
|
_update_subtask_status(entry_id, current_task_id, FAILURE, None)
|
|
|
|
|
_update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
return course_email_result
|
|
|
|
|
return course_email_result_value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
def _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index):
|
|
|
|
|
"""
|
|
|
|
|
Performs the email sending task.
|
|
|
|
|
"""
|
|
|
|
|
throttle = retry_index > 0
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
course_email = CourseEmail.objects.get(id=email_id)
|
|
|
|
|
except CourseEmail.DoesNotExist:
|
|
|
|
|
log.exception("Could not find email id:{} to send.".format(email_id))
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# exclude optouts
|
|
|
|
|
optouts = (Optout.objects.filter(course_id=course_email.course_id,
|
|
|
|
|
user__in=[i['pk'] for i in to_list])
|
|
|
|
|
.values_list('user__email', flat=True))
|
|
|
|
|
# exclude optouts (if not a retry):
|
|
|
|
|
# Note that we don't have to do the optout logic at all if this is a retry,
|
|
|
|
|
# because we have presumably already performed the optout logic on the first
|
|
|
|
|
# attempt. Anyone on the to_list on a retry has already passed the filter
|
|
|
|
|
# that existed at that time, and we don't need to keep checking for changes
|
|
|
|
|
# in the Optout list.
|
|
|
|
|
num_optout = 0
|
|
|
|
|
if retry_index == 0:
|
|
|
|
|
optouts = (Optout.objects.filter(course_id=course_email.course_id,
|
|
|
|
|
user__in=[i['pk'] for i in to_list])
|
|
|
|
|
.values_list('user__email', flat=True))
|
|
|
|
|
|
|
|
|
|
optouts = set(optouts)
|
|
|
|
|
num_optout = len(optouts)
|
|
|
|
|
|
|
|
|
|
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
|
|
|
|
|
optouts = set(optouts)
|
|
|
|
|
# Only count the num_optout for the first time the optouts are calculated.
|
|
|
|
|
# We assume that the number will not change on retries, and so we don't need
|
|
|
|
|
# to calculate it each time.
|
|
|
|
|
num_optout = len(optouts)
|
|
|
|
|
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
|
|
|
|
|
|
|
|
|
|
course_title = global_email_context['course_title']
|
|
|
|
|
subject = "[" + course_title + "] " + course_email.subject
|
|
|
|
|
@@ -336,11 +359,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
|
|
|
|
|
course_email_template = CourseEmailTemplate.get_template()
|
|
|
|
|
|
|
|
|
|
num_sent = 0
|
|
|
|
|
num_error = 0
|
|
|
|
|
try:
|
|
|
|
|
connection = get_connection()
|
|
|
|
|
connection.open()
|
|
|
|
|
num_sent = 0
|
|
|
|
|
num_error = 0
|
|
|
|
|
|
|
|
|
|
# Define context values to use in all course emails:
|
|
|
|
|
email_context = {
|
|
|
|
|
@@ -370,7 +393,7 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
email_msg.attach_alternative(html_msg, 'text/html')
|
|
|
|
|
|
|
|
|
|
# Throttle if we tried a few times and got the rate limiter
|
|
|
|
|
if throttle or current_task.request.retries > 0:
|
|
|
|
|
if throttle:
|
|
|
|
|
sleep(0.2)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
@@ -398,14 +421,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
|
|
|
|
|
to_list.pop()
|
|
|
|
|
|
|
|
|
|
connection.close()
|
|
|
|
|
# TODO: figure out how to get (or persist) real statistics for this task, so that reflects progress
|
|
|
|
|
# made over multiple retries.
|
|
|
|
|
return course_email_result(num_sent, num_error, num_optout)
|
|
|
|
|
|
|
|
|
|
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
|
|
|
|
|
# Error caught here cause the email to be retried. The entire task is actually retried without popping the list
|
|
|
|
|
# Reasoning is that all of these errors may be temporary condition.
|
|
|
|
|
# TODO: figure out what this means. Presumably we have popped the list with those that have succeeded
|
|
|
|
|
# and failed, rather than those needing a later retry.
|
|
|
|
|
log.warning('Email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
|
|
|
|
|
email_id, exc, len(to_list))
|
|
|
|
|
raise send_course_email.retry(
|
|
|
|
|
@@ -413,10 +433,10 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
email_id,
|
|
|
|
|
to_list,
|
|
|
|
|
global_email_context,
|
|
|
|
|
current_task.request.retries > 0
|
|
|
|
|
_course_email_result(subtask_progress, num_sent, num_error, num_optout),
|
|
|
|
|
],
|
|
|
|
|
exc=exc,
|
|
|
|
|
countdown=(2 ** current_task.request.retries) * 15
|
|
|
|
|
countdown=(2 ** retry_index) * 15
|
|
|
|
|
)
|
|
|
|
|
except:
|
|
|
|
|
log.exception('Email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
|
|
|
|
|
@@ -425,12 +445,22 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
|
|
|
|
# Close the connection before we exit
|
|
|
|
|
connection.close()
|
|
|
|
|
raise
|
|
|
|
|
else:
|
|
|
|
|
connection.close()
|
|
|
|
|
# Add current progress to any progress stemming from previous retries:
|
|
|
|
|
return _course_email_result(subtask_progress, num_sent, num_error, num_optout)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def course_email_result(num_sent, num_error, num_optout):
|
|
|
|
|
def _course_email_result(previous_result, new_num_sent, new_num_error, new_num_optout):
|
|
|
|
|
"""Return the result of course_email sending as a dict (not a string)."""
|
|
|
|
|
attempted = num_sent + num_error
|
|
|
|
|
return {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
|
|
|
|
|
attempted = new_num_sent + new_num_error
|
|
|
|
|
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
|
|
|
|
|
# add in any previous results:
|
|
|
|
|
if previous_result is not None:
|
|
|
|
|
for keyname in current_result:
|
|
|
|
|
if keyname in previous_result:
|
|
|
|
|
current_result[keyname] += previous_result[keyname]
|
|
|
|
|
return current_result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _statsd_tag(course_title):
|
|
|
|
|
|