fix: Unfinished migration on fail in one legacy library [FC-0107] (#37521)

- Fix the issue described in https://github.com/openedx/frontend-app-authoring/issues/2169#issuecomment-3412840187
- Adds `is_failed` field to migrations.
- Adds partial-migration logic: if the import of a library fails, it is marked as failed and the bulk migration continues with the next library.
This commit is contained in:
Chris Chávez
2025-10-22 17:07:03 -05:00
committed by GitHub
parent 774f3b37cf
commit 89d3491fef
6 changed files with 336 additions and 195 deletions

View File

@@ -125,7 +125,9 @@ def get_migration_info(source_keys: list[CourseKey | LibraryLocator]) -> dict:
return {
info.key: info
for info in ModulestoreSource.objects.filter(
migrations__task_status__state=UserTaskStatus.SUCCEEDED, key__in=source_keys
migrations__task_status__state=UserTaskStatus.SUCCEEDED,
migrations__is_failed=False,
key__in=source_keys,
)
.values_list(
'migrations__target__key',

View File

@@ -0,0 +1,18 @@
# Generated by Django 4.2.24 on 2025-10-21 23:37
from django.db import migrations, models
class Migration(migrations.Migration):
    """Add the ``is_failed`` flag to ``ModulestoreMigration``."""

    dependencies = [
        ('modulestore_migrator', '0002_alter_modulestoremigration_task_status'),
    ]

    operations = [
        migrations.AddField(
            model_name='modulestoremigration',
            name='is_failed',
            field=models.BooleanField(default=False, help_text='is the migration failed?'),
        ),
    ]

View File

@@ -41,7 +41,7 @@ class ModulestoreSource(models.Model):
)
def __str__(self):
return f"{self.__class__.__name__}('{self.key}')"
return f"{self.key}"
__repr__ = __str__
@@ -131,6 +131,17 @@ class ModulestoreMigration(models.Model):
"We temporarily save the staged content to allow for troubleshooting of failed migrations."
)
)
# Mostly used in bulk migrations. The `UserTaskStatus` represents the status of the entire bulk migration;
# a `FAILED` status means that the entire bulk-migration has failed.
# Each `ModulestoreMigration` saves the data of the migration of each legacy library.
# The `is_failed` value is used to keep track of a failed legacy library in the bulk migration,
# but allow continuing with the migration of the rest of the legacy libraries.
is_failed = models.BooleanField(
default=False,
help_text=_(
"is the migration failed?"
),
)
def __str__(self):
return (

View File

@@ -53,6 +53,11 @@ class ModulestoreMigrationSerializer(serializers.Serializer):
required=False,
default=False,
)
is_failed = serializers.BooleanField(
help_text="It is true if this migration is failed",
required=False,
default=False,
)
def get_fields(self):
fields = super().get_fields()

View File

@@ -470,6 +470,19 @@ def _create_collection(library_key: LibraryLocatorV2, title: str) -> Collection:
return collection
def _set_migrations_to_fail(source_data_list: list[_MigrationSourceData]):
    """
    Mark every migration in `source_data_list` as failed and persist the flag.
    """
    # Collect the migration rows once, flip the flag in memory, then write
    # them all back in a single bulk query.
    failed_migrations = [entry.migration for entry in source_data_list]
    for migration in failed_migrations:
        migration.is_failed = True
    ModulestoreMigration.objects.bulk_update(failed_migrations, ["is_failed"])
