diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 1bdb32d133..5d5313bccb 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -2,11 +2,9 @@ This module contains celery task functions for handling the sending of bulk email to a course. """ -import math import re import random import json -from uuid import uuid4 from time import sleep from dogapi import dog_stats_api @@ -24,7 +22,7 @@ from boto.ses.exceptions import ( ) from boto.exception import AWSConnectionError -from celery import task, current_task, group +from celery import task, current_task from celery.utils.log import get_task_logger from celery.states import SUCCESS, FAILURE, RETRY from celery.exceptions import RetryTaskError @@ -42,9 +40,11 @@ from courseware.access import _course_staff_group_name, _course_instructor_group from courseware.courses import get_course, course_image_url from instructor_task.models import InstructorTask from instructor_task.subtasks import ( - update_subtask_status, + create_subtask_ids, + generate_items_for_subtask, create_subtask_status, increment_subtask_status, + update_subtask_status, initialize_subtask_info, check_subtask_is_valid, ) @@ -152,53 +152,6 @@ def _get_course_email_context(course): return email_context -def _generate_subtasks(create_subtask_fcn, recipient_qset): - """ - Generates a list of subtasks to send email to a given set of recipients. - - Arguments: - `create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id - to assign to the new subtask. Returns the subtask that will send email to that - list of recipients. - `recipient_qset` : a query set that defines the recipients who should receive emails. - - Returns: a tuple, containing: - - * A list of subtasks that will send emails to all recipients. - * A list of subtask_ids corresponding to those subtasks. - * A count of the total number of emails being sent. 
- - """ - total_num_emails = recipient_qset.count() - num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY))) - last_pk = recipient_qset[0].pk - 1 - num_emails_queued = 0 - task_list = [] - subtask_id_list = [] - for _ in range(num_queries): - recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY]) - last_pk = recipient_sublist[-1]['pk'] - num_emails_this_query = len(recipient_sublist) - num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK))) - chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query))) - for i in range(num_tasks_this_query): - to_list = recipient_sublist[i * chunk:i * chunk + chunk] - subtask_id = str(uuid4()) - subtask_id_list.append(subtask_id) - new_subtask = create_subtask_fcn(to_list, subtask_id) - task_list.append(new_subtask) - - num_emails_queued += num_emails_this_query - - # Sanity check: we expect the chunking to be properly summing to the original count: - if num_emails_queued != total_num_emails: - error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails) - log.error(error_msg) - raise ValueError(error_msg) - - return task_list, subtask_id_list, total_num_emails - - def perform_delegate_email_batches(entry_id, course_id, task_input, action_name): """ Delegates emails by querying for the list of recipients who should @@ -252,42 +205,59 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) log.exception("Task %s: course not found: %s", task_id, course_id) raise + # Get arguments that will be passed to every subtask. 
to_option = email_obj.to_option - recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) global_email_context = _get_course_email_context(course) - def _create_send_email_subtask(to_list, subtask_id): - """Creates a subtask to send email to a given recipient list.""" + # Figure out the number of needed subtasks, getting id values to use for each. + recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) + total_num_emails = recipient_qset.count() + subtask_id_list = create_subtask_ids(total_num_emails, settings.BULK_EMAIL_EMAILS_PER_QUERY, settings.BULK_EMAIL_EMAILS_PER_TASK) + + # Update the InstructorTask with information about the subtasks we've defined. + log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", + task_id, total_num_emails, course_id, email_id, to_option) + progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) + + # Construct a generator that will return the recipients to use for each subtask. + # Pass in the desired fields to fetch for each recipient. + recipient_fields = ['profile__name', 'email'] + recipient_generator = generate_items_for_subtask( + recipient_qset, + recipient_fields, + total_num_emails, + settings.BULK_EMAIL_EMAILS_PER_QUERY, + settings.BULK_EMAIL_EMAILS_PER_TASK, + ) + + # Now create the subtasks, and start them running. This allows all the subtasks + # in the list to be submitted at the same time. 
+ num_subtasks = len(subtask_id_list) + log.info("Task %s: Preparing to generate and queue %s subtasks for course %s, email %s, to_option %s", + task_id, num_subtasks, course_id, email_id, to_option) + num_subtasks = 0 + for recipient_list in recipient_generator: + subtask_id = subtask_id_list[num_subtasks] + num_subtasks += 1 subtask_status = create_subtask_status(subtask_id) new_subtask = send_course_email.subtask( ( entry_id, email_id, - to_list, + recipient_list, global_email_context, subtask_status, ), task_id=subtask_id, routing_key=settings.BULK_EMAIL_ROUTING_KEY, ) - return new_subtask + new_subtask.apply_async() - log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s", - task_id, course_id, email_id, to_option) - task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset) - - # Update the InstructorTask with information about the subtasks we've defined. - log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", - task_id, total_num_emails, course_id, email_id, to_option) - progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) - num_subtasks = len(subtask_id_list) - - # Now group the subtasks, and start them running. This allows all the subtasks - # in the list to be submitted at the same time. 
- log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s", - task_id, num_subtasks, total_num_emails, course_id, email_id, to_option) - task_group = group(task_list) - task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY) + # Sanity check: we expect the subtask to be properly summing to the original count: + if num_subtasks != len(subtask_id_list): + error_msg = "Task {}: number of tasks generated {} not equal to original total {}".format(task_id, num_subtasks, len(subtask_id_list)) + log.error(error_msg) + raise ValueError(error_msg) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 68bd24a1f8..350c09ab96 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -3,6 +3,8 @@ This module contains celery task functions for handling the management of subtas """ from time import time import json +from uuid import uuid4 +import math from celery.utils.log import get_task_logger from celery.states import SUCCESS, READY_STATES, RETRY @@ -23,6 +25,68 @@ class DuplicateTaskException(Exception): pass +def create_subtask_ids(total_num_items, items_per_query, items_per_task): + """ + Determines number of subtasks that need to be generated, and provides a list of id values to use. + + This needs to be calculated before a query is executed so that the list of all subtasks can be + stored in the InstructorTask before any subtasks are started. + + The number of subtask_id values returned by this should match the number of chunks returned + by the generate_items_for_subtask generator. 
+ """ + total_num_tasks = 0 + num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) + num_items_remaining = total_num_items + for _ in range(num_queries): + num_items_this_query = min(num_items_remaining, items_per_query) + num_items_remaining -= num_items_this_query + num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) + total_num_tasks += num_tasks_this_query + + # Now that the number of tasks is known, return a list of ids for each task. + return [str(uuid4()) for _ in range(total_num_tasks)] + + +def generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): + """ + Generates a chunk of "items" that should be passed into a subtask. + + Arguments: + `item_queryset` : a query set that defines the "items" that should be passed to subtasks. + `item_fields` : the fields that should be included in the dict that is returned. + These are in addition to the 'pk' field. + `total_num_items` : the result of item_queryset.count(). + `items_per_query` : size of chunks to break the query operation into. + `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. + + Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field. 
+
+    """
+    num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
+    last_pk = item_queryset[0].pk - 1
+    num_items_queued = 0
+    all_item_fields = list(item_fields)
+    all_item_fields.append('pk')
+    for _ in range(num_queries):
+        item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query])
+        last_pk = item_sublist[-1]['pk']
+        num_items_this_query = len(item_sublist)
+        num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
+        chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
+        for i in range(num_tasks_this_query):
+            items_for_task = item_sublist[i * chunk:i * chunk + chunk]
+            yield items_for_task
+
+        num_items_queued += num_items_this_query
+
+    # Sanity check: we expect the chunking to be properly summing to the original count:
+    if num_items_queued != total_num_items:
+        error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items)
+        TASK_LOG.error(error_msg)
+        raise ValueError(error_msg)
+
+
 def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
     """
     Create and return a dict for tracking the status of a subtask.