refactor: parallelize content block update on discard
This commit is contained in:
committed by
Navin Karkera
parent
65258117d8
commit
2efff2697b
@@ -143,7 +143,7 @@ def _wait_for_meili_task(info: TaskInfo) -> None:
|
||||
raise MeilisearchError(err_reason)
|
||||
|
||||
|
||||
def _wait_for_meili_tasks(info_list: list[TaskInfo]) -> None:
|
||||
def wait_for_meili_tasks(info_list: list[TaskInfo]) -> None:
|
||||
"""
|
||||
Simple helper method to wait for multiple Meilisearch tasks to complete
|
||||
"""
|
||||
@@ -231,7 +231,7 @@ def _recurse_children(block, fn, status_cb: Callable[[str], None] | None = None)
|
||||
fn(child)
|
||||
|
||||
|
||||
def _update_index_docs(docs) -> None:
|
||||
def _update_index_docs(docs, wait=True) -> list[TaskInfo]:
|
||||
"""
|
||||
Helper function that updates the documents in the search index
|
||||
|
||||
@@ -249,7 +249,9 @@ def _update_index_docs(docs) -> None:
|
||||
tasks.append(client.index(current_rebuild_index_name).update_documents(docs))
|
||||
tasks.append(client.index(STUDIO_INDEX_NAME).update_documents(docs))
|
||||
|
||||
_wait_for_meili_tasks(tasks)
|
||||
if wait:
|
||||
wait_for_meili_tasks(tasks)
|
||||
return tasks
|
||||
|
||||
|
||||
def only_if_meilisearch_enabled(f):
|
||||
@@ -464,10 +466,10 @@ def delete_index_doc(usage_key: UsageKey) -> None:
|
||||
tasks.append(client.index(current_rebuild_index_name).delete_document(meili_id_from_opaque_key(usage_key)))
|
||||
tasks.append(client.index(STUDIO_INDEX_NAME).delete_document(meili_id_from_opaque_key(usage_key)))
|
||||
|
||||
_wait_for_meili_tasks(tasks)
|
||||
wait_for_meili_tasks(tasks)
|
||||
|
||||
|
||||
def delete_index_docs(meili_ids: list[str]) -> None:
|
||||
def delete_index_docs(meili_ids: list[str], wait=True) -> list[TaskInfo]:
|
||||
"""
|
||||
Deletes documents for the given XBlocks from the search index
|
||||
|
||||
@@ -484,7 +486,9 @@ def delete_index_docs(meili_ids: list[str]) -> None:
|
||||
tasks.append(client.index(current_rebuild_index_name).delete_documents(meili_ids))
|
||||
tasks.append(client.index(STUDIO_INDEX_NAME).delete_documents(meili_ids))
|
||||
|
||||
_wait_for_meili_tasks(tasks)
|
||||
if wait:
|
||||
wait_for_meili_tasks(tasks)
|
||||
return tasks
|
||||
|
||||
|
||||
def upsert_library_block_index_doc(usage_key: UsageKey) -> None:
|
||||
@@ -502,7 +506,7 @@ def upsert_library_block_index_doc(usage_key: UsageKey) -> None:
|
||||
_update_index_docs(docs)
|
||||
|
||||
|
||||
def upsert_content_library_index_docs(library_key: LibraryLocatorV2) -> None:
|
||||
def upsert_content_library_index_docs(library_key: LibraryLocatorV2, wait=True) -> None:
|
||||
"""
|
||||
Creates or updates the documents for the given Content Library in the search index
|
||||
"""
|
||||
@@ -512,14 +516,15 @@ def upsert_content_library_index_docs(library_key: LibraryLocatorV2) -> None:
|
||||
doc = searchable_doc_for_library_block(metadata)
|
||||
docs.append(doc)
|
||||
|
||||
_update_index_docs(docs)
|
||||
return _update_index_docs(docs, wait=wait)
|
||||
|
||||
|
||||
def delete_content_library_index_docs(library_key: LibraryLocatorV2) -> None:
|
||||
def delete_content_library_index_docs(library_key: LibraryLocatorV2, wait=True) -> None:
|
||||
"""
|
||||
Deletes the discarded draft documents for the given Content Library in the search index
|
||||
"""
|
||||
meili_ids = []
|
||||
# draft version and published version equal to False/null means those drafts were discarded.
|
||||
for component in lib_api.get_library_components(library_key, draft=False, published=False):
|
||||
usage_key = LibraryUsageLocatorV2(
|
||||
library_key,
|
||||
@@ -527,7 +532,7 @@ def delete_content_library_index_docs(library_key: LibraryLocatorV2) -> None:
|
||||
component.local_key,
|
||||
)
|
||||
meili_ids.append(meili_id_from_opaque_key(usage_key))
|
||||
delete_index_docs(meili_ids)
|
||||
return delete_index_docs(meili_ids, wait=wait)
|
||||
|
||||
|
||||
def upsert_block_tags_index_docs(usage_key: UsageKey):
|
||||
|
||||
@@ -136,7 +136,8 @@ def content_library_updated_handler(**kwargs) -> None:
|
||||
log.error("Received null or incorrect data for event")
|
||||
return
|
||||
|
||||
update_content_library_index_docs.delay(str(content_library_data.library_key))
|
||||
if content_library_data.update_blocks:
|
||||
update_content_library_index_docs.delay(str(content_library_data.library_key))
|
||||
|
||||
|
||||
@receiver(CONTENT_OBJECT_TAGS_CHANGED)
|
||||
|
||||
@@ -9,9 +9,9 @@ import logging
|
||||
from celery import shared_task
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from edx_django_utils.monitoring import set_code_owner_attribute
|
||||
from meilisearch.errors import MeilisearchError
|
||||
from opaque_keys.edx.keys import UsageKey
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
|
||||
from meilisearch.errors import MeilisearchError
|
||||
|
||||
from . import api
|
||||
|
||||
@@ -80,5 +80,6 @@ def update_content_library_index_docs(library_key_str: str) -> None:
|
||||
|
||||
log.info("Updating content index documents for library with id: %s", library_key)
|
||||
|
||||
api.delete_content_library_index_docs(library_key)
|
||||
api.upsert_content_library_index_docs(library_key)
|
||||
tasks = api.delete_content_library_index_docs(library_key, wait=False) + \
|
||||
api.upsert_content_library_index_docs(library_key, wait=False)
|
||||
api.wait_for_meili_tasks(tasks)
|
||||
|
||||
Reference in New Issue
Block a user