As part of the ongoing effort to deprecate and eventually remove xmodule, we’ve started gradually migrating the necessary code files from xmodule to more appropriate locations within the codebase. Ticket: https://github.com/openedx/public-engineering/issues/445. Also: this tweaks the importlinter ignores and adds follow-up issue links. Co-authored-by: Kyle McCormick <kyle@axim.org>
1176 lines
44 KiB
Python
1176 lines
44 KiB
Python
"""Tests for items views."""
|
|
|
|
|
|
import copy
|
|
import json
|
|
import tempfile
|
|
import textwrap
|
|
from codecs import BOM_UTF8
|
|
from unittest.mock import Mock, patch
|
|
from uuid import uuid4
|
|
|
|
import ddt
|
|
from django.conf import settings
|
|
from django.test.utils import override_settings
|
|
from django.urls import reverse
|
|
from edxval.api import create_video
|
|
from opaque_keys.edx.keys import UsageKey
|
|
from organizations.tests.factories import OrganizationFactory
|
|
|
|
from cms.djangoapps.contentstore.tests.utils import CourseTestCase, setup_caption_responses
|
|
from openedx.core.djangoapps.contentserver.caching import del_cached_content
|
|
from openedx.core.djangoapps.content_libraries import api as lib_api
|
|
from xmodule.contentstore.content import StaticContent # lint-amnesty, pylint: disable=wrong-import-order
|
|
from xmodule.contentstore.django import contentstore # lint-amnesty, pylint: disable=wrong-import-order
|
|
from xmodule.exceptions import NotFoundError # lint-amnesty, pylint: disable=wrong-import-order
|
|
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
|
|
from xmodule.video_block import VideoBlock # lint-amnesty, pylint: disable=wrong-import-order
|
|
from openedx.core.djangoapps.video_config.transcripts_utils import ( # lint-amnesty, pylint: disable=wrong-import-order
|
|
GetTranscriptsFromYouTubeException,
|
|
Transcript,
|
|
get_video_transcript_content,
|
|
get_transcript,
|
|
remove_subs_from_store,
|
|
)
|
|
from openedx.core.djangoapps.xblock import api as xblock_api
|
|
|
|
# Work on a private copy of the configured contentstore settings and give it
# a randomly suffixed database name.
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
TEST_DATA_CONTENTSTORE['DOC_STORE_CONFIG']['db'] = f'test_xcontent_{uuid4().hex}'

# Two-cue SRT fixture shared by the transcript tests below.
SRT_TRANSCRIPT_CONTENT = """0
00:00:10,500 --> 00:00:13,000
Elephant's Dream

1
00:00:15,000 --> 00:00:18,000
At the left we can see...

"""

