Generalize bulk_operations logic
LMS-11366
This commit is contained in:
@@ -423,7 +423,7 @@ def course_index(request, course_key):
|
||||
"""
|
||||
# A depth of None implies the whole course. The course outline needs this in order to compute has_changes.
|
||||
# A unit may not have a draft version, but one of its components could, and hence the unit itself has changes.
|
||||
with modulestore().bulk_temp_noop_operations(course_key): # FIXME
|
||||
with modulestore().bulk_operations(course_key):
|
||||
course_module = _get_course_module(course_key, request.user, depth=None)
|
||||
lms_link = get_lms_link_for_item(course_module.location)
|
||||
sections = course_module.get_children()
|
||||
|
||||
@@ -25,6 +25,7 @@ from opaque_keys.edx.locations import SlashSeparatedCourseKey
|
||||
from xblock.runtime import Mixologist
|
||||
from xblock.core import XBlock
|
||||
import functools
|
||||
import threading
|
||||
|
||||
log = logging.getLogger('edx.modulestore')
|
||||
|
||||
@@ -91,6 +92,173 @@ class ModuleStoreEnum(object):
|
||||
test = -3
|
||||
|
||||
|
||||
class BulkOpsRecord(object):
    """
    Tracks the nesting depth of a bulk-operation context for a single course.

    Each ``nest``/``unnest`` pair marks entering/leaving one level of a
    (possibly nested) bulk operation.
    """
    def __init__(self):
        # Number of currently open bulk-operation levels for this course.
        self._active_count = 0

    def nest(self):
        """
        Enter one more level of this bulk operation.
        """
        self._active_count += 1

    def unnest(self):
        """
        Leave the innermost open level of this bulk operation.
        """
        self._active_count -= 1

    @property
    def active(self):
        """
        True while at least one level of this bulk operation is open.
        """
        return 0 < self._active_count

    @property
    def is_root(self):
        """
        True when exactly the outermost (first) level is open.
        """
        return self._active_count == 1
|
||||
|
||||
|
||||
class ActiveBulkThread(threading.local):
    """
    Thread-local container for the per-course bulk-operation records.

    Each thread sees its own ``records`` mapping, so bulk operations on one
    thread never affect another.
    """
    def __init__(self, bulk_ops_record_type, **kwargs):
        super(ActiveBulkThread, self).__init__(**kwargs)
        # Maps course key -> record; a fresh record of the given type is
        # created on first access for a course.
        self.records = defaultdict(bulk_ops_record_type)
|
||||
|
||||
|
||||
class BulkOperationsMixin(object):
    """
    Mixin implementing the :meth:`bulk_operations` modulestore semantics,
    including nested invocations.

    It exposes :meth:`_begin_bulk_operation` / :meth:`_end_bulk_operation`
    as the external interface, reference-counting nested entries per course
    (keyed on the active course).  Only the outermost entry and exit invoke
    the subclass hooks; when no bulk operation is active, changes go
    straight to the underlying mongo_connection.
    """
    # The record type used for each course; overriding classes should
    # override this attribute with their own subclass of BulkOpsRecord.
    _bulk_ops_record_type = BulkOpsRecord

    def __init__(self, *args, **kwargs):
        super(BulkOperationsMixin, self).__init__(*args, **kwargs)
        # Thread-local so concurrent requests don't share bulk-op state.
        self._active_bulk_ops = ActiveBulkThread(self._bulk_ops_record_type)

    @contextmanager
    def bulk_operations(self, course_id):
        """
        Context manager notifying the store of bulk operations.  Affects
        only the current thread.

        In the case of Mongo, it temporarily disables refreshing the
        metadata inheritance tree until the bulk operation is completed.
        """
        try:
            self._begin_bulk_operation(course_id)
            yield
        finally:
            self._end_bulk_operation(course_id)

    def _get_bulk_ops_record(self, course_key, ignore_case=False):
        """
        Return the :class:`.BulkOpsRecord` for this course, creating one on
        first access.  With ``ignore_case``, org/course/run are matched
        case-insensitively against the already-active records.
        """
        if course_key is None:
            # No course: hand back a throwaway record.
            return self._bulk_ops_record_type()

        if ignore_case:
            wanted = (
                course_key.org.lower(),
                course_key.course.lower(),
                course_key.run.lower(),
            )
            for key, record in self._active_bulk_ops.records.iteritems():
                if (key.org.lower(), key.course.lower(), key.run.lower()) == wanted:
                    return record
        # Records are keyed branch-agnostically.
        return self._active_bulk_ops.records[course_key.for_branch(None)]

    @property
    def _active_records(self):
        """
        Yield all active (CourseLocator, BulkOpsRecord) tuples.
        """
        for course_key, record in self._active_bulk_ops.records.iteritems():
            if record.active:
                yield (course_key, record)

    def _clear_bulk_ops_record(self, course_key):
        """
        Drop the record for this course.
        """
        del self._active_bulk_ops.records[course_key.for_branch(None)]

    def _start_outermost_bulk_operation(self, bulk_ops_record, course_key):
        """
        Hook called exactly once, when the outermost bulk operation begins.

        Implementing classes must override this method; otherwise, the bulk
        operations are a noop.
        """
        pass

    def _begin_bulk_operation(self, course_key):
        """
        Open a (possibly nested) bulk operation on `course_key`.
        """
        record = self._get_bulk_ops_record(course_key)

        # Bulk operations on the same course can be nested; track depth.
        record.nest()

        if record.is_root:
            # First entry for this course: let the subclass initialize.
            self._start_outermost_bulk_operation(record, course_key)

    def _end_outermost_bulk_operation(self, bulk_ops_record, course_key):
        """
        Hook called exactly once, when the outermost bulk operation ends.

        Implementing classes must override this method; otherwise, the bulk
        operations are a noop.
        """
        pass

    def _end_bulk_operation(self, course_key):
        """
        Close one level of the bulk operation on `course_key`, flushing via
        the subclass hook when the outermost level closes.
        """
        record = self._get_bulk_ops_record(course_key)
        if not record.active:
            # No bulk op is open for this course; nothing to do.
            return

        record.unnest()

        if record.active:
            # An enclosing context is still open; it will do the close-out.
            return

        self._end_outermost_bulk_operation(record, course_key)

        self._clear_bulk_ops_record(course_key)

    def _is_in_bulk_operation(self, course_key, ignore_case=False):
        """
        Return whether a bulk operation is active on `course_key`.
        """
        return self._get_bulk_ops_record(course_key, ignore_case).active
|
||||
|
||||
|
||||
class ModuleStoreRead(object):
|
||||
"""
|
||||
An abstract interface for a database backend that stores XModuleDescriptor
|
||||
@@ -436,7 +604,7 @@ class ModuleStoreWrite(ModuleStoreRead):
|
||||
pass
|
||||
|
||||
|
||||
class ModuleStoreReadBase(ModuleStoreRead):
|
||||
class ModuleStoreReadBase(BulkOperationsMixin, ModuleStoreRead):
|
||||
'''
|
||||
Implement interface functionality that can be shared.
|
||||
'''
|
||||
@@ -456,7 +624,9 @@ class ModuleStoreReadBase(ModuleStoreRead):
|
||||
'''
|
||||
Set up the error-tracking logic.
|
||||
'''
|
||||
super(ModuleStoreReadBase, self).__init__()
|
||||
self._course_errors = defaultdict(make_error_tracker) # location -> ErrorLog
|
||||
# TODO move the inheritance_cache_subsystem to classes which use it
|
||||
self.metadata_inheritance_cache_subsystem = metadata_inheritance_cache_subsystem
|
||||
self.request_cache = request_cache
|
||||
self.xblock_mixins = xblock_mixins
|
||||
@@ -551,41 +721,6 @@ class ModuleStoreReadBase(ModuleStoreRead):
|
||||
raise ValueError(u"Cannot set default store to type {}".format(store_type))
|
||||
yield
|
||||
|
||||
@contextmanager
|
||||
def bulk_operations(self, course_id):
|
||||
"""
|
||||
A context manager for notifying the store of bulk operations. This affects only the current thread.
|
||||
|
||||
In the case of Mongo, it temporarily disables refreshing the metadata inheritance tree
|
||||
until the bulk operation is completed.
|
||||
"""
|
||||
# TODO: Make this multi-process-safe if future operations need it.
|
||||
try:
|
||||
self._begin_bulk_operation(course_id)
|
||||
yield
|
||||
finally:
|
||||
self._end_bulk_operation(course_id)
|
||||
|
||||
@contextmanager
|
||||
def bulk_temp_noop_operations(self, course_id):
|
||||
"""
|
||||
A hotfix noop b/c old mongo does not properly handle nested bulk operations and does unnecessary work
|
||||
if the bulk operation only reads data. Replace with bulk_operations once fixed (or don't merge to master)
|
||||
"""
|
||||
yield
|
||||
|
||||
def _begin_bulk_operation(self, course_id):
|
||||
"""
|
||||
Begin a bulk write operation on course_id.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _end_bulk_operation(self, course_id):
|
||||
"""
|
||||
End the active bulk write operation on course_id.
|
||||
"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def memoize_request_cache(func):
|
||||
"""
|
||||
@@ -608,6 +743,7 @@ class ModuleStoreReadBase(ModuleStoreRead):
|
||||
return func(self, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def hashvalue(arg):
|
||||
"""
|
||||
If arg is an xblock, use its location. otherwise just turn it into a string
|
||||
@@ -617,6 +753,7 @@ def hashvalue(arg):
|
||||
else:
|
||||
return unicode(arg)
|
||||
|
||||
|
||||
class ModuleStoreWriteBase(ModuleStoreReadBase, ModuleStoreWrite):
|
||||
'''
|
||||
Implement interface functionality that can be shared.
|
||||
|
||||
@@ -10,6 +10,7 @@ from . import ModuleStoreEnum
|
||||
# Things w/ these categories should never be marked as version=DRAFT
|
||||
DIRECT_ONLY_CATEGORIES = ['course', 'chapter', 'sequential', 'about', 'static_tab', 'course_info']
|
||||
|
||||
|
||||
class BranchSettingMixin(object):
|
||||
"""
|
||||
A mixin to manage a module store's branch setting.
|
||||
|
||||
@@ -17,7 +17,6 @@ import sys
|
||||
import logging
|
||||
import copy
|
||||
import re
|
||||
import threading
|
||||
from uuid import uuid4
|
||||
|
||||
from bson.son import SON
|
||||
@@ -34,7 +33,7 @@ from xblock.runtime import KvsFieldData
|
||||
from xblock.exceptions import InvalidScopeError
|
||||
from xblock.fields import Scope, ScopeIds, Reference, ReferenceList, ReferenceValueDict
|
||||
|
||||
from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum
|
||||
from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord
|
||||
from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES
|
||||
from opaque_keys.edx.locations import Location
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError, DuplicateCourseError, ReferentialIntegrityError
|
||||
@@ -384,7 +383,47 @@ def as_published(location):
|
||||
return location.replace(revision=MongoRevisionKey.published)
|
||||
|
||||
|
||||
class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
class MongoBulkOpsRecord(BulkOpsRecord):
    """
    Per-course bulk-operation record that additionally remembers whether any
    write occurred, so inheritance-cache generation can be skipped for
    read-only bulk operations.
    """
    def __init__(self):
        super(MongoBulkOpsRecord, self).__init__()
        # Flipped to True by write paths during the bulk operation.
        self.dirty = False
|
||||
|
||||
|
||||
class MongoBulkOpsMixin(BulkOperationsMixin):
    """
    Bulk-operation support for the Mongo modulestore: defers metadata
    inheritance cache refreshes until the outermost bulk operation ends.
    """
    _bulk_ops_record_type = MongoBulkOpsRecord

    def _start_outermost_bulk_operation(self, bulk_ops_record, course_key):
        """
        Prevent updating the meta-data inheritance cache for the given course.
        """
        # Start from a clean slate so stale dirtiness can't force a refresh.
        bulk_ops_record.dirty = False

    def _end_outermost_bulk_operation(self, bulk_ops_record, course_id):
        """
        Restart updating the meta-data inheritance cache for the given
        course, refreshing it now if any write dirtied the record while the
        cache updates were disabled.
        """
        if bulk_ops_record.dirty:
            self.refresh_cached_metadata_inheritance_tree(course_id)
        # Brand spanking clean now, ready for any later reuse.
        bulk_ops_record.dirty = False

    def _is_in_bulk_operation(self, course_id, ignore_case=False):
        """
        Returns whether a bulk operation is in progress for the given course,
        after normalizing away any branch on the key.
        """
        return super(MongoBulkOpsMixin, self)._is_in_bulk_operation(
            course_id.for_branch(None), ignore_case
        )
|
||||
|
||||
|
||||
class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, MongoBulkOpsMixin):
|
||||
"""
|
||||
A Mongodb backed ModuleStore
|
||||
"""
|
||||
@@ -441,9 +480,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
self.i18n_service = i18n_service
|
||||
self.fs_service = fs_service
|
||||
|
||||
# performance optimization to prevent updating the meta-data inheritance tree during
|
||||
# bulk write operations
|
||||
self.ignore_write_events_on_courses = threading.local()
|
||||
self._course_run_cache = {}
|
||||
|
||||
def close_connections(self):
|
||||
@@ -464,37 +500,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
connection.drop_database(self.collection.database)
|
||||
connection.close()
|
||||
|
||||
def _begin_bulk_operation(self, course_id):
|
||||
"""
|
||||
Prevent updating the meta-data inheritance cache for the given course
|
||||
"""
|
||||
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
|
||||
self.ignore_write_events_on_courses.courses = set()
|
||||
|
||||
self.ignore_write_events_on_courses.courses.add(course_id)
|
||||
|
||||
def _end_bulk_operation(self, course_id):
|
||||
"""
|
||||
Restart updating the meta-data inheritance cache for the given course.
|
||||
Refresh the meta-data inheritance cache now since it was temporarily disabled.
|
||||
"""
|
||||
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
|
||||
return
|
||||
|
||||
if course_id in self.ignore_write_events_on_courses.courses:
|
||||
self.ignore_write_events_on_courses.courses.remove(course_id)
|
||||
self.refresh_cached_metadata_inheritance_tree(course_id)
|
||||
|
||||
def _is_bulk_write_in_progress(self, course_id):
|
||||
"""
|
||||
Returns whether a bulk write operation is in progress for the given course.
|
||||
"""
|
||||
if not hasattr(self.ignore_write_events_on_courses, 'courses'):
|
||||
return False
|
||||
|
||||
course_id = course_id.for_branch(None)
|
||||
return course_id in self.ignore_write_events_on_courses.courses
|
||||
|
||||
def fill_in_run(self, course_key):
|
||||
"""
|
||||
In mongo some course_keys are used without runs. This helper function returns
|
||||
@@ -655,7 +660,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
a runtime may mean that some objects report old values for inherited data.
|
||||
"""
|
||||
course_id = course_id.for_branch(None)
|
||||
if not self._is_bulk_write_in_progress(course_id):
|
||||
if not self._is_in_bulk_operation(course_id):
|
||||
# below is done for side effects when runtime is None
|
||||
cached_metadata = self._get_cached_metadata_inheritance_tree(course_id, force_refresh=True)
|
||||
if runtime:
|
||||
@@ -1137,7 +1142,8 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
Set update on the specified item, and raises ItemNotFoundError
|
||||
if the location doesn't exist
|
||||
"""
|
||||
|
||||
bulk_record = self._get_bulk_ops_record(location.course_key)
|
||||
bulk_record.dirty = True
|
||||
# See http://www.mongodb.org/display/DOCS/Updating for
|
||||
# atomic update syntax
|
||||
result = self.collection.update(
|
||||
@@ -1205,7 +1211,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
# don't update the subtree info for descendants of the publish root for efficiency
|
||||
if (
|
||||
(not isPublish or (isPublish and is_publish_root)) and
|
||||
not self._is_bulk_write_in_progress(xblock.location.course_key)
|
||||
not self._is_in_bulk_operation(xblock.location.course_key)
|
||||
):
|
||||
ancestor_payload = {
|
||||
'edit_info.subtree_edited_on': now,
|
||||
@@ -1225,7 +1231,6 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
elif not self.has_course(xblock.location.course_key):
|
||||
raise ItemNotFoundError(xblock.location.course_key)
|
||||
|
||||
|
||||
return xblock
|
||||
|
||||
def _serialize_scope(self, xblock, scope):
|
||||
@@ -1257,6 +1262,9 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
from orphan parents to avoid parents calculation overhead next time.
|
||||
"""
|
||||
non_orphan_parents = []
|
||||
# get bulk_record once rather than for each iteration
|
||||
bulk_record = self._get_bulk_ops_record(location.course_key)
|
||||
|
||||
for parent in parents:
|
||||
parent_loc = Location._from_deprecated_son(parent['_id'], location.course_key.run)
|
||||
|
||||
@@ -1266,6 +1274,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase):
|
||||
current_loc = ancestor_loc
|
||||
ancestor_loc = self._get_raw_parent_location(current_loc, revision)
|
||||
if ancestor_loc is None:
|
||||
bulk_record.dirty = True
|
||||
# The parent is an orphan, so remove all the children including
|
||||
# the location whose parent we are looking for from orphan parent
|
||||
self.collection.update(
|
||||
|
||||
@@ -147,6 +147,8 @@ class DraftModuleStore(MongoModuleStore):
|
||||
:param course_key: which course to delete
|
||||
:param user_id: id of the user deleting the course
|
||||
"""
|
||||
# Note: does not need to inform the bulk mechanism since after the course is deleted,
|
||||
# it can't calculate inheritance anyway. Nothing is there to be dirty.
|
||||
# delete the assets
|
||||
super(DraftModuleStore, self).delete_course(course_key, user_id)
|
||||
|
||||
@@ -414,6 +416,8 @@ class DraftModuleStore(MongoModuleStore):
|
||||
item['_id']['revision'] = MongoRevisionKey.draft
|
||||
# ensure keys are in fixed and right order before inserting
|
||||
item['_id'] = self._id_dict_to_son(item['_id'])
|
||||
bulk_record = self._get_bulk_ops_record(location.course_key)
|
||||
bulk_record.dirty = True
|
||||
try:
|
||||
self.collection.insert(item)
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
@@ -588,7 +592,10 @@ class DraftModuleStore(MongoModuleStore):
|
||||
_internal(next_tier)
|
||||
|
||||
_internal([root_usage.to_deprecated_son() for root_usage in root_usages])
|
||||
self.collection.remove({'_id': {'$in': to_be_deleted}}, safe=self.collection.safe)
|
||||
if len(to_be_deleted) > 0:
|
||||
bulk_record = self._get_bulk_ops_record(root_usages[0].course_key)
|
||||
bulk_record.dirty = True
|
||||
self.collection.remove({'_id': {'$in': to_be_deleted}}, safe=self.collection.safe)
|
||||
|
||||
@MongoModuleStore.memoize_request_cache
|
||||
def has_changes(self, xblock):
|
||||
@@ -679,6 +686,8 @@ class DraftModuleStore(MongoModuleStore):
|
||||
|
||||
_internal_depth_first(location, True)
|
||||
if len(to_be_deleted) > 0:
|
||||
bulk_record = self._get_bulk_ops_record(location.course_key)
|
||||
bulk_record.dirty = True
|
||||
self.collection.remove({'_id': {'$in': to_be_deleted}})
|
||||
return self.get_item(as_published(location))
|
||||
|
||||
|
||||
@@ -68,8 +68,7 @@ def path_to_location(modulestore, usage_key):
|
||||
newpath = (next_usage, path)
|
||||
queue.append((parent, newpath))
|
||||
|
||||
# FIXME replace with bulk_operations once it's fixed for old mongo
|
||||
with modulestore.bulk_temp_noop_operations(usage_key.course_key):
|
||||
with modulestore.bulk_operations(usage_key.course_key):
|
||||
if not modulestore.has_item(usage_key):
|
||||
raise ItemNotFoundError(usage_key)
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ from opaque_keys.edx.locator import (
|
||||
from xmodule.modulestore.exceptions import InsufficientSpecificationError, VersionConflictError, DuplicateItemError, \
|
||||
DuplicateCourseError
|
||||
from xmodule.modulestore import (
|
||||
inheritance, ModuleStoreWriteBase, ModuleStoreEnum
|
||||
inheritance, ModuleStoreWriteBase, ModuleStoreEnum, BulkOpsRecord, BulkOperationsMixin
|
||||
)
|
||||
|
||||
from ..exceptions import ItemNotFoundError
|
||||
@@ -82,7 +82,7 @@ from types import NoneType
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
#==============================================================================
|
||||
# ==============================================================================
|
||||
#
|
||||
# Known issue:
|
||||
# Inheritance for cached kvs doesn't work on edits. Use case.
|
||||
@@ -98,46 +98,20 @@ log = logging.getLogger(__name__)
|
||||
# 10) BUG: a.foo < 0!
|
||||
# Local fix wont' permanently work b/c xblock may cache a.foo...
|
||||
#
|
||||
#==============================================================================
|
||||
# ==============================================================================
|
||||
|
||||
# When blacklists are this, all children should be excluded
|
||||
EXCLUDE_ALL = '*'
|
||||
|
||||
|
||||
class BulkWriteRecord(object):
|
||||
class SplitBulkWriteRecord(BulkOpsRecord):
|
||||
def __init__(self):
|
||||
self._active_count = 0
|
||||
super(SplitBulkWriteRecord, self).__init__()
|
||||
self.initial_index = None
|
||||
self.index = None
|
||||
self.structures = {}
|
||||
self.structures_in_db = set()
|
||||
|
||||
@property
|
||||
def active(self):
|
||||
"""
|
||||
Return whether this bulk write is active.
|
||||
"""
|
||||
return self._active_count > 0
|
||||
|
||||
def nest(self):
|
||||
"""
|
||||
Record another level of nesting of this bulk write operation
|
||||
"""
|
||||
self._active_count += 1
|
||||
|
||||
def unnest(self):
|
||||
"""
|
||||
Record the completion of a level of nesting of the bulk write operation
|
||||
"""
|
||||
self._active_count -= 1
|
||||
|
||||
@property
|
||||
def is_root(self):
|
||||
"""
|
||||
Return whether the bulk write is at the root (first) level of nesting
|
||||
"""
|
||||
return self._active_count == 1
|
||||
|
||||
# TODO: This needs to track which branches have actually been modified/versioned,
|
||||
# so that copying one branch to another doesn't update the original branch.
|
||||
@property
|
||||
@@ -172,7 +146,7 @@ class BulkWriteRecord(object):
|
||||
self.structures[structure['_id']] = structure
|
||||
|
||||
def __repr__(self):
|
||||
return u"BulkWriteRecord<{!r}, {!r}, {!r}, {!r}, {!r}>".format(
|
||||
return u"SplitBulkWriteRecord<{!r}, {!r}, {!r}, {!r}, {!r}>".format(
|
||||
self._active_count,
|
||||
self.initial_index,
|
||||
self.index,
|
||||
@@ -181,7 +155,7 @@ class BulkWriteRecord(object):
|
||||
)
|
||||
|
||||
|
||||
class BulkWriteMixin(object):
|
||||
class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
"""
|
||||
This implements the :meth:`bulk_operations` modulestore semantics for the :class:`SplitMongoModuleStore`.
|
||||
|
||||
@@ -194,93 +168,55 @@ class BulkWriteMixin(object):
|
||||
If a bulk write operation isn't active, then the changes are immediately written to the underlying
|
||||
mongo_connection.
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BulkWriteMixin, self).__init__(*args, **kwargs)
|
||||
self._active_bulk_writes = threading.local()
|
||||
_bulk_ops_record_type = SplitBulkWriteRecord
|
||||
|
||||
def _get_bulk_write_record(self, course_key, ignore_case=False):
|
||||
def _get_bulk_ops_record(self, course_key, ignore_case=False):
|
||||
"""
|
||||
Return the :class:`.BulkWriteRecord` for this course.
|
||||
Return the :class:`.SplitBulkWriteRecord` for this course.
|
||||
"""
|
||||
# handle split specific things and defer to super otherwise
|
||||
if course_key is None:
|
||||
return BulkWriteRecord()
|
||||
return self._bulk_ops_record_type()
|
||||
|
||||
if not isinstance(course_key, CourseLocator):
|
||||
raise TypeError(u'{!r} is not a CourseLocator'.format(course_key))
|
||||
if not hasattr(self._active_bulk_writes, 'records'):
|
||||
self._active_bulk_writes.records = defaultdict(BulkWriteRecord)
|
||||
# handle version_guid based retrieval locally
|
||||
if course_key.org is None or course_key.course is None or course_key.run is None:
|
||||
return self._active_bulk_ops.records[
|
||||
course_key.replace(org=None, course=None, run=None, branch=None)
|
||||
]
|
||||
|
||||
# Retrieve the bulk record based on matching org/course/run (possibly ignoring case)
|
||||
if course_key.org and course_key.course and course_key.run:
|
||||
if ignore_case:
|
||||
for key, record in self._active_bulk_writes.records.iteritems():
|
||||
if (
|
||||
key.org.lower() == course_key.org.lower() and
|
||||
key.course.lower() == course_key.course.lower() and
|
||||
key.run.lower() == course_key.run.lower()
|
||||
):
|
||||
return record
|
||||
# If nothing matches case-insensitively, fall through to creating a new record with the passed in case
|
||||
return self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)]
|
||||
else:
|
||||
# If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid
|
||||
return self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)]
|
||||
return super(SplitBulkWriteMixin, self)._get_bulk_ops_record(
|
||||
course_key.replace(branch=None, version_guid=None), ignore_case
|
||||
)
|
||||
|
||||
@property
|
||||
def _active_records(self):
|
||||
def _clear_bulk_ops_record(self, course_key):
|
||||
"""
|
||||
Yield all active (CourseLocator, BulkWriteRecord) tuples.
|
||||
Clear the record for this course
|
||||
"""
|
||||
for course_key, record in getattr(self._active_bulk_writes, 'records', {}).iteritems():
|
||||
if record.active:
|
||||
yield (course_key, record)
|
||||
|
||||
def _clear_bulk_write_record(self, course_key):
|
||||
if not isinstance(course_key, CourseLocator):
|
||||
raise TypeError('{!r} is not a CourseLocator'.format(course_key))
|
||||
|
||||
if not hasattr(self._active_bulk_writes, 'records'):
|
||||
return
|
||||
|
||||
if course_key.org and course_key.course and course_key.run:
|
||||
del self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)]
|
||||
del self._active_bulk_ops.records[course_key.replace(branch=None, version_guid=None)]
|
||||
else:
|
||||
del self._active_bulk_writes.records[course_key.replace(org=None, course=None, run=None, branch=None)]
|
||||
del self._active_bulk_ops.records[
|
||||
course_key.replace(org=None, course=None, run=None, branch=None)
|
||||
]
|
||||
|
||||
def _begin_bulk_operation(self, course_key):
|
||||
def _start_outermost_bulk_operation(self, bulk_write_record, course_key):
|
||||
"""
|
||||
Begin a bulk write operation on course_key.
|
||||
"""
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
|
||||
# Ensure that any edits to the index don't pollute the initial_index
|
||||
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
|
||||
|
||||
# Increment the number of active bulk operations (bulk operations
|
||||
# on the same course can be nested)
|
||||
bulk_write_record.nest()
|
||||
|
||||
# If this is the highest level bulk operation, then initialize it
|
||||
if bulk_write_record.is_root:
|
||||
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
|
||||
# Ensure that any edits to the index don't pollute the initial_index
|
||||
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
|
||||
|
||||
def _end_bulk_operation(self, course_key):
|
||||
def _end_outermost_bulk_operation(self, bulk_write_record, course_key):
|
||||
"""
|
||||
End the active bulk write operation on course_key.
|
||||
"""
|
||||
# If no bulk write is active, return
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
if not bulk_write_record.active:
|
||||
return
|
||||
|
||||
bulk_write_record.unnest()
|
||||
|
||||
# If this wasn't the outermost context, then don't close out the
|
||||
# bulk write operation.
|
||||
if bulk_write_record.active:
|
||||
return
|
||||
|
||||
# This is the last active bulk write. If the content is dirty,
|
||||
# then update the database
|
||||
# If the content is dirty, then update the database
|
||||
for _id in bulk_write_record.structures.viewkeys() - bulk_write_record.structures_in_db:
|
||||
self.db_connection.upsert_structure(bulk_write_record.structures[_id])
|
||||
|
||||
@@ -290,25 +226,17 @@ class BulkWriteMixin(object):
|
||||
else:
|
||||
self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index)
|
||||
|
||||
self._clear_bulk_write_record(course_key)
|
||||
|
||||
def _is_in_bulk_write_operation(self, course_key, ignore_case=False):
|
||||
"""
|
||||
Return whether a bulk write is active on `course_key`.
|
||||
"""
|
||||
return self._get_bulk_write_record(course_key, ignore_case).active
|
||||
|
||||
def get_course_index(self, course_key, ignore_case=False):
|
||||
"""
|
||||
Return the index for course_key.
|
||||
"""
|
||||
if self._is_in_bulk_write_operation(course_key, ignore_case):
|
||||
return self._get_bulk_write_record(course_key, ignore_case).index
|
||||
if self._is_in_bulk_operation(course_key, ignore_case):
|
||||
return self._get_bulk_ops_record(course_key, ignore_case).index
|
||||
else:
|
||||
return self.db_connection.get_course_index(course_key, ignore_case)
|
||||
|
||||
def insert_course_index(self, course_key, index_entry):
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.index = index_entry
|
||||
else:
|
||||
@@ -322,14 +250,14 @@ class BulkWriteMixin(object):
|
||||
|
||||
Does not return anything useful.
|
||||
"""
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.index = updated_index_entry
|
||||
else:
|
||||
self.db_connection.update_course_index(updated_index_entry)
|
||||
|
||||
def get_structure(self, course_key, version_guid):
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
if bulk_write_record.active:
|
||||
structure = bulk_write_record.structures.get(version_guid)
|
||||
|
||||
@@ -352,7 +280,7 @@ class BulkWriteMixin(object):
|
||||
(no data will be written to the database if a bulk operation is active.)
|
||||
"""
|
||||
self._clear_cache(structure['_id'])
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.structures[structure['_id']] = structure
|
||||
else:
|
||||
@@ -365,7 +293,7 @@ class BulkWriteMixin(object):
|
||||
if course_key.branch is None:
|
||||
raise InsufficientSpecificationError(course_key)
|
||||
|
||||
bulk_write_record = self._get_bulk_write_record(course_key)
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
|
||||
# If we have an active bulk write, and it's already been edited, then just use that structure
|
||||
if bulk_write_record.active and course_key.branch in bulk_write_record.dirty_branches:
|
||||
@@ -507,7 +435,7 @@ class BulkWriteMixin(object):
|
||||
return structures
|
||||
|
||||
|
||||
class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
|
||||
class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
"""
|
||||
A Mongodb backed ModuleStore supporting versions, inheritance,
|
||||
and sharing.
|
||||
@@ -2124,7 +2052,7 @@ class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
|
||||
if not isinstance(new_id, ObjectId):
|
||||
raise TypeError('new_id must be an ObjectId, but is {!r}'.format(new_id))
|
||||
index_entry['versions'][branch] = new_id
|
||||
self.insert_course_index(course_key, index_entry)
|
||||
self.update_course_index(course_key, index_entry)
|
||||
|
||||
def partition_xblock_fields_by_scope(self, xblock):
|
||||
"""
|
||||
|
||||
@@ -916,7 +916,7 @@ class TestMixedModuleStore(unittest.TestCase):
|
||||
# TODO: LMS-11220: Document why draft send count is 5
|
||||
# TODO: LMS-11220: Document why draft find count is [19, 6]
|
||||
# TODO: LMS-11220: Document why split find count is [2, 2]
|
||||
@ddt.data(('draft', [20, 5], 0), ('split', [17, 6], 0)) # FIXME, replace w/ above when bulk reenabled
|
||||
@ddt.data(('draft', [20, 5], 0), ('split', [2, 2], 0))
|
||||
@ddt.unpack
|
||||
def test_path_to_location(self, default_ms, num_finds, num_sends):
|
||||
"""
|
||||
|
||||
@@ -37,17 +37,19 @@ class TestPublish(SplitWMongoCourseBoostrapper):
|
||||
# - get last error
|
||||
# - load parent
|
||||
# - load inheritable data
|
||||
with check_mongo_calls(10, 6):
|
||||
with check_mongo_calls(7, 4):
|
||||
self._create_item('vertical', 'Vert1', {}, {'display_name': 'Vertical 1'}, 'chapter', 'Chapter1', split=False)
|
||||
self._create_item('vertical', 'Vert2', {}, {'display_name': 'Vertical 2'}, 'chapter', 'Chapter1', split=False)
|
||||
# For each (4) item created
|
||||
# - load draft
|
||||
# - load non-draft
|
||||
# - try to find draft
|
||||
# - try to find non-draft
|
||||
# - retrieve draft of new parent
|
||||
# - get last error
|
||||
# - load parent
|
||||
# - load inheritable data
|
||||
# - load parent
|
||||
with check_mongo_calls(24, 12):
|
||||
# count for updates increased to 16 b/c of edit_info updating
|
||||
with check_mongo_calls(16, 8):
|
||||
self._create_item('html', 'Html1', "<p>Goodbye</p>", {'display_name': 'Parented Html'}, 'vertical', 'Vert1', split=False)
|
||||
self._create_item(
|
||||
'discussion', 'Discussion1',
|
||||
@@ -92,7 +94,7 @@ class TestPublish(SplitWMongoCourseBoostrapper):
|
||||
# 25-June-2014 find calls are 19. Probably due to inheritance recomputation?
|
||||
# 02-July-2014 send calls are 7. 5 from above, plus 2 for updating subtree edit info for Chapter1 and course
|
||||
# find calls are 22. 19 from above, plus 3 for finding the parent of Vert1, Chapter1, and course
|
||||
with check_mongo_calls(25, 7):
|
||||
with check_mongo_calls(20, 7):
|
||||
self.draft_mongo.publish(item.location, self.user_id)
|
||||
|
||||
# verify status
|
||||
|
||||
@@ -3,16 +3,16 @@ import ddt
|
||||
import unittest
|
||||
from bson.objectid import ObjectId
|
||||
from mock import MagicMock, Mock, call
|
||||
from xmodule.modulestore.split_mongo.split import BulkWriteMixin
|
||||
from xmodule.modulestore.split_mongo.split import SplitBulkWriteMixin
|
||||
from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection
|
||||
|
||||
from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTree, LocalId
|
||||
from opaque_keys.edx.locator import CourseLocator
|
||||
|
||||
|
||||
class TestBulkWriteMixin(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(TestBulkWriteMixin, self).setUp()
|
||||
self.bulk = BulkWriteMixin()
|
||||
self.bulk = SplitBulkWriteMixin()
|
||||
self.bulk.SCHEMA_VERSION = 1
|
||||
self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache')
|
||||
self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection)
|
||||
@@ -407,6 +407,7 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
|
||||
self.conn.find_ancestor_structures.assert_called_once_with(original_version, block_id)
|
||||
self.assertItemsEqual(active_match + db_match, results)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestBulkWriteMixinOpen(TestBulkWriteMixin):
|
||||
"""
|
||||
|
||||
@@ -10,6 +10,7 @@ from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore
|
||||
from xmodule.modulestore.mongo import DraftMongoModuleStore
|
||||
from xmodule.modulestore import ModuleStoreEnum
|
||||
from xmodule.modulestore.tests.mongo_connection import MONGO_PORT_NUM, MONGO_HOST
|
||||
from xmodule.modulestore.tests.test_cross_modulestore_import_export import MemoryCache
|
||||
|
||||
|
||||
class SplitWMongoCourseBoostrapper(unittest.TestCase):
|
||||
@@ -26,7 +27,7 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase):
|
||||
* split_course_key (CourseLocator): of the new course
|
||||
* old_course_key: the SlashSpecifiedCourseKey for the course
|
||||
"""
|
||||
# Snippet of what would be in the django settings envs file
|
||||
# Snippet of what would be in the django settings envs file
|
||||
db_config = {
|
||||
'host': MONGO_HOST,
|
||||
'port': MONGO_PORT_NUM,
|
||||
@@ -55,7 +56,9 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase):
|
||||
self.addCleanup(self.split_mongo.db.connection.close)
|
||||
self.addCleanup(self.tear_down_split)
|
||||
self.draft_mongo = DraftMongoModuleStore(
|
||||
None, self.db_config, branch_setting_func=lambda: ModuleStoreEnum.Branch.draft_preferred, **self.modulestore_options
|
||||
None, self.db_config, branch_setting_func=lambda: ModuleStoreEnum.Branch.draft_preferred,
|
||||
metadata_inheritance_cache_subsystem=MemoryCache(),
|
||||
**self.modulestore_options
|
||||
)
|
||||
self.addCleanup(self.tear_down_mongo)
|
||||
self.old_course_key = None
|
||||
|
||||
@@ -110,8 +110,7 @@ class FieldDataCache(object):
|
||||
|
||||
return descriptors
|
||||
|
||||
# FIXME
|
||||
with modulestore().bulk_temp_noop_operations(descriptor.location.course_key):
|
||||
with modulestore().bulk_operations(descriptor.location.course_key):
|
||||
descriptors = get_child_descriptors(descriptor, depth, descriptor_filter)
|
||||
|
||||
return FieldDataCache(descriptors, course_id, user, select_for_update)
|
||||
|
||||
@@ -332,10 +332,9 @@ class TestTOC(ModuleStoreTestCase):
|
||||
self.toy_loc, self.request.user, self.toy_course, depth=2
|
||||
)
|
||||
|
||||
|
||||
# TODO: LMS-11220: Document why split find count is 9
|
||||
# TODO: LMS-11220: Document why mongo find count is 4
|
||||
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0))
|
||||
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0))
|
||||
@ddt.unpack
|
||||
def test_toc_toy_from_chapter(self, default_ms, num_finds, num_sends):
|
||||
with self.store.default_store(default_ms):
|
||||
@@ -364,7 +363,7 @@ class TestTOC(ModuleStoreTestCase):
|
||||
|
||||
# TODO: LMS-11220: Document why split find count is 9
|
||||
# TODO: LMS-11220: Document why mongo find count is 4
|
||||
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 21, 0))
|
||||
@ddt.data((ModuleStoreEnum.Type.mongo, 3, 0), (ModuleStoreEnum.Type.split, 9, 0))
|
||||
@ddt.unpack
|
||||
def test_toc_toy_from_section(self, default_ms, num_finds, num_sends):
|
||||
with self.store.default_store(default_ms):
|
||||
|
||||
Reference in New Issue
Block a user