@shared_task(base=_MigrationTask, bind=True)
# Note: The decorator @set_code_owner_attribute cannot be used here because the UserTaskMixin
# does stack inspection and can't handle additional decorators.
@@ -562,79 +575,86 @@ def migrate_from_modulestore(
migration = source_data.migration
status.increment_completed_steps()
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks([source_data.source], status, target_package, [migration.id])
status.increment_completed_steps()
# Loading `legacy_root`
status.set_state(MigrationStep.LOADING)
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
return
status.increment_completed_steps()
# Staging legacy block
status.set_state(MigrationStep.STAGING.value)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root,
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_pk),
)
migration.staged_content = staged_content
status.increment_completed_steps()
# Parsing OLX
status.set_state(MigrationStep.PARSING.value)
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks([source_data.source], status, target_package, [migration.id])
status.increment_completed_steps()
# Importing assets of the legacy block
status.set_state(MigrationStep.IMPORTING_ASSETS.value)
content_by_filename = _import_assets(migration)
status.increment_completed_steps()
# Loading `legacy_root`
status.set_state(MigrationStep.LOADING)
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
_set_migrations_to_fail([source_data])
return
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(MigrationStep.IMPORTING_STRUCTURE.value)
change_log, root_migrated_node = _import_structure(
migration,
source_data,
target_library,
content_by_filename,
root_node,
status,
)
migration.change_log = change_log
status.increment_completed_steps()
# Staging legacy block
status.set_state(MigrationStep.STAGING.value)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root,
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_pk),
)
migration.staged_content = staged_content
status.increment_completed_steps()
status.set_state(MigrationStep.UNSTAGING.value)
staged_content.delete()
status.increment_completed_steps()
# Parsing OLX
status.set_state(MigrationStep.PARSING.value)
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
_set_migrations_to_fail([source_data])
return
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source_data.source,
migration=migration,
status=status,
)
status.increment_completed_steps()
# Importing assets of the legacy block
status.set_state(MigrationStep.IMPORTING_ASSETS.value)
content_by_filename = _import_assets(migration)
status.increment_completed_steps()
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
_forwarding_content(source_data)
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(MigrationStep.IMPORTING_STRUCTURE.value)
change_log, root_migrated_node = _import_structure(
migration,
source_data,
target_library,
content_by_filename,
root_node,
status,
)
migration.change_log = change_log
status.increment_completed_steps()
# Populating the collection
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
if target_collection:
_populate_collection(user_id, migration)
status.increment_completed_steps()
status.set_state(MigrationStep.UNSTAGING.value)
staged_content.delete()
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source_data.source,
migration=migration,
status=status,
)
status.increment_completed_steps()
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
_forwarding_content(source_data)
status.increment_completed_steps()
# Populating the collection
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
if target_collection:
_populate_collection(user_id, migration)
status.increment_completed_steps()
except Exception as exc: # pylint: disable=broad-exception-caught
_set_migrations_to_fail([source_data])
status.fail(str(exc))
@shared_task(base=_BulkMigrationTask, bind=True)
@@ -743,147 +763,166 @@ def bulk_migrate_from_modulestore(
status.increment_completed_steps()
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks(
[x.source for x in source_data_list],
status,
target_package,
[migration.id for migration in [x.migration for x in source_data_list]],
)
status.increment_completed_steps()
# Loading legacy blocks
status.set_state(MigrationStep.LOADING)
legacy_root_list: list[XBlock] = []
for source_data in source_data_list:
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
return
legacy_root_list.append(legacy_root)
status.increment_completed_steps()
for i, source_pk in enumerate(sources_pks):
source_data = source_data_list[i]
# Start migration for `source_pk`
# Staging legacy blocks
status.set_state(f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.STAGING.value}")
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root_list[i],
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_pk),
)
source_data.migration.staged_content = staged_content
status.increment_completed_steps()
# Parsing OLX
status.set_state(f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.PARSING.value}")
parser = etree.XMLParser(strip_cdata=False)
try:
root_node = etree.fromstring(staged_content.olx, parser=parser)
except etree.ParseError as exc:
status.fail(f"Failed to parse source OLX (from staged content with id = {staged_content.id}): {exc}")
status.increment_completed_steps()
# Importing assets
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.IMPORTING_ASSETS.value}"
)
content_by_filename = _import_assets(source_data.migration)
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.IMPORTING_STRUCTURE.value}"
)
change_log, root_migrated_node = _import_structure(
source_data.migration,
source_data,
target_library,
content_by_filename,
root_node,
try: # pylint: disable=too-many-nested-blocks
# Cancelling old tasks
status.set_state(MigrationStep.CANCELLING_OLD.value)
_cancel_old_tasks(
[x.source for x in source_data_list],
status,
)
source_data.migration.change_log = change_log
status.increment_completed_steps()
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.UNSTAGING.value}"
)
staged_content.delete()
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source_data.source,
migration=source_data.migration,
status=status,
source_pk=source_pk,
target_package,
[migration.id for migration in [x.migration for x in source_data_list]],
)
status.increment_completed_steps()
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
# Loading legacy blocks
status.set_state(MigrationStep.LOADING)
legacy_root_list: list[XBlock] = []
for source_data in source_data_list:
_forwarding_content(source_data)
status.increment_completed_steps()
legacy_root = _load_xblock(status, source_data.source_root_usage_key)
if legacy_root is None:
# Fail
_set_migrations_to_fail(source_data_list)
return
legacy_root_list.append(legacy_root)
status.increment_completed_steps()
# Populating collections
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
for i, source_pk in enumerate(sources_pks):
source_data = source_data_list[i]
try:
with transaction.atomic():
# Start migration for `source_pk`
# Staging legacy blocks
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.STAGING.value}"
)
staged_content = staging_api.stage_xblock_temporarily(
block=legacy_root_list[i],
user_id=status.user.pk,
purpose=CONTENT_STAGING_PURPOSE_TEMPLATE.format(source_key=source_pk),
)
source_data.migration.staged_content = staged_content
status.increment_completed_steps()
# Used to check if the source has a previous migration in a V2 library collection
# It is placed here to avoid the circular import
from .api import get_migration_info
for i, source_data in enumerate(source_data_list):
migration = source_data.migration
# Parsing OLX
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.PARSING.value}"
)
parser = etree.XMLParser(strip_cdata=False)
root_node = etree.fromstring(staged_content.olx, parser=parser)
status.increment_completed_steps()
title = legacy_root_list[i].display_name
if migration.target_collection is None:
if not create_collections:
# Importing assets
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): "
f"{MigrationStep.IMPORTING_ASSETS.value}"
)
content_by_filename = _import_assets(source_data.migration)
status.increment_completed_steps()
# Importing structure of the legacy block
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): "
f"{MigrationStep.IMPORTING_STRUCTURE.value}"
)
change_log, root_migrated_node = _import_structure(
source_data.migration,
source_data,
target_library,
content_by_filename,
root_node,
status,
)
source_data.migration.change_log = change_log
status.increment_completed_steps()
status.set_state(
f"{MigrationStep.STAGING.BULK_MIGRATION_PREFIX} ({source_pk}): {MigrationStep.UNSTAGING.value}"
)
staged_content.delete()
status.increment_completed_steps()
_create_migration_artifacts_incrementally(
root_migrated_node=root_migrated_node,
source=source_data.source,
migration=source_data.migration,
status=status,
source_pk=source_pk,
)
status.increment_completed_steps()
except: # pylint: disable=bare-except
# Mark this library as failed, migration of other libraries can continue
# If this case occurs and the migration ends without any further issues,
# the overall bulk-migration status is success.
# TODO: add an intermediate status such as 'partially satisfactory'
source_data.migration.is_failed = True
# Forwarding legacy content to migrated content
status.set_state(MigrationStep.FORWARDING.value)
if forward_source_to_target:
for source_data in source_data_list:
if not source_data.migration.is_failed:
_forwarding_content(source_data)
status.increment_completed_steps()
# Populating collections
status.set_state(MigrationStep.POPULATING_COLLECTION.value)
# Used to check if the source has a previous migration in a V2 library collection
# It is placed here to avoid the circular import
from .api import get_migration_info
for i, source_data in enumerate(source_data_list):
migration = source_data.migration
if migration.is_failed:
continue
source_key = source_data.source.key
title = legacy_root_list[i].display_name
if migration.target_collection is None:
if not create_collections:
continue
if migration.repeat_handling_strategy == RepeatHandlingStrategy.Fork.value:
# Create a new collection when it is Fork
migration.target_collection = _create_collection(target_library_locator, title)
else:
# It is Skip or Update
# We need to verify if there is a previous migration with collection
# TODO: This only fetches the latest migration, if different migrations have been done
# on different V2 libraries, this could break the logic.
previous_migration = get_migration_info([source_key])
if (
source_key in previous_migration
and previous_migration[source_key].migrations__target_collection__key
):
# Has previous migration with collection
try:
# Get the previous collection
previous_collection = authoring_api.get_collection(
target_package.id,
previous_migration[source_key].migrations__target_collection__key,
)
source_key = source_data.source.key
migration.target_collection = previous_collection
except Collection.DoesNotExist:
# The collection no longer exists or is being migrated to a different library.
# In that case, create a new collection independent of strategy
migration.target_collection = _create_collection(target_library_locator, title)
else:
# Create collection and save in migration
if migration.repeat_handling_strategy == RepeatHandlingStrategy.Fork.value:
# Create a new collection when it is Fork
migration.target_collection = _create_collection(target_library_locator, title)
else:
# It is Skip or Update
# We need to verify if there is a previous migration with collection
# TODO: This only fetches the latest migration, if different migrations have been done
# on different V2 libraries, this could break the logic.
previous_migration = get_migration_info([source_key])
if (
source_key in previous_migration
and previous_migration[source_key].migrations__target_collection__key
):
# Has previous migration with collection
try:
# Get the previous collection
previous_collection = authoring_api.get_collection(
target_package.id,
previous_migration[source_key].migrations__target_collection__key,
)
_populate_collection(user_id, migration)
migration.target_collection = previous_collection
except Collection.DoesNotExist:
# The collection no longer exists or is being migrated to a different library.
# In that case, create a new collection independent of strategy
migration.target_collection = _create_collection(target_library_locator, title)
else:
# Create collection and save in migration
migration.target_collection = _create_collection(target_library_locator, title)
ModulestoreMigration.objects.bulk_update(
[x.migration for x in source_data_list],
["target_collection"],
)
status.increment_completed_steps()
_populate_collection(user_id, migration)
ModulestoreMigration.objects.bulk_update(
[x.migration for x in source_data_list],
["target_collection", "is_failed"],
)
status.increment_completed_steps()
except Exception as exc: # pylint: disable=broad-exception-caught
# If there is an exception in this block, all migrations fail.
_set_migrations_to_fail(source_data_list)
status.fail(str(exc))
@dataclass(frozen=True)