# The same fixture converted from SRT to SJSON.
SJSON_TRANSCRIPT_CONTENT = Transcript.convert(
    SRT_TRANSCRIPT_CONTENT,
    input_format=Transcript.SRT,
    output_format=Transcript.SJSON,
)
|
|
|
|
|
|
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class BaseTranscripts(CourseTestCase):
    """
    Base test class for transcripts tests.

    ``setUp`` creates a video block in the test course (exposed as
    ``self.item`` / ``self.video_usage_key``) and a video block inside a newly
    created content library (exposed as ``self.library_block``).
    """

    def clear_subs_content(self):
        """Remove, if transcripts content exists."""
        for youtube_id in self.get_youtube_ids().values():
            filename = f'subs_{youtube_id}.srt.sjson'
            content_location = StaticContent.compute_location(self.course.id, filename)
            try:
                content = contentstore().find(content_location)
                contentstore().delete(content.get_id())
            except NotFoundError:
                # Nothing is stored for this YouTube ID, so there is nothing to delete.
                pass

    def save_subs_to_store(self, subs, subs_id):
        """
        Save transcripts into `StaticContent`.

        `subs` is serialized as indented JSON and stored in the course under
        the file name ``subs_<subs_id>.srt.sjson``; any cached copy of that
        content is dropped afterwards.

        Returns the location of the saved content.
        """
        filedata = json.dumps(subs, indent=2)
        mime_type = 'application/json'
        filename = f'subs_{subs_id}.srt.sjson'

        content_location = StaticContent.compute_location(self.course.id, filename)
        content = StaticContent(content_location, filename, mime_type, filedata)
        contentstore().save(content)
        del_cached_content(content_location)
        return content_location

    def setUp(self):
        """Create initial data."""
        super().setUp()

        # Add video block
        data = {
            'parent_locator': str(self.course.location),
            'category': 'video',
            'type': 'video'
        }
        resp = self.client.ajax_post('/xblock/', data)
        self.assertEqual(resp.status_code, 200)

        # Create a content library holding a single video block.
        self.library = lib_api.create_library(
            org=OrganizationFactory.create(short_name="org1"),
            slug="lib",
            title="Library",
        )
        self.library_block_metadata = lib_api.create_library_block(
            self.library.key,
            "video",
            "video-transcript",
        )
        self.library_block = xblock_api.load_block(
            self.library_block_metadata.usage_key,
            self.user,
        )

        self.video_usage_key = self._get_usage_key(resp)
        self.item = modulestore().get_item(self.video_usage_key)
        # hI10vDNYz4M - valid Youtube ID with transcripts.
        # JMD_ifUUfsU, AKqURZnYqpk, DYpADpL7jAY - valid Youtube IDs without transcripts.
        self.set_fields_from_xml(
            self.item, '<video youtube="0.75:JMD_ifUUfsU,1.0:hI10vDNYz4M,1.25:AKqURZnYqpk,1.50:DYpADpL7jAY" />'
        )
        modulestore().update_item(self.item, self.user.id)

        # Re-read the block so `self.item` is the stored version.
        self.item = modulestore().get_item(self.video_usage_key)
        # Remove all transcripts for current block.
        self.clear_subs_content()

    def _get_usage_key(self, resp):
        """ Returns the usage key from the response returned by a create operation. """
        usage_key_string = json.loads(resp.content.decode('utf-8')).get('locator')
        return UsageKey.from_string(usage_key_string)

    def get_youtube_ids(self):
        """Return youtube speeds and ids."""
        item = modulestore().get_item(self.video_usage_key)

        return {
            0.75: item.youtube_id_0_75,
            1: item.youtube_id_1_0,
            1.25: item.youtube_id_1_25,
            1.5: item.youtube_id_1_5
        }

    def create_non_video_block(self):
        """
        Setup non video block for tests.

        Creates a `problem` block under the course, applies the fields parsed
        from a small XML snippet to it, saves it and returns its usage key.
        """
        data = {
            'parent_locator': str(self.course.location),
            'category': 'problem',
            'type': 'problem'
        }
        response = self.client.ajax_post('/xblock/', data)
        usage_key = self._get_usage_key(response)
        item = modulestore().get_item(usage_key)
        # Apply the parsed fields to the block created above. The previous code
        # passed `self.item` (the video block) here, leaving the new block untouched.
        self.set_fields_from_xml(item, '<problem youtube="0.75:JMD_ifUUfsU,1.0:hI10vDNYz4M" />')
        modulestore().update_item(item, self.user.id)

        return usage_key

    def assert_response(self, response, expected_status_code, expected_message):
        """
        Assert that `response` has the expected HTTP status code and that the
        `status` entry of its JSON body equals `expected_message`.
        """
        response_content = json.loads(response.content.decode('utf-8'))
        self.assertEqual(response.status_code, expected_status_code)
        self.assertEqual(response_content['status'], expected_message)

    def set_fields_from_xml(self, item, xml):
        """
        Parse `xml` with `VideoBlock.parse_video_xml` and set every resulting
        key/value pair as an attribute on `item`.
        """
        fields_data = VideoBlock.parse_video_xml(xml)
        for key, value in fields_data.items():
            setattr(item, key, value)
