diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py index ef45f6b689..d4b69a03e4 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py @@ -114,7 +114,8 @@ class BulkWriteRecord(object): self._structures = {} def set_structure(self, branch, structure): - self.index['versions'][branch] = structure['_id'] + if self.index is not None: + self.index['versions'][branch] = structure['_id'] self._structures[branch] = structure self.dirty_branches.add(branch) @@ -170,7 +171,7 @@ class BulkWriteMixin(object): key.run.lower() == course_key.run.lower() ): return record - # If nothing matches case-insesitively, fall through to creating a new record with the passed in case + # If nothing matches case-insensitively, fall through to creating a new record with the passed in case return self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)] else: # If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid @@ -198,7 +199,9 @@ class BulkWriteMixin(object): if bulk_write_record.active_count > 1: return - bulk_write_record.initial_index = bulk_write_record.index = self.db_connection.get_course_index(course_key) + bulk_write_record.initial_index = self.db_connection.get_course_index(course_key) + # Ensure that any edits to the index don't pollute the initial_index + bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index) def _end_bulk_write_operation(self, course_key): """ @@ -224,12 +227,13 @@ class BulkWriteMixin(object): except DuplicateKeyError: pass # The structure already exists, so we don't have to write it out again - if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index: - if bulk_write_record.initial_index is None: - self.db_connection.insert_course_index(bulk_write_record.index) - else: - self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index) - self._clear_bulk_write_record(course_key) + if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index: + if bulk_write_record.initial_index is None: + self.db_connection.insert_course_index(bulk_write_record.index) + else: + self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index) + + self._clear_bulk_write_record(course_key) def _is_in_bulk_write_operation(self, course_key, ignore_case=False): """ @@ -318,7 +322,7 @@ class BulkWriteMixin(object): return new_structure -class SplitMongoModuleStore(ModuleStoreWriteBase, BulkWriteMixin): +class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase): """ A Mongodb backed ModuleStore supporting versions, inheritance, and sharing. diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py index 22574acc9b..25b73786df 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py @@ -7,7 +7,6 @@ import re import unittest import uuid from importlib import import_module -from mock import MagicMock, Mock, call from path import path from xmodule.course_module import CourseDescriptor @@ -21,8 +20,7 @@ from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTre from xmodule.modulestore.inheritance import InheritanceMixin from xmodule.x_module import XModuleMixin from xmodule.fields import Date, Timedelta -from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore, BulkWriteMixin -from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection +from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore from xmodule.modulestore.tests.test_modulestore import check_has_course_method from xmodule.modulestore.tests.mongo_connection import MONGO_PORT_NUM, MONGO_HOST @@ -1736,101 +1734,6 @@ class TestSchema(SplitModuleTest): ) -class TestBulkWriteMixin(unittest.TestCase): - def setUp(self): - self.bulk = BulkWriteMixin() - self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache') - self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection) - - self.course_key = Mock(name='course_key', spec=CourseLocator) - self.course_key_b = Mock(name='course_key_b', spec=CourseLocator) - self.version_guid = Mock(name='version_guid') - self.structure = MagicMock(name='structure') - self.index_entry = MagicMock(name='index_entry') - - def assertConnCalls(self, *calls): - self.assertEqual(list(calls), self.conn.mock_calls) - - def assertCacheNotCleared(self): - self.assertFalse(self.clear_cache.called) - - def test_no_bulk_read_structure(self): - # Reading a structure when no bulk operation is active should just call - # through to the db_connection - result = self.bulk.get_structure(self.course_key, self.version_guid) - self.assertConnCalls(call.get_structure(self.course_key.as_object_id.return_value)) - self.assertEqual(result, self.conn.get_structure.return_value) - self.assertCacheNotCleared() - - def test_no_bulk_write_structure(self): - # Writing a structure when no bulk operation is active should just - # call through to the db_connection. It should also clear the - # system cache - self.bulk.update_structure(self.course_key, self.structure) - self.assertConnCalls(call.upsert_structure(self.structure)) - self.clear_cache.assert_called_once_with(self.structure['_id']) - - def test_no_bulk_read_index(self): - # Reading a course index when no bulk operation is active should just call - # through to the db_connection - result = self.bulk.get_course_index(self.course_key, ignore_case=True) - self.assertConnCalls(call.get_course_index(self.course_key, True)) - self.assertEqual(result, self.conn.get_course_index.return_value) - self.assertCacheNotCleared() - - def test_no_bulk_write_index(self): - # Writing a course index when no bulk operation is active should just call - # through to the db_connection - self.bulk.insert_course_index(self.course_key, self.index_entry) - self.assertConnCalls(call.insert_course_index(self.index_entry)) - self.assertCacheNotCleared() - - def test_read_structure_without_write_from_db(self): - # Reading a structure before it's been written (while in bulk operation mode) - # returns the structure from the database - self.bulk._begin_bulk_write_operation(self.course_key) - result = self.bulk.get_structure(self.course_key, self.version_guid) - self.assertEquals(self.conn.get_structure.call_count, 1) - self.assertEqual(result, self.conn.get_structure.return_value) - self.assertCacheNotCleared() - - def test_read_structure_without_write_only_reads_once(self): - # Reading the same structure multiple times shouldn't hit the database - # more than once - self.test_read_structure_without_write_from_db() - result = self.bulk.get_structure(self.course_key, self.version_guid) - self.assertEquals(self.conn.get_structure.call_count, 1) - self.assertEqual(result, self.conn.get_structure.return_value) - self.assertCacheNotCleared() - - def test_read_structure_after_write_no_db(self): - # Reading a structure that's already been written shouldn't hit the db at all - self.bulk._begin_bulk_write_operation(self.course_key) - self.bulk.update_structure(self.course_key, self.structure) - result = self.bulk.get_structure(self.course_key, self.version_guid) - self.assertEquals(self.conn.get_structure.call_count, 0) - self.assertEqual(result, self.structure) - - def test_read_structure_after_write_after_read(self): - # Reading a structure that's been updated after being pulled from the db should - # still get the updated value - self.test_read_structure_without_write_only_reads_once() - self.conn.get_structure.reset_mock() - self.bulk.update_structure(self.course_key, self.structure) - result = self.bulk.get_structure(self.course_key, self.version_guid) - self.assertEquals(self.conn.get_structure.call_count, 0) - self.assertEqual(result, self.structure) - - # read index after close - # read structure after close - # write index on close - # write structure on close - # close with new index - # close with existing index - # read index after update - # read index without update - - #=========================================== def modulestore(): """ diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py new file mode 100644 index 0000000000..0eadcd4023 --- /dev/null +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py @@ -0,0 +1,266 @@ +import copy +import ddt +import unittest +from bson.objectid import ObjectId +from mock import MagicMock, Mock, call +from xmodule.modulestore.split_mongo.split import BulkWriteMixin +from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection + +from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTree, LocalId + + +class TestBulkWriteMixin(unittest.TestCase): + def setUp(self): + super(TestBulkWriteMixin, self).setUp() + self.bulk = BulkWriteMixin() + self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache') + self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection) + + self.course_key = CourseLocator('org', 'course', 'run-a') + self.course_key_b = CourseLocator('org', 'course', 'run-b') + self.structure = {'this': 'is', 'a': 'structure', '_id': ObjectId()} + self.index_entry = {'this': 'is', 'an': 'index'} + + def assertConnCalls(self, *calls): + self.assertEqual(list(calls), self.conn.mock_calls) + + def assertCacheNotCleared(self): + self.assertFalse(self.clear_cache.called) + + +class TestBulkWriteMixinPreviousTransaction(TestBulkWriteMixin): + """ + Verify that opening and closing a transaction doesn't affect later behaviour. + """ + def setUp(self): + super(TestBulkWriteMixinPreviousTransaction, self).setUp() + self.bulk._begin_bulk_write_operation(self.course_key) + self.bulk.insert_course_index(self.course_key, MagicMock('prev-index-entry')) + self.bulk.update_structure(self.course_key, {'this': 'is', 'the': 'previous structure', '_id': ObjectId()}) + self.bulk._end_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.clear_cache.reset_mock() + + +@ddt.ddt +class TestBulkWriteMixinClosed(TestBulkWriteMixin): + """ + Tests of the bulk write mixin when bulk operations aren't active. + """ + @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) + def test_no_bulk_read_structure(self, version_guid): + # Reading a structure when no bulk operation is active should just call + # through to the db_connection + result = self.bulk.get_structure(self.course_key, version_guid) + self.assertConnCalls(call.get_structure(self.course_key.as_object_id(version_guid))) + self.assertEqual(result, self.conn.get_structure.return_value) + self.assertCacheNotCleared() + + def test_no_bulk_write_structure(self): + # Writing a structure when no bulk operation is active should just + # call through to the db_connection. It should also clear the + # system cache + self.bulk.update_structure(self.course_key, self.structure) + self.assertConnCalls(call.upsert_structure(self.structure)) + self.clear_cache.assert_called_once_with(self.structure['_id']) + + @ddt.data(True, False) + def test_no_bulk_read_index(self, ignore_case): + # Reading a course index when no bulk operation is active should just call + # through to the db_connection + result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case) + self.assertConnCalls(call.get_course_index(self.course_key, ignore_case)) + self.assertEqual(result, self.conn.get_course_index.return_value) + self.assertCacheNotCleared() + + def test_no_bulk_write_index(self): + # Writing a course index when no bulk operation is active should just call + # through to the db_connection + self.bulk.insert_course_index(self.course_key, self.index_entry) + self.assertConnCalls(call.insert_course_index(self.index_entry)) + self.assertCacheNotCleared() + + def test_out_of_order_end(self): + # Calling _end_bulk_write_operation without a corresponding _begin... + # is a noop + self.bulk._end_bulk_write_operation(self.course_key) + + def test_write_new_index_on_close(self): + self.conn.get_course_index.return_value = None + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.insert_course_index(self.course_key, self.index_entry) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.conn.insert_course_index.assert_called_once_with(self.index_entry) + + def test_write_updated_index_on_close(self): + old_index = {'this': 'is', 'an': 'old index'} + self.conn.get_course_index.return_value = old_index + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.insert_course_index(self.course_key, self.index_entry) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.conn.update_course_index.assert_called_once_with(self.index_entry, from_index=old_index) + + def test_write_structure_on_close(self): + self.conn.get_course_index.return_value = None + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.update_structure(self.course_key, self.structure) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.assertConnCalls(call.insert_structure(self.structure)) + + def test_write_multiple_structures_on_close(self): + self.conn.get_course_index.return_value = None + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure) + other_structure = {'another': 'structure', '_id': ObjectId()} + self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.assertItemsEqual( + [call.insert_structure(self.structure), call.insert_structure(other_structure)], + self.conn.mock_calls + ) + + def test_write_index_and_structure_on_close(self): + original_index = {'versions': {}} + self.conn.get_course_index.return_value = copy.deepcopy(original_index) + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.update_structure(self.course_key, self.structure) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.assertConnCalls( + call.insert_structure(self.structure), + call.update_course_index( + {'versions': {self.course_key.branch: self.structure['_id']}}, + from_index=original_index + ) + ) + + def test_write_index_and_multiple_structures_on_close(self): + original_index = {'versions': {'a': ObjectId(), 'b': ObjectId()}} + self.conn.get_course_index.return_value = copy.deepcopy(original_index) + self.bulk._begin_bulk_write_operation(self.course_key) + self.conn.reset_mock() + self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure) + other_structure = {'another': 'structure', '_id': ObjectId()} + self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure) + self.assertConnCalls() + self.bulk._end_bulk_write_operation(self.course_key) + self.assertItemsEqual( + [ + call.insert_structure(self.structure), + call.insert_structure(other_structure), + call.update_course_index( + {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}}, + from_index=original_index + ) + ], + self.conn.mock_calls + ) + +class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, TestBulkWriteMixinPreviousTransaction): + """ + Test that operations on with a closed transaction aren't affected by a previously executed transaction + """ + pass + + +@ddt.ddt +class TestBulkWriteMixinOpen(TestBulkWriteMixin): + """ + Tests of the bulk write mixin when bulk write operations are open + """ + def setUp(self): + super(TestBulkWriteMixinOpen, self).setUp() + self.bulk._begin_bulk_write_operation(self.course_key) + + @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) + def test_read_structure_without_write_from_db(self, version_guid): + # Reading a structure before it's been written (while in bulk operation mode) + # returns the structure from the database + result = self.bulk.get_structure(self.course_key, version_guid) + self.assertEquals(self.conn.get_structure.call_count, 1) + self.assertEqual(result, self.conn.get_structure.return_value) + self.assertCacheNotCleared() + + @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) + def test_read_structure_without_write_only_reads_once(self, version_guid): + # Reading the same structure multiple times shouldn't hit the database + # more than once + for _ in xrange(2): + result = self.bulk.get_structure(self.course_key, version_guid) + self.assertEquals(self.conn.get_structure.call_count, 1) + self.assertEqual(result, self.conn.get_structure.return_value) + self.assertCacheNotCleared() + + @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) + def test_read_structure_after_write_no_db(self, version_guid): + # Reading a structure that's already been written shouldn't hit the db at all + self.bulk.update_structure(self.course_key, self.structure) + result = self.bulk.get_structure(self.course_key, version_guid) + self.assertEquals(self.conn.get_structure.call_count, 0) + self.assertEqual(result, self.structure) + + @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) + def test_read_structure_after_write_after_read(self, version_guid): + # Reading a structure that's been updated after being pulled from the db should + # still get the updated value + result = self.bulk.get_structure(self.course_key, version_guid) + self.bulk.update_structure(self.course_key, self.structure) + result = self.bulk.get_structure(self.course_key, version_guid) + self.assertEquals(self.conn.get_structure.call_count, 1) + self.assertEqual(result, self.structure) + + @ddt.data(True, False) + def test_read_index_without_write_from_db(self, ignore_case): + # Reading the index without writing to it should pull from the database + result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case) + self.assertEquals(self.conn.get_course_index.call_count, 1) + self.assertEquals(self.conn.get_course_index.return_value, result) + + @ddt.data(True, False) + def test_read_index_without_write_only_reads_once(self, ignore_case): + # Reading the index multiple times should only result in one read from + # the database + for _ in xrange(2): + result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case) + self.assertEquals(self.conn.get_course_index.call_count, 1) + self.assertEquals(self.conn.get_course_index.return_value, result) + + @ddt.data(True, False) + def test_read_index_after_write(self, ignore_case): + # Reading the index after a write still should hit the database once to fetch the + # initial index, and should return the written index_entry + self.bulk.insert_course_index(self.course_key, self.index_entry) + result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case) + self.assertEquals(self.conn.get_course_index.call_count, 1) + self.assertEquals(self.index_entry, result) + + def test_read_index_ignore_case(self): + # Reading using ignore case should find an already written entry with a different case + self.bulk.insert_course_index(self.course_key, self.index_entry) + result = self.bulk.get_course_index( + self.course_key.replace( + org=self.course_key.org.upper(), + course=self.course_key.course.title(), + run=self.course_key.run.upper() + ), + ignore_case=True + ) + self.assertEquals(self.conn.get_course_index.call_count, 1) + self.assertEquals(self.index_entry, result) + + + +class TestBulkWriteMixinOpenAfterPrevTransaction(TestBulkWriteMixinOpen, TestBulkWriteMixinPreviousTransaction): + """ + Test that operations on with an open transaction aren't affected by a previously executed transaction + """ + pass