View File

@@ -2,7 +2,7 @@
Tests for the modulestore_migrator tasks
"""
from unittest.mock import Mock
from unittest.mock import Mock, patch
import ddt
from django.utils import timezone
from lxml import etree
@@ -1913,3 +1913,69 @@ class TestMigrateFromModulestore(ModuleStoreTestCase):
# The first task should not be cancelled since it's from a different user
self.assertNotEqual(status1.state, UserTaskStatus.CANCELED)
@patch("cms.djangoapps.modulestore_migrator.tasks._import_assets")
def test_migrate_fails_on_import(self, mock_import_assets):
    """
    Test failed migration from legacy library to V2 library
    """
    # Force the asset-import step to raise so the whole task fails.
    mock_import_assets.side_effect = Exception("Simulated import error")
    source = ModulestoreSource.objects.create(key=self.legacy_library.location.library_key)
    result = migrate_from_modulestore.apply_async(
        kwargs={
            "user_id": self.user.id,
            "source_pk": source.id,
            "target_library_key": str(self.lib_key),
            "target_collection_pk": self.collection.id,
            "repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
            "preserve_url_slugs": True,
            "composition_level": CompositionLevel.Unit.value,
            "forward_source_to_target": False,
        }
    )
    # A single-library migration fails outright when any step raises.
    task_status = UserTaskStatus.objects.get(task_id=result.id)
    self.assertEqual(task_status.state, UserTaskStatus.FAILED)
    # The migration record itself must also carry the failure flag.
    migration = ModulestoreMigration.objects.get(source=source, target=self.learning_package)
    self.assertTrue(migration.is_failed)
@patch("cms.djangoapps.modulestore_migrator.tasks._import_assets")
def test_bulk_migrate_fails_on_import(self, mock_import_assets):
    """
    Test failed bulk migration from legacy libraries to V2 library
    """
    # Force the asset-import step to raise for every library in the batch.
    mock_import_assets.side_effect = Exception("Simulated import error")
    sources = [
        ModulestoreSource.objects.create(key=self.legacy_library.location.library_key),
        ModulestoreSource.objects.create(key=self.legacy_library_2.location.library_key),
    ]
    result = bulk_migrate_from_modulestore.apply_async(
        kwargs={
            "user_id": self.user.id,
            "sources_pks": [sources[0].id, sources[1].id],
            "target_library_key": str(self.lib_key),
            "target_collection_pks": [self.collection.id, self.collection2.id],
            "repeat_handling_strategy": RepeatHandlingStrategy.Skip.value,
            "preserve_url_slugs": True,
            "composition_level": CompositionLevel.Unit.value,
            "forward_source_to_target": False,
        }
    )
    # The task is successful because the entire bulk migration ends successfully.
    # When a legacy library fails to import, it is marked as failed but continues to the next one.
    task_status = UserTaskStatus.objects.get(task_id=result.id)
    self.assertEqual(task_status.state, UserTaskStatus.SUCCEEDED)
    # Both per-library migration records must carry the failure flag.
    for source in sources:
        migration = ModulestoreMigration.objects.get(source=source, target=self.learning_package)
        self.assertTrue(migration.is_failed)