feat: don't use OLX for tags when copying/duplicating blocks (#34386)

This commit is contained in:
Jillian
2024-04-03 03:29:57 +10:30
committed by GitHub
parent 104969c659
commit 7ad225658f
23 changed files with 571 additions and 323 deletions

View File

@@ -24,6 +24,7 @@ from xmodule.xml_block import XmlMixin
from cms.djangoapps.models.settings.course_grading import CourseGradingModel
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
import openedx.core.djangoapps.content_staging.api as content_staging_api
import openedx.core.djangoapps.content_tagging.api as content_tagging_api
from .utils import reverse_course_url, reverse_library_url, reverse_usage_url
@@ -284,6 +285,7 @@ def import_staged_content_from_user_clipboard(parent_key: UsageKey, request) ->
user_id=request.user.id,
slug_hint=user_clipboard.source_usage_key.block_id,
copied_from_block=str(user_clipboard.source_usage_key),
tags=user_clipboard.content.tags,
)
# Now handle static files that need to go into Files & Uploads:
notices = _import_files_into_course(
@@ -306,6 +308,8 @@ def _import_xml_node_to_parent(
slug_hint: str | None = None,
# UsageKey of the XBlock that this one is a copy of
copied_from_block: str | None = None,
# Content tags applied to the source XBlock(s)
tags: dict[str, str] | None = None,
) -> XBlock:
"""
Given an XML node representing a serialized XBlock (OLX), import it into modulestore 'store' as a child of the
@@ -376,7 +380,24 @@ def _import_xml_node_to_parent(
if not children_handled:
for child_node in child_nodes:
_import_xml_node_to_parent(child_node, new_xblock, store, user_id=user_id)
child_copied_from = _get_usage_key_from_node(child_node, copied_from_block) if copied_from_block else None
_import_xml_node_to_parent(
child_node,
new_xblock,
store,
user_id=user_id,
copied_from_block=str(child_copied_from),
tags=tags,
)
# Copy content tags to the new xblock
if copied_from_block and tags:
object_tags = tags.get(str(copied_from_block))
if object_tags:
content_tagging_api.set_all_object_tags(
content_key=new_xblock.location,
object_tags=object_tags,
)
return new_xblock
@@ -471,3 +492,24 @@ def is_item_in_course_tree(item):
ancestor = ancestor.get_parent()
return ancestor is not None
def _get_usage_key_from_node(node, parent_id: str) -> UsageKey | None:
"""
Returns the UsageKey for the given node and parent ID.
If the parent_id is not a valid UsageKey, or there's no "url_name" attribute in the node, then will return None.
"""
parent_key = UsageKey.from_string(parent_id)
parent_context = parent_key.context_key
usage_key = None
block_id = node.attrib.get("url_name")
block_type = node.tag
if parent_context and block_id and block_type:
usage_key = parent_context.make_usage_key(
block_type=block_type,
block_id=block_id,
)
return usage_key

View File

@@ -143,54 +143,143 @@ class ClipboardPasteTestCase(ModuleStoreTestCase):
# The new block should store a reference to where it was copied from
assert dest_block.copied_from_block == str(source_block.location)
def test_copy_and_paste_unit_with_tags(self):
def _setup_tagged_content(self, course_key) -> dict:
"""
Test copying a unit (vertical) with tags from one course into another
Create and tag content to use in copy/paste tests.
"""
course_key, client = self._setup_course()
dest_course = CourseFactory.create(display_name='Destination Course')
with self.store.bulk_operations(dest_course.id):
dest_chapter = BlockFactory.create(parent=dest_course, category='chapter', display_name='Section')
dest_sequential = BlockFactory.create(parent=dest_chapter, category='sequential', display_name='Subsection')
# Add a couple more blocks to test tagging different types of blocks.
# We add these here instead of in sample_courses.py to avoid breaking modulestore tests.
unit_key = course_key.make_usage_key("vertical", "vertical_test")
with self.store.bulk_operations(course_key):
discussion_block_key = BlockFactory.create(
parent=self.store.get_item(unit_key),
category='discussion',
display_name='Toy_forum',
publish_item=True,
).location
with self.store.bulk_operations(course_key):
html_block_key = BlockFactory.create(
parent=self.store.get_item(unit_key),
category="html",
display_name="Toy_text",
publish_item=True,
).location
library = ClipboardLibraryContentPasteTestCase.setup_library()
with self.store.bulk_operations(course_key):
library_content_block_key = BlockFactory.create(
parent=self.store.get_item(unit_key),
category="library_content",
source_library_id=str(library.key),
display_name="LC Block",
publish_item=True,
).location
# Add tags to the unit
taxonomy_all_org = tagging_api.create_taxonomy("test_taxonomy", "Test Taxonomy")
taxonomy_all_org = tagging_api.create_taxonomy(
"test_taxonomy",
"Test Taxonomy",
export_id="ALL_ORGS",
)
tagging_api.set_taxonomy_orgs(taxonomy_all_org, all_orgs=True)
Tag.objects.create(taxonomy=taxonomy_all_org, value="tag_1")
Tag.objects.create(taxonomy=taxonomy_all_org, value="tag_2")
for tag_value in ('tag_1', 'tag_2', 'tag_3', 'tag_4', 'tag_5', 'tag_6', 'tag_7'):
Tag.objects.create(taxonomy=taxonomy_all_org, value=tag_value)
tagging_api.tag_object(
object_id=str(unit_key),
taxonomy=taxonomy_all_org,
tags=["tag_1", "tag_2"],
)
taxonomy_all_org_removed = tagging_api.create_taxonomy("test_taxonomy_removed", "Test Taxonomy Removed")
# Tag some sub-blocks with different tags
video_block_key = course_key.make_usage_key("video", "sample_video")
tagging_api.tag_object(
object_id=str(video_block_key),
taxonomy=taxonomy_all_org,
tags=["tag_3"],
)
poll_block_key = course_key.make_usage_key("poll_question", "T1_changemind_poll_foo_2")
tagging_api.tag_object(
object_id=str(poll_block_key),
taxonomy=taxonomy_all_org,
tags=["tag_4"],
)
tagging_api.tag_object(
object_id=str(discussion_block_key),
taxonomy=taxonomy_all_org,
tags=["tag_5"],
)
tagging_api.tag_object(
object_id=str(html_block_key),
taxonomy=taxonomy_all_org,
tags=["tag_6"],
)
tagging_api.tag_object(
object_id=str(library_content_block_key),
taxonomy=taxonomy_all_org,
tags=["tag_7"],
)
# Tag our blocks using a taxonomy we'll remove before pasting -- so these tags won't be pasted
all_block_keys = {
key.block_type: key
for key in (
unit_key,
video_block_key,
poll_block_key,
discussion_block_key,
html_block_key,
library_content_block_key,
)
}
taxonomy_all_org_removed = tagging_api.create_taxonomy(
"test_taxonomy_removed",
"Test Taxonomy Removed",
export_id="REMOVE_ME",
)
tagging_api.set_taxonomy_orgs(taxonomy_all_org_removed, all_orgs=True)
Tag.objects.create(taxonomy=taxonomy_all_org_removed, value="tag_1")
Tag.objects.create(taxonomy=taxonomy_all_org_removed, value="tag_2")
tagging_api.tag_object(
object_id=str(unit_key),
taxonomy=taxonomy_all_org_removed,
tags=["tag_1", "tag_2"],
)
tagging_api.get_object_tags(str(unit_key))
for object_key in all_block_keys.values():
tagging_api.tag_object(
object_id=str(object_key),
taxonomy=taxonomy_all_org_removed,
tags=["tag_1", "tag_2"],
)
# Tag our blocks using a taxonomy that isn't enabled for any orgs -- these tags won't be pasted
taxonomy_no_org = tagging_api.create_taxonomy("test_taxonomy_no_org", "Test Taxonomy No Org")
Tag.objects.create(taxonomy=taxonomy_no_org, value="tag_1")
Tag.objects.create(taxonomy=taxonomy_no_org, value="tag_2")
tagging_api.tag_object(
object_id=str(unit_key),
taxonomy=taxonomy_no_org,
tags=["tag_1", "tag_2"],
)
for object_key in all_block_keys.values():
tagging_api.tag_object(
object_id=str(object_key),
taxonomy=taxonomy_no_org,
tags=["tag_1", "tag_2"],
)
return all_block_keys
def test_copy_and_paste_unit_with_tags(self):
"""
Test copying a unit (vertical) with tags from one course into another
"""
course_key, client = self._setup_course()
source_block_keys = self._setup_tagged_content(course_key)
dest_course = CourseFactory.create(display_name='Destination Course')
with self.store.bulk_operations(dest_course.id):
dest_chapter = BlockFactory.create(parent=dest_course, category='chapter', display_name='Section')
dest_sequential = BlockFactory.create(parent=dest_chapter, category='sequential', display_name='Subsection')
# Copy the unit
unit_key = source_block_keys['vertical']
copy_response = client.post(CLIPBOARD_ENDPOINT, {"usage_key": str(unit_key)}, format="json")
assert copy_response.status_code == 200
taxonomy_all_org_removed.delete()
# Delete one of the taxonomies used, to test that their tags aren't pasted.
tagging_api.get_taxonomy_by_export_id('REMOVE_ME').delete()
# Paste the unit
paste_response = client.post(XBLOCK_ENDPOINT, {
@@ -206,6 +295,34 @@ class ClipboardPasteTestCase(ModuleStoreTestCase):
assert str(tags[0]) == f'<ObjectTag> {dest_unit_key}: test_taxonomy=tag_1'
assert str(tags[1]) == f'<ObjectTag> {dest_unit_key}: test_taxonomy=tag_2'
# Ensure that the pasted child blocks were tagged too.
dest_tags, _ = tagging_api.get_all_object_tags(dest_course.id)
dest_block_ids = {
UsageKey.from_string(block_id).block_type: block_id
for block_id in dest_tags
}
taxonomy_all_org = tagging_api.get_taxonomy_by_export_id('ALL_ORGS')
dest_video_id = dest_block_ids.get('video')
assert dest_video_id, f"No tags pasted from {source_block_keys['video']}?"
assert dest_tags.get(dest_video_id, {}).get(taxonomy_all_org.id) == ["tag_3"]
dest_poll_id = dest_block_ids.get('poll_question')
assert dest_poll_id, f"No tags pasted from {source_block_keys['poll_question']}?"
assert dest_tags.get(dest_poll_id, {}).get(taxonomy_all_org.id) == ["tag_4"]
dest_discussion_id = dest_block_ids.get('discussion')
assert dest_discussion_id, f"No tags pasted from {source_block_keys['discussion']}?"
assert dest_tags.get(dest_discussion_id, {}).get(taxonomy_all_org.id) == ["tag_5"]
dest_html_id = dest_block_ids.get('html')
assert dest_html_id, f"No tags pasted from {source_block_keys['html']}?"
assert dest_tags.get(dest_html_id, {}).get(taxonomy_all_org.id) == ["tag_6"]
dest_library_id = dest_block_ids.get('library_content')
assert dest_library_id, f"No tags pasted from {source_block_keys['library_content']}?"
assert dest_tags.get(dest_library_id, {}).get(taxonomy_all_org.id) == ["tag_7"]
def test_paste_with_assets(self):
"""
When pasting into a different course, any required static assets should
@@ -289,7 +406,28 @@ class ClipboardLibraryContentPasteTestCase(ModuleStoreTestCase):
self.client = APIClient()
self.client.login(username=self.user.username, password=self.user_password)
self.store = modulestore()
# Create a content library:
library = self.setup_library()
# Create a library content block (lc), point it out our library, and sync it.
self.course = CourseFactory.create(display_name='Course')
self.orig_lc_block = BlockFactory.create(
parent=self.course,
category="library_content",
source_library_id=str(library.key),
display_name="LC Block",
publish_item=False,
)
self.dest_lc_block = None
self._sync_lc_block_from_library('orig_lc_block')
orig_child = self.store.get_item(self.orig_lc_block.children[0])
assert orig_child.display_name == "MCQ"
@classmethod
def setup_library(cls):
"""
Creates and returns a content library.
"""
library = library_api.create_library(
library_type=library_api.COMPLEX,
org=Organization.objects.create(name="Test Org", short_name="CL-TEST"),
@@ -310,21 +448,7 @@ class ClipboardLibraryContentPasteTestCase(ModuleStoreTestCase):
</problem>
""")
library_api.publish_changes(library.key)
# Create a library content block (lc), point it out our library, and sync it.
self.course = CourseFactory.create(display_name='Course')
self.orig_lc_block = BlockFactory.create(
parent=self.course,
category="library_content",
source_library_id=str(library.key),
display_name="LC Block",
publish_item=False,
)
self.dest_lc_block = None
self._sync_lc_block_from_library('orig_lc_block')
orig_child = self.store.get_item(self.orig_lc_block.children[0])
assert orig_child.display_name == "MCQ"
return library
def test_paste_library_content_block(self):
"""

View File

@@ -129,7 +129,6 @@ from django.urls import reverse_lazy
from lms.djangoapps.lms_xblock.mixin import LmsBlockMixin
from cms.lib.xblock.authoring_mixin import AuthoringMixin
from cms.lib.xblock.tagging.tagged_block_mixin import TaggedBlockMixin
from xmodule.modulestore.edit_info import EditInfoMixin
from openedx.core.djangoapps.theming.helpers_dirs import (
get_themes_unchecked,
@@ -996,7 +995,6 @@ XBLOCK_MIXINS = (
XModuleMixin,
EditInfoMixin,
AuthoringMixin,
TaggedBlockMixin,
)
XBLOCK_EXTRA_MIXINS = ()

View File

@@ -1,132 +0,0 @@
"""
Content tagging functionality for XBlocks.
"""
from urllib.parse import quote, unquote
class TaggedBlockMixin:
"""
Mixin containing XML serializing and parsing functionality for tagged blocks
"""
def __init__(self, *args, **kwargs):
"""
Initialize the tagged xblock.
"""
# We store tags internally, without an XBlock field, because we don't want tags to be stored to the modulestore.
# But we do want them persisted on duplicate, copy, or export/import.
self.tags_v1 = ""
@property
def tags_v1(self) -> str:
"""
Returns the serialized tags.
"""
return self._tags_v1
@tags_v1.setter
def tags_v1(self, tags: str) -> None:
"""
Returns the serialized tags.
"""
self._tags_v1 = tags
@classmethod
def serialize_tag_data(cls, usage_id):
"""
Serialize a block's tag data to include in the xml, escaping special characters
Example tags:
LightCast Skills Taxonomy: ["Typing", "Microsoft Office"]
Open Canada Skills Taxonomy: ["MS Office", "<some:;,skill/|=>"]
Example serialized tags:
lightcast-skills:Typing,Microsoft Office;open-canada-skills:MS Office,%3Csome%3A%3B%2Cskill%2F%7C%3D%3E
"""
# This import is done here since we import and use TaggedBlockMixin in the cms settings, but the
# content_tagging app wouldn't have loaded yet, so importing it outside causes an error
from openedx.core.djangoapps.content_tagging.api import get_object_tags
content_tags = get_object_tags(usage_id)
serialized_tags = []
taxonomies_and_tags = {}
for tag in content_tags:
taxonomy_export_id = tag.taxonomy.export_id
if not taxonomies_and_tags.get(taxonomy_export_id):
taxonomies_and_tags[taxonomy_export_id] = []
# Escape special characters in tag values, except spaces (%20) for better readability
escaped_tag = quote(tag.value).replace("%20", " ")
taxonomies_and_tags[taxonomy_export_id].append(escaped_tag)
for taxonomy in taxonomies_and_tags:
merged_tags = ','.join(taxonomies_and_tags.get(taxonomy))
serialized_tags.append(f"{taxonomy}:{merged_tags}")
return ";".join(serialized_tags)
def add_tags_to_node(self, node):
"""
Serialize and add tag data (if any) to node
"""
tag_data = self.serialize_tag_data(self.scope_ids.usage_id)
if tag_data:
node.set('tags-v1', tag_data)
def add_tags_from_field(self):
"""
Parse tags_v1 data and create tags for this block.
"""
# This import is done here since we import and use TaggedBlockMixin in the cms settings, but the
# content_tagging app wouldn't have loaded yet, so importing it outside causes an error
from openedx.core.djangoapps.content_tagging.api import set_object_tags
tag_data = self.tags_v1
if not tag_data:
return
serialized_tags = tag_data.split(';')
taxonomy_and_tags_dict = {}
for serialized_tag in serialized_tags:
taxonomy_export_id, tags = serialized_tag.split(':')
tags = tags.split(',')
tag_values = [unquote(tag) for tag in tags]
taxonomy_and_tags_dict[taxonomy_export_id] = tag_values
set_object_tags(self.usage_key, taxonomy_and_tags_dict)
def add_xml_to_node(self, node):
"""
Include the serialized tag data in XML when adding to node
"""
super().add_xml_to_node(node)
self.add_tags_to_node(node)
def studio_post_duplicate(self, store, source_item) -> bool:
"""
Duplicates content tags from the source_item.
Returns False to indicate the children have not been handled.
"""
if hasattr(super(), 'studio_post_duplicate'):
super().studio_post_duplicate()
self.tags_v1 = self.serialize_tag_data(source_item.scope_ids.usage_id)
self.add_tags_from_field()
return False
def studio_post_paste(self, store, source_node) -> bool:
"""
Copies content tags from the source_node.
Returns False to indicate the children have not been handled.
"""
if hasattr(super(), 'studio_post_paste'):
super().studio_post_paste()
if 'tags-v1' in source_node.attrib:
self.tags_v1 = str(source_node.attrib['tags-v1'])
self.add_tags_from_field()
return False

View File

@@ -51,9 +51,9 @@ class StudioDocumentsTest(SharedModuleStoreTestCase):
tagging_api.add_tag_to_taxonomy(cls.subject_tags, tag="Jump Links", parent_tag_value="Hypertext")
# Tag stuff:
tagging_api.tag_object(cls.problem_block.usage_key, cls.difficulty_tags, tags=["Easy"])
tagging_api.tag_object(cls.html_block_key, cls.subject_tags, tags=["Chinese", "Jump Links"])
tagging_api.tag_object(cls.html_block_key, cls.difficulty_tags, tags=["Normal"])
tagging_api.tag_object(str(cls.problem_block.usage_key), cls.difficulty_tags, tags=["Easy"])
tagging_api.tag_object(str(cls.html_block_key), cls.subject_tags, tags=["Chinese", "Jump Links"])
tagging_api.tag_object(str(cls.html_block_key), cls.difficulty_tags, tags=["Normal"])
def test_problem_block(self):
"""

View File

@@ -89,7 +89,6 @@ from openedx_learning.core.publishing import api as publishing_api
from openedx_learning.core.contents import api as contents_api
from openedx_learning.core.components import api as components_api
from openedx_learning.core.components.models import Component
from openedx_tagging.core.tagging import api as tagging_api
from organizations.models import Organization
from xblock.core import XBlock
from xblock.exceptions import XBlockNotFoundError
@@ -597,9 +596,12 @@ def _get_library_component_tags_count(library_key) -> dict:
"""
Get the count of tags that are applied to each component in this library, as a dict.
"""
# Import content_tagging.api here to avoid circular imports
from openedx.core.djangoapps.content_tagging.api import get_object_tag_counts
# Create a pattern to match the IDs of the library components, e.g. "lb:org:id*"
library_key_pattern = str(library_key).replace("lib:", "lb:", 1) + "*"
return tagging_api.get_object_tag_counts(library_key_pattern, count_implicit=True)
return get_object_tag_counts(library_key_pattern, count_implicit=True)
def get_library_components(library_key, text_search=None, block_types=None) -> QuerySet[Component]:

View File

@@ -19,7 +19,7 @@ class StagedContentAdmin(admin.ModelAdmin):
list_display = ('id', 'user', 'created', 'purpose', 'status', 'block_type', 'display_name', 'suggested_url_name')
list_filter = ('purpose', 'status', 'block_type')
search_fields = ('user__username', 'display_name', 'suggested_url_name')
readonly_fields = ('id', 'user', 'created', 'purpose', 'status', 'block_type', 'olx')
readonly_fields = ('id', 'user', 'created', 'purpose', 'status', 'block_type', 'olx', 'tags')
inlines = (StagedContentFileInline, )

View File

@@ -39,6 +39,7 @@ def get_user_clipboard(user_id: int, only_ready: bool = True) -> UserClipboardDa
status=content.status,
block_type=content.block_type,
display_name=content.display_name,
tags=content.tags,
),
source_usage_key=clipboard.source_usage_key,
)

View File

@@ -42,6 +42,7 @@ class StagedContentData:
status: StagedContentStatus = field(validator=validators.in_(StagedContentStatus), converter=StagedContentStatus)
block_type: str = field(validator=validators.instance_of(str))
display_name: str = field(validator=validators.instance_of(str))
tags: dict = field(validator=validators.optional(validators.instance_of(dict)))
@frozen

View File

@@ -0,0 +1,18 @@
# Generated by Django 4.2.10 on 2024-03-28 05:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('content_staging', '0003_olx_unicode'),
]
operations = [
migrations.AddField(
model_name='stagedcontent',
name='tags',
field=models.JSONField(help_text='Content tags applied to these blocks', null=True),
),
]

View File

@@ -64,6 +64,9 @@ class StagedContent(models.Model):
# a new url_name instead.
suggested_url_name = models.CharField(max_length=1024)
# Tags applied to the original source block(s) will be copied to the new block(s) on paste.
tags = models.JSONField(null=True, help_text=_("Content tags applied to these blocks"))
@property
def olx_filename(self) -> str:
""" Get a filename that can be used for the OLX content of this staged content """

View File

@@ -145,6 +145,7 @@ class ClipboardEndpoint(APIView):
olx=block_data.olx_str,
display_name=block_metadata_utils.display_name_with_default(block),
suggested_url_name=usage_key.block_id,
tags=block_data.tags,
)
(clipboard, _created) = UserClipboard.objects.update_or_create(user=request.user, defaults={
"content": staged_content,

View File

@@ -13,8 +13,8 @@ from openedx_tagging.core.tagging.models import ObjectTag, Taxonomy
from organizations.models import Organization
from .models import TaxonomyOrg
from .types import ContentKey, ObjectTagByObjectIdDict, TagValuesByTaxonomyExportIdDict, TaxonomyDict
from .utils import check_taxonomy_context_key_org, get_context_key_from_key
from .types import ContentKey, TagValuesByObjectIdDict, TagValuesByTaxonomyIdDict, TaxonomyDict
from .utils import check_taxonomy_context_key_org, get_content_key_from_string, get_context_key_from_key
def create_taxonomy(
@@ -132,13 +132,17 @@ def get_unassigned_taxonomies(enabled=True) -> QuerySet:
def get_all_object_tags(
content_key: LibraryLocatorV2 | CourseKey,
) -> tuple[ObjectTagByObjectIdDict, TaxonomyDict]:
content_key: ContentKey,
prefetch_orgs: bool = False,
) -> tuple[TagValuesByObjectIdDict, TaxonomyDict]:
"""
Get all the object tags applied to components in the given course/library.
Includes any tags applied to the course/library as a whole.
Returns a tuple with a dictionary of grouped object tags for all blocks and a dictionary of taxonomies.
Returns a tuple with a dictionary of grouped object tag values for all blocks and a dictionary of taxonomies.
If `prefetch_orgs` is set, then the returned ObjectTag taxonomies will have their TaxonomyOrgs prefetched,
which makes checking permissions faster.
"""
context_key_str = str(content_key)
# We use a block_id_prefix (i.e. the modified course id) to get the tags for the children of the Content
@@ -148,23 +152,33 @@ def get_all_object_tags(
elif isinstance(content_key, LibraryLocatorV2):
block_id_prefix = context_key_str.replace("lib:", "lb:", 1)
else:
raise NotImplementedError(f"Invalid content_key: {type(content_key)} -> {content_key}")
# No context, so we'll just match the object_id, with no prefix.
block_id_prefix = None
# There is no API method in oel_tagging.api that does this yet,
# so for now we have to build the ORM query directly.
all_object_tags = list(ObjectTag.objects.filter(
Q(object_id__startswith=block_id_prefix) | Q(object_id=content_key),
Q(tag__isnull=False, tag__taxonomy__isnull=False),
).select_related("tag__taxonomy"))
object_id_clause = Q(object_id=content_key)
if block_id_prefix:
object_id_clause |= Q(object_id__startswith=block_id_prefix)
grouped_object_tags: ObjectTagByObjectIdDict = {}
all_object_tags = ObjectTag.objects.filter(
Q(tag__isnull=False, tag__taxonomy__isnull=False),
object_id_clause,
).select_related("tag__taxonomy")
if prefetch_orgs:
all_object_tags = all_object_tags.prefetch_related("tag__taxonomy__taxonomyorg_set")
grouped_object_tags: TagValuesByObjectIdDict = {}
taxonomies: TaxonomyDict = {}
for object_id, block_tags in groupby(all_object_tags, lambda x: x.object_id):
grouped_object_tags[object_id] = {}
for taxonomy_id, taxonomy_tags in groupby(block_tags, lambda x: x.tag.taxonomy_id if x.tag else 0):
object_tags_list = list(taxonomy_tags)
grouped_object_tags[object_id][taxonomy_id] = object_tags_list
grouped_object_tags[object_id][taxonomy_id] = [
tag.value for tag in object_tags_list
]
if taxonomy_id not in taxonomies:
assert object_tags_list[0].tag
@@ -174,41 +188,94 @@ def get_all_object_tags(
return grouped_object_tags, taxonomies
def set_object_tags(
def set_all_object_tags(
content_key: ContentKey,
object_tags: TagValuesByTaxonomyExportIdDict,
object_tags: TagValuesByTaxonomyIdDict,
) -> None:
"""
Sets the tags for the given content object.
"""
context_key = get_context_key_from_key(content_key)
for taxonomy_export_id, tags_values in object_tags.items():
taxonomy = oel_tagging.get_taxonomy_by_export_id(taxonomy_export_id)
for taxonomy_id, tags_values in object_tags.items():
taxonomy = oel_tagging.get_taxonomy(taxonomy_id)
if not taxonomy:
continue
if not check_taxonomy_context_key_org(taxonomy, context_key):
continue
oel_tagging.tag_object(
tag_object(
object_id=str(content_key),
taxonomy=taxonomy,
tags=tags_values,
)
def copy_object_tags(
source_content_key: ContentKey,
dest_content_key: ContentKey,
) -> None:
"""
Copies the permitted object tags on source_object_id to dest_object_id.
If an source object tag is not available for use on the dest_object_id, it will not be copied.
"""
all_object_tags, taxonomies = get_all_object_tags(
content_key=source_content_key,
prefetch_orgs=True,
)
source_object_tags = all_object_tags.get(str(source_content_key), {})
for taxonomy_id, taxonomy in taxonomies.items():
tags = source_object_tags.get(taxonomy_id, [])
tag_object(
object_id=str(dest_content_key),
taxonomy=taxonomy,
tags=tags,
)
def tag_object(
object_id: str,
taxonomy: Taxonomy,
tags: list[str],
object_tag_class: type[ObjectTag] = ObjectTag,
) -> None:
"""
Replaces the existing ObjectTag entries for the given taxonomy + object_id
with the given list of tags, if the taxonomy can be used by the given object_id.
tags: A list of the values of the tags from this taxonomy to apply.
object_tag_class: Optional. Use a proxy subclass of ObjectTag for additional
validation. (e.g. only allow tagging certain types of objects.)
Raised Tag.DoesNotExist if the proposed tags are invalid for this taxonomy.
Preserves existing (valid) tags, adds new (valid) tags, and removes omitted
(or invalid) tags.
"""
content_key = get_content_key_from_string(object_id)
context_key = get_context_key_from_key(content_key)
if check_taxonomy_context_key_org(taxonomy, context_key):
oel_tagging.tag_object(
object_id=object_id,
taxonomy=taxonomy,
tags=tags,
)
# Expose the oel_tagging APIs
add_tag_to_taxonomy = oel_tagging.add_tag_to_taxonomy
update_tag_in_taxonomy = oel_tagging.update_tag_in_taxonomy
delete_tags_from_taxonomy = oel_tagging.delete_tags_from_taxonomy
get_taxonomy = oel_tagging.get_taxonomy
get_taxonomy_by_export_id = oel_tagging.get_taxonomy_by_export_id
get_taxonomies = oel_tagging.get_taxonomies
get_tags = oel_tagging.get_tags
get_object_tag_counts = oel_tagging.get_object_tag_counts
delete_object_tags = oel_tagging.delete_object_tags
resync_object_tags = oel_tagging.resync_object_tags
get_object_tags = oel_tagging.get_object_tags
tag_object = oel_tagging.tag_object
add_tag_to_taxonomy = oel_tagging.add_tag_to_taxonomy

View File

@@ -8,6 +8,7 @@ import logging
from django.dispatch import receiver
from openedx_events.content_authoring.data import (
CourseData,
DuplicatedXBlockData,
XBlockData,
LibraryBlockData,
)
@@ -16,13 +17,15 @@ from openedx_events.content_authoring.signals import (
XBLOCK_CREATED,
XBLOCK_DELETED,
XBLOCK_UPDATED,
XBLOCK_DUPLICATED,
LIBRARY_BLOCK_CREATED,
LIBRARY_BLOCK_UPDATED,
LIBRARY_BLOCK_DELETED,
)
from .tasks import delete_course_tags
from .api import copy_object_tags
from .tasks import (
delete_course_tags,
delete_xblock_tags,
update_course_tags,
update_xblock_tags,
@@ -125,3 +128,19 @@ def delete_tag_library_block(**kwargs):
delete_library_block_tags(str(library_block_data.usage_key))
except Exception as err: # pylint: disable=broad-except
log.error(f"Failed to delete library block tags: {err}")
@receiver(XBLOCK_DUPLICATED)
def duplicate_tags(**kwargs):
"""
Duplicates tags associated with an XBlock whenever the block is duplicated to a new location.
"""
xblock_data = kwargs.get("xblock_info", None)
if not xblock_data or not isinstance(xblock_data, DuplicatedXBlockData):
log.error("Received null or incorrect data for event")
return
copy_object_tags(
xblock_data.source_usage_key,
xblock_data.usage_key,
)

View File

@@ -15,7 +15,7 @@ import openedx.core.djangoapps.content_libraries.api as library_api
from openedx.core.djangoapps.content_libraries.api import LibraryXBlockMetadata
from xmodule.modulestore.django import modulestore
from ...types import ObjectTagByObjectIdDict, ObjectTagByTaxonomyIdDict
from ...types import TagValuesByObjectIdDict, TagValuesByTaxonomyIdDict
@define
@@ -26,7 +26,7 @@ class TaggedContent:
display_name: str
block_id: str
category: str
object_tags: ObjectTagByTaxonomyIdDict
object_tags: TagValuesByTaxonomyIdDict
children: list[TaggedContent] | None
@@ -43,7 +43,7 @@ def iterate_with_level(
def _get_course_tagged_object_and_children(
course_key: CourseKey, object_tag_cache: ObjectTagByObjectIdDict
course_key: CourseKey, object_tag_cache: TagValuesByObjectIdDict
) -> tuple[TaggedContent, list[XBlock]]:
"""
Returns a TaggedContent with course metadata with its tags, and its children.
@@ -68,7 +68,7 @@ def _get_course_tagged_object_and_children(
def _get_library_tagged_object_and_children(
library_key: LibraryLocatorV2, object_tag_cache: ObjectTagByObjectIdDict
library_key: LibraryLocatorV2, object_tag_cache: TagValuesByObjectIdDict
) -> tuple[TaggedContent, list[LibraryXBlockMetadata]]:
"""
Returns a TaggedContent with library metadata with its tags, and its children.
@@ -97,7 +97,7 @@ def _get_library_tagged_object_and_children(
def _get_xblock_tagged_object_and_children(
usage_key: UsageKey, object_tag_cache: ObjectTagByObjectIdDict
usage_key: UsageKey, object_tag_cache: TagValuesByObjectIdDict
) -> tuple[TaggedContent, list[XBlock]]:
"""
Returns a TaggedContent with xblock metadata with its tags, and its children.
@@ -117,7 +117,7 @@ def _get_xblock_tagged_object_and_children(
def _get_library_block_tagged_object(
library_block: LibraryXBlockMetadata, object_tag_cache: ObjectTagByObjectIdDict
library_block: LibraryXBlockMetadata, object_tag_cache: TagValuesByObjectIdDict
) -> tuple[TaggedContent, None]:
"""
Returns a TaggedContent with library content block metadata and its tags,
@@ -137,7 +137,7 @@ def _get_library_block_tagged_object(
def build_object_tree_with_objecttags(
content_key: LibraryLocatorV2 | CourseKey,
object_tag_cache: ObjectTagByObjectIdDict,
object_tag_cache: TagValuesByObjectIdDict,
) -> TaggedContent:
"""
Returns the object with the tags associated with it.

View File

@@ -41,7 +41,7 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="course",
children=[],
object_tags={
self.taxonomy_1.id: list(self.course_tags),
self.taxonomy_1.id: [tag.value for tag in self.course_tags],
},
)
@@ -58,8 +58,8 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="sequential",
children=[],
object_tags={
self.taxonomy_1.id: list(self.sequential_tags1),
self.taxonomy_2.id: list(self.sequential_tags2),
self.taxonomy_1.id: [tag.value for tag in self.sequential_tags1],
self.taxonomy_2.id: [tag.value for tag in self.sequential_tags2],
},
)
@@ -108,7 +108,7 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="vertical",
children=[],
object_tags={
self.taxonomy_2.id: list(self.vertical1_tags),
self.taxonomy_2.id: [tag.value for tag in self.vertical1_tags],
},
)
assert tagged_sequential.children is not None # type guard
@@ -140,7 +140,7 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="html",
children=[],
object_tags={
self.taxonomy_2.id: list(self.html_tags),
self.taxonomy_2.id: [tag.value for tag in self.html_tags],
},
)
assert untagged_vertical2.children is not None # type guard
@@ -169,7 +169,7 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="library",
children=[],
object_tags={
self.taxonomy_2.id: list(self.library_tags),
self.taxonomy_2.id: [tag.value for tag in self.library_tags],
},
)
@@ -184,7 +184,7 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="problem",
children=[],
object_tags={
self.taxonomy_1.id: list(self.problem1_tags),
self.taxonomy_1.id: [tag.value for tag in self.problem1_tags],
},
)
@@ -212,8 +212,8 @@ class TaggedCourseMixin(TestGetAllObjectTagsMixin, ModuleStoreTestCase): # type
category="html",
children=[],
object_tags={
self.taxonomy_1.id: list(self.library_html_tags1),
self.taxonomy_2.id: list(self.library_html_tags2),
self.taxonomy_1.id: [tag.value for tag in self.library_html_tags1],
self.taxonomy_2.id: [tag.value for tag in self.library_html_tags2],
},
)