|
|
|
|
|
|
@ddt.ddt
class TestUploadTranscripts(BaseTranscripts):
    """
    Tests for '/transcripts/upload' endpoint.
    """

    def setUp(self):
        """Prepare the temporary transcript files and a VAL video record."""
        super().setUp()
        self.contents = {
            'good': SRT_TRANSCRIPT_CONTENT,
            'bad': 'Some BAD data',
        }
        good_content = self.contents['good']
        bad_content = self.contents['bad']

        # Temporary transcript files: valid, invalid content, unsupported
        # extension, and valid content prefixed with a BOM.
        self.good_srt_file = self.create_transcript_file(content=good_content, suffix='.srt')
        self.bad_data_srt_file = self.create_transcript_file(content=bad_content, suffix='.srt')
        self.bad_name_srt_file = self.create_transcript_file(content=good_content, suffix='.bad')
        self.bom_srt_file = self.create_transcript_file(content=good_content, suffix='.srt', include_bom=True)

        # Setup a VEDA produced video and persist `edx_video_id` in VAL.
        val_video = {
            'edx_video_id': '123-456-789',
            'status': 'upload',
            'client_video_id': 'Test Video',
            'duration': 0,
            'encoded_videos': [],
            'courses': [str(self.course.id)]
        }
        create_video(val_video)

        # The temporary files are closed once the test has finished.
        self.addCleanup(self.clean_temporary_transcripts)

    def create_transcript_file(self, content, suffix, include_bom=False):
        """
        Return an open temporary file, named with `suffix`, holding the
        dedented `content` and rewound to its start.

        With `include_bom` the content is written with a UTF-8 BOM prefix.
        """
        transcript_file = tempfile.NamedTemporaryFile(suffix=suffix)  # lint-amnesty, pylint: disable=consider-using-with
        dedented = textwrap.dedent(content)
        if include_bom:
            encoded = dedented.encode('utf-8-sig')
            # Verify that ufeff(BOM) character is in content.
            self.assertIn(BOM_UTF8, encoded)
        else:
            encoded = dedented.encode('utf-8')
        transcript_file.write(encoded)
        transcript_file.seek(0)

        return transcript_file

    def clean_temporary_transcripts(self):
        """
        Close every temporary transcript file created in `setUp`.
        """
        for attribute in ('good_srt_file', 'bad_data_srt_file', 'bad_name_srt_file', 'bom_srt_file'):
            getattr(self, attribute).close()

    def upload_transcript(self, locator, transcript_file, edx_video_id=None):
        """
        POST to the transcript upload endpoint and return the response.

        `locator` and `transcript_file` are left out of the payload when
        falsy; `edx_video_id` is left out only when it is None.
        """
        payload = {}
        if locator:
            payload['locator'] = locator

        if edx_video_id is not None:
            payload['edx_video_id'] = edx_video_id

        if transcript_file:
            payload['transcript-file'] = transcript_file

        upload_url = reverse('upload_transcripts')
        return self.client.post(upload_url, payload)

    @ddt.data(
        ('123-456-789', False),
        ('', False),
        ('123-456-789', True)
    )
    @ddt.unpack
    def test_transcript_upload_success(self, edx_video_id, include_bom):
        """
        Uploading a transcript file to a video component succeeds for:

            1. External video component
            2. VEDA produced video component
            3. Transcript content containing BOM character
        """
        # In case of an external video component, the `edx_video_id` must be empty
        # and VEDA produced video component will have `edx_video_id` set to VAL video ID.
        self.item.edx_video_id = edx_video_id
        modulestore().update_item(self.item, self.user.id)

        # Upload a transcript
        if include_bom:
            transcript_file = self.bom_srt_file
        else:
            transcript_file = self.good_srt_file
        response = self.upload_transcript(self.video_usage_key, transcript_file, '')

        # Verify the response
        self.assert_response(response, expected_status_code=200, expected_message='Success')

        # Verify the `edx_video_id` on the video component
        response_data = json.loads(response.content.decode('utf-8'))
        expected_edx_video_id = edx_video_id or response_data['edx_video_id']
        video = modulestore().get_item(self.video_usage_key)
        self.assertEqual(video.edx_video_id, expected_edx_video_id)
        self.assertDictEqual(video.transcripts, {'en': f'{expected_edx_video_id}-en.srt'})

        # Verify transcript content
        stored_transcript = get_video_transcript_content(video.edx_video_id, language_code='en')
        stored_sjson = json.loads(stored_transcript['content'].decode('utf-8'))
        converted = Transcript.convert(
            self.contents['good'],
            input_format=Transcript.SRT,
            output_format=Transcript.SJSON
        )
        self.assertDictEqual(stored_sjson, json.loads(converted))

    def test_transcript_upload_without_locator(self):
        """
        The upload is rejected when no video locator is sent.
        """
        response = self.upload_transcript(locator=None, transcript_file=self.good_srt_file, edx_video_id='')
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Video locator is required.'
        )

    def test_transcript_upload_without_file(self):
        """
        The upload is rejected when no transcript file is sent.
        """
        response = self.upload_transcript(locator=self.video_usage_key, transcript_file=None, edx_video_id='')
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='A transcript file is required.'
        )

    def test_transcript_upload_bad_format(self):
        """
        The upload is rejected when the transcript format is not SRT.
        """
        response = self.upload_transcript(
            locator=self.video_usage_key,
            transcript_file=self.bad_name_srt_file,
            edx_video_id=''
        )
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='This transcript file type is not supported.'
        )

    def test_transcript_upload_bad_content(self):
        """
        The upload is rejected for bad transcript content and the video's
        transcripts stay empty.
        """
        # Request to upload transcript for the video
        response = self.upload_transcript(
            locator=self.video_usage_key,
            transcript_file=self.bad_data_srt_file,
            edx_video_id=''
        )
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='There is a problem with this transcript file. Try to upload a different file.'
        )
        video = modulestore().get_item(self.video_usage_key)
        self.assertDictEqual(video.transcripts, {})

    def test_transcript_upload_unknown_category(self):
        """
        The upload is rejected when the item's category is other than video.
        """
        # non_video block setup - i.e. an item whose category is not 'video'.
        non_video_key = self.create_non_video_block()
        # Request to upload transcript for the item
        response = self.upload_transcript(locator=non_video_key, transcript_file=self.good_srt_file, edx_video_id='')
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Transcripts are supported only for "video" blocks.'
        )

    def test_transcript_upload_non_existent_item(self):
        """
        The upload is rejected when the locator is invalid.
        """
        # Request to upload transcript for the item
        response = self.upload_transcript(
            locator='non_existent_locator',
            transcript_file=self.good_srt_file,
            edx_video_id=''
        )
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Cannot find item by locator.'
        )

    def test_transcript_upload_without_edx_video_id(self):
        """
        The upload is rejected when `edx_video_id` is missing from the request.
        """
        response = self.upload_transcript(locator=self.video_usage_key, transcript_file=self.good_srt_file)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Video ID is required.'
        )

    def test_transcript_upload_with_non_existant_edx_video_id(self):
        """
        The upload is rejected when the `edx_video_id` sent in the POST
        request is not a known video, and no transcript is stored for it.
        """
        non_existant_edx_video_id = '1111-2222-3333-4444'

        # Upload with non-existant `edx_video_id`
        response = self.upload_transcript(
            locator=self.video_usage_key,
            transcript_file=self.good_srt_file,
            edx_video_id=non_existant_edx_video_id
        )
        # Verify the response
        self.assert_response(response, expected_status_code=400, expected_message='Invalid Video ID')

        # Verify transcript does not exist for non-existant `edx_video_id`
        stored_transcript = get_video_transcript_content(non_existant_edx_video_id, language_code='en')
        self.assertIsNone(stored_transcript)
