replaced unittest assertions pytest assertions (#26564)

This commit is contained in:
Aarif
2021-02-19 16:04:32 +05:00
committed by GitHub
parent 7d5f9b2016
commit ba16e05899
22 changed files with 626 additions and 795 deletions

View File

@@ -138,7 +138,7 @@ class ContentLibrariesRestApiTest(APITestCase):
Like python 2's assertDictContainsSubset, but with the arguments in the
correct order.
"""
self.assertGreaterEqual(big_dict.items(), subset_dict.items())
assert big_dict.items() >= subset_dict.items()
# API helpers
@@ -147,10 +147,8 @@ class ContentLibrariesRestApiTest(APITestCase):
Call a REST API
"""
response = getattr(self.client, method)(url, data, format="json")
self.assertEqual(
response.status_code, expect_response,
"Unexpected response code {}:\n{}".format(response.status_code, getattr(response, 'data', '(no data)')),
)
assert response.status_code == expect_response,\
'Unexpected response code {}:\n{}'.format(response.status_code, getattr(response, 'data', '(no data)'))
return response.data
@contextmanager
@@ -323,10 +321,8 @@ class ContentLibrariesRestApiTest(APITestCase):
file_handle = BytesIO(content)
url = URL_LIB_BLOCK_ASSET_FILE.format(block_key=block_key, file_name=file_name)
response = self.client.put(url, data={"content": file_handle})
self.assertEqual(
response.status_code, expect_response,
"Unexpected response code {}:\n{}".format(response.status_code, getattr(response, 'data', '(no data)')),
)
assert response.status_code == expect_response,\
'Unexpected response code {}:\n{}'.format(response.status_code, getattr(response, 'data', '(no data)'))
def _delete_library_block_asset(self, block_key, file_name, expect_response=200):
""" Delete a static asset file. """

View File

@@ -131,17 +131,17 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
lib = self._create_library(
slug=slug, title="A Test Library", description="Just Testing", library_type=start_type,
)
self.assertEqual(lib['type'], start_type)
assert lib['type'] == start_type
for block_type, block_slug in xblock_specs:
self._add_block_to_library(lib['id'], block_type, block_slug)
self._commit_library_changes(lib['id'])
result = self._update_library(lib['id'], type=target_type, expect_response=expect_response)
if expect_response == 200:
self.assertEqual(result['type'], target_type)
self.assertIn('type', result)
assert result['type'] == target_type
assert 'type' in result
else:
lib = self._get_library(lib['id'])
self.assertEqual(lib['type'], start_type)
assert lib['type'] == start_type
def test_no_convert_on_unpublished(self):
"""
@@ -153,7 +153,7 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
)
self._add_block_to_library(lib['id'], "video", 'vid-block')
result = self._update_library(lib['id'], type=VIDEO, expect_response=400)
self.assertIn('type', result)
assert 'type' in result
def test_no_convert_on_pending_deletes(self):
"""
@@ -167,7 +167,7 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
self._commit_library_changes(lib['id'])
self._delete_library_block(block['id'])
result = self._update_library(lib['id'], type=VIDEO, expect_response=400)
self.assertIn('type', result)
assert 'type' in result
def test_library_validation(self):
"""
@@ -195,30 +195,30 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
lib1['has_unpublished_deletes'] = lib2['has_unpublished_deletes'] = None
result = self._list_libraries()
self.assertEqual(len(result), 2)
self.assertIn(lib1, result)
self.assertIn(lib2, result)
assert len(result) == 2
assert lib1 in result
assert lib2 in result
result = self._list_libraries({'pagination': 'true'})
self.assertEqual(len(result['results']), 2)
self.assertEqual(result['next'], None)
assert len(result['results']) == 2
assert result['next'] is None
# Create another library which causes number of libraries to exceed the page size
self._create_library(slug="some-slug-3", title="Existing Library")
# Verify that if `pagination` param isn't sent, API still honors the max page size.
# This is for maintaining compatibility with older non pagination-aware clients.
result = self._list_libraries()
self.assertEqual(len(result), 2)
assert len(result) == 2
# Pagination enabled:
# Verify total elements and valid 'next' in page 1
result = self._list_libraries({'pagination': 'true'})
self.assertEqual(len(result['results']), 2)
self.assertIn('page=2', result['next'])
self.assertIn('pagination=true', result['next'])
assert len(result['results']) == 2
assert 'page=2' in result['next']
assert 'pagination=true' in result['next']
# Verify total elements and null 'next' in page 2
result = self._list_libraries({'pagination': 'true', 'page': '2'})
self.assertEqual(len(result['results']), 1)
self.assertEqual(result['next'], None)
assert len(result['results']) == 1
assert result['next'] is None
@ddt.data(True, False)
def test_library_filters(self, is_indexing_enabled):
@@ -251,19 +251,17 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
org=f'org-test-{suffix}',
)
self.assertEqual(len(self._list_libraries()), 5)
self.assertEqual(len(self._list_libraries({'org': f'org-test-{suffix}'})), 2)
self.assertEqual(len(self._list_libraries({'text_search': f'test-lib-filter-{suffix}'})), 2)
self.assertEqual(len(self._list_libraries({'text_search': f'test-lib-filter-{suffix}', 'type': VIDEO})), 1)
self.assertEqual(len(self._list_libraries({'text_search': f'library-title-{suffix}'})), 4)
self.assertEqual(len(self._list_libraries({'text_search': f'library-title-{suffix}', 'type': VIDEO})), 2)
self.assertEqual(len(self._list_libraries({'text_search': f'bar-{suffix}'})), 2)
self.assertEqual(len(self._list_libraries({'text_search': f'org-test-{suffix}'})), 2)
self.assertEqual(
len(self._list_libraries({'org': f'org-test-{suffix}', 'text_search': f'library-title-{suffix}-4'})),
1,
)
self.assertEqual(len(self._list_libraries({'type': VIDEO})), 3)
assert len(self._list_libraries()) == 5
assert len(self._list_libraries({'org': f'org-test-{suffix}'})) == 2
assert len(self._list_libraries({'text_search': f'test-lib-filter-{suffix}'})) == 2
assert len(self._list_libraries({'text_search': f'test-lib-filter-{suffix}', 'type': VIDEO})) == 1
assert len(self._list_libraries({'text_search': f'library-title-{suffix}'})) == 4
assert len(self._list_libraries({'text_search': f'library-title-{suffix}', 'type': VIDEO})) == 2
assert len(self._list_libraries({'text_search': f'bar-{suffix}'})) == 2
assert len(self._list_libraries({'text_search': f'org-test-{suffix}'})) == 2
assert len(self._list_libraries({'org': f'org-test-{suffix}',
'text_search': f'library-title-{suffix}-4'})) == 1
assert len(self._list_libraries({'type': VIDEO})) == 3
# General Content Library XBlock tests:
@@ -274,10 +272,10 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
"""
lib = self._create_library(slug="testlib1", title="A Test Library", description="Testing XBlocks")
lib_id = lib["id"]
self.assertEqual(lib["has_unpublished_changes"], False)
assert lib['has_unpublished_changes'] is False
# A library starts out empty:
self.assertEqual(self._get_library_blocks(lib_id), [])
assert self._get_library_blocks(lib_id) == []
# Add a 'problem' XBlock to the library:
block_data = self._add_block_to_library(lib_id, "problem", "problem1")
@@ -290,23 +288,23 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
block_id = block_data["id"]
# Confirm that the result contains a definition key, but don't check its value,
# which for the purposes of these tests is an implementation detail.
self.assertIn("def_key", block_data)
assert 'def_key' in block_data
# now the library should contain one block and have unpublished changes:
self.assertEqual(self._get_library_blocks(lib_id), [block_data])
self.assertEqual(self._get_library(lib_id)["has_unpublished_changes"], True)
assert self._get_library_blocks(lib_id) == [block_data]
assert self._get_library(lib_id)['has_unpublished_changes'] is True
# Publish the changes:
self._commit_library_changes(lib_id)
self.assertEqual(self._get_library(lib_id)["has_unpublished_changes"], False)
assert self._get_library(lib_id)['has_unpublished_changes'] is False
# And now the block information should also show that block has no unpublished changes:
block_data["has_unpublished_changes"] = False
self.assertDictContainsEntries(self._get_library_block(block_id), block_data)
self.assertEqual(self._get_library_blocks(lib_id), [block_data])
assert self._get_library_blocks(lib_id) == [block_data]
# Now update the block's OLX:
orig_olx = self._get_library_block_olx(block_id)
self.assertIn("<problem", orig_olx)
assert '<problem' in orig_olx
new_olx = """
<problem display_name="New Multi Choice Question" max_attempts="5">
<multiplechoiceresponse>
@@ -323,7 +321,7 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
""".strip()
self._set_library_block_olx(block_id, new_olx)
# now reading it back, we should get that exact OLX (no change to whitespace etc.):
self.assertEqual(self._get_library_block_olx(block_id), new_olx)
assert self._get_library_block_olx(block_id) == new_olx
# And the display name and "unpublished changes" status of the block should be updated:
self.assertDictContainsEntries(self._get_library_block(block_id), {
"display_name": "New Multi Choice Question",
@@ -332,27 +330,27 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
# Now view the XBlock's student_view (including draft changes):
fragment = self._render_block_view(block_id, "student_view")
self.assertIn("resources", fragment)
self.assertIn("Blockstore is designed to store.", fragment["content"])
assert 'resources' in fragment
assert 'Blockstore is designed to store.' in fragment['content']
# Also call a handler to make sure that's working:
handler_url = self._get_block_handler_url(block_id, "xmodule_handler") + "problem_get"
problem_get_response = self.client.get(handler_url)
self.assertEqual(problem_get_response.status_code, 200)
self.assertIn("You have used 0 of 5 attempts", problem_get_response.content.decode('utf-8'))
assert problem_get_response.status_code == 200
assert 'You have used 0 of 5 attempts' in problem_get_response.content.decode('utf-8')
# Now delete the block:
self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], False)
assert self._get_library(lib_id)['has_unpublished_deletes'] is False
self._delete_library_block(block_id)
# Confirm it's deleted:
self._render_block_view(block_id, "student_view", expect_response=404)
self._get_library_block(block_id, expect_response=404)
self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], True)
assert self._get_library(lib_id)['has_unpublished_deletes'] is True
# Now revert all the changes back until the last publish:
self._revert_library_changes(lib_id)
self.assertEqual(self._get_library(lib_id)["has_unpublished_deletes"], False)
self.assertEqual(self._get_library_block_olx(block_id), orig_olx)
assert self._get_library(lib_id)['has_unpublished_deletes'] is False
assert self._get_library_block_olx(block_id) == orig_olx
# fin
@@ -370,24 +368,24 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
self._add_block_to_library(lib["id"], "problem", "problem2", parent_block=block2["id"])
result = self._get_library_blocks(lib["id"])
self.assertEqual(len(result), 2)
self.assertIn(block1, result)
assert len(result) == 2
assert block1 in result
result = self._get_library_blocks(lib["id"], {'pagination': 'true'})
self.assertEqual(len(result['results']), 2)
self.assertEqual(result['next'], None)
assert len(result['results']) == 2
assert result['next'] is None
self._add_block_to_library(lib["id"], "problem", "problem3")
# Test pagination
result = self._get_library_blocks(lib["id"])
self.assertEqual(len(result), 3)
assert len(result) == 3
result = self._get_library_blocks(lib["id"], {'pagination': 'true'})
self.assertEqual(len(result['results']), 2)
self.assertIn('page=2', result['next'])
self.assertIn('pagination=true', result['next'])
assert len(result['results']) == 2
assert 'page=2' in result['next']
assert 'pagination=true' in result['next']
result = self._get_library_blocks(lib["id"], {'pagination': 'true', 'page': '2'})
self.assertEqual(len(result['results']), 1)
self.assertEqual(result['next'], None)
assert len(result['results']) == 1
assert result['next'] is None
@ddt.data(True, False)
def test_library_blocks_filters(self, is_indexing_enabled):
@@ -404,19 +402,17 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
self._set_library_block_olx(block1["id"], "<problem display_name=\"DisplayName\"></problem>")
self.assertEqual(len(self._get_library_blocks(lib["id"])), 5)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Foo'})), 2)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Display'})), 1)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Video'})), 1)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Foo', 'block_type': 'video'})), 0)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'text_search': 'Baz', 'block_type': 'video'})), 1)
self.assertEqual(len(
self._get_library_blocks(lib["id"], {'text_search': 'Baz', 'block_type': ['video', 'html']})),
2,
)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'video'})), 1)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'problem'})), 3)
self.assertEqual(len(self._get_library_blocks(lib["id"], {'block_type': 'squirrel'})), 0)
assert len(self._get_library_blocks(lib['id'])) == 5
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Foo'})) == 2
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Display'})) == 1
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Video'})) == 1
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Foo', 'block_type': 'video'})) == 0
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Baz', 'block_type': 'video'})) == 1
assert len(self._get_library_blocks(lib['id'], {'text_search': 'Baz', 'block_type': ['video', 'html']})) ==\
2
assert len(self._get_library_blocks(lib['id'], {'block_type': 'video'})) == 1
assert len(self._get_library_blocks(lib['id'], {'block_type': 'problem'})) == 3
assert len(self._get_library_blocks(lib['id'], {'block_type': 'squirrel'})) == 0
@ddt.data(
('video-problem', VIDEO, 'problem', 400),
@@ -464,17 +460,14 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
""")
# Check the resulting OLX of the unit:
self.assertEqual(self._get_library_block_olx(unit_block["id"]), (
'<unit xblock-family="xblock.v1">\n'
' <xblock-include definition="html/html1"/>\n'
' <xblock-include definition="problem/problem1"/>\n'
'</unit>\n'
))
assert self._get_library_block_olx(unit_block['id']) ==\
'<unit xblock-family="xblock.v1">\n <xblock-include definition="html/html1"/>\n' \
' <xblock-include definition="problem/problem1"/>\n</unit>\n'
# The unit can see and render its children:
fragment = self._render_block_view(unit_block["id"], "student_view")
self.assertIn("Hello world", fragment["content"])
self.assertIn("What is an even number?", fragment["content"])
assert 'Hello world' in fragment['content']
assert 'What is an even number?' in fragment['content']
# We cannot add a duplicate ID to the library, either at the top level or as a child:
self._add_block_to_library(lib_id, "problem", "problem1", expect_response=400)
@@ -508,12 +501,12 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
lib = self._create_library(slug="permtest", title="Permission Test Library", description="Testing")
lib_id = lib["id"]
# By default, "public learning" and public read access are disallowed.
self.assertEqual(lib["allow_public_learning"], False)
self.assertEqual(lib["allow_public_read"], False)
assert lib['allow_public_learning'] is False
assert lib['allow_public_read'] is False
# By default, the creator of a new library is the only admin
data = self._get_library_team(lib_id)
self.assertEqual(len(data), 1)
assert len(data) == 1
self.assertDictContainsEntries(data[0], {
"username": admin.username, "group_name": None, "access_level": "admin",
})
@@ -528,7 +521,7 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
self._set_group_access_level(lib_id, group.name, access_level="author")
team_response = self._get_library_team(lib_id)
self.assertEqual(len(team_response), 4)
assert len(team_response) == 4
# We'll use this one later.
reader_grant = {"username": reader.username, "group_name": None, "access_level": "read"}
# The response should also always be sorted in a specific order (by username and group name):
@@ -552,9 +545,9 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
with self.as_user(user):
self._get_library(lib_id)
data = self._get_library_team(lib_id)
self.assertEqual(data, team_response)
assert data == team_response
data = self._get_user_access_level(lib_id, reader.username)
self.assertEqual(data, {**reader_grant, 'username': 'Reader', 'email': 'reader@example.com'})
assert data == {**reader_grant, 'username': 'Reader', 'email': 'reader@example.com'}
# A user with only read permission can get data about the library but not the team:
with self.as_user(reader):
@@ -586,8 +579,8 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
# Verify the permitted changes were made:
with self.as_user(admin):
data = self._get_library(lib_id)
self.assertEqual(data["description"], "Revised description")
self.assertEqual(data["title"], "New Library Title")
assert data['description'] == 'Revised description'
assert data['title'] == 'New Library Title'
# Library XBlock editing ###############################################
@@ -723,26 +716,26 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
# The unit can see and render its children:
fragment = self._render_block_view(unit_block["id"], "student_view")
self.assertIn("What is an odd number?", fragment["content"])
self.assertIn("What is an even number?", fragment["content"])
self.assertIn("What holds this XBlock?", fragment["content"])
assert 'What is an odd number?' in fragment['content']
assert 'What is an even number?' in fragment['content']
assert 'What holds this XBlock?' in fragment['content']
# Also check the API for retrieving links:
links_created = self._get_library_links(lib_id)
links_created.sort(key=lambda link: link["id"])
self.assertEqual(len(links_created), 2)
assert len(links_created) == 2
self.assertEqual(links_created[0]["id"], "problem_bank")
self.assertEqual(links_created[0]["bundle_uuid"], bank_lib["bundle_uuid"])
self.assertEqual(links_created[0]["version"], 2)
self.assertEqual(links_created[0]["latest_version"], 2)
self.assertEqual(links_created[0]["opaque_key"], bank_lib_id)
assert links_created[0]['id'] == 'problem_bank'
assert links_created[0]['bundle_uuid'] == bank_lib['bundle_uuid']
assert links_created[0]['version'] == 2
assert links_created[0]['latest_version'] == 2
assert links_created[0]['opaque_key'] == bank_lib_id
self.assertEqual(links_created[1]["id"], "problem_bank_v1")
self.assertEqual(links_created[1]["bundle_uuid"], bank_lib["bundle_uuid"])
self.assertEqual(links_created[1]["version"], 1)
self.assertEqual(links_created[1]["latest_version"], 2)
self.assertEqual(links_created[1]["opaque_key"], bank_lib_id)
assert links_created[1]['id'] == 'problem_bank_v1'
assert links_created[1]['bundle_uuid'] == bank_lib['bundle_uuid']
assert links_created[1]['version'] == 1
assert links_created[1]['latest_version'] == 2
assert links_created[1]['opaque_key'] == bank_lib_id
def test_library_blocks_limit(self):
"""
@@ -770,7 +763,7 @@ class ContentLibrariesTest(ContentLibrariesRestApiTest):
lib = self._create_library(slug=slug, title='Test Block Types', library_type=library_type)
types = self._get_library_block_types(lib['id'])
if constrained:
self.assertEqual(len(types), 1)
self.assertEqual(types[0]['block_type'], library_type)
assert len(types) == 1
assert types[0]['block_type'] == library_type
else:
self.assertGreater(len(types), 1)
assert len(types) > 1

View File

@@ -38,15 +38,15 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
library_key = LibraryLocatorV2.from_string(result['id'])
response = ContentLibraryIndexer.get_items([library_key])[0]
self.assertEqual(response['id'], result['id'])
self.assertEqual(response['title'], result['title'])
self.assertEqual(response['description'], result['description'])
self.assertEqual(response['uuid'], result['bundle_uuid'])
self.assertEqual(response['num_blocks'], 0)
self.assertEqual(response['version'], result['version'])
self.assertEqual(response['last_published'], None)
self.assertEqual(response['has_unpublished_changes'], False)
self.assertEqual(response['has_unpublished_deletes'], False)
assert response['id'] == result['id']
assert response['title'] == result['title']
assert response['description'] == result['description']
assert response['uuid'] == result['bundle_uuid']
assert response['num_blocks'] == 0
assert response['version'] == result['version']
assert response['last_published'] is None
assert response['has_unpublished_changes'] is False
assert response['has_unpublished_deletes'] is False
def test_schema_updates(self):
"""
@@ -56,15 +56,15 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
new=0):
result = self._create_library(slug="test-lib-schemaupdates-1", title="Title 1", description="Description")
library_key = LibraryLocatorV2.from_string(result['id'])
self.assertEqual(len(ContentLibraryIndexer.get_items([library_key])), 1)
assert len(ContentLibraryIndexer.get_items([library_key])) == 1
with patch("openedx.core.djangoapps.content_libraries.libraries_index.ContentLibraryIndexer.SCHEMA_VERSION",
new=1):
self.assertEqual(len(ContentLibraryIndexer.get_items([library_key])), 0)
assert len(ContentLibraryIndexer.get_items([library_key])) == 0
call_command("reindex_content_library", all=True, force=True)
self.assertEqual(len(ContentLibraryIndexer.get_items([library_key])), 1)
assert len(ContentLibraryIndexer.get_items([library_key])) == 1
def test_remove_all_libraries(self):
"""
@@ -75,10 +75,10 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
library_key1 = LibraryLocatorV2.from_string(lib1['id'])
library_key2 = LibraryLocatorV2.from_string(lib2['id'])
self.assertEqual(len(ContentLibraryIndexer.get_items([library_key1, library_key2])), 2)
assert len(ContentLibraryIndexer.get_items([library_key1, library_key2])) == 2
ContentLibraryIndexer.remove_all_items()
self.assertEqual(len(ContentLibraryIndexer.get_items()), 0)
assert len(ContentLibraryIndexer.get_items()) == 0
def test_update_libraries(self):
"""
@@ -91,18 +91,18 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
response = ContentLibraryIndexer.get_items([library_key])[0]
self.assertEqual(response['id'], lib['id'])
self.assertEqual(response['title'], "New Title")
self.assertEqual(response['description'], "New Title")
self.assertEqual(response['uuid'], lib['bundle_uuid'])
self.assertEqual(response['num_blocks'], 0)
self.assertEqual(response['version'], lib['version'])
self.assertEqual(response['last_published'], None)
self.assertEqual(response['has_unpublished_changes'], False)
self.assertEqual(response['has_unpublished_deletes'], False)
assert response['id'] == lib['id']
assert response['title'] == 'New Title'
assert response['description'] == 'New Title'
assert response['uuid'] == lib['bundle_uuid']
assert response['num_blocks'] == 0
assert response['version'] == lib['version']
assert response['last_published'] is None
assert response['has_unpublished_changes'] is False
assert response['has_unpublished_deletes'] is False
self._delete_library(lib['id'])
self.assertEqual(ContentLibraryIndexer.get_items([library_key]), [])
assert ContentLibraryIndexer.get_items([library_key]) == []
ContentLibraryIndexer.get_items([library_key])
def test_update_library_blocks(self):
@@ -116,9 +116,9 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
last_published = ContentLibraryIndexer.get_items([library_key])[0]['last_published']
self._commit_library_changes(str(library_key))
response = ContentLibraryIndexer.get_items([library_key])[0]
self.assertEqual(response['has_unpublished_changes'], False)
self.assertEqual(response['has_unpublished_deletes'], False)
self.assertGreaterEqual(response['last_published'], last_published)
assert response['has_unpublished_changes'] is False
assert response['has_unpublished_deletes'] is False
assert response['last_published'] >= last_published
return response
def verify_uncommitted_libraries(library_key, has_unpublished_changes, has_unpublished_deletes):
@@ -126,8 +126,8 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
Verify uncommitted changes and deletes in the index
"""
response = ContentLibraryIndexer.get_items([library_key])[0]
self.assertEqual(response['has_unpublished_changes'], has_unpublished_changes)
self.assertEqual(response['has_unpublished_deletes'], has_unpublished_deletes)
assert response['has_unpublished_changes'] == has_unpublished_changes
assert response['has_unpublished_deletes'] == has_unpublished_deletes
return response
lib = self._create_library(slug="test-lib-update-block", title="Title", description="Description")
@@ -136,20 +136,20 @@ class ContentLibraryIndexerTest(ContentLibrariesRestApiTest):
# Verify uncommitted new blocks
block = self._add_block_to_library(lib['id'], "problem", "problem1")
response = verify_uncommitted_libraries(library_key, True, False)
self.assertEqual(response['last_published'], None)
self.assertEqual(response['num_blocks'], 1)
assert response['last_published'] is None
assert response['num_blocks'] == 1
# Verify committed new blocks
self._commit_library_changes(lib['id'])
response = verify_uncommitted_libraries(library_key, False, False)
self.assertEqual(response['num_blocks'], 1)
assert response['num_blocks'] == 1
# Verify uncommitted deleted blocks
self._delete_library_block(block['id'])
response = verify_uncommitted_libraries(library_key, True, True)
self.assertEqual(response['num_blocks'], 0)
assert response['num_blocks'] == 0
# Verify committed deleted blocks
self._commit_library_changes(lib['id'])
response = verify_uncommitted_libraries(library_key, False, False)
self.assertEqual(response['num_blocks'], 0)
assert response['num_blocks'] == 0
block = self._add_block_to_library(lib['id'], "problem", "problem1")
self._commit_library_changes(lib['id'])
@@ -201,17 +201,17 @@ class LibraryBlockIndexerTest(ContentLibrariesRestApiTest):
block1 = self._add_block_to_library(lib['id'], "problem", "problem1")
block2 = self._add_block_to_library(lib['id'], "problem", "problem2")
self.assertEqual(len(LibraryBlockIndexer.get_items()), 2)
assert len(LibraryBlockIndexer.get_items()) == 2
for block in [block1, block2]:
usage_key = LibraryUsageLocatorV2.from_string(block['id'])
response = LibraryBlockIndexer.get_items([usage_key])[0]
self.assertEqual(response['id'], block['id'])
self.assertEqual(response['def_key'], block['def_key'])
self.assertEqual(response['block_type'], block['block_type'])
self.assertEqual(response['display_name'], block['display_name'])
self.assertEqual(response['has_unpublished_changes'], block['has_unpublished_changes'])
assert response['id'] == block['id']
assert response['def_key'] == block['def_key']
assert response['block_type'] == block['block_type']
assert response['display_name'] == block['display_name']
assert response['has_unpublished_changes'] == block['has_unpublished_changes']
def test_schema_updates(self):
"""
@@ -221,15 +221,15 @@ class LibraryBlockIndexerTest(ContentLibrariesRestApiTest):
with patch("openedx.core.djangoapps.content_libraries.libraries_index.LibraryBlockIndexer.SCHEMA_VERSION",
new=0):
block = self._add_block_to_library(lib['id'], "problem", "problem1")
self.assertEqual(len(LibraryBlockIndexer.get_items([block['id']])), 1)
assert len(LibraryBlockIndexer.get_items([block['id']])) == 1
with patch("openedx.core.djangoapps.content_libraries.libraries_index.LibraryBlockIndexer.SCHEMA_VERSION",
new=1):
self.assertEqual(len(LibraryBlockIndexer.get_items([block['id']])), 0)
assert len(LibraryBlockIndexer.get_items([block['id']])) == 0
call_command("reindex_content_library", all=True, force=True)
self.assertEqual(len(LibraryBlockIndexer.get_items([block['id']])), 1)
assert len(LibraryBlockIndexer.get_items([block['id']])) == 1
def test_remove_all_items(self):
"""
@@ -238,10 +238,10 @@ class LibraryBlockIndexerTest(ContentLibrariesRestApiTest):
lib1 = self._create_library(slug="test-lib-rm-all", title="Title 1", description="Description")
self._add_block_to_library(lib1['id'], "problem", "problem1")
self._add_block_to_library(lib1['id'], "problem", "problem2")
self.assertEqual(len(LibraryBlockIndexer.get_items()), 2)
assert len(LibraryBlockIndexer.get_items()) == 2
LibraryBlockIndexer.remove_all_items()
self.assertEqual(len(LibraryBlockIndexer.get_items()), 0)
assert len(LibraryBlockIndexer.get_items()) == 0
def test_crud_block(self):
"""
@@ -253,29 +253,29 @@ class LibraryBlockIndexerTest(ContentLibrariesRestApiTest):
# Update OLX, verify updates in index
self._set_library_block_olx(block["id"], '<problem display_name="new_name"/>')
response = LibraryBlockIndexer.get_items([block['id']])[0]
self.assertEqual(response['display_name'], "new_name")
self.assertEqual(response['has_unpublished_changes'], True)
assert response['display_name'] == 'new_name'
assert response['has_unpublished_changes'] is True
# Verify has_unpublished_changes after committing library
self._commit_library_changes(lib['id'])
response = LibraryBlockIndexer.get_items([block['id']])[0]
self.assertEqual(response['has_unpublished_changes'], False)
assert response['has_unpublished_changes'] is False
# Verify has_unpublished_changes after reverting library
self._set_library_block_asset(block["id"], "whatever.png", b"data")
response = LibraryBlockIndexer.get_items([block['id']])[0]
self.assertEqual(response['has_unpublished_changes'], True)
assert response['has_unpublished_changes'] is True
self._revert_library_changes(lib['id'])
response = LibraryBlockIndexer.get_items([block['id']])[0]
self.assertEqual(response['has_unpublished_changes'], False)
assert response['has_unpublished_changes'] is False
# Verify that deleting block removes it from index
self._delete_library_block(block['id'])
self.assertEqual(LibraryBlockIndexer.get_items([block['id']]), [])
assert LibraryBlockIndexer.get_items([block['id']]) == []
# Verify that deleting a library removes its blocks from index too
self._add_block_to_library(lib['id'], "problem", "problem1")
LibraryBlockIndexer.get_items([block['id']])
self._delete_library(lib['id'])
self.assertEqual(LibraryBlockIndexer.get_items([block['id']]), [])
assert LibraryBlockIndexer.get_items([block['id']]) == []

View File

@@ -99,11 +99,8 @@ class ContentLibraryRuntimeTest(ContentLibraryContentTestMixin, TestCase):
# Load both blocks:
unit_block = xblock_api.load_block(unit_block_key, self.student_a)
unit_block2 = xblock_api.load_block(unit_block2_key, self.student_a)
self.assertEqual(
library_api.get_library_block_olx(unit_block_key),
library_api.get_library_block_olx(unit_block2_key),
)
self.assertNotEqual(unit_block.children, unit_block2.children)
assert library_api.get_library_block_olx(unit_block_key) == library_api.get_library_block_olx(unit_block2_key)
assert unit_block.children != unit_block2.children
def test_has_score(self):
"""
@@ -116,10 +113,12 @@ class ContentLibraryRuntimeTest(ContentLibraryContentTestMixin, TestCase):
unit_block = xblock_api.load_block(unit_block_key, self.student_a)
problem_block = xblock_api.load_block(problem_block_key, self.student_a)
self.assertFalse(hasattr(UnitBlock, 'has_score')) # The block class doesn't declare 'has_score'
self.assertEqual(unit_block.has_score, False) # But it gets added by the runtime and defaults to False
assert not hasattr(UnitBlock, 'has_score')
# The block class doesn't declare 'has_score'
assert unit_block.has_score is False
# But it gets added by the runtime and defaults to False
# And problems do have has_score True:
self.assertEqual(problem_block.has_score, True)
assert problem_block.has_score is True
@skip_unless_cms # creating child blocks only works properly in Studio
def test_xblock_metadata(self):
@@ -154,23 +153,24 @@ class ContentLibraryRuntimeTest(ContentLibraryContentTestMixin, TestCase):
URL_BLOCK_METADATA_URL.format(block_key=unit_block_key),
{"include": "children,editable_children"},
)
self.assertEqual(metadata_view_result.data["children"], [str(problem_key)])
self.assertEqual(metadata_view_result.data["editable_children"], [str(problem_key)])
assert metadata_view_result.data['children'] == [str(problem_key)]
assert metadata_view_result.data['editable_children'] == [str(problem_key)]
# Check the metadata API for the problem:
metadata_view_result = client.get(
URL_BLOCK_METADATA_URL.format(block_key=problem_key),
{"include": "student_view_data,index_dictionary"},
)
self.assertEqual(metadata_view_result.data["block_id"], str(problem_key))
self.assertEqual(metadata_view_result.data["display_name"], "New Multi Choice Question")
self.assertNotIn("children", metadata_view_result.data)
self.assertNotIn("editable_children", metadata_view_result.data)
assert metadata_view_result.data['block_id'] == str(problem_key)
assert metadata_view_result.data['display_name'] == 'New Multi Choice Question'
assert 'children' not in metadata_view_result.data
assert 'editable_children' not in metadata_view_result.data
self.assertDictContainsSubset({
"content_type": "CAPA",
"problem_types": ["multiplechoiceresponse"],
}, metadata_view_result.data["index_dictionary"])
self.assertEqual(metadata_view_result.data["student_view_data"], None) # Capa doesn't provide student_view_data
assert metadata_view_result.data['student_view_data'] is None
# Capa doesn't provide student_view_data
@requires_blockstore
@@ -197,11 +197,11 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
block_alice = xblock_api.load_block(block_usage_key, self.student_a)
self.assertEqual(block_alice.scope_ids.user_id, self.student_a.id)
self.assertEqual(block_alice.user_str, 'default value')
self.assertEqual(block_alice.uss_str, 'default value')
self.assertEqual(block_alice.pref_str, 'default value')
self.assertEqual(block_alice.user_info_str, 'default value')
assert block_alice.scope_ids.user_id == self.student_a.id
assert block_alice.user_str == 'default value'
assert block_alice.uss_str == 'default value'
assert block_alice.pref_str == 'default value'
assert block_alice.user_info_str == 'default value'
@XBlock.register_temp_plugin(UserStateTestBlock, UserStateTestBlock.BLOCK_TYPE)
def test_modify_state_directly(self):
@@ -226,30 +226,30 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# Now load it back and expect the same field data:
block1_alice = xblock_api.load_block(block1_usage_key, self.student_a)
self.assertEqual(block1_alice.scope_ids.user_id, self.student_a.id)
self.assertEqual(block1_alice.user_str, 'Alice was here')
self.assertEqual(block1_alice.uss_str, 'Alice was here (USS)')
self.assertEqual(block1_alice.pref_str, 'Alice was here (prefs)')
self.assertEqual(block1_alice.user_info_str, 'Alice was here (user info)')
assert block1_alice.scope_ids.user_id == self.student_a.id
assert block1_alice.user_str == 'Alice was here'
assert block1_alice.uss_str == 'Alice was here (USS)'
assert block1_alice.pref_str == 'Alice was here (prefs)'
assert block1_alice.user_info_str == 'Alice was here (user info)'
# Now load a different block for Alice:
block2_alice = xblock_api.load_block(block2_usage_key, self.student_a)
# User state should be default:
self.assertEqual(block2_alice.user_str, 'default value')
assert block2_alice.user_str == 'default value'
# User state summary should be default:
self.assertEqual(block2_alice.uss_str, 'default value')
assert block2_alice.uss_str == 'default value'
# But prefs and user info should be shared:
self.assertEqual(block2_alice.pref_str, 'Alice was here (prefs)')
self.assertEqual(block2_alice.user_info_str, 'Alice was here (user info)')
assert block2_alice.pref_str == 'Alice was here (prefs)'
assert block2_alice.user_info_str == 'Alice was here (user info)'
# Now load the first block, block1, for Bob:
block1_bob = xblock_api.load_block(block1_usage_key, self.student_b)
self.assertEqual(block1_bob.scope_ids.user_id, self.student_b.id)
self.assertEqual(block1_bob.user_str, 'default value')
self.assertEqual(block1_bob.uss_str, 'Alice was here (USS)')
self.assertEqual(block1_bob.pref_str, 'default value')
self.assertEqual(block1_bob.user_info_str, 'default value')
assert block1_bob.scope_ids.user_id == self.student_b.id
assert block1_bob.user_str == 'default value'
assert block1_bob.uss_str == 'Alice was here (USS)'
assert block1_bob.pref_str == 'default value'
assert block1_bob.user_info_str == 'default value'
@XBlock.register_temp_plugin(UserStateTestBlock, UserStateTestBlock.BLOCK_TYPE)
def test_state_for_anonymous_users(self):
@@ -273,7 +273,7 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
url = url_result.data["handler_url"]
data_json = json.dumps(data) if data else None
response = getattr(client, method)(url, data_json, content_type="application/json")
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
return response.json()
# Now client1 sets all the fields via a handler:
@@ -286,33 +286,33 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# Now load it back and expect the same data:
data = call_handler(client1, block1_usage_key, "get_user_state", "get")
self.assertEqual(data["user_str"], "1 was here")
self.assertEqual(data["uss_str"], "1 was here (USS)")
self.assertEqual(data["pref_str"], "1 was here (prefs)")
self.assertEqual(data["user_info_str"], "1 was here (user info)")
assert data['user_str'] == '1 was here'
assert data['uss_str'] == '1 was here (USS)'
assert data['pref_str'] == '1 was here (prefs)'
assert data['user_info_str'] == '1 was here (user info)'
# Now load a different XBlock and expect only pref_str and user_info_str to be set:
data = call_handler(client1, block2_usage_key, "get_user_state", "get")
self.assertEqual(data["user_str"], "default value")
self.assertEqual(data["uss_str"], "default value")
self.assertEqual(data["pref_str"], "1 was here (prefs)")
self.assertEqual(data["user_info_str"], "1 was here (user info)")
assert data['user_str'] == 'default value'
assert data['uss_str'] == 'default value'
assert data['pref_str'] == '1 was here (prefs)'
assert data['user_info_str'] == '1 was here (user info)'
# Now a different anonymous user loading the first block should see only the uss_str set:
data = call_handler(client2, block1_usage_key, "get_user_state", "get")
self.assertEqual(data["user_str"], "default value")
self.assertEqual(data["uss_str"], "1 was here (USS)")
self.assertEqual(data["pref_str"], "default value")
self.assertEqual(data["user_info_str"], "default value")
assert data['user_str'] == 'default value'
assert data['uss_str'] == '1 was here (USS)'
assert data['pref_str'] == 'default value'
assert data['user_info_str'] == 'default value'
# The "user state summary" should not be shared between registered and anonymous users:
client_registered = APIClient()
client_registered.login(username=self.student_a.username, password='edx')
data = call_handler(client_registered, block1_usage_key, "get_user_state", "get")
self.assertEqual(data["user_str"], "default value")
self.assertEqual(data["uss_str"], "default value")
self.assertEqual(data["pref_str"], "default value")
self.assertEqual(data["user_info_str"], "default value")
assert data['user_str'] == 'default value'
assert data['uss_str'] == 'default value'
assert data['pref_str'] == 'default value'
assert data['user_info_str'] == 'default value'
def test_views_for_anonymous_users(self):
"""
@@ -330,14 +330,14 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
public_view_result = anon_client.get(
URL_BLOCK_RENDER_VIEW.format(block_key=block_usage_key, view_name='public_view'),
)
self.assertEqual(public_view_result.status_code, 200)
self.assertIn("Hello world", public_view_result.data["content"])
assert public_view_result.status_code == 200
assert 'Hello world' in public_view_result.data['content']
# Try to view the student_view:
public_view_result = anon_client.get(
URL_BLOCK_RENDER_VIEW.format(block_key=block_usage_key, view_name='student_view'),
)
self.assertEqual(public_view_result.status_code, 403)
assert public_view_result.status_code == 403
@XBlock.register_temp_plugin(UserStateTestBlock, UserStateTestBlock.BLOCK_TYPE)
def test_independent_instances(self):
@@ -359,14 +359,14 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# block.
block_instance1.user_str = 'changed to this'
self.assertNotEqual(block_instance1.user_str, block_instance2.user_str)
assert block_instance1.user_str != block_instance2.user_str
block_instance1.save()
self.assertNotEqual(block_instance1.user_str, block_instance2.user_str)
assert block_instance1.user_str != block_instance2.user_str
block_instance2 = block_instance1.runtime.get_block(block_usage_key)
# Now they should be equal, because we've saved and re-loaded instance2:
self.assertEqual(block_instance1.user_str, block_instance2.user_str)
assert block_instance1.user_str == block_instance2.user_str
@skip_unless_lms # Scores are only used in the LMS
def test_scores_persisted(self):
@@ -399,14 +399,14 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
client.login(username=self.student_a.username, password='edx')
student_view_result = client.get(URL_BLOCK_RENDER_VIEW.format(block_key=block_id, view_name='student_view'))
problem_key = "input_{}_2_1".format(block_id)
self.assertIn(problem_key, student_view_result.data["content"])
assert problem_key in student_view_result.data['content']
# And submit a wrong answer:
result = client.get(URL_BLOCK_GET_HANDLER_URL.format(block_key=block_id, handler_name='xmodule_handler'))
problem_check_url = result.data["handler_url"] + 'problem_check'
submit_result = client.post(problem_check_url, data={problem_key: "choice_3"})
self.assertEqual(submit_result.status_code, 200)
assert submit_result.status_code == 200
submit_data = json.loads(submit_result.content.decode('utf-8'))
self.assertDictContainsSubset({
"current_score": 0,
@@ -417,12 +417,12 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# Now test that the score is also persisted in StudentModule:
# If we add a REST API to get an individual block's score, that should be checked instead of StudentModule.
sm = get_score(self.student_a, block_id)
self.assertEqual(sm.grade, 0)
self.assertEqual(sm.max_grade, 1)
assert sm.grade == 0
assert sm.max_grade == 1
# And submit a correct answer:
submit_result = client.post(problem_check_url, data={problem_key: "choice_1"})
self.assertEqual(submit_result.status_code, 200)
assert submit_result.status_code == 200
submit_data = json.loads(submit_result.content.decode('utf-8'))
self.assertDictContainsSubset({
"current_score": 1,
@@ -432,8 +432,8 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# Now test that the score is also updated in StudentModule:
# If we add a REST API to get an individual block's score, that should be checked instead of StudentModule.
sm = get_score(self.student_a, block_id)
self.assertEqual(sm.grade, 1)
self.assertEqual(sm.max_grade, 1)
assert sm.grade == 1
assert sm.max_grade == 1
@skip_unless_lms
def test_i18n(self):
@@ -469,14 +469,14 @@ class ContentLibraryXBlockUserStateTest(ContentLibraryContentTestMixin, TestCase
# View the problem without specifying a language
default_public_view = client.get(URL_BLOCK_RENDER_VIEW.format(block_key=block_id, view_name='public_view'))
self.assertIn("Submit", default_public_view.data["content"])
self.assertNotIn("Süßmït", default_public_view.data["content"])
assert 'Submit' in default_public_view.data['content']
assert 'Süßmït' not in default_public_view.data['content']
# View the problem and request the dummy language
dummy_public_view = client.get(URL_BLOCK_RENDER_VIEW.format(block_key=block_id, view_name='public_view'),
HTTP_ACCEPT_LANGUAGE='eo')
self.assertIn("Süßmït", dummy_public_view.data["content"])
self.assertNotIn("Submit", dummy_public_view.data["content"])
assert 'Süßmït' in dummy_public_view.data['content']
assert 'Submit' not in dummy_public_view.data['content']
@requires_blockstore
@@ -518,7 +518,7 @@ class ContentLibraryXBlockCompletionTest(ContentLibraryContentTestMixin, Complet
return service.get_completions([block_id])[block_id]
# At first the block is not completed
self.assertEqual(get_block_completion_status(), 0)
assert get_block_completion_status() == 0
# Now call the 'publish_completion' handler:
client = APIClient()
@@ -528,7 +528,7 @@ class ContentLibraryXBlockCompletionTest(ContentLibraryContentTestMixin, Complet
# This will test the 'completion' service and the completion event handler:
result2 = client.post(publish_completion_url, {"completion": 1.0}, format='json')
self.assertEqual(result2.status_code, 200)
assert result2.status_code == 200
# Now the block is completed
self.assertEqual(get_block_completion_status(), 1)
assert get_block_completion_status() == 1

View File

@@ -47,7 +47,7 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
file_name = "image.svg"
# A new block has no assets:
self.assertEqual(self._get_library_block_assets(block_id), [])
assert self._get_library_block_assets(block_id) == []
self._get_library_block_asset(block_id, file_name, expect_response=404)
# Upload an asset file
@@ -55,22 +55,22 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
# Get metadata about the uploaded asset file
metadata = self._get_library_block_asset(block_id, file_name)
self.assertEqual(metadata["path"], file_name)
self.assertEqual(metadata["size"], len(SVG_DATA))
assert metadata['path'] == file_name
assert metadata['size'] == len(SVG_DATA)
asset_list = self._get_library_block_assets(block_id)
# We don't just assert that 'asset_list == [metadata]' because that may
# break in the future if the "get asset" view returns more detail than
# the "list assets" view.
self.assertEqual(len(asset_list), 1)
self.assertEqual(asset_list[0]["path"], metadata["path"])
self.assertEqual(asset_list[0]["size"], metadata["size"])
self.assertEqual(asset_list[0]["url"], metadata["url"])
assert len(asset_list) == 1
assert asset_list[0]['path'] == metadata['path']
assert asset_list[0]['size'] == metadata['size']
assert asset_list[0]['url'] == metadata['url']
# Download the file and check that it matches what was uploaded.
# We need to download using requests since this is served by Blockstore,
# which the django test client can't interact with.
content_get_result = requests.get(metadata["url"])
self.assertEqual(content_get_result.content, SVG_DATA)
assert content_get_result.content == SVG_DATA
# Set some OLX referencing this asset:
self._set_library_block_olx(block_id, """
@@ -82,16 +82,16 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
# served differently by Blockstore and we should test that too.
self._commit_library_changes(library["id"])
metadata = self._get_library_block_asset(block_id, file_name)
self.assertEqual(metadata["path"], file_name)
self.assertEqual(metadata["size"], len(SVG_DATA))
assert metadata['path'] == file_name
assert metadata['size'] == len(SVG_DATA)
# Download the file from the new URL:
content_get_result = requests.get(metadata["url"])
self.assertEqual(content_get_result.content, SVG_DATA)
assert content_get_result.content == SVG_DATA
# Check that the URL in the student_view gets rewritten:
fragment = self._render_block_view(block_id, "student_view")
self.assertNotIn("/static/image.svg", fragment["content"])
self.assertIn(metadata["url"], fragment["content"])
assert '/static/image.svg' not in fragment['content']
assert metadata['url'] in fragment['content']
def test_asset_filenames(self):
"""
@@ -105,14 +105,14 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
# Unicode names are allowed
file_name = "🏕.svg" # (camping).svg
self._set_library_block_asset(block_id, file_name, SVG_DATA)
self.assertEqual(self._get_library_block_asset(block_id, file_name)["path"], file_name)
self.assertEqual(self._get_library_block_asset(block_id, file_name)["size"], file_size)
assert self._get_library_block_asset(block_id, file_name)['path'] == file_name
assert self._get_library_block_asset(block_id, file_name)['size'] == file_size
# Subfolder names are allowed
file_name = "transcripts/en.srt"
self._set_library_block_asset(block_id, file_name, SVG_DATA)
self.assertEqual(self._get_library_block_asset(block_id, file_name)["path"], file_name)
self.assertEqual(self._get_library_block_asset(block_id, file_name)["size"], file_size)
assert self._get_library_block_asset(block_id, file_name)['path'] == file_name
assert self._get_library_block_asset(block_id, file_name)['size'] == file_size
# '../' is definitely not allowed
file_name = "../definition.xml"
@@ -148,8 +148,8 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
"""
url = transcript_handler_url + 'translation/en'
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertIn("Welcome to edX", response.content.decode('utf-8'))
assert response.status_code == 200
assert 'Welcome to edX' in response.content.decode('utf-8')
def check_download():
"""
@@ -157,8 +157,8 @@ class ContentLibrariesStaticAssetsTest(ContentLibrariesRestApiTest):
"""
url = transcript_handler_url + 'download'
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, TRANSCRIPT_DATA)
assert response.status_code == 200
assert response.content == TRANSCRIPT_DATA
check_sjson()
check_download()

