chore: move ModuleSystemShim above DescriptorSystem

This commit is contained in:
Kaustav Banerjee
2022-12-22 13:58:03 +05:30
committed by Agrendalath
parent e01afb1c3a
commit 4ab77adecc

View File

@@ -1043,266 +1043,6 @@ class MetricsMixin:
)
class DescriptorSystem(MetricsMixin, ConfigurableFragmentWrapper, Runtime):
"""
Base class for :class:`Runtime`s to be used with :class:`XModuleDescriptor`s
"""
def __init__(
self, load_item, resources_fs, error_tracker, get_policy=None, disabled_xblock_types=lambda: [], **kwargs
):
"""
load_item: Takes a Location and returns an XModuleDescriptor
resources_fs: A Filesystem object that contains all of the
resources needed for the course
error_tracker: A hook for tracking errors in loading the descriptor.
Used for example to get a list of all non-fatal problems on course
load, and display them to the user.
See errortracker.py for more documentation
get_policy: a function that takes a usage id and returns a dict of
policy to apply.
local_resource_url: an implementation of :meth:`xblock.runtime.Runtime.local_resource_url`
"""
kwargs.setdefault('id_reader', OpaqueKeyReader())
kwargs.setdefault('id_generator', AsideKeyGenerator())
super().__init__(**kwargs)
# This is used by XModules to write out separate files during xml export
self.export_fs = None
self.load_item = load_item
self.resources_fs = resources_fs
self.error_tracker = error_tracker
if get_policy:
self.get_policy = get_policy
else:
self.get_policy = lambda u: {}
self.disabled_xblock_types = disabled_xblock_types
def get_block(self, usage_id, for_parent=None):
"""See documentation for `xblock.runtime:Runtime.get_block`"""
return self.load_item(usage_id, for_parent=for_parent)
def load_block_type(self, block_type):
"""
Returns a subclass of :class:`.XBlock` that corresponds to the specified `block_type`.
"""
if block_type in self.disabled_xblock_types():
return self.default_class
return super().load_block_type(block_type)
def get_field_provenance(self, xblock, field):
"""
For the given xblock, return a dict for the field's current state:
{
'default_value': what json'd value will take effect if field is unset: either the field default or
inherited value,
'explicitly_set': boolean for whether the current value is set v default/inherited,
}
:param xblock:
:param field:
"""
# pylint: disable=protected-access
# in runtime b/c runtime contains app-specific xblock behavior. Studio's the only app
# which needs this level of introspection right now. runtime also is 'allowed' to know
# about the kvs, dbmodel, etc.
result = {}
result['explicitly_set'] = xblock._field_data.has(xblock, field.name)
try:
result['default_value'] = xblock._field_data.default(xblock, field.name)
except KeyError:
result['default_value'] = field.to_json(field.default)
return result
def handler_url(self, block, handler_name, suffix='', query='', thirdparty=False):
# Currently, Modulestore is responsible for instantiating DescriptorSystems
# This means that LMS/CMS don't have a way to define a subclass of DescriptorSystem
# that implements the correct handler url. So, for now, instead, we will reference a
# global function that the application can override.
return descriptor_global_handler_url(block, handler_name, suffix, query, thirdparty)
def local_resource_url(self, block, uri):
"""
See :meth:`xblock.runtime.Runtime:local_resource_url` for documentation.
"""
# Currently, Modulestore is responsible for instantiating DescriptorSystems
# This means that LMS/CMS don't have a way to define a subclass of DescriptorSystem
# that implements the correct local_resource_url. So, for now, instead, we will reference a
# global function that the application can override.
return descriptor_global_local_resource_url(block, uri)
def applicable_aside_types(self, block):
"""
See :meth:`xblock.runtime.Runtime:applicable_aside_types` for documentation.
"""
potential_set = set(super().applicable_aside_types(block))
if getattr(block, 'xmodule_runtime', None) is not None:
if hasattr(block.xmodule_runtime, 'applicable_aside_types'):
application_set = set(block.xmodule_runtime.applicable_aside_types(block))
return list(potential_set.intersection(application_set))
return list(potential_set)
def resource_url(self, resource):
"""
See :meth:`xblock.runtime.Runtime:resource_url` for documentation.
"""
raise NotImplementedError("edX Platform doesn't currently implement XBlock resource urls")
def add_block_as_child_node(self, block, node):
child = etree.SubElement(node, block.category)
child.set('url_name', block.url_name)
block.add_xml_to_node(child)
def publish(self, block, event_type, event): # lint-amnesty, pylint: disable=arguments-differ
# A stub publish method that doesn't emit any events from XModuleDescriptors.
pass
def service(self, block, service_name):
"""
Runtime-specific override for the XBlock service manager. If a service is not currently
instantiated and is declared as a critical requirement, an attempt is made to load the
module.
Arguments:
block (an XBlock): this block's class will be examined for service
decorators.
service_name (string): the name of the service requested.
Returns:
An object implementing the requested service, or None.
"""
# getting the service from parent module. making sure of block service declarations.
service = super().service(block=block, service_name=service_name)
# Passing the block to service if it is callable e.g. XBlockI18nService. It is the responsibility of calling
# service to handle the passing argument.
if callable(service):
return service(block)
return service
class XMLParsingSystem(DescriptorSystem): # lint-amnesty, pylint: disable=abstract-method, missing-class-docstring
def __init__(self, process_xml, **kwargs):
"""
process_xml: Takes an xml string, and returns a XModuleDescriptor
created from that xml
"""
super().__init__(**kwargs)
self.process_xml = process_xml
def _usage_id_from_node(self, node, parent_id, id_generator=None):
"""Create a new usage id from an XML dom node.
Args:
node (lxml.etree.Element): The DOM node to interpret.
parent_id: The usage ID of the parent block
id_generator (IdGenerator): The :class:`.IdGenerator` to use
for creating ids
Returns:
UsageKey: the usage key for the new xblock
"""
return self.xblock_from_node(node, parent_id, id_generator).scope_ids.usage_id
def xblock_from_node(self, node, parent_id, id_generator=None):
"""
Create an XBlock instance from XML data.
Args:
xml_data (string): A string containing valid xml.
system (XMLParsingSystem): The :class:`.XMLParsingSystem` used to connect the block
to the outside world.
id_generator (IdGenerator): An :class:`~xblock.runtime.IdGenerator` that
will be used to construct the usage_id and definition_id for the block.
Returns:
XBlock: The fully instantiated :class:`~xblock.core.XBlock`.
"""
id_generator = id_generator or self.id_generator
# leave next line commented out - useful for low-level debugging
# log.debug('[_usage_id_from_node] tag=%s, class=%s' % (node.tag, xblock_class))
block_type = node.tag
# remove xblock-family from elements
node.attrib.pop('xblock-family', None)
url_name = node.get('url_name') # difference from XBlock.runtime
def_id = id_generator.create_definition(block_type, url_name)
usage_id = id_generator.create_usage(def_id)
keys = ScopeIds(None, block_type, def_id, usage_id)
block_class = self.mixologist.mix(self.load_block_type(block_type))
aside_children = self.parse_asides(node, def_id, usage_id, id_generator)
asides_tags = [x.tag for x in aside_children]
block = block_class.parse_xml(node, self, keys, id_generator)
self._convert_reference_fields_to_keys(block) # difference from XBlock.runtime
block.parent = parent_id
block.save()
asides = self.get_asides(block)
for asd in asides:
if asd.scope_ids.block_type in asides_tags:
block.add_aside(asd)
return block
def parse_asides(self, node, def_id, usage_id, id_generator):
"""pull the asides out of the xml payload and instantiate them"""
aside_children = []
for child in node.iterchildren():
# get xblock-family from node
xblock_family = child.attrib.pop('xblock-family', None)
if xblock_family:
xblock_family = self._family_id_to_superclass(xblock_family)
if issubclass(xblock_family, XBlockAside):
aside_children.append(child)
# now process them & remove them from the xml payload
for child in aside_children:
self._aside_from_xml(child, def_id, usage_id, id_generator)
node.remove(child)
return aside_children
def _make_usage_key(self, course_key, value):
"""
Makes value into a UsageKey inside the specified course.
If value is already a UsageKey, returns that.
"""
if isinstance(value, UsageKey):
return value
usage_key = UsageKey.from_string(value)
return usage_key.map_into_course(course_key)
def _convert_reference_fields_to_keys(self, xblock):
"""
Find all fields of type reference and convert the payload into UsageKeys
"""
course_key = xblock.scope_ids.usage_id.course_key
for field in xblock.fields.values():
if field.is_set_on(xblock):
field_value = getattr(xblock, field.name)
if field_value is None:
continue
elif isinstance(field, Reference):
setattr(xblock, field.name, self._make_usage_key(course_key, field_value))
elif isinstance(field, ReferenceList):
setattr(xblock, field.name, [self._make_usage_key(course_key, ele) for ele in field_value])
elif isinstance(field, ReferenceValueDict):
for key, subvalue in field_value.items():
assert isinstance(subvalue, str)
field_value[key] = self._make_usage_key(course_key, subvalue)
setattr(xblock, field.name, field_value)
class ModuleSystemShim:
"""
This shim provides the properties formerly available from ModuleSystem which are now being provided by services.
@@ -1645,6 +1385,266 @@ class ModuleSystemShim:
return self.descriptor_runtime.course_id.for_branch(None)
class DescriptorSystem(MetricsMixin, ConfigurableFragmentWrapper, Runtime):
"""
Base class for :class:`Runtime`s to be used with :class:`XModuleDescriptor`s
"""
def __init__(
self, load_item, resources_fs, error_tracker, get_policy=None, disabled_xblock_types=lambda: [], **kwargs
):
"""
load_item: Takes a Location and returns an XModuleDescriptor
resources_fs: A Filesystem object that contains all of the
resources needed for the course
error_tracker: A hook for tracking errors in loading the descriptor.
Used for example to get a list of all non-fatal problems on course
load, and display them to the user.
See errortracker.py for more documentation
get_policy: a function that takes a usage id and returns a dict of
policy to apply.
local_resource_url: an implementation of :meth:`xblock.runtime.Runtime.local_resource_url`
"""
kwargs.setdefault('id_reader', OpaqueKeyReader())
kwargs.setdefault('id_generator', AsideKeyGenerator())
super().__init__(**kwargs)
# This is used by XModules to write out separate files during xml export
self.export_fs = None
self.load_item = load_item
self.resources_fs = resources_fs
self.error_tracker = error_tracker
if get_policy:
self.get_policy = get_policy
else:
self.get_policy = lambda u: {}
self.disabled_xblock_types = disabled_xblock_types
def get_block(self, usage_id, for_parent=None):
"""See documentation for `xblock.runtime:Runtime.get_block`"""
return self.load_item(usage_id, for_parent=for_parent)
def load_block_type(self, block_type):
"""
Returns a subclass of :class:`.XBlock` that corresponds to the specified `block_type`.
"""
if block_type in self.disabled_xblock_types():
return self.default_class
return super().load_block_type(block_type)
def get_field_provenance(self, xblock, field):
"""
For the given xblock, return a dict for the field's current state:
{
'default_value': what json'd value will take effect if field is unset: either the field default or
inherited value,
'explicitly_set': boolean for whether the current value is set v default/inherited,
}
:param xblock:
:param field:
"""
# pylint: disable=protected-access
# in runtime b/c runtime contains app-specific xblock behavior. Studio's the only app
# which needs this level of introspection right now. runtime also is 'allowed' to know
# about the kvs, dbmodel, etc.
result = {}
result['explicitly_set'] = xblock._field_data.has(xblock, field.name)
try:
result['default_value'] = xblock._field_data.default(xblock, field.name)
except KeyError:
result['default_value'] = field.to_json(field.default)
return result
def handler_url(self, block, handler_name, suffix='', query='', thirdparty=False):
# Currently, Modulestore is responsible for instantiating DescriptorSystems
# This means that LMS/CMS don't have a way to define a subclass of DescriptorSystem
# that implements the correct handler url. So, for now, instead, we will reference a
# global function that the application can override.
return descriptor_global_handler_url(block, handler_name, suffix, query, thirdparty)
def local_resource_url(self, block, uri):
"""
See :meth:`xblock.runtime.Runtime:local_resource_url` for documentation.
"""
# Currently, Modulestore is responsible for instantiating DescriptorSystems
# This means that LMS/CMS don't have a way to define a subclass of DescriptorSystem
# that implements the correct local_resource_url. So, for now, instead, we will reference a
# global function that the application can override.
return descriptor_global_local_resource_url(block, uri)
def applicable_aside_types(self, block):
"""
See :meth:`xblock.runtime.Runtime:applicable_aside_types` for documentation.
"""
potential_set = set(super().applicable_aside_types(block))
if getattr(block, 'xmodule_runtime', None) is not None:
if hasattr(block.xmodule_runtime, 'applicable_aside_types'):
application_set = set(block.xmodule_runtime.applicable_aside_types(block))
return list(potential_set.intersection(application_set))
return list(potential_set)
def resource_url(self, resource):
"""
See :meth:`xblock.runtime.Runtime:resource_url` for documentation.
"""
raise NotImplementedError("edX Platform doesn't currently implement XBlock resource urls")
def add_block_as_child_node(self, block, node):
child = etree.SubElement(node, block.category)
child.set('url_name', block.url_name)
block.add_xml_to_node(child)
def publish(self, block, event_type, event): # lint-amnesty, pylint: disable=arguments-differ
# A stub publish method that doesn't emit any events from XModuleDescriptors.
pass
def service(self, block, service_name):
"""
Runtime-specific override for the XBlock service manager. If a service is not currently
instantiated and is declared as a critical requirement, an attempt is made to load the
module.
Arguments:
block (an XBlock): this block's class will be examined for service
decorators.
service_name (string): the name of the service requested.
Returns:
An object implementing the requested service, or None.
"""
# getting the service from parent module. making sure of block service declarations.
service = super().service(block=block, service_name=service_name)
# Passing the block to service if it is callable e.g. XBlockI18nService. It is the responsibility of calling
# service to handle the passing argument.
if callable(service):
return service(block)
return service
class XMLParsingSystem(DescriptorSystem): # lint-amnesty, pylint: disable=abstract-method, missing-class-docstring
def __init__(self, process_xml, **kwargs):
"""
process_xml: Takes an xml string, and returns a XModuleDescriptor
created from that xml
"""
super().__init__(**kwargs)
self.process_xml = process_xml
def _usage_id_from_node(self, node, parent_id, id_generator=None):
"""Create a new usage id from an XML dom node.
Args:
node (lxml.etree.Element): The DOM node to interpret.
parent_id: The usage ID of the parent block
id_generator (IdGenerator): The :class:`.IdGenerator` to use
for creating ids
Returns:
UsageKey: the usage key for the new xblock
"""
return self.xblock_from_node(node, parent_id, id_generator).scope_ids.usage_id
def xblock_from_node(self, node, parent_id, id_generator=None):
"""
Create an XBlock instance from XML data.
Args:
xml_data (string): A string containing valid xml.
system (XMLParsingSystem): The :class:`.XMLParsingSystem` used to connect the block
to the outside world.
id_generator (IdGenerator): An :class:`~xblock.runtime.IdGenerator` that
will be used to construct the usage_id and definition_id for the block.
Returns:
XBlock: The fully instantiated :class:`~xblock.core.XBlock`.
"""
id_generator = id_generator or self.id_generator
# leave next line commented out - useful for low-level debugging
# log.debug('[_usage_id_from_node] tag=%s, class=%s' % (node.tag, xblock_class))
block_type = node.tag
# remove xblock-family from elements
node.attrib.pop('xblock-family', None)
url_name = node.get('url_name') # difference from XBlock.runtime
def_id = id_generator.create_definition(block_type, url_name)
usage_id = id_generator.create_usage(def_id)
keys = ScopeIds(None, block_type, def_id, usage_id)
block_class = self.mixologist.mix(self.load_block_type(block_type))
aside_children = self.parse_asides(node, def_id, usage_id, id_generator)
asides_tags = [x.tag for x in aside_children]
block = block_class.parse_xml(node, self, keys, id_generator)
self._convert_reference_fields_to_keys(block) # difference from XBlock.runtime
block.parent = parent_id
block.save()
asides = self.get_asides(block)
for asd in asides:
if asd.scope_ids.block_type in asides_tags:
block.add_aside(asd)
return block
def parse_asides(self, node, def_id, usage_id, id_generator):
"""pull the asides out of the xml payload and instantiate them"""
aside_children = []
for child in node.iterchildren():
# get xblock-family from node
xblock_family = child.attrib.pop('xblock-family', None)
if xblock_family:
xblock_family = self._family_id_to_superclass(xblock_family)
if issubclass(xblock_family, XBlockAside):
aside_children.append(child)
# now process them & remove them from the xml payload
for child in aside_children:
self._aside_from_xml(child, def_id, usage_id, id_generator)
node.remove(child)
return aside_children
def _make_usage_key(self, course_key, value):
"""
Makes value into a UsageKey inside the specified course.
If value is already a UsageKey, returns that.
"""
if isinstance(value, UsageKey):
return value
usage_key = UsageKey.from_string(value)
return usage_key.map_into_course(course_key)
def _convert_reference_fields_to_keys(self, xblock):
"""
Find all fields of type reference and convert the payload into UsageKeys
"""
course_key = xblock.scope_ids.usage_id.course_key
for field in xblock.fields.values():
if field.is_set_on(xblock):
field_value = getattr(xblock, field.name)
if field_value is None:
continue
elif isinstance(field, Reference):
setattr(xblock, field.name, self._make_usage_key(course_key, field_value))
elif isinstance(field, ReferenceList):
setattr(xblock, field.name, [self._make_usage_key(course_key, ele) for ele in field_value])
elif isinstance(field, ReferenceValueDict):
for key, subvalue in field_value.items():
assert isinstance(subvalue, str)
field_value[key] = self._make_usage_key(course_key, subvalue)
setattr(xblock, field.name, field_value)
class ModuleSystem(MetricsMixin, ConfigurableFragmentWrapper, ModuleSystemShim, Runtime):
"""
This is an abstraction such that x_modules can function independent
@@ -1857,4 +1857,4 @@ class DoNothingCache:
return None
def set(self, key, value, timeout=None):
pass
pass