diff --git a/lms/djangoapps/grades/management/commands/compute_grades.py b/lms/djangoapps/grades/management/commands/compute_grades.py index f2431f8cdc..ee32f01809 100644 --- a/lms/djangoapps/grades/management/commands/compute_grades.py +++ b/lms/djangoapps/grades/management/commands/compute_grades.py @@ -5,8 +5,9 @@ Command to compute all grades for specified courses. from __future__ import absolute_import, division, print_function, unicode_literals import logging +from random import shuffle -from django.core.management.base import BaseCommand, CommandError +from django.core.management.base import BaseCommand import six from openedx.core.lib.command_utils import ( @@ -17,7 +18,6 @@ from lms.djangoapps.grades.config.models import ComputeGradesSetting from student.models import CourseEnrollment from xmodule.modulestore.django import modulestore -from ...config.waffle import waffle, ESTIMATE_FIRST_ATTEMPTED from ... import tasks @@ -80,32 +80,15 @@ class Command(BaseCommand): ) def handle(self, *args, **options): - self._set_log_level(options) + self.enqueue_all_shuffled_tasks(options) - for course_key in self._get_course_keys(options): - self.enqueue_compute_grades_for_course_tasks(course_key, options) - - def enqueue_compute_grades_for_course_tasks(self, course_key, options): + def enqueue_all_shuffled_tasks(self, options): """ - Enqueue celery tasks to compute and persist all grades for the - specified course, in batches. + Enqueue all tasks, in shuffled order. """ - enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count() - if enrollment_count == 0: - log.warning("No enrollments found for {}".format(course_key)) - batch_size = self._latest_settings().batch_size if options.get('from_settings') else options['batch_size'] - for offset in six.moves.range(options['start_index'], enrollment_count, batch_size): - # If the number of enrollments increases after the tasks are - # created, the most recent enrollments may not get processed. - # This is an acceptable limitation for our known use cases. - task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} - kwargs = { - 'course_key': six.text_type(course_key), - 'offset': offset, - 'batch_size': batch_size, - 'estimate_first_attempted': options['estimate_first_attempted'] - } + task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} + for kwargs in self._shuffled_task_kwargs(options): result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options) log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format( task_name=tasks.compute_grades_for_course.name, @@ -113,6 +96,35 @@ class Command(BaseCommand): kwargs=kwargs, )) + def _shuffled_task_kwargs(self, options): + """ + Iterate over all task keyword arguments in random order. + + Randomizing them will help even out the load on the task workers, + though it will not entirely prevent the possibility of spikes. It will + also make the overall time to completion more predictable. + """ + all_args = [] + estimate_first_attempted = options['estimate_first_attempted'] + for course_key in self._get_course_keys(options): + enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count() + if enrollment_count == 0: + log.warning("No enrollments found for {}".format(course_key)) + batch_size = self._latest_settings().batch_size if options.get('from_settings') else options['batch_size'] + for offset in six.moves.range(options['start_index'], enrollment_count, batch_size): + # This is a tuple to reduce memory consumption. + # The dictionaries with their extra overhead will be created + # and consumed one at a time. + all_args.append((six.text_type(course_key), offset, batch_size)) + shuffle(all_args) + for args in all_args: + yield { + 'course_key': args[0], + 'offset': args[1], + 'batch_size': args[2], + 'estimate_first_attempted': estimate_first_attempted, + } + def _get_course_keys(self, options): """ Return a list of courses that need scores computed. @@ -139,4 +151,7 @@ class Command(BaseCommand): log.setLevel(log_level) def _latest_settings(self): + """ + Return the latest version of the ComputeGradesSetting + """ return ComputeGradesSetting.current() diff --git a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py index bd533de6ec..76cffdf084 100644 --- a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py +++ b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py @@ -17,11 +17,16 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory from lms.djangoapps.grades.config.models import ComputeGradesSetting -from openedx.core.djangolib.waffle_utils import WaffleSwitchPlus -from lms.djangoapps.grades.config.waffle import ESTIMATE_FIRST_ATTEMPTED from lms.djangoapps.grades.management.commands import compute_grades +def _sorted_by_batch(calls): + """ + Return the list of calls sorted by course_key and batch. + """ + return sorted(calls, key=lambda x: (x[1]['kwargs']['course_key'], x[1]['kwargs']['offset'])) + + @ddt.ddt class TestComputeGrades(SharedModuleStoreTestCase): """ @@ -98,7 +103,7 @@ class TestComputeGrades(SharedModuleStoreTestCase): 'estimate_first_attempted': estimate_first_attempted } self.assertEqual( - mock_task.apply_async.call_args_list, + _sorted_by_batch(mock_task.apply_async.call_args_list), [ ({ 'routing_key': 'key', @@ -124,7 +129,7 @@ class TestComputeGrades(SharedModuleStoreTestCase): ComputeGradesSetting.objects.create(course_ids=self.course_keys[1], batch_size=2) call_command('compute_grades', '--from_settings') self.assertEqual( - mock_task.apply_async.call_args_list, + _sorted_by_batch(mock_task.apply_async.call_args_list), [ ({ 'kwargs': {