Merge pull request #17550 from edx/rehan-ammar/transcript-util
transcript util
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Tests for transcripts_utils. """
|
||||
import copy
|
||||
import tempfile
|
||||
import ddt
|
||||
import json
|
||||
import textwrap
|
||||
@@ -19,7 +20,8 @@ from xmodule.contentstore.content import StaticContent
|
||||
from xmodule.contentstore.django import contentstore
|
||||
from xmodule.exceptions import NotFoundError
|
||||
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
|
||||
from student.tests.factories import UserFactory
|
||||
from xmodule.video_module import transcripts_utils
|
||||
|
||||
# Independent deep copy of the configured CONTENTSTORE settings.
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
|
||||
@@ -721,3 +723,220 @@ class TestVideoIdsInfo(unittest.TestCase):
|
||||
"""
|
||||
actual_result = transcripts_utils.get_video_ids_info(edx_video_id, youtube_id_1_0, html5_sources)
|
||||
self.assertEqual(actual_result, expected_result)
|
||||
|
||||
|
||||
@ddt.ddt
class TestGetTranscript(SharedModuleStoreTestCase):
    """Tests for `get_transcript` function."""

    def setUp(self):
        """
        Create a course with a vertical and a video, plus sjson/srt transcript fixtures.
        """
        super(TestGetTranscript, self).setUp()

        self.course = CourseFactory.create()

        self.subs_id = 'video_101'

        self.subs_sjson = {
            'start': [100, 200, 240, 390, 1000],
            'end': [200, 240, 380, 1000, 1500],
            'text': [
                'subs #1',
                'subs #2',
                'subs #3',
                'subs #4',
                'subs #5'
            ]
        }

        self.subs_srt = transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt')

        # Expected srt content keyed by language; both languages share the same source sjson.
        self.subs = {
            u'en': self.subs_srt,
            u'ur': transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt'),
        }

        self.srt_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SRT]
        self.sjson_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SJSON]

        self.user = UserFactory.create()
        self.vertical = ItemFactory.create(category='vertical', parent_location=self.course.location)
        self.video = ItemFactory.create(category='video', parent_location=self.vertical.location)

    def create_transcript(self, subs_id, language=u'en', filename='video.srt'):
        """
        Replace `self.video` with a new video that references a transcript.

        Arguments:
            subs_id (unicode): value for the video's `sub` field; when truthy the
                sjson fixture is also saved to the store under this id
            language (unicode): transcript language
            filename (unicode): file name registered in the video's `transcripts`
                field (only for non-`en` languages)
        """
        transcripts = {}
        if language != u'en':
            transcripts = {language: filename}

        self.video = ItemFactory.create(
            category='video',
            parent_location=self.vertical.location,
            sub=subs_id,
            transcripts=transcripts
        )

        if subs_id:
            transcripts_utils.save_subs_to_store(
                self.subs_sjson,
                subs_id,
                self.video,
                language=language,
            )

    def create_srt_file(self, content):
        """
        Create a temporary srt file holding `content`, rewound to its start.

        The file is closed (and thereby removed) automatically when the test finishes.
        """
        srt_file = tempfile.NamedTemporaryFile(suffix=".srt")
        # Without this the handle stays open until garbage collection.
        self.addCleanup(srt_file.close)
        srt_file.content_type = transcripts_utils.Transcript.SRT
        srt_file.write(content)
        srt_file.seek(0)
        return srt_file

    def upload_file(self, subs_file, location, filename):
        """
        Upload a file in content store.

        Arguments:
            subs_file (File): pointer to file to be uploaded
            location (Locator): Item location
            filename (unicode): Name of file to be uploaded
        """
        mime_type = subs_file.content_type
        content_location = StaticContent.compute_location(
            location.course_key, filename
        )
        content = StaticContent(content_location, filename, mime_type, subs_file.read())
        contentstore().save(content)

    @ddt.data(
        # en lang does not exist so NotFoundError will be raised
        (u'en',),
        # ur lang does not exist so KeyError and then NotFoundError will be raised
        (u'ur',),
    )
    @ddt.unpack
    def test_get_transcript_not_found(self, lang):
        """
        Verify that `NotFoundError` exception is raised when transcript is not found in both the content store and val.
        """
        with self.assertRaises(NotFoundError):
            transcripts_utils.get_transcript(self.course.id, self.video.location.block_id, lang=lang)

    @ddt.data(
        {
            'language': u'en',
            'subs_id': 'video_101',
            'filename': 'en_video_101.srt',
        },
        {
            'language': u'ur',
            'subs_id': '',
            'filename': 'ur_video_101.srt',
        },
    )
    @ddt.unpack
    def test_get_transcript_from_content_store(self, language, subs_id, filename):
        """
        Verify that `get_transcript` function returns correct data when transcript is in content store.
        """
        # The returned name is `<language>_<base name>.<format>`, so the asset is stored
        # under an un-prefixed base name and `filename` is the expected *returned* name.
        base_filename = 'video_101.srt'
        self.upload_file(self.create_srt_file(self.subs_srt), self.video.location, base_filename)
        self.create_transcript(subs_id, language, base_filename)
        # Unpack into `file_name` so the expected `filename` parameter is not shadowed.
        content, file_name, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
            language
        )

        self.assertEqual(content, self.subs[language])
        self.assertEqual(file_name, filename)
        self.assertEqual(mimetype, self.srt_mime_type)

    def test_get_transcript_from_content_store_for_ur(self):
        """
        Verify that `get_transcript` function returns correct data for non-english when transcript is in content store.
        """
        language = u'ur'
        self.create_transcript(self.subs_id, language)
        content, filename, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
            language,
            output_format=transcripts_utils.Transcript.SJSON
        )

        self.assertEqual(json.loads(content), self.subs_sjson)
        self.assertEqual(filename, 'ur_video_101.sjson')
        self.assertEqual(mimetype, self.sjson_mime_type)

    @patch(
        'openedx.core.djangoapps.video_config.models.VideoTranscriptEnabledFlag.feature_enabled',
        Mock(return_value=True),
    )
    @patch('xmodule.video_module.transcripts_utils.get_video_transcript_content')
    def test_get_transcript_from_val(self, mock_get_video_transcript_content):
        """
        Verify that `get_transcript` function returns correct data when transcript is in val.
        """
        mock_get_video_transcript_content.return_value = {
            'content': json.dumps(self.subs_sjson),
            'file_name': 'edx.sjson'
        }

        content, filename, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
        )
        self.assertEqual(content, self.subs_srt)
        self.assertEqual(filename, 'edx.srt')
        self.assertEqual(mimetype, self.srt_mime_type)

    def test_get_transcript_invalid_format(self):
        """
        Verify that `get_transcript` raises correct exception if transcript format is invalid.
        """
        with self.assertRaises(NotFoundError) as invalid_format_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'ur',
                output_format='mpeg'
            )

        exception_message = text_type(invalid_format_exception.exception)
        self.assertEqual(exception_message, 'Invalid transcript format `mpeg`')

    def test_get_transcript_no_content(self):
        """
        Verify that `get_transcript` function returns correct exception when transcript content is empty.
        """
        self.upload_file(self.create_srt_file(''), self.video.location, 'ur_video_101.srt')
        self.create_transcript('', 'ur', 'ur_video_101.srt')

        with self.assertRaises(NotFoundError) as no_content_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'ur'
            )

        exception_message = text_type(no_content_exception.exception)
        self.assertEqual(exception_message, 'No transcript content')

    def test_get_transcript_no_en_transcript(self):
        """
        Verify that `get_transcript` function returns correct exception when no transcript exists for `en`.
        """
        # With no youtube id (and no `sub` on the setUp video) there is nothing to use as the `en` subs id.
        self.video.youtube_id_1_0 = ''
        self.store.update_item(self.video, self.user.id)
        with self.assertRaises(NotFoundError) as no_en_transcript_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'en'
            )

        exception_message = text_type(no_en_transcript_exception.exception)
        self.assertEqual(exception_message, 'No transcript for `en` language')
