diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 3e62eadb65..6c1d517b7d 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -36,8 +36,8 @@ from courseware.courses import get_course_by_id, course_image_url from instructor_task.models import InstructorTask from instructor_task.subtasks import ( update_subtask_status, - create_subtask_result, - increment_subtask_result, + create_subtask_status, + increment_subtask_status, update_instructor_task_for_subtasks, ) @@ -54,7 +54,7 @@ QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, ) # Errors that mail is being sent too quickly. When caught by a task, it # triggers an exponential backoff and retry. Retries happen continuously until # the email is sent. -SENDING_RATE_ERRORS = (SESMaxSendingRateExceededError, ) +INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, ) def _get_recipient_queryset(user_id, to_option, course_id, course_location): @@ -120,6 +120,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # get inputs to use in this task from the entry: #task_id = entry.task_id user_id = entry.requester.id + task_id = entry.task_id # TODO: check this against argument passed in? # course_id = entry.course_id @@ -132,7 +133,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # is submitted and reaches this point. It is possible to add retry behavior here, # to keep trying until the object is actually committed by the view function's return, # but it's cleaner to just expect to be done. - log.warning("Failed to get CourseEmail with id %s", email_id) + log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id) raise to_option = email_obj.to_option @@ -144,26 +145,25 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) try: course = get_course_by_id(course_id, depth=1) except Http404 as exc: - log.exception("get_course_by_id failed: %s", exc.args[0]) + log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0]) raise Exception("get_course_by_id failed: " + exc.args[0]) global_email_context = _get_course_email_context(course) recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location) total_num_emails = recipient_qset.count() - log.info("Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s", - total_num_emails, course_id, email_id, to_option) + log.info("Task %s: Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s", + task_id, total_num_emails, course_id, email_id, to_option) num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY))) last_pk = recipient_qset[0].pk - 1 - num_workers = 0 + num_emails_queued = 0 task_list = [] subtask_id_list = [] for _ in range(num_queries): # Note that if we were doing this for regrading we probably only need 'pk', and not # either profile__name or email. That's because we'll have to do # a lot more work in the individual regrade for each user, but using user_id as a key. - # TODO: figure out how to pass these values as an argument, when refactoring this code. recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk) .values('profile__name', 'email', 'pk')[:settings.EMAILS_PER_QUERY]) last_pk = recipient_sublist[-1]['pk'] @@ -179,22 +179,32 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) to_list = recipient_sublist[i * chunk:i * chunk + chunk] subtask_id = str(uuid4()) subtask_id_list.append(subtask_id) + retry_progress = create_subtask_status() task_list.append(send_course_email.subtask(( entry_id, email_id, to_list, global_email_context, + retry_progress, ), task_id=subtask_id, routing_key=settings.HIGH_PRIORITY_QUEUE, )) - num_workers += num_tasks_this_query + num_emails_queued += num_emails_this_query + + # Sanity check: we expect the chunking to be properly summing to the original count: + if num_emails_queued != total_num_emails: + error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format( + task_id, num_emails_queued, total_num_emails + ) + log.error(error_msg) + raise Exception(error_msg) # Update the InstructorTask with information about the subtasks we've defined. progress = update_instructor_task_for_subtasks(entry, action_name, total_num_emails, subtask_id_list) num_subtasks = len(subtask_id_list) - log.info("Preparing to queue %d email tasks for course %s, email %s, to %s", - num_subtasks, course_id, email_id, to_option) + log.info("Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s", + num_subtasks, total_num_emails, course_id, email_id, to_option) # now group the subtasks, and start them running: task_group = group(task_list) @@ -202,9 +212,9 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) # We want to return progress here, as this is what will be stored in the # AsyncResult for the parent task as its return value. - # The AsyncResult will then be marked as SUCCEEDED, and have this return value as it's "result". + # The AsyncResult will then be marked as SUCCEEDED, and have this return value as its "result". # That's okay, for the InstructorTask will have the "real" status, and monitoring code - # will use that instead. + # should be using that instead. return progress @@ -215,7 +225,7 @@ def _get_current_task(): @task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102 -def send_course_email(entry_id, email_id, to_list, global_email_context): +def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): """ Sends an email to a list of recipients. @@ -242,19 +252,20 @@ def send_course_email(entry_id, email_id, to_list, global_email_context): # Get information from current task's request: current_task_id = _get_current_task().request.id num_to_send = len(to_list) - log.info("Preparing to send %s emails as subtask %s for instructor task %d: request = %s", - num_to_send, current_task_id, entry_id, _get_current_task().request) + log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s", + email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status) send_exception = None - course_email_result_value = None + new_subtask_status = None try: course_title = global_email_context['course_title'] with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]): - course_email_result_value, send_exception = _send_course_email( + new_subtask_status, send_exception = _send_course_email( entry_id, email_id, to_list, global_email_context, + subtask_status, ) except Exception: # Unexpected exception. Try to write out the failure to the entry before failing @@ -264,28 +275,30 @@ def send_course_email(entry_id, email_id, to_list, global_email_context): # We got here for really unexpected reasons. Since we don't know how far # the task got in emailing, we count all recipients as having failed. # It at least keeps the counts consistent. - course_email_result_value = create_subtask_result(0, num_to_send, 0) + new_subtask_status = increment_subtask_status(subtask_status, failed=num_to_send, state=FAILURE) + update_subtask_status(entry_id, current_task_id, new_subtask_status) + raise send_exception if send_exception is None: # Update the InstructorTask object that is storing its progress. log.info("background task (%s) succeeded", current_task_id) - update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value) + update_subtask_status(entry_id, current_task_id, new_subtask_status) elif isinstance(send_exception, RetryTaskError): # If retrying, record the progress made before the retry condition # was encountered. Once the retry is running, it will be only processing # what wasn't already accomplished. log.warning("background task (%s) being retried", current_task_id) - update_subtask_status(entry_id, current_task_id, RETRY, course_email_result_value) + update_subtask_status(entry_id, current_task_id, new_subtask_status) raise send_exception else: log.error("background task (%s) failed: %s", current_task_id, send_exception) - update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value) + update_subtask_status(entry_id, current_task_id, new_subtask_status) raise send_exception - return course_email_result_value + return new_subtask_status -def _send_course_email(entry_id, email_id, to_list, global_email_context): +def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status): """ Performs the email sending task. @@ -312,6 +325,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): 'skipped': number of emails skipped (due to optout) 'failed': number of emails not sent because of some failure + The dict may also contain information about retries. + * Second value is an exception returned by the innards of the method, indicating a fatal error. In this case, the number of recipients that were not sent have already been added to the 'failed' count above. @@ -319,6 +334,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): # Get information from current task's request: task_id = _get_current_task().request.id retry_index = _get_current_task().request.retries + + # If this is a second attempt, then throttle the speed at which mail is sent: throttle = retry_index > 0 num_optout = 0 @@ -400,6 +417,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): log.info('Email with id %s sent to %s', email_id, email) num_sent += 1 + except SMTPDataError as exc: # According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure. if exc.smtp_code >= 400 and exc.smtp_code < 500: @@ -414,8 +432,15 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): # Pop the user that was emailed off the end of the list: to_list.pop() - except SENDING_RATE_ERRORS as exc: - subtask_progress = create_subtask_result(num_sent, num_error, num_optout) + except INFINITE_RETRY_ERRORS as exc: + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + retriedA=1, + state=RETRY + ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True ) @@ -424,7 +449,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): # Errors caught here cause the email to be retried. The entire task is actually retried # without popping the current recipient off of the existing list. # Errors caught are those that indicate a temporary condition that might succeed on retry. - subtask_progress = create_subtask_result(num_sent, num_error, num_optout) + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + retriedB=1, + state=RETRY + ) return _submit_for_retry( entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False ) @@ -444,10 +476,24 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context): log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s', task_id, email_id, [i['email'] for i in to_list]) num_error += len(to_list) - return create_subtask_result(num_sent, num_error, num_optout), exc + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + state=FAILURE + ) + return subtask_progress, exc else: # Successful completion is marked by an exception value of None: - return create_subtask_result(num_sent, num_error, num_optout), None + subtask_progress = increment_subtask_status( + subtask_status, + succeeded=num_sent, + failed=num_error, + skipped=num_optout, + state=SUCCESS + ) + return subtask_progress, None finally: # clean up at the end connection.close() @@ -476,16 +522,24 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current task_id = _get_current_task().request.id retry_index = _get_current_task().request.retries - log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients', - task_id, email_id, current_exception, len(to_list)) - - # Don't resend emails that have already succeeded. - # Retry the email at increasing exponential backoff. + log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)", + current_task.request.id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped']) + # Calculate time until we retry this task (in seconds): if is_sending_rate_error: - countdown = ((2 ** retry_index) * 15) * random.uniform(.5, 1.5) + exp = min(retry_index, 5) + countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25) + exception_type = 'sending-rate' else: countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5) + exception_type = 'transient' + + # max_retries is increased by the number of times an "infinite-retry" exception + # has been retried. We want the regular retries to trigger a retry, but not these + # special retries. So we count them separately. + max_retries = _get_current_task().max_retries + subtask_progress['retriedA'] + log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)', + task_id, email_id, exception_type, current_exception, len(to_list), max_retries) try: send_course_email.retry( @@ -494,9 +548,11 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current email_id, to_list, global_email_context, + subtask_progress, ], exc=current_exception, countdown=countdown, + max_retries=max_retries, throw=True, ) except RetryTaskError as retry_error: @@ -513,7 +569,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s', task_id, email_id, [i['email'] for i in to_list]) num_failed = len(to_list) - new_subtask_progress = increment_subtask_result(subtask_progress, 0, num_failed, 0) + new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE) return new_subtask_progress, retry_exc diff --git a/lms/djangoapps/bulk_email/tests/test_email.py b/lms/djangoapps/bulk_email/tests/test_email.py index 787b623a81..bc5b448f78 100644 --- a/lms/djangoapps/bulk_email/tests/test_email.py +++ b/lms/djangoapps/bulk_email/tests/test_email.py @@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory from bulk_email.models import Optout -from instructor_task.subtasks import create_subtask_result +from instructor_task.subtasks import increment_subtask_status STAFF_COUNT = 3 STUDENT_COUNT = 10 @@ -29,13 +29,13 @@ class MockCourseEmailResult(object): """ emails_sent = 0 - def get_mock_create_subtask_result(self): + def get_mock_increment_subtask_status(self): """Wrapper for mock email function.""" - def mock_create_subtask_result(sent, failed, output, **kwargs): # pylint: disable=W0613 + def mock_increment_subtask_status(original_status, **kwargs): # pylint: disable=W0613 """Increments count of number of emails sent.""" - self.emails_sent += sent - return create_subtask_result(sent, failed, output) - return mock_create_subtask_result + self.emails_sent += kwargs['succeeded'] + return increment_subtask_status(original_status, **kwargs) + return mock_increment_subtask_status @override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE) @@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): ) @override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7) - @patch('bulk_email.tasks.create_subtask_result') + @patch('bulk_email.tasks.increment_subtask_status') def test_chunked_queries_send_numerous_emails(self, email_mock): """ Test sending a large number of emails, to test the chunked querying """ mock_factory = MockCourseEmailResult() - email_mock.side_effect = mock_factory.get_mock_create_subtask_result() + email_mock.side_effect = mock_factory.get_mock_increment_subtask_status() added_users = [] for _ in xrange(LARGE_NUM_EMAILS): user = UserFactory() diff --git a/lms/djangoapps/bulk_email/tests/test_err_handling.py b/lms/djangoapps/bulk_email/tests/test_err_handling.py index 24fb4218a6..540b81baa3 100644 --- a/lms/djangoapps/bulk_email/tests/test_err_handling.py +++ b/lms/djangoapps/bulk_email/tests/test_err_handling.py @@ -67,7 +67,7 @@ class TestEmailErrors(ModuleStoreTestCase): self.assertTrue(type(exc) == SMTPDataError) @patch('bulk_email.tasks.get_connection', autospec=True) - @patch('bulk_email.tasks.create_subtask_result') + @patch('bulk_email.tasks.increment_subtask_status') @patch('bulk_email.tasks.send_course_email.retry') def test_data_err_fail(self, retry, result, get_conn): """ @@ -91,11 +91,11 @@ class TestEmailErrors(ModuleStoreTestCase): # We shouldn't retry when hitting a 5xx error self.assertFalse(retry.called) # Test that after the rejected email, the rest still successfully send - ((sent, fail, optouts), _) = result.call_args - self.assertEquals(optouts, 0) + ((_initial_results), kwargs) = result.call_args + self.assertEquals(kwargs['skipped'], 0) expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0) - self.assertEquals(fail, expectedNumFails) - self.assertEquals(sent, settings.EMAILS_PER_TASK - expectedNumFails) + self.assertEquals(kwargs['failed'], expectedNumFails) + self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails) @patch('bulk_email.tasks.get_connection', autospec=True) @patch('bulk_email.tasks.send_course_email.retry') @@ -138,7 +138,7 @@ class TestEmailErrors(ModuleStoreTestCase): exc = kwargs['exc'] self.assertTrue(type(exc) == SMTPConnectError) - @patch('bulk_email.tasks.create_subtask_result') + @patch('bulk_email.tasks.increment_subtask_status') @patch('bulk_email.tasks.send_course_email.retry') @patch('bulk_email.tasks.log') @patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException)) @@ -163,12 +163,13 @@ class TestEmailErrors(ModuleStoreTestCase): self.assertFalse(retry.called) # check the results being returned self.assertTrue(result.called) - ((sent, fail, optouts), _) = result.call_args - self.assertEquals(optouts, 0) - self.assertEquals(fail, 1) # just myself - self.assertEquals(sent, 0) + ((initial_results, ), kwargs) = result.call_args + self.assertEquals(initial_results['skipped'], 0) + self.assertEquals(initial_results['failed'], 0) + self.assertEquals(initial_results['succeeded'], 0) + self.assertEquals(kwargs['failed'], 1) - @patch('bulk_email.tasks.create_subtask_result') + @patch('bulk_email.tasks.increment_subtask_status') @patch('bulk_email.tasks.log') def test_nonexist_email(self, mock_log, result): """ @@ -180,7 +181,7 @@ class TestEmailErrors(ModuleStoreTestCase): task_input = {"email_id": -1} with self.assertRaises(CourseEmail.DoesNotExist): perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") - ((log_str, email_id), _) = mock_log.warning.call_args + ((log_str, _, email_id), _) = mock_log.warning.call_args self.assertTrue(mock_log.warning.called) self.assertIn('Failed to get CourseEmail with id', log_str) self.assertEqual(email_id, -1) @@ -198,7 +199,7 @@ class TestEmailErrors(ModuleStoreTestCase): task_input = {"email_id": email.id} with self.assertRaises(Exception): perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") - ((log_str, _), _) = mock_log.exception.call_args + ((log_str, _, _), _) = mock_log.exception.call_args self.assertTrue(mock_log.exception.called) self.assertIn('get_course_by_id failed:', log_str) diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index f303b1ce6e..f8a0bd08f9 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -5,7 +5,7 @@ from time import time import json from celery.utils.log import get_task_logger -from celery.states import SUCCESS, RETRY +from celery.states import SUCCESS, RETRY, READY_STATES from django.db import transaction @@ -14,29 +14,51 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING TASK_LOG = get_task_logger(__name__) -def create_subtask_result(num_sent, num_error, num_optout): +def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None): """ - Create a result of a subtask. + Create a dict for tracking the status of a subtask. - Keys are: 'attempted', 'succeeded', 'skipped', 'failed'. + Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'. +TODO: update + Object must be JSON-serializable, so that it can be passed as an argument + to tasks. - Object must be JSON-serializable. + TODO: decide if in future we want to include specific error information + indicating the reason for failure. + Also, we should count up "not attempted" separately from + attempted/failed. """ - attempted = num_sent + num_error - current_result = {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error} + attempted = succeeded + failed + current_result = { + 'attempted': attempted, + 'succeeded': succeeded, + 'pending': pending, + 'skipped': skipped, + 'failed': failed, + 'retriedA': retriedA, + 'retriedB': retriedB, + 'state': state if state is not None else QUEUING, + } return current_result -def increment_subtask_result(subtask_result, new_num_sent, new_num_error, new_num_optout): +def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None): """ Update the result of a subtask with additional results. - Keys are: 'attempted', 'succeeded', 'skipped', 'failed'. + Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'. """ - new_result = create_subtask_result(new_num_sent, new_num_error, new_num_optout) + # TODO: rewrite this if we have additional fields added to original subtask_result, + # that are not part of the increment. Tradeoff on duplicating the 'attempts' logic. + new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state) for keyname in new_result: - if keyname in subtask_result: + if keyname == 'state': + # does not get incremented. If no new value, copy old value: + if state is None: + new_result[keyname] = subtask_result[keyname] + elif keyname in subtask_result: new_result[keyname] += subtask_result[keyname] + return new_result @@ -49,7 +71,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations, and the total number of "things to do" is set, so the user can be told how much needs to be done overall. The `action_name` is also stored, to also help with constructing more readable - progress messages. + task_progress messages. The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict. Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of @@ -70,7 +92,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i rely on the status stored in the InstructorTask object, rather than status stored in the corresponding AsyncResult. """ - progress = { + # TODO: also add 'pending' count here? (Even though it's total-attempted-skipped + task_progress = { 'action_name': action_name, 'attempted': 0, 'failed': 0, @@ -80,22 +103,33 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i 'duration_ms': int(0), 'start_time': time() } - entry.task_output = InstructorTask.create_output_for_success(progress) + entry.task_output = InstructorTask.create_output_for_success(task_progress) entry.task_state = PROGRESS # Write out the subtasks information. num_subtasks = len(subtask_id_list) - subtask_status = dict.fromkeys(subtask_id_list, QUEUING) - subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'retried': 0, 'status': subtask_status} + # using fromkeys to initialize uses a single value. we need the value + # to be distinct, since it's now a dict: + # subtask_status = dict.fromkeys(subtask_id_list, QUEUING) + # TODO: may not be necessary to store initial value with all those zeroes! + # Instead, use a placemarker.... + subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list} + subtask_dict = { + 'total': num_subtasks, + 'succeeded': 0, + 'failed': 0, + 'retried': 0, + 'status': subtask_status + } entry.subtasks = json.dumps(subtask_dict) # and save the entry immediately, before any subtasks actually start work: entry.save_now() - return progress + return task_progress @transaction.commit_manually -def update_subtask_status(entry_id, current_task_id, status, subtask_result): +def update_subtask_status(entry_id, current_task_id, subtask_status): """ Update the status of the subtask in the parent InstructorTask object tracking its progress. @@ -104,7 +138,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result): committed on completion, or rolled back on error. The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict. - Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_result` + Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_progress` into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms' value with the current interval since the original InstructorTask started. @@ -121,7 +155,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result): messages, progress made, etc. """ TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s", - current_task_id, entry_id, subtask_result) + current_task_id, entry_id, subtask_status) try: entry = InstructorTask.objects.select_for_update().get(pk=entry_id) @@ -140,28 +174,38 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result): # ultimate status to be clobbered by the "earlier" updates. This # should not be a problem in normal (non-eager) processing. old_status = subtask_status[current_task_id] - if status != RETRY or old_status == QUEUING: - subtask_status[current_task_id] = status + # TODO: check this logic... + state = subtask_status['state'] +# if state != RETRY or old_status['status'] == QUEUING: + # instead replace the status only if it's 'newer' + # i.e. has fewer pending + if subtask_status['pending'] <= old_status['pending']: + subtask_status[current_task_id] = subtask_status # Update the parent task progress task_progress = json.loads(entry.task_output) start_time = task_progress['start_time'] task_progress['duration_ms'] = int((time() - start_time) * 1000) - if subtask_result is not None: + # change behavior so we don't update on progress now: + # TODO: figure out if we can make this more responsive later, + # by figuring out how to handle retries better. + if subtask_status is not None and state in READY_STATES: for statname in ['attempted', 'succeeded', 'failed', 'skipped']: - task_progress[statname] += subtask_result[statname] + task_progress[statname] += subtask_status[statname] # Figure out if we're actually done (i.e. this is the last task to complete). # This is easier if we just maintain a counter, rather than scanning the # entire subtask_status dict. - if status == SUCCESS: + if state == SUCCESS: subtask_dict['succeeded'] += 1 - elif status == RETRY: + elif state == RETRY: subtask_dict['retried'] += 1 else: subtask_dict['failed'] += 1 num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed'] # If we're done with the last task, update the parent status to indicate that: + # TODO: see if there was a catastrophic failure that occurred, and figure out + # how to report that here. if num_remaining <= 0: entry.task_state = SUCCESS entry.subtasks = json.dumps(subtask_dict)