diff --git a/cms/djangoapps/contentstore/views/course.py b/cms/djangoapps/contentstore/views/course.py index 93f0c9c6cf..8ba35628c2 100644 --- a/cms/djangoapps/contentstore/views/course.py +++ b/cms/djangoapps/contentstore/views/course.py @@ -423,7 +423,7 @@ def course_index(request, course_key): """ # A depth of None implies the whole course. The course outline needs this in order to compute has_changes. # A unit may not have a draft version, but one of its components could, and hence the unit itself has changes. - with modulestore().bulk_temp_noop_operations(course_key): # FIXME + with modulestore().bulk_operations(course_key): course_module = _get_course_module(course_key, request.user, depth=None) lms_link = get_lms_link_for_item(course_module.location) sections = course_module.get_children() diff --git a/common/lib/xmodule/xmodule/modulestore/__init__.py b/common/lib/xmodule/xmodule/modulestore/__init__.py index 01b370d62c..592c455e32 100644 --- a/common/lib/xmodule/xmodule/modulestore/__init__.py +++ b/common/lib/xmodule/xmodule/modulestore/__init__.py @@ -25,6 +25,7 @@ from opaque_keys.edx.locations import SlashSeparatedCourseKey from xblock.runtime import Mixologist from xblock.core import XBlock import functools +import threading log = logging.getLogger('edx.modulestore') @@ -91,6 +92,173 @@ class ModuleStoreEnum(object): test = -3 +class BulkOpsRecord(object): + """ + For handling nesting of bulk operations + """ + def __init__(self): + self._active_count = 0 + + @property + def active(self): + """ + Return whether this bulk write is active. + """ + return self._active_count > 0 + + def nest(self): + """ + Record another level of nesting of this bulk write operation + """ + self._active_count += 1 + + def unnest(self): + """ + Record the completion of a level of nesting of the bulk write operation + """ + self._active_count -= 1 + + @property + def is_root(self): + """ + Return whether the bulk write is at the root (first) level of nesting + """ + return self._active_count == 1 + + +class ActiveBulkThread(threading.local): + """ + Add the expected vars to the thread. + """ + def __init__(self, bulk_ops_record_type, **kwargs): + super(ActiveBulkThread, self).__init__(**kwargs) + self.records = defaultdict(bulk_ops_record_type) + + +class BulkOperationsMixin(object): + """ + This implements the :meth:`bulk_operations` modulestore semantics which handles nested invocations + + In particular, it implements :meth:`_begin_bulk_operation` and + :meth:`_end_bulk_operation` to provide the external interface + + Internally, this mixin records the set of all active bulk operations (keyed on the active course), + and only writes those values when :meth:`_end_bulk_operation` is called. + If a bulk write operation isn't active, then the changes are immediately written to the underlying + mongo_connection. + """ + def __init__(self, *args, **kwargs): + super(BulkOperationsMixin, self).__init__(*args, **kwargs) + self._active_bulk_ops = ActiveBulkThread(self._bulk_ops_record_type) + + @contextmanager + def bulk_operations(self, course_id): + """ + A context manager for notifying the store of bulk operations. This affects only the current thread. + + In the case of Mongo, it temporarily disables refreshing the metadata inheritance tree + until the bulk operation is completed. + """ + try: + self._begin_bulk_operation(course_id) + yield + finally: + self._end_bulk_operation(course_id) + + # the relevant type of bulk_ops_record for the mixin (overriding classes should override + # this variable) + _bulk_ops_record_type = BulkOpsRecord + + def _get_bulk_ops_record(self, course_key, ignore_case=False): + """ + Return the :class:`.BulkOpsRecord` for this course. + """ + if course_key is None: + return self._bulk_ops_record_type() + + # Retrieve the bulk record based on matching org/course/run (possibly ignoring case) + if ignore_case: + for key, record in self._active_bulk_ops.records.iteritems(): + if ( + key.org.lower() == course_key.org.lower() and + key.course.lower() == course_key.course.lower() and + key.run.lower() == course_key.run.lower() + ): + return record + return self._active_bulk_ops.records[course_key.for_branch(None)] + + @property + def _active_records(self): + """ + Yield all active (CourseLocator, BulkOpsRecord) tuples. + """ + for course_key, record in self._active_bulk_ops.records.iteritems(): + if record.active: + yield (course_key, record) + + def _clear_bulk_ops_record(self, course_key): + """ + Clear the record for this course + """ + del self._active_bulk_ops.records[course_key.for_branch(None)] + + def _start_outermost_bulk_operation(self, bulk_ops_record, course_key): + """ + The outermost nested bulk_operation call: do the actual begin of the bulk operation. + + Implementing classes must override this method; otherwise, the bulk operations are a noop + """ + pass + + def _begin_bulk_operation(self, course_key): + """ + Begin a bulk operation on course_key. + """ + bulk_ops_record = self._get_bulk_ops_record(course_key) + + # Increment the number of active bulk operations (bulk operations + # on the same course can be nested) + bulk_ops_record.nest() + + # If this is the highest level bulk operation, then initialize it + if bulk_ops_record.is_root: + self._start_outermost_bulk_operation(bulk_ops_record, course_key) + + def _end_outermost_bulk_operation(self, bulk_ops_record, course_key): + """ + The outermost nested bulk_operation call: do the actual end of the bulk operation. + + Implementing classes must override this method; otherwise, the bulk operations are a noop + """ + pass + + def _end_bulk_operation(self, course_key): + """ + End the active bulk operation on course_key. + """ + # If no bulk op is active, return + bulk_ops_record = self._get_bulk_ops_record(course_key) + if not bulk_ops_record.active: + return + + bulk_ops_record.unnest() + + # If this wasn't the outermost context, then don't close out the + # bulk operation. + if bulk_ops_record.active: + return + + self._end_outermost_bulk_operation(bulk_ops_record, course_key) + + self._clear_bulk_ops_record(course_key) + + def _is_in_bulk_operation(self, course_key, ignore_case=False): + """ + Return whether a bulk operation is active on `course_key`. + """ + return self._get_bulk_ops_record(course_key, ignore_case).active + + class ModuleStoreRead(object): """ An abstract interface for a database backend that stores XModuleDescriptor @@ -436,7 +604,7 @@ class ModuleStoreWrite(ModuleStoreRead): pass -class ModuleStoreReadBase(ModuleStoreRead): +class ModuleStoreReadBase(BulkOperationsMixin, ModuleStoreRead): ''' Implement interface functionality that can be shared. ''' @@ -456,7 +624,9 @@ class ModuleStoreReadBase(ModuleStoreRead): ''' Set up the error-tracking logic. ''' + super(ModuleStoreReadBase, self).__init__() self._course_errors = defaultdict(make_error_tracker) # location -> ErrorLog + # TODO move the inheritance_cache_subsystem to classes which use it self.metadata_inheritance_cache_subsystem = metadata_inheritance_cache_subsystem self.request_cache = request_cache self.xblock_mixins = xblock_mixins @@ -551,41 +721,6 @@ class ModuleStoreReadBase(ModuleStoreRead): raise ValueError(u"Cannot set default store to type {}".format(store_type)) yield - @contextmanager - def bulk_operations(self, course_id): - """ - A context manager for notifying the store of bulk operations. This affects only the current thread. - - In the case of Mongo, it temporarily disables refreshing the metadata inheritance tree - until the bulk operation is completed. - """ - # TODO: Make this multi-process-safe if future operations need it. - try: - self._begin_bulk_operation(course_id) - yield - finally: - self._end_bulk_operation(course_id) - - @contextmanager - def bulk_temp_noop_operations(self, course_id): - """ - A hotfix noop b/c old mongo does not properly handle nested bulk operations and does unnecessary work - if the bulk operation only reads data. Replace with bulk_operations once fixed (or don't merge to master) - """ - yield - - def _begin_bulk_operation(self, course_id): - """ - Begin a bulk write operation on course_id. - """ - pass - - def _end_bulk_operation(self, course_id): - """ - End the active bulk write operation on course_id. - """ - pass - @staticmethod def memoize_request_cache(func): """ @@ -608,6 +743,7 @@ class ModuleStoreReadBase(ModuleStoreRead): return func(self, *args, **kwargs) return wrapper + def hashvalue(arg): """ If arg is an xblock, use its location. otherwise just turn it into a string @@ -617,6 +753,7 @@ def hashvalue(arg): else: return unicode(arg) + class ModuleStoreWriteBase(ModuleStoreReadBase, ModuleStoreWrite): ''' Implement interface functionality that can be shared. diff --git a/common/lib/xmodule/xmodule/modulestore/draft_and_published.py b/common/lib/xmodule/xmodule/modulestore/draft_and_published.py index 9b521a68a1..5f36e11ceb 100644 --- a/common/lib/xmodule/xmodule/modulestore/draft_and_published.py +++ b/common/lib/xmodule/xmodule/modulestore/draft_and_published.py @@ -10,6 +10,7 @@ from . import ModuleStoreEnum # Things w/ these categories should never be marked as version=DRAFT DIRECT_ONLY_CATEGORIES = ['course', 'chapter', 'sequential', 'about', 'static_tab', 'course_info'] + class BranchSettingMixin(object): """ A mixin to manage a module store's branch setting. diff --git a/common/lib/xmodule/xmodule/modulestore/mongo/base.py b/common/lib/xmodule/xmodule/modulestore/mongo/base.py index c23ffcc6a5..6864134b62 100644 --- a/common/lib/xmodule/xmodule/modulestore/mongo/base.py +++ b/common/lib/xmodule/xmodule/modulestore/mongo/base.py @@ -17,7 +17,6 @@ import sys import logging import copy import re -import threading from uuid import uuid4 from bson.son import SON @@ -34,7 +33,7 @@ from xblock.runtime import KvsFieldData from xblock.exceptions import InvalidScopeError from xblock.fields import Scope, ScopeIds, Reference, ReferenceList, ReferenceValueDict -from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum +from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES from opaque_keys.edx.locations import Location from xmodule.modulestore.exceptions import ItemNotFoundError, DuplicateCourseError, ReferentialIntegrityError @@ -384,7 +383,47 @@ def as_published(location): return location.replace(revision=MongoRevisionKey.published) -class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): +class MongoBulkOpsRecord(BulkOpsRecord): + """ + Tracks whether there've been any writes per course and disables inheritance generation + """ + def __init__(self): + super(MongoBulkOpsRecord, self).__init__() + self.dirty = False + + +class MongoBulkOpsMixin(BulkOperationsMixin): + """ + Mongo bulk operation support + """ + _bulk_ops_record_type = MongoBulkOpsRecord + + def _start_outermost_bulk_operation(self, bulk_ops_record, course_key): + """ + Prevent updating the meta-data inheritance cache for the given course + """ + # ensure it starts clean + bulk_ops_record.dirty = False + + def _end_outermost_bulk_operation(self, bulk_ops_record, course_id): + """ + Restart updating the meta-data inheritance cache for the given course. + Refresh the meta-data inheritance cache now since it was temporarily disabled. + """ + if bulk_ops_record.dirty: + self.refresh_cached_metadata_inheritance_tree(course_id) + bulk_ops_record.dirty = False # brand spanking clean now + + def _is_in_bulk_operation(self, course_id, ignore_case=False): + """ + Returns whether a bulk operation is in progress for the given course. + """ + return super(MongoBulkOpsMixin, self)._is_in_bulk_operation( + course_id.for_branch(None), ignore_case + ) + + +class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, MongoBulkOpsMixin): """ A Mongodb backed ModuleStore """ @@ -441,9 +480,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): self.i18n_service = i18n_service self.fs_service = fs_service - # performance optimization to prevent updating the meta-data inheritance tree during - # bulk write operations - self.ignore_write_events_on_courses = threading.local() self._course_run_cache = {} def close_connections(self): @@ -464,37 +500,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): connection.drop_database(self.collection.database) connection.close() - def _begin_bulk_operation(self, course_id): - """ - Prevent updating the meta-data inheritance cache for the given course - """ - if not hasattr(self.ignore_write_events_on_courses, 'courses'): - self.ignore_write_events_on_courses.courses = set() - - self.ignore_write_events_on_courses.courses.add(course_id) - - def _end_bulk_operation(self, course_id): - """ - Restart updating the meta-data inheritance cache for the given course. - Refresh the meta-data inheritance cache now since it was temporarily disabled. - """ - if not hasattr(self.ignore_write_events_on_courses, 'courses'): - return - - if course_id in self.ignore_write_events_on_courses.courses: - self.ignore_write_events_on_courses.courses.remove(course_id) - self.refresh_cached_metadata_inheritance_tree(course_id) - - def _is_bulk_write_in_progress(self, course_id): - """ - Returns whether a bulk write operation is in progress for the given course. - """ - if not hasattr(self.ignore_write_events_on_courses, 'courses'): - return False - - course_id = course_id.for_branch(None) - return course_id in self.ignore_write_events_on_courses.courses - def fill_in_run(self, course_key): """ In mongo some course_keys are used without runs. This helper function returns @@ -655,7 +660,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): a runtime may mean that some objects report old values for inherited data. """ course_id = course_id.for_branch(None) - if not self._is_bulk_write_in_progress(course_id): + if not self._is_in_bulk_operation(course_id): # below is done for side effects when runtime is None cached_metadata = self._get_cached_metadata_inheritance_tree(course_id, force_refresh=True) if runtime: @@ -1137,7 +1142,8 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): Set update on the specified item, and raises ItemNotFoundError if the location doesn't exist """ - + bulk_record = self._get_bulk_ops_record(location.course_key) + bulk_record.dirty = True # See http://www.mongodb.org/display/DOCS/Updating for # atomic update syntax result = self.collection.update( @@ -1205,7 +1211,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): # don't update the subtree info for descendants of the publish root for efficiency if ( (not isPublish or (isPublish and is_publish_root)) and - not self._is_bulk_write_in_progress(xblock.location.course_key) + not self._is_in_bulk_operation(xblock.location.course_key) ): ancestor_payload = { 'edit_info.subtree_edited_on': now, @@ -1225,7 +1231,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): elif not self.has_course(xblock.location.course_key): raise ItemNotFoundError(xblock.location.course_key) - return xblock def _serialize_scope(self, xblock, scope): @@ -1257,6 +1262,9 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): from orphan parents to avoid parents calculation overhead next time. """ non_orphan_parents = [] + # get bulk_record once rather than for each iteration + bulk_record = self._get_bulk_ops_record(location.course_key) + for parent in parents: parent_loc = Location._from_deprecated_son(parent['_id'], location.course_key.run) @@ -1266,6 +1274,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase): current_loc = ancestor_loc ancestor_loc = self._get_raw_parent_location(current_loc, revision) if ancestor_loc is None: + bulk_record.dirty = True # The parent is an orphan, so remove all the children including # the location whose parent we are looking for from orphan parent self.collection.update( diff --git a/common/lib/xmodule/xmodule/modulestore/mongo/draft.py b/common/lib/xmodule/xmodule/modulestore/mongo/draft.py index 33dc967b89..f72e36625e 100644 --- a/common/lib/xmodule/xmodule/modulestore/mongo/draft.py +++ b/common/lib/xmodule/xmodule/modulestore/mongo/draft.py @@ -147,6 +147,8 @@ class DraftModuleStore(MongoModuleStore): :param course_key: which course to delete :param user_id: id of the user deleting the course """ + # Note: does not need to inform the bulk mechanism since after the course is deleted, + # it can't calculate inheritance anyway. Nothing is there to be dirty. # delete the assets super(DraftModuleStore, self).delete_course(course_key, user_id) @@ -414,6 +416,8 @@ class DraftModuleStore(MongoModuleStore): item['_id']['revision'] = MongoRevisionKey.draft # ensure keys are in fixed and right order before inserting item['_id'] = self._id_dict_to_son(item['_id']) + bulk_record = self._get_bulk_ops_record(location.course_key) + bulk_record.dirty = True try: self.collection.insert(item) except pymongo.errors.DuplicateKeyError: @@ -588,7 +592,10 @@ class DraftModuleStore(MongoModuleStore): _internal(next_tier) _internal([root_usage.to_deprecated_son() for root_usage in root_usages]) - self.collection.remove({'_id': {'$in': to_be_deleted}}, safe=self.collection.safe) + if len(to_be_deleted) > 0: + bulk_record = self._get_bulk_ops_record(root_usages[0].course_key) + bulk_record.dirty = True + self.collection.remove({'_id': {'$in': to_be_deleted}}, safe=self.collection.safe) @MongoModuleStore.memoize_request_cache def has_changes(self, xblock): @@ -679,6 +686,8 @@ class DraftModuleStore(MongoModuleStore): _internal_depth_first(location, True) if len(to_be_deleted) > 0: + bulk_record = self._get_bulk_ops_record(location.course_key) + bulk_record.dirty = True self.collection.remove({'_id': {'$in': to_be_deleted}}) return self.get_item(as_published(location)) diff --git a/common/lib/xmodule/xmodule/modulestore/search.py b/common/lib/xmodule/xmodule/modulestore/search.py index 43fe6878b9..8854aa1865 100644 --- a/common/lib/xmodule/xmodule/modulestore/search.py +++ b/common/lib/xmodule/xmodule/modulestore/search.py @@ -68,8 +68,7 @@ def path_to_location(modulestore, usage_key): newpath = (next_usage, path) queue.append((parent, newpath)) - # FIXME replace with bulk_operations once it's fixed for old mongo - with modulestore.bulk_temp_noop_operations(usage_key.course_key): + with modulestore.bulk_operations(usage_key.course_key): if not modulestore.has_item(usage_key): raise ItemNotFoundError(usage_key) diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py index 112daf2844..49e3ed3c25 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py @@ -68,7 +68,7 @@ from opaque_keys.edx.locator import ( from xmodule.modulestore.exceptions import InsufficientSpecificationError, VersionConflictError, DuplicateItemError, \ DuplicateCourseError from xmodule.modulestore import ( - inheritance, ModuleStoreWriteBase, ModuleStoreEnum + inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin ) from ..exceptions import ItemNotFoundError @@ -82,7 +82,7 @@ from types import NoneType log = logging.getLogger(__name__) -#============================================================================== +# ============================================================================== # # Known issue: # Inheritance for cached kvs doesn't work on edits. Use case. @@ -98,46 +98,20 @@ log = logging.getLogger(__name__) # 10) BUG: a.foo < 0! # Local fix wont' permanently work b/c xblock may cache a.foo... # -#============================================================================== +# ============================================================================== # When blacklists are this, all children should be excluded EXCLUDE_ALL = '*' -class BulkWriteRecord(object): +class SplitBulkWriteRecord(BulkOpsRecord): def __init__(self): - self._active_count = 0 + super(SplitBulkWriteRecord, self).__init__() self.initial_index = None self.index = None self.structures = {} self.structures_in_db = set() - @property - def active(self): - """ - Return whether this bulk write is active. - """ - return self._active_count > 0 - - def nest(self): - """ - Record another level of nesting of this bulk write operation - """ - self._active_count += 1 - - def unnest(self): - """ - Record the completion of a level of nesting of the bulk write operation - """ - self._active_count -= 1 - - @property - def is_root(self): - """ - Return whether the bulk write is at the root (first) level of nesting - """ - return self._active_count == 1 - # TODO: This needs to track which branches have actually been modified/versioned, # so that copying one branch to another doesn't update the original branch. @property @@ -172,7 +146,7 @@ class BulkWriteRecord(object): self.structures[structure['_id']] = structure def __repr__(self): - return u"BulkWriteRecord<{!r}, {!r}, {!r}, {!r}, {!r}>".format( + return u"SplitBulkWriteRecord<{!r}, {!r}, {!r}, {!r}, {!r}>".format( self._active_count, self.initial_index, self.index, @@ -181,7 +155,7 @@ class BulkWriteRecord(object): ) -class BulkWriteMixin(object): +class SplitBulkWriteMixin(BulkOperationsMixin): """ This implements the :meth:`bulk_operations` modulestore semantics for the :class:`SplitMongoModuleStore`. @@ -194,93 +168,55 @@ class BulkWriteMixin(object): If a bulk write operation isn't active, then the changes are immediately written to the underlying mongo_connection. """ - def __init__(self, *args, **kwargs): - super(BulkWriteMixin, self).__init__(*args, **kwargs) - self._active_bulk_writes = threading.local() + _bulk_ops_record_type = SplitBulkWriteRecord - def _get_bulk_write_record(self, course_key, ignore_case=False): + def _get_bulk_ops_record(self, course_key, ignore_case=False): """ - Return the :class:`.BulkWriteRecord` for this course. + Return the :class:`.SplitBulkWriteRecord` for this course. """ + # handle split specific things and defer to super otherwise if course_key is None: - return BulkWriteRecord() + return self._bulk_ops_record_type() if not isinstance(course_key, CourseLocator): raise TypeError(u'{!r} is not a CourseLocator'.format(course_key)) - if not hasattr(self._active_bulk_writes, 'records'): - self._active_bulk_writes.records = defaultdict(BulkWriteRecord) + # handle version_guid based retrieval locally + if course_key.org is None or course_key.course is None or course_key.run is None: + return self._active_bulk_ops.records[ + course_key.replace(org=None, course=None, run=None, branch=None) + ] - # Retrieve the bulk record based on matching org/course/run (possibly ignoring case) - if course_key.org and course_key.course and course_key.run: - if ignore_case: - for key, record in self._active_bulk_writes.records.iteritems(): - if ( - key.org.lower() == course_key.org.lower() and - key.course.lower() == course_key.course.lower() and - key.run.lower() == course_key.run.lower() - ): - return record - # If nothing matches case-insensitively, fall through to creating a new record with the passed in case - return self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)] - else: - # If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid - return self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)] + return super(SplitBulkWriteMixin, self)._get_bulk_ops_record( + course_key.replace(branch=None, version_guid=None), ignore_case + ) - @property - def _active_records(self): + def _clear_bulk_ops_record(self, course_key): """ - Yield all active (CourseLocator, BulkWriteRecord) tuples. + Clear the record for this course """ - for course_key, record in getattr(self._active_bulk_writes, 'records', {}).iteritems(): - if record.active: - yield (course_key, record) - - def _clear_bulk_write_record(self, course_key): if not isinstance(course_key, CourseLocator): raise TypeError('{!r} is not a CourseLocator'.format(course_key)) - if not hasattr(self._active_bulk_writes, 'records'): - return - if course_key.org and course_key.course and course_key.run: - del self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)] + del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)] else: - del self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)] + del self._active_bulk_ops.records[ + course_key.replace(org=None, course=None, run=None, branch=None) + ] - def _begin_bulk_operation(self, course_key): + def _start_outermost_bulk_operation(self, bulk_write_record, course_key): """ Begin a bulk write operation on course_key. """ - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record.initial_index = self.db_connection.get_course_index(course_key) + # Ensure that any edits to the index don't pollute the initial_index + bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index) - # Increment the number of active bulk operations (bulk operations - # on the same course can be nested) - bulk_write_record.nest() - - # If this is the highest level bulk operation, then initialize it - if bulk_write_record.is_root: - bulk_write_record.initial_index = self.db_connection.get_course_index(course_key) - # Ensure that any edits to the index don't pollute the initial_index - bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index) - - def _end_bulk_operation(self, course_key): + def _end_outermost_bulk_operation(self, bulk_write_record, course_key): """ End the active bulk write operation on course_key. """ - # If no bulk write is active, return - bulk_write_record = self._get_bulk_write_record(course_key) - if not bulk_write_record.active: - return - - bulk_write_record.unnest() - - # If this wasn't the outermost context, then don't close out the - # bulk write operation. - if bulk_write_record.active: - return - - # This is the last active bulk write. If the content is dirty, - # then update the database + # If the content is dirty, then update the database for _id in bulk_write_record.structures.viewkeys() - bulk_write_record.structures_in_db: self.db_connection.upsert_structure(bulk_write_record.structures[_id]) @@ -290,25 +226,17 @@ class BulkWriteMixin(object): else: self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index) - self._clear_bulk_write_record(course_key) - - def _is_in_bulk_write_operation(self, course_key, ignore_case=False): - """ - Return whether a bulk write is active on `course_key`. - """ - return self._get_bulk_write_record(course_key, ignore_case).active - def get_course_index(self, course_key, ignore_case=False): """ Return the index for course_key. """ - if self._is_in_bulk_write_operation(course_key, ignore_case): - return self._get_bulk_write_record(course_key, ignore_case).index + if self._is_in_bulk_operation(course_key, ignore_case): + return self._get_bulk_ops_record(course_key, ignore_case).index else: return self.db_connection.get_course_index(course_key, ignore_case) def insert_course_index(self, course_key, index_entry): - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record = self._get_bulk_ops_record(course_key) if bulk_write_record.active: bulk_write_record.index = index_entry else: @@ -322,14 +250,14 @@ class BulkWriteMixin(object): Does not return anything useful. """ - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record = self._get_bulk_ops_record(course_key) if bulk_write_record.active: bulk_write_record.index = updated_index_entry else: self.db_connection.update_course_index(updated_index_entry) def get_structure(self, course_key, version_guid): - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record = self._get_bulk_ops_record(course_key) if bulk_write_record.active: structure = bulk_write_record.structures.get(version_guid) @@ -352,7 +280,7 @@ class BulkWriteMixin(object): (no data will be written to the database if a bulk operation is active.) """ self._clear_cache(structure['_id']) - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record = self._get_bulk_ops_record(course_key) if bulk_write_record.active: bulk_write_record.structures[structure['_id']] = structure else: @@ -365,7 +293,7 @@ class BulkWriteMixin(object): if course_key.branch is None: raise InsufficientSpecificationError(course_key) - bulk_write_record = self._get_bulk_write_record(course_key) + bulk_write_record = self._get_bulk_ops_record(course_key) # If we have an active bulk write, and it's already been edited, then just use that structure if bulk_write_record.active and course_key.branch in bulk_write_record.dirty_branches: @@ -507,7 +435,7 @@ class BulkWriteMixin(object): return structures -class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): +class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): """ A Mongodb backed ModuleStore supporting versions, inheritance, and sharing. @@ -2124,7 +2052,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): if not isinstance(new_id, ObjectId): raise TypeError('new_id must be an ObjectId, but is {!r}'.format(new_id)) index_entry['versions'][branch] = new_id - self.insert_course_index(course_key, index_entry) + self.update_course_index(course_key, index_entry) def partition_xblock_fields_by_scope(self, xblock): """ diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_mixed_modulestore.py b/common/lib/xmodule/xmodule/modulestore/tests/test_mixed_modulestore.py index 0d1330be47..bca5456cb4 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_mixed_modulestore.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_mixed_modulestore.py @@ -916,7 +916,7 @@ class TestMixedModuleStore(unittest.TestCase): # TODO: LMS-11220: Document why draft send count is 5 # TODO: LMS-11220: Document why draft find count is [19, 6] # TODO: LMS-11220: Document why split find count is [2, 2] - @ddt.data(('draft', [20, 5], 0), ('split', [17, 6], 0)) # FIXME, replace w/ above when bulk reenabled + @ddt.data(('draft', [20, 5], 0), ('split', [2, 2], 0)) @ddt.unpack def test_path_to_location(self, default_ms, num_finds, num_sends): """ diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_publish.py b/common/lib/xmodule/xmodule/modulestore/tests/test_publish.py index 76511faebd..24a85a9cad 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_publish.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_publish.py @@ -37,17 +37,19 @@ class TestPublish(SplitWMongoCourseBoostrapper): # - get last error # - load parent # - load inheritable data - with check_mongo_calls(10, 6): + with check_mongo_calls(7, 4): self._create_item('vertical', 'Vert1', {}, {'display_name': 'Vertical 1'}, 'chapter', 'Chapter1', split=False) self._create_item('vertical', 'Vert2', {}, {'display_name': 'Vertical 2'}, 'chapter', 'Chapter1', split=False) # For each (4) item created - # - load draft - # - load non-draft + # - try to find draft + # - try to find non-draft + # - retrieve draft of new parent # - get last error # - load parent # - load inheritable data # - load parent - with check_mongo_calls(24, 12): + # count for updates increased to 16 b/c of edit_info updating + with check_mongo_calls(16, 8): self._create_item('html', 'Html1', "

