From 266bfb1ab3162c1fbaa29fe0a98cb8bbc7da50d1 Mon Sep 17 00:00:00 2001 From: Nimisha Asthagiri Date: Tue, 8 Nov 2016 09:56:25 -0500 Subject: [PATCH] Optimize async grades computation TNL-5909 --- lms/djangoapps/grades/new/course_grade.py | 3 +- lms/djangoapps/grades/new/subsection_grade.py | 50 +++------- lms/djangoapps/grades/signals/handlers.py | 15 ++- lms/djangoapps/grades/signals/signals.py | 1 + lms/djangoapps/grades/tasks.py | 94 ++++++++----------- lms/djangoapps/grades/tests/test_tasks.py | 73 ++------------ openedx/core/djangoapps/performance/utils.py | 44 +++++++++ 7 files changed, 110 insertions(+), 170 deletions(-) create mode 100644 openedx/core/djangoapps/performance/utils.py diff --git a/lms/djangoapps/grades/new/course_grade.py b/lms/djangoapps/grades/new/course_grade.py index 8efef24382..958f084312 100644 --- a/lms/djangoapps/grades/new/course_grade.py +++ b/lms/djangoapps/grades/new/course_grade.py @@ -340,11 +340,10 @@ class CourseGradeFactory(object): self._compute_and_update_grade(course, course_structure, read_only) ) - def update(self, course): + def update(self, course, course_structure): """ Updates the CourseGrade for this Factory's student. """ - course_structure = get_course_blocks(self.student, course.location) self._compute_and_update_grade(course, course_structure) def get_persisted(self, course): diff --git a/lms/djangoapps/grades/new/subsection_grade.py b/lms/djangoapps/grades/new/subsection_grade.py index 33623009e0..1ab85d3ccf 100644 --- a/lms/djangoapps/grades/new/subsection_grade.py +++ b/lms/djangoapps/grades/new/subsection_grade.py @@ -12,7 +12,7 @@ from lms.djangoapps.grades.scores import get_score, possibly_scored from lms.djangoapps.grades.models import BlockRecord, PersistentSubsectionGrade from lms.djangoapps.grades.config.models import PersistentGradesEnabledFlag from openedx.core.lib.grade_utils import is_score_higher -from student.models import anonymous_id_for_user, User +from student.models import anonymous_id_for_user from submissions import api as submissions_api from traceback import format_exc from xmodule import block_metadata_utils, graders @@ -49,7 +49,7 @@ class SubsectionGrade(object): self.due = getattr(subsection, 'due', None) self.graded = getattr(subsection, 'graded', False) - self.course_version = getattr(course, 'course_version', None) + self.course_version = getattr(subsection, 'course_version', None) self.subtree_edited_timestamp = subsection.subtree_edited_on self.graded_total = None # aggregated grade for all graded problems @@ -211,24 +211,20 @@ class SubsectionGradeFactory(object): self._cached_subsection_grades = None self._unsaved_subsection_grades = [] - def create(self, subsection, block_structure=None, read_only=False): + def create(self, subsection, read_only=False): """ Returns the SubsectionGrade object for the student and subsection. - If block_structure is provided, uses it for finding and computing - the grade instead of the course_structure passed in earlier. - If read_only is True, doesn't save any updates to the grades. """ self._log_event( - log.debug, u"create, read_only: {0}, subsection: {1}".format(read_only, subsection.location) + log.debug, u"create, read_only: {0}, subsection: {1}".format(read_only, subsection.location), subsection, ) - block_structure = self._get_block_structure(block_structure) - subsection_grade = self._get_bulk_cached_grade(subsection, block_structure) + subsection_grade = self._get_bulk_cached_grade(subsection) if not subsection_grade: subsection_grade = SubsectionGrade(subsection, self.course).init_from_structure( - self.student, block_structure, self._submissions_scores, self._csm_scores, + self.student, self.course_structure, self._submissions_scores, self._csm_scores, ) if PersistentGradesEnabledFlag.feature_enabled(self.course.id): if read_only: @@ -243,13 +239,11 @@ class SubsectionGradeFactory(object): """ Bulk creates all the unsaved subsection_grades to this point. """ - self._log_event(log.debug, u"bulk_create_unsaved") - with persistence_safe_fallback(): SubsectionGrade.bulk_create_models(self.student, self._unsaved_subsection_grades, self.course.id) self._unsaved_subsection_grades = [] - def update(self, subsection, block_structure=None, only_if_higher=None): + def update(self, subsection, only_if_higher=None): """ Updates the SubsectionGrade object for the student and subsection. """ @@ -258,11 +252,10 @@ class SubsectionGradeFactory(object): if not PersistentGradesEnabledFlag.feature_enabled(self.course.id): return - self._log_event(log.warning, u"update, subsection: {}".format(subsection.location)) + self._log_event(log.warning, u"update, subsection: {}".format(subsection.location), subsection) - block_structure = self._get_block_structure(block_structure) calculated_grade = SubsectionGrade(subsection, self.course).init_from_structure( - self.student, block_structure, self._submissions_scores, self._csm_scores, + self.student, self.course_structure, self._submissions_scores, self._csm_scores, ) if only_if_higher: @@ -272,7 +265,7 @@ class SubsectionGradeFactory(object): pass else: orig_subsection_grade = SubsectionGrade(subsection, self.course).init_from_model( - self.student, grade_model, block_structure, self._submissions_scores, self._csm_scores, + self.student, grade_model, self.course_structure, self._submissions_scores, self._csm_scores, ) if not is_score_higher( orig_subsection_grade.graded_total.earned, @@ -304,7 +297,7 @@ class SubsectionGradeFactory(object): anonymous_user_id = anonymous_id_for_user(self.student, self.course.id) return submissions_api.get_scores(unicode(self.course.id), anonymous_user_id) - def _get_bulk_cached_grade(self, subsection, block_structure): # pylint: disable=unused-argument + def _get_bulk_cached_grade(self, subsection): """ Returns the student's SubsectionGrade for the subsection, while caching the results of a bulk retrieval for the @@ -318,7 +311,7 @@ class SubsectionGradeFactory(object): subsection_grade = saved_subsection_grades.get(subsection.location) if subsection_grade: return SubsectionGrade(subsection, self.course).init_from_model( - self.student, subsection_grade, block_structure, self._submissions_scores, self._csm_scores, + self.student, subsection_grade, self.course_structure, self._submissions_scores, self._csm_scores, ) def _get_bulk_cached_subsection_grades(self): @@ -342,27 +335,14 @@ class SubsectionGradeFactory(object): if self._cached_subsection_grades is not None: self._cached_subsection_grades[subsection_usage_key] = subsection_model - def _get_block_structure(self, block_structure): - """ - If block_structure is None, returns self.course_structure. - Otherwise, returns block_structure after verifying that the - given block_structure is a sub-structure of self.course_structure. - """ - if block_structure: - if block_structure.root_block_usage_key not in self.course_structure: - raise ValueError - return block_structure - else: - return self.course_structure - - def _log_event(self, log_func, log_statement): + def _log_event(self, log_func, log_statement, subsection): """ Logs the given statement, for this instance. """ log_func(u"Persistent Grades: SGF.{}, course: {}, version: {}, edit: {}, user: {}".format( log_statement, self.course.id, - getattr(self.course, 'course_version', None), - self.course.subtree_edited_on, + getattr(subsection, 'course_version', None), + subsection.subtree_edited_on, self.student.id, )) diff --git a/lms/djangoapps/grades/signals/handlers.py b/lms/djangoapps/grades/signals/handlers.py index e78ee4a0d0..fada098f77 100644 --- a/lms/djangoapps/grades/signals/handlers.py +++ b/lms/djangoapps/grades/signals/handlers.py @@ -2,7 +2,6 @@ Grades related signals. """ -from celery import Task from django.dispatch import receiver from logging import getLogger @@ -12,7 +11,8 @@ from student.models import user_by_anonymous_id from submissions.models import score_set, score_reset from .signals import PROBLEM_SCORE_CHANGED, SUBSECTION_SCORE_CHANGED, SCORE_PUBLISHED -from ..tasks import recalculate_subsection_grade, recalculate_course_grade +from ..new.course_grade import CourseGradeFactory +from ..tasks import recalculate_subsection_grade log = getLogger(__name__) @@ -92,7 +92,7 @@ def score_published_handler(sender, block, user, raw_earned, raw_possible, only_ if only_if_higher: previous_score = get_score(user.id, block.location) - if previous_score: + if previous_score is not None: prev_raw_earned, prev_raw_possible = previous_score # pylint: disable=unpacking-non-sequence if not is_score_higher(prev_raw_earned, prev_raw_possible, raw_earned, raw_possible): @@ -136,11 +136,8 @@ def enqueue_subsection_update(sender, **kwargs): # pylint: disable=unused-argum @receiver(SUBSECTION_SCORE_CHANGED) -def enqueue_course_update(sender, **kwargs): # pylint: disable=unused-argument +def recalculate_course_grade(sender, course, course_structure, user, **kwargs): # pylint: disable=unused-argument """ - Handles the SUBSECTION_SCORE_CHANGED signal by enqueueing a course update operation to occur asynchronously. + Updates a saved course grade. """ - if isinstance(sender, Task): # We're already in a async worker, just do the task - recalculate_course_grade.apply(args=(kwargs['user'].id, unicode(kwargs['course'].id))) - else: # Otherwise, queue the work to be done asynchronously - recalculate_course_grade.apply_async(args=(kwargs['user'].id, unicode(kwargs['course'].id))) + CourseGradeFactory(user).update(course, course_structure) diff --git a/lms/djangoapps/grades/signals/signals.py b/lms/djangoapps/grades/signals/signals.py index 8418f90ca5..0f0465f62b 100644 --- a/lms/djangoapps/grades/signals/signals.py +++ b/lms/djangoapps/grades/signals/signals.py @@ -45,6 +45,7 @@ SCORE_PUBLISHED = Signal( SUBSECTION_SCORE_CHANGED = Signal( providing_args=[ 'course', # Course object + 'course_structure', # BlockStructure object 'user', # User object 'subsection_grade', # SubsectionGrade object ] diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index ba02ac2d47..ef5be52971 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -12,11 +12,9 @@ from courseware.model_data import get_score from lms.djangoapps.course_blocks.api import get_course_blocks from opaque_keys.edx.keys import UsageKey from opaque_keys.edx.locator import CourseLocator -from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache from xmodule.modulestore.django import modulestore from .config.models import PersistentGradesEnabledFlag -from .new.course_grade import CourseGradeFactory from .new.subsection_grade import SubsectionGradeFactory from .signals.signals import SUBSECTION_SCORE_CHANGED from .transformer import GradesTransformer @@ -40,17 +38,19 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r - raw_possible: the max raw points the leaner could have earned on the problem """ - if not PersistentGradesEnabledFlag.feature_enabled(course_id): + course_key = CourseLocator.from_string(course_id) + if not PersistentGradesEnabledFlag.feature_enabled(course_key): return - course_key = CourseLocator.from_string(course_id) scored_block_usage_key = UsageKey.from_string(usage_id).replace(course_key=course_key) score = get_score(user_id, scored_block_usage_key) # If the score is None, it has not been saved at all yet # and we need to retry until it has been saved. if score is None: - _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible) + raise _retry_recalculate_subsection_grade( + user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, + ) else: module_raw_earned, module_raw_possible = score # pylint: disable=unpacking-non-sequence @@ -65,7 +65,9 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r state_deleted = module_raw_earned is None and module_raw_possible is None and raw_earned == 0 if not (state_deleted or grades_match): - _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible) + raise _retry_recalculate_subsection_grade( + user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, + ) _update_subsection_grades( course_key, @@ -79,44 +81,6 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r ) -@task(default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY) -def recalculate_course_grade(user_id, course_id): - """ - Updates a saved course grade. - This method expects the following parameters: - - user_id: serialized id of applicable User object - - course_id: Unicode string representing the course - """ - if not PersistentGradesEnabledFlag.feature_enabled(course_id): - return - student = User.objects.get(id=user_id) - course_key = CourseLocator.from_string(course_id) - course = modulestore().get_course(course_key, depth=0) - - try: - CourseGradeFactory(student).update(course) - except IntegrityError as exc: - raise recalculate_course_grade.retry(args=[user_id, course_id], exc=exc) - - -def _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, grade, max_grade, exc=None): - """ - Calls retry for the recalculate_subsection_grade task with the - given inputs. - """ - raise recalculate_subsection_grade.retry( - args=[ - user_id, - course_id, - usage_id, - only_if_higher, - grade, - max_grade, - ], - exc=exc - ) - - def _update_subsection_grades( course_key, scored_block_usage_key, @@ -132,35 +96,51 @@ def _update_subsection_grades( for each subsection containing the given block, and to signal that those subsection grades were updated. """ - collected_block_structure = get_course_in_cache(course_key) - course = modulestore().get_course(course_key, depth=0) student = User.objects.get(id=user_id) - subsection_grade_factory = SubsectionGradeFactory(student, course, collected_block_structure) - subsections_to_update = collected_block_structure.get_transformer_block_field( + course_structure = get_course_blocks(student, modulestore().make_course_usage_key(course_key)) + subsections_to_update = course_structure.get_transformer_block_field( scored_block_usage_key, GradesTransformer, 'subsections', - set() + set(), ) + course = modulestore().get_course(course_key, depth=0) + subsection_grade_factory = SubsectionGradeFactory(student, course, course_structure) + try: for subsection_usage_key in subsections_to_update: - transformed_subsection_structure = get_course_blocks( - student, - subsection_usage_key, - collected_block_structure=collected_block_structure, - ) subsection_grade = subsection_grade_factory.update( - transformed_subsection_structure[subsection_usage_key], - transformed_subsection_structure, + course_structure[subsection_usage_key], only_if_higher, ) SUBSECTION_SCORE_CHANGED.send( sender=recalculate_subsection_grade, course=course, + course_structure=course_structure, user=student, subsection_grade=subsection_grade, ) except IntegrityError as exc: - _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, exc) + raise _retry_recalculate_subsection_grade( + user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, exc, + ) + + +def _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, grade, max_grade, exc=None): + """ + Calls retry for the recalculate_subsection_grade task with the + given inputs. + """ + recalculate_subsection_grade.retry( + args=[ + user_id, + course_id, + usage_id, + only_if_higher, + grade, + max_grade, + ], + exc=exc + ) diff --git a/lms/djangoapps/grades/tests/test_tasks.py b/lms/djangoapps/grades/tests/test_tasks.py index 60689b47a5..45dca070d1 100644 --- a/lms/djangoapps/grades/tests/test_tasks.py +++ b/lms/djangoapps/grades/tests/test_tasks.py @@ -21,7 +21,7 @@ from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory, chec from lms.djangoapps.grades.config.models import PersistentGradesEnabledFlag from lms.djangoapps.grades.signals.signals import PROBLEM_SCORE_CHANGED, SUBSECTION_SCORE_CHANGED -from lms.djangoapps.grades.tasks import recalculate_course_grade, recalculate_subsection_grade +from lms.djangoapps.grades.tasks import recalculate_subsection_grade @patch.dict(settings.FEATURES, {'PERSISTENT_GRADES_ENABLED_FOR_ALL_TESTS': False}) @@ -85,7 +85,6 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): @ddt.data( ('lms.djangoapps.grades.tasks.recalculate_subsection_grade.apply_async', PROBLEM_SCORE_CHANGED), - ('lms.djangoapps.grades.tasks.recalculate_course_grade.apply_async', SUBSECTION_SCORE_CHANGED) ) @ddt.unpack def test_signal_queues_task(self, enqueue_op, test_signal): @@ -115,52 +114,8 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): Ensures that the subsection update operation also updates the course grade. """ self.set_up_course() - mock_update_return = uuid4() - - course_key = CourseLocator.from_string(unicode(self.course.id)) - course = modulestore().get_course(course_key, depth=0) - with patch( - 'lms.djangoapps.grades.new.subsection_grade.SubsectionGradeFactory.update', - return_value=mock_update_return - ): - self._apply_recalculate_subsection_grade() - mock_course_signal.assert_called_once_with( - sender=recalculate_subsection_grade, - course=course, - user=self.user, - subsection_grade=mock_update_return, - ) - - @ddt.data(True, False) - def test_course_update_enqueuing(self, should_be_async): - """ - Ensures that the course update operation is enqueued on an async queue (or not) as expected. - """ - base = 'lms.djangoapps.grades.tasks.recalculate_course_grade' - if should_be_async: - executed = base + '.apply_async' - other = base + '.apply' - sender = None - else: - executed = base + '.apply' - other = base + '.apply_async' - sender = recalculate_subsection_grade - self.set_up_course() - - with patch(executed) as executed_task: - with patch(other) as other_task: - SUBSECTION_SCORE_CHANGED.send( - sender=sender, - course=self.course, - user=self.user, - ) - other_task.assert_not_called() - executed_task.assert_called_once_with( - args=( - self.problem_score_changed_kwargs['user_id'], - self.problem_score_changed_kwargs['course_id'], - ) - ) + self._apply_recalculate_subsection_grade() + self.assertTrue(mock_course_signal.called) @ddt.data( (ModuleStoreEnum.Type.mongo, 1), @@ -171,7 +126,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): with self.store.default_store(default_store): self.set_up_course() self.assertTrue(PersistentGradesEnabledFlag.feature_enabled(self.course.id)) - with check_mongo_calls(2) and self.assertNumQueries(25 + added_queries): + with check_mongo_calls(2) and self.assertNumQueries(22 + added_queries): self._apply_recalculate_subsection_grade() def test_single_call_to_create_block_structure(self): @@ -182,7 +137,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): return_value=None, ) as mock_block_structure_create: self._apply_recalculate_subsection_grade() - self.assertEquals(mock_block_structure_create.call_count, 2) + self.assertEquals(mock_block_structure_create.call_count, 1) @ddt.data( (ModuleStoreEnum.Type.mongo, 1), @@ -195,7 +150,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): self.assertTrue(PersistentGradesEnabledFlag.feature_enabled(self.course.id)) ItemFactory.create(parent=self.sequential, category='problem', display_name='problem2') ItemFactory.create(parent=self.sequential, category='problem', display_name='problem3') - with check_mongo_calls(2) and self.assertNumQueries(25 + added_queries): + with check_mongo_calls(2) and self.assertNumQueries(22 + added_queries): self._apply_recalculate_subsection_grade() @ddt.data(ModuleStoreEnum.Type.mongo, ModuleStoreEnum.Type.split) @@ -232,22 +187,6 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase): self._apply_recalculate_subsection_grade() self.assertTrue(mock_retry.called) - @patch('lms.djangoapps.grades.tasks.recalculate_course_grade.retry') - @patch('lms.djangoapps.grades.new.course_grade.CourseGradeFactory.update') - def test_retry_course_update_on_integrity_error(self, mock_update, mock_retry): - """ - Ensures that tasks will be retried if IntegrityErrors are encountered. - """ - self.set_up_course() - mock_update.side_effect = IntegrityError("WHAMMY") - recalculate_course_grade.apply( - args=( - self.problem_score_changed_kwargs['user_id'], - self.problem_score_changed_kwargs['course_id'], - ) - ) - self.assertTrue(mock_retry.called) - @patch('lms.djangoapps.grades.tasks.recalculate_subsection_grade.retry') def test_retry_subsection_grade_on_update_not_complete(self, mock_retry): self.set_up_course() diff --git a/openedx/core/djangoapps/performance/utils.py b/openedx/core/djangoapps/performance/utils.py new file mode 100644 index 0000000000..12f4664b96 --- /dev/null +++ b/openedx/core/djangoapps/performance/utils.py @@ -0,0 +1,44 @@ +""" +Common utilities for performance testing. +""" +from contextlib import contextmanager + + +def collect_profile_func(file_prefix, enabled=False): + """ + Method decorator for collecting profile. + """ + import functools + + def _outer(func): + """ + Outer function decorator. + """ + @functools.wraps(func) + def _inner(self, *args, **kwargs): + """ + Inner wrapper function. + """ + if enabled: + with collect_profile(file_prefix): + return func(self, *args, **kwargs) + else: + return func(self, *args, **kwargs) + return _inner + return _outer + + +@contextmanager +def collect_profile(file_prefix): + """ + Context manager to collect profile information. + """ + import cProfile + import uuid + profiler = cProfile.Profile() + profiler.enable() + try: + yield + finally: + profiler.disable() + profiler.dump_stats("{0}_{1}_master.profile".format(file_prefix, uuid.uuid4()))