From ed4b954a53607bd30aa4c96a70a8967fbb8c84cb Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Thu, 24 Oct 2013 17:51:36 -0400 Subject: [PATCH 1/3] Remove the use of celery.group from bulk email subtasks. --- lms/djangoapps/bulk_email/tasks.py | 114 ++++++++------------- lms/djangoapps/instructor_task/subtasks.py | 64 ++++++++++++ 2 files changed, 106 insertions(+), 72 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 1bdb32d133..5d5313bccb 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -2,11 +2,9 @@ This module contains celery task functions for handling the sending of bulk email to a course. """ -import math import re import random import json -from uuid import uuid4 from time import sleep from dogapi import dog_stats_api @@ -24,7 +22,7 @@ from boto.ses.exceptions import ( ) from boto.exception import AWSConnectionError -from celery import task, current_task, group +from celery import task, current_task from celery.utils.log import get_task_logger from celery.states import SUCCESS, FAILURE, RETRY from celery.exceptions import RetryTaskError @@ -42,9 +40,11 @@ from courseware.access import _course_staff_group_name, _course_instructor_group from courseware.courses import get_course, course_image_url from instructor_task.models import InstructorTask from instructor_task.subtasks import ( - update_subtask_status, + create_subtask_ids, + generate_items_for_subtask, create_subtask_status, increment_subtask_status, + update_subtask_status, initialize_subtask_info, check_subtask_is_valid, ) @@ -152,53 +152,6 @@ def _get_course_email_context(course): return email_context -def _generate_subtasks(create_subtask_fcn, recipient_qset): - """ - Generates a list of subtasks to send email to a given set of recipients. - - Arguments: - `create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id - to assign to the new subtask. Returns the subtask that will send email to that - list of recipients. - `recipient_qset` : a query set that defines the recipients who should receive emails. - - Returns: a tuple, containing: - - * A list of subtasks that will send emails to all recipients. - * A list of subtask_ids corresponding to those subtasks. - * A count of the total number of emails being sent. - - """ - total_num_emails = recipient_qset.count() - num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY))) - last_pk = recipient_qset[0].pk - 1 - num_emails_queued = 0 - task_list = [] - subtask_id_list = [] - for _ in range(num_queries): - recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY]) - last_pk = recipient_sublist[-1]['pk'] - num_emails_this_query = len(recipient_sublist) - num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK))) - chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query))) - for i in range(num_tasks_this_query): - to_list = recipient_sublist[i * chunk:i * chunk + chunk] - subtask_id = str(uuid4()) - subtask_id_list.append(subtask_id) - new_subtask = create_subtask_fcn(to_list, subtask_id) - task_list.append(new_subtask) - - num_emails_queued += num_emails_this_query - - # Sanity check: we expect the chunking to be properly summing to the original count: - if num_emails_queued != total_num_emails: - error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails) - log.error(error_msg) - raise ValueError(error_msg) - - return task_list, subtask_id_list, total_num_emails - - def perform_delegate_email_batches(entry_id, course_id, task_input, action_name): """ Delegates emails by querying for the list of recipients who should @@ -252,42 +205,59 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) log.exception("Task %s: course not found: %s", task_id, course_id) raise + # Get arguments that will be passed to every subtask. to_option = email_obj.to_option - recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) global_email_context = _get_course_email_context(course) - def _create_send_email_subtask(to_list, subtask_id): - """Creates a subtask to send email to a given recipient list.""" + # Figure out the number of needed subtasks, getting id values to use for each. + recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) + total_num_emails = recipient_qset.count() + subtask_id_list = create_subtask_ids(total_num_emails, settings.BULK_EMAIL_EMAILS_PER_QUERY, settings.BULK_EMAIL_EMAILS_PER_TASK) + + # Update the InstructorTask with information about the subtasks we've defined. + log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", + task_id, total_num_emails, course_id, email_id, to_option) + progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) + + # Construct a generator that will return the recipients to use for each subtask. + # Pass in the desired fields to fetch for each recipient. + recipient_fields = ['profile__name', 'email'] + recipient_generator = generate_items_for_subtask( + recipient_qset, + recipient_fields, + total_num_emails, + settings.BULK_EMAIL_EMAILS_PER_QUERY, + settings.BULK_EMAIL_EMAILS_PER_TASK, + ) + + # Now create the subtasks, and start them running. This allows all the subtasks + # in the list to be submitted at the same time. + num_subtasks = len(subtask_id_list) + log.info("Task %s: Preparing to generate and queue %s subtasks for course %s, email %s, to_option %s", + task_id, num_subtasks, course_id, email_id, to_option) + num_subtasks = 0 + for recipient_list in recipient_generator: + subtask_id = subtask_id_list[num_subtasks] + num_subtasks += 1 subtask_status = create_subtask_status(subtask_id) new_subtask = send_course_email.subtask( ( entry_id, email_id, - to_list, + recipient_list, global_email_context, subtask_status, ), task_id=subtask_id, routing_key=settings.BULK_EMAIL_ROUTING_KEY, ) - return new_subtask + new_subtask.apply_async() - log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s", - task_id, course_id, email_id, to_option) - task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset) - - # Update the InstructorTask with information about the subtasks we've defined. - log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", - task_id, total_num_emails, course_id, email_id, to_option) - progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) - num_subtasks = len(subtask_id_list) - - # Now group the subtasks, and start them running. This allows all the subtasks - # in the list to be submitted at the same time. - log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s", - task_id, num_subtasks, total_num_emails, course_id, email_id, to_option) - task_group = group(task_list) - task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY) + # Sanity check: we expect the subtask to be properly summing to the original count: + if num_subtasks != len(subtask_id_list): + error_msg = "Task {}: number of tasks generated {} not equal to original total {}".format(task_id, num_subtasks, len(subtask_id_list)) + log.error(error_msg) + raise ValueError(error_msg) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 68bd24a1f8..350c09ab96 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -3,6 +3,8 @@ This module contains celery task functions for handling the management of subtas """ from time import time import json +from uuid import uuid4 +import math from celery.utils.log import get_task_logger from celery.states import SUCCESS, READY_STATES, RETRY @@ -23,6 +25,68 @@ class DuplicateTaskException(Exception): pass +def create_subtask_ids(total_num_items, items_per_query, items_per_task): + """ + Determines number of subtasks that need to be generated, and provides a list of id values to use. + + This needs to be calculated before a query is executed so that the list of all subtasks can be + stored in the InstructorTask before any subtasks are started. + + The number of subtask_id values returned by this should match the number of chunks returned + by the generate_items_for_subtask generator. + """ + total_num_tasks = 0 + num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) + num_items_remaining = total_num_items + for _ in range(num_queries): + num_items_this_query = min(num_items_remaining, items_per_query) + num_items_remaining -= num_items_this_query + num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) + total_num_tasks += num_tasks_this_query + + # Now that the number of tasks is known, return a list of ids for each task. + return [str(uuid4()) for _ in range(total_num_tasks)] + + +def generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): + """ + Generates a chunk of "items" that should be passed into a subtask. + + Arguments: + `item_queryset` : a query set that defines the "items" that should be passed to subtasks. + `item_fields` : the fields that should be included in the dict that is returned. + These are in addition to the 'pk' field. + `total_num_items` : the result of item_queryset.count(). + `items_per_query` : size of chunks to break the query operation into. + `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. + + Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field. + + """ + num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) + last_pk = item_queryset[0].pk - 1 + num_items_queued = 0 + all_item_fields = list(item_fields) + all_item_fields.append('pk') + for _ in range(num_queries): + item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query]) + last_pk = item_sublist[-1]['pk'] + num_items_this_query = len(item_sublist) + num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) + chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query))) + for i in range(num_tasks_this_query): + items_for_task = item_sublist[i * chunk:i * chunk + chunk] + yield items_for_task + + num_items_queued += num_items_this_query + + # Sanity check: we expect the chunking to be properly summing to the original count: + if num_items_queued != total_num_items: + error_msg = "Task {}: number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items) + TASK_LOG.error(error_msg) + raise ValueError(error_msg) + + def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): """ Create and return a dict for tracking the status of a subtask. From 0f8f82c84515732b5b6aeaece58041dcfde22bc6 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Fri, 25 Oct 2013 18:15:24 -0400 Subject: [PATCH 2/3] Define and use SubtaskStatus class. --- lms/djangoapps/bulk_email/tasks.py | 123 +++++----------- lms/djangoapps/bulk_email/tests/test_email.py | 16 +- .../bulk_email/tests/test_err_handling.py | 46 +++--- lms/djangoapps/bulk_email/tests/test_tasks.py | 15 +- lms/djangoapps/instructor_task/subtasks.py | 139 ++++++++++-------- 5 files changed, 147 insertions(+), 192 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 5d5313bccb..0a906acfda 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -42,8 +42,7 @@ from instructor_task.models import InstructorTask from instructor_task.subtasks import ( create_subtask_ids, generate_items_for_subtask, - create_subtask_status, - increment_subtask_status, + SubtaskStatus, update_subtask_status, initialize_subtask_info, check_subtask_is_valid, @@ -239,14 +238,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) for recipient_list in recipient_generator: subtask_id = subtask_id_list[num_subtasks] num_subtasks += 1 - subtask_status = create_subtask_status(subtask_id) + subtask_status_dict = SubtaskStatus.create(subtask_id).to_dict() new_subtask = send_course_email.subtask( ( entry_id, email_id, recipient_list, global_email_context, - subtask_status, + subtask_status_dict, ), task_id=subtask_id, routing_key=settings.BULK_EMAIL_ROUTING_KEY, @@ -268,7 +267,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) @task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102 -def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): +def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status_dict): """ Sends an email to a list of recipients. @@ -282,7 +281,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask * `global_email_context`: dict containing values that are unique for this email but the same for all recipients of this email. This dict is to be used to fill in slots in email template. It does not include 'name' and 'email', which will be provided by the to_list. - * `subtask_status` : dict containing values representing current status. Keys are: + * `subtask_status_dict` : dict containing values representing current status. Keys are: 'task_id' : id of subtask. This is used to pass task information across retries. 'attempted' : number of attempts -- should equal succeeded plus failed @@ -302,7 +301,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask Emails are sent multi-part, in both plain text and html. Updates InstructorTask object with status information (sends, failures, skips) and updates number of subtasks completed. """ - current_task_id = subtask_status['task_id'] + subtask_status = SubtaskStatus.from_dict(subtask_status_dict) + current_task_id = subtask_status.task_id num_to_send = len(to_list) log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s", email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status) @@ -336,8 +336,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask # We got here for really unexpected reasons. Since we don't know how far # the task got in emailing, we count all recipients as having failed. # It at least keeps the counts consistent. - new_subtask_status = increment_subtask_status(subtask_status, failed=num_to_send, state=FAILURE) - update_subtask_status(entry_id, current_task_id, new_subtask_status) + subtask_status.increment(failed=num_to_send, state=FAILURE) + update_subtask_status(entry_id, current_task_id, subtask_status) raise if send_exception is None: @@ -419,37 +419,20 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas * `global_email_context`: dict containing values that are unique for this email but the same for all recipients of this email. This dict is to be used to fill in slots in email template. It does not include 'name' and 'email', which will be provided by the to_list. - * `subtask_status` : dict containing values representing current status. Keys are: - - 'task_id' : id of subtask. This is used to pass task information across retries. - 'attempted' : number of attempts -- should equal succeeded plus failed - 'succeeded' : number that succeeded in processing - 'skipped' : number that were not processed. - 'failed' : number that failed during processing - 'retried_nomax' : number of times the subtask has been retried for conditions that - should not have a maximum count applied - 'retried_withmax' : number of times the subtask has been retried for conditions that - should have a maximum count applied - 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) + * `subtask_status` : object of class SubtaskStatus representing current status. Sends to all addresses contained in to_list that are not also in the Optout table. Emails are sent multi-part, in both plain text and html. Returns a tuple of two values: - * First value is a dict which represents current progress at the end of this call. Keys are - the same as for the input subtask_status. + * First value is a SubtaskStatus object which represents current progress at the end of this call. * Second value is an exception returned by the innards of the method, indicating a fatal error. In this case, the number of recipients that were not sent have already been added to the 'failed' count above. """ # Get information from current task's request: - task_id = subtask_status['task_id'] - - # collect stats on progress: - num_optout = 0 - num_sent = 0 - num_error = 0 + task_id = subtask_status.task_id try: course_email = CourseEmail.objects.get(id=email_id) @@ -463,8 +446,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # attempt. Anyone on the to_list on a retry has already passed the filter # that existed at that time, and we don't need to keep checking for changes # in the Optout list. - if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0: + if subtask_status.get_retry_count() == 0: to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id) + subtask_status.increment(skipped=num_optout) course_title = global_email_context['course_title'] subject = "[" + course_title + "] " + course_email.subject @@ -509,7 +493,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # for a period of time between all emails within this task. Choice of # the value depends on the number of workers that might be sending email in # parallel, and what the SES throttle rate is. - if subtask_status['retried_nomax'] > 0: + if subtask_status.retried_nomax > 0: sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS) try: @@ -527,13 +511,13 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # This will fall through and not retry the message. log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc.smtp_error) dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) - num_error += 1 + subtask_status.increment(failed=1) except SINGLE_EMAIL_FAILURE_ERRORS as exc: # This will fall through and not retry the message. log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc) dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)]) - num_error += 1 + subtask_status.increment(failed=1) else: dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)]) @@ -541,7 +525,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas log.info('Email with id %s sent to %s', email_id, email) else: log.debug('Email with id %s sent to %s', email_id, email) - num_sent += 1 + subtask_status.increment(succeeded=1) # Pop the user that was emailed off the end of the list only once they have # successfully been processed. (That way, if there were a failure that @@ -552,16 +536,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)]) # Increment the "retried_nomax" counter, update other counters with progress to date, # and set the state to RETRY: - subtask_progress = increment_subtask_status( - subtask_status, - succeeded=num_sent, - failed=num_error, - skipped=num_optout, - retried_nomax=1, - state=RETRY - ) + subtask_status.increment(retried_nomax=1, state=RETRY) return _submit_for_retry( - entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=True + entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=True ) except LIMITED_RETRY_ERRORS as exc: @@ -571,16 +548,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)]) # Increment the "retried_withmax" counter, update other counters with progress to date, # and set the state to RETRY: - subtask_progress = increment_subtask_status( - subtask_status, - succeeded=num_sent, - failed=num_error, - skipped=num_optout, - retried_withmax=1, - state=RETRY - ) + subtask_status.increment(retried_withmax=1, state=RETRY) return _submit_for_retry( - entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=False + entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False ) except BULK_EMAIL_FAILURE_ERRORS as exc: @@ -590,14 +560,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas task_id, email_id, num_pending) # Update counters with progress to date, counting unsent emails as failures, # and set the state to FAILURE: - subtask_progress = increment_subtask_status( - subtask_status, - succeeded=num_sent, - failed=(num_error + num_pending), - skipped=num_optout, - state=FAILURE - ) - return subtask_progress, exc + subtask_status.increment(failed=num_pending, state=FAILURE) + return subtask_status, exc except Exception as exc: # Errors caught here cause the email to be retried. The entire task is actually retried @@ -609,30 +573,17 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas task_id, email_id) # Increment the "retried_withmax" counter, update other counters with progress to date, # and set the state to RETRY: - subtask_progress = increment_subtask_status( - subtask_status, - succeeded=num_sent, - failed=num_error, - skipped=num_optout, - retried_withmax=1, - state=RETRY - ) + subtask_status.increment(retried_withmax=1, state=RETRY) return _submit_for_retry( - entry_id, email_id, to_list, global_email_context, exc, subtask_progress, skip_retry_max=False + entry_id, email_id, to_list, global_email_context, exc, subtask_status, skip_retry_max=False ) else: # All went well. Update counters with progress to date, # and set the state to SUCCESS: - subtask_progress = increment_subtask_status( - subtask_status, - succeeded=num_sent, - failed=num_error, - skipped=num_optout, - state=SUCCESS - ) + subtask_status.increment(state=SUCCESS) # Successful completion is marked by an exception value of None. - return subtask_progress, None + return subtask_status, None finally: # Clean up at the end. connection.close() @@ -678,26 +629,26 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current successfully submitted, this value will be the RetryTaskError that retry() returns. Otherwise, it (ought to be) the current_exception passed in. """ - task_id = subtask_status['task_id'] + task_id = subtask_status.task_id log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)", - task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped']) + task_id, subtask_status.succeeded, subtask_status.failed, subtask_status.skipped) # Calculate time until we retry this task (in seconds): # The value for max_retries is increased by the number of times an "infinite-retry" exception # has been retried. We want the regular retries to trigger max-retry checking, but not these # special retries. So we count them separately. - max_retries = _get_current_task().max_retries + subtask_status['retried_nomax'] + max_retries = _get_current_task().max_retries + subtask_status.retried_nomax base_delay = _get_current_task().default_retry_delay if skip_retry_max: # once we reach five retries, don't increase the countdown further. - retry_index = min(subtask_status['retried_nomax'], 5) + retry_index = min(subtask_status.retried_nomax, 5) exception_type = 'sending-rate' # if we have a cap, after all, apply it now: if hasattr(settings, 'BULK_EMAIL_INFINITE_RETRY_CAP'): - retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status['retried_withmax'] + retry_cap = settings.BULK_EMAIL_INFINITE_RETRY_CAP + subtask_status.retried_withmax max_retries = min(max_retries, retry_cap) else: - retry_index = subtask_status['retried_withmax'] + retry_index = subtask_status.retried_withmax exception_type = 'transient' # Skew the new countdown value by a random factor, so that not all @@ -722,7 +673,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current email_id, to_list, global_email_context, - subtask_status, + subtask_status.to_dict(), ], exc=current_exception, countdown=countdown, @@ -743,8 +694,8 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s', task_id, email_id, [i['email'] for i in to_list]) num_failed = len(to_list) - new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE) - return new_subtask_progress, retry_exc + subtask_status.increment(subtask_status, failed=num_failed, state=FAILURE) + return subtask_status, retry_exc def _statsd_tag(course_title): diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index 9da787dbaa..ffb700fbc9 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory from bulk_email.models import Optout -from instructor_task.subtasks import increment_subtask_status +from instructor_task.subtasks import update_subtask_status STAFF_COUNT = 3 STUDENT_COUNT = 10 @@ -29,13 +29,13 @@ class MockCourseEmailResult(object): """ emails_sent = 0 - def get_mock_increment_subtask_status(self): + def get_mock_update_subtask_status(self): """Wrapper for mock email function.""" - def mock_increment_subtask_status(original_status, **kwargs): # pylint: disable=W0613 + def mock_update_subtask_status(entry_id, current_task_id, new_subtask_status): # pylint: disable=W0613 """Increments count of number of emails sent.""" - self.emails_sent += kwargs.get('succeeded', 0) - return increment_subtask_status(original_status, **kwargs) - return mock_increment_subtask_status + self.emails_sent += new_subtask_status.succeeded + return update_subtask_status(entry_id, current_task_id, new_subtask_status) + return mock_update_subtask_status @override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE) @@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): ) @override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7) - @patch('bulk_email.tasks.increment_subtask_status') + @patch('bulk_email.tasks.update_subtask_status') def test_chunked_queries_send_numerous_emails(self, email_mock): """ Test sending a large number of emails, to test the chunked querying """ mock_factory = MockCourseEmailResult() - email_mock.side_effect = mock_factory.get_mock_increment_subtask_status() + email_mock.side_effect = mock_factory.get_mock_update_subtask_status() added_users = [] for _ in xrange(LARGE_NUM_EMAILS): user = UserFactory() diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 22aabeaa02..d2c3a394f5 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -22,8 +22,8 @@ from bulk_email.models import CourseEmail, SEND_TO_ALL from bulk_email.tasks import perform_delegate_email_batches, send_course_email from instructor_task.models import InstructorTask from instructor_task.subtasks import ( - create_subtask_status, initialize_subtask_info, + SubtaskStatus, check_subtask_is_valid, update_subtask_status, DuplicateTaskException, @@ -75,7 +75,7 @@ class TestEmailErrors(ModuleStoreTestCase): self.assertIsInstance(exc, SMTPDataError) @patch('bulk_email.tasks.get_connection', autospec=True) - @patch('bulk_email.tasks.increment_subtask_status') + @patch('bulk_email.tasks.update_subtask_status') @patch('bulk_email.tasks.send_course_email.retry') def test_data_err_fail(self, retry, result, get_conn): """ @@ -99,11 +99,11 @@ class TestEmailErrors(ModuleStoreTestCase): # We shouldn't retry when hitting a 5xx error self.assertFalse(retry.called) # Test that after the rejected email, the rest still successfully send - ((_initial_results), kwargs) = result.call_args - self.assertEquals(kwargs['skipped'], 0) + ((_entry_id, _current_task_id, subtask_status), _kwargs) = result.call_args + self.assertEquals(subtask_status.skipped, 0) expected_fails = int((settings.BULK_EMAIL_EMAILS_PER_TASK + 3) / 4.0) - self.assertEquals(kwargs['failed'], expected_fails) - self.assertEquals(kwargs['succeeded'], settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails) + self.assertEquals(subtask_status.failed, expected_fails) + self.assertEquals(subtask_status.succeeded, settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails) @patch('bulk_email.tasks.get_connection', autospec=True) @patch('bulk_email.tasks.send_course_email.retry') @@ -146,7 +146,7 @@ class TestEmailErrors(ModuleStoreTestCase): exc = kwargs['exc'] self.assertIsInstance(exc, SMTPConnectError) - @patch('bulk_email.tasks.increment_subtask_status') + @patch('bulk_email.tasks.SubtaskStatus.increment') @patch('bulk_email.tasks.log') def test_nonexistent_email(self, mock_log, result): """ @@ -216,10 +216,10 @@ class TestEmailErrors(ModuleStoreTestCase): to_list = ['test@test.com'] global_email_context = {'course_title': 'dummy course'} subtask_id = "subtask-id-value" - subtask_status = create_subtask_status(subtask_id) + subtask_status = SubtaskStatus.create(subtask_id) email_id = 1001 with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find subtasks of instructor task'): - send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status) + send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status.to_dict()) def test_send_email_missing_subtask(self): # test at a lower level, to ensure that the course gets checked down below too. @@ -230,10 +230,10 @@ class TestEmailErrors(ModuleStoreTestCase): subtask_id = "subtask-id-value" initialize_subtask_info(entry, "emailed", 100, [subtask_id]) different_subtask_id = "bogus-subtask-id-value" - subtask_status = create_subtask_status(different_subtask_id) + subtask_status = SubtaskStatus.create(different_subtask_id) bogus_email_id = 1001 with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for subtask of instructor task'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict()) def test_send_email_completed_subtask(self): # test at a lower level, to ensure that the course gets checked down below too. @@ -241,14 +241,14 @@ class TestEmailErrors(ModuleStoreTestCase): entry_id = entry.id # pylint: disable=E1101 subtask_id = "subtask-id-value" initialize_subtask_info(entry, "emailed", 100, [subtask_id]) - subtask_status = create_subtask_status(subtask_id, state=SUCCESS) + subtask_status = SubtaskStatus.create(subtask_id, state=SUCCESS) update_subtask_status(entry_id, subtask_id, subtask_status) bogus_email_id = 1001 to_list = ['test@test.com'] global_email_context = {'course_title': 'dummy course'} - new_subtask_status = create_subtask_status(subtask_id) + new_subtask_status = SubtaskStatus.create(subtask_id) with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict()) def test_send_email_running_subtask(self): # test at a lower level, to ensure that the course gets checked down below too. @@ -256,14 +256,14 @@ class TestEmailErrors(ModuleStoreTestCase): entry_id = entry.id # pylint: disable=E1101 subtask_id = "subtask-id-value" initialize_subtask_info(entry, "emailed", 100, [subtask_id]) - subtask_status = create_subtask_status(subtask_id) + subtask_status = SubtaskStatus.create(subtask_id) update_subtask_status(entry_id, subtask_id, subtask_status) check_subtask_is_valid(entry_id, subtask_id, subtask_status) bogus_email_id = 1001 to_list = ['test@test.com'] global_email_context = {'course_title': 'dummy course'} with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict()) def test_send_email_retried_subtask(self): # test at a lower level, to ensure that the course gets checked down below too. @@ -271,19 +271,19 @@ class TestEmailErrors(ModuleStoreTestCase): entry_id = entry.id # pylint: disable=E1101 subtask_id = "subtask-id-value" initialize_subtask_info(entry, "emailed", 100, [subtask_id]) - subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=2) + subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=2) update_subtask_status(entry_id, subtask_id, subtask_status) bogus_email_id = 1001 to_list = ['test@test.com'] global_email_context = {'course_title': 'dummy course'} # try running with a clean subtask: - new_subtask_status = create_subtask_status(subtask_id) + new_subtask_status = SubtaskStatus.create(subtask_id) with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict()) # try again, with a retried subtask with lower count: - new_subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=1) + new_subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=1) with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict()) def dont_test_send_email_undefined_email(self): # test at a lower level, to ensure that the course gets checked down below too. @@ -293,10 +293,10 @@ class TestEmailErrors(ModuleStoreTestCase): global_email_context = {'course_title': 'dummy course'} subtask_id = "subtask-id-value" initialize_subtask_info(entry, "emailed", 100, [subtask_id]) - subtask_status = create_subtask_status(subtask_id) + subtask_status = SubtaskStatus.create(subtask_id) bogus_email_id = 1001 with self.assertRaises(CourseEmail.DoesNotExist): # we skip the call that updates subtask status, since we've not set up the InstructorTask # for the subtask, and it's not important to the test. with patch('bulk_email.tasks.update_subtask_status'): - send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) + send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict()) diff --git a/lms/djangoapps/bulk_email/tests/test_tasks.py b/lms/djangoapps/bulk_email/tests/test_tasks.py index 192903c4a1..b964f1cb23 100644 --- a/lms/djangoapps/bulk_email/tests/test_tasks.py +++ b/lms/djangoapps/bulk_email/tests/test_tasks.py @@ -31,7 +31,7 @@ from django.core.management import call_command from bulk_email.models import CourseEmail, Optout, SEND_TO_ALL from instructor_task.tasks import send_bulk_course_email -from instructor_task.subtasks import update_subtask_status +from instructor_task.subtasks import update_subtask_status, SubtaskStatus from instructor_task.models import InstructorTask from instructor_task.tests.test_base import InstructorTaskCourseTestCase from instructor_task.tests.factories import InstructorTaskFactory @@ -63,16 +63,9 @@ def my_update_subtask_status(entry_id, current_task_id, new_subtask_status): entry = InstructorTask.objects.get(pk=entry_id) subtask_dict = json.loads(entry.subtasks) subtask_status_info = subtask_dict['status'] - current_subtask_status = subtask_status_info[current_task_id] - - def _get_retry_count(subtask_result): - """Return the number of retries counted for the given subtask.""" - retry_count = subtask_result.get('retried_nomax', 0) - retry_count += subtask_result.get('retried_withmax', 0) - return retry_count - - current_retry_count = _get_retry_count(current_subtask_status) - new_retry_count = _get_retry_count(new_subtask_status) + current_subtask_status = SubtaskStatus.from_dict(subtask_status_info[current_task_id]) + current_retry_count = current_subtask_status.get_retry_count() + new_retry_count = new_subtask_status.get_retry_count() if current_retry_count <= new_retry_count: update_subtask_status(entry_id, current_task_id, new_subtask_status) diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 350c09ab96..a3de9e505d 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -87,11 +87,11 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item raise ValueError(error_msg) -def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): +class SubtaskStatus(object): """ Create and return a dict for tracking the status of a subtask. - Subtask status keys are: + SubtaskStatus values are: 'task_id' : id of subtask. This is used to pass task information across retries. 'attempted' : number of attempts -- should equal succeeded plus failed @@ -104,70 +104,77 @@ def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nom should have a maximum count applied 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) - Object must be JSON-serializable, so that it can be passed as an argument - to tasks. + Object is not JSON-serializable, so to_dict and from_dict methods are provided so that + it can be passed as a serializable argument to tasks. In future, we may want to include specific error information indicating the reason for failure. Also, we should count up "not attempted" separately from attempted/failed. """ - attempted = succeeded + failed - current_result = { - 'task_id': task_id, - 'attempted': attempted, - 'succeeded': succeeded, - 'skipped': skipped, - 'failed': failed, - 'retried_nomax': retried_nomax, - 'retried_withmax': retried_withmax, - 'state': state if state is not None else QUEUING, - } - return current_result + def __init__(self, task_id, attempted=None, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): + """Construct a SubtaskStatus object.""" + self.task_id = task_id + if attempted is not None: + self.attempted = attempted + else: + self.attempted = succeeded + failed + self.succeeded = succeeded + self.failed = failed + self.skipped = skipped + self.retried_nomax = retried_nomax + self.retried_withmax = retried_withmax + self.state = state if state is not None else QUEUING -def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): - """ - Update the result of a subtask with additional results. + @classmethod + def from_dict(self, d): + """Construct a SubtaskStatus object from a dict representation.""" + options = dict(d) + task_id = options['task_id'] + del options['task_id'] + return SubtaskStatus.create(task_id, **options) - Create and return a dict for tracking the status of a subtask. + @classmethod + def create(self, task_id, **options): + """Construct a SubtaskStatus object.""" + newobj = self(task_id, **options) + return newobj - Keys for input `subtask_result` and returned subtask_status are: + def to_dict(self): + """ + Output a dict representation of a SubtaskStatus object. - 'task_id' : id of subtask. This is used to pass task information across retries. - 'attempted' : number of attempts -- should equal succeeded plus failed - 'succeeded' : number that succeeded in processing - 'skipped' : number that were not processed. - 'failed' : number that failed during processing - 'retried_nomax' : number of times the subtask has been retried for conditions that - should not have a maximum count applied - 'retried_withmax' : number of times the subtask has been retried for conditions that - should have a maximum count applied - 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) + Use for creating a JSON-serializable representation for use by tasks. + """ + return self.__dict__ - Kwarg arguments are incremented to the corresponding key in `subtask_result`. - The exception is for `state`, which if specified is used to override the existing value. - """ - new_result = dict(subtask_result) - new_result['attempted'] += (succeeded + failed) - new_result['succeeded'] += succeeded - new_result['failed'] += failed - new_result['skipped'] += skipped - new_result['retried_nomax'] += retried_nomax - new_result['retried_withmax'] += retried_withmax - if state is not None: - new_result['state'] = state + def increment(self, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): + """ + Update the result of a subtask with additional results. - return new_result + Kwarg arguments are incremented to the existing values. + The exception is for `state`, which if specified is used to override the existing value. + """ + self.attempted += (succeeded + failed) + self.succeeded += succeeded + self.failed += failed + self.skipped += skipped + self.retried_nomax += retried_nomax + self.retried_withmax += retried_withmax + if state is not None: + self.state = state + def get_retry_count(self): + """Returns the number of retries of any kind.""" + return self.retried_nomax + self.retried_withmax -def _get_retry_count(subtask_status): - """ - Calculate the total number of retries. - """ - count = 0 - for keyname in ['retried_nomax', 'retried_withmax']: - count += subtask_status.get(keyname, 0) - return count + def __repr__(self): + """Return print representation of a SubtaskStatus object.""" + return 'SubtaskStatus<%r>' % (self.to_dict(),) + + def __unicode__(self): + """Return unicode version of a SubtaskStatus object representation.""" + return unicode(repr(self)) def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): @@ -189,7 +196,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): The "subtasks" field also contains a 'status' key, that contains a dict that stores status information for each subtask. The value for each subtask (keyed by its task_id) - is its subtask status, as defined by create_subtask_status(). + is its subtask status, as defined by SubtaskStatus.to_dict(). This information needs to be set up in the InstructorTask before any of the subtasks start running. If not, there is a chance that the subtasks could complete before the parent task @@ -216,7 +223,8 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): # Write out the subtasks information. num_subtasks = len(subtask_id_list) # Note that may not be necessary to store initial value with all those zeroes! - subtask_status = {subtask_id: create_subtask_status(subtask_id) for subtask_id in subtask_id_list} + # Write out as a dict, so it will go more smoothly into json. + subtask_status = {subtask_id: (SubtaskStatus.create(subtask_id)).to_dict() for subtask_id in subtask_id_list} subtask_dict = { 'total': num_subtasks, 'succeeded': 0, @@ -266,7 +274,7 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status): Confirms that the current subtask is known to the InstructorTask and hasn't already been completed. Problems can occur when the parent task has been run twice, and results in duplicate - subtasks being created for the same InstructorTask entry. This can happen when Celery + subtasks being created for the same InstructorTask entry. This maybe happens when Celery loses its connection to its broker, and any current tasks get requeued. If a parent task gets requeued, then the same InstructorTask may have a different set of @@ -280,6 +288,9 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status): The other worker is allowed to finish, and this raises an exception. Raises a DuplicateTaskException exception if it's not a task that should be run. + + If this succeeds, it requires that update_subtask_status() is called to release the lock on the + task. """ # Confirm that the InstructorTask actually defines subtasks. entry = InstructorTask.objects.get(pk=entry_id) @@ -300,8 +311,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status): # Confirm that the InstructorTask doesn't think that this subtask has already been # performed successfully. - subtask_status = subtask_status_info[current_task_id] - subtask_state = subtask_status.get('state') + subtask_status = SubtaskStatus.from_dict(subtask_status_info[current_task_id]) + subtask_state = subtask_status.state if subtask_state in READY_STATES: format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}" msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status) @@ -313,8 +324,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status): if subtask_state == RETRY: # Check to see if the input number of retries is less than the recorded number. # If so, then this is an earlier version of the task, and a duplicate. - new_retry_count = _get_retry_count(new_subtask_status) - current_retry_count = _get_retry_count(subtask_status) + new_retry_count = new_subtask_status.get_retry_count() + current_retry_count = subtask_status.get_retry_count() if new_retry_count < current_retry_count: format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}" msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status) @@ -356,8 +367,8 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): The "subtasks" field also contains a 'status' key, that contains a dict that stores status information for each subtask. At the moment, the value for each subtask (keyed by its task_id) - is the value of `status`, but could be expanded in future to store information about failure - messages, progress made, etc. + is the value of the SubtaskStatus.to_dict(), but could be expanded in future to store information + about failure messages, progress made, etc. """ TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s", current_task_id, entry_id, new_subtask_status) @@ -374,7 +385,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): raise ValueError(msg) # Update status: - subtask_status_info[current_task_id] = new_subtask_status + subtask_status_info[current_task_id] = new_subtask_status.to_dict() # Update the parent task progress. # Set the estimate of duration, but only if it @@ -390,10 +401,10 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): # In future, we can make this more responsive by updating status # between retries, by comparing counts that change from previous # retry. - new_state = new_subtask_status['state'] + new_state = new_subtask_status.state if new_subtask_status is not None and new_state in READY_STATES: for statname in ['attempted', 'succeeded', 'failed', 'skipped']: - task_progress[statname] += new_subtask_status[statname] + task_progress[statname] += getattr(new_subtask_status, statname) # Figure out if we're actually done (i.e. this is the last task to complete). # This is easier if we just maintain a counter, rather than scanning the From 5b48ed840b0284ab61fcc6608eda266332db4006 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Sun, 27 Oct 2013 03:46:45 -0400 Subject: [PATCH 3/3] Refactor subtask creation logic to be less email-specific. --- CHANGELOG.rst | 3 + lms/djangoapps/bulk_email/tasks.py | 85 ++++++++------------- lms/djangoapps/instructor_task/subtasks.py | 89 ++++++++++++++++++---- 3 files changed, 111 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a4b5d31fea..14a59a6413 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes, in roughly chronological order, most recent first. Add your entries at or near the top. Include a label indicating the component affected. +LMS: Change bulk email implementation to use less memory, and to better handle +duplicate tasks in celery. + LMS: Improve forum error handling so that errors in the logs are clearer and HTTP status codes from the comments service indicating client error are correctly passed through to the client. diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 0a906acfda..68f5124003 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -40,12 +40,10 @@ from courseware.access import _course_staff_group_name, _course_instructor_group from courseware.courses import get_course, course_image_url from instructor_task.models import InstructorTask from instructor_task.subtasks import ( - create_subtask_ids, - generate_items_for_subtask, SubtaskStatus, - update_subtask_status, - initialize_subtask_info, + queue_subtasks_for_query, check_subtask_is_valid, + update_subtask_status, ) log = get_task_logger(__name__) @@ -154,10 +152,8 @@ def _get_course_email_context(course): def perform_delegate_email_batches(entry_id, course_id, task_input, action_name): """ Delegates emails by querying for the list of recipients who should - get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size, - and queueing up worker jobs. - - Returns the number of batches (workers) kicked off. + get the mail, chopping up into batches of no more than settings.BULK_EMAIL_EMAILS_PER_TASK + in size, and queueing up worker jobs. """ entry = InstructorTask.objects.get(pk=entry_id) # Get inputs to use in this task from the entry. @@ -208,55 +204,37 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_option = email_obj.to_option global_email_context = _get_course_email_context(course) - # Figure out the number of needed subtasks, getting id values to use for each. - recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) - total_num_emails = recipient_qset.count() - subtask_id_list = create_subtask_ids(total_num_emails, settings.BULK_EMAIL_EMAILS_PER_QUERY, settings.BULK_EMAIL_EMAILS_PER_TASK) - - # Update the InstructorTask with information about the subtasks we've defined. - log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", - task_id, total_num_emails, course_id, email_id, to_option) - progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) - - # Construct a generator that will return the recipients to use for each subtask. - # Pass in the desired fields to fetch for each recipient. - recipient_fields = ['profile__name', 'email'] - recipient_generator = generate_items_for_subtask( - recipient_qset, - recipient_fields, - total_num_emails, - settings.BULK_EMAIL_EMAILS_PER_QUERY, - settings.BULK_EMAIL_EMAILS_PER_TASK, - ) - - # Now create the subtasks, and start them running. This allows all the subtasks - # in the list to be submitted at the same time. - num_subtasks = len(subtask_id_list) - log.info("Task %s: Preparing to generate and queue %s subtasks for course %s, email %s, to_option %s", - task_id, num_subtasks, course_id, email_id, to_option) - num_subtasks = 0 - for recipient_list in recipient_generator: - subtask_id = subtask_id_list[num_subtasks] - num_subtasks += 1 - subtask_status_dict = SubtaskStatus.create(subtask_id).to_dict() + def _create_send_email_subtask(to_list, initial_subtask_status): + """Creates a subtask to send email to a given recipient list.""" + subtask_id = initial_subtask_status.task_id new_subtask = send_course_email.subtask( ( entry_id, email_id, - recipient_list, + to_list, global_email_context, - subtask_status_dict, + initial_subtask_status.to_dict(), ), task_id=subtask_id, routing_key=settings.BULK_EMAIL_ROUTING_KEY, ) - new_subtask.apply_async() + return new_subtask - # Sanity check: we expect the subtask to be properly summing to the original count: - if num_subtasks != len(subtask_id_list): - error_msg = "Task {}: number of tasks generated {} not equal to original total {}".format(task_id, num_subtasks, len(subtask_id_list)) - log.error(error_msg) - raise ValueError(error_msg) + recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) + recipient_fields = ['profile__name', 'email'] + + log.info("Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s", + task_id, course_id, email_id, to_option) + + progress = queue_subtasks_for_query( + entry, + action_name, + _create_send_email_subtask, + recipient_qset, + recipient_fields, + settings.BULK_EMAIL_EMAILS_PER_QUERY, + settings.BULK_EMAIL_EMAILS_PER_TASK + ) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. @@ -332,7 +310,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask ) except Exception: # Unexpected exception. Try to write out the failure to the entry before failing. - log.exception("Send-email task %s: failed unexpectedly!", current_task_id) + log.exception("Send-email task %s for email %s: failed unexpectedly!", current_task_id, email_id) # We got here for really unexpected reasons. Since we don't know how far # the task got in emailing, we count all recipients as having failed. # It at least keeps the counts consistent. @@ -342,22 +320,23 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask if send_exception is None: # Update the InstructorTask object that is storing its progress. - log.info("Send-email task %s: succeeded", current_task_id) + log.info("Send-email task %s for email %s: succeeded", current_task_id, email_id) update_subtask_status(entry_id, current_task_id, new_subtask_status) elif isinstance(send_exception, RetryTaskError): # If retrying, a RetryTaskError needs to be returned to Celery. # We assume that the the progress made before the retry condition # was encountered has already been updated before the retry call was made, # so we only log here. - log.warning("Send-email task %s: being retried", current_task_id) + log.warning("Send-email task %s for email %s: being retried", current_task_id, email_id) raise send_exception # pylint: disable=E0702 else: - log.error("Send-email task %s: failed: %s", current_task_id, send_exception) + log.error("Send-email task %s for email %s: failed: %s", current_task_id, email_id, send_exception) update_subtask_status(entry_id, current_task_id, new_subtask_status) raise send_exception # pylint: disable=E0702 - log.info("Send-email task %s: returning status %s", current_task_id, new_subtask_status) - return new_subtask_status + # return status in a form that can be serialized by Celery into JSON: + log.info("Send-email task %s for email %s: returning status %s", current_task_id, email_id, new_subtask_status) + return new_subtask_status.to_dict() def _filter_optouts_from_recipients(to_list, course_id): diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index a3de9e505d..642785e97d 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -16,7 +16,7 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING TASK_LOG = get_task_logger(__name__) -# Lock expiration should be long enough to allow a send_course_email task to complete. +# Lock expiration should be long enough to allow a subtask to complete. SUBTASK_LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes @@ -25,9 +25,9 @@ class DuplicateTaskException(Exception): pass -def create_subtask_ids(total_num_items, items_per_query, items_per_task): +def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task): """ - Determines number of subtasks that need to be generated, and provides a list of id values to use. + Determines number of subtasks that would be generated by _generate_items_for_subtask. This needs to be calculated before a query is executed so that the list of all subtasks can be stored in the InstructorTask before any subtasks are started. @@ -44,11 +44,10 @@ def create_subtask_ids(total_num_items, items_per_query, items_per_task): num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) total_num_tasks += num_tasks_this_query - # Now that the number of tasks is known, return a list of ids for each task. - return [str(uuid4()) for _ in range(total_num_tasks)] + return total_num_tasks -def generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): +def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): """ Generates a chunk of "items" that should be passed into a subtask. @@ -62,6 +61,7 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field. + Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed. """ num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) last_pk = item_queryset[0].pk - 1 @@ -82,7 +82,7 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item # Sanity check: we expect the chunking to be properly summing to the original count: if num_items_queued != total_num_items: - error_msg = "Task {}: number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items) + error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items) TASK_LOG.error(error_msg) raise ValueError(error_msg) @@ -105,7 +105,7 @@ class SubtaskStatus(object): 'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS) Object is not JSON-serializable, so to_dict and from_dict methods are provided so that - it can be passed as a serializable argument to tasks. + it can be passed as a serializable argument to tasks (and be reconstituted within such tasks). In future, we may want to include specific error information indicating the reason for failure. @@ -137,8 +137,7 @@ class SubtaskStatus(object): @classmethod def create(self, task_id, **options): """Construct a SubtaskStatus object.""" - newobj = self(task_id, **options) - return newobj + return self(task_id, **options) def to_dict(self): """ @@ -238,6 +237,70 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): return task_progress +def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task): + """ + Generates and queues subtasks to each execute a chunk of "items" generated by a queryset. + + Arguments: + `entry` : the InstructorTask object for which subtasks are being queued. + `action_name` : a past-tense verb that can be used for constructing readable status messages. + `create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object. + Arguments are the list of items to be processed by this subtask, and a SubtaskStatus + object reflecting initial status (and containing the subtask's id). + `item_queryset` : a query set that defines the "items" that should be passed to subtasks. + `item_fields` : the fields that should be included in the dict that is returned. + These are in addition to the 'pk' field. + `items_per_query` : size of chunks to break the query operation into. + `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. + + Returns: the task progress as stored in the InstructorTask object. + + """ + task_id = entry.task_id + total_num_items = item_queryset.count() + + # Calculate the number of tasks that will be created, and create a list of ids for each task. + total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task) + subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)] + + # Update the InstructorTask with information about the subtasks we've defined. + TASK_LOG.info("Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.", + task_id, entry.id, total_num_subtasks, total_num_items) # pylint: disable=E1101 + progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list) + + # Construct a generator that will return the recipients to use for each subtask. + # Pass in the desired fields to fetch for each recipient. + item_generator = _generate_items_for_subtask( + item_queryset, + item_fields, + total_num_items, + items_per_query, + items_per_task + ) + + # Now create the subtasks, and start them running. + TASK_LOG.info("Task %s: creating %s subtasks to process %s items.", + task_id, total_num_subtasks, total_num_items) + num_subtasks = 0 + for item_list in item_generator: + subtask_id = subtask_id_list[num_subtasks] + num_subtasks += 1 + subtask_status = SubtaskStatus.create(subtask_id) + new_subtask = create_subtask_fcn(item_list, subtask_status) + new_subtask.apply_async() + + # Sanity check: we expect the subtask to be properly summing to the original count: + if num_subtasks != len(subtask_id_list): + task_id = entry.task_id + error_fmt = "Task {}: number of tasks generated {} not equal to original total {}" + error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list)) + TASK_LOG.error(error_msg) + raise ValueError(error_msg) + + # Return the task progress as stored in the InstructorTask object. + return progress + + def _acquire_subtask_lock(task_id): """ Mark the specified task_id as being in progress. @@ -370,7 +433,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): is the value of the SubtaskStatus.to_dict(), but could be expanded in future to store information about failure messages, progress made, etc. """ - TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s", + TASK_LOG.info("Preparing to update status for subtask %s for instructor task %d with status %s", current_task_id, entry_id, new_subtask_status) try: @@ -379,7 +442,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): subtask_status_info = subtask_dict['status'] if current_task_id not in subtask_status_info: # unexpected error -- raise an exception - format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'" + format_str = "Unexpected task_id '{}': unable to update status for subtask of instructor task '{}'" msg = format_str.format(current_task_id, entry_id) TASK_LOG.warning(msg) raise ValueError(msg) @@ -424,7 +487,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status): entry.subtasks = json.dumps(subtask_dict) entry.task_output = InstructorTask.create_output_for_success(task_progress) - TASK_LOG.info("Task output updated to %s for email subtask %s of instructor task %d", + TASK_LOG.info("Task output updated to %s for subtask %s of instructor task %d", entry.task_output, current_task_id, entry_id) TASK_LOG.debug("about to save....") entry.save()