|
|
|
|
|
|
@ddt.ddt
class TestChooseTranscripts(BaseTranscripts):
    """
    Tests for '/transcripts/choose' endpoint.
    """

    def setUp(self):
        """Store a transcript for the HTML5 ID under test and create a VAL video."""
        super().setUp()

        # Create test transcript in contentstore
        self.chosen_html5_id = 'test_html5_subs'
        self.sjson_subs = Transcript.convert(
            SRT_TRANSCRIPT_CONTENT,
            input_format=Transcript.SRT,
            output_format=Transcript.SJSON,
        )
        self.save_subs_to_store(json.loads(self.sjson_subs), self.chosen_html5_id)

        # Setup a VEDA produced video and persist `edx_video_id` in VAL.
        val_video = {
            'edx_video_id': '123-456-789',
            'status': 'upload',
            'client_video_id': 'Test Video',
            'duration': 0,
            'encoded_videos': [],
            'courses': [str(self.course.id)]
        }
        create_video(val_video)

    def choose_transcript(self, locator, chosen_html5_id):
        """
        GET the choose-transcript endpoint and return the response.

        Falsy arguments are left out of the JSON payload.
        """
        payload = {}
        if locator:
            payload['locator'] = str(locator)

        if chosen_html5_id:
            payload['html5_id'] = chosen_html5_id

        choose_transcript_url = reverse('choose_transcripts')
        return self.client.get(choose_transcript_url, {'data': json.dumps(payload)})

    @ddt.data('123-456-789', '')
    def test_choose_transcript_success(self, edx_video_id):
        """
        Choosing a transcript file in the video component basic tab works for:

            1. External video component
            2. VEDA produced video component
        """
        # In case of an external video component, the `edx_video_id` must be empty
        # and VEDA produced video component will have `edx_video_id` set to VAL video ID.
        self.item.edx_video_id = edx_video_id
        modulestore().update_item(self.item, self.user.id)

        # Make call to choose a transcript
        response = self.choose_transcript(self.video_usage_key, self.chosen_html5_id)

        # Verify the response
        self.assert_response(response, expected_status_code=200, expected_message='Success')

        # Verify the `edx_video_id` on the video component
        response_data = json.loads(response.content.decode('utf-8'))
        expected_edx_video_id = edx_video_id or response_data['edx_video_id']
        video = modulestore().get_item(self.video_usage_key)
        self.assertEqual(video.edx_video_id, expected_edx_video_id)

        # Verify transcript content
        stored_transcript = get_video_transcript_content(video.edx_video_id, language_code='en')
        stored_sjson = json.loads(stored_transcript['content'].decode('utf-8'))
        self.assertDictEqual(stored_sjson, json.loads(self.sjson_subs))

    def test_choose_transcript_fails_without_data(self):
        """
        Choosing a transcript fails when the request carries no video data.
        """
        response = self.choose_transcript(locator=None, chosen_html5_id=None)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Incoming video data is empty.'
        )

    def test_choose_transcript_fails_without_locator(self):
        """
        Choosing a transcript fails when the video locator is missing from the request.
        """
        response = self.choose_transcript(locator=None, chosen_html5_id=self.chosen_html5_id)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Cannot find item by locator.'
        )

    def test_choose_transcript_with_no_html5_transcript(self):
        """
        Choosing a transcript fails when the chosen HTML5 ID has no
        transcript associated in contentstore.
        """
        response = self.choose_transcript(locator=self.video_usage_key, chosen_html5_id='non-existent-html5-id')
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message="No such transcript."
        )

    def test_choose_transcript_fails_on_unknown_category(self):
        """
        Choosing a transcript fails when the item's category is other than video.
        """
        # non_video block setup - i.e. an item whose category is not 'video'.
        non_video_key = self.create_non_video_block()
        # Request to choose transcript for the item
        response = self.choose_transcript(locator=non_video_key, chosen_html5_id=self.chosen_html5_id)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Transcripts are supported only for "video" blocks.'
        )
