feat!: modulestore_migrator (#36873)

This introduces the modulestore_migrator app, which can be
used to copy content (courses and libraries) from modulestore
into Learning Core. It is currently aimed to work on the legacy
library -> v2 library migration, but it will be used in the future
for course->library and course->course migrations.

This includes an initial REST API, Django admin interface,
and Python API.

Closes: https://github.com/openedx/edx-platform/issues/37211

Requires some follow-up work before this is production-ready:
https://github.com/openedx/edx-platform/issues/37259

Co-authored-by: Andrii <andrii.hantkovskyi@raccoongang.com>
Co-authored-by: Maksim Sokolskiy <maksim.sokolskiy@raccoongang.com>
This commit is contained in:
Kyle McCormick
2025-09-24 11:02:05 -04:00
committed by GitHub
parent 5d9fc2442f
commit 7275ce1634
27 changed files with 3324 additions and 1 deletions

View File

@@ -238,6 +238,7 @@
"cms/djangoapps/cms_user_tasks/",
"cms/djangoapps/course_creators/",
"cms/djangoapps/export_course_metadata/",
"cms/djangoapps/modulestore_migrator/",
"cms/djangoapps/maintenance/",
"cms/djangoapps/models/",
"cms/djangoapps/pipeline_js/",

View File

@@ -0,0 +1,192 @@
"""
A nice little admin interface for migrating courses and libraries from modulstore to Learning Core.
"""
import logging
from django import forms
from django.contrib import admin, messages
from django.contrib.admin.helpers import ActionForm
from django.db import models
from opaque_keys import InvalidKeyError
from opaque_keys.edx.locator import LibraryCollectionLocator, LibraryLocatorV2
from user_tasks.models import UserTaskStatus
from openedx.core.types.http import AuthenticatedHttpRequest
from . import api
from .data import CompositionLevel, RepeatHandlingStrategy
from .models import ModulestoreSource, ModulestoreMigration, ModulestoreBlockSource, ModulestoreBlockMigration
log = logging.getLogger(__name__)
class StartMigrationTaskForm(ActionForm):
"""
Params for start_migration_task admin adtion, displayed next the "Go" button.
"""
target_key = forms.CharField(label="Target library or collection key →", required=False)
repeat_handling_strategy = forms.ChoiceField(
label="How to handle existing content? →",
choices=RepeatHandlingStrategy.supported_choices,
required=False,
)
preserve_url_slugs = forms.BooleanField(label="Preserve current slugs? →", required=False, initial=True)
forward_to_target = forms.BooleanField(label="Forward references? →", required=False)
composition_level = forms.ChoiceField(
label="Aggregate up to →", choices=CompositionLevel.supported_choices, required=False
)
def task_status_details(obj: ModulestoreMigration) -> str:
"""
Return the state and, if available, details of the status of the migration.
"""
details: str | None = None
if obj.task_status.state == UserTaskStatus.FAILED:
# Calling fail(msg) from a task should automatically generates an "Error" artifact with that msg.
# https://django-user-tasks.readthedocs.io/en/latest/user_tasks.html#user_tasks.models.UserTaskStatus.fail
if error_artifacts := obj.task_status.artifacts.filter(name="Error"):
if error_text := error_artifacts.order_by("-created").first().text:
details = error_text
elif obj.task_status.state == UserTaskStatus.SUCCEEDED:
details = f"Migrated {obj.block_migrations.count()} blocks"
return f"{obj.task_status.state}: {details}" if details else obj.task_status.state
migration_admin_fields = (
"target",
"target_collection",
"task_status",
# The next line works, but django-stubs incorrectly thinks that these should all be strings,
# so we will need to use type:ignore below.
task_status_details,
"composition_level",
"repeat_handling_strategy",
"preserve_url_slugs",
"change_log",
"staged_content",
)
class ModulestoreMigrationInline(admin.TabularInline):
"""
Readonly table within the ModulestoreSource page; each row is a Migration from this Source.
"""
model = ModulestoreMigration
fk_name = "source"
show_change_link = True
readonly_fields = migration_admin_fields # type: ignore[assignment]
ordering = ("-task_status__created",)
def has_add_permission(self, _request, _obj):
return False
class ModulestoreBlockSourceInline(admin.TabularInline):
"""
Readonly table within the ModulestoreSource page; each row is a BlockSource.
"""
model = ModulestoreBlockSource
fk_name = "overall_source"
readonly_fields = (
"key",
"forwarded"
)
def has_add_permission(self, _request, _obj):
return False
@admin.register(ModulestoreSource)
class ModulestoreSourceAdmin(admin.ModelAdmin):
"""
Admin interface for source legacy libraries and courses.
"""
readonly_fields = ("forwarded",)
list_display = ("id", "key", "forwarded")
actions = ["start_migration_task"]
action_form = StartMigrationTaskForm
inlines = [ModulestoreMigrationInline, ModulestoreBlockSourceInline]
@admin.action(description="Start migration for selected sources")
def start_migration_task(
self,
request: AuthenticatedHttpRequest,
queryset: models.QuerySet[ModulestoreSource],
) -> None:
"""
Start a migration for each selected source
"""
form = StartMigrationTaskForm(request.POST)
form.is_valid()
target_key_string = form.cleaned_data['target_key']
if not target_key_string:
messages.add_message(request, messages.ERROR, "Target key is required")
return
try:
target_library_key = LibraryLocatorV2.from_string(target_key_string)
target_collection_slug = None
except InvalidKeyError:
try:
target_collection_key = LibraryCollectionLocator.from_string(target_key_string)
target_library_key = target_collection_key.lib_key
target_collection_slug = target_collection_key.collection_id
except InvalidKeyError:
messages.add_message(request, messages.ERROR, f"Invalid target key: {target_key_string}")
return
started = 0
total = 0
for source in queryset:
total += 1
try:
api.start_migration_to_library(
user=request.user,
source_key=source.key,
target_library_key=target_library_key,
target_collection_slug=target_collection_slug,
composition_level=form.cleaned_data['composition_level'],
repeat_handling_strategy=form.cleaned_data['repeat_handling_strategy'],
preserve_url_slugs=form.cleaned_data['preserve_url_slugs'],
forward_source_to_target=form.cleaned_data['forward_to_target'],
)
except Exception as exc: # pylint: disable=broad-except
message = f"Failed to start migration {source.key} -> {target_key_string}"
messages.add_message(request, messages.ERROR, f"{message}: {exc}")
log.exception(message)
continue
started += 1
click_in = "Click into the source objects to see migration details."
if not started:
messages.add_message(request, messages.WARNING, f"Failed to start {total} migration(s).")
if started < total:
messages.add_message(request, messages.WARNING, f"Started {started} of {total} migration(s). {click_in}")
else:
messages.add_message(request, messages.INFO, f"Started {started} migration(s). {click_in}")
class ModulestoreBlockMigrationInline(admin.TabularInline):
"""
Readonly table witin the Migration admin; each row is a block
"""
model = ModulestoreBlockMigration
fk_name = "overall_migration"
readonly_fields = (
"source",
"target",
"change_log_record",
)
list_display = ("id", *readonly_fields)
@admin.register(ModulestoreMigration)
class ModulestoreMigrationAdmin(admin.ModelAdmin):
"""
Readonly admin page for viewing Migrations
"""
readonly_fields = ("source", *migration_admin_fields) # type: ignore[assignment]
list_display = ("id", "source", *migration_admin_fields) # type: ignore[assignment]
inlines = [ModulestoreBlockMigrationInline]

View File

@@ -0,0 +1,58 @@
"""
API for migration from modulestore to learning core
"""
from opaque_keys.edx.locator import LibraryLocatorV2
from opaque_keys.edx.keys import LearningContextKey
from openedx_learning.api.authoring import get_collection
from celery.result import AsyncResult
from openedx.core.djangoapps.content_libraries.api import get_library
from openedx.core.types.user import AuthUser
from . import tasks
from .data import RepeatHandlingStrategy
from .models import ModulestoreSource
__all__ = (
"start_migration_to_library",
)
def start_migration_to_library(
*,
user: AuthUser,
source_key: LearningContextKey,
target_library_key: LibraryLocatorV2,
target_collection_slug: str | None = None,
composition_level: str,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
forward_source_to_target: bool,
) -> AsyncResult:
"""
Import a course or legacy library into a V2 library (or, a collection within a V2 library).
"""
# Can raise NotImplementedError for the Fork strategy
assert RepeatHandlingStrategy(repeat_handling_strategy).is_implemented()
source, _ = ModulestoreSource.objects.get_or_create(key=source_key)
target_library = get_library(target_library_key)
# get_library ensures that the library is connected to a learning package.
target_package_id: int = target_library.learning_package_id # type: ignore[assignment]
target_collection_id = None
if target_collection_slug:
target_collection_id = get_collection(target_package_id, target_collection_slug).id
return tasks.migrate_from_modulestore.delay(
user_id=user.id,
source_pk=source.id,
target_package_pk=target_package_id,
target_library_key=str(target_library_key),
target_collection_pk=target_collection_id,
composition_level=composition_level,
repeat_handling_strategy=repeat_handling_strategy,
preserve_url_slugs=preserve_url_slugs,
forward_source_to_target=forward_source_to_target,
)

View File

@@ -0,0 +1,13 @@
"""
App configurations
"""
from django.apps import AppConfig
class ModulestoreMigratorConfig(AppConfig):
"""
App for importing legacy content from the modulestore.
"""
name = 'cms.djangoapps.modulestore_migrator'

View File

@@ -0,0 +1,6 @@
"""
Constants
"""
CONTENT_STAGING_PURPOSE_PREFIX = "modulestore_migrator"
CONTENT_STAGING_PURPOSE_TEMPLATE = CONTENT_STAGING_PURPOSE_PREFIX + "({source_key})"

View File

@@ -0,0 +1,81 @@
"""
Value objects
"""
from __future__ import annotations
from enum import Enum
from openedx.core.djangoapps.content_libraries.api import ContainerType
class CompositionLevel(Enum):
"""
Enumeration of composition levels for legacy content.
Defined in increasing order of complexity so that `is_higher_than` works correctly.
"""
# Components are individual XBlocks, e.g. Problem
Component = 'component'
# Container types currently supported by Content Libraries
Unit = ContainerType.Unit.value
Subsection = ContainerType.Subsection.value
Section = ContainerType.Section.value
@property
def is_container(self) -> bool:
return self is not self.Component
def is_higher_than(self, other: 'CompositionLevel') -> bool:
"""
Is this composition level 'above' (more complex than) the other?
"""
levels: list[CompositionLevel] = list(self.__class__)
return levels.index(self) > levels.index(other)
@classmethod
def supported_choices(cls) -> list[tuple[str, str]]:
"""
Returns all supported composition levels as a list of tuples,
for use in a Django Models ChoiceField.
"""
return [
(composition_level.value, composition_level.name)
for composition_level in cls
]
class RepeatHandlingStrategy(Enum):
"""
Enumeration of repeat handling strategies for imported content.
"""
Skip = 'skip'
Fork = 'fork'
Update = 'update'
@classmethod
def supported_choices(cls) -> list[tuple[str, str]]:
"""
Returns all supported repeat handling strategies as a list of tuples,
for use in a Django Models ChoiceField.
"""
return [
(strategy.value, strategy.name)
for strategy in cls
]
@classmethod
def default(cls) -> RepeatHandlingStrategy:
"""
Returns the default repeat handling strategy.
"""
return cls.Skip
def is_implemented(self) -> bool:
"""
Returns True if the repeat handling strategy is implemented.
"""
if self == self.Fork:
raise NotImplementedError("Forking is not implemented yet.")
return True

View File

@@ -0,0 +1,108 @@
# Generated by Django 4.2.24 on 2025-09-10 15:14
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
import model_utils.fields
import opaque_keys.edx.django.models
class Migration(migrations.Migration):
initial = True
dependencies = [
('content_staging', '0006_alter_userclipboard_source_usage_key'),
('oel_collections', '0005_alter_collection_options_alter_collection_enabled'),
('oel_publishing', '0008_alter_draftchangelogrecord_options_and_more'),
('user_tasks', '0004_url_textfield'),
]
operations = [
migrations.CreateModel(
name='ModulestoreBlockMigration',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('change_log_record', models.OneToOneField(null=True, on_delete=django.db.models.deletion.SET_NULL, to='oel_publishing.draftchangelogrecord')),
],
),
migrations.CreateModel(
name='ModulestoreMigration',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('source_version', models.CharField(blank=True, help_text='Migrated content version, the hash of published content version', max_length=255, null=True)),
('composition_level', models.CharField(choices=[('component', 'Component'), ('unit', 'Unit'), ('subsection', 'Subsection'), ('section', 'Section')], default='component', help_text='Maximum hierachy level at which content should be aggregated in target library', max_length=255)),
('repeat_handling_strategy', models.CharField(choices=[('skip', 'Skip'), ('fork', 'Fork'), ('update', 'Update')], default='skip', help_text='If a piece of content already exists in the content library, choose how to handle it.', max_length=24)),
('preserve_url_slugs', models.BooleanField(default=False, help_text='Should the migration preserve the location IDs of the existing blocks?If not, then new, unique human-readable IDs will be generated based on the block titles.')),
('change_log', models.ForeignKey(help_text='Changelog entry in the target learning package which records this migration', null=True, on_delete=django.db.models.deletion.SET_NULL, to='oel_publishing.draftchangelog')),
],
),
migrations.CreateModel(
name='ModulestoreSource',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('key', opaque_keys.edx.django.models.LearningContextKeyField(help_text='Key of the content source (a course or a legacy library)', max_length=255, unique=True)),
('forwarded', models.OneToOneField(blank=True, help_text='If set, the system will forward references of this source over to the target of this migration', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='forwards', to='modulestore_migrator.modulestoremigration')),
],
),
migrations.AddField(
model_name='modulestoremigration',
name='source',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='migrations', to='modulestore_migrator.modulestoresource'),
),
migrations.AddField(
model_name='modulestoremigration',
name='staged_content',
field=models.OneToOneField(help_text='Modulestore content is processed and staged before importing it to a learning packge. We temporarily save the staged content to allow for troubleshooting of failed migrations.', null=True, on_delete=django.db.models.deletion.SET_NULL, to='content_staging.stagedcontent'),
),
migrations.AddField(
model_name='modulestoremigration',
name='target',
field=models.ForeignKey(help_text='Content will be imported into this library', on_delete=django.db.models.deletion.CASCADE, to='oel_publishing.learningpackage'),
),
migrations.AddField(
model_name='modulestoremigration',
name='target_collection',
field=models.ForeignKey(blank=True, help_text='Optional - Collection (within the target library) into which imported content will be grouped', null=True, on_delete=django.db.models.deletion.SET_NULL, to='oel_collections.collection'),
),
migrations.AddField(
model_name='modulestoremigration',
name='task_status',
field=models.OneToOneField(help_text='Tracks the status of the task which is executing this migration', on_delete=django.db.models.deletion.RESTRICT, to='user_tasks.usertaskstatus'),
),
migrations.CreateModel(
name='ModulestoreBlockSource',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('key', opaque_keys.edx.django.models.UsageKeyField(help_text='Original usage key of the XBlock that has been imported.', max_length=255)),
('forwarded', models.OneToOneField(help_text='If set, the system will forward references of this block source over to the target of this block migration', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='forwards', to='modulestore_migrator.modulestoreblockmigration')),
('overall_source', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='blocks', to='modulestore_migrator.modulestoresource')),
],
options={
'abstract': False,
},
),
migrations.AddField(
model_name='modulestoreblockmigration',
name='overall_migration',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='block_migrations', to='modulestore_migrator.modulestoremigration'),
),
migrations.AddField(
model_name='modulestoreblockmigration',
name='source',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='modulestore_migrator.modulestoreblocksource'),
),
migrations.AddField(
model_name='modulestoreblockmigration',
name='target',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='oel_publishing.publishableentity'),
),
migrations.AlterUniqueTogether(
name='modulestoreblockmigration',
unique_together={('overall_migration', 'source'), ('overall_migration', 'target')},
),
]

