Merge pull request #1495 from edx/brian/remove-celery-group
Remove the use of celery.group from bulk email subtasks.
This commit is contained in:
@@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes,
|
||||
in roughly chronological order, most recent first. Add your entries at or near
|
||||
the top. Include a label indicating the component affected.
|
||||
|
||||
LMS: Change bulk email implementation to use less memory, and to better handle
|
||||
duplicate tasks in celery.
|
||||
|
||||
LMS: Improve forum error handling so that errors in the logs are
|
||||
clearer and HTTP status codes from the comments service indicating
|
||||
client error are correctly passed through to the client.
|
||||
|
||||
@@ -2,11 +2,9 @@
|
||||
This module contains celery task functions for handling the sending of bulk email
|
||||
to a course.
|
||||
"""
|
||||
import math
|
||||
import re
|
||||
import random
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from time import sleep
|
||||
|
||||
from dogapi import dog_stats_api
|
||||
@@ -24,7 +22,7 @@ from boto.ses.exceptions import (
|
||||
)
|
||||
from boto.exception import AWSConnectionError
|
||||
|
||||
from celery import task, current_task, group
|
||||
from celery import task, current_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, FAILURE, RETRY
|
||||
from celery.exceptions import RetryTaskError
|
||||
@@ -42,11 +40,10 @@ from courseware.access import _course_staff_group_name, _course_instructor_group
|
||||
from courseware.courses import get_course, course_image_url
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import (
|
||||
update_subtask_status,
|
||||
create_subtask_status,
|
||||
increment_subtask_status,
|
||||
initialize_subtask_info,
|
||||
SubtaskStatus,
|
||||
queue_subtasks_for_query,
|
||||
check_subtask_is_valid,
|
||||
update_subtask_status,
|
||||
)
|
||||
|
||||
log = get_task_logger(__name__)
|
||||
@@ -152,60 +149,11 @@ def _get_course_email_context(course):
|
||||
return email_context
|
||||
|
||||
|
||||
def _generate_subtasks(create_subtask_fcn, recipient_qset):
|
||||
"""
|
||||
Generates a list of subtasks to send email to a given set of recipients.
|
||||
|
||||
Arguments:
|
||||
`create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id
|
||||
to assign to the new subtask. Returns the subtask that will send email to that
|
||||
list of recipients.
|
||||
`recipient_qset` : a query set that defines the recipients who should receive emails.
|
||||
|
||||
Returns: a tuple, containing:
|
||||
|
||||
* A list of subtasks that will send emails to all recipients.
|
||||
* A list of subtask_ids corresponding to those subtasks.
|
||||
* A count of the total number of emails being sent.
|
||||
|
||||
"""
|
||||
total_num_emails = recipient_qset.count()
|
||||
num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY)))
|
||||
last_pk = recipient_qset[0].pk - 1
|
||||
num_emails_queued = 0
|
||||
task_list = []
|
||||
subtask_id_list = []
|
||||
for _ in range(num_queries):
|
||||
recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY])
|
||||
last_pk = recipient_sublist[-1]['pk']
|
||||
num_emails_this_query = len(recipient_sublist)
|
||||
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK)))
|
||||
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
|
||||
for i in range(num_tasks_this_query):
|
||||
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
|
||||
subtask_id = str(uuid4())
|
||||
subtask_id_list.append(subtask_id)
|
||||
new_subtask = create_subtask_fcn(to_list, subtask_id)
|
||||
task_list.append(new_subtask)
|
||||
|
||||
num_emails_queued += num_emails_this_query
|
||||
|
||||
# Sanity check: we expect the chunking to be properly summing to the original count:
|
||||
if num_emails_queued != total_num_emails:
|
||||
error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails)
|
||||
log.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
return task_list, subtask_id_list, total_num_emails
|
||||
|
||||
|
||||
def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
|
||||
"""
|
||||
Delegates emails by querying for the list of recipients who should
|
||||
get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size,
|
||||
and queueing up worker jobs.
|
||||
|
||||
Returns the number of batches (workers) kicked off.
|
||||
get the mail, chopping up into batches of no more than settings.BULK_EMAIL_EMAILS_PER_TASK
|
||||
in size, and queueing up worker jobs.
|
||||
"""
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
# Get inputs to use in this task from the entry.
|
||||
@@ -252,42 +200,41 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
log.exception("Task %s: course not found: %s", task_id, course_id)
|
||||
raise
|
||||
|
||||
# Get arguments that will be passed to every subtask.
|
||||
to_option = email_obj.to_option
|
||||
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
|
||||
global_email_context = _get_course_email_context(course)
|
||||
|
||||
def _create_send_email_subtask(to_list, subtask_id):
|
||||
def _create_send_email_subtask(to_list, initial_subtask_status):
|
||||
"""Creates a subtask to send email to a given recipient list."""
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
subtask_id = initial_subtask_status.task_id
|
||||
new_subtask = send_course_email.subtask(
|
||||
(
|
||||
entry_id,
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
subtask_status,
|
||||
initial_subtask_status.to_dict(),
|
||||
),
|
||||
task_id=subtask_id,
|
||||
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
|
||||
)
|
||||
return new_subtask
|
||||
|
||||
log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s",
|
||||
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
|
||||
recipient_fields = ['profile__name', 'email']
|
||||
|
||||
log.info("Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
|
||||
task_id, course_id, email_id, to_option)
|
||||
task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset)
|
||||
|
||||
# Update the InstructorTask with information about the subtasks we've defined.
|
||||
log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s",
|
||||
task_id, total_num_emails, course_id, email_id, to_option)
|
||||
progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list)
|
||||
num_subtasks = len(subtask_id_list)
|
||||
|
||||
# Now group the subtasks, and start them running. This allows all the subtasks
|
||||
# in the list to be submitted at the same time.
|
||||
log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
|
||||
task_id, num_subtasks, total_num_emails, course_id, email_id, to_option)
|
||||
task_group = group(task_list)
|
||||
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
|
||||
progress = queue_subtasks_for_query(
|
||||
entry,
|
||||
action_name,
|
||||
_create_send_email_subtask,
|
||||
recipient_qset,
|
||||
recipient_fields,
|
||||
settings.BULK_EMAIL_EMAILS_PER_QUERY,
|
||||
settings.BULK_EMAIL_EMAILS_PER_TASK
|
||||
)
|
||||
|
||||
# We want to return progress here, as this is what will be stored in the
|
||||
# AsyncResult for the parent task as its return value.
|
||||
@@ -298,7 +245,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
|
||||
|
||||
@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102
|
||||
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
|
||||
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status_dict):
|
||||
"""
|
||||
Sends an email to a list of recipients.
|
||||
|
||||
@@ -312,7 +259,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
* `global_email_context`: dict containing values that are unique for this email but the same
|
||||
for all recipients of this email. This dict is to be used to fill in slots in email
|
||||
template. It does not include 'name' and 'email', which will be provided by the to_list.
|
||||
* `subtask_status` : dict containing values representing current status. Keys are:
|
||||
* `subtask_status_dict` : dict containing values representing current status. Keys are:
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
@@ -332,7 +279,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
|
||||
with status information (sends, failures, skips) and updates number of subtasks completed.
|
||||
"""
|
||||
current_task_id = subtask_status['task_id']
|
||||
subtask_status = SubtaskStatus.from_dict(subtask_status_dict)
|
||||
current_task_id = subtask_status.task_id
|
||||
num_to_send = len(to_list)
|
||||
log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
|
||||
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
|
||||
@@ -362,32 +310,33 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
)
|
||||
except Exception:
|
||||
# Unexpected exception. Try to write out the failure to the entry before failing.
|
||||
log.exception("Send-email task %s: failed unexpectedly!", current_task_id)
|
||||
log.exception("Send-email task %s for email %s: failed unexpectedly!", current_task_id, email_id)
|
||||
# We got here for really unexpected reasons. Since we don't know how far
|
||||
# the task got in emailing, we count all recipients as having failed.
|
||||
# It at least keeps the counts consistent.
|
||||
new_subtask_status = increment_subtask_status(subtask_status, failed=num_to_send, state=FAILURE)
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
subtask_status.increment(failed=num_to_send, state=FAILURE)
|
||||
update_subtask_status(entry_id, current_task_id, subtask_status)
|
||||
raise
|
||||
|
||||
if send_exception is None:
|
||||
# Update the InstructorTask object that is storing its progress.
|
||||
log.info("Send-email task %s: succeeded", current_task_id)
|
||||
log.info("Send-email task %s for email %s: succeeded", current_task_id, email_id)
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
elif isinstance(send_exception, RetryTaskError):
|
||||
# If retrying, a RetryTaskError needs to be returned to Celery.
|
||||
# We assume that the the progress made before the retry condition
|
||||
# was encountered has already been updated before the retry call was made,
|
||||
# so we only log here.
|
||||
log.warning("Send-email task %s: being retried", current_task_id)
|
||||
log.warning("Send-email task %s for email %s: being retried", current_task_id, email_id)
|
||||
raise send_exception # pylint: disable=E0702
|
||||
else:
|
||||
log.error("Send-email task %s: failed: %s", current_task_id, send_exception)
|
||||
log.error("Send-email task %s for email %s: failed: %s", current_task_id, email_id, send_exception)
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
raise send_exception # pylint: disable=E0702
|
||||
|
||||
log.info("Send-email task %s: returning status %s", current_task_id, new_subtask_status)
|
||||
return new_subtask_status
|
||||
# return status in a form that can be serialized by Celery into JSON:
|
||||
log.info("Send-email task %s for email %s: returning status %s", current_task_id, email_id, new_subtask_status)
|
||||
return new_subtask_status.to_dict()
|
||||
|
||||
|
||||
def _filter_optouts_from_recipients(to_list, course_id):
|
||||
@@ -449,37 +398,20 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
* `global_email_context`: dict containing values that are unique for this email but the same
|
||||
for all recipients of this email. This dict is to be used to fill in slots in email
|
||||
template. It does not include 'name' and 'email', which will be provided by the to_list.
|
||||
* `subtask_status` : dict containing values representing current status. Keys are:
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
'succeeded' : number that succeeded in processing
|
||||
'skipped' : number that were not processed.
|
||||
'failed' : number that failed during processing
|
||||
'retried_nomax' : number of times the subtask has been retried for conditions that
|
||||
should not have a maximum count applied
|
||||
'retried_withmax' : number of times the subtask has been retried for conditions that
|
||||
should have a maximum count applied
|
||||
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
|
||||
* `subtask_status` : object of class SubtaskStatus representing current status.
|
||||
|
||||
Sends to all addresses contained in to_list that are not also in the Optout table.
|
||||
Emails are sent multi-part, in both plain text and html.
|
||||
|
||||
Returns a tuple of two values:
|
||||
* First value is a dict which represents current progress at the end of this call. Keys are
|
||||
the same as for the input subtask_status.
|
||||
* First value is a SubtaskStatus object which represents current progress at the end of this call.
|
||||
|
||||
* Second value is an exception returned by the innards of the method, indicating a fatal error.
|
||||
In this case, the number of recipients that were not sent have already been added to the
|
||||
'failed' count above.
|
||||
"""
|
||||
# Get information from current task's request:
|
||||
task_id = subtask_status['task_id']
|
||||
|
||||
# collect stats on progress:
|
||||
num_optout = 0
|
||||
num_sent = 0
|
||||
num_error = 0
|
||||
task_id = subtask_status.task_id
|
||||
|
||||
try:
|
||||
course_email = CourseEmail.objects.get(id=email_id)
|
||||
@@ -493,8 +425,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
# attempt. Anyone on the to_list on a retry has already passed the filter
|
||||
# that existed at that time, and we don't need to keep checking for changes
|
||||
# in the Optout list.
|
||||
if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0:
|
||||
if subtask_status.get_retry_count() == 0:
|
||||
to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id)
|
||||
subtask_status.increment(skipped=num_optout)
|
||||
|
||||
course_title = global_email_context['course_title']
|
||||
subject = "[" + course_title + "] " + course_email.subject
|
||||
@@ -539,7 +472,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
# for a period of time between all emails within this task. Choice of
|
||||
# the value depends on the number of workers that might be sending email in
|
||||
# parallel, and what the SES throttle rate is.
|
||||
if subtask_status['retried_nomax'] > 0:
|
||||
if subtask_status.retried_nomax > 0:
|
||||
sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)
|
||||
|
||||
try:
|
||||
@@ -557,13 +490,13 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
# This will fall through and not retry the message.
|
||||
log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc.smtp_error)
|
||||
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
|
||||
num_error += 1
|
||||
subtask_status.increment(failed=1)
|
||||
|
||||
except SINGLE_EMAIL_FAILURE_ERRORS as exc:
|
||||
# This will fall through and not retry the message.
|
||||
log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc)
|
||||
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
|
||||
num_error += 1
|
||||
subtask_status.increment(failed=1)
|
||||
|
||||
else:
|
||||
dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
|
||||
@@ -571,7 +504,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
log.info('Email with id %s sent to %s', email_id, email)
|
||||
else:
|
||||
log.debug('Email with id %s sent to %s', email_id, email)
|
||||
num_sent += 1
|
||||
subtask_status.increment(succeeded=1)
|
||||
|
||||
# Pop the user that was emailed off the end of the list only once they have
|
||||
# successfully been processed. (That way, if there were a failure that
|
||||
@@ -582,16 +515,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)])
|
||||
# Increment the "retried_nomax" counter, update other counters with progress to date,
|
||||
# and set the state to RETRY:
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retried_nomax=1,
|
||||
state=RETRY
|
||||
)
|
||||
subtask_status.increment(retried_nomax=1, state=RETRY)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=True
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=True
|
||||
)
|
||||
|
||||
except LIMITED_RETRY_ERRORS as exc:
|
||||
@@ -601,16 +527,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
|
||||
# Increment the "retried_withmax" counter, update other counters with progress to date,
|
||||
# and set the state to RETRY:
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retried_withmax=1,
|
||||
state=RETRY
|
||||
)
|
||||
subtask_status.increment(retried_withmax=1, state=RETRY)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=False
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
|
||||
)
|
||||
|
||||
except BULK_EMAIL_FAILURE_ERRORS as exc:
|
||||
@@ -620,14 +539,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
task_id, email_id, num_pending)
|
||||
# Update counters with progress to date, counting unsent emails as failures,
|
||||
# and set the state to FAILURE:
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=(num_error + num_pending),
|
||||
skipped=num_optout,
|
||||
state=FAILURE
|
||||
)
|
||||
return subtask_progress, exc
|
||||
subtask_status.increment(failed=num_pending, state=FAILURE)
|
||||
return subtask_status, exc
|
||||
|
||||
except Exception as exc:
|
||||
# Errors caught here cause the email to be retried. The entire task is actually retried
|
||||
@@ -639,30 +552,17 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
task_id, email_id)
|
||||
# Increment the "retried_withmax" counter, update other counters with progress to date,
|
||||
# and set the state to RETRY:
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retried_withmax=1,
|
||||
state=RETRY
|
||||
)
|
||||
subtask_status.increment(retried_withmax=1, state=RETRY)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=False
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False
|
||||
)
|
||||
|
||||
else:
|
||||
# All went well. Update counters with progress to date,
|
||||
# and set the state to SUCCESS:
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
state=SUCCESS
|
||||
)
|
||||
subtask_status.increment(state=SUCCESS)
|
||||
# Successful completion is marked by an exception value of None.
|
||||
return subtask_progress, None
|
||||
return subtask_status, None
|
||||
finally:
|
||||
# Clean up at the end.
|
||||
connection.close()
|
||||
@@ -708,26 +608,26 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
successfully submitted, this value will be the RetryTaskError that retry() returns.
|
||||
Otherwise, it (ought to be) the current_exception passed in.
|
||||
"""
|
||||
task_id = subtask_status['task_id']
|
||||
task_id = subtask_status.task_id
|
||||
log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
|
||||
task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped'])
|
||||
task_id, subtask_status.succeeded, subtask_status.failed, subtask_status.skipped)
|
||||
|
||||
# Calculate time until we retry this task (in seconds):
|
||||
# The value for max_retries is increased by the number of times an "infinite-retry" exception
|
||||
# has been retried. We want the regular retries to trigger max-retry checking, but not these
|
||||
# special retries. So we count them separately.
|
||||
max_retries = _get_current_task().max_retries + subtask_status['retried_nomax']
|
||||
max_retries = _get_current_task().max_retries + subtask_status.retried_nomax
|
||||
base_delay = _get_current_task().default_retry_delay
|
||||
if skip_retry_max:
|
||||
# once we reach five retries, don't increase the countdown further.
|
||||
retry_index = min(subtask_status['retried_nomax'], 5)
|
||||
retry_index = min(subtask_status.retried_nomax, 5)
|
||||
exception_type = 'sending-rate'
|
||||
# if we have a cap, after all, apply it now:
|
||||
if hasattr(settings, 'BULK_EMAIL_INFINITE_RETRY_CAP'):
|
||||
retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status['retried_withmax']
|
||||
retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status.retried_withmax
|
||||
max_retries = min(max_retries, retry_cap)
|
||||
else:
|
||||
retry_index = subtask_status['retried_withmax']
|
||||
retry_index = subtask_status.retried_withmax
|
||||
exception_type = 'transient'
|
||||
|
||||
# Skew the new countdown value by a random factor, so that not all
|
||||
@@ -752,7 +652,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
subtask_status,
|
||||
subtask_status.to_dict(),
|
||||
],
|
||||
exc=current_exception,
|
||||
countdown=countdown,
|
||||
@@ -773,8 +673,8 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
num_failed = len(to_list)
|
||||
new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE)
|
||||
return new_subtask_progress, retry_exc
|
||||
subtask_status.increment(subtask_status, failed=num_failed, state=FAILURE)
|
||||
return subtask_status, retry_exc
|
||||
|
||||
|
||||
def _statsd_tag(course_title):
|
||||
|
||||
@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from bulk_email.models import Optout
|
||||
from instructor_task.subtasks import increment_subtask_status
|
||||
from instructor_task.subtasks import update_subtask_status
|
||||
|
||||
STAFF_COUNT = 3
|
||||
STUDENT_COUNT = 10
|
||||
@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
|
||||
"""
|
||||
emails_sent = 0
|
||||
|
||||
def get_mock_increment_subtask_status(self):
|
||||
def get_mock_update_subtask_status(self):
|
||||
"""Wrapper for mock email function."""
|
||||
def mock_increment_subtask_status(original_status, **kwargs): # pylint: disable=W0613
|
||||
def mock_update_subtask_status(entry_id, current_task_id, new_subtask_status): # pylint: disable=W0613
|
||||
"""Increments count of number of emails sent."""
|
||||
self.emails_sent += kwargs.get('succeeded', 0)
|
||||
return increment_subtask_status(original_status, **kwargs)
|
||||
return mock_increment_subtask_status
|
||||
self.emails_sent += new_subtask_status.succeeded
|
||||
return update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
return mock_update_subtask_status
|
||||
|
||||
|
||||
@override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE)
|
||||
@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
|
||||
)
|
||||
|
||||
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7)
|
||||
@patch('bulk_email.tasks.increment_subtask_status')
|
||||
@patch('bulk_email.tasks.update_subtask_status')
|
||||
def test_chunked_queries_send_numerous_emails(self, email_mock):
|
||||
"""
|
||||
Test sending a large number of emails, to test the chunked querying
|
||||
"""
|
||||
mock_factory = MockCourseEmailResult()
|
||||
email_mock.side_effect = mock_factory.get_mock_increment_subtask_status()
|
||||
email_mock.side_effect = mock_factory.get_mock_update_subtask_status()
|
||||
added_users = []
|
||||
for _ in xrange(LARGE_NUM_EMAILS):
|
||||
user = UserFactory()
|
||||
|
||||
@@ -22,8 +22,8 @@ from bulk_email.models import CourseEmail, SEND_TO_ALL
|
||||
from bulk_email.tasks import perform_delegate_email_batches, send_course_email
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import (
|
||||
create_subtask_status,
|
||||
initialize_subtask_info,
|
||||
SubtaskStatus,
|
||||
check_subtask_is_valid,
|
||||
update_subtask_status,
|
||||
DuplicateTaskException,
|
||||
@@ -75,7 +75,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
self.assertIsInstance(exc, SMTPDataError)
|
||||
|
||||
@patch('bulk_email.tasks.get_connection', autospec=True)
|
||||
@patch('bulk_email.tasks.increment_subtask_status')
|
||||
@patch('bulk_email.tasks.update_subtask_status')
|
||||
@patch('bulk_email.tasks.send_course_email.retry')
|
||||
def test_data_err_fail(self, retry, result, get_conn):
|
||||
"""
|
||||
@@ -99,11 +99,11 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
# We shouldn't retry when hitting a 5xx error
|
||||
self.assertFalse(retry.called)
|
||||
# Test that after the rejected email, the rest still successfully send
|
||||
((_initial_results), kwargs) = result.call_args
|
||||
self.assertEquals(kwargs['skipped'], 0)
|
||||
((_entry_id, _current_task_id, subtask_status), _kwargs) = result.call_args
|
||||
self.assertEquals(subtask_status.skipped, 0)
|
||||
expected_fails = int((settings.BULK_EMAIL_EMAILS_PER_TASK + 3) / 4.0)
|
||||
self.assertEquals(kwargs['failed'], expected_fails)
|
||||
self.assertEquals(kwargs['succeeded'], settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails)
|
||||
self.assertEquals(subtask_status.failed, expected_fails)
|
||||
self.assertEquals(subtask_status.succeeded, settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails)
|
||||
|
||||
@patch('bulk_email.tasks.get_connection', autospec=True)
|
||||
@patch('bulk_email.tasks.send_course_email.retry')
|
||||
@@ -146,7 +146,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
exc = kwargs['exc']
|
||||
self.assertIsInstance(exc, SMTPConnectError)
|
||||
|
||||
@patch('bulk_email.tasks.increment_subtask_status')
|
||||
@patch('bulk_email.tasks.SubtaskStatus.increment')
|
||||
@patch('bulk_email.tasks.log')
|
||||
def test_nonexistent_email(self, mock_log, result):
|
||||
"""
|
||||
@@ -216,10 +216,10 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
email_id = 1001
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find subtasks of instructor task'):
|
||||
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
|
||||
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status.to_dict())
|
||||
|
||||
def test_send_email_missing_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
@@ -230,10 +230,10 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
different_subtask_id = "bogus-subtask-id-value"
|
||||
subtask_status = create_subtask_status(different_subtask_id)
|
||||
subtask_status = SubtaskStatus.create(different_subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for subtask of instructor task'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())
|
||||
|
||||
def test_send_email_completed_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
@@ -241,14 +241,14 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id, state=SUCCESS)
|
||||
subtask_status = SubtaskStatus.create(subtask_id, state=SUCCESS)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
new_subtask_status = create_subtask_status(subtask_id)
|
||||
new_subtask_status = SubtaskStatus.create(subtask_id)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
|
||||
|
||||
def test_send_email_running_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
@@ -256,14 +256,14 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
check_subtask_is_valid(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())
|
||||
|
||||
def test_send_email_retried_subtask(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
@@ -271,19 +271,19 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
entry_id = entry.id # pylint: disable=E1101
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=2)
|
||||
subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=2)
|
||||
update_subtask_status(entry_id, subtask_id, subtask_status)
|
||||
bogus_email_id = 1001
|
||||
to_list = ['test@test.com']
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
# try running with a clean subtask:
|
||||
new_subtask_status = create_subtask_status(subtask_id)
|
||||
new_subtask_status = SubtaskStatus.create(subtask_id)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
|
||||
# try again, with a retried subtask with lower count:
|
||||
new_subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=1)
|
||||
new_subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=1)
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
@@ -293,10 +293,10 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
global_email_context = {'course_title': 'dummy course'}
|
||||
subtask_id = "subtask-id-value"
|
||||
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaises(CourseEmail.DoesNotExist):
|
||||
# we skip the call that updates subtask status, since we've not set up the InstructorTask
|
||||
# for the subtask, and it's not important to the test.
|
||||
with patch('bulk_email.tasks.update_subtask_status'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())
|
||||
|
||||
@@ -31,7 +31,7 @@ from django.core.management import call_command
|
||||
from bulk_email.models import CourseEmail, Optout, SEND_TO_ALL
|
||||
|
||||
from instructor_task.tasks import send_bulk_course_email
|
||||
from instructor_task.subtasks import update_subtask_status
|
||||
from instructor_task.subtasks import update_subtask_status, SubtaskStatus
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
|
||||
from instructor_task.tests.factories import InstructorTaskFactory
|
||||
@@ -63,16 +63,9 @@ def my_update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status_info = subtask_dict['status']
|
||||
current_subtask_status = subtask_status_info[current_task_id]
|
||||
|
||||
def _get_retry_count(subtask_result):
|
||||
"""Return the number of retries counted for the given subtask."""
|
||||
retry_count = subtask_result.get('retried_nomax', 0)
|
||||
retry_count += subtask_result.get('retried_withmax', 0)
|
||||
return retry_count
|
||||
|
||||
current_retry_count = _get_retry_count(current_subtask_status)
|
||||
new_retry_count = _get_retry_count(new_subtask_status)
|
||||
current_subtask_status = SubtaskStatus.from_dict(subtask_status_info[current_task_id])
|
||||
current_retry_count = current_subtask_status.get_retry_count()
|
||||
new_retry_count = new_subtask_status.get_retry_count()
|
||||
if current_retry_count <= new_retry_count:
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ This module contains celery task functions for handling the management of subtas
|
||||
"""
|
||||
from time import time
|
||||
import json
|
||||
from uuid import uuid4
|
||||
import math
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, READY_STATES, RETRY
|
||||
@@ -14,7 +16,7 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
|
||||
TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
# Lock expiration should be long enough to allow a send_course_email task to complete.
|
||||
# Lock expiration should be long enough to allow a subtask to complete.
|
||||
SUBTASK_LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
|
||||
|
||||
|
||||
@@ -23,11 +25,73 @@ class DuplicateTaskException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task):
|
||||
"""
|
||||
Determines number of subtasks that would be generated by _generate_items_for_subtask.
|
||||
|
||||
This needs to be calculated before a query is executed so that the list of all subtasks can be
|
||||
stored in the InstructorTask before any subtasks are started.
|
||||
|
||||
The number of subtask_id values returned by this should match the number of chunks returned
|
||||
by the generate_items_for_subtask generator.
|
||||
"""
|
||||
total_num_tasks = 0
|
||||
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
|
||||
num_items_remaining = total_num_items
|
||||
for _ in range(num_queries):
|
||||
num_items_this_query = min(num_items_remaining, items_per_query)
|
||||
num_items_remaining -= num_items_this_query
|
||||
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
|
||||
total_num_tasks += num_tasks_this_query
|
||||
|
||||
return total_num_tasks
|
||||
|
||||
|
||||
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task):
|
||||
"""
|
||||
Generates a chunk of "items" that should be passed into a subtask.
|
||||
|
||||
Arguments:
|
||||
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
|
||||
`item_fields` : the fields that should be included in the dict that is returned.
|
||||
These are in addition to the 'pk' field.
|
||||
`total_num_items` : the result of item_queryset.count().
|
||||
`items_per_query` : size of chunks to break the query operation into.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
|
||||
Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.
|
||||
|
||||
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
|
||||
"""
|
||||
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
|
||||
last_pk = item_queryset[0].pk - 1
|
||||
num_items_queued = 0
|
||||
all_item_fields = list(item_fields)
|
||||
all_item_fields.append('pk')
|
||||
for _ in range(num_queries):
|
||||
item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query])
|
||||
last_pk = item_sublist[-1]['pk']
|
||||
num_items_this_query = len(item_sublist)
|
||||
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
|
||||
chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
|
||||
for i in range(num_tasks_this_query):
|
||||
items_for_task = item_sublist[i * chunk:i * chunk + chunk]
|
||||
yield items_for_task
|
||||
|
||||
num_items_queued += num_items_this_query
|
||||
|
||||
# Sanity check: we expect the chunking to be properly summing to the original count:
|
||||
if num_items_queued != total_num_items:
|
||||
error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items)
|
||||
TASK_LOG.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
|
||||
class SubtaskStatus(object):
|
||||
"""
|
||||
Create and return a dict for tracking the status of a subtask.
|
||||
|
||||
Subtask status keys are:
|
||||
SubtaskStatus values are:
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
@@ -40,70 +104,76 @@ def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nom
|
||||
should have a maximum count applied
|
||||
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
|
||||
|
||||
Object must be JSON-serializable, so that it can be passed as an argument
|
||||
to tasks.
|
||||
Object is not JSON-serializable, so to_dict and from_dict methods are provided so that
|
||||
it can be passed as a serializable argument to tasks (and be reconstituted within such tasks).
|
||||
|
||||
In future, we may want to include specific error information
|
||||
indicating the reason for failure.
|
||||
Also, we should count up "not attempted" separately from attempted/failed.
|
||||
"""
|
||||
attempted = succeeded + failed
|
||||
current_result = {
|
||||
'task_id': task_id,
|
||||
'attempted': attempted,
|
||||
'succeeded': succeeded,
|
||||
'skipped': skipped,
|
||||
'failed': failed,
|
||||
'retried_nomax': retried_nomax,
|
||||
'retried_withmax': retried_withmax,
|
||||
'state': state if state is not None else QUEUING,
|
||||
}
|
||||
return current_result
|
||||
|
||||
def __init__(self, task_id, attempted=None, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""Construct a SubtaskStatus object."""
|
||||
self.task_id = task_id
|
||||
if attempted is not None:
|
||||
self.attempted = attempted
|
||||
else:
|
||||
self.attempted = succeeded + failed
|
||||
self.succeeded = succeeded
|
||||
self.failed = failed
|
||||
self.skipped = skipped
|
||||
self.retried_nomax = retried_nomax
|
||||
self.retried_withmax = retried_withmax
|
||||
self.state = state if state is not None else QUEUING
|
||||
|
||||
def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""
|
||||
Update the result of a subtask with additional results.
|
||||
@classmethod
|
||||
def from_dict(self, d):
|
||||
"""Construct a SubtaskStatus object from a dict representation."""
|
||||
options = dict(d)
|
||||
task_id = options['task_id']
|
||||
del options['task_id']
|
||||
return SubtaskStatus.create(task_id, **options)
|
||||
|
||||
Create and return a dict for tracking the status of a subtask.
|
||||
@classmethod
|
||||
def create(self, task_id, **options):
|
||||
"""Construct a SubtaskStatus object."""
|
||||
return self(task_id, **options)
|
||||
|
||||
Keys for input `subtask_result` and returned subtask_status are:
|
||||
def to_dict(self):
|
||||
"""
|
||||
Output a dict representation of a SubtaskStatus object.
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
'succeeded' : number that succeeded in processing
|
||||
'skipped' : number that were not processed.
|
||||
'failed' : number that failed during processing
|
||||
'retried_nomax' : number of times the subtask has been retried for conditions that
|
||||
should not have a maximum count applied
|
||||
'retried_withmax' : number of times the subtask has been retried for conditions that
|
||||
should have a maximum count applied
|
||||
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
|
||||
Use for creating a JSON-serializable representation for use by tasks.
|
||||
"""
|
||||
return self.__dict__
|
||||
|
||||
Kwarg arguments are incremented to the corresponding key in `subtask_result`.
|
||||
The exception is for `state`, which if specified is used to override the existing value.
|
||||
"""
|
||||
new_result = dict(subtask_result)
|
||||
new_result['attempted'] += (succeeded + failed)
|
||||
new_result['succeeded'] += succeeded
|
||||
new_result['failed'] += failed
|
||||
new_result['skipped'] += skipped
|
||||
new_result['retried_nomax'] += retried_nomax
|
||||
new_result['retried_withmax'] += retried_withmax
|
||||
if state is not None:
|
||||
new_result['state'] = state
|
||||
def increment(self, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""
|
||||
Update the result of a subtask with additional results.
|
||||
|
||||
return new_result
|
||||
Kwarg arguments are incremented to the existing values.
|
||||
The exception is for `state`, which if specified is used to override the existing value.
|
||||
"""
|
||||
self.attempted += (succeeded + failed)
|
||||
self.succeeded += succeeded
|
||||
self.failed += failed
|
||||
self.skipped += skipped
|
||||
self.retried_nomax += retried_nomax
|
||||
self.retried_withmax += retried_withmax
|
||||
if state is not None:
|
||||
self.state = state
|
||||
|
||||
def get_retry_count(self):
|
||||
"""Returns the number of retries of any kind."""
|
||||
return self.retried_nomax + self.retried_withmax
|
||||
|
||||
def _get_retry_count(subtask_status):
|
||||
"""
|
||||
Calculate the total number of retries.
|
||||
"""
|
||||
count = 0
|
||||
for keyname in ['retried_nomax', 'retried_withmax']:
|
||||
count += subtask_status.get(keyname, 0)
|
||||
return count
|
||||
def __repr__(self):
|
||||
"""Return print representation of a SubtaskStatus object."""
|
||||
return 'SubtaskStatus<%r>' % (self.to_dict(),)
|
||||
|
||||
def __unicode__(self):
|
||||
"""Return unicode version of a SubtaskStatus object representation."""
|
||||
return unicode(repr(self))
|
||||
|
||||
|
||||
def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
@@ -125,7 +195,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
|
||||
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
|
||||
information for each subtask. The value for each subtask (keyed by its task_id)
|
||||
is its subtask status, as defined by create_subtask_status().
|
||||
is its subtask status, as defined by SubtaskStatus.to_dict().
|
||||
|
||||
This information needs to be set up in the InstructorTask before any of the subtasks start
|
||||
running. If not, there is a chance that the subtasks could complete before the parent task
|
||||
@@ -152,7 +222,8 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
# Write out the subtasks information.
|
||||
num_subtasks = len(subtask_id_list)
|
||||
# Note that may not be necessary to store initial value with all those zeroes!
|
||||
subtask_status = {subtask_id: create_subtask_status(subtask_id) for subtask_id in subtask_id_list}
|
||||
# Write out as a dict, so it will go more smoothly into json.
|
||||
subtask_status = {subtask_id: (SubtaskStatus.create(subtask_id)).to_dict() for subtask_id in subtask_id_list}
|
||||
subtask_dict = {
|
||||
'total': num_subtasks,
|
||||
'succeeded': 0,
|
||||
@@ -166,6 +237,70 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
return task_progress
|
||||
|
||||
|
||||
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task):
|
||||
"""
|
||||
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
|
||||
|
||||
Arguments:
|
||||
`entry` : the InstructorTask object for which subtasks are being queued.
|
||||
`action_name` : a past-tense verb that can be used for constructing readable status messages.
|
||||
`create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
|
||||
Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
|
||||
object reflecting initial status (and containing the subtask's id).
|
||||
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
|
||||
`item_fields` : the fields that should be included in the dict that is returned.
|
||||
These are in addition to the 'pk' field.
|
||||
`items_per_query` : size of chunks to break the query operation into.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
|
||||
Returns: the task progress as stored in the InstructorTask object.
|
||||
|
||||
"""
|
||||
task_id = entry.task_id
|
||||
total_num_items = item_queryset.count()
|
||||
|
||||
# Calculate the number of tasks that will be created, and create a list of ids for each task.
|
||||
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task)
|
||||
subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]
|
||||
|
||||
# Update the InstructorTask with information about the subtasks we've defined.
|
||||
TASK_LOG.info("Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.",
|
||||
task_id, entry.id, total_num_subtasks, total_num_items) # pylint: disable=E1101
|
||||
progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list)
|
||||
|
||||
# Construct a generator that will return the recipients to use for each subtask.
|
||||
# Pass in the desired fields to fetch for each recipient.
|
||||
item_generator = _generate_items_for_subtask(
|
||||
item_queryset,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_query,
|
||||
items_per_task
|
||||
)
|
||||
|
||||
# Now create the subtasks, and start them running.
|
||||
TASK_LOG.info("Task %s: creating %s subtasks to process %s items.",
|
||||
task_id, total_num_subtasks, total_num_items)
|
||||
num_subtasks = 0
|
||||
for item_list in item_generator:
|
||||
subtask_id = subtask_id_list[num_subtasks]
|
||||
num_subtasks += 1
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
new_subtask = create_subtask_fcn(item_list, subtask_status)
|
||||
new_subtask.apply_async()
|
||||
|
||||
# Sanity check: we expect the subtask to be properly summing to the original count:
|
||||
if num_subtasks != len(subtask_id_list):
|
||||
task_id = entry.task_id
|
||||
error_fmt = "Task {}: number of tasks generated {} not equal to original total {}"
|
||||
error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list))
|
||||
TASK_LOG.error(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
# Return the task progress as stored in the InstructorTask object.
|
||||
return progress
|
||||
|
||||
|
||||
def _acquire_subtask_lock(task_id):
|
||||
"""
|
||||
Mark the specified task_id as being in progress.
|
||||
@@ -202,7 +337,7 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
|
||||
|
||||
Problems can occur when the parent task has been run twice, and results in duplicate
|
||||
subtasks being created for the same InstructorTask entry. This can happen when Celery
|
||||
subtasks being created for the same InstructorTask entry. This maybe happens when Celery
|
||||
loses its connection to its broker, and any current tasks get requeued.
|
||||
|
||||
If a parent task gets requeued, then the same InstructorTask may have a different set of
|
||||
@@ -216,6 +351,9 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
The other worker is allowed to finish, and this raises an exception.
|
||||
|
||||
Raises a DuplicateTaskException exception if it's not a task that should be run.
|
||||
|
||||
If this succeeds, it requires that update_subtask_status() is called to release the lock on the
|
||||
task.
|
||||
"""
|
||||
# Confirm that the InstructorTask actually defines subtasks.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
@@ -236,8 +374,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
|
||||
# Confirm that the InstructorTask doesn't think that this subtask has already been
|
||||
# performed successfully.
|
||||
subtask_status = subtask_status_info[current_task_id]
|
||||
subtask_state = subtask_status.get('state')
|
||||
subtask_status = SubtaskStatus.from_dict(subtask_status_info[current_task_id])
|
||||
subtask_state = subtask_status.state
|
||||
if subtask_state in READY_STATES:
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
|
||||
@@ -249,8 +387,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
|
||||
if subtask_state == RETRY:
|
||||
# Check to see if the input number of retries is less than the recorded number.
|
||||
# If so, then this is an earlier version of the task, and a duplicate.
|
||||
new_retry_count = _get_retry_count(new_subtask_status)
|
||||
current_retry_count = _get_retry_count(subtask_status)
|
||||
new_retry_count = new_subtask_status.get_retry_count()
|
||||
current_retry_count = subtask_status.get_retry_count()
|
||||
if new_retry_count < current_retry_count:
|
||||
format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
|
||||
@@ -292,10 +430,10 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
|
||||
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
|
||||
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
|
||||
is the value of `status`, but could be expanded in future to store information about failure
|
||||
messages, progress made, etc.
|
||||
is the value of the SubtaskStatus.to_dict(), but could be expanded in future to store information
|
||||
about failure messages, progress made, etc.
|
||||
"""
|
||||
TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
|
||||
TASK_LOG.info("Preparing to update status for subtask %s for instructor task %d with status %s",
|
||||
current_task_id, entry_id, new_subtask_status)
|
||||
|
||||
try:
|
||||
@@ -304,13 +442,13 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
# unexpected error -- raise an exception
|
||||
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
|
||||
format_str = "Unexpected task_id '{}': unable to update status for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry_id)
|
||||
TASK_LOG.warning(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
# Update status:
|
||||
subtask_status_info[current_task_id] = new_subtask_status
|
||||
subtask_status_info[current_task_id] = new_subtask_status.to_dict()
|
||||
|
||||
# Update the parent task progress.
|
||||
# Set the estimate of duration, but only if it
|
||||
@@ -326,10 +464,10 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
# In future, we can make this more responsive by updating status
|
||||
# between retries, by comparing counts that change from previous
|
||||
# retry.
|
||||
new_state = new_subtask_status['state']
|
||||
new_state = new_subtask_status.state
|
||||
if new_subtask_status is not None and new_state in READY_STATES:
|
||||
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
|
||||
task_progress[statname] += new_subtask_status[statname]
|
||||
task_progress[statname] += getattr(new_subtask_status, statname)
|
||||
|
||||
# Figure out if we're actually done (i.e. this is the last task to complete).
|
||||
# This is easier if we just maintain a counter, rather than scanning the
|
||||
@@ -349,7 +487,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
entry.subtasks = json.dumps(subtask_dict)
|
||||
entry.task_output = InstructorTask.create_output_for_success(task_progress)
|
||||
|
||||
TASK_LOG.info("Task output updated to %s for email subtask %s of instructor task %d",
|
||||
TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d",
|
||||
entry.task_output, current_task_id, entry_id)
|
||||
TASK_LOG.debug("about to save....")
|
||||
entry.save()
|
||||
|
||||
Reference in New Issue
Block a user