Fix get_courses to handle inflight changes
more rigorously.
This commit is contained in:
@@ -4,14 +4,11 @@ Segregation of pymongo functions from the data modeling mechanisms for split mod
|
||||
import re
|
||||
from mongodb_proxy import autoretry_read, MongoProxy
|
||||
import pymongo
|
||||
import time
|
||||
|
||||
# Import this just to export it
|
||||
from pymongo.errors import DuplicateKeyError # pylint: disable=unused-import
|
||||
|
||||
from contracts import check
|
||||
from functools import wraps
|
||||
from pymongo.errors import AutoReconnect
|
||||
from xmodule.exceptions import HeartbeatFailure
|
||||
from xmodule.modulestore.split_mongo import BlockKey
|
||||
import datetime
|
||||
@@ -238,15 +235,15 @@ class MongoConnection(object):
|
||||
course_index['last_update'] = datetime.datetime.now(pytz.utc)
|
||||
self.course_index.update(query, course_index, upsert=False,)
|
||||
|
||||
def delete_course_index(self, course_index):
|
||||
def delete_course_index(self, course_key):
|
||||
"""
|
||||
Delete the course_index from the persistence mechanism whose id is the given course_index
|
||||
"""
|
||||
return self.course_index.remove({
|
||||
'org': course_index['org'],
|
||||
'course': course_index['course'],
|
||||
'run': course_index['run'],
|
||||
})
|
||||
query = {
|
||||
key_attr: getattr(course_key, key_attr)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
return self.course_index.remove(query)
|
||||
|
||||
def get_definition(self, key):
|
||||
"""
|
||||
|
||||
@@ -262,6 +262,15 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
else:
|
||||
return self.db_connection.get_course_index(course_key, ignore_case)
|
||||
|
||||
def delete_course_index(self, course_key):
|
||||
"""
|
||||
Delete the course index from cache and the db
|
||||
"""
|
||||
if self._is_in_bulk_operation(course_key, False):
|
||||
self._clear_bulk_ops_record(course_key)
|
||||
|
||||
self.db_connection.delete_course_index(course_key)
|
||||
|
||||
def insert_course_index(self, course_key, index_entry):
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
if bulk_write_record.active:
|
||||
@@ -452,9 +461,23 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
def find_matching_course_indexes(self, branch=None, search_targets=None):
|
||||
"""
|
||||
Find the course_indexes which have the specified branch and search_targets.
|
||||
|
||||
Returns:
|
||||
a Cursor if there are no changes in flight or a list if some have changed in current bulk op
|
||||
"""
|
||||
indexes = self.db_connection.find_matching_course_indexes(branch, search_targets)
|
||||
|
||||
def _replace_or_append_index(altered_index):
|
||||
"""
|
||||
If the index is already in indexes, replace it. Otherwise, append it.
|
||||
"""
|
||||
for index, existing in enumerate(indexes):
|
||||
if all(existing[attr] == altered_index[attr] for attr in ['org', 'course', 'run']):
|
||||
indexes[index] = altered_index
|
||||
return
|
||||
indexes.append(altered_index)
|
||||
|
||||
# add any being built but not yet persisted or in the process of being updated
|
||||
for _, record in self._active_records:
|
||||
if branch and branch not in record.index.get('versions', {}):
|
||||
continue
|
||||
@@ -468,7 +491,10 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
):
|
||||
continue
|
||||
|
||||
indexes.append(record.index)
|
||||
if not hasattr(indexes, 'append'): # Just in time conversion to list from cursor
|
||||
indexes = list(indexes)
|
||||
|
||||
_replace_or_append_index(record.index)
|
||||
|
||||
return indexes
|
||||
|
||||
@@ -2116,12 +2142,9 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
with a versions hash to restore the course; however, the edited_on and
|
||||
edited_by won't reflect the originals, of course.
|
||||
"""
|
||||
index = self.get_course_index(course_key)
|
||||
if index is None:
|
||||
raise ItemNotFoundError(course_key)
|
||||
# this is the only real delete in the system. should it do something else?
|
||||
log.info(u"deleting course from split-mongo: %s", course_key)
|
||||
self.db_connection.delete_course_index(index)
|
||||
self.delete_course_index(course_key)
|
||||
|
||||
# We do NOT call the super class here since we need to keep the assets
|
||||
# in case the course is later restored.
|
||||
|
||||
@@ -1558,6 +1558,40 @@ class TestCourseCreation(SplitModuleTest):
|
||||
dupe_course_key.org, dupe_course_key.course, dupe_course_key.run, user, BRANCH_NAME_DRAFT
|
||||
)
|
||||
|
||||
def test_bulk_ops_get_courses(self):
|
||||
"""
|
||||
Test get_courses when some are created, updated, and deleted w/in a bulk operation
|
||||
"""
|
||||
# create 3 courses before bulk operation
|
||||
split_store = modulestore()
|
||||
|
||||
user = random.getrandbits(32)
|
||||
to_be_created = split_store.make_course_key('new', 'created', 'course')
|
||||
with split_store.bulk_operations(to_be_created):
|
||||
split_store.create_course(
|
||||
to_be_created.org, to_be_created.course, to_be_created.run, user, master_branch=BRANCH_NAME_DRAFT,
|
||||
)
|
||||
|
||||
modified_course_loc = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
|
||||
with split_store.bulk_operations(modified_course_loc):
|
||||
modified_course = modulestore().get_course(modified_course_loc)
|
||||
modified_course.advertised_start = 'coming soon to a theater near you'
|
||||
split_store.update_item(modified_course, user)
|
||||
|
||||
to_be_deleted = split_store.make_course_key("guestx", "contender", "run")
|
||||
with split_store.bulk_operations(to_be_deleted):
|
||||
split_store.delete_course(to_be_deleted, user)
|
||||
|
||||
# now get_courses
|
||||
courses = split_store.get_courses(BRANCH_NAME_DRAFT)
|
||||
|
||||
self.assertEqual(len(courses), 3)
|
||||
course_ids = [course.id.for_branch(None) for course in courses]
|
||||
self.assertNotIn(to_be_deleted, course_ids)
|
||||
self.assertIn(to_be_created, course_ids)
|
||||
fetched_modified = [course for course in courses if course.id == modified_course_loc][0]
|
||||
self.assertEqual(fetched_modified.advertised_start, modified_course.advertised_start)
|
||||
|
||||
|
||||
class TestInheritance(SplitModuleTest):
|
||||
"""
|
||||
|
||||
@@ -333,10 +333,12 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_find_matching_course_indexes(self, branch, search_targets, matching, unmatching):
|
||||
db_indexes = [Mock(name='from_db')]
|
||||
db_indexes = [{'org': 'what', 'course': 'this', 'run': 'needs'}]
|
||||
for n, index in enumerate(matching + unmatching):
|
||||
course_key = CourseLocator('org', 'course', 'run{}'.format(n))
|
||||
self.bulk._begin_bulk_operation(course_key)
|
||||
for attr in ['org', 'course', 'run']:
|
||||
index[attr] = getattr(course_key, attr)
|
||||
self.bulk.insert_course_index(course_key, index)
|
||||
|
||||
expected = matching + db_indexes
|
||||
|
||||
Reference in New Issue
Block a user