Have generate_course_blocks pass with_storage option to celery tasks
This commit is contained in:
@@ -33,4 +33,5 @@ class Router(AlternateEnvironmentRouter):
|
||||
"""
|
||||
return {
|
||||
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
|
||||
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ class ContentStoreImportTest(SignalDisconnectTestMixin, ModuleStoreTestCase):
|
||||
self.client.login(username=self.user.username, password=self.user_password)
|
||||
|
||||
# block_structure.update_course_in_cache cannot succeed in tests, as it needs to be run async on an lms worker
|
||||
self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache')
|
||||
self.task_patcher = patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2')
|
||||
self._mock_lms_task = self.task_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
|
||||
@@ -2,12 +2,17 @@
|
||||
This module contains various configuration settings via
|
||||
waffle switches for the Block Structure framework.
|
||||
"""
|
||||
import logging
|
||||
|
||||
from openedx.core.djangolib.waffle_utils import is_switch_enabled
|
||||
from request_cache.middleware import request_cached
|
||||
from request_cache.middleware import request_cached, RequestCache, func_call_cache_key
|
||||
|
||||
from .models import BlockStructureConfiguration
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
INVALIDATE_CACHE_ON_PUBLISH = u'invalidate_cache_on_publish'
|
||||
STORAGE_BACKING_FOR_CACHE = u'storage_backing_for_cache'
|
||||
RAISE_ERROR_WHEN_NOT_FOUND = u'raise_error_when_not_found'
|
||||
@@ -23,6 +28,19 @@ def is_enabled(setting_name):
|
||||
return is_switch_enabled(bs_waffle_name)
|
||||
|
||||
|
||||
def enable_for_current_request(setting_name):
|
||||
"""
|
||||
Enables the given block_structure setting for the
|
||||
duration of the current request.
|
||||
"""
|
||||
cache_key = func_call_cache_key(
|
||||
is_switch_enabled.request_cached_contained_func,
|
||||
_bs_waffle_switch_name(setting_name),
|
||||
)
|
||||
RequestCache.get_request_cache().data[cache_key] = True
|
||||
log.warning(u'BlockStructure: Config %s is enabled for current request.', setting_name)
|
||||
|
||||
|
||||
@request_cached
|
||||
def num_versions_to_keep():
|
||||
"""
|
||||
|
||||
@@ -7,7 +7,7 @@ from django.core.management.base import BaseCommand
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
import openedx.core.djangoapps.content.block_structure.api as api
|
||||
from openedx.core.djangoapps.content.block_structure.config import _bs_waffle_switch_name, STORAGE_BACKING_FOR_CACHE
|
||||
from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request
|
||||
import openedx.core.djangoapps.content.block_structure.tasks as tasks
|
||||
import openedx.core.djangoapps.content.block_structure.store as store
|
||||
from openedx.core.lib.command_utils import (
|
||||
@@ -15,8 +15,6 @@ from openedx.core.lib.command_utils import (
|
||||
validate_dependent_option,
|
||||
parse_course_keys,
|
||||
)
|
||||
from request_cache.middleware import RequestCache, func_call_cache_key
|
||||
from openedx.core.djangolib.waffle_utils import is_switch_enabled
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -100,9 +98,9 @@ class Command(BaseCommand):
|
||||
|
||||
self._set_log_levels(options)
|
||||
|
||||
log.critical(u'STARTED generating Course Blocks for %d courses.', len(course_keys))
|
||||
log.critical(u'BlockStructure: STARTED generating Course Blocks for %d courses.', len(course_keys))
|
||||
self._generate_course_blocks(options, course_keys)
|
||||
log.critical(u'FINISHED generating Course Blocks for %d courses.', len(course_keys))
|
||||
log.critical(u'BlockStructure: FINISHED generating Course Blocks for %d courses.', len(course_keys))
|
||||
|
||||
def _set_log_levels(self, options):
|
||||
"""
|
||||
@@ -129,16 +127,14 @@ class Command(BaseCommand):
|
||||
Generates course blocks for the given course_keys per the given options.
|
||||
"""
|
||||
if options.get('with_storage'):
|
||||
self._enable_storage()
|
||||
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
|
||||
|
||||
for course_key in course_keys:
|
||||
try:
|
||||
log.info(u'STARTED generating Course Blocks for course: %s.', course_key)
|
||||
self._generate_for_course(options, course_key)
|
||||
log.info(u'FINISHED generating Course Blocks for course: %s.', course_key)
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
log.exception(
|
||||
u'An error occurred while generating course blocks for %s: %s',
|
||||
u'BlockStructure: An error occurred while generating course blocks for %s: %s',
|
||||
unicode(course_key),
|
||||
ex.message,
|
||||
)
|
||||
@@ -148,20 +144,15 @@ class Command(BaseCommand):
|
||||
Generates course blocks for the given course_key per the given options.
|
||||
"""
|
||||
if options.get('enqueue_task'):
|
||||
action = tasks.update_course_in_cache if options.get('force_update') else tasks.get_course_in_cache
|
||||
action = tasks.update_course_in_cache_v2 if options.get('force_update') else tasks.get_course_in_cache_v2
|
||||
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
|
||||
action.apply_async([unicode(course_key)], **task_options)
|
||||
result = action.apply_async(
|
||||
kwargs=dict(course_id=unicode(course_key), with_storage=options.get('with_storage')),
|
||||
**task_options
|
||||
)
|
||||
log.info(u'BlockStructure: ENQUEUED generating for course: %s, task_id: %s.', course_key, result.id)
|
||||
else:
|
||||
log.info(u'BlockStructure: STARTED generating for course: %s.', course_key)
|
||||
action = api.update_course_in_cache if options.get('force_update') else api.get_course_in_cache
|
||||
action(course_key)
|
||||
|
||||
def _enable_storage(self):
|
||||
"""
|
||||
Enables storage backing by setting the waffle's cached value to True.
|
||||
"""
|
||||
cache_key = func_call_cache_key(
|
||||
is_switch_enabled.request_cached_contained_func,
|
||||
_bs_waffle_switch_name(STORAGE_BACKING_FOR_CACHE),
|
||||
)
|
||||
RequestCache.get_request_cache().data[cache_key] = True
|
||||
log.warning(u'STORAGE_BACKING_FOR_CACHE is enabled.')
|
||||
log.info(u'BlockStructure: FINISHED generating for course: %s.', course_key)
|
||||
|
||||
@@ -107,20 +107,20 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase):
|
||||
command_options['routing_key'] = routing_key
|
||||
|
||||
with patch(
|
||||
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.tasks'
|
||||
'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.tasks'
|
||||
) as mock_tasks:
|
||||
with patch(
|
||||
'lms.djangoapps.course_blocks.management.commands.generate_course_blocks.api'
|
||||
'openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.api'
|
||||
) as mock_api:
|
||||
|
||||
self.command.handle(**command_options)
|
||||
|
||||
self.assertEqual(
|
||||
mock_tasks.update_course_in_cache.apply_async.call_count,
|
||||
mock_tasks.update_course_in_cache_v2.apply_async.call_count,
|
||||
self.num_courses if enqueue_task and force_update else 0,
|
||||
)
|
||||
self.assertEqual(
|
||||
mock_tasks.get_course_in_cache.apply_async.call_count,
|
||||
mock_tasks.get_course_in_cache_v2.apply_async.call_count,
|
||||
self.num_courses if enqueue_task and not force_update else 0,
|
||||
)
|
||||
|
||||
@@ -134,14 +134,17 @@ class TestGenerateCourseBlocks(ModuleStoreTestCase):
|
||||
)
|
||||
|
||||
if enqueue_task:
|
||||
task_action = mock_tasks.update_course_in_cache if force_update else mock_tasks.get_course_in_cache
|
||||
if force_update:
|
||||
task_action = mock_tasks.update_course_in_cache_v2
|
||||
else:
|
||||
task_action = mock_tasks.get_course_in_cache_v2
|
||||
task_options = task_action.apply_async.call_args[1]
|
||||
if routing_key:
|
||||
self.assertEquals(task_options['routing_key'], routing_key)
|
||||
else:
|
||||
self.assertNotIn('routing_key', task_options)
|
||||
|
||||
@patch('lms.djangoapps.course_blocks.management.commands.generate_course_blocks.log')
|
||||
@patch('openedx.core.djangoapps.content.block_structure.management.commands.generate_course_blocks.log')
|
||||
def test_not_found_key(self, mock_log):
|
||||
self.command.handle(courses=['fake/course/id'])
|
||||
self.assertTrue(mock_log.exception.called)
|
||||
|
||||
@@ -10,11 +10,11 @@ from opaque_keys.edx.locator import LibraryLocator
|
||||
|
||||
from . import config
|
||||
from .api import clear_course_from_cache
|
||||
from .tasks import update_course_in_cache
|
||||
from .tasks import update_course_in_cache_v2
|
||||
|
||||
|
||||
@receiver(SignalHandler.course_published)
|
||||
def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
|
||||
def _update_block_structure_on_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
|
||||
"""
|
||||
Catches the signal that a course has been published in the module
|
||||
store and creates/updates the corresponding cache entry.
|
||||
@@ -26,14 +26,14 @@ def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable
|
||||
if config.is_enabled(config.INVALIDATE_CACHE_ON_PUBLISH):
|
||||
clear_course_from_cache(course_key)
|
||||
|
||||
update_course_in_cache.apply_async(
|
||||
[unicode(course_key)],
|
||||
update_course_in_cache_v2.apply_async(
|
||||
kwargs=dict(course_id=unicode(course_key)),
|
||||
countdown=settings.BLOCK_STRUCTURES_SETTINGS['COURSE_PUBLISH_TASK_DELAY'],
|
||||
)
|
||||
|
||||
|
||||
@receiver(SignalHandler.course_deleted)
|
||||
def _listen_for_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument
|
||||
def _delete_block_structure_on_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument
|
||||
"""
|
||||
Catches the signal that a course has been deleted from the
|
||||
module store and invalidates the corresponding cache entry if one
|
||||
|
||||
@@ -13,6 +13,7 @@ from opaque_keys.edx.keys import CourseKey
|
||||
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError
|
||||
from openedx.core.djangoapps.content.block_structure import api
|
||||
from openedx.core.djangoapps.content.block_structure.config import STORAGE_BACKING_FOR_CACHE, enable_for_current_request
|
||||
|
||||
log = logging.getLogger('edx.celery.task')
|
||||
|
||||
@@ -21,53 +22,101 @@ RETRY_TASKS = (ItemNotFoundError, TypeError, ValInternalError)
|
||||
NO_RETRY_TASKS = (XMLSyntaxError, LoncapaProblemError, UnicodeEncodeError)
|
||||
|
||||
|
||||
@task(
|
||||
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
|
||||
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
|
||||
bind=True,
|
||||
)
|
||||
def block_structure_task(**kwargs):
|
||||
"""
|
||||
Decorator for block structure tasks.
|
||||
"""
|
||||
return task(
|
||||
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
|
||||
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
|
||||
bind=True,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
@block_structure_task()
|
||||
def update_course_in_cache_v2(self, **kwargs):
|
||||
"""
|
||||
Updates the course blocks (mongo -> BlockStructure) for the specified course.
|
||||
Keyword Arguments:
|
||||
course_id (string) - The string serialized value of the course key.
|
||||
with_storage (boolean) - Whether or not storage backing should be
|
||||
enabled for the generated block structure(s).
|
||||
"""
|
||||
_update_course_in_cache(self, **kwargs)
|
||||
|
||||
|
||||
@block_structure_task()
|
||||
def update_course_in_cache(self, course_id):
|
||||
"""
|
||||
Updates the course blocks (in the database) for the specified course.
|
||||
Updates the course blocks (mongo -> BlockStructure) for the specified course.
|
||||
"""
|
||||
_call_and_retry_if_needed(course_id, api.update_course_in_cache, update_course_in_cache, self.request.id)
|
||||
_update_course_in_cache(self, course_id=course_id)
|
||||
|
||||
|
||||
@task(
|
||||
default_retry_delay=settings.BLOCK_STRUCTURES_SETTINGS['TASK_DEFAULT_RETRY_DELAY'],
|
||||
max_retries=settings.BLOCK_STRUCTURES_SETTINGS['TASK_MAX_RETRIES'],
|
||||
bind=True,
|
||||
)
|
||||
def _update_course_in_cache(self, **kwargs):
|
||||
"""
|
||||
Updates the course blocks (mongo -> BlockStructure) for the specified course.
|
||||
"""
|
||||
if kwargs.get('with_storage'):
|
||||
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
|
||||
_call_and_retry_if_needed(self, api.update_course_in_cache, **kwargs)
|
||||
|
||||
|
||||
@block_structure_task()
|
||||
def get_course_in_cache_v2(self, **kwargs):
|
||||
"""
|
||||
Gets the course blocks for the specified course, updating the cache if needed.
|
||||
Keyword Arguments:
|
||||
course_id (string) - The string serialized value of the course key.
|
||||
with_storage (boolean) - Whether or not storage backing should be
|
||||
enabled for any generated block structure(s).
|
||||
"""
|
||||
_get_course_in_cache(self, **kwargs)
|
||||
|
||||
|
||||
@block_structure_task()
|
||||
def get_course_in_cache(self, course_id):
|
||||
"""
|
||||
Gets the course blocks for the specified course, updating the cache if needed.
|
||||
"""
|
||||
_call_and_retry_if_needed(course_id, api.get_course_in_cache, get_course_in_cache, self.request.id)
|
||||
_get_course_in_cache(self, course_id=course_id)
|
||||
|
||||
|
||||
def _call_and_retry_if_needed(course_id, api_method, task_method, task_id):
|
||||
def _get_course_in_cache(self, **kwargs):
|
||||
"""
|
||||
Gets the course blocks for the specified course, updating the cache if needed.
|
||||
"""
|
||||
if kwargs.get('with_storage'):
|
||||
enable_for_current_request(STORAGE_BACKING_FOR_CACHE)
|
||||
_call_and_retry_if_needed(self, api.get_course_in_cache, **kwargs)
|
||||
|
||||
|
||||
def _call_and_retry_if_needed(self, api_method, **kwargs):
|
||||
"""
|
||||
Calls the given api_method with the given course_id, retrying task_method upon failure.
|
||||
"""
|
||||
try:
|
||||
course_key = CourseKey.from_string(course_id)
|
||||
course_key = CourseKey.from_string(kwargs['course_id'])
|
||||
api_method(course_key)
|
||||
except NO_RETRY_TASKS as exc:
|
||||
except NO_RETRY_TASKS:
|
||||
# Known unrecoverable errors
|
||||
log.exception(
|
||||
"update_course_in_cache encountered unrecoverable error in course {}, task_id {}".format(
|
||||
course_id,
|
||||
task_id
|
||||
)
|
||||
"BlockStructure: %s encountered unrecoverable error in course %s, task_id %s",
|
||||
self.__name__,
|
||||
kwargs.get('course_id'),
|
||||
self.request.id,
|
||||
)
|
||||
raise
|
||||
except RETRY_TASKS as exc:
|
||||
log.exception("%s encountered expected error, retrying.", task_method.__name__)
|
||||
raise task_method.retry(args=[course_id], exc=exc)
|
||||
log.exception("%s encountered expected error, retrying.", self.__name__)
|
||||
raise self.retry(kwargs=kwargs, exc=exc)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
log.exception(
|
||||
"%s encountered unknown error. Retry #%d",
|
||||
task_method.__name__,
|
||||
task_method.request.retries,
|
||||
"BlockStructure: %s encountered unknown error in course %s, task_id %s. Retry #%d",
|
||||
self.__name__,
|
||||
kwargs.get('course_id'),
|
||||
self.request.id,
|
||||
self.request.retries,
|
||||
)
|
||||
raise task_method.retry(args=[course_id], exc=exc)
|
||||
raise self.retry(kwargs=kwargs, exc=exc)
|
||||
|
||||
@@ -11,7 +11,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
|
||||
|
||||
from ..api import get_block_structure_manager
|
||||
from ..config import INVALIDATE_CACHE_ON_PUBLISH
|
||||
from ..signals import _listen_for_course_publish
|
||||
from ..signals import _update_block_structure_on_course_publish
|
||||
from .helpers import is_course_in_block_structure_cache, override_config_setting
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ class CourseBlocksSignalTest(ModuleStoreTestCase):
|
||||
(LibraryLocator(org='org', course='course'), False),
|
||||
)
|
||||
@ddt.unpack
|
||||
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.apply_async')
|
||||
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.apply_async')
|
||||
def test_update_only_for_courses(self, key, expect_update_called, mock_update):
|
||||
_listen_for_course_publish(sender=None, course_key=key)
|
||||
_update_block_structure_on_course_publish(sender=None, course_key=key)
|
||||
self.assertEqual(mock_update.called, expect_update_called)
|
||||
|
||||
@@ -6,19 +6,19 @@ from mock import patch
|
||||
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
|
||||
from ..tasks import update_course_in_cache
|
||||
from ..tasks import update_course_in_cache_v2
|
||||
|
||||
|
||||
class UpdateCourseInCacheTaskTest(ModuleStoreTestCase):
|
||||
"""
|
||||
Ensures that the update_course_in_cache task runs as expected.
|
||||
"""
|
||||
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache.retry')
|
||||
@patch('openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2.retry')
|
||||
@patch('openedx.core.djangoapps.content.block_structure.api.update_course_in_cache')
|
||||
def test_retry_on_error(self, mock_update, mock_retry):
|
||||
"""
|
||||
Ensures that tasks will be retried if IntegrityErrors are encountered.
|
||||
"""
|
||||
mock_update.side_effect = Exception("WHAMMY")
|
||||
update_course_in_cache.apply(args=["invalid_course_key raises exception 12345 meow"])
|
||||
update_course_in_cache_v2.apply(kwargs=dict(course_id="invalid_course_key raises exception 12345 meow"))
|
||||
self.assertTrue(mock_retry.called)
|
||||
|
||||
Reference in New Issue
Block a user