feat: Copy/Paste associated static assets along with components (#32346)

* refactor: improve typing of StaticFile named tuple
* feat: copy static asset files into the clipboard
* feat: paste static assets
* feat: show notification in studio about pasted assets
* fix: HTML XBlocks would lose the editor="raw" setting when copy-pasted.
* feat: copy python_lib.zip to the clipboard when it seems to be in use
This commit is contained in:
Braden MacDonald
2023-06-27 12:06:43 -07:00
committed by GitHub
parent 32a3e2a94f
commit 12a8d99824
18 changed files with 735 additions and 45 deletions

View File

@@ -1,31 +1,33 @@
"""
Helper methods for Studio views.
"""
from __future__ import annotations
import logging
import urllib
from lxml import etree
from mimetypes import guess_type
from attrs import frozen, Factory
from django.utils.translation import gettext as _
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.keys import AssetKey, CourseKey, UsageKey
from opaque_keys.edx.locator import DefinitionLocator, LocalId
from xblock.core import XBlock
from xblock.fields import ScopeIds
from xblock.runtime import IdGenerator
from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import NotFoundError
from xmodule.modulestore.django import modulestore
# from cms.djangoapps.contentstore.views.preview import _load_preview_block
from cms.djangoapps.models.settings.course_grading import CourseGradingModel
from common.djangoapps.student import auth
from common.djangoapps.student.roles import CourseCreatorRole, OrgContentCreatorRole
try:
# Technically this is a django app plugin, so we should not error if it's not installed:
import openedx.core.djangoapps.content_staging.api as content_staging_api
except ImportError:
content_staging_api = None
import openedx.core.djangoapps.content_staging.api as content_staging_api
from .utils import reverse_course_url, reverse_library_url, reverse_usage_url
log = logging.getLogger(__name__)
# Note: Grader types are used throughout the platform but most usages are simply in-line
# strings. In addition, new grader types can be defined on the fly anytime one is needed
# (because they're just strings). This dict is an attempt to constrain the sprawl in Studio.
@@ -210,13 +212,24 @@ class ImportIdGenerator(IdGenerator):
return DefinitionLocator(block_type, LocalId(block_type))
def import_staged_content_from_user_clipboard(parent_key: UsageKey, request):
@frozen
class StaticFileNotices:
""" Information about what static files were updated (or not) when pasting content into another course """
new_files: list[str] = Factory(list)
conflicting_files: list[str] = Factory(list)
error_files: list[str] = Factory(list)
def import_staged_content_from_user_clipboard(parent_key: UsageKey, request) -> tuple[XBlock | None, StaticFileNotices]:
"""
Import a block (and any children it has) from "staged" OLX.
Import a block (along with its children and any required static assets) from
the "staged" OLX in the user's clipboard.
Does not deal with permissions or REST stuff - do that before calling this.
Returns the newly created block on success or None if the clipboard is
empty.
Returns (1) the newly created block on success or None if the clipboard is
empty, and (2) a summary of changes made to static files in the destination
course.
"""
from cms.djangoapps.contentstore.views.preview import _load_preview_block
@@ -226,9 +239,10 @@ def import_staged_content_from_user_clipboard(parent_key: UsageKey, request):
user_clipboard = content_staging_api.get_user_clipboard(request.user.id)
if not user_clipboard:
# Clipboard is empty or expired/error/loading
return None
return None, StaticFileNotices()
block_type = user_clipboard.content.block_type
olx_str = content_staging_api.get_staged_content_olx(user_clipboard.content.id)
static_files = content_staging_api.get_staged_content_static_files(user_clipboard.content.id)
node = etree.fromstring(olx_str)
store = modulestore()
with store.bulk_operations(parent_key.course_key):
@@ -247,6 +261,11 @@ def import_staged_content_from_user_clipboard(parent_key: UsageKey, request):
# 'keys' and uses 'id_generator', but the default XBlock parse_xml ignores 'id_generator' and uses 'keys'.
# For children of this block, obviously only id_generator is used.
xblock_class = runtime.load_block_type(block_type)
# Note: if we find a case where any XBlock needs access to the block-specific static files that were saved to
# export_fs during copying, we could make them available here via runtime.resources_fs before calling parse_xml.
# However, currently the only known case for that is video block's transcript files, and those will
# automatically be "carried over" to the new XBlock even in a different course because the video ID is the same,
# and VAL will thus make the transcript available.
temp_xblock = xblock_class.parse_xml(node, runtime, keys, id_generator)
if xblock_class.has_children and temp_xblock.children:
raise NotImplementedError("We don't yet support pasting XBlocks with children")
@@ -257,7 +276,93 @@ def import_staged_content_from_user_clipboard(parent_key: UsageKey, request):
new_xblock = store.update_item(temp_xblock, request.user.id, allow_not_found=True)
parent_xblock.children.append(new_xblock.location)
store.update_item(parent_xblock, request.user.id)
return new_xblock
# Now handle static files that need to go into Files & Uploads:
notices = _import_files_into_course(
course_key=parent_key.context_key,
staged_content_id=user_clipboard.content.id,
static_files=static_files,
)
return new_xblock, notices
def _import_files_into_course(
course_key: CourseKey,
staged_content_id: int,
static_files: list[content_staging_api.StagedContentFileData],
) -> StaticFileNotices:
"""
For the given staged static asset files (which are in "Staged Content" such as the user's clipbaord, but which
need to end up in the course's Files & Uploads page), import them into the destination course, unless they already
exist.
"""
# List of files that were newly added to the destination course
new_files = []
# List of files that conflicted with identically named files already in the destination course
conflicting_files = []
# List of files that had an error (shouldn't happen unless we have some kind of bug)
error_files = []
for file_data_obj in static_files:
if not isinstance(file_data_obj.source_key, AssetKey):
# This static asset was managed by the XBlock and instead of being added to "Files & Uploads", it is stored
# using some other system. We could make it available via runtime.resources_fs during XML parsing, but it's
# not needed here.
continue
# At this point, we know this is a "Files & Uploads" asset that we may need to copy into the course:
try:
result = _import_file_into_course(course_key, staged_content_id, file_data_obj)
if result is True:
new_files.append(file_data_obj.filename)
elif result is None:
pass # This file already exists; no action needed.
else:
conflicting_files.append(file_data_obj.filename)
except Exception: # lint-amnesty, pylint: disable=broad-except
error_files.append(file_data_obj.filename)
log.exception(f"Failed to import Files & Uploads file {file_data_obj.filename}")
return StaticFileNotices(
new_files=new_files,
conflicting_files=conflicting_files,
error_files=error_files,
)
def _import_file_into_course(
course_key: CourseKey,
staged_content_id: int,
file_data_obj: content_staging_api.StagedContentFileData,
) -> bool | None:
"""
Import a single staged static asset file into the course, unless it already exists.
Returns True if it was imported, False if there's a conflict, or None if
the file already existed (no action needed).
"""
filename = file_data_obj.filename
new_key = course_key.make_asset_key("asset", filename)
try:
current_file = contentstore().find(new_key)
except NotFoundError:
current_file = None
if not current_file:
# This static asset should be imported into the new course:
content_type = guess_type(filename)[0]
data = content_staging_api.get_staged_content_static_file_data(staged_content_id, filename)
if data is None:
raise NotFoundError(file_data_obj.source_key)
content = StaticContent(new_key, name=filename, content_type=content_type, data=data)
# If it's an image file, also generate the thumbnail:
thumbnail_content, thumbnail_location = contentstore().generate_thumbnail(content)
if thumbnail_content is not None:
content.thumbnail_location = thumbnail_location
contentstore().save(content)
return True
elif current_file.content_digest == file_data_obj.md5_hash:
# The file already exists and matches exactly, so no action is needed
return None
else:
# There is a conflict with some other file that has the same name.
return False
def is_item_in_course_tree(item):

View File

@@ -5,9 +5,9 @@ APIs.
"""
from opaque_keys.edx.keys import UsageKey
from rest_framework.test import APIClient
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import ToyCourseFactory
from xmodule.modulestore.django import contentstore, modulestore
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, upload_file_to_course
from xmodule.modulestore.tests.factories import BlockFactory, CourseFactory, ToyCourseFactory
CLIPBOARD_ENDPOINT = "/api/content-staging/v1/clipboard/"
XBLOCK_ENDPOINT = "/xblock/"
@@ -60,3 +60,72 @@ class ClipboardPasteTestCase(ModuleStoreTestCase):
assert new_video.youtube_id_1_0 == orig_video.youtube_id_1_0
# The new block should store a reference to where it was copied from
assert new_video.copied_from_block == str(video_key)
def test_paste_with_assets(self):
"""
When pasting into a different course, any required static assets should
be pasted too, unless they already exist in the destination course.
"""
dest_course_key, client = self._setup_course()
# Make sure some files exist in the source course to be copied:
source_course = CourseFactory.create()
upload_file_to_course(
course_key=source_course.id,
contentstore=contentstore(),
source_file='./common/test/data/static/picture1.jpg',
target_filename="picture1.jpg",
)
upload_file_to_course(
course_key=source_course.id,
contentstore=contentstore(),
source_file='./common/test/data/static/picture2.jpg',
target_filename="picture2.jpg",
)
source_html = BlockFactory.create(
parent_location=source_course.location,
category="html",
display_name="Some HTML",
data="""
<p>
<a href="/static/picture1.jpg">Picture 1</a>
<a href="/static/picture2.jpg">Picture 2</a>
</p>
""",
)
# Now, to test conflict handling, we also upload a CONFLICTING image to
# the destination course under the same filename.
upload_file_to_course(
course_key=dest_course_key,
contentstore=contentstore(),
# Note this is picture 3, not picture 2, but we save it as picture 2:
source_file='./common/test/data/static/picture3.jpg',
target_filename="picture2.jpg",
)
# Now copy the HTML block from the source cost and paste it into the destination:
copy_response = client.post(CLIPBOARD_ENDPOINT, {"usage_key": str(source_html.location)}, format="json")
assert copy_response.status_code == 200
# Paste the video
dest_parent_key = dest_course_key.make_usage_key("vertical", "vertical_test")
paste_response = client.post(XBLOCK_ENDPOINT, {
"parent_locator": str(dest_parent_key),
"staged_content": "clipboard",
}, format="json")
assert paste_response.status_code == 200
static_file_notices = paste_response.json()["static_file_notices"]
assert static_file_notices == {
"error_files": [],
"new_files": ["picture1.jpg"],
# The new course already had a file named "picture2.jpg" with different md5 hash, so it's a conflict:
"conflicting_files": ["picture2.jpg"],
}
# Check that the files are as we expect:
source_pic1_hash = contentstore().find(source_course.id.make_asset_key("asset", "picture1.jpg")).content_digest
dest_pic1_hash = contentstore().find(dest_course_key.make_asset_key("asset", "picture1.jpg")).content_digest
assert source_pic1_hash == dest_pic1_hash
source_pic2_hash = contentstore().find(source_course.id.make_asset_key("asset", "picture2.jpg")).content_digest
dest_pic2_hash = contentstore().find(dest_course_key.make_asset_key("asset", "picture2.jpg")).content_digest
assert source_pic2_hash != dest_pic2_hash # Because there was a conflict, this file was unchanged.

View File

@@ -13,6 +13,7 @@ import logging
from datetime import datetime
from uuid import uuid4
from attrs import asdict
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import (User) # lint-amnesty, pylint: disable=imported-auth-user
@@ -562,7 +563,7 @@ def _create_block(request):
if request.json.get("staged_content") == "clipboard":
# Paste from the user's clipboard (content_staging app clipboard, not browser clipboard) into 'usage_key':
try:
created_xblock = import_staged_content_from_user_clipboard(
created_xblock, notices = import_staged_content_from_user_clipboard(
parent_key=usage_key, request=request
)
except Exception: # pylint: disable=broad-except
@@ -576,12 +577,11 @@ def _create_block(request):
return JsonResponse(
{"error": _("Your clipboard is empty or invalid.")}, status=400
)
return JsonResponse(
{
"locator": str(created_xblock.location),
"courseKey": str(created_xblock.location.course_key),
}
)
return JsonResponse({
"locator": str(created_xblock.location),
"courseKey": str(created_xblock.location.course_key),
"static_file_notices": asdict(notices),
})
category = request.json["category"]
if isinstance(usage_key, LibraryUsageLocator):

View File

@@ -2728,3 +2728,11 @@ BRAZE_COURSE_ENROLLMENT_CANVAS_ID = ''
DISCUSSIONS_INCONTEXT_FEEDBACK_URL = ''
DISCUSSIONS_INCONTEXT_LEARNMORE_URL = ''
OPEN_EDX_FILTERS_CONFIG = {
"org.openedx.content_authoring.staged_content.static_filter_source.v1": {
"pipeline": [
"openedx.core.djangoapps.content_staging.filters.IgnoreLargeFiles",
]
}
}

View File

@@ -6,10 +6,12 @@ define(['jquery', 'underscore', 'backbone', 'gettext', 'js/views/pages/base_page
'common/js/components/utils/view_utils', 'js/views/container', 'js/views/xblock',
'js/views/components/add_xblock', 'js/views/modals/edit_xblock', 'js/views/modals/move_xblock_modal',
'js/models/xblock_info', 'js/views/xblock_string_field_editor', 'js/views/xblock_access_editor',
'js/views/pages/container_subviews', 'js/views/unit_outline', 'js/views/utils/xblock_utils'],
'js/views/pages/container_subviews', 'js/views/unit_outline', 'js/views/utils/xblock_utils',
'common/js/components/views/feedback_notification', 'common/js/components/views/feedback_prompt',
],
function($, _, Backbone, gettext, BasePage, ViewUtils, ContainerView, XBlockView, AddXBlockComponent,
EditXBlockModal, MoveXBlockModal, XBlockInfo, XBlockStringFieldEditor, XBlockAccessEditor,
ContainerSubviews, UnitOutlineView, XBlockUtils) {
ContainerSubviews, UnitOutlineView, XBlockUtils, NotificationView, PromptView) {
'use strict';
var XBlockContainerPage = BasePage.extend({
@@ -255,10 +257,60 @@ function($, _, Backbone, gettext, BasePage, ViewUtils, ContainerView, XBlockView
staged_content: "clipboard",
}).then((data) => {
this.onNewXBlock(placeholderElement, scrollOffset, false, data);
return data;
}).fail(() => {
// Remove the placeholder if the paste failed
placeholderElement.remove();
});
}).done((data) => {
const {
conflicting_files: conflictingFiles,
error_files: errorFiles,
new_files: newFiles,
} = data.static_file_notices;
const notices = [];
if (errorFiles.length) {
notices.push((next) => new PromptView.Error({
title: gettext("Some errors occurred"),
message: (
gettext("The following required files could not be added to the course:") +
" " + errorFiles.join(", ")
),
actions: {primary: {text: gettext("OK"), click: (x) => { x.hide(); next(); }}},
}));
}
if (conflictingFiles.length) {
notices.push((next) => new PromptView.Warning({
title: gettext("You may need to update a file(s) manually"),
message: (
gettext(
"The following files already exist in this course but don't match the " +
"version used by the component you pasted:"
) + " " + conflictingFiles.join(", ")
),
actions: {primary: {text: gettext("OK"), click: (x) => { x.hide(); next(); }}},
}));
}
if (newFiles.length) {
notices.push(() => new NotificationView.Confirmation({
title: gettext("New files were added to this course's Files & Uploads"),
message: (
gettext("The following required files were imported to this course:") +
" " + newFiles.join(", ")
),
closeIcon: true,
}));
}
if (notices.length) {
// Show the notices, one at a time:
const showNext = () => {
const view = notices.shift()(showNext);
view.show();
}
// Delay to avoid conflict with the "Pasting..." notification.
setTimeout(showNext, 1250);
}
});
},

View File

@@ -4,7 +4,13 @@ Admin views for Staged Content and Clipboard
from django.contrib import admin
from django.urls import reverse
from django.utils.html import format_html
from .models import StagedContent, UserClipboard
from .models import StagedContent, StagedContentFile, UserClipboard
class StagedContentFileInline(admin.TabularInline):
""" Inline admin UI for StagedContentFile """
model = StagedContentFile
readonly_fields = ('filename', 'md5_hash', 'source_key_str', 'data_file')
@admin.register(StagedContent)
@@ -14,6 +20,7 @@ class StagedContentAdmin(admin.ModelAdmin):
list_filter = ('purpose', 'status', 'block_type')
search_fields = ('user__username', 'display_name', 'suggested_url_name')
readonly_fields = ('id', 'user', 'created', 'purpose', 'status', 'block_type', 'olx')
inlines = (StagedContentFileInline, )
@admin.register(UserClipboard)

View File

@@ -4,8 +4,10 @@ Public python API for content staging
from __future__ import annotations
from django.http import HttpRequest
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import AssetKey, UsageKey
from .data import StagedContentData, StagedContentStatus, UserClipboardData
from .data import StagedContentData, StagedContentFileData, StagedContentStatus, UserClipboardData
from .models import UserClipboard as _UserClipboard, StagedContent as _StagedContent
from .serializers import UserClipboardSerializer as _UserClipboardSerializer
@@ -75,3 +77,45 @@ def get_staged_content_olx(staged_content_id: int) -> str | None:
return sc.olx
except _StagedContent.DoesNotExist:
return None
def get_staged_content_static_files(staged_content_id: int) -> list[StagedContentFileData]:
"""
Get the filenames and metadata for any static files used by the given staged content.
Does not check permissions!
"""
sc = _StagedContent.objects.get(pk=staged_content_id)
def str_to_key(source_key_str: str):
if not str:
return None
try:
return AssetKey.from_string(source_key_str)
except InvalidKeyError:
return UsageKey.from_string(source_key_str)
return [
StagedContentFileData(
filename=f.filename,
# For performance, we don't load data unless it's needed, so there's
# a separate API call for that.
data=None,
source_key=str_to_key(f.source_key_str),
md5_hash=f.md5_hash,
)
for f in sc.files.all()
]
def get_staged_content_static_file_data(staged_content_id: int, filename: str) -> bytes | None:
"""
Get the data for the static asset associated with the given staged content.
Does not check permissions!
"""
sc = _StagedContent.objects.get(pk=staged_content_id)
file_data_obj = sc.files.filter(filename=filename).first()
if file_data_obj:
return file_data_obj.data_file.open().read()
return None

View File

@@ -1,12 +1,13 @@
"""
Public python data types for content staging
"""
from __future__ import annotations
from attrs import field, frozen, validators
from datetime import datetime
from django.db.models import TextChoices
from django.utils.translation import gettext_lazy as _
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.keys import UsageKey, AssetKey
class StagedContentStatus(TextChoices):
@@ -43,6 +44,21 @@ class StagedContentData:
display_name: str = field(validator=validators.instance_of(str))
@frozen
class StagedContentFileData:
"""Read-only data model for a single file used by some staged content."""
filename: str = field(validator=validators.instance_of(str))
# Everything below is optional:
data: bytes | None = field(validator=validators.optional(validators.instance_of(bytes)))
# If this asset came from Files & Uploads in a course, this is an AssetKey
# as a string. If this asset came from an XBlock's filesystem, this is the
# UsageKey of the XBlock.
source_key: AssetKey | UsageKey | None = field(
validator=validators.optional(validators.instance_of((AssetKey, UsageKey)))
)
md5_hash: str | None = field(validator=validators.optional(validators.instance_of(str)))
@frozen
class UserClipboardData:
""" Read-only data model for User Clipboard data (copied OLX) """

View File

@@ -0,0 +1,55 @@
"""
Filters that affect the behavior of staged content (and the clipboard)
"""
# pylint: disable=unused-argument
from __future__ import annotations
from attrs import asdict
from openedx_filters import PipelineStep
from openedx_filters.tooling import OpenEdxPublicFilter
from .data import StagedContentFileData
from .models import StagedContent
class StagingStaticAssetFilter(OpenEdxPublicFilter):
"""
A filter used to determine which static assets associate with an XBlock(s)
should be staged in the StagedContent app (e.g. the clipboard).
This API is considered BETA. Once it is stable, this definition should be moved into openedx_filters.
"""
filter_type = "org.openedx.content_authoring.staged_content.static_filter_source.v1"
@classmethod
def run_filter(cls, staged_content: StagedContent, file_datas: list[StagedContentFileData]):
"""
Run this filter, which requires the following arguments:
staged_content (StagedContent): details of the content being staged, as saved to the DB.
file_datas (list[StagedContentFileData]): details of the files being staged
"""
data = super().run_pipeline(staged_content=staged_content, file_datas=file_datas)
return data.get("file_datas")
class IgnoreLargeFiles(PipelineStep):
"""
Don't copy files over 10MB into the clipboard
"""
# pylint: disable=arguments-differ
def run_filter(self, staged_content: StagedContent, file_datas: list[StagedContentFileData]):
"""
Filter the list of file_datas to remove any large files
"""
limit = 10 * 1024 * 1024
def remove_large_data(fd: StagedContentFileData):
""" Remove 'data' from the immutable StagedContentFileData object, if it's above the size limit """
if fd.data and len(fd.data) > limit:
# these data objects are immutable so make a copy with data=None:
return StagedContentFileData(**{**asdict(fd), "data": None})
return fd
return {"file_datas": [remove_large_data(fd) for fd in file_datas]}

View File

@@ -0,0 +1,25 @@
# Generated by Django 3.2.19 on 2023-06-02 00:47
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('content_staging', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='StagedContentFile',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('filename', models.CharField(max_length=255)),
('data_file', models.FileField(blank=True, upload_to='staged-content-temp/')),
('source_key_str', models.CharField(blank=True, max_length=255)),
('md5_hash', models.CharField(blank=True, max_length=32)),
('for_content', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='files', to='content_staging.stagedcontent')),
],
),
]

