removed bulk_assertions context manager

removed usages of bulk_assertion

changes made to comply quality failure

removing test-bulk assertions.
This commit is contained in:
aarif
2019-08-09 17:26:45 +05:00
committed by arbisoft
parent e612a9bca6
commit 4da4749f84
2 changed files with 57 additions and 406 deletions

View File

@@ -225,154 +225,6 @@ def map_references(value, field, actual_course_key):
return value
class BulkAssertionError(AssertionError):
"""
An AssertionError that contains many sub-assertions.
"""
def __init__(self, assertion_errors):
self.errors = assertion_errors
super(BulkAssertionError, self).__init__("The following assertions were raised:\n{}".format(
"\n\n".join(self.errors)
))
class _BulkAssertionManager(object):
"""
This provides a facility for making a large number of assertions, and seeing all of
the failures at once, rather than only seeing single failures.
"""
def __init__(self, test_case):
self._assertion_errors = []
self._test_case = test_case
def log_error(self, formatted_exc):
"""
Record ``formatted_exc`` in the set of exceptions captured by this assertion manager.
"""
self._assertion_errors.append(formatted_exc)
def raise_assertion_errors(self):
"""
Raise a BulkAssertionError containing all of the captured AssertionErrors,
if there were any.
"""
if self._assertion_errors:
raise BulkAssertionError(self._assertion_errors)
class BulkAssertionTest(TestCase):
"""
This context manager provides a _BulkAssertionManager to assert with,
and then calls `raise_assertion_errors` at the end of the block to validate all
of the assertions.
"""
def setUp(self, *args, **kwargs):
super(BulkAssertionTest, self).setUp(*args, **kwargs)
# Use __ to not pollute the namespace of subclasses with what could be a fairly generic name.
self.__manager = None
@contextmanager
def bulk_assertions(self):
"""
A context manager that will capture all assertion failures made by self.assert*
methods within its context, and raise a single combined assertion error at
the end of the context.
"""
if self.__manager:
yield
else:
try:
self.__manager = _BulkAssertionManager(self)
yield
except Exception:
raise
else:
manager = self.__manager
self.__manager = None
manager.raise_assertion_errors()
@contextmanager
def _capture_assertion_errors(self):
"""
A context manager that captures any AssertionError raised within it,
and, if within a ``bulk_assertions`` context, records the captured
assertion to the bulk assertion manager. If not within a ``bulk_assertions``
context, just raises the original exception.
"""
try:
# Only wrap the first layer of assert functions by stashing away the manager
# before executing the assertion.
manager = self.__manager
self.__manager = None
yield
except AssertionError: # pylint: disable=broad-except
if manager is not None:
# Reconstruct the stack in which the error was thrown (so that the traceback)
# isn't cut off at `assertion(*args, **kwargs)`.
exc_type, exc_value, exc_tb = sys.exc_info()
# Count the number of stack frames before you get to a
# unittest context (walking up the stack from here).
relevant_frames = 0
for frame_record in inspect.stack():
# This is the same criterion used by unittest to decide if a
# stack frame is relevant to exception printing.
frame = frame_record[0]
if '__unittest' in frame.f_globals:
break
relevant_frames += 1
stack_above = traceback.extract_stack()[-relevant_frames:-1]
stack_below = traceback.extract_tb(exc_tb)
formatted_stack = traceback.format_list(stack_above + stack_below)
formatted_exc = traceback.format_exception_only(exc_type, exc_value)
manager.log_error(
"".join(formatted_stack + formatted_exc)
)
else:
raise
finally:
self.__manager = manager
def _wrap_assertion(self, assertion):
"""
Wraps an assert* method to capture an immediate exception,
or to generate a new assertion capturing context (in the case of assertRaises
and assertRaisesRegexp).
"""
@wraps(assertion)
def assert_(*args, **kwargs):
"""
Execute a captured assertion, and catch any assertion errors raised.
"""
context = None
# Run the assertion, and capture any raised assertionErrors
with self._capture_assertion_errors():
context = assertion(*args, **kwargs)
# Handle the assertRaises family of functions by returning
# a context manager that surrounds the assertRaises
# with our assertion capturing context manager.
if context is not None:
return nested(self._capture_assertion_errors(), context)
return assert_
def __getattribute__(self, name):
"""
Wrap all assert* methods of this class using self._wrap_assertion,
to capture all assertion errors in bulk.
"""
base_attr = super(BulkAssertionTest, self).__getattribute__(name)
if name.startswith('assert'):
return self._wrap_assertion(base_attr)
else:
return base_attr
class LazyFormat(object):
"""
An stringy object that delays formatting until it's put into a string context.
@@ -400,7 +252,7 @@ class LazyFormat(object):
return six.text_type(self)[index]
class CourseComparisonTest(BulkAssertionTest):
class CourseComparisonTest(TestCase):
"""
Mixin that has methods for comparing courses for equality.
"""
@@ -532,72 +384,61 @@ class CourseComparisonTest(BulkAssertionTest):
"""
Actual algorithm to compare courses.
"""
with self.bulk_assertions():
self.assertEqual(len(expected_items), len(actual_items))
def map_key(usage_key):
return (usage_key.block_type, usage_key.block_id)
self.assertEqual(len(expected_items), len(actual_items))
actual_item_map = {
map_key(item.location): item
for item in actual_items
}
# Split Mongo and Old-Mongo disagree about what the block_id of courses is, so skip those in
# this comparison
self.assertItemsEqual(
[map_key(item.location) for item in expected_items if item.scope_ids.block_type != 'course'],
[key for key in actual_item_map.keys() if key[0] != 'course'],
)
for expected_item in expected_items:
actual_item_location = actual_course_key.make_usage_key(expected_item.category, expected_item.location.block_id)
# split and old mongo use different names for the course root but we don't know which
# modulestore actual's come from here; so, assume old mongo and if that fails, assume split
if expected_item.location.block_type == 'course':
actual_item_location = actual_item_location.replace(name=actual_item_location.run)
def map_key(usage_key):
return (usage_key.block_type, usage_key.block_id)
actual_item_map = {
map_key(item.location): item
for item in actual_items
}
# Split Mongo and Old-Mongo disagree about what the block_id of courses is, so skip those in
# this comparison
self.assertItemsEqual(
[map_key(item.location) for item in expected_items if item.scope_ids.block_type != 'course'],
[key for key in actual_item_map.keys() if key[0] != 'course'],
)
for expected_item in expected_items:
actual_item_location = actual_course_key.make_usage_key(expected_item.category, expected_item.location.block_id)
# split and old mongo use different names for the course root but we don't know which
# modulestore actual's come from here; so, assume old mongo and if that fails, assume split
if expected_item.location.block_type == 'course':
actual_item_location = actual_item_location.replace(name=actual_item_location.run)
actual_item = actual_item_map.get(map_key(actual_item_location))
# must be split
if actual_item is None and expected_item.location.block_type == 'course':
actual_item_location = actual_item_location.replace(name='course')
actual_item = actual_item_map.get(map_key(actual_item_location))
# must be split
if actual_item is None and expected_item.location.block_type == 'course':
actual_item_location = actual_item_location.replace(name='course')
actual_item = actual_item_map.get(map_key(actual_item_location))
# Formatting the message slows down tests of large courses significantly, so only do it if it would be used
self.assertIn(map_key(actual_item_location), list(actual_item_map.keys()))
if actual_item is None:
# Formatting the message slows down tests of large courses significantly, so only do it if it would be used
self.assertIn(map_key(actual_item_location), list(actual_item_map.keys()))
if actual_item is None:
continue
# compare fields
self.assertEqual(expected_item.fields, actual_item.fields)
for field_name, field in six.iteritems(expected_item.fields):
if (expected_item.scope_ids.usage_id, field_name) in self.field_exclusions:
continue
# compare fields
self.assertEqual(expected_item.fields, actual_item.fields)
for field_name, field in six.iteritems(expected_item.fields):
if (expected_item.scope_ids.usage_id, field_name) in self.field_exclusions:
continue
if (None, field_name) in self.field_exclusions:
continue
# Children are handled specially
if field_name == 'children':
continue
self.assertFieldEqual(field, expected_item, actual_item)
# compare children
self.assertEqual(expected_item.has_children, actual_item.has_children)
if expected_item.has_children:
expected_children = [
(expected_item_child.location.block_type, expected_item_child.location.block_id)
# get_children() rather than children to strip privates from public parents
for expected_item_child in expected_item.get_children()
]
actual_children = [
(item_child.location.block_type, item_child.location.block_id)
# get_children() rather than children to strip privates from public parents
for item_child in actual_item.get_children()
]
self.assertEqual(expected_children, actual_children)
if (None, field_name) in self.field_exclusions:
continue
# Children are handled specially
if field_name == 'children':
continue
self.assertFieldEqual(field, expected_item, actual_item)
# compare children
self.assertEqual(expected_item.has_children, actual_item.has_children)
if expected_item.has_children:
expected_children = [
(expected_item_child.location.block_type, expected_item_child.location.block_id)
# get_children() rather than children to strip privates from public parents
for expected_item_child in expected_item.get_children()
]
actual_children = [
(item_child.location.block_type, item_child.location.block_id)
# get_children() rather than children to strip privates from public parents
for item_child in actual_item.get_children()
]
self.assertEqual(expected_children, actual_children)
def assertAssetEqual(self, expected_course_key, expected_asset, actual_course_key, actual_asset):
"""
@@ -640,15 +481,11 @@ class CourseComparisonTest(BulkAssertionTest):
expected_content, expected_count = expected_store.get_all_content_for_course(expected_course_key)
actual_content, actual_count = actual_store.get_all_content_for_course(actual_course_key)
with self.bulk_assertions():
self.assertEqual(expected_count, actual_count)
self._assertAssetsEqual(expected_course_key, expected_content, actual_course_key, actual_content)
expected_thumbs = expected_store.get_all_content_thumbnails_for_course(expected_course_key)
actual_thumbs = actual_store.get_all_content_thumbnails_for_course(actual_course_key)
self._assertAssetsEqual(expected_course_key, expected_thumbs, actual_course_key, actual_thumbs)
self.assertEqual(expected_count, actual_count)
self._assertAssetsEqual(expected_course_key, expected_content, actual_course_key, actual_content)
expected_thumbs = expected_store.get_all_content_thumbnails_for_course(expected_course_key)
actual_thumbs = actual_store.get_all_content_thumbnails_for_course(actual_course_key)
self._assertAssetsEqual(expected_course_key, expected_thumbs, actual_course_key, actual_thumbs)
def assertAssetsMetadataEqual(self, expected_modulestore, expected_course_key, actual_modulestore, actual_course_key):
"""

View File

@@ -1,186 +0,0 @@
from __future__ import absolute_import
import itertools
import ddt
from xmodule.tests import BulkAssertionError, BulkAssertionTest
ASSERTION_METHODS_DICT = {
"GETITEM_SPECIAL_METHOD": {}.__getitem__,
"LAMBDA": lambda: None
}
STATIC_PASSING_ASSERTIONS = (
('assertTrue', True),
('assertFalse', False),
('assertIs', 1, 1),
('assertEqual', 1, 1),
('assertEquals', 1, 1),
('assertIsNot', 1, 2),
('assertIsNone', None),
('assertIsNotNone', 1),
('assertIn', 1, (1, 2, 3)),
('assertNotIn', 5, (1, 2, 3)),
('assertIsInstance', 1, int),
('assertNotIsInstance', '1', int),
('assertItemsEqual', [1, 2, 3], [3, 2, 1])
)
STATIC_FAILING_ASSERTIONS = (
('assertTrue', False),
('assertFalse', True),
('assertIs', 1, 2),
('assertEqual', 1, 2),
('assertEquals', 1, 2),
('assertIsNot', 1, 1),
('assertIsNone', 1),
('assertIsNotNone', None),
('assertIn', 5, (1, 2, 3)),
('assertNotIn', 1, (1, 2, 3)),
('assertIsInstance', '1', int),
('assertNotIsInstance', 1, int),
('assertItemsEqual', [1, 1, 1], [1, 1])
)
CONTEXT_PASSING_ASSERTIONS = (
('assertRaises', KeyError, "GETITEM_SPECIAL_METHOD", '1'),
('assertRaisesRegexp', KeyError, "1", "GETITEM_SPECIAL_METHOD", '1'),
)
CONTEXT_FAILING_ASSERTIONS = (
('assertRaises', ValueError, "LAMBDA"),
('assertRaisesRegexp', KeyError, "2", "GETITEM_SPECIAL_METHOD", '1'),
)
@ddt.ddt
class TestBulkAssertionTestCase(BulkAssertionTest):
# We have to use assertion methods from the base UnitTest class,
# so we make a number of super calls that skip BulkAssertionTest.
# pylint: disable=bad-super-call
def _is_arg_in_assertion_methods_dict(self, argument):
"""
Takes in an argument, and returns whether
"""
return type(argument) == str and argument in ASSERTION_METHODS_DICT
def _run_assertion(self, assertion_tuple):
"""
Run the supplied tuple of (assertion, *args) as a method on this class.
"""
assertion, args = assertion_tuple[0], assertion_tuple[1:]
args_list = list(args)
for index, argument in enumerate(args_list):
if self._is_arg_in_assertion_methods_dict(argument):
args_list[index] = ASSERTION_METHODS_DICT[argument]
args = tuple(args_list)
getattr(self, assertion)(*args)
def _raw_assert(self, assertion_name, *args, **kwargs):
"""
Run an un-modified assertion.
"""
# Use super(BulkAssertionTest) to make sure we get un-adulturated assertions
return getattr(super(BulkAssertionTest, self), 'assert' + assertion_name)(*args, **kwargs)
@ddt.data(*(STATIC_PASSING_ASSERTIONS + CONTEXT_PASSING_ASSERTIONS))
def test_passing_asserts_passthrough(self, assertion_tuple):
self._run_assertion(assertion_tuple)
@ddt.data(*(STATIC_FAILING_ASSERTIONS + CONTEXT_FAILING_ASSERTIONS))
def test_failing_asserts_passthrough(self, assertion_tuple):
with self._raw_assert('Raises', AssertionError) as context:
self._run_assertion(assertion_tuple)
self._raw_assert('NotIsInstance', context.exception, BulkAssertionError)
@ddt.data(*CONTEXT_PASSING_ASSERTIONS)
@ddt.unpack
def test_passing_context_assertion_passthrough(self, assertion, *args):
assertion_args = []
args = list(args)
exception = args.pop(0)
while not self._is_arg_in_assertion_methods_dict(args[0]):
assertion_args.append(args.pop(0))
function = ASSERTION_METHODS_DICT[args.pop(0)]
with getattr(self, assertion)(exception, *assertion_args):
function(*args)
@ddt.data(*CONTEXT_FAILING_ASSERTIONS)
@ddt.unpack
def test_failing_context_assertion_passthrough(self, assertion, *args):
assertion_args = []
args = list(args)
exception = args.pop(0)
while not self._is_arg_in_assertion_methods_dict(args[0]):
assertion_args.append(args.pop(0))
function = ASSERTION_METHODS_DICT[args.pop(0)]
with self._raw_assert('Raises', AssertionError) as context:
with getattr(self, assertion)(exception, *assertion_args):
function(*args)
self._raw_assert('NotIsInstance', context.exception, BulkAssertionError)
@ddt.data(*list(itertools.product(
CONTEXT_PASSING_ASSERTIONS,
CONTEXT_FAILING_ASSERTIONS,
CONTEXT_FAILING_ASSERTIONS
)))
@ddt.unpack
def test_bulk_assert(self, passing_assertion, failing_assertion1, failing_assertion2):
contextmanager = self.bulk_assertions()
contextmanager.__enter__()
self._run_assertion(passing_assertion)
self._run_assertion(failing_assertion1)
self._run_assertion(failing_assertion2)
with self._raw_assert('Raises', BulkAssertionError) as context:
contextmanager.__exit__(None, None, None)
self._raw_assert('Equals', len(context.exception.errors), 2)
@ddt.data(*list(itertools.product(
CONTEXT_FAILING_ASSERTIONS
)))
@ddt.unpack
def test_nested_bulk_asserts(self, failing_assertion):
with self._raw_assert('Raises', BulkAssertionError) as context:
with self.bulk_assertions():
self._run_assertion(failing_assertion)
with self.bulk_assertions():
self._run_assertion(failing_assertion)
self._run_assertion(failing_assertion)
self._raw_assert('Equal', len(context.exception.errors), 3)
@ddt.data(*list(itertools.product(
CONTEXT_PASSING_ASSERTIONS,
CONTEXT_FAILING_ASSERTIONS,
CONTEXT_FAILING_ASSERTIONS
)))
@ddt.unpack
def test_bulk_assert_closed(self, passing_assertion, failing_assertion1, failing_assertion2):
with self._raw_assert('Raises', BulkAssertionError) as context:
with self.bulk_assertions():
self._run_assertion(passing_assertion)
self._run_assertion(failing_assertion1)
self._raw_assert('Equals', len(context.exception.errors), 1)
with self._raw_assert('Raises', AssertionError) as context:
self._run_assertion(failing_assertion2)
self._raw_assert('NotIsInstance', context.exception, BulkAssertionError)