Finishes refactoring libraries APIs [FC-0083] (#36468)

Addresses libraries API TODOs introduced by https://github.com/openedx/edx-platform/pull/36371
This commit is contained in:
Jillian
2025-04-07 23:47:04 +09:30
committed by GitHub
parent 0595e5a57d
commit 94468ef4b3
10 changed files with 2018 additions and 1715 deletions

View File

@@ -1,7 +1,10 @@
"""
Python API for working with content libraries
"""
from .collections import *
from .containers import *
from .courseware_import import *
from .exceptions import *
from .libraries import *
from .blocks import *
from . import permissions

View File

@@ -3,28 +3,835 @@ Content libraries API methods related to XBlocks/Components.
These methods don't enforce permissions (only the REST APIs do).
"""
# pylint: disable=unused-import
from dataclasses import dataclass
from datetime import datetime, timezone
import logging
import mimetypes
# TODO: move all the API methods related to blocks and assets in here from 'libraries.py'
# TODO: use __all__ to limit what symbols are public.
from .libraries import (
LibraryXBlockMetadata,
LibraryXBlockStaticFile,
LibraryXBlockType,
get_library_components,
get_library_block,
set_library_block_olx,
library_component_usage_key,
get_component_from_usage_key,
validate_can_add_block_to_library,
create_library_block,
import_staged_content_from_user_clipboard,
get_or_create_olx_media_type,
delete_library_block,
restore_library_block,
get_library_block_static_asset_files,
add_library_block_static_asset_file,
delete_library_block_static_asset_file,
publish_component_changes,
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core.validators import validate_unicode_slug
from django.db.models import QuerySet
from django.db import transaction
from django.utils.translation import gettext as _
from django.urls import reverse
from lxml import etree
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from opaque_keys.edx.keys import UsageKeyV2
from openedx_events.content_authoring.data import (
LibraryBlockData,
LibraryCollectionData,
LibraryContainerData,
ContentObjectChangedData,
)
from openedx_events.content_authoring.signals import (
LIBRARY_BLOCK_CREATED,
LIBRARY_BLOCK_DELETED,
LIBRARY_BLOCK_UPDATED,
LIBRARY_COLLECTION_UPDATED,
LIBRARY_CONTAINER_UPDATED,
CONTENT_OBJECT_ASSOCIATIONS_CHANGED,
)
from xblock.core import XBlock
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import (
Component,
ComponentVersion,
LearningPackage,
MediaType,
)
from openedx.core.djangoapps.xblock.api import (
get_component_from_usage_key,
get_xblock_app_config,
xblock_type_display_name,
)
from openedx.core.types import User as UserType
from openedx.core.djangoapps.content_libraries import api as lib_api
from ..models import ContentLibrary
from ..permissions import CAN_EDIT_THIS_CONTENT_LIBRARY
from .exceptions import (
BlockLimitReachedError,
ContentLibraryBlockNotFound,
InvalidNameError,
LibraryBlockAlreadyExists,
)
from .libraries import (
library_component_usage_key,
require_permission_for_library_key,
PublishableItem,
)
log = logging.getLogger(__name__)
# The public API is only the following symbols:
__all__ = [
# Models
"LibraryXBlockMetadata",
"LibraryXBlockStaticFile",
# API methods
"get_library_components",
"get_library_block",
"set_library_block_olx",
"get_component_from_usage_key",
"validate_can_add_block_to_library",
"create_library_block",
"import_staged_content_from_user_clipboard",
"get_or_create_olx_media_type",
"delete_library_block",
"restore_library_block",
"get_library_block_static_asset_files",
"add_library_block_static_asset_file",
"delete_library_block_static_asset_file",
"publish_component_changes",
]
@dataclass(frozen=True, kw_only=True)
class LibraryXBlockMetadata(PublishableItem):
"""
Class that represents the metadata about an XBlock in a content library.
"""
usage_key: LibraryUsageLocatorV2
@classmethod
def from_component(cls, library_key, component, associated_collections=None):
"""
Construct a LibraryXBlockMetadata from a Component object.
"""
last_publish_log = component.versioning.last_publish_log
published_by = None
if last_publish_log and last_publish_log.published_by:
published_by = last_publish_log.published_by.username
draft = component.versioning.draft
published = component.versioning.published
last_draft_created = draft.created if draft else None
last_draft_created_by = draft.publishable_entity_version.created_by if draft else None
return cls(
usage_key=library_component_usage_key(
library_key,
component,
),
display_name=draft.title,
created=component.created,
modified=draft.created,
draft_version_num=draft.version_num,
published_version_num=published.version_num if published else None,
last_published=None if last_publish_log is None else last_publish_log.published_at,
published_by=published_by,
last_draft_created=last_draft_created,
last_draft_created_by=last_draft_created_by,
has_unpublished_changes=component.versioning.has_unpublished_changes,
collections=associated_collections or [],
)
@dataclass(frozen=True)
class LibraryXBlockStaticFile:
"""
Class that represents a static file in a content library, associated with
a particular XBlock.
"""
# File path e.g. "diagram.png"
# In some rare cases it might contain a folder part, e.g. "en/track1.srt"
path: str
# Publicly accessible URL where the file can be downloaded
url: str
# Size in bytes
size: int
def get_library_components(
library_key: LibraryLocatorV2,
text_search: str | None = None,
block_types: list[str] | None = None,
) -> QuerySet[Component]:
"""
Get the library components and filter.
TODO: Full text search needs to be implemented as a custom lookup for MySQL,
but it should have a fallback to still work in SQLite.
"""
lib = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
learning_package = lib.learning_package
assert learning_package is not None
components = authoring_api.get_components(
learning_package.id,
draft=True,
namespace='xblock.v1',
type_names=block_types,
draft_title=text_search,
)
return components
def get_library_block(usage_key: LibraryUsageLocatorV2, include_collections=False) -> LibraryXBlockMetadata:
"""
Get metadata about (the draft version of) one specific XBlock in a library.
This will raise ContentLibraryBlockNotFound if there is no draft version of
this block (i.e. it's been soft-deleted from Studio), even if there is a
live published version of it in the LMS.
"""
try:
component = get_component_from_usage_key(usage_key)
except ObjectDoesNotExist as exc:
raise ContentLibraryBlockNotFound(usage_key) from exc
# The component might have existed at one point, but no longer does because
# the draft was soft-deleted. This is actually a weird edge case and I'm not
# clear on what the proper behavior should be, since (a) the published
# version still exists; and (b) we might want to make some queries on the
# block even after it's been removed, since there might be versioned
# references to it.
draft_version = component.versioning.draft
if not draft_version:
raise ContentLibraryBlockNotFound(usage_key)
if include_collections:
associated_collections = authoring_api.get_entity_collections(
component.learning_package_id,
component.key,
).values('key', 'title')
else:
associated_collections = None
xblock_metadata = LibraryXBlockMetadata.from_component(
library_key=usage_key.context_key,
component=component,
associated_collections=associated_collections,
)
return xblock_metadata
def set_library_block_olx(usage_key: LibraryUsageLocatorV2, new_olx_str: str) -> ComponentVersion:
"""
Replace the OLX source of the given XBlock.
This is only meant for use by developers or API client applications, as
very little validation is done and this can easily result in a broken XBlock
that won't load.
Returns the version number of the newly created ComponentVersion.
"""
assert isinstance(usage_key, LibraryUsageLocatorV2)
# HTMLBlock uses CDATA to preserve HTML inside the XML, so make sure we
# don't strip that out.
parser = etree.XMLParser(strip_cdata=False)
# Verify that the OLX parses, at least as generic XML, and the root tag is correct:
node = etree.fromstring(new_olx_str, parser=parser)
if node.tag != usage_key.block_type:
raise ValueError(
f"Tried to set the OLX of a {usage_key.block_type} block to a <{node.tag}> node. "
f"{usage_key=!s}, {new_olx_str=}"
)
# We're intentionally NOT checking if the XBlock type is installed, since
# this is one of the only tools you can reach for to edit content for an
# XBlock that's broken or missing.
component = get_component_from_usage_key(usage_key)
# Get the title from the new OLX (or default to the default specified on the
# XBlock's display_name field.
new_title = node.attrib.get(
"display_name",
xblock_type_display_name(usage_key.block_type),
)
# Libraries don't use the url_name attribute, because they encode that into
# the Component key. Normally this is stripped out by the XBlockSerializer,
# but we're not actually creating the XBlock when it's coming from the
# clipboard right now.
if "url_name" in node.attrib:
del node.attrib["url_name"]
new_olx_str = etree.tostring(node, encoding='unicode')
now = datetime.now(tz=timezone.utc)
with transaction.atomic():
new_content = authoring_api.get_or_create_text_content(
component.learning_package_id,
get_or_create_olx_media_type(usage_key.block_type).id,
text=new_olx_str,
created=now,
)
new_component_version = authoring_api.create_next_component_version(
component.pk,
title=new_title,
content_to_replace={
'block.xml': new_content.pk,
},
created=now,
)
LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key
)
)
# For each container, trigger LIBRARY_CONTAINER_UPDATED signal and set background=True to trigger
# container indexing asynchronously.
affected_containers = lib_api.get_containers_contains_component(usage_key)
for container in affected_containers:
LIBRARY_CONTAINER_UPDATED.send_event(
library_container=LibraryContainerData(
library_key=usage_key.lib_key,
container_key=str(container.container_key),
background=True,
)
)
return new_component_version
def validate_can_add_block_to_library(
library_key: LibraryLocatorV2,
block_type: str,
block_id: str,
) -> tuple[ContentLibrary, LibraryUsageLocatorV2]:
"""
Perform checks to validate whether a new block with `block_id` and type `block_type` can be added to
the library with key `library_key`.
Returns the ContentLibrary that has the passed in `library_key` and newly created LibraryUsageLocatorV2 if
validation successful, otherwise raises errors.
"""
assert isinstance(library_key, LibraryLocatorV2)
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
# If adding a component would take us over our max, return an error.
assert content_library.learning_package_id is not None
component_count = authoring_api.get_all_drafts(content_library.learning_package_id).count()
if component_count + 1 > settings.MAX_BLOCKS_PER_CONTENT_LIBRARY:
raise BlockLimitReachedError(
_("Library cannot have more than {} Components").format(
settings.MAX_BLOCKS_PER_CONTENT_LIBRARY
)
)
# Make sure the proposed ID will be valid:
validate_unicode_slug(block_id)
# Ensure the XBlock type is valid and installed:
XBlock.load_class(block_type) # Will raise an exception if invalid
# Make sure the new ID is not taken already:
usage_key = LibraryUsageLocatorV2( # type: ignore[abstract]
lib_key=library_key,
block_type=block_type,
usage_id=block_id,
)
if _component_exists(usage_key):
raise LibraryBlockAlreadyExists(f"An XBlock with ID '{usage_key}' already exists")
return content_library, usage_key
def create_library_block(
library_key: LibraryLocatorV2,
block_type: str,
definition_id: str,
user_id: int | None = None,
can_stand_alone: bool = True,
):
"""
Create a new XBlock in this library of the specified type (e.g. "html").
Set can_stand_alone = False when a component is created under a container, like unit.
"""
# It's in the serializer as ``definition_id``, but for our purposes, it's
# the block_id. See the comments in ``LibraryXBlockCreationSerializer`` for
# more details. TODO: Change the param name once we change the serializer.
block_id = definition_id
content_library, usage_key = validate_can_add_block_to_library(library_key, block_type, block_id)
_create_component_for_block(content_library, usage_key, user_id, can_stand_alone)
# Now return the metadata about the new block:
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=content_library.library_key,
usage_key=usage_key
)
)
return get_library_block(usage_key)
def import_staged_content_from_user_clipboard(library_key: LibraryLocatorV2, user, block_id) -> XBlock:
"""
Create a new library block and populate it with staged content from clipboard
Returns the newly created library block
"""
from openedx.core.djangoapps.content_staging import api as content_staging_api
if not content_staging_api:
raise RuntimeError("The required content_staging app is not installed")
user_clipboard = content_staging_api.get_user_clipboard(user)
if not user_clipboard:
return None
staged_content_id = user_clipboard.content.id
olx_str = content_staging_api.get_staged_content_olx(staged_content_id)
if olx_str is None:
return None # Shouldn't happen since we checked that the clipboard exists - mostly here for type checker
staged_content_files = content_staging_api.get_staged_content_static_files(staged_content_id)
content_library, usage_key = validate_can_add_block_to_library(
library_key,
user_clipboard.content.block_type,
block_id
)
# content_library.learning_package is technically a nullable field because
# it was added in a later migration, but we can't actually make a Library
# without one at the moment. TODO: fix this at the model level.
learning_package: LearningPackage = content_library.learning_package # type: ignore
now = datetime.now(tz=timezone.utc)
# Create component for block then populate it with clipboard data
with transaction.atomic():
# First create the Component, but do not initialize it to anything (i.e.
# no ComponentVersion).
component_type = authoring_api.get_or_create_component_type(
"xblock.v1", usage_key.block_type
)
component = authoring_api.create_component(
learning_package.id,
component_type=component_type,
local_key=usage_key.block_id,
created=now,
created_by=user.id,
)
# This will create the first component version and set the OLX/title
# appropriately. It will not publish. Once we get the newly created
# ComponentVersion back from this, we can attach all our files to it.
component_version = set_library_block_olx(usage_key, olx_str)
for staged_content_file_data in staged_content_files:
# The ``data`` attribute is going to be None because the clipboard
# is optimized to not do redundant file copying when copying/pasting
# within the same course (where all the Files and Uploads are
# shared). Learning Core backed content Components will always store
# a Component-local "copy" of the data, and rely on lower-level
# deduplication to happen in the ``contents`` app.
filename = staged_content_file_data.filename
# Grab our byte data for the file...
file_data = content_staging_api.get_staged_content_static_file_data(
staged_content_id,
filename,
)
if not file_data:
log.error(
f"Staged content {staged_content_id} included referenced "
f"file {filename}, but no file data was found."
)
continue
# Courses don't support having assets that are local to a specific
# component, and instead store all their content together in a
# shared Files and Uploads namespace. If we're pasting that into a
# Learning Core backed data model (v2 Libraries), then we want to
# prepend "static/" to the filename. This will need to get updated
# when we start moving courses over to Learning Core, or if we start
# storing course component assets in sub-directories of Files and
# Uploads.
#
# The reason we don't just search for a "static/" prefix is that
# Learning Core components can store other kinds of files if they
# wish (though none currently do).
source_assumes_global_assets = not isinstance(
user_clipboard.source_context_key, LibraryLocatorV2
)
if source_assumes_global_assets:
filename = f"static/{filename}"
# Now construct the Learning Core data models for it...
# TODO: more of this logic should be pushed down to openedx-learning
media_type_str, _encoding = mimetypes.guess_type(filename)
if not media_type_str:
media_type_str = "application/octet-stream"
media_type = authoring_api.get_or_create_media_type(media_type_str)
content = authoring_api.get_or_create_file_content(
learning_package.id,
media_type.id,
data=file_data,
created=now,
)
authoring_api.create_component_version_content(
component_version.pk,
content.id,
key=filename,
)
# Emit library block created event
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=content_library.library_key,
usage_key=usage_key
)
)
# Now return the metadata about the new block
return get_library_block(usage_key)
def get_or_create_olx_media_type(block_type: str) -> MediaType:
"""
Get or create a MediaType for the block type.
Learning Core stores all Content with a Media Type (a.k.a. MIME type). For
OLX, we use the "application/vnd.*" convention, per RFC 6838.
"""
return authoring_api.get_or_create_media_type(
f"application/vnd.openedx.xblock.v1.{block_type}+xml"
)
def delete_library_block(usage_key: LibraryUsageLocatorV2, remove_from_parent=True) -> None:
"""
Delete the specified block from this library (soft delete).
"""
component = get_component_from_usage_key(usage_key)
library_key = usage_key.context_key
affected_collections = authoring_api.get_entity_collections(component.learning_package_id, component.key)
affected_containers = lib_api.get_containers_contains_component(usage_key)
authoring_api.soft_delete_draft(component.pk)
LIBRARY_BLOCK_DELETED.send_event(
library_block=LibraryBlockData(
library_key=library_key,
usage_key=usage_key
)
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
#
# To delete the component on collections
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
# For each container, trigger LIBRARY_CONTAINER_UPDATED signal and set background=True to trigger
# container indexing asynchronously.
#
# To update the components count in containers
for container in affected_containers:
LIBRARY_CONTAINER_UPDATED.send_event(
library_container=LibraryContainerData(
library_key=library_key,
container_key=str(container.container_key),
background=True,
)
)
def restore_library_block(usage_key: LibraryUsageLocatorV2) -> None:
"""
Restore the specified library block.
"""
component = get_component_from_usage_key(usage_key)
library_key = usage_key.context_key
affected_collections = authoring_api.get_entity_collections(component.learning_package_id, component.key)
# Set draft version back to the latest available component version id.
authoring_api.set_draft_version(component.pk, component.versioning.latest.pk)
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=library_key,
usage_key=usage_key
)
)
# Add tags and collections back to index
CONTENT_OBJECT_ASSOCIATIONS_CHANGED.send_event(
content_object=ContentObjectChangedData(
object_id=str(usage_key),
changes=["collections", "tags"],
),
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
#
# To restore the component in the collections
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
# For each container, trigger LIBRARY_CONTAINER_UPDATED signal and set background=True to trigger
# container indexing asynchronously.
#
# To update the components count in containers
affected_containers = lib_api.get_containers_contains_component(usage_key)
for container in affected_containers:
LIBRARY_CONTAINER_UPDATED.send_event(
library_container=LibraryContainerData(
library_key=library_key,
container_key=str(container.container_key),
background=True,
)
)
def get_library_block_static_asset_files(usage_key: LibraryUsageLocatorV2) -> list[LibraryXBlockStaticFile]:
"""
Given an XBlock in a content library, list all the static asset files
associated with that XBlock.
Returns a list of LibraryXBlockStaticFile objects, sorted by path.
TODO: Should this be in the general XBlock API rather than the libraries API?
"""
component = get_component_from_usage_key(usage_key)
component_version = component.versioning.draft
# If there is no Draft version, then this was soft-deleted
if component_version is None:
return []
# cvc = the ComponentVersionContent through table
cvc_set = (
component_version
.componentversioncontent_set
.filter(content__has_file=True)
.order_by('key')
.select_related('content')
)
site_root_url = get_xblock_app_config().get_site_root_url()
return [
LibraryXBlockStaticFile(
path=cvc.key,
size=cvc.content.size,
url=site_root_url + reverse(
'content_libraries:library-assets',
kwargs={
'component_version_uuid': component_version.uuid,
'asset_path': cvc.key,
}
),
)
for cvc in cvc_set
]
def add_library_block_static_asset_file(
usage_key: LibraryUsageLocatorV2,
file_path: str,
file_content: bytes,
user: UserType | None = None,
) -> LibraryXBlockStaticFile:
"""
Upload a static asset file into the library, to be associated with the
specified XBlock. Will silently overwrite an existing file of the same name.
file_path should be a name like "doc.pdf". It may optionally contain slashes
like 'en/doc.pdf'
file_content should be a binary string.
Returns a LibraryXBlockStaticFile object.
Sends a LIBRARY_BLOCK_UPDATED event.
Example:
video_block = UsageKey.from_string("lb:VideoTeam:python-intro:video:1")
add_library_block_static_asset_file(video_block, "subtitles-en.srt", subtitles.encode('utf-8'))
"""
# File path validations copied over from v1 library logic. This can't really
# hurt us inside our system because we never use these paths in an actual
# file systemthey're just string keys that point to hash-named data files
# in a common library (learning package) level directory. But it might
# become a security issue during import/export serialization.
if file_path != file_path.strip().strip('/'):
raise InvalidNameError("file_path cannot start/end with / or whitespace.")
if '//' in file_path or '..' in file_path:
raise InvalidNameError("Invalid sequence (// or ..) in file_path.")
component = get_component_from_usage_key(usage_key)
with transaction.atomic():
component_version = authoring_api.create_next_component_version(
component.pk,
content_to_replace={file_path: file_content},
created=datetime.now(tz=timezone.utc),
created_by=user.id if user else None,
)
transaction.on_commit(
lambda: LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key,
)
)
)
# Now figure out the URL for the newly created asset...
site_root_url = get_xblock_app_config().get_site_root_url()
local_path = reverse(
'content_libraries:library-assets',
kwargs={
'component_version_uuid': component_version.uuid,
'asset_path': file_path,
}
)
return LibraryXBlockStaticFile(
path=file_path,
url=site_root_url + local_path,
size=len(file_content),
)
def delete_library_block_static_asset_file(usage_key, file_path, user=None):
"""
Delete a static asset file from the library.
Sends a LIBRARY_BLOCK_UPDATED event.
Example:
video_block = UsageKey.from_string("lb:VideoTeam:python-intro:video:1")
delete_library_block_static_asset_file(video_block, "subtitles-en.srt")
"""
component = get_component_from_usage_key(usage_key)
now = datetime.now(tz=timezone.utc)
with transaction.atomic():
component_version = authoring_api.create_next_component_version(
component.pk,
content_to_replace={file_path: None},
created=now,
created_by=user.id if user else None,
)
transaction.on_commit(
lambda: LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key,
)
)
)
def publish_component_changes(usage_key: LibraryUsageLocatorV2, user: UserType):
"""
Publish all pending changes in a single component.
"""
content_library = require_permission_for_library_key(
usage_key.lib_key,
user,
CAN_EDIT_THIS_CONTENT_LIBRARY
)
learning_package = content_library.learning_package
assert learning_package
component = get_component_from_usage_key(usage_key)
drafts_to_publish = authoring_api.get_all_drafts(learning_package.id).filter(
entity__key=component.key
)
authoring_api.publish_from_drafts(learning_package.id, draft_qset=drafts_to_publish, published_by=user.id)
LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.lib_key,
usage_key=usage_key,
)
)
def _component_exists(usage_key: UsageKeyV2) -> bool:
"""
Does a Component exist for this usage key?
This is a lower-level function that will return True if a Component object
exists, even if it was soft-deleted, and there is no active draft version.
"""
try:
get_component_from_usage_key(usage_key)
except ObjectDoesNotExist:
return False
return True
def _create_component_for_block(
content_lib: ContentLibrary,
usage_key: LibraryUsageLocatorV2,
user_id: int | None = None,
can_stand_alone: bool = True,
):
"""
Create a Component for an XBlock type, initialize it, and return the ComponentVersion.
This will create a Component, along with its first ComponentVersion. The tag
in the OLX will have no attributes, e.g. `<problem />`. This first version
will be set as the current draft. This function does not publish the
Component.
Set can_stand_alone = False when a component is created under a container, like unit.
TODO: We should probably shift this to openedx.core.djangoapps.xblock.api
(along with its caller) since it gives runtime storage specifics. The
Library-specific logic stays in this module, so "create a block for my lib"
should stay here, but "making a block means creating a component with
text data like X" goes in xblock.api.
"""
display_name = xblock_type_display_name(usage_key.block_type)
now = datetime.now(tz=timezone.utc)
xml_text = f'<{usage_key.block_type} />'
learning_package = content_lib.learning_package
assert learning_package is not None # mostly for type checker
with transaction.atomic():
component_type = authoring_api.get_or_create_component_type(
"xblock.v1", usage_key.block_type
)
component, component_version = authoring_api.create_component_and_version(
learning_package.id,
component_type=component_type,
local_key=usage_key.block_id,
title=display_name,
created=now,
created_by=user_id,
can_stand_alone=can_stand_alone,
)
content = authoring_api.get_or_create_text_content(
learning_package.id,
get_or_create_olx_media_type(usage_key.block_type).id,
text=xml_text,
created=now,
)
authoring_api.create_component_version_content(
component_version.pk,
content.id,
key="block.xml",
)
return component_version

View File

@@ -0,0 +1,258 @@
"""
Python API for library collections
==================================
"""
from django.db import IntegrityError
from opaque_keys.edx.keys import BlockTypeKey, UsageKeyV2
from opaque_keys.edx.locator import (
LibraryLocatorV2,
LibraryCollectionLocator,
)
from openedx_events.content_authoring.data import LibraryCollectionData
from openedx_events.content_authoring.signals import LIBRARY_COLLECTION_UPDATED
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import (
Collection,
Component,
PublishableEntity,
)
from .exceptions import (
ContentLibraryBlockNotFound,
ContentLibraryCollectionNotFound,
LibraryCollectionAlreadyExists,
)
from ..models import ContentLibrary
# The public API is only the following symbols:
__all__ = [
"create_library_collection",
"update_library_collection",
"update_library_collection_components",
"set_library_component_collections",
"get_library_collection_usage_key",
"get_library_collection_from_usage_key",
]
def create_library_collection(
library_key: LibraryLocatorV2,
collection_key: str,
title: str,
*,
description: str = "",
created_by: int | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Creates a Collection in the given ContentLibrary.
If you've already fetched a ContentLibrary for the given library_key, pass it in here to avoid refetching.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
try:
collection = authoring_api.create_collection(
learning_package_id=content_library.learning_package_id,
key=collection_key,
title=title,
description=description,
created_by=created_by,
)
except IntegrityError as err:
raise LibraryCollectionAlreadyExists from err
return collection
def update_library_collection(
library_key: LibraryLocatorV2,
collection_key: str,
*,
title: str | None = None,
description: str | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Updates a Collection in the given ContentLibrary.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
try:
collection = authoring_api.update_collection(
learning_package_id=content_library.learning_package_id,
key=collection_key,
title=title,
description=description,
)
except Collection.DoesNotExist as exc:
raise ContentLibraryCollectionNotFound from exc
return collection
def update_library_collection_components(
library_key: LibraryLocatorV2,
collection_key: str,
*,
usage_keys: list[UsageKeyV2],
created_by: int | None = None,
remove=False,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Associates the Collection with Components for the given UsageKeys.
By default the Components are added to the Collection.
If remove=True, the Components are removed from the Collection.
If you've already fetched the ContentLibrary, pass it in to avoid refetching.
Raises:
* ContentLibraryCollectionNotFound if no Collection with the given pk is found in the given library.
* ContentLibraryBlockNotFound if any of the given usage_keys don't match Components in the given library.
Returns the updated Collection.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
# Fetch the Component.key values for the provided UsageKeys.
component_keys = []
for usage_key in usage_keys:
# Parse the block_family from the key to use as namespace.
block_type = BlockTypeKey.from_string(str(usage_key))
try:
component = authoring_api.get_component_by_key(
content_library.learning_package_id,
namespace=block_type.block_family,
type_name=usage_key.block_type,
local_key=usage_key.block_id,
)
except Component.DoesNotExist as exc:
raise ContentLibraryBlockNotFound(usage_key) from exc
component_keys.append(component.key)
# Note: Component.key matches its PublishableEntity.key
entities_qset = PublishableEntity.objects.filter(
key__in=component_keys,
)
if remove:
collection = authoring_api.remove_from_collection(
content_library.learning_package_id,
collection_key,
entities_qset,
)
else:
collection = authoring_api.add_to_collection(
content_library.learning_package_id,
collection_key,
entities_qset,
created_by=created_by,
)
return collection
def set_library_component_collections(
library_key: LibraryLocatorV2,
component: Component,
*,
collection_keys: list[str],
created_by: int | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Component:
"""
It Associates the component with collections for the given collection keys.
Only collections in queryset are associated with component, all previous component-collections
associations are removed.
If you've already fetched the ContentLibrary, pass it in to avoid refetching.
Raises:
* ContentLibraryCollectionNotFound if any of the given collection_keys don't match Collections in the given library.
Returns the updated Component.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
# Note: Component.key matches its PublishableEntity.key
collection_qs = authoring_api.get_collections(content_library.learning_package_id).filter(
key__in=collection_keys
)
affected_collections = authoring_api.set_collections(
content_library.learning_package_id,
component,
collection_qs,
created_by=created_by,
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
return component
def get_library_collection_usage_key(
library_key: LibraryLocatorV2,
collection_key: str,
) -> LibraryCollectionLocator:
"""
Returns the LibraryCollectionLocator associated to a collection
"""
return LibraryCollectionLocator(library_key, collection_key)
def get_library_collection_from_usage_key(
collection_usage_key: LibraryCollectionLocator,
) -> Collection:
"""
Return a Collection using the LibraryCollectionLocator
"""
library_key = collection_usage_key.library_key
collection_key = collection_usage_key.collection_id
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library.learning_package_id is not None # shouldn't happen but it's technically possible.
try:
return authoring_api.get_collection(
content_library.learning_package_id,
collection_key,
)
except Collection.DoesNotExist as exc:
raise ContentLibraryCollectionNotFound from exc

View File

@@ -27,14 +27,16 @@ from openedx_learning.api.authoring_models import Container
from openedx.core.djangoapps.xblock.api import get_component_from_usage_key
from ..models import ContentLibrary
from .exceptions import ContentLibraryContainerNotFound
from .libraries import LibraryXBlockMetadata, PublishableItem
# The public API is only the following symbols:
__all__ = [
"ContentLibraryContainerNotFound",
# Models
"ContainerMetadata",
"ContainerType",
# API methods
"get_container",
"create_container",
"get_container_children",
@@ -47,9 +49,6 @@ __all__ = [
]
ContentLibraryContainerNotFound = Container.DoesNotExist
class ContainerType(Enum):
Unit = "unit"

View File

@@ -0,0 +1,356 @@
"""
Content Libraries Python API to import blocks from Courseware
=============================================================
Content Libraries can import blocks from Courseware (Modulestore). The import
can be done per-course, by listing its content, and supports both access to
remote platform instances as well as local modulestore APIs. Additionally,
there are Celery-based interfaces suitable for background processing controlled
through RESTful APIs (see :mod:`.views`).
"""
import abc
import collections
import base64
import hashlib
import logging
from django.conf import settings
import requests
from opaque_keys.edx.locator import (
LibraryUsageLocatorV2,
LibraryLocator as LibraryLocatorV1,
)
from opaque_keys.edx.keys import UsageKey
from edx_rest_api_client.client import OAuthAPIClient
from openedx.core.lib.xblock_serializer.api import serialize_modulestore_block_for_learning_core
from xmodule.modulestore.django import modulestore
from .. import tasks
from ..models import ContentLibrary, ContentLibraryBlockImportTask
from .blocks import (
LibraryBlockAlreadyExists,
add_library_block_static_asset_file,
create_library_block,
get_library_block_static_asset_files,
get_library_block,
set_library_block_olx,
)
from .libraries import publish_changes
log = logging.getLogger(__name__)
__all__ = [
"EdxModulestoreImportClient",
"EdxApiImportClient",
"import_blocks_create_task",
]
class BaseEdxImportClient(abc.ABC):
"""
Base class for all courseware import clients.
Import clients are wrappers tailored to implement the steps used in the
import APIs and can leverage different backends. It is not aimed towards
being a generic API client for Open edX.
"""
EXPORTABLE_BLOCK_TYPES = {
"drag-and-drop-v2",
"problem",
"html",
"video",
}
def __init__(self, library_key=None, library=None, use_course_key_as_block_id_suffix=True):
"""
Initialize an import client for a library.
The method accepts either a library object or a key to a library object.
"""
self.use_course_key_as_block_id_suffix = use_course_key_as_block_id_suffix
if bool(library_key) == bool(library):
raise ValueError('Provide at least one of `library_key` or '
'`library`, but not both.')
if library is None:
library = ContentLibrary.objects.get_by_key(library_key)
self.library = library
@abc.abstractmethod
def get_block_data(self, block_key):
"""
Get the block's OLX and static files, if any.
"""
@abc.abstractmethod
def get_export_keys(self, course_key):
"""
Get all exportable block keys of a given course.
"""
@abc.abstractmethod
def get_block_static_data(self, asset_file):
"""
Get the contents of an asset_file..
"""
def import_block(self, modulestore_key):
"""
Import a single modulestore block.
"""
block_data = self.get_block_data(modulestore_key)
# Get or create the block in the library.
#
# To dedup blocks from different courses with the same ID, we hash the
# course key into the imported block id.
course_key_id = base64.b32encode(
hashlib.blake2s(
str(modulestore_key.course_key).encode()
).digest()
)[:16].decode().lower()
# add the course_key_id if use_course_key_as_suffix is enabled to increase the namespace.
# The option exists to not use the course key as a suffix because
# in order to preserve learner state in the v1 to v2 libraries migration,
# the v2 and v1 libraries' child block ids must be the same.
block_id = (
# Prepend 'c' to allow changing hash without conflicts.
f"{modulestore_key.block_id}_c{course_key_id}"
if self.use_course_key_as_block_id_suffix
else f"{modulestore_key.block_id}"
)
log.info('Importing to library block: id=%s', block_id)
try:
library_block = create_library_block(
self.library.library_key,
modulestore_key.block_type,
block_id,
)
dest_key = library_block.usage_key
except LibraryBlockAlreadyExists:
dest_key = LibraryUsageLocatorV2(
lib_key=self.library.library_key,
block_type=modulestore_key.block_type,
usage_id=block_id,
)
get_library_block(dest_key)
log.warning('Library block already exists: Appending static files '
'and overwriting OLX: %s', str(dest_key))
# Handle static files.
files = [
f.path for f in
get_library_block_static_asset_files(dest_key)
]
for filename, static_file in block_data.get('static_files', {}).items():
if filename in files:
# Files already added, move on.
continue
file_content = self.get_block_static_data(static_file)
add_library_block_static_asset_file(dest_key, filename, file_content)
files.append(filename)
# Import OLX.
set_library_block_olx(dest_key, block_data['olx'])
def import_blocks_from_course(self, course_key, progress_callback):
"""
Import all eligible blocks from course key.
Progress is reported through ``progress_callback``, guaranteed to be
called within an exception handler if ``exception is not None``.
"""
# Query the course and rerieve all course blocks.
export_keys = self.get_export_keys(course_key)
if not export_keys:
raise ValueError(f"The courseware course {course_key} does not have "
"any exportable content. No action taken.")
# Import each block, skipping the ones that fail.
for index, block_key in enumerate(export_keys):
try:
log.info('Importing block: %s/%s: %s', index + 1, len(export_keys), block_key)
self.import_block(block_key)
except Exception as exc: # pylint: disable=broad-except
log.exception("Error importing block: %s", block_key)
progress_callback(block_key, index + 1, len(export_keys), exc)
else:
log.info('Successfully imported: %s/%s: %s', index + 1, len(export_keys), block_key)
progress_callback(block_key, index + 1, len(export_keys), None)
log.info("Publishing library: %s", self.library.library_key)
publish_changes(self.library.library_key)
class EdxModulestoreImportClient(BaseEdxImportClient):
"""
An import client based on the local instance of modulestore.
"""
def __init__(self, modulestore_instance=None, **kwargs):
"""
Initialize the client with a modulestore instance.
"""
super().__init__(**kwargs)
self.modulestore = modulestore_instance or modulestore()
def get_block_data(self, block_key):
"""
Get block OLX by serializing it from modulestore directly.
"""
block = self.modulestore.get_item(block_key)
data = serialize_modulestore_block_for_learning_core(block)
return {'olx': data.olx_str,
'static_files': {s.name: s for s in data.static_files}}
def get_export_keys(self, course_key):
"""
Retrieve the course from modulestore and traverse its content tree.
"""
course = self.modulestore.get_course(course_key)
if isinstance(course_key, LibraryLocatorV1):
course = self.modulestore.get_library(course_key)
export_keys = set()
blocks_q = collections.deque(course.get_children())
while blocks_q:
block = blocks_q.popleft()
usage_id = block.scope_ids.usage_id
if usage_id in export_keys:
continue
if usage_id.block_type in self.EXPORTABLE_BLOCK_TYPES:
export_keys.add(usage_id)
if block.has_children:
blocks_q.extend(block.get_children())
return list(export_keys)
def get_block_static_data(self, asset_file):
"""
Get static content from its URL if available, otherwise from its data.
"""
if asset_file.data:
return asset_file.data
resp = requests.get(f"http://{settings.CMS_BASE}" + asset_file.url)
resp.raise_for_status()
return resp.content
class EdxApiImportClient(BaseEdxImportClient):
"""
An import client based on a remote Open Edx API interface.
TODO: Look over this class. We'll probably need to completely re-implement
the import process.
"""
URL_COURSES = "/api/courses/v1/courses/{course_key}"
URL_MODULESTORE_BLOCK_OLX = "/api/olx-export/v1/xblock/{block_key}/"
def __init__(self, lms_url, studio_url, oauth_key, oauth_secret, *args, **kwargs):
"""
Initialize the API client with URLs and OAuth keys.
"""
super().__init__(**kwargs)
self.lms_url = lms_url
self.studio_url = studio_url
self.oauth_client = OAuthAPIClient(
self.lms_url,
oauth_key,
oauth_secret,
)
def get_block_data(self, block_key):
"""
See parent's docstring.
"""
olx_path = self.URL_MODULESTORE_BLOCK_OLX.format(block_key=block_key)
resp = self._get(self.studio_url + olx_path)
return resp['blocks'][str(block_key)]
def get_export_keys(self, course_key):
"""
See parent's docstring.
"""
course_blocks_url = self._get_course(course_key)['blocks_url']
course_blocks = self._get(
course_blocks_url,
params={'all_blocks': True, 'depth': 'all'})['blocks']
export_keys = []
for block_info in course_blocks.values():
if block_info['type'] in self.EXPORTABLE_BLOCK_TYPES:
export_keys.append(UsageKey.from_string(block_info['id']))
return export_keys
def get_block_static_data(self, asset_file):
"""
See parent's docstring.
"""
if (asset_file['url'].startswith(self.studio_url)
and 'export-file' in asset_file['url']):
# We must call download this file with authentication. But
# we only want to pass the auth headers if this is the same
# studio instance, or else we could leak credentials to a
# third party.
path = asset_file['url'][len(self.studio_url):]
resp = self._call('get', path)
else:
resp = requests.get(asset_file['url'])
resp.raise_for_status()
return resp.content
def _get(self, *args, **kwargs):
"""
Perform a get request to the client.
"""
return self._json_call('get', *args, **kwargs)
def _get_course(self, course_key):
"""
Request details for a course.
"""
course_url = self.lms_url + self.URL_COURSES.format(course_key=course_key)
return self._get(course_url)
def _json_call(self, method, *args, **kwargs):
"""
Wrapper around request calls that ensures valid json responses.
"""
return self._call(method, *args, **kwargs).json()
def _call(self, method, *args, **kwargs):
"""
Wrapper around request calls.
"""
response = getattr(self.oauth_client, method)(*args, **kwargs)
response.raise_for_status()
return response
def import_blocks_create_task(library_key, course_key, use_course_key_as_block_id_suffix=True):
"""
Create a new import block task.
This API will schedule a celery task to perform the import, and it returns a
import task object for polling.
"""
library = ContentLibrary.objects.get_by_key(library_key)
import_task = ContentLibraryBlockImportTask.objects.create(
library=library,
course_id=course_key,
)
result = tasks.import_blocks_from_course.apply_async(
args=(import_task.pk, str(course_key), use_course_key_as_block_id_suffix)
)
log.info(f"Import block task created: import_task={import_task} "
f"celery_task={result.id}")
return import_task

View File

@@ -0,0 +1,64 @@
"""
Exceptions that can be thrown by the Content Libraries API.
"""
from django.db import IntegrityError
from openedx_learning.api.authoring_models import Collection, Container
from xblock.exceptions import XBlockNotFoundError
from ..models import ContentLibrary
# The public API is only the following symbols:
__all__ = [
"ContentLibraryNotFound",
"ContentLibraryCollectionNotFound",
"ContentLibraryContainerNotFound",
"ContentLibraryBlockNotFound",
"LibraryAlreadyExists",
"LibraryCollectionAlreadyExists",
"LibraryBlockAlreadyExists",
"BlockLimitReachedError",
"IncompatibleTypesError",
"InvalidNameError",
"LibraryPermissionIntegrityError",
]
ContentLibraryNotFound = ContentLibrary.DoesNotExist
ContentLibraryCollectionNotFound = Collection.DoesNotExist
ContentLibraryContainerNotFound = Container.DoesNotExist
class ContentLibraryBlockNotFound(XBlockNotFoundError):
""" XBlock not found in the content library """
class LibraryAlreadyExists(KeyError):
""" A library with the specified slug already exists """
class LibraryCollectionAlreadyExists(IntegrityError):
""" A Collection with that key already exists in the library """
class LibraryBlockAlreadyExists(KeyError):
""" An XBlock with that ID already exists in the library """
class BlockLimitReachedError(Exception):
""" Maximum number of allowed XBlocks in the library reached """
class IncompatibleTypesError(Exception):
""" Library type constraint violated """
class InvalidNameError(ValueError):
""" The specified name/identifier is not valid """
class LibraryPermissionIntegrityError(IntegrityError):
""" Thrown when an operation would cause insane permissions. """

View File

@@ -38,111 +38,58 @@ components in content libraries and may not be appropriate for other learning
contexts so they are implemented here in the library API only. In the future,
if we find a need for these in most other learning contexts then those methods
could be promoted to the core XBlock API and made generic.
Import from Courseware
----------------------
Content Libraries can import blocks from Courseware (Modulestore). The import
can be done per-course, by listing its content, and supports both access to
remote platform instances as well as local modulestore APIs. Additionally,
there are Celery-based interfaces suitable for background processing controlled
through RESTful APIs (see :mod:`.views`).
"""
from __future__ import annotations
import abc
import collections
from dataclasses import dataclass, field
from datetime import datetime, timezone
import base64
import hashlib
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime
import logging
import mimetypes
import requests
from django.conf import settings
from django.contrib.auth.models import AbstractUser, AnonymousUser, Group
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.core.exceptions import PermissionDenied
from django.core.validators import validate_unicode_slug
from django.db import IntegrityError, transaction
from django.db.models import Q, QuerySet
from django.utils.translation import gettext as _
from edx_rest_api_client.client import OAuthAPIClient
from django.urls import reverse
from lxml import etree
from opaque_keys.edx.keys import BlockTypeKey, UsageKey, UsageKeyV2
from opaque_keys.edx.locator import (
LibraryLocatorV2,
LibraryUsageLocatorV2,
LibraryLocator as LibraryLocatorV1,
LibraryCollectionLocator,
)
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from openedx_events.content_authoring.data import (
ContentLibraryData,
LibraryBlockData,
LibraryCollectionData,
LibraryContainerData,
ContentObjectChangedData,
)
from openedx_events.content_authoring.signals import (
CONTENT_LIBRARY_CREATED,
CONTENT_LIBRARY_DELETED,
CONTENT_LIBRARY_UPDATED,
LIBRARY_BLOCK_CREATED,
LIBRARY_BLOCK_DELETED,
LIBRARY_BLOCK_UPDATED,
LIBRARY_COLLECTION_UPDATED,
LIBRARY_CONTAINER_UPDATED,
CONTENT_OBJECT_ASSOCIATIONS_CHANGED,
)
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import (
Collection,
Component,
ComponentVersion,
MediaType,
LearningPackage,
PublishableEntity,
)
from openedx_learning.api.authoring_models import Component
from organizations.models import Organization
from xblock.core import XBlock
from xblock.exceptions import XBlockNotFoundError
from openedx.core.djangoapps.xblock.api import (
get_component_from_usage_key,
get_xblock_app_config,
xblock_type_display_name,
)
from openedx.core.lib.xblock_serializer.api import serialize_modulestore_block_for_learning_core
from openedx.core.djangoapps.content_libraries import api as lib_api
from openedx.core.types import User as UserType
from xmodule.modulestore.django import modulestore
from .. import permissions, tasks
from .. import permissions
from ..constants import ALL_RIGHTS_RESERVED
from ..models import ContentLibrary, ContentLibraryPermission, ContentLibraryBlockImportTask
from ..models import ContentLibrary, ContentLibraryPermission
from .exceptions import (
LibraryAlreadyExists,
LibraryPermissionIntegrityError,
)
log = logging.getLogger(__name__)
# The public API is only the following symbols:
__all__ = [
# Exceptions - maybe move them to a new file?
"ContentLibraryNotFound",
"ContentLibraryCollectionNotFound",
"ContentLibraryBlockNotFound",
"LibraryAlreadyExists",
"LibraryCollectionAlreadyExists",
"LibraryBlockAlreadyExists",
"BlockLimitReachedError",
"IncompatibleTypesError",
"InvalidNameError",
"LibraryPermissionIntegrityError",
# Library Models
"ContentLibrary", # Should this be public or not?
"ContentLibraryMetadata",
"AccessLevel",
"ContentLibraryPermissionEntry",
"LibraryXBlockType",
"CollectionMetadata",
# Library API methods
"user_can_create_library",
@@ -157,64 +104,13 @@ __all__ = [
"set_library_group_permissions",
"update_library",
"delete_library",
"library_component_usage_key",
"get_allowed_block_types",
"publish_changes",
"revert_changes",
# Collections - TODO: move to a new file
"create_library_collection",
"update_library_collection",
"update_library_collection_components",
"set_library_component_collections",
"get_library_collection_usage_key",
"get_library_collection_from_usage_key",
# Import - TODO: move to a new file
"EdxModulestoreImportClient",
"EdxApiImportClient",
"import_blocks_create_task",
]
# Exceptions
# ==========
ContentLibraryNotFound = ContentLibrary.DoesNotExist
ContentLibraryCollectionNotFound = Collection.DoesNotExist
class ContentLibraryBlockNotFound(XBlockNotFoundError):
""" XBlock not found in the content library """
class LibraryAlreadyExists(KeyError):
""" A library with the specified slug already exists """
class LibraryCollectionAlreadyExists(IntegrityError):
""" A Collection with that key already exists in the library """
class LibraryBlockAlreadyExists(KeyError):
""" An XBlock with that ID already exists in the library """
class BlockLimitReachedError(Exception):
""" Maximum number of allowed XBlocks in the library reached """
class IncompatibleTypesError(Exception):
""" Library type constraint violated """
class InvalidNameError(ValueError):
""" The specified name/identifier is not valid """
class LibraryPermissionIntegrityError(IntegrityError):
""" Thrown when an operation would cause insane permissions. """
# Models
# ======
@@ -304,7 +200,7 @@ class PublishableItem(LibraryItem):
# The username of the user who created the last draft.
last_draft_created_by: str = ""
has_unpublished_changes: bool = False
collections: list[CollectionMetadata] = field(default_factory=list)
collections: list[CollectionMetadata] = dataclass_field(default_factory=list)
can_stand_alone: bool = True
@@ -760,165 +656,6 @@ def delete_library(library_key: LibraryLocatorV2) -> None:
)
def _get_library_component_tags_count(library_key: LibraryLocatorV2) -> dict:
"""
Get the count of tags that are applied to each component in this library, as a dict.
"""
# Import content_tagging.api here to avoid circular imports
from openedx.core.djangoapps.content_tagging.api import get_object_tag_counts
# Create a pattern to match the IDs of the library components, e.g. "lb:org:id*"
library_key_pattern = str(library_key).replace("lib:", "lb:", 1) + "*"
return get_object_tag_counts(library_key_pattern, count_implicit=True)
def get_library_components(
library_key: LibraryLocatorV2,
text_search: str | None = None,
block_types: list[str] | None = None,
) -> QuerySet[Component]:
"""
Get the library components and filter.
TODO: Full text search needs to be implemented as a custom lookup for MySQL,
but it should have a fallback to still work in SQLite.
"""
lib = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
learning_package = lib.learning_package
assert learning_package is not None
components = authoring_api.get_components(
learning_package.id,
draft=True,
namespace='xblock.v1',
type_names=block_types,
draft_title=text_search,
)
return components
def get_library_block(usage_key: LibraryUsageLocatorV2, include_collections=False) -> LibraryXBlockMetadata:
"""
Get metadata about (the draft version of) one specific XBlock in a library.
This will raise ContentLibraryBlockNotFound if there is no draft version of
this block (i.e. it's been soft-deleted from Studio), even if there is a
live published version of it in the LMS.
"""
try:
component = get_component_from_usage_key(usage_key)
except ObjectDoesNotExist as exc:
raise ContentLibraryBlockNotFound(usage_key) from exc
# The component might have existed at one point, but no longer does because
# the draft was soft-deleted. This is actually a weird edge case and I'm not
# clear on what the proper behavior should be, since (a) the published
# version still exists; and (b) we might want to make some queries on the
# block even after it's been removed, since there might be versioned
# references to it.
draft_version = component.versioning.draft
if not draft_version:
raise ContentLibraryBlockNotFound(usage_key)
if include_collections:
associated_collections = authoring_api.get_entity_collections(
component.learning_package_id,
component.key,
).values('key', 'title')
else:
associated_collections = None
xblock_metadata = LibraryXBlockMetadata.from_component(
library_key=usage_key.context_key,
component=component,
associated_collections=associated_collections,
)
return xblock_metadata
def set_library_block_olx(usage_key: LibraryUsageLocatorV2, new_olx_str: str) -> ComponentVersion:
"""
Replace the OLX source of the given XBlock.
This is only meant for use by developers or API client applications, as
very little validation is done and this can easily result in a broken XBlock
that won't load.
Returns the version number of the newly created ComponentVersion.
"""
assert isinstance(usage_key, LibraryUsageLocatorV2)
# HTMLBlock uses CDATA to preserve HTML inside the XML, so make sure we
# don't strip that out.
parser = etree.XMLParser(strip_cdata=False)
# Verify that the OLX parses, at least as generic XML, and the root tag is correct:
node = etree.fromstring(new_olx_str, parser=parser)
if node.tag != usage_key.block_type:
raise ValueError(
f"Tried to set the OLX of a {usage_key.block_type} block to a <{node.tag}> node. "
f"{usage_key=!s}, {new_olx_str=}"
)
# We're intentionally NOT checking if the XBlock type is installed, since
# this is one of the only tools you can reach for to edit content for an
# XBlock that's broken or missing.
component = get_component_from_usage_key(usage_key)
# Get the title from the new OLX (or default to the default specified on the
# XBlock's display_name field.
new_title = node.attrib.get(
"display_name",
xblock_type_display_name(usage_key.block_type),
)
# Libraries don't use the url_name attribute, because they encode that into
# the Component key. Normally this is stripped out by the XBlockSerializer,
# but we're not actually creating the XBlock when it's coming from the
# clipboard right now.
if "url_name" in node.attrib:
del node.attrib["url_name"]
new_olx_str = etree.tostring(node, encoding='unicode')
now = datetime.now(tz=timezone.utc)
with transaction.atomic():
new_content = authoring_api.get_or_create_text_content(
component.learning_package_id,
get_or_create_olx_media_type(usage_key.block_type).id,
text=new_olx_str,
created=now,
)
new_component_version = authoring_api.create_next_component_version(
component.pk,
title=new_title,
content_to_replace={
'block.xml': new_content.pk,
},
created=now,
)
LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key
)
)
# For each container, trigger LIBRARY_CONTAINER_UPDATED signal and set background=True to trigger
# container indexing asynchronously.
affected_containers = lib_api.get_containers_contains_component(usage_key)
for container in affected_containers:
LIBRARY_CONTAINER_UPDATED.send_event(
library_container=LibraryContainerData(
library_key=usage_key.lib_key,
container_key=str(container.container_key),
background=True,
)
)
return new_component_version
def library_component_usage_key(
library_key: LibraryLocatorV2,
component: Component,
@@ -933,511 +670,6 @@ def library_component_usage_key(
)
def validate_can_add_block_to_library(
library_key: LibraryLocatorV2,
block_type: str,
block_id: str,
) -> tuple[ContentLibrary, LibraryUsageLocatorV2]:
"""
Perform checks to validate whether a new block with `block_id` and type `block_type` can be added to
the library with key `library_key`.
Returns the ContentLibrary that has the passed in `library_key` and newly created LibraryUsageLocatorV2 if
validation successful, otherwise raises errors.
"""
assert isinstance(library_key, LibraryLocatorV2)
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
# If adding a component would take us over our max, return an error.
assert content_library.learning_package_id is not None
component_count = authoring_api.get_all_drafts(content_library.learning_package_id).count()
if component_count + 1 > settings.MAX_BLOCKS_PER_CONTENT_LIBRARY:
raise BlockLimitReachedError(
_("Library cannot have more than {} Components").format(
settings.MAX_BLOCKS_PER_CONTENT_LIBRARY
)
)
# Make sure the proposed ID will be valid:
validate_unicode_slug(block_id)
# Ensure the XBlock type is valid and installed:
XBlock.load_class(block_type) # Will raise an exception if invalid
# Make sure the new ID is not taken already:
usage_key = LibraryUsageLocatorV2( # type: ignore[abstract]
lib_key=library_key,
block_type=block_type,
usage_id=block_id,
)
if _component_exists(usage_key):
raise LibraryBlockAlreadyExists(f"An XBlock with ID '{usage_key}' already exists")
return content_library, usage_key
def create_library_block(
library_key: LibraryLocatorV2,
block_type: str,
definition_id: str,
user_id: int | None = None,
can_stand_alone: bool = True,
):
"""
Create a new XBlock in this library of the specified type (e.g. "html").
Set can_stand_alone = False when a component is created under a container, like unit.
"""
# It's in the serializer as ``definition_id``, but for our purposes, it's
# the block_id. See the comments in ``LibraryXBlockCreationSerializer`` for
# more details. TODO: Change the param name once we change the serializer.
block_id = definition_id
content_library, usage_key = validate_can_add_block_to_library(library_key, block_type, block_id)
_create_component_for_block(content_library, usage_key, user_id, can_stand_alone)
# Now return the metadata about the new block:
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=content_library.library_key,
usage_key=usage_key
)
)
return get_library_block(usage_key)
def _component_exists(usage_key: UsageKeyV2) -> bool:
"""
Does a Component exist for this usage key?
This is a lower-level function that will return True if a Component object
exists, even if it was soft-deleted, and there is no active draft version.
"""
try:
get_component_from_usage_key(usage_key)
except ObjectDoesNotExist:
return False
return True
def import_staged_content_from_user_clipboard(library_key: LibraryLocatorV2, user, block_id) -> XBlock:
"""
Create a new library block and populate it with staged content from clipboard
Returns the newly created library block
"""
from openedx.core.djangoapps.content_staging import api as content_staging_api
if not content_staging_api:
raise RuntimeError("The required content_staging app is not installed")
user_clipboard = content_staging_api.get_user_clipboard(user)
if not user_clipboard:
return None
staged_content_id = user_clipboard.content.id
olx_str = content_staging_api.get_staged_content_olx(staged_content_id)
if olx_str is None:
return None # Shouldn't happen since we checked that the clipboard exists - mostly here for type checker
staged_content_files = content_staging_api.get_staged_content_static_files(staged_content_id)
content_library, usage_key = validate_can_add_block_to_library(
library_key,
user_clipboard.content.block_type,
block_id
)
# content_library.learning_package is technically a nullable field because
# it was added in a later migration, but we can't actually make a Library
# without one at the moment. TODO: fix this at the model level.
learning_package: LearningPackage = content_library.learning_package # type: ignore
now = datetime.now(tz=timezone.utc)
# Create component for block then populate it with clipboard data
with transaction.atomic():
# First create the Component, but do not initialize it to anything (i.e.
# no ComponentVersion).
component_type = authoring_api.get_or_create_component_type(
"xblock.v1", usage_key.block_type
)
component = authoring_api.create_component(
learning_package.id,
component_type=component_type,
local_key=usage_key.block_id,
created=now,
created_by=user.id,
)
# This will create the first component version and set the OLX/title
# appropriately. It will not publish. Once we get the newly created
# ComponentVersion back from this, we can attach all our files to it.
component_version = set_library_block_olx(usage_key, olx_str)
for staged_content_file_data in staged_content_files:
# The ``data`` attribute is going to be None because the clipboard
# is optimized to not do redundant file copying when copying/pasting
# within the same course (where all the Files and Uploads are
# shared). Learning Core backed content Components will always store
# a Component-local "copy" of the data, and rely on lower-level
# deduplication to happen in the ``contents`` app.
filename = staged_content_file_data.filename
# Grab our byte data for the file...
file_data = content_staging_api.get_staged_content_static_file_data(
staged_content_id,
filename,
)
if not file_data:
log.error(
f"Staged content {staged_content_id} included referenced "
f"file {filename}, but no file data was found."
)
continue
# Courses don't support having assets that are local to a specific
# component, and instead store all their content together in a
# shared Files and Uploads namespace. If we're pasting that into a
# Learning Core backed data model (v2 Libraries), then we want to
# prepend "static/" to the filename. This will need to get updated
# when we start moving courses over to Learning Core, or if we start
# storing course component assets in sub-directories of Files and
# Uploads.
#
# The reason we don't just search for a "static/" prefix is that
# Learning Core components can store other kinds of files if they
# wish (though none currently do).
source_assumes_global_assets = not isinstance(
user_clipboard.source_context_key, LibraryLocatorV2
)
if source_assumes_global_assets:
filename = f"static/{filename}"
# Now construct the Learning Core data models for it...
# TODO: more of this logic should be pushed down to openedx-learning
media_type_str, _encoding = mimetypes.guess_type(filename)
if not media_type_str:
media_type_str = "application/octet-stream"
media_type = authoring_api.get_or_create_media_type(media_type_str)
content = authoring_api.get_or_create_file_content(
learning_package.id,
media_type.id,
data=file_data,
created=now,
)
authoring_api.create_component_version_content(
component_version.pk,
content.id,
key=filename,
)
# Emit library block created event
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=content_library.library_key,
usage_key=usage_key
)
)
# Now return the metadata about the new block
return get_library_block(usage_key)
def get_or_create_olx_media_type(block_type: str) -> MediaType:
"""
Get or create a MediaType for the block type.
Learning Core stores all Content with a Media Type (a.k.a. MIME type). For
OLX, we use the "application/vnd.*" convention, per RFC 6838.
"""
return authoring_api.get_or_create_media_type(
f"application/vnd.openedx.xblock.v1.{block_type}+xml"
)
def _create_component_for_block(
content_lib: ContentLibrary,
usage_key: LibraryUsageLocatorV2,
user_id: int | None = None,
can_stand_alone: bool = True,
):
"""
Create a Component for an XBlock type, initialize it, and return the ComponentVersion.
This will create a Component, along with its first ComponentVersion. The tag
in the OLX will have no attributes, e.g. `<problem />`. This first version
will be set as the current draft. This function does not publish the
Component.
Set can_stand_alone = False when a component is created under a container, like unit.
TODO: We should probably shift this to openedx.core.djangoapps.xblock.api
(along with its caller) since it gives runtime storage specifics. The
Library-specific logic stays in this module, so "create a block for my lib"
should stay here, but "making a block means creating a component with
text data like X" goes in xblock.api.
"""
display_name = xblock_type_display_name(usage_key.block_type)
now = datetime.now(tz=timezone.utc)
xml_text = f'<{usage_key.block_type} />'
learning_package = content_lib.learning_package
assert learning_package is not None # mostly for type checker
with transaction.atomic():
component_type = authoring_api.get_or_create_component_type(
"xblock.v1", usage_key.block_type
)
component, component_version = authoring_api.create_component_and_version(
learning_package.id,
component_type=component_type,
local_key=usage_key.block_id,
title=display_name,
created=now,
created_by=user_id,
can_stand_alone=can_stand_alone,
)
content = authoring_api.get_or_create_text_content(
learning_package.id,
get_or_create_olx_media_type(usage_key.block_type).id,
text=xml_text,
created=now,
)
authoring_api.create_component_version_content(
component_version.pk,
content.id,
key="block.xml",
)
return component_version
def delete_library_block(usage_key: LibraryUsageLocatorV2, remove_from_parent=True) -> None:
"""
Delete the specified block from this library (soft delete).
"""
component = get_component_from_usage_key(usage_key)
library_key = usage_key.context_key
affected_collections = authoring_api.get_entity_collections(component.learning_package_id, component.key)
affected_containers = lib_api.get_containers_contains_component(usage_key)
authoring_api.soft_delete_draft(component.pk)
LIBRARY_BLOCK_DELETED.send_event(
library_block=LibraryBlockData(
library_key=library_key,
usage_key=usage_key
)
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
#
# To delete the component on collections
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
# For each container, trigger LIBRARY_CONTAINER_UPDATED signal and set background=True to trigger
# container indexing asynchronously.
#
# To update the components count in containers
for container in affected_containers:
LIBRARY_CONTAINER_UPDATED.send_event(
library_container=LibraryContainerData(
library_key=library_key,
container_key=str(container.container_key),
background=True,
)
)
def restore_library_block(usage_key: LibraryUsageLocatorV2) -> None:
"""
Restore the specified library block.
"""
component = get_component_from_usage_key(usage_key)
library_key = usage_key.context_key
affected_collections = authoring_api.get_entity_collections(component.learning_package_id, component.key)
# Set draft version back to the latest available component version id.
authoring_api.set_draft_version(component.pk, component.versioning.latest.pk)
LIBRARY_BLOCK_CREATED.send_event(
library_block=LibraryBlockData(
library_key=library_key,
usage_key=usage_key
)
)
# Add tags and collections back to index
CONTENT_OBJECT_ASSOCIATIONS_CHANGED.send_event(
content_object=ContentObjectChangedData(
object_id=str(usage_key),
changes=["collections", "tags"],
),
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
#
# To restore the component in the collections
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
def get_library_block_static_asset_files(usage_key: LibraryUsageLocatorV2) -> list[LibraryXBlockStaticFile]:
"""
Given an XBlock in a content library, list all the static asset files
associated with that XBlock.
Returns a list of LibraryXBlockStaticFile objects, sorted by path.
TODO: Should this be in the general XBlock API rather than the libraries API?
"""
component = get_component_from_usage_key(usage_key)
component_version = component.versioning.draft
# If there is no Draft version, then this was soft-deleted
if component_version is None:
return []
# cvc = the ComponentVersionContent through table
cvc_set = (
component_version
.componentversioncontent_set
.filter(content__has_file=True)
.order_by('key')
.select_related('content')
)
site_root_url = get_xblock_app_config().get_site_root_url()
return [
LibraryXBlockStaticFile(
path=cvc.key,
size=cvc.content.size,
url=site_root_url + reverse(
'content_libraries:library-assets',
kwargs={
'component_version_uuid': component_version.uuid,
'asset_path': cvc.key,
}
),
)
for cvc in cvc_set
]
def add_library_block_static_asset_file(
usage_key: LibraryUsageLocatorV2,
file_path: str,
file_content: bytes,
user: UserType | None = None,
) -> LibraryXBlockStaticFile:
"""
Upload a static asset file into the library, to be associated with the
specified XBlock. Will silently overwrite an existing file of the same name.
file_path should be a name like "doc.pdf". It may optionally contain slashes
like 'en/doc.pdf'
file_content should be a binary string.
Returns a LibraryXBlockStaticFile object.
Sends a LIBRARY_BLOCK_UPDATED event.
Example:
video_block = UsageKey.from_string("lb:VideoTeam:python-intro:video:1")
add_library_block_static_asset_file(video_block, "subtitles-en.srt", subtitles.encode('utf-8'))
"""
# File path validations copied over from v1 library logic. This can't really
# hurt us inside our system because we never use these paths in an actual
# file systemthey're just string keys that point to hash-named data files
# in a common library (learning package) level directory. But it might
# become a security issue during import/export serialization.
if file_path != file_path.strip().strip('/'):
raise InvalidNameError("file_path cannot start/end with / or whitespace.")
if '//' in file_path or '..' in file_path:
raise InvalidNameError("Invalid sequence (// or ..) in file_path.")
component = get_component_from_usage_key(usage_key)
with transaction.atomic():
component_version = authoring_api.create_next_component_version(
component.pk,
content_to_replace={file_path: file_content},
created=datetime.now(tz=timezone.utc),
created_by=user.id if user else None,
)
transaction.on_commit(
lambda: LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key,
)
)
)
# Now figure out the URL for the newly created asset...
site_root_url = get_xblock_app_config().get_site_root_url()
local_path = reverse(
'content_libraries:library-assets',
kwargs={
'component_version_uuid': component_version.uuid,
'asset_path': file_path,
}
)
return LibraryXBlockStaticFile(
path=file_path,
url=site_root_url + local_path,
size=len(file_content),
)
def delete_library_block_static_asset_file(usage_key, file_path, user=None):
"""
Delete a static asset file from the library.
Sends a LIBRARY_BLOCK_UPDATED event.
Example:
video_block = UsageKey.from_string("lb:VideoTeam:python-intro:video:1")
delete_library_block_static_asset_file(video_block, "subtitles-en.srt")
"""
component = get_component_from_usage_key(usage_key)
now = datetime.now(tz=timezone.utc)
with transaction.atomic():
component_version = authoring_api.create_next_component_version(
component.pk,
content_to_replace={file_path: None},
created=now,
created_by=user.id if user else None,
)
transaction.on_commit(
lambda: LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.context_key,
usage_key=usage_key,
)
)
)
def get_allowed_block_types(library_key: LibraryLocatorV2): # pylint: disable=unused-argument
"""
Get a list of XBlock types that can be added to the specified content
@@ -1483,31 +715,6 @@ def publish_changes(library_key: LibraryLocatorV2, user_id: int | None = None):
)
def publish_component_changes(usage_key: LibraryUsageLocatorV2, user: UserType):
"""
Publish all pending changes in a single component.
"""
content_library = require_permission_for_library_key(
usage_key.lib_key,
user,
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
)
learning_package = content_library.learning_package
assert learning_package
component = get_component_from_usage_key(usage_key)
drafts_to_publish = authoring_api.get_all_drafts(learning_package.id).filter(
entity__key=component.key
)
authoring_api.publish_from_drafts(learning_package.id, draft_qset=drafts_to_publish, published_by=user.id)
LIBRARY_BLOCK_UPDATED.send_event(
library_block=LibraryBlockData(
library_key=usage_key.lib_key,
usage_key=usage_key,
)
)
def revert_changes(library_key: LibraryLocatorV2) -> None:
"""
Revert all pending changes to the specified library, restoring it to the
@@ -1556,536 +763,3 @@ def revert_changes(library_key: LibraryLocatorV2) -> None:
changes=["collections"],
),
)
def create_library_collection(
library_key: LibraryLocatorV2,
collection_key: str,
title: str,
*,
description: str = "",
created_by: int | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Creates a Collection in the given ContentLibrary.
If you've already fetched a ContentLibrary for the given library_key, pass it in here to avoid refetching.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
try:
collection = authoring_api.create_collection(
learning_package_id=content_library.learning_package_id,
key=collection_key,
title=title,
description=description,
created_by=created_by,
)
except IntegrityError as err:
raise LibraryCollectionAlreadyExists from err
return collection
def update_library_collection(
library_key: LibraryLocatorV2,
collection_key: str,
*,
title: str | None = None,
description: str | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Updates a Collection in the given ContentLibrary.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
try:
collection = authoring_api.update_collection(
learning_package_id=content_library.learning_package_id,
key=collection_key,
title=title,
description=description,
)
except Collection.DoesNotExist as exc:
raise ContentLibraryCollectionNotFound from exc
return collection
def update_library_collection_components(
library_key: LibraryLocatorV2,
collection_key: str,
*,
usage_keys: list[UsageKeyV2],
created_by: int | None = None,
remove=False,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Collection:
"""
Associates the Collection with Components for the given UsageKeys.
By default the Components are added to the Collection.
If remove=True, the Components are removed from the Collection.
If you've already fetched the ContentLibrary, pass it in to avoid refetching.
Raises:
* ContentLibraryCollectionNotFound if no Collection with the given pk is found in the given library.
* ContentLibraryBlockNotFound if any of the given usage_keys don't match Components in the given library.
Returns the updated Collection.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
# Fetch the Component.key values for the provided UsageKeys.
component_keys = []
for usage_key in usage_keys:
# Parse the block_family from the key to use as namespace.
block_type = BlockTypeKey.from_string(str(usage_key))
try:
component = authoring_api.get_component_by_key(
content_library.learning_package_id,
namespace=block_type.block_family,
type_name=usage_key.block_type,
local_key=usage_key.block_id,
)
except Component.DoesNotExist as exc:
raise ContentLibraryBlockNotFound(usage_key) from exc
component_keys.append(component.key)
# Note: Component.key matches its PublishableEntity.key
entities_qset = PublishableEntity.objects.filter(
key__in=component_keys,
)
if remove:
collection = authoring_api.remove_from_collection(
content_library.learning_package_id,
collection_key,
entities_qset,
)
else:
collection = authoring_api.add_to_collection(
content_library.learning_package_id,
collection_key,
entities_qset,
created_by=created_by,
)
return collection
def set_library_component_collections(
library_key: LibraryLocatorV2,
component: Component,
*,
collection_keys: list[str],
created_by: int | None = None,
# As an optimization, callers may pass in a pre-fetched ContentLibrary instance
content_library: ContentLibrary | None = None,
) -> Component:
"""
It Associates the component with collections for the given collection keys.
Only collections in queryset are associated with component, all previous component-collections
associations are removed.
If you've already fetched the ContentLibrary, pass it in to avoid refetching.
Raises:
* ContentLibraryCollectionNotFound if any of the given collection_keys don't match Collections in the given library.
Returns the updated Component.
"""
if not content_library:
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library
assert content_library.learning_package_id
assert content_library.library_key == library_key
# Note: Component.key matches its PublishableEntity.key
collection_qs = authoring_api.get_collections(content_library.learning_package_id).filter(
key__in=collection_keys
)
affected_collections = authoring_api.set_collections(
content_library.learning_package_id,
component,
collection_qs,
created_by=created_by,
)
# For each collection, trigger LIBRARY_COLLECTION_UPDATED signal and set background=True to trigger
# collection indexing asynchronously.
for collection in affected_collections:
LIBRARY_COLLECTION_UPDATED.send_event(
library_collection=LibraryCollectionData(
library_key=library_key,
collection_key=collection.key,
background=True,
)
)
return component
def get_library_collection_usage_key(
library_key: LibraryLocatorV2,
collection_key: str,
) -> LibraryCollectionLocator:
"""
Returns the LibraryCollectionLocator associated to a collection
"""
return LibraryCollectionLocator(library_key, collection_key)
def get_library_collection_from_usage_key(
collection_usage_key: LibraryCollectionLocator,
) -> Collection:
"""
Return a Collection using the LibraryCollectionLocator
"""
library_key = collection_usage_key.library_key
collection_key = collection_usage_key.collection_id
content_library = ContentLibrary.objects.get_by_key(library_key) # type: ignore[attr-defined]
assert content_library.learning_package_id is not None # shouldn't happen but it's technically possible.
try:
return authoring_api.get_collection(
content_library.learning_package_id,
collection_key,
)
except Collection.DoesNotExist as exc:
raise ContentLibraryCollectionNotFound from exc
# Import from Courseware
# ======================
class BaseEdxImportClient(abc.ABC):
"""
Base class for all courseware import clients.
Import clients are wrappers tailored to implement the steps used in the
import APIs and can leverage different backends. It is not aimed towards
being a generic API client for Open edX.
"""
EXPORTABLE_BLOCK_TYPES = {
"drag-and-drop-v2",
"problem",
"html",
"video",
}
def __init__(self, library_key=None, library=None, use_course_key_as_block_id_suffix=True):
"""
Initialize an import client for a library.
The method accepts either a library object or a key to a library object.
"""
self.use_course_key_as_block_id_suffix = use_course_key_as_block_id_suffix
if bool(library_key) == bool(library):
raise ValueError('Provide at least one of `library_key` or '
'`library`, but not both.')
if library is None:
library = ContentLibrary.objects.get_by_key(library_key)
self.library = library
@abc.abstractmethod
def get_block_data(self, block_key):
"""
Get the block's OLX and static files, if any.
"""
@abc.abstractmethod
def get_export_keys(self, course_key):
"""
Get all exportable block keys of a given course.
"""
@abc.abstractmethod
def get_block_static_data(self, asset_file):
"""
Get the contents of an asset_file..
"""
def import_block(self, modulestore_key):
"""
Import a single modulestore block.
"""
block_data = self.get_block_data(modulestore_key)
# Get or create the block in the library.
#
# To dedup blocks from different courses with the same ID, we hash the
# course key into the imported block id.
course_key_id = base64.b32encode(
hashlib.blake2s(
str(modulestore_key.course_key).encode()
).digest()
)[:16].decode().lower()
# add the course_key_id if use_course_key_as_suffix is enabled to increase the namespace.
# The option exists to not use the course key as a suffix because
# in order to preserve learner state in the v1 to v2 libraries migration,
# the v2 and v1 libraries' child block ids must be the same.
block_id = (
# Prepend 'c' to allow changing hash without conflicts.
f"{modulestore_key.block_id}_c{course_key_id}"
if self.use_course_key_as_block_id_suffix
else f"{modulestore_key.block_id}"
)
log.info('Importing to library block: id=%s', block_id)
try:
library_block = create_library_block(
self.library.library_key,
modulestore_key.block_type,
block_id,
)
dest_key = library_block.usage_key
except LibraryBlockAlreadyExists:
dest_key = LibraryUsageLocatorV2(
lib_key=self.library.library_key,
block_type=modulestore_key.block_type,
usage_id=block_id,
)
get_library_block(dest_key)
log.warning('Library block already exists: Appending static files '
'and overwriting OLX: %s', str(dest_key))
# Handle static files.
files = [
f.path for f in
get_library_block_static_asset_files(dest_key)
]
for filename, static_file in block_data.get('static_files', {}).items():
if filename in files:
# Files already added, move on.
continue
file_content = self.get_block_static_data(static_file)
add_library_block_static_asset_file(dest_key, filename, file_content)
files.append(filename)
# Import OLX.
set_library_block_olx(dest_key, block_data['olx'])
def import_blocks_from_course(self, course_key, progress_callback):
"""
Import all eligible blocks from course key.
Progress is reported through ``progress_callback``, guaranteed to be
called within an exception handler if ``exception is not None``.
"""
# Query the course and rerieve all course blocks.
export_keys = self.get_export_keys(course_key)
if not export_keys:
raise ValueError(f"The courseware course {course_key} does not have "
"any exportable content. No action taken.")
# Import each block, skipping the ones that fail.
for index, block_key in enumerate(export_keys):
try:
log.info('Importing block: %s/%s: %s', index + 1, len(export_keys), block_key)
self.import_block(block_key)
except Exception as exc: # pylint: disable=broad-except
log.exception("Error importing block: %s", block_key)
progress_callback(block_key, index + 1, len(export_keys), exc)
else:
log.info('Successfully imported: %s/%s: %s', index + 1, len(export_keys), block_key)
progress_callback(block_key, index + 1, len(export_keys), None)
log.info("Publishing library: %s", self.library.library_key)
publish_changes(self.library.library_key)
class EdxModulestoreImportClient(BaseEdxImportClient):
"""
An import client based on the local instance of modulestore.
"""
def __init__(self, modulestore_instance=None, **kwargs):
"""
Initialize the client with a modulestore instance.
"""
super().__init__(**kwargs)
self.modulestore = modulestore_instance or modulestore()
def get_block_data(self, block_key):
"""
Get block OLX by serializing it from modulestore directly.
"""
block = self.modulestore.get_item(block_key)
data = serialize_modulestore_block_for_learning_core(block)
return {'olx': data.olx_str,
'static_files': {s.name: s for s in data.static_files}}
def get_export_keys(self, course_key):
"""
Retrieve the course from modulestore and traverse its content tree.
"""
course = self.modulestore.get_course(course_key)
if isinstance(course_key, LibraryLocatorV1):
course = self.modulestore.get_library(course_key)
export_keys = set()
blocks_q = collections.deque(course.get_children())
while blocks_q:
block = blocks_q.popleft()
usage_id = block.scope_ids.usage_id
if usage_id in export_keys:
continue
if usage_id.block_type in self.EXPORTABLE_BLOCK_TYPES:
export_keys.add(usage_id)
if block.has_children:
blocks_q.extend(block.get_children())
return list(export_keys)
def get_block_static_data(self, asset_file):
"""
Get static content from its URL if available, otherwise from its data.
"""
if asset_file.data:
return asset_file.data
resp = requests.get(f"http://{settings.CMS_BASE}" + asset_file.url)
resp.raise_for_status()
return resp.content
class EdxApiImportClient(BaseEdxImportClient):
"""
An import client based on a remote Open Edx API interface.
TODO: Look over this class. We'll probably need to completely re-implement
the import process.
"""
URL_COURSES = "/api/courses/v1/courses/{course_key}"
URL_MODULESTORE_BLOCK_OLX = "/api/olx-export/v1/xblock/{block_key}/"
def __init__(self, lms_url, studio_url, oauth_key, oauth_secret, *args, **kwargs):
"""
Initialize the API client with URLs and OAuth keys.
"""
super().__init__(**kwargs)
self.lms_url = lms_url
self.studio_url = studio_url
self.oauth_client = OAuthAPIClient(
self.lms_url,
oauth_key,
oauth_secret,
)
def get_block_data(self, block_key):
"""
See parent's docstring.
"""
olx_path = self.URL_MODULESTORE_BLOCK_OLX.format(block_key=block_key)
resp = self._get(self.studio_url + olx_path)
return resp['blocks'][str(block_key)]
def get_export_keys(self, course_key):
"""
See parent's docstring.
"""
course_blocks_url = self._get_course(course_key)['blocks_url']
course_blocks = self._get(
course_blocks_url,
params={'all_blocks': True, 'depth': 'all'})['blocks']
export_keys = []
for block_info in course_blocks.values():
if block_info['type'] in self.EXPORTABLE_BLOCK_TYPES:
export_keys.append(UsageKey.from_string(block_info['id']))
return export_keys
def get_block_static_data(self, asset_file):
"""
See parent's docstring.
"""
if (asset_file['url'].startswith(self.studio_url)
and 'export-file' in asset_file['url']):
# We must call download this file with authentication. But
# we only want to pass the auth headers if this is the same
# studio instance, or else we could leak credentials to a
# third party.
path = asset_file['url'][len(self.studio_url):]
resp = self._call('get', path)
else:
resp = requests.get(asset_file['url'])
resp.raise_for_status()
return resp.content
def _get(self, *args, **kwargs):
"""
Perform a get request to the client.
"""
return self._json_call('get', *args, **kwargs)
def _get_course(self, course_key):
"""
Request details for a course.
"""
course_url = self.lms_url + self.URL_COURSES.format(course_key=course_key)
return self._get(course_url)
def _json_call(self, method, *args, **kwargs):
"""
Wrapper around request calls that ensures valid json responses.
"""
return self._call(method, *args, **kwargs).json()
def _call(self, method, *args, **kwargs):
"""
Wrapper around request calls.
"""
response = getattr(self.oauth_client, method)(*args, **kwargs)
response.raise_for_status()
return response
def import_blocks_create_task(library_key, course_key, use_course_key_as_block_id_suffix=True):
"""
Create a new import block task.
This API will schedule a celery task to perform the import, and it returns a
import task object for polling.
"""
library = ContentLibrary.objects.get_by_key(library_key)
import_task = ContentLibraryBlockImportTask.objects.create(
library=library,
course_id=course_key,
)
result = tasks.import_blocks_from_course.apply_async(
args=(import_task.pk, str(course_key), use_course_key_as_block_id_suffix)
)
log.info(f"Import block task created: import_task={import_task} "
f"celery_task={result.id}")
return import_task

View File

@@ -1,19 +1,453 @@
"""
Content Library REST APIs related to XBlocks/Components and their static assets
"""
# pylint: disable=unused-import
from django.core.exceptions import ObjectDoesNotExist
from django.db.transaction import non_atomic_requests
from django.http import Http404, HttpResponse, StreamingHttpResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.exceptions import NotFound, ValidationError
from rest_framework.generics import GenericAPIView
from rest_framework.parsers import MultiPartParser
from rest_framework.response import Response
from rest_framework.views import APIView
# TODO: move the block and block asset related views from 'libraries' into this file
from .libraries import (
LibraryBlockAssetListView,
LibraryBlockAssetView,
LibraryBlockCollectionsView,
LibraryBlockLtiUrlView,
LibraryBlockOlxView,
LibraryBlockPublishView,
LibraryBlockRestore,
LibraryBlocksView,
LibraryBlockView,
LibraryComponentAssetView,
LibraryComponentDraftAssetView,
import edx_api_doc_tools as apidocs
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from openedx_learning.api import authoring as authoring_api
from openedx.core.djangoapps.content_libraries import api, permissions
from openedx.core.djangoapps.content_libraries.rest_api.serializers import (
ContentLibraryComponentCollectionsUpdateSerializer,
LibraryXBlockCreationSerializer,
LibraryXBlockMetadataSerializer,
LibraryXBlockOlxSerializer,
LibraryXBlockStaticFileSerializer,
LibraryXBlockStaticFilesSerializer,
)
from openedx.core.lib.api.view_utils import view_auth_classes
from openedx.core.types.http import RestRequest
from openedx.core.djangoapps.xblock import api as xblock_api
from .libraries import LibraryApiPaginationDocs
from .utils import convert_exceptions
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlocksView(GenericAPIView):
"""
Views to work with XBlocks in a specific content library.
"""
serializer_class = LibraryXBlockMetadataSerializer
@apidocs.schema(
parameters=[
*LibraryApiPaginationDocs.apidoc_params,
apidocs.query_parameter(
'text_search',
str,
description="The string used to filter libraries by searching in title, id, org, or description",
),
apidocs.query_parameter(
'block_type',
str,
description="The block type to search for. If omitted or blank, searches for all types. "
"May be specified multiple times to match multiple types."
)
],
)
@convert_exceptions
def get(self, request, lib_key_str):
"""
Get the list of all top-level blocks in this content library
"""
key = LibraryLocatorV2.from_string(lib_key_str)
text_search = request.query_params.get('text_search', None)
block_types = request.query_params.getlist('block_type') or None
api.require_permission_for_library_key(key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
components = api.get_library_components(key, text_search=text_search, block_types=block_types)
paginated_xblock_metadata = [
api.LibraryXBlockMetadata.from_component(key, component)
for component in self.paginate_queryset(components)
]
serializer = LibraryXBlockMetadataSerializer(paginated_xblock_metadata, many=True)
return self.get_paginated_response(serializer.data)
@convert_exceptions
@swagger_auto_schema(
request_body=LibraryXBlockCreationSerializer,
responses={200: LibraryXBlockMetadataSerializer}
)
def post(self, request, lib_key_str):
"""
Add a new XBlock to this content library
"""
library_key = LibraryLocatorV2.from_string(lib_key_str)
api.require_permission_for_library_key(library_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
serializer = LibraryXBlockCreationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Create a new regular top-level block:
try:
result = api.create_library_block(library_key, user_id=request.user.id, **serializer.validated_data)
except api.IncompatibleTypesError as err:
raise ValidationError( # lint-amnesty, pylint: disable=raise-missing-from
detail={'block_type': str(err)},
)
return Response(LibraryXBlockMetadataSerializer(result).data)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockView(APIView):
"""
Views to work with an existing XBlock in a content library.
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
Get metadata about an existing XBlock in the content library.
This API doesn't support versioning; most of the information it returns
is related to the latest draft version, or to all versions of the block.
If you need to get the display name of a previous version, use the
similar "metadata" API from djangoapps.xblock, which does support
versioning.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
result = api.get_library_block(key, include_collections=True)
return Response(LibraryXBlockMetadataSerializer(result).data)
@convert_exceptions
def delete(self, request, usage_key_str): # pylint: disable=unused-argument
"""
Delete a usage of a block from the library (and any children it has).
If this is the only usage of the block's definition within this library,
both the definition and the usage will be deleted. If this is only one
of several usages, the definition will be kept. Usages by linked bundles
are ignored and will not prevent deletion of the definition.
If the usage points to a definition in a linked bundle, the usage will
be deleted but the link and the linked bundle will be unaffected.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
api.delete_library_block(key)
return Response({})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockAssetListView(APIView):
"""
Views to list an existing XBlock's static asset files
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
List the static asset files belonging to this block.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
files = api.get_library_block_static_asset_files(key)
return Response(LibraryXBlockStaticFilesSerializer({"files": files}).data)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockAssetView(APIView):
"""
Views to work with an existing XBlock's static asset files
"""
parser_classes = (MultiPartParser, )
@convert_exceptions
def get(self, request, usage_key_str, file_path):
"""
Get a static asset file belonging to this block.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
files = api.get_library_block_static_asset_files(key)
for f in files:
if f.path == file_path:
return Response(LibraryXBlockStaticFileSerializer(f).data)
raise NotFound
@convert_exceptions
def put(self, request, usage_key_str, file_path):
"""
Replace a static asset file belonging to this block.
"""
file_path = file_path.replace(" ", "_") # Messes up url/name correspondence due to URL encoding.
usage_key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(
usage_key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
)
file_wrapper = request.data['content']
if file_wrapper.size > 20 * 1024 * 1024: # > 20 MiB
# TODO: This check was written when V2 Libraries were backed by the Blockstore micro-service.
# Now that we're on Learning Core, do we still need it? Here's the original comment:
# In the future, we need a way to use file_wrapper.chunks() to read
# the file in chunks and stream that to Blockstore, but Blockstore
# currently lacks an API for streaming file uploads.
# Ref: https://github.com/openedx/edx-platform/issues/34737
raise ValidationError("File too big")
file_content = file_wrapper.read()
try:
result = api.add_library_block_static_asset_file(usage_key, file_path, file_content, request.user)
except ValueError:
raise ValidationError("Invalid file path") # lint-amnesty, pylint: disable=raise-missing-from
return Response(LibraryXBlockStaticFileSerializer(result).data)
@convert_exceptions
def delete(self, request, usage_key_str, file_path):
"""
Delete a static asset file belonging to this block.
"""
usage_key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(
usage_key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
)
try:
api.delete_library_block_static_asset_file(usage_key, file_path, request.user)
except ValueError:
raise ValidationError("Invalid file path") # lint-amnesty, pylint: disable=raise-missing-from
return Response(status=status.HTTP_204_NO_CONTENT)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockPublishView(APIView):
"""
Commit/publish all of the draft changes made to the component.
"""
@convert_exceptions
def post(self, request, usage_key_str):
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.publish_component_changes(key, request.user)
return Response({})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockCollectionsView(APIView):
"""
View to set collections for a component.
"""
@convert_exceptions
def patch(self, request: RestRequest, usage_key_str) -> Response:
"""
Sets Collections for a Component.
Collection and Components must all be part of the given library/learning package.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
content_library = api.require_permission_for_library_key(
key.lib_key,
request.user,
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
)
component = api.get_component_from_usage_key(key)
serializer = ContentLibraryComponentCollectionsUpdateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
collection_keys = serializer.validated_data['collection_keys']
api.set_library_component_collections(
library_key=key.lib_key,
component=component,
collection_keys=collection_keys,
created_by=request.user.id,
content_library=content_library,
)
return Response({'count': len(collection_keys)})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockLtiUrlView(APIView):
"""
Views to generate LTI URL for existing XBlocks in a content library.
Returns 404 in case the block not found by the given key.
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
Get the LTI launch URL for the XBlock.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
# Get the block to validate its existence
api.get_library_block(key)
lti_login_url = f"{reverse('content_libraries:lti-launch')}?id={key}"
return Response({"lti_url": lti_login_url})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockOlxView(APIView):
"""
Views to work with an existing XBlock's OLX
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
DEPRECATED. Use get_block_olx_view() in xblock REST-API.
Can be removed post-Teak.
Get the block's OLX
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
xml_str = xblock_api.get_block_draft_olx(key)
return Response(LibraryXBlockOlxSerializer({"olx": xml_str}).data)
@convert_exceptions
def post(self, request, usage_key_str):
"""
Replace the block's OLX.
This API is only meant for use by developers or API client applications.
Very little validation is done.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
serializer = LibraryXBlockOlxSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
new_olx_str = serializer.validated_data["olx"]
try:
version_num = api.set_library_block_olx(key, new_olx_str).version_num
except ValueError as err:
raise ValidationError(detail=str(err)) # lint-amnesty, pylint: disable=raise-missing-from
return Response(LibraryXBlockOlxSerializer({"olx": new_olx_str, "version_num": version_num}).data)
@view_auth_classes()
class LibraryBlockRestore(APIView):
"""
View to restore soft-deleted library xblocks.
"""
@convert_exceptions
def post(self, request, usage_key_str) -> Response:
"""
Restores a soft-deleted library block that belongs to a Content Library
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
api.restore_library_block(key)
return Response(None, status=status.HTTP_204_NO_CONTENT)
def get_component_version_asset(request, component_version_uuid, asset_path):
"""
Serves static assets associated with particular Component versions.
Important notes:
* This is meant for Studio/authoring use ONLY. It requires read access to
the content library.
* It uses the UUID because that's easier to parse than the key field (which
could be part of an OpaqueKey, but could also be almost anything else).
* This is not very performant, and we still want to use the X-Accel-Redirect
method for serving LMS traffic in the longer term (and probably Studio
eventually).
"""
try:
component_version = authoring_api.get_component_version_by_uuid(
component_version_uuid
)
except ObjectDoesNotExist as exc:
raise Http404() from exc
# Permissions check...
learning_package = component_version.component.learning_package
library_key = LibraryLocatorV2.from_string(learning_package.key)
api.require_permission_for_library_key(
library_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY,
)
# We already have logic for getting the correct content and generating the
# proper headers in Learning Core, but the response generated here is an
# X-Accel-Redirect and lacks the actual content. We eventually want to use
# this response in conjunction with a media reverse proxy (Caddy or Nginx),
# but in the short term we're just going to remove the redirect and stream
# the content directly.
redirect_response = authoring_api.get_redirect_response_for_component_asset(
component_version_uuid,
asset_path,
public=False,
)
# If there was any error, we return that response because it will have the
# correct headers set and won't have any X-Accel-Redirect header set.
if redirect_response.status_code != 200:
return redirect_response
# If we got here, we know that the asset exists and it's okay to download.
cv_content = component_version.componentversioncontent_set.get(key=asset_path)
content = cv_content.content
# Delete the re-direct part of the response headers. We'll copy the rest.
headers = redirect_response.headers
headers.pop('X-Accel-Redirect')
# We need to set the content size header manually because this is a
# streaming response. It's not included in the redirect headers because it's
# not needed there (the reverse-proxy would have direct access to the file).
headers['Content-Length'] = content.size
if request.method == "HEAD":
return HttpResponse(headers=headers)
# Otherwise it's going to be a GET response. We don't support response
# offsets or anything fancy, because we don't expect to run this view at
# LMS-scale.
return StreamingHttpResponse(
content.read_file().chunks(),
headers=redirect_response.headers,
)
@view_auth_classes()
class LibraryComponentAssetView(APIView):
"""
Serves static assets associated with particular Component versions.
"""
@convert_exceptions
def get(self, request, component_version_uuid, asset_path):
"""
GET API for fetching static asset for given component_version_uuid.
"""
return get_component_version_asset(request, component_version_uuid, asset_path)
@view_auth_classes()
class LibraryComponentDraftAssetView(APIView):
"""
Serves the draft version of static assets associated with a Library Component.
See `get_component_version_asset` for more details
"""
@convert_exceptions
def get(self, request, usage_key, asset_path):
"""
Fetches component_version_uuid for given usage_key and returns component asset.
"""
try:
component_version_uuid = api.get_component_from_usage_key(usage_key).versioning.draft.uuid
except ObjectDoesNotExist as exc:
raise Http404() from exc
return get_component_version_asset(request, component_version_uuid, asset_path)

View File

@@ -69,11 +69,9 @@ import logging
from django.conf import settings
from django.contrib.auth import authenticate, get_user_model, login
from django.contrib.auth.models import Group
from django.core.exceptions import ObjectDoesNotExist
from django.db.transaction import atomic, non_atomic_requests
from django.http import Http404, HttpResponse, HttpResponseBadRequest, JsonResponse, StreamingHttpResponse
from django.http import Http404, HttpResponseBadRequest, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _
from django.views.decorators.clickjacking import xframe_options_exempt
@@ -85,14 +83,12 @@ from pylti1p3.exception import LtiException, OIDCException
import edx_api_doc_tools as apidocs
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from openedx_learning.api import authoring
from organizations.api import ensure_organization
from organizations.exceptions import InvalidOrganizationException
from organizations.models import Organization
from rest_framework import status
from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError
from rest_framework.generics import GenericAPIView
from rest_framework.parsers import MultiPartParser
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import GenericViewSet
@@ -110,13 +106,9 @@ from openedx.core.djangoapps.content_libraries.rest_api.serializers import (
ContentLibraryPermissionLevelSerializer,
ContentLibraryPermissionSerializer,
ContentLibraryUpdateSerializer,
ContentLibraryComponentCollectionsUpdateSerializer,
LibraryXBlockCreationSerializer,
LibraryXBlockMetadataSerializer,
LibraryXBlockTypeSerializer,
LibraryXBlockOlxSerializer,
LibraryXBlockStaticFileSerializer,
LibraryXBlockStaticFilesSerializer,
ContentLibraryAddPermissionByEmailSerializer,
LibraryPasteClipboardSerializer,
)
@@ -124,7 +116,6 @@ import openedx.core.djangoapps.site_configuration.helpers as configuration_helpe
from openedx.core.lib.api.view_utils import view_auth_classes
from openedx.core.djangoapps.safe_sessions.middleware import mark_user_change_as_expected
from openedx.core.djangoapps.xblock import api as xblock_api
from openedx.core.types.http import RestRequest
from .utils import convert_exceptions
from ..models import ContentLibrary, LtiGradedResource, LtiProfile
@@ -632,212 +623,6 @@ class LibraryBlockView(APIView):
return Response({})
@view_auth_classes()
class LibraryBlockRestore(APIView):
"""
View to restore soft-deleted library xblocks.
"""
@convert_exceptions
def post(self, request, usage_key_str) -> Response:
"""
Restores a soft-deleted library block that belongs to a Content Library
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
api.restore_library_block(key)
return Response(None, status=status.HTTP_204_NO_CONTENT)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockCollectionsView(APIView):
"""
View to set collections for a component.
"""
@convert_exceptions
def patch(self, request: RestRequest, usage_key_str) -> Response:
"""
Sets Collections for a Component.
Collection and Components must all be part of the given library/learning package.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
content_library = api.require_permission_for_library_key(
key.lib_key,
request.user,
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
)
component = api.get_component_from_usage_key(key)
serializer = ContentLibraryComponentCollectionsUpdateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
collection_keys = serializer.validated_data['collection_keys']
api.set_library_component_collections(
library_key=key.lib_key,
component=component,
collection_keys=collection_keys,
created_by=request.user.id,
content_library=content_library,
)
return Response({'count': len(collection_keys)})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockLtiUrlView(APIView):
"""
Views to generate LTI URL for existing XBlocks in a content library.
Returns 404 in case the block not found by the given key.
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
Get the LTI launch URL for the XBlock.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
# Get the block to validate its existence
api.get_library_block(key)
lti_login_url = f"{reverse('content_libraries:lti-launch')}?id={key}"
return Response({"lti_url": lti_login_url})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockOlxView(APIView):
"""
Views to work with an existing XBlock's OLX
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
DEPRECATED. Use get_block_olx_view() in xblock REST-API.
Can be removed post-Teak.
Get the block's OLX
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
xml_str = xblock_api.get_block_draft_olx(key)
return Response(LibraryXBlockOlxSerializer({"olx": xml_str}).data)
@convert_exceptions
def post(self, request, usage_key_str):
"""
Replace the block's OLX.
This API is only meant for use by developers or API client applications.
Very little validation is done.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
serializer = LibraryXBlockOlxSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
new_olx_str = serializer.validated_data["olx"]
try:
version_num = api.set_library_block_olx(key, new_olx_str).version_num
except ValueError as err:
raise ValidationError(detail=str(err)) # lint-amnesty, pylint: disable=raise-missing-from
return Response(LibraryXBlockOlxSerializer({"olx": new_olx_str, "version_num": version_num}).data)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockAssetListView(APIView):
"""
Views to list an existing XBlock's static asset files
"""
@convert_exceptions
def get(self, request, usage_key_str):
"""
List the static asset files belonging to this block.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
files = api.get_library_block_static_asset_files(key)
return Response(LibraryXBlockStaticFilesSerializer({"files": files}).data)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockAssetView(APIView):
"""
Views to work with an existing XBlock's static asset files
"""
parser_classes = (MultiPartParser, )
@convert_exceptions
def get(self, request, usage_key_str, file_path):
"""
Get a static asset file belonging to this block.
"""
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(key.lib_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY)
files = api.get_library_block_static_asset_files(key)
for f in files:
if f.path == file_path:
return Response(LibraryXBlockStaticFileSerializer(f).data)
raise NotFound
@convert_exceptions
def put(self, request, usage_key_str, file_path):
"""
Replace a static asset file belonging to this block.
"""
file_path = file_path.replace(" ", "_") # Messes up url/name correspondence due to URL encoding.
usage_key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(
usage_key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
)
file_wrapper = request.data['content']
if file_wrapper.size > 20 * 1024 * 1024: # > 20 MiB
# TODO: This check was written when V2 Libraries were backed by the Blockstore micro-service.
# Now that we're on Learning Core, do we still need it? Here's the original comment:
# In the future, we need a way to use file_wrapper.chunks() to read
# the file in chunks and stream that to Blockstore, but Blockstore
# currently lacks an API for streaming file uploads.
# Ref: https://github.com/openedx/edx-platform/issues/34737
raise ValidationError("File too big")
file_content = file_wrapper.read()
try:
result = api.add_library_block_static_asset_file(usage_key, file_path, file_content, request.user)
except ValueError:
raise ValidationError("Invalid file path") # lint-amnesty, pylint: disable=raise-missing-from
return Response(LibraryXBlockStaticFileSerializer(result).data)
@convert_exceptions
def delete(self, request, usage_key_str, file_path):
"""
Delete a static asset file belonging to this block.
"""
usage_key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.require_permission_for_library_key(
usage_key.lib_key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
)
try:
api.delete_library_block_static_asset_file(usage_key, file_path, request.user)
except ValueError:
raise ValidationError("Invalid file path") # lint-amnesty, pylint: disable=raise-missing-from
return Response(status=status.HTTP_204_NO_CONTENT)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryBlockPublishView(APIView):
"""
Commit/publish all of the draft changes made to the component.
"""
@convert_exceptions
def post(self, request, usage_key_str):
key = LibraryUsageLocatorV2.from_string(usage_key_str)
api.publish_component_changes(key, request.user)
return Response({})
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryImportTaskViewSet(GenericViewSet):
@@ -859,7 +644,7 @@ class LibraryImportTaskViewSet(GenericViewSet):
request.user,
permissions.CAN_VIEW_THIS_CONTENT_LIBRARY
)
queryset = api.ContentLibrary.objects.get_by_key(library_key).import_tasks
queryset = ContentLibrary.objects.get_by_key(library_key).import_tasks
result = ContentLibraryBlockImportTaskSerializer(queryset, many=True).data
return self.get_paginated_response(
@@ -1175,105 +960,3 @@ class LtiToolJwksView(LtiToolView):
Return the JWKS.
"""
return JsonResponse(self.lti_tool_config.get_jwks(), safe=False)
def get_component_version_asset(request, component_version_uuid, asset_path):
"""
Serves static assets associated with particular Component versions.
Important notes:
* This is meant for Studio/authoring use ONLY. It requires read access to
the content library.
* It uses the UUID because that's easier to parse than the key field (which
could be part of an OpaqueKey, but could also be almost anything else).
* This is not very performant, and we still want to use the X-Accel-Redirect
method for serving LMS traffic in the longer term (and probably Studio
eventually).
"""
try:
component_version = authoring.get_component_version_by_uuid(
component_version_uuid
)
except ObjectDoesNotExist as exc:
raise Http404() from exc
# Permissions check...
learning_package = component_version.component.learning_package
library_key = LibraryLocatorV2.from_string(learning_package.key)
api.require_permission_for_library_key(
library_key, request.user, permissions.CAN_VIEW_THIS_CONTENT_LIBRARY,
)
# We already have logic for getting the correct content and generating the
# proper headers in Learning Core, but the response generated here is an
# X-Accel-Redirect and lacks the actual content. We eventually want to use
# this response in conjunction with a media reverse proxy (Caddy or Nginx),
# but in the short term we're just going to remove the redirect and stream
# the content directly.
redirect_response = authoring.get_redirect_response_for_component_asset(
component_version_uuid,
asset_path,
public=False,
)
# If there was any error, we return that response because it will have the
# correct headers set and won't have any X-Accel-Redirect header set.
if redirect_response.status_code != 200:
return redirect_response
# If we got here, we know that the asset exists and it's okay to download.
cv_content = component_version.componentversioncontent_set.get(key=asset_path)
content = cv_content.content
# Delete the re-direct part of the response headers. We'll copy the rest.
headers = redirect_response.headers
headers.pop('X-Accel-Redirect')
# We need to set the content size header manually because this is a
# streaming response. It's not included in the redirect headers because it's
# not needed there (the reverse-proxy would have direct access to the file).
headers['Content-Length'] = content.size
if request.method == "HEAD":
return HttpResponse(headers=headers)
# Otherwise it's going to be a GET response. We don't support response
# offsets or anything fancy, because we don't expect to run this view at
# LMS-scale.
return StreamingHttpResponse(
content.read_file().chunks(),
headers=redirect_response.headers,
)
@view_auth_classes()
class LibraryComponentAssetView(APIView):
"""
Serves static assets associated with particular Component versions.
"""
@convert_exceptions
def get(self, request, component_version_uuid, asset_path):
"""
GET API for fetching static asset for given component_version_uuid.
"""
return get_component_version_asset(request, component_version_uuid, asset_path)
@view_auth_classes()
class LibraryComponentDraftAssetView(APIView):
"""
Serves the draft version of static assets associated with a Library Component.
See `get_component_version_asset` for more details
"""
@convert_exceptions
def get(self, request, usage_key, asset_path):
"""
Fetches component_version_uuid for given usage_key and returns component asset.
"""
try:
component_version_uuid = api.get_component_from_usage_key(usage_key).versioning.draft.uuid
except ObjectDoesNotExist as exc:
raise Http404() from exc
return get_component_version_asset(request, component_version_uuid, asset_path)

View File

@@ -67,11 +67,11 @@ class EdxModulestoreImportClientTest(TestCase):
with self.assertRaises(ValueError):
self.client.import_blocks_from_course('foobar', lambda *_: None)
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.get_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.publish_changes')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.set_library_block_olx')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.get_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.publish_changes')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.set_library_block_olx')
def test_import_blocks_from_course_on_block_with_olx(
self,
mock_set_library_block_olx,
@@ -103,9 +103,9 @@ class EdxModulestoreImportClientTest(TestCase):
mock.ANY, 'fake-olx')
mock_publish_changes.assert_called_once()
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.set_library_block_olx')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.set_library_block_olx')
def test_import_block_when_called_twice_same_block_but_different_course(
self,
mock_set_library_block_olx,
@@ -140,7 +140,7 @@ class EdxModulestoreImportClientTest(TestCase):
mock_set_library_block_olx.assert_called_once()
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.OAuthAPIClient')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.OAuthAPIClient')
class EdxApiImportClientTest(TestCase):
"""
Tests for EdxApiImportClient.
@@ -197,11 +197,11 @@ class EdxApiImportClientTest(TestCase):
return mock_response, mock_content
return mock_response
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.add_library_block_static_asset_file')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.publish_changes')
@mock.patch('openedx.core.djangoapps.content_libraries.api.libraries.set_library_block_olx')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.add_library_block_static_asset_file')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.create_library_block')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.get_library_block_static_asset_files')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.publish_changes')
@mock.patch('openedx.core.djangoapps.content_libraries.api.courseware_import.set_library_block_olx')
def test_import_block_when_url_is_from_studio(
self,
mock_set_library_block_olx,
@@ -746,10 +746,26 @@ class ContentLibraryCollectionsTest(ContentLibrariesRestApiTest, OpenEdxEventsTe
)
class ContentLibraryContainersTest(ContentLibrariesRestApiTest, TestCase):
class ContentLibraryContainersTest(ContentLibrariesRestApiTest, OpenEdxEventsTestMixin):
"""
Tests for Content Library API containers methods.
"""
ENABLED_OPENEDX_EVENTS = [
LIBRARY_CONTAINER_UPDATED.event_type,
]
@classmethod
def setUpClass(cls):
"""
Set up class method for the Test class.
TODO: It's unclear why we need to call start_events_isolation ourselves rather than relying on
OpenEdxEventsTestMixin.setUpClass to handle it. It fails it we don't, and many other test cases do it,
so we're following a pattern here. But that pattern doesn't really make sense.
"""
super().setUpClass()
cls.start_events_isolation()
def setUp(self):
super().setUp()
@@ -835,6 +851,15 @@ class ContentLibraryContainersTest(ContentLibrariesRestApiTest, TestCase):
api.delete_library_block(self.html_block_usage_key)
self._validate_calls_of_html_block(container_update_event_receiver)
def test_call_container_update_signal_when_restore_component(self):
api.delete_library_block(self.html_block_usage_key)
container_update_event_receiver = mock.Mock()
LIBRARY_CONTAINER_UPDATED.connect(container_update_event_receiver)
api.restore_library_block(self.html_block_usage_key)
self._validate_calls_of_html_block(container_update_event_receiver)
def test_call_container_update_signal_when_update_olx(self):
block_olx = "<html><b>Hello world!</b></html>"
container_update_event_receiver = mock.Mock()