View File

@@ -34,6 +34,7 @@ from common.djangoapps.student.tests.factories import UserFactory
from openedx.core.djangoapps.content_libraries.api import AccessLevel, create_library, set_library_user_permissions
from openedx.core.djangoapps.content_tagging import api as tagging_api
from openedx.core.djangoapps.content_tagging.models import TaxonomyOrg
from openedx.core.djangoapps.content_tagging.utils import rules_cache
from openedx.core.djangolib.testing.utils import skip_unless_cms
@@ -285,6 +286,9 @@ class TestTaxonomyObjectsMixin:
self._setUp_users()
self._setUp_taxonomies()
# Clear the rules cache in between test runs to keep query counts consistent.
rules_cache.clear()
@skip_unless_cms
@ddt.ddt

View File

@@ -192,10 +192,8 @@ class ObjectTagExportView(APIView):
# Add the tags for each taxonomy
for taxonomy_id in taxonomies:
if taxonomy_id in item.object_tags:
block_data[f"taxonomy_{taxonomy_id}"] = ", ".join([
object_tag.value
for object_tag in item.object_tags[taxonomy_id]
])
tag_values = item.object_tags[taxonomy_id]
block_data[f"taxonomy_{taxonomy_id}"] = ", ".join(tag_values)
yield csv_writer.writerow(block_data)