|
|
|
|
|
|
@ddt.ddt
class TestRenameTranscripts(BaseTranscripts):
    """
    Tests for '/transcripts/rename' endpoint.
    """

    def setUp(self):
        """Give the video a `sub` with a stored transcript and create a VAL video."""
        super().setUp()

        # Create test transcript in contentstore and update item's sub.
        self.item.sub = 'test_video_subs'
        self.sjson_subs = Transcript.convert(
            SRT_TRANSCRIPT_CONTENT,
            input_format=Transcript.SRT,
            output_format=Transcript.SJSON,
        )
        self.save_subs_to_store(json.loads(self.sjson_subs), self.item.sub)
        modulestore().update_item(self.item, self.user.id)

        # Setup a VEDA produced video and persist `edx_video_id` in VAL.
        val_video = {
            'edx_video_id': '123-456-789',
            'status': 'upload',
            'client_video_id': 'Test Video',
            'duration': 0,
            'encoded_videos': [],
            'courses': [str(self.course.id)]
        }
        create_video(val_video)

    def rename_transcript(self, locator):
        """
        GET the rename-transcript endpoint and return the response.

        A falsy `locator` is left out of the JSON payload.
        """
        payload = {}
        if locator:
            payload['locator'] = str(locator)

        rename_transcript_url = reverse('rename_transcripts')
        return self.client.get(rename_transcript_url, {'data': json.dumps(payload)})

    @ddt.data('123-456-789', '')
    def test_rename_transcript_success(self, edx_video_id):
        """
        "Use current transcript" in the video component basic tab works for:

            1. External video component
            2. VEDA produced video component
        """
        # In case of an external video component, the `edx_video_id` must be empty
        # and VEDA produced video component will have `edx_video_id` set to VAL video ID.
        self.item.edx_video_id = edx_video_id
        modulestore().update_item(self.item, self.user.id)

        # Make call to use current transcript from contentstore
        response = self.rename_transcript(self.video_usage_key)

        # Verify the response
        self.assert_response(response, expected_status_code=200, expected_message='Success')

        # Verify the `edx_video_id` on the video component
        response_data = json.loads(response.content.decode('utf-8'))
        expected_edx_video_id = edx_video_id or response_data['edx_video_id']
        video = modulestore().get_item(self.video_usage_key)
        self.assertEqual(video.edx_video_id, expected_edx_video_id)

        # Verify transcript content
        stored_transcript = get_video_transcript_content(video.edx_video_id, language_code='en')
        stored_sjson = json.loads(stored_transcript['content'].decode('utf-8'))
        self.assertDictEqual(stored_sjson, json.loads(self.sjson_subs))

    def test_rename_transcript_fails_without_data(self):
        """
        "Use current transcript" fails when the request carries no video data.
        """
        response = self.rename_transcript(locator=None)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Incoming video data is empty.'
        )

    def test_rename_transcript_fails_with_invalid_locator(self):
        """
        "Use current transcript" fails when the locator in the request does
        not identify an existing item.
        """
        response = self.rename_transcript(locator='non-existent-locator')
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Cannot find item by locator.'
        )

    def test_rename_transcript_with_non_existent_sub(self):
        """
        Renaming fails when `item.sub` has no transcript associated in
        contentstore.
        """
        # Update item's sub to an id who does not have any
        # transcript associated in contentstore.
        self.item.sub = 'non-existent-sub'
        modulestore().update_item(self.item, self.user.id)

        response = self.rename_transcript(locator=self.video_usage_key)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message="No such transcript."
        )

    def test_rename_transcript_fails_on_unknown_category(self):
        """
        Renaming fails when the item's category is other than video.
        """
        # non_video block setup - i.e. an item whose category is not 'video'.
        non_video_key = self.create_non_video_block()
        # Make call to use current transcript from contentstore.
        response = self.rename_transcript(non_video_key)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Transcripts are supported only for "video" blocks.'
        )
