feat: add problemgradereport config for writing to disk (#31385)
in the problemgradereport currently we must currently hold the entire file in memory before writing it all at once. to avoid out of memory celery issues to resolve a blocking bug for some MIT courses, add a temp waffle flag `instructor_task.use_on_disk_grade_reporting` which when activated uses this new report to (hopefully) allow the report to complete. Additional testing and consideration is required for this approach.
This commit is contained in:
@@ -24,6 +24,18 @@ GENERATE_COURSE_GRADE_REPORT_VERIFIED_ONLY = CourseWaffleFlag( # lint-amnesty,
|
||||
f'{WAFFLE_NAMESPACE}.generate_course_grade_report_verified_only', __name__
|
||||
)
|
||||
|
||||
# .. toggle_name: instructor_task.use_on_disk_grade_reporting
|
||||
# .. toggle_implementation: CourseWaffleFlag
|
||||
# .. toggle_default: False
|
||||
# .. toggle_description: When generating grade reports, write chunks to disk to avoid out of memory errors.
|
||||
# .. toggle_use_cases: temporary
|
||||
# .. toggle_creation_date: 2022-12-01
|
||||
# .. toggle_target_removal_date: 2022-01-10
|
||||
# .. toggle_tickets: AU-926
|
||||
USE_ON_DISK_GRADE_REPORTING = CourseWaffleFlag(
|
||||
f'{WAFFLE_NAMESPACE}.use_on_disk_grade_reporting', __name__
|
||||
)
|
||||
|
||||
|
||||
def optimize_get_learners_switch_enabled():
|
||||
"""
|
||||
@@ -48,3 +60,12 @@ def course_grade_report_verified_only(course_id):
|
||||
False otherwise.
|
||||
"""
|
||||
return GENERATE_COURSE_GRADE_REPORT_VERIFIED_ONLY.is_enabled(course_id)
|
||||
|
||||
|
||||
def use_on_disk_grade_reporting(course_id):
|
||||
"""
|
||||
Returns True if problem grade reports should write
|
||||
chunks to disk rather than holding all in memory.
|
||||
False otherwise.
|
||||
"""
|
||||
return USE_ON_DISK_GRADE_REPORTING.is_enabled(course_id)
|
||||
|
||||
@@ -2,11 +2,14 @@
|
||||
Functionality for generating grade reports.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import logging
|
||||
import re
|
||||
from collections import OrderedDict, defaultdict
|
||||
from datetime import datetime
|
||||
from itertools import chain
|
||||
from tempfile import TemporaryFile
|
||||
|
||||
from sys import getsizeof
|
||||
from time import time
|
||||
|
||||
@@ -32,7 +35,8 @@ from lms.djangoapps.instructor_analytics.csvs import format_dictlist
|
||||
from lms.djangoapps.instructor_task.config.waffle import (
|
||||
course_grade_report_verified_only,
|
||||
optimize_get_learners_switch_enabled,
|
||||
problem_grade_report_verified_only
|
||||
problem_grade_report_verified_only,
|
||||
use_on_disk_grade_reporting,
|
||||
)
|
||||
from lms.djangoapps.teams.models import CourseTeamMembership
|
||||
from lms.djangoapps.verify_student.services import IDVerificationService
|
||||
@@ -46,7 +50,7 @@ from xmodule.partitions.partitions_service import PartitionService # lint-amnes
|
||||
from xmodule.split_test_module import get_split_user_partitions # lint-amnesty, pylint: disable=wrong-import-order
|
||||
|
||||
from .runner import TaskProgress
|
||||
from .utils import upload_csv_to_report_store
|
||||
from .utils import upload_csv_to_report_store, upload_csv_file_to_report_store
|
||||
|
||||
TASK_LOG = logging.getLogger('edx.celery.task')
|
||||
|
||||
@@ -747,8 +751,12 @@ class ProblemGradeReport(GradeReportBase):
|
||||
"""
|
||||
with modulestore().bulk_operations(course_id):
|
||||
context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
|
||||
# pylint: disable=protected-access
|
||||
return ProblemGradeReport()._generate(context)
|
||||
if use_on_disk_grade_reporting(course_id): # AU-926
|
||||
# pylint: disable=protected-access
|
||||
return TempFileProblemGradeReport()._generate(context)
|
||||
else:
|
||||
# pylint: disable=protected-access
|
||||
return ProblemGradeReport()._generate(context)
|
||||
|
||||
def _generate(self, context):
|
||||
"""
|
||||
@@ -872,6 +880,81 @@ class ProblemGradeReport(GradeReportBase):
|
||||
get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear()
|
||||
|
||||
|
||||
class TempFileProblemGradeReport(ProblemGradeReport):
|
||||
"""
|
||||
ProblemGradeReport that instead of holding all resultant file info in memory,
|
||||
writes chunked data to disk.
|
||||
"""
|
||||
def _generate(self, context):
|
||||
"""
|
||||
Generate a CSV containing all students' problem grades within a given `course_id`.
|
||||
"""
|
||||
context.update_status('TempFileProblemGradeReport - 1: Starting problem grades')
|
||||
batched_rows = self._batched_rows(context)
|
||||
|
||||
with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file:
|
||||
context.update_status('TempFileProblemGradeReport - 2: Compiling grades into temp files')
|
||||
has_errors = self.iter_and_write_batched_rows(context, success_file, error_file, batched_rows)
|
||||
|
||||
context.update_status('TempFileProblemGradeReport - 3: Uploading files')
|
||||
self.upload_temp_files(context, success_file, error_file, has_errors)
|
||||
|
||||
return context.update_status('ProblemGradeReport - 4: Completed problem grades')
|
||||
|
||||
def iter_and_write_batched_rows(self, context, success_file, error_file, batched_rows):
|
||||
"""
|
||||
Iterate through batched rows, writing returned chunks to disk as we go.
|
||||
This should hopefully help us avoid out of memory errors.
|
||||
"""
|
||||
context.task_progress.succeeded = 0
|
||||
context.task_progress.failed = 0
|
||||
|
||||
success_writer = csv.writer(success_file)
|
||||
error_writer = csv.writer(error_file)
|
||||
|
||||
# Write headers
|
||||
success_writer.writerow(self._success_headers(context))
|
||||
error_writer.writerow(self._error_headers())
|
||||
|
||||
# Iterate through batched rows, writing to temp file
|
||||
for success_rows, error_rows in batched_rows:
|
||||
context.task_progress.succeeded += len(success_rows)
|
||||
success_writer.writerows(success_rows)
|
||||
if len(error_rows) > 0:
|
||||
context.task_progress.failed += len(error_rows)
|
||||
error_writer.writerows(error_rows)
|
||||
|
||||
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
|
||||
context.task_progress.total = context.task_progress.attempted
|
||||
|
||||
return context.task_progress.failed > 0
|
||||
|
||||
def upload_temp_files(self, context, success_file, error_file, has_errors):
|
||||
"""
|
||||
Uploads success and error csv files to report store
|
||||
"""
|
||||
date = datetime.now(UTC)
|
||||
|
||||
success_file.seek(0)
|
||||
upload_csv_file_to_report_store(
|
||||
success_file,
|
||||
context.upload_filename,
|
||||
context.course_id,
|
||||
date,
|
||||
parent_dir=context.upload_parent_dir
|
||||
)
|
||||
|
||||
if has_errors:
|
||||
error_file.seek(0)
|
||||
upload_csv_file_to_report_store(
|
||||
error_file,
|
||||
context.upload_filename + '_err',
|
||||
context.course_id,
|
||||
date,
|
||||
parent_dir=context.upload_parent_dir
|
||||
)
|
||||
|
||||
|
||||
class ProblemResponses:
|
||||
"""
|
||||
Class to encapsulate functionality related to generating Problem Responses Reports.
|
||||
|
||||
@@ -49,6 +49,31 @@ def upload_csv_to_report_store(rows, csv_name, course_id, timestamp, config_name
|
||||
return report_name
|
||||
|
||||
|
||||
def upload_csv_file_to_report_store(file, csv_name, course_id, timestamp, config_name='GRADES_DOWNLOAD', parent_dir=''):
|
||||
"""
|
||||
Upload data as a CSV using ReportStore.
|
||||
|
||||
Arguments:
|
||||
rows: CSV data in a file-like object
|
||||
csv_name: Name of the resulting CSV
|
||||
course_id: ID of the course
|
||||
parent_dor: Name of the directory where the CSV file will be stored
|
||||
|
||||
Returns:
|
||||
report_name: string - Name of the generated report
|
||||
"""
|
||||
report_store = ReportStore.from_config(config_name)
|
||||
report_name = "{course_prefix}_{csv_name}_{timestamp_str}.csv".format(
|
||||
course_prefix=course_filename_prefix_generator(course_id),
|
||||
csv_name=csv_name,
|
||||
timestamp_str=timestamp.strftime("%Y-%m-%d-%H%M")
|
||||
)
|
||||
|
||||
report_store.store(course_id, report_name, file, parent_dir)
|
||||
tracker_emit(csv_name)
|
||||
return report_name
|
||||
|
||||
|
||||
def upload_zip_to_report_store(file, zip_name, course_id, timestamp, config_name='GRADES_DOWNLOAD'):
|
||||
"""
|
||||
Upload given file buffer as a zip file using ReportStore.
|
||||
|
||||
@@ -44,7 +44,8 @@ from lms.djangoapps.instructor_task.tasks_helper.grades import (
|
||||
NOT_ENROLLED_IN_COURSE,
|
||||
CourseGradeReport,
|
||||
ProblemGradeReport,
|
||||
ProblemResponses
|
||||
ProblemResponses,
|
||||
TempFileProblemGradeReport,
|
||||
)
|
||||
from lms.djangoapps.instructor_task.tasks_helper.misc import (
|
||||
cohort_students_and_upload,
|
||||
@@ -793,12 +794,13 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
self.csv_header_row = ['Student ID', 'Email', 'Username', 'Enrollment Status', 'Grade']
|
||||
|
||||
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
|
||||
def test_no_problems(self, _get_current_task):
|
||||
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
|
||||
def test_no_problems(self, problem_grade_report_class, _):
|
||||
"""
|
||||
Verify that we see no grade information for a course with no graded
|
||||
problems.
|
||||
"""
|
||||
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
|
||||
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
|
||||
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result)
|
||||
self.verify_rows_in_csv([
|
||||
dict(list(zip(
|
||||
@@ -812,7 +814,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
])
|
||||
|
||||
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
|
||||
def test_single_problem(self, _get_current_task):
|
||||
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
|
||||
def test_single_problem(self, problem_grade_report_class, _):
|
||||
vertical = ItemFactory.create(
|
||||
parent_location=self.problem_section.location,
|
||||
category='vertical',
|
||||
@@ -822,7 +825,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
self.define_option_problem('Problem1', parent=vertical)
|
||||
|
||||
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
|
||||
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
|
||||
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
|
||||
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result)
|
||||
problem_name = 'Homework 1: Subsection - Problem1'
|
||||
header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)']
|
||||
@@ -850,7 +853,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
])
|
||||
|
||||
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
|
||||
def test_single_problem_verified_student_only(self, _get_current_task):
|
||||
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
|
||||
def test_single_problem_verified_student_only(self, problem_grade_report_class, _):
|
||||
with patch(
|
||||
'lms.djangoapps.instructor_task.tasks_helper.grades.problem_grade_report_verified_only',
|
||||
return_value=True,
|
||||
@@ -866,13 +870,14 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
|
||||
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
|
||||
self.submit_student_answer(student_verified.username, 'Problem1', ['Option 1'])
|
||||
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
|
||||
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
|
||||
self.assertDictContainsSubset(
|
||||
{'action_name': 'graded', 'attempted': 1, 'succeeded': 1, 'failed': 0}, result
|
||||
)
|
||||
|
||||
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
|
||||
def test_inactive_enrollment_included(self, _get_current_task):
|
||||
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
|
||||
def test_inactive_enrollment_included(self, problem_grade_report_class, _):
|
||||
"""
|
||||
Students with inactive enrollments in a course should be included in Problem Grade Report.
|
||||
"""
|
||||
@@ -886,7 +891,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
|
||||
self.define_option_problem('Problem1', parent=vertical)
|
||||
|
||||
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
|
||||
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
|
||||
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
|
||||
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 3, 'succeeded': 3, 'failed': 0}, result)
|
||||
problem_name = 'Homework 1: Subsection - Problem1'
|
||||
header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)']
|
||||
|
||||
Reference in New Issue
Block a user