diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index bf25e4ebfa..08c07037d4 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -22,7 +22,6 @@ from django.contrib.auth.models import User, Group from django.core.mail import EmailMultiAlternatives, get_connection from django.http import Http404 from django.core.urlresolvers import reverse -from django.db import transaction from bulk_email.models import ( CourseEmail, Optout, CourseEmailTemplate, @@ -30,12 +29,16 @@ from bulk_email.models import ( ) from courseware.access import _course_staff_group_name, _course_instructor_group_name from courseware.courses import get_course_by_id, course_image_url -from instructor_task.models import InstructorTask, PROGRESS, QUEUING +from instructor_task.models import InstructorTask +from instructor_task.subtasks import ( + update_subtask_result, update_subtask_status, create_subtask_result, + update_instructor_task_for_subtasks +) log = get_task_logger(__name__) -def get_recipient_queryset(user_id, to_option, course_id, course_location): +def _get_recipient_queryset(user_id, to_option, course_id, course_location): """ Generates a query set corresponding to the requested category. @@ -65,7 +68,7 @@ def get_recipient_queryset(user_id, to_option, course_id, course_location): return recipient_qset -def get_course_email_context(course): +def _get_course_email_context(course): """ Returns context arguments to apply to all emails, independent of recipient. 
""" @@ -125,27 +128,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) log.exception("get_course_by_id failed: %s", exc.args[0]) raise Exception("get_course_by_id failed: " + exc.args[0]) - global_email_context = get_course_email_context(course) - recipient_qset = get_recipient_queryset(user_id, to_option, course_id, course.location) + global_email_context = _get_course_email_context(course) + recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) total_num_emails = recipient_qset.count() log.info("Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s", total_num_emails, course_id, email_id, to_option) - # At this point, we have some status that we can report, as to the magnitude of the overall - # task. That is, we know the total. Set that, and our subtasks should work towards that goal. - # Note that we add start_time in here, so that it can be used - # by subtasks to calculate duration_ms values: - progress = {'action_name': action_name, - 'attempted': 0, - 'failed': 0, - 'skipped': 0, - 'succeeded': 0, - 'total': total_num_emails, - 'duration_ms': int(0), - 'start_time': time(), - } - num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY))) last_pk = recipient_qset[0].pk - 1 num_workers = 0 @@ -166,7 +155,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) - subtask_progress = update_subtask_result(None, 0, 0, 0) + subtask_progress = create_subtask_result() task_list.append(send_course_email.subtask(( entry_id, email_id, @@ -177,24 +166,9 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) )) num_workers += num_tasks_this_query - # Before we actually start running the tasks we've defined, - # the InstructorTask needs to be updated with their 
information. - # So we update the InstructorTask object here, not in the return. - # The monitoring code knows that it shouldn't go to the InstructorTask's task's - # Result for its progress when there are subtasks. So we accumulate - # the results of each subtask as it completes into the InstructorTask. - entry.task_output = InstructorTask.create_output_for_success(progress) - entry.task_state = PROGRESS - - # now write out the subtasks information. + # Update the InstructorTask with information about the subtasks we've defined. + progress = update_instructor_task_for_subtasks(entry, action_name, total_num_emails, subtask_id_list) num_subtasks = len(subtask_id_list) - subtask_status = dict.fromkeys(subtask_id_list, QUEUING) - subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status} - entry.subtasks = json.dumps(subtask_dict) - - # and save the entry immediately, before any subtasks actually start work: - entry.save_now() - log.info("Preparing to queue %d email tasks for course %s, email %s, to %s", num_subtasks, course_id, email_id, to_option) @@ -215,62 +189,6 @@ def _get_current_task(): return current_task -@transaction.commit_manually -def _update_subtask_status(entry_id, current_task_id, status, subtask_result): - """ - Update the status of the subtask in the parent InstructorTask object tracking its progress. 
- """ - log.info("Preparing to update status for email subtask %s for instructor task %d with status %s", - current_task_id, entry_id, subtask_result) - - try: - entry = InstructorTask.objects.select_for_update().get(pk=entry_id) - subtask_dict = json.loads(entry.subtasks) - subtask_status = subtask_dict['status'] - if current_task_id not in subtask_status: - # unexpected error -- raise an exception - format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'" - msg = format_str.format(current_task_id, entry_id) - log.warning(msg) - raise ValueError(msg) - subtask_status[current_task_id] = status - - # Update the parent task progress - task_progress = json.loads(entry.task_output) - start_time = task_progress['start_time'] - task_progress['duration_ms'] = int((time() - start_time) * 1000) - if subtask_result is not None: - for statname in ['attempted', 'succeeded', 'failed', 'skipped']: - task_progress[statname] += subtask_result[statname] - - # Figure out if we're actually done (i.e. this is the last task to complete). - # This is easier if we just maintain a counter, rather than scanning the - # entire subtask_status dict. 
- if status == SUCCESS: - subtask_dict['succeeded'] += 1 - else: - subtask_dict['failed'] += 1 - num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed'] - # If we're done with the last task, update the parent status to indicate that: - if num_remaining <= 0: - entry.task_state = SUCCESS - entry.subtasks = json.dumps(subtask_dict) - entry.task_output = InstructorTask.create_output_for_success(task_progress) - - log.info("Task output updated to %s for email subtask %s of instructor task %d", - entry.task_output, current_task_id, entry_id) - # TODO: temporary -- switch to debug once working - log.info("about to save....") - entry.save() - except: - log.exception("Unexpected error while updating InstructorTask.") - transaction.rollback() - else: - # TODO: temporary -- switch to debug once working - log.info("about to commit....") - transaction.commit() - - @task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102 def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress): """ @@ -307,10 +225,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask ) if send_exception is None: # Update the InstructorTask object that is storing its progress. 
- _update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value) + update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value) else: log.error("background task (%s) failed: %s", current_task_id, send_exception) - _update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value) + update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value) raise send_exception except Exception: @@ -318,7 +236,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask _, exception, traceback = exc_info() traceback_string = format_exc(traceback) if traceback is not None else '' log.error("background task (%s) failed: %s %s", current_task_id, exception, traceback_string) - _update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress) + update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress) raise return course_email_result_value @@ -453,6 +371,9 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask exc=exc, countdown=(2 ** retry_index) * 15 ) + # TODO: what happens if there are no more retries, because the maximum has been reached? + # Assume that this then just results in the "exc" being raised directly, which means that the + # subtask status is not going to get updated correctly. except Exception as exc: # If we have a general exception for this request, we need to figure out what to do with it. 
@@ -470,18 +391,6 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), None -def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout): - """Return the result of course_email sending as a dict (not a string).""" - attempted = new_num_sent + new_num_error - current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error} - # add in any previous results: - if previous_result is not None: - for keyname in current_result: - if keyname in previous_result: - current_result[keyname] += previous_result[keyname] - return current_result - - def _statsd_tag(course_title): """ Calculate the tag we will use for DataDog. diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index c0cfdea325..6b3d79e468 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -15,6 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory from bulk_email.models import Optout +from instructor_task.subtasks import update_subtask_result STAFF_COUNT = 3 STUDENT_COUNT = 10 @@ -33,7 +34,7 @@ class MockCourseEmailResult(object): def mock_update_subtask_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613 """Increments count of number of emails sent.""" self.emails_sent += sent - return True + return update_subtask_result(prev_results, sent, failed, output) return mock_update_subtask_result diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py new file mode 100644 index 0000000000..22c77f050c --- /dev/null +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -0,0 +1,125 
"""
This module contains celery task functions for handling the management of subtasks.
"""
from time import time
import json