View File

@@ -120,7 +120,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.logout()
resp = self.client.get(self.url_unlocked)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_unlocked_versioned_asset(self):
"""
@@ -128,7 +128,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.logout()
resp = self.client.get(self.url_unlocked_versioned)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_unlocked_versioned_asset_old_style(self):
"""
@@ -136,7 +136,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.logout()
resp = self.client.get(self.url_unlocked_versioned_old_style)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_unlocked_versioned_asset_with_nonexistent_version(self):
"""
@@ -147,30 +147,30 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
self.client.logout()
resp = self.client.get(url_unlocked_versioned_old)
self.assertEqual(resp.status_code, 301)
self.assertTrue(resp.url.endswith(self.url_unlocked_versioned))
assert resp.status_code == 301
assert resp.url.endswith(self.url_unlocked_versioned)
def test_locked_versioned_asset(self):
"""
Test that locked assets that are versioned are being served.
"""
CourseEnrollment.enroll(self.non_staff_usr, self.course_key)
self.assertTrue(CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key))
assert CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key)
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked_versioned)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_locked_versioned_old_styleasset(self):
"""
Test that locked assets that are versioned (old-style) are being served.
"""
CourseEnrollment.enroll(self.non_staff_usr, self.course_key)
self.assertTrue(CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key))
assert CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key)
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked_versioned_old_style)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_locked_asset_not_logged_in(self):
"""
@@ -179,7 +179,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.logout()
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 403)
assert resp.status_code == 403
def test_locked_asset_not_registered(self):
"""
@@ -188,7 +188,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 403)
assert resp.status_code == 403
def test_locked_asset_registered(self):
"""
@@ -196,11 +196,11 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
and registered for the course.
"""
CourseEnrollment.enroll(self.non_staff_usr, self.course_key)
self.assertTrue(CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key))
assert CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key)
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_locked_asset_staff(self):
"""
@@ -208,7 +208,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
self.client.login(username=self.staff_usr, password='test')
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 200)
assert resp.status_code == 200
def test_range_request_full_file(self):
"""
@@ -217,10 +217,12 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
resp = self.client.get(self.url_unlocked, HTTP_RANGE='bytes=0-')
self.assertEqual(resp.status_code, 206) # HTTP_206_PARTIAL_CONTENT
self.assertEqual(resp['Content-Range'], u'bytes {first}-{last}/{length}'
.format(first=0, last=self.length_unlocked - 1, length=self.length_unlocked))
self.assertEqual(resp['Content-Length'], str(self.length_unlocked))
assert resp.status_code == 206
# HTTP_206_PARTIAL_CONTENT
assert resp['Content-Range'] ==\
u'bytes {first}-{last}/{length}'.format(first=0, last=(self.length_unlocked - 1),
length=self.length_unlocked)
assert resp['Content-Length'] == str(self.length_unlocked)
def test_range_request_partial_file(self):
"""
@@ -233,10 +235,12 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
resp = self.client.get(self.url_unlocked, HTTP_RANGE='bytes={first}-{last}'.format(
first=first_byte, last=last_byte))
self.assertEqual(resp.status_code, 206) # HTTP_206_PARTIAL_CONTENT
self.assertEqual(resp['Content-Range'], u'bytes {first}-{last}/{length}'.format(
first=first_byte, last=last_byte, length=self.length_unlocked))
self.assertEqual(resp['Content-Length'], str(last_byte - first_byte + 1))
assert resp.status_code == 206
# HTTP_206_PARTIAL_CONTENT
assert resp['Content-Range'] == u'bytes {first}-{last}/{length}'.format(first=first_byte,
last=last_byte,
length=self.length_unlocked)
assert resp['Content-Length'] == str(((last_byte - first_byte) + 1))
def test_range_request_multiple_ranges(self):
"""
@@ -248,9 +252,9 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
resp = self.client.get(self.url_unlocked, HTTP_RANGE='bytes={first}-{last}, -100'.format(
first=first_byte, last=last_byte))
self.assertEqual(resp.status_code, 200)
self.assertNotIn('Content-Range', resp)
self.assertEqual(resp['Content-Length'], str(self.length_unlocked))
assert resp.status_code == 200
assert 'Content-Range' not in resp
assert resp['Content-Length'] == str(self.length_unlocked)
@ddt.data(
'bytes 0-',
@@ -263,8 +267,8 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
Test that syntactically invalid Range values result in a 200 OK full content response.
"""
resp = self.client.get(self.url_unlocked, HTTP_RANGE=header_value)
self.assertEqual(resp.status_code, 200)
self.assertNotIn('Content-Range', resp)
assert resp.status_code == 200
assert 'Content-Range' not in resp
def test_range_request_malformed_invalid_range(self):
"""
@@ -273,7 +277,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
resp = self.client.get(self.url_unlocked, HTTP_RANGE='bytes={first}-{last}'.format(
first=(self.length_unlocked // 2), last=(self.length_unlocked // 4)))
self.assertEqual(resp.status_code, 416)
assert resp.status_code == 416
def test_range_request_malformed_out_of_bounds(self):
"""
@@ -282,7 +286,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
"""
resp = self.client.get(self.url_unlocked, HTTP_RANGE='bytes={first}-{last}'.format(
first=(self.length_unlocked), last=(self.length_unlocked)))
self.assertEqual(resp.status_code, 416)
assert resp.status_code == 416
def test_vary_header_sent(self):
"""
@@ -290,8 +294,8 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
cached in a way that breaks XHR requests to the same asset.
"""
resp = self.client.get(self.url_unlocked)
self.assertEqual(resp.status_code, 200)
self.assertEqual('Origin', resp['Vary'])
assert resp.status_code == 200
assert 'Origin' == resp['Vary']
@patch('openedx.core.djangoapps.contentserver.models.CourseAssetCacheTtlConfig.get_cache_ttl')
def test_cache_headers_with_ttl_unlocked(self, mock_get_cache_ttl):
@@ -302,9 +306,9 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
mock_get_cache_ttl.return_value = 10
resp = self.client.get(self.url_unlocked)
self.assertEqual(resp.status_code, 200)
self.assertIn('Expires', resp)
self.assertEqual('public, max-age=10, s-maxage=10', resp['Cache-Control'])
assert resp.status_code == 200
assert 'Expires' in resp
assert 'public, max-age=10, s-maxage=10' == resp['Cache-Control']
@patch('openedx.core.djangoapps.contentserver.models.CourseAssetCacheTtlConfig.get_cache_ttl')
def test_cache_headers_with_ttl_locked(self, mock_get_cache_ttl):
@@ -315,13 +319,13 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
mock_get_cache_ttl.return_value = 10
CourseEnrollment.enroll(self.non_staff_usr, self.course_key)
self.assertTrue(CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key))
assert CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key)
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 200)
self.assertNotIn('Expires', resp)
self.assertEqual('private, no-cache, no-store', resp['Cache-Control'])
assert resp.status_code == 200
assert 'Expires' not in resp
assert 'private, no-cache, no-store' == resp['Cache-Control']
@patch('openedx.core.djangoapps.contentserver.models.CourseAssetCacheTtlConfig.get_cache_ttl')
def test_cache_headers_without_ttl_unlocked(self, mock_get_cache_ttl):
@@ -332,9 +336,9 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
mock_get_cache_ttl.return_value = 0
resp = self.client.get(self.url_unlocked)
self.assertEqual(resp.status_code, 200)
self.assertNotIn('Expires', resp)
self.assertNotIn('Cache-Control', resp)
assert resp.status_code == 200
assert 'Expires' not in resp
assert 'Cache-Control' not in resp
@patch('openedx.core.djangoapps.contentserver.models.CourseAssetCacheTtlConfig.get_cache_ttl')
def test_cache_headers_without_ttl_locked(self, mock_get_cache_ttl):
@@ -345,18 +349,18 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
mock_get_cache_ttl.return_value = 0
CourseEnrollment.enroll(self.non_staff_usr, self.course_key)
self.assertTrue(CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key))
assert CourseEnrollment.is_enrolled(self.non_staff_usr, self.course_key)
self.client.login(username=self.non_staff_usr, password='test')
resp = self.client.get(self.url_locked)
self.assertEqual(resp.status_code, 200)
self.assertNotIn('Expires', resp)
self.assertEqual('private, no-cache, no-store', resp['Cache-Control'])
assert resp.status_code == 200
assert 'Expires' not in resp
assert 'private, no-cache, no-store' == resp['Cache-Control']
def test_get_expiration_value(self):
start_dt = datetime.datetime.strptime("Thu, 01 Dec 1983 20:00:00 GMT", HTTP_DATE_FORMAT)
near_expire_dt = StaticContentServer.get_expiration_value(start_dt, 55)
self.assertEqual("Thu, 01 Dec 1983 20:00:55 GMT", near_expire_dt)
assert 'Thu, 01 Dec 1983 20:00:55 GMT' == near_expire_dt
@patch('openedx.core.djangoapps.contentserver.models.CdnUserAgentsConfig.get_cdn_user_agents')
def test_cache_is_cdn_with_normal_request(self, mock_get_cdn_user_agents):
@@ -370,7 +374,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
browser_request = request_factory.get('/fake', HTTP_USER_AGENT='Chrome 1234')
is_from_cdn = StaticContentServer.is_cdn_request(browser_request)
self.assertEqual(is_from_cdn, False)
assert is_from_cdn is False
@patch('openedx.core.djangoapps.contentserver.models.CdnUserAgentsConfig.get_cdn_user_agents')
def test_cache_is_cdn_with_cdn_request(self, mock_get_cdn_user_agents):
@@ -384,7 +388,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
browser_request = request_factory.get('/fake', HTTP_USER_AGENT='Amazon CloudFront')
is_from_cdn = StaticContentServer.is_cdn_request(browser_request)
self.assertEqual(is_from_cdn, True)
assert is_from_cdn is True
@patch('openedx.core.djangoapps.contentserver.models.CdnUserAgentsConfig.get_cdn_user_agents')
def test_cache_is_cdn_with_cdn_request_multiple_user_agents(self, mock_get_cdn_user_agents):
@@ -399,7 +403,7 @@ class ContentStoreToyCourseTest(SharedModuleStoreTestCase):
browser_request = request_factory.get('/fake', HTTP_USER_AGENT='Amazon CloudFront')
is_from_cdn = StaticContentServer.is_cdn_request(browser_request)
self.assertEqual(is_from_cdn, True)
assert is_from_cdn is True
@ddt.ddt
@@ -414,7 +418,7 @@ class ParseRangeHeaderTestCase(unittest.TestCase):
def test_bytes_unit(self):
unit, __ = parse_range_header('bytes=100-', self.content_length)
self.assertEqual(unit, 'bytes')
assert unit == 'bytes'
@ddt.data(
('bytes=100-', 1, [(100, 9999)]),
@@ -427,8 +431,8 @@ class ParseRangeHeaderTestCase(unittest.TestCase):
@ddt.unpack
def test_valid_syntax(self, header_value, excepted_ranges_length, expected_ranges):
__, ranges = parse_range_header(header_value, self.content_length)
self.assertEqual(len(ranges), excepted_ranges_length)
self.assertEqual(ranges, expected_ranges)
assert len(ranges) == excepted_ranges_length
assert ranges == expected_ranges
@ddt.data(
('bytes=one-20', ValueError, 'invalid literal for int()'),

View File

@@ -47,8 +47,8 @@ class CrossDomainAuthTest(TestCase):
def test_skip_csrf_referer_check(self):
request = self._fake_request()
result = self.auth.enforce_csrf(request)
self.assertIs(result, None)
self.assertTrue(request.is_secure())
assert result is None
assert request.is_secure()
def _fake_request(self):
"""Construct a fake request with a referer and CSRF token over a secure connection. """

View File

@@ -24,5 +24,5 @@ class TestEnsureCsrfCookieCrossDomain(TestCase):
wrapped_view = ensure_csrf_cookie_cross_domain(fake_view)
response = wrapped_view(request)
response_meta = json.loads(response.content.decode('utf-8'))
self.assertEqual(response_meta['CROSS_DOMAIN_CSRF_COOKIE_USED'], True)
self.assertEqual(response_meta['CSRF_COOKIE_USED'], True)
assert response_meta['CROSS_DOMAIN_CSRF_COOKIE_USED'] is True
assert response_meta['CSRF_COOKIE_USED'] is True

View File

@@ -6,7 +6,7 @@ Tests for the CORS CSRF middleware
from mock import patch, Mock
import ddt
import six
import pytest
from django.test import TestCase
from django.test.utils import override_settings
from django.core.exceptions import MiddlewareNotUsed, ImproperlyConfigured
@@ -44,8 +44,8 @@ class TestCorsMiddlewareProcessRequest(TestCase):
with patch.object(CsrfViewMiddleware, 'process_view') as mock_method:
res = self.middleware.process_view(request, None, None, None)
self.assertIsNone(res)
self.assertFalse(mock_method.called)
assert res is None
assert not mock_method.called
def check_enabled(self, request):
"""
@@ -55,15 +55,15 @@ class TestCorsMiddlewareProcessRequest(TestCase):
"""
Check that the request doesn't pass (yet) the `is_secure()` test
"""
self.assertFalse(request.is_secure())
assert not request.is_secure()
return SENTINEL
with patch.object(CsrfViewMiddleware, 'process_view') as mock_method:
mock_method.side_effect = cb_check_req_is_secure_false
res = self.middleware.process_view(request, None, None, None)
self.assertIs(res, SENTINEL)
self.assertTrue(request.is_secure())
assert res is SENTINEL
assert request.is_secure()
@override_settings(CORS_ORIGIN_WHITELIST=['foo.com'])
def test_enabled(self):
@@ -75,7 +75,7 @@ class TestCorsMiddlewareProcessRequest(TestCase):
CORS_ORIGIN_WHITELIST=['foo.com']
)
def test_disabled_no_cors_headers(self):
with self.assertRaises(MiddlewareNotUsed):
with pytest.raises(MiddlewareNotUsed):
CorsCSRFMiddleware()
@override_settings(CORS_ORIGIN_WHITELIST=['bar.com'])
@@ -119,7 +119,7 @@ class TestCsrfCrossDomainCookieMiddleware(TestCase):
@override_settings(FEATURES={'ENABLE_CROSS_DOMAIN_CSRF_COOKIE': False})
def test_disabled_by_feature_flag(self):
with self.assertRaises(MiddlewareNotUsed):
with pytest.raises(MiddlewareNotUsed):
CsrfCrossDomainCookieMiddleware()
@ddt.data('CROSS_DOMAIN_CSRF_COOKIE_NAME', 'CROSS_DOMAIN_CSRF_COOKIE_DOMAIN')
@@ -132,7 +132,7 @@ class TestCsrfCrossDomainCookieMiddleware(TestCase):
del settings[missing_setting]
with override_settings(**settings):
with self.assertRaises(ImproperlyConfigured):
with pytest.raises(ImproperlyConfigured):
CsrfCrossDomainCookieMiddleware()
@override_settings(
@@ -262,7 +262,7 @@ class TestCsrfCrossDomainCookieMiddleware(TestCase):
def _assert_cookie_sent(self, response, is_set):
"""Check that the cross-domain CSRF cookie was sent. """
if is_set:
self.assertIn(self.COOKIE_NAME, response.cookies)
assert self.COOKIE_NAME in response.cookies
cookie_header = six.text_type(response.cookies[self.COOKIE_NAME])
# lint-amnesty, pylint: disable=bad-option-value, unicode-format-string
expected = six.u('Set-Cookie: {name}={value}; Domain={domain};').format(
@@ -270,8 +270,8 @@ class TestCsrfCrossDomainCookieMiddleware(TestCase):
value=self.COOKIE_VALUE,
domain=self.COOKIE_DOMAIN
)
self.assertIn(expected, cookie_header)
assert expected in cookie_header
# added lower function because in python 3 the value of cookie_header has Secure and secure in python 2
self.assertIn('Max-Age=31449600; Path=/; secure'.lower(), cookie_header.lower())
assert 'Max-Age=31449600; Path=/; secure'.lower() in cookie_header.lower()
else:
self.assertNotIn(self.COOKIE_NAME, response.cookies)
assert self.COOKIE_NAME not in response.cookies

View File

@@ -34,13 +34,13 @@ class XDomainProxyTest(TestCase):
def test_xdomain_proxy_disabled(self):
self._configure(False)
response = self._load_page()
self.assertEqual(response.status_code, 404)
assert response.status_code == 404
@ddt.data(None, [' '], [' ', ' '])
def test_xdomain_proxy_enabled_no_whitelist(self, whitelist):
self._configure(True, whitelist=whitelist)
response = self._load_page()
self.assertEqual(response.status_code, 404)
assert response.status_code == 404
@ddt.data(
(['example.com'], ['example.com']),

View File

@@ -28,7 +28,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
with patch.object(utils, 'get_expected_duration', return_value=timedelta(weeks=4)):
actual = [(idx, section.display_name, offset) for (idx, section, offset) in utils.spaced_out_sections(self.course)] # lint-amnesty, pylint: disable=line-too-long
self.assertEqual(actual, expected_sections)
assert actual == expected_sections
def test_hidden_sections(self):
for _ in range(2):
@@ -42,7 +42,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
with patch.object(utils, 'get_expected_duration', return_value=timedelta(weeks=4)):
actual = [(idx, section.display_name, offset) for (idx, section, offset) in utils.spaced_out_sections(self.course)] # lint-amnesty, pylint: disable=line-too-long
self.assertEqual(actual, expected_sections)
assert actual == expected_sections
def test_dates_for_ungraded_assignments(self):
"""
@@ -53,7 +53,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
sequence = ItemFactory(parent=self.course, category="sequential")
vertical = ItemFactory(parent=sequence, category="vertical")
sequence = modulestore().get_item(sequence.location)
self.assertEqual(_has_assignment_blocks(sequence), False)
assert _has_assignment_blocks(sequence) is False
# Ungraded problems do not count as assignment blocks
ItemFactory.create(
@@ -63,7 +63,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
weight=0,
)
sequence = modulestore().get_item(sequence.location)
self.assertEqual(_has_assignment_blocks(sequence), False)
assert _has_assignment_blocks(sequence) is False
ItemFactory.create(
parent=vertical,
category='problem',
@@ -71,7 +71,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
weight=1,
)
sequence = modulestore().get_item(sequence.location)
self.assertEqual(_has_assignment_blocks(sequence), False)
assert _has_assignment_blocks(sequence) is False
# Method will return true after adding a graded, scored assignment block
ItemFactory.create(
@@ -81,7 +81,7 @@ class SelfPacedDueDatesTests(ModuleStoreTestCase): # lint-amnesty, pylint: disa
weight=1,
)
sequence = modulestore().get_item(sequence.location)
self.assertEqual(_has_assignment_blocks(sequence), True)
assert _has_assignment_blocks(sequence) is True
def test_sequence_with_graded_and_ungraded_assignments(self):
"""

View File

@@ -62,13 +62,13 @@ class TestCohortOauth(SharedModuleStoreTestCase):
# Non-staff users should not have access to the API
response = self.client.get(path=path, **headers)
self.assertEqual(response.status_code, 403)
assert response.status_code == 403
# Staff users should have access to the API
user.is_staff = True
user.save()
response = self.client.get(path=path, **headers)
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
def test_oauth_users(self):
""" Verify the endpoint supports OAuth, and only allows authorization for staff users. """
@@ -86,13 +86,13 @@ class TestCohortOauth(SharedModuleStoreTestCase):
# Non-staff users should not have access to the API
response = self.client.post(path=path, data=data, **headers)
self.assertEqual(response.status_code, 403)
assert response.status_code == 403
# Staff users should have access to the API
user.is_staff = True
user.save()
response = self.client.post(path=path, data=data, **headers)
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
def test_oauth_csv(self):
""" Verify the endpoint supports OAuth, and only allows authorization for staff users. """
@@ -107,13 +107,13 @@ class TestCohortOauth(SharedModuleStoreTestCase):
# Non-staff users should not have access to the API
response = self.client.post(path=path, **headers)
self.assertEqual(response.status_code, 403)
assert response.status_code == 403
# Staff users should have access to the API
user.is_staff = True
user.save()
response = self.client.post(path=path, **headers)
self.assertEqual(response.status_code, 400)
assert response.status_code == 400
@skip_unless_lms

View File

@@ -3,7 +3,7 @@ Tests for cohorts
"""
# pylint: disable=no-member
import pytest
import ddt
from django.contrib.auth.models import AnonymousUser, User # lint-amnesty, pylint: disable=imported-auth-user
from django.db import IntegrityError
@@ -52,7 +52,7 @@ class TestCohortSignals(TestCase):
# Modify existing cohort
cohort.name = "NewName"
cohort.save()
self.assertFalse(mock_tracker.called)
assert not mock_tracker.called
# Add non-cohort group
CourseUserGroup.objects.create(
@@ -60,7 +60,7 @@ class TestCohortSignals(TestCase):
course_id=self.course_key,
group_type="dummy"
)
self.assertFalse(mock_tracker.called)
assert not mock_tracker.called
def test_cohort_membership_changed(self, mock_tracker):
cohort_list = [CohortFactory() for _ in range(2)]
@@ -108,7 +108,7 @@ class TestCohortSignals(TestCase):
# Clear users from non-cohort group
non_cohort.users.add(*user_list)
non_cohort.users.clear()
self.assertFalse(mock_tracker.emit.called)
assert not mock_tracker.emit.called
# Add cohorts to user
user_list[0].course_groups.add(*cohort_list)
@@ -129,7 +129,7 @@ class TestCohortSignals(TestCase):
# Clear non-cohort groups from user
user_list[0].course_groups.add(non_cohort)
user_list[0].course_groups.clear()
self.assertFalse(mock_tracker.emit.called)
assert not mock_tracker.emit.called
@ddt.ddt
@@ -159,15 +159,15 @@ class TestCohorts(ModuleStoreTestCase):
Make sure cohorts.is_course_cohorted() correctly reports if a course is cohorted or not.
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
config_course_cohorts(course, is_cohorted=True)
self.assertTrue(cohorts.is_course_cohorted(course.id))
assert cohorts.is_course_cohorted(course.id)
# Make sure we get a Http404 if there's no course
fake_key = CourseLocator('a', 'b', 'c')
self.assertRaises(Http404, lambda: cohorts.is_course_cohorted(fake_key))
pytest.raises(Http404, (lambda: cohorts.is_course_cohorted(fake_key)))
def test_get_cohort_id(self):
"""
@@ -175,19 +175,16 @@ class TestCohorts(ModuleStoreTestCase):
invalid course key.
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
user = UserFactory(username="test", email="a@b.com")
self.assertIsNone(cohorts.get_cohort_id(user, course.id))
assert cohorts.get_cohort_id(user, course.id) is None
config_course_cohorts(course, is_cohorted=True)
cohort = CohortFactory(course_id=course.id, name="TestCohort", users=[user])
self.assertEqual(cohorts.get_cohort_id(user, course.id), cohort.id)
assert cohorts.get_cohort_id(user, course.id) == cohort.id
self.assertRaises(
Http404,
lambda: cohorts.get_cohort_id(user, CourseLocator("course", "does_not", "exist"))
)
pytest.raises(Http404, (lambda: cohorts.get_cohort_id(user, CourseLocator('course', 'does_not', 'exist'))))
def test_assignment_type(self):
"""
@@ -201,13 +198,13 @@ class TestCohorts(ModuleStoreTestCase):
self._create_cohort(course.id, "TestCohort2", CourseCohort.RANDOM)
cohort3 = self._create_cohort(course.id, "TestCohort3", CourseCohort.MANUAL)
self.assertEqual(cohorts.get_assignment_type(cohort1), CourseCohort.RANDOM)
assert cohorts.get_assignment_type(cohort1) == CourseCohort.RANDOM
cohorts.set_assignment_type(cohort1, CourseCohort.MANUAL)
self.assertEqual(cohorts.get_assignment_type(cohort1), CourseCohort.MANUAL)
assert cohorts.get_assignment_type(cohort1) == CourseCohort.MANUAL
cohorts.set_assignment_type(cohort3, CourseCohort.RANDOM)
self.assertEqual(cohorts.get_assignment_type(cohort3), CourseCohort.RANDOM)
assert cohorts.get_assignment_type(cohort3) == CourseCohort.RANDOM
def test_cannot_set_assignment_type(self):
"""
@@ -217,48 +214,39 @@ class TestCohorts(ModuleStoreTestCase):
cohort = self._create_cohort(course.id, "TestCohort", CourseCohort.RANDOM)
self.assertEqual(cohorts.get_assignment_type(cohort), CourseCohort.RANDOM)
assert cohorts.get_assignment_type(cohort) == CourseCohort.RANDOM
exception_msg = "There must be one cohort to which students can automatically be assigned."
with self.assertRaises(ValueError) as context_manager:
with pytest.raises(ValueError) as context_manager:
cohorts.set_assignment_type(cohort, CourseCohort.MANUAL)
self.assertEqual(exception_msg, str(context_manager.exception))
assert exception_msg == str(context_manager.value)
def test_get_cohort(self):
"""
Make sure cohorts.get_cohort() does the right thing when the course is cohorted
"""
course = modulestore().get_course(self.toy_course_key)
self.assertEqual(course.id, self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert course.id == self.toy_course_key
assert not cohorts.is_course_cohorted(course.id)
user = UserFactory(username="test", email="a@b.com")
other_user = UserFactory(username="test2", email="a2@b.com")
self.assertIsNone(cohorts.get_cohort(user, course.id), "No cohort created yet")
assert cohorts.get_cohort(user, course.id) is None, 'No cohort created yet'
cohort = CohortFactory(course_id=course.id, name="TestCohort", users=[user])
self.assertIsNone(
cohorts.get_cohort(user, course.id),
"Course isn't cohorted, so shouldn't have a cohort"
)
assert cohorts.get_cohort(user, course.id) is None, "Course isn't cohorted, so shouldn't have a cohort"
# Make the course cohorted...
config_course_cohorts(course, is_cohorted=True)
self.assertEqual(
cohorts.get_cohort(user, course.id).id,
cohort.id,
"user should be assigned to the correct cohort"
)
assert cohorts.get_cohort(user, course.id).id == cohort.id, 'user should be assigned to the correct cohort'
self.assertEqual(
cohorts.get_cohort(other_user, course.id).id,
cohorts.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id,
"other_user should be assigned to the default cohort"
)
assert cohorts.get_cohort(other_user, course.id).id == cohorts\
.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id,\
'other_user should be assigned to the default cohort'
def test_get_cohort_preassigned_user(self):
"""
@@ -272,18 +260,11 @@ class TestCohorts(ModuleStoreTestCase):
# Add email address to the cohort
(user, previous_cohort, prereg) = cohorts.add_user_to_cohort(cohort, "email@example.com")
self.assertEqual(
(user, previous_cohort, prereg),
(None, None, True)
)
assert (user, previous_cohort, prereg) == (None, None, True)
# Create user with this email address
user = UserFactory(username="test", email="email@example.com")
self.assertEqual(
cohorts.get_cohort(user, course.id).id,
cohort.id,
"User should be assigned to the right cohort"
)
assert cohorts.get_cohort(user, course.id).id == cohort.id, 'User should be assigned to the right cohort'
def test_get_cohort_multiple_preassignments(self):
"""
@@ -298,25 +279,15 @@ class TestCohorts(ModuleStoreTestCase):
# Add email address to the first cohort
(user, previous_cohort, prereg) = cohorts.add_user_to_cohort(cohort, "email@example.com")
self.assertEqual(
(user, previous_cohort, prereg),
(None, None, True)
)
assert (user, previous_cohort, prereg) == (None, None, True)
# Add email address to the second cohort
(user, previous_cohort, prereg) = cohorts.add_user_to_cohort(cohort2, "email@example.com")
self.assertEqual(
(user, previous_cohort, prereg),
(None, None, True)
)
assert (user, previous_cohort, prereg) == (None, None, True)
# Create user with this email address
user = UserFactory(username="test", email="email@example.com")
self.assertEqual(
cohorts.get_cohort(user, course.id).id,
cohort2.id,
"User should be assigned to the right cohort"
)
assert cohorts.get_cohort(user, course.id).id == cohort2.id, 'User should be assigned to the right cohort'
@ddt.data(
(True, 2),
@@ -342,7 +313,7 @@ class TestCohorts(ModuleStoreTestCase):
assigned to a user instead of assigning/creating a group automatically
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
user = UserFactory(username="test", email="a@b.com")
@@ -354,10 +325,10 @@ class TestCohorts(ModuleStoreTestCase):
)
# get_cohort should return None as no group is assigned to user
self.assertIsNone(cohorts.get_cohort(user, course.id, assign=False))
assert cohorts.get_cohort(user, course.id, assign=False) is None
# get_cohort should return a group for user
self.assertEqual(cohorts.get_cohort(user, course.id).name, "AutoGroup")
assert cohorts.get_cohort(user, course.id).name == 'AutoGroup'
def test_cohorting_with_auto_cohorts(self):
"""
@@ -365,7 +336,7 @@ class TestCohorts(ModuleStoreTestCase):
If there are auto cohort groups then a user should be assigned one.
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
user1 = UserFactory(username="test", email="a@b.com")
user2 = UserFactory(username="test2", email="a2@b.com")
@@ -379,9 +350,9 @@ class TestCohorts(ModuleStoreTestCase):
auto_cohorts=["AutoGroup"]
)
self.assertEqual(cohorts.get_cohort(user1, course.id).id, cohort.id, "user1 should stay put")
assert cohorts.get_cohort(user1, course.id).id == cohort.id, 'user1 should stay put'
self.assertEqual(cohorts.get_cohort(user2, course.id).name, "AutoGroup", "user2 should be auto-cohorted")
assert cohorts.get_cohort(user2, course.id).name == 'AutoGroup', 'user2 should be auto-cohorted'
def test_anonymous_user_cohort(self):
"""
@@ -390,14 +361,14 @@ class TestCohorts(ModuleStoreTestCase):
course = modulestore().get_course(self.toy_course_key)
# verify cohorts is None when course is not cohorted
self.assertIsNone(cohorts.get_cohort(AnonymousUser(), course.id))
assert cohorts.get_cohort(AnonymousUser(), course.id) is None
config_course_cohorts(
course,
is_cohorted=True,
auto_cohorts=["AutoGroup"]
)
self.assertIsNone(cohorts.get_cohort(AnonymousUser(), course.id))
assert cohorts.get_cohort(AnonymousUser(), course.id) is None
def test_cohorting_with_migrations_done(self):
"""
@@ -416,7 +387,7 @@ class TestCohorts(ModuleStoreTestCase):
auto_cohorts=["AutoGroup"]
)
self.assertEqual(cohorts.get_cohort(user1, course.id).name, "AutoGroup", "user1 should be auto-cohorted")
assert cohorts.get_cohort(user1, course.id).name == 'AutoGroup', 'user1 should be auto-cohorted'
# Now set the auto_cohort_group to something different
# This will have no effect on lms side as we are already done with migrations
@@ -426,13 +397,10 @@ class TestCohorts(ModuleStoreTestCase):
auto_cohort_groups=["OtherGroup"]
)
self.assertEqual(
cohorts.get_cohort(user2, course.id).name, "AutoGroup", "user2 should be assigned to AutoGroups"
)
assert cohorts.get_cohort(user2, course.id).name == 'AutoGroup', 'user2 should be assigned to AutoGroups'
self.assertEqual(
cohorts.get_cohort(user1, course.id).name, "AutoGroup", "user1 should still be in originally placed cohort"
)
assert cohorts.get_cohort(user1, course.id).name == 'AutoGroup',\
'user1 should still be in originally placed cohort'
def test_cohorting_with_no_auto_cohorts(self):
"""
@@ -442,7 +410,7 @@ class TestCohorts(ModuleStoreTestCase):
not be reflected on lms after the migrations are done.
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
user1 = UserFactory(username="test", email="a@b.com")
user2 = UserFactory(username="test2", email="a2@b.com")
@@ -454,11 +422,8 @@ class TestCohorts(ModuleStoreTestCase):
auto_cohorts=[]
)
self.assertEqual(
cohorts.get_cohort(user1, course.id).id,
cohorts.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id,
"No groups->default cohort for user1"
)
assert cohorts.get_cohort(user1, course.id).id == cohorts\
.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id, 'No groups->default cohort for user1'
# Add an auto_cohort_group to the course
# This will have no effect on lms side as we are already done with migrations
@@ -468,24 +433,19 @@ class TestCohorts(ModuleStoreTestCase):
auto_cohort_groups=["AutoGroup"]
)
self.assertEqual(
cohorts.get_cohort(user1, course.id).name,
cohorts.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).name,
"user1 should still be in the default cohort"
)
assert cohorts.get_cohort(user1, course.id).name == cohorts\
.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).name,\
'user1 should still be in the default cohort'
self.assertEqual(
cohorts.get_cohort(user2, course.id).id,
cohorts.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id,
"No groups->default cohort for user2"
)
assert cohorts.get_cohort(user2, course.id).id == cohorts\
.get_cohort_by_name(course.id, cohorts.DEFAULT_COHORT_NAME).id, 'No groups->default cohort for user2'
def test_auto_cohorting_randomization(self):
"""
Make sure cohorts.get_cohort() randomizes properly.
"""
course = modulestore().get_course(self.toy_course_key)
self.assertFalse(cohorts.is_course_cohorted(course.id))
assert not cohorts.is_course_cohorted(course.id)
groups = ["group_{0}".format(n) for n in range(5)]
config_course_cohorts(
@@ -507,8 +467,8 @@ class TestCohorts(ModuleStoreTestCase):
for cohort_name in groups:
cohort = cohorts.get_cohort_by_name(course.id, cohort_name)
num_users = cohort.users.count()
self.assertGreater(num_users, 1)
self.assertLess(num_users, 50)
assert num_users > 1
assert num_users < 50
def test_get_course_cohorts_noop(self):
"""
@@ -516,7 +476,7 @@ class TestCohorts(ModuleStoreTestCase):
"""
course = modulestore().get_course(self.toy_course_key)
config_course_cohorts(course, is_cohorted=True)
self.assertEqual([], cohorts.get_course_cohorts(course))
assert [] == cohorts.get_course_cohorts(course)
def test_get_course_cohorts(self):
"""
@@ -534,16 +494,13 @@ class TestCohorts(ModuleStoreTestCase):
CohortFactory(course_id=course.id, name="ManualCohort2")
cohort_set = {c.name for c in cohorts.get_course_cohorts(course)}
self.assertEqual(cohort_set, {"AutoGroup1", "AutoGroup2", "ManualCohort", "ManualCohort2"})
assert cohort_set == {'AutoGroup1', 'AutoGroup2', 'ManualCohort', 'ManualCohort2'}
def test_get_cohort_names(self):
course = modulestore().get_course(self.toy_course_key)
cohort1 = CohortFactory(course_id=course.id, name="Cohort1")
cohort2 = CohortFactory(course_id=course.id, name="Cohort2")
self.assertEqual(
cohorts.get_cohort_names(course),
{cohort1.id: cohort1.name, cohort2.id: cohort2.name}
)
assert cohorts.get_cohort_names(course) == {cohort1.id: cohort1.name, cohort2.id: cohort2.name}
def test_get_cohort_by_name(self):
"""
@@ -552,19 +509,15 @@ class TestCohorts(ModuleStoreTestCase):
"""
course = modulestore().get_course(self.toy_course_key)
self.assertRaises(
CourseUserGroup.DoesNotExist,
lambda: cohorts.get_cohort_by_name(course.id, "CohortDoesNotExist")
)
pytest.raises(CourseUserGroup.DoesNotExist,
(lambda: cohorts.get_cohort_by_name(course.id, 'CohortDoesNotExist')))
cohort = CohortFactory(course_id=course.id, name="MyCohort")
self.assertEqual(cohorts.get_cohort_by_name(course.id, "MyCohort"), cohort)
assert cohorts.get_cohort_by_name(course.id, 'MyCohort') == cohort
self.assertRaises(
CourseUserGroup.DoesNotExist,
lambda: cohorts.get_cohort_by_name(CourseLocator("course", "does_not", "exist"), cohort)
)
pytest.raises(CourseUserGroup.DoesNotExist,
(lambda: cohorts.get_cohort_by_name(CourseLocator('course', 'does_not', 'exist'), cohort)))
def test_get_cohort_by_id(self):
"""
@@ -574,14 +527,11 @@ class TestCohorts(ModuleStoreTestCase):
course = modulestore().get_course(self.toy_course_key)
cohort = CohortFactory(course_id=course.id, name="MyCohort")
self.assertEqual(cohorts.get_cohort_by_id(course.id, cohort.id), cohort)
assert cohorts.get_cohort_by_id(course.id, cohort.id) == cohort
cohort.delete()
self.assertRaises(
CourseUserGroup.DoesNotExist,
lambda: cohorts.get_cohort_by_id(course.id, cohort.id)
)
pytest.raises(CourseUserGroup.DoesNotExist, (lambda: cohorts.get_cohort_by_id(course.id, cohort.id)))
@patch("openedx.core.djangoapps.course_groups.cohorts.tracker")
def test_add_cohort(self, mock_tracker):
@@ -597,16 +547,10 @@ class TestCohorts(ModuleStoreTestCase):
{"cohort_name": added_cohort.name, "cohort_id": added_cohort.id}
)
self.assertEqual(added_cohort.name, "My Cohort")
self.assertRaises(
ValueError,
lambda: cohorts.add_cohort(course.id, "My Cohort", assignment_type)
)
assert added_cohort.name == 'My Cohort'
pytest.raises(ValueError, (lambda: cohorts.add_cohort(course.id, 'My Cohort', assignment_type)))
does_not_exist_course_key = CourseLocator("course", "does_not", "exist")
self.assertRaises(
ValueError,
lambda: cohorts.add_cohort(does_not_exist_course_key, "My Cohort", assignment_type)
)
pytest.raises(ValueError, (lambda: cohorts.add_cohort(does_not_exist_course_key, 'My Cohort', assignment_type)))
@patch("openedx.core.djangoapps.course_groups.cohorts.tracker")
@patch("openedx.core.djangoapps.course_groups.cohorts.COHORT_MEMBERSHIP_UPDATED")
@@ -628,10 +572,7 @@ class TestCohorts(ModuleStoreTestCase):
# Success cases
# We shouldn't get back a previous cohort, since the user wasn't in one
self.assertEqual(
cohorts.add_user_to_cohort(first_cohort, "Username"),
(course_user, None, False)
)
assert cohorts.add_user_to_cohort(first_cohort, 'Username') == (course_user, None, False)
mock_tracker.emit.assert_any_call(
"edx.cohort.user_add_requested",
{
@@ -646,10 +587,7 @@ class TestCohorts(ModuleStoreTestCase):
# Should get (user, previous_cohort_name) when moved from one cohort to
# another
self.assertEqual(
cohorts.add_user_to_cohort(second_cohort, "Username"),
(course_user, "FirstCohort", False)
)
assert cohorts.add_user_to_cohort(second_cohort, 'Username') == (course_user, 'FirstCohort', False)
mock_tracker.emit.assert_any_call(
"edx.cohort.user_add_requested",
{
@@ -665,10 +603,7 @@ class TestCohorts(ModuleStoreTestCase):
# Should preregister email address for a cohort if an email address
# not associated with a user is added
(user, previous_cohort, prereg) = cohorts.add_user_to_cohort(first_cohort, "new_email@example.com")
self.assertEqual(
(user, previous_cohort, prereg),
(None, None, True)
)
assert (user, previous_cohort, prereg) == (None, None, True)
mock_tracker.emit.assert_any_call(
"edx.cohort.email_address_preassigned",
{
@@ -680,15 +615,9 @@ class TestCohorts(ModuleStoreTestCase):
# Error cases
# Should get ValueError if user already in cohort
self.assertRaises(
ValueError,
lambda: cohorts.add_user_to_cohort(second_cohort, "Username")
)
pytest.raises(ValueError, (lambda: cohorts.add_user_to_cohort(second_cohort, 'Username')))
# UserDoesNotExist if user truly does not exist
self.assertRaises(
User.DoesNotExist,
lambda: cohorts.add_user_to_cohort(first_cohort, "non_existent_username")
)
pytest.raises(User.DoesNotExist, (lambda: cohorts.add_user_to_cohort(first_cohort, 'non_existent_username')))
def test_set_cohorted_with_invalid_data_type(self):
"""
@@ -696,10 +625,10 @@ class TestCohorts(ModuleStoreTestCase):
"""
course = modulestore().get_course(self.toy_course_key)
with self.assertRaises(ValueError) as value_error:
with pytest.raises(ValueError) as value_error:
cohorts.set_course_cohorted(course.id, 'not a boolean')
self.assertEqual("Cohorted must be a boolean", text_type(value_error.exception))
assert 'Cohorted must be a boolean' == text_type(value_error.value)
@ddt.ddt
@@ -742,10 +671,7 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
Basic test of the partition_group_info accessor function
"""
# api should return nothing for an unmapped cohort
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(None, None),
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (None, None)
# create a link for the cohort in the db
link = self._link_cohort_partition_group(
self.first_cohort,
@@ -753,17 +679,11 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
self.group1_id
)
# api should return the specified partition and group
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(self.group1_id, self.partition_id)
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (self.group1_id, self.partition_id)
# delete the link in the db
link.delete()
# api should return nothing again
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(None, None),
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (None, None)
@ddt.data(
(True, 1),
@@ -782,7 +702,7 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
)
with self.assertNumQueries(num_sql_queries):
for __ in range(3):
self.assertIsNotNone(cohorts.get_group_info_for_cohort(self.first_cohort, use_cached=use_cached))
assert cohorts.get_group_info_for_cohort(self.first_cohort, use_cached=use_cached) is not None
def test_multiple_cohorts(self):
"""
@@ -798,14 +718,9 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
self.partition_id,
self.group1_id,
)
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(self.group1_id, self.partition_id),
)
self.assertEqual(
cohorts.get_group_info_for_cohort(self.second_cohort),
cohorts.get_group_info_for_cohort(self.first_cohort),
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (self.group1_id, self.partition_id)
assert cohorts.get_group_info_for_cohort(self.second_cohort) ==\
cohorts.get_group_info_for_cohort(self.first_cohort)
def test_multiple_partition_groups(self):
"""
@@ -816,7 +731,7 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
self.partition_id,
self.group1_id,
)
with self.assertRaises(IntegrityError):
with pytest.raises(IntegrityError):
self._link_cohort_partition_group(
self.first_cohort,
self.partition_id,
@@ -833,19 +748,13 @@ class TestCohortsAndPartitionGroups(ModuleStoreTestCase):
self.partition_id,
self.group1_id
)
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(self.group1_id, self.partition_id)
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (self.group1_id, self.partition_id)
# delete the link
self.first_cohort.delete()
# api should return nothing at that point
self.assertEqual(
cohorts.get_group_info_for_cohort(self.first_cohort),
(None, None),
)
assert cohorts.get_group_info_for_cohort(self.first_cohort) == (None, None)
# link should no longer exist because of delete cascade
with self.assertRaises(CourseUserGroupPartitionGroup.DoesNotExist):
with pytest.raises(CourseUserGroupPartitionGroup.DoesNotExist):
CourseUserGroupPartitionGroup.objects.get(
course_user_group_id=self.first_cohort.id
)
@@ -876,13 +785,13 @@ class TestUnregisteredLearnerCohortAssignments(TestCase):
field='email'
)
self.assertTrue(was_retired)
assert was_retired
search_retired_user_results = \
UnregisteredLearnerCohortAssignments.objects.filter(
email=self.cohort_assignment.email
)
self.assertFalse(search_retired_user_results)
assert not search_retired_user_results
def test_retired_user_with_no_cohort_returns_false(self):
known_learner_email = self.cohort_assignment.email
@@ -891,5 +800,5 @@ class TestUnregisteredLearnerCohortAssignments(TestCase):
field='email'
)
self.assertFalse(was_retired)
self.assertEqual(self.cohort_assignment.email, known_learner_email)
assert not was_retired
assert self.cohort_assignment.email == known_learner_email

View File

@@ -56,15 +56,10 @@ class TestCohortPartitionScheme(ModuleStoreTestCase):
Utility for checking that our test student comes up as assigned to the
specified partition (or, if None, no partition at all)
"""
self.assertEqual(
CohortPartitionScheme.get_group_for_user(
self.course_key,
self.student,
partition or self.user_partition,
use_cached=False
),
group
)
assert CohortPartitionScheme.get_group_for_user(self.course_key,
self.student,
(partition or self.user_partition),
use_cached=False) == group
def test_student_cohort_assignment(self):
"""
@@ -228,7 +223,7 @@ class TestCohortPartitionScheme(ModuleStoreTestCase):
# warning)
with patch('openedx.core.djangoapps.course_groups.partition_scheme.log') as mock_log:
self.assert_student_in_group(None, new_user_partition)
self.assertTrue(mock_log.warning.called)
assert mock_log.warning.called
self.assertRegex(mock_log.warning.call_args[0][0], 'group not found')
def test_missing_partition(self):
@@ -253,7 +248,7 @@ class TestCohortPartitionScheme(ModuleStoreTestCase):
# scheme returns None (and logs a warning).
with patch('openedx.core.djangoapps.course_groups.partition_scheme.log') as mock_log:
self.assert_student_in_group(None, new_user_partition)
self.assertTrue(mock_log.warning.called)
assert mock_log.warning.called
self.assertRegex(mock_log.warning.call_args[0][0], 'partition mismatch')
@@ -264,7 +259,7 @@ class TestExtension(django.test.TestCase):
"""
def test_get_scheme(self):
self.assertEqual(UserPartition.get_scheme('cohort'), CohortPartitionScheme)
assert UserPartition.get_scheme('cohort') == CohortPartitionScheme
with self.assertRaisesRegex(UserPartitionError, 'Unrecognized scheme'):
UserPartition.get_scheme('other')
@@ -316,14 +311,14 @@ class TestGetCohortedUserPartition(ModuleStoreTestCase):
self.course.user_partitions.append(self.random_user_partition)
self.course.user_partitions.append(self.cohort_user_partition)
self.course.user_partitions.append(self.second_cohort_user_partition)
self.assertEqual(self.cohort_user_partition, get_cohorted_user_partition(self.course))
assert self.cohort_user_partition == get_cohorted_user_partition(self.course)
def test_no_cohort_user_partitions(self):
"""
Test get_cohorted_user_partition returns None when there are no cohorted user partitions.
"""
self.course.user_partitions.append(self.random_user_partition)
self.assertIsNone(get_cohorted_user_partition(self.course))
assert get_cohorted_user_partition(self.course) is None
class TestMasqueradedGroup(StaffMasqueradeTestCase):
@@ -350,10 +345,7 @@ class TestMasqueradedGroup(StaffMasqueradeTestCase):
)
scheme = self.user_partition.scheme
self.assertEqual(
scheme.get_group_for_user(self.course.id, self.test_user, self.user_partition),
group
)
assert scheme.get_group_for_user(self.course.id, self.test_user, self.user_partition) == group
def _verify_masquerade_for_all_groups(self):
"""

View File

@@ -7,7 +7,7 @@ Tests for course group views
import json
from collections import namedtuple
import pytest
import six
from six.moves import range
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
@@ -110,7 +110,7 @@ class CohortViewsTestCase(ModuleStoreTestCase):
response = handler(request, six.text_type(course.id), cohort.id)
else:
response = handler(request, six.text_type(course.id))
self.assertEqual(response.status_code, expected_response_code)
assert response.status_code == expected_response_code
return json.loads(response.content.decode('utf-8'))
def put_handler(self, course, cohort=None, data=None, expected_response_code=200, handler=cohort_handler):
@@ -126,7 +126,7 @@ class CohortViewsTestCase(ModuleStoreTestCase):
response = handler(request, six.text_type(course.id), cohort.id)
else:
response = handler(request, six.text_type(course.id))
self.assertEqual(response.status_code, expected_response_code)
assert response.status_code == expected_response_code
return json.loads(response.content.decode('utf-8'))
def patch_handler(self, course, cohort=None, data=None, expected_response_code=200, handler=cohort_handler):
@@ -143,7 +143,7 @@ class CohortViewsTestCase(ModuleStoreTestCase):
response = handler(request, six.text_type(course.id), cohort.id)
else:
response = handler(request, six.text_type(course.id))
self.assertEqual(response.status_code, expected_response_code)
assert response.status_code == expected_response_code
return json.loads(response.content.decode('utf-8'))
@@ -178,12 +178,12 @@ class CourseCohortSettingsHandlerTestCase(CohortViewsTestCase):
expected_response = self.get_expected_response()
self.assertEqual(response, expected_response)
assert response == expected_response
expected_response['is_cohorted'] = False
response = self.patch_handler(self.course, data=expected_response, handler=course_cohort_settings_handler)
self.assertEqual(response, expected_response)
assert response == expected_response
def test_enabling_cohorts_does_not_change_division_scheme(self):
"""
@@ -196,18 +196,14 @@ class CourseCohortSettingsHandlerTestCase(CohortViewsTestCase):
expected_response = self.get_expected_response()
expected_response['is_cohorted'] = False
self.assertEqual(response, expected_response)
self.assertEqual(
CourseDiscussionSettings.NONE, get_course_discussion_settings(self.course.id).division_scheme
)
assert response == expected_response
assert CourseDiscussionSettings.NONE == get_course_discussion_settings(self.course.id).division_scheme
expected_response['is_cohorted'] = True
response = self.patch_handler(self.course, data=expected_response, handler=course_cohort_settings_handler)
self.assertEqual(response, expected_response)
self.assertEqual(
CourseDiscussionSettings.NONE, get_course_discussion_settings(self.course.id).division_scheme
)
assert response == expected_response
assert CourseDiscussionSettings.NONE == get_course_discussion_settings(self.course.id).division_scheme
def test_update_settings_with_missing_field(self):
"""
@@ -216,7 +212,7 @@ class CourseCohortSettingsHandlerTestCase(CohortViewsTestCase):
config_course_cohorts(self.course, is_cohorted=True)
response = self.patch_handler(self.course, expected_response_code=400, handler=course_cohort_settings_handler)
self.assertEqual("Bad Request", response.get("error"))
assert 'Bad Request' == response.get('error')
def test_update_settings_with_invalid_field_data_type(self):
"""
@@ -230,10 +226,7 @@ class CourseCohortSettingsHandlerTestCase(CohortViewsTestCase):
expected_response_code=400,
handler=course_cohort_settings_handler
)
self.assertEqual(
"Cohorted must be a boolean",
response.get("error")
)
assert 'Cohorted must be a boolean' == response.get('error')
class CohortHandlerTestCase(CohortViewsTestCase):
@@ -259,20 +252,9 @@ class CohortHandlerTestCase(CohortViewsTestCase):
if response_dict is None:
response_dict = self.get_handler(self.course, user=user)
self.assertEqual(
response_dict.get("cohorts"),
[
{
"name": cohort.name,
"id": cohort.id,
"user_count": cohort.user_count,
"assignment_type": cohort.assignment_type,
"user_partition_id": None,
"group_id": None
}
for cohort in expected_cohorts
]
)
assert response_dict.get('cohorts') == [{'name': cohort.name, 'id': cohort.id, 'user_count': cohort.user_count,
'assignment_type': cohort.assignment_type, 'user_partition_id': None,
'group_id': None} for cohort in expected_cohorts]
@staticmethod
def create_expected_cohort(cohort, user_count, assignment_type, user_partition_id=None, group_id=None):
@@ -376,7 +358,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
# We should expect the DoesNotExist exception because above cohort config have
# no effect on lms side so as a result there will be no AutoGroup cohort present
with self.assertRaises(CourseUserGroup.DoesNotExist):
with pytest.raises(CourseUserGroup.DoesNotExist):
get_cohort_by_name(self.course.id, "AutoGroup")
def test_get_single_cohort(self):
@@ -385,17 +367,8 @@ class CohortHandlerTestCase(CohortViewsTestCase):
"""
self._create_cohorts()
response_dict = self.get_handler(self.course, self.cohort2)
self.assertEqual(
response_dict,
{
"name": self.cohort2.name,
"id": self.cohort2.id,
"user_count": 2,
"assignment_type": CourseCohort.MANUAL,
"user_partition_id": None,
"group_id": None
}
)
assert response_dict == {'name': self.cohort2.name, 'id': self.cohort2.id, 'user_count': 2,
'assignment_type': CourseCohort.MANUAL, 'user_partition_id': None, 'group_id': None}
############### Tests of adding a new cohort ###############
@@ -407,19 +380,13 @@ class CohortHandlerTestCase(CohortViewsTestCase):
Verifies that the cohort was created properly and the correct response was returned.
"""
created_cohort = get_cohort_by_name(self.course.id, cohort_name)
self.assertIsNotNone(created_cohort)
self.assertEqual(
response_dict,
{
"name": cohort_name,
"id": created_cohort.id,
"user_count": 0,
"assignment_type": assignment_type,
"user_partition_id": expected_user_partition_id,
"group_id": expected_group_id
}
)
self.assertEqual((expected_group_id, expected_user_partition_id), get_group_info_for_cohort(created_cohort))
assert created_cohort is not None
assert response_dict == {'name': cohort_name, 'id': created_cohort.id, 'user_count': 0,
'assignment_type': assignment_type,
'user_partition_id': expected_user_partition_id,
'group_id': expected_group_id}
assert (expected_group_id, expected_user_partition_id) == get_group_info_for_cohort(created_cohort)
def test_create_new_cohort(self):
"""
@@ -450,14 +417,14 @@ class CohortHandlerTestCase(CohortViewsTestCase):
Verify that we cannot create a cohort without specifying a name.
"""
response_dict = self.put_handler(self.course, expected_response_code=400)
self.assertEqual("Cohort name must be specified.", response_dict.get("error"))
assert 'Cohort name must be specified.' == response_dict.get('error')
def test_create_new_cohort_missing_assignment_type(self):
"""
Verify that we cannot create a cohort without specifying an assignment type.
"""
response_dict = self.put_handler(self.course, data={'name': 'COHORT NAME'}, expected_response_code=400)
self.assertEqual("Assignment type must be specified.", response_dict.get("error"))
assert 'Assignment type must be specified.' == response_dict.get('error')
def test_create_new_cohort_existing_name(self):
"""
@@ -468,7 +435,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
self.course, data={'name': self.cohort1.name, 'assignment_type': CourseCohort.MANUAL},
expected_response_code=400
)
self.assertEqual("You cannot create two cohorts with the same name", response_dict.get("error"))
assert 'You cannot create two cohorts with the same name' == response_dict.get('error')
def test_create_new_cohort_missing_user_partition_id(self):
"""
@@ -476,9 +443,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
"""
data = {'name': "Cohort missing user_partition_id", 'assignment_type': CourseCohort.MANUAL, 'group_id': 2}
response_dict = self.put_handler(self.course, data=data, expected_response_code=400)
self.assertEqual(
"If group_id is specified, user_partition_id must also be specified.", response_dict.get("error")
)
assert 'If group_id is specified, user_partition_id must also be specified.' == response_dict.get('error')
############### Tests of updating an existing cohort ###############
@@ -490,9 +455,9 @@ class CohortHandlerTestCase(CohortViewsTestCase):
updated_name = self.cohort1.name + "_updated"
data = {'name': updated_name, 'assignment_type': CourseCohort.MANUAL}
response_dict = self.put_handler(self.course, self.cohort1, data=data)
self.assertEqual(updated_name, get_cohort_by_id(self.course.id, self.cohort1.id).name)
self.assertEqual(updated_name, response_dict.get("name"))
self.assertEqual(CourseCohort.MANUAL, response_dict.get("assignment_type"))
assert updated_name == get_cohort_by_id(self.course.id, self.cohort1.id).name
assert updated_name == response_dict.get('name')
assert CourseCohort.MANUAL == response_dict.get('assignment_type')
def test_update_random_cohort_name(self):
"""
@@ -503,8 +468,8 @@ class CohortHandlerTestCase(CohortViewsTestCase):
data = {'name': cohort_name, 'assignment_type': CourseCohort.RANDOM}
response_dict = self.put_handler(self.course, data=data)
self.assertEqual(cohort_name, response_dict.get("name"))
self.assertEqual(CourseCohort.RANDOM, response_dict.get("assignment_type"))
assert cohort_name == response_dict.get('name')
assert CourseCohort.RANDOM == response_dict.get('assignment_type')
# Update the newly created random cohort
newly_created_cohort = get_cohort_by_name(self.course.id, cohort_name)
@@ -512,9 +477,9 @@ class CohortHandlerTestCase(CohortViewsTestCase):
data = {'name': cohort_name, 'assignment_type': CourseCohort.RANDOM}
response_dict = self.put_handler(self.course, newly_created_cohort, data=data)
self.assertEqual(cohort_name, get_cohort_by_id(self.course.id, newly_created_cohort.id).name)
self.assertEqual(cohort_name, response_dict.get("name"))
self.assertEqual(CourseCohort.RANDOM, response_dict.get("assignment_type"))
assert cohort_name == get_cohort_by_id(self.course.id, newly_created_cohort.id).name
assert cohort_name == response_dict.get('name')
assert CourseCohort.RANDOM == response_dict.get('assignment_type')
def test_cannot_update_assignment_type_of_single_random_cohort(self):
"""
@@ -525,23 +490,21 @@ class CohortHandlerTestCase(CohortViewsTestCase):
data = {'name': cohort_name, 'assignment_type': CourseCohort.RANDOM}
response_dict = self.put_handler(self.course, data=data)
self.assertEqual(cohort_name, response_dict.get("name"))
self.assertEqual(CourseCohort.RANDOM, response_dict.get("assignment_type"))
assert cohort_name == response_dict.get('name')
assert CourseCohort.RANDOM == response_dict.get('assignment_type')
# Try to update the assignment type of newly created random cohort
cohort = get_cohort_by_name(self.course.id, cohort_name)
data = {'name': cohort_name, 'assignment_type': CourseCohort.MANUAL}
response_dict = self.put_handler(self.course, cohort, data=data, expected_response_code=400)
self.assertEqual(
'There must be one cohort to which students can automatically be assigned.', response_dict.get("error")
)
assert 'There must be one cohort to which students can automatically be assigned.' == response_dict.get('error')
def test_update_cohort_group_id(self):
"""
Test that it is possible to update the user_partition_id/group_id of an existing cohort.
"""
self._create_cohorts()
self.assertEqual((None, None), get_group_info_for_cohort(self.cohort1))
assert (None, None) == get_group_info_for_cohort(self.cohort1)
data = {
'name': self.cohort1.name,
'assignment_type': CourseCohort.MANUAL,
@@ -549,11 +512,11 @@ class CohortHandlerTestCase(CohortViewsTestCase):
'user_partition_id': 3
}
response_dict = self.put_handler(self.course, self.cohort1, data=data)
self.assertEqual((2, 3), get_group_info_for_cohort(self.cohort1))
self.assertEqual(2, response_dict.get("group_id"))
self.assertEqual(3, response_dict.get("user_partition_id"))
assert (2, 3) == get_group_info_for_cohort(self.cohort1)
assert 2 == response_dict.get('group_id')
assert 3 == response_dict.get('user_partition_id')
# Check that the name didn't change.
self.assertEqual(self.cohort1.name, response_dict.get("name"))
assert self.cohort1.name == response_dict.get('name')
def test_update_cohort_remove_group_id(self):
"""
@@ -561,12 +524,12 @@ class CohortHandlerTestCase(CohortViewsTestCase):
"""
self._create_cohorts()
link_cohort_to_partition_group(self.cohort1, 5, 0)
self.assertEqual((0, 5), get_group_info_for_cohort(self.cohort1))
assert (0, 5) == get_group_info_for_cohort(self.cohort1)
data = {'name': self.cohort1.name, 'assignment_type': CourseCohort.RANDOM, 'group_id': None}
response_dict = self.put_handler(self.course, self.cohort1, data=data)
self.assertEqual((None, None), get_group_info_for_cohort(self.cohort1))
self.assertIsNone(response_dict.get("group_id"))
self.assertIsNone(response_dict.get("user_partition_id"))
assert (None, None) == get_group_info_for_cohort(self.cohort1)
assert response_dict.get('group_id') is None
assert response_dict.get('user_partition_id') is None
def test_change_cohort_group_id(self):
"""
@@ -574,7 +537,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
different group_id.
"""
self._create_cohorts()
self.assertEqual((None, None), get_group_info_for_cohort(self.cohort4))
assert (None, None) == get_group_info_for_cohort(self.cohort4)
data = {
'name': self.cohort4.name,
'assignment_type': CourseCohort.RANDOM,
@@ -582,7 +545,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
'user_partition_id': 3
}
self.put_handler(self.course, self.cohort4, data=data)
self.assertEqual((2, 3), get_group_info_for_cohort(self.cohort4))
assert (2, 3) == get_group_info_for_cohort(self.cohort4)
data = {
'name': self.cohort4.name,
@@ -591,7 +554,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
'user_partition_id': 3
}
self.put_handler(self.course, self.cohort4, data=data)
self.assertEqual((1, 3), get_group_info_for_cohort(self.cohort4))
assert (1, 3) == get_group_info_for_cohort(self.cohort4)
def test_update_cohort_missing_user_partition_id(self):
"""
@@ -600,9 +563,7 @@ class CohortHandlerTestCase(CohortViewsTestCase):
self._create_cohorts()
data = {'name': self.cohort1.name, 'assignment_type': CourseCohort.RANDOM, 'group_id': 2}
response_dict = self.put_handler(self.course, self.cohort1, data=data, expected_response_code=400)
self.assertEqual(
"If group_id is specified, user_partition_id must also be specified.", response_dict.get("error")
)
assert 'If group_id is specified, user_partition_id must also be specified.' == response_dict.get('error')
class UsersInCohortTestCase(CohortViewsTestCase):
@@ -620,10 +581,10 @@ class UsersInCohortTestCase(CohortViewsTestCase):
response = users_in_cohort(request, six.text_type(course.id), cohort.id)
if should_return_bad_request:
self.assertEqual(response.status_code, 400)
assert response.status_code == 400
return
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
return json.loads(response.content.decode('utf-8'))
def verify_users_in_cohort_and_response(self, cohort, response_dict, expected_users, expected_page,
@@ -633,14 +594,14 @@ class UsersInCohortTestCase(CohortViewsTestCase):
users, page number, and total number of pages for a given cohort. Also
verify that those users are actually in the given cohort.
"""
self.assertTrue(response_dict.get("success"))
self.assertEqual(response_dict.get("page"), expected_page)
self.assertEqual(response_dict.get("num_pages"), expected_num_pages)
assert response_dict.get('success')
assert response_dict.get('page') == expected_page
assert response_dict.get('num_pages') == expected_num_pages
returned_users = User.objects.filter(username__in=[user.get("username") for user in response_dict.get("users")])
self.assertEqual(len(returned_users), len(expected_users))
self.assertEqual(set(returned_users), set(expected_users))
self.assertTrue(set(returned_users).issubset(cohort.users.all()))
assert len(returned_users) == len(expected_users)
assert set(returned_users) == set(expected_users)
assert set(returned_users).issubset(cohort.users.all())
def test_non_staff(self):
"""
@@ -754,13 +715,10 @@ class AddUsersToCohortTestCase(CohortViewsTestCase):
request = RequestFactory().post("dummy_url", {"users": users_string})
request.user = self.staff_user
if should_raise_404:
self.assertRaises(
Http404,
lambda: add_users_to_cohort(request, six.text_type(course.id), cohort.id)
)
pytest.raises(Http404, (lambda: add_users_to_cohort(request, six.text_type(course.id), cohort.id)))
else:
response = add_users_to_cohort(request, six.text_type(course.id), cohort.id)
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
return json.loads(response.content.decode('utf-8'))
@@ -778,41 +736,20 @@ class AddUsersToCohortTestCase(CohortViewsTestCase):
'expected_preassigned' is a list of email addresses
'expected_invalid' is a list of email addresses
"""
self.assertTrue(response_dict.get("success"))
self.assertEqual(
response_dict.get("added"),
[
{"username": user.username, "email": user.email}
for user in expected_added
]
)
self.assertEqual(
response_dict.get("changed"),
[
{
"username": user.username,
"email": user.email,
"previous_cohort": previous_cohort
}
for (user, previous_cohort) in expected_changed
]
)
self.assertEqual(
response_dict.get("present"),
[username_or_email for (_, username_or_email) in expected_present]
)
self.assertEqual(response_dict.get("unknown"), expected_unknown)
self.assertEqual(response_dict.get("invalid"), expected_invalid)
self.assertEqual(response_dict.get("preassigned"), expected_preassigned)
assert response_dict.get('success')
assert response_dict.get('added') == [{'username': user.username,
'email': user.email} for user in expected_added]
assert response_dict.get('changed') == [{'username': user.username, 'email': user.email,
'previous_cohort': previous_cohort} for (user, previous_cohort)
in expected_changed]
assert response_dict.get('present') == [username_or_email for (_, username_or_email) in expected_present]
assert response_dict.get('unknown') == expected_unknown
assert response_dict.get('invalid') == expected_invalid
assert response_dict.get('preassigned') == expected_preassigned
for user in expected_added + [user for (user, _) in expected_changed + expected_present]:
self.assertEqual(
CourseUserGroup.objects.get(
course_id=course.id,
group_type=CourseUserGroup.COHORT,
users__id=user.id
),
cohort
)
assert CourseUserGroup.objects.get(course_id=course.id,
group_type=CourseUserGroup.COHORT,
users__id=user.id) == cohort
def test_non_staff(self):
"""
@@ -984,8 +921,8 @@ class AddUsersToCohortTestCase(CohortViewsTestCase):
for c in self.get_handler(self.course)['cohorts']:
if c['name'] == cohort.name:
cohort_listed = True
self.assertEqual(expected_count, c['user_count'])
self.assertTrue(cohort_listed)
assert expected_count == c['user_count']
assert cohort_listed
def test_all(self):
"""
@@ -1143,7 +1080,7 @@ class RemoveUserFromCohortTestCase(CohortViewsTestCase):
request = RequestFactory().post("dummy_url")
request.user = self.staff_user
response = remove_user_from_cohort(request, six.text_type(self.course.id), cohort.id)
self.assertEqual(response.status_code, 200)
assert response.status_code == 200
return json.loads(response.content.decode('utf-8'))
def verify_removed_user_from_cohort(self, username, response_dict, cohort, expected_error_msg=None):
@@ -1153,12 +1090,12 @@ class RemoveUserFromCohortTestCase(CohortViewsTestCase):
verify that the returned error message matches the expected one.
"""
if expected_error_msg is None:
self.assertTrue(response_dict.get("success"))
self.assertIsNone(response_dict.get("msg"))
self.assertFalse(self._user_in_cohort(username, cohort))
assert response_dict.get('success')
assert response_dict.get('msg') is None
assert not self._user_in_cohort(username, cohort)
else:
self.assertFalse(response_dict.get("success"))
self.assertEqual(response_dict.get("msg"), expected_error_msg)
assert not response_dict.get('success')
assert response_dict.get('msg') == expected_error_msg
def test_non_staff(self):
"""

View File

@@ -106,9 +106,9 @@ class TestDumpToNeo4jCommandBase(SharedModuleStoreTestCase):
number_rollbacks: number of commit rollbacks we expect
"""
courses = set([node['course_key'] for node in mock_graph.nodes]) # lint-amnesty, pylint: disable=consider-using-set-comprehension
self.assertEqual(len(courses), number_of_courses)
self.assertEqual(mock_graph.number_commits, number_commits)
self.assertEqual(mock_graph.number_rollbacks, number_rollbacks)
assert len(courses) == number_of_courses
assert mock_graph.number_commits == number_commits
assert mock_graph.number_rollbacks == number_rollbacks
@ddt.ddt
@@ -245,26 +245,26 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
Tests the serialize_item method.
"""
fields, label = serialize_item(self.course)
self.assertEqual(label, "course")
self.assertIn("edited_on", list(fields.keys()))
self.assertIn("display_name", list(fields.keys()))
self.assertIn("org", list(fields.keys()))
self.assertIn("course", list(fields.keys()))
self.assertIn("run", list(fields.keys()))
self.assertIn("course_key", list(fields.keys()))
self.assertIn("location", list(fields.keys()))
self.assertIn("block_type", list(fields.keys()))
self.assertIn("detached", list(fields.keys()))
self.assertNotIn("checklist", list(fields.keys()))
assert label == 'course'
assert 'edited_on' in list(fields.keys())
assert 'display_name' in list(fields.keys())
assert 'org' in list(fields.keys())
assert 'course' in list(fields.keys())
assert 'run' in list(fields.keys())
assert 'course_key' in list(fields.keys())
assert 'location' in list(fields.keys())
assert 'block_type' in list(fields.keys())
assert 'detached' in list(fields.keys())
assert 'checklist' not in list(fields.keys())
def test_serialize_course(self):
"""
Tests the serialize_course method.
"""
nodes, relationships = serialize_course(self.course.id)
self.assertEqual(len(nodes), 9)
assert len(nodes) == 9
# the course has 7 "PARENT_OF" relationships and 3 "PRECEDES"
self.assertEqual(len(relationships), 10)
assert len(relationships) == 10
def test_strip_version_and_branch(self):
"""
@@ -277,13 +277,13 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
'test_branch'
).for_version('test_version')
self.assertIsNotNone(location.branch)
self.assertIsNotNone(location.version_guid)
assert location.branch is not None
assert location.version_guid is not None
stripped_location = strip_branch_and_version(location)
self.assertIsNone(stripped_location.branch)
self.assertIsNone(stripped_location.version_guid)
assert stripped_location.branch is None
assert stripped_location.version_guid is None
@staticmethod
def _extract_relationship_pairs(relationships, relationship_type):
@@ -325,7 +325,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
relationship_pairs = self._extract_relationship_pairs(relationships, relationship_type)
location_pair = self._extract_location_pair(xblock1, xblock2)
self.assertIn(location_pair, relationship_pairs)
assert location_pair in relationship_pairs
def assertBlockPairIsNotRelationship(self, xblock1, xblock2, relationships, relationship_type):
"""
@@ -334,7 +334,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
"""
relationship_pairs = self._extract_relationship_pairs(relationships, relationship_type)
location_pair = self._extract_location_pair(xblock1, xblock2)
self.assertNotIn(location_pair, relationship_pairs)
assert location_pair not in relationship_pairs
def test_precedes_relationship(self):
"""
@@ -366,14 +366,14 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
# the html node should have 0 index, and the problem should have 1
html_nodes = [node for node in nodes if node['block_type'] == 'html']
self.assertEqual(len(html_nodes), 1)
assert len(html_nodes) == 1
problem_nodes = [node for node in nodes if node['block_type'] == 'problem']
self.assertEqual(len(problem_nodes), 1)
assert len(problem_nodes) == 1
html_node = html_nodes[0]
problem_node = problem_nodes[0]
self.assertEqual(html_node['index'], 0)
self.assertEqual(problem_node['index'], 1)
assert html_node['index'] == 0
assert problem_node['index'] == 1
@ddt.data(
(1, 1),
@@ -395,7 +395,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
Tests the coerce_types helper
"""
coerced_value = coerce_types(original_value)
self.assertEqual(coerced_value, coerced_expected)
assert coerced_value == coerced_expected
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
@@ -422,7 +422,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
# 9 nodes + 7 relationships from the first course
# 2 nodes and no relationships from the second
self.assertEqual(len(mock_graph.nodes), 11)
assert len(mock_graph.nodes) == 11
six.assertCountEqual(self, submitted, self.course_strings)
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@@ -480,7 +480,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
submitted, __ = self.mss.dump_courses_to_neo4j(
credentials, override_cache=override_cache
)
self.assertEqual(len(submitted), expected_number_courses)
assert len(submitted) == expected_number_courses
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph')
@@ -497,7 +497,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
# run once to warm the cache
submitted, skipped = self.mss.dump_courses_to_neo4j(credentials) # lint-amnesty, pylint: disable=unused-variable
self.assertEqual(len(submitted), len(self.course_strings))
assert len(submitted) == len(self.course_strings)
# simulate one of the courses being published
with override_waffle_switch(block_structure_config.STORAGE_BACKING_FOR_CACHE, True):
@@ -505,8 +505,8 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
# make sure only the published course was dumped
submitted, __ = self.mss.dump_courses_to_neo4j(credentials)
self.assertEqual(len(submitted), 1)
self.assertEqual(submitted[0], six.text_type(self.course.id))
assert len(submitted) == 1
assert submitted[0] == six.text_type(self.course.id)
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_course_last_published')
@mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_command_last_run')
@@ -534,7 +534,4 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
mock_get_course_last_published.return_value = last_course_published
mock_course_key = mock.Mock()
mock_graph = mock.Mock()
self.assertEqual(
should_dump_course(mock_course_key, mock_graph),
should_dump,
)
assert should_dump_course(mock_course_key, mock_graph) == should_dump

View File

@@ -28,7 +28,7 @@ class CrawlersConfigTest(TestCase): # lint-amnesty, pylint: disable=missing-cla
"""
fake_request = HttpRequest()
fake_request.META['HTTP_USER_AGENT'] = req_user_agent
self.assertFalse(CrawlersConfig.is_crawler(fake_request))
assert not CrawlersConfig.is_crawler(fake_request)
@ddt.data(
u"edX-downloader",
@@ -40,4 +40,4 @@ class CrawlersConfigTest(TestCase): # lint-amnesty, pylint: disable=missing-cla
"""
fake_request = HttpRequest()
fake_request.META['HTTP_USER_AGENT'] = req_user_agent
self.assertTrue(CrawlersConfig.is_crawler(fake_request))
assert CrawlersConfig.is_crawler(fake_request)

View File

@@ -65,9 +65,9 @@ class TestNotifyCredentials(TestCase):
@mock.patch(COMMAND_MODULE + '.Command.send_notifications')
def test_course_args(self, mock_send):
call_command(Command(), '--course', 'course-v1:edX+Test+1', 'course-v1:edX+Test+2')
self.assertTrue(mock_send.called)
self.assertEqual(list(mock_send.call_args[0][0]), [self.cert1, self.cert2])
self.assertEqual(list(mock_send.call_args[0][1]), [self.grade1, self.grade2])
assert mock_send.called
assert list(mock_send.call_args[0][0]) == [self.cert1, self.cert2]
assert list(mock_send.call_args[0][1]) == [self.grade1, self.grade2]
@freeze_time(datetime(2017, 5, 1, 4))
@mock.patch(COMMAND_MODULE + '.Command.send_notifications')
@@ -91,35 +91,35 @@ class TestNotifyCredentials(TestCase):
call_command(Command(), '--auto')
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [cert1, cert2])
self.assertListEqual(list(mock_send.call_args[0][1]), [grade1, grade2])
self.assertLessEqual(len(list(mock_send.call_args[0][0])), len(total_certificates))
self.assertLessEqual(len(list(mock_send.call_args[0][1])), len(total_grades))
assert len(list(mock_send.call_args[0][0])) <= len(total_certificates)
assert len(list(mock_send.call_args[0][1])) <= len(total_grades)
@mock.patch(COMMAND_MODULE + '.Command.send_notifications')
def test_date_args(self, mock_send):
call_command(Command(), '--start-date', '2017-01-31')
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert2, self.cert4, self.cert3])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade2, self.grade4, self.grade3])
mock_send.reset_mock()
call_command(Command(), '--start-date', '2017-02-01', '--end-date', '2017-02-02')
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert2, self.cert4])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade2, self.grade4])
mock_send.reset_mock()
call_command(Command(), '--end-date', '2017-02-02')
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert1, self.cert2, self.cert4])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade1, self.grade2, self.grade4])
mock_send.reset_mock()
call_command(Command(), '--start-date', "2017-02-01 00:00:00", '--end-date', '2017-02-01 04:00:00')
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert2])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade2])
@@ -128,7 +128,7 @@ class TestNotifyCredentials(TestCase):
call_command(
Command(), '--start-date', '2017-02-01', '--end-date', '2017-02-02', '--usernames', self.user2.username
)
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert4])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade4])
mock_send.reset_mock()
@@ -136,7 +136,7 @@ class TestNotifyCredentials(TestCase):
call_command(
Command(), '--usernames', self.user2.username
)
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert4])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade4])
mock_send.reset_mock()
@@ -144,7 +144,7 @@ class TestNotifyCredentials(TestCase):
call_command(
Command(), '--start-date', '2017-02-01', '--end-date', '2017-02-02', '--usernames', self.user.username
)
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert2])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade2])
mock_send.reset_mock()
@@ -152,7 +152,7 @@ class TestNotifyCredentials(TestCase):
call_command(
Command(), '--usernames', self.user.username
)
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert1, self.cert2, self.cert3])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade1, self.grade2, self.grade3])
mock_send.reset_mock()
@@ -160,7 +160,7 @@ class TestNotifyCredentials(TestCase):
call_command(
Command(), '--usernames', self.user.username, self.user2.username
)
self.assertTrue(mock_send.called)
assert mock_send.called
self.assertListEqual(list(mock_send.call_args[0][0]), [self.cert1, self.cert2, self.cert4, self.cert3])
self.assertListEqual(list(mock_send.call_args[0][1]), [self.grade1, self.grade2, self.grade4, self.grade3])
mock_send.reset_mock()
@@ -169,39 +169,40 @@ class TestNotifyCredentials(TestCase):
def test_no_args(self, mock_send):
with self.assertRaisesRegex(CommandError, 'You must specify a filter.*'):
call_command(Command())
self.assertFalse(mock_send.called)
assert not mock_send.called
@mock.patch(COMMAND_MODULE + '.Command.send_notifications')
def test_dry_run(self, mock_send):
call_command(Command(), '--dry-run', '--start-date', '2017-02-01')
self.assertFalse(mock_send.called)
assert not mock_send.called
@mock.patch(COMMAND_MODULE + '.handle_course_cert_awarded')
@mock.patch(COMMAND_MODULE + '.send_grade_if_interesting')
@mock.patch(COMMAND_MODULE + '.handle_course_cert_changed')
def test_hand_off(self, mock_grade_interesting, mock_program_changed, mock_program_awarded):
call_command(Command(), '--start-date', '2017-02-01')
self.assertEqual(mock_grade_interesting.call_count, 3)
self.assertEqual(mock_program_changed.call_count, 3)
self.assertEqual(mock_program_awarded.call_count, 0)
assert mock_grade_interesting.call_count == 3
assert mock_program_changed.call_count == 3
assert mock_program_awarded.call_count == 0
mock_grade_interesting.reset_mock()
mock_program_changed.reset_mock()
mock_program_awarded.reset_mock()
call_command(Command(), '--start-date', '2017-02-01', '--notify_programs')
self.assertEqual(mock_grade_interesting.call_count, 3)
self.assertEqual(mock_program_changed.call_count, 3)
self.assertEqual(mock_program_awarded.call_count, 1)
assert mock_grade_interesting.call_count == 3
assert mock_program_changed.call_count == 3
assert mock_program_awarded.call_count == 1
@mock.patch(COMMAND_MODULE + '.time')
def test_delay(self, mock_time):
call_command(Command(), '--start-date', '2017-01-01', '--page-size=2')
self.assertEqual(mock_time.sleep.call_count, 0)
assert mock_time.sleep.call_count == 0
mock_time.sleep.reset_mock()
call_command(Command(), '--start-date', '2017-01-01', '--page-size=2', '--delay', '0.2')
self.assertEqual(mock_time.sleep.call_count, 2) # Between each page, twice (2 pages, for certs and grades)
self.assertEqual(mock_time.sleep.call_args[0][0], 0.2)
assert mock_time.sleep.call_count == 2
# Between each page, twice (2 pages, for certs and grades)
assert mock_time.sleep.call_args[0][0] == 0.2
@override_settings(DEBUG=True)
def test_page_size(self):
@@ -211,11 +212,13 @@ class TestNotifyCredentials(TestCase):
reset_queries()
call_command(Command(), '--start-date', '2017-01-01', '--page-size=1')
self.assertEqual(len(connection.queries), baseline + 6) # two extra page queries each for certs & grades
assert len(connection.queries) == (baseline + 6)
# two extra page queries each for certs & grades
reset_queries()
call_command(Command(), '--start-date', '2017-01-01', '--page-size=2')
self.assertEqual(len(connection.queries), baseline + 2) # one extra page query each for certs & grades
assert len(connection.queries) == (baseline + 2)
# one extra page query each for certs & grades
@mock.patch(COMMAND_MODULE + '.send_grade_if_interesting')
def test_site(self, mock_grade_interesting):
@@ -224,7 +227,7 @@ class TestNotifyCredentials(TestCase):
)
call_command(Command(), '--site', site_config.site.domain, '--start-date', '2017-01-01')
self.assertEqual(mock_grade_interesting.call_count, 1)
assert mock_grade_interesting.call_count == 1
@mock.patch(COMMAND_MODULE + '.Command.send_notifications')
def test_args_from_database(self, mock_send):
@@ -240,11 +243,12 @@ class TestNotifyCredentials(TestCase):
# Not told to use config, should ignore it
call_command(Command(), '--start-date', '2017-01-01')
self.assertEqual(len(mock_send.call_args[0][0]), 4) # Number of certs expected
assert len(mock_send.call_args[0][0]) == 4
# Number of certs expected
# Told to use it, and enabled. Should use config in preference of command line
call_command(Command(), '--start-date', '2017-01-01', '--args-from-database')
self.assertEqual(len(mock_send.call_args[0][0]), 1)
assert len(mock_send.call_args[0][0]) == 1
config.enabled = False
config.save()

