feat: add library restore endpoint (#37439)

Adds a library restore endpoint to restore a learning package from a
backup zip archive (/api/libraries/v2/restore/). The learning package
can then be used to create a content library.
This commit is contained in:
Taylor Payne
2025-10-22 06:30:18 -06:00
committed by GitHub
parent 5d01a40936
commit bf8ffe4cf7
12 changed files with 725 additions and 18 deletions

View File

@@ -61,7 +61,7 @@ from openedx_events.content_authoring.signals import (
CONTENT_LIBRARY_UPDATED
)
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import Component
from openedx_learning.api.authoring_models import Component, LearningPackage
from organizations.models import Organization
from user_tasks.models import UserTaskArtifact, UserTaskStatus
from xblock.core import XBlock
@@ -384,6 +384,7 @@ def create_library(
allow_public_learning: bool = False,
allow_public_read: bool = False,
library_license: str = ALL_RIGHTS_RESERVED,
learning_package: LearningPackage | None = None,
) -> ContentLibraryMetadata:
"""
Create a new content library.
@@ -400,6 +401,8 @@ def create_library(
allow_public_read: Allow anyone to view blocks (including source) in Studio?
learning_package: A learning package to associate with this library.
Returns a ContentLibraryMetadata instance.
"""
assert isinstance(org, Organization)
@@ -413,14 +416,25 @@ def create_library(
allow_public_read=allow_public_read,
license=library_license,
)
learning_package = authoring_api.create_learning_package(
key=str(ref.library_key),
title=title,
description=description,
)
if learning_package:
# A temporary LearningPackage was passed in, so update its key to match the library,
# and also update its title/description in case they differ.
authoring_api.update_learning_package(
learning_package.id,
key=str(ref.library_key),
title=title,
description=description,
)
else:
# We have to generate a new LearningPackage for this library.
learning_package = authoring_api.create_learning_package(
key=str(ref.library_key),
title=title,
description=description,
)
ref.learning_package = learning_package
ref.save()
except IntegrityError:
raise LibraryAlreadyExists(slug) # lint-amnesty, pylint: disable=raise-missing-from
@@ -431,6 +445,7 @@ def create_library(
library_key=ref.library_key
)
)
return ContentLibraryMetadata(
key=ref.library_key,
title=title,

View File

@@ -79,6 +79,8 @@ from django.views.decorators.clickjacking import xframe_options_exempt
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import TemplateResponseMixin, View
from drf_yasg.utils import swagger_auto_schema
from user_tasks.models import UserTaskStatus
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from organizations.api import ensure_organization
from organizations.exceptions import InvalidOrganizationException
@@ -97,6 +99,9 @@ from cms.djangoapps.contentstore.views.course import (
get_allowed_organizations_for_libraries,
user_can_create_organizations
)
from cms.djangoapps.contentstore.storage import course_import_export_storage
from openedx.core.djangoapps.content_libraries.tasks import restore_library
from openedx.core.djangoapps.content_libraries import api, permissions
from openedx.core.djangoapps.content_libraries.api.libraries import get_backup_task_status
from openedx.core.djangoapps.content_libraries.rest_api.serializers import (
@@ -110,6 +115,9 @@ from openedx.core.djangoapps.content_libraries.rest_api.serializers import (
ContentLibraryUpdateSerializer,
LibraryBackupResponseSerializer,
LibraryBackupTaskStatusSerializer,
LibraryRestoreFileSerializer,
LibraryRestoreTaskRequestSerializer,
LibraryRestoreTaskResultSerializer,
LibraryXBlockCreationSerializer,
LibraryXBlockMetadataSerializer,
LibraryXBlockTypeSerializer,
@@ -790,6 +798,82 @@ class LibraryBackupView(APIView):
return Response(LibraryBackupTaskStatusSerializer(result, context={'request': request}).data)
@method_decorator(non_atomic_requests, name="dispatch")
@view_auth_classes()
class LibraryRestoreView(APIView):
"""
Restore a library from a backup file.
After the file is uploaded, a background task will be started to process the
file and restore the library contents. You can use the returned `task_id` to
check the status of the restore task.
The result of the restore task will be a "staged" learning package that can
then be saved into a content library.
**POST Parameters**
A POST request must include the following parameters.
* file: (required) The backup file to restore the library from. Must be a
.zip file.
**GET Parameters**
A GET request must include the following parameters.
* task_id: (required) The UUID of a restore task.
"""
@apidocs.schema(
body=LibraryRestoreFileSerializer,
responses={200: LibraryRestoreFileSerializer}
)
def post(self, request):
"""
Restore a library from a backup file.
"""
if not request.user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY):
raise PermissionDenied
serializer = LibraryRestoreFileSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
upload = serializer.validated_data['file']
storage_path = course_import_export_storage.save(f'library_restore/{upload.name}', upload)
log.info("Learning package archive upload %s: Upload complete", upload.name)
async_result = restore_library.delay(request.user.id, storage_path)
return Response(LibraryRestoreFileSerializer({'task_id': async_result.task_id}).data)
@apidocs.schema(
parameters=[
apidocs.query_parameter(
'task_id',
str,
description="The ID of the restore library task to retrieve."
),
],
responses={200: LibraryRestoreTaskResultSerializer}
)
def get(self, request):
"""
Check the status of a library restore task.
"""
# validate input
serializer = LibraryRestoreTaskRequestSerializer(data=request.query_params)
serializer.is_valid(raise_exception=True)
task_id = serializer.validated_data.get('task_id')
# get task status and related artifact
task_status = get_object_or_404(UserTaskStatus, task_id=task_id, user=request.user)
# serialize and return result
result_serializer = LibraryRestoreTaskResultSerializer.from_task_status(task_status, request)
return Response(result_serializer.data)
# LTI 1.3 Views
# =============

View File

@@ -2,13 +2,18 @@
Serializers for the content libraries REST API
"""
# pylint: disable=abstract-method
import json
import logging
from django.core.validators import validate_unicode_slug
from opaque_keys import InvalidKeyError, OpaqueKey
from opaque_keys.edx.locator import LibraryContainerLocator, LibraryUsageLocatorV2
from openedx_learning.api.authoring_models import Collection
from openedx_learning.api.authoring_models import Collection, LearningPackage
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from user_tasks.models import UserTaskStatus
from openedx.core.djangoapps.content_libraries.tasks import LibraryRestoreTask
from openedx.core.djangoapps.content_libraries.api.containers import ContainerType
from openedx.core.djangoapps.content_libraries.constants import ALL_RIGHTS_RESERVED, LICENSE_OPTIONS
from openedx.core.djangoapps.content_libraries.models import (
@@ -22,6 +27,8 @@ from .. import permissions
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
log = logging.getLogger(__name__)
class ContentLibraryMetadataSerializer(serializers.Serializer):
"""
@@ -37,6 +44,7 @@ class ContentLibraryMetadataSerializer(serializers.Serializer):
slug = serializers.CharField(source="key.slug", validators=(validate_unicode_slug, ))
title = serializers.CharField()
description = serializers.CharField(allow_blank=True)
learning_package = serializers.PrimaryKeyRelatedField(queryset=LearningPackage.objects.all(), required=False)
num_blocks = serializers.IntegerField(read_only=True)
last_published = serializers.DateTimeField(format=DATETIME_FORMAT, read_only=True)
published_by = serializers.CharField(read_only=True)
@@ -426,3 +434,111 @@ class LibraryBackupTaskStatusSerializer(serializers.Serializer):
"""
state = serializers.CharField()
url = serializers.FileField(source='file', allow_null=True, use_url=True)
class LibraryRestoreFileSerializer(serializers.Serializer):
"""
Serializer for restoring a library from a backup file.
"""
# input only fields
file = serializers.FileField(write_only=True, help_text="A ZIP file containing a library backup.")
# output only fields
task_id = serializers.UUIDField(read_only=True)
def validate_file(self, value):
"""
Validate that the uploaded file is a ZIP file.
"""
if value.content_type != 'application/zip':
raise serializers.ValidationError("Only ZIP files are allowed.")
return value
class LibraryRestoreTaskRequestSerializer(serializers.Serializer):
"""
Serializer for requesting the status of a library restore task.
"""
task_id = serializers.UUIDField(write_only=True, help_text="The ID of the restore task to check.")
class RestoreSuccessDataSerializer(serializers.Serializer):
"""
Serializer for the data returned upon successful restoration of a library.
"""
learning_package_id = serializers.IntegerField(source="lp_restored_data.id")
title = serializers.CharField(source="lp_restored_data.title")
org = serializers.CharField(source="lp_restored_data.archive_org_key")
slug = serializers.CharField(source="lp_restored_data.archive_slug")
# The `key` is a unique temporary key assigned to the learning package during the restore process,
# whereas the `archive_key` is the original key of the learning package from the backup.
# The temporary learning package key is replaced with a standard key once it is added to a content library.
key = serializers.CharField(source="lp_restored_data.key")
archive_key = serializers.CharField(source="lp_restored_data.archive_lp_key")
containers = serializers.IntegerField(source="lp_restored_data.num_containers")
components = serializers.IntegerField(source="lp_restored_data.num_components")
collections = serializers.IntegerField(source="lp_restored_data.num_collections")
sections = serializers.IntegerField(source="lp_restored_data.num_sections")
subsections = serializers.IntegerField(source="lp_restored_data.num_subsections")
units = serializers.IntegerField(source="lp_restored_data.num_units")
created_on_server = serializers.CharField(source="backup_metadata.original_server", required=False)
created_at = serializers.DateTimeField(source="backup_metadata.created_at", format=DATETIME_FORMAT)
created_by = serializers.SerializerMethodField()
def get_created_by(self, obj):
"""
Get the user information of the archive creator, if available.
The information is stored in the backup metadata of the archive and references
a user that may not exist in the system where the restore is being performed.
"""
username = obj["backup_metadata"].get("created_by")
email = obj["backup_metadata"].get("created_by_email")
return {"username": username, "email": email}
class LibraryRestoreTaskResultSerializer(serializers.Serializer):
"""
Serializer for the result of a library restore task.
"""
state = serializers.CharField()
result = RestoreSuccessDataSerializer(required=False, allow_null=True, default=None)
error = serializers.CharField(required=False, allow_blank=True, default=None)
error_log = serializers.FileField(source='error_log_url', allow_null=True, use_url=True, default=None)
@classmethod
def from_task_status(cls, task_status, request):
"""Build serializer input from task status object."""
# If the task did not complete, just return the state.
if task_status.state not in {UserTaskStatus.SUCCEEDED, UserTaskStatus.FAILED}:
return cls({
"state": task_status.state,
})
artifact_name = LibraryRestoreTask.ARTIFACT_NAMES.get(task_status.state, '')
artifact = task_status.artifacts.filter(name=artifact_name).first()
# If the task failed, include the log artifact if it exists
if task_status.state == UserTaskStatus.FAILED:
return cls({
"state": UserTaskStatus.FAILED,
"error": "Library restore failed. See error log for details.",
"error_log_url": artifact.file if artifact else None,
}, context={'request': request})
if task_status.state == UserTaskStatus.SUCCEEDED:
input_data = {
"state": UserTaskStatus.SUCCEEDED,
}
try:
result = json.loads(artifact.text) if artifact else {}
input_data["result"] = result
except json.JSONDecodeError:
log.error("Failed to decode JSON from artifact (%s): %s", artifact.id, artifact.text)
input_data["error"] = f'Could not decode artifact JSON. Artifact Text: {artifact.text}'
return cls(input_data)

View File

@@ -16,11 +16,17 @@ Architecture note:
"""
from __future__ import annotations
from io import StringIO
import logging
import os
from datetime import datetime
from tempfile import mkdtemp
from tempfile import mkdtemp, NamedTemporaryFile
import json
import shutil
from django.core.files.base import ContentFile
from django.contrib.auth import get_user_model
from django.core.serializers.json import DjangoJSONEncoder
from celery import shared_task
from celery.utils.log import get_task_logger
from celery_utils.logged_task import LoggedTask
@@ -66,12 +72,18 @@ from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import ItemNotFoundError
from xmodule.modulestore.mixed import MixedModuleStore
from cms.djangoapps.contentstore.storage import course_import_export_storage
from . import api
from .models import ContentLibraryBlockImportTask
log = logging.getLogger(__name__)
TASK_LOGGER = get_task_logger(__name__)
User = get_user_model()
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # Should match serializer format. Redefined to avoid circular import.
@shared_task(base=LoggedTask)
@set_code_owner_attribute
@@ -547,3 +559,121 @@ def backup_library(self, user_id: int, library_key_str: str) -> None:
TASK_LOGGER.exception('Error exporting library %s', library_key, exc_info=True)
if self.status.state != UserTaskStatus.FAILED:
self.status.fail({'raw_error_msg': str(exception)})
class LibraryRestoreLoadError(Exception):
def __init__(self, message, logfile=None):
super().__init__(message)
self.logfile = logfile
class LibraryRestoreTask(UserTask):
"""
Base class for library restore tasks.
"""
ARTIFACT_NAMES = {
UserTaskStatus.FAILED: 'Error log',
UserTaskStatus.SUCCEEDED: 'Library Restore',
}
ERROR_LOG_ARTIFACT_NAME = 'Error log'
@classmethod
def generate_name(cls, arguments_dict):
storage_path = arguments_dict['storage_path']
return f'learning package restore of {storage_path}'
def fail_with_error_log(self, logfile) -> None:
"""
Helper method to create an error log artifact and fail the task.
Args:
logfile (io.StringIO): The error log content
"""
# Prepare the error log to be saved as a file
error_log_file = ContentFile(logfile.getvalue().encode("utf-8"))
# Save the error log as an artifact
artifact = UserTaskArtifact(status=self.status, name=self.ERROR_LOG_ARTIFACT_NAME)
artifact.file.save(name=f'{self.status.task_id}-error.log', content=error_log_file)
artifact.save()
self.status.fail(json.dumps({'error': 'Error(s) restoring learning package'}))
def load_learning_package(self, storage_path, user):
"""
Load learning package from a backup file in storage.
Args:
storage_path (str): The path to the backup file in storage
Returns:
dict: The result of loading the learning package, including status and info
Raises:
LibraryRestoreLoadError: If there is an error loading the learning package
"""
# First ensure the backup file exists
if not course_import_export_storage.exists(storage_path):
raise LibraryRestoreLoadError(f'Uploaded file {storage_path} not found')
# Temporarily copy the file locally, and then load the learning package from it
with NamedTemporaryFile(suffix=".zip") as tmp_file:
with course_import_export_storage.open(storage_path, "rb") as storage_file:
shutil.copyfileobj(storage_file, tmp_file)
tmp_file.flush()
TASK_LOGGER.info('Restoring learning package from temporary file %s', tmp_file.name)
result = authoring_api.load_learning_package(tmp_file.name, user=user)
# If there was an error during the load, fail the task with the error log
if result.get("status") == "error":
raise LibraryRestoreLoadError(
"Error(s) loading learning package",
logfile=result.get("log_file_error")
)
return result
@shared_task(base=LibraryRestoreTask, bind=True)
def restore_library(self, user_id, storage_path):
"""
Restore a learning package from a backup file.
"""
ensure_cms("restore_library may only be executed in a CMS context")
set_code_owner_attribute_from_module(__name__)
TASK_LOGGER.info('Starting restore of learning package from %s', storage_path)
try:
# Load the learning package from the backup file
user = User.objects.get(id=user_id)
result = self.load_learning_package(storage_path, user=user)
learning_package_data = result.get("lp_restored_data", {})
TASK_LOGGER.info(
'Restored learning package (id: %s) with key %s',
learning_package_data.get('id'),
learning_package_data.get('key')
)
# Save the restore details as an artifact in JSON format
restore_data = json.dumps(result, cls=DjangoJSONEncoder)
UserTaskArtifact.objects.create(
status=self.status,
name=self.ARTIFACT_NAMES[UserTaskStatus.SUCCEEDED],
text=restore_data
)
TASK_LOGGER.info('Finished restore of learning package from %s', storage_path)
except Exception as exc: # pylint: disable=broad-except
TASK_LOGGER.exception('Error restoring learning package from %s', storage_path)
logfile = getattr(exc, 'logfile', StringIO("Unexpected error during library restore: " + str(exc)))
self.fail_with_error_log(logfile)
finally:
# Make sure to clean up the uploaded file from storage
course_import_export_storage.delete(storage_path)
TASK_LOGGER.info('Deleted uploaded file %s after restore', storage_path)

View File

@@ -34,6 +34,8 @@ URL_LIB_TEAM_GROUP = URL_LIB_TEAM + 'group/{group_name}/' # Add/edit/remove a g
URL_LIB_PASTE_CLIPBOARD = URL_LIB_DETAIL + 'paste_clipboard/' # Paste user clipboard (POST) containing Xblock data
URL_LIB_BACKUP = URL_LIB_DETAIL + 'backup/' # Start a backup task for this library
URL_LIB_BACKUP_GET = URL_LIB_BACKUP + '?{query_params}' # Get status on a backup task for this library
URL_LIB_RESTORE = URL_PREFIX + 'restore/' # Restore a library from a learning package backup file
URL_LIB_RESTORE_GET = URL_LIB_RESTORE + '?{query_params}' # Get status/result of a library restore task
URL_LIB_BLOCK = URL_PREFIX + 'blocks/{block_key}/' # Get data about a block, or delete it
URL_LIB_BLOCK_PUBLISH = URL_LIB_BLOCK + 'publish/' # Publish changes from a specified XBlock
URL_LIB_BLOCK_OLX = URL_LIB_BLOCK + 'olx/' # Get or set the OLX of the specified XBlock
@@ -139,18 +141,21 @@ class ContentLibrariesRestApiTest(APITransactionTestCase):
def _create_library(
self, slug, title, description="", org=None,
license_type=ALL_RIGHTS_RESERVED, expect_response=200,
license_type=ALL_RIGHTS_RESERVED, expect_response=200, learning_package=None
):
""" Create a library """
if org is None:
org = self.organization.short_name
return self._api('post', URL_LIB_CREATE, {
data = {
"org": org,
"slug": slug,
"title": title,
"description": description,
"license": license_type,
}, expect_response)
}
if learning_package is not None:
data["learning_package"] = learning_package
return self._api('post', URL_LIB_CREATE, data, expect_response)
def _list_libraries(self, query_params_dict=None, expect_response=200):
""" List libraries """
@@ -332,6 +337,21 @@ class ContentLibrariesRestApiTest(APITransactionTestCase):
url = URL_LIB_BACKUP_GET.format(lib_key=lib_key, query_params=query_params)
return self._api('get', url, None, expect_response)
def _start_library_restore_task(self, file, expect_response=200):
""" Start a library restore task from a backup file """
url = URL_LIB_RESTORE
data = {"file": file}
response = self.client.post(url, data, format='multipart')
assert response.status_code == expect_response, \
f'Unexpected response code {response.status_code}:\n{getattr(response, "data", "(no data)")}'
return response.data
def _get_library_restore_task(self, task_id, expect_response=200):
""" Get the status/result of a library restore task """
query_params = urlencode({"task_id": task_id})
url = URL_LIB_RESTORE_GET.format(query_params=query_params)
return self._api('get', url, None, expect_response)
def _render_block_view(self, block_key, view_name, version=None, expect_response=200):
"""
Render an XBlock's view in the active application's runtime.

View File

@@ -2,10 +2,17 @@
Tests for Learning-Core-based Content Libraries
"""
from datetime import datetime, timezone
import os
import zipfile
import uuid
import tempfile
from io import StringIO
from unittest import skip
from unittest.mock import patch
from unittest.mock import ANY, patch
import ddt
import tomlkit
from django.core.files.uploadedfile import SimpleUploadedFile
from django.contrib.auth.models import Group
from django.test import override_settings
from django.test.client import Client
@@ -13,9 +20,12 @@ from freezegun import freeze_time
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
from organizations.models import Organization
from rest_framework.test import APITestCase
from openedx_learning.api.authoring_models import LearningPackage
from user_tasks.models import UserTaskStatus, UserTaskArtifact
from common.djangoapps.student.tests.factories import UserFactory
from openedx.core.djangoapps.content_libraries.constants import CC_4_BY
from openedx.core.djangoapps.content_libraries.tasks import LibraryRestoreTask
from openedx.core.djangoapps.content_libraries.tests.base import (
URL_BLOCK_GET_HANDLER_URL,
URL_BLOCK_METADATA_URL,
@@ -26,6 +36,8 @@ from openedx.core.djangoapps.content_libraries.tests.base import (
from openedx.core.djangoapps.xblock import api as xblock_api
from openedx.core.djangolib.testing.utils import skip_unless_cms
from ..models import ContentLibrary
@skip_unless_cms
@ddt.ddt
@@ -876,6 +888,335 @@ class ContentLibrariesTestCase(ContentLibrariesRestApiTest):
assert [dict(item) for item in block_types] == expected
class LibraryRestoreViewTestCase(ContentLibrariesRestApiTest):
"""
Tests for LibraryRestoreView endpoints.
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.package_author_data = {
"username": "test_author",
"email": "author@example.com",
"first_name": "Test",
"last_name": "Author",
}
cls.org_short_name = "CL-TEST"
cls.library_slug = "LIB_C001"
cls.learning_package_key = f"lib:{cls.org_short_name}:{cls.library_slug}"
cls.learning_package_data = {
"key": cls.learning_package_key,
"title": "Demo Learning Package",
"description": "A demo learning package for testing.",
"created": "2025-10-05T18:23:45.180535Z",
"updated": "2025-10-05T18:23:45.180535Z",
}
cls.learning_package_metadata = {
"format_version": 1,
"created_at": "2025-10-05T18:23:45.180535Z",
"created_by": cls.package_author_data["username"],
"created_by_email": cls.package_author_data["email"],
"origin_server": "cms.test",
}
toml_data = {
"learning_package": cls.learning_package_data,
"meta": cls.learning_package_metadata,
}
toml_content = tomlkit.dumps(toml_data)
cls.tmp_file = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
zip_path = cls.tmp_file.name
with zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr("package.toml", toml_content)
@classmethod
def tearDownClass(cls):
cls.tmp_file.close()
os.remove(cls.tmp_file.name)
super().tearDownClass()
def setUp(self):
super().setUp()
# The parent class provides a staff self.user ("Bob") and self.organization ("CL-TEST")
# Create additional users
self.admin_user = UserFactory.create(username="Admin", email="admin@example.com", is_staff=True)
self.non_admin_user = UserFactory.create(username="NonAdmin", email="non_admin@example.com")
self.learning_package_author = UserFactory.create(**self.package_author_data)
# Prepare the ZIP file for upload
with open(self.tmp_file.name, "rb") as f:
self.uploaded_zip_file = SimpleUploadedFile("test.zip", f.read(), content_type="application/zip")
def _create_user_task_status(
self,
user=None,
task_id='',
state=UserTaskStatus.SUCCEEDED,
total_steps=5,
task_class='test_rest_api.sample_task',
name='SampleTask',
):
"""
Helper method to create a UserTaskStatus instance.
"""
user = user or self.user
return UserTaskStatus.objects.create(
user=user,
task_id=task_id or str(uuid.uuid4()),
state=state,
total_steps=total_steps,
task_class=task_class,
name=name,
)
def test_restore_library_success(self):
"""
Test successful task creation for library restore by admin user.
"""
## POST the zip file to start restore task
with self.as_user(self.admin_user):
response_data = self._start_library_restore_task(self.uploaded_zip_file)
self.assertIn('task_id', response_data)
self.assertIsNotNone(response_data['task_id'])
## GET the task status and result (task is run synchronously in tests)
with self.as_user(self.admin_user):
response_data = self._get_library_restore_task(response_data['task_id'])
self.assertIn('state', response_data)
self.assertEqual(response_data['state'], 'Succeeded')
self.assertIn('result', response_data)
task_result = response_data.get('result', {})
# Validate the learning package data in the result
expected = {
"learning_package_id": ANY,
"key": ANY,
"title": self.learning_package_data["title"],
"org": self.org_short_name,
"slug": self.library_slug,
"archive_key": self.learning_package_key,
"collections": 0,
"components": 0,
"containers": 0,
"sections": 0,
"subsections": 0,
"units": 0,
"created_on_server": self.learning_package_metadata["origin_server"],
"created_at": ANY,
"created_by": {
"username": self.learning_package_author.username,
"email": self.learning_package_author.email,
},
}
self.assertIn('learning_package_id', task_result)
self.assertTrue(LearningPackage.objects.filter(pk=task_result['learning_package_id']).exists())
for key, value in expected.items():
self.assertEqual(task_result[key], value)
def test_create_content_library_from_restore(self):
"""
Test that a content library is created as part of the library restore process.
"""
with self.as_user(self.admin_user):
response_data = self._start_library_restore_task(self.uploaded_zip_file)
self.assertIn('task_id', response_data)
self.assertIsNotNone(response_data['task_id'])
with self.as_user(self.admin_user):
response_data = self._get_library_restore_task(response_data['task_id'])
self.assertIn('state', response_data)
self.assertEqual(response_data['state'], 'Succeeded')
task_result = response_data.get('result', {})
self.assertIn('learning_package_id', task_result)
learning_package_id = task_result['learning_package_id']
self.assertTrue(LearningPackage.objects.filter(pk=learning_package_id).exists())
library_title = "Restored Library"
library_description = "A library restored from a learning package"
with self.as_user(self.admin_user):
create_response_data = self._create_library(
org=self.org_short_name,
slug=self.library_slug,
title=library_title,
description=library_description,
learning_package=learning_package_id,
)
self.assertIn('id', create_response_data)
library_locator = LibraryLocatorV2.from_string(create_response_data['id'])
content_library = ContentLibrary.objects.get_by_key(library_locator)
self.assertIsNotNone(content_library)
self.assertEqual(content_library.learning_package.id, learning_package_id)
self.assertEqual(content_library.learning_package.title, library_title)
self.assertEqual(content_library.learning_package.description, library_description)
self.assertIn(self.org_short_name, content_library.library_key.org)
self.assertIn(self.library_slug, content_library.library_key.slug)
def test_restore_library_unauthorized(self):
"""
Test that non-admin users cannot start a library restore task.
"""
with self.as_user(self.non_admin_user):
self._start_library_restore_task(self.uploaded_zip_file, expect_response=403)
def test_restore_library_invalid_file(self):
"""
Test that uploading a non-ZIP file returns a 400 error.
"""
non_zip_file = SimpleUploadedFile(
"test.txt",
b'This is not a ZIP file',
content_type='text/plain'
)
with self.as_user(self.admin_user):
self._start_library_restore_task(non_zip_file, expect_response=400)
def test_get_restore_task_unfinished(self):
"""
Test that attempting to get the status of an unfinished task returns an appropriate response.
"""
# Create a UserTaskStatus in PENDING state
pending_task_status = self._create_user_task_status(state=UserTaskStatus.PENDING)
with patch(
'openedx.core.djangoapps.content_libraries.rest_api.libraries.get_object_or_404',
return_value=pending_task_status
):
response_data = self._get_library_restore_task(pending_task_status.task_id)
expected = {
"state": UserTaskStatus.PENDING,
"result": None,
"error": None,
"error_log": None,
}
self.assertEqual(response_data, expected)
in_progress_task_status = self._create_user_task_status(state=UserTaskStatus.IN_PROGRESS)
with patch(
'openedx.core.djangoapps.content_libraries.rest_api.libraries.get_object_or_404',
return_value=in_progress_task_status
):
response_data = self._get_library_restore_task(in_progress_task_status.task_id)
expected["state"] = UserTaskStatus.IN_PROGRESS
self.assertEqual(response_data, expected)
def test_task_user_mismatch(self):
"""
A user should not be able to access another user's library restore task.
"""
with self.as_user(self.admin_user):
post_response = self._start_library_restore_task(self.uploaded_zip_file)
other_user = UserFactory.create(username="OtherUser", email="other@example.com", is_staff=True)
with self.as_user(other_user):
self._get_library_restore_task(post_response['task_id'], expect_response=404)
def test_task_artifact_text_not_json(self):
"""
Test that a task artifact that is not JSON returns an appropriate response.
"""
task_status = self._create_user_task_status(state=UserTaskStatus.SUCCEEDED)
# Manually create a UserTaskArtifact with non-JSON text content
artifact_text = 'Some unexpected text content that is not JSON.'
UserTaskArtifact.objects.create(
status=task_status,
text=artifact_text,
name=LibraryRestoreTask.ARTIFACT_NAMES[task_status.state],
)
with patch(
'openedx.core.djangoapps.content_libraries.rest_api.libraries.get_object_or_404',
return_value=task_status
):
response_data = self._get_library_restore_task(task_status.task_id)
expected = {
"state": UserTaskStatus.SUCCEEDED,
"result": None,
"error": ANY,
"error_log": None,
}
self.assertEqual(response_data, expected)
def test_failed_task_with_error_log(self):
"""
If a task fails with an error log, include the url to the log
"""
error_result = {
'status': 'error',
'log_file_error': StringIO("Library restore failed: An unexpected error occurred during processing."),
'lp_restore_data': None,
'backup_metadata': None,
}
with self.as_user(self.admin_user):
with patch(
"openedx.core.djangoapps.content_libraries.tasks.authoring_api.load_learning_package",
return_value=error_result
):
response = self._start_library_restore_task(self.uploaded_zip_file)
with self.as_user(self.admin_user):
task_data = self._get_library_restore_task(response['task_id'])
expected = {
'state': 'Failed',
'error': ANY,
'error_log': ANY,
'result': None,
}
self.assertEqual(task_data, expected)
def test_uncaught_error_creates_error_log(self):
"""
If an uncaught error occurs during task execution, an error log should be created
"""
with self.as_user(self.admin_user):
with patch(
"openedx.core.djangoapps.content_libraries.tasks.authoring_api.load_learning_package",
side_effect=Exception("Uncaught exception during processing.")
):
response = self._start_library_restore_task(self.uploaded_zip_file)
with self.as_user(self.admin_user):
task_data = self._get_library_restore_task(response['task_id'])
expected = {
'state': 'Failed',
'error': ANY,
'error_log': ANY,
'result': None,
}
self.assertEqual(task_data, expected)
@ddt.ddt
class ContentLibraryXBlockValidationTest(APITestCase):
"""Tests only focused on service validation, no Learning Core interactions here."""

View File

@@ -33,6 +33,7 @@ urlpatterns = [
path('api/libraries/v2/', include([
# list of libraries / create a library:
path('', libraries.LibraryRootView.as_view()),
path('restore/', libraries.LibraryRestoreView.as_view()),
path('<str:lib_key_str>/', include([
# get data about a library, update a library, or delete a library:
path('', libraries.LibraryDetailsView.as_view()),

View File

@@ -61,7 +61,7 @@ numpy<2.0.0
# Date: 2023-09-18
# pinning this version to avoid updates while the library is being developed
# Issue for unpinning: https://github.com/openedx/edx-platform/issues/35269
openedx-learning==0.27.1
openedx-learning==0.29.0
# Date: 2023-11-29
# Open AI version 1.0.0 dropped support for openai.ChatCompletion which is currently in use in enterprise.

View File

@@ -841,7 +841,7 @@ openedx-filters==2.1.0
# ora2
openedx-forum==0.3.7
# via -r requirements/edx/kernel.in
openedx-learning==0.27.1
openedx-learning==0.29.0
# via
# -c requirements/constraints.txt
# -r requirements/edx/kernel.in

View File

@@ -1393,7 +1393,7 @@ openedx-forum==0.3.7
# via
# -r requirements/edx/doc.txt
# -r requirements/edx/testing.txt
openedx-learning==0.27.1
openedx-learning==0.29.0
# via
# -c requirements/constraints.txt
# -r requirements/edx/doc.txt

View File

@@ -1015,7 +1015,7 @@ openedx-filters==2.1.0
# ora2
openedx-forum==0.3.7
# via -r requirements/edx/base.txt
openedx-learning==0.27.1
openedx-learning==0.29.0
# via
# -c requirements/constraints.txt
# -r requirements/edx/base.txt

View File

@@ -1059,7 +1059,7 @@ openedx-filters==2.1.0
# ora2
openedx-forum==0.3.7
# via -r requirements/edx/base.txt
openedx-learning==0.27.1
openedx-learning==0.29.0
# via
# -c requirements/constraints.txt
# -r requirements/edx/base.txt