diff --git a/common/djangoapps/track/views.py b/common/djangoapps/track/views.py index b2935a6a89..f56a8db5eb 100644 --- a/common/djangoapps/track/views.py +++ b/common/djangoapps/track/views.py @@ -1,13 +1,11 @@ import json import logging -import os import pytz import datetime import dateutil.parser from django.contrib.auth.decorators import login_required from django.http import HttpResponse -from django.http import Http404 from django.shortcuts import redirect from django.conf import settings from mitxmako.shortcuts import render_to_response @@ -95,6 +93,46 @@ def server_track(request, event_type, event, page=None): log_event(event) +def task_track(request_info, task_info, event_type, event, page=None): + """ + Outputs tracking information for events occuring within celery tasks. + + The `event_type` is a string naming the particular event being logged, + while `event` is a dict containing whatever additional contextual information + is desired. + + The `request_info` is a dict containing information about the original + task request. Relevant keys are `username`, `ip`, `agent`, and `host`. + + In addition, a `task_info` dict provides more information to be stored with + the `event` dict. + + The `page` parameter is optional, and allows the name of the page to + be provided. + """ + + # supplement event information with additional information + # about the task in which it is running. + full_event = dict(event, **task_info) + + # All fields must be specified, in case the tracking information is + # also saved to the TrackingLog model. Get values from the task-level + # information, or just add placeholder values. + event = { + "username": request_info.get('username', 'unknown'), + "ip": request_info.get('ip', 'unknown'), + "event_source": "task", + "event_type": event_type, + "event": full_event, + "agent": request_info.get('agent', 'unknown'), + "page": page, + "time": datetime.datetime.utcnow().isoformat(), + "host": request_info.get('host', 'unknown') + } + + log_event(event) + + @login_required @ensure_csrf_cookie def view_tracking_log(request, args=''): diff --git a/lms/djangoapps/courseware/module_render.py b/lms/djangoapps/courseware/module_render.py index 2ae7bcdc1f..eee085d7e7 100644 --- a/lms/djangoapps/courseware/module_render.py +++ b/lms/djangoapps/courseware/module_render.py @@ -121,7 +121,7 @@ def toc_for_course(user, request, course, active_chapter, active_section, model_ def get_module(user, request, location, model_data_cache, course_id, - position=None, not_found_ok = False, wrap_xmodule_display=True, + position=None, not_found_ok=False, wrap_xmodule_display=True, grade_bucket_type=None, depth=0): """ Get an instance of the xmodule class identified by location, @@ -161,10 +161,45 @@ def get_module(user, request, location, model_data_cache, course_id, return None +def get_xqueue_callback_url_prefix(request): + """ + Calculates default prefix based on request, but allows override via settings + + This is separated so that it can be called by the LMS before submitting + background tasks to run. The xqueue callbacks should go back to the LMS, + not to the worker. + """ + default_xqueue_callback_url_prefix = '{proto}://{host}'.format( + proto=request.META.get('HTTP_X_FORWARDED_PROTO', 'https' if request.is_secure() else 'http'), + host=request.get_host() + ) + return settings.XQUEUE_INTERFACE.get('callback_url', default_xqueue_callback_url_prefix) + + def get_module_for_descriptor(user, request, descriptor, model_data_cache, course_id, position=None, wrap_xmodule_display=True, grade_bucket_type=None): """ - Actually implement get_module. See docstring there for details. + Implements get_module, extracting out the request-specific functionality. + + See get_module() docstring for further details. + """ + track_function = make_track_function(request) + xqueue_callback_url_prefix = get_xqueue_callback_url_prefix(request) + + return get_module_for_descriptor_internal(user, descriptor, model_data_cache, course_id, + track_function, xqueue_callback_url_prefix, + position=position, + wrap_xmodule_display=wrap_xmodule_display, + grade_bucket_type=grade_bucket_type) + + +def get_module_for_descriptor_internal(user, descriptor, model_data_cache, course_id, + track_function, xqueue_callback_url_prefix, + position=None, wrap_xmodule_display=True, grade_bucket_type=None): + """ + Actually implement get_module, without requiring a request. + + See get_module() docstring for further details. """ # allow course staff to masquerade as student @@ -186,19 +221,13 @@ def get_module_for_descriptor(user, request, descriptor, model_data_cache, cours def make_xqueue_callback(dispatch='score_update'): # Fully qualified callback URL for external queueing system - xqueue_callback_url = '{proto}://{host}'.format( - host=request.get_host(), - proto=request.META.get('HTTP_X_FORWARDED_PROTO', 'https' if request.is_secure() else 'http') - ) - xqueue_callback_url = settings.XQUEUE_INTERFACE.get('callback_url',xqueue_callback_url) # allow override - - xqueue_callback_url += reverse('xqueue_callback', - kwargs=dict(course_id=course_id, - userid=str(user.id), - id=descriptor.location.url(), - dispatch=dispatch), - ) - return xqueue_callback_url + relative_xqueue_callback_url = reverse('xqueue_callback', + kwargs=dict(course_id=course_id, + userid=str(user.id), + id=descriptor.location.url(), + dispatch=dispatch), + ) + return xqueue_callback_url_prefix + relative_xqueue_callback_url # Default queuename is course-specific and is derived from the course that # contains the current module. @@ -211,20 +240,20 @@ def get_module_for_descriptor(user, request, descriptor, model_data_cache, cours 'waittime': settings.XQUEUE_WAITTIME_BETWEEN_REQUESTS } - #This is a hacky way to pass settings to the combined open ended xmodule - #It needs an S3 interface to upload images to S3 - #It needs the open ended grading interface in order to get peer grading to be done - #this first checks to see if the descriptor is the correct one, and only sends settings if it is + # This is a hacky way to pass settings to the combined open ended xmodule + # It needs an S3 interface to upload images to S3 + # It needs the open ended grading interface in order to get peer grading to be done + # this first checks to see if the descriptor is the correct one, and only sends settings if it is - #Get descriptor metadata fields indicating needs for various settings + # Get descriptor metadata fields indicating needs for various settings needs_open_ended_interface = getattr(descriptor, "needs_open_ended_interface", False) needs_s3_interface = getattr(descriptor, "needs_s3_interface", False) - #Initialize interfaces to None + # Initialize interfaces to None open_ended_grading_interface = None s3_interface = None - #Create interfaces if needed + # Create interfaces if needed if needs_open_ended_interface: open_ended_grading_interface = settings.OPEN_ENDED_GRADING_INTERFACE open_ended_grading_interface['mock_peer_grading'] = settings.MOCK_PEER_GRADING @@ -240,8 +269,13 @@ def get_module_for_descriptor(user, request, descriptor, model_data_cache, cours """ Delegate to get_module. It does an access check, so may return None """ - return get_module_for_descriptor(user, request, descriptor, - model_data_cache, course_id, position) + # TODO: fix this so that make_xqueue_callback uses the descriptor passed into + # inner_get_module, not the parent's callback. Add it as an argument.... + return get_module_for_descriptor_internal(user, descriptor, model_data_cache, course_id, + track_function, make_xqueue_callback, + position=position, + wrap_xmodule_display=wrap_xmodule_display, + grade_bucket_type=grade_bucket_type) def xblock_model_data(descriptor): return DbModel( @@ -291,7 +325,7 @@ def get_module_for_descriptor(user, request, descriptor, model_data_cache, cours # TODO (cpennington): When modules are shared between courses, the static # prefix is going to have to be specific to the module, not the directory # that the xml was loaded from - system = ModuleSystem(track_function=make_track_function(request), + system = ModuleSystem(track_function=track_function, render_template=render_to_string, ajax_url=ajax_url, xqueue=xqueue, diff --git a/lms/djangoapps/courseware/task_queue.py b/lms/djangoapps/courseware/task_queue.py index d439050039..946ba99d5e 100644 --- a/lms/djangoapps/courseware/task_queue.py +++ b/lms/djangoapps/courseware/task_queue.py @@ -1,28 +1,30 @@ import json import logging from django.http import HttpResponse +from django.db import transaction from celery.result import AsyncResult from celery.states import READY_STATES from courseware.models import CourseTaskLog +from courseware.module_render import get_xqueue_callback_url_prefix from courseware.tasks import regrade_problem_for_all_students from xmodule.modulestore.django import modulestore -# define different loggers for use within tasks and on client side log = logging.getLogger(__name__) def get_running_course_tasks(course_id): + """Returns a query of CourseTaskLog objects of running tasks for a given course.""" course_tasks = CourseTaskLog.objects.filter(course_id=course_id) - # exclude(task_state='SUCCESS').exclude(task_state='FAILURE').exclude(task_state='REVOKED') for state in READY_STATES: course_tasks = course_tasks.exclude(task_state=state) return course_tasks def _task_is_running(course_id, task_name, task_args, student=None): + """Checks if a particular task is already running""" runningTasks = CourseTaskLog.objects.filter(course_id=course_id, task_name=task_name, task_args=task_args) if student is not None: runningTasks = runningTasks.filter(student=student) @@ -31,56 +33,83 @@ def _task_is_running(course_id, task_name, task_args, student=None): return len(runningTasks) > 0 -def submit_regrade_problem_for_all_students(request, course_id, problem_url): - # check arguments: in particular, make sure that problem_url is defined - # (since that's currently typed in). If the corresponding module descriptor doesn't exist, - # an exception should be raised. Let it continue to the caller. - modulestore().get_instance(course_id, problem_url) +@transaction.autocommit +def _reserve_task(course_id, task_name, task_args, requester, student=None): + """ + Creates a database entry to indicate that a task is in progress. - # TODO: adjust transactions so that one request will not be about to create an - # entry while a second is testing to see if the entry exists. (Need to handle - # quick accidental double-clicks when submitting.) + An exception is thrown if the task is already in progress. - # check to see if task is already running - task_name = 'regrade' - if _task_is_running(course_id, task_name, problem_url): - # TODO: figure out how to return info that it's already running - raise Exception("task is already running") + Autocommit annotation makes sure the database entry is committed. + """ + + if _task_is_running(course_id, task_name, task_args, student): + raise Exception("requested task is already running") # Create log entry now, so that future requests won't tasklog_args = {'course_id': course_id, 'task_name': task_name, - 'task_args': problem_url, + 'task_args': task_args, 'task_state': 'QUEUING', - 'requester': request.user} + 'requester': requester} + if student is not None: + tasklog_args['student'] = student + course_task_log = CourseTaskLog.objects.create(**tasklog_args) - - - # At a low level of processing, the task currently fetches some information from the web request. - # This is used for setting up X-Queue, as well as for tracking. - # An actual request will not successfully serialize with json or with pickle. - # TODO: we can just pass all META info as a dict. - request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'], - 'REMOTE_ADDR': request.META['REMOTE_ADDR'], - 'SERVER_NAME': request.META['SERVER_NAME'], - 'REQUEST_METHOD': 'GET', -# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'], - } - - # Submit task: - task_args = [request_environ, course_id, problem_url] - result = regrade_problem_for_all_students.apply_async(task_args) - - # Put info into table with the resulting task_id. - course_task_log.task_state = result.state - course_task_log.task_id = result.id - course_task_log.save() return course_task_log +@transaction.autocommit +def _update_task(course_task_log, task_result): + """ + Updates a database entry with information about the submitted task. + + Autocommit annotation makes sure the database entry is committed. + """ + course_task_log.task_state = task_result.state + course_task_log.task_id = task_result.id + course_task_log.save() + + +def _get_xmodule_instance_args(request): + """ + Calculate parameters needed for instantiating xmodule instances. + + The `request_info` will be passed to a tracking log function, to provide information + about the source of the task request. The `xqueue_callback_urul_prefix` is used to + permit old-style xqueue callbacks directly to the appropriate module in the LMS. + """ + request_info = {'username': request.user.username, + 'ip': request.META['REMOTE_ADDR'], + 'agent': request.META.get('HTTP_USER_AGENT', ''), + 'host': request.META['SERVER_NAME'], + } + + xmodule_instance_args = {'xqueue_callback_url_prefix': get_xqueue_callback_url_prefix(request), + 'request_info': request_info, + } + return xmodule_instance_args + + def course_task_log_status(request, task_id=None): """ This returns the status of a course-related task as a JSON-serialized dict. + + The task_id can be specified in one of three ways: + + * explicitly as an argument to the method (by specifying in the url) + Returns a dict containing status information for the specified task_id + + * by making a post request containing 'task_id' as a parameter with a single value + Returns a dict containing status information for the specified task_id + + * by making a post request containing 'task_ids' as a parameter, + with a list of task_id values. + Returns a dict of dicts, with the task_id as key, and the corresponding + dict containing status information for the specified task_id + + Task_id values that are unrecognized are skipped. + """ output = {} if task_id is not None: @@ -108,7 +137,17 @@ def _get_course_task_log_status(task_id): 'task_id' 'task_state' 'in_progress': boolean indicating if the task is still running. + 'message': status message reporting on progress, or providing exception message if failed. + 'task_progress': dict containing progress information. This includes: + 'attempted': number of attempts made + 'updated': number of attempts that "succeeded" + 'total': number of possible subtasks to attempt + 'action_name': user-visible verb to use in status messages. Should be past-tense. 'task_traceback': optional, returned if task failed and produced a traceback. + 'succeeded': on complete tasks, indicates if the task outcome was successful: + did it achieve what it set out to do. + This is in contrast with a successful task_state, which indicates that the + task merely completed. If task doesn't exist, returns None. """ @@ -119,45 +158,73 @@ def _get_course_task_log_status(task_id): # TODO: log a message here return None + # define ajax return value: output = {} # if the task is already known to be done, then there's no reason to query - # the underlying task: + # the underlying task's result object: if course_task_log_entry.task_state not in READY_STATES: # we need to get information from the task result directly now. - # Just create the result object. + + # Just create the result object, and pull values out once. + # (If we check them later, the state and result may have changed.) result = AsyncResult(task_id) + result_state = result.state + returned_result = result.result + result_traceback = result.traceback - if result.traceback is not None: - output['task_traceback'] = result.traceback + # Assume we don't always update the CourseTaskLog entry if we don't have to: + entry_needs_saving = False - if result.state == "PROGRESS": - # construct a status message directly from the task result's metadata: - if hasattr(result, 'result') and 'current' in result.result: + if result_state == 'PROGRESS': + # construct a status message directly from the task result's result: + if hasattr(result, 'result') and 'attempted' in returned_result: fmt = "Attempted {attempted} of {total}, {action_name} {updated}" - message = fmt.format(attempted=result.result['attempted'], - updated=result.result['updated'], - total=result.result['total'], - action_name=result.result['action_name']) + message = fmt.format(attempted=returned_result['attempted'], + updated=returned_result['updated'], + total=returned_result['total'], + action_name=returned_result['action_name']) output['message'] = message - log.info("progress: {0}".format(message)) - for name in ['attempted', 'updated', 'total', 'action_name']: - output[name] = result.result[name] + log.info("task progress: {0}".format(message)) else: log.info("still making progress... ") + output['task_progress'] = returned_result - # update the entry if the state has changed: - if result.state != course_task_log_entry.task_state: - course_task_log_entry.task_state = result.state + elif result_state == 'SUCCESS': + # on success, save out the result here, but the message + # will be calculated later + output['task_progress'] = returned_result + course_task_log_entry.task_progress = json.dumps(returned_result) + log.info("task succeeded: {0}".format(returned_result)) + entry_needs_saving = True + + elif result_state == 'FAILURE': + # on failure, the result's result contains the exception that caused the failure + exception = str(returned_result) + course_task_log_entry.task_progress = exception + entry_needs_saving = True + output['message'] = exception + log.info("task failed: {0}".format(returned_result)) + if result_traceback is not None: + output['task_traceback'] = result_traceback + + # always update the entry if the state has changed: + if result_state != course_task_log_entry.task_state: + course_task_log_entry.task_state = result_state + entry_needs_saving = True + + if entry_needs_saving: course_task_log_entry.save() + else: + # task is already known to have finished, but report on its status: + if course_task_log_entry.task_progress is not None: + output['task_progress'] = json.loads(course_task_log_entry.task_progress) + # output basic information matching what's stored in CourseTaskLog: output['task_id'] = course_task_log_entry.task_id output['task_state'] = course_task_log_entry.task_state output['in_progress'] = course_task_log_entry.task_state not in READY_STATES - if course_task_log_entry.task_progress is not None: - output['task_progress'] = course_task_log_entry.task_progress - if course_task_log_entry.task_state == 'SUCCESS': succeeded, message = _get_task_completion_message(course_task_log_entry) output['message'] = message @@ -187,21 +254,53 @@ def _get_task_completion_message(course_task_log_entry): if num_attempted == 0: msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." elif num_updated == 0: - msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" + msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'" else: succeeded = True msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" elif num_attempted == 0: msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." elif num_updated == 0: - msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" + msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'" elif num_updated == num_attempted: succeeded = True - msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" + msg = "Problem successfully {action} for {attempted} students for problem '{problem}'" elif num_updated < num_attempted: - msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" + msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'" # Update status in task result object itself: - message = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, + message = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=course_task_log_entry.student, problem=course_task_log_entry.task_args) return (succeeded, message) + + +def submit_regrade_problem_for_all_students(request, course_id, problem_url): + """ + Request a problem to be regraded as a background task. + + The problem will be regraded for all students who have accessed the + particular problem in a course. Parameters are the `course_id` and + the `problem_url`. The url must specify the location of the problem, + using i4x-type notation. + + An exception is thrown if the problem doesn't exist, or if the particular + problem is already being regraded. + """ + # check arguments: make sure that the problem_url is defined + # (since that's currently typed in). If the corresponding module descriptor doesn't exist, + # an exception will be raised. Let it pass up to the caller. + modulestore().get_instance(course_id, problem_url) + + task_name = 'regrade_problem' + + # check to see if task is already running, and reserve it otherwise + course_task_log = _reserve_task(course_id, task_name, problem_url, request.user) + + # Submit task: + task_args = [course_id, problem_url, _get_xmodule_instance_args(request)] + task_result = regrade_problem_for_all_students.apply_async(task_args) + + # Update info in table with the resulting task_id (and state). + _update_task(course_task_log, task_result) + + return course_task_log diff --git a/lms/djangoapps/courseware/tasks.py b/lms/djangoapps/courseware/tasks.py index f29ffb58ce..5b05eb725d 100644 --- a/lms/djangoapps/courseware/tasks.py +++ b/lms/djangoapps/courseware/tasks.py @@ -1,28 +1,28 @@ import json -#import logging from time import sleep + from django.contrib.auth.models import User -from django.test.client import RequestFactory +from django.db import transaction from celery import task, current_task -from celery.signals import worker_ready +# from celery.signals import worker_ready from celery.utils.log import get_task_logger import mitxmako.middleware as middleware from courseware.models import StudentModule, CourseTaskLog from courseware.model_data import ModelDataCache -from courseware.module_render import get_module +# from courseware.module_render import get_module +from courseware.module_render import get_module_for_descriptor_internal from xmodule.modulestore.django import modulestore -#from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError -import track.views + +from track.views import task_track # define different loggers for use within tasks and on client side task_log = get_task_logger(__name__) -# log = logging.getLogger(__name__) @task @@ -40,38 +40,17 @@ def waitawhile(value): class UpdateProblemModuleStateError(Exception): pass -#def get_module_descriptor(course_id, module_state_key): -# """Return module descriptor for requested module, or None if not found.""" -# try: -# module_descriptor = modulestore().get_instance(course_id, module_state_key) -# except ItemNotFoundError: -# pass -# except InvalidLocationError: -# pass -# return module_descriptor -# except ItemNotFoundError: -# msg = "Couldn't find problem with that urlname." -# except InvalidLocationError: -# msg = "Couldn't find problem with that urlname." -# if module_descriptor is None: -# msg = "Couldn't find problem with that urlname." -# if not succeeded: -# current_task.update_state( -# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) -# The task should still succeed, but should have metadata indicating -# that the result of the successful task was a failure. (It's not -# the queue that failed, but the task put on the queue.) +def _update_problem_module_state(course_id, module_state_key, student, update_fcn, action_name, filter_fcn, + xmodule_instance_args): + """ + Performs generic update by visiting StudentModule instances with the update_fcn provided. -def _update_problem_module_state(request, course_id, module_state_key, student, update_fcn, action_name, filter_fcn): - ''' - Performs generic update by visiting StudentModule instances with the update_fcn provided - - If student is None, performs update on modules for all students on the specified problem - ''' + If student is None, performs update on modules for all students on the specified problem. + """ # add hack so that mako templates will work on celery worker server: - # The initialization of Make templating is usually done when Django is - # initialize middleware packages as part of processing a server request. + # The initialization of Make templating is usually done when Django is + # initializing middleware packages as part of processing a server request. # When this is run on a celery worker server, no such initialization is # called. So we look for the result: the defining of the lookup paths # for templates. @@ -83,7 +62,6 @@ def _update_problem_module_state(request, course_id, module_state_key, student, module_descriptor = modulestore().get_instance(course_id, module_state_key) # find the module in question - succeeded = False modules_to_update = StudentModule.objects.filter(course_id=course_id, module_state_key=module_state_key) @@ -114,7 +92,7 @@ def _update_problem_module_state(request, course_id, module_state_key, student, num_attempted += 1 # There is no try here: if there's an error, we let it throw, and the task will # be marked as FAILED, with a stack trace. - if update_fcn(request, module_to_update, module_descriptor): + if update_fcn(module_descriptor, module_to_update, xmodule_instance_args): # If the update_fcn returns true, then it performed some kind of work. num_updated += 1 @@ -123,48 +101,19 @@ def _update_problem_module_state(request, course_id, module_state_key, student, # -- it may not make sense to do so every time through the loop # -- may depend on each iteration's duration current_task.update_state(state='PROGRESS', meta=get_task_progress()) + + # TODO: remove this once done with manual testing sleep(5) # in seconds - # Done with looping through all modules, so just return final statistics: - # TODO: these messages should be rendered at the view level -- move them there! -# if student is not None: -# if num_attempted == 0: -# msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." -# elif num_updated == 0: -# msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" -# else: -# succeeded = True -# msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" -# elif num_attempted == 0: -# msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." -# elif num_updated == 0: -# msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" -# elif num_updated == num_attempted: -# succeeded = True -# msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" -# elif num_updated < num_attempted: -# msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" -# -# # Update status in task result object itself: -# msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key) - task_progress = get_task_progress() # succeeded=succeeded, message=msg) + task_progress = get_task_progress() current_task.update_state(state='PROGRESS', meta=task_progress) - # Update final progress in course task table as well: - # The actual task result state is updated by celery when this task completes, and thus - # clobbers any custom metadata. So if we want any such status to persist, we have to - # write it to the CourseTaskLog instead. - task_log.info("Finished processing task, updating CourseTaskLog entry") - - course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.request.id) - course_task_log_entry.task_progress = json.dumps(task_progress) - course_task_log_entry.save() - - return succeeded + task_log.info("Finished processing task") + return task_progress -def _update_problem_module_state_for_student(request, course_id, problem_url, student_identifier, - update_fcn, action_name, filter_fcn=None): +def _update_problem_module_state_for_student(course_id, problem_url, student_identifier, + update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None): msg = '' success = False # try to uniquely id student by email address or username @@ -173,18 +122,49 @@ def _update_problem_module_state_for_student(request, course_id, problem_url, st student_to_update = User.objects.get(email=student_identifier) elif student_identifier is not None: student_to_update = User.objects.get(username=student_identifier) - return _update_problem_module_state(request, course_id, problem_url, student_to_update, update_fcn, action_name, filter_fcn) + return _update_problem_module_state(course_id, problem_url, student_to_update, update_fcn, + action_name, filter_fcn, xmodule_instance_args) except User.DoesNotExist: msg = "Couldn't find student with that email or username." return (success, msg) -def _update_problem_module_state_for_all_students(request, course_id, problem_url, update_fcn, action_name, filter_fcn=None): - return _update_problem_module_state(request, course_id, problem_url, None, update_fcn, action_name, filter_fcn) +def _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn=None, xmodule_instance_args=None): + return _update_problem_module_state(course_id, problem_url, None, update_fcn, action_name, filter_fcn, xmodule_instance_args) -def _regrade_problem_module_state(request, module_to_regrade, module_descriptor): +def _get_module_instance_for_task(course_id, student, module_descriptor, module_state_key, xmodule_instance_args=None, + grade_bucket_type=None): + # reconstitute the problem's corresponding XModule: + model_data_cache = ModelDataCache.cache_for_descriptor_descendents(course_id, student, module_descriptor) + # Note that the request is passed to get_module() to provide xqueue-related URL information +# instance = get_module(student, request, module_state_key, model_data_cache, +# course_id, grade_bucket_type='regrade') + + request_info = xmodule_instance_args.get('request_info', {}) if xmodule_instance_args is not None else {} + task_info = {} + + def make_track_function(): + ''' + Make a tracking function that logs what happened. + For insertion into ModuleSystem, and use by CapaModule. + ''' + def f(event_type, event): + return task_track(request_info, task_info, event_type, event, page='x_module_task') + return f + + xqueue_callback_url_prefix = '' + if xmodule_instance_args is not None: + xqueue_callback_url_prefix = xmodule_instance_args.get('xqueue_callback_url_prefix') + + return get_module_for_descriptor_internal(student, module_descriptor, model_data_cache, course_id, + make_track_function(), xqueue_callback_url_prefix, + grade_bucket_type=grade_bucket_type) + + +@transaction.autocommit +def _regrade_problem_module_state(module_descriptor, student_module, xmodule_instance_args=None): ''' Takes an XModule descriptor and a corresponding StudentModule object, and performs regrading on the student's problem submission. @@ -192,16 +172,11 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) Throws exceptions if the regrading is fatal and should be aborted if in a loop. ''' # unpack the StudentModule: - course_id = module_to_regrade.course_id - student = module_to_regrade.student - module_state_key = module_to_regrade.module_state_key + course_id = student_module.course_id + student = student_module.student + module_state_key = student_module.module_state_key - # reconstitute the problem's corresponding XModule: - model_data_cache = ModelDataCache.cache_for_descriptor_descendents(course_id, student, - module_descriptor) - # Note that the request is passed to get_module() to provide xqueue-related URL information - instance = get_module(student, request, module_state_key, model_data_cache, - course_id, grade_bucket_type='regrade') + instance = _get_module_instance_for_task(course_id, student, module_descriptor, module_state_key, xmodule_instance_args, grade_bucket_type='regrade') if instance is None: # Either permissions just changed, or someone is trying to be clever @@ -220,21 +195,16 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) result = instance.regrade_problem() if 'success' not in result: # don't consider these fatal, but false means that the individual call didn't complete: - task_log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.warning("error processing regrade call for problem {loc} and student {student}: " "unexpected response {msg}".format(msg=result, loc=module_state_key, student=student)) return False elif result['success'] != 'correct' and result['success'] != 'incorrect': - task_log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.warning("error processing regrade call for problem {loc} and student {student}: " "{msg}".format(msg=result['success'], loc=module_state_key, student=student)) return False else: - track.views.server_track(request, - 'regrade problem {problem} for student {student} ' - 'in {course}'.format(student=student.id, - problem=module_to_regrade.module_state_key, - course=course_id), - {}, - page='idashboard') + task_log.debug("successfully processed regrade call for problem {loc} and student {student}: " + "{msg}".format(msg=result['success'], loc=module_state_key, student=student)) return True @@ -243,111 +213,89 @@ def filter_problem_module_state_for_done(modules_to_update): @task -def _regrade_problem_for_student(request, course_id, problem_url, student_identifier): +def regrade_problem_for_student(course_id, problem_url, student_identifier, xmodule_instance_args): action_name = 'regraded' update_fcn = _regrade_problem_module_state filter_fcn = filter_problem_module_state_for_done - return _update_problem_module_state_for_student(request, course_id, problem_url, student_identifier, - update_fcn, action_name, filter_fcn) - - -def regrade_problem_for_student(request, course_id, problem_url, student_identifier): - # First submit task. Then put stuff into table with the resulting task_id. - result = _regrade_problem_for_student.apply_async(request, course_id, problem_url, student_identifier) - task_id = result.id - # TODO: for log, would want student_identifier to already be mapped to the student - tasklog_args = {'course_id': course_id, - 'task_name': 'regrade', - 'task_args': problem_url, - 'task_id': task_id, - 'task_state': result.state, - 'requester': request.user} - - CourseTaskLog.objects.create(**tasklog_args) - return result + return _update_problem_module_state_for_student(course_id, problem_url, student_identifier, + update_fcn, action_name, filter_fcn, xmodule_instance_args) @task -def regrade_problem_for_all_students(request_environ, course_id, problem_url): - factory = RequestFactory(**request_environ) - request = factory.get('/') +def regrade_problem_for_all_students(course_id, problem_url, xmodule_instance_args): +# factory = RequestFactory(**request_environ) +# request = factory.get('/') action_name = 'regraded' update_fcn = _regrade_problem_module_state filter_fcn = filter_problem_module_state_for_done - return _update_problem_module_state_for_all_students(request, course_id, problem_url, - update_fcn, action_name, filter_fcn) + return _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name, filter_fcn, + xmodule_instance_args) -def _reset_problem_attempts_module_state(request, module_to_reset, module_descriptor): +@transaction.autocommit +def _reset_problem_attempts_module_state(module_descriptor, student_module, xmodule_instance_args=None): # modify the problem's state # load the state json and change state - problem_state = json.loads(module_to_reset.state) + problem_state = json.loads(student_module.state) if 'attempts' in problem_state: old_number_of_attempts = problem_state["attempts"] if old_number_of_attempts > 0: problem_state["attempts"] = 0 # convert back to json and save - module_to_reset.state = json.dumps(problem_state) - module_to_reset.save() - # write out tracking info - track.views.server_track(request, - '{instructor} reset attempts from {old_attempts} to 0 for {student} ' - 'on problem {problem} in {course}'.format(old_attempts=old_number_of_attempts, - student=module_to_reset.student, - problem=module_to_reset.module_state_key, - instructor=request.user, - course=module_to_reset.course_id), - {}, - page='idashboard') + student_module.state = json.dumps(problem_state) + student_module.save() # consider the reset to be successful, even if no update was performed. (It's just "optimized".) return True -def _reset_problem_attempts_for_student(request, course_id, problem_url, student_identifier): +@task +def reset_problem_attempts_for_student(course_id, problem_url, student_identifier): action_name = 'reset' update_fcn = _reset_problem_attempts_module_state - return _update_problem_module_state_for_student(request, course_id, problem_url, student_identifier, + return _update_problem_module_state_for_student(course_id, problem_url, student_identifier, update_fcn, action_name) -def _reset_problem_attempts_for_all_students(request, course_id, problem_url): +@task +def reset_problem_attempts_for_all_students(course_id, problem_url): action_name = 'reset' update_fcn = _reset_problem_attempts_module_state - return _update_problem_module_state_for_all_students(request, course_id, problem_url, + return _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name) -def _delete_problem_module_state(request, module_to_delete, module_descriptor): - ''' - delete the state - ''' - module_to_delete.delete() +@transaction.autocommit +def _delete_problem_module_state(module_descriptor, student_module, xmodule_instance_args=None): + """Delete the StudentModule entry.""" + student_module.delete() return True -def _delete_problem_state_for_student(request, course_id, problem_url, student_ident): +@task +def delete_problem_state_for_student(course_id, problem_url, student_ident): action_name = 'deleted' update_fcn = _delete_problem_module_state - return _update_problem_module_state_for_student(request, course_id, problem_url, + return _update_problem_module_state_for_student(course_id, problem_url, student_ident, update_fcn, action_name) -def _delete_problem_state_for_all_students(request, course_id, problem_url): +@task +def delete_problem_state_for_all_students(course_id, problem_url): action_name = 'deleted' update_fcn = _delete_problem_module_state - return _update_problem_module_state_for_all_students(request, course_id, problem_url, + return _update_problem_module_state_for_all_students(course_id, problem_url, update_fcn, action_name) -@worker_ready.connect -def initialize_middleware(**kwargs): - # The initialize Django middleware - some middleware components - # are initialized lazily when the first request is served. Since - # the celery workers do not serve request, the components never - # get initialized, causing errors in some dependencies. - # In particular, the Mako template middleware is used by some xmodules - task_log.info("Initializing all middleware from worker_ready.connect hook") - - from django.core.handlers.base import BaseHandler - BaseHandler().load_middleware() +#@worker_ready.connect +#def initialize_middleware(**kwargs): +# # The initialize Django middleware - some middleware components +# # are initialized lazily when the first request is served. Since +# # the celery workers do not serve request, the components never +# # get initialized, causing errors in some dependencies. +# # In particular, the Mako template middleware is used by some xmodules +# task_log.info("Initializing all middleware from worker_ready.connect hook") +# +# from django.core.handlers.base import BaseHandler +# BaseHandler().load_middleware() diff --git a/lms/djangoapps/instructor/views.py b/lms/djangoapps/instructor/views.py index 95482f1ee8..cf403132d1 100644 --- a/lms/djangoapps/instructor/views.py +++ b/lms/djangoapps/instructor/views.py @@ -220,7 +220,10 @@ def instructor_dashboard(request, course_id): try: course_task_log_entry = task_queue.submit_regrade_problem_for_all_students(request, course_id, problem_url) if course_task_log_entry is None: - msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) + msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) + else: + track_msg = 'regrade problem {problem} for all students in {course}'.format(problem=problem_url, course=course_id) + track.views.server_track(request, track_msg, {}, page='idashboard') except Exception as e: log.error("Encountered exception from regrade: {0}".format(e)) msg += 'Failed to create a background task for regrading "{0}": {1}.'.format(problem_url, e) diff --git a/lms/envs/test.py b/lms/envs/test.py index 3ccfa24014..3a93f6d820 100644 --- a/lms/envs/test.py +++ b/lms/envs/test.py @@ -188,3 +188,8 @@ PASSWORD_HASHERS = ( 'django.contrib.auth.hashers.MD5PasswordHasher', # 'django.contrib.auth.hashers.CryptPasswordHasher', ) + +################################# CELERY ###################################### + +# By default don't use a worker, execute tasks as if they were local functions +CELERY_ALWAYS_EAGER = True