View File

@@ -67,6 +67,25 @@ class StagedContent(models.Model):
return f'Staged {self.block_type} block "{self.display_name}" ({self.status})'
class StagedContentFile(models.Model):
"""
A data file ("Static Asset") associated with some StagedContent.
These usually come from a course's Files & Uploads page, but can also come
from per-xblock file storage (e.g. video transcripts or images used in
v2 content libraries).
"""
for_content = models.ForeignKey(StagedContent, on_delete=models.CASCADE, related_name="files")
filename = models.CharField(max_length=255, blank=False)
# Everything below is optional:
data_file = models.FileField(upload_to="staged-content-temp/", blank=True)
# If this asset came from Files & Uploads in a course, this is an AssetKey
# as a string. If this asset came from an XBlock's filesystem, this is the
# UsageKey of the XBlock.
source_key_str = models.CharField(max_length=255, blank=True)
md5_hash = models.CharField(max_length=32, blank=True)
class UserClipboard(models.Model):
"""
Each user has a clipboard that can hold one item at a time, where an item

View File

@@ -5,8 +5,9 @@ from textwrap import dedent
from xml.etree import ElementTree
from rest_framework.test import APIClient
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import ToyCourseFactory
from xmodule.contentstore.django import contentstore
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, upload_file_to_course
from xmodule.modulestore.tests.factories import BlockFactory, CourseFactory, ToyCourseFactory
from openedx.core.djangoapps.content_staging import api as python_api
@@ -134,7 +135,7 @@ class ClipboardTestCase(ModuleStoreTestCase):
"""
course_key, client = self._setup_course()
# Copy the video
# Copy the HTML
html_key = course_key.make_usage_key("html", "toyhtml")
response = client.post(CLIPBOARD_ENDPOINT, {"usage_key": str(html_key)}, format="json")
@@ -199,6 +200,67 @@ class ClipboardTestCase(ModuleStoreTestCase):
# The OLX link from the video will no longer work:
self.assertEqual(client.get(old_olx_url).status_code, 404)
def test_copy_static_assets(self):
"""
Test copying an HTML from the course that references a static asset file.
"""
course_key, client = self._setup_course()
# The toy course references static files that don't actually exist yet, so we have to upload one now:
upload_file_to_course(
course_key=course_key,
contentstore=contentstore(),
source_file='./common/test/data/toy/static/just_a_test.jpg',
target_filename="foo_bar.jpg",
)
# Copy the HTML XBlock to the clipboard:
html_key = course_key.make_usage_key("html", "just_img")
response = client.post(CLIPBOARD_ENDPOINT, {"usage_key": str(html_key)}, format="json")
# Validate the response:
self.assertEqual(response.status_code, 200)
response_data = response.json()
staged_content_id = response_data["content"]["id"]
olx_str = python_api.get_staged_content_olx(staged_content_id)
assert '<img src="/static/foo_bar.jpg" />' in olx_str
static_assets = python_api.get_staged_content_static_files(staged_content_id)
assert static_assets == [python_api.StagedContentFileData(
filename="foo_bar.jpg",
source_key=course_key.make_asset_key("asset", "foo_bar.jpg"),
md5_hash="addd3c218c0c0c41e7e1ae73f5969810",
data=None,
)]
def test_copy_static_assets_nonexistent(self):
"""
Test copying a HTML block which references non-existent static assets.
"""
_other_course_key, client = self._setup_course()
course = CourseFactory.create()
html_block = BlockFactory.create(
parent_location=course.location,
category="html",
display_name="Some HTML",
data="""
<p>
<a href="/static/nonexistent1.jpg">Picture 1</a>
<a href="/static/nonexistent2.jpg">Picture 2</a>
</p>
""",
)
# Copy the HTML
response = client.post(CLIPBOARD_ENDPOINT, {"usage_key": str(html_block.location)}, format="json")
# Validate the response:
self.assertEqual(response.status_code, 200)
response_data = response.json()
staged_content_id = response_data["content"]["id"]
olx_str = python_api.get_staged_content_olx(staged_content_id)
assert '<a href="/static/nonexistent1.jpg">' in olx_str
static_assets = python_api.get_staged_content_static_files(staged_content_id)
assert static_assets == []
def test_no_course_permission(self):
"""
Test that a user without read access cannot copy items in a course

View File

@@ -1,8 +1,11 @@
"""
REST API views for content staging
"""
from __future__ import annotations
import hashlib
import logging
from django.core.files.base import ContentFile
from django.db import transaction
from django.http import HttpResponse
from django.shortcuts import get_object_or_404
@@ -17,13 +20,16 @@ from rest_framework.views import APIView
from common.djangoapps.student.auth import has_studio_read_access
from openedx.core.lib.api.view_utils import view_auth_classes
from openedx.core.lib.xblock_serializer.api import serialize_xblock_to_olx
from openedx.core.lib.xblock_serializer.api import serialize_xblock_to_olx, StaticFile
from xmodule import block_metadata_utils
from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import ItemNotFoundError
from .data import CLIPBOARD_PURPOSE, StagedContentStatus
from .models import StagedContent, UserClipboard
from .data import CLIPBOARD_PURPOSE, StagedContentFileData, StagedContentStatus
from .filters import StagingStaticAssetFilter
from .models import StagedContent, StagedContentFile, UserClipboard
from .serializers import UserClipboardSerializer, PostToClipboardSerializer
from .tasks import delete_expired_clipboards
@@ -149,10 +155,76 @@ class ClipboardEndpoint(APIView):
serializer = UserClipboardSerializer(clipboard, context={"request": request})
# Log an event so we can analyze how this feature is used:
log.info(f"Copied {usage_key.block_type} component \"{usage_key}\" to their clipboard.")
# Try to copy the static files. If this fails, we still consider the overall copy attempt to have succeeded,
# because intra-course pasting will still work fine, and in any case users can manually resolve the file issue.
try:
self._save_static_assets_to_clipboard(block_data.static_files, usage_key, staged_content)
except Exception: # pylint: disable=broad-except
# Regardless of what happened, with get_asset_key_from_path or contentstore or run_filter, we don't want the
# whole "copy to clipboard" operation to fail, which would be a bad user experience. For copying and pasting
# within a single course, static assets don't even matter. So any such errors become warnings here.
log.exception(f"Unable to copy static files to clipboard for component {usage_key}")
# Enqueue a (potentially slow) task to delete the old staged content
try:
delete_expired_clipboards.delay(expired_ids)
except Exception as err: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
log.exception(f"Unable to enqueue cleanup task for StagedContents: {','.join(str(x) for x in expired_ids)}")
# Return the response:
return Response(serializer.data)
@staticmethod
def _save_static_assets_to_clipboard(
static_files: list[StaticFile], usage_key: UsageKey, staged_content: StagedContent
):
"""
Helper method for "post to clipboard" API endpoint. This deals with copying static files into the clipboard.
"""
files_to_save: list[StagedContentFileData] = []
for f in static_files:
source_key = (
StaticContent.get_asset_key_from_path(usage_key.context_key, f.url)
if (f.url and f.url.startswith('/')) else None
)
# Compute the MD5 hash and get the content:
content: bytes | None = f.data
md5_hash = "" # Unknown
if content:
md5_hash = hashlib.md5(f.data).hexdigest()
# This asset came from the XBlock's filesystem, e.g. a video block's transcript file
source_key = usage_key
# Check if the asset file exists. It can be absent if an XBlock contains an invalid link.
elif source_key and (sc := contentstore().find(source_key, throw_on_not_found=False)):
md5_hash = sc.content_digest
content = sc.data
else:
continue # Skip this file - we don't need a reference to a non-existent file.
# Load the data:
entry = StagedContentFileData(
filename=f.name,
data=content,
source_key=source_key,
md5_hash=md5_hash,
)
files_to_save.append(entry)
# run filters on files_to_save. e.g. remove large files
files_to_save = StagingStaticAssetFilter.run_filter(staged_content=staged_content, file_datas=files_to_save)
for f in files_to_save:
try:
StagedContentFile.objects.create(
for_content=staged_content,
filename=f.filename,
# In some cases (e.g. really large files), we don't store the data here but we still keep track of
# the metadata. You can still use the metadata to determine if the file is already present or not,
# and then either inform the user or find another way to import the file (e.g. if the file still
# exists in the "Files & Uploads" contentstore of the source course, based on source_key_str).
data_file=ContentFile(content=f.data, name=f.filename) if f.data else None,
source_key_str=str(f.source_key) if f.source_key else "",
md5_hash=f.md5_hash or "",
)
except Exception: # pylint: disable=broad-except
log.exception(f"Unable to copy static file {f.filename} to clipboard for component {usage_key}")

View File

@@ -1,24 +1,23 @@
"""
Code for serializing a modulestore XBlock to OLX.
"""
from __future__ import annotations
import logging
import os
from collections import namedtuple
from lxml import etree
from .data import StaticFile
from . import utils
log = logging.getLogger(__name__)
# A static file required by an XBlock
StaticFile = namedtuple('StaticFile', ['name', 'url', 'data'])
class XBlockSerializer:
"""
A class that can serialize an XBlock to OLX.
"""
static_files: list[StaticFile]
def __init__(self, block):
"""
@@ -39,6 +38,11 @@ class XBlockSerializer:
if path not in [sf.name for sf in self.static_files]:
self.static_files.append(StaticFile(name=path, url=asset['url'], data=None))
if block.scope_ids.usage_id.block_type == 'problem':
py_lib_zip_file = utils.get_python_lib_zip_if_using(self.olx_str, course_key)
if py_lib_zip_file:
self.static_files.append(py_lib_zip_file)
def _serialize_block(self, block) -> etree.Element:
""" Serialize an XBlock to OLX/XML. """
if block.scope_ids.usage_id.block_type == 'html':
@@ -100,6 +104,10 @@ class XBlockSerializer:
olx_node.attrib["url_name"] = block.scope_ids.usage_id.block_id
if block.display_name:
olx_node.attrib["display_name"] = block.display_name
if block.fields["editor"].is_set_on(block):
olx_node.attrib["editor"] = block.editor
if block.use_latex_compiler:
olx_node.attrib["use_latex_compiler"] = "true"
olx_node.text = etree.CDATA("\n" + block.data + "\n")
return olx_node

