Files
edx-platform/xmodule/video_block/transcripts_utils.py
2023-04-26 17:10:54 +02:00

1123 lines
40 KiB
Python

"""
Utility functions for transcripts.
++++++++++++++++++++++++++++++++++
"""
import copy
import html
import logging
import os
import re
from functools import wraps
import requests
import simplejson as json
from django.conf import settings
from lxml import etree
from opaque_keys.edx.locator import BundleDefinitionLocator
from pysrt import SubRipFile, SubRipItem, SubRipTime
from pysrt.srtexc import Error
from openedx.core.djangolib import blockstore_cache
from openedx.core.lib import blockstore_api
from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import NotFoundError
from .bumper_utils import get_bumper_settings
try:
from edxval import api as edxval_api
except ImportError:
edxval_api = None
log = logging.getLogger(__name__)
NON_EXISTENT_TRANSCRIPT = 'non_existent_dummy_file_name'
class TranscriptException(Exception):
pass
class TranscriptsGenerationException(Exception):
pass
class GetTranscriptsFromYouTubeException(Exception):
pass
class TranscriptsRequestValidationException(Exception):
pass
def exception_decorator(func):
"""
Generate NotFoundError for TranscriptsGenerationException, UnicodeDecodeError.
Args:
`func`: Input function
Returns:
'wrapper': Decorated function
"""
@wraps(func)
def wrapper(*args, **kwds):
try:
return func(*args, **kwds)
except (TranscriptsGenerationException, UnicodeDecodeError) as ex:
log.exception(str(ex))
raise NotFoundError # lint-amnesty, pylint: disable=raise-missing-from
return wrapper
def generate_subs(speed, source_speed, source_subs):
"""
Generate transcripts from one speed to another speed.
Args:
`speed`: float, for this speed subtitles will be generated,
`source_speed`: float, speed of source_subs
`source_subs`: dict, existing subtitles for speed `source_speed`.
Returns:
`subs`: dict, actual subtitles.
"""
if speed == source_speed:
return source_subs
coefficient = 1.0 * speed / source_speed
subs = {
'start': [
int(round(timestamp * coefficient)) for
timestamp in source_subs['start']
],
'end': [
int(round(timestamp * coefficient)) for
timestamp in source_subs['end']
],
'text': source_subs['text']}
return subs
def save_to_store(content, name, mime_type, location):
"""
Save named content to store by location.
Returns location of saved content.
"""
content_location = Transcript.asset_location(location, name)
content = StaticContent(content_location, name, mime_type, content)
contentstore().save(content)
return content_location
def save_subs_to_store(subs, subs_id, item, language='en'):
"""
Save transcripts into `StaticContent`.
Args:
`subs_id`: str, subtitles id
`item`: video block instance
`language`: two chars str ('uk'), language of translation of transcripts
Returns: location of saved subtitles.
"""
filedata = json.dumps(subs, indent=2).encode('utf-8')
filename = subs_filename(subs_id, language)
return save_to_store(filedata, filename, 'application/json', item.location)
def get_transcript_link_from_youtube(youtube_id):
"""
Get the link for YouTube transcript by parsing the source of the YouTube webpage.
Inside the webpage, the details of the transcripts are located in a JSON object.
After prettifying the object, it looks like:
"captions": {
"playerCaptionsTracklistRenderer": {
"captionTracks": [
{
"baseUrl": "...",
"name": {
"simpleText": "(Japanese in local language)"
},
"vssId": ".ja",
"languageCode": "ja",
"isTranslatable": true
},
{
"baseUrl": "...",
"name": {
"simpleText": "(French in local language)"
},
"vssId": ".fr",
"languageCode": "fr",
"isTranslatable": true
},
{
"baseUrl": "...",
"name": {
"simpleText": "(English in local language)"
},
"vssId": ".en",
"languageCode": "en",
"isTranslatable": true
},
...
],
"audioTracks": [...]
"translationLanguages": ...
},
...
}
So we use a regex to find the captionTracks JavaScript array, and then convert it
to a Python dict and return the link for en caption
"""
youtube_url_base = settings.YOUTUBE['TRANSCRIPTS']['YOUTUBE_URL_BASE']
try:
youtube_html = requests.get(f"{youtube_url_base}{youtube_id}")
caption_re = settings.YOUTUBE['TRANSCRIPTS']['CAPTION_TRACKS_REGEX']
caption_matched = re.search(caption_re, youtube_html.content.decode("utf-8"))
if caption_matched:
caption_tracks = json.loads(f'[{caption_matched.group("caption_tracks")}]')
for caption in caption_tracks:
if "languageCode" in caption.keys() and caption["languageCode"] == "en":
return caption["baseUrl"]
return None
except ConnectionError:
return None
def get_transcripts_from_youtube(youtube_id, settings, i18n, youtube_transcript_name=''): # lint-amnesty, pylint: disable=redefined-outer-name
"""
Gets transcripts from youtube for youtube_id.
Parses only utf-8 encoded transcripts.
Other encodings are not supported at the moment.
Returns (status, transcripts): bool, dict.
"""
_ = i18n.ugettext
utf8_parser = etree.XMLParser(encoding='utf-8')
transcript_link = get_transcript_link_from_youtube(youtube_id)
if not transcript_link:
msg = _("Can't get transcript link from Youtube for {youtube_id}.").format(
youtube_id=youtube_id,
)
raise GetTranscriptsFromYouTubeException(msg)
data = requests.get(transcript_link)
if data.status_code != 200 or not data.text:
msg = _("Can't receive transcripts from Youtube for {youtube_id}. Status code: {status_code}.").format(
youtube_id=youtube_id,
status_code=data.status_code
)
raise GetTranscriptsFromYouTubeException(msg)
sub_starts, sub_ends, sub_texts = [], [], []
xmltree = etree.fromstring(data.content, parser=utf8_parser)
for element in xmltree:
if element.tag == "text":
start = float(element.get("start"))
duration = float(element.get("dur", 0)) # dur is not mandatory
text = element.text
end = start + duration
if text:
# Start and end should be ints representing the millisecond timestamp.
sub_starts.append(int(start * 1000))
sub_ends.append(int((end + 0.0001) * 1000))
sub_texts.append(text.replace('\n', ' '))
return {'start': sub_starts, 'end': sub_ends, 'text': sub_texts}
def download_youtube_subs(youtube_id, video_block, settings): # lint-amnesty, pylint: disable=redefined-outer-name
"""
Download transcripts from Youtube.
Args:
youtube_id: str, actual youtube_id of the video.
video_block: video block instance.
We save transcripts for 1.0 speed, as for other speed conversion is done on front-end.
Returns:
Serialized sjson transcript content, if transcripts were successfully downloaded and saved.
Raises:
GetTranscriptsFromYouTubeException, if fails.
"""
i18n = video_block.runtime.service(video_block, "i18n")
_ = i18n.ugettext
subs = get_transcripts_from_youtube(youtube_id, settings, i18n)
return json.dumps(subs, indent=2)
def remove_subs_from_store(subs_id, item, lang='en'):
"""
Remove from store, if transcripts content exists.
"""
filename = subs_filename(subs_id, lang)
Transcript.delete_asset(item.location, filename)
def generate_subs_from_source(speed_subs, subs_type, subs_filedata, block, language='en'):
"""Generate transcripts from source files (like SubRip format, etc.)
and save them to assets for `item` module.
We expect, that speed of source subs equal to 1
:param speed_subs: dictionary {speed: sub_id, ...}
:param subs_type: type of source subs: "srt", ...
:param subs_filedata:unicode, content of source subs.
:param block: course or block.
:param language: str, language of translation of transcripts
:returns: True, if all subs are generated and saved successfully.
"""
_ = block.runtime.service(block, "i18n").ugettext
if subs_type.lower() != 'srt':
raise TranscriptsGenerationException(_("We support only SubRip (*.srt) transcripts format."))
try:
srt_subs_obj = SubRipFile.from_string(subs_filedata)
except Exception as ex:
msg = _("Something wrong with SubRip transcripts file during parsing. Inner message is {error_message}").format(
error_message=str(ex)
)
raise TranscriptsGenerationException(msg) # lint-amnesty, pylint: disable=raise-missing-from
if not srt_subs_obj:
raise TranscriptsGenerationException(_("Something wrong with SubRip transcripts file during parsing."))
sub_starts = []
sub_ends = []
sub_texts = []
for sub in srt_subs_obj:
sub_starts.append(sub.start.ordinal)
sub_ends.append(sub.end.ordinal)
sub_texts.append(sub.text.replace('\n', ' '))
subs = {
'start': sub_starts,
'end': sub_ends,
'text': sub_texts}
for speed, subs_id in speed_subs.items():
save_subs_to_store(
generate_subs(speed, 1, subs),
subs_id,
block,
language
)
return subs
def generate_srt_from_sjson(sjson_subs, speed):
"""Generate transcripts with speed = 1.0 from sjson to SubRip (*.srt).
:param sjson_subs: "sjson" subs.
:param speed: speed of `sjson_subs`.
:returns: "srt" subs.
"""
output = ''
equal_len = len(sjson_subs['start']) == len(sjson_subs['end']) == len(sjson_subs['text'])
if not equal_len:
return output
sjson_speed_1 = generate_subs(speed, 1, sjson_subs)
for i in range(len(sjson_speed_1['start'])):
item = SubRipItem(
index=i,
start=SubRipTime(milliseconds=sjson_speed_1['start'][i]),
end=SubRipTime(milliseconds=sjson_speed_1['end'][i]),
text=sjson_speed_1['text'][i]
)
output += (str(item))
output += '\n'
return output
def generate_sjson_from_srt(srt_subs):
"""
Generate transcripts from sjson to SubRip (*.srt).
Arguments:
srt_subs(SubRip): "SRT" subs object
Returns:
Subs converted to "SJSON" format.
"""
sub_starts = []
sub_ends = []
sub_texts = []
for sub in srt_subs:
sub_starts.append(sub.start.ordinal)
sub_ends.append(sub.end.ordinal)
sub_texts.append(sub.text.replace('\n', ' '))
sjson_subs = {
'start': sub_starts,
'end': sub_ends,
'text': sub_texts
}
return sjson_subs
def copy_or_rename_transcript(new_name, old_name, item, delete_old=False, user=None):
"""
Renames `old_name` transcript file in storage to `new_name`.
If `old_name` is not found in storage, raises `NotFoundError`.
If `delete_old` is True, removes `old_name` files from storage.
"""
filename = f'subs_{old_name}.srt.sjson'
content_location = StaticContent.compute_location(item.location.course_key, filename)
transcripts = contentstore().find(content_location).data.decode('utf-8')
save_subs_to_store(json.loads(transcripts), new_name, item)
item.sub = new_name
item.save_with_metadata(user)
if delete_old:
remove_subs_from_store(old_name, item)
def get_html5_ids(html5_sources):
"""
Helper method to parse out an HTML5 source into the ideas
NOTE: This assumes that '/' are not in the filename
"""
html5_ids = [x.split('/')[-1].rsplit('.', 1)[0] for x in html5_sources]
return html5_ids
def manage_video_subtitles_save(item, user, old_metadata=None, generate_translation=False):
"""
Does some specific things, that can be done only on save.
Video player item has some video fields: HTML5 ones and Youtube one.
If value of `sub` field of `new_item` is cleared, transcripts should be removed.
`item` is video block instance with updated values of fields,
but actually have not been saved to store yet.
`old_metadata` contains old values of XFields.
# 1.
If value of `sub` field of `new_item` is different from values of video fields of `new_item`,
and `new_item.sub` file is present, then code in this function creates copies of
`new_item.sub` file with new names. That names are equal to values of video fields of `new_item`
After that `sub` field of `new_item` is changed to one of values of video fields.
This whole action ensures that after user changes video fields, proper `sub` files, corresponding
to new values of video fields, will be presented in system.
# 2. convert /static/filename.srt to filename.srt in self.transcripts.
(it is done to allow user to enter both /static/filename.srt and filename.srt)
# 3. Generate transcripts translation only when user clicks `save` button, not while switching tabs.
a) delete sjson translation for those languages, which were removed from `item.transcripts`.
Note: we are not deleting old SRT files to give user more flexibility.
b) For all SRT files in`item.transcripts` regenerate new SJSON files.
(To avoid confusing situation if you attempt to correct a translation by uploading
a new version of the SRT file with same name).
"""
_ = item.runtime.service(item, "i18n").ugettext
# # 1.
# html5_ids = get_html5_ids(item.html5_sources)
# # Youtube transcript source should always have a higher priority than html5 sources. Appending
# # `youtube_id_1_0` at the end helps achieve this when we read transcripts list.
# possible_video_id_list = html5_ids + [item.youtube_id_1_0]
# sub_name = item.sub
# for video_id in possible_video_id_list:
# if not video_id:
# continue
# if not sub_name:
# remove_subs_from_store(video_id, item)
# continue
# # copy_or_rename_transcript changes item.sub of module
# try:
# # updates item.sub with `video_id`, if it is successful.
# copy_or_rename_transcript(video_id, sub_name, item, user=user)
# except NotFoundError:
# # subtitles file `sub_name` is not presented in the system. Nothing to copy or rename.
# log.debug(
# "Copying %s file content to %s name is failed, "
# "original file does not exist.",
# sub_name, video_id
# )
# 2.
if generate_translation:
for lang, filename in item.transcripts.items():
item.transcripts[lang] = os.path.split(filename)[-1]
# 3.
if generate_translation:
old_langs = set(old_metadata.get('transcripts', {})) if old_metadata else set()
new_langs = set(item.transcripts)
html5_ids = get_html5_ids(item.html5_sources)
possible_video_id_list = html5_ids + [item.youtube_id_1_0]
for lang in old_langs.difference(new_langs): # 3a
for video_id in possible_video_id_list:
if video_id:
remove_subs_from_store(video_id, item, lang)
reraised_message = ''
for lang in new_langs: # 3b
try:
generate_sjson_for_all_speeds(
item,
item.transcripts[lang],
{speed: subs_id for subs_id, speed in youtube_speed_dict(item).items()},
lang,
)
except TranscriptException as ex: # lint-amnesty, pylint: disable=unused-variable
pass
if reraised_message:
item.save_with_metadata(user)
raise TranscriptException(reraised_message)
def youtube_speed_dict(item):
"""
Returns {speed: youtube_ids, ...} dict for existing youtube_ids
"""
yt_ids = [item.youtube_id_0_75, item.youtube_id_1_0, item.youtube_id_1_25, item.youtube_id_1_5]
yt_speeds = [0.75, 1.00, 1.25, 1.50]
youtube_ids = {p[0]: p[1] for p in zip(yt_ids, yt_speeds) if p[0]}
return youtube_ids
def subs_filename(subs_id, lang='en'):
"""
Generate proper filename for storage.
"""
if lang == 'en':
return f'subs_{subs_id}.srt.sjson'
else:
return f'{lang}_subs_{subs_id}.srt.sjson'
def generate_sjson_for_all_speeds(block, user_filename, result_subs_dict, lang):
"""
Generates sjson from srt for given lang.
"""
_ = block.runtime.service(block, "i18n").ugettext
try:
srt_transcripts = contentstore().find(Transcript.asset_location(block.location, user_filename))
except NotFoundError as ex:
raise TranscriptException(_("{exception_message}: Can't find uploaded transcripts: {user_filename}").format( # lint-amnesty, pylint: disable=raise-missing-from
exception_message=str(ex),
user_filename=user_filename
))
if not lang:
lang = block.transcript_language
# Used utf-8-sig encoding type instead of utf-8 to remove BOM(Byte Order Mark), e.g. U+FEFF
generate_subs_from_source(
result_subs_dict,
os.path.splitext(user_filename)[1][1:],
srt_transcripts.data.decode('utf-8-sig'),
block,
lang
)
def get_or_create_sjson(block, transcripts):
"""
Get sjson if already exists, otherwise generate it.
Generate sjson with subs_id name, from user uploaded srt.
Subs_id is extracted from srt filename, which was set by user.
Args:
transcipts (dict): dictionary of (language: file) pairs.
Raises:
TranscriptException: when srt subtitles do not exist,
and exceptions from generate_subs_from_source.
"""
user_filename = transcripts[block.transcript_language]
user_subs_id = os.path.splitext(user_filename)[0]
source_subs_id, result_subs_dict = user_subs_id, {1.0: user_subs_id}
try:
sjson_transcript = Transcript.asset(block.location, source_subs_id, block.transcript_language).data
except NotFoundError: # generating sjson from srt
generate_sjson_for_all_speeds(block, user_filename, result_subs_dict, block.transcript_language)
sjson_transcript = Transcript.asset(block.location, source_subs_id, block.transcript_language).data
return sjson_transcript
def get_video_ids_info(edx_video_id, youtube_id_1_0, html5_sources):
"""
Returns list internal or external video ids.
Arguments:
edx_video_id (unicode): edx_video_id
youtube_id_1_0 (unicode): youtube id
html5_sources (list): html5 video ids
Returns:
tuple: external or internal, video ids list
"""
clean = lambda item: item.strip() if isinstance(item, str) else item
external = not bool(clean(edx_video_id))
video_ids = [edx_video_id, youtube_id_1_0] + get_html5_ids(html5_sources)
# video_ids cleanup
video_ids = [item for item in video_ids if bool(clean(item))]
return external, video_ids
def clean_video_id(edx_video_id):
"""
Cleans an edx video ID.
Arguments:
edx_video_id(unicode): edx-val's video identifier
"""
return edx_video_id and edx_video_id.strip()
def get_video_transcript_content(edx_video_id, language_code):
"""
Gets video transcript content, only if the corresponding feature flag is enabled for the given `course_id`.
Arguments:
language_code(unicode): Language code of the requested transcript
edx_video_id(unicode): edx-val's video identifier
Returns:
A dict containing transcript's file name and its sjson content.
"""
transcript = None
edx_video_id = clean_video_id(edx_video_id)
if edxval_api and edx_video_id:
try:
transcript = edxval_api.get_video_transcript_data(edx_video_id, language_code)
except ValueError:
log.exception(
f"Error getting transcript from edx-val id: {edx_video_id}: language code {language_code}"
)
content = '{"start": [1],"end": [2],"text": ["An error occured obtaining the transcript."]}'
transcript = dict(
file_name='error-{edx_video_id}-{language_code}.srt',
content=Transcript.convert(content, 'sjson', 'srt')
)
return transcript
def get_available_transcript_languages(edx_video_id):
"""
Gets available transcript languages for a video.
Arguments:
edx_video_id(unicode): edx-val's video identifier
Returns:
A list containing distinct transcript language codes against all the passed video ids.
"""
available_languages = []
edx_video_id = clean_video_id(edx_video_id)
if edxval_api and edx_video_id:
available_languages = edxval_api.get_available_transcript_languages(video_id=edx_video_id)
return available_languages
def convert_video_transcript(file_name, content, output_format):
"""
Convert video transcript into desired format
Arguments:
file_name: name of transcript file along with its extension
content: transcript content stream
output_format: the format in which transcript will be converted
Returns:
A dict containing the new transcript filename and the content converted into desired format.
"""
name_and_extension = os.path.splitext(file_name)
basename, input_format = name_and_extension[0], name_and_extension[1][1:]
filename = f'{basename}.{output_format}'
converted_transcript = Transcript.convert(content, input_format=input_format, output_format=output_format)
return dict(filename=filename, content=converted_transcript)
class Transcript:
"""
Container for transcript methods.
"""
SRT = 'srt'
TXT = 'txt'
SJSON = 'sjson'
mime_types = {
SRT: 'application/x-subrip; charset=utf-8',
TXT: 'text/plain; charset=utf-8',
SJSON: 'application/json',
}
@staticmethod
def convert(content, input_format, output_format):
"""
Convert transcript `content` from `input_format` to `output_format`.
Accepted input formats: sjson, srt.
Accepted output format: srt, txt, sjson.
Raises:
TranscriptsGenerationException: On parsing the invalid srt content during conversion from srt to sjson.
"""
assert input_format in ('srt', 'sjson')
assert output_format in ('txt', 'srt', 'sjson')
if input_format == output_format:
return content
if input_format == 'srt':
# Standardize content into bytes for later decoding.
if isinstance(content, str):
content = content.encode('utf-8')
if output_format == 'txt':
text = SubRipFile.from_string(content.decode('utf-8')).text
return html.unescape(text)
elif output_format == 'sjson':
try:
srt_subs = SubRipFile.from_string(
# Skip byte order mark(BOM) character
content.decode('utf-8-sig'),
error_handling=SubRipFile.ERROR_RAISE
)
except Error as ex: # Base exception from pysrt
raise TranscriptsGenerationException(str(ex)) from ex
return json.dumps(generate_sjson_from_srt(srt_subs))
if input_format == 'sjson':
# If the JSON file content is bytes, try UTF-8, then Latin-1
if isinstance(content, bytes):
try:
content_str = content.decode('utf-8')
except UnicodeDecodeError:
content_str = content.decode('latin-1')
else:
content_str = content
try:
content_dict = json.loads(content_str)
except ValueError:
truncated = content_str[:100].strip()
log.exception(
f"Failed to convert {input_format} to {output_format} for {repr(truncated)}..."
)
content_dict = {"start": [1], "end": [2], "text": ["An error occured obtaining the transcript."]}
if output_format == 'txt':
text = content_dict['text']
text_without_none = [line if line else '' for line in text]
return html.unescape("\n".join(text_without_none))
elif output_format == 'srt':
return generate_srt_from_sjson(content_dict, speed=1.0)
@staticmethod
def asset(location, subs_id, lang='en', filename=None):
"""
Get asset from contentstore, asset location is built from subs_id and lang.
`location` is block location.
"""
# HACK Warning! this is temporary and will be removed once edx-val take over the
# transcript module and contentstore will only function as fallback until all the
# data is migrated to edx-val. It will be saving a contentstore hit for a hardcoded
# dummy-non-existent-transcript name.
if NON_EXISTENT_TRANSCRIPT in [subs_id, filename]:
raise NotFoundError
asset_filename = subs_filename(subs_id, lang) if not filename else filename
return Transcript.get_asset(location, asset_filename)
@staticmethod
def get_asset(location, filename):
"""
Return asset by location and filename.
"""
return contentstore().find(Transcript.asset_location(location, filename))
@staticmethod
def asset_location(location, filename):
"""
Return asset location. `location` is block location.
"""
# If user transcript filename is empty, raise `TranscriptException` to avoid `InvalidKeyError`.
if not filename:
raise TranscriptException("Transcript not uploaded yet")
return StaticContent.compute_location(location.course_key, filename)
@staticmethod
def delete_asset(location, filename):
"""
Delete asset by location and filename.
"""
try:
contentstore().delete(Transcript.asset_location(location, filename))
log.info("Transcript asset %s was removed from store.", filename)
except NotFoundError:
pass
return StaticContent.compute_location(location.course_key, filename)
class VideoTranscriptsMixin:
"""Mixin class for transcript functionality.
This is necessary for VideoBlock.
"""
def available_translations(self, transcripts, verify_assets=None, is_bumper=False):
"""
Return a list of language codes for which we have transcripts.
Arguments:
verify_assets (boolean): If True, checks to ensure that the transcripts
really exist in the contentstore. If False, we just look at the
VideoBlock fields and do not query the contentstore. One reason
we might do this is to avoid slamming contentstore() with queries
when trying to make a listing of videos and their languages.
Defaults to `not FALLBACK_TO_ENGLISH_TRANSCRIPTS`.
transcripts (dict): A dict with all transcripts and a sub.
include_val_transcripts(boolean): If True, adds the edx-val transcript languages as well.
"""
translations = []
if verify_assets is None:
verify_assets = not settings.FEATURES.get('FALLBACK_TO_ENGLISH_TRANSCRIPTS')
sub, other_langs = transcripts["sub"], transcripts["transcripts"]
if verify_assets:
all_langs = dict(**other_langs)
if sub:
all_langs.update({'en': sub})
for language, filename in all_langs.items():
try:
# for bumper videos, transcripts are stored in content store only
if is_bumper:
get_transcript_for_video(self.location, filename, filename, language)
else:
get_transcript(self, language)
except NotFoundError:
continue
translations.append(language)
else:
# If we're not verifying the assets, we just trust our field values
translations = list(other_langs)
if not translations or sub:
translations += ['en']
# to clean redundant language codes.
return list(set(translations))
def get_default_transcript_language(self, transcripts):
"""
Returns the default transcript language for this video block.
Args:
transcripts (dict): A dict with all transcripts and a sub.
"""
sub, other_lang = transcripts["sub"], transcripts["transcripts"]
if self.transcript_language in other_lang:
transcript_language = self.transcript_language
elif sub:
transcript_language = 'en'
elif len(other_lang) > 0:
transcript_language = sorted(other_lang)[0]
else:
transcript_language = 'en'
return transcript_language
def get_transcripts_info(self, is_bumper=False):
"""
Returns a transcript dictionary for the video.
Arguments:
is_bumper(bool): If True, the request is for the bumper transcripts
include_val_transcripts(bool): If True, include edx-val transcripts as well
"""
if is_bumper:
transcripts = copy.deepcopy(get_bumper_settings(self).get('transcripts', {}))
sub = transcripts.pop("en", "")
else:
transcripts = self.transcripts if self.transcripts else {}
sub = self.sub
# Only attach transcripts that are not empty.
transcripts = {
language_code: transcript_file
for language_code, transcript_file in transcripts.items() if transcript_file != ''
}
# bumper transcripts are stored in content store so we don't need to include val transcripts
if not is_bumper:
transcript_languages = get_available_transcript_languages(edx_video_id=self.edx_video_id)
# HACK Warning! this is temporary and will be removed once edx-val take over the
# transcript module and contentstore will only function as fallback until all the
# data is migrated to edx-val.
for language_code in transcript_languages:
if language_code == 'en' and not sub:
sub = NON_EXISTENT_TRANSCRIPT
elif not transcripts.get(language_code):
transcripts[language_code] = NON_EXISTENT_TRANSCRIPT
return {
"sub": sub,
"transcripts": transcripts,
}
@exception_decorator
def get_transcript_from_val(edx_video_id, lang=None, output_format=Transcript.SRT):
"""
Get video transcript from edx-val.
Arguments:
edx_video_id (unicode): video identifier
lang (unicode): transcript language
output_format (unicode): transcript output format
Returns:
tuple containing content, filename, mimetype
"""
transcript = get_video_transcript_content(edx_video_id, lang)
if not transcript:
raise NotFoundError(f'Transcript not found for {edx_video_id}, lang: {lang}')
transcript_conversion_props = dict(transcript, output_format=output_format)
transcript = convert_video_transcript(**transcript_conversion_props)
filename = transcript['filename']
content = transcript['content']
mimetype = Transcript.mime_types[output_format]
return content, filename, mimetype
def get_transcript_for_video(video_location, subs_id, file_name, language):
"""
Get video transcript from content store. This is a lower level function and is used by
`get_transcript_from_contentstore`. Prefer that function instead where possible. If you
need to support getting transcripts from VAL or Blockstore as well, use the `get_transcript`
function instead.
NOTE: Transcripts can be searched from content store by two ways:
1. by an id(a.k.a subs_id) which will be used to construct transcript filename
2. by providing transcript filename
Arguments:
video_location (Locator): Video location
subs_id (unicode): id for a transcript in content store
file_name (unicode): file_name for a transcript in content store
language (unicode): transcript language
Returns:
tuple containing transcript input_format, basename, content
"""
try:
if subs_id is None:
raise NotFoundError
content = Transcript.asset(video_location, subs_id, language).data.decode('utf-8')
base_name = subs_id
input_format = Transcript.SJSON
except NotFoundError:
content = Transcript.asset(video_location, None, language, file_name).data.decode('utf-8')
base_name = os.path.splitext(file_name)[0]
input_format = Transcript.SRT
return input_format, base_name, content
@exception_decorator
def get_transcript_from_contentstore(video, language, output_format, transcripts_info, youtube_id=None):
"""
Get video transcript from content store.
Arguments:
video (Video block): Video block
language (unicode): transcript language
output_format (unicode): transcript output format
transcripts_info (dict): transcript info for a video
youtube_id (unicode): youtube video id
Returns:
tuple containing content, filename, mimetype
"""
input_format, base_name, transcript_content = None, None, None
if output_format not in (Transcript.SRT, Transcript.SJSON, Transcript.TXT):
raise NotFoundError(f'Invalid transcript format `{output_format}`')
sub, other_languages = transcripts_info['sub'], transcripts_info['transcripts']
transcripts = dict(other_languages)
# this is sent in case of a translation dispatch and we need to use it as our subs_id.
possible_sub_ids = [youtube_id, sub, video.youtube_id_1_0] + get_html5_ids(video.html5_sources)
for sub_id in possible_sub_ids:
try:
transcripts['en'] = sub_id
input_format, base_name, transcript_content = get_transcript_for_video(
video.location,
subs_id=sub_id,
file_name=transcripts[language],
language=language
)
break
except (KeyError, NotFoundError):
continue
if transcript_content is None:
raise NotFoundError('No transcript for `{lang}` language'.format(
lang=language
))
# add language prefix to transcript file only if language is not None
language_prefix = f'{language}_' if language else ''
transcript_name = f'{language_prefix}{base_name}.{output_format}'
transcript_content = Transcript.convert(transcript_content, input_format=input_format, output_format=output_format)
if not transcript_content.strip():
raise NotFoundError('No transcript content')
if youtube_id:
youtube_ids = youtube_speed_dict(video)
transcript_content = json.dumps(
generate_subs(youtube_ids.get(youtube_id, 1), 1, json.loads(transcript_content))
)
return transcript_content, transcript_name, Transcript.mime_types[output_format]
def get_transcript_from_blockstore(video_block, language, output_format, transcripts_info):
"""
Get video transcript from Blockstore.
Blockstore expects video transcripts to be placed into the 'static/'
subfolder of the XBlock's folder in a Blockstore bundle. For example, if the
video XBlock's definition is in the standard location of
video/video1/definition.xml
Then the .srt files should be placed at e.g.
video/video1/static/video1-en.srt
This is the same place where other public static files are placed for other
XBlocks, such as image files used by HTML blocks.
Video XBlocks in Blockstore must set the 'transcripts' XBlock field to a
JSON dictionary listing the filename of the transcript for each language:
<video
youtube_id_1_0="3_yD_cEKoCk"
transcripts='{"en": "3_yD_cEKoCk-en.srt"}'
display_name="Welcome Video with Transcript"
download_track="true"
/>
This method is tested in openedx/core/djangoapps/content_libraries/tests/test_static_assets.py
Arguments:
video_block (Video XBlock): The video XBlock
language (str): transcript language
output_format (str): transcript output format
transcripts_info (dict): transcript info for a video, from video_block.get_transcripts_info()
Returns:
tuple containing content, filename, mimetype
"""
if output_format not in (Transcript.SRT, Transcript.SJSON, Transcript.TXT):
raise NotFoundError(f'Invalid transcript format `{output_format}`')
transcripts = transcripts_info['transcripts']
if language not in transcripts:
raise NotFoundError("Video {} does not have a transcript file defined for the '{}' language in its OLX.".format(
video_block.scope_ids.usage_id,
language,
))
filename = transcripts[language]
if not filename.endswith('.srt'):
# We want to standardize on .srt
raise NotFoundError("Video XBlocks in Blockstore only support .srt transcript files.")
# Try to load the transcript file out of Blockstore
# In lieu of an XBlock API for this (like block.runtime.resources_fs), we use the blockstore API directly.
bundle_uuid = video_block.scope_ids.def_id.bundle_uuid
path = video_block.scope_ids.def_id.olx_path.rpartition('/')[0] + '/static/' + filename
bundle_version = video_block.scope_ids.def_id.bundle_version # Either bundle_version or draft_name will be set.
draft_name = video_block.scope_ids.def_id.draft_name
try:
content_binary = blockstore_cache.get_bundle_file_data_with_cache(bundle_uuid, path, bundle_version, draft_name)
except blockstore_api.BundleFileNotFound:
raise NotFoundError("Transcript file '{}' missing for video XBlock {}".format( # lint-amnesty, pylint: disable=raise-missing-from
path,
video_block.scope_ids.usage_id,
))
# Now convert the transcript data to the requested format:
filename_no_extension = os.path.splitext(filename)[0]
output_filename = f'{filename_no_extension}.{output_format}'
output_transcript = Transcript.convert(
content_binary.decode('utf-8'),
input_format=Transcript.SRT,
output_format=output_format,
)
if not output_transcript.strip():
raise NotFoundError('No transcript content')
return output_transcript, output_filename, Transcript.mime_types[output_format]
def get_transcript(video, lang=None, output_format=Transcript.SRT, youtube_id=None):
"""
Get video transcript from edx-val or content store.
Arguments:
video (Video block): Video block
lang (unicode): transcript language
output_format (unicode): transcript output format
youtube_id (unicode): youtube video id
Returns:
tuple containing content, filename, mimetype
"""
transcripts_info = video.get_transcripts_info()
if not lang:
lang = video.get_default_transcript_language(transcripts_info)
if isinstance(video.scope_ids.def_id, BundleDefinitionLocator):
# This block is in Blockstore.
# For Blockstore, VAL is considered deprecated and we can load the transcript file
# directly using the Blockstore API:
return get_transcript_from_blockstore(video, lang, output_format, transcripts_info)
try:
edx_video_id = clean_video_id(video.edx_video_id)
if not edx_video_id:
raise NotFoundError
return get_transcript_from_val(edx_video_id, lang, output_format)
except NotFoundError:
return get_transcript_from_contentstore(
video,
lang,
youtube_id=youtube_id,
output_format=output_format,
transcripts_info=transcripts_info
)