View File

@@ -24,16 +24,16 @@ class TestCredentialsApiConfig(CredentialsApiConfigMixin, TestCase):
credentials_config = self.create_credentials_config()
expected = '{root}/api/{version}/'.format(root=CREDENTIALS_INTERNAL_SERVICE_URL.strip('/'), version=API_VERSION)
self.assertEqual(credentials_config.get_internal_api_url_for_org('nope'), expected)
assert credentials_config.get_internal_api_url_for_org('nope') == expected
expected = '{root}/api/{version}/'.format(root=CREDENTIALS_INTERNAL_SERVICE_URL.strip('/'), version=API_VERSION)
self.assertEqual(credentials_config.internal_api_url, expected)
assert credentials_config.internal_api_url == expected
expected = '{root}/api/{version}/'.format(root=CREDENTIALS_INTERNAL_SERVICE_URL.strip('/'), version=API_VERSION)
self.assertEqual(credentials_config.get_internal_api_url_for_org('nope'), expected)
assert credentials_config.get_internal_api_url_for_org('nope') == expected
expected = '{root}/api/{version}/'.format(root=CREDENTIALS_PUBLIC_SERVICE_URL.strip('/'), version=API_VERSION)
self.assertEqual(credentials_config.public_api_url, expected)
assert credentials_config.public_api_url == expected
def test_is_learner_issuance_enabled(self):
"""
@@ -41,10 +41,10 @@ class TestCredentialsApiConfig(CredentialsApiConfigMixin, TestCase):
when configuration is enabled and all required configuration is provided.
"""
credentials_config = self.create_credentials_config(enabled=False)
self.assertFalse(credentials_config.is_learner_issuance_enabled)
assert not credentials_config.is_learner_issuance_enabled
credentials_config = self.create_credentials_config(enable_learner_issuance=False)
self.assertFalse(credentials_config.is_learner_issuance_enabled)
assert not credentials_config.is_learner_issuance_enabled
credentials_config = self.create_credentials_config()
self.assertTrue(credentials_config.is_learner_issuance_enabled)
assert credentials_config.is_learner_issuance_enabled