|
|
|
|
|
|
@ddt.ddt
@patch(
    'cms.djangoapps.contentstore.views.transcripts_ajax.download_youtube_subs',
    Mock(return_value=[['en', SJSON_TRANSCRIPT_CONTENT]])
)
class TestReplaceTranscripts(BaseTranscripts):
    """
    Tests for '/transcripts/replace' endpoint.

    `download_youtube_subs` is patched for the whole class to return the
    SJSON fixture as the English transcript.
    """

    def setUp(self):
        """Pick the YouTube ID used by the tests and create a VAL video."""
        super().setUp()
        self.youtube_id = 'test_yt_id'

        # Setup a VEDA produced video and persist `edx_video_id` in VAL.
        val_video = {
            'edx_video_id': '123-456-789',
            'status': 'upload',
            'client_video_id': 'Test Video',
            'duration': 0,
            'encoded_videos': [],
            'courses': [str(self.course.id)]
        }
        create_video(val_video)

    def replace_transcript(self, locator, youtube_id):
        """
        GET the replace-transcript endpoint and return the response.

        Falsy arguments are left out of the JSON payload; a `youtube_id` is
        sent as a single entry of type 'youtube' in the `videos` list.
        """
        payload = {}
        if locator:
            payload['locator'] = str(locator)

        if youtube_id:
            payload['videos'] = [
                {
                    'type': 'youtube',
                    'video': youtube_id
                }
            ]

        replace_transcript_url = reverse('replace_transcripts')
        return self.client.get(replace_transcript_url, {'data': json.dumps(payload)})

    @ddt.data('123-456-789', '')
    def test_replace_transcript_success(self, edx_video_id):
        """
        "Import from youtube" in the video component basic tab works for:

            1. External video component
            2. VEDA produced video component
        """
        # In case of an external video component, the `edx_video_id` must be empty
        # and VEDA produced video component will have `edx_video_id` set to VAL video ID.
        self.item.edx_video_id = edx_video_id
        modulestore().update_item(self.item, self.user.id)

        # Make call to replace transcripts from youtube
        response = self.replace_transcript(self.video_usage_key, self.youtube_id)

        # Verify the response
        self.assert_response(response, expected_status_code=200, expected_message='Success')

        # Verify the `edx_video_id` on the video component
        response_data = json.loads(response.content.decode('utf-8'))
        expected_edx_video_id = edx_video_id or response_data['edx_video_id']
        video = modulestore().get_item(self.video_usage_key)
        self.assertEqual(video.edx_video_id, expected_edx_video_id)

        # Verify transcript content
        stored_transcript = get_video_transcript_content(video.edx_video_id, language_code='en')
        stored_sjson = json.loads(stored_transcript['content'].decode('utf-8'))
        self.assertDictEqual(stored_sjson, json.loads(SJSON_TRANSCRIPT_CONTENT))

    def test_replace_transcript_library_content_success(self):
        """
        Replacing the transcript of the library video block succeeds and the
        reloaded block yields the expected English SJSON transcript.
        """
        library_usage_key = self.library_block_metadata.usage_key

        # Make call to replace transcripts from youtube
        response = self.replace_transcript(library_usage_key, self.youtube_id)

        # Verify the response
        self.assert_response(response, expected_status_code=200, expected_message='Success')

        # Obtain updated block
        updated_block = xblock_api.load_block(
            self.library_block_metadata.usage_key,
            self.user,
        )

        # Verify transcript content
        transcript = get_transcript(updated_block, 'en', Transcript.SJSON)
        stored_sjson = json.loads(transcript[0])
        self.assertDictEqual(stored_sjson, json.loads(SJSON_TRANSCRIPT_CONTENT))

    def test_replace_transcript_fails_without_data(self):
        """
        Replacing the transcript fails when the request carries no video data.
        """
        response = self.replace_transcript(locator=None, youtube_id=None)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Incoming video data is empty.'
        )

    def test_replace_transcript_fails_with_invalid_locator(self):
        """
        Replacing the transcript fails when the video locator does not exist.
        """
        response = self.replace_transcript(locator='non-existent-locator', youtube_id=self.youtube_id)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Cannot find item by locator.'
        )

    def test_replace_transcript_fails_without_yt_id(self):
        """
        Replacing the transcript fails when no youtube id is provided.
        """
        response = self.replace_transcript(locator=self.video_usage_key, youtube_id=None)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='YouTube ID is required.'
        )

    def test_replace_transcript_no_transcript_on_yt(self):
        """
        Replacing the transcript fails when YouTube has no transcript for the given youtube id.
        """
        error_message = 'YT ID not found.'
        patch_path = 'cms.djangoapps.contentstore.views.transcripts_ajax.download_youtube_subs'
        # Make the download helper raise for the duration of the request.
        with patch(patch_path, side_effect=GetTranscriptsFromYouTubeException(error_message)):
            response = self.replace_transcript(locator=self.video_usage_key, youtube_id='non-existent-yt-id')
            self.assertContains(response, text=error_message, status_code=400)

    def test_replace_transcript_fails_on_unknown_category(self):
        """
        Replacing the transcript fails when the item's category is other than video.
        """
        # non_video block setup - i.e. an item whose category is not 'video'.
        non_video_key = self.create_non_video_block()
        response = self.replace_transcript(non_video_key, youtube_id=self.youtube_id)
        self.assert_response(
            response,
            expected_status_code=400,
            expected_message='Transcripts are supported only for "video" blocks.'
        )
|
|
|
|
|
|
class TestDownloadTranscripts(BaseTranscripts):
|
|
"""
|
|
Tests for '/transcripts/download' url.
|
|
"""
|
|
def update_video_component(self, sub=None, youtube_id=None):
|
|
"""
|
|
Updates video component with `sub` and `youtube_id`.
|
|
"""
|
|
sjson_transcript = json.loads(SJSON_TRANSCRIPT_CONTENT)
|
|
self.item.sub = sub
|
|
if sub:
|
|
self.save_subs_to_store(sjson_transcript, sub)
|
|
self.item.youtube_id_1_0 = youtube_id
|
|
if youtube_id:
|
|
self.save_subs_to_store(sjson_transcript, youtube_id)
|
|
|
|
modulestore().update_item(self.item, self.user.id)
|
|
|
|
def download_transcript(self, locator):
|
|
"""
|
|
Makes a call to download transcripts.
|
|
"""
|
|
payload = {}
|
|
if locator:
|
|
payload.update({'locator': str(locator)})
|
|
|
|
download_transcript_url = reverse('download_transcripts')
|
|
response = self.client.get(download_transcript_url, payload)
|
|
return response
|
|
|
|
def assert_download_response(self, response, expected_status_code, expected_content=None):
|
|
"""
|
|
Verify transcript download response.
|
|
"""
|
|
self.assertEqual(response.status_code, expected_status_code)
|
|
if expected_content:
|
|
assert response.content.decode('utf-8') == expected_content
|
|
|
|
def test_download_youtube_transcript_success(self):
|
|
"""
|
|
Verify that the transcript associated to YT id is downloaded successfully.
|
|
"""
|
|
self.update_video_component(youtube_id='JMD_ifUUfsU')
|
|
response = self.download_transcript(locator=self.video_usage_key)
|
|
self.assert_download_response(response, expected_content=SRT_TRANSCRIPT_CONTENT, expected_status_code=200)
|
|
|
|
def test_download_non_youtube_transcript_success(self):
|
|
"""
|
|
Verify that the transcript associated to item's `sub` is downloaded successfully.
|
|
"""
|
|
self.update_video_component(sub='test_subs')
|
|
response = self.download_transcript(locator=self.video_usage_key)
|
|
self.assert_download_response(response, expected_content=SRT_TRANSCRIPT_CONTENT, expected_status_code=200)
|
|
|
|
def test_download_transcript_404_without_locator(self):
|
|
"""
|
|
Verify that download transcript returns 404 without locator.
|
|
"""
|
|
response = self.download_transcript(locator=None)
|
|
self.assert_download_response(response, expected_status_code=404)
|
|
|
|
def test_download_transcript_404_with_bad_locator(self):
|
|
"""
|
|
Verify that download transcript returns 404 with invalid locator.
|
|
"""
|
|
response = self.download_transcript(locator='invalid-locator')
|
|
self.assert_download_response(response, expected_status_code=404)
|
|
|
|
def test_download_transcript_404_for_non_video_block(self):
    """
    A download request that targets a non-video block is answered with a 404.
    """
    non_video_key = self.create_non_video_block()
    self.assert_download_response(
        self.download_transcript(locator=non_video_key),
        expected_status_code=404,
    )
