"""
Test for LMS courseware background task queue management
"""
import json
import logging
from uuid import uuid4

from celery.states import SUCCESS, FAILURE, REVOKED
from django.test.testcases import TestCase
from django.utils.datastructures import MultiValueDict
from mock import Mock, patch

from xmodule.modulestore.exceptions import ItemNotFoundError

from courseware.tests.factories import UserFactory
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tasks_helper import PROGRESS
from instructor_task.views import instructor_task_status
from instructor_task.api import (get_running_instructor_tasks,
                                 submit_rescore_problem_for_all_students,
                                 submit_rescore_problem_for_student,
                                 submit_reset_problem_attempts_for_all_students,
                                 submit_delete_problem_state_for_all_students)
from instructor_task.api_helper import (QUEUING,
                                        # AlreadyRunningError,
                                        encode_problem_and_student_input,
                                        )


log = logging.getLogger(__name__)


TEST_COURSE_ID = 'edx/1.23x/test_course'
TEST_FAILURE_MESSAGE = 'task failed horribly'


class TaskSubmitTestCase(TestCase):
    """
    Check that background tasks are properly queued and report status.
    """

    def setUp(self):
        """Create the student and instructor users and the problem URL shared by the tests."""
        self.student = UserFactory.create(username="student", email="student@edx.org")
        self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
        self.problem_url = TaskSubmitTestCase.problem_location("test_urlname")

    @staticmethod
    def problem_location(problem_url_name):
        """
        Create an internal location for a test problem.
        """
        return "i4x://{org}/{number}/problem/{problem_url_name}".format(org='edx',
                                                                        number='1.23x',
                                                                        problem_url_name=problem_url_name)

    def _create_entry(self, task_state=QUEUING, task_output=None, student=None):
        """
        Creates a InstructorTask entry for testing.

        `task_output` is JSON-encoded and stored as the entry's task_output;
        the task input and key are derived from the test problem URL and the
        optional `student`.
        """
        task_id = str(uuid4())
        progress_json = json.dumps(task_output)
        task_input, task_key = encode_problem_and_student_input(self.problem_url, student)

        instructor_task = InstructorTaskFactory.create(course_id=TEST_COURSE_ID,
                                                       requester=self.instructor,
                                                       task_input=json.dumps(task_input),
                                                       task_key=task_key,
                                                       task_id=task_id,
                                                       task_state=task_state,
                                                       task_output=progress_json)
        return instructor_task

    def _create_failure_entry(self):
        """Creates a InstructorTask entry representing a failed task."""
        # view task entry for task failure
        progress = {
            'message': TEST_FAILURE_MESSAGE,
            'exception': 'RandomCauseError',
        }
        return self._create_entry(task_state=FAILURE, task_output=progress)

    def _create_success_entry(self, student=None):
        """Creates a InstructorTask entry representing a successful task."""
        return self._create_progress_entry(student, task_state=SUCCESS)

    def _create_progress_entry(self, student=None, task_state=PROGRESS):
        """Creates a InstructorTask entry representing a task in progress."""
        progress = {
            'attempted': 3,
            'updated': 2,
            'total': 10,
            'action_name': 'rescored',
            'message': 'some random string that should summarize the other info',
        }
        return self._create_entry(task_state=task_state, task_output=progress, student=student)

    def test_get_running_instructor_tasks(self):
        # when fetching running tasks, we get all running tasks, and only running tasks
        for _ in range(4):
            self._create_failure_entry()
            self._create_success_entry()
        progress_task_ids = [self._create_progress_entry().task_id for _ in range(4)]
        task_ids = [instructor_task.task_id
                    for instructor_task in get_running_instructor_tasks(TEST_COURSE_ID)]
        self.assertEqual(set(task_ids), set(progress_task_ids))

    def _get_instructor_task_status(self, task_id):
        """Returns status corresponding to task_id via api method."""
        request = Mock()
        request.REQUEST = {'task_id': task_id}
        return instructor_task_status(request)

    def _get_output_with_mocked_result(self, task_id, **result_attrs):
        """
        Returns the decoded status output for task_id while celery's AsyncResult is mocked.

        Each keyword argument is set as an attribute on the mock result
        (e.g. `state`, `result`, `traceback`); attributes that are not passed
        are left as Mock's auto-created attributes.
        """
        mock_result = Mock()
        mock_result.task_id = task_id
        for attr_name, attr_value in result_attrs.items():
            setattr(mock_result, attr_name, attr_value)
        # The status lookup must run while the constructor is patched, so that
        # it is handed the mock result.
        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
            mock_result_ctor.return_value = mock_result
            response = self._get_instructor_task_status(task_id)
        return json.loads(response.content)

    def test_instructor_task_status(self):
        # Fetch status for a single existing task by its task_id.
        instructor_task = self._create_failure_entry()
        task_id = instructor_task.task_id
        response = self._get_instructor_task_status(task_id)
        output = json.loads(response.content)
        self.assertEqual(output['task_id'], task_id)

    def test_instructor_task_status_list(self):
        # Fetch status for existing tasks by arg list, as if called from ajax.
        # Note that ajax does something funny with the marshalling of
        # list data, so the key value has "[]" appended to it.
        task_ids = [self._create_failure_entry().task_id for _ in range(4)]
        request = Mock()
        request.REQUEST = MultiValueDict({'task_ids[]': task_ids})
        response = instructor_task_status(request)
        output = json.loads(response.content)
        self.assertEqual(len(output), len(task_ids))
        for task_id in task_ids:
            self.assertEqual(output[task_id]['task_id'], task_id)

    def test_get_status_from_failure(self):
        # view task entry for a task that was stored as failed
        instructor_task = self._create_failure_entry()
        task_id = instructor_task.task_id
        response = self._get_instructor_task_status(task_id)
        output = json.loads(response.content)
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], FAILURE)
        self.assertFalse(output['in_progress'])
        self.assertEqual(output['message'], TEST_FAILURE_MESSAGE)

    def test_get_status_from_success(self):
        # view task entry for a task that was stored as successful
        instructor_task = self._create_success_entry()
        task_id = instructor_task.task_id
        response = self._get_instructor_task_status(task_id)
        output = json.loads(response.content)
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], SUCCESS)
        self.assertFalse(output['in_progress'])

    def test_update_progress_to_progress(self):
        # view task entry for task in progress
        instructor_task = self._create_progress_entry()
        task_id = instructor_task.task_id
        output = self._get_output_with_mocked_result(
            task_id,
            state=PROGRESS,
            result={'attempted': 5,
                    'updated': 4,
                    'total': 10,
                    'action_name': 'rescored'},
        )
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], PROGRESS)
        self.assertTrue(output['in_progress'])
        # NOTE: the progress message is not asserted here; the original check
        # was left unfinished:
        # self.assertEquals(output['message'], )

    def test_update_progress_to_failure(self):
        # view task entry for task in progress that later fails
        instructor_task = self._create_progress_entry()
        task_id = instructor_task.task_id
        output = self._get_output_with_mocked_result(
            task_id,
            state=FAILURE,
            result=NotImplementedError("This task later failed."),
            traceback="random traceback",
        )
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], FAILURE)
        self.assertFalse(output['in_progress'])
        self.assertEqual(output['message'], "This task later failed.")

    def test_update_progress_to_revoked(self):
        # view task entry for task in progress that is later revoked
        instructor_task = self._create_progress_entry()
        task_id = instructor_task.task_id
        output = self._get_output_with_mocked_result(task_id, state=REVOKED)
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], REVOKED)
        self.assertFalse(output['in_progress'])
        self.assertEqual(output['message'], "Task revoked before running")

    def _get_output_for_task_success(self, attempted, updated, total, student=None):
        """returns the task_id and the result returned by instructor_task_status()."""
        # view task entry for task in progress
        instructor_task = self._create_progress_entry(student)
        task_id = instructor_task.task_id
        output = self._get_output_with_mocked_result(
            task_id,
            state=SUCCESS,
            result={'attempted': attempted,
                    'updated': updated,
                    'total': total,
                    'action_name': 'rescored'},
        )
        return task_id, output

    def test_update_progress_to_success(self):
        # view task entry for task in progress that later succeeds
        task_id, output = self._get_output_for_task_success(10, 8, 10)
        self.assertEqual(output['task_id'], task_id)
        self.assertEqual(output['task_state'], SUCCESS)
        self.assertFalse(output['in_progress'])

    def test_success_messages(self):
        # check the message and 'succeeded' flag reported for each combination
        # of attempted/updated/total counts, first for all students...
        _, output = self._get_output_for_task_success(0, 0, 10)
        self.assertIn("Unable to find any students with submissions to be rescored", output['message'])
        self.assertFalse(output['succeeded'])

        _, output = self._get_output_for_task_success(10, 0, 10)
        self.assertIn("Problem failed to be rescored for any of 10 students", output['message'])
        self.assertFalse(output['succeeded'])

        _, output = self._get_output_for_task_success(10, 8, 10)
        self.assertIn("Problem rescored for 8 of 10 students", output['message'])
        self.assertFalse(output['succeeded'])

        _, output = self._get_output_for_task_success(10, 10, 10)
        self.assertIn("Problem successfully rescored for 10 students", output['message'])
        self.assertTrue(output['succeeded'])

        # ...and then for a single student.
        _, output = self._get_output_for_task_success(0, 0, 1, student=self.student)
        self.assertIn("Unable to find submission to be rescored for student", output['message'])
        self.assertFalse(output['succeeded'])

        _, output = self._get_output_for_task_success(1, 0, 1, student=self.student)
        self.assertIn("Problem failed to be rescored for student", output['message'])
        self.assertFalse(output['succeeded'])

        _, output = self._get_output_for_task_success(1, 1, 1, student=self.student)
        self.assertIn("Problem successfully rescored for student", output['message'])
        self.assertTrue(output['succeeded'])

    def test_submit_nonexistent_modules(self):
        # confirm that a rescore of a non-existent module returns an exception
        # (Note that it is easier to test a non-rescorable module in test_tasks,
        # where we are creating real modules.)
        problem_url = self.problem_url
        course_id = "something else"
        request = None
        with self.assertRaises(ItemNotFoundError):
            submit_rescore_problem_for_student(request, course_id, problem_url, self.student)
        with self.assertRaises(ItemNotFoundError):
            submit_rescore_problem_for_all_students(request, course_id, problem_url)
        with self.assertRaises(ItemNotFoundError):
            submit_reset_problem_attempts_for_all_students(request, course_id, problem_url)
        with self.assertRaises(ItemNotFoundError):
            submit_delete_problem_state_for_all_students(request, course_id, problem_url)

    # Disabled test: it relies on AlreadyRunningError, whose import at the top
    # of this file is commented out.
    #
    # def test_submit_when_running(self):
    #     # get exception when trying to submit a task that is already running
    #     instructor_task = self._create_progress_entry()
    #     problem_url = json.loads(instructor_task.task_input).get('problem_url')
    #     course_id = instructor_task.course_id
    #     # requester doesn't have to be the same when determining if a task is already running
    #     request = Mock()
    #     request.user = self.instructor
    #     with self.assertRaises(AlreadyRunningError):
    #         # just skip making the argument check, so we don't have to fake it deeper down
    #         with patch('instructor_task.api_helper.check_arguments_for_rescoring') as mock_check:
    #             mock_check.return_value = None
    #             submit_rescore_problem_for_all_students(request, course_id, problem_url)