View File

@@ -58,7 +58,7 @@ class TestCredentialsSignalsSendGrade(TestCase):
# Test direct send
send_grade_if_interesting(self.user, self.key, mode, status, 'A', 1.0)
self.assertIs(mock_send_grade_to_credentials.delay.called, called)
assert mock_send_grade_to_credentials.delay.called is called
mock_send_grade_to_credentials.delay.reset_mock()
# Test query
@@ -69,11 +69,11 @@ class TestCredentialsSignalsSendGrade(TestCase):
mode=mode
)
send_grade_if_interesting(self.user, self.key, None, None, 'A', 1.0)
self.assertIs(mock_send_grade_to_credentials.delay.called, called)
assert mock_send_grade_to_credentials.delay.called is called
def test_send_grade_missing_cert(self, _, mock_send_grade_to_credentials, _mock_is_learner_issuance_enabled):
send_grade_if_interesting(self.user, self.key, None, None, 'A', 1.0)
self.assertFalse(mock_send_grade_to_credentials.delay.called)
assert not mock_send_grade_to_credentials.delay.called
@ddt.data([True], [False])
@ddt.unpack
@@ -81,7 +81,7 @@ class TestCredentialsSignalsSendGrade(TestCase):
mock_send_grade_to_credentials, _mock_is_learner_issuance_enabled):
mock_is_course_run_in_a_program.return_value = in_program
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', 'A', 1.0)
self.assertIs(mock_send_grade_to_credentials.delay.called, in_program)
assert mock_send_grade_to_credentials.delay.called is in_program
def test_send_grade_queries_grade(self, mock_is_course_run_in_a_program, mock_send_grade_to_credentials,
_mock_is_learner_issuance_enabled):
@@ -89,9 +89,8 @@ class TestCredentialsSignalsSendGrade(TestCase):
with mock_passing_grade('B', 0.81):
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertTrue(mock_send_grade_to_credentials.delay.called)
self.assertEqual(mock_send_grade_to_credentials.delay.call_args[0],
(self.user.username, str(self.key), True, 'B', 0.81))
assert mock_send_grade_to_credentials.delay.called
assert mock_send_grade_to_credentials.delay.call_args[0] == (self.user.username, str(self.key), True, 'B', 0.81)
mock_send_grade_to_credentials.delay.reset_mock()
@mock.patch.dict(settings.FEATURES, {'ASSUME_ZERO_GRADE_IF_ABSENT_FOR_ALL_TESTS': False})
@@ -99,14 +98,14 @@ class TestCredentialsSignalsSendGrade(TestCase):
_mock_is_learner_issuance_enabled):
mock_is_course_run_in_a_program.return_value = True
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertFalse(mock_send_grade_to_credentials.delay.called)
assert not mock_send_grade_to_credentials.delay.called
def test_send_grade_without_issuance_enabled(self, _mock_is_course_run_in_a_program,
mock_send_grade_to_credentials, mock_is_learner_issuance_enabled):
mock_is_learner_issuance_enabled.return_value = False
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertTrue(mock_is_learner_issuance_enabled.called)
self.assertFalse(mock_send_grade_to_credentials.delay.called)
assert mock_is_learner_issuance_enabled.called
assert not mock_send_grade_to_credentials.delay.called
def test_send_grade_records_enabled(self, _mock_is_course_run_in_a_program, mock_send_grade_to_credentials,
_mock_is_learner_issuance_enabled):
@@ -116,24 +115,24 @@ class TestCredentialsSignalsSendGrade(TestCase):
# Correctly sent
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertTrue(mock_send_grade_to_credentials.delay.called)
assert mock_send_grade_to_credentials.delay.called
mock_send_grade_to_credentials.delay.reset_mock()
# Correctly not sent
site_config.site_values['ENABLE_LEARNER_RECORDS'] = False
site_config.save()
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertFalse(mock_send_grade_to_credentials.delay.called)
assert not mock_send_grade_to_credentials.delay.called
def test_send_grade_records_disabled_globally(
self, _mock_is_course_run_in_a_program, mock_send_grade_to_credentials,
_mock_is_learner_issuance_enabled
):
self.assertTrue(is_learner_records_enabled())
assert is_learner_records_enabled()
with override_settings(FEATURES={"ENABLE_LEARNER_RECORDS": False}):
self.assertFalse(is_learner_records_enabled())
assert not is_learner_records_enabled()
send_grade_if_interesting(self.user, self.key, 'verified', 'downloadable', None, None)
self.assertFalse(mock_send_grade_to_credentials.delay.called)
assert not mock_send_grade_to_credentials.delay.called
@skip_unless_lms
@@ -150,13 +149,13 @@ class TestCredentialsSignalsUtils(TestCase):
def test_is_course_run_in_a_program_success(self, mock_get_programs):
mock_get_programs.return_value = self.data
self.assertTrue(is_course_run_in_a_program(self.course_run['key']))
self.assertEqual(mock_get_programs.call_args[0], (self.site,))
assert is_course_run_in_a_program(self.course_run['key'])
assert mock_get_programs.call_args[0] == (self.site,)
def test_is_course_run_in_a_program_failure(self, mock_get_programs):
mock_get_programs.return_value = self.data
course_run2 = CourseRunFactory()
self.assertFalse(is_course_run_in_a_program(course_run2['key']))
assert not is_course_run_in_a_program(course_run2['key'])
@skip_unless_lms
@@ -167,14 +166,14 @@ class TestCredentialsSignalsEmissions(ModuleStoreTestCase):
def test_cert_changed(self, mock_send_grade_if_interesting):
user = UserFactory()
self.assertFalse(mock_send_grade_if_interesting.called)
assert not mock_send_grade_if_interesting.called
GeneratedCertificateFactory(user=user)
self.assertTrue(mock_send_grade_if_interesting.called)
assert mock_send_grade_if_interesting.called
def test_grade_changed(self, mock_send_grade_if_interesting):
user = UserFactory()
course = XModuleCourseFactory()
self.assertFalse(mock_send_grade_if_interesting.called)
assert not mock_send_grade_if_interesting.called
CourseGradeFactory().update(user, course=course)
self.assertTrue(mock_send_grade_if_interesting.called)
assert mock_send_grade_if_interesting.called

