feat: bulk modulestore migration [FC-0097] (#37381)

- Adds the task, python api, and rest api view for bulk migration.
- Refactor the code to share code between single migration and bulk migration.
This commit is contained in:
Chris Chávez
2025-10-13 16:34:36 -05:00
committed by GitHub
parent 19c9a340f9
commit 3db4399f74
9 changed files with 1469 additions and 171 deletions

View File

@@ -15,6 +15,7 @@ from .models import ModulestoreSource
__all__ = (
"start_migration_to_library",
"start_bulk_migration_to_library",
"is_successfully_migrated",
"get_migration_info",
)
@@ -46,7 +47,6 @@ def start_migration_to_library(
return tasks.migrate_from_modulestore.delay(
user_id=user.id,
source_pk=source.id,
target_package_pk=target_package_id,
target_library_key=str(target_library_key),
target_collection_pk=target_collection_id,
composition_level=composition_level,
@@ -56,6 +56,52 @@ def start_migration_to_library(
)
def start_bulk_migration_to_library(
*,
user: AuthUser,
source_key_list: list[LearningContextKey],
target_library_key: LibraryLocatorV2,
target_collection_slug_list: list[str | None] | None = None,
create_collections: bool = False,
composition_level: str,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
forward_source_to_target: bool,
) -> AsyncResult:
"""
Import a list of courses or legacy libraries into a V2 library (or, a collections within a V2 library).
"""
target_library = get_library(target_library_key)
# get_library ensures that the library is connected to a learning package.
target_package_id: int = target_library.learning_package_id # type: ignore[assignment]
sources_pks: list[int] = []
for source_key in source_key_list:
source, _ = ModulestoreSource.objects.get_or_create(key=source_key)
sources_pks.append(source.id)
target_collection_pks: list[int | None] = []
if target_collection_slug_list:
for target_collection_slug in target_collection_slug_list:
if target_collection_slug:
target_collection_id = get_collection(target_package_id, target_collection_slug).id
target_collection_pks.append(target_collection_id)
else:
target_collection_pks.append(None)
return tasks.bulk_migrate_from_modulestore.delay(
user_id=user.id,
sources_pks=sources_pks,
target_library_key=str(target_library_key),
target_collection_pks=target_collection_pks,
create_collections=create_collections,
composition_level=composition_level,
repeat_handling_strategy=repeat_handling_strategy,
preserve_url_slugs=preserve_url_slugs,
forward_source_to_target=forward_source_to_target,
)
def is_successfully_migrated(source_key: CourseKey | LibraryLocator) -> bool:
"""
Check if the source course/library has been migrated successfully.

View File

@@ -0,0 +1,20 @@
# Generated by Django 4.2.24 on 2025-09-29 20:28
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('user_tasks', '0004_url_textfield'),
('modulestore_migrator', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='modulestoremigration',
name='task_status',
field=models.ForeignKey(help_text='Tracks the status of the task which is executing this migration. In a bulk migration, the same task can be multiple migrations', on_delete=django.db.models.deletion.RESTRICT, related_name='migrations', to='user_tasks.usertaskstatus'),
),
]

View File

@@ -107,10 +107,14 @@ class ModulestoreMigration(models.Model):
)
## MIGRATION ARTIFACTS
task_status = models.OneToOneField(
task_status = models.ForeignKey(
UserTaskStatus,
on_delete=models.RESTRICT,
help_text=_("Tracks the status of the task which is executing this migration"),
help_text=_(
"Tracks the status of the task which is executing this migration. "
"In a bulk migration, the same task can be multiple migrations"
),
related_name="migrations",
)
change_log = models.ForeignKey(
DraftChangeLog,

View File

@@ -12,9 +12,9 @@ from cms.djangoapps.modulestore_migrator.data import CompositionLevel, RepeatHan
from cms.djangoapps.modulestore_migrator.models import ModulestoreMigration
class ModulestoreMigrationSerializer(serializers.ModelSerializer):
class ModulestoreMigrationSerializer(serializers.Serializer):
"""
Serializer for the course to library import creation API.
Serializer for the course or legacylibrary to library V2 import creation API.
"""
source = serializers.CharField( # type: ignore[assignment]
@@ -22,7 +22,7 @@ class ModulestoreMigrationSerializer(serializers.ModelSerializer):
required=True,
)
target = serializers.CharField(
help_text="The target library key to import into.",
help_text="The target library V2 key to import into.",
required=True,
)
composition_level = serializers.ChoiceField(
@@ -54,18 +54,6 @@ class ModulestoreMigrationSerializer(serializers.ModelSerializer):
default=False,
)
class Meta:
model = ModulestoreMigration
fields = [
'source',
'target',
'target_collection_slug',
'composition_level',
'repeat_handling_strategy',
'preserve_url_slugs',
'forward_source_to_target',
]
def get_fields(self):
fields = super().get_fields()
request = self.context.get('request')
@@ -100,19 +88,74 @@ class ModulestoreMigrationSerializer(serializers.ModelSerializer):
def to_representation(self, instance):
"""
Override to customize the serialized representation."""
Override to customize the serialized representation.
"""
data = super().to_representation(instance)
# Custom logic for forward_source_to_target during serialization
data['forward_source_to_target'] = self.get_forward_source_to_target(instance)
return data
class BulkModulestoreMigrationSerializer(ModulestoreMigrationSerializer):
"""
Serializer for a bulk migration (of several courses or legacy libraries) to a V2 library.
"""
sources = serializers.ListField(
child=serializers.CharField(),
help_text="The list of sources course or legacy library keys to import from.",
required=True,
)
target_collection_slug_list = serializers.ListField(
child=serializers.CharField(),
help_text="The list of target collection slugs within the library to import into. Optional.",
required=False,
allow_empty=True,
default=None,
)
create_collections = serializers.BooleanField(
help_text=(
"If true and `target_collection_slug_list` is not set, "
"create the collections in the library where the import will be made"
),
required=False,
default=False,
)
def get_fields(self):
fields = super().get_fields()
fields.pop("source", None)
fields.pop("target_collection_slug", None)
return fields
def validate_sources(self, value):
"""
Validate all the source key format
"""
validated_sources = []
for v in value:
try:
validated_sources.append(LearningContextKey.from_string(v))
except InvalidKeyError as exc:
raise serializers.ValidationError(f"Invalid source key: {str(exc)}") from exc
return validated_sources
def to_representation(self, instance):
"""
Override to customize the serialized representation.
"""
if isinstance(instance, list):
return [super().to_representation(obj) for obj in instance]
return super().to_representation(instance)
class StatusWithModulestoreMigrationSerializer(StatusSerializer):
"""
Serializer for the import task status.
"""
parameters = ModulestoreMigrationSerializer(source='modulestoremigration')
parameters = ModulestoreMigrationSerializer(source='migrations', many=True)
class Meta:
model = StatusSerializer.Meta.model

View File

@@ -3,9 +3,10 @@ Course to Library Import API v1 URLs.
"""
from rest_framework.routers import SimpleRouter
from .views import MigrationViewSet
from .views import MigrationViewSet, BulkMigrationViewSet
ROUTER = SimpleRouter()
ROUTER.register(r'migrations', MigrationViewSet)
ROUTER.register(r'migrations', MigrationViewSet, basename='migrations')
ROUTER.register(r'bulk_migration', BulkMigrationViewSet, basename='bulk-migration')
urlpatterns = ROUTER.urls

View File

@@ -3,6 +3,7 @@ API v1 views.
"""
import logging
import edx_api_doc_tools as apidocs
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
from rest_framework.permissions import IsAdminUser
@@ -11,10 +12,14 @@ from rest_framework import status
from user_tasks.models import UserTaskStatus
from user_tasks.views import StatusViewSet
from cms.djangoapps.modulestore_migrator.api import start_migration_to_library
from cms.djangoapps.modulestore_migrator.api import start_migration_to_library, start_bulk_migration_to_library
from openedx.core.lib.api.authentication import BearerAuthenticationAllowInactiveUser
from .serializers import ModulestoreMigrationSerializer, StatusWithModulestoreMigrationSerializer
from .serializers import (
StatusWithModulestoreMigrationSerializer,
ModulestoreMigrationSerializer,
BulkModulestoreMigrationSerializer,
)
log = logging.getLogger(__name__)
@@ -22,7 +27,7 @@ log = logging.getLogger(__name__)
class MigrationViewSet(StatusViewSet):
"""
Import course content from modulestore into a content library.
Import course content or legacy library content from modulestore into a content library.
This viewset handles the import process, including creating the import task and
retrieving the status of the import task. Meant to be used by admin users only.
@@ -84,12 +89,14 @@ class MigrationViewSet(StatusViewSet):
"modified": "2025-05-14T22:24:59.128068Z",
"artifacts": [],
"uuid": "3de23e5d-fd34-4a6f-bf02-b183374120f0",
"parameters": {
"source": "course-v1:OpenedX+DemoX+DemoCourse2",
"composition_level": "component",
"repeat_handling_strategy": "skip",
"preserve_url_slugs": false
}
"parameters": [
{
"source": "course-v1:OpenedX+DemoX+DemoCourse2",
"composition_level": "component",
"repeat_handling_strategy": "skip",
"preserve_url_slugs": false
}
]
}
"""
@@ -103,33 +110,172 @@ class MigrationViewSet(StatusViewSet):
def get_queryset(self):
"""
Override the default queryset to filter by the import event and user.
Override the default queryset to filter by the migration event and user.
"""
return StatusViewSet.queryset.filter(modulestoremigration__isnull=False, user=self.request.user)
return StatusViewSet.queryset.filter(migrations__isnull=False, user=self.request.user).distinct()
@apidocs.schema(
body=ModulestoreMigrationSerializer,
responses={
201: StatusWithModulestoreMigrationSerializer,
401: "The requester is not authenticated.",
},
summary="Start a modulestore to content library migration",
description=(
"Create a migration task to import course or legacy library content into "
"a content library.\n\n"
"**Request example**:\n\n"
"```json\n"
"{\n"
' "source": "course-v1:edX+DemoX+2014_T1",\n'
' "target": "library-v1:org1+lib_1",\n'
' "composition_level": "unit",\n'
' "repeat_handling_strategy": "update",\n'
' "preserve_url_slugs": true\n'
"}\n"
"```"
),
)
def create(self, request, *args, **kwargs):
"""
Handle the import task creation.
Handle the migration task creation.
"""
serializer_data = ModulestoreMigrationSerializer(data=request.data)
serializer_data.is_valid(raise_exception=True)
validated_data = serializer_data.validated_data
try:
task = start_migration_to_library(
user=request.user,
source_key=validated_data['source'],
target_library_key=validated_data['target'],
target_collection_slug=validated_data['target_collection_slug'],
composition_level=validated_data['composition_level'],
repeat_handling_strategy=validated_data['repeat_handling_strategy'],
preserve_url_slugs=validated_data['preserve_url_slugs'],
forward_source_to_target=validated_data['forward_source_to_target'],
)
except NotImplementedError as e:
log.exception(str(e))
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)
task = start_migration_to_library(
user=request.user,
source_key=validated_data['source'],
target_library_key=validated_data['target'],
target_collection_slug=validated_data['target_collection_slug'],
composition_level=validated_data['composition_level'],
repeat_handling_strategy=validated_data['repeat_handling_strategy'],
preserve_url_slugs=validated_data['preserve_url_slugs'],
forward_source_to_target=validated_data['forward_source_to_target'],
)
task_status = UserTaskStatus.objects.get(task_id=task.id)
serializer = self.get_serializer(task_status)
return Response(serializer.data, status=status.HTTP_201_CREATED)
class BulkMigrationViewSet(StatusViewSet):
"""
Import content of a list of courses or legacy libraries from modulestore into a content library.
This viewset handles the import process, including creating the import task and
retrieving the status of the import task. Meant to be used by admin users only.
API Endpoints
------------
POST /api/modulestore_migrator/v1/bulk-migration/
Start the bulk import process.
Request body:
{
"sources": ["<source_course_key_1>", "<source_course_key_2>"],
"target": "<target_library>",
"composition_level": "<composition_level>", # Optional, defaults to "component"
"target_collection_slugs": ["<target_collection_slug_1>", "<target_collection_slug_1>"], # Optional
"create_collections": "<boolean>" # Optional, defaults to false
"repeat_handling_strategy": "<repeat_handling_strategy>" # Optional, defaults to Skip
"preserve_url_slugs": "<boolean>" # Optional, defaults to true
}
Example request:
{
"sources": ["course-v1:edX+DemoX+2014_T1", "course-v1:edX+DemoX+2014_T2"],
"target": "library-v1:org1+lib_1",
"composition_level": "unit",
"repeat_handling_strategy": "update",
"preserve_url_slugs": true,
"create_collections": true
}
Example response:
{
"state": "Succeeded",
"state_text": "Succeeded", # Translation into the current language of the current state
"completed_steps": 11,
"total_steps": 11,
"attempts": 1,
"created": "2025-05-14T22:24:37.048539Z",
"modified": "2025-05-14T22:24:59.128068Z",
"artifacts": [],
"uuid": "3de23e5d-fd34-4a6f-bf02-b183374120f0",
"parameters": [
{
"source": "course-v1:edX+DemoX+2014_T1",
"composition_level": "unit",
"repeat_handling_strategy": "update",
"preserve_url_slugs": true
},
{
"source": "course-v1:edX+DemoX+2014_T2",
"composition_level": "unit",
"repeat_handling_strategy": "update",
"preserve_url_slugs": true
},
]
}
GET Not Alowed
"""
permission_classes = (IsAdminUser,)
authentication_classes = (
BearerAuthenticationAllowInactiveUser,
JwtAuthentication,
SessionAuthenticationAllowInactiveUser,
)
serializer_class = StatusWithModulestoreMigrationSerializer
http_method_names = ["post"]
@apidocs.schema(
body=BulkModulestoreMigrationSerializer,
responses={
201: StatusWithModulestoreMigrationSerializer,
401: "The requester is not authenticated.",
},
summary="Start a bulk modulestore to content library migration",
description=(
"Create a migration task to import multiple courses or legacy libraries "
"into a single content library.\n\n"
"**Request example**:\n\n"
"```json\n"
"{\n"
' "sources": ["course-v1:edX+DemoX+2014_T1", "course-v1:edX+DemoX+2014_T2"],\n'
' "target": "library-v1:org1+lib_1",\n'
' "composition_level": "unit",\n'
' "repeat_handling_strategy": "update",\n'
' "preserve_url_slugs": true,\n'
' "create_collections": true\n'
"}\n"
"```"
),
)
def create(self, request, *args, **kwargs):
"""
Handle the bulk migration task creation.
"""
serializer_data = BulkModulestoreMigrationSerializer(data=request.data)
serializer_data.is_valid(raise_exception=True)
validated_data = serializer_data.validated_data
task = start_bulk_migration_to_library(
user=request.user,
source_key_list=validated_data['sources'],
target_library_key=validated_data['target'],
target_collection_slug_list=validated_data['target_collection_slug_list'],
create_collections=validated_data['create_collections'],
composition_level=validated_data['composition_level'],
repeat_handling_strategy=validated_data['repeat_handling_strategy'],
preserve_url_slugs=validated_data['preserve_url_slugs'],
forward_source_to_target=validated_data['forward_source_to_target'],
)
task_status = UserTaskStatus.objects.get(task_id=task.id)
serializer = self.get_serializer(task_status)

View File

@@ -15,6 +15,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.utils.text import slugify
from django.db import transaction
from edx_django_utils.monitoring import set_code_owner_attribute_from_module
from lxml import etree
from lxml.etree import _ElementTree as XmlTree
@@ -37,8 +38,11 @@ from openedx_learning.api.authoring_models import (
PublishableEntityVersion
)
from user_tasks.tasks import UserTask, UserTaskStatus
from xblock.core import XBlock
from django.utils.translation import gettext_lazy as _
from common.djangoapps.split_modulestore_django.models import SplitModulestoreCourseIndex
from common.djangoapps.util.date_utils import strftime_localized, DEFAULT_DATE_TIME_FORMAT
from openedx.core.djangoapps.content_libraries import api as libraries_api
from openedx.core.djangoapps.content_libraries.api import ContainerType, get_library
from openedx.core.djangoapps.content_staging import api as staging_api
@@ -71,6 +75,7 @@ class MigrationStep(Enum):
MAPPING_OLD_TO_NEW = 'Saving map of legacy content to migrated content'
FORWARDING = 'Forwarding legacy content to migrated content'
POPULATING_COLLECTION = 'Assigning imported items to the specified collection'
BULK_MIGRATION_PREFIX = 'Migrating legacy content'
class _MigrationTask(UserTask):
@@ -83,7 +88,36 @@ class _MigrationTask(UserTask):
"""
Get number of in-progress steps in importing process, as shown in the UI.
"""
return len(list(MigrationStep))
# We subtract the BULK_MIGRATION_PREFIX
return len(list(MigrationStep)) - 1
class _BulkMigrationTask(UserTask):
"""
Base class for bulk_migrate_from_modulestore
"""
@staticmethod
def calculate_total_steps(arguments_dict):
"""
Get number of in-progress steps in importing process, as shown in the UI.
There are steps that are general for all sources, but there are steps that are repeated in each source.
All of this is taken into account to make the sum
"""
sources_count = len(arguments_dict.get('sources_pks', 1))
# STAGING, PARSING, IMPORTING_ASSETS, IMPORTING_STRUCTURE, MAPPING_OLD_TO_NEW, UNSTAGING
steps_repeated_count = 6
return (
# All migration steps and subtract the BULK_MIGRATION_PREFIX
len(list(MigrationStep)) - 1
# We don't want to count these steps again, they will be counted in the operation below.
- steps_repeated_count
# Each source repeats all the `steps_repeated_count`
+ steps_repeated_count * sources_count
)
@dataclass(frozen=True)
@@ -157,44 +191,34 @@ class _MigrationContext:
return self.repeat_handling_strategy is RepeatHandlingStrategy.Fork
@shared_task(base=_MigrationTask, bind=True)
# Note: The decorator @set_code_owner_attribute cannot be used here because the UserTaskMixin
# does stack inspection and can't handle additional decorators.
def migrate_from_modulestore(
self: _MigrationTask,
*,
user_id: int,
@dataclass()
class _MigrationSourceData:
"""
Data related to a ModulestoreSource
"""
source: ModulestoreSource
source_root_usage_key: UsageKey
source_version: str | None
migration: ModulestoreMigration
def _validate_input(
status: UserTaskStatus,
source_pk: int,
target_package_pk: int,
target_library_key: str,
target_collection_pk: int,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
composition_level: str,
forward_source_to_target: bool,
) -> None:
target_package: LearningPackage,
target_collection: Collection | None,
) -> _MigrationSourceData | None:
"""
Import a course or legacy library into a learning package.
Currently, the target learning package must be associated with a V2 content library, but that
restriction may be loosened in the future as more types of learning packages are developed.
Validates and build the source data related to `source_pk`
"""
# pylint: disable=too-many-statements
# This is a large function, but breaking it up futher would probably not
# make it any easier to understand.
set_code_owner_attribute_from_module(__name__)
status: UserTaskStatus = self.status
status.set_state(MigrationStep.VALIDATING_INPUT.value)
try:
source = ModulestoreSource.objects.get(pk=source_pk)
target_package = LearningPackage.objects.get(pk=target_package_pk)
target_library = get_library(LibraryLocatorV2.from_string(target_library_key))
target_collection = Collection.objects.get(pk=target_collection_pk) if target_collection_pk else None
except (ObjectDoesNotExist, InvalidKeyError) as exc:
except (ObjectDoesNotExist) as exc:
status.fail(str(exc))
return
return None
# The Model is used for Course and Legacy Library
course_index = SplitModulestoreCourseIndex.objects.filter(course_id=source.key).first()
@@ -209,7 +233,7 @@ def migrate_from_modulestore(
f"Not a valid source context key: {source.key}. "
"Source key must reference a course or a legacy library."
)
return
return None
migration = ModulestoreMigration.objects.create(
source=source,
@@ -221,21 +245,36 @@ def migrate_from_modulestore(
target_collection=target_collection,
task_status=status,
)
status.increment_completed_steps()
status.set_state(MigrationStep.CANCELLING_OLD.value)
return _MigrationSourceData(
source=source,
source_root_usage_key=source_root_usage_key,
source_version=source_version,
migration=migration,
)
def _cancel_old_tasks(
source_list: list[ModulestoreSource],
status: UserTaskStatus,
target_package: LearningPackage,
migration_ids_to_exclude: list[int],
) -> None:
"""
Cancel all migration tasks related to the user and the source list
"""
# In order to prevent a user from accidentally starting a bunch of identical import tasks...
migrations_to_cancel = ModulestoreMigration.objects.filter(
# get all Migration tasks by this user with the same source and target
# get all Migration tasks by this user with the same sources and target
task_status__user=status.user,
source=source,
source__in=source_list,
target=target_package,
).select_related('task_status').exclude(
# (excluding that aren't running)
task_status__state__in=(UserTaskStatus.CANCELED, UserTaskStatus.FAILED, UserTaskStatus.SUCCEEDED)
).exclude(
# (excluding this migration itself)
id=migration.id
# (excluding these migrations themselves)
id__in=migration_ids_to_exclude
)
# ... and cancel their tasks and clean away their staged content.
for migration_to_cancel in migrations_to_cancel:
@@ -243,45 +282,41 @@ def migrate_from_modulestore(
migration_to_cancel.task_status.cancel()
if migration_to_cancel.staged_content:
migration_to_cancel.staged_content.delete()
status.increment_completed_steps()
status.set_state(MigrationStep.LOADING)
def _load_xblock(
status: UserTaskStatus,
usage_key: UsageKey,
) -> XBlock | None:
"""
Loads the Xblock for the given usage_key
"""
try:
legacy_root = modulestore().get_item(source_root_usage_key)
xblock = modulestore().get_item(usage_key)
except modulestore_exceptions.ItemNotFoundError as exc:
status.fail(f"Failed to load source item '{source_root_usage_key}' from ModuleStore: {exc}")
return
if not legacy_root:
status.fail(f"Could not find source item '{source_root_usage_key}' in ModuleStore")
return
status.increment_completed_steps()
status.fail(f"Failed to load source item '{usage_key}' from ModuleStore: {exc}")
return None
if not xblock:
status.fail(f"Could not find source item '{usage_key}' in ModuleStore")
return None
return xblock
status.set_state(MigrationStep.STAGING.value)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root,
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source.key),
)
migration.staged_content = staged_content
status.increment_completed_steps()
status.set_state(MigrationStep.PARSING.value)
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
def _import_assets(migration: ModulestoreMigration) -> dict[str, int]:
"""
Import the assets of the staged content to the migration target
"""
if migration.staged_content is None:
return {}
status.set_state(MigrationStep.IMPORTING_ASSETS.value)
content_by_filename: dict[str, int] = {}
now = datetime.now(tz=timezone.utc)
for staged_content_file_data in staging_api.get_staged_content_static_files(staged_content.id):
for staged_content_file_data in staging_api.get_staged_content_static_files(migration.staged_content.id):
old_path = staged_content_file_data.filename
file_data = staging_api.get_staged_content_static_file_data(staged_content.id, old_path)
file_data = staging_api.get_staged_content_static_file_data(migration.staged_content.id, old_path)
if not file_data:
log.error(
f"Staged content {staged_content.id} included referenced file {old_path}, "
f"Staged content {migration.staged_content.id} included referenced file {old_path}, "
"but no file data was found."
)
continue
@@ -294,10 +329,45 @@ def migrate_from_modulestore(
data=file_data,
created=now,
).id
status.increment_completed_steps()
return content_by_filename
status.set_state(MigrationStep.IMPORTING_STRUCTURE.value)
def _import_structure(
migration: ModulestoreMigration,
source_data: _MigrationSourceData,
target_library: libraries_api.ContentLibraryMetadata,
content_by_filename: dict[str, int],
root_node: XmlTree,
status: UserTaskStatus,
) -> tuple[t.Any, _MigratedNode]:
"""
Import the staged content structure into the target Learning Core library.
Args:
migration (ModulestoreMigration):
The migration record representing the ongoing modulestore-to-learning-core migration.
source_data (_MigrationSourceData):
Data extracted from the legacy modulestore, including the source root usage key.
Use `_validate_input()` to generate this data.
target_library (libraries_api.ContentLibraryMetadata):
The target library where the new Learning Core content will be created.
content_by_filename (dict[str, int]):
A mapping between OLX file names and their associated file IDs in the staging area.
Use `_import_assets` to generate this content.
root_node (XmlTree):
The parsed XML tree representing the root of the staged OLX content.
status (UserTaskStatus):
The user task used to record progress and state updates throughout the import.
Returns:
tuple[Any, _MigratedNode]:
A tuple containing:
- The first element (`change_log`): the bulk draft change log generated by
`authoring_api.bulk_draft_changes_for`, containing all the imported changes.
- The second element (`root_migrated_node`): a `_MigratedNode` object that
represents the mapping between the legacy root node and its newly created
Learning Core equivalent.
"""
# "key" is locally unique across all PublishableEntities within
# a given LearningPackage.
# We use this mapping to ensure that we don't create duplicate
@@ -313,13 +383,13 @@ def migrate_from_modulestore(
migration_context = _MigrationContext(
existing_source_to_target_keys=existing_source_to_target_keys,
target_package_id=target_package_pk,
target_package_id=migration.target.pk,
target_library_key=target_library.key,
source_context_key=source_root_usage_key.course_key,
source_context_key=source_data.source_root_usage_key.course_key,
content_by_filename=content_by_filename,
composition_level=CompositionLevel(composition_level),
repeat_handling_strategy=RepeatHandlingStrategy(repeat_handling_strategy),
preserve_url_slugs=preserve_url_slugs,
composition_level=CompositionLevel(migration.composition_level),
repeat_handling_strategy=RepeatHandlingStrategy(migration.repeat_handling_strategy),
preserve_url_slugs=migration.preserve_url_slugs,
created_by=status.user_id,
created_at=datetime.now(timezone.utc),
)
@@ -330,6 +400,215 @@ def migrate_from_modulestore(
source_node=root_node,
)
change_log.save()
return change_log, root_migrated_node
def _forwarding_content(source_data: _MigrationSourceData) -> None:
"""
Forwarding legacy content to migrated content
"""
block_migrations = ModulestoreBlockMigration.objects.filter(overall_migration=source_data.migration)
block_sources_to_block_migrations = {
block_migration.source: block_migration for block_migration in block_migrations
}
for block_source, block_migration in block_sources_to_block_migrations.items():
block_source.forwarded = block_migration
block_source.save()
source_data.source.forwarded = source_data.migration
source_data.source.save()
def _populate_collection(user_id: int, migration: ModulestoreMigration) -> None:
"""
Assigning imported items to the specified collection in the migration
"""
if migration.target_collection is None:
return
block_target_pks: list[int] = list(
ModulestoreBlockMigration.objects.filter(
overall_migration=migration
).values_list('target_id', flat=True)
)
if block_target_pks:
authoring_api.add_to_collection(
learning_package_id=migration.target.pk,
key=migration.target_collection.key,
entities_qset=PublishableEntity.objects.filter(id__in=block_target_pks),
created_by=user_id,
)
else:
log.warning("No target entities found to add to collection")
def _create_collection(library_key: LibraryLocatorV2, title: str) -> Collection:
"""
Creates a collection in the given library
If there's a collection with the same key, try again, adding the attempt number at the end.
The same is true for the title.
"""
key = slugify(title)
collection = None
attempt = 0
created_at = strftime_localized(datetime.now(timezone.utc), DEFAULT_DATE_TIME_FORMAT)
description = f"{_('This collection contains content migrated from a legacy library on')}: {created_at}"
while not collection:
modified_key = key if attempt == 0 else key + '-' + str(attempt)
try:
# Add transaction here to avoid TransactionManagementError on retry
with transaction.atomic():
collection = libraries_api.create_library_collection(
library_key=library_key,
collection_key=modified_key,
title=f"{title}{f'_{attempt}' if attempt > 0 else ''}",
description=description,
)
except libraries_api.LibraryCollectionAlreadyExists as e:
attempt += 1
return collection
@shared_task(base=_MigrationTask, bind=True)
# Note: The decorator @set_code_owner_attribute cannot be used here because the UserTaskMixin
# does stack inspection and can't handle additional decorators.
def migrate_from_modulestore(
self: _MigrationTask,
*,
user_id: int,
source_pk: int,
target_library_key: str,
target_collection_pk: int | None,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
composition_level: str,
forward_source_to_target: bool,
) -> None:
"""
Import a single course or legacy library from modulestore into a V2 legacy library.
This task performs the end-to-end migration for one legacy source (course or library),
including staging, parsing OLX, importing assets and structure, and assigning the
migrated content to the specified target library and collection.
A new `UserTaskStatus` entry is created for each invocation of this task, meaning
that each migration runs independently with its own progress tracking and final
success or failure state.
If the migration encounters an unrecoverable error at any step (for example, invalid
OLX, missing assets, or database constraints), the task is marked as **failed** and
the partial results are rolled back as necessary. The migration state can be queried
through the REST API endpoint `/api/modulestore_migrator/v1/migrations/<uuid>/`.
Args:
self (_MigrationTask):
The Celery task instance that wraps the user task logic.
user_id (int):
The ID of the user initiating the migration.
source_pk (int):
Primary key of the modulestore source to migrate.
target_library_key (str):
Key of the target V2 library that will receive the imported content.
target_collection_pk (int | None):
Optional ID of a target collection to which imported content will be assigned.
repeat_handling_strategy (str):
Strategy for handling repeated imports (e.g., "skip", "update").
preserve_url_slugs (bool):
Whether to preserve original XBlock URL slugs during import.
composition_level (str):
The structural level to migrate (e.g., component, unit, or section).
forward_source_to_target (bool):
Whether to forward legacy content references to the migrated content after import.
See Also:
- `bulk_migrate_from_modulestore`: Multi-source batch migration equivalent.
- API docs: `/api/cms/v1/migrations/` for REST behavior and responses.
"""
# pylint: disable=too-many-statements
# This is a large function, but breaking it up futher would probably not
# make it any easier to understand.
set_code_owner_attribute_from_module(__name__)
status: UserTaskStatus = self.status
# Validating input
status.set_state(MigrationStep.VALIDATING_INPUT.value)
try:
target_library = get_library(LibraryLocatorV2.from_string(target_library_key))
if target_library.learning_package_id is None:
raise ValueError("Target library has no associated learning package.")
target_package = LearningPackage.objects.get(pk=target_library.learning_package_id)
target_collection = Collection.objects.get(pk=target_collection_pk) if target_collection_pk else None
except (ObjectDoesNotExist, InvalidKeyError) as exc:
status.fail(str(exc))
return
source_data = _validate_input(
status,
source_pk,
repeat_handling_strategy,
preserve_url_slugs,
composition_level,
target_package,
target_collection,
)
if source_data is None:
# Fail
return
migration = source_data.migration
status.increment_completed_steps()
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks([source_data.source], status, target_package, [migration.id])
status.increment_completed_steps()
# Loading `legacy_root`
status.set_state(MigrationStep.LOADING)
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
return
status.increment_completed_steps()
# Staging legacy block
status.set_state(MigrationStep.STAGING.value)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root,
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_data.source.key),
)
migration.staged_content = staged_content
status.increment_completed_steps()
# Parsing OLX
status.set_state(MigrationStep.PARSING.value)
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
# Importing assets of the legacy block
status.set_state(MigrationStep.IMPORTING_ASSETS.value)
content_by_filename = _import_assets(migration)
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(MigrationStep.IMPORTING_STRUCTURE.value)
change_log, root_migrated_node = _import_structure(
migration,
source_data,
target_library,
content_by_filename,
root_node,
status,
)
migration.change_log = change_log
status.increment_completed_steps()
@@ -339,43 +618,271 @@ def migrate_from_modulestore(
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source,
source=source_data.source,
migration=migration,
status=status,
)
block_migrations = ModulestoreBlockMigration.objects.filter(overall_migration=migration)
status.increment_completed_steps()
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
block_sources_to_block_migrations = {
block_migration.source: block_migration for block_migration in block_migrations
}
for block_source, block_migration in block_sources_to_block_migrations.items():
block_source.forwarded = block_migration
block_source.save()
source.forwarded = migration
source.save()
_forwarding_content(source_data)
status.increment_completed_steps()
# Populating the collection
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
if target_collection:
block_target_pks: list[int] = list(
ModulestoreBlockMigration.objects.filter(
overall_migration=migration
).values_list('target_id', flat=True)
_populate_collection(user_id, migration)
status.increment_completed_steps()
@shared_task(base=_BulkMigrationTask, bind=True)
# Note: The decorator @set_code_owner_attribute cannot be used here because the UserTaskMixin
# does stack inspection and can't handle additional decorators.
def bulk_migrate_from_modulestore(
self: _BulkMigrationTask,
*,
user_id: int,
sources_pks: list[int],
target_library_key: str,
target_collection_pks: list[int | None],
create_collections: bool = False,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
composition_level: str,
forward_source_to_target: bool,
) -> None:
"""
Import multiple legacy courses or libraries into a single V2 library.
This task performs the same logical steps as `migrate_from_modulestore`, but allows
batching several migrations together under **one single user task** (`UserTaskStatus`).
Unlike running `migrate_from_modulestore` in a loop (which would create multiple
independent Celery tasks and separate statuses), the bulk migration maintains
**one unified status record** that tracks progress across all included sources.
This simplifies monitoring, since the client only needs to observe one task state.
Each source item (course or library) still creates its own `ModulestoreMigration`
database record, but all of them share the same parent task (`UserTaskStatus`).
If any sub-migration fails (for example, due to invalid OLX or missing assets),
the bulk migration **marks the entire task as failed** — there is no partial success.
Args:
self (_BulkMigrationTask):
The Celery task instance that wraps the user task logic.
user_id (int):
The ID of the user initiating the migration.
sources_pks (list[int]):
Primary keys of the legacy modulestore sources to migrate.
target_library_key (str):
Key of the V2 library that will receive the imported content.
target_collection_pks (list[int | None]):
Optional list of target collection IDs corresponding to each source.
create_collections (bool):
Whether to automatically create new collections when none exist.
repeat_handling_strategy (str):
Strategy to handle repeated imports of the same content.
preserve_url_slugs (bool):
Whether to preserve existing XBlock URL slugs during import.
composition_level (str):
Composition level at which content should be imported (e.g. course, section).
forward_source_to_target (bool):
Whether to forward legacy content to its migrated equivalent after import.
See Also:
- `migrate_from_modulestore`: Single-source migration equivalent.
- API docs: `/api/cms/v1/migrations/bulk/` for REST behavior and responses.
"""
# pylint: disable=too-many-statements
# This is a large function, but breaking it up futher would probably not
# make it any easier to understand.
set_code_owner_attribute_from_module(__name__)
status: UserTaskStatus = self.status
# Validating input
status.set_state(MigrationStep.VALIDATING_INPUT.value)
target_collection_list: list[Collection | None] = []
try:
target_library_locator = LibraryLocatorV2.from_string(target_library_key)
target_library = get_library(target_library_locator)
if target_library.learning_package_id is None:
raise ValueError("Target library has no associated learning package.")
target_package = LearningPackage.objects.get(pk=target_library.learning_package_id)
if target_collection_pks:
for target_collection_pk in target_collection_pks:
target_collection_list.append(
Collection.objects.get(pk=target_collection_pk) if target_collection_pk else None
)
except (ObjectDoesNotExist, InvalidKeyError, ValueError) as exc:
status.fail(str(exc))
return
source_data_list: list[_MigrationSourceData] = []
for i in range(len(sources_pks)):
source_data = _validate_input(
status,
sources_pks[i],
repeat_handling_strategy,
preserve_url_slugs,
composition_level,
target_package,
target_collection_list[i] if target_collection_list else None,
)
if block_target_pks:
authoring_api.add_to_collection(
learning_package_id=target_package_pk,
key=target_collection.key,
entities_qset=PublishableEntity.objects.filter(id__in=block_target_pks),
created_by=user_id,
)
else:
log.warning("No target entities found to add to collection")
if source_data is None:
# Fail
return
source_data_list.append(source_data)
status.increment_completed_steps()
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks(
[x.source for x in source_data_list],
status,
target_package,
[migration.id for migration in [x.migration for x in source_data_list]],
)
status.increment_completed_steps()
# Loading legacy blocks
status.set_state(MigrationStep.LOADING)
legacy_root_list: list[XBlock] = []
for source_data in source_data_list:
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
return
legacy_root_list.append(legacy_root)
status.increment_completed_steps()
for i, source_pk in enumerate(sources_pks):
source_data = source_data_list[i]
# Start migration for `source_pk`
# Staging legacy blocks
status.set_state(f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.STAGING.value}")
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root_list[i],
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_data.source.key),
)
source_data.migration.staged_content = staged_content
status.increment_completed_steps()
# Parsing OLX
status.set_state(f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.PARSING.value}")
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
# Importing assets
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.IMPORTING_ASSETS.value}"
)
content_by_filename = _import_assets(source_data.migration)
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.IMPORTING_STRUCTURE.value}"
)
change_log, root_migrated_node = _import_structure(
source_data.migration,
source_data,
target_library,
content_by_filename,
root_node,
status,
)
source_data.migration.change_log = change_log
status.increment_completed_steps()
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.UNSTAGING.value}"
)
staged_content.delete()
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source_data.source,
migration=source_data.migration,
status=status,
source_pk=source_pk,
)
status.increment_completed_steps()
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
for source_data in source_data_list:
_forwarding_content(source_data)
status.increment_completed_steps()
# Populating collections
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
# Used to check if the source has a previous migration in a V2 library collection
# It is placed here to avoid the circular import
from .api import get_migration_info
for i, source_data in enumerate(source_data_list):
migration = source_data.migration
title = legacy_root_list[i].display_name
if migration.target_collection is None:
if not create_collections:
continue
source_key = source_data.source.key
if migration.repeat_handling_strategy == RepeatHandlingStrategy.Fork.value:
# Create a new collection when it is Fork
migration.target_collection = _create_collection(target_library_locator, title)
else:
# It is Skip or Update
# We need to verify if there is a previous migration with collection
# TODO: This only fetches the latest migration, if different migrations have been done
# on different V2 libraries, this could break the logic.
previous_migration = get_migration_info([source_key])
if (
source_key in previous_migration
and previous_migration[source_key].migrations__target_collection__key
):
# Has previous migration with collection
try:
# Get the previous collection
previous_collection = authoring_api.get_collection(
target_package.id,
previous_migration[source_key].migrations__target_collection__key,
)
migration.target_collection = previous_collection
except Collection.DoesNotExist:
# The collection no longer exists or is being migrated to a different library.
# In that case, create a new collection independent of strategy
migration.target_collection = _create_collection(target_library_locator, title)
else:
# Create collection and save in migration
migration.target_collection = _create_collection(target_library_locator, title)
_populate_collection(user_id, migration)
ModulestoreMigration.objects.bulk_update(
[x.migration for x in source_data_list],
["target_collection"],
)
status.increment_completed_steps()
@@ -751,7 +1258,8 @@ def _create_migration_artifacts_incrementally(
root_migrated_node: _MigratedNode,
source: ModulestoreSource,
migration: ModulestoreMigration,
status: UserTaskStatus
status: UserTaskStatus,
source_pk: int | None = None,
) -> None:
"""
Create ModulestoreBlockSource and ModulestoreBlockMigration objects incrementally.
@@ -774,6 +1282,12 @@ def _create_migration_artifacts_incrementally(
processed += 1
if processed % 10 == 0 or processed == total_nodes:
status.set_state(
f"{MigrationStep.MAPPING_OLD_TO_NEW.value} ({processed}/{total_nodes})"
)
if source_pk:
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): "
f"{MigrationStep.MAPPING_OLD_TO_NEW.value} ({processed}/{total_nodes})"
)
else:
status.set_state(
f"{MigrationStep.MAPPING_OLD_TO_NEW.value} ({processed}/{total_nodes})"
)

View File

@@ -67,6 +67,45 @@ class TestModulestoreMigratorAPI(LibraryTestCase):
assert modulestoremigration.task_status is not None
assert modulestoremigration.task_status.user == user
def test_start_bulk_migration_to_library(self):
"""
Test that the API can start a bulk migration to a library.
"""
source = ModulestoreSourceFactory()
source_2 = ModulestoreSourceFactory()
user = UserFactory()
api.start_bulk_migration_to_library(
user=user,
source_key_list=[source.key, source_2.key],
target_library_key=self.library_v2.library_key,
target_collection_slug_list=None,
composition_level=CompositionLevel.Component.value,
repeat_handling_strategy=RepeatHandlingStrategy.Skip.value,
preserve_url_slugs=True,
forward_source_to_target=False,
)
modulestoremigration = ModulestoreMigration.objects.get(source=source)
assert modulestoremigration.source.key == source.key
assert (
modulestoremigration.composition_level == CompositionLevel.Component.value
)
assert modulestoremigration.repeat_handling_strategy == RepeatHandlingStrategy.Skip.value
assert modulestoremigration.preserve_url_slugs is True
assert modulestoremigration.task_status is not None
assert modulestoremigration.task_status.user == user
modulestoremigration_2 = ModulestoreMigration.objects.get(source=source_2)
assert modulestoremigration_2.source.key == source_2.key
assert (
modulestoremigration_2.composition_level == CompositionLevel.Component.value
)
assert modulestoremigration_2.repeat_handling_strategy == RepeatHandlingStrategy.Skip.value
assert modulestoremigration_2.preserve_url_slugs is True
assert modulestoremigration_2.task_status is not None
assert modulestoremigration_2.task_status.user == user
def test_start_migration_to_library_with_collection(self):
"""
Test that the API can start a migration to a library with a target collection.

