From e2d985209ca75789c4edebefc7dc0d450d602d60 Mon Sep 17 00:00:00 2001
From: Brian Wilson
Date: Tue, 24 Sep 2013 11:37:58 -0400
Subject: [PATCH] Move updates for InstructorTask into BaseInstructorTask abstract class.

---
 lms/djangoapps/instructor_task/tasks.py      |  24 ++--
 .../instructor_task/tasks_helper.py          | 134 ++++++++++--------
 .../instructor_task/tests/test_tasks.py      |  71 +++-------
 3 files changed, 109 insertions(+), 120 deletions(-)

diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py
index fb15c5fe8d..a6a082f2b9 100644
--- a/lms/djangoapps/instructor_task/tasks.py
+++ b/lms/djangoapps/instructor_task/tasks.py
@@ -21,16 +21,18 @@ of the query for traversing StudentModule objects.
 """
 from celery import task
 from functools import partial
-from instructor_task.tasks_helper import (run_main_task,
-                                          perform_module_state_update,
-                                          rescore_problem_module_state,
-                                          reset_attempts_module_state,
-                                          delete_problem_module_state,
-                                          )
+from instructor_task.tasks_helper import (
+    run_main_task,
+    BaseInstructorTask,
+    perform_module_state_update,
+    rescore_problem_module_state,
+    reset_attempts_module_state,
+    delete_problem_module_state,
+)
 from bulk_email.tasks import perform_delegate_email_batches


-@task
+@task(base=BaseInstructorTask)
 def rescore_problem(entry_id, xmodule_instance_args):
     """Rescores a problem in a course, for all students or one specific student.
@@ -59,7 +61,7 @@ def rescore_problem(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, visit_fcn, action_name)


-@task
+@task(base=BaseInstructorTask)
 def reset_problem_attempts(entry_id, xmodule_instance_args):
     """Resets problem attempts to zero for a particular problem for all students in a course.
@@ -80,7 +82,7 @@ def reset_problem_attempts(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, visit_fcn, action_name)


-@task
+@task(base=BaseInstructorTask)
 def delete_problem_state(entry_id, xmodule_instance_args):
     """Deletes problem state entirely for all students on a particular problem in a course.
@@ -101,7 +103,7 @@ def delete_problem_state(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, visit_fcn, action_name)


-@task
+@task(base=BaseInstructorTask)
 def send_bulk_course_email(entry_id, xmodule_instance_args):
     """Sends emails to recipients in a course.
@@ -116,4 +118,4 @@ def send_bulk_course_email(entry_id, xmodule_instance_args):
     """
     action_name = 'emailed'
     visit_fcn = perform_delegate_email_batches
-    return run_main_task(entry_id, visit_fcn, action_name, spawns_subtasks=True)
+    return run_main_task(entry_id, visit_fcn, action_name)
diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py
index a4d3a08f8d..8e37bfe7c0 100644
--- a/lms/djangoapps/instructor_task/tasks_helper.py
+++ b/lms/djangoapps/instructor_task/tasks_helper.py
@@ -5,10 +5,8 @@ running state of a course.
 """
 import json
 from time import time
-from sys import exc_info
-from traceback import format_exc

-from celery import current_task
+from celery import Task, current_task
 from celery.utils.log import get_task_logger
 from celery.states import SUCCESS, FAILURE
@@ -37,6 +35,66 @@ UPDATE_STATUS_FAILED = 'failed'
 UPDATE_STATUS_SKIPPED = 'skipped'


+class BaseInstructorTask(Task):
+    """
+    Base task class for use with InstructorTask models.
+
+    Permits updating information about the task in the corresponding
+    InstructorTask entry, for monitoring purposes.
+
+    Assumes that the entry_id of the InstructorTask model is the first argument to the task.
+    """
+    abstract = True
+
+    def on_success(self, task_progress, task_id, args, kwargs):
+        """
+        Update the InstructorTask object corresponding to this task with info about success.
+
+        Updates task_output and task_state, but does nothing if the task is only
+        spawning subtasks to do the actual work.
+        """
+        TASK_LOG.info('Task success returned: %r', self.request)
+        # We should be able to find the InstructorTask object to update
+        # based on the task_id here, without having to dig into the
+        # task's original args.  But the entry_id is the first argument
+        # passed to all such tasks, so we use that instead.  We also assume
+        # the entry exists; otherwise the task would already have failed.
+        entry_id = args[0]
+        entry = InstructorTask.objects.get(pk=entry_id)
+        # Check to see if any subtasks have been defined as part of this task.
+        # If not, then we know that we're done.  (If so, let the subtasks
+        # handle updating task_state themselves.)
+        if len(entry.subtasks) == 0:
+            entry.task_output = InstructorTask.create_output_for_success(task_progress)
+            entry.task_state = SUCCESS
+            entry.save_now()
+
+    def on_failure(self, exc, task_id, args, kwargs, einfo):
+        """
+        Update the InstructorTask object corresponding to this task with info about failure.
+
+        Writes exception and traceback information to the InstructorTask entry.
+        """
+        TASK_LOG.info('Task failure returned: %r', self.request)
+        entry_id = args[0]
+        try:
+            entry = InstructorTask.objects.get(pk=entry_id)
+        except InstructorTask.DoesNotExist:
+            # If the InstructorTask object does not exist, then there's
+            # no point in trying to update it.
+            pass
+        else:
+            TASK_LOG.warning("background task (%s) failed: %s %s", task_id, einfo.exception, einfo.traceback)
+            entry.task_output = InstructorTask.create_output_for_failure(einfo.exception, einfo.traceback)
+            entry.task_state = FAILURE
+            entry.save_now()
+
+    def on_retry(self, exc, task_id, args, kwargs, einfo):
+        # Retries are not currently expected for top-level tasks, and it is
+        # unclear what status an InstructorTask entry should report for one,
+        # so just log that the retry happened.
+        TASK_LOG.info('Task retry returned: %r', self.request)
+

 class UpdateProblemModuleStateError(Exception):
     """
     Error signaling a fatal condition while updating problem modules.
@@ -162,7 +220,7 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
     return task_progress


-def run_main_task(entry_id, task_fcn, action_name, spawns_subtasks=False):
+def run_main_task(entry_id, task_fcn, action_name):
     """
     Applies the `task_fcn` to the arguments defined in the `entry_id` InstructorTask.
@@ -221,64 +279,18 @@
     TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)

-    # Now that we have an entry we can try to catch failures:
-    task_progress = None
-    try:
-        # Check that the task_id submitted in the InstructorTask matches the current task
-        # that is running.
-        request_task_id = _get_current_task().request.id
-        if task_id != request_task_id:
-            fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
-            message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
-            TASK_LOG.error(message)
-            raise UpdateProblemModuleStateError(message)
+    # Check that the task_id submitted in the InstructorTask matches the current task
+    # that is running.
+    request_task_id = _get_current_task().request.id
+    if task_id != request_task_id:
+        fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
+        message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
+        TASK_LOG.error(message)
+        raise UpdateProblemModuleStateError(message)

-        # Now do the work:
-        with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
-            task_progress = task_fcn(entry_id, course_id, task_input, action_name)
-
-        # If we get here, we assume we've succeeded, so update the InstructorTask entry in anticipation.
-        # But we do this within the try, in case creating the task_output causes an exception to be
-        # raised.
-        # TODO: This is not the case if there are outstanding subtasks that were spawned asynchronously
-        # as part of the main task. There is probably some way to represent this more elegantly, but for
-        # now, we will just use an explicit flag.
-        if spawns_subtasks:
-            # TODO: UPDATE THIS.
-            # we change the rules here. If it's a task with subtasks running, then we
-            # explicitly set its state, with the idea that progress will be updated
-            # directly into the InstructorTask object, rather than into the parent task's
-            # AsyncResult object. This is because we have to write to the InstructorTask
-            # object anyway, so we may as well put status in there. And because multiple
-            # clients are writing to it, we need the locking that a DB can provide, rather
-            # than the speed that the AsyncResult provides.
-            # So we need to change the logic of the monitor to pull status from the
-            # InstructorTask directly when the state is PROGRESS, and to pull from the
-            # AsyncResult when it's running but not marked as in PROGRESS state. (I.e.
-            # if it's started.) Admittedly, it's misnamed, but it should work.
-            # But we've already started the subtasks by the time we get here,
-            # so these values should already have been written. Too late.
-            # entry.task_output = InstructorTask.create_output_for_success(task_progress)
-            # entry.task_state = PROGRESS
-            # Weird. Note that by exiting this function successfully, will
-            # result in the AsyncResult for this task as being marked as SUCCESS.
-            # Below, we were just marking the entry to match. But it shouldn't
-            # match, if it's not really done.
-            pass
-        else:
-            entry.task_output = InstructorTask.create_output_for_success(task_progress)
-            entry.task_state = SUCCESS
-            entry.save_now()
-
-    except Exception:
-        # try to write out the failure to the entry before failing
-        _, exception, traceback = exc_info()
-        traceback_string = format_exc(traceback) if traceback is not None else ''
-        TASK_LOG.warning("background task (%s) failed: %s %s", task_id, exception, traceback_string)
-        entry.task_output = InstructorTask.create_output_for_failure(exception, traceback_string)
-        entry.task_state = FAILURE
-        entry.save_now()
-        raise
+    # Now do the work:
+    with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
+        task_progress = task_fcn(entry_id, course_id, task_input, action_name)

     # Release any queries that the connection has been hanging onto:
     reset_queries()
diff --git a/lms/djangoapps/instructor_task/tests/test_tasks.py b/lms/djangoapps/instructor_task/tests/test_tasks.py
index 1d9e3dba96..b0456822a6 100644
--- a/lms/djangoapps/instructor_task/tests/test_tasks.py
+++ b/lms/djangoapps/instructor_task/tests/test_tasks.py
@@ -69,7 +69,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
             'request_info': {},
         }

-    def _run_task_with_mock_celery(self, task_function, entry_id, task_id, expected_failure_message=None):
+    def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None):
         """Submit a task and mock how celery provides a current_task."""
         self.current_task = Mock()
         self.current_task.request = Mock()
@@ -77,32 +77,34 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.current_task.update_state = Mock()
         if expected_failure_message is not None:
             self.current_task.update_state.side_effect = TestTaskFailure(expected_failure_message)
+        task_args = [entry_id, self._get_xmodule_instance_args()]
+
         with patch('instructor_task.tasks_helper._get_current_task') as mock_get_task:
             mock_get_task.return_value = self.current_task
-            return task_function(entry_id, self._get_xmodule_instance_args())
+            return task_class.apply(task_args, task_id=task_id).get()

-    def _test_missing_current_task(self, task_function):
-        """Check that a task_function fails when celery doesn't provide a current_task."""
+    def _test_missing_current_task(self, task_class):
+        """Check that a task_class fails when celery doesn't provide a current_task."""
         task_entry = self._create_input_entry()
         with self.assertRaises(UpdateProblemModuleStateError):
-            task_function(task_entry.id, self._get_xmodule_instance_args())
+            task_class(task_entry.id, self._get_xmodule_instance_args())

-    def _test_undefined_course(self, task_function):
+    def _test_undefined_course(self, task_class):
         # run with celery, but no course defined
         task_entry = self._create_input_entry(course_id="bogus/course/id")
         with self.assertRaises(ItemNotFoundError):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id)
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)

-    def _test_undefined_problem(self, task_function):
+    def _test_undefined_problem(self, task_class):
         """Run with celery, but no problem defined."""
         task_entry = self._create_input_entry()
         with self.assertRaises(ItemNotFoundError):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id)
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)

-    def _test_run_with_task(self, task_function, action_name, expected_num_succeeded):
+    def _test_run_with_task(self, task_class, action_name, expected_num_succeeded):
         """Run a task and check the number of StudentModules processed."""
         task_entry = self._create_input_entry()
-        status = self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id)
+        status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
         # check return value
         self.assertEquals(status.get('attempted'), expected_num_succeeded)
         self.assertEquals(status.get('succeeded'), expected_num_succeeded)
@@ -114,10 +116,10 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.assertEquals(json.loads(entry.task_output), status)
         self.assertEquals(entry.task_state, SUCCESS)

-    def _test_run_with_no_state(self, task_function, action_name):
+    def _test_run_with_no_state(self, task_class, action_name):
         """Run with no StudentModules defined for the current problem."""
         self.define_option_problem(PROBLEM_URL_NAME)
-        self._test_run_with_task(task_function, action_name, 0)
+        self._test_run_with_task(task_class, action_name, 0)

     def _create_students_with_state(self, num_students, state=None, grade=0, max_grade=1):
         """Create students, a problem, and StudentModule objects for testing"""
@@ -145,12 +147,12 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
             state = json.loads(module.state)
             self.assertEquals(state['attempts'], num_attempts)

-    def _test_run_with_failure(self, task_function, expected_message):
+    def _test_run_with_failure(self, task_class, expected_message):
         """Run a task and trigger an artificial failure with the given message."""
         task_entry = self._create_input_entry()
         self.define_option_problem(PROBLEM_URL_NAME)
         with self.assertRaises(TestTaskFailure):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id, expected_message)
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
         # compare with entry in table:
         entry = InstructorTask.objects.get(id=task_entry.id)
         self.assertEquals(entry.task_state, FAILURE)
@@ -158,7 +160,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.assertEquals(output['exception'], 'TestTaskFailure')
         self.assertEquals(output['message'], expected_message)

-    def _test_run_with_long_error_msg(self, task_function):
+    def _test_run_with_long_error_msg(self, task_class):
         """
         Run with an error message that is so long it will require
         truncation (as well as the jettisoning of the traceback).
@@ -167,7 +169,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.define_option_problem(PROBLEM_URL_NAME)
         expected_message = "x" * 1500
         with self.assertRaises(TestTaskFailure):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id, expected_message)
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
         # compare with entry in table:
         entry = InstructorTask.objects.get(id=task_entry.id)
         self.assertEquals(entry.task_state, FAILURE)
@@ -177,7 +179,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.assertEquals(output['message'], expected_message[:len(output['message']) - 3] + "...")
         self.assertTrue('traceback' not in output)

-    def _test_run_with_short_error_msg(self, task_function):
+    def _test_run_with_short_error_msg(self, task_class):
         """
         Run with an error message that is short enough to fit
         in the output, but long enough that the traceback won't.
@@ -187,7 +189,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.define_option_problem(PROBLEM_URL_NAME)
         expected_message = "x" * 900
         with self.assertRaises(TestTaskFailure):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id, expected_message)
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
         # compare with entry in table:
         entry = InstructorTask.objects.get(id=task_entry.id)
         self.assertEquals(entry.task_state, FAILURE)
@@ -198,33 +200,6 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
         self.assertEquals(output['traceback'][-3:], "...")


-class TestGeneralInstructorTask(TestInstructorTasks):
-    """Tests instructor task mechanism using custom tasks"""
-
-    def test_successful_result_too_long(self):
-        # while we don't expect the existing tasks to generate output that is too
-        # long, we can test the framework will handle such an occurrence.
-        task_entry = self._create_input_entry()
-        self.define_option_problem(PROBLEM_URL_NAME)
-        action_name = 'x' * 1000
-        # define a custom task that does nothing:
-        update_fcn = lambda(_module_descriptor, _student_module): UPDATE_STATUS_SUCCEEDED
-        visit_fcn = partial(perform_module_state_update, update_fcn, None)
-        task_function = (lambda entry_id, xmodule_instance_args:
-                         run_main_task(entry_id, visit_fcn, action_name))
-        # run the task:
-        with self.assertRaises(ValueError):
-            self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id)
-        # compare with entry in table:
-        entry = InstructorTask.objects.get(id=task_entry.id)
-        self.assertEquals(entry.task_state, FAILURE)
-        self.assertGreater(1023, len(entry.task_output))
-        output = json.loads(entry.task_output)
-        self.assertEquals(output['exception'], 'ValueError')
-        self.assertTrue("Length of task output is too long" in output['message'])
-        self.assertTrue('traceback' not in output)
-
-
 class TestRescoreInstructorTask(TestInstructorTasks):
     """Tests problem-rescoring instructor task."""

@@ -257,8 +232,8 @@ class TestRescoreInstructorTask(TestInstructorTasks):
         task_entry = self._create_input_entry()
         mock_instance = MagicMock()
         del mock_instance.rescore_problem
-        # TODO: figure out why this patch isn't working
-        # with patch('courseware.module_render.get_module_for_descriptor_internal') as mock_get_module:
+        # TODO: figure out why this patch isn't working here, when it seems
+        # to work fine for the test_rescoring_success test below.
         with patch('courseware.module_render.get_module_for_descriptor_internal') as mock_get_module:
             mock_get_module.return_value = mock_instance
             with self.assertRaises(UpdateProblemModuleStateError):
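
--
Note (not part of the patch): the new BaseInstructorTask relies on Celery's
task-handler hooks, which fire after the task body returns (on_success) or
raises (on_failure). As a minimal standalone sketch of the pattern -- using a
hypothetical `demo` app and a `record_status` stand-in for the InstructorTask
bookkeeping, not actual edx-platform code:

    from celery import Celery, Task

    app = Celery('demo', broker='memory://', backend='cache+memory://')


    def record_status(entry_id, state, info):
        """Stand-in for writing task_state/task_output to an InstructorTask row."""
        print('entry %s -> %s: %s' % (entry_id, state, info))


    class StatusTrackingTask(Task):
        """Base class assuming entry_id is the first argument of every task."""
        abstract = True  # don't register the base class itself as a task

        def on_success(self, retval, task_id, args, kwargs):
            # Called by celery after run() returns; args holds the task's
            # positional arguments, so args[0] is the entry_id by convention.
            record_status(args[0], 'SUCCESS', retval)

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            # Called by celery if run() raises; einfo carries the traceback.
            record_status(args[0], 'FAILURE', einfo.traceback)


    @app.task(base=StatusTrackingTask)
    def do_work(entry_id, payload):
        return {'attempted': len(payload)}

Because the updated tests invoke tasks synchronously via
task_class.apply(task_args, task_id=task_id).get(), these handlers run
eagerly in-process, which is why the tests can assert on the InstructorTask
row immediately after .get() returns.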