diff --git a/lms/djangoapps/instructor_task/tasks_helper/grades.py b/lms/djangoapps/instructor_task/tasks_helper/grades.py index 76dbdcb72c..561bc0873e 100644 --- a/lms/djangoapps/instructor_task/tasks_helper/grades.py +++ b/lms/djangoapps/instructor_task/tasks_helper/grades.py @@ -10,7 +10,6 @@ from datetime import datetime from itertools import chain from tempfile import TemporaryFile -from sys import getsizeof from time import time from django.conf import settings @@ -34,7 +33,6 @@ from lms.djangoapps.instructor_analytics.basic import list_problem_responses from lms.djangoapps.instructor_analytics.csvs import format_dictlist from lms.djangoapps.instructor_task.config.waffle import ( course_grade_report_verified_only, - optimize_get_learners_switch_enabled, problem_grade_report_verified_only, use_on_disk_grade_reporting, ) @@ -74,151 +72,6 @@ def _flatten(iterable): return list(chain.from_iterable(iterable)) -class GradeReportBase: - """ - Base class for grade reports (ProblemGradeReport and CourseGradeReport). - """ - - def _get_enrolled_learner_count(self, context): - """ - Returns count of number of learner enrolled in course. - """ - return CourseEnrollment.objects.users_enrolled_in( - course_id=context.course_id, - include_inactive=True, - verified_only=context.report_for_verified_only, - ).count() - - def log_task_info(self, context, message): - """ - Updates the status on the celery task to the given message. - Also logs the update. - """ - fmt = 'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}' - task_info_string = fmt.format( - task_id=context.task_id, - entry_id=context.entry_id, - course_id=context.course_id, - task_input=context.task_input - ) - TASK_LOG.info('%s, Task type: %s, %s, %s', task_info_string, context.action_name, - message, context.task_progress.state) - - def _handle_empty_generator(self, generator, default): - """ - Handle empty generator. - Return default if the generator is emtpy, otherwise return all - its iterations (including the first which was used for validation). - """ - TASK_LOG.info('GradeReport: Checking generator') - empty_generator_sentinel = object() - first_iteration_output = next(generator, empty_generator_sentinel) - generator_is_empty = first_iteration_output == empty_generator_sentinel - - if generator_is_empty: - TASK_LOG.info('GradeReport: Generator is empty') - yield default - - else: - TASK_LOG.info('GradeReport: Generator is not empty') - yield first_iteration_output - yield from generator - - def _batch_users(self, context): - """ - Returns a generator of batches of users. - """ - def grouper(iterable, chunk_size=100, fillvalue=None): - args = [iter(iterable)] * chunk_size - return zip_longest(*args, fillvalue=fillvalue) - - def get_enrolled_learners_for_course(course_id, verified_only=False): - """ - Get all the enrolled users in a course chunk by chunk. - This generator method fetches & loads the enrolled user objects on demand which in chunk - size defined. This method is a workaround to avoid out-of-memory errors. - """ - self.log_additional_info_for_testing( - context, - 'ProblemGradeReport: Starting batching of enrolled students' - ) - - filter_kwargs = { - 'courseenrollment__course_id': course_id, - } - if verified_only: - filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED - - user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id') - user_chunks = grouper(user_ids_list) - for user_ids in user_chunks: - user_ids = [user_id for user_id in user_ids if user_id is not None] - min_id = min(user_ids) - max_id = max(user_ids) - users = get_user_model().objects.filter( - id__gte=min_id, - id__lte=max_id, - **filter_kwargs - ).select_related('profile') - - self.log_additional_info_for_testing(context, 'ProblemGradeReport: user chunk yielded successfully') - yield users - - course_id = context.course_id - return get_enrolled_learners_for_course(course_id=course_id, verified_only=context.report_for_verified_only) - - def _compile(self, context, batched_rows): - """ - Compiles and returns the complete list of (success_rows, error_rows) for - the given batched_rows and context. - """ - # partition and chain successes and errors - self.log_additional_info_for_testing(context, "Begin Zipping Batched Rows") - success_rows, error_rows = zip(*batched_rows) - self.log_additional_info_for_testing(context, "Evaluating Success Rows") - success_rows = list(chain(*success_rows)) - self.log_additional_info_for_testing(context, "Evaluating Error Rows") - error_rows = list(chain(*error_rows)) - self.log_additional_info_for_testing(context, "Compilation complete") - - # update metrics on task status - context.task_progress.succeeded = len(success_rows) - context.task_progress.failed = len(error_rows) - context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed - context.task_progress.total = context.task_progress.attempted - return success_rows, error_rows - - def _upload(self, context, success_rows, error_rows): - """ - Creates and uploads a CSV for the given headers and rows. - """ - date = datetime.now(UTC) - upload_csv_to_report_store( - success_rows, - context.upload_filename, - context.course_id, - date, - parent_dir=context.upload_parent_dir - ) - - if len(error_rows) > 1: - upload_csv_to_report_store( - error_rows, - context.upload_filename + '_err', - context.course_id, - date, - parent_dir=context.upload_parent_dir - ) - - def log_additional_info_for_testing(self, context, message): - """ - Investigation logs for test problem grade report. - - TODO -- Remove as a part of PROD-1287 - """ - context.update_status(message) - - class _CourseGradeReportContext: """ Internal class that provides a common context to use for a single grade @@ -423,72 +276,53 @@ class _CourseGradeBulkContext: # lint-amnesty, pylint: disable=missing-class-do BulkCourseTags.prefetch(context.course_id, users) -class CourseGradeReport: +class InMemoryReportMixin: """ - Class to encapsulate functionality related to generating Grade Reports. + Mixin for a file report that will generate file in memory and then upload to report store """ - # Batch size for chunking the list of enrollees in the course. - USER_BATCH_SIZE = 100 - - @classmethod - def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): - """ - Public method to generate a grade report. - """ - with modulestore().bulk_operations(course_id): - context = _CourseGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name) - return CourseGradeReport()._generate(context) # lint-amnesty, pylint: disable=protected-access - - def _generate(self, context): + def _generate(self): """ Internal method for generating a grade report for the given context. """ - context.update_status('Starting grades') - success_headers = self._success_headers(context) + self.context.update_status('InMemoryReportMixin - 1: Starting grade report') + success_headers = self._success_headers() error_headers = self._error_headers() - batched_rows = self._batched_rows(context) + batched_rows = self._batched_rows() - context.update_status('Compiling grades') - success_rows, error_rows = self._compile(context, batched_rows) + self.context.update_status('InMemoryReportMixin - 2: Compiling grades') + success_rows, error_rows = self._compile(batched_rows) - context.update_status('Uploading grades') - self._upload(context, success_headers, success_rows, error_headers, error_rows) + self.context.update_status('InMemoryReportMixin - 3: Uploading grades') + self._upload(success_headers, success_rows, error_headers, error_rows) - return context.update_status('Completed grades') + return self.context.update_status('InMemoryReportMixin - 4: Completed grades') - def _success_headers(self, context): + def _upload(self, success_headers, success_rows, error_headers, error_rows): """ - Returns a list of all applicable column headers for this grade report. + Creates and uploads a CSV for the given headers and rows. """ - return ( - ["Student ID", "Email", "Username"] + - self._grades_header(context) + - (['Cohort Name'] if context.cohorts_enabled else []) + - [f'Experiment Group ({partition.name})' for partition in context.course_experiments] + - (['Team Name'] if context.teams_enabled else []) + - ['Enrollment Track', 'Verification Status'] + - ['Certificate Eligible', 'Certificate Delivered', 'Certificate Type'] + - ['Enrollment Status'] + date = datetime.now(UTC) + upload_csv_to_report_store( + [success_headers] + success_rows, + self.context.upload_filename, + self.context.course_id, + date, + parent_dir=self.context.upload_parent_dir ) - def _error_headers(self): - """ - Returns a list of error headers for this grade report. - """ - return ["Student ID", "Username", "Error"] + if len(error_rows) > 0: + upload_csv_to_report_store( + [error_headers] + error_rows, + self.context.upload_filename + '_err', + self.context.course_id, + date, + parent_dir=self.context.upload_parent_dir + ) - def _batched_rows(self, context): - """ - A generator of batches of (success_rows, error_rows) for this report. - """ - for users in self._batch_users(context): - users = [u for u in users if u is not None] - yield self._rows_for_users(context, users) - - def _compile(self, context, batched_rows): + def _compile(self, batched_rows): """ Compiles and returns the complete list of (success_rows, error_rows) for - the given batched_rows and context. + the given batched_rows. """ # partition and chain successes and errors success_rows, error_rows = zip(*batched_rows) @@ -496,84 +330,128 @@ class CourseGradeReport: error_rows = list(chain(*error_rows)) # update metrics on task status - context.task_progress.succeeded = len(success_rows) - context.task_progress.failed = len(error_rows) - context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed - context.task_progress.total = context.task_progress.attempted + self.context.task_progress.succeeded = len(success_rows) + self.context.task_progress.failed = len(error_rows) + self.context.task_progress.attempted = self.context.task_progress.succeeded + self.context.task_progress.failed + self.context.task_progress.total = self.context.task_progress.attempted return success_rows, error_rows - def _upload(self, context, success_headers, success_rows, error_headers, error_rows): + +class TemporaryFileReportMixin: + """ + Mixin for a file report that will write rows iteratively to a TempFile + """ + def _generate(self): """ - Creates and uploads a CSV for the given headers and rows. + Generate a CSV containing all students' problem grades within a given `course_id`. + """ + self.context.update_status('TemporaryFileReportMixin - 1: Starting grade report') + batched_rows = self._batched_rows() + + with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file: + self.context.update_status('TemporaryFileReportMixin - 2: Compiling grades into temp files') + has_errors = self.iter_and_write_batched_rows(batched_rows, success_file, error_file) + + self.context.update_status('TemporaryFileReportMixin - 3: Uploading files') + self.upload_temp_files(success_file, error_file, has_errors) + + return self.context.update_status('TemporaryFileReportMixin - 4: Completed grades') + + def iter_and_write_batched_rows(self, batched_rows, success_file, error_file): + """ + Iterate through batched rows, writing returned chunks to disk as we go. + This should hopefully help us avoid out of memory errors. + """ + success_writer = csv.writer(success_file) + error_writer = csv.writer(error_file) + + # Write headers + success_writer.writerow(self._success_headers()) + error_writer.writerow(self._error_headers()) + + succeeded, failed = 0, 0 + # Iterate through batched rows, writing to temp file + for success_rows, error_rows in batched_rows: + success_writer.writerows(success_rows) + if len(error_rows) > 0: + error_writer.writerows(error_rows) + succeeded += len(success_rows) + failed += len(error_rows) + + self.context.task_progress.succeeded = succeeded + self.context.task_progress.failed = failed + self.context.task_progress.attempted = succeeded + failed + self.context.task_progress.total = self.context.task_progress.attempted + + return self.context.task_progress.failed > 0 + + def upload_temp_files(self, success_file, error_file, has_errors): + """ + Uploads success and error csv files to report store """ date = datetime.now(UTC) - upload_csv_to_report_store( - [success_headers] + success_rows, - context.upload_filename, - context.course_id, + + success_file.seek(0) + upload_csv_file_to_report_store( + success_file, + self.context.upload_filename, + self.context.course_id, date, - parent_dir=context.upload_parent_dir + parent_dir=self.context.upload_parent_dir ) - if len(error_rows) > 0: - upload_csv_to_report_store( - [error_headers] + error_rows, - '{}_err'.format(context.upload_filename), - context.course_id, + + if has_errors: + error_file.seek(0) + upload_csv_file_to_report_store( + error_file, + self.context.upload_filename + '_err', + self.context.course_id, date, - parent_dir=context.upload_parent_dir + parent_dir=self.context.upload_parent_dir ) - def _grades_header(self, context): - """ - Returns the applicable grades-related headers for this report. - """ - graded_assignments = context.graded_assignments - grades_header = ["Grade"] - for assignment_info in graded_assignments.values(): - if assignment_info['separate_subsection_avg_headers']: - grades_header.extend(assignment_info['subsection_headers'].values()) - grades_header.append(assignment_info['average_header']) - return grades_header - def _batch_users(self, context): +class GradeReportBase: + """ + Base class for grade reports (ProblemGradeReport and CourseGradeReport). + """ + def __init__(self, context): + self.context = context + + def _get_enrolled_learner_count(self): + """ + Returns count of number of learner enrolled in course. + """ + return CourseEnrollment.objects.users_enrolled_in( + course_id=self.context.course_id, + include_inactive=True, + verified_only=self.context.report_for_verified_only, + ).count() + + def log_task_info(self, message): + """ + Updates the status on the celery task to the given message. + Also logs the update. + """ + fmt = 'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}' + task_info_string = fmt.format( + task_id=self.context.task_id, + entry_id=self.context.entry_id, + course_id=self.context.course_id, + task_input=self.context.task_input + ) + TASK_LOG.info('%s, Task type: %s, %s, %s', task_info_string, self.context.action_name, + message, self.context.task_progress.state) + + def _batch_users(self): """ Returns a generator of batches of users. """ - - def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None): + def grouper(iterable, chunk_size=100, fillvalue=None): args = [iter(iterable)] * chunk_size return zip_longest(*args, fillvalue=fillvalue) def get_enrolled_learners_for_course(course_id, verified_only=False): - """ - Get enrolled learners in a course. - Arguments: - course_id (CourseLocator): course_id to return enrollees for. - verified_only (boolean): is a boolean when True, returns only verified enrollees. - """ - if optimize_get_learners_switch_enabled(): - TASK_LOG.info('%s, Creating Course Grade with optimization', task_log_message) - return users_for_course_v2(course_id, verified_only=verified_only) - - TASK_LOG.info('%s, Creating Course Grade without optimization', task_log_message) - return users_for_course(course_id, verified_only=verified_only) - - def users_for_course(course_id, verified_only=False): - """ - Get all the enrolled users in a course. - This method fetches & loads the enrolled user objects at once which may cause - out-of-memory errors in large courses. This method will be removed when - `OPTIMIZE_GET_LEARNERS_FOR_COURSE` waffle flag is removed. - """ - users = CourseEnrollment.objects.users_enrolled_in( - course_id, - include_inactive=True, - verified_only=verified_only, - ) - users = users.select_related('profile') - return grouper(users) - - def users_for_course_v2(course_id, verified_only=False): """ Get all the enrolled users in a course chunk by chunk. This generator method fetches & loads the enrolled user objects on demand which in chunk @@ -596,18 +474,126 @@ class CourseGradeReport: id__lte=max_id, **filter_kwargs ).select_related('profile') - yield users - course_id = context.course_id - task_log_message = f'{context.task_info_string}, Task type: {context.action_name}' - return get_enrolled_learners_for_course(course_id=course_id, verified_only=context.report_for_verified_only) - def _user_grades(self, course_grade, context): + yield users + + return get_enrolled_learners_for_course( + course_id=self.context.course_id, + verified_only=self.context.report_for_verified_only + ) + + def log_additional_info_for_testing(self, message): + """ + Investigation logs for test problem grade report. + + TODO -- Remove as a part of PROD-1287 + """ + self.context.update_status(message) + + def _clear_caches(self): + """ + Override if a report type wants to clear caches after a batch of learners has + been processed + """ + + def _batched_rows(self): + """ + A generator of batches of (success_rows, error_rows) for this report. + """ + for users in self._batch_users(): + yield self._rows_for_users(users) + self._clear_caches() + + +class CourseGradeReport(GradeReportBase): + """ + Class to encapsulate functionality related to generating user/row had header data for Corse Grade Reports. + """ + # Batch size for chunking the list of enrollees in the course. + USER_BATCH_SIZE = 100 + + @classmethod + def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name): + """ + Public method to generate a grade report. + """ + with modulestore().bulk_operations(course_id): + context = _CourseGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name) + if use_on_disk_grade_reporting(course_id): # AU-926 + return TempFileCourseGradeReport(context)._generate() # pylint: disable=protected-access + else: + return InMemoryCourseGradeReport(context)._generate() # pylint: disable=protected-access + + def _success_headers(self): + """ + Returns a list of all applicable column headers for this grade report. + """ + return ( + ["Student ID", "Email", "Username"] + + self._grades_header() + + (['Cohort Name'] if self.context.cohorts_enabled else []) + + [f'Experiment Group ({partition.name})' for partition in self.context.course_experiments] + + (['Team Name'] if self.context.teams_enabled else []) + + ['Enrollment Track', 'Verification Status'] + + ['Certificate Eligible', 'Certificate Delivered', 'Certificate Type'] + + ['Enrollment Status'] + ) + + def _error_headers(self): + """ + Returns a list of error headers for this grade report. + """ + return ["Student ID", "Username", "Error"] + + def _grades_header(self): + """ + Returns the applicable grades-related headers for this report. + """ + graded_assignments = self.context.graded_assignments + grades_header = ["Grade"] + for assignment_info in graded_assignments.values(): + if assignment_info['separate_subsection_avg_headers']: + grades_header.extend(assignment_info['subsection_headers'].values()) + grades_header.append(assignment_info['average_header']) + return grades_header + + def _rows_for_users(self, users): + """ + Returns a list of rows for the given users for this report. + """ + with modulestore().bulk_operations(self.context.course_id): + bulk_context = _CourseGradeBulkContext(self.context, users) + + success_rows, error_rows = [], [] + for user, course_grade, error in CourseGradeFactory().iter( + users, + course=self.context.course, + collected_block_structure=self.context.course_structure, + course_key=self.context.course_id, + ): + if not course_grade: + # An empty gradeset means we failed to grade a student. + error_rows.append([user.id, user.username, str(error)]) + else: + success_rows.append( + [user.id, user.email, user.username] + + self._user_grades(course_grade) + + self._user_cohort_group_names(user) + + self._user_experiment_group_names(user) + + self._user_team_names(user, bulk_context.teams) + + self._user_verification_mode(user, bulk_context.enrollments) + + self._user_certificate_info(user, course_grade, bulk_context.certs) + + [_user_enrollment_status(user, self.context.course_id)] + ) + return success_rows, error_rows + + def _user_grades(self, course_grade): """ Returns a list of grade results for the given course_grade corresponding to the headers for this report. """ grade_results = [] - for _, assignment_info in context.graded_assignments.items(): + for _, assignment_info in self.context.graded_assignments.items(): subsection_grades, subsection_grades_results = self._user_subsection_grades( course_grade, assignment_info['subsection_headers'], @@ -637,7 +623,10 @@ class CourseGradeReport: subsection_grades.append(subsection_grade) return subsection_grades, grade_results - def _user_assignment_average(self, course_grade, subsection_grades, assignment_info): # lint-amnesty, pylint: disable=missing-function-docstring + def _user_assignment_average(self, course_grade, subsection_grades, assignment_info): + """ + Returns grade averages for assignment types + """ if assignment_info['separate_subsection_avg_headers']: if assignment_info['grader']: if course_grade.attempted: @@ -650,25 +639,25 @@ class CourseGradeReport: assignment_average = 0.0 return assignment_average - def _user_cohort_group_names(self, user, context): + def _user_cohort_group_names(self, user): """ Returns a list of names of cohort groups in which the given user belongs. """ cohort_group_names = [] - if context.cohorts_enabled: - group = get_cohort(user, context.course_id, assign=False, use_cached=True) + if self.context.cohorts_enabled: + group = get_cohort(user, self.context.course_id, assign=False, use_cached=True) cohort_group_names.append(group.name if group else '') return cohort_group_names - def _user_experiment_group_names(self, user, context): + def _user_experiment_group_names(self, user): """ Returns a list of names of course experiments in which the given user belongs. """ experiment_group_names = [] - for partition in context.course_experiments: - group = PartitionService(context.course_id).get_group(user, partition, assign=False) + for partition in self.context.course_experiments: + group = PartitionService(self.context.course_id).get_group(user, partition, assign=False) experiment_group_names.append(group.name if group else '') return experiment_group_names @@ -681,12 +670,12 @@ class CourseGradeReport: team_names = [bulk_teams.teams_by_user.get(user.id, '')] return team_names - def _user_verification_mode(self, user, context, bulk_enrollments): + def _user_verification_mode(self, user, bulk_enrollments): """ Returns a list of enrollment-mode and verification-status for the given user. """ - enrollment_mode = CourseEnrollment.enrollment_mode_for_user(user, context.course_id)[0] + enrollment_mode = CourseEnrollment.enrollment_mode_for_user(user, self.context.course_id)[0] verification_status = IDVerificationService.verification_status_for_user( user, enrollment_mode, @@ -694,54 +683,32 @@ class CourseGradeReport: ) return [enrollment_mode, verification_status] - def _user_certificate_info(self, user, context, course_grade, bulk_certs): + def _user_certificate_info(self, user, course_grade, bulk_certs): """ Returns the course certification information for the given user. """ is_allowlisted = user.id in bulk_certs.allowlisted_user_ids certificate_info = certs_api.certificate_info_for_user( user, - context.course_id, + self.context.course_id, course_grade.letter_grade, is_allowlisted, bulk_certs.certificates_by_user.get(user.id), ) return certificate_info - def _rows_for_users(self, context, users): - """ - Returns a list of rows for the given users for this report. - """ - with modulestore().bulk_operations(context.course_id): - bulk_context = _CourseGradeBulkContext(context, users) - success_rows, error_rows = [], [] - for user, course_grade, error in CourseGradeFactory().iter( - users, - course=context.course, - collected_block_structure=context.course_structure, - course_key=context.course_id, - ): - if not course_grade: - # An empty gradeset means we failed to grade a student. - error_rows.append([user.id, user.username, str(error)]) - else: - success_rows.append( - [user.id, user.email, user.username] + - self._user_grades(course_grade, context) + - self._user_cohort_group_names(user, context) + - self._user_experiment_group_names(user, context) + - self._user_team_names(user, bulk_context.teams) + - self._user_verification_mode(user, context, bulk_context.enrollments) + - self._user_certificate_info(user, context, course_grade, bulk_context.certs) + - [_user_enrollment_status(user, context.course_id)] - ) - return success_rows, error_rows +class InMemoryCourseGradeReport(CourseGradeReport, InMemoryReportMixin): + """ Course Grade Report that compiles and then uploads all rows at once """ + + +class TempFileCourseGradeReport(CourseGradeReport, TemporaryFileReportMixin): + """ Course Grade Report that writes file iteratively to a TempFile to then be uploaded """ class ProblemGradeReport(GradeReportBase): """ - Class to encapsulate functionality related to generating Problem Grade Reports. + Class to encapsulate functionality related to generating user/row had header data for Problem Grade Reports. """ @classmethod @@ -752,34 +719,11 @@ class ProblemGradeReport(GradeReportBase): with modulestore().bulk_operations(course_id): context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name) if use_on_disk_grade_reporting(course_id): # AU-926 - # pylint: disable=protected-access - return TempFileProblemGradeReport()._generate(context) + return TempFileProblemGradeReport(context)._generate() # pylint: disable=protected-access else: - # pylint: disable=protected-access - return ProblemGradeReport()._generate(context) + return InMemoryProblemGradeReport(context)._generate() # pylint: disable=protected-access - def _generate(self, context): - """ - Generate a CSV containing all students' problem grades within a given - `course_id`. - """ - context.update_status('ProblemGradeReport - 1: Starting problem grades') - success_headers = self._success_headers(context) - error_headers = self._error_headers() - batched_rows = self._batched_rows(context) - - context.update_status('ProblemGradeReport - 2: Compiling grades') - success_rows, error_rows = self._compile(context, batched_rows) - context.update_status('ProblemGradeReport - 3: Uploading grades') - self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows) - - return context.update_status('ProblemGradeReport - 4: Completed problem grades') - - def _problem_grades_header(self): - """Problem Grade report header.""" - return OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')]) - - def _success_headers(self, context): + def _success_headers(self): """ Returns headers for all gradable blocks including fixed headers for report. @@ -787,7 +731,7 @@ class ProblemGradeReport(GradeReportBase): list: combined header and scorable blocks """ header_row = list(self._problem_grades_header().values()) + ['Enrollment Status', 'Grade'] - return header_row + _flatten(list(context.graded_scorable_blocks_header.values())) + return header_row + _flatten(list(self.context.graded_scorable_blocks_header.values())) def _error_headers(self): """ @@ -797,24 +741,21 @@ class ProblemGradeReport(GradeReportBase): """ return list(self._problem_grades_header().values()) + ['error_msg'] - def _rows_for_users(self, context, users): + def _problem_grades_header(self): + """Problem Grade report header.""" + return OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')]) + + def _rows_for_users(self, users): """ Returns a list of rows for the given users for this report. """ - self.log_additional_info_for_testing(context, 'ProblemGradeReport: Starting to process new user batch.') success_rows, error_rows = [], [] - success_rows_size, error_rows_size = 0, 0 for student, course_grade, error in CourseGradeFactory().iter( users, - course=context.course, - collected_block_structure=context.course_structure, - course_key=context.course_id, + course=self.context.course, + collected_block_structure=self.context.course_structure, + course_key=self.context.course_id, ): - context.task_progress.attempted += 1 - self.log_additional_info_for_testing( - context, - f'ProblemGradeReport: Attempt {context.task_progress.attempted}' - ) if not course_grade: err_msg = str(error) # There was an error grading this student. @@ -824,17 +765,10 @@ class ProblemGradeReport(GradeReportBase): [student.id, student.email, student.username] + [err_msg] ) - error_rows_size += getsizeof(error_rows[-1]) - context.task_progress.failed += 1 - self.log_additional_info_for_testing( - context, - f'ProblemGradeReport: Failed {context.task_progress.failed}' - ) continue - self.log_additional_info_for_testing(context, 'ProblemGradeReport: Succeeded in reading grade') earned_possible_values = [] - for block_location in context.graded_scorable_blocks_header: + for block_location in self.context.graded_scorable_blocks_header: try: problem_score = course_grade.problem_scores[block_location] except KeyError: @@ -845,114 +779,26 @@ class ProblemGradeReport(GradeReportBase): else: earned_possible_values.append(['Not Attempted', problem_score.possible]) - self.log_additional_info_for_testing(context, 'ProblemGradeReport: earned possible values done') - context.task_progress.succeeded += 1 - enrollment_status = _user_enrollment_status(student, context.course_id) - self.log_additional_info_for_testing( - context, - f'ProblemGradeReport: Succeeded {context.task_progress.succeeded}' - ) + enrollment_status = _user_enrollment_status(student, self.context.course_id) success_rows.append( [student.id, student.email, student.username] + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values) ) - success_rows_size += getsizeof(success_rows[-1]) - self.log_additional_info_for_testing(context, 'ProblemGradeReport: Added rows') - - success_rows_size += getsizeof(success_rows) - error_rows_size += getsizeof(error_rows) - self.log_additional_info_for_testing( - context, - f'ProblemGradeReport memory usage: succeess {success_rows_size} error {error_rows_size}' - ) return success_rows, error_rows - def _batched_rows(self, context): - """ - A generator of batches of (success_rows, error_rows) for this report. - """ - for users in self._batch_users(context): - yield self._rows_for_users(context, users) - # Clear the CourseEnrollment caches after each batch of users has been processed - get_cache('get_enrollment').clear() - get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear() + def _clear_caches(self): + get_cache('get_enrollment').clear() + get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear() -class TempFileProblemGradeReport(ProblemGradeReport): - """ - ProblemGradeReport that instead of holding all resultant file info in memory, - writes chunked data to disk. - """ - def _generate(self, context): - """ - Generate a CSV containing all students' problem grades within a given `course_id`. - """ - context.update_status('TempFileProblemGradeReport - 1: Starting problem grades') - batched_rows = self._batched_rows(context) +class InMemoryProblemGradeReport(ProblemGradeReport, InMemoryReportMixin): + """ Program Grade Report that compiles and then uploads all rows at once """ - with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file: - context.update_status('TempFileProblemGradeReport - 2: Compiling grades into temp files') - has_errors = self.iter_and_write_batched_rows(context, success_file, error_file, batched_rows) - context.update_status('TempFileProblemGradeReport - 3: Uploading files') - self.upload_temp_files(context, success_file, error_file, has_errors) - - return context.update_status('ProblemGradeReport - 4: Completed problem grades') - - def iter_and_write_batched_rows(self, context, success_file, error_file, batched_rows): - """ - Iterate through batched rows, writing returned chunks to disk as we go. - This should hopefully help us avoid out of memory errors. - """ - context.task_progress.succeeded = 0 - context.task_progress.failed = 0 - - success_writer = csv.writer(success_file) - error_writer = csv.writer(error_file) - - # Write headers - success_writer.writerow(self._success_headers(context)) - error_writer.writerow(self._error_headers()) - - # Iterate through batched rows, writing to temp file - for success_rows, error_rows in batched_rows: - context.task_progress.succeeded += len(success_rows) - success_writer.writerows(success_rows) - if len(error_rows) > 0: - context.task_progress.failed += len(error_rows) - error_writer.writerows(error_rows) - - context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed - context.task_progress.total = context.task_progress.attempted - - return context.task_progress.failed > 0 - - def upload_temp_files(self, context, success_file, error_file, has_errors): - """ - Uploads success and error csv files to report store - """ - date = datetime.now(UTC) - - success_file.seek(0) - upload_csv_file_to_report_store( - success_file, - context.upload_filename, - context.course_id, - date, - parent_dir=context.upload_parent_dir - ) - - if has_errors: - error_file.seek(0) - upload_csv_file_to_report_store( - error_file, - context.upload_filename + '_err', - context.course_id, - date, - parent_dir=context.upload_parent_dir - ) +class TempFileProblemGradeReport(ProblemGradeReport, TemporaryFileReportMixin): + """ Program Grade Report that writes file iteratively to a TempFile to then be uploaded """ class ProblemResponses: diff --git a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py index d632135c7e..52679fe165 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py @@ -45,7 +45,6 @@ from lms.djangoapps.instructor_task.tasks_helper.grades import ( CourseGradeReport, ProblemGradeReport, ProblemResponses, - TempFileProblemGradeReport, ) from lms.djangoapps.instructor_task.tasks_helper.misc import ( cohort_students_and_upload, @@ -79,17 +78,21 @@ _TEAMS_CONFIG = TeamsConfig({ 'max_size': 2, 'topics': [{'id': 'topic', 'name': 'Topic', 'description': 'A Topic'}], }) +USE_ON_DISK_GRADE_REPORT = 'lms.djangoapps.instructor_task.tasks_helper.grades.use_on_disk_grade_reporting' class InstructorGradeReportTestCase(TestReportMixin, InstructorTaskCourseTestCase): """ Base class for grade report tests. """ - def _verify_cell_data_for_user(self, username, course_id, column_header, expected_cell_content, num_rows=2): + def _verify_cell_data_for_user( + self, username, course_id, column_header, expected_cell_content, num_rows=2, use_tempfile=False + ): """ Verify cell data in the grades CSV for a particular user. """ with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'): - result = CourseGradeReport.generate(None, None, course_id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = CourseGradeReport.generate(None, None, course_id, {}, 'graded') self.assertDictContainsSubset({'attempted': num_rows, 'succeeded': num_rows, 'failed': 0}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') report_csv_filename = report_store.links_for(course_id)[0][0] @@ -112,25 +115,28 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase): super().setUp() self.course = CourseFactory.create() - @ddt.data(['student@example.com', 'ni\xf1o@example.com']) - def test_unicode_emails(self, emails): + @ddt.data(True, False) + def test_unicode_emails(self, use_tempfile): """ Test that students with unicode characters in emails is handled. """ - for i, email in enumerate(emails): + emails = ['student@example.com', 'ni\xf1o@example.com'] + for i, email in enumerate(['student@example.com', 'ni\xf1o@example.com']): self.create_student(f'student{i}', email) self.current_task = Mock() # pylint: disable=attribute-defined-outside-init self.current_task.update_state = Mock() with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') as mock_current_task: mock_current_task.return_value = self.current_task - result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded') num_students = len(emails) self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result) + @ddt.data(True, False) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') @patch('lms.djangoapps.grades.course_grade_factory.CourseGradeFactory.iter') - def test_grading_failure(self, mock_grades_iter, _mock_current_task): + def test_grading_failure(self, use_tempfile, mock_grades_iter, _mock_current_task): """ Test that any grading errors are properly reported in the progress dict and uploaded to the report store. @@ -138,7 +144,8 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase): mock_grades_iter.return_value = [ (self.create_student('username', 'student@example.com'), None, TypeError('Cannot grade student')) ] - result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result) report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD') @@ -396,7 +403,7 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase): with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'): with check_mongo_calls(2): - with self.assertNumQueries(48): + with self.assertNumQueries(51): CourseGradeReport.generate(None, None, course.id, {}, 'graded') def test_inactive_enrollments(self): @@ -422,6 +429,7 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase): ) +@ddt.ddt class TestTeamGradeReport(InstructorGradeReportTestCase): """ Test that teams appear correctly in the grade report when it is enabled for the course. """ @@ -433,8 +441,11 @@ class TestTeamGradeReport(InstructorGradeReportTestCase): self.student2 = UserFactory.create() CourseEnrollment.enroll(self.student2, self.course.id) - def test_team_in_grade_report(self): - self._verify_cell_data_for_user(self.student1.username, self.course.id, 'Team Name', '') + @ddt.data(True, False) + def test_team_in_grade_report(self, use_tempfile): + self._verify_cell_data_for_user( + self.student1.username, self.course.id, 'Team Name', '', use_tempfile=use_tempfile + ) def test_correct_team_name_in_grade_report(self): team1 = CourseTeamFactory.create(course_id=self.course.id) @@ -794,13 +805,14 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.csv_header_row = ['Student ID', 'Email', 'Username', 'Enrollment Status', 'Grade'] @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) - def test_no_problems(self, problem_grade_report_class, _): + @ddt.data(True, False) + def test_no_problems(self, use_tempfile, _): """ Verify that we see no grade information for a course with no graded problems. """ - result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv([ dict(list(zip( @@ -814,8 +826,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): ]) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) - def test_single_problem(self, problem_grade_report_class, _): + @ddt.data(True, False) + def test_single_problem(self, use_tempfile, _): vertical = ItemFactory.create( parent_location=self.problem_section.location, category='vertical', @@ -825,7 +837,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.define_option_problem('Problem1', parent=vertical) self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) - result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) problem_name = 'Homework 1: Subsection - Problem1' header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)'] @@ -853,8 +866,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): ]) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) - def test_single_problem_verified_student_only(self, problem_grade_report_class, _): + @ddt.data(True, False) + def test_single_problem_verified_student_only(self, use_tempfile, _): with patch( 'lms.djangoapps.instructor_task.tasks_helper.grades.problem_grade_report_verified_only', return_value=True, @@ -870,14 +883,15 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) self.submit_student_answer(student_verified.username, 'Problem1', ['Option 1']) - result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 1, 'succeeded': 1, 'failed': 0}, result ) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) - def test_inactive_enrollment_included(self, problem_grade_report_class, _): + @ddt.data(True, False) + def test_inactive_enrollment_included(self, use_tempfile, _): """ Students with inactive enrollments in a course should be included in Problem Grade Report. """ @@ -891,7 +905,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.define_option_problem('Problem1', parent=vertical) self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) - result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 3, 'succeeded': 3, 'failed': 0}, result) problem_name = 'Homework 1: Subsection - Problem1' header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)'] @@ -929,6 +944,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): ]) +@ddt.ddt class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, InstructorTaskModuleTestCase): """ Test the problem report on a course that has split tests. @@ -943,7 +959,8 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, self.define_option_problem(self.problem_a_url, parent=self.vertical_a) self.define_option_problem(self.problem_b_url, parent=self.vertical_b) - def test_problem_grade_report(self): + @ddt.data(True, False) + def test_problem_grade_report(self, use_tempfile): """ Test that we generate the correct grade report when dealing with A/B tests. @@ -962,7 +979,8 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, self.submit_student_answer(self.student_b.username, self.problem_b_url, [self.OPTION_1, self.OPTION_2]) with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'): - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result ) @@ -1061,6 +1079,7 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, assert self.get_csv_row_with_headers() == header_row +@ddt.ddt class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, InstructorTaskModuleTestCase): """ Test the problem report on a course that has cohorted content. @@ -1104,7 +1123,8 @@ class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, In ] + grade ))) - def test_cohort_content(self): + @ddt.data(True, False) + def test_cohort_content(self, use_tempfile): self.submit_student_answer(self.alpha_user.username, 'Problem0', ['Option 1', 'Option 1']) resp = self.submit_student_answer(self.alpha_user.username, 'Problem1', ['Option 1', 'Option 1']) assert resp.status_code == 404 @@ -1114,7 +1134,8 @@ class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, In self.submit_student_answer(self.beta_user.username, 'Problem1', ['Option 1', 'Option 2']) with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'): - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile): + result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 5, 'succeeded': 5, 'failed': 0}, result )