diff --git a/lms/djangoapps/instructor_task/tasks_helper/grades.py b/lms/djangoapps/instructor_task/tasks_helper/grades.py index b4246fa7e0..805064ce16 100644 --- a/lms/djangoapps/instructor_task/tasks_helper/grades.py +++ b/lms/djangoapps/instructor_task/tasks_helper/grades.py @@ -2,6 +2,7 @@ Functionality for generating grade reports. """ + import logging import re from collections import OrderedDict, defaultdict @@ -68,95 +69,6 @@ def _flatten(iterable): return list(chain.from_iterable(iterable)) -class GradeReportBase(object): - """ - Base class for grade reports (ProblemGradeReport and CourseGradeReport). - """ - def _handle_empty_generator(self, generator, default): - """ - Handle empty generator. - Return default if the generator is emtpy, otherwise return all - its iterations (including the first which was used for validation). - """ - empty_generator_sentinel = object() - first_iteration_output = next(generator, empty_generator_sentinel) - generator_is_empty = first_iteration_output == empty_generator_sentinel - - if generator_is_empty: - yield default - - else: - yield first_iteration_output - for element in generator: - yield element - - def _batch_users(self, context): - """ - Returns a generator of batches of users. - """ - - def grouper(iterable, chunk_size=100, fillvalue=None): - args = [iter(iterable)] * chunk_size - return zip_longest(*args, fillvalue=fillvalue) - - def get_enrolled_learners_for_course(course_id, verified_only=False): - """ - Get all the enrolled users in a course chunk by chunk. - - This generator method fetches & loads the enrolled user objects on demand which in chunk - size defined. This method is a workaround to avoid out-of-memory errors. - """ - filter_kwargs = { - 'courseenrollment__course_id': course_id, - } - if verified_only: - filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED - - user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id') - user_chunks = grouper(user_ids_list) - for user_ids in user_chunks: - user_ids = [user_id for user_id in user_ids if user_id is not None] - min_id = min(user_ids) - max_id = max(user_ids) - users = get_user_model().objects.filter( - id__gte=min_id, - id__lte=max_id, - **filter_kwargs - ).select_related('profile') - yield users - - course_id = context.course_id - - report_for_verified_only = generate_grade_report_for_verified_only() - return get_enrolled_learners_for_course(course_id=course_id, verified_only=report_for_verified_only) - - def _compile(self, context, batched_rows): - """ - Compiles and returns the complete list of (success_rows, error_rows) for - the given batched_rows and context. - """ - # partition and chain successes and errors - success_rows, error_rows = zip(*batched_rows) - success_rows = list(chain(*success_rows)) - error_rows = list(chain(*error_rows)) - - # update metrics on task status - context.task_progress.succeeded = len(success_rows) - context.task_progress.failed = len(error_rows) - context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed - context.task_progress.total = context.task_progress.attempted - return success_rows, error_rows - - def _upload(self, context, success_rows, error_rows): - """ - Creates and uploads a CSV for the given headers and rows. - """ - date = datetime.now(UTC) - upload_csv_to_report_store(success_rows, context.file_name, context.course_id, date) - if len(error_rows) > 1: - upload_csv_to_report_store(error_rows, context.file_name + '_err', context.course_id, date) - - class _CourseGradeReportContext(object): """ Internal class that provides a common context to use for a single grade @@ -164,7 +76,6 @@ class _CourseGradeReportContext(object): elements of this context are serialized and parsed across process boundaries. """ - def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): self.task_info_string = ( u'Task: {task_id}, ' @@ -245,41 +156,6 @@ class _CourseGradeReportContext(object): return self.task_progress.update_task_state(extra_meta={'step': message}) -class _ProblemGradeReportContext(object): - """ - Internal class that provides a common context to use for a single problem - grade report. When a report is parallelized across multiple processes, - elements of this context are serialized and parsed across process - boundaries. - """ - - def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): - self.task_info_string = ( - 'Task: {task_id}, ' - 'InstructorTask ID: {entry_id}, ' - 'Course: {course_id}, ' - 'Input: {task_input}' - ).format( - task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None, - entry_id=_entry_id, - course_id=course_id, - task_input=_task_input, - ) - self.action_name = action_name - self.course_id = course_id - self.task_progress = TaskProgress(self.action_name, total=None, start_time=time()) - self.course = get_course_by_id(self.course_id) - self.file_name = 'problem_grade_report' - - def update_status(self, message): - """ - Updates the status on the celery task to the given message. - Also logs the update. - """ - TASK_LOG.info('%s, Task type: %s, %s', self.task_info_string, self.action_name, message) - return self.task_progress.update_task_state(extra_meta={'step': message}) - - class _CertificateBulkContext(object): def __init__(self, context, users): certificate_whitelist = CertificateWhitelist.objects.filter(course_id=context.course_id, whitelist=True) @@ -426,7 +302,6 @@ class CourseGradeReport(object): """ Returns a generator of batches of users. """ - def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None): args = [iter(iterable)] * chunk_size return zip_longest(*args, fillvalue=fillvalue) @@ -644,70 +519,106 @@ class CourseGradeReport(object): return success_rows, error_rows -class ProblemGradeReport(GradeReportBase): - """ - Class to encapsulate functionality related to generating Problem Grade Reports. - """ +class ProblemGradeReport(object): @classmethod def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): - """ - Public method to generate a grade report. - """ - with modulestore().bulk_operations(course_id): - context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name) - # pylint: disable=protected-access - return ProblemGradeReport()._generate(context) - - def _generate(self, context): """ Generate a CSV containing all students' problem grades within a given `course_id`. """ - context.update_status('ProblemGradeReport Step 1: Starting problem grades') - course = get_course_by_id(context.course_id) + + def log_task_info(message): + """ + Updates the status on the celery task to the given message. + Also logs the update. + """ + fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}' + task_info_string = fmt.format( + task_id=task_id, entry_id=_entry_id, course_id=course_id, task_input=_task_input + ) + TASK_LOG.info(u'%s, Task type: %s, %s, %s', task_info_string, action_name, message, task_progress.state) + + start_time = time() + start_date = datetime.now(UTC) + status_interval = 100 + task_id = _xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None + + report_for_verified_only = generate_grade_report_for_verified_only() + enrolled_students = CourseEnrollment.objects.users_enrolled_in( + course_id=course_id, + include_inactive=True, + verified_only=report_for_verified_only, + ) + task_progress = TaskProgress(action_name, enrolled_students.count(), start_time) + + # This struct encapsulates both the display names of each static item in the + # header row as values as well as the django User field names of those items + # as the keys. It is structured in this way to keep the values related. header_row = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')]) - context.update_status('ProblemGradeReport Step 2: Retrieving scorable grade blocks for course') - graded_scorable_blocks = self._graded_scorable_blocks_to_header(course) - context.update_status('ProblemGradeReport Step 3: Setting up headers for problem grade report') - success_headers = self._success_headers(header_row, graded_scorable_blocks) - error_headers = self._error_headers(header_row) - context.update_status('ProblemGradeReport Step 4: Retrieving problem scores for course for enrolled users') - generated_rows = self._batched_rows(context, header_row, graded_scorable_blocks) - success_rows, error_rows = self._compile(context, generated_rows) - context.update_status('ProblemGradeReport Step 5: Uploading data to CSV report file') - self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows) + course = get_course_by_id(course_id) + log_task_info(u'Retrieving graded scorable blocks') + graded_scorable_blocks = cls._graded_scorable_blocks_to_header(course) - return context.update_status('ProblemGradeReport Step 6: Completed problem grades report') + # Just generate the static fields for now. + rows = [ + list(header_row.values()) + ['Enrollment Status', 'Grade'] + _flatten(list(graded_scorable_blocks.values())) + ] + error_rows = [list(header_row.values()) + ['error_msg']] - def _success_headers(self, header_row, graded_scorable_blocks): - """ - Returns headers for all gradable blocks including fixed headers - for report. + # Bulk fetch and cache enrollment states so we can efficiently determine + # whether each user is currently enrolled in the course. + log_task_info(u'Fetching enrollment status') + CourseEnrollment.bulk_fetch_enrollment_states(enrolled_students, course_id) - Arguments: - header_row (dict): list of header values - graded_scorable_blocks (dict): dict of graded_scorable_blocks + for student, course_grade, error in CourseGradeFactory().iter(enrolled_students, course): + student_fields = [getattr(student, field_name) for field_name in header_row] + task_progress.attempted += 1 - Returns: - list: combined header and scorable blocks - """ - header_row = list(header_row.values()) + ['Enrollment Status', 'Grade'] - return header_row + _flatten(list(graded_scorable_blocks.values())) + if not course_grade: + err_msg = text_type(error) + # There was an error grading this student. + if not err_msg: + err_msg = u'Unknown error' + error_rows.append(student_fields + [err_msg]) + task_progress.failed += 1 + continue - def _error_headers(self, header_row): - """ - Returns error headers for error report. + enrollment_status = _user_enrollment_status(student, course_id) - Arguments: - header_row (dict): list of header values + earned_possible_values = [] + for block_location in graded_scorable_blocks: + try: + problem_score = course_grade.problem_scores[block_location] + except KeyError: + earned_possible_values.append([u'Not Available', u'Not Available']) + else: + if problem_score.first_attempted: + earned_possible_values.append([problem_score.earned, problem_score.possible]) + else: + earned_possible_values.append([u'Not Attempted', problem_score.possible]) - Returns: - list: error headers - """ - return list(header_row.values()) + ['error_msg'] + rows.append(student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values)) - def _graded_scorable_blocks_to_header(self, course): + task_progress.succeeded += 1 + if task_progress.attempted % status_interval == 0: + step = u'Calculating Grades' + task_progress.update_task_state(extra_meta={'step': step}) + log_message = u'{0} {1}/{2}'.format(step, task_progress.attempted, task_progress.total) + log_task_info(log_message) + + log_task_info('Uploading CSV to store') + # Perform the upload if any students have been successfully graded + if len(rows) > 1: + upload_csv_to_report_store(rows, 'problem_grade_report', course_id, start_date) + # If there are any error rows, write them out as well + if len(error_rows) > 1: + upload_csv_to_report_store(error_rows, 'problem_grade_report_err', course_id, start_date) + + return task_progress.update_task_state(extra_meta={'step': 'Uploading CSV'}) + + @classmethod + def _graded_scorable_blocks_to_header(cls, course): """ Returns an OrderedDict that maps a scorable block's id to its headers in the final report. @@ -718,8 +629,8 @@ class ProblemGradeReport(GradeReportBase): for subsection_index, subsection_info in enumerate(subsection_infos, start=1): for scorable_block in subsection_info['scored_descendants']: header_name = ( - "{assignment_type} {subsection_index}: " - "{subsection_name} - {scorable_block_name}" + u"{assignment_type} {subsection_index}: " + u"{subsection_name} - {scorable_block_name}" ).format( scorable_block_name=scorable_block.display_name, assignment_type=assignment_type_name, @@ -730,53 +641,9 @@ class ProblemGradeReport(GradeReportBase): header_name + " (Possible)"] return scorable_blocks_map - def _rows_for_users(self, context, users, header_row, graded_scorable_blocks): - """ - Returns a list of rows for the given users for this report. - """ - success_rows, error_rows = [], [] - for student, course_grade, error in CourseGradeFactory().iter(users, context.course): - student_fields = [getattr(student, field_name) for field_name in header_row] - - if not course_grade: - err_msg = text_type(error) - # There was an error grading this student. - if not err_msg: - err_msg = 'Unknown error' - error_rows.append(student_fields + [err_msg]) - continue - - enrollment_status = _user_enrollment_status(student, context.course_id) - - earned_possible_values = [] - for block_location in graded_scorable_blocks: - try: - problem_score = course_grade.problem_scores[block_location] - except KeyError: - earned_possible_values.append(['Not Available', 'Not Available']) - else: - if problem_score.first_attempted: - earned_possible_values.append([problem_score.earned, problem_score.possible]) - else: - earned_possible_values.append(['Not Attempted', problem_score.possible]) - - success_rows.append( - student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values)) - - return success_rows, error_rows - - def _batched_rows(self, context, header_row, graded_scorable_blocks): - """ - A generator of batches of (success_rows, error_rows) for this report. - """ - for users in self._handle_empty_generator(self._batch_users(context), default=[]): - yield self._rows_for_users(context, users, header_row, graded_scorable_blocks) - class ProblemResponses(object): - """ - Class to encapsulate functionality related to generating Problem Responses Reports. - """ + @classmethod def _build_problem_list(cls, course_blocks, root, path=None): """