View File

@@ -0,0 +1,224 @@
"""
Models for the modulestore migration tool.
"""
from __future__ import annotations
from django.contrib.auth import get_user_model
from django.db import models
from django.utils.translation import gettext_lazy as _
from user_tasks.models import UserTaskStatus
from model_utils.models import TimeStampedModel
from opaque_keys.edx.django.models import (
LearningContextKeyField,
UsageKeyField,
)
from openedx_learning.api.authoring_models import (
LearningPackage, PublishableEntity, Collection, DraftChangeLog, DraftChangeLogRecord
)
from .data import CompositionLevel, RepeatHandlingStrategy
User = get_user_model()
class ModulestoreSource(models.Model):
"""
A legacy learning context (course or library) which can be a source of a migration.
"""
key = LearningContextKeyField(
max_length=255,
unique=True,
help_text=_('Key of the content source (a course or a legacy library)'),
)
forwarded = models.OneToOneField(
'modulestore_migrator.ModulestoreMigration',
null=True,
blank=True,
on_delete=models.SET_NULL,
help_text=_('If set, the system will forward references of this source over to the target of this migration'),
related_name="forwards",
)
def __str__(self):
return f"{self.__class__.__name__}('{self.key}')"
__repr__ = __str__
class ModulestoreMigration(models.Model):
"""
Tracks the action of a user importing a Modulestore-based course or legacy library into a
learning-core based learning package
Notes:
* As of Ulmo, a learning package is always associated with a v2 content library, but we
will not bake that assumption into this model)
* Each Migration is tied to a single UserTaskStatus, which connects it to a user and
contains the progress of the import.
* A single ModulestoreSource may very well have multiple ModulestoreMigrations; however,
at most one of them with be the "authoritative" migration, as indicated by `forwarded`.
"""
## MIGRATION SPECIFICATION
source = models.ForeignKey(
ModulestoreSource,
on_delete=models.CASCADE,
related_name="migrations",
)
source_version = models.CharField(
max_length=255,
blank=True,
null=True,
help_text=_('Migrated content version, the hash of published content version'),
)
composition_level = models.CharField(
max_length=255,
choices=CompositionLevel.supported_choices(),
default=CompositionLevel.Component.value,
help_text=_('Maximum hierachy level at which content should be aggregated in target library'),
)
repeat_handling_strategy = models.CharField(
choices=RepeatHandlingStrategy.supported_choices(),
default=RepeatHandlingStrategy.default().value,
max_length=24,
help_text=_(
"If a piece of content already exists in the content library, choose how to handle it."
),
)
preserve_url_slugs = models.BooleanField(
default=False,
help_text=_(
"Should the migration preserve the location IDs of the existing blocks?"
"If not, then new, unique human-readable IDs will be generated based on the block titles."
),
)
target = models.ForeignKey(
LearningPackage,
on_delete=models.CASCADE,
help_text=_('Content will be imported into this library'),
)
target_collection = models.ForeignKey(
Collection,
on_delete=models.SET_NULL,
null=True,
blank=True,
help_text=_('Optional - Collection (within the target library) into which imported content will be grouped'),
)
## MIGRATION ARTIFACTS
task_status = models.OneToOneField(
UserTaskStatus,
on_delete=models.RESTRICT,
help_text=_("Tracks the status of the task which is executing this migration"),
)
change_log = models.ForeignKey(
DraftChangeLog,
on_delete=models.SET_NULL,
null=True,
help_text=_("Changelog entry in the target learning package which records this migration"),
)
staged_content = models.OneToOneField(
"content_staging.StagedContent",
null=True,
on_delete=models.SET_NULL, # Staged content is liable to be deleted in order to save space
help_text=_(
"Modulestore content is processed and staged before importing it to a learning packge. "
"We temporarily save the staged content to allow for troubleshooting of failed migrations."
)
)
def __str__(self):
return (
f"{self.__class__.__name__} #{self.pk}: "
f"{self.source.key}{self.target_collection or self.target}"
)
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"id={self.id}, source='{self.source}',"
f"target='{self.target_collection or self.target}')"
)
class ModulestoreBlockSource(TimeStampedModel):
"""
A legacy block usage (in a course or library) which can be a source of a block migration.
"""
overall_source = models.ForeignKey(
ModulestoreSource,
on_delete=models.CASCADE,
related_name="blocks",
)
key = UsageKeyField(
max_length=255,
help_text=_('Original usage key of the XBlock that has been imported.'),
)
forwarded = models.OneToOneField(
'modulestore_migrator.ModulestoreBlockMigration',
null=True,
on_delete=models.SET_NULL,
help_text=_(
'If set, the system will forward references of this block source over to the target of this block migration'
),
related_name="forwards",
)
unique_together = [("overall_source", "key")]
def __str__(self):
return f"{self.__class__.__name__}('{self.key}')"
__repr__ = __str__
class ModulestoreBlockMigration(TimeStampedModel):
"""
The migration of a single legacy block into a learning package.
Is always tied to a greater overall ModulestoreMigration.
Note:
* A single ModulestoreBlockSource may very well have multiple ModulestoreBlockMigrations; however,
at most one of them with be the "authoritative" migration, as indicated by `forwarded`.
This will coincide with the `overall_migration` being pointed to by `forwarded` as well.
"""
overall_migration = models.ForeignKey(
ModulestoreMigration,
on_delete=models.CASCADE,
related_name="block_migrations",
)
source = models.ForeignKey(
ModulestoreBlockSource,
on_delete=models.CASCADE,
)
target = models.ForeignKey(
PublishableEntity,
on_delete=models.CASCADE,
)
change_log_record = models.OneToOneField(
DraftChangeLogRecord,
# a changelog record can be pruned, which would set this to NULL, but not delete the
# entire import record
null=True,
on_delete=models.SET_NULL,
)
class Meta:
unique_together = [
('overall_migration', 'source'),
('overall_migration', 'target'),
]
def __str__(self):
return (
f"{self.__class__.__name__} #{self.pk}: "
f"{self.source.key}{self.target}"
)
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"id={self.id}, source='{self.source}',"
f"target='{self.target}')"
)

