From c04f4401dc1a6403817c4e32412df5c26153e71a Mon Sep 17 00:00:00 2001 From: Nimisha Asthagiri Date: Sat, 21 Jan 2017 16:49:37 -0500 Subject: [PATCH] Update generate_course_blocks management command to enqueue tasks --- .../commands/generate_course_blocks.py | 200 +++++++----------- .../tests/test_generate_course_blocks.py | 142 +++++++++---- .../management/commands/reset_grades.py | 26 +-- .../content/block_structure/tasks.py | 34 ++- openedx/core/lib/command_utils.py | 48 +++++ 5 files changed, 255 insertions(+), 195 deletions(-) create mode 100644 openedx/core/lib/command_utils.py diff --git a/lms/djangoapps/course_blocks/management/commands/generate_course_blocks.py b/lms/djangoapps/course_blocks/management/commands/generate_course_blocks.py index 2967b01316..47b1feaba6 100644 --- a/lms/djangoapps/course_blocks/management/commands/generate_course_blocks.py +++ b/lms/djangoapps/course_blocks/management/commands/generate_course_blocks.py @@ -1,15 +1,19 @@ """ Command to load course blocks. """ -from collections import defaultdict import logging -from django.core.management.base import BaseCommand, CommandError -from opaque_keys import InvalidKeyError -from opaque_keys.edx.keys import CourseKey +from django.core.management.base import BaseCommand from xmodule.modulestore.django import modulestore -from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache, update_course_in_cache +import openedx.core.djangoapps.content.block_structure.api as api +import openedx.core.djangoapps.content.block_structure.tasks as tasks +import openedx.core.lib.block_structure.cache as cache +from openedx.core.lib.command_utils import ( + get_mutually_exclusive_required_option, + validate_dependent_option, + parse_course_keys, +) log = logging.getLogger(__name__) @@ -29,159 +33,109 @@ class Command(BaseCommand): Entry point for subclassed commands to add custom arguments. 
""" parser.add_argument( - '--all', - help='Generate course blocks for all or specified courses.', + '--courses', + dest='courses', + nargs='+', + help='Generate course blocks for the list of courses provided.', + ) + parser.add_argument( + '--all_courses', + help='Generate course blocks for all courses, given the requested start and end indices.', action='store_true', default=False, ) parser.add_argument( - '--dags', - help='Find and log DAGs for all or specified courses.', + '--enqueue_task', + help='Enqueue the tasks for asynchronous computation.', action='store_true', default=False, ) parser.add_argument( - '--force', + '--routing_key', + dest='routing_key', + help='Routing key to use for asynchronous computation.', + ) + parser.add_argument( + '--force_update', help='Force update of the course blocks for the requested courses.', action='store_true', default=False, ) parser.add_argument( - '--verbose', - help='Enable verbose logging.', - action='store_true', - default=False, - ) - parser.add_argument( - '--start', - help='Starting index of course.', + '--start_index', + help='Starting index of course list.', default=0, type=int, ) parser.add_argument( - '--end', - help='Ending index of course.', + '--end_index', + help='Ending index of course list.', default=0, type=int, ) def handle(self, *args, **options): - if options.get('all'): + courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses') + validate_dependent_option(options, 'routing_key', 'enqueue_task') + validate_dependent_option(options, 'start_index', 'all_courses') + validate_dependent_option(options, 'end_index', 'all_courses') + + if courses_mode == 'all_courses': course_keys = [course.id for course in modulestore().get_course_summaries()] - if options.get('start'): - end = options.get('end') or len(course_keys) - course_keys = course_keys[options['start']:end] + if options.get('start_index'): + end = options.get('end_index') or len(course_keys) + course_keys = 
course_keys[options['start_index']:end] else: - if len(args) < 1: - raise CommandError('At least one course or --all must be specified.') - try: - course_keys = [CourseKey.from_string(arg) for arg in args] - except InvalidKeyError: - raise CommandError('Invalid key specified.') + course_keys = parse_course_keys(options['courses']) - log.info('Generating course blocks for %d courses.', len(course_keys)) + self._set_log_levels(options) - if options.get('verbose'): - log.setLevel(logging.DEBUG) + log.warning('STARTED generating Course Blocks for %d courses.', len(course_keys)) + self._generate_course_blocks(options, course_keys) + log.warning('FINISHED generating Course Blocks for %d courses.', len(course_keys)) + + def _set_log_levels(self, options): + """ + Sets logging levels for this module and the block structure + cache module, based on the given options. + """ + if options.get('verbosity') == 0: + log_level = logging.CRITICAL + elif options.get('verbosity') == 1: + log_level = logging.WARNING else: - log.setLevel(logging.CRITICAL) + log_level = logging.INFO + + if options.get('verbosity') < 3: + cache_log_level = logging.CRITICAL + else: + cache_log_level = logging.INFO + + log.setLevel(log_level) + cache.logger.setLevel(cache_log_level) + + def _generate_course_blocks(self, options, course_keys): + """ + Generates course blocks for the given course_keys per the given options. 
+ """ - dag_info = _DAGInfo() for course_key in course_keys: try: - if options.get('force'): - block_structure = update_course_in_cache(course_key) + log.info('STARTED generating Course Blocks for course: %s.', course_key) + + if options.get('enqueue_task'): + action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache + task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} + action.apply_async([unicode(course_key)], **task_options) else: - block_structure = get_course_in_cache(course_key) - if options.get('dags'): - self._find_and_log_dags(block_structure, course_key, dag_info) + action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache + action(course_key) + + log.info('FINISHED generating Course Blocks for course: %s.', course_key) except Exception as ex: # pylint: disable=broad-except log.exception( 'An error occurred while generating course blocks for %s: %s', unicode(course_key), ex.message, ) - - log.info('Finished generating course blocks.') - - if options.get('dags'): - log.critical('DAG data: %s', unicode(dag_info)) - - def _find_and_log_dags(self, block_structure, course_key, dag_info): - """ - Finds all DAGs within the given block structure. - - Arguments: - BlockStructureBlockData - The block structure in which to find DAGs. - """ - for block_key in block_structure.get_block_keys(): - parents = block_structure.get_parents(block_key) - if len(parents) > 1: - dag_info.on_dag_found(course_key, block_key) - log.warning( - 'DAG alert - %s has multiple parents: %s.', - unicode(block_key), - [unicode(parent) for parent in parents], - ) - - -class PrettyDefaultDict(defaultdict): - """ - Wraps defaultdict to provide a better string representation. - """ - __repr__ = dict.__repr__ - - -class _DAGBlockTypeInfo(object): - """ - Class for aggregated DAG data for a specific block type. 
- """ - def __init__(self): - self.num_of_dag_blocks = 0 - - def __repr__(self): - return repr(vars(self)) - - -class _DAGCourseInfo(object): - """ - Class for aggregated DAG data for a specific course run. - """ - def __init__(self): - self.num_of_dag_blocks = 0 - self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo) - - def __repr__(self): - return repr(vars(self)) - - def on_dag_found(self, block_key): - """ - Updates DAG collected data for the given block. - """ - self.num_of_dag_blocks += 1 - self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1 - - -class _DAGInfo(object): - """ - Class for aggregated DAG data. - """ - def __init__(self): - self.total_num_of_dag_blocks = 0 - self.total_num_of_dag_courses = 0 - self.dag_data_by_course = PrettyDefaultDict(_DAGCourseInfo) - self.dag_data_by_block_type = PrettyDefaultDict(_DAGBlockTypeInfo) - - def __repr__(self): - return repr(vars(self)) - - def on_dag_found(self, course_key, block_key): - """ - Updates DAG collected data for the given block. - """ - self.total_num_of_dag_blocks += 1 - if course_key not in self.dag_data_by_course: - self.total_num_of_dag_courses += 1 - self.dag_data_by_course[unicode(course_key)].on_dag_found(block_key) - self.dag_data_by_block_type[block_key.category].num_of_dag_blocks += 1 diff --git a/lms/djangoapps/course_blocks/management/commands/tests/test_generate_course_blocks.py b/lms/djangoapps/course_blocks/management/commands/tests/test_generate_course_blocks.py index 43e7cfff85..6e320f329f 100644 --- a/lms/djangoapps/course_blocks/management/commands/tests/test_generate_course_blocks.py +++ b/lms/djangoapps/course_blocks/management/commands/tests/test_generate_course_blocks.py @@ -1,7 +1,9 @@ """ Tests for generate_course_blocks management command. """ +import ddt from django.core.management.base import CommandError +import itertools from mock import patch from xmodule.modulestore import ModuleStoreEnum @@ -11,86 +13,140 @@ from .. 
import generate_course_blocks from openedx.core.djangoapps.content.block_structure.tests.helpers import is_course_in_block_structure_cache +@ddt.ddt class TestGenerateCourseBlocks(ModuleStoreTestCase): """ Tests generate course blocks management command. """ + num_courses = 2 + def setUp(self): """ Create courses in modulestore. """ super(TestGenerateCourseBlocks, self).setUp() - self.course_1 = CourseFactory.create() - self.course_2 = CourseFactory.create() + self.courses = [CourseFactory.create() for _ in range(self.num_courses)] + self.course_keys = [course.id for course in self.courses] self.command = generate_course_blocks.Command() - def _assert_courses_not_in_block_cache(self, *courses): + def _assert_courses_not_in_block_cache(self, *course_keys): """ Assert courses don't exist in the course block cache. """ - for course_key in courses: + for course_key in course_keys: self.assertFalse(is_course_in_block_structure_cache(course_key, self.store)) - def _assert_courses_in_block_cache(self, *courses): + def _assert_courses_in_block_cache(self, *course_keys): """ Assert courses exist in course block cache. """ - for course_key in courses: + for course_key in course_keys: self.assertTrue(is_course_in_block_structure_cache(course_key, self.store)) - def test_generate_all(self): - self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id) - self.command.handle(all=True) - self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id) + def _assert_message_presence_in_logs(self, message, mock_log, expected_presence=True): + """ + Asserts that the logger was called with the given message. 
+ """ + message_present = any([message in call_args[0][0] for call_args in mock_log.warning.call_args_list]) + if expected_presence: + self.assertTrue(message_present) + else: + self.assertFalse(message_present) + + @ddt.data(True, False) + def test_all_courses(self, force_update): + self._assert_courses_not_in_block_cache(*self.course_keys) + self.command.handle(all_courses=True) + self._assert_courses_in_block_cache(*self.course_keys) with patch( 'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore' ) as mock_update_from_store: - self.command.handle(all=True) - mock_update_from_store.assert_not_called() + self.command.handle(all_courses=True, force_update=force_update) + self.assertEqual(mock_update_from_store.call_count, self.num_courses if force_update else 0) + + def test_one_course(self): + self._assert_courses_not_in_block_cache(*self.course_keys) + self.command.handle(courses=[unicode(self.course_keys[0])]) + self._assert_courses_in_block_cache(self.course_keys[0]) + self._assert_courses_not_in_block_cache(*self.course_keys[1:]) + + @ddt.data( + *itertools.product( + (True, False), + (True, False), + ('route_1', None), + ) + ) + @ddt.unpack + def test_enqueue(self, enqueue_task, force_update, routing_key): + command_options = dict(all_courses=True, enqueue_task=enqueue_task, force_update=force_update) + if enqueue_task and routing_key: + command_options['routing_key'] = routing_key - def test_generate_force(self): - self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id) - self.command.handle(all=True) - self._assert_courses_in_block_cache(self.course_1.id, self.course_2.id) with patch( - 'openedx.core.lib.block_structure.factory.BlockStructureFactory.create_from_modulestore' - ) as mock_update_from_store: - self.command.handle(all=True, force=True) - mock_update_from_store.assert_called() + 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks' + ) as mock_tasks: + with patch( 
+ 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api' + ) as mock_api: - def test_generate_one(self): - self._assert_courses_not_in_block_cache(self.course_1.id, self.course_2.id) - self.command.handle(unicode(self.course_1.id)) - self._assert_courses_in_block_cache(self.course_1.id) - self._assert_courses_not_in_block_cache(self.course_2.id) + self.command.handle(**command_options) - @patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') - def test_generate_no_dags(self, mock_log): - self.command.handle(dags=True, all=True) - self.assertEquals(mock_log.warning.call_count, 0) + self.assertEqual( + mock_tasks.update_course_in_cache.apply_async.call_count, + self.num_courses if enqueue_task and force_update else 0, + ) + self.assertEqual( + mock_tasks.get_course_in_cache.apply_async.call_count, + self.num_courses if enqueue_task and not force_update else 0, + ) - @patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') - def test_generate_with_dags(self, mock_log): - with self.store.branch_setting(ModuleStoreEnum.Branch.draft_preferred): - item1 = ItemFactory.create(parent=self.course_1) - item2 = ItemFactory.create(parent=item1) - item3 = ItemFactory.create(parent=item1) - item2.children.append(item3.location) - self.store.update_item(item2, ModuleStoreEnum.UserID.mgmt_command) - self.store.publish(self.course_1.location, ModuleStoreEnum.UserID.mgmt_command) + self.assertEqual( + mock_api.update_course_in_cache.call_count, + self.num_courses if not enqueue_task and force_update else 0, + ) + self.assertEqual( + mock_api.get_course_in_cache.call_count, + self.num_courses if not enqueue_task and not force_update else 0, + ) - self.command.handle(dags=True, all=True) - self.assertEquals(mock_log.warning.call_count, 1) + if enqueue_task: + task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache + task_options = 
task_action.apply_async.call_args[1] + if routing_key: + self.assertEquals(task_options['routing_key'], routing_key) + else: + self.assertNotIn('routing_key', task_options) @patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') def test_not_found_key(self, mock_log): - self.command.handle('fake/course/id', all=False) + self.command.handle(courses=['fake/course/id']) self.assertTrue(mock_log.exception.called) def test_invalid_key(self): with self.assertRaises(CommandError): - self.command.handle('not/found', all=False) + self.command.handle(courses=['not/found']) def test_no_params(self): with self.assertRaises(CommandError): - self.command.handle(all=False) + self.command.handle(all_courses=False) + + def test_no_course_mode(self): + with self.assertRaisesMessage(CommandError, 'Either --courses or --all_courses must be specified.'): + self.command.handle() + + def test_both_course_modes(self): + with self.assertRaisesMessage(CommandError, 'Both --courses and --all_courses cannot be specified.'): + self.command.handle(all_courses=True, courses=['some/course/key']) + + @ddt.data( + ('routing_key', 'enqueue_task'), + ('start_index', 'all_courses'), + ('end_index', 'all_courses'), + ) + @ddt.unpack + def test_dependent_options_error(self, dependent_option, depending_on_option): + expected_error_message = 'Option --{} requires option --{}.'.format(dependent_option, depending_on_option) + options = {dependent_option: 1, depending_on_option: False, 'courses': ['some/course/key']} + with self.assertRaisesMessage(CommandError, expected_error_message): + self.command.handle(**options) diff --git a/lms/djangoapps/grades/management/commands/reset_grades.py b/lms/djangoapps/grades/management/commands/reset_grades.py index 43e9776b6c..656786b038 100644 --- a/lms/djangoapps/grades/management/commands/reset_grades.py +++ b/lms/djangoapps/grades/management/commands/reset_grades.py @@ -8,9 +8,7 @@ from textwrap import dedent from 
django.core.management.base import BaseCommand, CommandError from django.db.models import Count -from opaque_keys import InvalidKeyError -from opaque_keys.edx.keys import CourseKey - +from openedx.core.lib.command_utils import get_mutually_exclusive_required_option, parse_course_keys from lms.djangoapps.grades.models import PersistentSubsectionGrade, PersistentCourseGrade @@ -73,8 +71,8 @@ class Command(BaseCommand): modified_start = None modified_end = None - run_mode = self._get_mutually_exclusive_option(options, 'delete', 'dry_run') - courses_mode = self._get_mutually_exclusive_option(options, 'courses', 'all_courses') + run_mode = get_mutually_exclusive_required_option(options, 'delete', 'dry_run') + courses_mode = get_mutually_exclusive_required_option(options, 'courses', 'all_courses') if options.get('modified_start'): modified_start = datetime.strptime(options['modified_start'], DATE_FORMAT) @@ -85,10 +83,7 @@ class Command(BaseCommand): modified_end = datetime.strptime(options['modified_end'], DATE_FORMAT) if courses_mode == 'courses': - try: - course_keys = [CourseKey.from_string(course_key_string) for course_key_string in options['courses']] - except InvalidKeyError as error: - raise CommandError('Invalid key specified: {}'.format(error.message)) + course_keys = parse_course_keys(options['courses']) log.info("reset_grade: Started in %s mode!", run_mode) @@ -135,16 +130,3 @@ class Command(BaseCommand): grade_model_class.__name__, total_for_all_courses, ) - - def _get_mutually_exclusive_option(self, options, option_1, option_2): - """ - Validates that exactly one of the 2 given options is specified. - Returns the name of the found option. 
- """ - if not options.get(option_1) and not options.get(option_2): - raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2)) - - if options.get(option_1) and options.get(option_2): - raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2)) - - return option_1 if options.get(option_1) else option_2 diff --git a/openedx/core/djangoapps/content/block_structure/tasks.py b/openedx/core/djangoapps/content/block_structure/tasks.py index 13b0103532..f01d29eed0 100644 --- a/openedx/core/djangoapps/content/block_structure/tasks.py +++ b/openedx/core/djangoapps/content/block_structure/tasks.py @@ -29,17 +29,37 @@ def update_course_in_cache(course_id): """ Updates the course blocks (in the database) for the specified course. """ + _call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache) + + +@task( + default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_DEFAULT_RETRY_DELAY'], + max_retries=settings.BLOCK_STRUCTURES_SETTINGS['BLOCK_STRUCTURES_TASK_MAX_RETRIES'], +) +def get_course_in_cache(course_id): + """ + Gets the course blocks for the specified course, updating the cache if needed. + """ + _call_and_retry_if_needed(course_id, api.get_course_in_cache, get_course_in_cache) + + +def _call_and_retry_if_needed(course_id, api_method, task_method): + """ + Calls the given api_method with the given course_id, retrying task_method upon failure. 
+ """ try: course_key = CourseKey.from_string(course_id) - api.update_course_in_cache(course_key) + api_method(course_key) except NO_RETRY_TASKS as exc: # Known unrecoverable errors raise except RETRY_TASKS as exc: - log.exception("update_course_in_cache encounted expected error, retrying.") - raise update_course_in_cache.retry(args=[course_id], exc=exc) + log.exception("%s encountered expected error, retrying.", task_method.__name__) + raise task_method.retry(args=[course_id], exc=exc) except Exception as exc: # pylint: disable=broad-except - log.exception("update_course_in_cache encounted unknown error. Retry #{}".format( - update_course_in_cache.request.retries, - )) - raise update_course_in_cache.retry(args=[course_id], exc=exc) + log.exception( + "%s encountered unknown error. Retry #%d", + task_method.__name__, + task_method.request.retries, + ) + raise task_method.retry(args=[course_id], exc=exc) diff --git a/openedx/core/lib/command_utils.py b/openedx/core/lib/command_utils.py new file mode 100644 index 0000000000..08c17207a8 --- /dev/null +++ b/openedx/core/lib/command_utils.py @@ -0,0 +1,48 @@ +""" +Useful utilities for management commands. +""" + +from django.core.management.base import CommandError + +from opaque_keys import InvalidKeyError +from opaque_keys.edx.keys import CourseKey + + +def get_mutually_exclusive_required_option(options, option_1, option_2): + """ + Validates that exactly one of the 2 given options is specified. + Returns the name of the found option. + """ + validate_mutually_exclusive_option(options, option_1, option_2) + + if not options.get(option_1) and not options.get(option_2): + raise CommandError('Either --{} or --{} must be specified.'.format(option_1, option_2)) + + return option_1 if options.get(option_1) else option_2 + + +def validate_mutually_exclusive_option(options, option_1, option_2): + """ + Validates that both of the 2 given options are not specified. 
+ """ + if options.get(option_1) and options.get(option_2): + raise CommandError('Both --{} and --{} cannot be specified.'.format(option_1, option_2)) + + +def validate_dependent_option(options, dependent_option, depending_on_option): + """ + Validates that depending_on_option is specified if dependent_option is specified. + """ + if options.get(dependent_option) and not options.get(depending_on_option): + raise CommandError('Option --{} requires option --{}.'.format(dependent_option, depending_on_option)) + + +def parse_course_keys(course_key_strings): + """ + Parses and returns a list of CourseKey objects from the given + list of course key strings. + """ + try: + return [CourseKey.from_string(course_key_string) for course_key_string in course_key_strings] + except InvalidKeyError as error: + raise CommandError('Invalid key specified: {}'.format(error.message))