diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py index a60f955176..d712e824b2 100644 --- a/lms/djangoapps/instructor_task/tasks_helper.py +++ b/lms/djangoapps/instructor_task/tasks_helper.py @@ -148,6 +148,49 @@ def _get_current_task(): return current_task +class TaskProgress(object): + """ + Encapsulates the current task's progress by keeping track of + 'attempted', 'succeeded', 'skipped', 'failed', 'total', + 'action_name', and 'duration_ms' values. + """ + def __init__(self, action_name, total, start_time): + self.action_name = action_name + self.total = total + self.start_time = start_time + self.attempted = 0 + self.succeeded = 0 + self.skipped = 0 + self.failed = 0 + + def update_task_state(self, extra_meta=None): + """ + Update the current celery task's state to the progress state + specified by the current object. Returns the progress + dictionary for use by `run_main_task` and + `BaseInstructorTask.on_success`. + + Arguments: + extra_meta (dict): Extra metadata to pass to `update_state` + + Returns: + dict: The current task's progress dict + """ + progress_dict = { + 'action_name': self.action_name, + 'attempted': self.attempted, + 'succeeded': self.succeeded, + 'skipped': self.skipped, + 'failed': self.failed, + 'total': self.total, + 'duration_ms': int((time() - self.start_time) * 1000), + } + if extra_meta is not None: + progress_dict.update(extra_meta) + _get_current_task().update_state(state=PROGRESS, meta=progress_dict) + return progress_dict + + def run_main_task(entry_id, task_fcn, action_name): """ Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask. @@ -243,9 +286,7 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta result object. """ - # get start time for task: start_time = time() - usage_key = course_id.make_usage_key_from_deprecated_string(task_input.get('problem_url')) student_identifier = task_input.get('student') @@ -272,30 +313,11 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta if filter_fcn is not None: modules_to_update = filter_fcn(modules_to_update) - # perform the main loop - num_attempted = 0 - num_succeeded = 0 - num_skipped = 0 - num_failed = 0 - num_total = modules_to_update.count() + task_progress = TaskProgress(action_name, modules_to_update.count(), start_time) + task_progress.update_task_state() - def get_task_progress(): - """Return a dict containing info about current task""" - current_time = time() - progress = {'action_name': action_name, - 'attempted': num_attempted, - 'succeeded': num_succeeded, - 'skipped': num_skipped, - 'failed': num_failed, - 'total': num_total, - 'duration_ms': int((current_time - start_time) * 1000), - } - return progress - - task_progress = get_task_progress() - _get_current_task().update_state(state=PROGRESS, meta=task_progress) for module_to_update in modules_to_update: - num_attempted += 1 + task_progress.attempted += 1 # There is no try here: if there's an error, we let it throw, and the task will # be marked as FAILED, with a stack trace. with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[u'action:{name}'.format(name=action_name)]): @@ -303,19 +325,15 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta if update_status == UPDATE_STATUS_SUCCEEDED: # If the update_fcn returns true, then it performed some kind of work. # Logging of failures is left to the update_fcn itself. - num_succeeded += 1 + task_progress.succeeded += 1 elif update_status == UPDATE_STATUS_FAILED: - num_failed += 1 + task_progress.failed += 1 elif update_status == UPDATE_STATUS_SKIPPED: - num_skipped += 1 + task_progress.skipped += 1 else: raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status)) - # update task status: - task_progress = get_task_progress() - _get_current_task().update_state(state=PROGRESS, meta=task_progress) - - return task_progress + return task_progress.update_task_state() def _get_task_id_from_xmodule_args(xmodule_instance_args): @@ -518,45 +536,26 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, make a more general CSVDoc class instead of building out the rows like we do here. """ - start_time = datetime.now(UTC) + start_time = time() + start_date = datetime.now(UTC) status_interval = 100 - enrolled_students = CourseEnrollment.users_enrolled_in(course_id) - num_total = enrolled_students.count() - num_attempted = 0 - num_succeeded = 0 - num_failed = 0 - curr_step = "Calculating Grades" - - def update_task_progress(): - """Return a dict containing info about current task""" - current_time = datetime.now(UTC) - progress = { - 'action_name': action_name, - 'attempted': num_attempted, - 'succeeded': num_succeeded, - 'failed': num_failed, - 'total': num_total, - 'duration_ms': int((current_time - start_time).total_seconds() * 1000), - 'step': curr_step, - } - _get_current_task().update_state(state=PROGRESS, meta=progress) - - return progress + task_progress = TaskProgress(action_name, enrolled_students.count(), start_time) # Loop over all our students and build our CSV lists in memory header = None rows = [] err_rows = [["id", "username", "error_msg"]] + current_step = {'step': 'Calculating Grades'} for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students): # Periodically update task status (this is a cache write) - if num_attempted % status_interval == 0: - update_task_progress() - num_attempted += 1 + if task_progress.attempted % status_interval == 0: + task_progress.update_task_state(extra_meta=current_step) + task_progress.attempted += 1 if gradeset: # We were able to successfully grade this student for this course. - num_succeeded += 1 + task_progress.succeeded += 1 if not header: # Encode the header row in utf-8 encoding in case there are unicode characters header = [section['label'].encode('utf-8') for section in gradeset[u'section_breakdown']] @@ -578,37 +577,50 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, rows.append([student.id, student.email, student.username, gradeset['percent']] + row_percents) else: # An empty gradeset means we failed to grade a student. - num_failed += 1 + task_progress.failed += 1 err_rows.append([student.id, student.username, err_msg]) # By this point, we've got the rows we're going to stuff into our CSV files. - curr_step = "Uploading CSVs" - update_task_progress() + current_step = {'step': 'Uploading CSVs'} + task_progress.update_task_state(extra_meta=current_step) # Perform the actual upload - upload_csv_to_report_store(rows, 'grade_report', course_id, start_time) + upload_csv_to_report_store(rows, 'grade_report', course_id, start_date) # If there are any error rows (don't count the header), write them out as well if len(err_rows) > 1: - upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_time) + upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_date) # One last update before we close out... - return update_task_progress() + return task_progress.update_task_state(extra_meta=current_step) -def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, _action_name): +def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name): """ For a given `course_id`, generate a CSV file containing profile information for all students that are enrolled, and store using a `ReportStore`. """ + start_time = time() + start_date = datetime.now(UTC) + task_progress = TaskProgress(action_name, CourseEnrollment.num_enrolled_in(course_id), start_time) + current_step = {'step': 'Calculating Profile Info'} + task_progress.update_task_state(extra_meta=current_step) + # compute the student features table and format it query_features = task_input.get('features') student_data = enrolled_students_features(course_id, query_features) header, rows = format_dictlist(student_data, query_features) + + task_progress.attempted = task_progress.succeeded = len(rows) + task_progress.skipped = task_progress.total - task_progress.attempted + rows.insert(0, header) - # Perform the upload - upload_csv_to_report_store(rows, 'student_profile_info', course_id, datetime.now(UTC)) + current_step = {'step': 'Uploading CSV'} + task_progress.update_task_state(extra_meta=current_step) - return UPDATE_STATUS_SUCCEEDED + # Perform the upload + upload_csv_to_report_store(rows, 'student_profile_info', course_id, start_date) + + return task_progress.update_task_state(extra_meta=current_step) diff --git a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py index b7e63ed195..cdaf037338 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py @@ -19,7 +19,7 @@ from xmodule.modulestore.tests.factories import CourseFactory from student.tests.factories import CourseEnrollmentFactory, UserFactory from instructor_task.models import ReportStore -from instructor_task.tasks_helper import upload_grades_csv, upload_students_csv, UPDATE_STATUS_SUCCEEDED +from instructor_task.tasks_helper import upload_grades_csv, upload_students_csv class TestReport(ModuleStoreTestCase): @@ -36,6 +36,7 @@ class TestReport(ModuleStoreTestCase): def create_student(self, username, email): student = UserFactory.create(username=username, email=email) CourseEnrollmentFactory.create(user=student, course_id=self.course.id) + return student @ddt.ddt @@ -56,8 +57,25 @@ class TestInstructorGradeReport(TestReport): with patch('instructor_task.tasks_helper._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task result = upload_grades_csv(None, None, self.course.id, None, 'graded') - #This assertion simply confirms that the generation completed with no errors - self.assertEquals(result['succeeded'], result['attempted']) + num_students = len(emails) + self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result) + + @patch('instructor_task.tasks_helper._get_current_task') + @patch('instructor_task.tasks_helper.iterate_grades_for') + def test_grading_failure(self, mock_iterate_grades_for, _mock_current_task): + """ + Test that any grading errors are properly reported in the + progress dict and uploaded to the report store. + """ + # mock an error response from `iterate_grades_for` + mock_iterate_grades_for.return_value = [ + (self.create_student('username', 'student@example.com'), {}, 'Cannot grade student') + ] + result = upload_grades_csv(None, None, self.course.id, None, 'graded') + self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result) + + report_store = ReportStore.from_config() + self.assertTrue(any('grade_report_err' in item[0] for item in report_store.links_for(self.course.id))) @ddt.ddt @@ -66,6 +84,7 @@ class TestStudentReport(TestReport): Tests that CSV student profile report generation works. """ def test_success(self): + self.create_student('student', 'student@example.com') task_input = {'features': []} with patch('instructor_task.tasks_helper._get_current_task'): result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') @@ -73,7 +92,7 @@ class TestStudentReport(TestReport): links = report_store.links_for(self.course.id) self.assertEquals(len(links), 1) - self.assertEquals(result, UPDATE_STATUS_SUCCEEDED) + self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result) @ddt.data([u'student', u'student\xec']) def test_unicode_usernames(self, students): @@ -97,4 +116,5 @@ class TestStudentReport(TestReport): mock_current_task.return_value = self.current_task result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') #This assertion simply confirms that the generation completed with no errors - self.assertEquals(result, UPDATE_STATUS_SUCCEEDED) + num_students = len(students) + self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)