From 7711c00e2c2f0ea3bc0513cedf882e68e9d361f0 Mon Sep 17 00:00:00 2001 From: Brian Wilson Date: Mon, 6 May 2013 18:51:56 -0400 Subject: [PATCH] Pull task_queue.py methods out from tasks.py, to represent API calls from client. Tasks.py remains the task implementations running on the celery worker. In particular, move status message generation out of task thread to client side. --- .../0010_add_courseware_coursetasklog.py | 10 +- lms/djangoapps/courseware/models.py | 19 +- lms/djangoapps/courseware/task_queue.py | 207 +++++++++++++ lms/djangoapps/courseware/tasks.py | 277 +++++++----------- lms/djangoapps/instructor/views.py | 225 +++++++------- .../courseware/instructor_dashboard.html | 20 +- 6 files changed, 455 insertions(+), 303 deletions(-) create mode 100644 lms/djangoapps/courseware/task_queue.py diff --git a/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py b/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py index c24bcbd46e..345eebb535 100644 --- a/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py +++ b/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py @@ -8,22 +8,23 @@ from django.db import models class Migration(SchemaMigration): def forwards(self, orm): - # Adding model 'CourseTaskLog' db.create_table('courseware_coursetasklog', ( ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)), ('course_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('student', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', null=True, to=orm['auth.User'])), - ('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)), ('task_args', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('task_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), - ('task_status', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)), + ('task_state', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)), + ('task_progress', self.gf('django.db.models.fields.CharField')(max_length=1024, null=True, db_index=True)), ('requester', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', to=orm['auth.User'])), ('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)), ('updated', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, db_index=True, blank=True)), )) db.send_create_signal('courseware', ['CourseTaskLog']) + def backwards(self, orm): # Deleting model 'CourseTaskLog' db.delete_table('courseware_coursetasklog') @@ -76,7 +77,8 @@ class Migration(SchemaMigration): 'task_args': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), - 'task_status': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}), + 'task_progress': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'db_index': 'True'}), + 'task_state': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}), 'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'db_index': 'True', 'blank': 'True'}) }, 'courseware.offlinecomputedgrade': { diff --git a/lms/djangoapps/courseware/models.py b/lms/djangoapps/courseware/models.py index 5e58dc2e96..4700bcfb0b 100644 --- a/lms/djangoapps/courseware/models.py +++ b/lms/djangoapps/courseware/models.py @@ -271,12 +271,27 @@ class CourseTaskLog(models.Model): perform course-specific work. Examples include grading and regrading. """ + task_name = models.CharField(max_length=50, db_index=True) course_id = models.CharField(max_length=255, db_index=True) student = models.ForeignKey(User, null=True, db_index=True, related_name='+') # optional: None = task applies to all students - task_name = models.CharField(max_length=50, db_index=True) task_args = models.CharField(max_length=255, db_index=True) task_id = models.CharField(max_length=255, db_index=True) # max_length from celery_taskmeta - task_status = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta + task_state = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta + task_progress = models.CharField(max_length=1024, null=True, db_index=True) requester = models.ForeignKey(User, db_index=True, related_name='+') created = models.DateTimeField(auto_now_add=True, null=True, db_index=True) updated = models.DateTimeField(auto_now=True, db_index=True) + + def __repr__(self): + return 'CourseTaskLog<%r>' % ({ + 'task_name': self.task_name, + 'course_id': self.course_id, + 'student': self.student.username, + 'task_args': self.task_args, + 'task_id': self.task_id, + 'task_state': self.task_state, + 'task_progress': self.task_progress, + },) + + def __unicode__(self): + return unicode(repr(self)) diff --git a/lms/djangoapps/courseware/task_queue.py b/lms/djangoapps/courseware/task_queue.py new file mode 100644 index 0000000000..d439050039 --- /dev/null +++ b/lms/djangoapps/courseware/task_queue.py @@ -0,0 +1,207 @@ +import json +import logging +from django.http import HttpResponse + +from celery.result import AsyncResult +from celery.states import READY_STATES + +from courseware.models import CourseTaskLog +from courseware.tasks import regrade_problem_for_all_students +from xmodule.modulestore.django import modulestore + + +# define different loggers for use within tasks and on client side +log = logging.getLogger(__name__) + + +def get_running_course_tasks(course_id): + course_tasks = CourseTaskLog.objects.filter(course_id=course_id) + # exclude(task_state='SUCCESS').exclude(task_state='FAILURE').exclude(task_state='REVOKED') + for state in READY_STATES: + course_tasks = course_tasks.exclude(task_state=state) + return course_tasks + + +def _task_is_running(course_id, task_name, task_args, student=None): + runningTasks = CourseTaskLog.objects.filter(course_id=course_id, task_name=task_name, task_args=task_args) + if student is not None: + runningTasks = runningTasks.filter(student=student) + for state in READY_STATES: + runningTasks = runningTasks.exclude(task_state=state) + return len(runningTasks) > 0 + + +def submit_regrade_problem_for_all_students(request, course_id, problem_url): + # check arguments: in particular, make sure that problem_url is defined + # (since that's currently typed in). If the corresponding module descriptor doesn't exist, + # an exception should be raised. Let it continue to the caller. + modulestore().get_instance(course_id, problem_url) + + # TODO: adjust transactions so that one request will not be about to create an + # entry while a second is testing to see if the entry exists. (Need to handle + # quick accidental double-clicks when submitting.) + + # check to see if task is already running + task_name = 'regrade' + if _task_is_running(course_id, task_name, problem_url): + # TODO: figure out how to return info that it's already running + raise Exception("task is already running") + + # Create log entry now, so that future requests won't + tasklog_args = {'course_id': course_id, + 'task_name': task_name, + 'task_args': problem_url, + 'task_state': 'QUEUING', + 'requester': request.user} + course_task_log = CourseTaskLog.objects.create(**tasklog_args) + + + # At a low level of processing, the task currently fetches some information from the web request. + # This is used for setting up X-Queue, as well as for tracking. + # An actual request will not successfully serialize with json or with pickle. + # TODO: we can just pass all META info as a dict. + request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'], + 'REMOTE_ADDR': request.META['REMOTE_ADDR'], + 'SERVER_NAME': request.META['SERVER_NAME'], + 'REQUEST_METHOD': 'GET', +# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'], + } + + # Submit task: + task_args = [request_environ, course_id, problem_url] + result = regrade_problem_for_all_students.apply_async(task_args) + + # Put info into table with the resulting task_id. + course_task_log.task_state = result.state + course_task_log.task_id = result.id + course_task_log.save() + return course_task_log + + +def course_task_log_status(request, task_id=None): + """ + This returns the status of a course-related task as a JSON-serialized dict. + """ + output = {} + if task_id is not None: + output = _get_course_task_log_status(task_id) + elif 'task_id' in request.POST: + task_id = request.POST['task_id'] + output = _get_course_task_log_status(task_id) + elif 'task_ids[]' in request.POST: + tasks = request.POST.getlist('task_ids[]') + for task_id in tasks: + task_output = _get_course_task_log_status(task_id) + if task_output is not None: + output[task_id] = task_output + # TODO decide whether to raise exception if bad args are passed. + # May be enough just to return an empty output. + + return HttpResponse(json.dumps(output, indent=4)) + + +def _get_course_task_log_status(task_id): + """ + Get the status for a given task_id. + + Returns a dict, with the following keys: + 'task_id' + 'task_state' + 'in_progress': boolean indicating if the task is still running. + 'task_traceback': optional, returned if task failed and produced a traceback. + + If task doesn't exist, returns None. + """ + # First check if the task_id is known + try: + course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id) + except CourseTaskLog.DoesNotExist: + # TODO: log a message here + return None + + output = {} + + # if the task is already known to be done, then there's no reason to query + # the underlying task: + if course_task_log_entry.task_state not in READY_STATES: + # we need to get information from the task result directly now. + # Just create the result object. + result = AsyncResult(task_id) + + if result.traceback is not None: + output['task_traceback'] = result.traceback + + if result.state == "PROGRESS": + # construct a status message directly from the task result's metadata: + if hasattr(result, 'result') and 'current' in result.result: + fmt = "Attempted {attempted} of {total}, {action_name} {updated}" + message = fmt.format(attempted=result.result['attempted'], + updated=result.result['updated'], + total=result.result['total'], + action_name=result.result['action_name']) + output['message'] = message + log.info("progress: {0}".format(message)) + for name in ['attempted', 'updated', 'total', 'action_name']: + output[name] = result.result[name] + else: + log.info("still making progress... ") + + # update the entry if the state has changed: + if result.state != course_task_log_entry.task_state: + course_task_log_entry.task_state = result.state + course_task_log_entry.save() + + output['task_id'] = course_task_log_entry.task_id + output['task_state'] = course_task_log_entry.task_state + output['in_progress'] = course_task_log_entry.task_state not in READY_STATES + + if course_task_log_entry.task_progress is not None: + output['task_progress'] = course_task_log_entry.task_progress + + if course_task_log_entry.task_state == 'SUCCESS': + succeeded, message = _get_task_completion_message(course_task_log_entry) + output['message'] = message + output['succeeded'] = succeeded + + return output + + +def _get_task_completion_message(course_task_log_entry): + """ + Construct progress message from progress information in CourseTaskLog entry. + + Returns (boolean, message string) duple. + """ + succeeded = False + + if course_task_log_entry.task_progress is None: + log.warning("No task_progress information found for course_task {0}".format(course_task_log_entry.task_id)) + return (succeeded, "No status information available") + + task_progress = json.loads(course_task_log_entry.task_progress) + action_name = task_progress['action_name'] + num_attempted = task_progress['attempted'] + num_updated = task_progress['updated'] + # num_total = task_progress['total'] + if course_task_log_entry.student is not None: + if num_attempted == 0: + msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." + elif num_updated == 0: + msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" + else: + succeeded = True + msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" + elif num_attempted == 0: + msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." + elif num_updated == 0: + msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" + elif num_updated == num_attempted: + succeeded = True + msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" + elif num_updated < num_attempted: + msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" + + # Update status in task result object itself: + message = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, + student=course_task_log_entry.student, problem=course_task_log_entry.task_args) + return (succeeded, message) diff --git a/lms/djangoapps/courseware/tasks.py b/lms/djangoapps/courseware/tasks.py index 674ea1effc..f29ffb58ce 100644 --- a/lms/djangoapps/courseware/tasks.py +++ b/lms/djangoapps/courseware/tasks.py @@ -1,36 +1,35 @@ import json -import logging +#import logging from time import sleep from django.contrib.auth.models import User -import mitxmako.middleware as middleware -from django.http import HttpResponse -# from django.http import HttpRequest from django.test.client import RequestFactory from celery import task, current_task -from celery.result import AsyncResult +from celery.signals import worker_ready from celery.utils.log import get_task_logger +import mitxmako.middleware as middleware + from courseware.models import StudentModule, CourseTaskLog from courseware.model_data import ModelDataCache from courseware.module_render import get_module from xmodule.modulestore.django import modulestore -from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError +#from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError import track.views # define different loggers for use within tasks and on client side -logger = get_task_logger(__name__) -log = logging.getLogger(__name__) +task_log = get_task_logger(__name__) +# log = logging.getLogger(__name__) @task def waitawhile(value): for i in range(value): sleep(1) # in seconds - logger.info('Waited {0} seconds...'.format(i)) + task_log.info('Waited {0} seconds...'.format(i)) current_task.update_state(state='PROGRESS', meta={'current': i, 'total': value}) @@ -41,19 +40,35 @@ def waitawhile(value): class UpdateProblemModuleStateError(Exception): pass +#def get_module_descriptor(course_id, module_state_key): +# """Return module descriptor for requested module, or None if not found.""" +# try: +# module_descriptor = modulestore().get_instance(course_id, module_state_key) +# except ItemNotFoundError: +# pass +# except InvalidLocationError: +# pass +# return module_descriptor +# except ItemNotFoundError: +# msg = "Couldn't find problem with that urlname." +# except InvalidLocationError: +# msg = "Couldn't find problem with that urlname." +# if module_descriptor is None: +# msg = "Couldn't find problem with that urlname." +# if not succeeded: +# current_task.update_state( +# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) +# The task should still succeed, but should have metadata indicating +# that the result of the successful task was a failure. (It's not +# the queue that failed, but the task put on the queue.) -def _update_problem_module_state(request, course_id, problem_url, student, update_fcn, action_name, filter_fcn): + +def _update_problem_module_state(request, course_id, module_state_key, student, update_fcn, action_name, filter_fcn): ''' Performs generic update by visiting StudentModule instances with the update_fcn provided If student is None, performs update on modules for all students on the specified problem ''' - module_state_key = problem_url - # TODO: store this in the task state, not as a separate return value. - # (Unless that's not what the task state is intended to mean. The task can successfully - # complete, as far as celery is concerned, but have an internal status of failed.) - succeeded = False - # add hack so that mako templates will work on celery worker server: # The initialization of Make templating is usually done when Django is # initialize middleware packages as part of processing a server request. @@ -61,24 +76,11 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat # called. So we look for the result: the defining of the lookup paths # for templates. if 'main' not in middleware.lookup: + task_log.info("Initializing Mako middleware explicitly") middleware.MakoMiddleware() - # find the problem descriptor, if any: - try: - module_descriptor = modulestore().get_instance(course_id, module_state_key) - succeeded = True - except ItemNotFoundError: - msg = "Couldn't find problem with that urlname." - except InvalidLocationError: - msg = "Couldn't find problem with that urlname." - if module_descriptor is None: - msg = "Couldn't find problem with that urlname." -# if not succeeded: -# current_task.update_state( -# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) -# The task should still succeed, but should have metadata indicating -# that the result of the successful task was a failure. (It's not -# the queue that failed, but the task put on the queue.) + # find the problem descriptor: + module_descriptor = modulestore().get_instance(course_id, module_state_key) # find the module in question succeeded = False @@ -97,54 +99,67 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat num_updated = 0 num_attempted = 0 num_total = len(modules_to_update) # TODO: make this more efficient. Count()? + + def get_task_progress(): + progress = {'action_name': action_name, + 'attempted': num_attempted, + 'updated': num_updated, + 'total': num_total, + } + return progress + + task_log.info("Starting to process task {0}".format(current_task.request.id)) + for module_to_update in modules_to_update: num_attempted += 1 -# try: + # There is no try here: if there's an error, we let it throw, and the task will + # be marked as FAILED, with a stack trace. if update_fcn(request, module_to_update, module_descriptor): + # If the update_fcn returns true, then it performed some kind of work. num_updated += 1 -# if there's an error, just let it throw, and the task will -# be marked as FAILED, with a stack trace. -# except UpdateProblemModuleStateError as e: - # something bad happened, so exit right away -# return (succeeded, e.message) + # update task status: - current_task.update_state(state='PROGRESS', - meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) + # TODO: decide on the frequency for updating this: + # -- it may not make sense to do so every time through the loop + # -- may depend on each iteration's duration + current_task.update_state(state='PROGRESS', meta=get_task_progress()) + sleep(5) # in seconds - # done with looping through all modules, so just return final statistics: - if student is not None: - if num_attempted == 0: - msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." - elif num_updated == 0: - msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" - else: - succeeded = True - msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" - elif num_attempted == 0: - msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." - elif num_updated == 0: - msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" - elif num_updated == num_attempted: - succeeded = True - msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" - elif num_updated < num_attempted: - msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" + # Done with looping through all modules, so just return final statistics: + # TODO: these messages should be rendered at the view level -- move them there! +# if student is not None: +# if num_attempted == 0: +# msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." +# elif num_updated == 0: +# msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" +# else: +# succeeded = True +# msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" +# elif num_attempted == 0: +# msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." +# elif num_updated == 0: +# msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" +# elif num_updated == num_attempted: +# succeeded = True +# msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" +# elif num_updated < num_attempted: +# msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" +# +# # Update status in task result object itself: +# msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key) + task_progress = get_task_progress() # succeeded=succeeded, message=msg) + current_task.update_state(state='PROGRESS', meta=task_progress) - msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key) - # update status in task result object itself: - current_task.update_state(state='DONE', - meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total, - 'succeeded': succeeded, 'message': msg}) + # Update final progress in course task table as well: + # The actual task result state is updated by celery when this task completes, and thus + # clobbers any custom metadata. So if we want any such status to persist, we have to + # write it to the CourseTaskLog instead. + task_log.info("Finished processing task, updating CourseTaskLog entry") - # and update status in course task table as well: - # TODO: figure out how this is legal. The actual task result - # status is updated by celery when this task completes, and is - # presumably going to clobber this custom metadata. So if we want - # any such status to persist, we have to write it to the CourseTaskLog instead. -# course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.id) -# course_task_log_entry.task_status = ... + course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.request.id) + course_task_log_entry.task_progress = json.dumps(task_progress) + course_task_log_entry.save() - # return (succeeded, msg) return succeeded @@ -193,7 +208,7 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) # and load something they shouldn't have access to. msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key, student=student) - log.debug(msg) + task_log.debug(msg) raise UpdateProblemModuleStateError(msg) if not hasattr(instance, 'regrade_problem'): @@ -205,11 +220,11 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) result = instance.regrade_problem() if 'success' not in result: # don't consider these fatal, but false means that the individual call didn't complete: - log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.debug("error processing regrade call for problem {loc} and student {student}: " "unexpected response {msg}".format(msg=result, loc=module_state_key, student=student)) return False elif result['success'] != 'correct' and result['success'] != 'incorrect': - log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.debug("error processing regrade call for problem {loc} and student {student}: " "{msg}".format(msg=result['success'], loc=module_state_key, student=student)) return False else: @@ -245,7 +260,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif 'task_name': 'regrade', 'task_args': problem_url, 'task_id': task_id, - 'task_status': result.state, + 'task_state': result.state, 'requester': request.user} CourseTaskLog.objects.create(**tasklog_args) @@ -253,9 +268,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif @task -def _regrade_problem_for_all_students(request_environ, course_id, problem_url): -# request = HttpRequest() -# request.META.update(request_environ) +def regrade_problem_for_all_students(request_environ, course_id, problem_url): factory = RequestFactory(**request_environ) request = factory.get('/') action_name = 'regraded' @@ -265,101 +278,6 @@ def _regrade_problem_for_all_students(request_environ, course_id, problem_url): update_fcn, action_name, filter_fcn) -def regrade_problem_for_all_students(request, course_id, problem_url): - # Figure out (for now) how to serialize what we need of the request. The actual - # request will not successfully serialize with json or with pickle. - # Maybe we can just pass all META info as a dict. - request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'], - 'REMOTE_ADDR': request.META['REMOTE_ADDR'], - 'SERVER_NAME': request.META['SERVER_NAME'], - 'REQUEST_METHOD': 'GET', -# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'], - } - - # Submit task. Then put stuff into table with the resulting task_id. - task_args = [request_environ, course_id, problem_url] - result = _regrade_problem_for_all_students.apply_async(task_args) - task_id = result.id - tasklog_args = {'course_id': course_id, - 'task_name': 'regrade', - 'task_args': problem_url, - 'task_id': task_id, - 'task_status': result.state, - 'requester': request.user} - course_task_log = CourseTaskLog.objects.create(**tasklog_args) - return course_task_log - - -def course_task_log_status(request, task_id=None): - """ - This returns the status of a course-related task as a JSON-serialized dict. - """ - output = {} - if task_id is not None: - output = _get_course_task_log_status(task_id) - elif 'task_id' in request.POST: - task_id = request.POST['task_id'] - output = _get_course_task_log_status(task_id) - elif 'task_ids[]' in request.POST: - tasks = request.POST.getlist('task_ids[]') - for task_id in tasks: - task_output = _get_course_task_log_status(task_id) - output[task_id] = task_output - # TODO else: raise exception? - - return HttpResponse(json.dumps(output, indent=4)) - - -def _get_course_task_log_status(task_id): - course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id) - # TODO: error handling if it doesn't exist... - - def not_in_progress(entry): - # TODO: do better than to copy list from celery.states.READY_STATES - return entry.task_status in ['SUCCESS', 'FAILURE', 'REVOKED'] - - # if the task is already known to be done, then there's no reason to query - # the underlying task: - if not_in_progress(course_task_log_entry): - output = { - 'task_id': course_task_log_entry.task_id, - 'task_status': course_task_log_entry.task_status, - 'in_progress': False - } - return output - - # we need to get information from the task result directly now. - result = AsyncResult(task_id) - - output = { - 'task_id': result.id, - 'task_status': result.state, - 'in_progress': True - } - if result.traceback is not None: - output['task_traceback'] = result.traceback - - if result.state == "PROGRESS": - if hasattr(result, 'result') and 'current' in result.result: - log.info("still waiting... progress at {0} of {1}".format(result.result['current'], - result.result['total'])) - output['current'] = result.result['current'] - output['total'] = result.result['total'] - else: - log.info("still making progress... ") - - if result.successful(): - value = result.result - output['value'] = value - - # update the entry if necessary: - if course_task_log_entry.task_status != result.state: - course_task_log_entry.task_status = result.state - course_task_log_entry.save() - - return output - - def _reset_problem_attempts_module_state(request, module_to_reset, module_descriptor): # modify the problem's state # load the state json and change state @@ -420,3 +338,16 @@ def _delete_problem_state_for_all_students(request, course_id, problem_url): update_fcn = _delete_problem_module_state return _update_problem_module_state_for_all_students(request, course_id, problem_url, update_fcn, action_name) + + +@worker_ready.connect +def initialize_middleware(**kwargs): + # The initialize Django middleware - some middleware components + # are initialized lazily when the first request is served. Since + # the celery workers do not serve request, the components never + # get initialized, causing errors in some dependencies. + # In particular, the Mako template middleware is used by some xmodules + task_log.info("Initializing all middleware from worker_ready.connect hook") + + from django.core.handlers.base import BaseHandler + BaseHandler().load_middleware() diff --git a/lms/djangoapps/instructor/views.py b/lms/djangoapps/instructor/views.py index 67ea0d1ea9..95482f1ee8 100644 --- a/lms/djangoapps/instructor/views.py +++ b/lms/djangoapps/instructor/views.py @@ -10,9 +10,9 @@ import os import re import requests from requests.status_codes import codes -import urllib +#import urllib from collections import OrderedDict -from time import sleep +#from time import sleep from StringIO import StringIO @@ -25,7 +25,8 @@ from mitxmako.shortcuts import render_to_response from django.core.urlresolvers import reverse from courseware import grades -from courseware import tasks +#from courseware import tasks # for now... should remove once things are in queue instead +from courseware import task_queue from courseware.access import (has_access, get_access_group_name, course_beta_test_group_name) from courseware.courses import get_course_with_access @@ -176,12 +177,12 @@ def instructor_dashboard(request, course_id): datatable['title'] = 'List of students enrolled in {0}'.format(course_id) track.views.server_track(request, 'list-students', {}, page='idashboard') - elif 'Test Celery' in action: - args = (10,) - result = tasks.waitawhile.apply_async(args, retry=False) - task_id = result.id - celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id}) - msg += '