from celery.utils.log import get_task_logger
from celery.states import SUCCESS

from django.db import transaction

from instructor_task.models import InstructorTask, PROGRESS, QUEUING

log = get_task_logger(__name__)


def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
    """
    Return the result of course_email sending as a dict (not a string).

    The returned dict has the keys 'attempted', 'succeeded', 'skipped' and 'failed'.
    'attempted' is computed as `new_num_sent` + `new_num_error`; opt-outs are counted
    only under 'skipped'.

    If `previous_result` is not None, the value of each of those keys that is present
    in `previous_result` is added to the new counts.  `previous_result` itself is not
    modified.
    """
    attempted = new_num_sent + new_num_error
    current_result = {
        'attempted': attempted,
        'succeeded': new_num_sent,
        'skipped': new_num_optout,
        'failed': new_num_error,
    }
    # add in any previous results:
    if previous_result is not None:
        for keyname in current_result:
            if keyname in previous_result:
                current_result[keyname] += previous_result[keyname]
    return current_result


def create_subtask_result():
    """Return a new subtask result dict with all counts set to zero."""
    return update_subtask_result(None, 0, 0, 0)


def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
    """
    Store initial subtask information to InstructorTask object.

    Before we actually start running the tasks we've defined, the InstructorTask
    needs to be updated with their information.  So we update the InstructorTask
    object here, not in the return.  The monitoring code knows that it shouldn't go
    to the InstructorTask's task's Result for its progress when there are subtasks.
    So we accumulate the results of each subtask as it completes into the
    InstructorTask.

    At this point, we have some status that we can report, as to the magnitude of
    the overall task.  That is, we know the total.  Set that, and our subtasks
    should work towards that goal.  Note that we add start_time in here, so that it
    can be used by subtasks to calculate duration_ms values.

    Arguments:
        entry: the InstructorTask object to update.  Its `task_output`, `task_state`
            and `subtasks` attributes are overwritten, and `save_now()` is called on it.
        action_name: stored in the progress dict under 'action_name'.
        total_num: stored in the progress dict under 'total'.
        subtask_id_list: ids of the subtasks; each is recorded with status QUEUING.

    Returns the initial progress dict that was written to `entry.task_output`.
    """
    progress = {
        'action_name': action_name,
        'attempted': 0,
        'failed': 0,
        'skipped': 0,
        'succeeded': 0,
        'total': total_num,
        'duration_ms': 0,
        'start_time': time(),
    }
    entry.task_output = InstructorTask.create_output_for_success(progress)
    entry.task_state = PROGRESS

    # Write out the subtasks information.
    num_subtasks = len(subtask_id_list)
    subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
    subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
    entry.subtasks = json.dumps(subtask_dict)

    # and save the entry immediately, before any subtasks actually start work:
    entry.save_now()
    return progress


