Merge pull request #3841 from edx/fix/large-email-queries-2
Fix/large email queries 2
This commit is contained in:
@@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes,
|
||||
in roughly chronological order, most recent first. Add your entries at or near
|
||||
the top. Include a label indicating the component affected.
|
||||
|
||||
LMS: Update bulk email implementation to lessen load on the database
|
||||
by consolidating chunked queries for recipients into a single query.
|
||||
|
||||
Blades: Fix link to javascript file in ChoiceTextResponse. BLD-1103.
|
||||
|
||||
All: refactored code to handle course_ids, module_ids, etc in a cleaner way.
|
||||
|
||||
@@ -46,6 +46,7 @@ from instructor_task.subtasks import (
|
||||
update_subtask_status,
|
||||
)
|
||||
from xmodule.modulestore import Location
|
||||
from util.query import use_read_replica_if_available
|
||||
|
||||
log = get_task_logger(__name__)
|
||||
|
||||
@@ -109,7 +110,7 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
else:
|
||||
staff_qset = CourseStaffRole(course_id).users_with_role()
|
||||
instructor_qset = CourseInstructorRole(course_id).users_with_role()
|
||||
recipient_qset = staff_qset | instructor_qset
|
||||
recipient_qset = (staff_qset | instructor_qset).distinct()
|
||||
if to_option == SEND_TO_ALL:
|
||||
# We also require students to have activated their accounts to
|
||||
# provide verification that the provided email address is valid.
|
||||
@@ -118,11 +119,20 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
courseenrollment__course_id=course_id,
|
||||
courseenrollment__is_active=True
|
||||
)
|
||||
recipient_qset = recipient_qset | enrollment_qset
|
||||
recipient_qset = recipient_qset.distinct()
|
||||
# Now we do some queryset sidestepping to avoid doing a DISTINCT
|
||||
# query across the course staff and the enrolled students, which
|
||||
# forces the creation of a temporary table in the db.
|
||||
unenrolled_staff_qset = recipient_qset.exclude(
|
||||
courseenrollment__course_id=course_id, courseenrollment__is_active=True
|
||||
)
|
||||
# use read_replica if available:
|
||||
unenrolled_staff_qset = use_read_replica_if_available(unenrolled_staff_qset)
|
||||
|
||||
recipient_qset = recipient_qset.order_by('pk')
|
||||
return recipient_qset
|
||||
unenrolled_staff_ids = [user.id for user in unenrolled_staff_qset]
|
||||
recipient_qset = enrollment_qset | User.objects.filter(id__in=unenrolled_staff_ids)
|
||||
|
||||
# again, use read_replica if available to lighten the load for large queries
|
||||
return use_read_replica_if_available(recipient_qset)
|
||||
|
||||
|
||||
def _get_course_email_context(course):
|
||||
@@ -232,8 +242,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
_create_send_email_subtask,
|
||||
recipient_qset,
|
||||
recipient_fields,
|
||||
settings.BULK_EMAIL_EMAILS_PER_QUERY,
|
||||
settings.BULK_EMAIL_EMAILS_PER_TASK
|
||||
settings.BULK_EMAIL_EMAILS_PER_TASK,
|
||||
)
|
||||
|
||||
# We want to return progress here, as this is what will be stored in the
|
||||
|
||||
@@ -20,6 +20,8 @@ from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from bulk_email.models import Optout
|
||||
from instructor_task.subtasks import update_subtask_status
|
||||
from student.roles import CourseStaffRole
|
||||
from student.models import CourseEnrollment
|
||||
|
||||
STAFF_COUNT = 3
|
||||
STUDENT_COUNT = 10
|
||||
@@ -176,6 +178,22 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
|
||||
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
|
||||
)
|
||||
|
||||
def test_no_duplicate_emails_staff_instructor(self):
|
||||
"""
|
||||
Test that no duplicate emails are sent to a course instructor that is
|
||||
also course staff
|
||||
"""
|
||||
CourseStaffRole(self.course.id).add_users(self.instructor)
|
||||
self.test_send_to_all()
|
||||
|
||||
def test_no_duplicate_emails_enrolled_staff(self):
|
||||
"""
|
||||
Test that no duplicate emials are sent to a course instructor that is
|
||||
also enrolled in the course
|
||||
"""
|
||||
CourseEnrollment.enroll(self.instructor, self.course.id)
|
||||
self.test_send_to_all()
|
||||
|
||||
def test_unicode_subject_send_to_all(self):
|
||||
"""
|
||||
Make sure email (with Unicode characters) send to all goes there.
|
||||
@@ -260,7 +278,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
|
||||
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
|
||||
)
|
||||
|
||||
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7)
|
||||
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3)
|
||||
@patch('bulk_email.tasks.update_subtask_status')
|
||||
def test_chunked_queries_send_numerous_emails(self, email_mock):
|
||||
"""
|
||||
|
||||
@@ -29,29 +29,20 @@ class DuplicateTaskException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task):
|
||||
def _get_number_of_subtasks(total_num_items, items_per_task):
|
||||
"""
|
||||
Determines number of subtasks that would be generated by _generate_items_for_subtask.
|
||||
|
||||
This needs to be calculated before a query is executed so that the list of all subtasks can be
|
||||
This needs to be calculated before the query is executed so that the list of all subtasks can be
|
||||
stored in the InstructorTask before any subtasks are started.
|
||||
|
||||
The number of subtask_id values returned by this should match the number of chunks returned
|
||||
by the generate_items_for_subtask generator.
|
||||
"""
|
||||
total_num_tasks = 0
|
||||
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
|
||||
num_items_remaining = total_num_items
|
||||
for _ in range(num_queries):
|
||||
num_items_this_query = min(num_items_remaining, items_per_query)
|
||||
num_items_remaining -= num_items_this_query
|
||||
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
|
||||
total_num_tasks += num_tasks_this_query
|
||||
|
||||
return total_num_tasks
|
||||
return int(math.ceil(float(total_num_items) / float(items_per_task)))
|
||||
|
||||
|
||||
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task):
|
||||
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_task, total_num_subtasks):
|
||||
"""
|
||||
Generates a chunk of "items" that should be passed into a subtask.
|
||||
|
||||
@@ -67,41 +58,31 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, tot
|
||||
|
||||
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
|
||||
"""
|
||||
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
|
||||
last_pk = item_queryset.order_by('pk')[0].pk - 1
|
||||
num_items_queued = 0
|
||||
available_num_subtasks = total_num_subtasks
|
||||
all_item_fields = list(item_fields)
|
||||
all_item_fields.append('pk')
|
||||
num_subtasks = 0
|
||||
|
||||
for query_number in range(num_queries):
|
||||
# In case total_num_items has increased since it was initially calculated
|
||||
# include all remaining items in last query.
|
||||
item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)
|
||||
if query_number < num_queries - 1:
|
||||
item_sublist = list(item_sublist[:items_per_query])
|
||||
else:
|
||||
item_sublist = list(item_sublist)
|
||||
|
||||
last_pk = item_sublist[-1]['pk']
|
||||
num_items_this_query = len(item_sublist)
|
||||
|
||||
# In case total_num_items has increased since it was initially calculated just distribute the extra
|
||||
# items among the available subtasks.
|
||||
num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task))))
|
||||
available_num_subtasks -= num_tasks_this_query
|
||||
|
||||
chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
|
||||
for i in range(num_tasks_this_query):
|
||||
items_for_task = item_sublist[i * chunk:i * chunk + chunk]
|
||||
items_for_task = []
|
||||
for item in item_queryset.values(*all_item_fields).iterator():
|
||||
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
|
||||
yield items_for_task
|
||||
num_items_queued += items_per_task
|
||||
items_for_task = []
|
||||
num_subtasks += 1
|
||||
items_for_task.append(item)
|
||||
|
||||
num_items_queued += num_items_this_query
|
||||
# yield remainder items for task, if any
|
||||
if items_for_task:
|
||||
yield items_for_task
|
||||
num_items_queued += len(items_for_task)
|
||||
|
||||
# Because queueing does not happen in one transaction the number of items in the queryset may change
|
||||
# from the initial count. For example if the queryset is of the CourseEnrollment model students may
|
||||
# enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the
|
||||
# number of subtasks needed to perform the requested task.
|
||||
# Note, depending on what kind of DB is used, it's possible for the queryset
|
||||
# we iterate over to change in the course of the query. Therefore it's
|
||||
# possible that there are more (or fewer) items queued than were initially
|
||||
# calculated. It also means it's possible that the last task contains
|
||||
# more items than items_per_task allows. We expect this to be a small enough
|
||||
# number as to be negligible.
|
||||
if num_items_queued != total_num_items:
|
||||
TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)
|
||||
|
||||
@@ -256,7 +237,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
return task_progress
|
||||
|
||||
|
||||
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task):
|
||||
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_task):
|
||||
"""
|
||||
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
|
||||
|
||||
@@ -269,7 +250,6 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
|
||||
`item_fields` : the fields that should be included in the dict that is returned.
|
||||
These are in addition to the 'pk' field.
|
||||
`items_per_query` : size of chunks to break the query operation into.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
|
||||
Returns: the task progress as stored in the InstructorTask object.
|
||||
@@ -279,7 +259,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
total_num_items = item_queryset.count()
|
||||
|
||||
# Calculate the number of tasks that will be created, and create a list of ids for each task.
|
||||
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task)
|
||||
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
|
||||
subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]
|
||||
|
||||
# Update the InstructorTask with information about the subtasks we've defined.
|
||||
@@ -293,9 +273,8 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
item_queryset,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_task,
|
||||
total_num_subtasks,
|
||||
items_per_query,
|
||||
items_per_task
|
||||
)
|
||||
|
||||
# Now create the subtasks, and start them running.
|
||||
|
||||
@@ -26,7 +26,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
|
||||
random_id = uuid4().hex[:8]
|
||||
self.create_student(username='student{0}'.format(random_id))
|
||||
|
||||
def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count):
|
||||
def _queue_subtasks(self, create_subtask_fcn, items_per_task, initial_count, extra_count):
|
||||
"""Queue subtasks while enrolling more students into course in the middle of the process."""
|
||||
|
||||
task_id = str(uuid4())
|
||||
@@ -53,43 +53,29 @@ class TestSubtasks(InstructorTaskCourseTestCase):
|
||||
create_subtask_fcn=create_subtask_fcn,
|
||||
item_queryset=task_queryset,
|
||||
item_fields=[],
|
||||
items_per_query=items_per_query,
|
||||
items_per_task=items_per_task,
|
||||
)
|
||||
|
||||
def test_queue_subtasks_for_query1(self):
|
||||
"""Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_tasks items."""
|
||||
"""Test queue_subtasks_for_query() if the last subtask only needs to accommodate < items_per_tasks items."""
|
||||
|
||||
mock_create_subtask_fcn = Mock()
|
||||
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1)
|
||||
self._queue_subtasks(mock_create_subtask_fcn, 3, 7, 1)
|
||||
|
||||
# Check number of items for each subtask
|
||||
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 2)
|
||||
|
||||
def test_queue_subtasks_for_query2(self):
|
||||
"""Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items."""
|
||||
"""Test queue_subtasks_for_query() if the last subtask needs to accommodate > items_per_task items."""
|
||||
|
||||
mock_create_subtask_fcn = Mock()
|
||||
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3)
|
||||
self._queue_subtasks(mock_create_subtask_fcn, 3, 8, 3)
|
||||
|
||||
# Check number of items for each subtask
|
||||
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5)
|
||||
|
||||
def test_queue_subtasks_for_query3(self):
|
||||
"""Test queue_subtasks_for_query() if in last query the number of items available > items_per_query."""
|
||||
|
||||
mock_create_subtask_fcn = Mock()
|
||||
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3)
|
||||
|
||||
# Check number of items for each subtask
|
||||
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 4)
|
||||
self.assertEqual(len(mock_create_subtask_fcn_args[3][0][0]), 4)
|
||||
|
||||
@@ -185,7 +185,6 @@ PAYMENT_REPORT_GENERATOR_GROUP = ENV_TOKENS.get('PAYMENT_REPORT_GENERATOR_GROUP'
|
||||
# Bulk Email overrides
|
||||
BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL)
|
||||
BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK)
|
||||
BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY)
|
||||
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
|
||||
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
|
||||
BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP)
|
||||
|
||||
@@ -1100,7 +1100,6 @@ BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@example.com'
|
||||
|
||||
# Parameters for breaking down course enrollment into subtasks.
|
||||
BULK_EMAIL_EMAILS_PER_TASK = 100
|
||||
BULK_EMAIL_EMAILS_PER_QUERY = 1000
|
||||
|
||||
# Initial delay used for retrying tasks. Additional retries use
|
||||
# longer delays. Value is in seconds.
|
||||
|
||||
Reference in New Issue
Block a user