From e83182c7439cbdf2340fbdc368eb777e5d5b9dcc Mon Sep 17 00:00:00 2001 From: Nimisha Asthagiri Date: Fri, 17 Mar 2017 19:55:47 -0400 Subject: [PATCH] Have generate_course_blocks pass with_storage option to celery tasks --- cms/celery.py | 1 + .../contentstore/tests/test_import.py | 2 +- .../block_structure/config/__init__.py | 20 +++- .../commands/generate_course_blocks.py | 35 +++--- .../tests/test_generate_course_blocks.py | 15 +-- .../content/block_structure/signals.py | 10 +- .../content/block_structure/tasks.py | 101 +++++++++++++----- .../block_structure/tests/test_signals.py | 6 +- .../block_structure/tests/test_tasks.py | 6 +- 9 files changed, 129 insertions(+), 67 deletions(-) diff --git a/cms/celery.py b/cms/celery.py index e35bf4d7c1..49f31fbfa0 100644 --- a/cms/celery.py +++ b/cms/celery.py @@ -33,4 +33,5 @@ class Router(AlternateEnvironmentRouter): """ return { 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', + 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', } diff --git a/cms/djangoapps/contentstore/tests/test_import.py b/cms/djangoapps/contentstore/tests/test_import.py index e6c3e152f2..6b271d7c82 100644 --- a/cms/djangoapps/contentstore/tests/test_import.py +++ b/cms/djangoapps/contentstore/tests/test_import.py @@ -41,7 +41,7 @@ class ContentStoreImportTest(SignalDisconnectTestMixin, ModuleStoreTestCase): self.client.login(username=self.user.username, password=self.user_password) # block_structure.update_course_in_cache cannot succeed in tests, as it needs to be run async on an lms worker - self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache') + self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2') self._mock_lms_task = self.task_patcher.start() def tearDown(self): diff --git a/openedx/core/djangoapps/content/block_structure/config/__init__.py b/openedx/core/djangoapps/content/block_structure/config/__init__.py index c5feba2990..ae473f8338 100644 --- a/openedx/core/djangoapps/content/block_structure/config/__init__.py +++ b/openedx/core/djangoapps/content/block_structure/config/__init__.py @@ -2,12 +2,17 @@ This module contains various configuration settings via waffle switches for the Block Structure framework. """ +import logging + from openedx.core.djangolib.waffle_utils import is_switch_enabled -from request_cache.middleware import request_cached +from request_cache.middleware import request_cached, RequestCache, func_call_cache_key from .models import BlockStructureConfiguration +log = logging.getLogger(__name__) + + INVALIDATE_CACHE_ON_PUBLISH = u'invalidate_cache_on_publish' STORAGE_BACKING_FOR_CACHE = u'storage_backing_for_cache' RAISE_ERROR_WHEN_NOT_FOUND = u'raise_error_when_not_found' @@ -23,6 +28,19 @@ def is_enabled(setting_name): return is_switch_enabled(bs_waffle_name) +def enable_for_current_request(setting_name): + """ + Enables the given block_structure setting for the + duration of the current request. + """ + cache_key = func_call_cache_key( + is_switch_enabled.request_cached_contained_func, + _bs_waffle_switch_name(setting_name), + ) + RequestCache.get_request_cache().data[cache_key] = True + log.warning(u'BlockStructure: Config %s is enabled for current request.', setting_name) + + @request_cached def num_versions_to_keep(): """ diff --git a/openedx/core/djangoapps/content/block_structure/management/commands/generate_course_blocks.py b/openedx/core/djangoapps/content/block_structure/management/commands/generate_course_blocks.py index 4f0d056ed7..ed7388acde 100644 --- a/openedx/core/djangoapps/content/block_structure/management/commands/generate_course_blocks.py +++ b/openedx/core/djangoapps/content/block_structure/management/commands/generate_course_blocks.py @@ -7,7 +7,7 @@ from django.core.management.base import BaseCommand from xmodule.modulestore.django import modulestore import openedx.core.djangoapps.content.block_structure.api as api -from openedx.core.djangoapps.content.block_structure.config import _bs_waffle_switch_name, STORAGE_BACKING_FOR_CACHE +from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request import openedx.core.djangoapps.content.block_structure.tasks as tasks import openedx.core.djangoapps.content.block_structure.store as store from openedx.core.lib.command_utils import ( @@ -15,8 +15,6 @@ from openedx.core.lib.command_utils import ( validate_dependent_option, parse_course_keys, ) -from request_cache.middleware import RequestCache, func_call_cache_key -from openedx.core.djangolib.waffle_utils import is_switch_enabled log = logging.getLogger(__name__) @@ -100,9 +98,9 @@ class Command(BaseCommand): self._set_log_levels(options) - log.critical(u'STARTED generating Course Blocks for %d courses.', len(course_keys)) + log.critical(u'BlockStructure: STARTED generating Course Blocks for %d courses.', len(course_keys)) self._generate_course_blocks(options, course_keys) - log.critical(u'FINISHED generating Course Blocks for %d courses.', len(course_keys)) + log.critical(u'BlockStructure: FINISHED generating Course Blocks for %d courses.', len(course_keys)) def _set_log_levels(self, options): """ @@ -129,16 +127,14 @@ class Command(BaseCommand): Generates course blocks for the given course_keys per the given options. """ if options.get('with_storage'): - self._enable_storage() + enable_for_current_request(STORAGE_BACKING_FOR_CACHE) for course_key in course_keys: try: - log.info(u'STARTED generating Course Blocks for course: %s.', course_key) self._generate_for_course(options, course_key) - log.info(u'FINISHED generating Course Blocks for course: %s.', course_key) except Exception as ex: # pylint: disable=broad-except log.exception( - u'An error occurred while generating course blocks for %s: %s', + u'BlockStructure: An error occurred while generating course blocks for %s: %s', unicode(course_key), ex.message, ) @@ -148,20 +144,15 @@ class Command(BaseCommand): Generates course blocks for the given course_key per the given options. """ if options.get('enqueue_task'): - action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache + action = tasks.update_course_in_cache_v2 if options.get('force_update') else tasks.get_course_in_cache_v2 task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} - action.apply_async([unicode(course_key)], **task_options) + result = action.apply_async( + kwargs=dict(course_id=unicode(course_key), with_storage=options.get('with_storage')), + **task_options + ) + log.info(u'BlockStructure: ENQUEUED generating for course: %s, task_id: %s.', course_key, result.id) else: + log.info(u'BlockStructure: STARTED generating for course: %s.', course_key) action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache action(course_key) - - def _enable_storage(self): - """ - Enables storage backing by setting the waffle's cached value to True. - """ - cache_key = func_call_cache_key( - is_switch_enabled.request_cached_contained_func, - _bs_waffle_switch_name(STORAGE_BACKING_FOR_CACHE), - ) - RequestCache.get_request_cache().data[cache_key] = True - log.warning(u'STORAGE_BACKING_FOR_CACHE is enabled.') + log.info(u'BlockStructure: FINISHED generating for course: %s.', course_key) diff --git a/openedx/core/djangoapps/content/block_structure/management/commands/tests/test_generate_course_blocks.py b/openedx/core/djangoapps/content/block_structure/management/commands/tests/test_generate_course_blocks.py index b2c873ab82..14bddeac7b 100644 --- a/openedx/core/djangoapps/content/block_structure/management/commands/tests/test_generate_course_blocks.py +++ b/openedx/core/djangoapps/content/block_structure/management/commands/tests/test_generate_course_blocks.py @@ -107,20 +107,20 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase): command_options['routing_key'] = routing_key with patch( - 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks' + 'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.tasks' ) as mock_tasks: with patch( - 'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api' + 'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.api' ) as mock_api: self.command.handle(**command_options) self.assertEqual( - mock_tasks.update_course_in_cache.apply_async.call_count, + mock_tasks.update_course_in_cache_v2.apply_async.call_count, self.num_courses if enqueue_task and force_update else 0, ) self.assertEqual( - mock_tasks.get_course_in_cache.apply_async.call_count, + mock_tasks.get_course_in_cache_v2.apply_async.call_count, self.num_courses if enqueue_task and not force_update else 0, ) @@ -134,14 +134,17 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase): ) if enqueue_task: - task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache + if force_update: + task_action = mock_tasks.update_course_in_cache_v2 + else: + task_action = mock_tasks.get_course_in_cache_v2 task_options = task_action.apply_async.call_args[1] if routing_key: self.assertEquals(task_options['routing_key'], routing_key) else: self.assertNotIn('routing_key', task_options) - @patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log') + @patch('openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.log') def test_not_found_key(self, mock_log): self.command.handle(courses=['fake/course/id']) self.assertTrue(mock_log.exception.called) diff --git a/openedx/core/djangoapps/content/block_structure/signals.py b/openedx/core/djangoapps/content/block_structure/signals.py index fdc010561e..baba9f8590 100644 --- a/openedx/core/djangoapps/content/block_structure/signals.py +++ b/openedx/core/djangoapps/content/block_structure/signals.py @@ -10,11 +10,11 @@ from opaque_keys.edx.locator import LibraryLocator from . import config from .api import clear_course_from_cache -from .tasks import update_course_in_cache +from .tasks import update_course_in_cache_v2 @receiver(SignalHandler.course_published) -def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument +def _update_block_structure_on_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument """ Catches the signal that a course has been published in the module store and creates/updates the corresponding cache entry. @@ -26,14 +26,14 @@ def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable if config.is_enabled(config.INVALIDATE_CACHE_ON_PUBLISH): clear_course_from_cache(course_key) - update_course_in_cache.apply_async( - [unicode(course_key)], + update_course_in_cache_v2.apply_async( + kwargs=dict(course_id=unicode(course_key)), countdown=settings.BLOCK_STRUCTURES_SETTINGS['COURSE_PUBLISH_TASK_DELAY'], ) @receiver(SignalHandler.course_deleted) -def _listen_for_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument +def _delete_block_structure_on_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument """ Catches the signal that a course has been deleted from the module store and invalidates the corresponding cache entry if one diff --git a/openedx/core/djangoapps/content/block_structure/tasks.py b/openedx/core/djangoapps/content/block_structure/tasks.py index 3a4abbc0b5..feabe659ed 100644 --- a/openedx/core/djangoapps/content/block_structure/tasks.py +++ b/openedx/core/djangoapps/content/block_structure/tasks.py @@ -13,6 +13,7 @@ from opaque_keys.edx.keys import CourseKey from xmodule.modulestore.exceptions import ItemNotFoundError from openedx.core.djangoapps.content.block_structure import api +from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request log = logging.getLogger('edx.celery.task') @@ -21,53 +22,101 @@ RETRY_TASKS = (ItemNotFoundError, TypeError, ValInternalError) NO_RETRY_TASKS = (XMLSyntaxError, LoncapaProblemError, UnicodeEncodeError) -@task( - default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'], - max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'], - bind=True, -) +def block_structure_task(**kwargs): + """ + Decorator for block structure tasks. + """ + return task( + default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'], + max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'], + bind=True, + **kwargs + ) + + +@block_structure_task() +def update_course_in_cache_v2(self, **kwargs): + """ + Updates the course blocks (mongo -> BlockStructure) for the specified course. + Keyword Arguments: + course_id (string) - The string serialized value of the course key. + with_storage (boolean) - Whether or not storage backing should be + enabled for the generated block structure(s). + """ + _update_course_in_cache(self, **kwargs) + + +@block_structure_task() def update_course_in_cache(self, course_id): """ - Updates the course blocks (in the database) for the specified course. + Updates the course blocks (mongo -> BlockStructure) for the specified course. """ - _call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache, self.request.id) + _update_course_in_cache(self, course_id=course_id) -@task( - default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'], - max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'], - bind=True, -) +def _update_course_in_cache(self, **kwargs): + """ + Updates the course blocks (mongo -> BlockStructure) for the specified course. + """ + if kwargs.get('with_storage'): + enable_for_current_request(STORAGE_BACKING_FOR_CACHE) + _call_and_retry_if_needed(self, api.update_course_in_cache, **kwargs) + + +@block_structure_task() +def get_course_in_cache_v2(self, **kwargs): + """ + Gets the course blocks for the specified course, updating the cache if needed. + Keyword Arguments: + course_id (string) - The string serialized value of the course key. + with_storage (boolean) - Whether or not storage backing should be + enabled for any generated block structure(s). + """ + _get_course_in_cache(self, **kwargs) + + +@block_structure_task() def get_course_in_cache(self, course_id): """ Gets the course blocks for the specified course, updating the cache if needed. """ - _call_and_retry_if_needed(course_id, api.get_course_in_cache, get_course_in_cache, self.request.id) + _get_course_in_cache(self, course_id=course_id) -def _call_and_retry_if_needed(course_id, api_method, task_method, task_id): +def _get_course_in_cache(self, **kwargs): + """ + Gets the course blocks for the specified course, updating the cache if needed. + """ + if kwargs.get('with_storage'): + enable_for_current_request(STORAGE_BACKING_FOR_CACHE) + _call_and_retry_if_needed(self, api.get_course_in_cache, **kwargs) + + +def _call_and_retry_if_needed(self, api_method, **kwargs): """ Calls the given api_method with the given course_id, retrying task_method upon failure. """ try: - course_key = CourseKey.from_string(course_id) + course_key = CourseKey.from_string(kwargs['course_id']) api_method(course_key) - except NO_RETRY_TASKS as exc: + except NO_RETRY_TASKS: # Known unrecoverable errors log.exception( - "update_course_in_cache encountered unrecoverable error in course {}, task_id {}".format( - course_id, - task_id - ) + "BlockStructure: %s encountered unrecoverable error in course %s, task_id %s", + self.__name__, + kwargs.get('course_id'), + self.request.id, ) raise except RETRY_TASKS as exc: - log.exception("%s encountered expected error, retrying.", task_method.__name__) - raise task_method.retry(args=[course_id], exc=exc) + log.exception("%s encountered expected error, retrying.", self.__name__) + raise self.retry(kwargs=kwargs, exc=exc) except Exception as exc: # pylint: disable=broad-except log.exception( - "%s encountered unknown error. Retry #%d", - task_method.__name__, - task_method.request.retries, + "BlockStructure: %s encountered unknown error in course %s, task_id %s. Retry #%d", + self.__name__, + kwargs.get('course_id'), + self.request.id, + self.request.retries, ) - raise task_method.retry(args=[course_id], exc=exc) + raise self.retry(kwargs=kwargs, exc=exc) diff --git a/openedx/core/djangoapps/content/block_structure/tests/test_signals.py b/openedx/core/djangoapps/content/block_structure/tests/test_signals.py index ae53af7c2d..539b1d8fb6 100644 --- a/openedx/core/djangoapps/content/block_structure/tests/test_signals.py +++ b/openedx/core/djangoapps/content/block_structure/tests/test_signals.py @@ -11,7 +11,7 @@ from xmodule.modulestore.tests.factories import CourseFactory from ..api import get_block_structure_manager from ..config import INVALIDATE_CACHE_ON_PUBLISH -from ..signals import _listen_for_course_publish +from ..signals import _update_block_structure_on_course_publish from .helpers import is_course_in_block_structure_cache, override_config_setting @@ -76,7 +76,7 @@ class CourseBlocksSignalTest(ModuleStoreTestCase): (LibraryLocator(org='org', course='course'), False), ) @ddt.unpack - @patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.apply_async') + @patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.apply_async') def test_update_only_for_courses(self, key, expect_update_called, mock_update): - _listen_for_course_publish(sender=None, course_key=key) + _update_block_structure_on_course_publish(sender=None, course_key=key) self.assertEqual(mock_update.called, expect_update_called) diff --git a/openedx/core/djangoapps/content/block_structure/tests/test_tasks.py b/openedx/core/djangoapps/content/block_structure/tests/test_tasks.py index dd38d0736c..a4d3bf2f3b 100644 --- a/openedx/core/djangoapps/content/block_structure/tests/test_tasks.py +++ b/openedx/core/djangoapps/content/block_structure/tests/test_tasks.py @@ -6,19 +6,19 @@ from mock import patch from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase -from ..tasks import update_course_in_cache +from ..tasks import update_course_in_cache_v2 class UpdateCourseInCacheTaskTest(ModuleStoreTestCase): """ Ensures that the update_course_in_cache task runs as expected. """ - @patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.retry') + @patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.retry') @patch('openedx.core.djangoapps.content.block_structure.api.update_course_in_cache') def test_retry_on_error(self, mock_update, mock_retry): """ Ensures that tasks will be retried if IntegrityErrors are encountered. """ mock_update.side_effect = Exception("WHAMMY") - update_course_in_cache.apply(args=["invalid_course_key raises exception 12345 meow"]) + update_course_in_cache_v2.apply(kwargs=dict(course_id="invalid_course_key raises exception 12345 meow")) self.assertTrue(mock_retry.called)