Add metrics around all split-mongo database operations measuring count, duration, and approximate size
[PLAT-645]
This commit is contained in:
@@ -1,50 +1,180 @@
|
||||
"""
|
||||
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
|
||||
"""
|
||||
import re
|
||||
from mongodb_proxy import autoretry_read, MongoProxy
|
||||
import datetime
|
||||
import math
|
||||
import pymongo
|
||||
import pytz
|
||||
import re
|
||||
from contextlib import contextmanager
|
||||
from time import time
|
||||
|
||||
# Import this just to export it
|
||||
from pymongo.errors import DuplicateKeyError # pylint: disable=unused-import
|
||||
|
||||
from contracts import check, new_contract
|
||||
from mongodb_proxy import autoretry_read, MongoProxy
|
||||
from xmodule.exceptions import HeartbeatFailure
|
||||
from xmodule.modulestore import BlockData
|
||||
from xmodule.modulestore.split_mongo import BlockKey
|
||||
import datetime
|
||||
import pytz
|
||||
import dogstats_wrapper as dog_stats_api
|
||||
|
||||
|
||||
new_contract('BlockData', BlockData)
|
||||
|
||||
|
||||
def structure_from_mongo(structure):
|
||||
def round_power_2(value):
|
||||
"""
|
||||
Return value rounded up to the nearest power of 2.
|
||||
"""
|
||||
if value == 0:
|
||||
return 0
|
||||
|
||||
return math.pow(2, math.ceil(math.log(value, 2)))
|
||||
|
||||
|
||||
class Tagger(object):
|
||||
"""
|
||||
An object used by :class:`QueryTimer` to allow timed code blocks
|
||||
to add measurements and tags to the timer.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.added_tags = []
|
||||
self.measures = []
|
||||
|
||||
def measure(self, name, size):
|
||||
"""
|
||||
Record a measurement of the timed data. This would be something to
|
||||
indicate the size of the value being timed.
|
||||
|
||||
Arguments:
|
||||
name: The name of the measurement.
|
||||
size (float): The size of the measurement.
|
||||
"""
|
||||
self.measures.append((name, size))
|
||||
|
||||
def tag(self, **kwargs):
|
||||
"""
|
||||
Add tags to the timer.
|
||||
|
||||
Arguments:
|
||||
**kwargs: Each keyword is treated as a tag name, and the
|
||||
value of the argument is the tag value.
|
||||
"""
|
||||
self.added_tags.extend(kwargs.items())
|
||||
|
||||
@property
|
||||
def tags(self):
|
||||
"""
|
||||
Return all tags for this (this includes any tags added with :meth:`tag`,
|
||||
and also all of the added measurements, bucketed into powers of 2).
|
||||
"""
|
||||
return [
|
||||
'{}:{}'.format(name, round_power_2(size))
|
||||
for name, size in self.measures
|
||||
] + [
|
||||
'{}:{}'.format(name, value)
|
||||
for name, value in self.added_tags
|
||||
]
|
||||
|
||||
|
||||
class QueryTimer(object):
|
||||
"""
|
||||
An object that allows timing a block of code while also recording measurements
|
||||
about that code.
|
||||
"""
|
||||
def __init__(self, metric_base, sample_rate=1):
|
||||
"""
|
||||
Arguments:
|
||||
metric_base: The prefix to be used for all queries captured
|
||||
with this :class:`QueryTimer`.
|
||||
"""
|
||||
self._metric_base = metric_base
|
||||
self._sample_rate = sample_rate
|
||||
|
||||
@contextmanager
|
||||
def timer(self, metric_name, course_context):
|
||||
"""
|
||||
Contextmanager which acts as a timer for the metric ``metric_name``,
|
||||
but which also yields a :class:`Tagger` object that allows the timed block
|
||||
of code to add tags and quantity measurements. Tags are added verbatim to the
|
||||
timer output. Measurements are recorded as histogram measurements in their own,
|
||||
and also as bucketed tags on the timer measurement.
|
||||
|
||||
Arguments:
|
||||
metric_name: The name used to aggregate all of these metrics.
|
||||
course_context: The course which the query is being made for.
|
||||
"""
|
||||
tagger = Tagger()
|
||||
metric_name = "{}.{}".format(self._metric_base, metric_name)
|
||||
|
||||
start = time()
|
||||
try:
|
||||
yield tagger
|
||||
finally:
|
||||
end = time()
|
||||
tags = tagger.tags
|
||||
tags.append('course:{}'.format(course_context))
|
||||
for name, size in tagger.measures:
|
||||
dog_stats_api.histogram(
|
||||
'{}.{}'.format(metric_name, name),
|
||||
size,
|
||||
timestamp=end,
|
||||
tags=[tag for tag in tags if not tag.startswith('{}:'.format(metric_name))],
|
||||
sample_rate=self._sample_rate,
|
||||
)
|
||||
dog_stats_api.histogram(
|
||||
'{}.duration'.format(metric_name),
|
||||
end - start,
|
||||
timestamp=end,
|
||||
tags=tags,
|
||||
sample_rate=self._sample_rate,
|
||||
)
|
||||
dog_stats_api.increment(
|
||||
metric_name,
|
||||
timestamp=end,
|
||||
tags=tags,
|
||||
sample_rate=self._sample_rate,
|
||||
)
|
||||
|
||||
|
||||
TIMER = QueryTimer(__name__, 0.001)
|
||||
|
||||
|
||||
def structure_from_mongo(structure, course_context=None):
|
||||
"""
|
||||
Converts the 'blocks' key from a list [block_data] to a map
|
||||
{BlockKey: block_data}.
|
||||
Converts 'root' from [block_type, block_id] to BlockKey.
|
||||
Converts 'blocks.*.fields.children' from [[block_type, block_id]] to [BlockKey].
|
||||
N.B. Does not convert any other ReferenceFields (because we don't know which fields they are at this level).
|
||||
|
||||
Arguments:
|
||||
structure: The document structure to convert
|
||||
course_context (CourseKey): For metrics gathering, the CourseKey
|
||||
for the course that this data is being processed for.
|
||||
"""
|
||||
check('seq[2]', structure['root'])
|
||||
check('list(dict)', structure['blocks'])
|
||||
for block in structure['blocks']:
|
||||
if 'children' in block['fields']:
|
||||
check('list(list[2])', block['fields']['children'])
|
||||
with TIMER.timer('structure_from_mongo', course_context) as tagger:
|
||||
tagger.measure('blocks', len(structure['blocks']))
|
||||
|
||||
structure['root'] = BlockKey(*structure['root'])
|
||||
new_blocks = {}
|
||||
for block in structure['blocks']:
|
||||
if 'children' in block['fields']:
|
||||
block['fields']['children'] = [BlockKey(*child) for child in block['fields']['children']]
|
||||
new_blocks[BlockKey(block['block_type'], block.pop('block_id'))] = BlockData(**block)
|
||||
structure['blocks'] = new_blocks
|
||||
check('seq[2]', structure['root'])
|
||||
check('list(dict)', structure['blocks'])
|
||||
for block in structure['blocks']:
|
||||
if 'children' in block['fields']:
|
||||
check('list(list[2])', block['fields']['children'])
|
||||
|
||||
return structure
|
||||
structure['root'] = BlockKey(*structure['root'])
|
||||
new_blocks = {}
|
||||
for block in structure['blocks']:
|
||||
if 'children' in block['fields']:
|
||||
block['fields']['children'] = [BlockKey(*child) for child in block['fields']['children']]
|
||||
new_blocks[BlockKey(block['block_type'], block.pop('block_id'))] = BlockData(**block)
|
||||
structure['blocks'] = new_blocks
|
||||
|
||||
return structure
|
||||
|
||||
|
||||
def structure_to_mongo(structure):
|
||||
def structure_to_mongo(structure, course_context=None):
|
||||
"""
|
||||
Converts the 'blocks' key from a map {BlockKey: block_data} to
|
||||
a list [block_data], inserting BlockKey.type as 'block_type'
|
||||
@@ -52,22 +182,25 @@ def structure_to_mongo(structure):
|
||||
Doesn't convert 'root', since namedtuple's can be inserted
|
||||
directly into mongo.
|
||||
"""
|
||||
check('BlockKey', structure['root'])
|
||||
check('dict(BlockKey: BlockData)', structure['blocks'])
|
||||
for block in structure['blocks'].itervalues():
|
||||
if 'children' in block.fields:
|
||||
check('list(BlockKey)', block.fields['children'])
|
||||
with TIMER.timer('structure_to_mongo', course_context) as tagger:
|
||||
tagger.measure('blocks', len(structure['blocks']))
|
||||
|
||||
new_structure = dict(structure)
|
||||
new_structure['blocks'] = []
|
||||
check('BlockKey', structure['root'])
|
||||
check('dict(BlockKey: BlockData)', structure['blocks'])
|
||||
for block in structure['blocks'].itervalues():
|
||||
if 'children' in block.fields:
|
||||
check('list(BlockKey)', block.fields['children'])
|
||||
|
||||
for block_key, block in structure['blocks'].iteritems():
|
||||
new_block = dict(block.to_storable())
|
||||
new_block.setdefault('block_type', block_key.type)
|
||||
new_block['block_id'] = block_key.id
|
||||
new_structure['blocks'].append(new_block)
|
||||
new_structure = dict(structure)
|
||||
new_structure['blocks'] = []
|
||||
|
||||
return new_structure
|
||||
for block_key, block in structure['blocks'].iteritems():
|
||||
new_block = dict(block.to_storable())
|
||||
new_block.setdefault('block_type', block_key.type)
|
||||
new_block['block_id'] = block_key.id
|
||||
new_structure['blocks'].append(new_block)
|
||||
|
||||
return new_structure
|
||||
|
||||
|
||||
class MongoConnection(object):
|
||||
@@ -121,34 +254,54 @@ class MongoConnection(object):
|
||||
else:
|
||||
raise HeartbeatFailure("Can't connect to {}".format(self.database.name))
|
||||
|
||||
def get_structure(self, key):
|
||||
def get_structure(self, key, course_context=None):
|
||||
"""
|
||||
Get the structure from the persistence mechanism whose id is the given key
|
||||
"""
|
||||
return structure_from_mongo(self.structures.find_one({'_id': key}))
|
||||
with TIMER.timer("get_structure", course_context) as tagger_get_structure:
|
||||
with TIMER.timer("get_structure.find_one", course_context) as tagger_find_one:
|
||||
doc = self.structures.find_one({'_id': key})
|
||||
tagger_find_one.measure("blocks", len(doc['blocks']))
|
||||
tagger_get_structure.measure("blocks", len(doc['blocks']))
|
||||
|
||||
return structure_from_mongo(doc, course_context)
|
||||
|
||||
@autoretry_read()
|
||||
def find_structures_by_id(self, ids):
|
||||
def find_structures_by_id(self, ids, course_context=None):
|
||||
"""
|
||||
Return all structures that specified in ``ids``.
|
||||
|
||||
Arguments:
|
||||
ids (list): A list of structure ids
|
||||
"""
|
||||
return [structure_from_mongo(structure) for structure in self.structures.find({'_id': {'$in': ids}})]
|
||||
with TIMER.timer("find_structures_by_id", course_context) as tagger:
|
||||
tagger.measure("requested_ids", len(ids))
|
||||
docs = [
|
||||
structure_from_mongo(structure, course_context)
|
||||
for structure in self.structures.find({'_id': {'$in': ids}})
|
||||
]
|
||||
tagger.measure("structures", len(docs))
|
||||
return docs
|
||||
|
||||
@autoretry_read()
|
||||
def find_structures_derived_from(self, ids):
|
||||
def find_structures_derived_from(self, ids, course_context=None):
|
||||
"""
|
||||
Return all structures that were immediately derived from a structure listed in ``ids``.
|
||||
|
||||
Arguments:
|
||||
ids (list): A list of structure ids
|
||||
"""
|
||||
return [structure_from_mongo(structure) for structure in self.structures.find({'previous_version': {'$in': ids}})]
|
||||
with TIMER.timer("find_structures_derived_from", course_context) as tagger:
|
||||
tagger.measure("base_ids", len(ids))
|
||||
docs = [
|
||||
structure_from_mongo(structure, course_context)
|
||||
for structure in self.structures.find({'previous_version': {'$in': ids}})
|
||||
]
|
||||
tagger.measure("structures", len(docs))
|
||||
return docs
|
||||
|
||||
@autoretry_read()
|
||||
def find_ancestor_structures(self, original_version, block_key):
|
||||
def find_ancestor_structures(self, original_version, block_key, course_context=None):
|
||||
"""
|
||||
Find all structures that originated from ``original_version`` that contain ``block_key``.
|
||||
|
||||
@@ -156,45 +309,51 @@ class MongoConnection(object):
|
||||
original_version (str or ObjectID): The id of a structure
|
||||
block_key (BlockKey): The id of the block in question
|
||||
"""
|
||||
return [
|
||||
structure_from_mongo(structure)
|
||||
for structure in self.structures.find({
|
||||
'original_version': original_version,
|
||||
'blocks': {
|
||||
'$elemMatch': {
|
||||
'block_id': block_key.id,
|
||||
'block_type': block_key.type,
|
||||
'edit_info.update_version': {
|
||||
'$exists': True,
|
||||
with TIMER.timer("find_ancestor_structures", course_context) as tagger:
|
||||
docs = [
|
||||
structure_from_mongo(structure, course_context)
|
||||
for structure in self.structures.find({
|
||||
'original_version': original_version,
|
||||
'blocks': {
|
||||
'$elemMatch': {
|
||||
'block_id': block_key.id,
|
||||
'block_type': block_key.type,
|
||||
'edit_info.update_version': {
|
||||
'$exists': True,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
]
|
||||
})
|
||||
]
|
||||
tagger.measure("structures", len(docs))
|
||||
return docs
|
||||
|
||||
def insert_structure(self, structure):
|
||||
def insert_structure(self, structure, course_context=None):
|
||||
"""
|
||||
Insert a new structure into the database.
|
||||
"""
|
||||
self.structures.insert(structure_to_mongo(structure))
|
||||
with TIMER.timer("insert_structure", course_context) as tagger:
|
||||
tagger.measure("blocks", len(structure["blocks"]))
|
||||
self.structures.insert(structure_to_mongo(structure, course_context))
|
||||
|
||||
def get_course_index(self, key, ignore_case=False):
|
||||
"""
|
||||
Get the course_index from the persistence mechanism whose id is the given key
|
||||
"""
|
||||
if ignore_case:
|
||||
query = {
|
||||
key_attr: re.compile(u'^{}$'.format(re.escape(getattr(key, key_attr))), re.IGNORECASE)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
else:
|
||||
query = {
|
||||
key_attr: getattr(key, key_attr)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
return self.course_index.find_one(query)
|
||||
with TIMER.timer("get_course_index", key):
|
||||
if ignore_case:
|
||||
query = {
|
||||
key_attr: re.compile(u'^{}$'.format(re.escape(getattr(key, key_attr))), re.IGNORECASE)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
else:
|
||||
query = {
|
||||
key_attr: getattr(key, key_attr)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
return self.course_index.find_one(query)
|
||||
|
||||
def find_matching_course_indexes(self, branch=None, search_targets=None, org_target=None):
|
||||
def find_matching_course_indexes(self, branch=None, search_targets=None, org_target=None, course_context=None):
|
||||
"""
|
||||
Find the course_index matching particular conditions.
|
||||
|
||||
@@ -205,75 +364,89 @@ class MongoConnection(object):
|
||||
org_target: If specified, this is an ORG filter so that only course_indexs are
|
||||
returned for the specified ORG
|
||||
"""
|
||||
query = {}
|
||||
if branch is not None:
|
||||
query['versions.{}'.format(branch)] = {'$exists': True}
|
||||
with TIMER.timer("find_matching_course_indexes", course_context):
|
||||
query = {}
|
||||
if branch is not None:
|
||||
query['versions.{}'.format(branch)] = {'$exists': True}
|
||||
|
||||
if search_targets:
|
||||
for key, value in search_targets.iteritems():
|
||||
query['search_targets.{}'.format(key)] = value
|
||||
if search_targets:
|
||||
for key, value in search_targets.iteritems():
|
||||
query['search_targets.{}'.format(key)] = value
|
||||
|
||||
if org_target:
|
||||
query['org'] = org_target
|
||||
if org_target:
|
||||
query['org'] = org_target
|
||||
|
||||
return self.course_index.find(query)
|
||||
return self.course_index.find(query)
|
||||
|
||||
def insert_course_index(self, course_index):
|
||||
def insert_course_index(self, course_index, course_context=None):
|
||||
"""
|
||||
Create the course_index in the db
|
||||
"""
|
||||
course_index['last_update'] = datetime.datetime.now(pytz.utc)
|
||||
self.course_index.insert(course_index)
|
||||
with TIMER.timer("insert_course_index", course_context):
|
||||
course_index['last_update'] = datetime.datetime.now(pytz.utc)
|
||||
self.course_index.insert(course_index)
|
||||
|
||||
def update_course_index(self, course_index, from_index=None):
|
||||
def update_course_index(self, course_index, from_index=None, course_context=None):
|
||||
"""
|
||||
Update the db record for course_index.
|
||||
|
||||
Arguments:
|
||||
from_index: If set, only update an index if it matches the one specified in `from_index`.
|
||||
"""
|
||||
if from_index:
|
||||
query = {"_id": from_index["_id"]}
|
||||
# last_update not only tells us when this course was last updated but also helps
|
||||
# prevent collisions
|
||||
if 'last_update' in from_index:
|
||||
query['last_update'] = from_index['last_update']
|
||||
else:
|
||||
query = {
|
||||
'org': course_index['org'],
|
||||
'course': course_index['course'],
|
||||
'run': course_index['run'],
|
||||
}
|
||||
course_index['last_update'] = datetime.datetime.now(pytz.utc)
|
||||
self.course_index.update(query, course_index, upsert=False,)
|
||||
with TIMER.timer("update_course_index", course_context):
|
||||
if from_index:
|
||||
query = {"_id": from_index["_id"]}
|
||||
# last_update not only tells us when this course was last updated but also helps
|
||||
# prevent collisions
|
||||
if 'last_update' in from_index:
|
||||
query['last_update'] = from_index['last_update']
|
||||
else:
|
||||
query = {
|
||||
'org': course_index['org'],
|
||||
'course': course_index['course'],
|
||||
'run': course_index['run'],
|
||||
}
|
||||
course_index['last_update'] = datetime.datetime.now(pytz.utc)
|
||||
self.course_index.update(query, course_index, upsert=False,)
|
||||
|
||||
def delete_course_index(self, course_key):
|
||||
"""
|
||||
Delete the course_index from the persistence mechanism whose id is the given course_index
|
||||
"""
|
||||
query = {
|
||||
key_attr: getattr(course_key, key_attr)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
return self.course_index.remove(query)
|
||||
with TIMER.timer("delete_course_index", course_key):
|
||||
query = {
|
||||
key_attr: getattr(course_key, key_attr)
|
||||
for key_attr in ('org', 'course', 'run')
|
||||
}
|
||||
return self.course_index.remove(query)
|
||||
|
||||
def get_definition(self, key):
|
||||
def get_definition(self, key, course_context=None):
|
||||
"""
|
||||
Get the definition from the persistence mechanism whose id is the given key
|
||||
"""
|
||||
return self.definitions.find_one({'_id': key})
|
||||
with TIMER.timer("get_definition", course_context) as tagger:
|
||||
definition = self.definitions.find_one({'_id': key})
|
||||
tagger.measure("fields", len(definition['fields']))
|
||||
tagger.tag(block_type=definition['block_type'])
|
||||
return definition
|
||||
|
||||
def get_definitions(self, definitions):
|
||||
def get_definitions(self, definitions, course_context=None):
|
||||
"""
|
||||
Retrieve all definitions listed in `definitions`.
|
||||
"""
|
||||
return self.definitions.find({'_id': {'$in': definitions}})
|
||||
with TIMER.timer("get_definitions", course_context) as tagger:
|
||||
tagger.measure('definitions', len(definitions))
|
||||
definitions = self.definitions.find({'_id': {'$in': definitions}})
|
||||
return definitions
|
||||
|
||||
def insert_definition(self, definition):
|
||||
def insert_definition(self, definition, course_context=None):
|
||||
"""
|
||||
Create the definition in the db
|
||||
"""
|
||||
self.definitions.insert(definition)
|
||||
with TIMER.timer("insert_definition", course_context) as tagger:
|
||||
tagger.measure('fields', len(definition['fields']))
|
||||
tagger.tag(block_type=definition['block_type'])
|
||||
self.definitions.insert(definition)
|
||||
|
||||
def ensure_indexes(self):
|
||||
"""
|
||||
|
||||
@@ -127,6 +127,7 @@ class SplitBulkWriteRecord(BulkOpsRecord):
|
||||
self.modules = defaultdict(dict)
|
||||
self.definitions = {}
|
||||
self.definitions_in_db = set()
|
||||
self.course_key = None
|
||||
|
||||
# TODO: This needs to track which branches have actually been modified/versioned,
|
||||
# so that copying one branch to another doesn't update the original branch.
|
||||
@@ -228,6 +229,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
bulk_write_record.initial_index = self.db_connection.get_course_index(course_key)
|
||||
# Ensure that any edits to the index don't pollute the initial_index
|
||||
bulk_write_record.index = copy.deepcopy(bulk_write_record.initial_index)
|
||||
bulk_write_record.course_key = course_key
|
||||
|
||||
def _end_outermost_bulk_operation(self, bulk_write_record, structure_key, emit_signals=True):
|
||||
"""
|
||||
@@ -241,7 +243,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
dirty = True
|
||||
|
||||
try:
|
||||
self.db_connection.insert_structure(bulk_write_record.structures[_id])
|
||||
self.db_connection.insert_structure(bulk_write_record.structures[_id], bulk_write_record.course_key)
|
||||
except DuplicateKeyError:
|
||||
# We may not have looked up this structure inside this bulk operation, and thus
|
||||
# didn't realize that it was already in the database. That's OK, the store is
|
||||
@@ -252,7 +254,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
dirty = True
|
||||
|
||||
try:
|
||||
self.db_connection.insert_definition(bulk_write_record.definitions[_id])
|
||||
self.db_connection.insert_definition(bulk_write_record.definitions[_id], bulk_write_record.course_key)
|
||||
except DuplicateKeyError:
|
||||
# We may not have looked up this definition inside this bulk operation, and thus
|
||||
# didn't realize that it was already in the database. That's OK, the store is
|
||||
@@ -263,9 +265,13 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
dirty = True
|
||||
|
||||
if bulk_write_record.initial_index is None:
|
||||
self.db_connection.insert_course_index(bulk_write_record.index)
|
||||
self.db_connection.insert_course_index(bulk_write_record.index, bulk_write_record.course_key)
|
||||
else:
|
||||
self.db_connection.update_course_index(bulk_write_record.index, from_index=bulk_write_record.initial_index)
|
||||
self.db_connection.update_course_index(
|
||||
bulk_write_record.index,
|
||||
from_index=bulk_write_record.initial_index,
|
||||
course_context=bulk_write_record.course_key
|
||||
)
|
||||
|
||||
if dirty and emit_signals:
|
||||
self.send_bulk_published_signal(bulk_write_record, structure_key)
|
||||
@@ -294,7 +300,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.index = index_entry
|
||||
else:
|
||||
self.db_connection.insert_course_index(index_entry)
|
||||
self.db_connection.insert_course_index(index_entry, course_key)
|
||||
|
||||
def update_course_index(self, course_key, updated_index_entry):
|
||||
"""
|
||||
@@ -308,7 +314,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.index = updated_index_entry
|
||||
else:
|
||||
self.db_connection.update_course_index(updated_index_entry)
|
||||
self.db_connection.update_course_index(updated_index_entry, course_key)
|
||||
|
||||
def get_structure(self, course_key, version_guid):
|
||||
bulk_write_record = self._get_bulk_ops_record(course_key)
|
||||
@@ -317,7 +323,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
|
||||
# The structure hasn't been loaded from the db yet, so load it
|
||||
if structure is None:
|
||||
structure = self.db_connection.get_structure(version_guid)
|
||||
structure = self.db_connection.get_structure(version_guid, course_key)
|
||||
bulk_write_record.structures[version_guid] = structure
|
||||
if structure is not None:
|
||||
bulk_write_record.structures_in_db.add(version_guid)
|
||||
@@ -326,7 +332,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
else:
|
||||
# cast string to ObjectId if necessary
|
||||
version_guid = course_key.as_object_id(version_guid)
|
||||
return self.db_connection.get_structure(version_guid)
|
||||
return self.db_connection.get_structure(version_guid, course_key)
|
||||
|
||||
def update_structure(self, course_key, structure):
|
||||
"""
|
||||
@@ -338,7 +344,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.structures[structure['_id']] = structure
|
||||
else:
|
||||
self.db_connection.insert_structure(structure)
|
||||
self.db_connection.insert_structure(structure, course_key)
|
||||
|
||||
def get_cached_block(self, course_key, version_guid, block_id):
|
||||
"""
|
||||
@@ -387,7 +393,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
|
||||
# The definition hasn't been loaded from the db yet, so load it
|
||||
if definition is None:
|
||||
definition = self.db_connection.get_definition(definition_guid)
|
||||
definition = self.db_connection.get_definition(definition_guid, course_key)
|
||||
bulk_write_record.definitions[definition_guid] = definition
|
||||
if definition is not None:
|
||||
bulk_write_record.definitions_in_db.add(definition_guid)
|
||||
@@ -396,7 +402,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
else:
|
||||
# cast string to ObjectId if necessary
|
||||
definition_guid = course_key.as_object_id(definition_guid)
|
||||
return self.db_connection.get_definition(definition_guid)
|
||||
return self.db_connection.get_definition(definition_guid, course_key)
|
||||
|
||||
def get_definitions(self, course_key, ids):
|
||||
"""
|
||||
@@ -424,7 +430,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
|
||||
if len(ids):
|
||||
# Query the db for the definitions.
|
||||
defs_from_db = self.db_connection.get_definitions(list(ids))
|
||||
defs_from_db = self.db_connection.get_definitions(list(ids), course_key)
|
||||
# Add the retrieved definitions to the cache.
|
||||
bulk_write_record.definitions.update({d.get('_id'): d for d in defs_from_db})
|
||||
definitions.extend(defs_from_db)
|
||||
@@ -439,7 +445,7 @@ class SplitBulkWriteMixin(BulkOperationsMixin):
|
||||
if bulk_write_record.active:
|
||||
bulk_write_record.definitions[definition['_id']] = definition
|
||||
else:
|
||||
self.db_connection.insert_definition(definition)
|
||||
self.db_connection.insert_definition(definition, course_key)
|
||||
|
||||
def version_structure(self, course_key, structure, user_id):
|
||||
"""
|
||||
@@ -1206,7 +1212,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
'edited_on': course['edited_on']
|
||||
}
|
||||
|
||||
def get_definition_history_info(self, definition_locator):
|
||||
def get_definition_history_info(self, definition_locator, course_context=None):
|
||||
"""
|
||||
Because xblocks doesn't give a means to separate the definition's meta information from
|
||||
the usage xblock's, this method will get that info for the definition
|
||||
@@ -1220,7 +1226,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
# The supplied locator is of the wrong type, so it can't possibly be stored in this modulestore.
|
||||
raise ItemNotFoundError(definition_locator)
|
||||
|
||||
definition = self.db_connection.get_definition(definition_locator.definition_id)
|
||||
definition = self.db_connection.get_definition(definition_locator.definition_id, course_context)
|
||||
if definition is None:
|
||||
return None
|
||||
return definition['edit_info']
|
||||
|
||||
@@ -55,7 +55,9 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
# Reading a structure when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_structure(self.course_key, version_guid)
|
||||
self.assertConnCalls(call.get_structure(self.course_key.as_object_id(version_guid)))
|
||||
self.assertConnCalls(
|
||||
call.get_structure(self.course_key.as_object_id(version_guid), self.course_key)
|
||||
)
|
||||
self.assertEqual(result, self.conn.get_structure.return_value)
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
@@ -64,7 +66,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
# call through to the db_connection. It should also clear the
|
||||
# system cache
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls(call.insert_structure(self.structure))
|
||||
self.assertConnCalls(call.insert_structure(self.structure, self.course_key))
|
||||
self.clear_cache.assert_called_once_with(self.structure['_id'])
|
||||
|
||||
@ddt.data('deadbeef1234' * 2, u'deadbeef1234' * 2, ObjectId())
|
||||
@@ -72,14 +74,19 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
# Reading a definition when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
result = self.bulk.get_definition(self.course_key, version_guid)
|
||||
self.assertConnCalls(call.get_definition(self.course_key.as_object_id(version_guid)))
|
||||
self.assertConnCalls(
|
||||
call.get_definition(
|
||||
self.course_key.as_object_id(version_guid),
|
||||
self.course_key
|
||||
)
|
||||
)
|
||||
self.assertEqual(result, self.conn.get_definition.return_value)
|
||||
|
||||
def test_no_bulk_write_definition(self):
|
||||
# Writing a definition when no bulk operation is active should just
|
||||
# call through to the db_connection.
|
||||
self.bulk.update_definition(self.course_key, self.definition)
|
||||
self.assertConnCalls(call.insert_definition(self.definition))
|
||||
self.assertConnCalls(call.insert_definition(self.definition, self.course_key))
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_no_bulk_read_index(self, ignore_case):
|
||||
@@ -94,7 +101,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
# Writing a course index when no bulk operation is active should just call
|
||||
# through to the db_connection
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls(call.insert_course_index(self.index_entry))
|
||||
self.assertConnCalls(call.insert_course_index(self.index_entry, self.course_key))
|
||||
self.assertCacheNotCleared()
|
||||
|
||||
def test_out_of_order_end(self):
|
||||
@@ -109,7 +116,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.conn.insert_course_index.assert_called_once_with(self.index_entry)
|
||||
self.conn.insert_course_index.assert_called_once_with(self.index_entry, self.course_key)
|
||||
|
||||
def test_write_updated_index_on_close(self):
|
||||
old_index = {'this': 'is', 'an': 'old index'}
|
||||
@@ -119,7 +126,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk.insert_course_index(self.course_key, self.index_entry)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.conn.update_course_index.assert_called_once_with(self.index_entry, from_index=old_index)
|
||||
self.conn.update_course_index.assert_called_once_with(
|
||||
self.index_entry,
|
||||
from_index=old_index,
|
||||
course_context=self.course_key,
|
||||
)
|
||||
|
||||
def test_write_structure_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
@@ -128,7 +139,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk.update_structure(self.course_key, self.structure)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertConnCalls(call.insert_structure(self.structure))
|
||||
self.assertConnCalls(call.insert_structure(self.structure, self.course_key))
|
||||
|
||||
def test_write_multiple_structures_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
@@ -140,7 +151,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[call.insert_structure(self.structure), call.insert_structure(other_structure)],
|
||||
[
|
||||
call.insert_structure(self.structure, self.course_key),
|
||||
call.insert_structure(other_structure, self.course_key)
|
||||
],
|
||||
self.conn.mock_calls
|
||||
)
|
||||
|
||||
@@ -154,10 +168,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertConnCalls(
|
||||
call.insert_definition(self.definition),
|
||||
call.insert_definition(self.definition, self.course_key),
|
||||
call.update_course_index(
|
||||
{'versions': {self.course_key.branch: self.definition['_id']}},
|
||||
from_index=original_index
|
||||
from_index=original_index,
|
||||
course_context=self.course_key
|
||||
)
|
||||
)
|
||||
|
||||
@@ -173,11 +188,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[
|
||||
call.insert_definition(self.definition),
|
||||
call.insert_definition(other_definition),
|
||||
call.insert_definition(self.definition, self.course_key),
|
||||
call.insert_definition(other_definition, self.course_key),
|
||||
call.update_course_index(
|
||||
{'versions': {'a': self.definition['_id'], 'b': other_definition['_id']}},
|
||||
from_index=original_index
|
||||
from_index=original_index,
|
||||
course_context=self.course_key,
|
||||
)
|
||||
],
|
||||
self.conn.mock_calls
|
||||
@@ -190,7 +206,7 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk.update_definition(self.course_key, self.definition)
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertConnCalls(call.insert_definition(self.definition))
|
||||
self.assertConnCalls(call.insert_definition(self.definition, self.course_key))
|
||||
|
||||
def test_write_multiple_definitions_on_close(self):
|
||||
self.conn.get_course_index.return_value = None
|
||||
@@ -202,7 +218,10 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[call.insert_definition(self.definition), call.insert_definition(other_definition)],
|
||||
[
|
||||
call.insert_definition(self.definition, self.course_key),
|
||||
call.insert_definition(other_definition, self.course_key)
|
||||
],
|
||||
self.conn.mock_calls
|
||||
)
|
||||
|
||||
@@ -216,10 +235,11 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.assertConnCalls()
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertConnCalls(
|
||||
call.insert_structure(self.structure),
|
||||
call.insert_structure(self.structure, self.course_key),
|
||||
call.update_course_index(
|
||||
{'versions': {self.course_key.branch: self.structure['_id']}},
|
||||
from_index=original_index
|
||||
from_index=original_index,
|
||||
course_context=self.course_key,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -235,11 +255,12 @@ class TestBulkWriteMixinClosed(TestBulkWriteMixin):
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.assertItemsEqual(
|
||||
[
|
||||
call.insert_structure(self.structure),
|
||||
call.insert_structure(other_structure),
|
||||
call.insert_structure(self.structure, self.course_key),
|
||||
call.insert_structure(other_structure, self.course_key),
|
||||
call.update_course_index(
|
||||
{'versions': {'a': self.structure['_id'], 'b': other_structure['_id']}},
|
||||
from_index=original_index
|
||||
from_index=original_index,
|
||||
course_context=self.course_key,
|
||||
)
|
||||
],
|
||||
self.conn.mock_calls
|
||||
@@ -408,7 +429,7 @@ class TestBulkWriteMixinFindMethods(TestBulkWriteMixin):
|
||||
results = self.bulk.get_definitions(self.course_key, search_ids)
|
||||
definitions_gotten = list(set(search_ids) - set(active_ids))
|
||||
if len(definitions_gotten) > 0:
|
||||
self.conn.get_definitions.assert_called_once_with(definitions_gotten)
|
||||
self.conn.get_definitions.assert_called_once_with(definitions_gotten, self.course_key)
|
||||
else:
|
||||
# If no definitions to get, then get_definitions() should *not* have been called.
|
||||
self.assertEquals(self.conn.get_definitions.call_count, 0)
|
||||
@@ -677,8 +698,12 @@ class TestBulkWriteMixinOpen(TestBulkWriteMixin):
|
||||
index_copy['versions']['draft'] = index['versions']['published']
|
||||
self.bulk.update_course_index(self.course_key, index_copy)
|
||||
self.bulk._end_bulk_operation(self.course_key)
|
||||
self.conn.insert_structure.assert_called_once_with(published_structure)
|
||||
self.conn.update_course_index.assert_called_once_with(index_copy, from_index=self.conn.get_course_index.return_value)
|
||||
self.conn.insert_structure.assert_called_once_with(published_structure, self.course_key)
|
||||
self.conn.update_course_index.assert_called_once_with(
|
||||
index_copy,
|
||||
from_index=self.conn.get_course_index.return_value,
|
||||
course_context=self.course_key,
|
||||
)
|
||||
self.conn.get_course_index.assert_called_once_with(self.course_key)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user