View File

@@ -0,0 +1,12 @@
"""
Simple data structures used for XBlock serialization
"""
from __future__ import annotations
from typing import NamedTuple
class StaticFile(NamedTuple):
""" A static file required by an XBlock """
name: str
url: str | None
data: bytes | None

View File

@@ -4,9 +4,10 @@ Test for the XBlock serialization lib's API
from xml.etree import ElementTree
from openedx.core.djangolib.testing.utils import skip_unless_cms
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import ToyCourseFactory
from xmodule.modulestore.django import contentstore, modulestore
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase, upload_file_to_course
from xmodule.modulestore.tests.factories import BlockFactory, CourseFactory, ToyCourseFactory
from xmodule.util.sandboxing import DEFAULT_PYTHON_LIB_FILENAME
from . import api
@@ -134,6 +135,32 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# This is the only other difference - an extra field with the blockstore-specific definition ID:
self.assertEqual(serialized_blockstore.def_id, "html/just_img")
def test_html_with_fields(self):
""" Test an HTML Block with non-default fields like editor='raw' """
course = CourseFactory.create(display_name='test course', run="Testing_course")
html_block = BlockFactory.create(
parent_location=course.location,
category="html",
display_name="Non-default HTML Block",
editor="raw",
use_latex_compiler=True,
data="🍔",
)
serialized = api.serialize_xblock_to_olx(html_block)
self.assertXmlEqual(
serialized.olx_str,
"""
<html
url_name="Non-default_HTML_Block"
display_name="Non-default HTML Block"
editor="raw"
use_latex_compiler="true"
><![CDATA[
🍔
]]></html>
"""
)
def test_export_sequential(self):
"""
Export a sequential from the toy course, including all of its children.
@@ -165,3 +192,55 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
<xblock-include definition="video/Video_Resources"/>
</sequential>
""")
def test_capa_python_lib(self):
""" Test capa problem blocks with and without python_lib.zip """
course = CourseFactory.create(display_name='Python Testing course', run="PY")
upload_file_to_course(
course_key=course.id,
contentstore=contentstore(),
source_file='./common/test/data/uploads/python_lib.zip',
target_filename=DEFAULT_PYTHON_LIB_FILENAME,
)
regular_problem = BlockFactory.create(
parent_location=course.location,
category="problem",
display_name="Problem No Python",
max_attempts=3,
data="<problem><optionresponse></optionresponse></problem>",
)
python_problem = BlockFactory.create(
parent_location=course.location,
category="problem",
display_name="Python Problem",
data='<problem>This uses python: <script type="text/python">...</script>...</problem>',
)
# The regular problem doesn't use python so shouldn't contain python_lib.zip:
serialized = api.serialize_xblock_to_olx(regular_problem)
assert not serialized.static_files
self.assertXmlEqual(
serialized.olx_str,
"""
<problem display_name="Problem No Python" url_name="Problem_No_Python" max_attempts="3">
<optionresponse></optionresponse>
</problem>
"""
)
# The python problem should contain python_lib.zip:
serialized = api.serialize_xblock_to_olx(python_problem)
assert len(serialized.static_files) == 1
assert serialized.static_files[0].name == "python_lib.zip"
self.assertXmlEqual(
serialized.olx_str,
"""
<problem display_name="Python Problem" url_name="Python_Problem">
This uses python: <script type="text/python">...</script>...
</problem>
"""
)

