Files
2024-08-29 18:47:54 +05:00

377 lines
15 KiB
Python

"""
This module contains tasks for asynchronous execution of grade updates.
"""
from logging import getLogger
from celery import shared_task
from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
from django.conf import settings
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.core.exceptions import ValidationError
from django.db.utils import DatabaseError
from edx_django_utils.monitoring import (
set_code_owner_attribute,
set_custom_attribute,
set_custom_attributes_for_course_key
)
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import CourseLocator
from submissions import api as sub_api
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.track.event_transaction_utils import set_event_transaction_id, set_event_transaction_type
from common.djangoapps.util.date_utils import from_timestamp
from lms.djangoapps.course_blocks.api import get_course_blocks
from lms.djangoapps.courseware.model_data import get_score
from lms.djangoapps.grades.config.models import ComputeGradesSetting
from openedx.core.djangoapps.content.block_structure.api import clear_course_from_cache
from openedx.core.djangoapps.content.block_structure.exceptions import UsageKeyNotInBlockStructure
from openedx.core.djangoapps.content.course_overviews.models import \
CourseOverview # lint-amnesty, pylint: disable=unused-import
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
from .config.waffle import DISABLE_REGRADE_ON_POLICY_CHANGE
from .constants import ScoreDatabaseTableEnum
from .course_grade_factory import CourseGradeFactory
from .exceptions import ScoreNotFoundError
from .grade_utils import are_grades_frozen
from .signals.signals import SUBSECTION_SCORE_CHANGED
from .subsection_grade_factory import SubsectionGradeFactory
from .transformer import GradesTransformer
log = getLogger(__name__)
COURSE_GRADE_TIMEOUT_SECONDS = 1200
KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally, should be resolved on retry
DatabaseError,
ValidationError,
ScoreNotFoundError,
UsageKeyNotInBlockStructure,
)
RECALCULATE_GRADE_DELAY_SECONDS = 2 # to prevent excessive _has_db_updated failures. See TNL-6424.
RETRY_DELAY_SECONDS = 40
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300
@shared_task(base=LoggedPersistOnFailureTask)
@set_code_owner_attribute
def compute_all_grades_for_course(**kwargs):
"""
Compute grades for all students in the specified course.
Kicks off a series of compute_grades_for_course_v2 tasks
to cover all of the students in the course.
"""
if DISABLE_REGRADE_ON_POLICY_CHANGE.is_enabled():
log.debug('Grades: ignoring policy change regrade due to waffle switch')
else:
course_key = CourseKey.from_string(kwargs.pop('course_key'))
if are_grades_frozen(course_key):
log.info("Attempted compute_all_grades_for_course for course '%s', but grades are frozen.", course_key)
return
for course_key_string, offset, batch_size in _course_task_args(course_key=course_key, **kwargs):
kwargs.update({
'course_key': course_key_string,
'offset': offset,
'batch_size': batch_size,
})
compute_grades_for_course_v2.apply_async(
kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)
@shared_task(
bind=True,
base=LoggedPersistOnFailureTask,
default_retry_delay=RETRY_DELAY_SECONDS,
max_retries=1,
time_limit=COURSE_GRADE_TIMEOUT_SECONDS,
rate_limit=settings.POLICY_CHANGE_TASK_RATE_LIMIT,
)
@set_code_owner_attribute
def compute_grades_for_course_v2(self, **kwargs):
"""
Compute grades for a set of students in the specified course.
The set of students will be determined by the order of enrollment date, and
limited to at most <batch_size> students, starting from the specified
offset.
TODO: Roll this back into compute_grades_for_course once all workers have
the version with **kwargs.
"""
if 'event_transaction_id' in kwargs:
set_event_transaction_id(kwargs['event_transaction_id'])
if 'event_transaction_type' in kwargs:
set_event_transaction_type(kwargs['event_transaction_type'])
try:
return compute_grades_for_course(kwargs['course_key'], kwargs['offset'], kwargs['batch_size'])
except Exception as exc:
raise self.retry(kwargs=kwargs, exc=exc)
@shared_task(base=LoggedPersistOnFailureTask)
@set_code_owner_attribute
def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pylint: disable=unused-argument
"""
Compute and save grades for a set of students in the specified course.
The set of students will be determined by the order of enrollment date, and
limited to at most <batch_size> students, starting from the specified
offset.
"""
course_key = CourseKey.from_string(course_key)
if are_grades_frozen(course_key):
log.info("Attempted compute_grades_for_course for course '%s', but grades are frozen.", course_key)
return
enrollments = CourseEnrollment.objects.filter(course_id=course_key).order_by('created')
student_iter = (enrollment.user for enrollment in enrollments[offset:offset + batch_size])
for result in CourseGradeFactory().iter(users=student_iter, course_key=course_key, force_update=True):
if result.error is not None:
raise result.error
@shared_task(
bind=True,
base=LoggedPersistOnFailureTask,
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
)
@set_code_owner_attribute
def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument
"""
Recalculates the course grade and all subsection grades
for the given ``user`` and ``course_key`` keyword arguments.
"""
if 'lms.djangoapps.grades.apps.GradesConfig' not in settings.INSTALLED_APPS:
# This task errors when run in-process during Studio tests, just skip it
return
user_id = kwargs.get('user_id')
course_key_str = kwargs.get('course_key')
if not (user_id or course_key_str):
message = 'recalculate_course_and_subsection_grades_for_user missing "user" or "course_key" kwargs from {}'
raise Exception(message.format(kwargs))
user = User.objects.get(id=user_id)
course_key = CourseKey.from_string(course_key_str)
if are_grades_frozen(course_key):
log.info(
"Attempted recalculate_course_and_subsection_grades_for_user for course '%s', but grades are frozen.",
course_key,
)
return
previous_course_grade = CourseGradeFactory().read(user, course_key=course_key)
if previous_course_grade and previous_course_grade.attempted:
CourseGradeFactory().update(
user=user,
course_key=course_key,
force_update_subsections=True
)
@shared_task(
bind=True,
base=LoggedPersistOnFailureTask,
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS
)
@set_code_owner_attribute
def recalculate_subsection_grade_v3(self, **kwargs):
"""
Latest version of the recalculate_subsection_grade task. See docstring
for _recalculate_subsection_grade for further description.
"""
_recalculate_subsection_grade(self, **kwargs)
def _recalculate_subsection_grade(self, **kwargs):
"""
Updates a saved subsection grade.
Keyword Arguments:
user_id (int): id of applicable User object
anonymous_user_id (int, OPTIONAL): Anonymous ID of the User
course_id (string): identifying the course
usage_id (string): identifying the course block
only_if_higher (boolean): indicating whether grades should
be updated only if the new raw_earned is higher than the
previous value.
expected_modified_time (serialized timestamp): indicates when the task
was queued so that we can verify the underlying data update.
score_deleted (boolean): indicating whether the grade change is
a result of the problem's score being deleted.
event_transaction_id (string): uuid identifying the current
event transaction.
event_transaction_type (string): human-readable type of the
event at the root of the current event transaction.
score_db_table (ScoreDatabaseTableEnum): database table that houses
the changed score. Used in conjunction with expected_modified_time.
"""
try:
course_key = CourseLocator.from_string(kwargs['course_id'])
if are_grades_frozen(course_key):
log.info("Attempted _recalculate_subsection_grade for course '%s', but grades are frozen.", course_key)
return
scored_block_usage_key = UsageKey.from_string(kwargs['usage_id']).replace(course_key=course_key)
set_custom_attributes_for_course_key(course_key)
set_custom_attribute('usage_id', str(scored_block_usage_key))
# The request cache is not maintained on celery workers,
# where this code runs. So we take the values from the
# main request cache and store them in the local request
# cache. This correlates model-level grading events with
# higher-level ones.
set_event_transaction_id(kwargs.get('event_transaction_id'))
set_event_transaction_type(kwargs.get('event_transaction_type'))
# Verify the database has been updated with the scores when the task was
# created. This race condition occurs if the transaction in the task
# creator's process hasn't committed before the task initiates in the worker
# process.
has_database_updated = _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs)
if not has_database_updated:
raise ScoreNotFoundError
_update_subsection_grades(
course_key,
scored_block_usage_key,
kwargs['only_if_higher'],
kwargs['user_id'],
kwargs['score_deleted'],
kwargs.get('force_update_subsections', False),
)
except Exception as exc:
if not isinstance(exc, KNOWN_RETRY_ERRORS):
log.info("tnl-6244 grades unexpected failure: {}. task id: {}. kwargs={}".format(
repr(exc),
self.request.id,
kwargs,
))
raise self.retry(kwargs=kwargs, exc=exc)
def _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs):
"""
Returns whether the database has been updated with the
expected new score values for the given problem and user.
"""
if kwargs['score_db_table'] == ScoreDatabaseTableEnum.courseware_student_module:
score = get_score(kwargs['user_id'], scored_block_usage_key)
found_modified_time = score.modified if score is not None else None
elif kwargs['score_db_table'] == ScoreDatabaseTableEnum.submissions:
score = sub_api.get_score(
{
"student_id": kwargs['anonymous_user_id'],
"course_id": str(scored_block_usage_key.course_key),
"item_id": str(scored_block_usage_key),
}
)
found_modified_time = score['created_at'] if score is not None else None
else:
assert kwargs['score_db_table'] == ScoreDatabaseTableEnum.overrides
from . import api
score = api.get_subsection_grade_override(
user_id=kwargs['user_id'],
course_key_or_id=kwargs['course_id'],
usage_key_or_id=kwargs['usage_id']
)
found_modified_time = score.modified if score is not None else None
if score is None:
# score should be None only if it was deleted.
# Otherwise, it hasn't yet been saved.
db_is_updated = kwargs['score_deleted']
else:
db_is_updated = found_modified_time >= from_timestamp(kwargs['expected_modified_time'])
if not db_is_updated:
log.info(
"Grades: tasks._has_database_updated_with_new_score is False. Task ID: {}. Kwargs: {}. Found "
"modified time: {}".format(
self.request.id,
kwargs,
found_modified_time,
)
)
return db_is_updated
def _update_subsection_grades(
course_key, scored_block_usage_key, only_if_higher, user_id, score_deleted, force_update_subsections=False
):
"""
A helper function to update subsection grades in the database
for each subsection containing the given block, and to signal
that those subsection grades were updated.
"""
student = User.objects.get(id=user_id)
store = modulestore()
with store.bulk_operations(course_key):
course_usage_key = store.make_course_usage_key(course_key)
course_structure = get_course_blocks(student, course_usage_key)
subsections_to_update = course_structure.get_transformer_block_field(
scored_block_usage_key,
GradesTransformer,
'subsections',
set(),
)
# Clear the course cache if access is restricted and course blocks are
# cached without the restricted blocks.
if not subsections_to_update:
clear_course_from_cache(course_usage_key.course_key)
raise UsageKeyNotInBlockStructure(
"Scored block usage_key '{0}' is not found in the block_structure with root '{1}'".format(
str(scored_block_usage_key),
str(course_usage_key)
)
)
course = store.get_course(course_key, depth=0)
subsection_grade_factory = SubsectionGradeFactory(student, course, course_structure)
for subsection_usage_key in subsections_to_update:
if subsection_usage_key in course_structure:
subsection_grade = subsection_grade_factory.update(
course_structure[subsection_usage_key],
only_if_higher,
score_deleted,
force_update_subsections,
)
SUBSECTION_SCORE_CHANGED.send(
sender=None,
course=course,
course_structure=course_structure,
user=student,
subsection_grade=subsection_grade,
)
def _course_task_args(course_key, **kwargs):
"""
Helper function to generate course-grade task args.
"""
from_settings = kwargs.pop('from_settings', True)
enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
if enrollment_count == 0:
log.warning(f"No enrollments found for {course_key}")
if from_settings is False:
batch_size = kwargs.pop('batch_size', 100)
else:
batch_size = ComputeGradesSetting.current().batch_size
for offset in range(0, enrollment_count, batch_size):
yield (str(course_key), offset, batch_size)