initial work to optimise problem_grade_report code.
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
Functionality for generating grade reports.
|
||||
"""
|
||||
|
||||
|
||||
import logging
|
||||
import re
|
||||
from collections import OrderedDict, defaultdict
|
||||
@@ -69,6 +68,95 @@ def _flatten(iterable):
|
||||
return list(chain.from_iterable(iterable))
|
||||
|
||||
|
||||
class GradeReportBase(object):
|
||||
"""
|
||||
Base class for grade reports (ProblemGradeReport and CourseGradeReport).
|
||||
"""
|
||||
def _handle_empty_generator(self, generator, default):
|
||||
"""
|
||||
Handle empty generator.
|
||||
Return default if the generator is emtpy, otherwise return all
|
||||
its iterations (including the first which was used for validation).
|
||||
"""
|
||||
empty_generator_sentinel = object()
|
||||
first_iteration_output = next(generator, empty_generator_sentinel)
|
||||
generator_is_empty = first_iteration_output == empty_generator_sentinel
|
||||
|
||||
if generator_is_empty:
|
||||
yield default
|
||||
|
||||
else:
|
||||
yield first_iteration_output
|
||||
for element in generator:
|
||||
yield element
|
||||
|
||||
def _batch_users(self, context):
|
||||
"""
|
||||
Returns a generator of batches of users.
|
||||
"""
|
||||
|
||||
def grouper(iterable, chunk_size=100, fillvalue=None):
|
||||
args = [iter(iterable)] * chunk_size
|
||||
return zip_longest(*args, fillvalue=fillvalue)
|
||||
|
||||
def get_enrolled_learners_for_course(course_id, verified_only=False):
|
||||
"""
|
||||
Get all the enrolled users in a course chunk by chunk.
|
||||
|
||||
This generator method fetches & loads the enrolled user objects on demand which in chunk
|
||||
size defined. This method is a workaround to avoid out-of-memory errors.
|
||||
"""
|
||||
filter_kwargs = {
|
||||
'courseenrollment__course_id': course_id,
|
||||
}
|
||||
if verified_only:
|
||||
filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED
|
||||
|
||||
user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id')
|
||||
user_chunks = grouper(user_ids_list)
|
||||
for user_ids in user_chunks:
|
||||
user_ids = [user_id for user_id in user_ids if user_id is not None]
|
||||
min_id = min(user_ids)
|
||||
max_id = max(user_ids)
|
||||
users = get_user_model().objects.filter(
|
||||
id__gte=min_id,
|
||||
id__lte=max_id,
|
||||
**filter_kwargs
|
||||
).select_related('profile')
|
||||
yield users
|
||||
|
||||
course_id = context.course_id
|
||||
|
||||
report_for_verified_only = generate_grade_report_for_verified_only()
|
||||
return get_enrolled_learners_for_course(course_id=course_id, verified_only=report_for_verified_only)
|
||||
|
||||
def _compile(self, context, batched_rows):
|
||||
"""
|
||||
Compiles and returns the complete list of (success_rows, error_rows) for
|
||||
the given batched_rows and context.
|
||||
"""
|
||||
# partition and chain successes and errors
|
||||
success_rows, error_rows = zip(*batched_rows)
|
||||
success_rows = list(chain(*success_rows))
|
||||
error_rows = list(chain(*error_rows))
|
||||
|
||||
# update metrics on task status
|
||||
context.task_progress.succeeded = len(success_rows)
|
||||
context.task_progress.failed = len(error_rows)
|
||||
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
|
||||
context.task_progress.total = context.task_progress.attempted
|
||||
return success_rows, error_rows
|
||||
|
||||
def _upload(self, context, success_rows, error_rows):
|
||||
"""
|
||||
Creates and uploads a CSV for the given headers and rows.
|
||||
"""
|
||||
date = datetime.now(UTC)
|
||||
upload_csv_to_report_store(success_rows, context.file_name, context.course_id, date)
|
||||
if len(error_rows) > 1:
|
||||
upload_csv_to_report_store(error_rows, context.file_name + '_err', context.course_id, date)
|
||||
|
||||
|
||||
class _CourseGradeReportContext(object):
|
||||
"""
|
||||
Internal class that provides a common context to use for a single grade
|
||||
@@ -76,6 +164,7 @@ class _CourseGradeReportContext(object):
|
||||
elements of this context are serialized and parsed across process
|
||||
boundaries.
|
||||
"""
|
||||
|
||||
def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
|
||||
self.task_info_string = (
|
||||
u'Task: {task_id}, '
|
||||
@@ -156,6 +245,41 @@ class _CourseGradeReportContext(object):
|
||||
return self.task_progress.update_task_state(extra_meta={'step': message})
|
||||
|
||||
|
||||
class _ProblemGradeReportContext(object):
|
||||
"""
|
||||
Internal class that provides a common context to use for a single problem
|
||||
grade report. When a report is parallelized across multiple processes,
|
||||
elements of this context are serialized and parsed across process
|
||||
boundaries.
|
||||
"""
|
||||
|
||||
def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
|
||||
self.task_info_string = (
|
||||
'Task: {task_id}, '
|
||||
'InstructorTask ID: {entry_id}, '
|
||||
'Course: {course_id}, '
|
||||
'Input: {task_input}'
|
||||
).format(
|
||||
task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None,
|
||||
entry_id=_entry_id,
|
||||
course_id=course_id,
|
||||
task_input=_task_input,
|
||||
)
|
||||
self.action_name = action_name
|
||||
self.course_id = course_id
|
||||
self.task_progress = TaskProgress(self.action_name, total=None, start_time=time())
|
||||
self.course = get_course_by_id(self.course_id)
|
||||
self.file_name = 'problem_grade_report'
|
||||
|
||||
def update_status(self, message):
|
||||
"""
|
||||
Updates the status on the celery task to the given message.
|
||||
Also logs the update.
|
||||
"""
|
||||
TASK_LOG.info('%s, Task type: %s, %s', self.task_info_string, self.action_name, message)
|
||||
return self.task_progress.update_task_state(extra_meta={'step': message})
|
||||
|
||||
|
||||
class _CertificateBulkContext(object):
|
||||
def __init__(self, context, users):
|
||||
certificate_whitelist = CertificateWhitelist.objects.filter(course_id=context.course_id, whitelist=True)
|
||||
@@ -302,6 +426,7 @@ class CourseGradeReport(object):
|
||||
"""
|
||||
Returns a generator of batches of users.
|
||||
"""
|
||||
|
||||
def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None):
|
||||
args = [iter(iterable)] * chunk_size
|
||||
return zip_longest(*args, fillvalue=fillvalue)
|
||||
@@ -519,106 +644,70 @@ class CourseGradeReport(object):
|
||||
return success_rows, error_rows
|
||||
|
||||
|
||||
class ProblemGradeReport(object):
|
||||
class ProblemGradeReport(GradeReportBase):
|
||||
"""
|
||||
Class to encapsulate functionality related to generating Problem Grade Reports.
|
||||
"""
|
||||
@classmethod
|
||||
def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
|
||||
"""
|
||||
Public method to generate a grade report.
|
||||
"""
|
||||
with modulestore().bulk_operations(course_id):
|
||||
context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
|
||||
# pylint: disable=protected-access
|
||||
return ProblemGradeReport()._generate(context)
|
||||
|
||||
def _generate(self, context):
|
||||
"""
|
||||
Generate a CSV containing all students' problem grades within a given
|
||||
`course_id`.
|
||||
"""
|
||||
|
||||
def log_task_info(message):
|
||||
"""
|
||||
Updates the status on the celery task to the given message.
|
||||
Also logs the update.
|
||||
"""
|
||||
fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
|
||||
task_info_string = fmt.format(
|
||||
task_id=task_id, entry_id=_entry_id, course_id=course_id, task_input=_task_input
|
||||
)
|
||||
TASK_LOG.info(u'%s, Task type: %s, %s, %s', task_info_string, action_name, message, task_progress.state)
|
||||
|
||||
start_time = time()
|
||||
start_date = datetime.now(UTC)
|
||||
status_interval = 100
|
||||
task_id = _xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None
|
||||
|
||||
report_for_verified_only = generate_grade_report_for_verified_only()
|
||||
enrolled_students = CourseEnrollment.objects.users_enrolled_in(
|
||||
course_id=course_id,
|
||||
include_inactive=True,
|
||||
verified_only=report_for_verified_only,
|
||||
)
|
||||
task_progress = TaskProgress(action_name, enrolled_students.count(), start_time)
|
||||
|
||||
# This struct encapsulates both the display names of each static item in the
|
||||
# header row as values as well as the django User field names of those items
|
||||
# as the keys. It is structured in this way to keep the values related.
|
||||
context.update_status('ProblemGradeReport Step 1: Starting problem grades')
|
||||
course = get_course_by_id(context.course_id)
|
||||
header_row = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])
|
||||
context.update_status('ProblemGradeReport Step 2: Retrieving scorable grade blocks for course')
|
||||
graded_scorable_blocks = self._graded_scorable_blocks_to_header(course)
|
||||
context.update_status('ProblemGradeReport Step 3: Setting up headers for problem grade report')
|
||||
success_headers = self._success_headers(header_row, graded_scorable_blocks)
|
||||
error_headers = self._error_headers(header_row)
|
||||
|
||||
course = get_course_by_id(course_id)
|
||||
log_task_info(u'Retrieving graded scorable blocks')
|
||||
graded_scorable_blocks = cls._graded_scorable_blocks_to_header(course)
|
||||
context.update_status('ProblemGradeReport Step 4: Retrieving problem scores for course for enrolled users')
|
||||
generated_rows = self._batched_rows(context, header_row, graded_scorable_blocks)
|
||||
success_rows, error_rows = self._compile(context, generated_rows)
|
||||
context.update_status('ProblemGradeReport Step 5: Uploading data to CSV report file')
|
||||
self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows)
|
||||
|
||||
# Just generate the static fields for now.
|
||||
rows = [
|
||||
list(header_row.values()) + ['Enrollment Status', 'Grade'] + _flatten(list(graded_scorable_blocks.values()))
|
||||
]
|
||||
error_rows = [list(header_row.values()) + ['error_msg']]
|
||||
return context.update_status('ProblemGradeReport Step 6: Completed problem grades report')
|
||||
|
||||
# Bulk fetch and cache enrollment states so we can efficiently determine
|
||||
# whether each user is currently enrolled in the course.
|
||||
log_task_info(u'Fetching enrollment status')
|
||||
CourseEnrollment.bulk_fetch_enrollment_states(enrolled_students, course_id)
|
||||
def _success_headers(self, header_row, graded_scorable_blocks):
|
||||
"""
|
||||
Returns headers for all gradable blocks including fixed headers
|
||||
for report.
|
||||
|
||||
for student, course_grade, error in CourseGradeFactory().iter(enrolled_students, course):
|
||||
student_fields = [getattr(student, field_name) for field_name in header_row]
|
||||
task_progress.attempted += 1
|
||||
Arguments:
|
||||
header_row (dict): list of header values
|
||||
graded_scorable_blocks (dict): dict of graded_scorable_blocks
|
||||
|
||||
if not course_grade:
|
||||
err_msg = text_type(error)
|
||||
# There was an error grading this student.
|
||||
if not err_msg:
|
||||
err_msg = u'Unknown error'
|
||||
error_rows.append(student_fields + [err_msg])
|
||||
task_progress.failed += 1
|
||||
continue
|
||||
Returns:
|
||||
list: combined header and scorable blocks
|
||||
"""
|
||||
header_row = list(header_row.values()) + ['Enrollment Status', 'Grade']
|
||||
return header_row + _flatten(list(graded_scorable_blocks.values()))
|
||||
|
||||
enrollment_status = _user_enrollment_status(student, course_id)
|
||||
def _error_headers(self, header_row):
|
||||
"""
|
||||
Returns error headers for error report.
|
||||
|
||||
earned_possible_values = []
|
||||
for block_location in graded_scorable_blocks:
|
||||
try:
|
||||
problem_score = course_grade.problem_scores[block_location]
|
||||
except KeyError:
|
||||
earned_possible_values.append([u'Not Available', u'Not Available'])
|
||||
else:
|
||||
if problem_score.first_attempted:
|
||||
earned_possible_values.append([problem_score.earned, problem_score.possible])
|
||||
else:
|
||||
earned_possible_values.append([u'Not Attempted', problem_score.possible])
|
||||
Arguments:
|
||||
header_row (dict): list of header values
|
||||
|
||||
rows.append(student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values))
|
||||
Returns:
|
||||
list: error headers
|
||||
"""
|
||||
return list(header_row.values()) + ['error_msg']
|
||||
|
||||
task_progress.succeeded += 1
|
||||
if task_progress.attempted % status_interval == 0:
|
||||
step = u'Calculating Grades'
|
||||
task_progress.update_task_state(extra_meta={'step': step})
|
||||
log_message = u'{0} {1}/{2}'.format(step, task_progress.attempted, task_progress.total)
|
||||
log_task_info(log_message)
|
||||
|
||||
log_task_info('Uploading CSV to store')
|
||||
# Perform the upload if any students have been successfully graded
|
||||
if len(rows) > 1:
|
||||
upload_csv_to_report_store(rows, 'problem_grade_report', course_id, start_date)
|
||||
# If there are any error rows, write them out as well
|
||||
if len(error_rows) > 1:
|
||||
upload_csv_to_report_store(error_rows, 'problem_grade_report_err', course_id, start_date)
|
||||
|
||||
return task_progress.update_task_state(extra_meta={'step': 'Uploading CSV'})
|
||||
|
||||
@classmethod
|
||||
def _graded_scorable_blocks_to_header(cls, course):
|
||||
def _graded_scorable_blocks_to_header(self, course):
|
||||
"""
|
||||
Returns an OrderedDict that maps a scorable block's id to its
|
||||
headers in the final report.
|
||||
@@ -629,8 +718,8 @@ class ProblemGradeReport(object):
|
||||
for subsection_index, subsection_info in enumerate(subsection_infos, start=1):
|
||||
for scorable_block in subsection_info['scored_descendants']:
|
||||
header_name = (
|
||||
u"{assignment_type} {subsection_index}: "
|
||||
u"{subsection_name} - {scorable_block_name}"
|
||||
"{assignment_type} {subsection_index}: "
|
||||
"{subsection_name} - {scorable_block_name}"
|
||||
).format(
|
||||
scorable_block_name=scorable_block.display_name,
|
||||
assignment_type=assignment_type_name,
|
||||
@@ -641,9 +730,53 @@ class ProblemGradeReport(object):
|
||||
header_name + " (Possible)"]
|
||||
return scorable_blocks_map
|
||||
|
||||
def _rows_for_users(self, context, users, header_row, graded_scorable_blocks):
|
||||
"""
|
||||
Returns a list of rows for the given users for this report.
|
||||
"""
|
||||
success_rows, error_rows = [], []
|
||||
for student, course_grade, error in CourseGradeFactory().iter(users, context.course):
|
||||
student_fields = [getattr(student, field_name) for field_name in header_row]
|
||||
|
||||
if not course_grade:
|
||||
err_msg = text_type(error)
|
||||
# There was an error grading this student.
|
||||
if not err_msg:
|
||||
err_msg = 'Unknown error'
|
||||
error_rows.append(student_fields + [err_msg])
|
||||
continue
|
||||
|
||||
enrollment_status = _user_enrollment_status(student, context.course_id)
|
||||
|
||||
earned_possible_values = []
|
||||
for block_location in graded_scorable_blocks:
|
||||
try:
|
||||
problem_score = course_grade.problem_scores[block_location]
|
||||
except KeyError:
|
||||
earned_possible_values.append(['Not Available', 'Not Available'])
|
||||
else:
|
||||
if problem_score.first_attempted:
|
||||
earned_possible_values.append([problem_score.earned, problem_score.possible])
|
||||
else:
|
||||
earned_possible_values.append(['Not Attempted', problem_score.possible])
|
||||
|
||||
success_rows.append(
|
||||
student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values))
|
||||
|
||||
return success_rows, error_rows
|
||||
|
||||
def _batched_rows(self, context, header_row, graded_scorable_blocks):
|
||||
"""
|
||||
A generator of batches of (success_rows, error_rows) for this report.
|
||||
"""
|
||||
for users in self._handle_empty_generator(self._batch_users(context), default=[]):
|
||||
yield self._rows_for_users(context, users, header_row, graded_scorable_blocks)
|
||||
|
||||
|
||||
class ProblemResponses(object):
|
||||
|
||||
"""
|
||||
Class to encapsulate functionality related to generating Problem Responses Reports.
|
||||
"""
|
||||
@classmethod
|
||||
def _build_problem_list(cls, course_blocks, root, path=None):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user