Files
Muhammad Farhan Khan f51343c871 refactor: move transcripts_utils from xmodule to video-config (#37600)
As part of the ongoing effort to deprecate and eventually remove xmodule,
we’ve started gradually migrating the necessary code files from xmodule
to more appropriate locations within the codebase.

Ticket: https://github.com/openedx/public-engineering/issues/445

Also: this tweaks importlinter ignores & add follow-up issue links

Co-authored-by: Kyle McCormick <kyle@axim.org>
2025-11-14 18:26:35 +00:00

842 lines
35 KiB
Python

"""
Helper methods for Studio views.
Before adding more stuff here, take a look at:
https://github.com/openedx/edx-platform/issues/37637
Only Studio-specfic helper functions should be added here.
Platform-wide Python APIs should be added to an appropriate api.py file instead.
"""
from __future__ import annotations
import json
import logging
import pathlib
import urllib
from lxml import etree
from mimetypes import guess_type
import re
from attrs import frozen, Factory
from django.core.files.base import ContentFile
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils.translation import gettext as _
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import DefinitionLocator, LocalId
from openedx.core.djangoapps.content_tagging.types import TagValuesByObjectIdDict
from xblock.core import XBlock
from xblock.fields import ScopeIds
from xblock.runtime import IdGenerator
from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import NotFoundError
from xmodule.modulestore.django import modulestore
from xmodule.xml_block import XmlMixin
from openedx.core.djangoapps.video_config.transcripts_utils import Transcript, build_components_import_path
from edxval.api import (
create_external_video,
create_or_update_video_transcript,
)
from cms.djangoapps.models.settings.course_grading import CourseGradingModel
from cms.lib.xblock.upstream_sync import UpstreamLink, UpstreamLinkException
from cms.lib.xblock.upstream_sync_block import fetch_customizable_fields_from_block
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
import openedx.core.djangoapps.content_staging.api as content_staging_api
import openedx.core.djangoapps.content_tagging.api as content_tagging_api
from openedx.core.djangoapps.content_staging.data import LIBRARY_SYNC_PURPOSE
from .utils import reverse_course_url, reverse_library_url, reverse_usage_url
log = logging.getLogger(__name__)
User = get_user_model()
# Note: Grader types are used throughout the platform but most usages are simply in-line
# strings. In addition, new grader types can be defined on the fly anytime one is needed
# (because they're just strings). This dict is an attempt to constrain the sprawl in Studio.
GRADER_TYPES = {
"HOMEWORK": "Homework",
"LAB": "Lab",
"ENTRANCE_EXAM": "Entrance Exam",
"MIDTERM_EXAM": "Midterm Exam",
"FINAL_EXAM": "Final Exam"
}
def get_parent_xblock(xblock):
"""
Returns the xblock that is the parent of the specified xblock, or None if it has no parent.
"""
locator = xblock.location
parent_location = modulestore().get_parent_location(locator)
if parent_location is None:
return None
return modulestore().get_item(parent_location)
def is_unit(xblock, parent_xblock=None):
"""
Returns true if the specified xblock is a vertical that is treated as a unit.
A unit is a vertical that is a direct child of a sequential (aka a subsection).
"""
if xblock.category == 'vertical':
if parent_xblock is None:
parent_xblock = get_parent_xblock(xblock)
parent_category = parent_xblock.category if parent_xblock else None
return parent_category == 'sequential'
return False
def is_library_content(xblock):
"""
Returns true if the specified xblock is library content.
"""
return xblock.category == 'library_content'
def get_parent_if_split_test(xblock):
"""
Returns the parent of the specified xblock if it is a split test, otherwise returns None.
"""
parent_xblock = get_parent_xblock(xblock)
if parent_xblock and parent_xblock.category == 'split_test':
return parent_xblock
def xblock_has_own_studio_page(xblock, parent_xblock=None):
"""
Returns true if the specified xblock has an associated Studio page. Most xblocks do
not have their own page but are instead shown on the page of their parent. There
are a few exceptions:
1. Courses
2. Verticals that are either:
- themselves treated as units
- a direct child of a unit
3. XBlocks that support children
"""
category = xblock.category
if is_unit(xblock, parent_xblock):
return True
elif category == 'vertical':
if parent_xblock is None:
parent_xblock = get_parent_xblock(xblock)
return is_unit(parent_xblock) if parent_xblock else False
# All other xblocks with children have their own page
return xblock.has_children
def xblock_studio_url(xblock, parent_xblock=None, find_parent=False):
"""
Returns the Studio editing URL for the specified xblock.
You can pass the parent xblock as an optimization, to avoid needing to load
it twice, as sometimes the parent has to be checked.
If you pass in a leaf block that doesn't have its own Studio page, this will
normally return None, but if you use find_parent=True, this will find the
nearest ancestor (usually the parent unit) that does have a Studio page and
return that URL.
"""
if not xblock_has_own_studio_page(xblock, parent_xblock):
if find_parent:
while xblock and not xblock_has_own_studio_page(xblock, parent_xblock):
xblock = parent_xblock or get_parent_xblock(xblock)
parent_xblock = None
if not xblock:
return None
else:
return None
category = xblock.category
if category == 'course':
return reverse_course_url('course_handler', xblock.location.course_key)
elif category in ('chapter', 'sequential'):
return '{url}?show={usage_key}'.format(
url=reverse_course_url('course_handler', xblock.location.course_key),
usage_key=urllib.parse.quote(str(xblock.location))
)
elif category == 'library':
library_key = xblock.location.course_key
return reverse_library_url('library_handler', library_key)
else:
return reverse_usage_url('container_handler', xblock.location)
def xblock_lms_url(xblock) -> str:
"""
Returns the LMS URL for the specified xblock.
Args:
xblock: The xblock to get the LMS URL for.
Returns:
str: The LMS URL for the specified xblock.
"""
lms_root_url = configuration_helpers.get_value('LMS_ROOT_URL', settings.LMS_ROOT_URL)
return f"{lms_root_url}/courses/{xblock.location.course_key}/jump_to/{xblock.location}"
def xblock_embed_lms_url(xblock) -> str:
"""
Returns the LMS URL for the specified xblock in embed mode.
Args:
xblock: The xblock to get the LMS URL for.
Returns:
str: The LMS URL for the specified xblock in embed mode.
"""
lms_root_url = configuration_helpers.get_value('LMS_ROOT_URL', settings.LMS_ROOT_URL)
return f"{lms_root_url}/xblock/{xblock.location}"
def xblock_type_display_name(xblock, default_display_name=None):
"""
Returns the display name for the specified type of xblock. Note that an instance can be passed in
for context dependent names, e.g. a vertical beneath a sequential is a Unit.
:param xblock: An xblock instance or the type of xblock (as a string).
:param default_display_name: The default value to return if no display name can be found.
:return:
"""
if hasattr(xblock, 'category'):
category = xblock.category
if category == 'vertical' and not is_unit(xblock):
return _('Vertical')
else:
category = xblock
if category == 'chapter':
return _('Section')
elif category == 'sequential':
return _('Subsection')
elif category == 'vertical':
return _('Unit')
elif category == 'problem':
# The problem XBlock's display_name.default is not helpful ("Blank Problem") but changing it could have
# too many ripple effects in other places, so we have a special case for capa problems here.
# Note: With a ProblemBlock instance, we could actually check block.problem_types to give a more specific
# description like "Multiple Choice Problem", but that won't work if our 'block' argument is just the block_type
# string ("problem").
return _('Problem')
elif category == 'library_v2':
return _('Library Content')
elif category == 'itembank':
return _('Problem Bank')
component_class = XBlock.load_class(category)
if hasattr(component_class, 'display_name') and component_class.display_name.default:
return _(component_class.display_name.default) # lint-amnesty, pylint: disable=translation-of-non-string
else:
return default_display_name
def xblock_primary_child_category(xblock):
"""
Returns the primary child category for the specified xblock, or None if there is not a primary category.
"""
category = xblock.category
if category == 'course':
return 'chapter'
elif category == 'chapter':
return 'sequential'
elif category == 'sequential':
return 'vertical'
return None
def remove_entrance_exam_graders(course_key, user):
"""
Removes existing entrance exam graders attached to the specified course
Typically used when adding/removing an entrance exam.
"""
grading_model = CourseGradingModel.fetch(course_key)
graders = grading_model.graders
for i, grader in enumerate(graders):
if grader['type'] == GRADER_TYPES['ENTRANCE_EXAM']:
CourseGradingModel.delete_grader(course_key, i, user)
class ImportIdGenerator(IdGenerator):
"""
Modulestore's IdGenerator doesn't work for importing single blocks as OLX,
so we implement our own
"""
def __init__(self, context_key):
super().__init__()
self.context_key = context_key
def create_aside(self, definition_id, usage_id, aside_type):
""" Generate a new aside key """
raise NotImplementedError()
def create_usage(self, def_id) -> UsageKey:
""" Generate a new UsageKey for an XBlock """
# Note: Split modulestore will detect this temporary ID and create a new block ID when the XBlock is saved.
return self.context_key.make_usage_key(def_id.block_type, LocalId())
def create_definition(self, block_type, slug=None) -> DefinitionLocator:
""" Generate a new definition_id for an XBlock """
# Note: Split modulestore will detect this temporary ID and create a new definition ID when the XBlock is saved.
return DefinitionLocator(block_type, LocalId(block_type))
@frozen
class StaticFileNotices:
""" Information about what static files were updated (or not) when pasting content into another course """
new_files: list[str] = Factory(list)
conflicting_files: list[str] = Factory(list)
error_files: list[str] = Factory(list)
def _rewrite_static_asset_references(downstream_xblock: XBlock, substitutions: dict[str, str], user_id: int) -> None:
"""
Rewrite the static asset references in the OLX string to point to the new locations in the course.
"""
store = modulestore()
if hasattr(downstream_xblock, "data"):
data_with_substitutions = downstream_xblock.data
for old_static_ref, new_static_ref in substitutions.items():
data_with_substitutions = _replace_strings(
data_with_substitutions,
old_static_ref,
new_static_ref,
)
downstream_xblock.data = data_with_substitutions
store.update_item(downstream_xblock, user_id)
for child in downstream_xblock.get_children():
_rewrite_static_asset_references(child, substitutions, user_id)
def _insert_static_files_into_downstream_xblock(
downstream_xblock: XBlock, staged_content_id: int, request
) -> StaticFileNotices:
"""
Gets static files from staged content, and inserts them into the downstream XBlock.
"""
static_files = content_staging_api.get_staged_content_static_files(staged_content_id)
notices, substitutions = _import_files_into_course(
course_key=downstream_xblock.context_key,
staged_content_id=staged_content_id,
static_files=static_files,
usage_key=downstream_xblock.usage_key,
)
# FIXME: This code shouldn't have any special cases for specific block types like video
# in the future.
if downstream_xblock.usage_key.block_type == 'video':
_import_transcripts(
downstream_xblock,
staged_content_id=staged_content_id,
static_files=static_files,
)
if substitutions:
# Rewrite the OLX's static asset references to point to the new
# locations for those assets. See _import_files_into_course for more
# info on why this is necessary.
_rewrite_static_asset_references(downstream_xblock, substitutions, request.user.id)
return notices
def _replace_strings(obj: dict | list | str, old_str: str, new_str: str):
"""
Replacing any instances of the given `old_str` string with `new_str` in any strings found in the the given object.
Returns the updated object.
"""
if isinstance(obj, dict):
for key, value in obj.items():
obj[key] = _replace_strings(value, old_str, new_str)
elif isinstance(obj, list):
for index, item in enumerate(obj):
obj[index] = _replace_strings(item, old_str, new_str)
elif isinstance(obj, str):
return obj.replace(old_str, new_str)
return obj
def import_staged_content_from_user_clipboard(parent_key: UsageKey, request) -> tuple[XBlock | None, StaticFileNotices]:
"""
Import a block (along with its children and any required static assets) from
the "staged" OLX in the user's clipboard.
Does not deal with permissions or REST stuff - do that before calling this.
Returns (1) the newly created block on success or None if the clipboard is
empty, and (2) a summary of changes made to static files in the destination
course.
"""
from cms.djangoapps.contentstore.views.preview import _load_preview_block
user_clipboard = content_staging_api.get_user_clipboard(request.user.id)
if not user_clipboard:
# Clipboard is empty or expired/error/loading
return None, StaticFileNotices()
olx_str = content_staging_api.get_staged_content_olx(user_clipboard.content.id)
node = etree.fromstring(olx_str)
store = modulestore()
with store.bulk_operations(parent_key.course_key):
parent_descriptor = store.get_item(parent_key)
# Some blocks like drag-and-drop only work here with the full XBlock runtime loaded:
parent_xblock = _load_preview_block(request, parent_descriptor)
new_xblock = _import_xml_node_to_parent(
node,
parent_xblock,
store,
user=request.user,
slug_hint=(
user_clipboard.source_usage_key.block_id
if isinstance(user_clipboard.source_usage_key, UsageKey) else None
),
tags=user_clipboard.content.tags,
)
usage_key = new_xblock.usage_key
if usage_key.block_type == 'video':
# The edx_video_id must always be new so as not
# to interfere with the data of the copied block
new_xblock.edx_video_id = create_external_video(display_name='external video')
store.update_item(new_xblock, request.user.id)
notices = _insert_static_files_into_downstream_xblock(new_xblock, user_clipboard.content.id, request)
return new_xblock, notices
def import_static_assets_for_library_sync(downstream_xblock: XBlock, lib_block: XBlock, request) -> StaticFileNotices:
"""
Import the static assets from the library xblock to the downstream xblock
through staged content. Also updates the OLX references to point to the new
locations of those assets in the downstream course.
Does not deal with permissions or REST stuff - do that before calling this.
Returns a summary of changes made to static files in the destination
course.
"""
if not lib_block.runtime.get_block_assets(lib_block, fetch_asset_data=False):
return StaticFileNotices()
staged_content = content_staging_api.stage_xblock_temporarily(lib_block, request.user.id, LIBRARY_SYNC_PURPOSE)
if not staged_content:
# expired/error/loading
return StaticFileNotices()
store = modulestore()
try:
with store.bulk_operations(downstream_xblock.context_key):
# FIXME: This code shouldn't have any special cases for specific block types like video
# in the future.
if downstream_xblock.usage_key.block_type == 'video' and not downstream_xblock.edx_video_id:
# If the `downstream_xblock` is a new created block, we need to create
# a new `edx_video_id` to import the transcripts.
downstream_xblock.edx_video_id = create_external_video(display_name='external video')
store.update_item(downstream_xblock, request.user.id)
# Now handle static files that need to go into Files & Uploads.
# If the required files already exist, nothing will happen besides updating the olx.
notices = _insert_static_files_into_downstream_xblock(downstream_xblock, staged_content.id, request)
finally:
staged_content.delete()
return notices
def _fetch_and_set_upstream_link(
copied_from_block: str,
copied_from_version_num: int,
temp_xblock: XBlock,
user: User
):
"""
Fetch and set upstream link for the given xblock which is being pasted. This function handles following cases:
* the xblock is copied from a v2 library; the library block is set as upstream.
* the xblock is copied from a course; no upstream is set, only copied_from_block is set.
* the xblock is copied from a course where the source block was imported from a library; the original library block
is set as upstream.
"""
# Try to link the pasted block (downstream) to the copied block (upstream).
temp_xblock.upstream = copied_from_block
try:
upstream_link = UpstreamLink.get_for_block(temp_xblock)
except UpstreamLinkException:
# Usually this will fail. For example, if the copied block is a modulestore course block, it can't be an
# upstream. That's fine! Instead, we store a reference to where this block was copied from, in the
# 'copied_from_block' field (from AuthoringMixin).
# In case if the source block was imported from a library, we need to check its upstream
# and set the same upstream link for the new block.
source_descriptor = modulestore().get_item(UsageKey.from_string(copied_from_block))
if source_descriptor.upstream:
_fetch_and_set_upstream_link(
source_descriptor.upstream,
source_descriptor.upstream_version,
temp_xblock,
user,
)
else:
# else we store a reference to where this block was copied from, in the 'copied_from_block'
# field (from AuthoringMixin).
temp_xblock.upstream = None
temp_xblock.copied_from_block = copied_from_block
else:
# But if it doesn't fail, then populate the `upstream_version` field based on what was copied. Note that
# this could be the latest published version, or it could be an an even newer draft version.
temp_xblock.upstream_version = copied_from_version_num
# Also, fetch upstream values (`upstream_display_name`, etc.).
# Recall that the copied block could be a draft. So, rather than fetching from the published upstream (which
# could be older), fetch from the copied block itself. That way, if an author customizes a field, but then
# later wants to restore it, it will restore to the value that the field had when the block was pasted. Of
# course, if the author later syncs updates from a *future* published upstream version, then that will fetch
# new values from the published upstream content.
if isinstance(upstream_link.upstream_key, UsageKey): # only if upstream is a block, not a container
fetch_customizable_fields_from_block(downstream=temp_xblock, user=user, upstream=temp_xblock)
# Although the above function will set all customisable fields to match its upstream_* counterpart
# We copy the downstream_customized list to the new block to avoid overriding user customisations on sync
# So we will have:
# temp_xblock.display_name == temp_xblock.upstream_display_name
# temp_xblock.data == temp_xblock.upstream_data # for html blocks
# Even then we want to set `downstream_customized` value to avoid overriding user customisations on sync
downstream_customized = temp_xblock.xml_attributes.get("downstream_customized", '[]')
temp_xblock.downstream_customized = json.loads(downstream_customized)
def _import_xml_node_to_parent(
node,
parent_xblock: XBlock,
# The modulestore we're using
store,
# The user who is performing this operation
user: User,
# Hint to use as usage ID (block_id) for the new XBlock
slug_hint: str | None = None,
# Content tags applied to the source XBlock(s)
tags: TagValuesByObjectIdDict | None = None,
) -> XBlock:
"""
Given an XML node representing a serialized XBlock (OLX), import it into modulestore 'store' as a child of the
specified parent block. Recursively copy children as needed.
"""
# pylint: disable=too-many-statements
runtime = parent_xblock.runtime
parent_key = parent_xblock.scope_ids.usage_id
block_type = node.tag
node_copied_from = node.attrib.get('copied_from_block', None)
node_copied_version = node.attrib.get('copied_from_version', None)
# Modulestore's IdGenerator here is SplitMongoIdManager which is assigned
# by SplitModuleStoreRuntime and since we need our custom ImportIdGenerator
# here we are temporaraliy swtiching it.
original_id_generator = runtime.id_generator
# Generate the new ID:
runtime.id_generator = ImportIdGenerator(parent_key.context_key)
def_id = runtime.id_generator.create_definition(block_type, slug_hint)
usage_id = runtime.id_generator.create_usage(def_id)
keys = ScopeIds(None, block_type, def_id, usage_id)
# parse_xml is a really messy API. We pass both 'keys' and 'id_generator' and, depending on the XBlock, either
# one may be used to determine the new XBlock's usage key, and the other will be ignored. e.g. video ignores
# 'keys' and uses 'id_generator', but the default XBlock parse_xml ignores 'id_generator' and uses 'keys'.
# For children of this block, obviously only id_generator is used.
xblock_class = runtime.load_block_type(block_type)
# Note: if we find a case where any XBlock needs access to the block-specific static files that were saved to
# export_fs during copying, we could make them available here via runtime.resources_fs before calling parse_xml.
# However, currently the only known case for that is video block's transcript files, and those will
# automatically be "carried over" to the new XBlock even in a different course because the video ID is the same,
# and VAL will thus make the transcript available.
child_nodes = []
if issubclass(xblock_class, XmlMixin):
# Hack: XBlocks that use "XmlMixin" have their own XML parsing behavior, and in particular if they encounter
# an XML node that has no children and has only a "url_name" attribute, they'll try to load the XML data
# from an XML file in runtime.resources_fs. But that file doesn't exist here. So we set at least one
# additional attribute here to make sure that url_name is not the only attribute; otherwise in some cases,
# XmlMixin.parse_xml will try to load an XML file that doesn't exist, giving an error. The name and value
# of this attribute don't matter and should be ignored.
node.attrib["x-is-pointer-node"] = "no"
if not xblock_class.has_children:
# No children to worry about. The XML may contain child nodes, but they're not XBlocks.
temp_xblock = xblock_class.parse_xml(node, runtime, keys)
else:
# We have to handle the children ourselves, because there are lots of complex interactions between
# * the vanilla XBlock parse_xml() method, and its lack of API for "create and save a new XBlock"
# * the XmlMixin version of parse_xml() which only works with XMLImportingModuleStoreRuntime,
# not modulestore or the v2 runtime
# * the modulestore APIs for creating and saving a new XBlock, which work but don't support XML parsing.
# We can safely assume that if the XBLock class supports children, every child node will be the XML
# serialization of a child block, in order. For blocks that don't support children, their XML content/nodes
# could be anything (e.g. HTML, capa)
node_without_children = etree.Element(node.tag, **node.attrib)
temp_xblock = xblock_class.parse_xml(node_without_children, runtime, keys)
child_nodes = list(node)
if issubclass(xblock_class, XmlMixin) and "x-is-pointer-node" in getattr(temp_xblock, "data", ""):
# Undo the "pointer node" hack if needed (e.g. for capa problems)
temp_xblock.data = re.sub(r'([^>]+) x-is-pointer-node="no"', r'\1', temp_xblock.data, count=1)
# Restore the original id_generator
runtime.id_generator = original_id_generator
if xblock_class.has_children and temp_xblock.children:
raise NotImplementedError("We don't yet support pasting XBlocks with children")
if node_copied_from:
_fetch_and_set_upstream_link(node_copied_from, node_copied_version, temp_xblock, user)
# Save the XBlock into modulestore. We need to save the block and its parent for this to work:
new_xblock = store.update_item(temp_xblock, user.id, allow_not_found=True)
new_xblock.parent = parent_key
parent_xblock.children.append(new_xblock.location)
store.update_item(parent_xblock, user.id)
children_handled = False
if hasattr(new_xblock, 'studio_post_paste'):
# Allow an XBlock to do anything fancy it may need to when pasted from the clipboard.
# These blocks may handle their own children or parenting if needed. Let them return booleans to
# let us know if we need to handle these or not.
children_handled = new_xblock.studio_post_paste(store, node)
if not children_handled:
for child_node in child_nodes:
_import_xml_node_to_parent(
child_node,
new_xblock,
store,
user=user,
tags=tags,
)
# Copy content tags to the new xblock
if new_xblock.upstream:
# copy the tags from the upstream as ready-only
content_tagging_api.copy_tags_as_read_only(
new_xblock.upstream,
new_xblock.location,
)
elif tags and node_copied_from:
object_tags = tags.get(node_copied_from)
if object_tags:
content_tagging_api.set_all_object_tags(
content_key=new_xblock.location,
object_tags=object_tags,
)
return new_xblock
def _import_files_into_course(
course_key: CourseKey,
staged_content_id: int,
static_files: list[content_staging_api.StagedContentFileData],
usage_key: UsageKey,
) -> tuple[StaticFileNotices, dict[str, str]]:
"""
For the given staged static asset files (which are in "Staged Content" such
as the user's clipbaord, but which need to end up in the course's Files &
Uploads page), import them into the destination course, unless they already
exist.
This function returns a tuple of StaticFileNotices (assets added, errors,
conflicts), and static asset path substitutions that should be made in the
OLX in order to paste this content into this course. The latter is for the
case in which we're brining content in from a v2 library, which stores
static assets locally to a Component and needs to go into a subdirectory
when pasting into a course to avoid overwriting commonly named things, e.g.
"figure1.png".
"""
# List of files that were newly added to the destination course
new_files = []
# List of files that conflicted with identically named files already in the destination course
conflicting_files = []
# List of files that had an error (shouldn't happen unless we have some kind of bug)
error_files = []
# Store a mapping of asset URLs that need to be modified for the destination
# assets. This is necessary when you take something from a library and paste
# it into a course, because we need to translate Component-local static
# assets and shove them into the Course's global Files & Uploads space in a
# nested directory structure.
substitutions = {}
for file_data_obj in static_files:
# At this point, we know this is a "Files & Uploads" asset that we may need to copy into the course:
try:
result, substitution_for_file = _import_file_into_course(
course_key,
staged_content_id,
file_data_obj,
usage_key,
)
if result is True:
new_files.append(file_data_obj.filename)
substitutions.update(substitution_for_file)
elif substitution_for_file:
# substitutions need to be made because OLX references to these files need to be updated
substitutions.update(substitution_for_file)
elif result is None:
pass # This file already exists; no action needed.
else:
conflicting_files.append(file_data_obj.filename)
except Exception: # lint-amnesty, pylint: disable=broad-except
error_files.append(file_data_obj.filename)
log.exception(f"Failed to import Files & Uploads file {file_data_obj.filename}")
notices = StaticFileNotices(
new_files=new_files,
conflicting_files=conflicting_files,
error_files=error_files,
)
return notices, substitutions
def _import_file_into_course(
course_key: CourseKey,
staged_content_id: int,
file_data_obj: content_staging_api.StagedContentFileData,
usage_key: UsageKey,
) -> tuple[bool | None, dict]:
"""
Import a single staged static asset file into the course, unless it already exists.
Returns True if it was imported, False if there's a conflict, or None if
the file already existed (no action needed).
"""
clipboard_file_path = file_data_obj.filename
# We need to generate an AssetKey to add an asset to a course. The mapping
# of directories '/' -> '_' is a long-existing contentstore convention that
# we're not going to attempt to change.
if clipboard_file_path.startswith('static/'):
# If it's in this form, it came from a library and assumes component-local assets
file_path = clipboard_file_path.removeprefix('static/')
import_path = build_components_import_path(usage_key, file_path)
filename = pathlib.Path(file_path).name
new_key = course_key.make_asset_key("asset", import_path.replace("/", "_"))
else:
# Otherwise it came from a course...
file_path = clipboard_file_path
import_path = None
filename = pathlib.Path(file_path).name
new_key = course_key.make_asset_key("asset", file_path.replace("/", "_"))
try:
current_file = contentstore().find(new_key)
except NotFoundError:
current_file = None
if not current_file:
# This static asset should be imported into the new course:
content_type = guess_type(filename)[0]
data = content_staging_api.get_staged_content_static_file_data(staged_content_id, clipboard_file_path)
if data is None:
raise NotFoundError(file_data_obj.source_key)
content = StaticContent(
new_key,
name=filename,
content_type=content_type,
data=data,
import_path=import_path
)
# If it's an image file, also generate the thumbnail:
thumbnail_content, thumbnail_location = contentstore().generate_thumbnail(content)
if thumbnail_content is not None:
content.thumbnail_location = thumbnail_location
contentstore().save(content)
return True, {clipboard_file_path: filename if not import_path else f"static/{import_path}"}
elif current_file.content_digest == file_data_obj.md5_hash:
# The file already exists and matches exactly, so no action is needed
return None, {}
else:
# There is a conflict with some other file that has the same name.
return False, {}
def _import_transcripts(
block: XBlock,
staged_content_id: int,
static_files: list[content_staging_api.StagedContentFileData],
):
"""
Adds transcripts to VAL using the new edx_video_id.
"""
for file_data_obj in static_files:
clipboard_file_path = file_data_obj.filename
data = content_staging_api.get_staged_content_static_file_data(
staged_content_id,
clipboard_file_path
)
if data is None:
raise NotFoundError(file_data_obj.source_key)
if clipboard_file_path.startswith('static/'):
# If it's in this form, it came from a library and assumes component-local assets
file_path = clipboard_file_path.removeprefix('static/')
else:
# Otherwise it came from a course...
file_path = clipboard_file_path
filename = pathlib.Path(file_path).name
language_code = next((k for k, v in block.transcripts.items() if v == filename), None)
if language_code:
sjson_subs = Transcript.convert(
content=data,
input_format=Transcript.SRT,
output_format=Transcript.SJSON
).encode()
create_or_update_video_transcript(
video_id=block.edx_video_id,
language_code=language_code,
metadata={
'file_format': Transcript.SJSON,
'language_code': language_code
},
file_data=ContentFile(sjson_subs),
)
def is_item_in_course_tree(item):
"""
Check that the item is in the course tree.
It's possible that the item is not in the course tree
if its parent has been deleted and is now an orphan.
"""
ancestor = item.get_parent()
while ancestor is not None and ancestor.location.block_type != "course":
ancestor = ancestor.get_parent()
return ancestor is not None
def concat_static_file_notices(notices: list[StaticFileNotices]) -> StaticFileNotices:
"""Combines multiple static file notices into a single object
Args:
notices: list of StaticFileNotices
Returns:
Single StaticFileNotices
"""
new_files = []
conflicting_files = []
error_files = []
for notice in notices:
new_files.extend(notice.new_files)
conflicting_files.extend(notice.conflicting_files)
error_files.extend(notice.error_files)
return StaticFileNotices(
new_files=list(set(new_files)),
conflicting_files=list(set(conflicting_files)),
error_files=list(set(error_files)),
)