|
|
|
|
def test_download_transcript_404_for_no_yt_and_no_sub(self):
    """
    A download request is answered with a 404 when the video component has
    neither a `sub` nor a YouTube id.
    """
    self.update_video_component(sub=None, youtube_id=None)
    download_response = self.download_transcript(locator=self.video_usage_key)
    self.assert_download_response(download_response, expected_status_code=404)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestCheckTranscripts(BaseTranscripts):
|
|
"""
|
|
Tests for '/transcripts/check' url.
|
|
"""
|
|
def test_success_download_nonyoutube(self):
    """
    Check transcripts for an html5 video whose `sub` transcript was saved to the store.
    """
    subs_id = str(uuid4())
    video_xml = """
        <video youtube="" sub="{}">
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.mp4"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.webm"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.ogv"/>
        </video>
    """.format(subs_id)
    self.set_fields_from_xml(self.item, video_xml)
    modulestore().update_item(self.item, self.user.id)

    # Store a transcript under the same id the video's `sub` points at.
    stored_subs = {
        'start': [100, 200, 240],
        'end': [200, 240, 380],
        'text': ['subs #1', 'subs #2', 'subs #3'],
    }
    self.save_subs_to_store(stored_subs, subs_id)

    request_data = {
        'locator': str(self.video_usage_key),
        'videos': [{'type': 'html5', 'video': subs_id, 'mode': 'mp4'}],
    }
    resp = self.client.get(reverse('check_transcripts'), {'data': json.dumps(request_data)})

    self.assertEqual(resp.status_code, 200)
    expected_payload = {
        'status': 'Success',
        'youtube_local': False,
        'is_youtube_mode': False,
        'youtube_server': False,
        'command': 'found',
        'current_item_subs': subs_id,
        'youtube_diff': True,
        'html5_local': [subs_id],
        'html5_equal': False,
    }
    self.assertDictEqual(json.loads(resp.content.decode('utf-8')), expected_payload)

    remove_subs_from_store(subs_id, self.item)
|
|
|
|
def test_check_youtube(self):
    """
    Check transcripts for a YouTube video whose transcript was saved to the store.
    """
    self.set_fields_from_xml(self.item, '<video youtube="1:JMD_ifUUfsU" />')
    modulestore().update_item(self.item, self.user.id)

    # Store a transcript keyed by the YouTube id.
    stored_subs = {
        'start': [100, 200, 240],
        'end': [200, 240, 380],
        'text': ['subs #1', 'subs #2', 'subs #3'],
    }
    self.save_subs_to_store(stored_subs, 'JMD_ifUUfsU')

    check_url = reverse('check_transcripts')
    request_data = {
        'locator': str(self.video_usage_key),
        'videos': [{'type': 'youtube', 'video': 'JMD_ifUUfsU', 'mode': 'youtube'}],
    }
    resp = self.client.get(check_url, {'data': json.dumps(request_data)})

    self.assertEqual(resp.status_code, 200)
    expected_payload = {
        'status': 'Success',
        'youtube_local': True,
        'is_youtube_mode': True,
        'youtube_server': False,
        'command': 'found',
        'current_item_subs': None,
        'youtube_diff': True,
        'html5_local': [],
        'html5_equal': False,
    }
    self.assertDictEqual(json.loads(resp.content.decode('utf-8')), expected_payload)
|
|
|
|
@patch('openedx.core.djangoapps.video_config.transcripts_utils.requests.get')
def test_check_youtube_with_transcript_name(self, mock_get):
    """
    Test that the transcripts are fetched correctly when the transcript name is set.
    """
    self.set_fields_from_xml(self.item, '<video youtube="good_id_2" />')
    modulestore().update_item(self.item, self.user.id)

    # Store a local transcript keyed by the YouTube id.
    stored_subs = {
        'start': [100, 200, 240],
        'end': [200, 240, 380],
        'text': ['subs #1', 'subs #2', 'subs #3'],
    }
    self.save_subs_to_store(stored_subs, 'good_id_2')
    setup_caption_responses(mock_get, 'en', 'caption_response_string')

    check_url = reverse('check_transcripts')
    request_data = {
        'locator': str(self.video_usage_key),
        'videos': [{'type': 'youtube', 'video': 'good_id_2', 'mode': 'youtube'}],
    }
    resp = self.client.get(check_url, {'data': json.dumps(request_data)})

    # The patched `requests.get` is expected to be hit twice; the first hit
    # must target the YouTube watch page of the video.
    self.assertEqual(2, len(mock_get.mock_calls))
    first_call_args, _ = mock_get.call_args_list[0]
    self.assertEqual(first_call_args[0], 'https://www.youtube.com/watch?v=good_id_2')

    self.assertEqual(resp.status_code, 200)

    expected_payload = {
        'status': 'Success',
        'youtube_local': True,
        'is_youtube_mode': True,
        'youtube_server': True,
        'command': 'replace',
        'current_item_subs': None,
        'youtube_diff': True,
        'html5_local': [],
        'html5_equal': False,
    }
    self.assertDictEqual(json.loads(resp.content.decode('utf-8')), expected_payload)
