diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 8c53c561d0..3e83f0e6e0 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -219,6 +219,20 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_option = email_obj.to_option global_email_context = _get_course_email_context(course) + recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id) + recipient_fields = ['profile__name', 'email'] + + log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s", + task_id, course_id, email_id, to_option) + + total_recipients = sum([recipient_queryset.count() for recipient_queryset in recipient_qsets]) + + routing_key = settings.BULK_EMAIL_ROUTING_KEY + # if there are few enough emails, send them through a different queue + # to avoid large courses blocking emails to self and staff + if total_recipients <= settings.BULK_EMAIL_JOB_SIZE_THRESHOLD: + routing_key = settings.BULK_EMAIL_ROUTING_KEY_SMALL_JOBS + def _create_send_email_subtask(to_list, initial_subtask_status): """Creates a subtask to send email to a given recipient list.""" subtask_id = initial_subtask_status.task_id @@ -231,16 +245,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) initial_subtask_status.to_dict(), ), task_id=subtask_id, - routing_key=settings.BULK_EMAIL_ROUTING_KEY, + routing_key=routing_key, ) return new_subtask - recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id) - recipient_fields = ['profile__name', 'email'] - - log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s", - task_id, course_id, email_id, to_option) - progress = queue_subtasks_for_query( entry, action_name, @@ -248,6 +256,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) recipient_qsets, recipient_fields, settings.BULK_EMAIL_EMAILS_PER_TASK, + total_recipients, ) # We want to return progress here, as this is what will be stored in the diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index d825a93c44..874712523c 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -179,6 +179,13 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase) [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] ) + @override_settings(BULK_EMAIL_JOB_SIZE_THRESHOLD=1) + def test_send_to_all_high_queue(self): + """ + Test that email is still sent when the high priority queue is used + """ + self.test_send_to_all() + def test_no_duplicate_emails_staff_instructor(self): """ Test that no duplicate emails are sent to a course instructor that is diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 29f4ee8ee2..7401e06f7d 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -275,7 +275,16 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): return task_progress -def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querysets, item_fields, items_per_task): +# pylint: disable=bad-continuation +def queue_subtasks_for_query( + entry, + action_name, + create_subtask_fcn, + item_querysets, + item_fields, + items_per_task, + total_num_items, +): """ Generates and queues subtasks to each execute a chunk of "items" generated by a queryset. @@ -289,12 +298,12 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys `item_fields` : the fields that should be included in the dict that is returned. These are in addition to the 'pk' field. `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. + `total_num_items` : total amount of items that will be put into subtasks Returns: the task progress as stored in the InstructorTask object. """ task_id = entry.task_id - total_num_items = sum([item_queryset.count() for item_queryset in item_querysets]) # Calculate the number of tasks that will be created, and create a list of ids for each task. total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task) diff --git a/lms/djangoapps/instructor_task/tests/test_subtasks.py b/lms/djangoapps/instructor_task/tests/test_subtasks.py index bd19caca2f..8d23d26115 100644 --- a/lms/djangoapps/instructor_task/tests/test_subtasks.py +++ b/lms/djangoapps/instructor_task/tests/test_subtasks.py @@ -54,6 +54,7 @@ class TestSubtasks(InstructorTaskCourseTestCase): item_querysets=task_querysets, item_fields=[], items_per_task=items_per_task, + total_num_items=initial_count, ) def test_queue_subtasks_for_query1(self): diff --git a/lms/envs/common.py b/lms/envs/common.py index abdfdbd01a..e047893e7b 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -1499,6 +1499,15 @@ BULK_EMAIL_INFINITE_RETRY_CAP = 1000 # routing key that points to it. At the moment, the name is the same. BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +# We also define a queue for smaller jobs so that large courses don't block +# smaller emails (see BULK_EMAIL_JOB_SIZE_THRESHOLD setting) +BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = DEFAULT_PRIORITY_QUEUE + +# For emails with fewer than these number of recipients, send them through +# a different queue to avoid large courses blocking emails that are meant to be +# sent to self and staff +BULK_EMAIL_JOB_SIZE_THRESHOLD = 100 + # Flag to indicate if individual email addresses should be logged as they are sent # a bulk email message. BULK_EMAIL_LOG_SENT_EMAILS = False