Celery Status for task ${task}:

Status end.

'.format(task=task_id, url=celery_ajax_url) +# elif 'Test Celery' in action: +# args = (10,) +# result = tasks.waitawhile.apply_async(args, retry=False) +# task_id = result.id +# celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id}) +# msg += '

Celery Status for task ${task}:

Status end.

'.format(task=task_id, url=celery_ajax_url) elif 'Dump Grades' in action: log.debug(action) @@ -217,13 +218,13 @@ def instructor_dashboard(request, course_id): elif "Regrade ALL students' problem submissions" in action: problem_url = request.POST.get('problem_to_regrade', '') try: - course_task_log_entry = tasks.regrade_problem_for_all_students(request, course_id, problem_url) + course_task_log_entry = task_queue.submit_regrade_problem_for_all_students(request, course_id, problem_url) + if course_task_log_entry is None: + msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) except Exception as e: - log.error("Encountered exception from regrade: {0}", e) - # check that a course_task_log entry was created: - if course_task_log_entry is None: - msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) - + log.error("Encountered exception from regrade: {0}".format(e)) + msg += 'Failed to create a background task for regrading "{0}": {1}.'.format(problem_url, e) + elif "Reset student's attempts" in action or "Delete student state for problem" in action: # get the form data unique_student_identifier = request.POST.get('unique_student_identifier', '') @@ -645,7 +646,7 @@ def instructor_dashboard(request, course_id): msg += "
Grades from %s" % offline_grades_available(course_id) # generate list of pending background tasks - course_tasks = CourseTaskLog.objects.filter(course_id = course_id).exclude(task_status='SUCCESS').exclude(task_status='FAILURE') + course_tasks = task_queue.get_running_course_tasks(course_id) #---------------------------------------- # context for rendering @@ -1205,99 +1206,99 @@ def dump_grading_context(course): return msg -def old1testcelery(request): - """ - A Simple view that checks if the application can talk to the celery workers - """ - args = ('ping',) - result = tasks.echo.apply_async(args, retry=False) - value = result.get(timeout=0.5) - output = { - 'task_id': result.id, - 'value': value - } - return HttpResponse(json.dumps(output, indent=4)) - - -def old2testcelery(request): - """ - A Simple view that checks if the application can talk to the celery workers - """ - args = (10,) - result = tasks.waitawhile.apply_async(args, retry=False) - while not result.ready(): - sleep(0.5) # in seconds - if result.state == "PROGRESS": - if hasattr(result, 'result') and 'current' in result.result: - log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) - else: - log.info("still making progress... ") - if result.successful(): - value = result.result - output = { - 'task_id': result.id, - 'value': value - } - return HttpResponse(json.dumps(output, indent=4)) - - -def testcelery(request): - """ - A Simple view that checks if the application can talk to the celery workers - """ - args = (10,) - result = tasks.waitawhile.apply_async(args, retry=False) - task_id = result.id - # return the task_id to a template which will set up an ajax call to - # check the progress of the task. - return testcelery_status(request, task_id) -# return mitxmako.shortcuts.render_to_response('celery_ajax.html', { -# 'element_id': 'celery_task' -# 'id': self.task_id, -# 'ajax_url': reverse('testcelery_ajax'), -# }) - - -def testcelery_status(request, task_id): - result = tasks.waitawhile.AsyncResult(task_id) - while not result.ready(): - sleep(0.5) # in seconds - if result.state == "PROGRESS": - if hasattr(result, 'result') and 'current' in result.result: - log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) - else: - log.info("still making progress... ") - if result.successful(): - value = result.result - output = { - 'task_id': result.id, - 'value': value - } - return HttpResponse(json.dumps(output, indent=4)) - - -def celery_task_status(request, task_id): - # TODO: determine if we need to know the name of the original task, - # or if this could be any task... Sample code seems to indicate that - # we could just include the AsyncResult class directly, i.e.: - # from celery.result import AsyncResult. - result = tasks.waitawhile.AsyncResult(task_id) - - output = { - 'task_id': result.id, - 'state': result.state - } - - if result.state == "PROGRESS": - if hasattr(result, 'result') and 'current' in result.result: - log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) - output['current'] = result.result['current'] - output['total'] = result.result['total'] - else: - log.info("still making progress... ") - - if result.successful(): - value = result.result - output['value'] = value - - return HttpResponse(json.dumps(output, indent=4)) +#def old1testcelery(request): +# """ +# A Simple view that checks if the application can talk to the celery workers +# """ +# args = ('ping',) +# result = tasks.echo.apply_async(args, retry=False) +# value = result.get(timeout=0.5) +# output = { +# 'task_id': result.id, +# 'value': value +# } +# return HttpResponse(json.dumps(output, indent=4)) +# +# +#def old2testcelery(request): +# """ +# A Simple view that checks if the application can talk to the celery workers +# """ +# args = (10,) +# result = tasks.waitawhile.apply_async(args, retry=False) +# while not result.ready(): +# sleep(0.5) # in seconds +# if result.state == "PROGRESS": +# if hasattr(result, 'result') and 'current' in result.result: +# log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) +# else: +# log.info("still making progress... ") +# if result.successful(): +# value = result.result +# output = { +# 'task_id': result.id, +# 'value': value +# } +# return HttpResponse(json.dumps(output, indent=4)) +# +# +#def testcelery(request): +# """ +# A Simple view that checks if the application can talk to the celery workers +# """ +# args = (10,) +# result = tasks.waitawhile.apply_async(args, retry=False) +# task_id = result.id +# # return the task_id to a template which will set up an ajax call to +# # check the progress of the task. +# return testcelery_status(request, task_id) +## return mitxmako.shortcuts.render_to_response('celery_ajax.html', { +## 'element_id': 'celery_task' +## 'id': self.task_id, +## 'ajax_url': reverse('testcelery_ajax'), +## }) +# +# +#def testcelery_status(request, task_id): +# result = tasks.waitawhile.AsyncResult(task_id) +# while not result.ready(): +# sleep(0.5) # in seconds +# if result.state == "PROGRESS": +# if hasattr(result, 'result') and 'current' in result.result: +# log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) +# else: +# log.info("still making progress... ") +# if result.successful(): +# value = result.result +# output = { +# 'task_id': result.id, +# 'value': value +# } +# return HttpResponse(json.dumps(output, indent=4)) +# +# +#def celery_task_status(request, task_id): +# # TODO: determine if we need to know the name of the original task, +# # or if this could be any task... Sample code seems to indicate that +# # we could just include the AsyncResult class directly, i.e.: +# # from celery.result import AsyncResult. +# result = tasks.waitawhile.AsyncResult(task_id) +# +# output = { +# 'task_id': result.id, +# 'state': result.state +# } +# +# if result.state == "PROGRESS": +# if hasattr(result, 'result') and 'current' in result.result: +# log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) +# output['current'] = result.result['current'] +# output['total'] = result.result['total'] +# else: +# log.info("still making progress... ") +# +# if result.successful(): +# value = result.result +# output['value'] = value +# +# return HttpResponse(json.dumps(output, indent=4)) diff --git a/lms/templates/courseware/instructor_dashboard.html b/lms/templates/courseware/instructor_dashboard.html index 3b5ea7a0e1..acc32841be 100644 --- a/lms/templates/courseware/instructor_dashboard.html +++ b/lms/templates/courseware/instructor_dashboard.html @@ -74,18 +74,14 @@ var task_id = name; var task_dict = response[task_id]; // this should be a dict of properties for this task_id - var in_progress = task_dict.in_progress - if (in_progress === true) { + if (task_dict.in_progress === true) { something_in_progress = true; } // find the corresponding entry, and update it: - selector = '[data-task-id="' + task_id + '"]'; - entry = $(_this.element).find(selector); - var task_status_el = entry.find('.task-status'); - task_status_el.text(task_dict.task_status) - var task_progress_el = entry.find('.task-progress'); - var progress_value = task_dict.task_progress || ''; - task_progress_el.text(progress_value); + entry = $(_this.element).find('[data-task-id="' + task_id + '"]'); + entry.find('.task-state').text(task_dict.task_state) + var progress_value = task_dict.message || ''; + entry.find('.task-progress').text(progress_value); } } if (something_in_progress) { @@ -491,7 +487,7 @@ function goto( mode) ##----------------------------------------------------------------------------- ## Output tasks in progress -%if course_tasks is not None: +%if course_tasks is not None and len(course_tasks) > 0:

Pending Course Tasks

@@ -503,7 +499,7 @@ function goto( mode) - + %for tasknum, course_task in enumerate(course_tasks): @@ -515,7 +511,7 @@ function goto( mode) - + %endfor
Requester Submitted Last UpdateTask StatusTask State Task Progress
${course_task.requester} ${course_task.created}
${course_task.updated}
${course_task.task_status}
${course_task.task_state}
unknown