From 8eb327741431996c58d9876b31bc4d29d61546dd Mon Sep 17 00:00:00 2001 From: Jansen Kantor Date: Thu, 1 Dec 2022 09:48:50 -0500 Subject: [PATCH] feat: add problemgradereport config for writing to disk (#31385) In the problem grade report, we currently must hold the entire file in memory before writing it all at once. To avoid out-of-memory Celery issues and to resolve a blocking bug for some MIT courses, add a temporary waffle flag `instructor_task.use_on_disk_grade_reporting` which, when activated, uses a new report class that writes chunks to disk to (hopefully) allow the report to complete. Additional testing and consideration is required for this approach. --- .../instructor_task/config/waffle.py | 21 +++++ .../instructor_task/tasks_helper/grades.py | 91 ++++++++++++++++++- .../instructor_task/tasks_helper/utils.py | 25 +++++ .../tests/test_tasks_helper.py | 23 +++-- 4 files changed, 147 insertions(+), 13 deletions(-) diff --git a/lms/djangoapps/instructor_task/config/waffle.py b/lms/djangoapps/instructor_task/config/waffle.py index b3938096eb..67c3247ff2 100644 --- a/lms/djangoapps/instructor_task/config/waffle.py +++ b/lms/djangoapps/instructor_task/config/waffle.py @@ -24,6 +24,18 @@ GENERATE_COURSE_GRADE_REPORT_VERIFIED_ONLY = CourseWaffleFlag( # lint-amnesty, f'{WAFFLE_NAMESPACE}.generate_course_grade_report_verified_only', __name__ ) +# .. toggle_name: instructor_task.use_on_disk_grade_reporting +# .. toggle_implementation: CourseWaffleFlag +# .. toggle_default: False +# .. toggle_description: When generating grade reports, write chunks to disk to avoid out of memory errors. +# .. toggle_use_cases: temporary +# .. toggle_creation_date: 2022-12-01 +# .. toggle_target_removal_date: 2023-01-10 +# .. toggle_tickets: AU-926 +USE_ON_DISK_GRADE_REPORTING = CourseWaffleFlag( + f'{WAFFLE_NAMESPACE}.use_on_disk_grade_reporting', __name__ +) + def optimize_get_learners_switch_enabled(): """ @@ -48,3 +60,12 @@ def course_grade_report_verified_only(course_id): False otherwise. 
""" return GENERATE_COURSE_GRADE_REPORT_VERIFIED_ONLY.is_enabled(course_id) + + +def use_on_disk_grade_reporting(course_id): + """ + Returns True if problem grade reports should write + chunks to disk rather than holding all in memory. + False otherwise. + """ + return USE_ON_DISK_GRADE_REPORTING.is_enabled(course_id) diff --git a/lms/djangoapps/instructor_task/tasks_helper/grades.py b/lms/djangoapps/instructor_task/tasks_helper/grades.py index e49a51c10b..23220ceaae 100644 --- a/lms/djangoapps/instructor_task/tasks_helper/grades.py +++ b/lms/djangoapps/instructor_task/tasks_helper/grades.py @@ -2,11 +2,14 @@ Functionality for generating grade reports. """ +import csv import logging import re from collections import OrderedDict, defaultdict from datetime import datetime from itertools import chain +from tempfile import TemporaryFile + from sys import getsizeof from time import time @@ -32,7 +35,8 @@ from lms.djangoapps.instructor_analytics.csvs import format_dictlist from lms.djangoapps.instructor_task.config.waffle import ( course_grade_report_verified_only, optimize_get_learners_switch_enabled, - problem_grade_report_verified_only + problem_grade_report_verified_only, + use_on_disk_grade_reporting, ) from lms.djangoapps.teams.models import CourseTeamMembership from lms.djangoapps.verify_student.services import IDVerificationService @@ -46,7 +50,7 @@ from xmodule.partitions.partitions_service import PartitionService # lint-amnes from xmodule.split_test_module import get_split_user_partitions # lint-amnesty, pylint: disable=wrong-import-order from .runner import TaskProgress -from .utils import upload_csv_to_report_store +from .utils import upload_csv_to_report_store, upload_csv_file_to_report_store TASK_LOG = logging.getLogger('edx.celery.task') @@ -747,8 +751,12 @@ class ProblemGradeReport(GradeReportBase): """ with modulestore().bulk_operations(course_id): context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, 
action_name) - # pylint: disable=protected-access - return ProblemGradeReport()._generate(context) + if use_on_disk_grade_reporting(course_id): # AU-926 + # pylint: disable=protected-access + return TempFileProblemGradeReport()._generate(context) + else: + # pylint: disable=protected-access + return ProblemGradeReport()._generate(context) def _generate(self, context): """ @@ -872,6 +880,81 @@ class ProblemGradeReport(GradeReportBase): get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear() +class TempFileProblemGradeReport(ProblemGradeReport): + """ + ProblemGradeReport that instead of holding all resultant file info in memory, + writes chunked data to disk. + """ + def _generate(self, context): + """ + Generate a CSV containing all students' problem grades within a given `course_id`. + """ + context.update_status('TempFileProblemGradeReport - 1: Starting problem grades') + batched_rows = self._batched_rows(context) + + with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file: + context.update_status('TempFileProblemGradeReport - 2: Compiling grades into temp files') + has_errors = self.iter_and_write_batched_rows(context, success_file, error_file, batched_rows) + + context.update_status('TempFileProblemGradeReport - 3: Uploading files') + self.upload_temp_files(context, success_file, error_file, has_errors) + + return context.update_status('ProblemGradeReport - 4: Completed problem grades') + + def iter_and_write_batched_rows(self, context, success_file, error_file, batched_rows): + """ + Iterate through batched rows, writing returned chunks to disk as we go. + This should hopefully help us avoid out of memory errors. 
+ """ + context.task_progress.succeeded = 0 + context.task_progress.failed = 0 + + success_writer = csv.writer(success_file) + error_writer = csv.writer(error_file) + + # Write headers + success_writer.writerow(self._success_headers(context)) + error_writer.writerow(self._error_headers()) + + # Iterate through batched rows, writing to temp file + for success_rows, error_rows in batched_rows: + context.task_progress.succeeded += len(success_rows) + success_writer.writerows(success_rows) + if len(error_rows) > 0: + context.task_progress.failed += len(error_rows) + error_writer.writerows(error_rows) + + context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed + context.task_progress.total = context.task_progress.attempted + + return context.task_progress.failed > 0 + + def upload_temp_files(self, context, success_file, error_file, has_errors): + """ + Uploads success and error csv files to report store + """ + date = datetime.now(UTC) + + success_file.seek(0) + upload_csv_file_to_report_store( + success_file, + context.upload_filename, + context.course_id, + date, + parent_dir=context.upload_parent_dir + ) + + if has_errors: + error_file.seek(0) + upload_csv_file_to_report_store( + error_file, + context.upload_filename + '_err', + context.course_id, + date, + parent_dir=context.upload_parent_dir + ) + + class ProblemResponses: """ Class to encapsulate functionality related to generating Problem Responses Reports. 
diff --git a/lms/djangoapps/instructor_task/tasks_helper/utils.py b/lms/djangoapps/instructor_task/tasks_helper/utils.py index 3929126ea0..e3e6779f4c 100644 --- a/lms/djangoapps/instructor_task/tasks_helper/utils.py +++ b/lms/djangoapps/instructor_task/tasks_helper/utils.py @@ -49,6 +49,31 @@ def upload_csv_to_report_store(rows, csv_name, course_id, timestamp, config_name return report_name +def upload_csv_file_to_report_store(file, csv_name, course_id, timestamp, config_name='GRADES_DOWNLOAD', parent_dir=''): + """ + Upload data as a CSV using ReportStore. + + Arguments: + file: CSV data in a file-like object + csv_name: Name of the resulting CSV + course_id: ID of the course + parent_dir: Name of the directory where the CSV file will be stored + + Returns: + report_name: string - Name of the generated report + """ + report_store = ReportStore.from_config(config_name) + report_name = "{course_prefix}_{csv_name}_{timestamp_str}.csv".format( + course_prefix=course_filename_prefix_generator(course_id), + csv_name=csv_name, + timestamp_str=timestamp.strftime("%Y-%m-%d-%H%M") + ) + + report_store.store(course_id, report_name, file, parent_dir) + tracker_emit(csv_name) + return report_name + + def upload_zip_to_report_store(file, zip_name, course_id, timestamp, config_name='GRADES_DOWNLOAD'): """ Upload given file buffer as a zip file using ReportStore. 
diff --git a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py index 1b2ddccab1..58d0cbb4e8 100644 --- a/lms/djangoapps/instructor_task/tests/test_tasks_helper.py +++ b/lms/djangoapps/instructor_task/tests/test_tasks_helper.py @@ -44,7 +44,8 @@ from lms.djangoapps.instructor_task.tasks_helper.grades import ( NOT_ENROLLED_IN_COURSE, CourseGradeReport, ProblemGradeReport, - ProblemResponses + ProblemResponses, + TempFileProblemGradeReport, ) from lms.djangoapps.instructor_task.tasks_helper.misc import ( cohort_students_and_upload, @@ -793,12 +794,13 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.csv_header_row = ['Student ID', 'Email', 'Username', 'Enrollment Status', 'Grade'] @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - def test_no_problems(self, _get_current_task): + @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) + def test_no_problems(self, problem_grade_report_class, _): """ Verify that we see no grade information for a course with no graded problems. 
""" - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) self.verify_rows_in_csv([ dict(list(zip( @@ -812,7 +814,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): ]) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - def test_single_problem(self, _get_current_task): + @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) + def test_single_problem(self, problem_grade_report_class, _): vertical = ItemFactory.create( parent_location=self.problem_section.location, category='vertical', @@ -822,7 +825,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.define_option_problem('Problem1', parent=vertical) self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result) problem_name = 'Homework 1: Subsection - Problem1' header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)'] @@ -850,7 +853,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): ]) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - def test_single_problem_verified_student_only(self, _get_current_task): + @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) + def test_single_problem_verified_student_only(self, problem_grade_report_class, _): with patch( 'lms.djangoapps.instructor_task.tasks_helper.grades.problem_grade_report_verified_only', return_value=True, @@ -866,13 +870,14 @@ class 
TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) self.submit_student_answer(student_verified.username, 'Problem1', ['Option 1']) - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset( {'action_name': 'graded', 'attempted': 1, 'succeeded': 1, 'failed': 0}, result ) @patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') - def test_inactive_enrollment_included(self, _get_current_task): + @ddt.data(ProblemGradeReport, TempFileProblemGradeReport) + def test_inactive_enrollment_included(self, problem_grade_report_class, _): """ Students with inactive enrollments in a course should be included in Problem Grade Report. """ @@ -886,7 +891,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase): self.define_option_problem('Problem1', parent=vertical) self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1']) - result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded') + result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded') self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 3, 'succeeded': 3, 'failed': 0}, result) problem_name = 'Homework 1: Subsection - Problem1' header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)']