View File

@@ -3,13 +3,13 @@ import time
import ddt
from django.test.testcases import TestCase
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import LibraryLocatorV2
from openedx_tagging.core.tagging.models import ObjectTag, Tag
from openedx_tagging.core.tagging.models import ObjectTag
from organizations.models import Organization
from .. import api
from ..utils import rules_cache
class TestTaxonomyMixin:
@@ -42,25 +42,25 @@ class TestTaxonomyMixin:
self.taxonomy_no_orgs = api.create_taxonomy(name="No orgs")
# Tags
self.tag_disabled = Tag.objects.create(
self.tag_disabled = api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_disabled,
value="learning",
tag="learning",
)
self.tag_all_orgs = Tag.objects.create(
self.tag_all_orgs = api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_all_orgs,
value="learning",
tag="learning",
)
self.tag_both_orgs = Tag.objects.create(
self.tag_both_orgs = api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_both_orgs,
value="learning",
tag="learning",
)
self.tag_one_org = Tag.objects.create(
self.tag_one_org = api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_one_org,
value="learning",
tag="learning",
)
self.tag_no_orgs = Tag.objects.create(
self.tag_no_orgs = api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_no_orgs,
value="learning",
tag="learning",
)
# ObjectTags
api.tag_object(
@@ -79,7 +79,9 @@ class TestTaxonomyMixin:
self.all_orgs_block_tag = api.get_object_tags(
object_id="block-v1:Ax+DemoX+Demo_Course+type@vertical+block@abcde",
)[0]
api.tag_object(
# Force apply these tags: Ax and OeX are not an allowed org for these taxonomies
api.oel_tagging.tag_object(
object_id="course-v1:Ax+DemoX+Demo_Course",
taxonomy=self.taxonomy_both_orgs,
tags=[self.tag_both_orgs.value],
@@ -87,7 +89,7 @@ class TestTaxonomyMixin:
self.both_orgs_course_tag = api.get_object_tags(
object_id="course-v1:Ax+DemoX+Demo_Course",
)[0]
api.tag_object(
api.oel_tagging.tag_object(
object_id="block-v1:OeX+DemoX+Demo_Course+type@video+block@abcde",
taxonomy=self.taxonomy_both_orgs,
tags=[self.tag_both_orgs.value],
@@ -95,7 +97,7 @@ class TestTaxonomyMixin:
self.both_orgs_block_tag = api.get_object_tags(
object_id="block-v1:OeX+DemoX+Demo_Course+type@video+block@abcde",
)[0]
api.tag_object(
api.oel_tagging.tag_object(
object_id="block-v1:OeX+DemoX+Demo_Course+type@html+block@abcde",
taxonomy=self.taxonomy_one_org,
tags=[self.tag_one_org.value],
@@ -103,7 +105,7 @@ class TestTaxonomyMixin:
self.one_org_block_tag = api.get_object_tags(
object_id="block-v1:OeX+DemoX+Demo_Course+type@html+block@abcde",
)[0]
api.tag_object(
api.oel_tagging.tag_object(
object_id="course-v1:Ax+DemoX+Demo_Course",
taxonomy=self.taxonomy_disabled,
tags=[self.tag_disabled.value],
@@ -115,6 +117,9 @@ class TestTaxonomyMixin:
self.taxonomy_disabled.save()
self.disabled_course_tag.refresh_from_db() # Update its cached .taxonomy
# Clear the rules cache in between test runs
rules_cache.clear()
@ddt.ddt
class TestAPITaxonomy(TestTaxonomyMixin, TestCase):
@@ -246,28 +251,27 @@ class TestGetAllObjectTagsMixin:
def setUp(self):
super().setUp()
self.orgA = Organization.objects.create(name="Organization A", short_name="orgA")
self.taxonomy_1 = api.create_taxonomy(name="Taxonomy 1")
api.set_taxonomy_orgs(self.taxonomy_1, all_orgs=True)
Tag.objects.create(
api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_1,
value="Tag 1.1",
tag="Tag 1.1",
)
Tag.objects.create(
api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_1,
value="Tag 1.2",
tag="Tag 1.2",
)
self.taxonomy_2 = api.create_taxonomy(name="Taxonomy 2")
api.set_taxonomy_orgs(self.taxonomy_2, all_orgs=True)
Tag.objects.create(
api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_2,
value="Tag 2.1",
tag="Tag 2.1",
)
Tag.objects.create(
api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_2,
value="Tag 2.2",
tag="Tag 2.2",
)
api.tag_object(
@@ -277,6 +281,14 @@ class TestGetAllObjectTagsMixin:
)
self.course_tags = api.get_object_tags("course-v1:orgA+test_course+test_run")
self.orgA = Organization.objects.create(name="Organization A", short_name="orgA")
self.orgB = Organization.objects.create(name="Organization B", short_name="orgB")
self.taxonomy_3 = api.create_taxonomy(name="Taxonomy 3", orgs=[self.orgA])
api.add_tag_to_taxonomy(
taxonomy=self.taxonomy_3,
tag="Tag 3.1",
)
# Tag blocks
api.tag_object(
object_id="block-v1:orgA+test_course+test_run+type@sequential+block@test_sequential",
@@ -329,17 +341,17 @@ class TestGetAllObjectTagsMixin:
self.expected_course_objecttags = {
"course-v1:orgA+test_course+test_run": {
self.taxonomy_1.id: list(self.course_tags),
self.taxonomy_1.id: [tag.value for tag in self.course_tags],
},
"block-v1:orgA+test_course+test_run+type@sequential+block@test_sequential": {
self.taxonomy_1.id: list(self.sequential_tags1),
self.taxonomy_2.id: list(self.sequential_tags2),
self.taxonomy_1.id: [tag.value for tag in self.sequential_tags1],
self.taxonomy_2.id: [tag.value for tag in self.sequential_tags2],
},
"block-v1:orgA+test_course+test_run+type@vertical+block@test_vertical1": {
self.taxonomy_2.id: list(self.vertical1_tags),
self.taxonomy_2.id: [tag.value for tag in self.vertical1_tags],
},
"block-v1:orgA+test_course+test_run+type@html+block@test_html": {
self.taxonomy_2.id: list(self.html_tags),
self.taxonomy_2.id: [tag.value for tag in self.html_tags],
},
}
@@ -398,21 +410,21 @@ class TestGetAllObjectTagsMixin:
self.expected_library_objecttags = {
f"lib:orgA:lib_{self.block_suffix}": {
self.taxonomy_2.id: list(self.library_tags),
self.taxonomy_2.id: [tag.value for tag in self.library_tags],
},
f"lb:orgA:lib_{self.block_suffix}:problem:problem1_{self.block_suffix}": {
self.taxonomy_1.id: list(self.problem1_tags),
self.taxonomy_1.id: [tag.value for tag in self.problem1_tags],
},
f"lb:orgA:lib_{self.block_suffix}:html:html_{self.block_suffix}": {
self.taxonomy_1.id: list(self.library_html_tags1),
self.taxonomy_2.id: list(self.library_html_tags2),
self.taxonomy_1.id: [tag.value for tag in self.library_html_tags1],
self.taxonomy_2.id: [tag.value for tag in self.library_html_tags2],
},
}
class TestGetAllObjectTags(TestGetAllObjectTagsMixin, TestCase):
class TestAPIObjectTags(TestGetAllObjectTagsMixin, TestCase):
"""
Test get_all_object_tags api function
Tests object tag API functions.
"""
def test_get_course_object_tags(self):
@@ -444,3 +456,55 @@ class TestGetAllObjectTags(TestGetAllObjectTagsMixin, TestCase):
self.taxonomy_1.id: self.taxonomy_1,
self.taxonomy_2.id: self.taxonomy_2,
}
def _test_copy_object_tags(self, src_key, dst_key, expected_tags):
"""
Test copying object tags to a new object.
"""
# Destination block doesn't have any tags yet
with self.assertNumQueries(1):
assert not list(api.get_object_tags(object_id=str(dst_key)))
# Copy tags from the source block
api.copy_object_tags(src_key, dst_key)
with self.assertNumQueries(1):
dst_tags = list(api.get_object_tags(object_id=str(dst_key)))
# Check that the destination tags match the expected list (name + value only; object_id will differ)
with self.assertNumQueries(0):
assert len(dst_tags) == len(expected_tags)
for idx, src_tag in enumerate(expected_tags):
dst_tag = dst_tags[idx]
assert src_tag.name == dst_tag.name
assert src_tag.value == dst_tag.value
def test_copy_object_tags(self):
"""
Test copying object tags to a new object.
"""
src_key = UsageKey.from_string("block-v1:orgA+test_course+test_run+type@sequential+block@test_sequential")
dst_key = UsageKey.from_string("block-v1:orgB+test_course+test_run+type@sequential+block@test_sequential")
expected_tags = list(self.sequential_tags1) + list(self.sequential_tags2)
with self.assertNumQueries(30): # TODO why so high?
self._test_copy_object_tags(src_key, dst_key, expected_tags)
def test_copy_cross_org_tags(self):
"""
Test copying object tags to a new object in a different org.
Ensure only the permitted tags are copied.
"""
src_key = UsageKey.from_string("block-v1:orgA+test_course+test_run+type@sequential+block@test_sequential")
dst_key = UsageKey.from_string("block-v1:orgB+test_course+test_run+type@sequential+block@test_sequential")
# Add another tag from an orgA-specific taxonomy
api.tag_object(
object_id=str(src_key),
taxonomy=self.taxonomy_3,
tags=["Tag 3.1"],
)
# Destination block should have all of the source block's tags, except for the orgA-specific one.
expected_tags = list(self.sequential_tags1) + list(self.sequential_tags2)
with self.assertNumQueries(31): # TODO why so high?
self._test_copy_object_tags(src_key, dst_key, expected_tags)

View File

@@ -7,12 +7,11 @@ from typing import Dict, List, Union
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import LibraryLocatorV2
from openedx_tagging.core.tagging.models import ObjectTag, Taxonomy
from openedx_tagging.core.tagging.models import Taxonomy
ContentKey = Union[LibraryLocatorV2, CourseKey, UsageKey]
ContextKey = Union[LibraryLocatorV2, CourseKey]
ObjectTagByTaxonomyIdDict = Dict[int, List[ObjectTag]]
ObjectTagByObjectIdDict = Dict[str, ObjectTagByTaxonomyIdDict]
TagValuesByTaxonomyIdDict = Dict[int, List[str]]
TagValuesByObjectIdDict = Dict[str, TagValuesByTaxonomyIdDict]
TaxonomyDict = Dict[int, Taxonomy]
TagValuesByTaxonomyExportIdDict = Dict[str, List[str]]

View File

@@ -10,10 +10,8 @@ from opaque_keys.edx.locator import LibraryLocatorV2
from openedx_tagging.core.tagging.models import Taxonomy
from organizations.models import Organization
from openedx.core.djangoapps.content_libraries.api import get_libraries_for_user
from .types import ContentKey, ContextKey
from .models import TaxonomyOrg
from .types import ContentKey, ContextKey
def get_content_key_from_string(key_str: str) -> ContentKey:
@@ -85,6 +83,12 @@ class TaggingRulesCache:
"""
self.request_cache = RequestCache('openedx.core.djangoapps.content_tagging.utils')
def clear(self):
"""
Clears the rules cache.
"""
self.request_cache.clear()
def get_orgs(self, org_names: list[str] | None = None) -> list[Organization]:
"""
Returns the Organizations with the given name(s), or all Organizations if no names given.
@@ -114,6 +118,9 @@ class TaggingRulesCache:
These library orgs are cached for the duration of the request.
"""
# Import the content_libraries api here to avoid circular imports.
from openedx.core.djangoapps.content_libraries.api import get_libraries_for_user
cache_key = f'library_orgs:{user.id}'
library_orgs = self.request_cache.data.get(cache_key)
if library_orgs is None:

View File

@@ -7,7 +7,7 @@ import os
from lxml import etree
from cms.lib.xblock.tagging.tagged_block_mixin import TaggedBlockMixin
from openedx.core.djangoapps.content_tagging.api import get_all_object_tags, TagValuesByObjectIdDict
from .data import StaticFile
from . import utils
@@ -20,6 +20,7 @@ class XBlockSerializer:
A class that can serialize an XBlock to OLX.
"""
static_files: list[StaticFile]
tags: TagValuesByObjectIdDict
def __init__(self, block):
"""
@@ -28,6 +29,7 @@ class XBlockSerializer:
"""
self.orig_block_key = block.scope_ids.usage_id
self.static_files = []
self.tags = {}
olx_node = self._serialize_block(block)
self.olx_str = etree.tostring(olx_node, encoding="unicode", pretty_print=True)
@@ -52,9 +54,17 @@ class XBlockSerializer:
def _serialize_block(self, block) -> etree.Element:
""" Serialize an XBlock to OLX/XML. """
if block.scope_ids.usage_id.block_type == 'html':
return self._serialize_html_block(block)
olx = self._serialize_html_block(block)
else:
return self._serialize_normal_block(block)
olx = self._serialize_normal_block(block)
# Store the block's tags
block_key = block.scope_ids.usage_id
block_id = str(block_key)
object_tags, _ = get_all_object_tags(content_key=block_key)
self.tags[block_id] = object_tags.get(block_id, {})
return olx
def _serialize_normal_block(self, block) -> etree.Element:
"""
@@ -90,12 +100,13 @@ class XBlockSerializer:
data = fh.read()
self.static_files.append(StaticFile(name=unit_file.name, data=data, url=None))
# Serialize and add tag data if any
if isinstance(block, TaggedBlockMixin):
block.add_tags_to_node(olx_node)
if block.has_children:
self._serialize_children(block, olx_node)
# Ensure there's a url_name attribute, so we can resurrect child usage keys.
if "url_name" not in olx_node.attrib:
olx_node.attrib["url_name"] = block.scope_ids.usage_id.block_id
return olx_node
def _serialize_children(self, block, parent_olx_node):
@@ -120,10 +131,6 @@ class XBlockSerializer:
if block.use_latex_compiler:
olx_node.attrib["use_latex_compiler"] = "true"
# Serialize and add tag data if any
if isinstance(block, TaggedBlockMixin):
block.add_tags_to_node(olx_node)
# Escape any CDATA special chars
escaped_block_data = block.data.replace("]]>", "]]&gt;")
olx_node.text = etree.CDATA("\n" + escaped_block_data + "\n")

View File

@@ -80,7 +80,7 @@ EXPECTED_OPENASSESSMENT_OLX = """
teams_enabled="False"
selected_teamset_id=""
show_rubric_during_response="False"
tags-v1="t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag"
url_name="Tagged_OpenAssessment_Block"
>
<title>Open Response Assessment</title>
<assessments>
@@ -207,7 +207,7 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
Tag.objects.create(taxonomy=cls.taxonomy2, value="tag", parent=root2)
Tag.objects.create(taxonomy=cls.taxonomy2, value="other tag", parent=root2)
def assertXmlEqual(self, xml_str_a: str, xml_str_b: str) -> bool:
def assertXmlEqual(self, xml_str_a: str, xml_str_b: str) -> None:
""" Assert that the given XML strings are equal, ignoring attribute order and some whitespace variations. """
self.assertEqual(
ElementTree.canonicalize(xml_str_a, strip_text=True),
@@ -429,32 +429,35 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add a bunch of tags
tagging_api.tag_object(
object_id=unit.location,
object_id=str(unit.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
tagging_api.tag_object(
object_id=unit.location,
object_id=str(unit.location),
taxonomy=self.taxonomy2,
tags=["tag", "other tag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized and omitted from the OLX
serialized = api.serialize_xblock_to_olx(unit)
expected_serialized_tags = (
"t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag;"
"t2-export-id:other tag,tag"
)
self.assertXmlEqual(
serialized.olx_str,
f"""
"""
<vertical
display_name="Tagged Unit"
url_name="Tagged_Unit"
tags-v1="{expected_serialized_tags}"
/>
"""
)
self.assertEqual(
serialized.tags, {
str(unit.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
self.taxonomy2.id: ["tag", "other tag"],
}
}
)
def test_tagged_html_block(self):
"""
@@ -474,36 +477,39 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add a bunch of tags
tagging_api.tag_object(
object_id=html_block.location,
object_id=str(html_block.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
tagging_api.tag_object(
object_id=html_block.location,
object_id=str(html_block.location),
taxonomy=self.taxonomy2,
tags=["tag", "other tag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized and omitted from the OLX
serialized = api.serialize_xblock_to_olx(html_block)
expected_serialized_tags = (
"t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag;"
"t2-export-id:other tag,tag"
)
self.assertXmlEqual(
serialized.olx_str,
f"""
"""
<html
url_name="Tagged_Non-default_HTML_Block"
display_name="Tagged Non-default HTML Block"
editor="raw"
use_latex_compiler="true"
tags-v1="{expected_serialized_tags}"
><![CDATA[
🍔
]]></html>
"""
)
self.assertEqual(
serialized.tags, {
str(html_block.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
self.taxonomy2.id: ["tag", "other tag"],
}
}
)
def test_tagged_problem_blocks(self):
"""
@@ -535,63 +541,69 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add a bunch of tags to the problem blocks
tagging_api.tag_object(
object_id=regular_problem.location,
object_id=str(regular_problem.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
tagging_api.tag_object(
object_id=regular_problem.location,
object_id=str(regular_problem.location),
taxonomy=self.taxonomy2,
tags=["tag", "other tag"]
)
tagging_api.tag_object(
object_id=python_problem.location,
object_id=str(python_problem.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
tagging_api.tag_object(
object_id=python_problem.location,
object_id=str(python_problem.location),
taxonomy=self.taxonomy2,
tags=["tag", "other tag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized and omitted from the OLX.
serialized = api.serialize_xblock_to_olx(regular_problem)
expected_serialized_tags = (
"t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag;"
"t2-export-id:other tag,tag"
)
self.assertXmlEqual(
serialized.olx_str,
f"""
"""
<problem
display_name="Tagged Problem No Python"
url_name="Tagged_Problem_No_Python"
max_attempts="3"
tags-v1="{expected_serialized_tags}"
>
<optionresponse></optionresponse>
</problem>
"""
)
self.assertEqual(
serialized.tags, {
str(regular_problem.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
self.taxonomy2.id: ["tag", "other tag"],
}
}
)
serialized = api.serialize_xblock_to_olx(python_problem)
expected_serialized_tags = (
"t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag;"
"t2-export-id:other tag,tag"
)
self.assertXmlEqual(
serialized.olx_str,
f"""
"""
<problem
display_name="Tagged Python Problem"
url_name="Tagged_Python_Problem"
tags-v1="{expected_serialized_tags}"
>
This uses python: <script type="text/python">...</script>...
</problem>
"""
)
self.assertEqual(
serialized.tags, {
str(python_problem.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
self.taxonomy2.id: ["tag", "other tag"],
}
}
)
def test_tagged_library_content_blocks(self):
"""
@@ -609,12 +621,12 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add a bunch of tags to the library content block
tagging_api.tag_object(
object_id=lc_block.location,
object_id=str(lc_block.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized, omitted from the OLX, and properly escaped
serialized = api.serialize_xblock_to_olx(lc_block)
self.assertXmlEqual(
serialized.olx_str,
@@ -624,10 +636,14 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
max_count="1"
source_library_id="{str(lib.location.library_key)}"
url_name="Tagged_LC_Block"
tags-v1="t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag"
/>
"""
)
self.assertEqual(serialized.tags, {
str(lc_block.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
}
})
def test_tagged_video_block(self):
"""
@@ -642,12 +658,12 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add tags to video block
tagging_api.tag_object(
object_id=video_block.location,
object_id=str(video_block.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized and omitted from the OLX.
serialized = api.serialize_xblock_to_olx(video_block)
self.assertXmlEqual(
serialized.olx_str,
@@ -656,10 +672,14 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
youtube="1.00:3_yD_cEKoCk"
url_name="Tagged_Video_Block"
display_name="Tagged Video Block"
tags-v1="t1-export-id:%3Cspecial %22%27-%3D%2C. %7C%3D chars %3E tag,anotherTag,normal tag"
/>
"""
)
self.assertEqual(serialized.tags, {
str(video_block.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
}
})
def test_tagged_openassessment_block(self):
"""
@@ -674,14 +694,19 @@ class XBlockSerializationTestCase(SharedModuleStoreTestCase):
# Add a tags to openassessment block
tagging_api.tag_object(
object_id=openassessment_block.location,
object_id=str(openassessment_block.location),
taxonomy=self.taxonomy1,
tags=["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"]
)
# Check that the tags data in included in the OLX and properly escaped
# Check that the tags data is serialized and omitted from the OLX
serialized = api.serialize_xblock_to_olx(openassessment_block)
self.assertXmlEqual(
serialized.olx_str,
EXPECTED_OPENASSESSMENT_OLX
)
self.assertEqual(serialized.tags, {
str(openassessment_block.location): {
self.taxonomy1.id: ["normal tag", "<special \"'-=,. |= chars > tag", "anotherTag"],
}
})