@transaction.commit_manually
def update_subtask_status(entry_id, current_task_id, status, subtask_result):
    """
    Update the status of the subtask in the parent InstructorTask object tracking its progress.

    Arguments:
        entry_id: primary key of the parent InstructorTask.
        current_task_id: id of the subtask being reported; it must be a key of the
            'status' dict stored in the parent's `subtasks` JSON.
        status: the new status to record for the subtask.  Only SUCCESS increments
            the 'succeeded' counter; any other value increments 'failed'.
        subtask_result: dict with 'attempted', 'succeeded', 'failed' and 'skipped'
            counts to add to the parent's progress, or None to add nothing.

    Once the 'succeeded' and 'failed' counters account for every subtask, the parent's
    `task_state` is set to SUCCESS, whether or not individual subtasks failed.

    This function is best-effort: any Exception raised while updating (including the
    ValueError for an unknown `current_task_id`) is logged and the transaction is
    rolled back, but the exception is NOT propagated to the caller.  Nothing is returned.
    """
    # Log the status being recorded as well as the counts being merged in.
    log.info("Preparing to update status for email subtask %s for instructor task %d with status %s and result %s",
             current_task_id, entry_id, status, subtask_result)

    try:
        # select_for_update() is used so that concurrent subtasks updating the same
        # parent row do not overwrite one another's counts.
        entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
        subtask_dict = json.loads(entry.subtasks)
        subtask_status = subtask_dict['status']
        if current_task_id not in subtask_status:
            # unexpected error -- raise an exception
            format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
            msg = format_str.format(current_task_id, entry_id)
            log.warning(msg)
            raise ValueError(msg)
        subtask_status[current_task_id] = status

        # Update the parent task progress
        task_progress = json.loads(entry.task_output)
        start_time = task_progress['start_time']
        task_progress['duration_ms'] = int((time() - start_time) * 1000)
        if subtask_result is not None:
            for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
                task_progress[statname] += subtask_result[statname]

        # Figure out if we're actually done (i.e. this is the last task to complete).
        # This is easier if we just maintain a counter, rather than scanning the
        # entire subtask_status dict.
        if status == SUCCESS:
            subtask_dict['succeeded'] += 1
        else:
            subtask_dict['failed'] += 1
        num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
        # If we're done with the last task, update the parent status to indicate that:
        if num_remaining <= 0:
            entry.task_state = SUCCESS
        entry.subtasks = json.dumps(subtask_dict)
        entry.task_output = InstructorTask.create_output_for_success(task_progress)

        log.info("Task output updated to %s for email subtask %s of instructor task %d",
                 entry.task_output, current_task_id, entry_id)
        # TODO: temporary -- switch to debug once working
        log.info("about to save....")
        entry.save()
    except Exception:  # pylint: disable=W0703
        # Deliberately best-effort: record the problem and undo any partial update,
        # but do not let a bookkeeping failure propagate out of this function.
        log.exception("Unexpected error while updating InstructorTask.")
        transaction.rollback()
    except BaseException:
        # SystemExit, KeyboardInterrupt and the like must not be swallowed.  We still
        # have to roll back, because under commit_manually the transaction must be
        # explicitly ended before leaving the function.
        transaction.rollback()
        raise
    else:
        # TODO: temporary -- switch to debug once working
        log.info("about to commit....")
        transaction.commit()