View File

@@ -14,7 +14,7 @@ from organizations.tests.factories import OrganizationFactory
from user_tasks.models import UserTaskArtifact
from user_tasks.tasks import UserTaskStatus
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.factories import CourseFactory, LibraryFactory
from common.djangoapps.student.tests.factories import UserFactory
from cms.djangoapps.modulestore_migrator.data import CompositionLevel, RepeatHandlingStrategy
@@ -29,7 +29,9 @@ from cms.djangoapps.modulestore_migrator.tasks import (
_MigratedNode,
_MigrationContext,
_MigrationTask,
_BulkMigrationTask,
migrate_from_modulestore,
bulk_migrate_from_modulestore,
MigrationStep,
)
from openedx.core.djangoapps.content_libraries import api as lib_api
@@ -48,24 +50,55 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
self.lib_key = LibraryLocatorV2.from_string(
f"lib:{self.organization.short_name}:test-key"
)
self.lib_key_2 = LibraryLocatorV2.from_string(
f"lib:{self.organization.short_name}:test-key-2"
)
lib_api.create_library(
org=self.organization,
slug=self.lib_key.slug,
title="Test Library",
)
lib_api.create_library(
org=self.organization,
slug=self.lib_key_2.slug,
title="Test Library 2",
)
self.library = lib_api.ContentLibrary.objects.get(slug=self.lib_key.slug)
self.library_2 = lib_api.ContentLibrary.objects.get(slug=self.lib_key_2.slug)
self.learning_package = self.library.learning_package
self.learning_package_2 = self.library_2.learning_package
self.course = CourseFactory(
org=self.organization.short_name,
course="TestCourse",
run="TestRun",
display_name="Test Course",
)
self.course_2 = CourseFactory(
org=self.organization.short_name,
course="TestCourse2",
run="TestRun2",
display_name="Test Course 2",
)
self.legacy_library = LibraryFactory(
org=self.organization.short_name,
library="LegacyLibrary",
display_name="Legacy Library",
)
self.legacy_library_2 = LibraryFactory(
org=self.organization.short_name,
library="LegacyLibrary2",
display_name="Legacy Library 2",
)
self.collection = Collection.objects.create(
learning_package=self.learning_package,
key="test_collection",
title="Test Collection",
)
self.collection2 = Collection.objects.create(
learning_package=self.learning_package,
key="test_collection2",
title="Test Collection 2",
)
def _get_task_status_fail_message(self, status):
"""
@@ -272,7 +305,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": 999999, # Non-existent source
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -286,21 +318,16 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "ModulestoreSource matching query does not exist.")
def test_migrate_from_modulestore_invalid_target_package(self):
def test_bulk_migrate_invalid_sources(self):
"""
Test migrate_from_modulestore with invalid target package
Test bulk_migrate_from_modulestore with invalid source
"""
source = ModulestoreSource.objects.create(
key=self.course.id,
)
task = migrate_from_modulestore.apply_async(
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": 999999, # Non-existent package
"sources_pks": [999999], # Non-existent source
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"target_collection_pks": [self.collection.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
@@ -310,7 +337,7 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "LearningPackage matching query does not exist.")
self.assertEqual(self._get_task_status_fail_message(status), "ModulestoreSource matching query does not exist.")
def test_migrate_from_modulestore_invalid_collection(self):
"""
@@ -324,7 +351,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": 999999, # Non-existent collection
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -338,12 +364,47 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "Collection matching query does not exist.")
def test_bulk_migrate_invalid_collection(self):
"""
Test bulk_migrate_from_modulestore with invalid collection
"""
source = ModulestoreSource.objects.create(
key=self.course.id,
)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [999999], # Non-existent collection
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "Collection matching query does not exist.")
def test_migration_task_calculate_total_steps(self):
"""
Test _MigrationTask.calculate_total_steps returns correct count
"""
total_steps = _MigrationTask.calculate_total_steps({})
expected_steps = len(list(MigrationStep))
expected_steps = len(list(MigrationStep)) - 1
self.assertEqual(total_steps, expected_steps)
def test_bulk_migration_task_calculate_total_steps(self):
"""
Test _BulkMigrationTask.calculate_total_steps returns correct count
"""
total_steps = _BulkMigrationTask.calculate_total_steps({
"sources_pks": [1, 2, 3, 4],
})
expected_steps = len(list(MigrationStep)) - 1 + 6 * 3
self.assertEqual(total_steps, expected_steps)
def test_migrate_component_success(self):
@@ -1247,7 +1308,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1266,6 +1326,338 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
def test_bulk_migrate_success_courses(self):
"""
Test successful bulk migration from courses to library
"""
source_1 = ModulestoreSource.objects.create(key=self.course.id)
source_2 = ModulestoreSource.objects.create(key=self.course_2.id)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source_1.id, source_2.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id, self.collection2.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source_1.id, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
migration_2 = ModulestoreMigration.objects.get(
source=source_2.id, target=self.learning_package
)
self.assertEqual(migration_2.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration_2.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
def test_migrate_from_modulestore_success_legacy_library(self):
"""
Test successful migration from legacy library to V2 library
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
def test_bulk_migrate_success_legacy_libraries(self):
"""
Test successful bulk migration from legacy libraries to V2 library
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
source_2 = ModulestoreSource.objects.create(key=self.legacy_library_2.location.library_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id, source_2.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id, self.collection2.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
migration_2 = ModulestoreMigration.objects.get(
source=source_2, target=self.learning_package
)
self.assertEqual(migration_2.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration_2.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
def test_bulk_migrate_create_collections(self):
"""
Test successful bulk migration from legacy libraries to V2 library with create collections
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
source_2 = ModulestoreSource.objects.create(key=self.legacy_library_2.location.library_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id, source_2.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
migration_2 = ModulestoreMigration.objects.get(
source=source_2, target=self.learning_package
)
self.assertEqual(migration_2.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration_2.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
self.assertEqual(migration_2.target_collection.title, self.legacy_library_2.display_name)
@ddt.data(
RepeatHandlingStrategy.Skip,
RepeatHandlingStrategy.Update,
)
def test_bulk_migrate_use_previous_collection_on_skip_and_update(self, repeat_handling_strategy):
"""
Test successful bulk migration from legacy libraries to V2 library using previous collection
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": repeat_handling_strategy.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, repeat_handling_strategy.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
# Migrate again and check that the migration uses the previos collection
previous_collection = migration.target_collection
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": repeat_handling_strategy.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migrations = ModulestoreMigration.objects.filter(
source=source, target=self.learning_package
)
for migration in migrations:
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, repeat_handling_strategy.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
self.assertEqual(migration.target_collection.id, previous_collection.id)
@ddt.data(
RepeatHandlingStrategy.Skip,
RepeatHandlingStrategy.Update,
)
def test_bulk_migrate_create_collection_in_different_learning_packages(self, repeat_handling_strategy):
"""
Test successful bulk migration from legacy libraries to different V2 libraries
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": repeat_handling_strategy.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, repeat_handling_strategy.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
# Migrate again in other V2 library, verify that the collections are different
previous_collection = migration.target_collection
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key_2),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": repeat_handling_strategy.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, repeat_handling_strategy.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
self.assertEqual(migration.target_collection.id, previous_collection.id)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package_2
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, repeat_handling_strategy.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
self.assertNotEqual(migration.target_collection.id, previous_collection.id)
def test_bulk_migrate_create_a_new_collection_on_fork(self):
"""
Test successful bulk migration from legacy libraries to V2 library using previous collection
"""
source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": RepeatHandlingStrategy.Fork.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Fork.value)
self.assertEqual(migration.target_collection.title, self.legacy_library.display_name)
previous_collection = migration.target_collection
# Migrate again and check that it creates a new collection
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [],
"create_collections": True,
"repeat_handling_strategy": RepeatHandlingStrategy.Fork.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migrations = ModulestoreMigration.objects.filter(
source=source, target=self.learning_package
)
# First migration
self.assertEqual(migrations[0].composition_level, CompositionLevel.Unit.value)
self.assertEqual(migrations[0].repeat_handling_strategy, RepeatHandlingStrategy.Fork.value)
self.assertEqual(migrations[0].target_collection.title, self.legacy_library.display_name)
self.assertEqual(migrations[0].target_collection.id, previous_collection.id)
# Second migration
self.assertEqual(migrations[1].composition_level, CompositionLevel.Unit.value)
self.assertEqual(migrations[1].repeat_handling_strategy, RepeatHandlingStrategy.Fork.value)
self.assertEqual(migrations[1].target_collection.title, f"{self.legacy_library.display_name}_1")
self.assertNotEqual(migrations[1].target_collection.id, previous_collection.id)
def test_migrate_from_modulestore_library_validation_failure(self):
"""
Test migration from legacy library fails when modulestore content doesn't exist
@@ -1278,7 +1670,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": None,
"repeat_handling_strategy": RepeatHandlingStrategy.Update.value,
@@ -1309,7 +1700,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1326,6 +1716,33 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
f"Not a valid source context key: {invalid_key}. Source key must reference a course or a legacy library."
)
def test_bulk_migrate_invalid_source_key_type(self):
"""
Test bulk migration with invalid source key type
"""
invalid_key = LibraryLocatorV2.from_string("lib:testorg:invalid")
source = ModulestoreSource.objects.create(key=invalid_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(
self._get_task_status_fail_message(status),
f"Not a valid source context key: {invalid_key}. Source key must reference a course or a legacy library."
)
def test_migrate_from_modulestore_nonexistent_modulestore_item(self):
"""
Test migration when modulestore item doesn't exist
@@ -1339,7 +1756,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1357,6 +1773,36 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
"from ModuleStore: course-v1:NonExistent+Course+Run+branch@draft-branch"
)
def test_bulk_migrate_nonexistent_modulestore_item(self):
"""
Test bulk migration when modulestore item doesn't exist
"""
nonexistent_course_key = CourseKey.from_string(
"course-v1:NonExistent+Course+Run"
)
source = ModulestoreSource.objects.create(key=nonexistent_course_key)
task = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(
self._get_task_status_fail_message(status),
"Failed to load source item 'block-v1:NonExistent+Course+Run+type@course+block@course' "
"from ModuleStore: course-v1:NonExistent+Course+Run+branch@draft-branch"
)
def test_migrate_from_modulestore_task_status_progression(self):
"""Test that task status progresses through expected steps"""
source = ModulestoreSource.objects.create(key=self.course.id)
@@ -1365,7 +1811,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1396,7 +1841,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1410,7 +1854,6 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
kwargs={
"user_id": other_user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
@@ -1428,3 +1871,45 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
# The first task should not be cancelled since it's from a different user
self.assertNotEqual(status1.state, UserTaskStatus.CANCELED)
def test_bulk_migrate_multiple_users_no_interference(self):
"""
Test that migrations by different users don't interfere with each other
"""
source = ModulestoreSource.objects.create(key=self.course.id)
other_user = UserFactory()
task1 = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
task2 = bulk_migrate_from_modulestore.apply_async(
kwargs={
"user_id": other_user.id,
"sources_pks": [source.id],
"target_library_key": str(self.lib_key),
"target_collection_pks": [self.collection.id],
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status1 = UserTaskStatus.objects.get(task_id=task1.id)
status2 = UserTaskStatus.objects.get(task_id=task2.id)
self.assertEqual(status1.user, self.user)
self.assertEqual(status2.user, other_user)
# The first task should not be cancelled since it's from a different user
self.assertNotEqual(status1.state, UserTaskStatus.CANCELED)