From 2337b6d8639cddf700a5ce50138d2ee744be12e1 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Wed, 18 Sep 2013 12:59:31 -0400 Subject: [PATCH] Pass status into course_email for tracking retry status. --- lms/djangoapps/bulk_email/tasks.py | 110 +++++++++++------- lms/djangoapps/bulk_email/tests/test_email.py | 5 +- lms/djangoapps/instructor_task/api_helper.py | 2 +- lms/djangoapps/instructor_task/tasks.py | 6 +- .../instructor_task/tasks_helper.py | 4 +- .../instructor_task/tests/test_tasks.py | 2 +- 6 files changed, 80 insertions(+), 49 deletions(-) diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index d4a3d1e4d3..70cb4b3b02 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -52,8 +52,10 @@ def get_recipient_queryset(user_id, to_option, course_id, course_location): instructor_qset = instructor_group.user_set.all() recipient_qset = staff_qset | instructor_qset if to_option == SEND_TO_ALL: - enrollment_qset = User.objects.filter(courseenrollment__course_id=course_id, - courseenrollment__is_active=True) + enrollment_qset = User.objects.filter( + courseenrollment__course_id=course_id, + courseenrollment__is_active=True + ) recipient_qset = recipient_qset | enrollment_qset recipient_qset = recipient_qset.distinct() else: @@ -164,12 +166,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) + subtask_progress = _course_email_result(None, 0, 0, 0) task_list.append(send_course_email.subtask(( entry_id, email_id, to_list, global_email_context, - False + subtask_progress, ), task_id=subtask_id )) num_workers += num_tasks_this_query @@ -206,6 +209,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) return progress +# TODO: figure out if we really need this after all (for unit tests...) def _get_current_task(): """Stub to make it easier to test without actually running Celery""" return current_task @@ -224,47 +228,51 @@ def _update_subtask_status(entry_id, current_task_id, status, subtask_result): subtask_dict = json.loads(entry.subtasks) subtask_status = subtask_dict['status'] if current_task_id not in subtask_status: - # unexpected error -- raise an exception? - log.warning("Unexpected task_id '%s': unable to update status for email subtask of instructor task %d", - current_task_id, entry_id) - pass + # unexpected error -- raise an exception + format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'" + msg = format_str.format(current_task_id, entry_id) + log.warning(msg) + raise ValueError(msg) subtask_status[current_task_id] = status - # now update the parent task progress + + # Update the parent task progress task_progress = json.loads(entry.task_output) start_time = task_progress['start_time'] task_progress['duration_ms'] = int((time() - start_time) * 1000) if subtask_result is not None: for statname in ['attempted', 'succeeded', 'failed', 'skipped']: task_progress[statname] += subtask_result[statname] - # now figure out if we're actually done (i.e. this is the last task to complete) - # (This might be easier by just maintaining a counter, rather than scanning the - # entire subtask_status dict.) + + # Figure out if we're actually done (i.e. this is the last task to complete). + # This is easier if we just maintain a counter, rather than scanning the + # entire subtask_status dict. if status == SUCCESS: subtask_dict['succeeded'] += 1 else: subtask_dict['failed'] += 1 num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed'] + # If we're done with the last task, update the parent status to indicate that: if num_remaining <= 0: - # we're done with the last task: update the parent status to indicate that: entry.task_state = SUCCESS entry.subtasks = json.dumps(subtask_dict) entry.task_output = InstructorTask.create_output_for_success(task_progress) log.info("Task output updated to %s for email subtask %s of instructor task %d", entry.task_output, current_task_id, entry_id) - + # TODO: temporary -- switch to debug log.info("about to save....") entry.save() except: log.exception("Unexpected error while updating InstructorTask.") transaction.rollback() else: + # TODO: temporary -- switch to debug log.info("about to commit....") transaction.commit() @task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102 -def send_course_email(entry_id, email_id, to_list, global_email_context, throttle=False): +def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress): """ Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are 'profile__name', 'email' (address), and 'pk' (in the user table). @@ -276,49 +284,64 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, throttl # Get entry here, as a sanity check that it actually exists. We won't actually do anything # with it right away. InstructorTask.objects.get(pk=entry_id) + + # Get information from current task's request: current_task_id = _get_current_task().request.id + retry_index = _get_current_task().request.retries log.info("Preparing to send email as subtask %s for instructor task %d", current_task_id, entry_id) try: course_title = global_email_context['course_title'] + course_email_result_value = None with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]): - course_email_result = _send_course_email(email_id, to_list, global_email_context, throttle) + course_email_result_value = _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index) # Assume that if we get here without a raise, the task was successful. # Update the InstructorTask object that is storing its progress. - _update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result) + _update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value) except Exception: # try to write out the failure to the entry before failing _, exception, traceback = exc_info() traceback_string = format_exc(traceback) if traceback is not None else '' log.warning("background task (%s) failed: %s %s", current_task_id, exception, traceback_string) - _update_subtask_status(entry_id, current_task_id, FAILURE, None) + _update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress) raise - return course_email_result + return course_email_result_value -def _send_course_email(email_id, to_list, global_email_context, throttle): +def _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index): """ Performs the email sending task. """ + throttle = retry_index > 0 + try: course_email = CourseEmail.objects.get(id=email_id) except CourseEmail.DoesNotExist: log.exception("Could not find email id:{} to send.".format(email_id)) raise - # exclude optouts - optouts = (Optout.objects.filter(course_id=course_email.course_id, - user__in=[i['pk'] for i in to_list]) - .values_list('user__email', flat=True)) + # exclude optouts (if not a retry): + # Note that we don't have to do the optout logic at all if this is a retry, + # because we have presumably already performed the optout logic on the first + # attempt. Anyone on the to_list on a retry has already passed the filter + # that existed at that time, and we don't need to keep checking for changes + # in the Optout list. + num_optout = 0 + if retry_index == 0: + optouts = (Optout.objects.filter(course_id=course_email.course_id, + user__in=[i['pk'] for i in to_list]) + .values_list('user__email', flat=True)) - optouts = set(optouts) - num_optout = len(optouts) - - to_list = [recipient for recipient in to_list if recipient['email'] not in optouts] + optouts = set(optouts) + # Only count the num_optout for the first time the optouts are calculated. + # We assume that the number will not change on retries, and so we don't need + # to calculate it each time. + num_optout = len(optouts) + to_list = [recipient for recipient in to_list if recipient['email'] not in optouts] course_title = global_email_context['course_title'] subject = "[" + course_title + "] " + course_email.subject @@ -336,11 +359,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle): course_email_template = CourseEmailTemplate.get_template() + num_sent = 0 + num_error = 0 try: connection = get_connection() connection.open() - num_sent = 0 - num_error = 0 # Define context values to use in all course emails: email_context = { @@ -370,7 +393,7 @@ def _send_course_email(email_id, to_list, global_email_context, throttle): email_msg.attach_alternative(html_msg, 'text/html') # Throttle if we tried a few times and got the rate limiter - if throttle or current_task.request.retries > 0: + if throttle: sleep(0.2) try: @@ -398,14 +421,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle): to_list.pop() - connection.close() - # TODO: figure out how to get (or persist) real statistics for this task, so that reflects progress - # made over multiple retries. - return course_email_result(num_sent, num_error, num_optout) - except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc: # Error caught here cause the email to be retried. The entire task is actually retried without popping the list # Reasoning is that all of these errors may be temporary condition. + # TODO: figure out what this means. Presumably we have popped the list with those that have succeeded + # and failed, rather than those needing a later retry. log.warning('Email with id %d not delivered due to temporary error %s, retrying send to %d recipients', email_id, exc, len(to_list)) raise send_course_email.retry( @@ -413,10 +433,10 @@ def _send_course_email(email_id, to_list, global_email_context, throttle): email_id, to_list, global_email_context, - current_task.request.retries > 0 + _course_email_result(subtask_progress, num_sent, num_error, num_optout), ], exc=exc, - countdown=(2 ** current_task.request.retries) * 15 + countdown=(2 ** retry_index) * 15 ) except: log.exception('Email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', @@ -425,12 +445,22 @@ def _send_course_email(email_id, to_list, global_email_context, throttle): # Close the connection before we exit connection.close() raise + else: + connection.close() + # Add current progress to any progress stemming from previous retries: + return _course_email_result(subtask_progress, num_sent, num_error, num_optout) -def course_email_result(num_sent, num_error, num_optout): +def _course_email_result(previous_result, new_num_sent, new_num_error, new_num_optout): """Return the result of course_email sending as a dict (not a string).""" - attempted = num_sent + num_error - return {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error} + attempted = new_num_sent + new_num_error + current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error} + # add in any previous results: + if previous_result is not None: + for keyname in current_result: + if keyname in previous_result: + current_result[keyname] += previous_result[keyname] + return current_result def _statsd_tag(course_title): diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index e3cfc5bdc2..dc5b6d61ee 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -34,7 +34,7 @@ class MockCourseEmailResult(object): def get_mock_course_email_result(self): """Wrapper for mock email function.""" - def mock_course_email_result(sent, failed, output, **kwargs): # pylint: disable=W0613 + def mock_course_email_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613 """Increments count of number of emails sent.""" self.emails_sent += sent return True @@ -247,7 +247,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): ) @override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7) - @patch('bulk_email.tasks.course_email_result') + @patch('bulk_email.tasks._course_email_result') def test_chunked_queries_send_numerous_emails(self, email_mock): """ Test sending a large number of emails, to test the chunked querying @@ -304,4 +304,3 @@ class TestEmailSendExceptions(ModuleStoreTestCase): entry = InstructorTaskFactory.create(task_key='', task_id='dummy') with self.assertRaises(CourseEmail.DoesNotExist): send_course_email(entry.id, 101, [], {'course_title': 'Test'}, False) - diff --git a/lms/djangoapps/instructor_task/api_helper.py b/lms/djangoapps/instructor_task/api_helper.py index 4da7792621..0e9a91263e 100644 --- a/lms/djangoapps/instructor_task/api_helper.py +++ b/lms/djangoapps/instructor_task/api_helper.py @@ -271,4 +271,4 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key) task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)] task_class.apply_async(task_args, task_id=task_id) - return instructor_task \ No newline at end of file + return instructor_task diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py index 1e15eff731..fb15c5fe8d 100644 --- a/lms/djangoapps/instructor_task/tasks.py +++ b/lms/djangoapps/instructor_task/tasks.py @@ -23,7 +23,6 @@ from celery import task from functools import partial from instructor_task.tasks_helper import (run_main_task, perform_module_state_update, - # perform_delegate_email_batches, rescore_problem_module_state, reset_attempts_module_state, delete_problem_module_state, @@ -52,7 +51,10 @@ def rescore_problem(entry_id, xmodule_instance_args): """ action_name = 'rescored' update_fcn = partial(rescore_problem_module_state, xmodule_instance_args) - filter_fcn = lambda(modules_to_update): modules_to_update.filter(state__contains='"done": true') + + def filter_fcn(modules_to_update): + return modules_to_update.filter(state__contains='"done": true') + visit_fcn = partial(perform_module_state_update, update_fcn, filter_fcn) return run_main_task(entry_id, visit_fcn, action_name) diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py index ed85271e07..a4d3a08f8d 100644 --- a/lms/djangoapps/instructor_task/tasks_helper.py +++ b/lms/djangoapps/instructor_task/tasks_helper.py @@ -52,7 +52,7 @@ def _get_current_task(): return current_task -def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, task_input, action_name): +def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name): """ Performs generic update by visiting StudentModule instances with the update_fcn provided. @@ -76,7 +76,7 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas 'succeeded': number of attempts that "succeeded" 'skipped': number of attempts that "skipped" 'failed': number of attempts that "failed" - 'total': number of possible subtasks to attempt + 'total': number of possible updates to attempt 'action_name': user-visible verb to use in status messages. Should be past-tense. Pass-through of input `action_name`. 'duration_ms': how long the task has (or had) been running. diff --git a/lms/djangoapps/instructor_task/tests/test_tasks.py b/lms/djangoapps/instructor_task/tests/test_tasks.py index a475020c4d..9c8f2768b9 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks.py @@ -23,7 +23,7 @@ from instructor_task.models import InstructorTask from instructor_task.tests.test_base import InstructorTaskModuleTestCase from instructor_task.tests.factories import InstructorTaskFactory from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state -from instructor_task.tasks_helper import UpdateProblemModuleStateError #, update_problem_module_state +from instructor_task.tasks_helper import UpdateProblemModuleStateError PROBLEM_URL_NAME = "test_urlname"