View File

@@ -0,0 +1,13 @@
"""
Course to Library Import API URLs.
"""
from django.urls import include, path
from .v1 import urls as v1_urls
app_name = 'modulestore_migrator'
urlpatterns = [
path('v1/', include(v1_urls)),
]

View File

@@ -0,0 +1,126 @@
"""
Serializers for the Course to Library Import API.
"""
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import LearningContextKey
from opaque_keys.edx.locator import LibraryLocatorV2
from rest_framework import serializers
from user_tasks.serializers import StatusSerializer
from cms.djangoapps.modulestore_migrator.data import CompositionLevel, RepeatHandlingStrategy
from cms.djangoapps.modulestore_migrator.models import ModulestoreMigration
class ModulestoreMigrationSerializer(serializers.ModelSerializer):
"""
Serializer for the course to library import creation API.
"""
source = serializers.CharField( # type: ignore[assignment]
help_text="The source course or legacy library key to import from.",
required=True,
)
target = serializers.CharField(
help_text="The target library key to import into.",
required=True,
)
composition_level = serializers.ChoiceField(
help_text="The composition level to import the content at.",
choices=CompositionLevel.supported_choices(),
required=False,
default=CompositionLevel.Component.value,
)
repeat_handling_strategy = serializers.ChoiceField(
help_text="If a piece of content already exists in the content library, choose how to handle it.",
choices=RepeatHandlingStrategy.supported_choices(),
required=False,
default=RepeatHandlingStrategy.Skip.value,
)
preserve_url_slugs = serializers.BooleanField(
help_text="If true, current slugs will be preserved.",
required=False,
default=True,
)
target_collection_slug = serializers.CharField(
help_text="The target collection slug within the library to import into. Optional.",
required=False,
allow_blank=True,
)
forward_source_to_target = serializers.BooleanField(
help_text="Forward references of this block source over to the target of this block migration.",
required=False,
default=False,
)
class Meta:
model = ModulestoreMigration
fields = [
'source',
'target',
'target_collection_slug',
'composition_level',
'repeat_handling_strategy',
'preserve_url_slugs',
'forward_source_to_target',
]
def get_fields(self):
fields = super().get_fields()
request = self.context.get('request')
if request and request.method != 'POST':
fields.pop('target', None)
fields.pop('target_collection_slug', None)
return fields
def validate_source(self, value):
"""
Validate the source key format.
"""
try:
return LearningContextKey.from_string(value)
except InvalidKeyError as exc:
raise serializers.ValidationError(f"Invalid source key: {str(exc)}") from exc
def validate_target(self, value):
"""
Validate the target library key format.
"""
try:
return LibraryLocatorV2.from_string(value)
except InvalidKeyError as exc:
raise serializers.ValidationError(f"Invalid target library key: {str(exc)}") from exc
def get_forward_source_to_target(self, obj: ModulestoreMigration):
"""
Check if the source block was forwarded to the target.
"""
return obj.id == obj.source.forwarded_id
def to_representation(self, instance):
"""
Override to customize the serialized representation."""
data = super().to_representation(instance)
# Custom logic for forward_source_to_target during serialization
data['forward_source_to_target'] = self.get_forward_source_to_target(instance)
return data
class StatusWithModulestoreMigrationSerializer(StatusSerializer):
"""
Serializer for the import task status.
"""
parameters = ModulestoreMigrationSerializer(source='modulestoremigration')
class Meta:
model = StatusSerializer.Meta.model
fields = [*StatusSerializer.Meta.fields, 'uuid', 'parameters']
def get_fields(self):
"""
Remove unwanted fields
"""
fields = super().get_fields()
fields.pop('name', None)
return fields

View File

@@ -0,0 +1,11 @@
"""
Course to Library Import API v1 URLs.
"""
from rest_framework.routers import SimpleRouter
from .views import MigrationViewSet
ROUTER = SimpleRouter()
ROUTER.register(r'migrations', MigrationViewSet)
urlpatterns = ROUTER.urls

View File

@@ -0,0 +1,137 @@
"""
API v1 views.
"""
import logging
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework import status
from user_tasks.models import UserTaskStatus
from user_tasks.views import StatusViewSet
from cms.djangoapps.modulestore_migrator.api import start_migration_to_library
from openedx.core.lib.api.authentication import BearerAuthenticationAllowInactiveUser
from .serializers import ModulestoreMigrationSerializer, StatusWithModulestoreMigrationSerializer
log = logging.getLogger(__name__)
class MigrationViewSet(StatusViewSet):
"""
Import course content from modulestore into a content library.
This viewset handles the import process, including creating the import task and
retrieving the status of the import task. Meant to be used by admin users only.
API Endpoints
------------
POST /api/modulestore_migrator/v1/migrations/
Start the import process.
Request body:
{
"source": "<source_course_key>",
"target": "<target_library>",
"composition_level": "<composition_level>", # Optional, defaults to "component"
"target_collection_slug": "<target_collection_slug>", # Optional
"repeat_handling_strategy": "<repeat_handling_strategy>" # Optional, defaults to Skip
"preserve_url_slugs": "<boolean>" # Optional, defaults to true
}
Example request:
{
"source": "course-v1:edX+DemoX+2014_T1",
"target": "library-v1:org1+lib_1",
"composition_level": "unit",
"repeat_handling_strategy": "update",
"preserve_url_slugs": true
}
Example response:
{
"state": "Succeeded",
"state_text": "Succeeded", # Translation into the current language of the current state
"completed_steps": 11,
"total_steps": 11,
"attempts": 1,
"created": "2025-05-14T22:24:37.048539Z",
"modified": "2025-05-14T22:24:59.128068Z",
"artifacts": [],
"uuid": "3de23e5d-fd34-4a6f-bf02-b183374120f0",
"parameters": {
"source": "course-v1:OpenedX+DemoX+DemoCourse",
"composition_level": "unit",
"repeat_handling_strategy": "update",
"preserve_url_slugs": true
}
}
GET /api/modulestore_migrator/v1/migrations/<uuid>/
Get the status of the import task.
Example response:
{
"state": "Importing staged content structure",
"state_text": "Importing staged content structure",
"completed_steps": 6,
"total_steps": 11,
"attempts": 1,
"created": "2025-05-14T22:24:37.048539Z",
"modified": "2025-05-14T22:24:59.128068Z",
"artifacts": [],
"uuid": "3de23e5d-fd34-4a6f-bf02-b183374120f0",
"parameters": {
"source": "course-v1:OpenedX+DemoX+DemoCourse2",
"composition_level": "component",
"repeat_handling_strategy": "skip",
"preserve_url_slugs": false
}
}
"""
permission_classes = (IsAdminUser,)
authentication_classes = (
BearerAuthenticationAllowInactiveUser,
JwtAuthentication,
SessionAuthenticationAllowInactiveUser,
)
serializer_class = StatusWithModulestoreMigrationSerializer
def get_queryset(self):
"""
Override the default queryset to filter by the import event and user.
"""
return StatusViewSet.queryset.filter(modulestoremigration__isnull=False, user=self.request.user)
def create(self, request, *args, **kwargs):
"""
Handle the import task creation.
"""
serializer_data = ModulestoreMigrationSerializer(data=request.data)
serializer_data.is_valid(raise_exception=True)
validated_data = serializer_data.validated_data
try:
task = start_migration_to_library(
user=request.user,
source_key=validated_data['source'],
target_library_key=validated_data['target'],
target_collection_slug=validated_data['target_collection_slug'],
composition_level=validated_data['composition_level'],
repeat_handling_strategy=validated_data['repeat_handling_strategy'],
preserve_url_slugs=validated_data['preserve_url_slugs'],
forward_source_to_target=validated_data['forward_source_to_target'],
)
except NotImplementedError as e:
log.exception(str(e))
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)
task_status = UserTaskStatus.objects.get(task_id=task.id)
serializer = self.get_serializer(task_status)
return Response(serializer.data, status=status.HTTP_201_CREATED)

View File

