feat: Build out temporary fix for TempFiles into a more robust solution (#31458)

* test: add test for 'optimize_get_learners_switch'

* feat: on-disk reporting
This commit is contained in:
Jansen Kantor
2022-12-22 09:33:31 -05:00
committed by GitHub
parent 20fac60c0c
commit 6431b97c70
2 changed files with 339 additions and 472 deletions

View File

@@ -10,7 +10,6 @@ from datetime import datetime
from itertools import chain
from tempfile import TemporaryFile
from sys import getsizeof
from time import time
from django.conf import settings
@@ -34,7 +33,6 @@ from lms.djangoapps.instructor_analytics.basic import list_problem_responses
from lms.djangoapps.instructor_analytics.csvs import format_dictlist
from lms.djangoapps.instructor_task.config.waffle import (
course_grade_report_verified_only,
optimize_get_learners_switch_enabled,
problem_grade_report_verified_only,
use_on_disk_grade_reporting,
)
@@ -74,151 +72,6 @@ def _flatten(iterable):
return list(chain.from_iterable(iterable))
class GradeReportBase:
"""
Base class for grade reports (ProblemGradeReport and CourseGradeReport).
"""
def _get_enrolled_learner_count(self, context):
"""
Returns the number of learners enrolled in the course.
"""
return CourseEnrollment.objects.users_enrolled_in(
course_id=context.course_id,
include_inactive=True,
verified_only=context.report_for_verified_only,
).count()
def log_task_info(self, context, message):
"""
Updates the status on the celery task to the given message.
Also logs the update.
"""
fmt = 'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
task_info_string = fmt.format(
task_id=context.task_id,
entry_id=context.entry_id,
course_id=context.course_id,
task_input=context.task_input
)
TASK_LOG.info('%s, Task type: %s, %s, %s', task_info_string, context.action_name,
message, context.task_progress.state)
def _handle_empty_generator(self, generator, default):
"""
Handle empty generator.
Return default if the generator is empty, otherwise return all
its iterations (including the first which was used for validation).
"""
TASK_LOG.info('GradeReport: Checking generator')
empty_generator_sentinel = object()
first_iteration_output = next(generator, empty_generator_sentinel)
generator_is_empty = first_iteration_output == empty_generator_sentinel
if generator_is_empty:
TASK_LOG.info('GradeReport: Generator is empty')
yield default
else:
TASK_LOG.info('GradeReport: Generator is not empty')
yield first_iteration_output
yield from generator
def _batch_users(self, context):
"""
Returns a generator of batches of users.
"""
def grouper(iterable, chunk_size=100, fillvalue=None):
args = [iter(iterable)] * chunk_size
return zip_longest(*args, fillvalue=fillvalue)
def get_enrolled_learners_for_course(course_id, verified_only=False):
"""
Get all the enrolled users in a course chunk by chunk.
This generator method fetches & loads the enrolled user objects on demand, in chunks
of the defined size. This method is a workaround to avoid out-of-memory errors.
"""
self.log_additional_info_for_testing(
context,
'ProblemGradeReport: Starting batching of enrolled students'
)
filter_kwargs = {
'courseenrollment__course_id': course_id,
}
if verified_only:
filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED
user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id')
user_chunks = grouper(user_ids_list)
for user_ids in user_chunks:
user_ids = [user_id for user_id in user_ids if user_id is not None]
min_id = min(user_ids)
max_id = max(user_ids)
users = get_user_model().objects.filter(
id__gte=min_id,
id__lte=max_id,
**filter_kwargs
).select_related('profile')
self.log_additional_info_for_testing(context, 'ProblemGradeReport: user chunk yielded successfully')
yield users
course_id = context.course_id
return get_enrolled_learners_for_course(course_id=course_id, verified_only=context.report_for_verified_only)
def _compile(self, context, batched_rows):
"""
Compiles and returns the complete list of (success_rows, error_rows) for
the given batched_rows and context.
"""
# partition and chain successes and errors
self.log_additional_info_for_testing(context, "Begin Zipping Batched Rows")
success_rows, error_rows = zip(*batched_rows)
self.log_additional_info_for_testing(context, "Evaluating Success Rows")
success_rows = list(chain(*success_rows))
self.log_additional_info_for_testing(context, "Evaluating Error Rows")
error_rows = list(chain(*error_rows))
self.log_additional_info_for_testing(context, "Compilation complete")
# update metrics on task status
context.task_progress.succeeded = len(success_rows)
context.task_progress.failed = len(error_rows)
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
context.task_progress.total = context.task_progress.attempted
return success_rows, error_rows
def _upload(self, context, success_rows, error_rows):
"""
Creates and uploads a CSV for the given headers and rows.
"""
date = datetime.now(UTC)
upload_csv_to_report_store(
success_rows,
context.upload_filename,
context.course_id,
date,
parent_dir=context.upload_parent_dir
)
if len(error_rows) > 1:
upload_csv_to_report_store(
error_rows,
context.upload_filename + '_err',
context.course_id,
date,
parent_dir=context.upload_parent_dir
)
def log_additional_info_for_testing(self, context, message):
"""
Investigation logs for test problem grade report.
TODO -- Remove as a part of PROD-1287
"""
context.update_status(message)
class _CourseGradeReportContext:
"""
Internal class that provides a common context to use for a single grade
@@ -423,72 +276,53 @@ class _CourseGradeBulkContext: # lint-amnesty, pylint: disable=missing-class-do
BulkCourseTags.prefetch(context.course_id, users)
class CourseGradeReport:
class InMemoryReportMixin:
"""
Class to encapsulate functionality related to generating Grade Reports.
Mixin for a file report that will generate file in memory and then upload to report store
"""
# Batch size for chunking the list of enrollees in the course.
USER_BATCH_SIZE = 100
@classmethod
def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
"""
Public method to generate a grade report.
"""
with modulestore().bulk_operations(course_id):
context = _CourseGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
return CourseGradeReport()._generate(context) # lint-amnesty, pylint: disable=protected-access
def _generate(self, context):
def _generate(self):
"""
Internal method for generating a grade report for the given context.
"""
context.update_status('Starting grades')
success_headers = self._success_headers(context)
self.context.update_status('InMemoryReportMixin - 1: Starting grade report')
success_headers = self._success_headers()
error_headers = self._error_headers()
batched_rows = self._batched_rows(context)
batched_rows = self._batched_rows()
context.update_status('Compiling grades')
success_rows, error_rows = self._compile(context, batched_rows)
self.context.update_status('InMemoryReportMixin - 2: Compiling grades')
success_rows, error_rows = self._compile(batched_rows)
context.update_status('Uploading grades')
self._upload(context, success_headers, success_rows, error_headers, error_rows)
self.context.update_status('InMemoryReportMixin - 3: Uploading grades')
self._upload(success_headers, success_rows, error_headers, error_rows)
return context.update_status('Completed grades')
return self.context.update_status('InMemoryReportMixin - 4: Completed grades')
def _success_headers(self, context):
def _upload(self, success_headers, success_rows, error_headers, error_rows):
"""
Returns a list of all applicable column headers for this grade report.
Creates and uploads a CSV for the given headers and rows.
"""
return (
["Student ID", "Email", "Username"] +
self._grades_header(context) +
(['Cohort Name'] if context.cohorts_enabled else []) +
[f'Experiment Group ({partition.name})' for partition in context.course_experiments] +
(['Team Name'] if context.teams_enabled else []) +
['Enrollment Track', 'Verification Status'] +
['Certificate Eligible', 'Certificate Delivered', 'Certificate Type'] +
['Enrollment Status']
date = datetime.now(UTC)
upload_csv_to_report_store(
[success_headers] + success_rows,
self.context.upload_filename,
self.context.course_id,
date,
parent_dir=self.context.upload_parent_dir
)
def _error_headers(self):
"""
Returns a list of error headers for this grade report.
"""
return ["Student ID", "Username", "Error"]
if len(error_rows) > 0:
upload_csv_to_report_store(
[error_headers] + error_rows,
self.context.upload_filename + '_err',
self.context.course_id,
date,
parent_dir=self.context.upload_parent_dir
)
def _batched_rows(self, context):
"""
A generator of batches of (success_rows, error_rows) for this report.
"""
for users in self._batch_users(context):
users = [u for u in users if u is not None]
yield self._rows_for_users(context, users)
def _compile(self, context, batched_rows):
def _compile(self, batched_rows):
"""
Compiles and returns the complete list of (success_rows, error_rows) for
the given batched_rows and context.
the given batched_rows.
"""
# partition and chain successes and errors
success_rows, error_rows = zip(*batched_rows)
@@ -496,84 +330,128 @@ class CourseGradeReport:
error_rows = list(chain(*error_rows))
# update metrics on task status
context.task_progress.succeeded = len(success_rows)
context.task_progress.failed = len(error_rows)
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
context.task_progress.total = context.task_progress.attempted
self.context.task_progress.succeeded = len(success_rows)
self.context.task_progress.failed = len(error_rows)
self.context.task_progress.attempted = self.context.task_progress.succeeded + self.context.task_progress.failed
self.context.task_progress.total = self.context.task_progress.attempted
return success_rows, error_rows
def _upload(self, context, success_headers, success_rows, error_headers, error_rows):
class TemporaryFileReportMixin:
"""
Mixin for a file report that will write rows iteratively to a TempFile
"""
def _generate(self):
"""
Creates and uploads a CSV for the given headers and rows.
Generate a CSV containing all students' problem grades within a given `course_id`.
"""
self.context.update_status('TemporaryFileReportMixin - 1: Starting grade report')
batched_rows = self._batched_rows()
with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file:
self.context.update_status('TemporaryFileReportMixin - 2: Compiling grades into temp files')
has_errors = self.iter_and_write_batched_rows(batched_rows, success_file, error_file)
self.context.update_status('TemporaryFileReportMixin - 3: Uploading files')
self.upload_temp_files(success_file, error_file, has_errors)
return self.context.update_status('TemporaryFileReportMixin - 4: Completed grades')
def iter_and_write_batched_rows(self, batched_rows, success_file, error_file):
"""
Iterate through batched rows, writing returned chunks to disk as we go.
This should hopefully help us avoid out of memory errors.
"""
success_writer = csv.writer(success_file)
error_writer = csv.writer(error_file)
# Write headers
success_writer.writerow(self._success_headers())
error_writer.writerow(self._error_headers())
succeeded, failed = 0, 0
# Iterate through batched rows, writing to temp file
for success_rows, error_rows in batched_rows:
success_writer.writerows(success_rows)
if len(error_rows) > 0:
error_writer.writerows(error_rows)
succeeded += len(success_rows)
failed += len(error_rows)
self.context.task_progress.succeeded = succeeded
self.context.task_progress.failed = failed
self.context.task_progress.attempted = succeeded + failed
self.context.task_progress.total = self.context.task_progress.attempted
return self.context.task_progress.failed > 0
def upload_temp_files(self, success_file, error_file, has_errors):
"""
Uploads success and error csv files to report store
"""
date = datetime.now(UTC)
upload_csv_to_report_store(
[success_headers] + success_rows,
context.upload_filename,
context.course_id,
success_file.seek(0)
upload_csv_file_to_report_store(
success_file,
self.context.upload_filename,
self.context.course_id,
date,
parent_dir=context.upload_parent_dir
parent_dir=self.context.upload_parent_dir
)
if len(error_rows) > 0:
upload_csv_to_report_store(
[error_headers] + error_rows,
'{}_err'.format(context.upload_filename),
context.course_id,
if has_errors:
error_file.seek(0)
upload_csv_file_to_report_store(
error_file,
self.context.upload_filename + '_err',
self.context.course_id,
date,
parent_dir=context.upload_parent_dir
parent_dir=self.context.upload_parent_dir
)
def _grades_header(self, context):
"""
Returns the applicable grades-related headers for this report.
"""
graded_assignments = context.graded_assignments
grades_header = ["Grade"]
for assignment_info in graded_assignments.values():
if assignment_info['separate_subsection_avg_headers']:
grades_header.extend(assignment_info['subsection_headers'].values())
grades_header.append(assignment_info['average_header'])
return grades_header
def _batch_users(self, context):
class GradeReportBase:
"""
Base class for grade reports (ProblemGradeReport and CourseGradeReport).
"""
def __init__(self, context):
self.context = context
def _get_enrolled_learner_count(self):
"""
Returns the number of learners enrolled in the course.
"""
return CourseEnrollment.objects.users_enrolled_in(
course_id=self.context.course_id,
include_inactive=True,
verified_only=self.context.report_for_verified_only,
).count()
def log_task_info(self, message):
"""
Updates the status on the celery task to the given message.
Also logs the update.
"""
fmt = 'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
task_info_string = fmt.format(
task_id=self.context.task_id,
entry_id=self.context.entry_id,
course_id=self.context.course_id,
task_input=self.context.task_input
)
TASK_LOG.info('%s, Task type: %s, %s, %s', task_info_string, self.context.action_name,
message, self.context.task_progress.state)
def _batch_users(self):
"""
Returns a generator of batches of users.
"""
def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None):
def grouper(iterable, chunk_size=100, fillvalue=None):
args = [iter(iterable)] * chunk_size
return zip_longest(*args, fillvalue=fillvalue)
def get_enrolled_learners_for_course(course_id, verified_only=False):
"""
Get enrolled learners in a course.
Arguments:
course_id (CourseLocator): course_id to return enrollees for.
verified_only (boolean): is a boolean when True, returns only verified enrollees.
"""
if optimize_get_learners_switch_enabled():
TASK_LOG.info('%s, Creating Course Grade with optimization', task_log_message)
return users_for_course_v2(course_id, verified_only=verified_only)
TASK_LOG.info('%s, Creating Course Grade without optimization', task_log_message)
return users_for_course(course_id, verified_only=verified_only)
def users_for_course(course_id, verified_only=False):
"""
Get all the enrolled users in a course.
This method fetches & loads the enrolled user objects at once which may cause
out-of-memory errors in large courses. This method will be removed when
`OPTIMIZE_GET_LEARNERS_FOR_COURSE` waffle flag is removed.
"""
users = CourseEnrollment.objects.users_enrolled_in(
course_id,
include_inactive=True,
verified_only=verified_only,
)
users = users.select_related('profile')
return grouper(users)
def users_for_course_v2(course_id, verified_only=False):
"""
Get all the enrolled users in a course chunk by chunk.
This generator method fetches & loads the enrolled user objects on demand which in chunk
@@ -596,18 +474,126 @@ class CourseGradeReport:
id__lte=max_id,
**filter_kwargs
).select_related('profile')
yield users
course_id = context.course_id
task_log_message = f'{context.task_info_string}, Task type: {context.action_name}'
return get_enrolled_learners_for_course(course_id=course_id, verified_only=context.report_for_verified_only)
def _user_grades(self, course_grade, context):
yield users
return get_enrolled_learners_for_course(
course_id=self.context.course_id,
verified_only=self.context.report_for_verified_only
)
def log_additional_info_for_testing(self, message):
"""
Investigation logs for test problem grade report.
TODO -- Remove as a part of PROD-1287
"""
self.context.update_status(message)
def _clear_caches(self):
"""
Override if a report type wants to clear caches after a batch of learners has
been processed
"""
def _batched_rows(self):
"""
A generator of batches of (success_rows, error_rows) for this report.
"""
for users in self._batch_users():
yield self._rows_for_users(users)
self._clear_caches()
class CourseGradeReport(GradeReportBase):
"""
Class to encapsulate functionality related to generating user/row and header data for Course Grade Reports.
"""
# Batch size for chunking the list of enrollees in the course.
USER_BATCH_SIZE = 100
@classmethod
def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
"""
Public method to generate a grade report.
"""
with modulestore().bulk_operations(course_id):
context = _CourseGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
if use_on_disk_grade_reporting(course_id): # AU-926
return TempFileCourseGradeReport(context)._generate() # pylint: disable=protected-access
else:
return InMemoryCourseGradeReport(context)._generate() # pylint: disable=protected-access
def _success_headers(self):
"""
Returns a list of all applicable column headers for this grade report.
"""
return (
["Student ID", "Email", "Username"] +
self._grades_header() +
(['Cohort Name'] if self.context.cohorts_enabled else []) +
[f'Experiment Group ({partition.name})' for partition in self.context.course_experiments] +
(['Team Name'] if self.context.teams_enabled else []) +
['Enrollment Track', 'Verification Status'] +
['Certificate Eligible', 'Certificate Delivered', 'Certificate Type'] +
['Enrollment Status']
)
def _error_headers(self):
"""
Returns a list of error headers for this grade report.
"""
return ["Student ID", "Username", "Error"]
def _grades_header(self):
"""
Returns the applicable grades-related headers for this report.
"""
graded_assignments = self.context.graded_assignments
grades_header = ["Grade"]
for assignment_info in graded_assignments.values():
if assignment_info['separate_subsection_avg_headers']:
grades_header.extend(assignment_info['subsection_headers'].values())
grades_header.append(assignment_info['average_header'])
return grades_header
def _rows_for_users(self, users):
"""
Returns a list of rows for the given users for this report.
"""
with modulestore().bulk_operations(self.context.course_id):
bulk_context = _CourseGradeBulkContext(self.context, users)
success_rows, error_rows = [], []
for user, course_grade, error in CourseGradeFactory().iter(
users,
course=self.context.course,
collected_block_structure=self.context.course_structure,
course_key=self.context.course_id,
):
if not course_grade:
# An empty gradeset means we failed to grade a student.
error_rows.append([user.id, user.username, str(error)])
else:
success_rows.append(
[user.id, user.email, user.username] +
self._user_grades(course_grade) +
self._user_cohort_group_names(user) +
self._user_experiment_group_names(user) +
self._user_team_names(user, bulk_context.teams) +
self._user_verification_mode(user, bulk_context.enrollments) +
self._user_certificate_info(user, course_grade, bulk_context.certs) +
[_user_enrollment_status(user, self.context.course_id)]
)
return success_rows, error_rows
def _user_grades(self, course_grade):
"""
Returns a list of grade results for the given course_grade corresponding
to the headers for this report.
"""
grade_results = []
for _, assignment_info in context.graded_assignments.items():
for _, assignment_info in self.context.graded_assignments.items():
subsection_grades, subsection_grades_results = self._user_subsection_grades(
course_grade,
assignment_info['subsection_headers'],
@@ -637,7 +623,10 @@ class CourseGradeReport:
subsection_grades.append(subsection_grade)
return subsection_grades, grade_results
def _user_assignment_average(self, course_grade, subsection_grades, assignment_info): # lint-amnesty, pylint: disable=missing-function-docstring
def _user_assignment_average(self, course_grade, subsection_grades, assignment_info):
"""
Returns grade averages for assignment types
"""
if assignment_info['separate_subsection_avg_headers']:
if assignment_info['grader']:
if course_grade.attempted:
@@ -650,25 +639,25 @@ class CourseGradeReport:
assignment_average = 0.0
return assignment_average
def _user_cohort_group_names(self, user, context):
def _user_cohort_group_names(self, user):
"""
Returns a list of names of cohort groups in which the given user
belongs.
"""
cohort_group_names = []
if context.cohorts_enabled:
group = get_cohort(user, context.course_id, assign=False, use_cached=True)
if self.context.cohorts_enabled:
group = get_cohort(user, self.context.course_id, assign=False, use_cached=True)
cohort_group_names.append(group.name if group else '')
return cohort_group_names
def _user_experiment_group_names(self, user, context):
def _user_experiment_group_names(self, user):
"""
Returns a list of names of course experiments in which the given user
belongs.
"""
experiment_group_names = []
for partition in context.course_experiments:
group = PartitionService(context.course_id).get_group(user, partition, assign=False)
for partition in self.context.course_experiments:
group = PartitionService(self.context.course_id).get_group(user, partition, assign=False)
experiment_group_names.append(group.name if group else '')
return experiment_group_names
@@ -681,12 +670,12 @@ class CourseGradeReport:
team_names = [bulk_teams.teams_by_user.get(user.id, '')]
return team_names
def _user_verification_mode(self, user, context, bulk_enrollments):
def _user_verification_mode(self, user, bulk_enrollments):
"""
Returns a list of enrollment-mode and verification-status for the
given user.
"""
enrollment_mode = CourseEnrollment.enrollment_mode_for_user(user, context.course_id)[0]
enrollment_mode = CourseEnrollment.enrollment_mode_for_user(user, self.context.course_id)[0]
verification_status = IDVerificationService.verification_status_for_user(
user,
enrollment_mode,
@@ -694,54 +683,32 @@ class CourseGradeReport:
)
return [enrollment_mode, verification_status]
def _user_certificate_info(self, user, context, course_grade, bulk_certs):
def _user_certificate_info(self, user, course_grade, bulk_certs):
"""
Returns the course certification information for the given user.
"""
is_allowlisted = user.id in bulk_certs.allowlisted_user_ids
certificate_info = certs_api.certificate_info_for_user(
user,
context.course_id,
self.context.course_id,
course_grade.letter_grade,
is_allowlisted,
bulk_certs.certificates_by_user.get(user.id),
)
return certificate_info
def _rows_for_users(self, context, users):
"""
Returns a list of rows for the given users for this report.
"""
with modulestore().bulk_operations(context.course_id):
bulk_context = _CourseGradeBulkContext(context, users)
success_rows, error_rows = [], []
for user, course_grade, error in CourseGradeFactory().iter(
users,
course=context.course,
collected_block_structure=context.course_structure,
course_key=context.course_id,
):
if not course_grade:
# An empty gradeset means we failed to grade a student.
error_rows.append([user.id, user.username, str(error)])
else:
success_rows.append(
[user.id, user.email, user.username] +
self._user_grades(course_grade, context) +
self._user_cohort_group_names(user, context) +
self._user_experiment_group_names(user, context) +
self._user_team_names(user, bulk_context.teams) +
self._user_verification_mode(user, context, bulk_context.enrollments) +
self._user_certificate_info(user, context, course_grade, bulk_context.certs) +
[_user_enrollment_status(user, context.course_id)]
)
return success_rows, error_rows
class InMemoryCourseGradeReport(CourseGradeReport, InMemoryReportMixin):
""" Course Grade Report that compiles and then uploads all rows at once """
class TempFileCourseGradeReport(CourseGradeReport, TemporaryFileReportMixin):
""" Course Grade Report that writes file iteratively to a TempFile to then be uploaded """
class ProblemGradeReport(GradeReportBase):
"""
Class to encapsulate functionality related to generating Problem Grade Reports.
Class to encapsulate functionality related to generating user/row and header data for Problem Grade Reports.
"""
@classmethod
@@ -752,34 +719,11 @@ class ProblemGradeReport(GradeReportBase):
with modulestore().bulk_operations(course_id):
context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
if use_on_disk_grade_reporting(course_id): # AU-926
# pylint: disable=protected-access
return TempFileProblemGradeReport()._generate(context)
return TempFileProblemGradeReport(context)._generate() # pylint: disable=protected-access
else:
# pylint: disable=protected-access
return ProblemGradeReport()._generate(context)
return InMemoryProblemGradeReport(context)._generate() # pylint: disable=protected-access
def _generate(self, context):
"""
Generate a CSV containing all students' problem grades within a given
`course_id`.
"""
context.update_status('ProblemGradeReport - 1: Starting problem grades')
success_headers = self._success_headers(context)
error_headers = self._error_headers()
batched_rows = self._batched_rows(context)
context.update_status('ProblemGradeReport - 2: Compiling grades')
success_rows, error_rows = self._compile(context, batched_rows)
context.update_status('ProblemGradeReport - 3: Uploading grades')
self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows)
return context.update_status('ProblemGradeReport - 4: Completed problem grades')
def _problem_grades_header(self):
"""Problem Grade report header."""
return OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])
def _success_headers(self, context):
def _success_headers(self):
"""
Returns headers for all gradable blocks including fixed headers
for report.
@@ -787,7 +731,7 @@ class ProblemGradeReport(GradeReportBase):
list: combined header and scorable blocks
"""
header_row = list(self._problem_grades_header().values()) + ['Enrollment Status', 'Grade']
return header_row + _flatten(list(context.graded_scorable_blocks_header.values()))
return header_row + _flatten(list(self.context.graded_scorable_blocks_header.values()))
def _error_headers(self):
"""
@@ -797,24 +741,21 @@ class ProblemGradeReport(GradeReportBase):
"""
return list(self._problem_grades_header().values()) + ['error_msg']
def _rows_for_users(self, context, users):
def _problem_grades_header(self):
"""Problem Grade report header."""
return OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])
def _rows_for_users(self, users):
"""
Returns a list of rows for the given users for this report.
"""
self.log_additional_info_for_testing(context, 'ProblemGradeReport: Starting to process new user batch.')
success_rows, error_rows = [], []
success_rows_size, error_rows_size = 0, 0
for student, course_grade, error in CourseGradeFactory().iter(
users,
course=context.course,
collected_block_structure=context.course_structure,
course_key=context.course_id,
course=self.context.course,
collected_block_structure=self.context.course_structure,
course_key=self.context.course_id,
):
context.task_progress.attempted += 1
self.log_additional_info_for_testing(
context,
f'ProblemGradeReport: Attempt {context.task_progress.attempted}'
)
if not course_grade:
err_msg = str(error)
# There was an error grading this student.
@@ -824,17 +765,10 @@ class ProblemGradeReport(GradeReportBase):
[student.id, student.email, student.username] +
[err_msg]
)
error_rows_size += getsizeof(error_rows[-1])
context.task_progress.failed += 1
self.log_additional_info_for_testing(
context,
f'ProblemGradeReport: Failed {context.task_progress.failed}'
)
continue
self.log_additional_info_for_testing(context, 'ProblemGradeReport: Succeeded in reading grade')
earned_possible_values = []
for block_location in context.graded_scorable_blocks_header:
for block_location in self.context.graded_scorable_blocks_header:
try:
problem_score = course_grade.problem_scores[block_location]
except KeyError:
@@ -845,114 +779,26 @@ class ProblemGradeReport(GradeReportBase):
else:
earned_possible_values.append(['Not Attempted', problem_score.possible])
self.log_additional_info_for_testing(context, 'ProblemGradeReport: earned possible values done')
context.task_progress.succeeded += 1
enrollment_status = _user_enrollment_status(student, context.course_id)
self.log_additional_info_for_testing(
context,
f'ProblemGradeReport: Succeeded {context.task_progress.succeeded}'
)
enrollment_status = _user_enrollment_status(student, self.context.course_id)
success_rows.append(
[student.id, student.email, student.username] +
[enrollment_status, course_grade.percent] +
_flatten(earned_possible_values)
)
success_rows_size += getsizeof(success_rows[-1])
self.log_additional_info_for_testing(context, 'ProblemGradeReport: Added rows')
success_rows_size += getsizeof(success_rows)
error_rows_size += getsizeof(error_rows)
self.log_additional_info_for_testing(
context,
f'ProblemGradeReport memory usage: succeess {success_rows_size} error {error_rows_size}'
)
return success_rows, error_rows
def _batched_rows(self, context):
"""
A generator of batches of (success_rows, error_rows) for this report.
"""
for users in self._batch_users(context):
yield self._rows_for_users(context, users)
# Clear the CourseEnrollment caches after each batch of users has been processed
get_cache('get_enrollment').clear()
get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear()
def _clear_caches(self):
get_cache('get_enrollment').clear()
get_cache(CourseEnrollment.MODE_CACHE_NAMESPACE).clear()
class TempFileProblemGradeReport(ProblemGradeReport):
"""
ProblemGradeReport that instead of holding all resultant file info in memory,
writes chunked data to disk.
"""
def _generate(self, context):
"""
Generate a CSV containing all students' problem grades within a given `course_id`.
"""
context.update_status('TempFileProblemGradeReport - 1: Starting problem grades')
batched_rows = self._batched_rows(context)
class InMemoryProblemGradeReport(ProblemGradeReport, InMemoryReportMixin):
""" Problem Grade Report that compiles and then uploads all rows at once """
with TemporaryFile('r+') as success_file, TemporaryFile('r+') as error_file:
context.update_status('TempFileProblemGradeReport - 2: Compiling grades into temp files')
has_errors = self.iter_and_write_batched_rows(context, success_file, error_file, batched_rows)
context.update_status('TempFileProblemGradeReport - 3: Uploading files')
self.upload_temp_files(context, success_file, error_file, has_errors)
return context.update_status('ProblemGradeReport - 4: Completed problem grades')
def iter_and_write_batched_rows(self, context, success_file, error_file, batched_rows):
"""
Iterate through batched rows, writing returned chunks to disk as we go.
This should hopefully help us avoid out of memory errors.
"""
context.task_progress.succeeded = 0
context.task_progress.failed = 0
success_writer = csv.writer(success_file)
error_writer = csv.writer(error_file)
# Write headers
success_writer.writerow(self._success_headers(context))
error_writer.writerow(self._error_headers())
# Iterate through batched rows, writing to temp file
for success_rows, error_rows in batched_rows:
context.task_progress.succeeded += len(success_rows)
success_writer.writerows(success_rows)
if len(error_rows) > 0:
context.task_progress.failed += len(error_rows)
error_writer.writerows(error_rows)
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
context.task_progress.total = context.task_progress.attempted
return context.task_progress.failed > 0
def upload_temp_files(self, context, success_file, error_file, has_errors):
    """
    Rewind the success and error CSV files and upload them to the report store.

    The success file is always uploaded under ``context.upload_filename``. The error
    file is uploaded under the same name suffixed with ``_err``, and only when
    ``has_errors`` is truthy. Both uploads share a single timestamp.
    """
    timestamp = datetime.now(UTC)

    def _rewind_and_upload(csv_file, filename):
        # The files were just written, so rewind them before they are read for upload.
        csv_file.seek(0)
        upload_csv_file_to_report_store(
            csv_file,
            filename,
            context.course_id,
            timestamp,
            parent_dir=context.upload_parent_dir
        )

    _rewind_and_upload(success_file, context.upload_filename)
    if has_errors:
        _rewind_and_upload(error_file, context.upload_filename + '_err')