Goodbye

", {'display_name': 'Parented Html'}, 'vertical', 'Vert1', split=False) self._create_item( 'discussion', 'Discussion1', @@ -92,7 +94,7 @@ class TestPublish(SplitWMongoCourseBoostrapper): # 25-June-2014 find calls are 19. Probably due to inheritance recomputation? # 02-July-2014 send calls are 7. 5 from above, plus 2 for updating subtree edit info for Chapter1 and course # find calls are 22. 19 from above, plus 3 for finding the parent of Vert1, Chapter1, and course - with check_mongo_calls(25, 7): + with check_mongo_calls(20, 7): self.draft_mongo.publish(item.location, self.user_id) # verify status diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py index 92c07d0f63..61f9b93076 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py @@ -3,16 +3,16 @@ import ddt import unittest from bson.objectid import ObjectId from mock import MagicMock, Mock, call -from xmodule.modulestore.split_mongo.split import BulkWriteMixin +from xmodule.modulestore.split_mongo.split import SplitBulkWriteMixin from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection -from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTree, LocalId +from opaque_keys.edx.locator import CourseLocator class TestBulkWriteMixin(unittest.TestCase): def setUp(self): super(TestBulkWriteMixin, self).setUp() - self.bulk = BulkWriteMixin() + self.bulk = SplitBulkWriteMixin() self.bulk.SCHEMA_VERSION = 1 self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache') self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection) @@ -407,6 +407,7 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin): self.conn.find_ancestor_structures.assert_called_once_with(original_version, block_id) self.assertItemsEqual(active_match + db_match, results) + @ddt.ddt class TestBulkWriteMixinOpen(TestBulkWriteMixin): """ diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_w_old_mongo.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_w_old_mongo.py index da3c270641..d6947ed31c 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_w_old_mongo.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_w_old_mongo.py @@ -10,6 +10,7 @@ from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore from xmodule.modulestore.mongo import DraftMongoModuleStore from xmodule.modulestore import ModuleStoreEnum from xmodule.modulestore.tests.mongo_connection import MONGO_PORT_NUM, MONGO_HOST +from xmodule.modulestore.tests.test_cross_modulestore_import_export import MemoryCache class SplitWMongoCourseBoostrapper(unittest.TestCase): @@ -26,7 +27,7 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase): * split_course_key (CourseLocator): of the new course * old_course_key: the SlashSpecifiedCourseKey for the course """ - # Snippet of what would be in the django settings envs file + # Snippet of what would be in the django settings envs file db_config = { 'host': MONGO_HOST, 'port': MONGO_PORT_NUM, @@ -55,7 +56,9 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase): self.addCleanup(self.split_mongo.db.connection.close) self.addCleanup(self.tear_down_split) self.draft_mongo = DraftMongoModuleStore( - None, self.db_config, branch_setting_func=lambda: ModuleStoreEnum.Branch.draft_preferred, **self.modulestore_options + None, self.db_config, branch_setting_func=lambda: ModuleStoreEnum.Branch.draft_preferred, + metadata_inheritance_cache_subsystem=MemoryCache(), + **self.modulestore_options ) self.addCleanup(self.tear_down_mongo) self.old_course_key = None diff --git a/lms/djangoapps/courseware/model_data.py b/lms/djangoapps/courseware/model_data.py index b3345513cd..b7bb361a57 100644 --- a/lms/djangoapps/courseware/model_data.py +++ b/lms/djangoapps/courseware/model_data.py @@ -110,8 +110,7 @@ class FieldDataCache(object): return descriptors - # FIXME - with modulestore().bulk_temp_noop_operations(descriptor.location.course_key): + with modulestore().bulk_operations(descriptor.location.course_key): descriptors = get_child_descriptors(descriptor, depth, descriptor_filter) return FieldDataCache(descriptors, course_id, user, select_for_update) diff --git a/lms/djangoapps/courseware/tests/test_module_render.py b/lms/djangoapps/courseware/tests/test_module_render.py index 35b33e18f7..4498bea38b 100644 --- a/lms/djangoapps/courseware/tests/test_module_render.py +++ b/lms/djangoapps/courseware/tests/test_module_render.py @@ -332,10 +332,9 @@ class TestTOC(ModuleStoreTestCase): self.toy_loc, self.request.user, self.toy_course, depth=2 ) - # TODO: LMS-11220: Document why split find count is 9 # TODO: LMS-11220: Document why mongo find count is 4 - @ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0)) + @ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0)) @ddt.unpack def test_toc_toy_from_chapter(self, default_ms, num_finds, num_sends): with self.store.default_store(default_ms): @@ -364,7 +363,7 @@ class TestTOC(ModuleStoreTestCase): # TODO: LMS-11220: Document why split find count is 9 # TODO: LMS-11220: Document why mongo find count is 4 - @ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0)) + @ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0)) @ddt.unpack def test_toc_toy_from_section(self, default_ms, num_finds, num_sends): with self.store.default_store(default_ms):