@@ -0,0 +1,750 @@
"""
Tasks for the modulestore_migrator
"""
from __future__ import annotations
import mimetypes
import os
import typing as t
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.utils.text import slugify
from edx_django_utils.monitoring import set_code_owner_attribute_from_module
from lxml import etree
from lxml.etree import _ElementTree as XmlTree
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import (
CourseLocator, LibraryLocator,
LibraryLocatorV2, LibraryUsageLocatorV2, LibraryContainerLocator
)
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import (
Collection,
Component,
ComponentType,
LearningPackage,
PublishableEntity,
PublishableEntityVersion,
)
from user_tasks.tasks import UserTask, UserTaskStatus
from openedx.core.djangoapps.content_libraries.api import ContainerType, get_library
from openedx.core.djangoapps.content_libraries import api as libraries_api
from openedx.core.djangoapps.content_staging import api as staging_api
from xmodule.modulestore import exceptions as modulestore_exceptions
from xmodule.modulestore.django import modulestore
from common.djangoapps.split_modulestore_django.models import SplitModulestoreCourseIndex
from .constants import CONTENT_STAGING_PURPOSE_TEMPLATE
from .data import CompositionLevel, RepeatHandlingStrategy
from .models import ModulestoreSource, ModulestoreMigration, ModulestoreBlockSource, ModulestoreBlockMigration
log = get_task_logger(__name__)
class MigrationStep(Enum):
"""
Strings representation the state of an in-progress modulestore-to-learning-core import.
We use these values to set UserTaskStatus.state.
The other possible UserTaskStatus.state values are the built-in ones:
UserTaskStatus.{PENDING,FAILED,CANCELED,SUCCEEDED}.
"""
VALIDATING_INPUT = 'Validating migration parameters'
CANCELLING_OLD = 'Cancelling any redundant migration tasks'
LOADING = 'Loading legacy content from ModulesStore'
STAGING = 'Staging legacy content for import'
PARSING = 'Parsing staged OLX'
IMPORTING_ASSETS = 'Importing staged files and resources'
IMPORTING_STRUCTURE = 'Importing staged content structure'
UNSTAGING = 'Cleaning staged content'
MAPPING_OLD_TO_NEW = 'Saving map of legacy content to migrated content'
FORWARDING = 'Forwarding legacy content to migrated content'
POPULATING_COLLECTION = 'Assigning imported items to the specified collection'
class _MigrationTask(UserTask):
"""
Base class for migrate_to_modulestore
"""
@staticmethod
def calculate_total_steps(arguments_dict):
"""
Get number of in-progress steps in importing process, as shown in the UI.
"""
return len(list(MigrationStep))
@dataclass(frozen=True)
class _MigrationContext:
"""
Context for the migration process.
"""
existing_source_to_target_keys: dict[ # Note: It's intended to be mutable to reflect changes during migration.
UsageKey, PublishableEntity
]
target_package_id: int
target_library_key: LibraryLocatorV2
source_context_key: CourseKey # Note: This includes legacy LibraryLocators, which are sneakily CourseKeys.
content_by_filename: dict[str, int]
composition_level: CompositionLevel
repeat_handling_strategy: RepeatHandlingStrategy
preserve_url_slugs: bool
created_by: int
created_at: datetime
def is_already_migrated(self, source_key: UsageKey) -> bool:
return source_key in self.existing_source_to_target_keys
def get_existing_target(self, source_key: UsageKey) -> PublishableEntity:
return self.existing_source_to_target_keys[source_key]
def add_migration(self, source_key: UsageKey, target: PublishableEntity) -> None:
"""Update the context with a new migration (keeps it current)"""
self.existing_source_to_target_keys[source_key] = target
def get_existing_target_entity_keys(self, base_key: str) -> set[str]:
return set(
publishable_entity.key for _, publishable_entity in
self.existing_source_to_target_keys.items()
if publishable_entity.key.startswith(base_key)
)
@property
def should_skip_strategy(self) -> bool:
"""
Determines whether the repeat handling strategy should skip the entity.
"""
return self.repeat_handling_strategy is RepeatHandlingStrategy.Skip
@property
def should_update_strategy(self) -> bool:
"""
Determines whether the repeat handling strategy should update the entity.
"""
return self.repeat_handling_strategy is RepeatHandlingStrategy.Update
@shared_task(base=_MigrationTask, bind=True)
# Note: The decorator @set_code_owner_attribute cannot be used here because the UserTaskMixin
# does stack inspection and can't handle additional decorators.
def migrate_from_modulestore(
self: _MigrationTask,
*,
user_id: int,
source_pk: int,
target_package_pk: int,
target_library_key: str,
target_collection_pk: int,
repeat_handling_strategy: str,
preserve_url_slugs: bool,
composition_level: str,
forward_source_to_target: bool,
) -> None:
"""
Import a course or legacy library into a learning package.
Currently, the target learning package must be associated with a V2 content library, but that
restriction may be loosened in the future as more types of learning packages are developed.
"""
# pylint: disable=too-many-statements
# This is a large function, but breaking it up futher would probably not
# make it any easier to understand.
set_code_owner_attribute_from_module(__name__)
status: UserTaskStatus = self.status
status.set_state(MigrationStep.VALIDATING_INPUT.value)
try:
source = ModulestoreSource.objects.get(pk=source_pk)
target_package = LearningPackage.objects.get(pk=target_package_pk)
target_library = get_library(LibraryLocatorV2.from_string(target_library_key))
target_collection = Collection.objects.get(pk=target_collection_pk) if target_collection_pk else None
except (ObjectDoesNotExist, InvalidKeyError) as exc:
status.fail(str(exc))
return
# The Model is used for Course and Legacy Library
course_index = SplitModulestoreCourseIndex.objects.filter(course_id=source.key).first()
if isinstance(source.key, CourseLocator):
source_root_usage_key = source.key.make_usage_key('course', 'course')
source_version = course_index.published_version if course_index else None
elif isinstance(source.key, LibraryLocator):
source_root_usage_key = source.key.make_usage_key('library', 'library')
source_version = course_index.library_version if course_index else None
else:
status.fail(
f"Not a valid source context key: {source.key}. "
"Source key must reference a course or a legacy library."
)
return
migration = ModulestoreMigration.objects.create(
source=source,
source_version=source_version,
composition_level=composition_level,
repeat_handling_strategy=repeat_handling_strategy,
preserve_url_slugs=preserve_url_slugs,
target=target_package,
target_collection=target_collection,
task_status=status,
)
status.increment_completed_steps()
status.set_state(MigrationStep.CANCELLING_OLD.value)
# In order to prevent a user from accidentally starting a bunch of identical import tasks...
migrations_to_cancel = ModulestoreMigration.objects.filter(
# get all Migration tasks by this user with the same source and target
task_status__user=status.user,
source=source,
target=target_package,
).select_related('task_status').exclude(
# (excluding that aren't running)
task_status__state__in=(UserTaskStatus.CANCELED, UserTaskStatus.FAILED, UserTaskStatus.SUCCEEDED)
).exclude(
# (excluding this migration itself)
id=migration.id
)
# ... and cancel their tasks and clean away their staged content.
for migration_to_cancel in migrations_to_cancel:
if migration_to_cancel.task_status:
migration_to_cancel.task_status.cancel()
if migration_to_cancel.staged_content:
migration_to_cancel.staged_content.delete()
status.increment_completed_steps()
status.set_state(MigrationStep.LOADING)
try:
legacy_root = modulestore().get_item(source_root_usage_key)
except modulestore_exceptions.ItemNotFoundError as exc:
status.fail(f"Failed to load source item '{source_root_usage_key}' from ModuleStore: {exc}")
return
if not legacy_root:
status.fail(f"Could not find source item '{source_root_usage_key}' in ModuleStore")
return
status.increment_completed_steps()
status.set_state(MigrationStep.STAGING.value)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root,
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source.key),
)
migration.staged_content = staged_content
status.increment_completed_steps()
status.set_state(MigrationStep.PARSING.value)
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
status.set_state(MigrationStep.IMPORTING_ASSETS.value)
content_by_filename: dict[str, int] = {}
now = datetime.now(tz=timezone.utc)
for staged_content_file_data in staging_api.get_staged_content_static_files(staged_content.id):
old_path = staged_content_file_data.filename
file_data = staging_api.get_staged_content_static_file_data(staged_content.id, old_path)
if not file_data:
log.error(
f"Staged content {staged_content.id} included referenced file {old_path}, "
"but no file data was found."
)
continue
filename = os.path.basename(old_path)
media_type_str = mimetypes.guess_type(filename)[0] or "application/octet-stream"
media_type = authoring_api.get_or_create_media_type(media_type_str)
content_by_filename[filename] = authoring_api.get_or_create_file_content(
migration.target_id,
media_type.id,
data=file_data,
created=now,
).id
status.increment_completed_steps()
status.set_state(MigrationStep.IMPORTING_STRUCTURE.value)
# "key" is locally unique across all PublishableEntities within
# a given LearningPackage.
# We use this mapping to ensure that we don't create duplicate
# PublishableEntities during the migration process for a given LearningPackage.
existing_source_to_target_keys = {
block.source.key: block.target for block in ModulestoreBlockMigration.objects.filter(
overall_migration__target=migration.target.id
)
}
migration_context = _MigrationContext(
existing_source_to_target_keys=existing_source_to_target_keys,
target_package_id=target_package_pk,
target_library_key=target_library.key,
source_context_key=source_root_usage_key.course_key,
content_by_filename=content_by_filename,
composition_level=CompositionLevel(composition_level),
repeat_handling_strategy=RepeatHandlingStrategy(repeat_handling_strategy),
preserve_url_slugs=preserve_url_slugs,
created_by=status.user_id,
created_at=datetime.now(timezone.utc),
)
with authoring_api.bulk_draft_changes_for(migration.target.id) as change_log:
root_migrated_node = _migrate_node(
context=migration_context,
source_node=root_node,
)
change_log.save()
migration.change_log = change_log
status.increment_completed_steps()
status.set_state(MigrationStep.UNSTAGING.value)
staged_content.delete()
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source,
migration=migration,
status=status,
)
block_migrations = ModulestoreBlockMigration.objects.filter(overall_migration=migration)
status.increment_completed_steps()
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
block_sources_to_block_migrations = {
block_migration.source: block_migration for block_migration in block_migrations
}
for block_source, block_migration in block_sources_to_block_migrations.items():
block_source.forwarded = block_migration
block_source.save()
source.forwarded = migration
source.save()
status.increment_completed_steps()
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
if target_collection:
block_target_pks: list[int] = list(
ModulestoreBlockMigration.objects.filter(
overall_migration=migration
).values_list('target_id', flat=True)
)
if block_target_pks:
authoring_api.add_to_collection(
learning_package_id=target_package_pk,
key=target_collection.key,
entities_qset=PublishableEntity.objects.filter(id__in=block_target_pks),
created_by=user_id,
)
else:
log.warning("No target entities found to add to collection")
status.increment_completed_steps()
@dataclass(frozen=True)
class _MigratedNode:
"""
A node in the source tree, its target (if migrated), and any migrated children.
Note that target_version can equal None even when there migrated children.
This happens, particularly, if the node is above the requested composition level
but has descendents which are at or below that level.
"""
source_to_target: tuple[UsageKey, PublishableEntityVersion] | None
children: list[_MigratedNode]
def all_source_to_target_pairs(self) -> t.Iterable[tuple[UsageKey, PublishableEntityVersion]]:
"""
Get all source_key->target_ver pairs via a pre-order traversal.
"""
if self.source_to_target:
yield self.source_to_target
for child in self.children:
yield from child.all_source_to_target_pairs()
def _migrate_node(
*,
context: _MigrationContext,
source_node: XmlTree,
) -> _MigratedNode:
"""
Migrate an OLX node (source_node) from a legacy course or library (context.source_context_key)
to a learning package (context.target_library). If the node is a container, create it in the
target if it is at or above the requested composition_level; otherwise, just import its contents.
Recursively apply the same logic to all children.
"""
# The OLX tag will map to one of the following...
# * A wiki tag --> Ignore
# * A recognized container type --> Migrate children, and import container if requested.
# * A legacy library root --> Migrate children, but NOT the root itself.
# * A course root --> Migrate children, but NOT the root itself (for Ulmo, at least. Future
# releases may support treating the Course as an importable container).
# * Something else --> Try to import it as a component. If that fails, then it's either an un-
# supported component type, or it's an XBlock with dynamic children, which we
# do not support in libraries as of Ulmo.
should_migrate_node: bool
should_migrate_children: bool
container_type: ContainerType | None # if None, it's a Component
if source_node.tag == "wiki":
return _MigratedNode(None, [])
try:
container_type = ContainerType.from_source_olx_tag(source_node.tag)
except ValueError:
container_type = None
if source_node.tag in {"course", "library"}:
should_migrate_node = False
should_migrate_children = True
else:
should_migrate_node = True
should_migrate_children = False
else:
node_level = CompositionLevel(container_type.value)
should_migrate_node = not node_level.is_higher_than(context.composition_level)
should_migrate_children = True
migrated_children: list[_MigratedNode] = []
if should_migrate_children:
migrated_children = [
_migrate_node(
context=context,
source_node=source_node_child,
)
for source_node_child in source_node.getchildren()
]
source_to_target: tuple[UsageKey, PublishableEntityVersion] | None = None
if should_migrate_node:
source_olx = etree.tostring(source_node).decode('utf-8')
if source_block_id := source_node.get('url_name'):
source_key: UsageKey = context.source_context_key.make_usage_key(source_node.tag, source_block_id)
title = source_node.get('display_name', source_block_id)
target_entity_version = (
_migrate_container(
context=context,
source_key=source_key,
container_type=container_type,
title=title,
children=[
migrated_child.source_to_target[1]
for migrated_child in migrated_children if
migrated_child.source_to_target
],
)
if container_type else
_migrate_component(
context=context,
source_key=source_key,
olx=source_olx,
title=title,
)
)
if target_entity_version:
source_to_target = (source_key, target_entity_version)
context.add_migration(source_key, target_entity_version.entity)
else:
log.warning(
f"Cannot migrate node from {context.source_context_key} to {context.target_library_key} "
f"because it lacks an url_name and thus has no identity: {source_olx}"
)
return _MigratedNode(source_to_target=source_to_target, children=migrated_children)
def _migrate_container(
*,
context: _MigrationContext,
source_key: UsageKey,
container_type: ContainerType,
title: str,
children: list[PublishableEntityVersion],
) -> PublishableEntityVersion:
"""
Create, update, or replace a container in a library based on a source key and children.
(We assume that the destination is a library rather than some other future kind of learning
package, but let's keep than an internal assumption.)
"""
target_key = _get_distinct_target_container_key(
context,
source_key,
container_type,
title,
)
try:
container = libraries_api.get_container(target_key)
container_exists = True
except libraries_api.ContentLibraryContainerNotFound:
container_exists = False
if PublishableEntity.objects.filter(
learning_package_id=context.target_package_id,
key=target_key.container_id,
).exists():
libraries_api.restore_container(container_key=target_key)
container = libraries_api.get_container(target_key)
else:
container = libraries_api.create_container(
library_key=context.target_library_key,
container_type=container_type,
slug=target_key.container_id,
title=title,
created=context.created_at,
user_id=context.created_by,
)
if container_exists and context.should_skip_strategy:
return PublishableEntityVersion.objects.get(
entity_id=container.container_pk,
version_num=container.draft_version_num,
)
return authoring_api.create_next_container_version(
container.container_pk,
title=title,
entity_rows=[
authoring_api.ContainerEntityRow(entity_pk=child.entity_id, version_pk=None)
for child in children
],
created=context.created_at,
created_by=context.created_by,
container_version_cls=container_type.container_model_classes[1],
).publishable_entity_version
def _migrate_component(
*,
context: _MigrationContext,
source_key: UsageKey,
olx: str,
title: str,
) -> PublishableEntityVersion | None:
"""
Create, update, or replace a component in a library based on a source key and OLX.
(We assume that the destination is a library rather than some other future kind of learning
package, but let's keep than an internal assumption.)
"""
component_type = authoring_api.get_or_create_component_type("xblock.v1", source_key.block_type)
target_key = _get_distinct_target_usage_key(
context,
source_key,
component_type,
title,
)
try:
component = authoring_api.get_components(context.target_package_id).get(
component_type=component_type,
local_key=target_key.block_id,
)
component_existed = True
# Do we have a specific method for this?
component_deleted = not component.versioning.draft
except Component.DoesNotExist:
component_existed = False
component_deleted = False
try:
libraries_api.validate_can_add_block_to_library(
context.target_library_key, target_key.block_type, target_key.block_id
)
except libraries_api.IncompatibleTypesError as e:
log.error(f"Error validating block for library {context.target_library_key}: {e}")
return None
component = authoring_api.create_component(
context.target_package_id,
component_type=component_type,
local_key=target_key.block_id,
created=context.created_at,
created_by=context.created_by,
)
# Component existed and we do not replace it and it is not deleted previously
if component_existed and not component_deleted and context.should_skip_strategy:
return component.versioning.draft.publishable_entity_version
# If component existed and was deleted or we have to replace the current version
# Create the new component version for it
component_version = libraries_api.set_library_block_olx(target_key, new_olx_str=olx)
for filename, content_pk in context.content_by_filename.items():
filename_no_ext, _ = os.path.splitext(filename)
if filename_no_ext not in olx:
continue
new_path = f"static/{filename}"
authoring_api.create_component_version_content(
component_version.pk, content_pk, key=new_path
)
return component_version.publishable_entity_version
def _get_distinct_target_container_key(
context: _MigrationContext,
source_key: UsageKey,
container_type: ContainerType,
title: str,
) -> LibraryContainerLocator:
"""
Find a unique key for block_id by appending a unique identifier if necessary.
Args:
context (_MigrationContext): The migration context.
source_key (UsageKey): The source key.
container_type (ContainerType): The container type.
title (str): The title.
Returns:
LibraryContainerLocator: The target container key.
"""
# Check if we already processed this block
if context.is_already_migrated(source_key):
existing_version = context.get_existing_target(source_key)
return LibraryContainerLocator(
context.target_library_key,
container_type.value,
existing_version.key
)
# Generate new unique block ID
base_slug = (
source_key.block_id
if context.preserve_url_slugs
else (slugify(title) or source_key.block_id)
)
unique_slug = _find_unique_slug(context, base_slug)
return LibraryContainerLocator(
context.target_library_key,
container_type.value,
unique_slug
)
def _get_distinct_target_usage_key(
context: _MigrationContext,
source_key: UsageKey,
component_type: ComponentType,
title: str,
) -> LibraryUsageLocatorV2:
"""
Find a unique key for block_id by appending a unique identifier if necessary.
Args:
context: The migration context
source_key: The original usage key from the source
component_type: The component type string
olx: The OLX content of the component
Returns:
A unique LibraryUsageLocatorV2 for the target
Raises:
ValueError: If source_key is invalid
"""
# Check if we already processed this block
if context.is_already_migrated(source_key):
log.debug(f"Block {source_key} already exists, reusing existing target")
existing_target = context.get_existing_target(source_key)
block_id = existing_target.component.local_key
# mypy thinks LibraryUsageLocatorV2 is abstract. It's not.
return LibraryUsageLocatorV2( # type: ignore[abstract]
context.target_library_key,
source_key.block_type,
block_id
)
# Generate new unique block ID
base_slug = (
source_key.block_id
if context.preserve_url_slugs
else (slugify(title) or source_key.block_id)
)
unique_slug = _find_unique_slug(context, base_slug, component_type)
# mypy thinks LibraryUsageLocatorV2 is abstract. It's not.
return LibraryUsageLocatorV2( # type: ignore[abstract]
context.target_library_key,
source_key.block_type,
unique_slug
)
def _find_unique_slug(
context: _MigrationContext,
base_slug: str,
component_type: ComponentType | None = None,
max_attempts: int = 1000
) -> str:
"""
Find a unique slug by appending incrementing numbers if necessary.
Using batch querying to avoid multiple database roundtrips.
Args:
component_type: The component type to check against
base_slug: The base slug to make unique
max_attempts: Maximum number of attempts to prevent infinite loops
Returns:
A unique slug string
Raises:
RuntimeError: If unable to find unique slug within max_attempts
"""
if not component_type:
base_key = base_slug
else:
base_key = f"{component_type}:{base_slug}"
existing_publishable_entity_keys = context.get_existing_target_entity_keys(base_key)
# Check if base slug is available
if base_key not in existing_publishable_entity_keys:
return base_slug
# Try numbered variations until we find one that doesn't exist
for i in range(1, max_attempts + 1):
candidate_slug = f"{base_slug}_{i}"
candidate_key = f"{component_type}:{candidate_slug}" if component_type else candidate_slug
if candidate_key not in existing_publishable_entity_keys:
return candidate_slug
raise RuntimeError(f"Unable to find unique slug after {max_attempts} attempts for base: {base_slug}")
def _create_migration_artifacts_incrementally(
root_migrated_node: _MigratedNode,
source: ModulestoreSource,
migration: ModulestoreMigration,
status: UserTaskStatus
) -> None:
"""
Create ModulestoreBlockSource and ModulestoreBlockMigration objects incrementally.
"""
nodes = tuple(root_migrated_node.all_source_to_target_pairs())
total_nodes = len(nodes)
processed = 0
for source_usage_key, target_version in root_migrated_node.all_source_to_target_pairs():
block_source, _ = ModulestoreBlockSource.objects.get_or_create(
overall_source=source,
key=source_usage_key
)
ModulestoreBlockMigration.objects.create(
overall_migration=migration,
source=block_source,
target_id=target_version.entity_id,
)
processed += 1
if processed % 10 == 0 or processed == total_nodes:
status.set_state(
f"{MigrationStep.MAPPING_OLD_TO_NEW.value} ({processed}/{total_nodes})"
)

