diff --git a/lms/djangoapps/courseware/task_queue.py b/lms/djangoapps/courseware/task_queue.py index b408dacdc6..b42abd84d2 100644 --- a/lms/djangoapps/courseware/task_queue.py +++ b/lms/djangoapps/courseware/task_queue.py @@ -111,7 +111,12 @@ def _update_task(course_task_log, task_result): Autocommit annotation makes sure the database entry is committed. """ + # we at least update the entry with the task_id, and for EAGER mode, + # we update other status as well. (For non-EAGER modes, the entry + # should not have changed except for setting PENDING state and the + # addition of the task_id.) _update_course_task_log(course_task_log, task_result) + course_task_log.save() def _get_xmodule_instance_args(request): @@ -169,27 +174,31 @@ def _update_course_task_log(course_task_log_entry, task_result): output['task_progress'] = returned_result elif result_state == 'SUCCESS': - output['task_progress'] = returned_result + # save progress into the entry, even if it's not being saved here -- for EAGER, + # it needs to go back with the entry passed in. course_task_log_entry.task_progress = json.dumps(returned_result) + output['task_progress'] = returned_result log.info("task succeeded: %s", returned_result) - entry_needs_saving = True elif result_state == 'FAILURE': # on failure, the result's result contains the exception that caused the failure exception = returned_result traceback = result_traceback if result_traceback is not None else '' - entry_needs_saving = True task_progress = {'exception': type(exception).__name__, 'message': str(exception.message)} output['message'] = exception.message log.warning("background task (%s) failed: %s %s", task_id, returned_result, traceback) if result_traceback is not None: output['task_traceback'] = result_traceback task_progress['traceback'] = result_traceback + # save progress into the entry, even if it's not being saved -- for EAGER, + # it needs to go back with the entry passed in. course_task_log_entry.task_progress = json.dumps(task_progress) output['task_progress'] = task_progress elif result_state == 'REVOKED': # on revocation, the result's result doesn't contain anything + # but we cannot rely on the worker thread to set this status, + # so we set it here. entry_needs_saving = True message = 'Task revoked before running' output['message'] = message @@ -202,7 +211,6 @@ def _update_course_task_log(course_task_log_entry, task_result): if result_state != course_task_log_entry.task_state: course_task_log_entry.task_state = result_state course_task_log_entry.task_id = task_id - entry_needs_saving = True if entry_needs_saving: course_task_log_entry.save() @@ -358,7 +366,7 @@ def submit_regrade_problem_for_student(request, course_id, problem_url, student) course_task_log = _reserve_task(course_id, task_name, problem_url, request.user, student) # Submit task: - task_args = [course_id, problem_url, student.username, _get_xmodule_instance_args(request)] + task_args = [course_task_log.id, course_id, problem_url, student.username, _get_xmodule_instance_args(request)] task_result = regrade_problem_for_student.apply_async(task_args) # Update info in table with the resulting task_id (and state). @@ -387,7 +395,7 @@ def submit_regrade_problem_for_all_students(request, course_id, problem_url): course_task_log = _reserve_task(course_id, task_name, problem_url, request.user) # Submit task: - task_args = [course_id, problem_url, _get_xmodule_instance_args(request)] + task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)] task_result = regrade_problem_for_all_students.apply_async(task_args) # Update info in table with the resulting task_id (and state). @@ -419,7 +427,7 @@ def submit_reset_problem_attempts_for_all_students(request, course_id, problem_u course_task_log = _reserve_task(course_id, task_name, problem_url, request.user) # Submit task: - task_args = [course_id, problem_url, _get_xmodule_instance_args(request)] + task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)] task_result = reset_problem_attempts_for_all_students.apply_async(task_args) # Update info in table with the resulting task_id (and state). @@ -451,7 +459,7 @@ def submit_delete_problem_state_for_all_students(request, course_id, problem_url course_task_log = _reserve_task(course_id, task_name, problem_url, request.user) # Submit task: - task_args = [course_id, problem_url, _get_xmodule_instance_args(request)] + task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)] task_result = delete_problem_state_for_all_students.apply_async(task_args) # Update info in table with the resulting task_id (and state). diff --git a/lms/djangoapps/courseware/tasks.py b/lms/djangoapps/courseware/tasks.py index 292abc8ba8..911c6d7cd0 100644 --- a/lms/djangoapps/courseware/tasks.py +++ b/lms/djangoapps/courseware/tasks.py @@ -1,6 +1,8 @@ import json from time import time +from sys import exc_info +from traceback import format_exc from django.contrib.auth.models import User from django.db import transaction @@ -12,7 +14,7 @@ from xmodule.modulestore.django import modulestore import mitxmako.middleware as middleware from track.views import task_track -from courseware.models import StudentModule +from courseware.models import StudentModule, CourseTaskLog from courseware.model_data import ModelDataCache from courseware.module_render import get_module_for_descriptor_internal @@ -31,8 +33,8 @@ class UpdateProblemModuleStateError(Exception): pass -def _update_problem_module_state(course_id, module_state_key, student, update_fcn, action_name, filter_fcn, - xmodule_instance_args): +def _update_problem_module_state_internal(course_id, module_state_key, student, update_fcn, action_name, filter_fcn, + xmodule_instance_args): """ Performs generic update by visiting StudentModule instances with the update_fcn provided. @@ -49,18 +51,12 @@ def _update_problem_module_state(course_id, module_state_key, student, update_fc passed through. Because this is run internal to a task, it does not catch exceptions. These are allowed to pass up to the - task-running level, so that it can set the failure modes and capture the error trace in the result object. + next level, so that it can set the failure modes and capture the error trace in the CourseTaskLog and the + result object. """ - task_id = current_task.request.id - fmt = 'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet' - task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, action=action_name)) - # get start time for task: start_time = time() - # add task_id to xmodule_instance_args, so that it can be output with tracking info: - xmodule_instance_args['task_id'] = task_id - # Hack to get mako templates to work on celery worker server's worker thread. # The initialization of Mako templating is usually done when Django is # initializing middleware packages as part of processing a server request. @@ -119,14 +115,74 @@ def _update_problem_module_state(course_id, module_state_key, student, update_fc current_task.update_state(state='PROGRESS', meta=get_task_progress()) task_progress = get_task_progress() + # update progress without updating the state current_task.update_state(state='PROGRESS', meta=task_progress) + return task_progress + +@transaction.autocommit +def _save_course_task_log_entry(entry): + """Writes CourseTaskLog entry immediately.""" + entry.save() + + +def _update_problem_module_state(entry_id, course_id, module_state_key, student, update_fcn, action_name, filter_fcn, + xmodule_instance_args): + """ + Performs generic update by visiting StudentModule instances with the update_fcn provided. + + See _update_problem_module_state_internal function for more details on arguments. + + The `entry_id` is the primary key for the CourseTaskLog entry representing the task. This function + updates the entry on SUCCESS and FAILURE of the _update_problem_module_state_internal function it + wraps. + + Once exceptions are caught and recorded in the CourseTaskLog entry, they are allowed to pass up to the + task-running level, so that it can also set the failure modes and capture the error trace in the result object. + """ + task_id = current_task.request.id + fmt = 'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet' + task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, action=action_name)) + + # get the CourseTaskLog to be updated. If this fails, then let the exception return to Celery. + # There's no point in catching it here. + entry = CourseTaskLog.objects.get(pk=entry_id) + + # add task_id to xmodule_instance_args, so that it can be output with tracking info: + xmodule_instance_args['task_id'] = task_id + entry.task_id = task_id + _save_course_task_log_entry(entry) + + # now that we have an entry we can try to catch failures: + task_progress = None + try: + task_progress = _update_problem_module_state_internal(course_id, module_state_key, student, update_fcn, + action_name, filter_fcn, xmodule_instance_args) + except Exception: + # try to write out the failure to the entry before failing + exception_type, exception, traceback = exc_info() + traceback_string = format_exc(traceback) if traceback is not None else '' + task_progress = {'exception': exception_type.__name__, 'message': str(exception.message)} + task_log.warning("background task (%s) failed: %s %s", task_id, exception, traceback_string) + if traceback is not None: + task_progress['traceback'] = traceback_string + entry.task_progress = json.dumps(task_progress) + entry.task_state = 'FAILURE' + _save_course_task_log_entry(entry) + raise + + # if we get here, we assume we've succeeded, so update the CourseTaskLog entry in anticipation: + entry.task_progress = json.dumps(task_progress) + entry.task_state = 'SUCCESS' + _save_course_task_log_entry(entry) + + # log and exit, returning task_progress info as task result: fmt = 'Finishing task "{task_id}": course "{course_id}" problem "{state_key}": final: {progress}' task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, progress=task_progress)) return task_progress -def _update_problem_module_state_for_student(course_id, problem_url, student_identifier, +def _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None): """ Update the StudentModule for a given student. See _update_problem_module_state(). @@ -139,7 +195,7 @@ def _update_problem_module_state_for_student(course_id, problem_url, student_ide student_to_update = User.objects.get(email=student_identifier) elif student_identifier is not None: student_to_update = User.objects.get(username=student_identifier) - return _update_problem_module_state(course_id, problem_url, student_to_update, update_fcn, + return _update_problem_module_state(entry_id, course_id, problem_url, student_to_update, update_fcn, action_name, filter_fcn, xmodule_instance_args) except User.DoesNotExist: msg = "Couldn't find student with that email or username." @@ -147,11 +203,11 @@ def _update_problem_module_state_for_student(course_id, problem_url, student_ide return (success, msg) -def _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None): +def _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None): """ Update the StudentModule for all students. See _update_problem_module_state(). """ - return _update_problem_module_state(course_id, problem_url, None, update_fcn, action_name, filter_fcn, xmodule_instance_args) + return _update_problem_module_state(entry_id, course_id, problem_url, None, update_fcn, action_name, filter_fcn, xmodule_instance_args) def _get_module_instance_for_task(course_id, student, module_descriptor, module_state_key, xmodule_instance_args=None, @@ -239,22 +295,22 @@ def filter_problem_module_state_for_done(modules_to_update): @task -def regrade_problem_for_student(course_id, problem_url, student_identifier, xmodule_instance_args): +def regrade_problem_for_student(entry_id, course_id, problem_url, student_identifier, xmodule_instance_args): """Regrades problem `problem_url` in `course_id` for specified student.""" action_name = 'regraded' update_fcn = _regrade_problem_module_state filter_fcn = filter_problem_module_state_for_done - return _update_problem_module_state_for_student(course_id, problem_url, student_identifier, + return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier, update_fcn, action_name, filter_fcn, xmodule_instance_args) @task -def regrade_problem_for_all_students(course_id, problem_url, xmodule_instance_args): +def regrade_problem_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args): """Regrades problem `problem_url` in `course_id` for all students.""" action_name = 'regraded' update_fcn = _regrade_problem_module_state filter_fcn = filter_problem_module_state_for_done - return _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn, + return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, filter_fcn, xmodule_instance_args) @@ -286,21 +342,21 @@ def _reset_problem_attempts_module_state(module_descriptor, student_module, xmod @task -def reset_problem_attempts_for_student(course_id, problem_url, student_identifier, xmodule_instance_args): +def reset_problem_attempts_for_student(entry_id, course_id, problem_url, student_identifier, xmodule_instance_args): """Resets problem attempts to zero for `problem_url` in `course_id` for specified student.""" action_name = 'reset' update_fcn = _reset_problem_attempts_module_state - return _update_problem_module_state_for_student(course_id, problem_url, student_identifier, + return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier, update_fcn, action_name, xmodule_instance_args=xmodule_instance_args) @task -def reset_problem_attempts_for_all_students(course_id, problem_url, xmodule_instance_args): +def reset_problem_attempts_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args): """Resets problem attempts to zero for `problem_url` in `course_id` for all students.""" action_name = 'reset' update_fcn = _reset_problem_attempts_module_state - return _update_problem_module_state_for_all_students(course_id, problem_url, + return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, xmodule_instance_args=xmodule_instance_args) @@ -319,21 +375,21 @@ def _delete_problem_module_state(module_descriptor, student_module, xmodule_inst @task -def delete_problem_state_for_student(course_id, problem_url, student_ident, xmodule_instance_args): +def delete_problem_state_for_student(entry_id, course_id, problem_url, student_ident, xmodule_instance_args): """Deletes problem state entirely for `problem_url` in `course_id` for specified student.""" action_name = 'deleted' update_fcn = _delete_problem_module_state - return _update_problem_module_state_for_student(course_id, problem_url, student_ident, + return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_ident, update_fcn, action_name, xmodule_instance_args=xmodule_instance_args) @task -def delete_problem_state_for_all_students(course_id, problem_url, xmodule_instance_args): +def delete_problem_state_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args): """Deletes problem state entirely for `problem_url` in `course_id` for all students.""" action_name = 'deleted' update_fcn = _delete_problem_module_state - return _update_problem_module_state_for_all_students(course_id, problem_url, + return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, xmodule_instance_args=xmodule_instance_args) diff --git a/lms/djangoapps/courseware/tests/test_tasks.py b/lms/djangoapps/courseware/tests/test_tasks.py index 1516df3b73..860624416e 100644 --- a/lms/djangoapps/courseware/tests/test_tasks.py +++ b/lms/djangoapps/courseware/tests/test_tasks.py @@ -380,7 +380,7 @@ class TestRegrading(TestRegradingBase): data=problem_xml, metadata={"rerandomize": "per_student"}) - def WAITING_FOR_SAFEEXEC_FIX_test_regrading_randomized_problem(self): + def test_regrading_randomized_problem(self): """Run regrade scenario on custom problem that uses randomize""" # First define the custom response problem: problem_url_name = 'H1P1'