EDUCATOR-1207 | Run generate_course_overview command via celery task, in batches.
This commit is contained in:
committed by
Alex Dusenbery
parent
74f1975afb
commit
d89afa0832
@@ -5,11 +5,15 @@ Command to load course overviews.
|
||||
import logging
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
|
||||
from opaque_keys import InvalidKeyError
|
||||
|
||||
from openedx.core.djangoapps.content.course_overviews.tasks import (
|
||||
DEFAULT_ALL_COURSES,
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
DEFAULT_FORCE_UPDATE,
|
||||
enqueue_async_course_overview_update_tasks
|
||||
)
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -18,7 +22,7 @@ log = logging.getLogger(__name__)
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
Example usage:
|
||||
$ ./manage.py lms generate_course_overview --all --settings=devstack
|
||||
$ ./manage.py lms generate_course_overview --all-courses --settings=devstack --chunk-size=100
|
||||
$ ./manage.py lms generate_course_overview 'edX/DemoX/Demo_Course' --settings=devstack
|
||||
"""
|
||||
args = '<course_id course_id ...>'
|
||||
@@ -29,29 +33,44 @@ class Command(BaseCommand):
|
||||
Add arguments to the command parser.
|
||||
"""
|
||||
parser.add_argument(
|
||||
'--all',
|
||||
'--all-courses', '--all',
|
||||
dest='all_courses',
|
||||
action='store_true',
|
||||
dest='all',
|
||||
default=False,
|
||||
default=DEFAULT_ALL_COURSES,
|
||||
help=u'Generate course overview for all courses.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--force_update',
|
||||
'--force-update', '--force_update',
|
||||
action='store_true',
|
||||
default=False,
|
||||
default=DEFAULT_FORCE_UPDATE,
|
||||
help=u'Force update course overviews for the requested courses.',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--chunk-size',
|
||||
action='store',
|
||||
type=int,
|
||||
default=DEFAULT_CHUNK_SIZE,
|
||||
help=u'The maximum number of courses each task will generate a course overview for.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--routing-key',
|
||||
dest='routing_key',
|
||||
help=u'The celery routing key to use.'
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if not options.get('all_courses') and len(args) < 1:
|
||||
raise CommandError('At least one course or --all-courses must be specified.')
|
||||
|
||||
if options['all']:
|
||||
course_keys = [course.id for course in modulestore().get_course_summaries()]
|
||||
else:
|
||||
if len(args) < 1:
|
||||
raise CommandError('At least one course or --all must be specified.')
|
||||
try:
|
||||
course_keys = [CourseKey.from_string(arg) for arg in args]
|
||||
except InvalidKeyError:
|
||||
raise CommandError('Invalid key specified.')
|
||||
kwargs = {}
|
||||
for key in ('all_courses', 'force_update', 'chunk_size', 'routing_key'):
|
||||
if options.get(key):
|
||||
kwargs[key] = options[key]
|
||||
|
||||
CourseOverview.update_select_courses(course_keys, force_update=options.get('force_update'))
|
||||
try:
|
||||
enqueue_async_course_overview_update_tasks(
|
||||
course_ids=args,
|
||||
**kwargs
|
||||
)
|
||||
except InvalidKeyError as exc:
|
||||
raise CommandError(u'Invalid Course Key: ' + unicode(exc))
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
# pylint: disable=missing-docstring
|
||||
"""
|
||||
Tests that the generate_course_overview management command actually generates course overviews.
|
||||
"""
|
||||
from django.core.management.base import CommandError
|
||||
from mock import patch
|
||||
from nose.plugins.attrib import attr
|
||||
|
||||
from openedx.core.djangoapps.content.course_overviews.management.commands import generate_course_overview
|
||||
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
|
||||
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
|
||||
@@ -44,7 +48,7 @@ class TestGenerateCourseOverview(ModuleStoreTestCase):
|
||||
"""
|
||||
# ensure that the newly created courses aren't in course overviews
|
||||
self._assert_courses_not_in_overview(self.course_key_1, self.course_key_2)
|
||||
self.command.handle(all=True)
|
||||
self.command.handle(all_courses=True)
|
||||
|
||||
# CourseOverview will be populated with all courses in the modulestore
|
||||
self._assert_courses_in_overview(self.course_key_1, self.course_key_2)
|
||||
@@ -54,12 +58,12 @@ class TestGenerateCourseOverview(ModuleStoreTestCase):
|
||||
Test that a specified course is loaded into course overviews.
|
||||
"""
|
||||
self._assert_courses_not_in_overview(self.course_key_1, self.course_key_2)
|
||||
self.command.handle(unicode(self.course_key_1), all=False)
|
||||
self.command.handle(unicode(self.course_key_1), all_courses=False)
|
||||
self._assert_courses_in_overview(self.course_key_1)
|
||||
self._assert_courses_not_in_overview(self.course_key_2)
|
||||
|
||||
def test_generate_force_update(self):
|
||||
self.command.handle(all=True)
|
||||
self.command.handle(all_courses=True)
|
||||
|
||||
# update each course
|
||||
updated_course_name = u'test_generate_course_overview.course_edit'
|
||||
@@ -69,8 +73,8 @@ class TestGenerateCourseOverview(ModuleStoreTestCase):
|
||||
self.store.update_item(course, self.user.id)
|
||||
|
||||
# force_update course_key_1, but not course_key_2
|
||||
self.command.handle(unicode(self.course_key_1), all=False, force_update=True)
|
||||
self.command.handle(unicode(self.course_key_2), all=False, force_update=False)
|
||||
self.command.handle(unicode(self.course_key_1), all_courses=False, force_update=True)
|
||||
self.command.handle(unicode(self.course_key_2), all_courses=False, force_update=False)
|
||||
|
||||
self.assertEquals(CourseOverview.get_from_id(self.course_key_1).display_name, updated_course_name)
|
||||
self.assertNotEquals(CourseOverview.get_from_id(self.course_key_2).display_name, updated_course_name)
|
||||
@@ -80,14 +84,14 @@ class TestGenerateCourseOverview(ModuleStoreTestCase):
|
||||
Test that CommandError is raised for invalid key.
|
||||
"""
|
||||
with self.assertRaises(CommandError):
|
||||
self.command.handle('not/found', all=False)
|
||||
self.command.handle('not/found', all_courses=False)
|
||||
|
||||
@patch('openedx.core.djangoapps.content.course_overviews.models.log')
|
||||
def test_not_found_key(self, mock_log):
|
||||
"""
|
||||
Test keys not found are logged.
|
||||
"""
|
||||
self.command.handle('fake/course/id', all=False)
|
||||
self.command.handle('fake/course/id', all_courses=False)
|
||||
self.assertTrue(mock_log.exception.called)
|
||||
|
||||
def test_no_params(self):
|
||||
@@ -95,4 +99,17 @@ class TestGenerateCourseOverview(ModuleStoreTestCase):
|
||||
Test exception raised when no parameters are specified.
|
||||
"""
|
||||
with self.assertRaises(CommandError):
|
||||
self.command.handle(all=False)
|
||||
self.command.handle(all_courses=False)
|
||||
|
||||
@patch('openedx.core.djangoapps.content.course_overviews.tasks.async_course_overview_update')
|
||||
def test_routing_key(self, mock_async_task):
|
||||
self.command.handle(all_courses=True, force_update=True, routing_key='my-routing-key', chunk_size=10000)
|
||||
|
||||
called_kwargs = mock_async_task.apply_async.call_args_list[0][1]
|
||||
self.assertEquals(sorted([unicode(self.course_key_1), unicode(self.course_key_2)]), called_kwargs.pop('args'))
|
||||
self.assertEquals({
|
||||
'kwargs': {'force_update': True},
|
||||
'routing_key': 'my-routing-key'
|
||||
}, called_kwargs
|
||||
)
|
||||
self.assertEqual(1, mock_async_task.apply_async.call_count)
|
||||
|
||||
70
openedx/core/djangoapps/content/course_overviews/tasks.py
Normal file
70
openedx/core/djangoapps/content/course_overviews/tasks.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import logging
|
||||
|
||||
from celery import task
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from celery_utils.persist_on_failure import PersistOnFailureTask
|
||||
from django.conf import settings
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_ALL_COURSES = False
|
||||
|
||||
DEFAULT_CHUNK_SIZE = 50
|
||||
|
||||
DEFAULT_FORCE_UPDATE = False
|
||||
|
||||
|
||||
def chunks(sequence, chunk_size):
|
||||
return (sequence[index: index + chunk_size] for index in xrange(0, len(sequence), chunk_size))
|
||||
|
||||
|
||||
class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-method
|
||||
"""
|
||||
Include persistence features, as well as logging of task invocation.
|
||||
"""
|
||||
abstract = True
|
||||
|
||||
|
||||
def _task_options(routing_key):
|
||||
task_options = {}
|
||||
if getattr(settings, 'HIGH_MEM_QUEUE', None):
|
||||
task_options['routing_key'] = settings.HIGH_MEM_QUEUE
|
||||
if routing_key:
|
||||
task_options['routing_key'] = routing_key
|
||||
return task_options
|
||||
|
||||
|
||||
def enqueue_async_course_overview_update_tasks(
|
||||
course_ids,
|
||||
all_courses=False,
|
||||
force_update=False,
|
||||
chunk_size=DEFAULT_CHUNK_SIZE,
|
||||
routing_key=None
|
||||
):
|
||||
if all_courses:
|
||||
course_keys = [course.id for course in modulestore().get_course_summaries()]
|
||||
else:
|
||||
course_keys = [CourseKey.from_string(id) for id in course_ids]
|
||||
|
||||
for course_key_group in chunks(course_keys, chunk_size):
|
||||
course_key_strings = [unicode(key) for key in course_key_group]
|
||||
|
||||
options = _task_options(routing_key)
|
||||
async_course_overview_update.apply_async(
|
||||
args=course_key_strings,
|
||||
kwargs={'force_update': force_update},
|
||||
**options
|
||||
)
|
||||
|
||||
|
||||
@task(base=_BaseTask)
|
||||
def async_course_overview_update(*args, **kwargs):
|
||||
course_keys = [CourseKey.from_string(arg) for arg in args]
|
||||
CourseOverview.update_select_courses(course_keys, force_update=kwargs['force_update'])
|
||||
@@ -0,0 +1,42 @@
|
||||
import mock
|
||||
|
||||
from xmodule.modulestore import ModuleStoreEnum
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
|
||||
from ..tasks import enqueue_async_course_overview_update_tasks
|
||||
|
||||
|
||||
class BatchedAsyncCourseOverviewUpdateTests(ModuleStoreTestCase):
|
||||
def setUp(self):
|
||||
super(BatchedAsyncCourseOverviewUpdateTests, self).setUp()
|
||||
self.course_1 = CourseFactory.create(default_store=ModuleStoreEnum.Type.mongo)
|
||||
self.course_2 = CourseFactory.create(default_store=ModuleStoreEnum.Type.mongo)
|
||||
self.course_3 = CourseFactory.create(default_store=ModuleStoreEnum.Type.mongo)
|
||||
|
||||
@mock.patch('openedx.core.djangoapps.content.course_overviews.models.CourseOverview.update_select_courses')
|
||||
def test_enqueue_all_courses_in_single_batch(self, mock_update_courses):
|
||||
enqueue_async_course_overview_update_tasks(
|
||||
course_ids=[],
|
||||
force_update=True,
|
||||
all_courses=True
|
||||
)
|
||||
|
||||
called_args, called_kwargs = mock_update_courses.call_args_list[0]
|
||||
self.assertEqual(sorted([self.course_1.id, self.course_2.id, self.course_3.id]), sorted(called_args[0]))
|
||||
self.assertEqual({'force_update': True}, called_kwargs)
|
||||
self.assertEqual(1, mock_update_courses.call_count)
|
||||
|
||||
@mock.patch('openedx.core.djangoapps.content.course_overviews.models.CourseOverview.update_select_courses')
|
||||
def test_enqueue_specific_courses_in_two_batches(self, mock_update_courses):
|
||||
enqueue_async_course_overview_update_tasks(
|
||||
course_ids=[unicode(self.course_1.id), unicode(self.course_2.id)],
|
||||
force_update=True,
|
||||
chunk_size=1,
|
||||
all_courses=False
|
||||
)
|
||||
|
||||
mock_update_courses.assert_has_calls([
|
||||
mock.call([self.course_1.id], force_update=True),
|
||||
mock.call([self.course_2.id], force_update=True)
|
||||
])
|
||||
Reference in New Issue
Block a user