View File

@@ -0,0 +1,21 @@
"""
Factories for creating test data for the modulestore migrator.
"""
import uuid
import factory
from opaque_keys.edx.keys import LearningContextKey
from cms.djangoapps.modulestore_migrator.models import ModulestoreSource
class ModulestoreSourceFactory(factory.django.DjangoModelFactory):
"""
Factory for creating ModulestoreSource instances.
"""
class Meta:
model = ModulestoreSource
@factory.lazy_attribute
def key(self):
return LearningContextKey.from_string(f"course-v1:edX+DemoX+{uuid.uuid4()}")

View File

@@ -0,0 +1,115 @@
"""
Test cases for the modulestore migrator API.
"""
from opaque_keys.edx.locator import LibraryLocatorV2
from openedx_learning.api import authoring as authoring_api
from organizations.tests.factories import OrganizationFactory
import pytest
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from common.djangoapps.student.tests.factories import UserFactory
from cms.djangoapps.modulestore_migrator import api
from cms.djangoapps.modulestore_migrator.data import CompositionLevel, RepeatHandlingStrategy
from cms.djangoapps.modulestore_migrator.models import ModulestoreMigration
from cms.djangoapps.modulestore_migrator.tests.factories import ModulestoreSourceFactory
from openedx.core.djangoapps.content_libraries import api as lib_api
@pytest.mark.django_db
class TestModulestoreMigratorAPI(ModuleStoreTestCase):
"""
Test cases for the modulestore migrator API.
"""
def setUp(self):
super().setUp()
self.organization = OrganizationFactory()
self.lib_key = LibraryLocatorV2.from_string(
f"lib:{self.organization.short_name}:test-key"
)
lib_api.create_library(
org=self.organization,
slug=self.lib_key.slug,
title="Test Library",
)
self.library = lib_api.ContentLibrary.objects.get(slug=self.lib_key.slug)
self.learning_package = self.library.learning_package
def test_start_migration_to_library(self):
"""
Test that the API can start a migration to a library.
"""
source = ModulestoreSourceFactory()
user = UserFactory()
api.start_migration_to_library(
user=user,
source_key=source.key,
target_library_key=self.library.library_key,
target_collection_slug=None,
composition_level=CompositionLevel.Component.value,
repeat_handling_strategy=RepeatHandlingStrategy.Skip.value,
preserve_url_slugs=True,
forward_source_to_target=False,
)
modulestoremigration = ModulestoreMigration.objects.get()
assert modulestoremigration.source.key == source.key
assert (
modulestoremigration.composition_level == CompositionLevel.Component.value
)
assert modulestoremigration.repeat_handling_strategy == RepeatHandlingStrategy.Skip.value
assert modulestoremigration.preserve_url_slugs is True
assert modulestoremigration.task_status is not None
assert modulestoremigration.task_status.user == user
def test_start_migration_to_library_with_collection(self):
"""
Test that the API can start a migration to a library with a target collection.
"""
source = ModulestoreSourceFactory()
user = UserFactory()
collection_key = "test-collection"
authoring_api.create_collection(
learning_package_id=self.learning_package.id,
key=collection_key,
title="Test Collection",
created_by=user.id,
)
api.start_migration_to_library(
user=user,
source_key=source.key,
target_library_key=self.library.library_key,
target_collection_slug=collection_key,
composition_level=CompositionLevel.Component.value,
repeat_handling_strategy=RepeatHandlingStrategy.Skip.value,
preserve_url_slugs=True,
forward_source_to_target=False,
)
modulestoremigration = ModulestoreMigration.objects.get()
assert modulestoremigration.target_collection.key == collection_key
def test_forking_is_not_implemented(self):
"""
Test that the API raises NotImplementedError for the Fork strategy.
"""
source = ModulestoreSourceFactory()
user = UserFactory()
with pytest.raises(NotImplementedError):
api.start_migration_to_library(
user=user,
source_key=source.key,
target_library_key=self.library.library_key,
target_collection_slug=None,
composition_level=CompositionLevel.Component.value,
repeat_handling_strategy=RepeatHandlingStrategy.Fork.value,
preserve_url_slugs=True,
forward_source_to_target=False,
)