class TempFileProblemGradeReport(ProblemGradeReport, TemporaryFileReportMixin):
""" Problem Grade Report that writes rows iteratively to a TempFile, which is then uploaded """
class ProblemResponses:

View File

@@ -45,7 +45,6 @@ from lms.djangoapps.instructor_task.tasks_helper.grades import (
CourseGradeReport,
ProblemGradeReport,
ProblemResponses,
TempFileProblemGradeReport,
)
from lms.djangoapps.instructor_task.tasks_helper.misc import (
cohort_students_and_upload,
@@ -79,17 +78,21 @@ _TEAMS_CONFIG = TeamsConfig({
'max_size': 2,
'topics': [{'id': 'topic', 'name': 'Topic', 'description': 'A Topic'}],
})
USE_ON_DISK_GRADE_REPORT = 'lms.djangoapps.instructor_task.tasks_helper.grades.use_on_disk_grade_reporting'
class InstructorGradeReportTestCase(TestReportMixin, InstructorTaskCourseTestCase):
""" Base class for grade report tests. """
def _verify_cell_data_for_user(self, username, course_id, column_header, expected_cell_content, num_rows=2):
def _verify_cell_data_for_user(
self, username, course_id, column_header, expected_cell_content, num_rows=2, use_tempfile=False
):
"""
Verify cell data in the grades CSV for a particular user.
"""
with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'):
result = CourseGradeReport.generate(None, None, course_id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = CourseGradeReport.generate(None, None, course_id, {}, 'graded')
self.assertDictContainsSubset({'attempted': num_rows, 'succeeded': num_rows, 'failed': 0}, result)
report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD')
report_csv_filename = report_store.links_for(course_id)[0][0]
@@ -112,25 +115,28 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase):
super().setUp()
self.course = CourseFactory.create()
@ddt.data(['student@example.com', 'ni\xf1o@example.com'])
def test_unicode_emails(self, emails):
@ddt.data(True, False)
def test_unicode_emails(self, use_tempfile):
"""
Test that students with unicode characters in emails are handled.
"""
for i, email in enumerate(emails):
emails = ['student@example.com', 'ni\xf1o@example.com']
for i, email in enumerate(['student@example.com', 'ni\xf1o@example.com']):
self.create_student(f'student{i}', email)
self.current_task = Mock() # pylint: disable=attribute-defined-outside-init
self.current_task.update_state = Mock()
with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task') as mock_current_task:
mock_current_task.return_value = self.current_task
result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded')
num_students = len(emails)
self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)
@ddt.data(True, False)
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
@patch('lms.djangoapps.grades.course_grade_factory.CourseGradeFactory.iter')
def test_grading_failure(self, mock_grades_iter, _mock_current_task):
def test_grading_failure(self, use_tempfile, mock_grades_iter, _mock_current_task):
"""
Test that any grading errors are properly reported in the
progress dict and uploaded to the report store.
@@ -138,7 +144,8 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase):
mock_grades_iter.return_value = [
(self.create_student('username', 'student@example.com'), None, TypeError('Cannot grade student'))
]
result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = CourseGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result)
report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD')
@@ -396,7 +403,7 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase):
with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'):
with check_mongo_calls(2):
with self.assertNumQueries(48):
with self.assertNumQueries(51):
CourseGradeReport.generate(None, None, course.id, {}, 'graded')
def test_inactive_enrollments(self):
@@ -422,6 +429,7 @@ class TestInstructorGradeReport(InstructorGradeReportTestCase):
)
@ddt.ddt
class TestTeamGradeReport(InstructorGradeReportTestCase):
""" Test that teams appear correctly in the grade report when it is enabled for the course. """
@@ -433,8 +441,11 @@ class TestTeamGradeReport(InstructorGradeReportTestCase):
self.student2 = UserFactory.create()
CourseEnrollment.enroll(self.student2, self.course.id)
def test_team_in_grade_report(self):
self._verify_cell_data_for_user(self.student1.username, self.course.id, 'Team Name', '')
@ddt.data(True, False)
def test_team_in_grade_report(self, use_tempfile):
self._verify_cell_data_for_user(
self.student1.username, self.course.id, 'Team Name', '', use_tempfile=use_tempfile
)
def test_correct_team_name_in_grade_report(self):
team1 = CourseTeamFactory.create(course_id=self.course.id)
@@ -794,13 +805,14 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
self.csv_header_row = ['Student ID', 'Email', 'Username', 'Enrollment Status', 'Grade']
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
def test_no_problems(self, problem_grade_report_class, _):
@ddt.data(True, False)
def test_no_problems(self, use_tempfile, _):
"""
Verify that we see no grade information for a course with no graded
problems.
"""
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result)
self.verify_rows_in_csv([
dict(list(zip(
@@ -814,8 +826,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
])
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
def test_single_problem(self, problem_grade_report_class, _):
@ddt.data(True, False)
def test_single_problem(self, use_tempfile, _):
vertical = ItemFactory.create(
parent_location=self.problem_section.location,
category='vertical',
@@ -825,7 +837,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
self.define_option_problem('Problem1', parent=vertical)
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result)
problem_name = 'Homework 1: Subsection - Problem1'
header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)']
@@ -853,8 +866,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
])
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
def test_single_problem_verified_student_only(self, problem_grade_report_class, _):
@ddt.data(True, False)
def test_single_problem_verified_student_only(self, use_tempfile, _):
with patch(
'lms.djangoapps.instructor_task.tasks_helper.grades.problem_grade_report_verified_only',
return_value=True,
@@ -870,14 +883,15 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
self.submit_student_answer(student_verified.username, 'Problem1', ['Option 1'])
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset(
{'action_name': 'graded', 'attempted': 1, 'succeeded': 1, 'failed': 0}, result
)
@patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task')
@ddt.data(ProblemGradeReport, TempFileProblemGradeReport)
def test_inactive_enrollment_included(self, problem_grade_report_class, _):
@ddt.data(True, False)
def test_inactive_enrollment_included(self, use_tempfile, _):
"""
Students with inactive enrollments in a course should be included in Problem Grade Report.
"""
@@ -891,7 +905,8 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
self.define_option_problem('Problem1', parent=vertical)
self.submit_student_answer(self.student_1.username, 'Problem1', ['Option 1'])
result = problem_grade_report_class.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset({'action_name': 'graded', 'attempted': 3, 'succeeded': 3, 'failed': 0}, result)
problem_name = 'Homework 1: Subsection - Problem1'
header_row = self.csv_header_row + [problem_name + ' (Earned)', problem_name + ' (Possible)']
@@ -929,6 +944,7 @@ class TestProblemGradeReport(TestReportMixin, InstructorTaskModuleTestCase):
])
@ddt.ddt
class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent, InstructorTaskModuleTestCase):
"""
Test the problem report on a course that has split tests.
@@ -943,7 +959,8 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent,
self.define_option_problem(self.problem_a_url, parent=self.vertical_a)
self.define_option_problem(self.problem_b_url, parent=self.vertical_b)
def test_problem_grade_report(self):
@ddt.data(True, False)
def test_problem_grade_report(self, use_tempfile):
"""
Test that we generate the correct grade report when dealing with A/B tests.
@@ -962,7 +979,8 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent,
self.submit_student_answer(self.student_b.username, self.problem_b_url, [self.OPTION_1, self.OPTION_2])
with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset(
{'action_name': 'graded', 'attempted': 2, 'succeeded': 2, 'failed': 0}, result
)
@@ -1061,6 +1079,7 @@ class TestProblemReportSplitTestContent(TestReportMixin, TestConditionalContent,
assert self.get_csv_row_with_headers() == header_row
@ddt.ddt
class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, InstructorTaskModuleTestCase):
"""
Test the problem report on a course that has cohorted content.
@@ -1104,7 +1123,8 @@ class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, In
] + grade
)))
def test_cohort_content(self):
@ddt.data(True, False)
def test_cohort_content(self, use_tempfile):
self.submit_student_answer(self.alpha_user.username, 'Problem0', ['Option 1', 'Option 1'])
resp = self.submit_student_answer(self.alpha_user.username, 'Problem1', ['Option 1', 'Option 1'])
assert resp.status_code == 404
@@ -1114,7 +1134,8 @@ class TestProblemReportCohortedContent(TestReportMixin, ContentGroupTestCase, In
self.submit_student_answer(self.beta_user.username, 'Problem1', ['Option 1', 'Option 2'])
with patch('lms.djangoapps.instructor_task.tasks_helper.runner._get_current_task'):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
with patch(USE_ON_DISK_GRADE_REPORT, return_value=use_tempfile):
result = ProblemGradeReport.generate(None, None, self.course.id, {}, 'graded')
self.assertDictContainsSubset(
{'action_name': 'graded', 'attempted': 5, 'succeeded': 5, 'failed': 0}, result
)