Shuffle tasks.
This commit is contained in:
@@ -5,8 +5,9 @@ Command to compute all grades for specified courses.
|
||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
from random import shuffle
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.core.management.base import BaseCommand
|
||||
import six
|
||||
|
||||
from openedx.core.lib.command_utils import (
|
||||
@@ -17,7 +18,6 @@ from lms.djangoapps.grades.config.models import ComputeGradesSetting
|
||||
from student.models import CourseEnrollment
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
from ...config.waffle import waffle, ESTIMATE_FIRST_ATTEMPTED
|
||||
from ... import tasks
|
||||
|
||||
|
||||
@@ -80,32 +80,15 @@ class Command(BaseCommand):
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
|
||||
self._set_log_level(options)
|
||||
self.enqueue_all_shuffled_tasks(options)
|
||||
|
||||
for course_key in self._get_course_keys(options):
|
||||
self.enqueue_compute_grades_for_course_tasks(course_key, options)
|
||||
|
||||
def enqueue_compute_grades_for_course_tasks(self, course_key, options):
|
||||
def enqueue_all_shuffled_tasks(self, options):
|
||||
"""
|
||||
Enqueue celery tasks to compute and persist all grades for the
|
||||
specified course, in batches.
|
||||
Enqueue all tasks, in shuffled order.
|
||||
"""
|
||||
enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
|
||||
if enrollment_count == 0:
|
||||
log.warning("No enrollments found for {}".format(course_key))
|
||||
batch_size = self._latest_settings().batch_size if options.get('from_settings') else options['batch_size']
|
||||
for offset in six.moves.range(options['start_index'], enrollment_count, batch_size):
|
||||
# If the number of enrollments increases after the tasks are
|
||||
# created, the most recent enrollments may not get processed.
|
||||
# This is an acceptable limitation for our known use cases.
|
||||
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
|
||||
kwargs = {
|
||||
'course_key': six.text_type(course_key),
|
||||
'offset': offset,
|
||||
'batch_size': batch_size,
|
||||
'estimate_first_attempted': options['estimate_first_attempted']
|
||||
}
|
||||
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
|
||||
for kwargs in self._shuffled_task_kwargs(options):
|
||||
result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
|
||||
log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format(
|
||||
task_name=tasks.compute_grades_for_course.name,
|
||||
@@ -113,6 +96,35 @@ class Command(BaseCommand):
|
||||
kwargs=kwargs,
|
||||
))
|
||||
|
||||
def _shuffled_task_kwargs(self, options):
    """
    Iterate over all task keyword arguments in random order.

    Randomizing them will help even out the load on the task workers,
    though it will not entirely prevent the possibility of spikes.  It will
    also make the overall time to completion more predictable.

    Arguments:
        options (dict): The command options.  This method reads
            ``estimate_first_attempted``, ``start_index``,
            ``from_settings`` and ``batch_size``, and passes the whole
            dict on to ``self._get_course_keys``.

    Yields:
        dict: One dict per batch, with the keys ``course_key``,
        ``offset``, ``batch_size`` and ``estimate_first_attempted``.
    """
    estimate_first_attempted = options['estimate_first_attempted']
    start_index = options['start_index']

    # The batch size does not depend on the course, so it is resolved once
    # up front rather than once per course.
    if options.get('from_settings'):
        batch_size = self._latest_settings().batch_size
    else:
        batch_size = options['batch_size']

    all_args = []
    for course_key in self._get_course_keys(options):
        # The enrollment count is taken once per course; batches are built
        # only for offsets below that count.
        enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
        if enrollment_count == 0:
            log.warning("No enrollments found for {}".format(course_key))

        course_id = six.text_type(course_key)
        # These are tuples to reduce memory consumption.
        # The dictionaries with their extra overhead will be created
        # and consumed one at a time.
        all_args.extend(
            (course_id, offset, batch_size)
            for offset in six.moves.range(start_index, enrollment_count, batch_size)
        )

    shuffle(all_args)

    for course_id, offset, size in all_args:
        yield {
            'course_key': course_id,
            'offset': offset,
            'batch_size': size,
            'estimate_first_attempted': estimate_first_attempted,
        }
|
||||
|
||||
def _get_course_keys(self, options):
|
||||
"""
|
||||
Return a list of courses that need scores computed.
|
||||
@@ -139,4 +151,7 @@ class Command(BaseCommand):
|
||||
log.setLevel(log_level)
|
||||
|
||||
def _latest_settings(self):
    """
    Look up and return whatever ``ComputeGradesSetting.current()`` reports
    as the current ComputeGradesSetting.
    """
    current_settings = ComputeGradesSetting.current()
    return current_settings
|
||||
|
||||
@@ -17,11 +17,16 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
|
||||
from lms.djangoapps.grades.config.models import ComputeGradesSetting
|
||||
from openedx.core.djangolib.waffle_utils import WaffleSwitchPlus
|
||||
from lms.djangoapps.grades.config.waffle import ESTIMATE_FIRST_ATTEMPTED
|
||||
from lms.djangoapps.grades.management.commands import compute_grades
|
||||
|
||||
|
||||
def _sorted_by_batch(calls):
|
||||
"""
|
||||
Return the list of calls sorted by course_key and batch.
|
||||
"""
|
||||
return sorted(calls, key=lambda x: (x[1]['kwargs']['course_key'], x[1]['kwargs']['offset']))
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestComputeGrades(SharedModuleStoreTestCase):
|
||||
"""
|
||||
@@ -98,7 +103,7 @@ class TestComputeGrades(SharedModuleStoreTestCase):
|
||||
'estimate_first_attempted': estimate_first_attempted
|
||||
}
|
||||
self.assertEqual(
|
||||
mock_task.apply_async.call_args_list,
|
||||
_sorted_by_batch(mock_task.apply_async.call_args_list),
|
||||
[
|
||||
({
|
||||
'routing_key': 'key',
|
||||
@@ -124,7 +129,7 @@ class TestComputeGrades(SharedModuleStoreTestCase):
|
||||
ComputeGradesSetting.objects.create(course_ids=self.course_keys[1], batch_size=2)
|
||||
call_command('compute_grades', '--from_settings')
|
||||
self.assertEqual(
|
||||
mock_task.apply_async.call_args_list,
|
||||
_sorted_by_batch(mock_task.apply_async.call_args_list),
|
||||
[
|
||||
({
|
||||
'kwargs': {
|
||||
|
||||
Reference in New Issue
Block a user