|
||||
|
||||
@@ -11,9 +11,11 @@ import logging
|
||||
from pysrt import SubRipTime, SubRipItem, SubRipFile
|
||||
from pysrt.srtexc import Error
|
||||
from lxml import etree
|
||||
from opaque_keys.edx.locator import BlockUsageLocator
|
||||
from HTMLParser import HTMLParser
|
||||
from six import text_type
|
||||
|
||||
from xmodule.modulestore.django import modulestore
|
||||
from xmodule.exceptions import NotFoundError
|
||||
from xmodule.contentstore.content import StaticContent
|
||||
from xmodule.contentstore.django import contentstore
|
||||
@@ -863,3 +865,137 @@ class VideoTranscriptsMixin(object):
|
||||
"sub": sub,
|
||||
"transcripts": transcripts,
|
||||
}
|
||||
|
||||
|
||||
def get_transcript_from_val(edx_video_id, lang=None, output_format=Transcript.SRT):
    """
    Fetch a video transcript from edx-val and convert it to `output_format`.

    Arguments:
        edx_video_id (unicode): video identifier used for the edx-val lookup
        lang (unicode): transcript language
        output_format (unicode): transcript output format

    Returns:
        tuple containing content, filename, mimetype

    Raises:
        NotFoundError: when the lookup returns nothing for the video/language.
    """
    val_transcript = get_video_transcript_content(edx_video_id, lang)
    if val_transcript:
        # Forward everything the lookup returned, plus the requested target format.
        conversion_kwargs = dict(val_transcript, output_format=output_format)
        converted = convert_video_transcript(**conversion_kwargs)
        filename = converted['filename']
        content = converted['content']
        return content, filename, Transcript.mime_types[output_format]

    raise NotFoundError(u'Transcript not found for {}, lang: {}'.format(edx_video_id, lang))
