diff --git a/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py b/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py index c24bcbd46e..345eebb535 100644 --- a/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py +++ b/lms/djangoapps/courseware/migrations/0010_add_courseware_coursetasklog.py @@ -8,22 +8,23 @@ from django.db import models class Migration(SchemaMigration): def forwards(self, orm): - # Adding model 'CourseTaskLog' db.create_table('courseware_coursetasklog', ( ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)), ('course_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('student', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', null=True, to=orm['auth.User'])), - ('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)), ('task_args', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('task_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), - ('task_status', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)), + ('task_state', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)), + ('task_progress', self.gf('django.db.models.fields.CharField')(max_length=1024, null=True, db_index=True)), ('requester', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', to=orm['auth.User'])), ('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)), ('updated', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, db_index=True, blank=True)), )) db.send_create_signal('courseware', ['CourseTaskLog']) + def backwards(self, orm): # Deleting model 'CourseTaskLog' db.delete_table('courseware_coursetasklog') @@ -76,7 +77,8 @@ class Migration(SchemaMigration): 'task_args': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), - 'task_status': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}), + 'task_progress': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'db_index': 'True'}), + 'task_state': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}), 'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'db_index': 'True', 'blank': 'True'}) }, 'courseware.offlinecomputedgrade': { diff --git a/lms/djangoapps/courseware/models.py b/lms/djangoapps/courseware/models.py index 5e58dc2e96..4700bcfb0b 100644 --- a/lms/djangoapps/courseware/models.py +++ b/lms/djangoapps/courseware/models.py @@ -271,12 +271,27 @@ class CourseTaskLog(models.Model): perform course-specific work. Examples include grading and regrading. """ + task_name = models.CharField(max_length=50, db_index=True) course_id = models.CharField(max_length=255, db_index=True) student = models.ForeignKey(User, null=True, db_index=True, related_name='+') # optional: None = task applies to all students - task_name = models.CharField(max_length=50, db_index=True) task_args = models.CharField(max_length=255, db_index=True) task_id = models.CharField(max_length=255, db_index=True) # max_length from celery_taskmeta - task_status = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta + task_state = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta + task_progress = models.CharField(max_length=1024, null=True, db_index=True) requester = models.ForeignKey(User, db_index=True, related_name='+') created = models.DateTimeField(auto_now_add=True, null=True, db_index=True) updated = models.DateTimeField(auto_now=True, db_index=True) + + def __repr__(self): + return 'CourseTaskLog<%r>' % ({ + 'task_name': self.task_name, + 'course_id': self.course_id, + 'student': self.student.username, + 'task_args': self.task_args, + 'task_id': self.task_id, + 'task_state': self.task_state, + 'task_progress': self.task_progress, + },) + + def __unicode__(self): + return unicode(repr(self)) diff --git a/lms/djangoapps/courseware/task_queue.py b/lms/djangoapps/courseware/task_queue.py new file mode 100644 index 0000000000..d439050039 --- /dev/null +++ b/lms/djangoapps/courseware/task_queue.py @@ -0,0 +1,207 @@ +import json +import logging +from django.http import HttpResponse + +from celery.result import AsyncResult +from celery.states import READY_STATES + +from courseware.models import CourseTaskLog +from courseware.tasks import regrade_problem_for_all_students +from xmodule.modulestore.django import modulestore + + +# define different loggers for use within tasks and on client side +log = logging.getLogger(__name__) + + +def get_running_course_tasks(course_id): + course_tasks = CourseTaskLog.objects.filter(course_id=course_id) + # exclude(task_state='SUCCESS').exclude(task_state='FAILURE').exclude(task_state='REVOKED') + for state in READY_STATES: + course_tasks = course_tasks.exclude(task_state=state) + return course_tasks + + +def _task_is_running(course_id, task_name, task_args, student=None): + runningTasks = CourseTaskLog.objects.filter(course_id=course_id, task_name=task_name, task_args=task_args) + if student is not None: + runningTasks = runningTasks.filter(student=student) + for state in READY_STATES: + runningTasks = runningTasks.exclude(task_state=state) + return len(runningTasks) > 0 + + +def submit_regrade_problem_for_all_students(request, course_id, problem_url): + # check arguments: in particular, make sure that problem_url is defined + # (since that's currently typed in). If the corresponding module descriptor doesn't exist, + # an exception should be raised. Let it continue to the caller. + modulestore().get_instance(course_id, problem_url) + + # TODO: adjust transactions so that one request will not be about to create an + # entry while a second is testing to see if the entry exists. (Need to handle + # quick accidental double-clicks when submitting.) + + # check to see if task is already running + task_name = 'regrade' + if _task_is_running(course_id, task_name, problem_url): + # TODO: figure out how to return info that it's already running + raise Exception("task is already running") + + # Create log entry now, so that future requests won't + tasklog_args = {'course_id': course_id, + 'task_name': task_name, + 'task_args': problem_url, + 'task_state': 'QUEUING', + 'requester': request.user} + course_task_log = CourseTaskLog.objects.create(**tasklog_args) + + + # At a low level of processing, the task currently fetches some information from the web request. + # This is used for setting up X-Queue, as well as for tracking. + # An actual request will not successfully serialize with json or with pickle. + # TODO: we can just pass all META info as a dict. + request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'], + 'REMOTE_ADDR': request.META['REMOTE_ADDR'], + 'SERVER_NAME': request.META['SERVER_NAME'], + 'REQUEST_METHOD': 'GET', +# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'], + } + + # Submit task: + task_args = [request_environ, course_id, problem_url] + result = regrade_problem_for_all_students.apply_async(task_args) + + # Put info into table with the resulting task_id. + course_task_log.task_state = result.state + course_task_log.task_id = result.id + course_task_log.save() + return course_task_log + + +def course_task_log_status(request, task_id=None): + """ + This returns the status of a course-related task as a JSON-serialized dict. + """ + output = {} + if task_id is not None: + output = _get_course_task_log_status(task_id) + elif 'task_id' in request.POST: + task_id = request.POST['task_id'] + output = _get_course_task_log_status(task_id) + elif 'task_ids[]' in request.POST: + tasks = request.POST.getlist('task_ids[]') + for task_id in tasks: + task_output = _get_course_task_log_status(task_id) + if task_output is not None: + output[task_id] = task_output + # TODO decide whether to raise exception if bad args are passed. + # May be enough just to return an empty output. + + return HttpResponse(json.dumps(output, indent=4)) + + +def _get_course_task_log_status(task_id): + """ + Get the status for a given task_id. + + Returns a dict, with the following keys: + 'task_id' + 'task_state' + 'in_progress': boolean indicating if the task is still running. + 'task_traceback': optional, returned if task failed and produced a traceback. + + If task doesn't exist, returns None. + """ + # First check if the task_id is known + try: + course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id) + except CourseTaskLog.DoesNotExist: + # TODO: log a message here + return None + + output = {} + + # if the task is already known to be done, then there's no reason to query + # the underlying task: + if course_task_log_entry.task_state not in READY_STATES: + # we need to get information from the task result directly now. + # Just create the result object. + result = AsyncResult(task_id) + + if result.traceback is not None: + output['task_traceback'] = result.traceback + + if result.state == "PROGRESS": + # construct a status message directly from the task result's metadata: + if hasattr(result, 'result') and 'current' in result.result: + fmt = "Attempted {attempted} of {total}, {action_name} {updated}" + message = fmt.format(attempted=result.result['attempted'], + updated=result.result['updated'], + total=result.result['total'], + action_name=result.result['action_name']) + output['message'] = message + log.info("progress: {0}".format(message)) + for name in ['attempted', 'updated', 'total', 'action_name']: + output[name] = result.result[name] + else: + log.info("still making progress... ") + + # update the entry if the state has changed: + if result.state != course_task_log_entry.task_state: + course_task_log_entry.task_state = result.state + course_task_log_entry.save() + + output['task_id'] = course_task_log_entry.task_id + output['task_state'] = course_task_log_entry.task_state + output['in_progress'] = course_task_log_entry.task_state not in READY_STATES + + if course_task_log_entry.task_progress is not None: + output['task_progress'] = course_task_log_entry.task_progress + + if course_task_log_entry.task_state == 'SUCCESS': + succeeded, message = _get_task_completion_message(course_task_log_entry) + output['message'] = message + output['succeeded'] = succeeded + + return output + + +def _get_task_completion_message(course_task_log_entry): + """ + Construct progress message from progress information in CourseTaskLog entry. + + Returns (boolean, message string) duple. + """ + succeeded = False + + if course_task_log_entry.task_progress is None: + log.warning("No task_progress information found for course_task {0}".format(course_task_log_entry.task_id)) + return (succeeded, "No status information available") + + task_progress = json.loads(course_task_log_entry.task_progress) + action_name = task_progress['action_name'] + num_attempted = task_progress['attempted'] + num_updated = task_progress['updated'] + # num_total = task_progress['total'] + if course_task_log_entry.student is not None: + if num_attempted == 0: + msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." + elif num_updated == 0: + msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" + else: + succeeded = True + msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" + elif num_attempted == 0: + msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." + elif num_updated == 0: + msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" + elif num_updated == num_attempted: + succeeded = True + msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" + elif num_updated < num_attempted: + msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" + + # Update status in task result object itself: + message = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, + student=course_task_log_entry.student, problem=course_task_log_entry.task_args) + return (succeeded, message) diff --git a/lms/djangoapps/courseware/tasks.py b/lms/djangoapps/courseware/tasks.py index 674ea1effc..f29ffb58ce 100644 --- a/lms/djangoapps/courseware/tasks.py +++ b/lms/djangoapps/courseware/tasks.py @@ -1,36 +1,35 @@ import json -import logging +#import logging from time import sleep from django.contrib.auth.models import User -import mitxmako.middleware as middleware -from django.http import HttpResponse -# from django.http import HttpRequest from django.test.client import RequestFactory from celery import task, current_task -from celery.result import AsyncResult +from celery.signals import worker_ready from celery.utils.log import get_task_logger +import mitxmako.middleware as middleware + from courseware.models import StudentModule, CourseTaskLog from courseware.model_data import ModelDataCache from courseware.module_render import get_module from xmodule.modulestore.django import modulestore -from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError +#from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError import track.views # define different loggers for use within tasks and on client side -logger = get_task_logger(__name__) -log = logging.getLogger(__name__) +task_log = get_task_logger(__name__) +# log = logging.getLogger(__name__) @task def waitawhile(value): for i in range(value): sleep(1) # in seconds - logger.info('Waited {0} seconds...'.format(i)) + task_log.info('Waited {0} seconds...'.format(i)) current_task.update_state(state='PROGRESS', meta={'current': i, 'total': value}) @@ -41,19 +40,35 @@ def waitawhile(value): class UpdateProblemModuleStateError(Exception): pass +#def get_module_descriptor(course_id, module_state_key): +# """Return module descriptor for requested module, or None if not found.""" +# try: +# module_descriptor = modulestore().get_instance(course_id, module_state_key) +# except ItemNotFoundError: +# pass +# except InvalidLocationError: +# pass +# return module_descriptor +# except ItemNotFoundError: +# msg = "Couldn't find problem with that urlname." +# except InvalidLocationError: +# msg = "Couldn't find problem with that urlname." +# if module_descriptor is None: +# msg = "Couldn't find problem with that urlname." +# if not succeeded: +# current_task.update_state( +# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) +# The task should still succeed, but should have metadata indicating +# that the result of the successful task was a failure. (It's not +# the queue that failed, but the task put on the queue.) -def _update_problem_module_state(request, course_id, problem_url, student, update_fcn, action_name, filter_fcn): + +def _update_problem_module_state(request, course_id, module_state_key, student, update_fcn, action_name, filter_fcn): ''' Performs generic update by visiting StudentModule instances with the update_fcn provided If student is None, performs update on modules for all students on the specified problem ''' - module_state_key = problem_url - # TODO: store this in the task state, not as a separate return value. - # (Unless that's not what the task state is intended to mean. The task can successfully - # complete, as far as celery is concerned, but have an internal status of failed.) - succeeded = False - # add hack so that mako templates will work on celery worker server: # The initialization of Make templating is usually done when Django is # initialize middleware packages as part of processing a server request. @@ -61,24 +76,11 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat # called. So we look for the result: the defining of the lookup paths # for templates. if 'main' not in middleware.lookup: + task_log.info("Initializing Mako middleware explicitly") middleware.MakoMiddleware() - # find the problem descriptor, if any: - try: - module_descriptor = modulestore().get_instance(course_id, module_state_key) - succeeded = True - except ItemNotFoundError: - msg = "Couldn't find problem with that urlname." - except InvalidLocationError: - msg = "Couldn't find problem with that urlname." - if module_descriptor is None: - msg = "Couldn't find problem with that urlname." -# if not succeeded: -# current_task.update_state( -# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) -# The task should still succeed, but should have metadata indicating -# that the result of the successful task was a failure. (It's not -# the queue that failed, but the task put on the queue.) + # find the problem descriptor: + module_descriptor = modulestore().get_instance(course_id, module_state_key) # find the module in question succeeded = False @@ -97,54 +99,67 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat num_updated = 0 num_attempted = 0 num_total = len(modules_to_update) # TODO: make this more efficient. Count()? + + def get_task_progress(): + progress = {'action_name': action_name, + 'attempted': num_attempted, + 'updated': num_updated, + 'total': num_total, + } + return progress + + task_log.info("Starting to process task {0}".format(current_task.request.id)) + for module_to_update in modules_to_update: num_attempted += 1 -# try: + # There is no try here: if there's an error, we let it throw, and the task will + # be marked as FAILED, with a stack trace. if update_fcn(request, module_to_update, module_descriptor): + # If the update_fcn returns true, then it performed some kind of work. num_updated += 1 -# if there's an error, just let it throw, and the task will -# be marked as FAILED, with a stack trace. -# except UpdateProblemModuleStateError as e: - # something bad happened, so exit right away -# return (succeeded, e.message) + # update task status: - current_task.update_state(state='PROGRESS', - meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) + # TODO: decide on the frequency for updating this: + # -- it may not make sense to do so every time through the loop + # -- may depend on each iteration's duration + current_task.update_state(state='PROGRESS', meta=get_task_progress()) + sleep(5) # in seconds - # done with looping through all modules, so just return final statistics: - if student is not None: - if num_attempted == 0: - msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." - elif num_updated == 0: - msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" - else: - succeeded = True - msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" - elif num_attempted == 0: - msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." - elif num_updated == 0: - msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" - elif num_updated == num_attempted: - succeeded = True - msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" - elif num_updated < num_attempted: - msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" + # Done with looping through all modules, so just return final statistics: + # TODO: these messages should be rendered at the view level -- move them there! +# if student is not None: +# if num_attempted == 0: +# msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'." +# elif num_updated == 0: +# msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!" +# else: +# succeeded = True +# msg = "Problem successfully {action} for student '{student}' and problem '{problem}'" +# elif num_attempted == 0: +# msg = "Unable to find any students with submissions to be {action} for problem '{problem}'." +# elif num_updated == 0: +# msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!" +# elif num_updated == num_attempted: +# succeeded = True +# msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!" +# elif num_updated < num_attempted: +# msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!" +# +# # Update status in task result object itself: +# msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key) + task_progress = get_task_progress() # succeeded=succeeded, message=msg) + current_task.update_state(state='PROGRESS', meta=task_progress) - msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key) - # update status in task result object itself: - current_task.update_state(state='DONE', - meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total, - 'succeeded': succeeded, 'message': msg}) + # Update final progress in course task table as well: + # The actual task result state is updated by celery when this task completes, and thus + # clobbers any custom metadata. So if we want any such status to persist, we have to + # write it to the CourseTaskLog instead. + task_log.info("Finished processing task, updating CourseTaskLog entry") - # and update status in course task table as well: - # TODO: figure out how this is legal. The actual task result - # status is updated by celery when this task completes, and is - # presumably going to clobber this custom metadata. So if we want - # any such status to persist, we have to write it to the CourseTaskLog instead. -# course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.id) -# course_task_log_entry.task_status = ... + course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.request.id) + course_task_log_entry.task_progress = json.dumps(task_progress) + course_task_log_entry.save() - # return (succeeded, msg) return succeeded @@ -193,7 +208,7 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) # and load something they shouldn't have access to. msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key, student=student) - log.debug(msg) + task_log.debug(msg) raise UpdateProblemModuleStateError(msg) if not hasattr(instance, 'regrade_problem'): @@ -205,11 +220,11 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) result = instance.regrade_problem() if 'success' not in result: # don't consider these fatal, but false means that the individual call didn't complete: - log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.debug("error processing regrade call for problem {loc} and student {student}: " "unexpected response {msg}".format(msg=result, loc=module_state_key, student=student)) return False elif result['success'] != 'correct' and result['success'] != 'incorrect': - log.debug("error processing regrade call for problem {loc} and student {student}: " + task_log.debug("error processing regrade call for problem {loc} and student {student}: " "{msg}".format(msg=result['success'], loc=module_state_key, student=student)) return False else: @@ -245,7 +260,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif 'task_name': 'regrade', 'task_args': problem_url, 'task_id': task_id, - 'task_status': result.state, + 'task_state': result.state, 'requester': request.user} CourseTaskLog.objects.create(**tasklog_args) @@ -253,9 +268,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif @task -def _regrade_problem_for_all_students(request_environ, course_id, problem_url): -# request = HttpRequest() -# request.META.update(request_environ) +def regrade_problem_for_all_students(request_environ, course_id, problem_url): factory = RequestFactory(**request_environ) request = factory.get('/') action_name = 'regraded' @@ -265,101 +278,6 @@ def _regrade_problem_for_all_students(request_environ, course_id, problem_url): update_fcn, action_name, filter_fcn) -def regrade_problem_for_all_students(request, course_id, problem_url): - # Figure out (for now) how to serialize what we need of the request. The actual - # request will not successfully serialize with json or with pickle. - # Maybe we can just pass all META info as a dict. - request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'], - 'REMOTE_ADDR': request.META['REMOTE_ADDR'], - 'SERVER_NAME': request.META['SERVER_NAME'], - 'REQUEST_METHOD': 'GET', -# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'], - } - - # Submit task. Then put stuff into table with the resulting task_id. - task_args = [request_environ, course_id, problem_url] - result = _regrade_problem_for_all_students.apply_async(task_args) - task_id = result.id - tasklog_args = {'course_id': course_id, - 'task_name': 'regrade', - 'task_args': problem_url, - 'task_id': task_id, - 'task_status': result.state, - 'requester': request.user} - course_task_log = CourseTaskLog.objects.create(**tasklog_args) - return course_task_log - - -def course_task_log_status(request, task_id=None): - """ - This returns the status of a course-related task as a JSON-serialized dict. - """ - output = {} - if task_id is not None: - output = _get_course_task_log_status(task_id) - elif 'task_id' in request.POST: - task_id = request.POST['task_id'] - output = _get_course_task_log_status(task_id) - elif 'task_ids[]' in request.POST: - tasks = request.POST.getlist('task_ids[]') - for task_id in tasks: - task_output = _get_course_task_log_status(task_id) - output[task_id] = task_output - # TODO else: raise exception? - - return HttpResponse(json.dumps(output, indent=4)) - - -def _get_course_task_log_status(task_id): - course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id) - # TODO: error handling if it doesn't exist... - - def not_in_progress(entry): - # TODO: do better than to copy list from celery.states.READY_STATES - return entry.task_status in ['SUCCESS', 'FAILURE', 'REVOKED'] - - # if the task is already known to be done, then there's no reason to query - # the underlying task: - if not_in_progress(course_task_log_entry): - output = { - 'task_id': course_task_log_entry.task_id, - 'task_status': course_task_log_entry.task_status, - 'in_progress': False - } - return output - - # we need to get information from the task result directly now. - result = AsyncResult(task_id) - - output = { - 'task_id': result.id, - 'task_status': result.state, - 'in_progress': True - } - if result.traceback is not None: - output['task_traceback'] = result.traceback - - if result.state == "PROGRESS": - if hasattr(result, 'result') and 'current' in result.result: - log.info("still waiting... progress at {0} of {1}".format(result.result['current'], - result.result['total'])) - output['current'] = result.result['current'] - output['total'] = result.result['total'] - else: - log.info("still making progress... ") - - if result.successful(): - value = result.result - output['value'] = value - - # update the entry if necessary: - if course_task_log_entry.task_status != result.state: - course_task_log_entry.task_status = result.state - course_task_log_entry.save() - - return output - - def _reset_problem_attempts_module_state(request, module_to_reset, module_descriptor): # modify the problem's state # load the state json and change state @@ -420,3 +338,16 @@ def _delete_problem_state_for_all_students(request, course_id, problem_url): update_fcn = _delete_problem_module_state return _update_problem_module_state_for_all_students(request, course_id, problem_url, update_fcn, action_name) + + +@worker_ready.connect +def initialize_middleware(**kwargs): + # The initialize Django middleware - some middleware components + # are initialized lazily when the first request is served. Since + # the celery workers do not serve request, the components never + # get initialized, causing errors in some dependencies. + # In particular, the Mako template middleware is used by some xmodules + task_log.info("Initializing all middleware from worker_ready.connect hook") + + from django.core.handlers.base import BaseHandler + BaseHandler().load_middleware() diff --git a/lms/djangoapps/instructor/views.py b/lms/djangoapps/instructor/views.py index 67ea0d1ea9..95482f1ee8 100644 --- a/lms/djangoapps/instructor/views.py +++ b/lms/djangoapps/instructor/views.py @@ -10,9 +10,9 @@ import os import re import requests from requests.status_codes import codes -import urllib +#import urllib from collections import OrderedDict -from time import sleep +#from time import sleep from StringIO import StringIO @@ -25,7 +25,8 @@ from mitxmako.shortcuts import render_to_response from django.core.urlresolvers import reverse from courseware import grades -from courseware import tasks +#from courseware import tasks # for now... should remove once things are in queue instead +from courseware import task_queue from courseware.access import (has_access, get_access_group_name, course_beta_test_group_name) from courseware.courses import get_course_with_access @@ -176,12 +177,12 @@ def instructor_dashboard(request, course_id): datatable['title'] = 'List of students enrolled in {0}'.format(course_id) track.views.server_track(request, 'list-students', {}, page='idashboard') - elif 'Test Celery' in action: - args = (10,) - result = tasks.waitawhile.apply_async(args, retry=False) - task_id = result.id - celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id}) - msg += '
Celery Status for task ${task}:
Status end.
'.format(task=task_id, url=celery_ajax_url) +# elif 'Test Celery' in action: +# args = (10,) +# result = tasks.waitawhile.apply_async(args, retry=False) +# task_id = result.id +# celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id}) +# msg += 'Celery Status for task ${task}:
Status end.
'.format(task=task_id, url=celery_ajax_url) elif 'Dump Grades' in action: log.debug(action) @@ -217,13 +218,13 @@ def instructor_dashboard(request, course_id): elif "Regrade ALL students' problem submissions" in action: problem_url = request.POST.get('problem_to_regrade', '') try: - course_task_log_entry = tasks.regrade_problem_for_all_students(request, course_id, problem_url) + course_task_log_entry = task_queue.submit_regrade_problem_for_all_students(request, course_id, problem_url) + if course_task_log_entry is None: + msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) except Exception as e: - log.error("Encountered exception from regrade: {0}", e) - # check that a course_task_log entry was created: - if course_task_log_entry is None: - msg += 'Failed to create a background task for regrading "{0}".'.format(problem_url) - + log.error("Encountered exception from regrade: {0}".format(e)) + msg += 'Failed to create a background task for regrading "{0}": {1}.'.format(problem_url, e) + elif "Reset student's attempts" in action or "Delete student state for problem" in action: # get the form data unique_student_identifier = request.POST.get('unique_student_identifier', '') @@ -645,7 +646,7 @@ def instructor_dashboard(request, course_id): msg += "Pending Course Tasks
| Requester | Submitted | Last Update | -Task Status | +Task State | Task Progress | %for tasknum, course_task in enumerate(course_tasks): @@ -515,7 +511,7 @@ function goto( mode)${course_task.requester} | ${course_task.created} | ${course_task.updated} |
- ${course_task.task_status} |
+ ${course_task.task_state} |
unknown |
%endfor
|---|