diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py index 289c871af1..ab7f922df5 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py @@ -1,50 +1,180 @@ """ Segregation of pymongo functions from the data modeling mechanisms for split modulestore. """ -import re -from mongodb_proxy import autoretry_read, MongoProxy +import datetime +import math import pymongo +import pytz +import re +from contextlib import contextmanager +from time import time # Import this just to export it from pymongo.errors import DuplicateKeyError # pylint: disable=unused-import from contracts import check, new_contract +from mongodb_proxy import autoretry_read, MongoProxy from xmodule.exceptions import HeartbeatFailure from xmodule.modulestore import BlockData from xmodule.modulestore.split_mongo import BlockKey -import datetime -import pytz +import dogstats_wrapper as dog_stats_api new_contract('BlockData', BlockData) -def structure_from_mongo(structure): +def round_power_2(value): + """ + Return value rounded up to the nearest power of 2. + """ + if value == 0: + return 0 + + return math.pow(2, math.ceil(math.log(value, 2))) + + +class Tagger(object): + """ + An object used by :class:`QueryTimer` to allow timed code blocks + to add measurements and tags to the timer. + """ + def __init__(self): + self.added_tags = [] + self.measures = [] + + def measure(self, name, size): + """ + Record a measurement of the timed data. This would be something to + indicate the size of the value being timed. + + Arguments: + name: The name of the measurement. + size (float): The size of the measurement. + """ + self.measures.append((name, size)) + + def tag(self, **kwargs): + """ + Add tags to the timer. + + Arguments: + **kwargs: Each keyword is treated as a tag name, and the + value of the argument is the tag value. + """ + self.added_tags.extend(kwargs.items()) + + @property + def tags(self): + """ + Return all tags for this (this includes any tags added with :meth:`tag`, + and also all of the added measurements, bucketed into powers of 2). + """ + return [ + '{}:{}'.format(name, round_power_2(size)) + for name, size in self.measures + ] + [ + '{}:{}'.format(name, value) + for name, value in self.added_tags + ] + + +class QueryTimer(object): + """ + An object that allows timing a block of code while also recording measurements + about that code. + """ + def __init__(self, metric_base, sample_rate=1): + """ + Arguments: + metric_base: The prefix to be used for all queries captured + with this :class:`QueryTimer`. + """ + self._metric_base = metric_base + self._sample_rate = sample_rate + + @contextmanager + def timer(self, metric_name, course_context): + """ + Contextmanager which acts as a timer for the metric ``metric_name``, + but which also yields a :class:`Tagger` object that allows the timed block + of code to add tags and quantity measurements. Tags are added verbatim to the + timer output. Measurements are recorded as histogram measurements in their own, + and also as bucketed tags on the timer measurement. + + Arguments: + metric_name: The name used to aggregate all of these metrics. + course_context: The course which the query is being made for. + """ + tagger = Tagger() + metric_name = "{}.{}".format(self._metric_base, metric_name) + + start = time() + try: + yield tagger + finally: + end = time() + tags = tagger.tags + tags.append('course:{}'.format(course_context)) + for name, size in tagger.measures: + dog_stats_api.histogram( + '{}.{}'.format(metric_name, name), + size, + timestamp=end, + tags=[tag for tag in tags if not tag.startswith('{}:'.format(metric_name))], + sample_rate=self._sample_rate, + ) + dog_stats_api.histogram( + '{}.duration'.format(metric_name), + end - start, + timestamp=end, + tags=tags, + sample_rate=self._sample_rate, + ) + dog_stats_api.increment( + metric_name, + timestamp=end, + tags=tags, + sample_rate=self._sample_rate, + ) + + +TIMER = QueryTimer(__name__, 0.001) + + +def structure_from_mongo(structure, course_context=None): """ Converts the 'blocks' key from a list [block_data] to a map {BlockKey: block_data}. Converts 'root' from [block_type, block_id] to BlockKey. Converts 'blocks.*.fields.children' from [[block_type, block_id]] to [BlockKey]. N.B. Does not convert any other ReferenceFields (because we don't know which fields they are at this level). + + Arguments: + structure: The document structure to convert + course_context (CourseKey): For metrics gathering, the CourseKey + for the course that this data is being processed for. """ - check('seq[2]', structure['root']) - check('list(dict)', structure['blocks']) - for block in structure['blocks']: - if 'children' in block['fields']: - check('list(list[2])', block['fields']['children']) + with TIMER.timer('structure_from_mongo', course_context) as tagger: + tagger.measure('blocks', len(structure['blocks'])) - structure['root'] = BlockKey(*structure['root']) - new_blocks = {} - for block in structure['blocks']: - if 'children' in block['fields']: - block['fields']['children'] = [BlockKey(*child) for child in block['fields']['children']] - new_blocks[BlockKey(block['block_type'], block.pop('block_id'))] = BlockData(**block) - structure['blocks'] = new_blocks + check('seq[2]', structure['root']) + check('list(dict)', structure['blocks']) + for block in structure['blocks']: + if 'children' in block['fields']: + check('list(list[2])', block['fields']['children']) - return structure + structure['root'] = BlockKey(*structure['root']) + new_blocks = {} + for block in structure['blocks']: + if 'children' in block['fields']: + block['fields']['children'] = [BlockKey(*child) for child in block['fields']['children']] + new_blocks[BlockKey(block['block_type'], block.pop('block_id'))] = BlockData(**block) + structure['blocks'] = new_blocks + + return structure -def structure_to_mongo(structure): +def structure_to_mongo(structure, course_context=None): """ Converts the 'blocks' key from a map {BlockKey: block_data} to a list [block_data], inserting BlockKey.type as 'block_type' @@ -52,22 +182,25 @@ def structure_to_mongo(structure): Doesn't convert 'root', since namedtuple's can be inserted directly into mongo. """ - check('BlockKey', structure['root']) - check('dict(BlockKey: BlockData)', structure['blocks']) - for block in structure['blocks'].itervalues(): - if 'children' in block.fields: - check('list(BlockKey)', block.fields['children']) + with TIMER.timer('structure_to_mongo', course_context) as tagger: + tagger.measure('blocks', len(structure['blocks'])) - new_structure = dict(structure) - new_structure['blocks'] = [] + check('BlockKey', structure['root']) + check('dict(BlockKey: BlockData)', structure['blocks']) + for block in structure['blocks'].itervalues(): + if 'children' in block.fields: + check('list(BlockKey)', block.fields['children']) - for block_key, block in structure['blocks'].iteritems(): - new_block = dict(block.to_storable()) - new_block.setdefault('block_type', block_key.type) - new_block['block_id'] = block_key.id - new_structure['blocks'].append(new_block) + new_structure = dict(structure) + new_structure['blocks'] = [] - return new_structure + for block_key, block in structure['blocks'].iteritems(): + new_block = dict(block.to_storable()) + new_block.setdefault('block_type', block_key.type) + new_block['block_id'] = block_key.id + new_structure['blocks'].append(new_block) + + return new_structure class MongoConnection(object): @@ -121,34 +254,54 @@ class MongoConnection(object): else: raise HeartbeatFailure("Can't connect to {}".format(self.database.name)) - def get_structure(self, key): + def get_structure(self, key, course_context=None): """ Get the structure from the persistence mechanism whose id is the given key """ - return structure_from_mongo(self.structures.find_one({'_id': key})) + with TIMER.timer("get_structure", course_context) as tagger_get_structure: + with TIMER.timer("get_structure.find_one", course_context) as tagger_find_one: + doc = self.structures.find_one({'_id': key}) + tagger_find_one.measure("blocks", len(doc['blocks'])) + tagger_get_structure.measure("blocks", len(doc['blocks'])) + + return structure_from_mongo(doc, course_context) @autoretry_read() - def find_structures_by_id(self, ids): + def find_structures_by_id(self, ids, course_context=None): """ Return all structures that specified in ``ids``. Arguments: ids (list): A list of structure ids """ - return [structure_from_mongo(structure) for structure in self.structures.find({'_id': {'$in': ids}})] + with TIMER.timer("find_structures_by_id", course_context) as tagger: + tagger.measure("requested_ids", len(ids)) + docs = [ + structure_from_mongo(structure, course_context) + for structure in self.structures.find({'_id': {'$in': ids}}) + ] + tagger.measure("structures", len(docs)) + return docs @autoretry_read() - def find_structures_derived_from(self, ids): + def find_structures_derived_from(self, ids, course_context=None): """ Return all structures that were immediately derived from a structure listed in ``ids``. Arguments: ids (list): A list of structure ids """ - return [structure_from_mongo(structure) for structure in self.structures.find({'previous_version': {'$in': ids}})] + with TIMER.timer("find_structures_derived_from", course_context) as tagger: + tagger.measure("base_ids", len(ids)) + docs = [ + structure_from_mongo(structure, course_context) + for structure in self.structures.find({'previous_version': {'$in': ids}}) + ] + tagger.measure("structures", len(docs)) + return docs @autoretry_read() - def find_ancestor_structures(self, original_version, block_key): + def find_ancestor_structures(self, original_version, block_key, course_context=None): """ Find all structures that originated from ``original_version`` that contain ``block_key``. @@ -156,45 +309,51 @@ class MongoConnection(object): original_version (str or ObjectID): The id of a structure block_key (BlockKey): The id of the block in question """ - return [ - structure_from_mongo(structure) - for structure in self.structures.find({ - 'original_version': original_version, - 'blocks': { - '$elemMatch': { - 'block_id': block_key.id, - 'block_type': block_key.type, - 'edit_info.update_version': { - '$exists': True, + with TIMER.timer("find_ancestor_structures", course_context) as tagger: + docs = [ + structure_from_mongo(structure, course_context) + for structure in self.structures.find({ + 'original_version': original_version, + 'blocks': { + '$elemMatch': { + 'block_id': block_key.id, + 'block_type': block_key.type, + 'edit_info.update_version': { + '$exists': True, + }, }, }, - }, - }) - ] + }) + ] + tagger.measure("structures", len(docs)) + return docs - def insert_structure(self, structure): + def insert_structure(self, structure, course_context=None): """ Insert a new structure into the database. """ - self.structures.insert(structure_to_mongo(structure)) + with TIMER.timer("insert_structure", course_context) as tagger: + tagger.measure("blocks", len(structure["blocks"])) + self.structures.insert(structure_to_mongo(structure, course_context)) def get_course_index(self, key, ignore_case=False): """ Get the course_index from the persistence mechanism whose id is the given key """ - if ignore_case: - query = { - key_attr: re.compile(u'^{}$'.format(re.escape(getattr(key, key_attr))), re.IGNORECASE) - for key_attr in ('org', 'course', 'run') - } - else: - query = { - key_attr: getattr(key, key_attr) - for key_attr in ('org', 'course', 'run') - } - return self.course_index.find_one(query) + with TIMER.timer("get_course_index", key): + if ignore_case: + query = { + key_attr: re.compile(u'^{}$'.format(re.escape(getattr(key, key_attr))), re.IGNORECASE) + for key_attr in ('org', 'course', 'run') + } + else: + query = { + key_attr: getattr(key, key_attr) + for key_attr in ('org', 'course', 'run') + } + return self.course_index.find_one(query) - def find_matching_course_indexes(self, branch=None, search_targets=None, org_target=None): + def find_matching_course_indexes(self, branch=None, search_targets=None, org_target=None, course_context=None): """ Find the course_index matching particular conditions. @@ -205,75 +364,89 @@ class MongoConnection(object): org_target: If specified, this is an ORG filter so that only course_indexs are returned for the specified ORG """ - query = {} - if branch is not None: - query['versions.{}'.format(branch)] = {'$exists': True} + with TIMER.timer("find_matching_course_indexes", course_context): + query = {} + if branch is not None: + query['versions.{}'.format(branch)] = {'$exists': True} - if search_targets: - for key, value in search_targets.iteritems(): - query['search_targets.{}'.format(key)] = value + if search_targets: + for key, value in search_targets.iteritems(): + query['search_targets.{}'.format(key)] = value - if org_target: - query['org'] = org_target + if org_target: + query['org'] = org_target - return self.course_index.find(query) + return self.course_index.find(query) - def insert_course_index(self, course_index): + def insert_course_index(self, course_index, course_context=None): """ Create the course_index in the db """ - course_index['last_update'] = datetime.datetime.now(pytz.utc) - self.course_index.insert(course_index) + with TIMER.timer("insert_course_index", course_context): + course_index['last_update'] = datetime.datetime.now(pytz.utc) + self.course_index.insert(course_index) - def update_course_index(self, course_index, from_index=None): + def update_course_index(self, course_index, from_index=None, course_context=None): """ Update the db record for course_index. Arguments: from_index: If set, only update an index if it matches the one specified in `from_index`. """ - if from_index: - query = {"_id": from_index["_id"]} - # last_update not only tells us when this course was last updated but also helps - # prevent collisions - if 'last_update' in from_index: - query['last_update'] = from_index['last_update'] - else: - query = { - 'org': course_index['org'], - 'course': course_index['course'], - 'run': course_index['run'], - } - course_index['last_update'] = datetime.datetime.now(pytz.utc) - self.course_index.update(query, course_index, upsert=False,) + with TIMER.timer("update_course_index", course_context): + if from_index: + query = {"_id": from_index["_id"]} + # last_update not only tells us when this course was last updated but also helps + # prevent collisions + if 'last_update' in from_index: + query['last_update'] = from_index['last_update'] + else: + query = { + 'org': course_index['org'], + 'course': course_index['course'], + 'run': course_index['run'], + } + course_index['last_update'] = datetime.datetime.now(pytz.utc) + self.course_index.update(query, course_index, upsert=False,) def delete_course_index(self, course_key): """ Delete the course_index from the persistence mechanism whose id is the given course_index """ - query = { - key_attr: getattr(course_key, key_attr) - for key_attr in ('org', 'course', 'run') - } - return self.course_index.remove(query) + with TIMER.timer("delete_course_index", course_key): + query = { + key_attr: getattr(course_key, key_attr) + for key_attr in ('org', 'course', 'run') + } + return self.course_index.remove(query) - def get_definition(self, key): + def get_definition(self, key, course_context=None): """ Get the definition from the persistence mechanism whose id is the given key """ - return self.definitions.find_one({'_id': key}) + with TIMER.timer("get_definition", course_context) as tagger: + definition = self.definitions.find_one({'_id': key}) + tagger.measure("fields", len(definition['fields'])) + tagger.tag(block_type=definition['block_type']) + return definition - def get_definitions(self, definitions): + def get_definitions(self, definitions, course_context=None): """ Retrieve all definitions listed in `definitions`. """ - return self.definitions.find({'_id': {'$in': definitions}}) + with TIMER.timer("get_definitions", course_context) as tagger: + tagger.measure('definitions', len(definitions)) + definitions = self.definitions.find({'_id': {'$in': definitions}}) + return definitions - def insert_definition(self, definition): + def insert_definition(self, definition, course_context=None): """ Create the definition in the db """ - self.definitions.insert(definition) + with TIMER.timer("insert_definition", course_context) as tagger: + tagger.measure('fields', len(definition['fields'])) + tagger.tag(block_type=definition['block_type']) + self.definitions.insert(definition) def ensure_indexes(self): """ diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py index e0f7df1900..722dbf8dcf 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py @@ -127,6 +127,7 @@ class SplitBulkWriteRecord(BulkOpsRecord): self.modules = defaultdict(dict) self.definitions = {} self.definitions_in_db = set() + self.course_key = None # TODO: This needs to track which branches have actually been modified/versioned, # so that copying one branch to another doesn't update the original branch. @@ -228,6 +229,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): bulk_write_record.initial_index = self.db_connection.get_course_index(course_key) # Ensure that any edits to the index don't pollute the initial_index bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index) + bulk_write_record.course_key = course_key def _end_outermost_bulk_operation(self, bulk_write_record, structure_key, emit_signals=True): """ @@ -241,7 +243,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): dirty = True try: - self.db_connection.insert_structure(bulk_write_record.structures[_id]) + self.db_connection.insert_structure(bulk_write_record.structures[_id], bulk_write_record.course_key) except DuplicateKeyError: # We may not have looked up this structure inside this bulk operation, and thus # didn't realize that it was already in the database. That's OK, the store is @@ -252,7 +254,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): dirty = True try: - self.db_connection.insert_definition(bulk_write_record.definitions[_id]) + self.db_connection.insert_definition(bulk_write_record.definitions[_id], bulk_write_record.course_key) except DuplicateKeyError: # We may not have looked up this definition inside this bulk operation, and thus # didn't realize that it was already in the database. That's OK, the store is @@ -263,9 +265,13 @@ class SplitBulkWriteMixin(BulkOperationsMixin): dirty = True if bulk_write_record.initial_index is None: - self.db_connection.insert_course_index(bulk_write_record.index) + self.db_connection.insert_course_index(bulk_write_record.index, bulk_write_record.course_key) else: - self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index) + self.db_connection.update_course_index( + bulk_write_record.index, + from_index=bulk_write_record.initial_index, + course_context=bulk_write_record.course_key + ) if dirty and emit_signals: self.send_bulk_published_signal(bulk_write_record, structure_key) @@ -294,7 +300,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.index = index_entry else: - self.db_connection.insert_course_index(index_entry) + self.db_connection.insert_course_index(index_entry, course_key) def update_course_index(self, course_key, updated_index_entry): """ @@ -308,7 +314,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.index = updated_index_entry else: - self.db_connection.update_course_index(updated_index_entry) + self.db_connection.update_course_index(updated_index_entry, course_key) def get_structure(self, course_key, version_guid): bulk_write_record = self._get_bulk_ops_record(course_key) @@ -317,7 +323,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): # The structure hasn't been loaded from the db yet, so load it if structure is None: - structure = self.db_connection.get_structure(version_guid) + structure = self.db_connection.get_structure(version_guid, course_key) bulk_write_record.structures[version_guid] = structure if structure is not None: bulk_write_record.structures_in_db.add(version_guid) @@ -326,7 +332,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): else: # cast string to ObjectId if necessary version_guid = course_key.as_object_id(version_guid) - return self.db_connection.get_structure(version_guid) + return self.db_connection.get_structure(version_guid, course_key) def update_structure(self, course_key, structure): """ @@ -338,7 +344,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.structures[structure['_id']] = structure else: - self.db_connection.insert_structure(structure) + self.db_connection.insert_structure(structure, course_key) def get_cached_block(self, course_key, version_guid, block_id): """ @@ -387,7 +393,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): # The definition hasn't been loaded from the db yet, so load it if definition is None: - definition = self.db_connection.get_definition(definition_guid) + definition = self.db_connection.get_definition(definition_guid, course_key) bulk_write_record.definitions[definition_guid] = definition if definition is not None: bulk_write_record.definitions_in_db.add(definition_guid) @@ -396,7 +402,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): else: # cast string to ObjectId if necessary definition_guid = course_key.as_object_id(definition_guid) - return self.db_connection.get_definition(definition_guid) + return self.db_connection.get_definition(definition_guid, course_key) def get_definitions(self, course_key, ids): """ @@ -424,7 +430,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if len(ids): # Query the db for the definitions. - defs_from_db = self.db_connection.get_definitions(list(ids)) + defs_from_db = self.db_connection.get_definitions(list(ids), course_key) # Add the retrieved definitions to the cache. bulk_write_record.definitions.update({d.get('_id'): d for d in defs_from_db}) definitions.extend(defs_from_db) @@ -439,7 +445,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin): if bulk_write_record.active: bulk_write_record.definitions[definition['_id']] = definition else: - self.db_connection.insert_definition(definition) + self.db_connection.insert_definition(definition, course_key) def version_structure(self, course_key, structure, user_id): """ @@ -1206,7 +1212,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): 'edited_on': course['edited_on'] } - def get_definition_history_info(self, definition_locator): + def get_definition_history_info(self, definition_locator, course_context=None): """ Because xblocks doesn't give a means to separate the definition's meta information from the usage xblock's, this method will get that info for the definition @@ -1220,7 +1226,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase): # The supplied locator is of the wrong type, so it can't possibly be stored in this modulestore. raise ItemNotFoundError(definition_locator) - definition = self.db_connection.get_definition(definition_locator.definition_id) + definition = self.db_connection.get_definition(definition_locator.definition_id, course_context) if definition is None: return None return definition['edit_info'] diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py index 3262a060c8..2de4d4d3d4 100644 --- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py +++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore_bulk_operations.py @@ -55,7 +55,9 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # Reading a structure when no bulk operation is active should just call # through to the db_connection result = self.bulk.get_structure(self.course_key, version_guid) - self.assertConnCalls(call.get_structure(self.course_key.as_object_id(version_guid))) + self.assertConnCalls( + call.get_structure(self.course_key.as_object_id(version_guid), self.course_key) + ) self.assertEqual(result, self.conn.get_structure.return_value) self.assertCacheNotCleared() @@ -64,7 +66,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # call through to the db_connection. It should also clear the # system cache self.bulk.update_structure(self.course_key, self.structure) - self.assertConnCalls(call.insert_structure(self.structure)) + self.assertConnCalls(call.insert_structure(self.structure, self.course_key)) self.clear_cache.assert_called_once_with(self.structure['_id']) @ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId()) @@ -72,14 +74,19 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # Reading a definition when no bulk operation is active should just call # through to the db_connection result = self.bulk.get_definition(self.course_key, version_guid) - self.assertConnCalls(call.get_definition(self.course_key.as_object_id(version_guid))) + self.assertConnCalls( + call.get_definition( + self.course_key.as_object_id(version_guid), + self.course_key + ) + ) self.assertEqual(result, self.conn.get_definition.return_value) def test_no_bulk_write_definition(self): # Writing a definition when no bulk operation is active should just # call through to the db_connection. self.bulk.update_definition(self.course_key, self.definition) - self.assertConnCalls(call.insert_definition(self.definition)) + self.assertConnCalls(call.insert_definition(self.definition, self.course_key)) @ddt.data(True, False) def test_no_bulk_read_index(self, ignore_case): @@ -94,7 +101,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): # Writing a course index when no bulk operation is active should just call # through to the db_connection self.bulk.insert_course_index(self.course_key, self.index_entry) - self.assertConnCalls(call.insert_course_index(self.index_entry)) + self.assertConnCalls(call.insert_course_index(self.index_entry, self.course_key)) self.assertCacheNotCleared() def test_out_of_order_end(self): @@ -109,7 +116,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.insert_course_index(self.course_key, self.index_entry) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.conn.insert_course_index.assert_called_once_with(self.index_entry) + self.conn.insert_course_index.assert_called_once_with(self.index_entry, self.course_key) def test_write_updated_index_on_close(self): old_index = {'this': 'is', 'an': 'old index'} @@ -119,7 +126,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.insert_course_index(self.course_key, self.index_entry) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.conn.update_course_index.assert_called_once_with(self.index_entry, from_index=old_index) + self.conn.update_course_index.assert_called_once_with( + self.index_entry, + from_index=old_index, + course_context=self.course_key, + ) def test_write_structure_on_close(self): self.conn.get_course_index.return_value = None @@ -128,7 +139,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.update_structure(self.course_key, self.structure) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.assertConnCalls(call.insert_structure(self.structure)) + self.assertConnCalls(call.insert_structure(self.structure, self.course_key)) def test_write_multiple_structures_on_close(self): self.conn.get_course_index.return_value = None @@ -140,7 +151,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( - [call.insert_structure(self.structure), call.insert_structure(other_structure)], + [ + call.insert_structure(self.structure, self.course_key), + call.insert_structure(other_structure, self.course_key) + ], self.conn.mock_calls ) @@ -154,10 +168,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertConnCalls( - call.insert_definition(self.definition), + call.insert_definition(self.definition, self.course_key), call.update_course_index( {'versions': {self.course_key.branch: self.definition['_id']}}, - from_index=original_index + from_index=original_index, + course_context=self.course_key ) ) @@ -173,11 +188,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( [ - call.insert_definition(self.definition), - call.insert_definition(other_definition), + call.insert_definition(self.definition, self.course_key), + call.insert_definition(other_definition, self.course_key), call.update_course_index( {'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}}, - from_index=original_index + from_index=original_index, + course_context=self.course_key, ) ], self.conn.mock_calls @@ -190,7 +206,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk.update_definition(self.course_key, self.definition) self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) - self.assertConnCalls(call.insert_definition(self.definition)) + self.assertConnCalls(call.insert_definition(self.definition, self.course_key)) def test_write_multiple_definitions_on_close(self): self.conn.get_course_index.return_value = None @@ -202,7 +218,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( - [call.insert_definition(self.definition), call.insert_definition(other_definition)], + [ + call.insert_definition(self.definition, self.course_key), + call.insert_definition(other_definition, self.course_key) + ], self.conn.mock_calls ) @@ -216,10 +235,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.assertConnCalls() self.bulk._end_bulk_operation(self.course_key) self.assertConnCalls( - call.insert_structure(self.structure), + call.insert_structure(self.structure, self.course_key), call.update_course_index( {'versions': {self.course_key.branch: self.structure['_id']}}, - from_index=original_index + from_index=original_index, + course_context=self.course_key, ) ) @@ -235,11 +255,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin): self.bulk._end_bulk_operation(self.course_key) self.assertItemsEqual( [ - call.insert_structure(self.structure), - call.insert_structure(other_structure), + call.insert_structure(self.structure, self.course_key), + call.insert_structure(other_structure, self.course_key), call.update_course_index( {'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}}, - from_index=original_index + from_index=original_index, + course_context=self.course_key, ) ], self.conn.mock_calls @@ -408,7 +429,7 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin): results = self.bulk.get_definitions(self.course_key, search_ids) definitions_gotten = list(set(search_ids) - set(active_ids)) if len(definitions_gotten) > 0: - self.conn.get_definitions.assert_called_once_with(definitions_gotten) + self.conn.get_definitions.assert_called_once_with(definitions_gotten, self.course_key) else: # If no definitions to get, then get_definitions() should *not* have been called. self.assertEquals(self.conn.get_definitions.call_count, 0) @@ -677,8 +698,12 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin): index_copy['versions']['draft'] = index['versions']['published'] self.bulk.update_course_index(self.course_key, index_copy) self.bulk._end_bulk_operation(self.course_key) - self.conn.insert_structure.assert_called_once_with(published_structure) - self.conn.update_course_index.assert_called_once_with(index_copy, from_index=self.conn.get_course_index.return_value) + self.conn.insert_structure.assert_called_once_with(published_structure, self.course_key) + self.conn.update_course_index.assert_called_once_with( + index_copy, + from_index=self.conn.get_course_index.return_value, + course_context=self.course_key, + ) self.conn.get_course_index.assert_called_once_with(self.course_key)