From afb2dae631b97cd381a2a74f8f74d2634bbb877b Mon Sep 17 00:00:00 2001 From: Adam Palay Date: Wed, 15 Apr 2015 12:03:23 -0400 Subject: [PATCH 1/2] Revert "Revert "use different queue for smaller emails (TNL-1591)"" This reverts commit 228cca4e38ec79f5cbb54098c788ab9556f602e2. --- lms/djangoapps/bulk_email/tasks.py | 23 +++++++++++++------ lms/djangoapps/bulk_email/tests/test_email.py | 7 ++++++ lms/djangoapps/instructor_task/subtasks.py | 13 +++++++++-- .../instructor_task/tests/test_subtasks.py | 1 + lms/envs/common.py | 9 ++++++++ 5 files changed, 44 insertions(+), 9 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index fda5b33ae5..45b57d3111 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -222,6 +222,20 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_option = email_obj.to_option global_email_context = _get_course_email_context(course) + recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id) + recipient_fields = ['profile__name', 'email'] + + log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s", + task_id, course_id, email_id, to_option) + + total_recipients = sum([recipient_queryset.count() for recipient_queryset in recipient_qsets]) + + routing_key = settings.BULK_EMAIL_ROUTING_KEY + # if there are few enough emails, send them through a different queue + # to avoid large courses blocking emails to self and staff + if total_recipients <= settings.BULK_EMAIL_JOB_SIZE_THRESHOLD: + routing_key = settings.BULK_EMAIL_ROUTING_KEY_SMALL_JOBS + def _create_send_email_subtask(to_list, initial_subtask_status): """Creates a subtask to send email to a given recipient list.""" subtask_id = initial_subtask_status.task_id @@ -234,16 +248,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) initial_subtask_status.to_dict(), ), task_id=subtask_id, - routing_key=settings.BULK_EMAIL_ROUTING_KEY, + routing_key=routing_key, ) return new_subtask - recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id) - recipient_fields = ['profile__name', 'email'] - - log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s", - task_id, course_id, email_id, to_option) - progress = queue_subtasks_for_query( entry, action_name, @@ -251,6 +259,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) recipient_qsets, recipient_fields, settings.BULK_EMAIL_EMAILS_PER_TASK, + total_recipients, ) # We want to return progress here, as this is what will be stored in the diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index d825a93c44..874712523c 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -179,6 +179,13 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase) [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] ) + @override_settings(BULK_EMAIL_JOB_SIZE_THRESHOLD=1) + def test_send_to_all_high_queue(self): + """ + Test that email is still sent when the high priority queue is used + """ + self.test_send_to_all() + def test_no_duplicate_emails_staff_instructor(self): """ Test that no duplicate emails are sent to a course instructor that is diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index 29f4ee8ee2..7401e06f7d 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -275,7 +275,16 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): return task_progress -def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querysets, item_fields, items_per_task): +# pylint: disable=bad-continuation +def queue_subtasks_for_query( + entry, + action_name, + create_subtask_fcn, + item_querysets, + item_fields, + items_per_task, + total_num_items, +): """ Generates and queues subtasks to each execute a chunk of "items" generated by a queryset. @@ -289,12 +298,12 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys `item_fields` : the fields that should be included in the dict that is returned. These are in addition to the 'pk' field. `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. + `total_num_items` : total amount of items that will be put into subtasks Returns: the task progress as stored in the InstructorTask object. """ task_id = entry.task_id - total_num_items = sum([item_queryset.count() for item_queryset in item_querysets]) # Calculate the number of tasks that will be created, and create a list of ids for each task. total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task) diff --git a/lms/djangoapps/instructor_task/tests/test_subtasks.py b/lms/djangoapps/instructor_task/tests/test_subtasks.py index bd19caca2f..8d23d26115 100644 --- a/lms/djangoapps/instructor_task/tests/test_subtasks.py +++ b/lms/djangoapps/instructor_task/tests/test_subtasks.py @@ -54,6 +54,7 @@ class TestSubtasks(InstructorTaskCourseTestCase): item_querysets=task_querysets, item_fields=[], items_per_task=items_per_task, + total_num_items=initial_count, ) def test_queue_subtasks_for_query1(self): diff --git a/lms/envs/common.py b/lms/envs/common.py index 0c56923331..e50412f34a 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -1522,6 +1522,15 @@ BULK_EMAIL_INFINITE_RETRY_CAP = 1000 # routing key that points to it. At the moment, the name is the same. BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +# We also define a queue for smaller jobs so that large courses don't block +# smaller emails (see BULK_EMAIL_JOB_SIZE_THRESHOLD setting) +BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = DEFAULT_PRIORITY_QUEUE + +# For emails with fewer than these number of recipients, send them through +# a different queue to avoid large courses blocking emails that are meant to be +# sent to self and staff +BULK_EMAIL_JOB_SIZE_THRESHOLD = 100 + # Flag to indicate if individual email addresses should be logged as they are sent # a bulk email message. BULK_EMAIL_LOG_SENT_EMAILS = False From f37c7934f0707ac72db236eb8b76369ddedcba5d Mon Sep 17 00:00:00 2001 From: Adam Palay Date: Thu, 16 Apr 2015 15:41:16 -0400 Subject: [PATCH 2/2] reset queue reference in aws.py, use low priority queue (TNL-1591) (TNL-2003) --- lms/envs/aws.py | 6 +++++- lms/envs/common.py | 2 +- lms/envs/yaml_config.py | 6 +++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/lms/envs/aws.py b/lms/envs/aws.py index 69ae6cf9d8..4fa99517d0 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -204,10 +204,14 @@ BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_LOG_SENT_EMAILS = ENV_TOKENS.get('BULK_EMAIL_LOG_SENT_EMAILS', BULK_EMAIL_LOG_SENT_EMAILS) BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = ENV_TOKENS.get('BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS', BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS) # We want Bulk Email running on the high-priority queue, so we define the -# routing key that points to it. At the moment, the name is the same. +# routing key that points to it. At the moment, the name is the same. # We have to reset the value here, since we have changed the value of the queue name. BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +# We can run smaller jobs on the low priority queue. See note above for why +# we have to reset the value here. +BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = LOW_PRIORITY_QUEUE + # Theme overrides THEME_NAME = ENV_TOKENS.get('THEME_NAME', None) diff --git a/lms/envs/common.py b/lms/envs/common.py index e50412f34a..714730b66a 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -1524,7 +1524,7 @@ BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE # We also define a queue for smaller jobs so that large courses don't block # smaller emails (see BULK_EMAIL_JOB_SIZE_THRESHOLD setting) -BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = DEFAULT_PRIORITY_QUEUE +BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = LOW_PRIORITY_QUEUE # For emails with fewer than these number of recipients, send them through # a different queue to avoid large courses blocking emails that are meant to be diff --git a/lms/envs/yaml_config.py b/lms/envs/yaml_config.py index 0a1d120259..65a3a8090d 100644 --- a/lms/envs/yaml_config.py +++ b/lms/envs/yaml_config.py @@ -221,10 +221,14 @@ if 'loc_cache' not in CACHES: } # We want Bulk Email running on the high-priority queue, so we define the -# routing key that points to it. At the moment, the name is the same. +# routing key that points to it. At the moment, the name is the same. # We have to reset the value here, since we have changed the value of the queue name. BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE +# We can run smaller jobs on the low priority queue. See note above for why +# we have to reset the value here. +BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = LOW_PRIORITY_QUEUE + LANGUAGE_DICT = dict(LANGUAGES) # Additional installed apps