Files
edx-platform/xmodule/modulestore/tests/test_split_modulestore.py
2026-01-07 13:30:53 +05:00

2093 lines
97 KiB
Python

"""
Test split modulestore w/o using any django stuff.
"""
import datetime
import os
import random
import re
import unittest
from importlib import import_module
from unittest.mock import patch
import pytest
import ddt
from ccx_keys.locator import CCXBlockUsageLocator
from django.core.cache import InvalidCacheBackendError, caches
from opaque_keys.edx.locator import BlockUsageLocator, CourseKey, CourseLocator, LocalId
from xblock.fields import Date, Reference, ReferenceList, ReferenceValueDict, Timedelta
from openedx.core.djangolib.testing.utils import CacheIsolationMixin
from openedx.core.lib import tempdir
from openedx.core.lib.tests import attr
from xmodule.course_block import CourseBlock
from xmodule.modulestore import ModuleStoreEnum
from xmodule.modulestore.edit_info import EditInfoMixin
from xmodule.modulestore.exceptions import (
DuplicateCourseError,
DuplicateItemError,
InsufficientSpecificationError,
ItemNotFoundError,
VersionConflictError
)
from xmodule.modulestore.inheritance import InheritanceMixin
from xmodule.modulestore.split_mongo import BlockKey
from xmodule.modulestore.split_mongo.mongo_connection import CourseStructureCache
from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore
from xmodule.modulestore.tests.factories import check_mongo_calls
from xmodule.modulestore.tests.mongo_connection import MONGO_HOST, MONGO_PORT_NUM
from xmodule.modulestore.tests.test_modulestore import check_has_course_method
from xmodule.tabs import CourseTab
from xmodule.x_module import XModuleMixin
BRANCH_NAME_DRAFT = ModuleStoreEnum.BranchName.draft
BRANCH_NAME_PUBLISHED = ModuleStoreEnum.BranchName.published
TEST_USER_ID = ModuleStoreEnum.UserID.test
# Other user IDs for use in these tests:
TEST_OTHER_USER_ID = ModuleStoreEnum.UserID.test - 10
TEST_GUEST_USER_ID = ModuleStoreEnum.UserID.test - 11
TEST_ASSISTANT_USER_ID = ModuleStoreEnum.UserID.test - 12
@attr('mongo')
@pytest.mark.django_db
class SplitModuleTest(unittest.TestCase):
'''
The base set of tests manually populates a db w/ courses which have
versions. It creates unique collection names and removes them after all
tests finish.
'''
# Snippets of what would be in the django settings envs file
DOC_STORE_CONFIG = {
'host': MONGO_HOST,
'db': f'test_xmodule_{os.getpid()}',
'port': MONGO_PORT_NUM,
'collection': 'modulestore',
}
modulestore_options = {
'default_class': 'xmodule.hidden_block.HiddenBlock',
'fs_root': tempdir.mkdtemp_clean(),
'xblock_mixins': (InheritanceMixin, XModuleMixin, EditInfoMixin)
}
MODULESTORE = {
'ENGINE': 'xmodule.modulestore.split_mongo.split.SplitMongoModuleStore',
'DOC_STORE_CONFIG': DOC_STORE_CONFIG,
'OPTIONS': modulestore_options
}
modulestore = None
_date_field = Date()
_time_delta_field = Timedelta()
COURSE_CONTENT = {
"testx.GreekHero": {
"org": "testx",
"course": "GreekHero",
"run": "run",
"root_block_id": "head12345",
"user_id": TEST_USER_ID,
"fields": {
"tabs": [
CourseTab.load('courseware'),
CourseTab.load('discussion'),
CourseTab.load('wiki'),
],
"start": _date_field.from_json("2013-02-14T05:00"),
"display_name": "The Ancient Greek Hero",
"grading_policy": {
"GRADER": [
{
"min_count": 5,
"weight": 0.15,
"type": "Homework",
"drop_count": 1,
"short_label": "HWa"
},
{
"short_label": "",
"min_count": 2,
"type": "Lab",
"drop_count": 0,
"weight": 0.15
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.3
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.4
}
],
"GRADE_CUTOFFS": {
"Pass": 0.75
},
},
},
"revisions": [
{
"user_id": TEST_ASSISTANT_USER_ID,
"update": {
("course", "head12345"): {
"end": _date_field.from_json("2013-04-13T04:30"),
"tabs": [
CourseTab.load('courseware'),
CourseTab.load('discussion'),
CourseTab.load('wiki'),
CourseTab.load(
'static_tab', name="Syllabus", url_slug="01356a17b5924b17a04b7fc2426a3798"
),
CourseTab.load(
'static_tab', name="Advice for Students", url_slug="57e9991c0d794ff58f7defae3e042e"
),
],
"graceperiod": _time_delta_field.from_json("2 hours 0 minutes 0 seconds"),
"grading_policy": {
"GRADER": [
{
"min_count": 5,
"weight": 0.15,
"type": "Homework",
"drop_count": 1,
"short_label": "HWa"
},
{
"short_label": "",
"min_count": 12,
"type": "Lab",
"drop_count": 2,
"weight": 0.15
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.3
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.4
}
],
"GRADE_CUTOFFS": {
"Pass": 0.55
}
},
}
}
},
{
"user_id": TEST_ASSISTANT_USER_ID,
"update": {
("course", "head12345"): {
"end": _date_field.from_json("2013-06-13T04:30"),
"grading_policy": {
"GRADER": [
{
"min_count": 4,
"weight": 0.15,
"type": "Homework",
"drop_count": 2,
"short_label": "HWa"
},
{
"short_label": "",
"min_count": 12,
"type": "Lab",
"drop_count": 2,
"weight": 0.15
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.3
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.4
}
],
"GRADE_CUTOFFS": {
"Pass": 0.45
}
},
"enrollment_start": _date_field.from_json("2013-01-01T05:00"),
"enrollment_end": _date_field.from_json("2013-03-02T05:00"),
"advertised_start": "Fall 2013",
}
},
"create": [
{
"id": "chapter1",
"parent": "head12345",
"parent_type": "course",
"category": "chapter",
"fields": {
"display_name": "Hercules"
},
},
{
"id": "chap",
"parent": "head12345",
"parent_type": "course",
"category": "chapter",
"fields": {
"display_name": "Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo"
},
},
{
"id": "chapter2",
"parent": "head12345",
"parent_type": "course",
"category": "chapter",
"fields": {
"display_name": "Hera heckles Hercules"
},
},
{
"id": "chapter3",
"parent": "head12345",
"parent_type": "course",
"category": "chapter",
"fields": {
"display_name": "Hera cuckolds Zeus"
},
},
{
"id": "problem1",
"parent": "chapter3",
"parent_type": "chapter",
"category": "problem",
"fields": {
"display_name": "Problem 3.1",
"graceperiod": _time_delta_field.from_json("4 hours 0 minutes 0 seconds"),
},
},
{
"id": "problem3_2",
"parent": "chapter3",
"parent_type": "chapter",
"category": "problem",
"fields": {
"display_name": "Problem 3.2"
},
},
{
"id": "problem32",
"parent": "chapter3",
"parent_type": "chapter",
"category": "problem",
"fields": {
"display_name": "Problem 3.3",
"group_access": {"3": ["33"]},
},
}
]
},
]
},
"testx.wonderful": {
"org": "testx",
"course": "wonderful",
"run": "run",
"root_block_id": "head23456",
"user_id": TEST_USER_ID,
"fields": {
"tabs": [
CourseTab.load('courseware'),
CourseTab.load('discussion'),
CourseTab.load('wiki'),
],
"start": _date_field.from_json("2013-02-14T05:00"),
"display_name": "A wonderful course",
"grading_policy": {
"GRADER": [
{
"min_count": 14,
"weight": 0.25,
"type": "Homework",
"drop_count": 1,
"short_label": "HWa"
},
{
"short_label": "",
"min_count": 12,
"type": "Lab",
"drop_count": 2,
"weight": 0.25
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.2
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.3
}
],
"GRADE_CUTOFFS": {
"Pass": 0.95
}
},
},
"revisions": [
{
"user_id": TEST_USER_ID,
"update": {
("course", "head23456"): {
"display_name": "The most wonderful course",
"grading_policy": {
"GRADER": [
{
"min_count": 14,
"weight": 0.25,
"type": "Homework",
"drop_count": 1,
"short_label": "HWa"
},
{
"short_label": "",
"min_count": 12,
"type": "Lab",
"drop_count": 2,
"weight": 0.25
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.2
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.3
}
],
"GRADE_CUTOFFS": {
"Pass": 0.45
}
},
}
}
}
]
},
"guestx.contender": {
"org": "guestx",
"course": "contender",
"run": "run",
"root_block_id": "head345679",
"user_id": TEST_GUEST_USER_ID,
"fields": {
"tabs": [
CourseTab.load('courseware'),
CourseTab.load('discussion'),
CourseTab.load('wiki'),
],
"start": _date_field.from_json("2013-03-14T05:00"),
"display_name": "Yet another contender",
"grading_policy": {
"GRADER": [
{
"min_count": 4,
"weight": 0.25,
"type": "Homework",
"drop_count": 0,
"short_label": "HW"
},
{
"short_label": "Midterm",
"min_count": 1,
"type": "Midterm Exam",
"drop_count": 0,
"weight": 0.4
},
{
"short_label": "Final",
"min_count": 1,
"type": "Final Exam",
"drop_count": 0,
"weight": 0.35
}
],
"GRADE_CUTOFFS": {
"Pass": 0.25
}
},
}
},
}
@staticmethod
def bootstrapDB(split_store): # pylint: disable=invalid-name
'''
Sets up the initial data into the db
'''
for _course_id, course_spec in SplitModuleTest.COURSE_CONTENT.items():
course = split_store.create_course(
course_spec['org'],
course_spec['course'],
course_spec['run'],
course_spec['user_id'],
master_branch=BRANCH_NAME_DRAFT,
fields=course_spec['fields'],
root_block_id=course_spec['root_block_id']
)
for revision in course_spec.get('revisions', []):
for (block_type, block_id), fields in revision.get('update', {}).items():
# cheat since course is most frequent
if course.location.block_id == block_id:
block = course
else:
# not easy to figure out the category but get_item won't care
block_usage = BlockUsageLocator.make_relative(course.location, block_type, block_id)
block = split_store.get_item(block_usage)
for key, value in fields.items():
setattr(block, key, value)
# create new blocks into dag: parent must already exist; thus, order is important
new_ele_dict = {}
for spec in revision.get('create', []):
if spec['parent'] in new_ele_dict:
parent = new_ele_dict.get(spec['parent'])
elif spec['parent'] == course.location.block_id:
parent = course
else:
block_usage = BlockUsageLocator.make_relative(course.location, spec['parent_type'], spec['parent']) # lint-amnesty, pylint: disable=line-too-long
parent = split_store.get_item(block_usage)
block_id = LocalId(spec['id'])
child = split_store.create_xblock(
course.runtime, course.id, spec['category'], block_id, spec['fields'], parent_xblock=parent
)
new_ele_dict[spec['id']] = child
course = split_store.persist_xblock_dag(course, revision['user_id'])
# publish "testx.wonderful"
source_course = CourseLocator(org="testx", course="wonderful", run="run", branch=BRANCH_NAME_DRAFT)
to_publish = BlockUsageLocator(
source_course,
block_type='course',
block_id="head23456"
)
destination = CourseLocator(org="testx", course="wonderful", run="run", branch=BRANCH_NAME_PUBLISHED)
split_store.copy(TEST_USER_ID, source_course, destination, [to_publish], None)
def setUp(self):
super().setUp()
self.user_id = random.getrandbits(32)
def tearDown(self):
"""
Clear persistence between each test.
"""
if SplitModuleTest.modulestore:
modulestore()._drop_database(database=False, connections=False) # pylint: disable=protected-access
# drop the modulestore to force re init
SplitModuleTest.modulestore = None
super().tearDown()
def findByIdInResult(self, collection, _id): # pylint: disable=invalid-name
"""
Result is a collection of blocks. Find the one whose block id
matches the _id.
"""
for element in collection:
if element.location.block_id == _id:
return element
class TestHasChildrenAtDepth(SplitModuleTest):
"""Test the has_children_at_depth method of XModuleMixin. """
def test_has_children_at_depth(self):
course_locator = CourseLocator(
org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT
)
block_locator = BlockUsageLocator(
course_locator, 'course', 'head12345'
)
block = modulestore().get_item(block_locator)
self.assertRaises(
ValueError, block.has_children_at_depth, -1,
)
assert block.has_children_at_depth(0)
assert block.has_children_at_depth(1)
assert not block.has_children_at_depth(2)
ch1 = modulestore().get_item(
BlockUsageLocator(course_locator, 'chapter', block_id='chapter1')
)
assert not ch1.has_children_at_depth(0)
ch2 = modulestore().get_item(
BlockUsageLocator(course_locator, 'chapter', block_id='chapter2')
)
assert not ch2.has_children_at_depth(0)
ch3 = modulestore().get_item(
BlockUsageLocator(course_locator, 'chapter', block_id='chapter3')
)
assert ch3.has_children_at_depth(0)
assert not ch3.has_children_at_depth(1)
@ddt.ddt
class SplitModuleCourseTests(SplitModuleTest):
'''
Course CRUD operation tests
'''
def test_get_courses(self):
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT)
# should have gotten 3 draft courses
assert len(courses) == 3, 'Wrong number of courses'
# check metadata -- NOTE no promised order
course = self.findByIdInResult(courses, "head12345")
assert course.location.org == 'testx'
assert course.category == 'course', 'wrong category'
assert len(course.tabs) == 5, 'wrong number of tabs'
assert course.display_name == 'The Ancient Greek Hero', 'wrong display name'
assert course.advertised_start == 'Fall 2013', 'advertised_start'
assert len(course.children) == 4, 'children'
# check dates and graders--forces loading of block
assert course.edited_by == TEST_ASSISTANT_USER_ID
self.assertDictEqual(course.grade_cutoffs, {"Pass": 0.45})
def test_get_courses_with_same_course_index(self):
"""
Test that if two courses point to same course index,
`get_courses` should return both courses.
"""
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT)
# Should have gotten 3 draft courses.
assert len(courses) == 3
course_index = modulestore().get_course_index_info(courses[0].id)
# Creating a new course with same course index of another course.
new_draft_course = modulestore().create_course(
'testX', 'rerun_2.0', 'run_q2', 1, BRANCH_NAME_DRAFT, versions_dict=course_index['versions']
)
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT)
# Should have gotten 4 draft courses.
assert len(courses) == 4
assert new_draft_course.id.version_agnostic() in [c.id for c in courses]
def test_get_org_courses(self):
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='guestx')
# should have gotten 1 draft courses
assert len(courses) == 1
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='testx')
# should have gotten 2 draft courses
assert len(courses) == 2
# although this is already covered in other tests, let's
# also not pass in org= parameter to make sure we get back
# 3 courses
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT)
assert len(courses) == 3
def test_branch_requests(self):
# query w/ branch qualifier (both draft and published)
def _verify_published_course(courses_published):
""" Helper function for verifying published course. """
assert len(courses_published) == 1, len(courses_published)
course = self.findByIdInResult(courses_published, "head23456")
assert course is not None, 'published courses'
assert course.location.course_key.org == 'testx'
assert course.location.course_key.course == 'wonderful'
assert course.category == 'course', 'wrong category'
assert len(course.tabs) == 3, 'wrong number of tabs'
assert course.display_name == 'The most wonderful course', course.display_name
assert course.advertised_start is None
assert len(course.children) == 0, 'children'
_verify_published_course(modulestore().get_courses(branch=BRANCH_NAME_PUBLISHED))
def test_has_course(self):
'''
Test the various calling forms for has_course
'''
check_has_course_method(
modulestore(),
CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_DRAFT),
locator_key_fields=['org', 'course', 'run']
)
def test_get_course(self):
'''
Test the various calling forms for get_course
'''
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
head_course = modulestore().get_course(locator)
assert head_course.location.version_guid != head_course.previous_version
locator = CourseLocator(version_guid=head_course.previous_version)
course = modulestore().get_course(locator)
assert course.location.course_key.org is None
assert course.location.version_guid == head_course.previous_version
assert course.category == 'course'
assert len(course.tabs) == 5
assert course.display_name == 'The Ancient Greek Hero'
assert course.graceperiod == datetime.timedelta(hours=2)
assert course.advertised_start is None
assert len(course.children) == 0
assert course.definition_locator.definition_id != head_course.definition_locator.definition_id
# check dates and graders--forces loading of block
assert course.edited_by == TEST_ASSISTANT_USER_ID
self.assertDictEqual(course.grade_cutoffs, {"Pass": 0.55})
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
course = modulestore().get_course(locator)
assert course.location.course_key.org == 'testx'
assert course.location.course_key.course == 'GreekHero'
assert course.location.course_key.run == 'run'
assert course.category == 'course'
assert len(course.tabs) == 5
assert course.display_name == 'The Ancient Greek Hero'
assert course.advertised_start == 'Fall 2013'
assert len(course.children) == 4
# check dates and graders--forces loading of block
assert course.edited_by == TEST_ASSISTANT_USER_ID
self.assertDictEqual(course.grade_cutoffs, {"Pass": 0.45})
locator = CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_PUBLISHED)
course = modulestore().get_course(locator)
published_version = course.location.version_guid
locator = CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_DRAFT)
course = modulestore().get_course(locator)
assert course.location.version_guid != published_version
def test_get_course_negative(self):
# Now negative testing
with pytest.raises(InsufficientSpecificationError):
modulestore().get_course(CourseLocator(org='edu', course='meh', run='blah'))
with pytest.raises(ItemNotFoundError):
modulestore().get_course(CourseLocator(org='edu', course='nosuchthing', run="run", branch=BRANCH_NAME_DRAFT)) # lint-amnesty, pylint: disable=line-too-long
with pytest.raises(ItemNotFoundError):
modulestore().get_course(CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_PUBLISHED)) # lint-amnesty, pylint: disable=line-too-long
def test_cache(self):
"""
Test that the mechanics of caching work.
"""
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
course = modulestore().get_course(locator)
block_map = modulestore().cache_items(
course.runtime, [BlockKey.from_usage_key(child) for child in course.children], course.id, depth=3
)
assert BlockKey('chapter', 'chapter1') in block_map
assert BlockKey('problem', 'problem3_2') in block_map
def test_persist_dag(self):
"""
try saving temporary xblocks
"""
test_course = modulestore().create_course(
course='course', run='2014', org='testx',
display_name='fun test course', user_id=TEST_OTHER_USER_ID,
master_branch=ModuleStoreEnum.BranchName.draft
)
test_chapter = modulestore().create_xblock(
test_course.runtime, test_course.id, 'chapter', fields={'display_name': 'chapter n'},
parent_xblock=test_course
)
assert test_chapter.display_name == 'chapter n'
test_def_content = '<problem>boo</problem>'
# create child
new_block = modulestore().create_xblock(
test_course.runtime, test_course.id,
'problem',
fields={
'data': test_def_content,
'display_name': 'problem'
},
parent_xblock=test_chapter
)
assert new_block.definition_locator is not None
assert isinstance(new_block.definition_locator.definition_id, LocalId)
# better to pass in persisted parent over the subdag so
# subdag gets the parent pointer (otherwise 2 ops, persist dag, update parent children,
# persist parent
persisted_course = modulestore().persist_xblock_dag(test_course, TEST_OTHER_USER_ID)
assert len(persisted_course.children) == 1
persisted_chapter = persisted_course.get_children()[0]
assert persisted_chapter.category == 'chapter'
assert persisted_chapter.display_name == 'chapter n'
assert len(persisted_chapter.children) == 1
persisted_problem = persisted_chapter.get_children()[0]
assert persisted_problem.category == 'problem'
assert persisted_problem.data == test_def_content
# update it
persisted_problem.display_name = 'altered problem'
persisted_problem = modulestore().update_item(persisted_problem, TEST_OTHER_USER_ID)
assert persisted_problem.display_name == 'altered problem'
@ddt.data(
("course-v1:edx+test_course+test_run", BlockUsageLocator),
("ccx-v1:edX+test_course+test_run+ccx@1", CCXBlockUsageLocator),
)
@ddt.unpack
def test_make_course_usage_key(self, course_id, root_block_cls):
"""Test that we get back the appropriate usage key for the root of a course key.
In particular, we want to make sure that it properly handles CCX courses.
"""
course_key = CourseKey.from_string(course_id)
root_block_key = modulestore().make_course_usage_key(course_key)
assert isinstance(root_block_key, root_block_cls)
assert root_block_key.block_type == 'course'
assert root_block_key.block_id == 'course'
class TestCourseStructureCache(CacheIsolationMixin, SplitModuleTest):
"""Tests for the CourseStructureCache"""
# CacheIsolationMixin will reset the cache between test cases
# We'll use the "default" cache as a valid cache, and the "course_structure_cache" as a dummy cache
ENABLED_CACHES = ["default"]
def setUp(self):
# make a new course:
self.user = random.getrandbits(32)
self.new_course = modulestore().create_course(
'org', 'course', 'test_run', self.user, BRANCH_NAME_DRAFT,
)
super().setUp()
@patch('xmodule.modulestore.split_mongo.mongo_connection.get_cache')
def test_course_structure_cache(self, mock_get_cache):
# force get_cache to return the default cache so we can test
# its caching behavior
enabled_cache = caches['default']
mock_get_cache.return_value = enabled_cache
with check_mongo_calls(1):
not_cached_structure = self._get_structure(self.new_course)
# when cache is warmed, we should have one fewer mongo call
with check_mongo_calls(0):
cached_structure = self._get_structure(self.new_course)
# now make sure that you get the same structure
assert cached_structure == not_cached_structure
# If data is corrupted, get it from mongo again.
cache_key = self.new_course.id.version_guid
enabled_cache.set(cache_key, b"bad_data")
with check_mongo_calls(1):
not_corrupt_structure = self._get_structure(self.new_course)
# now make sure that you get the same structure
assert not_corrupt_structure == not_cached_structure
@patch('xmodule.modulestore.split_mongo.mongo_connection.get_cache')
def test_course_structure_cache_no_cache_configured(self, mock_get_cache):
mock_get_cache.side_effect = InvalidCacheBackendError
with check_mongo_calls(1):
not_cached_structure = self._get_structure(self.new_course)
# if the cache isn't configured, we expect to have to make
# another mongo call here if we want the same course structure
with check_mongo_calls(1):
cached_structure = self._get_structure(self.new_course)
# now make sure that you get the same structure
assert cached_structure == not_cached_structure
def test_dummy_cache(self):
with check_mongo_calls(1):
not_cached_structure = self._get_structure(self.new_course)
# Since the test is using the dummy cache, it's not actually caching
# anything
with check_mongo_calls(1):
cached_structure = self._get_structure(self.new_course)
# now make sure that you get the same structure
assert cached_structure == not_cached_structure
@patch('xmodule.modulestore.split_mongo.mongo_connection.monitoring.set_custom_attribute')
@patch('django.core.cache.cache.set')
@patch('xmodule.modulestore.split_mongo.mongo_connection.get_cache')
def test_course_structure_cache_with_data_chunk_greater_than_two_mb(self, mock_get_cache, mock_set_cache,
mock_set_custom_attribute):
enabled_cache = caches['default']
mock_get_cache.return_value = enabled_cache
course_cache = CourseStructureCache()
size = 3000000000
# this data_chunk will be compressed before being cached
data_chunk = b'\x00' * size
course_cache.set('my_data_chunk', data_chunk)
mock_set_cache.assert_not_called()
mock_set_custom_attribute.assert_called()
@patch('xmodule.modulestore.split_mongo.mongo_connection.monitoring.set_custom_attribute')
@patch('django.core.cache.cache.set')
@patch('xmodule.modulestore.split_mongo.mongo_connection.get_cache')
def test_course_structure_cache_with_data_chunk_lesser_than_two_mb(self, mock_get_cache, mock_set_cache,
mock_set_custom_attribute):
enabled_cache = caches['default']
mock_get_cache.return_value = enabled_cache
course_cache = CourseStructureCache()
size = 30000
data_chunk = b'\x00' * size
course_cache.set('my_data_chunk', data_chunk)
mock_set_cache.assert_called()
mock_set_custom_attribute.assert_not_called()
def _get_structure(self, course):
"""
Helper function to get a structure from a course.
"""
return modulestore().db_connection.get_structure(
course.location.as_object_id(course.location.version_guid)
)
class SplitModuleItemTests(SplitModuleTest):
'''
Item read tests including inheritance
'''
def test_has_item(self):
'''
has_item(BlockUsageLocator)
'''
org = 'testx'
course = 'GreekHero'
run = 'run'
course_locator = CourseLocator(org=org, course=course, run=run, branch=BRANCH_NAME_DRAFT)
course = modulestore().get_course(course_locator)
previous_version = course.previous_version
# positive tests of various forms
locator = course.location.map_into_course(CourseLocator(version_guid=previous_version))
assert modulestore().has_item(locator), ("couldn't find in %s" % previous_version)
locator = course.location.version_agnostic()
assert modulestore().has_item(locator)
assert not modulestore()\
.has_item(BlockUsageLocator(locator.course_key.for_branch(BRANCH_NAME_PUBLISHED),
block_type=locator.block_type,
block_id=locator.block_id)), 'found in published head'
# not a course obj
locator = BlockUsageLocator(course_locator, block_type='chapter', block_id='chapter1')
assert modulestore().has_item(locator), "couldn't find chapter1"
# in published course
locator = BlockUsageLocator(
CourseLocator(org="testx", course="wonderful", run="run", branch=BRANCH_NAME_DRAFT),
block_type="course",
block_id="head23456"
)
assert modulestore().has_item(locator.for_branch(BRANCH_NAME_PUBLISHED))
def test_negative_has_item(self):
# negative tests--not found
# no such course or block
locator = BlockUsageLocator(
CourseLocator(org="foo", course="doesnotexist", run="run", branch=BRANCH_NAME_DRAFT),
block_type="course",
block_id="head23456"
)
assert not modulestore().has_item(locator)
locator = BlockUsageLocator(
CourseLocator(org="testx", course="wonderful", run="run", branch=BRANCH_NAME_DRAFT),
block_type="vertical",
block_id="doesnotexist"
)
assert not modulestore().has_item(locator)
def test_get_item(self):
'''
get_item(blocklocator)
'''
hero_locator = CourseLocator(org="testx", course="GreekHero", run="run", branch=BRANCH_NAME_DRAFT)
course = modulestore().get_course(hero_locator)
previous_version = course.previous_version
# positive tests of various forms
locator = course.location.map_into_course(CourseLocator(version_guid=previous_version))
block = modulestore().get_item(locator)
assert isinstance(block, CourseBlock)
assert isinstance(modulestore().get_item(locator), CourseBlock)
def verify_greek_hero(block):
"""
Check contents of block
"""
assert block.location.org == 'testx'
assert block.location.course == 'GreekHero'
assert block.location.run == 'run'
assert len(block.tabs) == 5, 'wrong number of tabs'
assert block.display_name == 'The Ancient Greek Hero'
assert block.advertised_start == 'Fall 2013'
assert len(block.children) == 4
# check dates and graders--forces loading of block
assert block.edited_by == TEST_ASSISTANT_USER_ID
self.assertDictEqual(
block.grade_cutoffs, {"Pass": 0.45},
)
verify_greek_hero(modulestore().get_item(course.location))
# try to look up other branches
with pytest.raises(ItemNotFoundError):
modulestore().get_item(course.location.for_branch(BRANCH_NAME_PUBLISHED))
def test_get_non_root(self):
# not a course obj
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'chapter', 'chapter1'
)
block = modulestore().get_item(locator)
assert block.location.org == 'testx'
assert block.location.course == 'GreekHero'
assert block.category == 'chapter'
assert block.display_name == 'Hercules'
assert block.edited_by == TEST_ASSISTANT_USER_ID
# in published course
locator = BlockUsageLocator(
CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_PUBLISHED), 'course', 'head23456' # lint-amnesty, pylint: disable=line-too-long
)
assert isinstance(modulestore().get_item(locator), CourseBlock)
# negative tests--not found
# no such course or block
locator = BlockUsageLocator(
CourseLocator(org='doesnotexist', course='doesnotexist', run="run", branch=BRANCH_NAME_DRAFT), 'course', 'head23456' # lint-amnesty, pylint: disable=line-too-long
)
with pytest.raises(ItemNotFoundError):
modulestore().get_item(locator)
locator = BlockUsageLocator(
CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_DRAFT), 'html', 'doesnotexist'
)
with pytest.raises(ItemNotFoundError):
modulestore().get_item(locator)
# pylint: disable=protected-access
def test_matching(self):
'''
test the block and value matches help functions
'''
assert modulestore()._value_matches('help', 'help')
assert not modulestore()._value_matches('help', 'Help')
assert modulestore()._value_matches(['distract', 'help', 'notme'], 'help')
assert not modulestore()._value_matches(['distract', 'Help', 'notme'], 'help')
assert not modulestore()._block_matches({'field': ['distract', 'Help', 'notme']}, {'field': 'help'})
assert modulestore()._block_matches({'field': ['distract', 'help', 'notme'], 'irrelevant': 2},
{'field': 'help'})
assert modulestore()._value_matches('I need some help', re.compile('help'))
assert modulestore()._value_matches(['I need some help', 'today'], re.compile('help'))
assert not modulestore()._value_matches('I need some help', re.compile('Help'))
assert modulestore()._value_matches(['I need some help', 'today'], re.compile('Help', re.IGNORECASE))
assert modulestore()._value_matches('gotcha', {'$in': ['a', 'bunch', 'of', 'gotcha']})
assert not modulestore()._value_matches('gotcha', {'$in': ['a', 'bunch', 'of', 'gotchas']})
assert not modulestore()._value_matches('gotcha', {'$nin': ['a', 'bunch', 'of', 'gotcha']})
assert modulestore()._value_matches('gotcha', {'$nin': ['a', 'bunch', 'of', 'gotchas']})
assert modulestore()._block_matches({'group_access': {'1': [1]}}, {'group_access': {'$exists': True}})
assert modulestore()._block_matches({'a': 1, 'b': 2}, {'group_access': {'$exists': False}})
assert modulestore()._block_matches({'a': 1, 'group_access': {'1': [1]}},
{'a': 1, 'group_access': {'$exists': True}})
assert not modulestore()._block_matches({'a': 1, 'group_access': {'1': [1]}},
{'a': 111, 'group_access': {'$exists': True}})
assert modulestore()._block_matches({'a': 1, 'b': 2}, {'a': 1, 'group_access': {'$exists': False}})
assert not modulestore()._block_matches({'a': 1, 'b': 2}, {'a': 9, 'group_access': {'$exists': False}})
assert modulestore()._block_matches({'a': 1, 'b': 2}, {'a': 1})
assert not modulestore()._block_matches({'a': 1, 'b': 2}, {'a': 2})
assert not modulestore()._block_matches({'a': 1, 'b': 2}, {'c': 1})
assert not modulestore()._block_matches({'a': 1, 'b': 2}, {'a': 1, 'c': 1})
assert modulestore()._block_matches({'a': 1, 'b': 2}, {'a': (lambda i: (0 < i < 2))})
def test_get_items(self):
'''
get_items(locator, qualifiers, [branch])
'''
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
# get all blocks
matches = modulestore().get_items(locator)
assert len(matches) == 8
matches = modulestore().get_items(locator)
assert len(matches) == 8
matches = modulestore().get_items(locator, qualifiers={'category': 'chapter'})
assert len(matches) == 4
matches = modulestore().get_items(locator, qualifiers={'category': 'garbage'})
assert len(matches) == 0
# Test that we don't accidentally get an item with a similar name.
matches = modulestore().get_items(locator, qualifiers={'name': 'chapter1'})
assert len(matches) == 1
matches = modulestore().get_items(locator, qualifiers={'name': ['chapter1', 'chapter2']})
assert len(matches) == 2
matches = modulestore().get_items(
locator,
qualifiers={'category': 'chapter'},
settings={'display_name': re.compile(r'Hera')},
)
assert len(matches) == 2
matches = modulestore().get_items(locator, settings={'group_access': {'$exists': True}})
assert len(matches) == 1
matches = modulestore().get_items(locator, settings={'group_access': {'$exists': False}})
assert len(matches) == 7
def test_get_parents(self):
'''
get_parent_location(locator): BlockUsageLocator
'''
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT),
'chapter', block_id='chapter1'
)
parent = modulestore().get_parent_location(locator)
assert parent is not None
assert parent.block_id == 'head12345'
assert parent.org == 'testx'
assert parent.course == 'GreekHero'
locator = locator.course_key.make_usage_key('chapter', 'chapter2')
parent = modulestore().get_parent_location(locator)
assert parent is not None
assert parent.block_id == 'head12345'
locator = locator.course_key.make_usage_key('garbage', 'nosuchblock')
parent = modulestore().get_parent_location(locator)
assert parent is None
def test_get_children(self):
"""
Test the existing get_children method on xblocks
"""
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'course', 'head12345'
)
block = modulestore().get_item(locator)
children = block.get_children()
expected_ids = [
"chapter1", "chap", "chapter2", "chapter3"
]
for child in children:
assert child.category == 'chapter'
assert child.location.block_id in expected_ids
expected_ids.remove(child.location.block_id)
assert len(expected_ids) == 0
def version_agnostic(children):
"""
children: list of blocks
Returns the `children` list with each member version-agnostic
"""
return [child.version_agnostic() for child in children]
class TestItemCrud(SplitModuleTest):
"""
Test create update and delete of items
"""
# DHM do I need to test this case which I believe won't work:
# 1) fetch a course and some of its blocks
# 2) do a series of CRUD operations on those previously fetched elements
# The problem here will be that the version_guid of the items will be the version at time of fetch.
# Each separate save will change the head version; so, the 2nd piecemeal change will flag the version
# conflict. That is, if versions are v0..vn and start as v0 in initial fetch, the first CRUD op will
# say it's changing an object from v0, splitMongo will process it and make the current head v1, the next
# crud op will pass in its v0 element and splitMongo will flag the version conflict.
# What I don't know is how realistic this test is and whether to wrap the modulestore with a higher level
# transactional operation which manages the version change or make the threading cache reason out whether or
# not the changes are independent and additive and thus non-conflicting.
# A use case I expect is
# (client) change this metadata
# (server) done, here's the new info which, btw, updates the course version to v1
# (client) add these children to this other node (which says it came from v0 or
# will the client have refreshed the version before doing the op?)
# In this case, having a server side transactional model won't help b/c the bug is a long-transaction on the
# on the client where it would be a mistake for the server to assume anything about client consistency. The best
# the server could do would be to see if the parent's children changed at all since v0.
def test_create_minimal_item(self):
"""
create_item(user, location, category, definition_locator=None, fields): new_desciptor
"""
# grab link to course to ensure new versioning works
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
premod_course = modulestore().get_course(locator)
premod_history = modulestore().get_course_history_info(locator)
# add minimal one w/o a parent
category = 'sequential'
new_block = modulestore().create_item(
'user123', locator, category,
fields={'display_name': 'new sequential'}
)
# check that course version changed and course's previous is the other one
assert new_block.location.course == 'GreekHero'
assert new_block.location.version_guid != premod_course.location.version_guid
assert locator.version_guid is None,\
'Version inadvertently filled in' # lint-amnesty, pylint: disable=no-member
current_course = modulestore().get_course(locator)
assert new_block.location.version_guid == current_course.location.version_guid
history_info = modulestore().get_course_history_info(current_course.location.course_key)
assert history_info['previous_version'] == premod_course.location.version_guid
assert history_info['original_version'] == premod_history['original_version']
assert history_info['edited_by'] == 'user123'
# check block's info: category, definition_locator, and display_name
assert new_block.category == 'sequential'
assert new_block.definition_locator is not None
assert new_block.display_name == 'new sequential'
# check that block does not exist in previous version
locator = new_block.location.map_into_course(
CourseLocator(version_guid=premod_course.location.version_guid)
)
with pytest.raises(ItemNotFoundError):
modulestore().get_item(locator)
def test_create_parented_item(self):
"""
Test create_item w/ specifying the parent of the new item
"""
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT),
'chapter', block_id='chapter2'
)
original = modulestore().get_item(locator)
locator = BlockUsageLocator(
CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_DRAFT), 'course', 'head23456'
)
premod_course = modulestore().get_course(locator.course_key)
category = 'chapter'
new_block = modulestore().create_child(
'user123', locator, category,
fields={'display_name': 'new chapter'},
definition_locator=original.definition_locator
)
# check that course version changed and course's previous is the other one
assert new_block.location.version_guid != premod_course.location.version_guid
parent = modulestore().get_item(locator)
assert new_block.location.version_agnostic() in version_agnostic(parent.children)
assert new_block.definition_locator.definition_id == original.definition_locator.definition_id
def test_unique_naming(self):
"""
Check that 2 blocks of same type get unique block_ids. Also check that if creation provides
a definition id and new def data that it branches the definition in the db.
Actually, this tries to test all create_item features not tested above.
"""
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT),
'problem', block_id='problem1'
)
original = modulestore().get_item(locator)
locator = BlockUsageLocator(
CourseLocator(org='guestx', course='contender', run="run", branch=BRANCH_NAME_DRAFT), 'course', 'head345679'
)
category = 'problem'
new_payload = "<problem>empty</problem>"
new_block = modulestore().create_child(
'anotheruser', locator, category,
fields={'display_name': 'problem 1', 'data': new_payload},
)
another_payload = "<problem>not empty</problem>"
another_block = modulestore().create_child(
'anotheruser', locator, category,
fields={'display_name': 'problem 2', 'data': another_payload},
definition_locator=original.definition_locator,
)
# check that course version changed and course's previous is the other one
parent = modulestore().get_item(locator)
assert new_block.location.block_id != another_block.location.block_id
assert new_block.location.version_agnostic() in version_agnostic(parent.children)
assert another_block.location.version_agnostic() in version_agnostic(parent.children)
assert new_block.data == new_payload
assert another_block.data == another_payload
# check definition histories
new_history = modulestore().get_definition_history_info(new_block.definition_locator)
assert new_history['previous_version'] is None
assert new_history['original_version'] == new_block.definition_locator.definition_id
assert new_history['edited_by'] == 'anotheruser'
another_history = modulestore().get_definition_history_info(another_block.definition_locator)
assert another_history['previous_version'] == original.definition_locator.definition_id
def test_encoded_naming(self):
"""
Check that using odd characters in block id don't break ability to add and retrieve block.
"""
course_key = CourseLocator(org='guestx', course='contender', run="run", branch=BRANCH_NAME_DRAFT)
parent_locator = BlockUsageLocator(course_key, 'course', block_id="head345679")
chapter_locator = BlockUsageLocator(course_key, 'chapter', block_id="foo.bar_-~:0")
modulestore().create_child(
'anotheruser', parent_locator, 'chapter',
block_id=chapter_locator.block_id,
fields={'display_name': 'chapter 99'},
)
# check that course version changed and course's previous is the other one
new_block = modulestore().get_item(chapter_locator)
assert new_block.location.block_id == 'foo.bar_-~:0'
# hardcode to ensure BUL init didn't change
# now try making that a parent of something
new_payload = "<problem>empty</problem>"
problem_locator = BlockUsageLocator(course_key, 'problem', block_id="prob.bar_-~:99a")
modulestore().create_child(
'anotheruser', chapter_locator, 'problem',
block_id=problem_locator.block_id,
fields={'display_name': 'chapter 99', 'data': new_payload},
)
# check that course version changed and course's previous is the other one
new_block = modulestore().get_item(problem_locator)
assert new_block.location.block_id == problem_locator.block_id
chapter = modulestore().get_item(chapter_locator)
assert problem_locator in version_agnostic(chapter.children)
def test_create_bulk_operations(self):
"""
Test create_item using bulk_operations
"""
# start transaction w/ simple creation
user = random.getrandbits(32)
course_key = CourseLocator('test_org', 'test_transaction', 'test_run')
with modulestore().bulk_operations(course_key):
new_course = modulestore().create_course('test_org', 'test_transaction', 'test_run', user, BRANCH_NAME_DRAFT) # lint-amnesty, pylint: disable=line-too-long
new_course_locator = new_course.id
index_history_info = modulestore().get_course_history_info(new_course.location.course_key)
course_block_prev_version = new_course.previous_version
course_block_update_version = new_course.update_version
assert new_course_locator.version_guid is not None, 'Want to test a definite version'
versionless_course_locator = new_course_locator.version_agnostic()
# positive simple case: no force, add chapter
new_ele = modulestore().create_child(
user, new_course.location, 'chapter',
fields={'display_name': 'chapter 1'},
)
# version info shouldn't change
assert new_ele.update_version == course_block_update_version
assert new_ele.update_version == new_ele.location.version_guid
refetch_course = modulestore().get_course(versionless_course_locator)
assert refetch_course.location.version_guid == new_course.location.version_guid
assert refetch_course.previous_version == course_block_prev_version
assert refetch_course.update_version == course_block_update_version
refetch_index_history_info = modulestore().get_course_history_info(refetch_course.location.course_key)
assert refetch_index_history_info == index_history_info
assert new_ele.location.version_agnostic() in version_agnostic(refetch_course.children)
# try to create existing item
with pytest.raises(DuplicateItemError):
_fail = modulestore().create_child(
user, new_course.location, 'chapter',
block_id=new_ele.location.block_id,
fields={'display_name': 'chapter 2'},
)
# start a new transaction
with modulestore().bulk_operations(course_key):
new_ele = modulestore().create_child(
user, new_course.location, 'chapter',
fields={'display_name': 'chapter 2'},
)
transaction_guid = new_ele.location.version_guid
# ensure force w/ continue gives exception
with pytest.raises(VersionConflictError):
_fail = modulestore().create_child(
user, new_course.location, 'chapter',
fields={'display_name': 'chapter 2'},
force=True
)
# ensure trying to continue the old one gives exception
with pytest.raises(VersionConflictError):
_fail = modulestore().create_child(
user, new_course.location, 'chapter',
fields={'display_name': 'chapter 3'},
)
# add new child to old parent in continued (leave off version_guid)
course_block_locator = new_course.location.version_agnostic()
new_ele = modulestore().create_child(
user, course_block_locator, 'chapter',
fields={'display_name': 'chapter 4'},
)
assert new_ele.update_version != course_block_update_version
assert new_ele.location.version_guid == transaction_guid
# check children, previous_version
refetch_course = modulestore().get_course(versionless_course_locator)
assert new_ele.location.version_agnostic() in version_agnostic(refetch_course.children)
assert refetch_course.previous_version == course_block_update_version
assert refetch_course.update_version == transaction_guid
def test_bulk_ops_org_filtering(self):
"""
Make sure of proper filtering when using bulk operations and
calling get_courses with an 'org' filter
"""
# start transaction w/ simple creation
user = random.getrandbits(32)
course_key = CourseLocator('test_org', 'test_transaction', 'test_run')
with modulestore().bulk_operations(course_key):
modulestore().create_course('test_org', 'test_transaction', 'test_run', user, BRANCH_NAME_DRAFT)
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='test_org')
assert len(courses) == 1
assert courses[0].id.org == course_key.org
assert courses[0].id.course == course_key.course
assert courses[0].id.run == course_key.run
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='other_org')
assert len(courses) == 0
# re-assert after the end of the with scope
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='test_org')
assert len(courses) == 1
assert courses[0].id.org == course_key.org
assert courses[0].id.course == course_key.course
assert courses[0].id.run == course_key.run
courses = modulestore().get_courses(branch=BRANCH_NAME_DRAFT, org='other_org')
assert len(courses) == 0
def test_update_metadata(self):
"""
test updating an items metadata ensuring the definition doesn't version but the course does if it should
"""
locator = BlockUsageLocator(
CourseLocator(org="testx", course="GreekHero", run="run", branch=BRANCH_NAME_DRAFT),
'problem', block_id="problem3_2"
)
problem = modulestore().get_item(locator)
pre_def_id = problem.definition_locator.definition_id
pre_version_guid = problem.location.version_guid
assert pre_def_id is not None
assert pre_version_guid is not None
assert problem.max_attempts != 4, 'Invalidates rest of test'
problem.max_attempts = 4
problem.save() # decache above setting into the kvs
updated_problem = modulestore().update_item(problem, self.user_id)
# check that course version changed and course's previous is the other one
assert updated_problem.definition_locator.definition_id == pre_def_id
assert updated_problem.location.version_guid != pre_version_guid
assert updated_problem.max_attempts == 4
# refetch to ensure original didn't change
original_location = problem.location.map_into_course(CourseLocator(version_guid=pre_version_guid))
problem = modulestore().get_item(original_location)
assert problem.max_attempts != 4, 'original changed'
current_course = modulestore().get_course(locator.course_key)
assert updated_problem.location.version_guid == current_course.location.version_guid
history_info = modulestore().get_course_history_info(current_course.location.course_key)
assert history_info['previous_version'] == pre_version_guid
assert history_info['edited_by'] == self.user_id
def test_update_children(self):
"""
test updating an item's children ensuring the definition doesn't version but the course does if it should
"""
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'chapter', 'chapter3'
)
block = modulestore().get_item(locator)
pre_def_id = block.definition_locator.definition_id
pre_version_guid = block.location.version_guid
# reorder children
assert len(block.children) > 0, 'meaningless test'
moved_child = block.children.pop()
block.save() # decache model changes
updated_problem = modulestore().update_item(block, self.user_id)
# check that course version changed and course's previous is the other one
assert updated_problem.definition_locator.definition_id == pre_def_id
assert updated_problem.location.version_guid != pre_version_guid
assert version_agnostic(updated_problem.children) == version_agnostic(block.children)
assert moved_child not in version_agnostic(updated_problem.children)
locator = locator.course_key.make_usage_key('chapter', "chapter1")
other_block = modulestore().get_item(locator)
other_block.children.append(moved_child)
other_updated = modulestore().update_item(other_block, self.user_id)
assert moved_child.version_agnostic() in version_agnostic(other_updated.children)
def test_update_definition(self):
"""
test updating an item's definition: ensure it gets versioned as well as the course getting versioned
"""
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'course', 'head12345'
)
block = modulestore().get_item(locator)
pre_def_id = block.definition_locator.definition_id
pre_version_guid = block.location.version_guid
block.grading_policy['GRADER'][0]['min_count'] = 13
block.save() # decache model changes
updated_block = modulestore().update_item(block, self.user_id)
assert updated_block.definition_locator.definition_id != pre_def_id
assert updated_block.location.version_guid != pre_version_guid
assert updated_block.grading_policy['GRADER'][0]['min_count'] == 13
def test_update_manifold(self):
"""
Test updating metadata, children, and definition in a single call ensuring all the versioning occurs
"""
locator = BlockUsageLocator(
CourseLocator('testx', 'GreekHero', 'run', branch=BRANCH_NAME_DRAFT),
'problem', block_id='problem1'
)
original = modulestore().get_item(locator)
# first add 2 children to the course for the update to manipulate
locator = BlockUsageLocator(
CourseLocator('guestx', 'contender', 'run', branch=BRANCH_NAME_DRAFT),
'course', block_id="head345679"
)
category = 'problem'
new_payload = "<problem>empty</problem>"
modulestore().create_child(
'test_update_manifold', locator, category,
fields={'display_name': 'problem 1', 'data': new_payload},
)
another_payload = "<problem>not empty</problem>"
modulestore().create_child(
'test_update_manifold', locator, category,
fields={'display_name': 'problem 2', 'data': another_payload},
definition_locator=original.definition_locator,
)
# pylint: disable=protected-access
modulestore()._clear_cache()
# now begin the test
block = modulestore().get_item(locator)
pre_def_id = block.definition_locator.definition_id
pre_version_guid = block.location.version_guid
assert block.grading_policy['GRADER'][0]['min_count'] != 13
block.grading_policy['GRADER'][0]['min_count'] = 13
block.children = block.children[1:] + [block.children[0]]
block.advertised_start = "Soon"
block.save() # decache model changes
updated_block = modulestore().update_item(block, self.user_id)
assert updated_block.definition_locator.definition_id != pre_def_id
assert updated_block.location.version_guid != pre_version_guid
assert updated_block.grading_policy['GRADER'][0]['min_count'] == 13
assert updated_block.children[0].version_agnostic() == block.children[0].version_agnostic()
assert updated_block.advertised_start == 'Soon'
def test_delete_item(self):
course = self.create_course_for_deletion()
with pytest.raises(ValueError):
modulestore().delete_item(course.location, self.user_id)
reusable_location = course.id.version_agnostic().for_branch(BRANCH_NAME_DRAFT)
# delete a leaf
problems = modulestore().get_items(reusable_location, qualifiers={'category': 'problem'})
locn_to_del = problems[0].location
new_course_loc = modulestore().delete_item(locn_to_del, self.user_id)
deleted = locn_to_del.version_agnostic()
assert not modulestore().has_item(deleted)
with pytest.raises(VersionConflictError):
modulestore().has_item(locn_to_del)
with pytest.raises(ValueError):
modulestore().delete_item(deleted, self.user_id)
assert modulestore().has_item(locn_to_del.course_agnostic())
assert new_course_loc.version_guid != course.location.version_guid
# delete a subtree
nodes = modulestore().get_items(reusable_location, qualifiers={'category': 'chapter'})
new_course_loc = modulestore().delete_item(nodes[0].location, self.user_id)
# check subtree
def check_subtree(node):
"""
Check contents of subtree recursively
"""
if node:
node_loc = node.location
assert not modulestore().has_item(node_loc.version_agnostic())
assert modulestore().has_item(node_loc.course_agnostic())
if node.has_children:
for sub in node.get_children():
check_subtree(sub)
check_subtree(nodes[0])
def create_course_for_deletion(self):
"""
Create a course we can delete
"""
course = modulestore().create_course('nihilx', 'deletion', 'run', TEST_USER_ID, BRANCH_NAME_DRAFT)
root = course.location.version_agnostic().for_branch(BRANCH_NAME_DRAFT)
for _ in range(4):
self.create_subtree_for_deletion(root, ['chapter', 'vertical', 'problem'])
return modulestore().get_item(root)
def create_subtree_for_deletion(self, parent, category_queue):
"""
Create a subtree in the tb deleted course
"""
if not category_queue:
return
node = modulestore().create_child(
TEST_USER_ID, parent.version_agnostic(), category_queue[0]
)
node_loc = node.location.map_into_course(parent.course_key)
for _ in range(4):
self.create_subtree_for_deletion(node_loc, category_queue[1:])
def test_split_modulestore_create_child_with_position(self):
"""
This test is designed to hit a specific set of use cases having to do with
the child positioning logic found in split_mongo/split.py:create_child()
"""
# Set up the split module store
store = modulestore()
user = random.getrandbits(32)
course_key = CourseLocator('test_org', 'test_transaction', 'test_run')
with store.bulk_operations(course_key):
new_course = store.create_course('test_org', 'test_transaction', 'test_run', user, BRANCH_NAME_DRAFT)
new_course_locator = new_course.id
versionless_course_locator = new_course_locator.version_agnostic()
first_child = store.create_child(
self.user_id,
new_course.location,
"chapter"
)
refetch_course = store.get_course(versionless_course_locator)
second_child = store.create_child(
self.user_id,
refetch_course.location,
"chapter",
position=0
)
# First child should have been moved to second position, and better child takes the lead
refetch_course = store.get_course(versionless_course_locator)
children = refetch_course.get_children()
assert str(children[1].location) == str(first_child.location)
assert str(children[0].location) == str(second_child.location)
# Clean up the data so we don't break other tests which apparently expect a particular state
store.delete_course(refetch_course.id, user)
class TestCourseCreation(SplitModuleTest):
"""
Test create_course
"""
def test_simple_creation(self):
"""
The simplest case but probing all expected results from it.
"""
# Oddly getting differences of 200nsec
new_course = modulestore().create_course(
'test_org', 'test_course', 'test_run', TEST_USER_ID, BRANCH_NAME_DRAFT
)
new_locator = new_course.location
# check index entry
index_info = modulestore().get_course_index_info(new_locator.course_key)
assert index_info['org'] == 'test_org'
assert index_info['edited_by'] == TEST_USER_ID
# check structure info
structure_info = modulestore().get_course_history_info(new_locator.course_key)
assert structure_info['original_version'] == index_info['versions'][BRANCH_NAME_DRAFT]
assert structure_info['previous_version'] is None
assert structure_info['edited_by'] == TEST_USER_ID
# check the returned course object
assert isinstance(new_course, CourseBlock)
assert new_course.category == 'course'
assert not new_course.show_calculator
assert new_course.allow_anonymous
assert len(new_course.children) == 0
assert new_course.edited_by == TEST_USER_ID
assert len(new_course.grading_policy['GRADER']) == 4
self.assertDictEqual(new_course.grade_cutoffs, {"Pass": 0.5})
def test_cloned_course(self):
"""
Test making a course which points to an existing draft and published but not making any changes to either.
"""
original_locator = CourseLocator(org='testx', course='wonderful', run="run", branch=BRANCH_NAME_DRAFT)
original_index = modulestore().get_course_index_info(original_locator)
new_draft = modulestore().create_course(
'best', 'leech', 'leech_run', TEST_OTHER_USER_ID, BRANCH_NAME_DRAFT,
versions_dict=original_index['versions'])
new_draft_locator = new_draft.location
self.assertRegex(new_draft_locator.org, 'best')
# the edited_by and other meta fields on the new course will be the original author not this one
assert new_draft.edited_by == TEST_USER_ID
assert new_draft_locator.version_guid == original_index['versions'][BRANCH_NAME_DRAFT]
# however the edited_by and other meta fields on course_index will be this one
new_index = modulestore().get_course_index_info(new_draft_locator.course_key)
assert new_index['edited_by'] == TEST_OTHER_USER_ID
new_published_locator = new_draft_locator.course_key.for_branch(BRANCH_NAME_PUBLISHED)
new_published = modulestore().get_course(new_published_locator)
assert new_published.edited_by == TEST_USER_ID
assert new_published.location.version_guid == original_index['versions'][BRANCH_NAME_PUBLISHED]
# changing this course will not change the original course
# using new_draft.location will insert the chapter under the course root
new_item = modulestore().create_child(
TEST_OTHER_USER_ID, new_draft.location, 'chapter',
fields={'display_name': 'new chapter'}
)
new_draft_locator = new_draft_locator.course_key.version_agnostic()
new_index = modulestore().get_course_index_info(new_draft_locator)
assert new_index['versions'][BRANCH_NAME_DRAFT] != original_index['versions'][BRANCH_NAME_DRAFT]
new_draft = modulestore().get_course(new_draft_locator)
assert new_item.edited_by == TEST_OTHER_USER_ID
assert new_item.location.version_guid != original_index['versions'][BRANCH_NAME_DRAFT]
assert new_draft.location.version_guid != original_index['versions'][BRANCH_NAME_DRAFT]
structure_info = modulestore().get_course_history_info(new_draft_locator)
assert structure_info['edited_by'] == TEST_OTHER_USER_ID
original_course = modulestore().get_course(original_locator)
assert original_course.location.version_guid == original_index['versions'][BRANCH_NAME_DRAFT]
def test_derived_course(self):
"""
Create a new course which overrides metadata and course_data
"""
original_locator = CourseLocator(org='guestx', course='contender', run="run", branch=BRANCH_NAME_DRAFT)
original = modulestore().get_course(original_locator)
original_index = modulestore().get_course_index_info(original_locator)
fields = {
'grading_policy': original.grading_policy,
'display_name': 'Derivative',
}
fields['grading_policy']['GRADE_CUTOFFS'] = {'A': .9, 'B': .8, 'C': .65}
new_draft = modulestore().create_course(
'counter', 'leech', 'leech_run', TEST_OTHER_USER_ID, BRANCH_NAME_DRAFT,
versions_dict={BRANCH_NAME_DRAFT: original_index['versions'][BRANCH_NAME_DRAFT]},
fields=fields
)
new_draft_locator = new_draft.location
self.assertRegex(new_draft_locator.org, 'counter')
# the edited_by and other meta fields on the new course will be the original author not this one
assert new_draft.edited_by == TEST_OTHER_USER_ID
assert new_draft_locator.version_guid != original_index['versions'][BRANCH_NAME_DRAFT]
# however the edited_by and other meta fields on course_index will be this one
new_index = modulestore().get_course_index_info(new_draft_locator.course_key)
assert new_index['edited_by'] == TEST_OTHER_USER_ID
assert new_draft.display_name == fields['display_name']
self.assertDictEqual(
new_draft.grading_policy['GRADE_CUTOFFS'],
fields['grading_policy']['GRADE_CUTOFFS']
)
def test_update_course_index(self):
"""
Test the versions pointers. NOTE: you can change the org, course, or other things, but
it's not clear how you'd find them again or associate them w/ existing student history since
we use course_key so many places as immutable.
"""
locator = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
course_info = modulestore().get_course_index_info(locator)
# an allowed but not necessarily recommended way to revert the draft version
head_course = modulestore().get_course(locator)
versions = course_info['versions']
versions[BRANCH_NAME_DRAFT] = head_course.previous_version
modulestore().update_course_index(None, course_info)
course = modulestore().get_course(locator)
assert course.location.version_guid == versions[BRANCH_NAME_DRAFT]
# an allowed but not recommended way to publish a course
versions[BRANCH_NAME_PUBLISHED] = versions[BRANCH_NAME_DRAFT]
modulestore().update_course_index(None, course_info)
course = modulestore().get_course(locator.for_branch(BRANCH_NAME_PUBLISHED))
assert course.location.version_guid == versions[BRANCH_NAME_DRAFT]
def test_create_with_root(self):
"""
Test create_course with a specified root id and category
"""
user = random.getrandbits(32)
new_course = modulestore().create_course(
'test_org', 'test_transaction', 'test_run', user, BRANCH_NAME_DRAFT,
root_block_id='top', root_category='chapter'
)
assert new_course.location.block_id == 'top'
assert new_course.category == 'chapter'
# look at db to verify
db_structure = modulestore().db_connection.get_structure(
new_course.location.as_object_id(new_course.location.version_guid)
)
assert db_structure is not None, "Didn't find course"
assert BlockKey('course', 'course') not in db_structure['blocks']
assert BlockKey('chapter', 'top') in db_structure['blocks']
assert db_structure['blocks'][BlockKey('chapter', 'top')].block_type == 'chapter'
def test_create_id_dupe(self):
"""
Test create_course rejects duplicate id
"""
user = random.getrandbits(32)
courses = modulestore().get_courses(BRANCH_NAME_DRAFT)
with pytest.raises(DuplicateCourseError):
dupe_course_key = courses[0].location.course_key
modulestore().create_course(
dupe_course_key.org, dupe_course_key.course, dupe_course_key.run, user, BRANCH_NAME_DRAFT
)
def test_bulk_ops_get_courses(self):
"""
Test get_courses when some are created, updated, and deleted w/in a bulk operation
"""
# create 3 courses before bulk operation
split_store = modulestore()
user = random.getrandbits(32)
to_be_created = split_store.make_course_key('new', 'created', 'course')
with split_store.bulk_operations(to_be_created):
split_store.create_course(
to_be_created.org, to_be_created.course, to_be_created.run, user, master_branch=BRANCH_NAME_DRAFT,
)
modified_course_loc = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
with split_store.bulk_operations(modified_course_loc):
modified_course = modulestore().get_course(modified_course_loc)
modified_course.advertised_start = 'coming soon to a theater near you'
split_store.update_item(modified_course, user)
to_be_deleted = split_store.make_course_key("guestx", "contender", "run")
with split_store.bulk_operations(to_be_deleted):
split_store.delete_course(to_be_deleted, user)
# now get_courses
courses = split_store.get_courses(BRANCH_NAME_DRAFT)
assert len(courses) == 3
course_ids = [course.id.for_branch(None) for course in courses]
assert to_be_deleted not in course_ids
assert to_be_created in course_ids
fetched_modified = [course for course in courses if course.id == modified_course_loc][0]
assert fetched_modified.advertised_start == modified_course.advertised_start
class TestInheritance(SplitModuleTest):
"""
Test the metadata inheritance mechanism.
"""
def test_inheritance(self):
"""
The actual test
"""
# Note, not testing value where defined (course) b/c there's no
# defined accessor for it on CourseBlock.
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'problem', 'problem3_2'
)
node = modulestore().get_item(locator)
# inherited
assert node.graceperiod == datetime.timedelta(hours=2)
locator = BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'problem', 'problem1'
)
node = modulestore().get_item(locator)
# overridden
assert node.graceperiod == datetime.timedelta(hours=4)
def test_inheritance_not_saved(self):
"""
Was saving inherited settings with updated blocks causing inheritance to be sticky
"""
# set on parent, retrieve child, verify setting
chapter = modulestore().get_item(
BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'chapter', 'chapter3' # lint-amnesty, pylint: disable=line-too-long
)
)
problem = modulestore().get_item(
BlockUsageLocator(
CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT), 'problem', 'problem3_2' # lint-amnesty, pylint: disable=line-too-long
)
)
assert not problem.visible_to_staff_only
chapter.visible_to_staff_only = True
modulestore().update_item(chapter, self.user_id)
problem = modulestore().get_item(problem.location.version_agnostic())
assert problem.visible_to_staff_only
# unset on parent, retrieve child, verify unset
chapter = modulestore().get_item(chapter.location.version_agnostic())
del chapter.visible_to_staff_only
modulestore().update_item(chapter, self.user_id)
problem = modulestore().get_item(problem.location.version_agnostic())
assert not problem.visible_to_staff_only
def test_dynamic_inheritance(self):
"""
Test inheritance for create_item with and without a parent pointer
"""
course_key = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
chapter = modulestore().get_item(BlockUsageLocator(course_key, 'chapter', 'chapter3'))
chapter.visible_to_staff_only = True
orphan_problem = modulestore().create_item(self.user_id, course_key, 'problem')
assert not orphan_problem.visible_to_staff_only
parented_problem = modulestore().create_child(self.user_id, chapter.location.version_agnostic(), 'problem') # lint-amnesty, pylint: disable=unused-variable
# FIXME LMS-11376
# self.assertTrue(parented_problem.visible_to_staff_only)
orphan_problem = modulestore().create_xblock(chapter.runtime, course_key, 'problem')
assert not orphan_problem.visible_to_staff_only
parented_problem = modulestore().create_xblock(chapter.runtime, course_key, 'problem', parent_xblock=chapter)
# FIXME LMS-11376
# self.assertTrue(parented_problem.visible_to_staff_only)
class TestPublish(SplitModuleTest):
"""
Test the publishing api
"""
def test_publish_safe(self):
"""
Test the standard patterns: publish to new branch, revise and publish
"""
source_course = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
dest_course = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_PUBLISHED)
head = source_course.make_usage_key('course', "head12345")
chapter1 = source_course.make_usage_key('chapter', 'chapter1')
chapter2 = source_course.make_usage_key('chapter', 'chapter2')
chapter3 = source_course.make_usage_key('chapter', 'chapter3')
modulestore().copy(self.user_id, source_course, dest_course, [head], [chapter2, chapter3])
expected = [BlockKey.from_usage_key(head), BlockKey.from_usage_key(chapter1)]
unexpected = [
BlockKey.from_usage_key(chapter2),
BlockKey.from_usage_key(chapter3),
BlockKey("problem", "problem1"),
BlockKey("problem", "problem3_2")
]
self._check_course(source_course, dest_course, expected, unexpected)
# add a child under chapter1
new_block = modulestore().create_child(
self.user_id, chapter1, "sequential",
fields={'display_name': 'new sequential'},
)
# remove chapter1 from expected b/c its pub'd version != the source anymore since source changed
expected.remove(BlockKey.from_usage_key(chapter1))
# check that it's not in published course
with pytest.raises(ItemNotFoundError):
modulestore().get_item(new_block.location.map_into_course(dest_course))
# publish it
modulestore().copy(self.user_id, source_course, dest_course, [new_block.location], None)
expected.append(BlockKey.from_usage_key(new_block.location))
# check that it is in the published course and that its parent is the chapter
pub_block = modulestore().get_item(new_block.location.map_into_course(dest_course))
assert modulestore().get_parent_location(pub_block.location).block_id == chapter1.block_id
# ensure intentionally orphaned blocks work (e.g., course_info)
new_block = modulestore().create_item(
self.user_id, source_course, "course_info", block_id="handouts"
)
# publish it
modulestore().copy(self.user_id, source_course, dest_course, [new_block.location], None)
expected.append(BlockKey.from_usage_key(new_block.location))
# check that it is in the published course (no error means it worked)
pub_block = modulestore().get_item(new_block.location.map_into_course(dest_course))
self._check_course(source_course, dest_course, expected, unexpected)
def test_exceptions(self):
"""
Test the exceptions which preclude successful publication
"""
source_course = CourseLocator(org='testx', course='GreekHero', run="run", branch=BRANCH_NAME_DRAFT)
# destination does not exist
destination_course = CourseLocator(org='fake', course='Unknown', run="run", branch=BRANCH_NAME_PUBLISHED)
head = source_course.make_usage_key('course', "head12345")
chapter3 = source_course.make_usage_key('chapter', 'chapter3')
problem1 = source_course.make_usage_key('problem', 'problem1')
with pytest.raises(ItemNotFoundError):
modulestore().copy(self.user_id, source_course, destination_course, [chapter3], None)
# publishing into a new branch w/o publishing the root
destination_course = CourseLocator(org='testx', course='GreekHero', run='run', branch=BRANCH_NAME_PUBLISHED)
with pytest.raises(ItemNotFoundError):
modulestore().copy(self.user_id, source_course, destination_course, [chapter3], None)
# publishing a subdag w/o the parent already in course
modulestore().copy(self.user_id, source_course, destination_course, [head], [chapter3])
with pytest.raises(ItemNotFoundError):
modulestore().copy(self.user_id, source_course, destination_course, [problem1], [])
def test_move_delete(self):
"""
Test publishing moves and deletes.
"""
source_course = CourseLocator(org='testx', course='GreekHero', run='run', branch=BRANCH_NAME_DRAFT)
dest_course = CourseLocator(org='testx', course='GreekHero', run='run', branch=BRANCH_NAME_PUBLISHED)
head = source_course.make_usage_key('course', "head12345")
chapter2 = source_course.make_usage_key('chapter', 'chapter2')
problem1 = source_course.make_usage_key('problem', 'problem1')
modulestore().copy(self.user_id, source_course, dest_course, [head], [chapter2])
expected = [
BlockKey("course", "head12345"),
BlockKey("chapter", "chapter1"),
BlockKey("chapter", "chapter3"),
BlockKey("problem", "problem1"),
BlockKey("problem", "problem3_2"),
]
self._check_course(source_course, dest_course, expected, [BlockKey("chapter", "chapter2")])
# now move problem1 and delete problem3_2
chapter1 = modulestore().get_item(source_course.make_usage_key("chapter", "chapter1"))
chapter3 = modulestore().get_item(source_course.make_usage_key("chapter", "chapter3"))
chapter1.children.append(problem1)
chapter3.children.remove(problem1.map_into_course(chapter3.location.course_key))
modulestore().delete_item(source_course.make_usage_key("problem", "problem3_2"), self.user_id)
modulestore().copy(self.user_id, source_course, dest_course, [head], [chapter2])
expected = [
BlockKey("course", "head12345"),
BlockKey("chapter", "chapter1"),
BlockKey("chapter", "chapter3"),
BlockKey("problem", "problem1")
]
self._check_course(source_course, dest_course, expected, [BlockKey("chapter", "chapter2"), BlockKey("problem", "problem3_2")]) # lint-amnesty, pylint: disable=line-too-long
def _check_course(self, source_course_loc, dest_course_loc, expected_blocks, unexpected_blocks):
"""
Check that the course has the expected blocks and does not have the unexpected blocks
"""
history_info = modulestore().get_course_history_info(dest_course_loc)
assert history_info['edited_by'] == self.user_id
for expected in expected_blocks:
source = modulestore().get_item(source_course_loc.make_usage_key(expected.type, expected.id))
pub_copy = modulestore().get_item(dest_course_loc.make_usage_key(expected.type, expected.id))
# everything except previous_version & children should be the same
assert source.category == pub_copy.category
assert source.update_version == pub_copy.source_version,\
f"Versions don't match for {expected}: {source.update_version} != {pub_copy.update_version}"
assert self.user_id == pub_copy.edited_by,\
f'{pub_copy.location} edited_by {pub_copy.edited_by} not {self.user_id}'
for field in source.fields.values():
if field.name == 'children':
self._compare_children(field.read_from(source), field.read_from(pub_copy), unexpected_blocks)
elif isinstance(field, (Reference, ReferenceList, ReferenceValueDict)):
self.assertReferenceEqual(field.read_from(source), field.read_from(pub_copy))
else:
assert field.read_from(source) == field.read_from(pub_copy)
for unexp in unexpected_blocks:
with pytest.raises(ItemNotFoundError):
modulestore().get_item(dest_course_loc.make_usage_key(unexp.type, unexp.id))
def assertReferenceEqual(self, expected, actual): # lint-amnesty, pylint: disable=missing-function-docstring
if isinstance(expected, BlockUsageLocator):
expected = BlockKey.from_usage_key(expected)
actual = BlockKey.from_usage_key(actual)
elif isinstance(expected, list):
expected = [BlockKey.from_usage_key(key) for key in expected]
actual = [BlockKey.from_usage_key(key) for key in actual]
elif isinstance(expected, dict):
expected = {key: BlockKey.from_usage_key(val) for (key, val) in expected}
actual = {key: BlockKey.from_usage_key(val) for (key, val) in actual}
assert expected == actual
def _compare_children(self, source_children, dest_children, unexpected):
"""
Ensure dest_children == source_children minus unexpected
"""
source_block_keys = [
src_key
for src_key
in (BlockKey.from_usage_key(src) for src in source_children)
if src_key not in unexpected
]
dest_block_keys = [BlockKey.from_usage_key(dest) for dest in dest_children]
for unexp in unexpected:
assert unexp not in dest_block_keys
assert source_block_keys == dest_block_keys
class TestSchema(SplitModuleTest):
"""
Test the db schema (and possibly eventually migrations?)
"""
def test_schema(self):
"""
Test that the schema is set in each document
"""
db_connection = modulestore().db_connection
for collection in [db_connection.course_index, db_connection.structures, db_connection.definitions]:
assert collection.count_documents({'schema_version': {'$exists': False}}) == 0, \
f'{collection.name} has records without schema_version'
assert collection.count_documents({'schema_version': {'$ne': SplitMongoModuleStore.SCHEMA_VERSION}}) == 0, \
f'{collection.name} has records with wrong schema_version'
# ===========================================
def modulestore():
"""
Mock the django dependent global modulestore function to disentangle tests from django
"""
def load_function(engine_path):
"""
Load the given engine
"""
module_path, _, name = engine_path.rpartition('.')
return getattr(import_module(module_path), name)
if SplitModuleTest.modulestore is None:
class_ = load_function(SplitModuleTest.MODULESTORE['ENGINE'])
options = {}
options.update(SplitModuleTest.MODULESTORE['OPTIONS'])
options['render_template'] = render_to_template_mock
# lint-amnesty, pylint: disable=bad-option-value, star-args
SplitModuleTest.modulestore = class_(
None, # contentstore
SplitModuleTest.MODULESTORE['DOC_STORE_CONFIG'],
**options
)
SplitModuleTest.bootstrapDB(SplitModuleTest.modulestore)
return SplitModuleTest.modulestore
# pylint: disable=unused-argument
def render_to_template_mock(*args):
pass