View File

@@ -2,7 +2,7 @@
Test credentials tasks
"""
import pytest
import mock
from django.conf import settings
from django.test import TestCase, override_settings
@@ -39,11 +39,11 @@ class TestSendGradeToCredentialTask(TestCase):
tasks.send_grade_to_credentials.delay('user', 'course-v1:org+course+run', True, 'A', 1.0).get()
self.assertEqual(mock_get_api_client.call_count, 1)
self.assertEqual(mock_get_api_client.call_args[0], (self.user,))
assert mock_get_api_client.call_count == 1
assert mock_get_api_client.call_args[0] == (self.user,)
self.assertDictEqual(mock_get_api_client.call_args[1], {'org': 'org'})
self.assertEqual(api_client.grades.post.call_count, 1)
assert api_client.grades.post.call_count == 1
self.assertDictEqual(api_client.grades.post.call_args[0][0], {
'username': 'user',
'course_run': 'course-v1:org+course+run',
@@ -60,5 +60,5 @@ class TestSendGradeToCredentialTask(TestCase):
task = tasks.send_grade_to_credentials.delay('user', 'course-v1:org+course+run', True, 'A', 1.0)
self.assertRaises(Exception, task.get)
self.assertEqual(mock_get_api_client.call_count, tasks.MAX_RETRIES + 1)
pytest.raises(Exception, task.get)
assert mock_get_api_client.call_count == (tasks.MAX_RETRIES + 1)

View File

@@ -47,10 +47,10 @@ class TestGetCredentials(CredentialsApiConfigMixin, CacheIsolationTestCase):
'only_visible': 'True',
}
cache_key = '{}.{}'.format(self.credentials_config.CACHE_KEY, self.user.username)
self.assertEqual(kwargs['querystring'], querystring)
self.assertEqual(kwargs['cache_key'], cache_key)
assert kwargs['querystring'] == querystring
assert kwargs['cache_key'] == cache_key
self.assertEqual(actual, expected)
assert actual == expected
def test_get_one(self, mock_get_edx_api_data):
expected = factories.UserCredential()
@@ -70,10 +70,10 @@ class TestGetCredentials(CredentialsApiConfigMixin, CacheIsolationTestCase):
'program_uuid': program_uuid,
}
cache_key = '{}.{}.{}'.format(self.credentials_config.CACHE_KEY, self.user.username, program_uuid)
self.assertEqual(kwargs['querystring'], querystring)
self.assertEqual(kwargs['cache_key'], cache_key)
assert kwargs['querystring'] == querystring
assert kwargs['cache_key'] == cache_key
self.assertEqual(actual, expected)
assert actual == expected
def test_type_filter(self, mock_get_edx_api_data):
get_credentials(self.user, credential_type='program')
@@ -88,4 +88,4 @@ class TestGetCredentials(CredentialsApiConfigMixin, CacheIsolationTestCase):
'only_visible': 'True',
'type': 'program',
}
self.assertEqual(kwargs['querystring'], querystring)
assert kwargs['querystring'] == querystring