|
||||
|
||||
|
||||
def get_transcript_for_video(video_location, subs_id, file_name, language):
    """
    Read a video transcript asset from the content store.

    The asset addressed by `subs_id` is tried first and reported as sjson; if it
    is missing, the asset addressed by `file_name` is read and reported as srt.

    Arguments:
        video_location (Locator): Video location
        subs_id (unicode): id for a transcript in content store
        file_name (unicode): file_name for a transcript in content store
        language (unicode): transcript language

    Returns:
        tuple containing transcript input_format, basename, content
    """
    try:
        sjson_data = Transcript.asset(video_location, subs_id, language).data
    except NotFoundError:
        # Fall back to the explicitly named file; its extension is dropped from the base name.
        srt_data = Transcript.asset(video_location, None, language, file_name).data
        name_without_ext = os.path.splitext(file_name)[0]
        return Transcript.SRT, name_without_ext, srt_data

    return Transcript.SJSON, subs_id, sjson_data
|
||||
|
||||
|
||||
def get_transcript_from_contentstore(video, language, output_format, youtube_id=None, is_bumper=False):
    """
    Get video transcript from content store.

    Arguments:
        video (Video Descriptor): Video descriptor
        language (unicode): transcript language
        output_format (unicode): transcript output format
        youtube_id (unicode): youtube video id
        is_bumper (bool): indicates bumper video

    Returns:
        tuple containing content, filename, mimetype

    Raises:
        NotFoundError: if `output_format` is not srt/sjson/txt, if `en` is requested
            but no subs id can be determined, if a needed language entry is missing,
            or if the converted transcript is blank. A NotFoundError raised by
            `get_transcript_for_video` is not caught here.
    """
    if output_format not in (Transcript.SRT, Transcript.SJSON, Transcript.TXT):
        raise NotFoundError('Invalid transcript format `{output_format}`'.format(output_format=output_format))

    transcripts_info = video.get_transcripts_info(is_bumper=is_bumper)
    sub, other_languages = transcripts_info['sub'], transcripts_info['transcripts']
    # Copy so that adding the `en` entry below does not touch the mapping returned above.
    transcripts = dict(other_languages)

    # this is sent in case of a translation dispatch and we need to use it as our subs_id.
    if youtube_id:
        transcripts['en'] = youtube_id
    elif sub:
        transcripts['en'] = sub
    elif video.youtube_id_1_0:
        transcripts['en'] = video.youtube_id_1_0
    elif language == u'en':
        raise NotFoundError('No transcript for `en` language')

    try:
        input_format, base_name, transcript_content = get_transcript_for_video(
            video.location,
            subs_id=transcripts['en'],
            file_name=language and transcripts[language],
            language=language
        )
    except KeyError:
        # A missing `en` or `language` entry means there is nothing to look up;
        # report it with a message instead of an empty NotFoundError.
        raise NotFoundError('No transcript for `{language}` language'.format(language=language))

    # add language prefix to transcript file only if language is not None
    language_prefix = '{}_'.format(language) if language else ''
    transcript_name = u'{}{}.{}'.format(language_prefix, base_name, output_format)
    transcript_content = Transcript.convert(transcript_content, input_format=input_format, output_format=output_format)

    if not transcript_content.strip():
        raise NotFoundError('No transcript content')

    if youtube_id:
        # NOTE(review): the converted content is parsed as JSON here, so this branch
        # presumably expects `output_format` to be sjson -- confirm against callers.
        youtube_ids = youtube_speed_dict(video)
        transcript_content = json.dumps(
            generate_subs(youtube_ids.get(youtube_id, 1), 1, json.loads(transcript_content))
        )

    return transcript_content, transcript_name, Transcript.mime_types[output_format]
|
||||
|
||||
|
||||
def get_transcript(course_id, block_id, lang=None, output_format=Transcript.SRT, is_bumper=False):
    """
    Get video transcript, preferring edx-val and falling back to the content store.

    Arguments:
        course_id (CourseLocator): course identifier
        block_id (unicode): a unique identifier for an item in modulestore
        lang (unicode): transcript language
        output_format (unicode): transcript output format
        is_bumper (bool): indicates bumper video

    Returns:
        tuple containing content, filename, mimetype
    """
    video_key = BlockUsageLocator(course_id, block_type='video', block_id=block_id)
    video = modulestore().get_item(video_key)
    val_video_id = video.edx_video_id

    try:
        return get_transcript_from_val(val_video_id, lang, output_format)
    except NotFoundError:
        # Nothing usable in edx-val; look the transcript up in the content store instead.
        contentstore_options = {'output_format': output_format, 'is_bumper': is_bumper}
        return get_transcript_from_contentstore(video, lang, **contentstore_options)
|
||||
|
||||
Reference in New Issue
Block a user