Extensive testing of bulk operations
This commit is contained in:
@@ -114,7 +114,8 @@ class BulkWriteRecord(object):
|
||||
self._structures = {}
|
||||
|
||||
def set_structure(self, branch, structure):
|
||||
self.index['versions'][branch] = structure['_id']
|
||||
if self.index is not None:
|
||||
self.index['versions'][branch] = structure['_id']
|
||||
self._structures[branch] = structure
|
||||
self.dirty_branches.add(branch)
|
||||
|
||||
@@ -170,7 +171,7 @@ class BulkWriteMixin(object):
|
||||
key.run.lower() == course_key.run.lower()
|
||||
):
|
||||
return record
|
||||
# If nothing matches case-insesitively, fall through to creating a new record with the passed in case
|
||||
# If nothing matches case-insensitively, fall through to creating a new record with the passed in case
|
||||
return self._active_bulk_writes.records[course_key.replace(branch=None, version_guid=None)]
|
||||
else:
|
||||
# If nothing org/course/run aren't set, use a bulk record that is identified just by the version_guid
|
||||
@@ -198,7 +199,9 @@ class BulkWriteMixin(object):
|
||||
if bulk_write_record.active_count > 1:
|
||||
return
|
||||
|
||||
bulk_write_record.initial_index = bulk_write_record.index = self.db_connection.get_course_index(course_key)
|
||||
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
|
||||
# Ensure that any edits to the index don't pollute the initial_index
|
||||
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
|
||||
|
||||
def _end_bulk_write_operation(self, course_key):
|
||||
"""
|
||||
@@ -224,12 +227,13 @@ class BulkWriteMixin(object):
|
||||
except DuplicateKeyError:
|
||||
pass # The structure already exists, so we don't have to write it out again
|
||||
|
||||
if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index:
|
||||
if bulk_write_record.initial_index is None:
|
||||
self.db_connection.insert_course_index(bulk_write_record.index)
|
||||
else:
|
||||
self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index)
|
||||
self._clear_bulk_write_record(course_key)
|
||||
if bulk_write_record.index is not None and bulk_write_record.index != bulk_write_record.initial_index:
|
||||
if bulk_write_record.initial_index is None:
|
||||
self.db_connection.insert_course_index(bulk_write_record.index)
|
||||
else:
|
||||
self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index)
|
||||
|
||||
self._clear_bulk_write_record(course_key)
|
||||
|
||||
def _is_in_bulk_write_operation(self, course_key, ignore_case=False):
|
||||
"""
|
||||
@@ -318,7 +322,7 @@ class BulkWriteMixin(object):
|
||||
return new_structure
|
||||
|
||||
|
||||
class SplitMongoModuleStore(ModuleStoreWriteBase, BulkWriteMixin):
|
||||
class SplitMongoModuleStore(BulkWriteMixin, ModuleStoreWriteBase):
|
||||
"""
|
||||
A Mongodb backed ModuleStore supporting versions, inheritance,
|
||||
and sharing.
|
||||
|
||||
@@ -7,7 +7,6 @@ import re
|
||||
import unittest
|
||||
import uuid
|
||||
from importlib import import_module
|
||||
from mock import MagicMock, Mock, call
|
||||
from path import path
|
||||
|
||||
from xmodule.course_module import CourseDescriptor
|
||||
@@ -21,8 +20,7 @@ from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTre
|
||||
from xmodule.modulestore.inheritance import InheritanceMixin
|
||||
from xmodule.x_module import XModuleMixin
|
||||
from xmodule.fields import Date, Timedelta
|
||||
from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore, BulkWriteMixin
|
||||
from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection
|
||||
from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore
|
||||
from xmodule.modulestore.tests.test_modulestore import check_has_course_method
|
||||
from xmodule.modulestore.tests.mongo_connection import MONGO_PORT_NUM, MONGO_HOST
|
||||
|
||||
@@ -1736,101 +1734,6 @@ class TestSchema(SplitModuleTest):
|
||||
)
|
||||
|
||||
|
||||
class TestBulkWriteMixin(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.bulk = BulkWriteMixin()
|
||||
self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache')
|
||||
self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection)
|
||||
|
||||
self.course_key = Mock(name='course_key', spec=CourseLocator)
|
||||
self.course_key_b = Mock(name='course_key_b', spec=CourseLocator)
|
||||
self.version_guid = Mock(name='version_guid')
|
||||
self.structure = MagicMock(name='structure')
|
||||
self.index_entry = MagicMock(name='index_entry')
|
||||
|
||||
def assertConnCalls(self, *calls):
|
||||
self.assertEqual(list(calls), self.conn.mock_calls)
|
||||
|
||||
def assertCacheNotCleared(self):
|
||||
self.assertFalse(self.clear_cache.called)
|
||||
|
||||
def test_no_bulk_read_structure(self):
|
||||
# Reading a structure when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_structure(self.course_key, self.version_guid)
|
||||
self.assertConnCalls(call.get_structure(self.course_key.as_object_id.return_value))
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_no_bulk_write_structure(self):
|
||||
# Writing a structure when no bulk operation is active should just
|
||||
# call through to the db_connection. It should also clear the
|
||||
# system cache
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls(call.upsert_structure(self.structure))
|
||||
self.clear_cache.assert_called_once_with(self.structure['_id'])
|
||||
|
||||
def test_no_bulk_read_index(self):
|
||||
# Reading a course index when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_course_index(self.course_key, ignore_case=True)
|
||||
self.assertConnCalls(call.get_course_index(self.course_key, True))
|
||||
self.assertEqual(result, self.conn.get_course_index.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_no_bulk_write_index(self):
|
||||
# Writing a course index when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls(call.insert_course_index(self.index_entry))
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_read_structure_without_write_from_db(self):
|
||||
# Reading a structure before it's been written (while in bulk operation mode)
|
||||
# returns the structure from the database
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
result = self.bulk.get_structure(self.course_key, self.version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 1)
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_read_structure_without_write_only_reads_once(self):
|
||||
# Reading the same structure multiple times shouldn't hit the database
|
||||
# more than once
|
||||
self.test_read_structure_without_write_from_db()
|
||||
result = self.bulk.get_structure(self.course_key, self.version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 1)
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_read_structure_after_write_no_db(self):
|
||||
# Reading a structure that's already been written shouldn't hit the db at all
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
result = self.bulk.get_structure(self.course_key, self.version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 0)
|
||||
self.assertEqual(result, self.structure)
|
||||
|
||||
def test_read_structure_after_write_after_read(self):
|
||||
# Reading a structure that's been updated after being pulled from the db should
|
||||
# still get the updated value
|
||||
self.test_read_structure_without_write_only_reads_once()
|
||||
self.conn.get_structure.reset_mock()
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
result = self.bulk.get_structure(self.course_key, self.version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 0)
|
||||
self.assertEqual(result, self.structure)
|
||||
|
||||
# read index after close
|
||||
# read structure after close
|
||||
# write index on close
|
||||
# write structure on close
|
||||
# close with new index
|
||||
# close with existing index
|
||||
# read index after update
|
||||
# read index without update
|
||||
|
||||
|
||||
#===========================================
|
||||
def modulestore():
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,266 @@
|
||||
import copy
|
||||
import ddt
|
||||
import unittest
|
||||
from bson.objectid import ObjectId
|
||||
from mock import MagicMock, Mock, call
|
||||
from xmodule.modulestore.split_mongo.split import BulkWriteMixin
|
||||
from xmodule.modulestore.split_mongo.mongo_connection import MongoConnection
|
||||
|
||||
from opaque_keys.edx.locator import CourseLocator, BlockUsageLocator, VersionTree, LocalId
|
||||
|
||||
|
||||
class TestBulkWriteMixin(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(TestBulkWriteMixin, self).setUp()
|
||||
self.bulk = BulkWriteMixin()
|
||||
self.clear_cache = self.bulk._clear_cache = Mock(name='_clear_cache')
|
||||
self.conn = self.bulk.db_connection = MagicMock(name='db_connection', spec=MongoConnection)
|
||||
|
||||
self.course_key = CourseLocator('org', 'course', 'run-a')
|
||||
self.course_key_b = CourseLocator('org', 'course', 'run-b')
|
||||
self.structure = {'this': 'is', 'a': 'structure', '_id': ObjectId()}
|
||||
self.index_entry = {'this': 'is', 'an': 'index'}
|
||||
|
||||
def assertConnCalls(self, *calls):
|
||||
self.assertEqual(list(calls), self.conn.mock_calls)
|
||||
|
||||
def assertCacheNotCleared(self):
|
||||
self.assertFalse(self.clear_cache.called)
|
||||
|
||||
|
||||
class TestBulkWriteMixinPreviousTransaction(TestBulkWriteMixin):
|
||||
"""
|
||||
Verify that opening and closing a transaction doesn't affect later behaviour.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestBulkWriteMixinPreviousTransaction, self).setUp()
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.bulk.insert_course_index(self.course_key, MagicMock('prev-index-entry'))
|
||||
self.bulk.update_structure(self.course_key, {'this': 'is', 'the': 'previous structure', '_id': ObjectId()})
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.clear_cache.reset_mock()
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
"""
|
||||
Tests of the bulk write mixin when bulk operations aren't active.
|
||||
"""
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
def test_no_bulk_read_structure(self, version_guid):
|
||||
# Reading a structure when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertConnCalls(call.get_structure(self.course_key.as_object_id(version_guid)))
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_no_bulk_write_structure(self):
|
||||
# Writing a structure when no bulk operation is active should just
|
||||
# call through to the db_connection. It should also clear the
|
||||
# system cache
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls(call.upsert_structure(self.structure))
|
||||
self.clear_cache.assert_called_once_with(self.structure['_id'])
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_no_bulk_read_index(self, ignore_case):
|
||||
# Reading a course index when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case)
|
||||
self.assertConnCalls(call.get_course_index(self.course_key, ignore_case))
|
||||
self.assertEqual(result, self.conn.get_course_index.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_no_bulk_write_index(self):
|
||||
# Writing a course index when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls(call.insert_course_index(self.index_entry))
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_out_of_order_end(self):
|
||||
# Calling _end_bulk_write_operation without a corresponding _begin...
|
||||
# is a noop
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
|
||||
def test_write_new_index_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.conn.insert_course_index.assert_called_once_with(self.index_entry)
|
||||
|
||||
def test_write_updated_index_on_close(self):
|
||||
old_index = {'this': 'is', 'an': 'old index'}
|
||||
self.conn.get_course_index.return_value = old_index
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.conn.update_course_index.assert_called_once_with(self.index_entry, from_index=old_index)
|
||||
|
||||
def test_write_structure_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.assertConnCalls(call.insert_structure(self.structure))
|
||||
|
||||
def test_write_multiple_structures_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure)
|
||||
other_structure = {'another': 'structure', '_id': ObjectId()}
|
||||
self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[call.insert_structure(self.structure), call.insert_structure(other_structure)],
|
||||
self.conn.mock_calls
|
||||
)
|
||||
|
||||
def test_write_index_and_structure_on_close(self):
|
||||
original_index = {'versions': {}}
|
||||
self.conn.get_course_index.return_value = copy.deepcopy(original_index)
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.assertConnCalls(
|
||||
call.insert_structure(self.structure),
|
||||
call.update_course_index(
|
||||
{'versions': {self.course_key.branch: self.structure['_id']}},
|
||||
from_index=original_index
|
||||
)
|
||||
)
|
||||
|
||||
def test_write_index_and_multiple_structures_on_close(self):
|
||||
original_index = {'versions': {'a': ObjectId(), 'b': ObjectId()}}
|
||||
self.conn.get_course_index.return_value = copy.deepcopy(original_index)
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
self.conn.reset_mock()
|
||||
self.bulk.update_structure(self.course_key.replace(branch='a'), self.structure)
|
||||
other_structure = {'another': 'structure', '_id': ObjectId()}
|
||||
self.bulk.update_structure(self.course_key.replace(branch='b'), other_structure)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_write_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[
|
||||
call.insert_structure(self.structure),
|
||||
call.insert_structure(other_structure),
|
||||
call.update_course_index(
|
||||
{'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}},
|
||||
from_index=original_index
|
||||
)
|
||||
],
|
||||
self.conn.mock_calls
|
||||
)
|
||||
|
||||
class TestBulkWriteMixinClosedAfterPrevTransaction(TestBulkWriteMixinClosed, TestBulkWriteMixinPreviousTransaction):
|
||||
"""
|
||||
Test that operations on with a closed transaction aren't affected by a previously executed transaction
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestBulkWriteMixinOpen(TestBulkWriteMixin):
|
||||
"""
|
||||
Tests of the bulk write mixin when bulk write operations are open
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestBulkWriteMixinOpen, self).setUp()
|
||||
self.bulk._begin_bulk_write_operation(self.course_key)
|
||||
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
def test_read_structure_without_write_from_db(self, version_guid):
|
||||
# Reading a structure before it's been written (while in bulk operation mode)
|
||||
# returns the structure from the database
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 1)
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
def test_read_structure_without_write_only_reads_once(self, version_guid):
|
||||
# Reading the same structure multiple times shouldn't hit the database
|
||||
# more than once
|
||||
for _ in xrange(2):
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 1)
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
def test_read_structure_after_write_no_db(self, version_guid):
|
||||
# Reading a structure that's already been written shouldn't hit the db at all
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 0)
|
||||
self.assertEqual(result, self.structure)
|
||||
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
def test_read_structure_after_write_after_read(self, version_guid):
|
||||
# Reading a structure that's been updated after being pulled from the db should
|
||||
# still get the updated value
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertEquals(self.conn.get_structure.call_count, 1)
|
||||
self.assertEqual(result, self.structure)
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_read_index_without_write_from_db(self, ignore_case):
|
||||
# Reading the index without writing to it should pull from the database
|
||||
result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case)
|
||||
self.assertEquals(self.conn.get_course_index.call_count, 1)
|
||||
self.assertEquals(self.conn.get_course_index.return_value, result)
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_read_index_without_write_only_reads_once(self, ignore_case):
|
||||
# Reading the index multiple times should only result in one read from
|
||||
# the database
|
||||
for _ in xrange(2):
|
||||
result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case)
|
||||
self.assertEquals(self.conn.get_course_index.call_count, 1)
|
||||
self.assertEquals(self.conn.get_course_index.return_value, result)
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_read_index_after_write(self, ignore_case):
|
||||
# Reading the index after a write still should hit the database once to fetch the
|
||||
# initial index, and should return the written index_entry
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
result = self.bulk.get_course_index(self.course_key, ignore_case=ignore_case)
|
||||
self.assertEquals(self.conn.get_course_index.call_count, 1)
|
||||
self.assertEquals(self.index_entry, result)
|
||||
|
||||
def test_read_index_ignore_case(self):
|
||||
# Reading using ignore case should find an already written entry with a different case
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
result = self.bulk.get_course_index(
|
||||
self.course_key.replace(
|
||||
org=self.course_key.org.upper(),
|
||||
course=self.course_key.course.title(),
|
||||
run=self.course_key.run.upper()
|
||||
),
|
||||
ignore_case=True
|
||||
)
|
||||
self.assertEquals(self.conn.get_course_index.call_count, 1)
|
||||
self.assertEquals(self.index_entry, result)
|
||||
|
||||
|
||||
|
||||
class TestBulkWriteMixinOpenAfterPrevTransaction(TestBulkWriteMixinOpen, TestBulkWriteMixinPreviousTransaction):
|
||||
"""
|
||||
Test that operations on with an open transaction aren't affected by a previously executed transaction
|
||||
"""
|
||||
pass
|
||||
Reference in New Issue
Block a user