From 1a17b31a8ba4c3aa33b1591b81bcbe114a76a548 Mon Sep 17 00:00:00 2001 From: Calen Pennington Date: Thu, 25 Sep 2014 11:17:23 -0400 Subject: [PATCH] Make definitions and structures truly append-only --- .../split_mongo/mongo_connection.py | 14 +- .../xmodule/modulestore/split_mongo/split.py | 172 ++++++++++-------- .../test_split_modulestore_bulk_operations.py | 33 ++-- 3 files changed, 121 insertions(+), 98 deletions(-) diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py index 1f04a263c8..a48b2f3ea7 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py @@ -4,6 +4,10 @@ Segregation of pymongo functions from the data modeling mechanisms for split mod import re import pymongo import time + +# Import this just to export it +from pymongo.errors import DuplicateKeyError # pylint: disable=unused-import + from contracts import check from functools import wraps from pymongo.errors import AutoReconnect @@ -182,11 +186,11 @@ class MongoConnection(object): } })] - def upsert_structure(self, structure): + def insert_structure(self, structure): """ - Update the db record for structure, creating that record if it doesn't already exist + Insert a new structure into the database. """ - self.structures.update({'_id': structure['_id']}, structure_to_mongo(structure), upsert=True) + self.structures.insert(structure_to_mongo(structure)) @autoretry_read() def get_course_index(self, key, ignore_case=False): @@ -274,11 +278,11 @@ class MongoConnection(object): """ return self.definitions.find({'$in': {'_id': definitions}}) - def upsert_definition(self, definition): + def insert_definition(self, definition): """ Create the definition in the db """ - self.definitions.update({'_id': definition['_id']}, definition, upsert=True) + self.definitions.insert(definition) def ensure_indexes(self): """ diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py index 544415da04..563daafdd1 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py @@ -75,7 +75,7 @@ from xmodule.modulestore import ( from ..exceptions import ItemNotFoundError from .caching_descriptor_system import CachingDescriptorSystem -from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection +from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection, DuplicateKeyError from xmodule.modulestore.split_mongo import BlockKey, CourseEnvelope from xmodule.error_module import ErrorDescriptor from collections import defaultdict @@ -226,10 +226,22 @@ class SplitBulkWriteMixin(BulkOperationsMixin): """ # If the content is dirty, then update the database for _id in bulk_write_record.structures.viewkeys() - bulk_write_record.structures_in_db: - self.db_connection.upsert_structure(bulk_write_record.structures[_id]) + try: + self.db_connection.insert_structure(bulk_write_record.structures[_id]) + except DuplicateKeyError: + # We may not have looked up this structure inside this bulk operation, and thus + # didn't realize that it was already in the database. That's OK, the store is + # append only, so if it's already been written, we can just keep going. + log.debug("Attempted to insert duplicate structure %s", _id) for _id in bulk_write_record.definitions.viewkeys() - bulk_write_record.definitions_in_db: - self.db_connection.upsert_definition(bulk_write_record.definitions[_id]) + try: + self.db_connection.insert_definition(bulk_write_record.definitions[_id]) + except DuplicateKeyError: + # We may not have looked up this definition inside this bulk operation, and thus + # didn't realize that it was already in the database. That's OK, the store is + # append only, so if it's already been written, we can just keep going. + log.debug("Attempted to insert duplicate definition %s", _id) if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index: if bulk_write_record.initial_index is None: @@ -295,7 +307,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.structures[structure['_id']] = structure else: - self.db_connection.upsert_structure(structure) + self.db_connection.insert_structure(structure) def get_definition(self, course_key, definition_guid): """ @@ -323,7 +335,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): definition_guid = course_key.as_object_id(definition_guid) return self.db_connection.get_definition(definition_guid) - def get_definitions(self, ids): + def get_definitions(self, course_key, ids): """ Return all definitions that specified in ``ids``. @@ -331,13 +343,16 @@ class SplitBulkWriteMixin(BulkOperationsMixin): the cached version will be preferred. Arguments: + course_key (:class:`.CourseKey`): The course that these definitions are being loaded + for (to respect bulk operations). ids (list): A list of definition ids """ definitions = [] ids = set(ids) - for _, record in self._active_records: - for definition in record.definitions.values(): + bulk_write_record = self._get_bulk_ops_record(course_key) + if bulk_write_record.active: + for definition in bulk_write_record.definitions.values(): definition_id = definition.get('_id') if definition_id in ids: ids.remove(definition_id) @@ -356,7 +371,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.definitions[definition['_id']] = definition else: - self.db_connection.upsert_definition(definition) + self.db_connection.insert_definition(definition) def version_structure(self, course_key, structure, user_id): """ @@ -596,10 +611,13 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): if not lazy: # Load all descendants by id - descendent_definitions = self.get_definitions([ - block['definition'] - for block in new_module_data.itervalues() - ]) + descendent_definitions = self.get_definitions( + course_key, + [ + block['definition'] + for block in new_module_data.itervalues() + ] + ) # turn into a map definitions = {definition['_id']: definition for definition in descendent_definitions} @@ -1163,16 +1181,17 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): new_def_data = self._serialize_fields(old_definition['block_type'], new_def_data) if needs_saved(): - # new id to create new version - old_definition['_id'] = ObjectId() - old_definition['fields'] = new_def_data - old_definition['edit_info']['edited_by'] = user_id - old_definition['edit_info']['edited_on'] = datetime.datetime.now(UTC) + # Do a deep copy so that we don't corrupt the cached version of the definition + new_definition = copy.deepcopy(old_definition) + new_definition['_id'] = ObjectId() + new_definition['fields'] = new_def_data + new_definition['edit_info']['edited_by'] = user_id + new_definition['edit_info']['edited_on'] = datetime.datetime.now(UTC) # previous version id - old_definition['edit_info']['previous_version'] = definition_locator.definition_id - old_definition['schema_version'] = self.SCHEMA_VERSION - self.update_definition(course_key, old_definition) - return DefinitionLocator(old_definition['block_type'], old_definition['_id']), True + new_definition['edit_info']['previous_version'] = definition_locator.definition_id + new_definition['schema_version'] = self.SCHEMA_VERSION + self.update_definition(course_key, new_definition) + return DefinitionLocator(new_definition['block_type'], new_definition['_id']), True else: return definition_locator, False @@ -1482,7 +1501,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): if block_fields is not None: root_block['fields'].update(self._serialize_fields(root_category, block_fields)) if definition_fields is not None: - definition = self.get_definition(locator, root_block['definition']) + definition = copy.deepcopy(self.get_definition(locator, root_block['definition'])) definition['fields'].update(definition_fields) definition['edit_info']['previous_version'] = definition['_id'] definition['edit_info']['edited_by'] = user_id @@ -1839,65 +1858,66 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): """ # get the destination's index, and source and destination structures. with self.bulk_operations(source_course): - with self.bulk_operations(destination_course): - source_structure = self._lookup_course(source_course).structure - index_entry = self.get_course_index(destination_course) - if index_entry is None: - # brand new course - raise ItemNotFoundError(destination_course) - if destination_course.branch not in index_entry['versions']: - # must be copying the dag root if there's no current dag - root_block_key = source_structure['root'] - if not any(root_block_key == BlockKey.from_usage_key(subtree) for subtree in subtree_list): - raise ItemNotFoundError(u'Must publish course root {}'.format(root_block_key)) - root_source = source_structure['blocks'][root_block_key] - # create branch - destination_structure = self._new_structure( - user_id, root_block_key, - # leave off the fields b/c the children must be filtered - definition_id=root_source['definition'], - ) - else: - destination_structure = self._lookup_course(destination_course).structure - destination_structure = self.version_structure(destination_course, destination_structure, user_id) + source_structure = self._lookup_course(source_course).structure - if blacklist != EXCLUDE_ALL: - blacklist = [BlockKey.from_usage_key(shunned) for shunned in blacklist or []] - # iterate over subtree list filtering out blacklist. - orphans = set() - destination_blocks = destination_structure['blocks'] - for subtree_root in subtree_list: - if BlockKey.from_usage_key(subtree_root) != source_structure['root']: - # find the parents and put root in the right sequence - parent = self._get_parent_from_structure(BlockKey.from_usage_key(subtree_root), source_structure) - if parent is not None: # may be a detached category xblock - if not parent in destination_blocks: - raise ItemNotFoundError(parent) - orphans.update( - self._sync_children( - source_structure['blocks'][parent], - destination_blocks[parent], - BlockKey.from_usage_key(subtree_root) - ) + with self.bulk_operations(destination_course): + index_entry = self.get_course_index(destination_course) + if index_entry is None: + # brand new course + raise ItemNotFoundError(destination_course) + if destination_course.branch not in index_entry['versions']: + # must be copying the dag root if there's no current dag + root_block_key = source_structure['root'] + if not any(root_block_key == BlockKey.from_usage_key(subtree) for subtree in subtree_list): + raise ItemNotFoundError(u'Must publish course root {}'.format(root_block_key)) + root_source = source_structure['blocks'][root_block_key] + # create branch + destination_structure = self._new_structure( + user_id, root_block_key, + # leave off the fields b/c the children must be filtered + definition_id=root_source['definition'], + ) + else: + destination_structure = self._lookup_course(destination_course).structure + destination_structure = self.version_structure(destination_course, destination_structure, user_id) + + if blacklist != EXCLUDE_ALL: + blacklist = [BlockKey.from_usage_key(shunned) for shunned in blacklist or []] + # iterate over subtree list filtering out blacklist. + orphans = set() + destination_blocks = destination_structure['blocks'] + for subtree_root in subtree_list: + if BlockKey.from_usage_key(subtree_root) != source_structure['root']: + # find the parents and put root in the right sequence + parent = self._get_parent_from_structure(BlockKey.from_usage_key(subtree_root), source_structure) + if parent is not None: # may be a detached category xblock + if not parent in destination_blocks: + raise ItemNotFoundError(parent) + orphans.update( + self._sync_children( + source_structure['blocks'][parent], + destination_blocks[parent], + BlockKey.from_usage_key(subtree_root) ) - # update/create the subtree and its children in destination (skipping blacklist) - orphans.update( - self._copy_subdag( - user_id, destination_structure['_id'], - BlockKey.from_usage_key(subtree_root), - source_structure['blocks'], - destination_blocks, - blacklist ) + # update/create the subtree and its children in destination (skipping blacklist) + orphans.update( + self._copy_subdag( + user_id, destination_structure['_id'], + BlockKey.from_usage_key(subtree_root), + source_structure['blocks'], + destination_blocks, + blacklist ) - # remove any remaining orphans - for orphan in orphans: - # orphans will include moved as well as deleted xblocks. Only delete the deleted ones. - self._delete_if_true_orphan(orphan, destination_structure) + ) + # remove any remaining orphans + for orphan in orphans: + # orphans will include moved as well as deleted xblocks. Only delete the deleted ones. + self._delete_if_true_orphan(orphan, destination_structure) - # update the db - self.update_structure(destination_course, destination_structure) - self._update_head(destination_course, index_entry, destination_course.branch, destination_structure['_id']) + # update the db + self.update_structure(destination_course, destination_structure) + self._update_head(destination_course, index_entry, destination_course.branch, destination_structure['_id']) def delete_item(self, usage_locator, user_id, force=False): """ diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py index f5e6af6e8c..5ff112c961 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py @@ -64,7 +64,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # call through to the db_connection. It should also clear the # system cache self.bulk.update_structure(self.course_key, self.structure) - self.assertConnCalls(call.upsert_structure(self.structure)) + self.assertConnCalls(call.insert_structure(self.structure)) self.clear_cache.assert_called_once_with(self.structure['_id']) @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) @@ -79,7 +79,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # Writing a definition when no bulk operation is active should just # call through to the db_connection. self.bulk.update_definition(self.course_key, self.definition) - self.assertConnCalls(call.upsert_definition(self.definition)) + self.assertConnCalls(call.insert_definition(self.definition)) @ddt.data(True, False) def test_no_bulk_read_index(self, ignore_case): @@ -128,7 +128,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.update_structure(self.course_key, self.structure) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.assertConnCalls(call.upsert_structure(self.structure)) + self.assertConnCalls(call.insert_structure(self.structure)) def test_write_multiple_structures_on_close(self): self.conn.get_course_index.return_value = None @@ -140,7 +140,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( - [call.upsert_structure(self.structure), call.upsert_structure(other_structure)], + [call.insert_structure(self.structure), call.insert_structure(other_structure)], self.conn.mock_calls ) @@ -154,7 +154,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertConnCalls( - call.upsert_definition(self.definition), + call.insert_definition(self.definition), call.update_course_index( {'versions': {self.course_key.branch: self.definition['_id']}}, from_index=original_index @@ -173,8 +173,8 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( [ - call.upsert_definition(self.definition), - call.upsert_definition(other_definition), + call.insert_definition(self.definition), + call.insert_definition(other_definition), call.update_course_index( {'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}}, from_index=original_index @@ -190,7 +190,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.update_definition(self.course_key, self.definition) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.assertConnCalls(call.upsert_definition(self.definition)) + self.assertConnCalls(call.insert_definition(self.definition)) def test_write_multiple_definitions_on_close(self): self.conn.get_course_index.return_value = None @@ -202,7 +202,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( - [call.upsert_definition(self.definition), call.upsert_definition(other_definition)], + [call.insert_definition(self.definition), call.insert_definition(other_definition)], self.conn.mock_calls ) @@ -216,7 +216,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertConnCalls( - call.upsert_structure(self.structure), + call.insert_structure(self.structure), call.update_course_index( {'versions': {self.course_key.branch: self.structure['_id']}}, from_index=original_index @@ -235,8 +235,8 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( [ - call.upsert_structure(self.structure), - call.upsert_structure(other_structure), + call.insert_structure(self.structure), + call.insert_structure(other_structure), call.update_course_index( {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}}, from_index=original_index @@ -397,13 +397,12 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin): active_definition = lambda _id: {'active': 'definition', '_id': _id} db_definitions = [db_definition(_id) for _id in db_ids if _id not in active_ids] + self.bulk._begin_bulk_operation(self.course_key) for n, _id in enumerate(active_ids): - course_key = CourseLocator('org', 'course', 'run{}'.format(n)) - self.bulk._begin_bulk_operation(course_key) - self.bulk.update_definition(course_key, active_definition(_id)) + self.bulk.update_definition(self.course_key, active_definition(_id)) self.conn.get_definitions.return_value = db_definitions - results = self.bulk.get_definitions(search_ids) + results = self.bulk.get_definitions(self.course_key, search_ids) self.conn.get_definitions.assert_called_once_with(list(set(search_ids) - set(active_ids))) for _id in active_ids: if _id in search_ids: @@ -669,7 +668,7 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): index_copy['versions']['draft'] = index['versions']['published'] self.bulk.update_course_index(self.course_key, index_copy) self.bulk._end_bulk_operation(self.course_key) - self.conn.upsert_structure.assert_called_once_with(published_structure) + self.conn.insert_structure.assert_called_once_with(published_structure) self.conn.update_course_index.assert_called_once_with(index_copy, from_index=self.conn.get_course_index.return_value) self.conn.get_course_index.assert_called_once_with(self.course_key)