Merge pull request #7072 from edx/hotfix/2015-02-23
Split up bulk email query for students and unenrolled course staff (TNL-...)
This commit is contained in:
@@ -36,6 +36,7 @@ from django.core.urlresolvers import reverse
|
||||
from bulk_email.models import (
|
||||
CourseEmail, Optout, CourseEmailTemplate,
|
||||
SEND_TO_MYSELF, SEND_TO_ALL, TO_OPTIONS,
|
||||
SEND_TO_STAFF,
|
||||
)
|
||||
from courseware.courses import get_course, course_image_url
|
||||
from student.roles import CourseStaffRole, CourseInstructorRole
|
||||
@@ -92,25 +93,30 @@ BULK_EMAIL_FAILURE_ERRORS = (
|
||||
)
|
||||
|
||||
|
||||
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
def _get_recipient_querysets(user_id, to_option, course_id):
|
||||
"""
|
||||
Returns a query set of email recipients corresponding to the requested to_option category.
|
||||
Returns a list of query sets of email recipients corresponding to the
|
||||
requested `to_option` category.
|
||||
|
||||
`to_option` is either SEND_TO_MYSELF, SEND_TO_STAFF, or SEND_TO_ALL.
|
||||
|
||||
Recipients who are in more than one category (e.g. enrolled in the course and are staff or self)
|
||||
will be properly deduped.
|
||||
Recipients who are in more than one category (e.g. enrolled in the course
|
||||
and are staff or self) will be properly deduped.
|
||||
"""
|
||||
if to_option not in TO_OPTIONS:
|
||||
log.error("Unexpected bulk email TO_OPTION found: %s", to_option)
|
||||
raise Exception("Unexpected bulk email TO_OPTION found: {0}".format(to_option))
|
||||
|
||||
if to_option == SEND_TO_MYSELF:
|
||||
recipient_qset = User.objects.filter(id=user_id)
|
||||
user = User.objects.filter(id=user_id)
|
||||
return [use_read_replica_if_available(user)]
|
||||
else:
|
||||
staff_qset = CourseStaffRole(course_id).users_with_role()
|
||||
instructor_qset = CourseInstructorRole(course_id).users_with_role()
|
||||
recipient_qset = (staff_qset | instructor_qset).distinct()
|
||||
staff_instructor_qset = (staff_qset | instructor_qset).distinct()
|
||||
if to_option == SEND_TO_STAFF:
|
||||
return [use_read_replica_if_available(staff_instructor_qset)]
|
||||
|
||||
if to_option == SEND_TO_ALL:
|
||||
# We also require students to have activated their accounts to
|
||||
# provide verification that the provided email address is valid.
|
||||
@@ -119,20 +125,19 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
courseenrollment__course_id=course_id,
|
||||
courseenrollment__is_active=True
|
||||
)
|
||||
# Now we do some queryset sidestepping to avoid doing a DISTINCT
|
||||
# query across the course staff and the enrolled students, which
|
||||
# forces the creation of a temporary table in the db.
|
||||
unenrolled_staff_qset = recipient_qset.exclude(
|
||||
|
||||
# to avoid duplicates, we only want to email unenrolled course staff
|
||||
# members here
|
||||
unenrolled_staff_qset = staff_instructor_qset.exclude(
|
||||
courseenrollment__course_id=course_id, courseenrollment__is_active=True
|
||||
)
|
||||
# use read_replica if available:
|
||||
unenrolled_staff_qset = use_read_replica_if_available(unenrolled_staff_qset)
|
||||
|
||||
unenrolled_staff_ids = [user.id for user in unenrolled_staff_qset]
|
||||
recipient_qset = enrollment_qset | User.objects.filter(id__in=unenrolled_staff_ids)
|
||||
|
||||
# again, use read_replica if available to lighten the load for large queries
|
||||
return use_read_replica_if_available(recipient_qset)
|
||||
# use read_replica if available
|
||||
recipient_qsets = [
|
||||
use_read_replica_if_available(unenrolled_staff_qset),
|
||||
use_read_replica_if_available(enrollment_qset),
|
||||
]
|
||||
return recipient_qsets
|
||||
|
||||
|
||||
def _get_course_email_context(course):
|
||||
@@ -230,7 +235,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
)
|
||||
return new_subtask
|
||||
|
||||
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
|
||||
recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id)
|
||||
recipient_fields = ['profile__name', 'email']
|
||||
|
||||
log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
|
||||
@@ -240,7 +245,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
entry,
|
||||
action_name,
|
||||
_create_send_email_subtask,
|
||||
recipient_qset,
|
||||
recipient_qsets,
|
||||
recipient_fields,
|
||||
settings.BULK_EMAIL_EMAILS_PER_TASK,
|
||||
)
|
||||
|
||||
@@ -179,6 +179,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
|
||||
response = self.client.post(self.send_mail_url, test_email)
|
||||
self.assertEquals(json.loads(response.content), self.success_content)
|
||||
|
||||
# the 1 is for the instructor
|
||||
self.assertEquals(len(mail.outbox), 1 + len(self.staff) + len(self.students))
|
||||
self.assertItemsEqual(
|
||||
[e.to[0] for e in mail.outbox],
|
||||
@@ -195,12 +196,25 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
|
||||
|
||||
def test_no_duplicate_emails_enrolled_staff(self):
|
||||
"""
|
||||
Test that no duplicate emials are sent to a course instructor that is
|
||||
Test that no duplicate emails are sent to a course instructor that is
|
||||
also enrolled in the course
|
||||
"""
|
||||
CourseEnrollment.enroll(self.instructor, self.course.id)
|
||||
self.test_send_to_all()
|
||||
|
||||
def test_no_duplicate_emails_unenrolled_staff(self):
|
||||
"""
|
||||
Test that no duplicate emails are sent to a course staff that is
|
||||
not enrolled in the course, but is enrolled in other courses
|
||||
"""
|
||||
course_1 = CourseFactory.create()
|
||||
course_2 = CourseFactory.create()
|
||||
# make sure self.instructor isn't enrolled in the course
|
||||
self.assertFalse(CourseEnrollment.is_enrolled(self.instructor, self.course.id))
|
||||
CourseEnrollment.enroll(self.instructor, course_1.id)
|
||||
CourseEnrollment.enroll(self.instructor, course_2.id)
|
||||
self.test_send_to_all()
|
||||
|
||||
def test_unicode_subject_send_to_all(self):
|
||||
"""
|
||||
Make sure email (with Unicode characters) send to all goes there.
|
||||
|
||||
@@ -71,7 +71,7 @@ def track_memory_usage(metric, course_id):
|
||||
|
||||
|
||||
def _generate_items_for_subtask(
|
||||
item_queryset,
|
||||
item_querysets,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_task,
|
||||
@@ -82,10 +82,10 @@ def _generate_items_for_subtask(
|
||||
Generates a chunk of "items" that should be passed into a subtask.
|
||||
|
||||
Arguments:
|
||||
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
|
||||
`item_querysets` : a list of query sets, each of which defines the "items" that should be passed to subtasks.
|
||||
`item_fields` : the fields that should be included in the dict that is returned.
|
||||
These are in addition to the 'pk' field.
|
||||
`total_num_items` : the result of item_queryset.count().
|
||||
`total_num_items` : the result of summing the count of each queryset in `item_querysets`.
|
||||
`items_per_query` : size of chunks to break the query operation into.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
`course_id` : course_id of the course. Only needed for the track_memory_usage context manager.
|
||||
@@ -102,13 +102,14 @@ def _generate_items_for_subtask(
|
||||
items_for_task = []
|
||||
|
||||
with track_memory_usage('course_email.subtask_generation.memory', course_id):
|
||||
for item in item_queryset.values(*all_item_fields).iterator():
|
||||
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
|
||||
yield items_for_task
|
||||
num_items_queued += items_per_task
|
||||
items_for_task = []
|
||||
num_subtasks += 1
|
||||
items_for_task.append(item)
|
||||
for queryset in item_querysets:
|
||||
for item in queryset.values(*all_item_fields).iterator():
|
||||
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
|
||||
yield items_for_task
|
||||
num_items_queued += items_per_task
|
||||
items_for_task = []
|
||||
num_subtasks += 1
|
||||
items_for_task.append(item)
|
||||
|
||||
# yield remainder items for task, if any
|
||||
if items_for_task:
|
||||
@@ -275,7 +276,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
return task_progress
|
||||
|
||||
|
||||
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_task):
|
||||
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querysets, item_fields, items_per_task):
|
||||
"""
|
||||
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
|
||||
|
||||
@@ -285,7 +286,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
`create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
|
||||
Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
|
||||
object reflecting initial status (and containing the subtask's id).
|
||||
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
|
||||
`item_querysets` : a list of query sets that define the "items" that should be passed to subtasks.
|
||||
`item_fields` : the fields that should be included in the dict that is returned.
|
||||
These are in addition to the 'pk' field.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
@@ -294,7 +295,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
|
||||
"""
|
||||
task_id = entry.task_id
|
||||
total_num_items = item_queryset.count()
|
||||
total_num_items = sum([item_queryset.count() for item_queryset in item_querysets])
|
||||
|
||||
# Calculate the number of tasks that will be created, and create a list of ids for each task.
|
||||
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
|
||||
@@ -313,7 +314,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
# Construct a generator that will return the recipients to use for each subtask.
|
||||
# Pass in the desired fields to fetch for each recipient.
|
||||
item_list_generator = _generate_items_for_subtask(
|
||||
item_queryset,
|
||||
item_querysets,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_task,
|
||||
|
||||
@@ -38,7 +38,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
|
||||
)
|
||||
|
||||
self._enroll_students_in_course(self.course.id, initial_count)
|
||||
task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id)
|
||||
task_querysets = [CourseEnrollment.objects.filter(course_id=self.course.id)]
|
||||
|
||||
def initialize_subtask_info(*args): # pylint: disable=unused-argument
|
||||
"""Instead of initializing subtask info enroll some more students into course."""
|
||||
@@ -51,7 +51,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
|
||||
entry=instructor_task,
|
||||
action_name='action_name',
|
||||
create_subtask_fcn=create_subtask_fcn,
|
||||
item_queryset=task_queryset,
|
||||
item_querysets=task_querysets,
|
||||
item_fields=[],
|
||||
items_per_task=items_per_task,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user