|
|
|
|
def test_fail_data_without_id(self):
    """
    Checking transcripts with an empty locator fails with a 400 response.
    """
    request_data = {
        'locator': '',
        'videos': [{'type': '', 'video': '', 'mode': ''}],
    }
    check_url = reverse('check_transcripts')
    resp = self.client.get(check_url, {'data': json.dumps(request_data)})

    self.assertEqual(resp.status_code, 400)
    status_message = json.loads(resp.content.decode('utf-8')).get('status')
    self.assertEqual(status_message, "Can't find item by locator.")
|
|
|
|
def test_fail_data_with_bad_locator(self):
    """
    Checking transcripts fails with a 400 response for unusable locators.
    """
    link = reverse('check_transcripts')
    bad_locators = (
        # Test for raising `InvalidLocationError` exception.
        '',
        # Test for raising `ItemNotFoundError` exception.
        '{}_{}'.format(self.video_usage_key, 'BAD_LOCATOR'),
    )
    for locator in bad_locators:
        data = {
            'locator': locator,
            'videos': [{'type': '', 'video': '', 'mode': ''}],
        }
        resp = self.client.get(link, {'data': json.dumps(data)})
        self.assertEqual(resp.status_code, 400)
        status_message = json.loads(resp.content.decode('utf-8')).get('status')
        self.assertEqual(status_message, "Can't find item by locator.")
|
|
|
|
def test_fail_for_non_video_block(self):
    """
    Checking transcripts for a non-video (problem) block fails with a 400
    response and a message that transcripts are only supported for video blocks.
    """
    # Not video block: setup
    data = {
        'parent_locator': str(self.course.location),
        'category': 'problem',
        'type': 'problem'
    }
    resp = self.client.ajax_post('/xblock/', data)
    usage_key = self._get_usage_key(resp)
    subs_id = str(uuid4())
    item = modulestore().get_item(usage_key)
    # Apply the fields to the newly created problem block (`item`), which is the
    # block saved on the next statement. They used to be applied to `self.item`
    # (the video block), which left the saved block unmodified.
    self.set_fields_from_xml(item, """
        <problem youtube="" sub="{}">
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.mp4"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.webm"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.ogv"/>
        </problem>
    """.format(subs_id))
    modulestore().update_item(item, self.user.id)

    subs = {
        'start': [100, 200, 240],
        'end': [200, 240, 380],
        'text': [
            'subs #1',
            'subs #2',
            'subs #3'
        ]
    }
    self.save_subs_to_store(subs, subs_id)

    # Ask the check endpoint about the problem block's locator.
    data = {
        'locator': str(usage_key),
        'videos': [{
            'type': '',
            'video': '',
            'mode': '',
        }]
    }
    link = reverse('check_transcripts')
    resp = self.client.get(link, {'data': json.dumps(data)})
    self.assertEqual(resp.status_code, 400)
    self.assertEqual(
        json.loads(resp.content.decode('utf-8')).get('status'),
        'Transcripts are supported only for "video" blocks.',
    )
|
|
|
|
@patch('openedx.core.djangoapps.video_config.transcripts_utils.get_video_transcript_content')
def test_command_for_fallback_transcript(self, mock_get_video_transcript_content):
    """
    Verify the command if a transcript is there in edx-val.
    """
    # The patched transcript lookup reports a single-cue sjson transcript.
    sjson_transcript = {
        "start": [10],
        "end": [100],
        "text": ["Hi, welcome to Edx."],
    }
    mock_get_video_transcript_content.return_value = {
        'content': json.dumps(sjson_transcript),
        'file_name': 'edx.sjson'
    }

    video_xml = """
        <video youtube="" sub="" edx_video_id="123">
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.mp4"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.webm"/>
            <source src="http://www.quirksmode.org/html5/videos/big_buck_bunny.ogv"/>
        </video>
    """
    self.set_fields_from_xml(self.item, video_xml)
    modulestore().update_item(self.item, self.user.id)

    # Make request to check transcript view
    request_data = {
        'locator': str(self.video_usage_key),
        'videos': [{'type': 'html5', 'video': "", 'mode': 'mp4'}],
    }
    response = self.client.get(reverse('check_transcripts'), {'data': json.dumps(request_data)})

    # Assert the response
    self.assertEqual(response.status_code, 200)
    expected_payload = {
        'status': 'Success',
        'youtube_local': False,
        'is_youtube_mode': False,
        'youtube_server': False,
        'command': 'found',
        'current_item_subs': None,
        'youtube_diff': True,
        'html5_local': [],
        'html5_equal': False,
    }
    self.assertDictEqual(json.loads(response.content.decode('utf-8')), expected_payload)
|