Change updating of CourseTaskLog to mostly occur in worker thread.

This commit is contained in:
Brian Wilson
2013-05-30 01:34:27 -04:00
parent c1fff1568e
commit 76773c5bdf
3 changed files with 100 additions and 36 deletions

View File

@@ -111,7 +111,12 @@ def _update_task(course_task_log, task_result):
Autocommit annotation makes sure the database entry is committed.
"""
# we at least update the entry with the task_id, and for EAGER mode,
# we update other status as well. (For non-EAGER modes, the entry
# should not have changed except for setting PENDING state and the
# addition of the task_id.)
_update_course_task_log(course_task_log, task_result)
course_task_log.save()
def _get_xmodule_instance_args(request):
@@ -169,27 +174,31 @@ def _update_course_task_log(course_task_log_entry, task_result):
output['task_progress'] = returned_result
elif result_state == 'SUCCESS':
output['task_progress'] = returned_result
# save progress into the entry, even if it's not being saved here -- for EAGER,
# it needs to go back with the entry passed in.
course_task_log_entry.task_progress = json.dumps(returned_result)
output['task_progress'] = returned_result
log.info("task succeeded: %s", returned_result)
entry_needs_saving = True
elif result_state == 'FAILURE':
# on failure, the result's result contains the exception that caused the failure
exception = returned_result
traceback = result_traceback if result_traceback is not None else ''
entry_needs_saving = True
task_progress = {'exception': type(exception).__name__, 'message': str(exception.message)}
output['message'] = exception.message
log.warning("background task (%s) failed: %s %s", task_id, returned_result, traceback)
if result_traceback is not None:
output['task_traceback'] = result_traceback
task_progress['traceback'] = result_traceback
# save progress into the entry, even if it's not being saved -- for EAGER,
# it needs to go back with the entry passed in.
course_task_log_entry.task_progress = json.dumps(task_progress)
output['task_progress'] = task_progress
elif result_state == 'REVOKED':
# on revocation, the result's result doesn't contain anything
# but we cannot rely on the worker thread to set this status,
# so we set it here.
entry_needs_saving = True
message = 'Task revoked before running'
output['message'] = message
@@ -202,7 +211,6 @@ def _update_course_task_log(course_task_log_entry, task_result):
if result_state != course_task_log_entry.task_state:
course_task_log_entry.task_state = result_state
course_task_log_entry.task_id = task_id
entry_needs_saving = True
if entry_needs_saving:
course_task_log_entry.save()
@@ -358,7 +366,7 @@ def submit_regrade_problem_for_student(request, course_id, problem_url, student)
course_task_log = _reserve_task(course_id, task_name, problem_url, request.user, student)
# Submit task:
task_args = [course_id, problem_url, student.username, _get_xmodule_instance_args(request)]
task_args = [course_task_log.id, course_id, problem_url, student.username, _get_xmodule_instance_args(request)]
task_result = regrade_problem_for_student.apply_async(task_args)
# Update info in table with the resulting task_id (and state).
@@ -387,7 +395,7 @@ def submit_regrade_problem_for_all_students(request, course_id, problem_url):
course_task_log = _reserve_task(course_id, task_name, problem_url, request.user)
# Submit task:
task_args = [course_id, problem_url, _get_xmodule_instance_args(request)]
task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)]
task_result = regrade_problem_for_all_students.apply_async(task_args)
# Update info in table with the resulting task_id (and state).
@@ -419,7 +427,7 @@ def submit_reset_problem_attempts_for_all_students(request, course_id, problem_u
course_task_log = _reserve_task(course_id, task_name, problem_url, request.user)
# Submit task:
task_args = [course_id, problem_url, _get_xmodule_instance_args(request)]
task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)]
task_result = reset_problem_attempts_for_all_students.apply_async(task_args)
# Update info in table with the resulting task_id (and state).
@@ -451,7 +459,7 @@ def submit_delete_problem_state_for_all_students(request, course_id, problem_url
course_task_log = _reserve_task(course_id, task_name, problem_url, request.user)
# Submit task:
task_args = [course_id, problem_url, _get_xmodule_instance_args(request)]
task_args = [course_task_log.id, course_id, problem_url, _get_xmodule_instance_args(request)]
task_result = delete_problem_state_for_all_students.apply_async(task_args)
# Update info in table with the resulting task_id (and state).

View File

@@ -1,6 +1,8 @@
import json
from time import time
from sys import exc_info
from traceback import format_exc
from django.contrib.auth.models import User
from django.db import transaction
@@ -12,7 +14,7 @@ from xmodule.modulestore.django import modulestore
import mitxmako.middleware as middleware
from track.views import task_track
from courseware.models import StudentModule
from courseware.models import StudentModule, CourseTaskLog
from courseware.model_data import ModelDataCache
from courseware.module_render import get_module_for_descriptor_internal
@@ -31,8 +33,8 @@ class UpdateProblemModuleStateError(Exception):
pass
def _update_problem_module_state(course_id, module_state_key, student, update_fcn, action_name, filter_fcn,
xmodule_instance_args):
def _update_problem_module_state_internal(course_id, module_state_key, student, update_fcn, action_name, filter_fcn,
xmodule_instance_args):
"""
Performs generic update by visiting StudentModule instances with the update_fcn provided.
@@ -49,18 +51,12 @@ def _update_problem_module_state(course_id, module_state_key, student, update_fc
passed through.
Because this is run internal to a task, it does not catch exceptions. These are allowed to pass up to the
task-running level, so that it can set the failure modes and capture the error trace in the result object.
next level, so that it can set the failure modes and capture the error trace in the CourseTaskLog and the
result object.
"""
task_id = current_task.request.id
fmt = 'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet'
task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, action=action_name))
# get start time for task:
start_time = time()
# add task_id to xmodule_instance_args, so that it can be output with tracking info:
xmodule_instance_args['task_id'] = task_id
# Hack to get mako templates to work on celery worker server's worker thread.
# The initialization of Mako templating is usually done when Django is
# initializing middleware packages as part of processing a server request.
@@ -119,14 +115,74 @@ def _update_problem_module_state(course_id, module_state_key, student, update_fc
current_task.update_state(state='PROGRESS', meta=get_task_progress())
task_progress = get_task_progress()
# update progress without updating the state
current_task.update_state(state='PROGRESS', meta=task_progress)
return task_progress
@transaction.autocommit
def _save_course_task_log_entry(entry):
"""Writes CourseTaskLog entry immediately."""
entry.save()
def _update_problem_module_state(entry_id, course_id, module_state_key, student, update_fcn, action_name, filter_fcn,
xmodule_instance_args):
"""
Performs generic update by visiting StudentModule instances with the update_fcn provided.
See _update_problem_module_state_internal function for more details on arguments.
The `entry_id` is the primary key for the CourseTaskLog entry representing the task. This function
updates the entry on SUCCESS and FAILURE of the _update_problem_module_state_internal function it
wraps.
Once exceptions are caught and recorded in the CourseTaskLog entry, they are allowed to pass up to the
task-running level, so that it can also set the failure modes and capture the error trace in the result object.
"""
task_id = current_task.request.id
fmt = 'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet'
task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, action=action_name))
# get the CourseTaskLog to be updated. If this fails, then let the exception return to Celery.
# There's no point in catching it here.
entry = CourseTaskLog.objects.get(pk=entry_id)
# add task_id to xmodule_instance_args, so that it can be output with tracking info:
xmodule_instance_args['task_id'] = task_id
entry.task_id = task_id
_save_course_task_log_entry(entry)
# now that we have an entry we can try to catch failures:
task_progress = None
try:
task_progress = _update_problem_module_state_internal(course_id, module_state_key, student, update_fcn,
action_name, filter_fcn, xmodule_instance_args)
except Exception:
# try to write out the failure to the entry before failing
exception_type, exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
task_progress = {'exception': exception_type.__name__, 'message': str(exception.message)}
task_log.warning("background task (%s) failed: %s %s", task_id, exception, traceback_string)
if traceback is not None:
task_progress['traceback'] = traceback_string
entry.task_progress = json.dumps(task_progress)
entry.task_state = 'FAILURE'
_save_course_task_log_entry(entry)
raise
# if we get here, we assume we've succeeded, so update the CourseTaskLog entry in anticipation:
entry.task_progress = json.dumps(task_progress)
entry.task_state = 'SUCCESS'
_save_course_task_log_entry(entry)
# log and exit, returning task_progress info as task result:
fmt = 'Finishing task "{task_id}": course "{course_id}" problem "{state_key}": final: {progress}'
task_log.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, progress=task_progress))
return task_progress
def _update_problem_module_state_for_student(course_id, problem_url, student_identifier,
def _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier,
update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None):
"""
Update the StudentModule for a given student. See _update_problem_module_state().
@@ -139,7 +195,7 @@ def _update_problem_module_state_for_student(course_id, problem_url, student_ide
student_to_update = User.objects.get(email=student_identifier)
elif student_identifier is not None:
student_to_update = User.objects.get(username=student_identifier)
return _update_problem_module_state(course_id, problem_url, student_to_update, update_fcn,
return _update_problem_module_state(entry_id, course_id, problem_url, student_to_update, update_fcn,
action_name, filter_fcn, xmodule_instance_args)
except User.DoesNotExist:
msg = "Couldn't find student with that email or username."
@@ -147,11 +203,11 @@ def _update_problem_module_state_for_student(course_id, problem_url, student_ide
return (success, msg)
def _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None):
def _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None):
"""
Update the StudentModule for all students. See _update_problem_module_state().
"""
return _update_problem_module_state(course_id, problem_url, None, update_fcn, action_name, filter_fcn, xmodule_instance_args)
return _update_problem_module_state(entry_id, course_id, problem_url, None, update_fcn, action_name, filter_fcn, xmodule_instance_args)
def _get_module_instance_for_task(course_id, student, module_descriptor, module_state_key, xmodule_instance_args=None,
@@ -239,22 +295,22 @@ def filter_problem_module_state_for_done(modules_to_update):
@task
def regrade_problem_for_student(course_id, problem_url, student_identifier, xmodule_instance_args):
def regrade_problem_for_student(entry_id, course_id, problem_url, student_identifier, xmodule_instance_args):
"""Regrades problem `problem_url` in `course_id` for specified student."""
action_name = 'regraded'
update_fcn = _regrade_problem_module_state
filter_fcn = filter_problem_module_state_for_done
return _update_problem_module_state_for_student(course_id, problem_url, student_identifier,
return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier,
update_fcn, action_name, filter_fcn, xmodule_instance_args)
@task
def regrade_problem_for_all_students(course_id, problem_url, xmodule_instance_args):
def regrade_problem_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args):
"""Regrades problem `problem_url` in `course_id` for all students."""
action_name = 'regraded'
update_fcn = _regrade_problem_module_state
filter_fcn = filter_problem_module_state_for_done
return _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn,
return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url, update_fcn, action_name, filter_fcn,
xmodule_instance_args)
@@ -286,21 +342,21 @@ def _reset_problem_attempts_module_state(module_descriptor, student_module, xmod
@task
def reset_problem_attempts_for_student(course_id, problem_url, student_identifier, xmodule_instance_args):
def reset_problem_attempts_for_student(entry_id, course_id, problem_url, student_identifier, xmodule_instance_args):
"""Resets problem attempts to zero for `problem_url` in `course_id` for specified student."""
action_name = 'reset'
update_fcn = _reset_problem_attempts_module_state
return _update_problem_module_state_for_student(course_id, problem_url, student_identifier,
return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_identifier,
update_fcn, action_name,
xmodule_instance_args=xmodule_instance_args)
@task
def reset_problem_attempts_for_all_students(course_id, problem_url, xmodule_instance_args):
def reset_problem_attempts_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args):
"""Resets problem attempts to zero for `problem_url` in `course_id` for all students."""
action_name = 'reset'
update_fcn = _reset_problem_attempts_module_state
return _update_problem_module_state_for_all_students(course_id, problem_url,
return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url,
update_fcn, action_name,
xmodule_instance_args=xmodule_instance_args)
@@ -319,21 +375,21 @@ def _delete_problem_module_state(module_descriptor, student_module, xmodule_inst
@task
def delete_problem_state_for_student(course_id, problem_url, student_ident, xmodule_instance_args):
def delete_problem_state_for_student(entry_id, course_id, problem_url, student_ident, xmodule_instance_args):
"""Deletes problem state entirely for `problem_url` in `course_id` for specified student."""
action_name = 'deleted'
update_fcn = _delete_problem_module_state
return _update_problem_module_state_for_student(course_id, problem_url, student_ident,
return _update_problem_module_state_for_student(entry_id, course_id, problem_url, student_ident,
update_fcn, action_name,
xmodule_instance_args=xmodule_instance_args)
@task
def delete_problem_state_for_all_students(course_id, problem_url, xmodule_instance_args):
def delete_problem_state_for_all_students(entry_id, course_id, problem_url, xmodule_instance_args):
"""Deletes problem state entirely for `problem_url` in `course_id` for all students."""
action_name = 'deleted'
update_fcn = _delete_problem_module_state
return _update_problem_module_state_for_all_students(course_id, problem_url,
return _update_problem_module_state_for_all_students(entry_id, course_id, problem_url,
update_fcn, action_name,
xmodule_instance_args=xmodule_instance_args)

View File

@@ -380,7 +380,7 @@ class TestRegrading(TestRegradingBase):
data=problem_xml,
metadata={"rerandomize": "per_student"})
def WAITING_FOR_SAFEEXEC_FIX_test_regrading_randomized_problem(self):
def test_regrading_randomized_problem(self):
"""Run regrade scenario on custom problem that uses randomize"""
# First define the custom response problem:
problem_url_name = 'H1P1'