diff --git a/lms/djangoapps/instructor_task/tasks_helper/grades.py b/lms/djangoapps/instructor_task/tasks_helper/grades.py index 805064ce16..b4246fa7e0 100644 --- a/lms/djangoapps/instructor_task/tasks_helper/grades.py +++ b/lms/djangoapps/instructor_task/tasks_helper/grades.py @@ -2,7 +2,6 @@ Functionality for generating grade reports. """ - import logging import re from collections import OrderedDict, defaultdict @@ -69,6 +68,95 @@ def _flatten(iterable): return list(chain.from_iterable(iterable)) +class GradeReportBase(object): + """ + Base class for grade reports (ProblemGradeReport and CourseGradeReport). + """ + def _handle_empty_generator(self, generator, default): + """ + Handle empty generator. + Return default if the generator is emtpy, otherwise return all + its iterations (including the first which was used for validation). + """ + empty_generator_sentinel = object() + first_iteration_output = next(generator, empty_generator_sentinel) + generator_is_empty = first_iteration_output == empty_generator_sentinel + + if generator_is_empty: + yield default + + else: + yield first_iteration_output + for element in generator: + yield element + + def _batch_users(self, context): + """ + Returns a generator of batches of users. + """ + + def grouper(iterable, chunk_size=100, fillvalue=None): + args = [iter(iterable)] * chunk_size + return zip_longest(*args, fillvalue=fillvalue) + + def get_enrolled_learners_for_course(course_id, verified_only=False): + """ + Get all the enrolled users in a course chunk by chunk. + + This generator method fetches & loads the enrolled user objects on demand which in chunk + size defined. This method is a workaround to avoid out-of-memory errors. + """ + filter_kwargs = { + 'courseenrollment__course_id': course_id, + } + if verified_only: + filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED + + user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id') + user_chunks = grouper(user_ids_list) + for user_ids in user_chunks: + user_ids = [user_id for user_id in user_ids if user_id is not None] + min_id = min(user_ids) + max_id = max(user_ids) + users = get_user_model().objects.filter( + id__gte=min_id, + id__lte=max_id, + **filter_kwargs + ).select_related('profile') + yield users + + course_id = context.course_id + + report_for_verified_only = generate_grade_report_for_verified_only() + return get_enrolled_learners_for_course(course_id=course_id, verified_only=report_for_verified_only) + + def _compile(self, context, batched_rows): + """ + Compiles and returns the complete list of (success_rows, error_rows) for + the given batched_rows and context. + """ + # partition and chain successes and errors + success_rows, error_rows = zip(*batched_rows) + success_rows = list(chain(*success_rows)) + error_rows = list(chain(*error_rows)) + + # update metrics on task status + context.task_progress.succeeded = len(success_rows) + context.task_progress.failed = len(error_rows) + context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed + context.task_progress.total = context.task_progress.attempted + return success_rows, error_rows + + def _upload(self, context, success_rows, error_rows): + """ + Creates and uploads a CSV for the given headers and rows. + """ + date = datetime.now(UTC) + upload_csv_to_report_store(success_rows, context.file_name, context.course_id, date) + if len(error_rows) > 1: + upload_csv_to_report_store(error_rows, context.file_name + '_err', context.course_id, date) + + class _CourseGradeReportContext(object): """ Internal class that provides a common context to use for a single grade @@ -76,6 +164,7 @@ class _CourseGradeReportContext(object): elements of this context are serialized and parsed across process boundaries. """ + def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): self.task_info_string = ( u'Task: {task_id}, ' @@ -156,6 +245,41 @@ class _CourseGradeReportContext(object): return self.task_progress.update_task_state(extra_meta={'step': message}) +class _ProblemGradeReportContext(object): + """ + Internal class that provides a common context to use for a single problem + grade report. When a report is parallelized across multiple processes, + elements of this context are serialized and parsed across process + boundaries. + """ + + def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): + self.task_info_string = ( + 'Task: {task_id}, ' + 'InstructorTask ID: {entry_id}, ' + 'Course: {course_id}, ' + 'Input: {task_input}' + ).format( + task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None, + entry_id=_entry_id, + course_id=course_id, + task_input=_task_input, + ) + self.action_name = action_name + self.course_id = course_id + self.task_progress = TaskProgress(self.action_name, total=None, start_time=time()) + self.course = get_course_by_id(self.course_id) + self.file_name = 'problem_grade_report' + + def update_status(self, message): + """ + Updates the status on the celery task to the given message. + Also logs the update. + """ + TASK_LOG.info('%s, Task type: %s, %s', self.task_info_string, self.action_name, message) + return self.task_progress.update_task_state(extra_meta={'step': message}) + + class _CertificateBulkContext(object): def __init__(self, context, users): certificate_whitelist = CertificateWhitelist.objects.filter(course_id=context.course_id, whitelist=True) @@ -302,6 +426,7 @@ class CourseGradeReport(object): """ Returns a generator of batches of users. """ + def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None): args = [iter(iterable)] * chunk_size return zip_longest(*args, fillvalue=fillvalue) @@ -519,106 +644,70 @@ class CourseGradeReport(object): return success_rows, error_rows -class ProblemGradeReport(object): +class ProblemGradeReport(GradeReportBase): + """ + Class to encapsulate functionality related to generating Problem Grade Reports. + """ @classmethod def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): + """ + Public method to generate a grade report. + """ + with modulestore().bulk_operations(course_id): + context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name) + # pylint: disable=protected-access + return ProblemGradeReport()._generate(context) + + def _generate(self, context): """ Generate a CSV containing all students' problem grades within a given `course_id`. """ - - def log_task_info(message): - """ - Updates the status on the celery task to the given message. - Also logs the update. - """ - fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}' - task_info_string = fmt.format( - task_id=task_id, entry_id=_entry_id, course_id=course_id, task_input=_task_input - ) - TASK_LOG.info(u'%s, Task type: %s, %s, %s', task_info_string, action_name, message, task_progress.state) - - start_time = time() - start_date = datetime.now(UTC) - status_interval = 100 - task_id = _xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None - - report_for_verified_only = generate_grade_report_for_verified_only() - enrolled_students = CourseEnrollment.objects.users_enrolled_in( - course_id=course_id, - include_inactive=True, - verified_only=report_for_verified_only, - ) - task_progress = TaskProgress(action_name, enrolled_students.count(), start_time) - - # This struct encapsulates both the display names of each static item in the - # header row as values as well as the django User field names of those items - # as the keys. It is structured in this way to keep the values related. + context.update_status('ProblemGradeReport Step 1: Starting problem grades') + course = get_course_by_id(context.course_id) header_row = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')]) + context.update_status('ProblemGradeReport Step 2: Retrieving scorable grade blocks for course') + graded_scorable_blocks = self._graded_scorable_blocks_to_header(course) + context.update_status('ProblemGradeReport Step 3: Setting up headers for problem grade report') + success_headers = self._success_headers(header_row, graded_scorable_blocks) + error_headers = self._error_headers(header_row) - course = get_course_by_id(course_id) - log_task_info(u'Retrieving graded scorable blocks') - graded_scorable_blocks = cls._graded_scorable_blocks_to_header(course) + context.update_status('ProblemGradeReport Step 4: Retrieving problem scores for course for enrolled users') + generated_rows = self._batched_rows(context, header_row, graded_scorable_blocks) + success_rows, error_rows = self._compile(context, generated_rows) + context.update_status('ProblemGradeReport Step 5: Uploading data to CSV report file') + self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows) - # Just generate the static fields for now. - rows = [ - list(header_row.values()) + ['Enrollment Status', 'Grade'] + _flatten(list(graded_scorable_blocks.values())) - ] - error_rows = [list(header_row.values()) + ['error_msg']] + return context.update_status('ProblemGradeReport Step 6: Completed problem grades report') - # Bulk fetch and cache enrollment states so we can efficiently determine - # whether each user is currently enrolled in the course. - log_task_info(u'Fetching enrollment status') - CourseEnrollment.bulk_fetch_enrollment_states(enrolled_students, course_id) + def _success_headers(self, header_row, graded_scorable_blocks): + """ + Returns headers for all gradable blocks including fixed headers + for report. - for student, course_grade, error in CourseGradeFactory().iter(enrolled_students, course): - student_fields = [getattr(student, field_name) for field_name in header_row] - task_progress.attempted += 1 + Arguments: + header_row (dict): list of header values + graded_scorable_blocks (dict): dict of graded_scorable_blocks - if not course_grade: - err_msg = text_type(error) - # There was an error grading this student. - if not err_msg: - err_msg = u'Unknown error' - error_rows.append(student_fields + [err_msg]) - task_progress.failed += 1 - continue + Returns: + list: combined header and scorable blocks + """ + header_row = list(header_row.values()) + ['Enrollment Status', 'Grade'] + return header_row + _flatten(list(graded_scorable_blocks.values())) - enrollment_status = _user_enrollment_status(student, course_id) + def _error_headers(self, header_row): + """ + Returns error headers for error report. - earned_possible_values = [] - for block_location in graded_scorable_blocks: - try: - problem_score = course_grade.problem_scores[block_location] - except KeyError: - earned_possible_values.append([u'Not Available', u'Not Available']) - else: - if problem_score.first_attempted: - earned_possible_values.append([problem_score.earned, problem_score.possible]) - else: - earned_possible_values.append([u'Not Attempted', problem_score.possible]) + Arguments: + header_row (dict): list of header values - rows.append(student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values)) + Returns: + list: error headers + """ + return list(header_row.values()) + ['error_msg'] - task_progress.succeeded += 1 - if task_progress.attempted % status_interval == 0: - step = u'Calculating Grades' - task_progress.update_task_state(extra_meta={'step': step}) - log_message = u'{0} {1}/{2}'.format(step, task_progress.attempted, task_progress.total) - log_task_info(log_message) - - log_task_info('Uploading CSV to store') - # Perform the upload if any students have been successfully graded - if len(rows) > 1: - upload_csv_to_report_store(rows, 'problem_grade_report', course_id, start_date) - # If there are any error rows, write them out as well - if len(error_rows) > 1: - upload_csv_to_report_store(error_rows, 'problem_grade_report_err', course_id, start_date) - - return task_progress.update_task_state(extra_meta={'step': 'Uploading CSV'}) - - @classmethod - def _graded_scorable_blocks_to_header(cls, course): + def _graded_scorable_blocks_to_header(self, course): """ Returns an OrderedDict that maps a scorable block's id to its headers in the final report. @@ -629,8 +718,8 @@ class ProblemGradeReport(object): for subsection_index, subsection_info in enumerate(subsection_infos, start=1): for scorable_block in subsection_info['scored_descendants']: header_name = ( - u"{assignment_type} {subsection_index}: " - u"{subsection_name} - {scorable_block_name}" + "{assignment_type} {subsection_index}: " + "{subsection_name} - {scorable_block_name}" ).format( scorable_block_name=scorable_block.display_name, assignment_type=assignment_type_name, @@ -641,9 +730,53 @@ class ProblemGradeReport(object): header_name + " (Possible)"] return scorable_blocks_map + def _rows_for_users(self, context, users, header_row, graded_scorable_blocks): + """ + Returns a list of rows for the given users for this report. + """ + success_rows, error_rows = [], [] + for student, course_grade, error in CourseGradeFactory().iter(users, context.course): + student_fields = [getattr(student, field_name) for field_name in header_row] + + if not course_grade: + err_msg = text_type(error) + # There was an error grading this student. + if not err_msg: + err_msg = 'Unknown error' + error_rows.append(student_fields + [err_msg]) + continue + + enrollment_status = _user_enrollment_status(student, context.course_id) + + earned_possible_values = [] + for block_location in graded_scorable_blocks: + try: + problem_score = course_grade.problem_scores[block_location] + except KeyError: + earned_possible_values.append(['Not Available', 'Not Available']) + else: + if problem_score.first_attempted: + earned_possible_values.append([problem_score.earned, problem_score.possible]) + else: + earned_possible_values.append(['Not Attempted', problem_score.possible]) + + success_rows.append( + student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values)) + + return success_rows, error_rows + + def _batched_rows(self, context, header_row, graded_scorable_blocks): + """ + A generator of batches of (success_rows, error_rows) for this report. + """ + for users in self._handle_empty_generator(self._batch_users(context), default=[]): + yield self._rows_for_users(context, users, header_row, graded_scorable_blocks) + class ProblemResponses(object): - + """ + Class to encapsulate functionality related to generating Problem Responses Reports. + """ @classmethod def _build_problem_list(cls, course_blocks, root, path=None): """