diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index fc8aeaa878..110fe4e626 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -132,10 +132,57 @@ def _get_course_email_context(course): return email_context +def _generate_subtasks(create_subtask_fcn, recipient_qset): + """ + Generates a list of subtasks to send email to a given set of recipients. + + Arguments: + `create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id + to assign to the new subtask. Returns the subtask that will send email to that + list of recipients. + `recipient_qset` : a query set that defines the recipients who should receive emails. + + Returns: a tuple, containing: + + * A list of subtasks that will send emails to all recipients. + * A list of subtask_ids corresponding to those subtasks. + * A count of the total number of emails being sent. + + """ + total_num_emails = recipient_qset.count() + num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY))) + last_pk = recipient_qset[0].pk - 1 + num_emails_queued = 0 + task_list = [] + subtask_id_list = [] + for _ in range(num_queries): + recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY]) + last_pk = recipient_sublist[-1]['pk'] + num_emails_this_query = len(recipient_sublist) + num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK))) + chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query))) + for i in range(num_tasks_this_query): + to_list = recipient_sublist[i * chunk:i * chunk + chunk] + subtask_id = str(uuid4()) + subtask_id_list.append(subtask_id) + new_subtask = create_subtask_fcn(to_list, subtask_id) + task_list.append(new_subtask) + + num_emails_queued += num_emails_this_query + + # Sanity check: we expect 
the chunking to be properly summing to the original count: + if num_emails_queued != total_num_emails: + error_msg = "Number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails) + log.error(error_msg) + raise ValueError(error_msg) + + return task_list, subtask_id_list, total_num_emails + + def perform_delegate_email_batches(entry_id, course_id, task_input, action_name): """ Delegates emails by querying for the list of recipients who should - get the mail, chopping up into batches of settings.EMAILS_PER_TASK size, + get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size, and queueing up worker jobs. Returns the number of batches (workers) kicked off. @@ -151,86 +198,62 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) format_msg = "Course id conflict: explicit value {} does not match task value {}" raise ValueError(format_msg.format(course_id, entry.course_id)) + # Fetch the CourseEmail. email_id = task_input['email_id'] try: email_obj = CourseEmail.objects.get(id=email_id) - except CourseEmail.DoesNotExist as exc: + except CourseEmail.DoesNotExist: # The CourseEmail object should be committed in the view function before the task # is submitted and reaches this point. log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id) raise - to_option = email_obj.to_option - # Sanity check that course for email_obj matches that of the task referencing it. if course_id != email_obj.course_id: format_msg = "Course id conflict: explicit value {} does not match email value {}" raise ValueError(format_msg.format(course_id, email_obj.course_id)) + # Fetch the course object. 
try: course = get_course(course_id) except ValueError: log.exception("Task %s: course not found: %s", task_id, course_id) raise - global_email_context = _get_course_email_context(course) + to_option = email_obj.to_option recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) - total_num_emails = recipient_qset.count() + global_email_context = _get_course_email_context(course) - log.info("Task %s: Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s", - task_id, total_num_emails, course_id, email_id, to_option) - - num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY))) - last_pk = recipient_qset[0].pk - 1 - num_emails_queued = 0 - task_list = [] - subtask_id_list = [] - for _ in range(num_queries): - recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk) - .values('profile__name', 'email', 'pk')[:settings.EMAILS_PER_QUERY]) - last_pk = recipient_sublist[-1]['pk'] - num_emails_this_query = len(recipient_sublist) - num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.EMAILS_PER_TASK))) - chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query))) - for i in range(num_tasks_this_query): - to_list = recipient_sublist[i * chunk:i * chunk + chunk] - subtask_id = str(uuid4()) - subtask_id_list.append(subtask_id) - subtask_status = create_subtask_status(subtask_id) - # Create subtask, passing args and kwargs. - # This includes specifying the task_id to use, so we can track it. - # Specify the routing key as part of it, which is used by - # Celery to route the task request to the right worker. 
- new_subtask = send_course_email.subtask( - ( - entry_id, - email_id, - to_list, - global_email_context, - subtask_status, - ), - task_id=subtask_id, - routing_key=settings.BULK_EMAIL_ROUTING_KEY, - ) - task_list.append(new_subtask) - num_emails_queued += num_emails_this_query - - # Sanity check: we expect the chunking to be properly summing to the original count: - if num_emails_queued != total_num_emails: - error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format( - task_id, num_emails_queued, total_num_emails + def _create_send_email_subtask(to_list, subtask_id): + """Creates a subtask to send email to a given recipient list.""" + subtask_status = create_subtask_status(subtask_id) + new_subtask = send_course_email.subtask( + ( + entry_id, + email_id, + to_list, + global_email_context, + subtask_status, + ), + task_id=subtask_id, + routing_key=settings.BULK_EMAIL_ROUTING_KEY, ) - log.error(error_msg) - raise Exception(error_msg) + return new_subtask + + log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s", + task_id, course_id, email_id, to_option) + task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset) # Update the InstructorTask with information about the subtasks we've defined. + log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", + task_id, total_num_emails, course_id, email_id, to_option) progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) num_subtasks = len(subtask_id_list) - log.info("Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s", - num_subtasks, total_num_emails, course_id, email_id, to_option) # Now group the subtasks, and start them running. This allows all the subtasks # in the list to be submitted at the same time. 
+ log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s", + task_id, num_subtasks, total_num_emails, course_id, email_id, to_option) task_group = group(task_list) task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY) @@ -328,6 +351,49 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask return new_subtask_status +def _filter_optouts_from_recipients(to_list, course_id): + """ + Filters a recipient list based on student opt-outs for a given course. + + Returns the filtered recipient list, as well as the number of optouts + removed from the list. + """ + optouts = Optout.objects.filter( + course_id=course_id, + user__in=[i['pk'] for i in to_list] + ).values_list('user__email', flat=True) + optouts = set(optouts) + # Only count the num_optout for the first time the optouts are calculated. + # We assume that the number will not change on retries, and so we don't need + # to calculate it each time. + num_optout = len(optouts) + to_list = [recipient for recipient in to_list if recipient['email'] not in optouts] + return to_list, num_optout + + +def _get_source_address(course_id, course_title): + """ + Calculates an email address to be used as the 'from-address' for sent emails. + + Makes a unique from name and address for each course, e.g. + + "COURSE_TITLE" Course Staff + + """ + course_title_no_quotes = re.sub(r'"', '', course_title) + + # The course_id is assumed to be in the form 'org/course_num/run', + # so pull out the course_num. Then make sure that it can be used + # in an email address, by substituting a '_' anywhere a non-(ascii, period, or dash) + # character appears. 
+ course_num = course_id.split('/')[1] + INVALID_CHARS = re.compile(r"[^\w.-]") + course_num = INVALID_CHARS.sub('_', course_num) + + from_addr = '"{0}" Course Staff <{1}-{2}>'.format(course_title_no_quotes, course_num, settings.BULK_EMAIL_DEFAULT_FROM_EMAIL) + return from_addr + + def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): """ Performs the email sending task. @@ -371,9 +437,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # Get information from current task's request: task_id = subtask_status['task_id'] - # If this is a second attempt due to rate-limits, then throttle the speed at which mail is sent: - throttle = subtask_status['retried_nomax'] > 0 - # collect stats on progress: num_optout = 0 num_sent = 0 @@ -392,30 +455,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # that existed at that time, and we don't need to keep checking for changes # in the Optout list. if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0: - optouts = (Optout.objects.filter(course_id=course_email.course_id, - user__in=[i['pk'] for i in to_list]) - .values_list('user__email', flat=True)) - - optouts = set(optouts) - # Only count the num_optout for the first time the optouts are calculated. - # We assume that the number will not change on retries, and so we don't need - # to calculate it each time. - num_optout = len(optouts) - to_list = [recipient for recipient in to_list if recipient['email'] not in optouts] + to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id) course_title = global_email_context['course_title'] subject = "[" + course_title + "] " + course_email.subject - course_title_no_quotes = re.sub(r'"', '', course_title) - course_num = course_email.course_id.split('/')[1] # course_id = 'org/course_num/run' - # Substitute a '_' anywhere a non-(ascii, period, or dash) character appears. 
- INVALID_CHARS = re.compile(r"[^\w.-]") - course_num = INVALID_CHARS.sub('_', course_num) - - # Make a unique from name and address for each course, eg - # "COURSE_TITLE" Course Staff - from_addr = '"{0}" Course Staff <{1}-{2}>'.format( - course_title_no_quotes, course_num, settings.DEFAULT_BULK_FROM_EMAIL - ) + from_addr = _get_source_address(course_email.course_id, course_title) course_email_template = CourseEmailTemplate.get_template() try: @@ -423,17 +467,19 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas connection.open() # Define context values to use in all course emails: - email_context = { - 'name': '', - 'email': '' - } + email_context = {'name': '', 'email': ''} email_context.update(global_email_context) while to_list: - # Update context with user-specific values from the user at the end of the list: - email = to_list[-1]['email'] + # Update context with user-specific values from the user at the end of the list. + # At the end of processing this user, they will be popped off of the to_list. + # That way, the to_list will always contain the recipients remaining to be emailed. + # This is convenient for retries, which will need to send to those who haven't + # yet been emailed, but not send to those who have already been sent to. + current_recipient = to_list[-1] + email = current_recipient['email'] email_context['email'] = email - email_context['name'] = to_list[-1]['profile__name'] + email_context['name'] = current_recipient['profile__name'] # Construct message content using templates and context: plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context) @@ -454,7 +500,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas # for a period of time between all emails within this task. Choice of # the value depends on the number of workers that might be sending email in # parallel, and what the SES throttle rate is. 
- if throttle: + if subtask_status['retried_nomax'] > 0: sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS) try: @@ -488,7 +534,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas log.debug('Email with id %s sent to %s', email_id, email) num_sent += 1 - # Pop the user that was emailed off the end of the list: + # Pop the user that was emailed off the end of the list only once they have + # successfully been processed. (That way, if there were a failure that + # needed to be retried, the user is still on the list.) to_list.pop() except INFINITE_RETRY_ERRORS as exc: diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index 446d3fce1c..80fc692a4a 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -243,7 +243,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] ) - @override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7) + @override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7) @patch('bulk_email.tasks.increment_subtask_status') def test_chunked_queries_send_numerous_emails(self, email_mock): """ diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index a4b39a6a7b..98bb5a04f6 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -76,7 +76,7 @@ class TestEmailErrors(ModuleStoreTestCase): # have every fourth email fail due to blacklisting: get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"), None, None, None]) - students = [UserFactory() for _ in xrange(settings.EMAILS_PER_TASK)] + students = [UserFactory() for _ in xrange(settings.BULK_EMAIL_EMAILS_PER_TASK)] for student in students: 
CourseEnrollmentFactory.create(user=student, course_id=self.course.id) @@ -93,9 +93,9 @@ class TestEmailErrors(ModuleStoreTestCase): # Test that after the rejected email, the rest still successfully send ((_initial_results), kwargs) = result.call_args self.assertEquals(kwargs['skipped'], 0) - expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0) + expected_fails = int((settings.BULK_EMAIL_EMAILS_PER_TASK + 3) / 4.0) self.assertEquals(kwargs['failed'], expected_fails) - self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails) + self.assertEquals(kwargs['succeeded'], settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails) @patch('bulk_email.tasks.get_connection', autospec=True) @patch('bulk_email.tasks.send_course_email.retry') diff --git a/lms/djangoapps/bulk_email/tests/test_tasks.py b/lms/djangoapps/bulk_email/tests/test_tasks.py index c49f295b08..fadb4122b5 100644 --- a/lms/djangoapps/bulk_email/tests/test_tasks.py +++ b/lms/djangoapps/bulk_email/tests/test_tasks.py @@ -189,7 +189,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): def test_successful(self): # Select number of emails to fit into a single subtask. - num_emails = settings.EMAILS_PER_TASK + num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK # We also send email to the instructor: self._create_students(num_emails - 1) with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn: @@ -198,7 +198,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): def test_unactivated_user(self): # Select number of emails to fit into a single subtask. - num_emails = settings.EMAILS_PER_TASK + num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK # We also send email to the instructor: students = self._create_students(num_emails - 1) # mark a student as not yet having activated their email: @@ -211,7 +211,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): def test_skipped(self): # Select number of emails to fit into a single subtask. 
- num_emails = settings.EMAILS_PER_TASK + num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK # We also send email to the instructor: students = self._create_students(num_emails - 1) # have every fourth student optout: @@ -227,7 +227,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase): def _test_email_address_failures(self, exception): """Test that celery handles bad address errors by failing and not retrying.""" # Select number of emails to fit into a single subtask. - num_emails = settings.EMAILS_PER_TASK + num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK # We also send email to the instructor: self._create_students(num_emails - 1) expected_fails = int((num_emails + 3) / 4.0) diff --git a/lms/envs/aws.py b/lms/envs/aws.py index 0bb547743a..ab6f93a7e3 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -139,9 +139,9 @@ PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CUR PAID_COURSE_REGISTRATION_CURRENCY) # Bulk Email overrides -DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL) -EMAILS_PER_TASK = ENV_TOKENS.get('EMAILS_PER_TASK', EMAILS_PER_TASK) -EMAILS_PER_QUERY = ENV_TOKENS.get('EMAILS_PER_QUERY', EMAILS_PER_QUERY) +BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL) +BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK) +BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY) BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY) BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES) BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP) @@ -149,6 +149,7 @@ BULK_EMAIL_LOG_SENT_EMAILS = ENV_TOKENS.get('BULK_EMAIL_LOG_SENT_EMAILS', BULK_E BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = 
ENV_TOKENS.get('BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS', BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS) # We want Bulk Email running on the high-priority queue, so we define the # routing key that points to it. At the moment, the name is the same. +# We have to reset the value here, since we have changed the value of the queue name. BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE # Theme overrides diff --git a/lms/envs/common.py b/lms/envs/common.py index 1af56e696f..dc902a9ec8 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -798,9 +798,13 @@ CELERYD_HIJACK_ROOT_LOGGER = False ################################ Bulk Email ################################### -DEFAULT_BULK_FROM_EMAIL = 'no-reply@courseupdates.edx.org' -EMAILS_PER_TASK = 100 -EMAILS_PER_QUERY = 1000 +# Suffix used to construct 'from' email address for bulk emails. +# A course-specific identifier is prepended. +BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@courseupdates.edx.org' + +# Parameters for breaking down course enrollment into subtasks. +BULK_EMAIL_EMAILS_PER_TASK = 100 +BULK_EMAIL_EMAILS_PER_QUERY = 1000 # Initial delay used for retrying tasks. Additional retries use # longer delays. Value is in seconds.