View File

@@ -0,0 +1,1430 @@
"""
Tests for the modulestore_migrator tasks
"""
from unittest.mock import Mock
import ddt
from django.utils import timezone
from lxml import etree
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator, LibraryLocatorV2
from openedx_learning.api.authoring_models import Collection, PublishableEntityVersion
from openedx_learning.api import authoring as authoring_api
from organizations.tests.factories import OrganizationFactory
from user_tasks.models import UserTaskArtifact
from user_tasks.tasks import UserTaskStatus
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from common.djangoapps.student.tests.factories import UserFactory
from cms.djangoapps.modulestore_migrator.data import CompositionLevel, RepeatHandlingStrategy
from cms.djangoapps.modulestore_migrator.models import (
ModulestoreMigration,
ModulestoreSource,
)
from cms.djangoapps.modulestore_migrator.tasks import (
_migrate_component,
_migrate_container,
_migrate_node,
_MigratedNode,
_MigrationContext,
_MigrationTask,
migrate_from_modulestore,
MigrationStep,
)
from openedx.core.djangoapps.content_libraries import api as lib_api
@ddt.ddt
class TestMigrateFromModulestore(ModuleStoreTestCase):
"""
Test the migrate_from_modulestore task
"""
def setUp(self):
super().setUp()
self.user = UserFactory()
self.organization = OrganizationFactory(short_name="testorg")
self.lib_key = LibraryLocatorV2.from_string(
f"lib:{self.organization.short_name}:test-key"
)
lib_api.create_library(
org=self.organization,
slug=self.lib_key.slug,
title="Test Library",
)
self.library = lib_api.ContentLibrary.objects.get(slug=self.lib_key.slug)
self.learning_package = self.library.learning_package
self.course = CourseFactory(
org=self.organization.short_name,
course="TestCourse",
run="TestRun",
display_name="Test Course",
)
self.collection = Collection.objects.create(
learning_package=self.learning_package,
key="test_collection",
title="Test Collection",
)
def _get_task_status_fail_message(self, status):
"""
Helper method to get the failure message from a UserTaskStatus object.
"""
if status.state == UserTaskStatus.FAILED:
return UserTaskArtifact.objects.get(status=status, name="Error").text
return None
def test_migrate_node_wiki_tag(self):
"""
Test _migrate_node ignores wiki tags
"""
wiki_node = etree.fromstring("<wiki />")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_node(
context=context,
source_node=wiki_node,
)
self.assertIsNone(result.source_to_target)
self.assertEqual(len(result.children), 0)
def test_migrate_node_course_root(self):
"""
Test _migrate_node handles course root
"""
course_node = etree.fromstring(
'<course url_name="course" display_name="Test Course">'
'<chapter url_name="chapter1" display_name="Chapter 1" />'
"</course>"
)
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_node(
context=context,
source_node=course_node,
)
# Course root should not be migrated
self.assertIsNone(result.source_to_target)
# But should have children processed
self.assertEqual(len(result.children), 1)
def test_migrate_node_library_root(self):
"""
Test _migrate_node handles library root
"""
library_node = etree.fromstring(
'<library url_name="library" display_name="Test Library">'
'<problem url_name="problem1" display_name="Problem 1" />'
"</library>"
)
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_node(
context=context,
source_node=library_node,
)
# Library root should not be migrated
self.assertIsNone(result.source_to_target)
# But should have children processed
self.assertEqual(len(result.children), 1)
@ddt.data(
("chapter", CompositionLevel.Unit, None),
("sequential", CompositionLevel.Unit, None),
("vertical", CompositionLevel.Unit, True),
("chapter", CompositionLevel.Section, True),
("sequential", CompositionLevel.Section, True),
("vertical", CompositionLevel.Section, True),
)
@ddt.unpack
def test_migrate_node_container_composition_level(
self, tag_name, composition_level, should_migrate
):
"""
Test _migrate_node respects composition level for containers
"""
container_node = etree.fromstring(
f'<{tag_name} url_name="test_{tag_name}" display_name="Test {tag_name.title()}" />'
)
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=composition_level,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_node(
context=context,
source_node=container_node,
)
if should_migrate:
self.assertIsNotNone(result.source_to_target)
source_key, _ = result.source_to_target
self.assertEqual(source_key.block_type, tag_name)
self.assertEqual(source_key.block_id, f"test_{tag_name}")
else:
self.assertIsNone(result.source_to_target)
def test_migrate_node_without_url_name(self):
"""
Test _migrate_node handles nodes without url_name
"""
node_without_url_name = etree.fromstring(
'<problem display_name="No URL Name" />'
)
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_node(
context=context,
source_node=node_without_url_name,
)
self.assertIsNone(result.source_to_target)
self.assertEqual(len(result.children), 0)
def test_migrated_node_all_source_to_target_pairs(self):
"""
Test _MigratedNode.all_source_to_target_pairs traversal
"""
mock_version1 = Mock(spec=PublishableEntityVersion)
mock_version2 = Mock(spec=PublishableEntityVersion)
mock_version3 = Mock(spec=PublishableEntityVersion)
key1 = self.course.id.make_usage_key("problem", "problem1")
key2 = self.course.id.make_usage_key("problem", "problem2")
key3 = self.course.id.make_usage_key("problem", "problem3")
child_node = _MigratedNode(source_to_target=(key3, mock_version3), children=[])
parent_node = _MigratedNode(
source_to_target=(key1, mock_version1),
children=[
_MigratedNode(source_to_target=(key2, mock_version2), children=[]),
child_node,
],
)
pairs = list(parent_node.all_source_to_target_pairs())
self.assertEqual(len(pairs), 3)
self.assertEqual(pairs[0][0], key1)
self.assertEqual(pairs[1][0], key2)
self.assertEqual(pairs[2][0], key3)
def test_migrate_from_modulestore_invalid_source(self):
"""
Test migrate_from_modulestore with invalid source
"""
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": 999999, # Non-existent source
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "ModulestoreSource matching query does not exist.")
def test_migrate_from_modulestore_invalid_target_package(self):
"""
Test migrate_from_modulestore with invalid target package
"""
source = ModulestoreSource.objects.create(
key=self.course.id,
)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": 999999, # Non-existent package
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "LearningPackage matching query does not exist.")
def test_migrate_from_modulestore_invalid_collection(self):
"""
Test migrate_from_modulestore with invalid collection
"""
source = ModulestoreSource.objects.create(
key=self.course.id,
)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": 999999, # Non-existent collection
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(self._get_task_status_fail_message(status), "Collection matching query does not exist.")
def test_migration_task_calculate_total_steps(self):
"""
Test _MigrationTask.calculate_total_steps returns correct count
"""
total_steps = _MigrationTask.calculate_total_steps({})
expected_steps = len(list(MigrationStep))
self.assertEqual(total_steps, expected_steps)
def test_migrate_component_success(self):
"""
Test _migrate_component successfully creates a new component
"""
source_key = self.course.id.make_usage_key("problem", "test_problem")
olx = '<problem display_name="Test Problem"><multiplechoiceresponse></multiplechoiceresponse></problem>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
self.assertIsNotNone(result)
self.assertIsInstance(result, PublishableEntityVersion)
self.assertEqual(
"problem", result.componentversion.component.component_type.name
)
def test_migrate_component_with_static_content(self):
"""
Test _migrate_component with static file content
"""
source_key = self.course.id.make_usage_key("problem", "test_problem_with_image")
olx = '<problem display_name="Test Problem"><p>See image: test_image.png</p></problem>'
media_type = authoring_api.get_or_create_media_type("image/png")
test_content = authoring_api.get_or_create_file_content(
self.learning_package.id,
media_type.id,
data=b"fake_image_data",
created=timezone.now(),
)
content_by_filename = {"test_image.png": test_content.id}
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename=content_by_filename,
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
self.assertIsNotNone(result)
component_content = result.componentversion.componentversioncontent_set.filter(
key="static/test_image.png"
).first()
self.assertIsNotNone(component_content)
self.assertEqual(component_content.content_id, test_content.id)
def test_migrate_component_replace_existing_false(self):
"""
Test _migrate_component with replace_existing=False returns existing component
"""
source_key = self.course.id.make_usage_key("problem", "existing_problem")
olx = '<problem display_name="Test Problem"><multiplechoiceresponse></multiplechoiceresponse></problem>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
context.existing_source_to_target_keys[source_key] = first_result.entity
second_result = _migrate_component(
context=context,
source_key=source_key,
olx='<problem display_name="Updated Problem"><multiplechoiceresponse></multiplechoiceresponse></problem>',
title="updated_problem"
)
self.assertEqual(first_result.entity_id, second_result.entity_id)
self.assertEqual(first_result.version_num, second_result.version_num)
def test_migrate_component_same_title(self):
"""
Test _migrate_component for two components with the same title
Using preserve_url_slugs=False to create a new component with
a different URL slug based on the component's Title.
"""
source_key_1 = self.course.id.make_usage_key("problem", "existing_problem_1")
source_key_2 = self.course.id.make_usage_key("problem", "existing_problem_2")
olx = '<problem display_name="Test Problem"><multiplechoiceresponse></multiplechoiceresponse></problem>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=False,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_component(
context=context,
source_key=source_key_1,
olx=olx,
title="test_problem"
)
context.existing_source_to_target_keys[source_key_1] = first_result.entity
second_result = _migrate_component(
context=context,
source_key=source_key_2,
olx=olx,
title="test_problem"
)
self.assertNotEqual(first_result.entity_id, second_result.entity_id)
self.assertNotEqual(first_result.entity.key, second_result.entity.key)
def test_migrate_component_replace_existing_true(self):
"""
Test _migrate_component with replace_existing=True creates new version
"""
source_key = self.course.id.make_usage_key("problem", "replaceable_problem")
original_olx = '<problem display_name="Original"><multiplechoiceresponse></multiplechoiceresponse></problem>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Update,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_component(
context=context,
source_key=source_key,
olx=original_olx,
title="original"
)
context.existing_source_to_target_keys[source_key] = first_result.entity
updated_olx = '<problem display_name="Updated"><multiplechoiceresponse></multiplechoiceresponse></problem>'
second_result = _migrate_component(
context=context,
source_key=source_key,
olx=updated_olx,
title="updated"
)
self.assertEqual(first_result.entity_id, second_result.entity_id)
self.assertNotEqual(first_result.version_num, second_result.version_num)
def test_migrate_component_different_block_types(self):
"""
Test _migrate_component with different block types
"""
block_types = ["problem", "html", "video", "discussion"]
for block_type in block_types:
source_key = self.course.id.make_usage_key(block_type, f"test_{block_type}")
olx = f'<{block_type} display_name="Test {block_type.title()}"></{block_type}>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test"
)
self.assertIsNotNone(result, f"Failed to migrate {block_type}")
self.assertEqual(
block_type, result.componentversion.component.component_type.name
)
def test_migrate_component_content_filename_not_in_olx(self):
"""
Test _migrate_component ignores content files not referenced in OLX
"""
source_key = self.course.id.make_usage_key(
"problem", "test_problem_selective_content"
)
olx = '<problem display_name="Test Problem"><p>See image: referenced.png</p></problem>'
media_type = authoring_api.get_or_create_media_type("image/png")
referenced_content = authoring_api.get_or_create_file_content(
self.learning_package.id,
media_type.id,
data=b"referenced_image_data",
created=timezone.now(),
)
unreferenced_content = authoring_api.get_or_create_file_content(
self.learning_package.id,
media_type.id,
data=b"unreferenced_image_data",
created=timezone.now(),
)
content_by_filename = {
"referenced.png": referenced_content.id,
"unreferenced.png": unreferenced_content.id,
}
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename=content_by_filename,
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
self.assertIsNotNone(result)
referenced_content_exists = (
result.componentversion.componentversioncontent_set.filter(
key="static/referenced.png"
).exists()
)
unreferenced_content_exists = (
result.componentversion.componentversioncontent_set.filter(
key="static/unreferenced.png"
).exists()
)
self.assertTrue(referenced_content_exists)
self.assertFalse(unreferenced_content_exists)
def test_migrate_component_library_source_key(self):
"""
Test _migrate_component with library source key
"""
library_key = LibraryLocator(org="TestOrg", library="TestLibrary")
source_key = library_key.make_usage_key("problem", "library_problem")
olx = '<problem display_name="Library Problem"><multiplechoiceresponse></multiplechoiceresponse></problem>'
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="library_problem"
)
self.assertIsNotNone(result)
self.assertEqual(
"problem", result.componentversion.component.component_type.name
)
def test_migrate_component_duplicate_content_integrity_error(self):
"""
Test _migrate_component handles IntegrityError when content already exists
"""
source_key = self.course.id.make_usage_key(
"problem", "test_problem_duplicate_content"
)
olx = '<problem display_name="Test Problem"><p>See image: duplicate.png</p></problem>'
media_type = authoring_api.get_or_create_media_type("image/png")
test_content = authoring_api.get_or_create_file_content(
self.learning_package.id,
media_type.id,
data=b"test_image_data",
created=timezone.now(),
)
content_by_filename = {"duplicate.png": test_content.id}
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename=content_by_filename,
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Update,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
context.existing_source_to_target_keys[source_key] = first_result.entity
second_result = _migrate_component(
context=context,
source_key=source_key,
olx=olx,
title="test_problem"
)
self.assertIsNotNone(first_result)
self.assertIsNotNone(second_result)
self.assertEqual(first_result.entity_id, second_result.entity_id)
def test_migrate_container_creates_new_container(self):
"""
Test _migrate_container creates a new container when none exists
"""
source_key = self.course.id.make_usage_key("vertical", "test_vertical")
child_component_1 = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "problem"
),
local_key="child_problem_1",
created=timezone.now(),
created_by=self.user.id,
)
child_version_1 = authoring_api.create_next_component_version(
child_component_1.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
child_component_2 = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "html"
),
local_key="child_html_1",
created=timezone.now(),
created_by=self.user.id,
)
child_version_2 = authoring_api.create_next_component_version(
child_component_2.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
children = [
child_version_1.publishable_entity_version,
child_version_2.publishable_entity_version,
]
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Test Vertical",
children=children,
)
self.assertIsInstance(result, PublishableEntityVersion)
container_version = result.containerversion
self.assertEqual(container_version.title, "Test Vertical")
entity_rows = container_version.entity_list.entitylistrow_set.all()
self.assertEqual(len(entity_rows), 2)
child_entity_ids = {row.entity_id for row in entity_rows}
expected_entity_ids = {child.entity_id for child in children}
self.assertEqual(child_entity_ids, expected_entity_ids)
def test_migrate_container_different_container_types(self):
"""
Test _migrate_container works with different container types
"""
container_types = [
(lib_api.ContainerType.Unit, "vertical"),
(lib_api.ContainerType.Subsection, "sequential"),
(lib_api.ContainerType.Section, "chapter"),
]
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
for container_type, block_type in container_types:
with self.subTest(container_type=container_type, block_type=block_type):
source_key = self.course.id.make_usage_key(
block_type, f"test_{block_type}"
)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=container_type,
title=f"Test {block_type.title()}",
children=[],
)
self.assertIsNotNone(result)
container_version = result.containerversion
self.assertEqual(container_version.title, f"Test {block_type.title()}")
def test_migrate_container_replace_existing_false(self):
"""
Test _migrate_container returns existing container when replace_existing=False
"""
source_key = self.course.id.make_usage_key("vertical", "existing_vertical")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Original Title",
children=[],
)
context.existing_source_to_target_keys[source_key] = first_result.entity
second_result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Updated Title",
children=[],
)
self.assertEqual(first_result.entity_id, second_result.entity_id)
self.assertEqual(first_result.version_num, second_result.version_num)
container_version = second_result.containerversion
self.assertEqual(container_version.title, "Original Title")
def test_migrate_container_same_title(self):
"""
Test _migrate_container for two containers with the same title
Using preserve_url_slugs=False to create a new Unit with
a different URL slug based on the container's Title.
"""
source_key_1 = self.course.id.make_usage_key("vertical", "human_readable_vertical_1")
source_key_2 = self.course.id.make_usage_key("vertical", "human_readable_vertical_2")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=False,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_container(
context=context,
source_key=source_key_1,
container_type=lib_api.ContainerType.Unit,
title="Original Human Readable Title",
children=[],
)
context.existing_source_to_target_keys[source_key_1] = first_result.entity
second_result = _migrate_container(
context=context,
source_key=source_key_2,
container_type=lib_api.ContainerType.Unit,
title="Original Human Readable Title",
children=[],
)
self.assertNotEqual(first_result.entity_id, second_result.entity_id)
self.assertNotEqual(first_result.entity.key, second_result.entity.key)
# Make sure the current logic from tasts::_find_unique_slug is used
self.assertEqual(second_result.entity.key, first_result.entity.key + "_1")
container_version = second_result.containerversion
self.assertEqual(container_version.title, "Original Human Readable Title")
def test_migrate_container_replace_existing_true(self):
"""
Test _migrate_container creates new version when replace_existing=True
"""
source_key = self.course.id.make_usage_key("vertical", "replaceable_vertical")
child_component = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "problem"
),
local_key="child_problem",
created=timezone.now(),
created_by=self.user.id,
)
child_version = authoring_api.create_next_component_version(
child_component.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Update,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
first_result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Original Title",
children=[],
)
context.existing_source_to_target_keys[source_key] = first_result.entity
second_result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Updated Title",
children=[child_version.publishable_entity_version],
)
self.assertEqual(first_result.entity_id, second_result.entity_id)
self.assertNotEqual(first_result.version_num, second_result.version_num)
container_version = second_result.containerversion
self.assertEqual(container_version.title, "Updated Title")
self.assertEqual(container_version.entity_list.entitylistrow_set.count(), 1)
def test_migrate_container_with_library_source_key(self):
"""
Test _migrate_container with library source key
"""
library_key = LibraryLocator(org="TestOrg", library="TestLibrary")
source_key = library_key.make_usage_key("vertical", "library_vertical")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Library Vertical",
children=[],
)
self.assertIsNotNone(result)
container_version = result.containerversion
self.assertEqual(container_version.title, "Library Vertical")
def test_migrate_container_empty_children_list(self):
"""
Test _migrate_container handles empty children list
"""
source_key = self.course.id.make_usage_key("vertical", "empty_vertical")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Empty Vertical",
children=[],
)
self.assertIsNotNone(result)
container_version = result.containerversion
self.assertEqual(container_version.entity_list.entitylistrow_set.count(), 0)
def test_migrate_container_preserves_child_order(self):
"""
Test _migrate_container preserves the order of children
"""
source_key = self.course.id.make_usage_key("vertical", "ordered_vertical")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
children = []
for i in range(3):
child_component = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "problem"
),
local_key=f"child_problem_{i}",
created=timezone.now(),
created_by=self.user.id,
)
child_version = authoring_api.create_next_component_version(
child_component.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
children.append(child_version.publishable_entity_version)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Ordered Vertical",
children=children,
)
container_version = result.containerversion
entity_rows = list(
container_version.entity_list.entitylistrow_set.order_by("order_num")
)
self.assertEqual(len(entity_rows), 3)
for i, (expected_child, actual_row) in enumerate(zip(children, entity_rows)):
self.assertEqual(expected_child.entity_id, actual_row.entity_id)
def test_migrate_container_with_mixed_child_types(self):
"""
Test _migrate_container with children of different component types
"""
source_key = self.course.id.make_usage_key("vertical", "mixed_vertical")
problem_component = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "problem"
),
local_key="mixed_problem",
created=timezone.now(),
created_by=self.user.id,
)
problem_version = authoring_api.create_next_component_version(
problem_component.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
html_component = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "html"
),
local_key="mixed_html",
created=timezone.now(),
created_by=self.user.id,
)
html_version = authoring_api.create_next_component_version(
html_component.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
video_component = authoring_api.create_component(
self.learning_package.id,
component_type=authoring_api.get_or_create_component_type(
"xblock.v1", "video"
),
local_key="mixed_video",
created=timezone.now(),
created_by=self.user.id,
)
video_version = authoring_api.create_next_component_version(
video_component.pk,
content_to_replace={},
created=timezone.now(),
created_by=self.user.id,
)
children = [
problem_version.publishable_entity_version,
html_version.publishable_entity_version,
video_version.publishable_entity_version,
]
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
result = _migrate_container(
context=context,
source_key=source_key,
container_type=lib_api.ContainerType.Unit,
title="Mixed Content Vertical",
children=children,
)
self.assertIsNotNone(result)
container_version = result.containerversion
self.assertEqual(container_version.entity_list.entitylistrow_set.count(), 3)
child_entity_ids = set(
container_version.entity_list.entitylistrow_set.values_list(
"entity_id", flat=True
)
)
expected_entity_ids = {child.entity_id for child in children}
self.assertEqual(child_entity_ids, expected_entity_ids)
def test_migrate_container_generates_correct_target_key(self):
"""
Test _migrate_container generates correct target key from source key
"""
course_source_key = self.course.id.make_usage_key("vertical", "test_vertical")
context = _MigrationContext(
existing_source_to_target_keys={},
target_package_id=self.learning_package.id,
target_library_key=self.library.library_key,
source_context_key=self.course.id,
content_by_filename={},
composition_level=CompositionLevel.Unit,
repeat_handling_strategy=RepeatHandlingStrategy.Skip,
preserve_url_slugs=True,
created_at=timezone.now(),
created_by=self.user.id,
)
course_result = _migrate_container(
context=context,
source_key=course_source_key,
container_type=lib_api.ContainerType.Unit,
title="Course Vertical",
children=[],
)
context.add_migration(course_source_key, course_result.entity)
library_key = LibraryLocator(org="TestOrg", library="TestLibrary")
library_source_key = library_key.make_usage_key("vertical", "test_vertical")
library_result = _migrate_container(
context=context,
source_key=library_source_key,
container_type=lib_api.ContainerType.Unit,
title="Library Vertical",
children=[],
)
self.assertIsNotNone(course_result)
self.assertIsNotNone(library_result)
self.assertNotEqual(course_result.entity_id, library_result.entity_id)
def test_migrate_from_modulestore_success_course(self):
"""
Test successful migration from course to library
"""
source = ModulestoreSource.objects.create(key=self.course.id)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.SUCCEEDED)
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.composition_level, CompositionLevel.Unit.value)
self.assertEqual(migration.repeat_handling_strategy, RepeatHandlingStrategy.Skip.value)
def test_migrate_from_modulestore_library_validation_failure(self):
"""
Test migration from legacy library fails when modulestore content doesn't exist
"""
library_key = LibraryLocator(org="TestOrg", library="TestLibrary")
source = ModulestoreSource.objects.create(key=library_key)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": None,
"repeat_handling_strategy": RepeatHandlingStrategy.Update.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Section.value,
"forward_source_to_target": True,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
# Should fail at loading step since we don't have real modulestore content
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(
self._get_task_status_fail_message(status),
"Failed to load source item 'lib-block-v1:TestOrg+TestLibrary+type@library+block@library' "
"from ModuleStore: library-v1:TestOrg+TestLibrary+branch@library"
)
def test_migrate_from_modulestore_invalid_source_key_type(self):
"""
Test migration with invalid source key type
"""
invalid_key = LibraryLocatorV2.from_string("lib:testorg:invalid")
source = ModulestoreSource.objects.create(key=invalid_key)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(
self._get_task_status_fail_message(status),
f"Not a valid source context key: {invalid_key}. Source key must reference a course or a legacy library."
)
def test_migrate_from_modulestore_nonexistent_modulestore_item(self):
"""
Test migration when modulestore item doesn't exist
"""
nonexistent_course_key = CourseKey.from_string(
"course-v1:NonExistent+Course+Run"
)
source = ModulestoreSource.objects.create(key=nonexistent_course_key)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
self.assertEqual(status.state, UserTaskStatus.FAILED)
self.assertEqual(
self._get_task_status_fail_message(status),
"Failed to load source item 'block-v1:NonExistent+Course+Run+type@course+block@course' "
"from ModuleStore: course-v1:NonExistent+Course+Run+branch@draft-branch"
)
def test_migrate_from_modulestore_task_status_progression(self):
"""Test that task status progresses through expected steps"""
source = ModulestoreSource.objects.create(key=self.course.id)
task = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status = UserTaskStatus.objects.get(task_id=task.id)
# Should either succeed or fail, but should have progressed past validation
self.assertIn(status.state, [UserTaskStatus.SUCCEEDED, UserTaskStatus.FAILED])
migration = ModulestoreMigration.objects.get(
source=source, target=self.learning_package
)
self.assertEqual(migration.task_status, status)
def test_migrate_from_modulestore_multiple_users_no_interference(self):
"""
Test that migrations by different users don't interfere with each other
"""
source = ModulestoreSource.objects.create(key=self.course.id)
other_user = UserFactory()
task1 = migrate_from_modulestore.apply_async(
kwargs={
"user_id": self.user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
task2 = migrate_from_modulestore.apply_async(
kwargs={
"user_id": other_user.id,
"source_pk": source.id,
"target_package_pk": self.learning_package.id,
"target_library_key": str(self.lib_key),
"target_collection_pk": self.collection.id,
"repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
"preserve_url_slugs": True,
"composition_level": CompositionLevel.Unit.value,
"forward_source_to_target": False,
}
)
status1 = UserTaskStatus.objects.get(task_id=task1.id)
status2 = UserTaskStatus.objects.get(task_id=task2.id)
self.assertEqual(status1.user, self.user)
self.assertEqual(status2.user, other_user)
# The first task should not be cancelled since it's from a different user
self.assertNotEqual(status1.state, UserTaskStatus.CANCELED)

View File

@@ -1005,6 +1005,7 @@ INSTALLED_APPS = [
'openedx.core.djangoapps.course_groups', # not used in cms (yet), but tests run
'cms.djangoapps.xblock_config.apps.XBlockConfig',
'cms.djangoapps.export_course_metadata.apps.ExportCourseMetadataConfig',
'cms.djangoapps.modulestore_migrator',
# New (Learning-Core-based) XBlock runtime
'openedx.core.djangoapps.xblock.apps.StudioXBlockAppConfig',

View File

@@ -141,6 +141,8 @@ urlpatterns = oauth2_urlpatterns + [
# rest api for course import/export
path('api/courses/', include('cms.djangoapps.contentstore.api.urls', namespace='courses_api')
),
path('api/modulestore_migrator/',
include('cms.djangoapps.modulestore_migrator.rest_api.urls', namespace='modulestore_migrator_api')),
re_path(fr'^export/{COURSELIKE_KEY_PATTERN}$', contentstore_views.export_handler,
name='export_handler'),
re_path(fr'^export_output/{COURSELIKE_KEY_PATTERN}$', contentstore_views.export_output_handler,

View File

@@ -27,6 +27,8 @@ MYSQL:
- UserOrgTag.key
- UserPreference.key
- XAPILRSConfiguration.key
- ModulestoreSource.key
- ModulestoreBlockSource.key
SNOWFLAKE:
- CourseOverview.start
- HistoricalCourseOverview.start

View File

@@ -9,6 +9,7 @@ files =
cms/lib/xblock/upstream_sync.py,
cms/lib/xblock/upstream_sync_container.py,
cms/djangoapps/contentstore/rest_api/v2/views/downstreams.py,
cms/djangoapps/modulestore_migrator,
openedx/core/djangoapps/content/learning_sequences,
# FIXME: need to solve type issues and add 'search' app here:
# openedx/core/djangoapps/content/search,

View File

@@ -9,7 +9,18 @@ from django.db.models import QuerySet
from opaque_keys.edx.locator import LibraryContainerLocator, LibraryLocatorV2, LibraryUsageLocatorV2
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import Container, Component, PublishableEntity
from openedx_learning.api.authoring_models import (
Component,
Container,
ContainerVersion,
Unit,
UnitVersion,
Subsection,
SubsectionVersion,
Section,
SectionVersion,
PublishableEntity,
)
from openedx.core.djangoapps.content_tagging.api import get_object_tag_counts
from openedx.core.djangoapps.xblock.api import get_component_from_usage_key
@@ -36,6 +47,25 @@ class ContainerType(Enum):
Subsection = "subsection"
Section = "section"
@property
def container_model_classes(self) -> tuple[type[Container], type[ContainerVersion]]:
"""
Get the container, containerversion subclasses associated with this type.
@@TODO Is this what we want, a hard mapping between container_types and Container classes?
* If so, then expand on this pattern, so that all ContainerType logic is contained within
this class, and get rid of the match-case statements that are all over the content_libraries
app.
* If not, then figure out what to do instead.
"""
match self:
case self.Unit:
return (Unit, UnitVersion)
case self.Subsection:
return (Subsection, SubsectionVersion)
case self.Section:
return (Section, SectionVersion)
raise TypeError(f"unexpected ContainerType: {self!r}")
@property
def olx_tag(self) -> str:
"""

View File

@@ -7,4 +7,5 @@ import typing as t
import django.contrib.auth.models
AuthUser: t.TypeAlias = django.contrib.auth.models.User
User: t.TypeAlias = django.contrib.auth.models.User | django.contrib.auth.models.AnonymousUser