Files
edx-platform/cms/djangoapps/contentstore/views/transcripts_ajax.py
Kyle McCormick c70bfe980a build!: Switch to openedx-core (renamed from openedx-learning) (#38011)
build!: Switch to openedx-core (renamed from openedx-learning)

Instead of installing openedx-learning==0.32.0, we install openedx-core==0.34.1.
We update various class names, function names, docstrings, and comments to
represent the rename:

* We say "openedx-core" when referring to the whole repo or PyPI project
  * or occasionally "Open edX Core" if we want it to look nice in the docs.
* We say "openedx_content" to refer to the Content API within openedx-core,
   which is actually the thing we have been calling "Learning Core" all along.
  * In snake-case code, it's `*_openedx_content_*`.
  * In camel-case code, it's `*OpenedXContent*`

For consistency's sake we avoid anything else like oex_core, OeXCore,
OpenEdXCore, OexContent, openedx-content, OpenEdxContent, etc.
There should be no more references to learning_core, learning-core, Learning Core,
Learning-Core, LC, openedx-learning, openedx_learning, etc.

BREAKING CHANGE: for openedx-learning/openedx-core developers:
You may need to uninstall openedx-learning and re-install openedx-core
from your venv. If running tutor, you may need to un-mount openedx-learning,
rename the directory to openedx-core, re-mount it, and re-build.
The code APIs themselves are fully backwards-compatible.

Part of: https://github.com/openedx/openedx-core/issues/470
2026-02-18 22:38:25 +00:00

784 lines
29 KiB
Python

"""
Actions manager for transcripts ajax calls.
+++++++++++++++++++++++++++++++++++++++++++
Blocks do not support rollback (pressing "Cancel" button in Studio)
All user changes are saved immediately.
"""
import json
import logging
import os
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied
from django.core.files.base import ContentFile
from django.http import Http404, HttpResponse
from django.utils.translation import gettext as _
from edxval.api import create_external_video, create_or_update_video_transcript
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import UsageKey, UsageKeyV2
from opaque_keys.edx.locator import LibraryLocatorV2
from cms.djangoapps.contentstore.video_storage_handlers import TranscriptProvider
from common.djangoapps.student.auth import has_course_author_access
from common.djangoapps.util.json_request import JsonResponse
from xmodule.contentstore.content import StaticContent # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.contentstore.django import contentstore # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.exceptions import NotFoundError # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.exceptions import ItemNotFoundError # lint-amnesty, pylint: disable=wrong-import-order
from openedx.core.djangoapps.video_config.transcripts_utils import ( # lint-amnesty, pylint: disable=wrong-import-order
GetTranscriptsFromYouTubeException,
Transcript,
TranscriptsRequestValidationException,
clean_video_id,
download_youtube_subs,
get_transcript,
get_transcript_for_video,
get_transcript_from_val,
get_transcript_from_youtube,
get_transcript_link_from_youtube,
get_transcript_links_from_youtube,
)
from xblocks_contrib.video.exceptions import TranscriptsGenerationException
from openedx.core.djangoapps.content_libraries import api as lib_api
from openedx.core.djangoapps.xblock import api as xblock_api
from openedx.core.djangoapps.xblock.data import CheckPerm
# Public API of this module: the request handlers defined below that are
# decorated with ``login_required``.
__all__ = [
'upload_transcripts',
'download_transcripts',
'check_transcripts',
'choose_transcripts',
'replace_transcripts',
'rename_transcripts',
]
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def error_response(response, message, status_code=400):
    """
    Log ``message`` and send it back to the client inside a JSON response.

    Arguments:
        response: dict payload; its ``status`` entry is overwritten with
            ``message`` (the dict is modified in place).
        message: text that is logged at debug level and reported as the status.
        status_code: value passed as the second positional argument of
            ``JsonResponse``; defaults to 400 (Bad Request).

    Returns:
        The JsonResponse built from the updated payload.
    """
    log.debug(message)
    response.update({'status': message})
    return JsonResponse(response, status_code)
def link_video_to_component(video_component, user):
    """
    Make sure the video component carries a usable ``edx_video_id``.

    When the component's current ID cleans to something truthy it is returned
    untouched. Otherwise an external video is created, its ID is stored on the
    component and the component is saved on behalf of ``user``.

    Arguments:
        video_component: video block.
        user: A requesting user.

    Returns:
        A cleaned Video ID.
    """
    existing_id = clean_video_id(video_component.edx_video_id)
    if existing_id:
        return existing_id

    new_id = create_external_video(display_name='external video')
    video_component.edx_video_id = new_id
    video_component.save_with_metadata(user.id)
    return new_id
def save_video_transcript_in_openedx_content(
    usage_key,
    input_format,
    transcript_content,
    language_code
):
    """
    Store a video transcript as a static asset through the openedx_content API.

    The transcript is converted to `.srt` and attached to the block as
    ``static/transcript-<language_code>.srt``.

    Note: SJSON is an edx-specific format that we're trying to move away from,
    so everything new related to openedx_content should only use `.srt`.

    Arguments:
        usage_key: UsageKey of the block
        input_format: Input transcript format for content being passed.
        transcript_content: Content of the transcript file
        language_code: transcript language code

    Returns:
        A boolean: True when the transcript was converted and handed to the
        library API, False when TranscriptsGenerationException or
        UnicodeDecodeError was raised along the way.
    """
    try:
        converted = Transcript.convert(
            content=transcript_content,
            input_format=input_format,
            output_format=Transcript.SRT
        )
        lib_api.add_library_block_static_asset_file(
            usage_key,
            f"static/transcript-{language_code}.srt",
            converted.encode(),
        )
    except (TranscriptsGenerationException, UnicodeDecodeError):
        return False

    return True
def save_video_transcript(edx_video_id, input_format, transcript_content, language_code):
    """
    Save a video transcript to the VAL and its content to the configured django storage(DS).

    Arguments:
        edx_video_id: A Video ID to associate the transcript.
        input_format: Input transcript format for content being passed.
        transcript_content: Content of the transcript file
        language_code: transcript language code

    Returns:
        A boolean: False when TranscriptsGenerationException or
        UnicodeDecodeError is raised, True otherwise. The value returned by
        ``create_or_update_video_transcript`` is not inspected.
    """
    transcript_metadata = {
        'provider': TranscriptProvider.CUSTOM,
        'file_format': Transcript.SJSON,
        'language_code': language_code
    }
    try:
        # The transcript is stored as 'sjson' in the configured transcript
        # storage. For example, S3.
        sjson_bytes = Transcript.convert(
            content=transcript_content,
            input_format=input_format,
            output_format=Transcript.SJSON
        ).encode()
        create_or_update_video_transcript(
            video_id=edx_video_id,
            language_code=language_code,
            metadata=transcript_metadata,
            file_data=ContentFile(sjson_bytes),
        )
    except (TranscriptsGenerationException, UnicodeDecodeError):
        return False

    return True
def validate_video_block(request, locator):
    """
    Validates video block given its locator and request. Also, checks
    if requesting user has course authoring access.

    Arguments:
        request: WSGI request.
        locator: video locator.

    Returns:
        A tuple containing error(or None) and video block(i.e. if validation succeeds).
        When the item is found but is not a video block, both the error and
        the item are returned.

    Raises:
        PermissionDenied: if requesting user does not have access to author the video component.
    """
    error, item = None, None
    try:
        item = _get_item(request, {'locator': locator})
    except (InvalidKeyError, ItemNotFoundError):
        error = _('Cannot find item by locator.')
    except TranscriptsRequestValidationException as exc:
        # _get_item raises this for keys that belong to neither a course nor
        # a v2 library. Report it as a validation error instead of letting it
        # escape to callers that only look at the returned error.
        error = str(exc)
    else:
        if item.category != 'video':
            error = _('Transcripts are supported only for "video" blocks.')

    return error, item
def validate_transcript_upload_data(request):
    """
    Validate the POST data and uploaded file of a transcript upload request.

    The checks run in this order: locator present, transcript file present,
    file extension equal to ``Transcript.SRT``, ``edx_video_id`` key present,
    and finally the locator must resolve to a video block.

    Arguments:
        request: A WSGI request; its POST and FILES parts are inspected.

    Returns:
        Tuple containing an error and validated data.
        If there is a validation error then validated data will be empty.
    """
    post_data, uploads = request.POST, request.FILES
    locator = post_data.get('locator')

    if not locator:
        return _('Video locator is required.'), {}
    if 'transcript-file' not in uploads:
        return _('A transcript file is required.'), {}

    transcript_file = uploads['transcript-file']
    extension = os.path.splitext(transcript_file.name)[1][1:]
    if extension != Transcript.SRT:
        return _('This transcript file type is not supported.'), {}
    if 'edx_video_id' not in post_data:
        return _('Video ID is required.'), {}

    error, video = validate_video_block(request, locator)
    if error:
        return error, {}

    # Prefer the ID sent with the request; fall back to the one on the block.
    posted_video_id = post_data.get('edx_video_id')
    return error, {
        'video': video,
        'edx_video_id': clean_video_id(posted_video_id) or clean_video_id(video.edx_video_id),
        'transcript_file': transcript_file
    }
@login_required
def upload_transcripts(request):
    """
    Upload transcripts for current block.

    The uploaded 'srt' file is converted to 'sjson', stored for the block's
    video ID under the 'en' language, and the block's ``transcripts`` field is
    updated to point at it.

    returns: JSON response::
        on success: HTTP 200 with ``status`` 'Success' and the ``edx_video_id`` used.
        on failure: HTTP 400 with ``status`` holding the error message.
    """
    error, validated_data = validate_transcript_upload_data(request)
    if error:
        return JsonResponse({'status': error}, status=400)

    video = validated_data['video']
    edx_video_id = validated_data['edx_video_id']
    transcript_file = validated_data['transcript_file']

    # check if we need to create an external VAL video to associate the transcript
    # and save its ID on the video component.
    if not edx_video_id:
        edx_video_id = create_external_video(display_name='external video')
        video.edx_video_id = edx_video_id
        video.save_with_metadata(request.user.id)

    try:
        # Convert 'srt' transcript into the 'sjson' and upload it to
        # configured transcript storage. For example, S3.
        sjson_subs = Transcript.convert(
            content=transcript_file.read().decode('utf-8'),
            input_format=Transcript.SRT,
            output_format=Transcript.SJSON
        ).encode()
        transcript_created = create_or_update_video_transcript(
            video_id=edx_video_id,
            language_code='en',
            metadata={
                'provider': TranscriptProvider.CUSTOM,
                'file_format': Transcript.SJSON,
                'language_code': 'en'
            },
            file_data=ContentFile(sjson_subs),
        )
    except (TranscriptsGenerationException, UnicodeDecodeError):
        return JsonResponse({
            'status': _('There is a problem with this transcript file. Try to upload a different file.')
        }, status=400)

    if transcript_created is None:
        # Nothing was stored for this video ID, so the block must not be
        # updated to reference a transcript that does not exist.
        return JsonResponse({'status': 'Invalid Video ID'}, status=400)

    video.transcripts['en'] = f"{edx_video_id}-en.srt"
    video.save_with_metadata(request.user.id)
    return JsonResponse({'edx_video_id': edx_video_id, 'status': 'Success'}, status=200)
@login_required
def download_transcripts(request):
    """
    Send the 'en' transcript of the requested video block as a file download.

    The block is identified by the ``locator`` GET parameter.
    Raises Http404 if the block fails validation or no transcript is found.
    """
    locator = request.GET.get('locator')
    error, video = validate_video_block(request, locator=locator)
    if error:
        raise Http404

    try:
        transcript_body, download_name, content_type = get_transcript(video, lang='en')
    except NotFoundError:
        raise Http404  # lint-amnesty, pylint: disable=raise-missing-from

    # Serve the transcript as an attachment under its own file name.
    response = HttpResponse(transcript_body, content_type=content_type)
    response['Content-Disposition'] = f'attachment; filename="{download_name}"'
    return response
@login_required
def check_transcripts(request):  # lint-amnesty, pylint: disable=too-many-statements
    """
    Check state of transcripts availability.

    request.GET['data'] has key `videos`, which can contain any of the following::

        [
            {u'type': u'youtube', u'video': u'OEoXaMPEzfM', u'mode': u'youtube'},
            {u'type': u'html5',    u'video': u'video1',             u'mode': u'mp4'}
            {u'type': u'html5',    u'video': u'video2',             u'mode': u'webm'}
        ]
        `type` is youtube or html5
        `video` is html5 or youtube video_id
        `mode` is youtube, mp4 or webm

    Returns transcripts_presence dict::

        html5_local: list of html5 ids, if subtitles exist locally for them;
        html5_equal: bool, if the two locally found html5 transcripts are equal;
        is_youtube_mode: bool, if we have youtube_id, and as youtube mode is of higher priority, reflect this with flag;
        youtube_local: bool, if youtube transcripts exist locally;
        youtube_server: bool, if youtube transcripts exist on server;
        youtube_diff: bool, if youtube transcripts exist on youtube server, and are different from local youtube ones;
        current_item_subs: string, value of item.sub field;
        status: string, 'Error' or 'Success';
        command: string, action to front-end what to do and what to show to user.
    """
    transcripts_presence = {
        'html5_local': [],
        'html5_equal': False,
        'is_youtube_mode': False,
        'youtube_local': False,
        'youtube_server': False,
        'youtube_diff': True,
        'current_item_subs': None,
        'status': 'Error',
    }

    try:
        __, videos, item = _validate_transcripts_data(request)
    except TranscriptsRequestValidationException as e:
        return error_response(transcripts_presence, str(e))

    transcripts_presence['status'] = 'Success'

    try:
        val_video_id = clean_video_id(videos.get('edx_video_id'))
        get_transcript_from_val(edx_video_id=val_video_id, lang='en')
    except NotFoundError:
        # Nothing in VAL: look at YouTube first, then at the contentstore.
        youtube_id = videos.get('youtube', None)
        if youtube_id:
            _check_youtube_transcripts(transcripts_presence, youtube_id, item)

        # Contentstore lookups are skipped for blocks with a v2 usage key.
        if not isinstance(item.usage_key, UsageKeyV2):
            sub_location = StaticContent.compute_location(
                item.location.course_key, f'subs_{item.sub}.srt.sjson'
            )
            try:
                contentstore().find(sub_location).data.decode('utf-8')
                transcripts_presence['current_item_subs'] = item.sub
            except NotFoundError:
                pass

            # Check for html5 local transcripts presence
            found_html5_subs = []
            for html5_id in videos['html5']:
                html5_location = StaticContent.compute_location(
                    item.location.course_key, f'subs_{html5_id}.srt.sjson'
                )
                try:
                    found_html5_subs.append(contentstore().find(html5_location).data)
                    transcripts_presence['html5_local'].append(html5_id)
                except NotFoundError:
                    log.debug("Can't find transcripts in storage for non-youtube video_id: %s", html5_id)
                if len(found_html5_subs) == 2:  # check html5 transcripts for equality
                    first_subs, second_subs = (
                        json.loads(raw_subs.decode('utf-8')) for raw_subs in found_html5_subs
                    )
                    transcripts_presence['html5_equal'] = first_subs == second_subs

        command, __ = _transcripts_logic(transcripts_presence, videos)
    else:
        command = 'found'

    transcripts_presence['command'] = command
    return JsonResponse(transcripts_presence)
def _check_youtube_transcripts(transcripts_presence, youtube_id, item):
    """
    Check for youtube transcripts presence.

    Updates ``transcripts_presence`` in place:
        is_youtube_mode: always set to True;
        youtube_server: set to True when a transcript link exists for ``youtube_id``;
        youtube_local: set to True when `subs_<youtube_id>.srt.sjson` is found in the
            contentstore (not looked up for blocks with a v2 usage key);
        youtube_diff: set to False when a transcript fetched from YouTube equals the
            local one.

    Arguments:
        transcripts_presence: dict of presence flags to update.
        youtube_id: YouTube video ID to check.
        item: video block being checked.
    """
    transcripts_presence['is_youtube_mode'] = True

    if get_transcript_link_from_youtube(youtube_id):
        transcripts_presence['youtube_server'] = True

    if not isinstance(item.usage_key, UsageKeyV2):
        # youtube local
        filename = f'subs_{youtube_id}.srt.sjson'
        content_location = StaticContent.compute_location(item.location.course_key, filename)
        try:
            local_transcripts = contentstore().find(content_location).data.decode('utf-8')
            transcripts_presence['youtube_local'] = True
        except NotFoundError:
            log.debug("Can't find transcripts in storage for youtube id: %s", youtube_id)

    # check youtube local and server transcripts for equality
    if transcripts_presence['youtube_server'] and transcripts_presence['youtube_local']:
        try:
            # The service lookup and the parsed local transcript do not depend
            # on the link being processed, so compute each of them once.
            i18n_service = item.runtime.service(item, "i18n")
            transcript_links = get_transcript_links_from_youtube(
                youtube_id,
                settings,
                i18n_service
            )
            local_subs = json.loads(local_transcripts)
            # Only the links are needed. Iterating the values also avoids
            # rebinding `_`, which is the gettext alias at module level.
            for link in transcript_links.values():
                youtube_server_subs = get_transcript_from_youtube(link, youtube_id, i18n_service)
                if local_subs == youtube_server_subs:  # check transcripts for equality
                    transcripts_presence['youtube_diff'] = False
        except GetTranscriptsFromYouTubeException:
            # Best effort: when YouTube cannot be queried, leave `youtube_diff` as it is.
            pass
def _transcripts_logic(transcripts_presence, videos):
    """
    Decide from `transcripts_presence` what should be shown to the user.

    returns: `command` and `subs`.

    `command`: string, action to front-end what to do and what show to user.
    `subs`: string, new value of item.sub field, that should be set in module.

    `command` is one of::

        replace: replace local youtube subtitles with server one's
        found: subtitles are found
        import: import subtitles from youtube server
        choose: choose one from two html5 subtitles
        use_existing: keep the subtitles already set on the item
        not_found: subtitles are not found
    """
    def decide():
        """Walk the presence flags in priority order and return (command, subs)."""
        presence = transcripts_presence

        # youtube transcripts are of high priority than html5 by design
        if presence['youtube_diff'] and presence['youtube_local'] and presence['youtube_server']:
            return 'replace', videos['youtube']  # youtube server and local exist
        if presence['youtube_local']:
            return 'found', videos['youtube']  # only youtube local exist
        if presence['youtube_server']:
            return 'import', ''  # only youtube server exist

        # html5 part
        local_html5_ids = presence['html5_local']  # can be 1 or 2 html5 videos
        if local_html5_ids:
            unambiguous = len(local_html5_ids) == 1 or presence['html5_equal']
            return ('found' if unambiguous else 'choose'), local_html5_ids[0]

        # html5 source have no subtitles: check if item sub has subtitles
        if presence['current_item_subs'] and not presence['is_youtube_mode']:
            log.debug("Command is use existing %s subs", presence['current_item_subs'])
            return 'use_existing', ''
        return 'not_found', ''

    command, subs = decide()
    log.debug(
        "Resulted command: %s, current transcripts: %s, youtube mode: %s",
        command,
        transcripts_presence['current_item_subs'],
        transcripts_presence['is_youtube_mode']
    )
    return command, subs
def _validate_transcripts_data(request):
    """
    Validates, that request contains all proper data for transcripts processing.

    Returns tuple of 3 elements::

        data: dict, loaded json from request,
        videos: parsed `data` to useful format,
        item: video item from storage or library

    `videos` always has the keys `youtube` (string) and `html5` (dict mapping
    video id to mode); `edx_video_id` is only present when a non-empty one was sent.

    Raises `TranscriptsRequestValidationException` if validation is unsuccessful
    or `PermissionDenied` if user has no access.
    """
    try:
        data = json.loads(request.GET.get('data', '{}'))
    except ValueError as exc:
        # The payload comes straight from the query string; report bad JSON
        # as a validation failure rather than an unhandled error.
        raise TranscriptsRequestValidationException(_('Incoming video data is malformed.')) from exc

    if not data:
        raise TranscriptsRequestValidationException(_('Incoming video data is empty.'))
    if not isinstance(data, dict):
        raise TranscriptsRequestValidationException(_('Incoming video data is malformed.'))

    try:
        item = _get_item(request, data)
    except (InvalidKeyError, ItemNotFoundError) as exc:
        raise TranscriptsRequestValidationException(_("Can't find item by locator.")) from exc

    if item.category != 'video':
        raise TranscriptsRequestValidationException(_('Transcripts are supported only for "video" blocks.'))

    # parse data form request.GET.['data']['video'] to useful format
    videos = {'youtube': '', 'html5': {}}
    for video_data in data.get('videos') or []:
        video_type = video_data['type']
        video_id = video_data['video']
        if video_type == 'youtube':
            videos['youtube'] = video_id
        elif video_type == 'edx_video_id':
            if clean_video_id(video_id):
                videos['edx_video_id'] = video_id
        elif video_id is not None:
            # Keyed by video id, so the same html5 video is never added twice.
            videos['html5'][video_id] = video_data['mode']

    return data, videos, item
def validate_transcripts_request(request, include_yt=False, include_html5=False):
    """
    Validates transcript handler's request.

    NOTE: This is one central validation flow for the `choose_transcripts`,
    `rename_transcripts` and `replace_transcripts` handlers.

    Arguments:
        request: WSGI request; the JSON payload is read from its `data` GET parameter.
        include_yt: when True, copy the YouTube video id into the result under `youtube`.
        include_html5: when True, add `chosen_html5_id` and the `html5`
            mapping (video id to mode) to the result.

    Returns:
        A tuple containing:
            1. An error message in case of validation failure.
            2. validated video data
    """
    validated_data = {'video': None, 'youtube': '', 'html5': {}}

    # Loads the request data
    try:
        data = json.loads(request.GET.get('data', '{}'))
    except ValueError:
        # The payload comes straight from the query string; report bad JSON
        # as a validation failure rather than an unhandled error.
        return _('Incoming video data is malformed.'), validated_data

    if not data:
        return _('Incoming video data is empty.'), validated_data
    if not isinstance(data, dict):
        return _('Incoming video data is malformed.'), validated_data

    error, video = validate_video_block(request, locator=data.get('locator'))
    if error:
        return error, validated_data

    validated_data['video'] = video
    entries = data.get('videos') or []

    if include_yt:
        validated_data.update({
            entry['type']: entry['video']
            for entry in entries
            if entry['type'] == 'youtube'
        })

    if include_html5:
        validated_data['chosen_html5_id'] = data.get('html5_id')
        validated_data['html5'] = {
            entry['video']: entry['mode']
            for entry in entries
            if entry['type'] != 'youtube'
        }

    return error, validated_data
@login_required
def choose_transcripts(request):
    """
    Create/Update edx transcript in DS with chosen html5 subtitles from contentstore.

    Returns:
        status `Success` and resulted `edx_video_id` value
        (`edx_video_id` is None for blocks that live in a v2 library),
        or error in case of validation failures.
    """
    error, validated_data = validate_transcripts_request(request, include_html5=True)
    if error:
        return error_response({}, error)

    video = validated_data['video']
    html5_id = validated_data['chosen_html5_id']

    # 1. Retrieve transcript file for `chosen_html5_id` from contentstore.
    try:
        input_format, __, transcript_content = get_transcript_for_video(
            video.location,
            subs_id=html5_id,
            file_name=html5_id,
            language='en'
        )
    except NotFoundError:
        return error_response({}, _('No such transcript.'))

    edx_video_id = None
    if isinstance(video.usage_key.context_key, LibraryLocatorV2):
        # 2. Library blocks keep the transcript through the openedx_content API.
        saved = save_video_transcript_in_openedx_content(
            video.usage_key,
            input_format,
            transcript_content,
            language_code='en',
        )
    else:
        # 2. Link a video to video component if its not already linked to one,
        #    then upload the retrieved transcript to DS for the linked video ID.
        edx_video_id = link_video_to_component(video, request.user)
        saved = save_video_transcript(
            edx_video_id,
            input_format,
            transcript_content,
            language_code='en',
        )

    if not saved:
        return error_response({}, _('There is a problem with the chosen transcript file.'))
    return JsonResponse({'edx_video_id': edx_video_id, 'status': 'Success'}, status=200)
@login_required
def rename_transcripts(request):
    """
    Copies existing transcript on video component's `sub`(from contentstore) into the
    DS for a video.

    Returns:
        status `Success` and resulted `edx_video_id` value
        (`edx_video_id` is None for blocks that live in a v2 library),
        or error in case of validation failures.
    """
    error, validated_data = validate_transcripts_request(request)
    if error:
        return error_response({}, error)

    video = validated_data['video']

    # 1. Retrieve transcript file for `video.sub` from contentstore.
    try:
        input_format, __, transcript_content = get_transcript_for_video(
            video.location,
            subs_id=video.sub,
            file_name=video.sub,
            language='en'
        )
    except NotFoundError:
        return error_response({}, _('No such transcript.'))

    edx_video_id = None
    if isinstance(video.usage_key.context_key, LibraryLocatorV2):
        # 2. Library blocks keep the transcript through the openedx_content API.
        saved = save_video_transcript_in_openedx_content(
            video.usage_key,
            input_format,
            transcript_content,
            language_code='en',
        )
    else:
        # 2. Link a video to video component if its not already linked to one,
        #    then upload the retrieved transcript to DS for the linked video ID.
        edx_video_id = link_video_to_component(video, request.user)
        saved = save_video_transcript(
            edx_video_id,
            input_format,
            transcript_content,
            language_code='en',
        )

    if not saved:
        return error_response(
            {}, _('There is a problem with the existing transcript file. Please upload a different file.')
        )
    return JsonResponse({'edx_video_id': edx_video_id, 'status': 'Success'}, status=200)
@login_required
def replace_transcripts(request):
    """
    Downloads subtitles from youtube and replaces edx transcripts in DS with youtube ones.

    Every downloaded (language, content) pair is saved; the block's
    `transcripts` field is updated per language and the block is saved only
    when all of them were stored.

    Returns:
        status `Success` and resulted `edx_video_id` value
        (`edx_video_id` is None for blocks that live in a v2 library),
        or error on validation failures.
    """
    error, validated_data = validate_transcripts_request(request, include_yt=True)
    if error:
        return error_response({}, error)

    youtube_id = validated_data['youtube']
    if not youtube_id:
        return error_response({}, _('YouTube ID is required.'))

    video = validated_data['video']

    # 1. Download transcript from YouTube.
    try:
        downloaded_transcripts = download_youtube_subs(youtube_id, video, settings)
    except GetTranscriptsFromYouTubeException as e:
        return error_response({}, str(e))

    # 2. Link a video to video component if its not already linked to one.
    edx_video_id = None
    if not isinstance(video.usage_key.context_key, LibraryLocatorV2):
        edx_video_id = link_video_to_component(video, request.user)

    # 3. Upload YT transcript to DS for the linked video ID.
    all_saved = True
    for language_code, json_content in downloaded_transcripts:
        if isinstance(video.usage_key.context_key, LibraryLocatorV2):
            all_saved = save_video_transcript_in_openedx_content(
                video.usage_key,
                Transcript.SJSON,
                json_content,
                language_code,
            )
            transcript_name = f"transcript-{language_code}.srt"
        else:
            all_saved = save_video_transcript(
                edx_video_id,
                Transcript.SJSON,
                json_content,
                language_code,
            )
            transcript_name = f"{edx_video_id}-{language_code}.srt"

        if not all_saved:
            # Stop at the first failure; the block is not saved in that case.
            break
        video.transcripts[language_code] = transcript_name

    if not all_saved:
        return error_response({}, _('There is a problem with the YouTube transcript file.'))

    video.save()
    return JsonResponse({'edx_video_id': edx_video_id, 'status': 'Success'}, status=200)
def _get_item(request, data):
    """
    Load the item addressed by ``data['locator']`` for the requesting user.

    Course items come from the modulestore (allowing any errors to raise up)
    and are returned only if the user has course author access. Items whose
    context is a v2 library are loaded through the xblock API with the
    CAN_EDIT permission check.

    Returns the item.

    Raises:
        TranscriptsRequestValidationException: the locator belongs to neither
            a course nor a v2 library.
        PermissionDenied: the user has no author access to the item's course.
    """
    usage_key = UsageKey.from_string(data.get('locator'))
    context_key = usage_key.context_key

    if context_key.is_course:
        # The item is fetched before has_course_author_access() to validate the
        # location, because has_course_author_access() raises error if location
        # is invalid.
        item = modulestore().get_item(usage_key)

        # use the item's course_key, because the usage_key might not have the run
        if has_course_author_access(request.user, item.location.course_key):
            return item
        raise PermissionDenied()

    if isinstance(context_key, LibraryLocatorV2):
        return xblock_api.load_block(
            usage_key,
            request.user,
            check_permission=CheckPerm.CAN_EDIT,
        )

    raise TranscriptsRequestValidationException(_('Transcripts are not yet supported for this type of block'))