View File

@@ -3,11 +3,13 @@ Test the OLX serialization utils
"""
import unittest
import ddt
from opaque_keys.edx.keys import CourseKey
from . import utils
@ddt.ddt
class TestUtils(unittest.TestCase):
"""
Test the OLX serialization utils
@@ -47,3 +49,17 @@ class TestUtils(unittest.TestCase):
"""
olx_out = utils.rewrite_absolute_static_urls(olx_in, course_id)
assert olx_out == olx_expected
@ddt.unpack
@ddt.data(
('''<problem>\n<script>ambiguous script\n</script></problem>''', False),
('''<problem>\n<script type="text/python">\npython\nscript\n</script></problem>''', True),
('''<problem>\n<script type='text/python'>\npython\nscript\n</script></problem>''', True),
('''<problem>\n<script type="loncapa/python">\npython\nscript\n</script></problem>''', True),
('''<problem>\n<script type='loncapa/python'>\npython\nscript\n</script></problem>''', True),
)
def test_has_python_script(self, olx: str, has_script: bool):
"""
Test the _has_python_script() helper
"""
assert utils._has_python_script(olx) == has_script # pylint: disable=protected-access

View File

@@ -1,21 +1,26 @@
"""
Helper functions for XBlock serialization
"""
from __future__ import annotations
import logging
import re
from contextlib import contextmanager
from django.conf import settings
from fs.memoryfs import MemoryFS
from fs.wrapfs import WrapFS
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import AssetKey, CourseKey
from common.djangoapps.static_replace import replace_static_urls
from xmodule.assetstore.assetmgr import AssetManager
from xmodule.contentstore.content import StaticContent
from xmodule.exceptions import NotFoundError
from xmodule.modulestore.exceptions import ItemNotFoundError
from xmodule.util.sandboxing import DEFAULT_PYTHON_LIB_FILENAME
from xmodule.xml_block import XmlMixin
from common.djangoapps.static_replace import replace_static_urls
from .data import StaticFile
log = logging.getLogger(__name__)
@@ -89,6 +94,42 @@ def collect_assets_from_text(text, course_id, include_content=False):
yield info
def get_python_lib_zip_if_using(olx: str, course_id: CourseKey) -> StaticFile | None:
"""
When python_lib is in use, capa problems that contain python code should be assumed to depend on it.
Note: for any given problem that uses python, there is no way to tell if it
actually uses any imports from python_lib.zip because the imports could be
named anything. So we just have to assume that any python problems may be
using python_lib.zip
"""
if _has_python_script(olx):
python_lib_filename = getattr(settings, 'PYTHON_LIB_FILENAME', DEFAULT_PYTHON_LIB_FILENAME)
asset_key = StaticContent.get_asset_key_from_path(course_id, python_lib_filename)
# Now, it seems like this capa problem uses python_lib.zip - but does it exist in the course?
if AssetManager.find(asset_key, throw_on_not_found=False):
url = '/' + str(StaticContent.compute_location(course_id, python_lib_filename))
return StaticFile(name=python_lib_filename, url=url, data=None)
return None
def _has_python_script(olx: str) -> bool:
"""
Check if the given OLX <problem> block string seems to contain any python
code. (If it does, we know that it may be using python_lib.zip.)
"""
match_strings = (
'<script type="text/python">',
"<script type='text/python'>",
'<script type="loncapa/python">',
"<script type='loncapa/python'>",
)
for check in match_strings:
if check in olx:
return True
return False
@contextmanager
def override_export_fs(block):
"""