Revert "edX Course/Library Import/Export API"

This reverts commit c94abd2705.
This commit is contained in:
Brandon DeRosier
2015-07-07 16:53:35 -04:00
committed by christopher lee
parent 783e83deb0
commit 2bfbda3c1e
20 changed files with 850 additions and 1392 deletions

View File

@@ -345,7 +345,7 @@ class CourseKeyVerificationTestCase(CourseTestCase):
resp = self.client.get_html(url)
self.assertEqual(resp.status_code, status_code)
url = '/api/import_export/v1/courses/{course_key}/import_status/{filename}'.format(
url = '/import_status/{course_key}/{filename}'.format(
course_key=course_key,
filename='xyz.tar.gz'
)

View File

@@ -2,166 +2,485 @@
These views handle all actions in Studio related to import and exporting of
courses
"""
import base64
import logging
from opaque_keys import InvalidKeyError
import os
import re
import shutil
import tarfile
from path import path
from tempfile import mkdtemp
from contentstore.utils import reverse_course_url, reverse_library_url, reverse_usage_url
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.core.exceptions import SuspiciousOperation, PermissionDenied
from django.core.files.temp import NamedTemporaryFile
from django.core.servers.basehttp import FileWrapper
from django.http import HttpResponse, HttpResponseNotFound
from django.utils.translation import ugettext as _
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.http import require_http_methods, require_GET
import dogstats_wrapper as dog_stats_api
from edxmako.shortcuts import render_to_response
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import SerializationError
from xmodule.modulestore.django import modulestore
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator
from xmodule.modulestore.xml_importer import import_course_from_xml, import_library_from_xml
from xmodule.modulestore.xml_exporter import export_course_to_xml, export_library_to_xml
from xmodule.modulestore import COURSE_ROOT, LIBRARY_ROOT
from student.auth import has_course_author_access
from openedx.core.lib.extract_tar import safetar_extractall
from util.json_request import JsonResponse
from util.views import ensure_valid_course_key
from xmodule.modulestore.django import modulestore
from urllib import urlencode
from contentstore.utils import reverse_course_url, reverse_usage_url, reverse_library_url
__all__ = ["import_handler", "export_handler"]
__all__ = [
'import_handler', 'import_status_handler',
'export_handler',
]
log = logging.getLogger(__name__)
# Regex to capture Content-Range header ranges.
CONTENT_RE = re.compile(
r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})"
)
CONTENT_RE = re.compile(r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})")
# pylint: disable=unused-argument
@login_required
@ensure_csrf_cookie
@require_http_methods(("GET",))
@require_http_methods(("GET", "POST", "PUT"))
@ensure_valid_course_key
def import_handler(request, course_key_string):
"""
The restful handler for the import page.
The restful handler for importing a course.
GET
html: return html page for import page
json: not supported
POST or PUT
json: import a course via the .tar.gz file specified in request.FILES
"""
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
successful_url = reverse_library_url("library_handler", courselike_key)
root_name = LIBRARY_ROOT
successful_url = reverse_library_url('library_handler', courselike_key)
context_name = 'context_library'
courselike_module = modulestore().get_library(courselike_key)
context_name = "context_library"
import_func = import_library_from_xml
else:
successful_url = reverse_course_url("course_handler", courselike_key)
root_name = COURSE_ROOT
successful_url = reverse_course_url('course_handler', courselike_key)
context_name = 'context_course'
courselike_module = modulestore().get_course(courselike_key)
context_name = "context_course"
import_func = import_course_from_xml
return _import_handler(
request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func
)
def _import_handler(request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func):
"""
Parameterized function containing the meat of import_handler.
"""
if not has_course_author_access(request.user, courselike_key):
raise PermissionDenied()
return render_to_response("import.html", {
context_name: courselike_module,
"successful_import_redirect_url": successful_url,
"import_status_url": reverse(
"course_import_status_handler",
kwargs={
"course_key_string": unicode(courselike_key),
"filename": "fillerName"
}
),
"import_url": reverse(
"course_import_export_handler",
kwargs={
"course_key_string": unicode(courselike_key),
}
),
"library": library
})
if 'application/json' in request.META.get('HTTP_ACCEPT', 'application/json'):
if request.method == 'GET':
raise NotImplementedError('coming soon')
else:
# Do everything in a try-except block to make sure everything is properly cleaned up.
try:
data_root = path(settings.GITHUB_REPO_ROOT)
subdir = base64.urlsafe_b64encode(repr(courselike_key))
course_dir = data_root / subdir
filename = request.FILES['course-data'].name
# Use sessions to keep info about import progress
session_status = request.session.setdefault("import_status", {})
courselike_string = unicode(courselike_key) + filename
_save_request_status(request, courselike_string, 0)
if not filename.endswith('.tar.gz'):
_save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'ErrMsg': _('We only support uploading a .tar.gz file.'),
'Stage': -1
},
status=415
)
temp_filepath = course_dir / filename
if not course_dir.isdir():
os.mkdir(course_dir)
logging.debug('importing course to {0}'.format(temp_filepath))
# Get upload chunks byte ranges
try:
matches = CONTENT_RE.search(request.META["HTTP_CONTENT_RANGE"])
content_range = matches.groupdict()
except KeyError: # Single chunk
# no Content-Range header, so make one that will work
content_range = {'start': 0, 'stop': 1, 'end': 2}
# stream out the uploaded files in chunks to disk
if int(content_range['start']) == 0:
mode = "wb+"
else:
mode = "ab+"
size = os.path.getsize(temp_filepath)
# Check to make sure we haven't missed a chunk
# This shouldn't happen, even if different instances are handling
# the same session, but it's always better to catch errors earlier.
if size < int(content_range['start']):
_save_request_status(request, courselike_string, -1)
log.warning(
"Reported range %s does not match size downloaded so far %s",
content_range['start'],
size
)
return JsonResponse(
{
'ErrMsg': _('File upload corrupted. Please try again'),
'Stage': -1
},
status=409
)
# The last request sometimes comes twice. This happens because
# nginx sends a 499 error code when the response takes too long.
elif size > int(content_range['stop']) and size == int(content_range['end']):
return JsonResponse({'ImportStatus': 1})
with open(temp_filepath, mode) as temp_file:
for chunk in request.FILES['course-data'].chunks():
temp_file.write(chunk)
size = os.path.getsize(temp_filepath)
if int(content_range['stop']) != int(content_range['end']) - 1:
# More chunks coming
return JsonResponse({
"files": [{
"name": filename,
"size": size,
"deleteUrl": "",
"deleteType": "",
"url": reverse_course_url('import_handler', courselike_key),
"thumbnailUrl": ""
}]
})
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
_save_request_status(request, courselike_string, -1)
if course_dir.isdir():
shutil.rmtree(course_dir)
log.info("Course import %s: Temp data cleared", courselike_key)
log.exception(
"error importing course"
)
return JsonResponse(
{
'ErrMsg': str(exception),
'Stage': -1
},
status=400
)
# try-finally block for proper clean up after receiving last chunk.
try:
# This was the last chunk.
log.info("Course import %s: Upload complete", courselike_key)
_save_request_status(request, courselike_string, 1)
tar_file = tarfile.open(temp_filepath)
try:
safetar_extractall(tar_file, (course_dir + '/').encode('utf-8'))
except SuspiciousOperation as exc:
_save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'ErrMsg': 'Unsafe tar file. Aborting import.',
'SuspiciousFileOperationMsg': exc.args[0],
'Stage': -1
},
status=400
)
finally:
tar_file.close()
log.info("Course import %s: Uploaded file extracted", courselike_key)
_save_request_status(request, courselike_string, 2)
# find the 'course.xml' file
def get_all_files(directory):
"""
For each file in the directory, yield a 2-tuple of (file-name,
directory-path)
"""
for dirpath, _dirnames, filenames in os.walk(directory):
for filename in filenames:
yield (filename, dirpath)
def get_dir_for_fname(directory, filename):
"""
Returns the dirpath for the first file found in the directory
with the given name. If there is no file in the directory with
the specified name, return None.
"""
for fname, dirpath in get_all_files(directory):
if fname == filename:
return dirpath
return None
dirpath = get_dir_for_fname(course_dir, root_name)
if not dirpath:
_save_request_status(request, courselike_string, -2)
return JsonResponse(
{
'ErrMsg': _('Could not find the {0} file in the package.').format(root_name),
'Stage': -2
},
status=415
)
dirpath = os.path.relpath(dirpath, data_root)
logging.debug('found %s at %s', root_name, dirpath)
log.info("Course import %s: Extracted file verified", courselike_key)
_save_request_status(request, courselike_string, 3)
with dog_stats_api.timer(
'courselike_import.time',
tags=[u"courselike:{}".format(courselike_key)]
):
courselike_items = import_func(
modulestore(), request.user.id,
settings.GITHUB_REPO_ROOT, [dirpath],
load_error_modules=False,
static_content_store=contentstore(),
target_id=courselike_key
)
new_location = courselike_items[0].location
logging.debug('new course at %s', new_location)
log.info("Course import %s: Course import successful", courselike_key)
_save_request_status(request, courselike_string, 4)
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
log.exception(
"error importing course"
)
return JsonResponse(
{
'ErrMsg': str(exception),
'Stage': -session_status[courselike_string]
},
status=400
)
finally:
if course_dir.isdir():
shutil.rmtree(course_dir)
log.info("Course import %s: Temp data cleared", courselike_key)
# set failed stage number with negative sign in case of unsuccessful import
if session_status[courselike_string] != 4:
_save_request_status(request, courselike_string, -abs(session_status[courselike_string]))
return JsonResponse({'Status': 'OK'})
elif request.method == 'GET': # assume html
status_url = reverse_course_url(
"import_status_handler", courselike_key, kwargs={'filename': "fillerName"}
)
return render_to_response('import.html', {
context_name: courselike_module,
'successful_import_redirect_url': successful_url,
'import_status_url': status_url,
'library': isinstance(courselike_key, LibraryLocator)
})
else:
return HttpResponseNotFound()
def _save_request_status(request, key, status):
"""
Save import status for a course in request session
"""
session_status = request.session.get('import_status')
if session_status is None:
session_status = request.session.setdefault("import_status", {})
session_status[key] = status
request.session.save()
# pylint: disable=unused-argument
@require_GET
@ensure_csrf_cookie
@login_required
@ensure_valid_course_key
def import_status_handler(request, course_key_string, filename=None):
"""
Returns an integer corresponding to the status of a file import. These are:
-X : Import unsuccessful due to some error with X as stage [0-3]
0 : No status info found (import done or upload still in progress)
1 : Extracting file
2 : Validating.
3 : Importing to mongo
4 : Import successful
"""
course_key = CourseKey.from_string(course_key_string)
if not has_course_author_access(request.user, course_key):
raise PermissionDenied()
try:
session_status = request.session["import_status"]
status = session_status[course_key_string + filename]
except KeyError:
status = 0
return JsonResponse({"ImportStatus": status})
def create_export_tarball(course_module, course_key, context):
"""
Generates the export tarball, or returns None if there was an error.
Updates the context with any error information if applicable.
"""
name = course_module.url_name
export_file = NamedTemporaryFile(prefix=name + '.', suffix=".tar.gz")
root_dir = path(mkdtemp())
try:
if isinstance(course_key, LibraryLocator):
export_library_to_xml(modulestore(), contentstore(), course_key, root_dir, name)
else:
export_course_to_xml(modulestore(), contentstore(), course_module.id, root_dir, name)
logging.debug(u'tar file being generated at %s', export_file.name)
with tarfile.open(name=export_file.name, mode='w:gz') as tar_file:
tar_file.add(root_dir / name, arcname=name)
except SerializationError as exc:
log.exception(u'There was an error exporting %s', course_key)
unit = None
failed_item = None
parent = None
try:
failed_item = modulestore().get_item(exc.location)
parent_loc = modulestore().get_parent_location(failed_item.location)
if parent_loc is not None:
parent = modulestore().get_item(parent_loc)
if parent.location.category == 'vertical':
unit = parent
except: # pylint: disable=bare-except
# if we have a nested exception, then we'll show the more generic error message
pass
context.update({
'in_err': True,
'raw_err_msg': str(exc),
'failed_module': failed_item,
'unit': unit,
'edit_unit_url': reverse_usage_url("container_handler", parent.location) if parent else "",
})
raise
except Exception as exc:
log.exception('There was an error exporting %s', course_key)
context.update({
'in_err': True,
'unit': None,
'raw_err_msg': str(exc)})
raise
finally:
shutil.rmtree(root_dir / name)
return export_file
def send_tarball(tarball):
"""
Renders a tarball to response, for use when sending a tar.gz file to the user.
"""
wrapper = FileWrapper(tarball)
response = HttpResponse(wrapper, content_type='application/x-tgz')
response['Content-Disposition'] = 'attachment; filename=%s' % os.path.basename(tarball.name.encode('utf-8'))
response['Content-Length'] = os.path.getsize(tarball.name)
return response
# pylint: disable=unused-argument
@ensure_csrf_cookie
@login_required
@require_http_methods(("GET",))
@ensure_valid_course_key
def export_handler(request, course_key_string):
"""
The restful handler for the export page.
The restful handler for exporting a course.
GET
html: return html page for import page
application/x-tgz: return tar.gz file containing exported course
json: not supported
Note that there are 2 ways to request the tar.gz file. The request header can specify
application/x-tgz via HTTP_ACCEPT, or a query parameter can be used (?_accept=application/x-tgz).
If the tar.gz file has been requested but the export operation fails, an HTML page will be returned
which describes the error.
"""
error = request.GET.get("error", None)
error_message = request.GET.get("error_message", None)
failed_module = request.GET.get("failed_module", None)
unit = request.GET.get("unit", None)
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
successful_url = reverse_library_url("library_handler", courselike_key)
courselike_module = modulestore().get_library(courselike_key)
context_name = "context_library"
else:
successful_url = reverse_course_url("course_handler", courselike_key)
courselike_module = modulestore().get_course(courselike_key)
context_name = "context_course"
if not has_course_author_access(request.user, courselike_key):
course_key = CourseKey.from_string(course_key_string)
export_url = reverse_course_url('export_handler', course_key)
if not has_course_author_access(request.user, course_key):
raise PermissionDenied()
export_url = reverse(
"course_import_export_handler",
kwargs={
"course_key_string": unicode(courselike_key),
if isinstance(course_key, LibraryLocator):
courselike_module = modulestore().get_library(course_key)
context = {
'context_library': courselike_module,
'courselike_home_url': reverse_library_url("library_handler", course_key),
'library': True
}
else:
courselike_module = modulestore().get_course(course_key)
context = {
'context_course': courselike_module,
'courselike_home_url': reverse_course_url("course_handler", course_key),
'library': False
}
) + "?accept=application/x-tgz"
export_url += "&{0}".format(
urlencode({
"redirect": reverse_course_url(
"export_handler",
unicode(courselike_key)
)
})
)
context['export_url'] = export_url + '?_accept=application/x-tgz'
if unit:
# an _accept URL parameter will be preferred over HTTP_ACCEPT in the header.
requested_format = request.REQUEST.get('_accept', request.META.get('HTTP_ACCEPT', 'text/html'))
if 'application/x-tgz' in requested_format:
try:
edit_unit_url = reverse_usage_url("container_handler", unit)
except (InvalidKeyError, AttributeError):
log.error("Invalid parent key supplied to export view: %s", unit)
tarball = create_export_tarball(courselike_module, course_key, context)
except SerializationError:
return render_to_response('export.html', context)
return send_tarball(tarball)
return render_to_response("export.html", {
context_name: courselike_module,
"export_url": export_url,
"raw_err_msg": _(
"An invalid parent key was supplied: \"{supplied_key}\" "
"is not a valid course unit."
).format(supplied_key=unit),
"library": library
})
else:
edit_unit_url = ""
elif 'text/html' in requested_format:
return render_to_response('export.html', context)
if error:
return render_to_response('export.html', {
context_name: courselike_module,
"export_url": export_url,
"in_err": error,
"unit": unit,
"failed_module": failed_module,
"edit_unit_url": edit_unit_url,
"course_home_url": successful_url,
"raw_err_msg": error_message,
"library": library
})
else:
return render_to_response("export.html", {
context_name: courselike_module,
"export_url": export_url,
"library": library
})
# Only HTML or x-tgz request formats are supported (no JSON).
return HttpResponse(status=406)

View File

@@ -1,15 +1,329 @@
"""
Unit tests for course import and export
"""
import copy
import json
import logging
import lxml
import os
import shutil
import tarfile
import tempfile
from path import path
from uuid import uuid4
from django.test.utils import override_settings
from django.conf import settings
from xmodule.contentstore.django import contentstore
from xmodule.modulestore.xml_exporter import export_library_to_xml
from xmodule.modulestore.xml_importer import import_library_from_xml
from xmodule.modulestore import LIBRARY_ROOT
from contentstore.utils import reverse_course_url
from xmodule.modulestore.tests.factories import ItemFactory, LibraryFactory
from contentstore.tests.utils import CourseTestCase
from contentstore.utils import reverse_course_url
from openedx.core.lib.extract_tar import safetar_extractall
from student import auth
from student.roles import CourseInstructorRole, CourseStaffRole
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
TEST_DATA_CONTENTSTORE['DOC_STORE_CONFIG']['db'] = 'test_xcontent_%s' % uuid4().hex
TEST_DATA_DIR = settings.COMMON_TEST_DATA_ROOT
log = logging.getLogger(__name__)
class ImportExportTestCase(CourseTestCase):
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ImportTestCase(CourseTestCase):
"""
Unit tests for importing a course or Library
"""
def setUp(self):
super(ImportTestCase, self).setUp()
self.url = reverse_course_url('import_handler', self.course.id)
self.content_dir = path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, self.content_dir)
def touch(name):
""" Equivalent to shell's 'touch'"""
with file(name, 'a'):
os.utime(name, None)
# Create tar test files -----------------------------------------------
# OK course:
good_dir = tempfile.mkdtemp(dir=self.content_dir)
# test course being deeper down than top of tar file
embedded_dir = os.path.join(good_dir, "grandparent", "parent")
os.makedirs(os.path.join(embedded_dir, "course"))
with open(os.path.join(embedded_dir, "course.xml"), "w+") as f:
f.write('<course url_name="2013_Spring" org="EDx" course="0.00x"/>')
with open(os.path.join(embedded_dir, "course", "2013_Spring.xml"), "w+") as f:
f.write('<course></course>')
self.good_tar = os.path.join(self.content_dir, "good.tar.gz")
with tarfile.open(self.good_tar, "w:gz") as gtar:
gtar.add(good_dir)
# Bad course (no 'course.xml' file):
bad_dir = tempfile.mkdtemp(dir=self.content_dir)
touch(os.path.join(bad_dir, "bad.xml"))
self.bad_tar = os.path.join(self.content_dir, "bad.tar.gz")
with tarfile.open(self.bad_tar, "w:gz") as btar:
btar.add(bad_dir)
self.unsafe_common_dir = path(tempfile.mkdtemp(dir=self.content_dir))
def test_no_coursexml(self):
"""
Check that the response for a tar.gz import without a course.xml is
correct.
"""
with open(self.bad_tar) as btar:
resp = self.client.post(
self.url,
{
"name": self.bad_tar,
"course-data": [btar]
})
self.assertEquals(resp.status_code, 415)
# Check that `import_status` returns the appropriate stage (i.e., the
# stage at which import failed).
resp_status = self.client.get(
reverse_course_url(
'import_status_handler',
self.course.id,
kwargs={'filename': os.path.split(self.bad_tar)[1]}
)
)
self.assertEquals(json.loads(resp_status.content)["ImportStatus"], -2)
def test_with_coursexml(self):
"""
Check that the response for a tar.gz import with a course.xml is
correct.
"""
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
def test_import_in_existing_course(self):
"""
Check that course is imported successfully in existing course and users have their access roles
"""
# Create a non_staff user and add it to course staff only
__, nonstaff_user = self.create_non_staff_authed_user_client()
auth.add_users(self.user, CourseStaffRole(self.course.id), nonstaff_user)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_before_import = course.display_name
# Check that global staff user can import course
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_after_import = course.display_name
# Check that course display name have changed after import
self.assertNotEqual(display_name_before_import, display_name_after_import)
# Now check that non_staff user has his same role
self.assertFalse(CourseInstructorRole(self.course.id).has_user(nonstaff_user))
self.assertTrue(CourseStaffRole(self.course.id).has_user(nonstaff_user))
# Now course staff user can also successfully import course
self.client.login(username=nonstaff_user.username, password='foo')
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
# Now check that non_staff user has his same role
self.assertFalse(CourseInstructorRole(self.course.id).has_user(nonstaff_user))
self.assertTrue(CourseStaffRole(self.course.id).has_user(nonstaff_user))
## Unsafe tar methods #####################################################
# Each of these methods creates a tarfile with a single type of unsafe
# content.
def _fifo_tar(self):
"""
Tar file with FIFO
"""
fifop = self.unsafe_common_dir / "fifo.file"
fifo_tar = self.unsafe_common_dir / "fifo.tar.gz"
os.mkfifo(fifop)
with tarfile.open(fifo_tar, "w:gz") as tar:
tar.add(fifop)
return fifo_tar
def _symlink_tar(self):
"""
Tarfile with symlink to path outside directory.
"""
outsidep = self.unsafe_common_dir / "unsafe_file.txt"
symlinkp = self.unsafe_common_dir / "symlink.txt"
symlink_tar = self.unsafe_common_dir / "symlink.tar.gz"
outsidep.symlink(symlinkp)
with tarfile.open(symlink_tar, "w:gz") as tar:
tar.add(symlinkp)
return symlink_tar
def _outside_tar(self):
"""
Tarfile with file that extracts to outside directory.
Extracting this tarfile in directory <dir> will put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(str(self.content_dir / "a_file")))
return outside_tar
def _outside_tar2(self):
"""
Tarfile with file that extracts to outside directory.
The path here matches the basename (`self.unsafe_common_dir`), but
then "cd's out". E.g. "/usr/../etc" == "/etc", but the naive basename
of the first (but not the second) is "/usr"
Extracting this tarfile in directory <dir> will also put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(str(self.unsafe_common_dir / "../a_file")))
return outside_tar
def test_unsafe_tar(self):
"""
Check that safety measure work.
This includes:
'tarbombs' which include files or symlinks with paths
outside or directly in the working directory,
'special files' (character device, block device or FIFOs),
all raise exceptions/400s.
"""
def try_tar(tarpath):
""" Attempt to tar an unacceptable file """
with open(tarpath) as tar:
args = {"name": tarpath, "course-data": [tar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 400)
self.assertTrue("SuspiciousFileOperation" in resp.content)
try_tar(self._fifo_tar())
try_tar(self._symlink_tar())
try_tar(self._outside_tar())
try_tar(self._outside_tar2())
# Check that `import_status` returns the appropriate stage (i.e.,
# either 3, indicating all previous steps are completed, or 0,
# indicating no upload in progress)
resp_status = self.client.get(
reverse_course_url(
'import_status_handler',
self.course.id,
kwargs={'filename': os.path.split(self.good_tar)[1]}
)
)
import_status = json.loads(resp_status.content)["ImportStatus"]
self.assertIn(import_status, (0, 3))
def test_library_import(self):
"""
Try importing a known good library archive, and verify that the
contents of the library have completely replaced the old contents.
"""
# Create some blocks to overwrite
library = LibraryFactory.create(modulestore=self.store)
lib_key = library.location.library_key
test_block = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
)
test_block2 = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False
)
# Create a library and blocks that should remain unmolested.
unchanged_lib = LibraryFactory.create()
unchanged_key = unchanged_lib.location.library_key
test_block3 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
test_block4 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
# Refresh library.
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block.url_name, children)
self.assertIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
extract_dir = path(tempfile.mkdtemp())
try:
tar = tarfile.open(path(TEST_DATA_DIR) / 'imports' / 'library.HhJfPD.tar.gz')
safetar_extractall(tar, extract_dir)
library_items = import_library_from_xml(
self.store, self.user.id,
settings.GITHUB_REPO_ROOT, [extract_dir / 'library'],
load_error_modules=False,
static_content_store=contentstore(),
target_id=lib_key
)
finally:
shutil.rmtree(extract_dir)
self.assertEqual(lib_key, library_items[0].location.library_key)
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 3)
self.assertNotIn(test_block.url_name, children)
self.assertNotIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ExportTestCase(CourseTestCase):
"""
Tests for export_handler.
"""
@@ -17,34 +331,116 @@ class ImportExportTestCase(CourseTestCase):
"""
Sets up the test course.
"""
super(ImportExportTestCase, self).setUp()
self.import_url = reverse_course_url('import_handler', self.course.id)
self.export_url = reverse_course_url('export_handler', self.course.id)
def test_import_html(self):
"""
Get the HTML for the import page.
"""
resp = self.client.get_html(self.import_url)
self.assertEquals(resp.status_code, 200)
self.assertContains(resp, "Replace Your Course Content")
super(ExportTestCase, self).setUp()
self.url = reverse_course_url('export_handler', self.course.id)
def test_export_html(self):
"""
Get the HTML for the export page.
Get the HTML for the page.
"""
resp = self.client.get_html(self.export_url)
resp = self.client.get_html(self.url)
self.assertEquals(resp.status_code, 200)
self.assertContains(resp, "Export My Course Content")
def test_permission_denied(self):
def test_export_json_unsupported(self):
"""
Test if the views handle unauthorized requests properly
JSON is unsupported.
"""
# pylint: disable=unused-variable
client, user = self.create_non_staff_authed_user_client(
authenticate=True
resp = self.client.get(self.url, HTTP_ACCEPT='application/json')
self.assertEquals(resp.status_code, 406)
def test_export_targz(self):
"""
Get tar.gz file, using HTTP_ACCEPT.
"""
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
self._verify_export_succeeded(resp)
def test_export_targz_urlparam(self):
"""
Get tar.gz file, using URL parameter.
"""
resp = self.client.get(self.url + '?_accept=application/x-tgz')
self._verify_export_succeeded(resp)
def _verify_export_succeeded(self, resp):
""" Export success helper method. """
self.assertEquals(resp.status_code, 200)
self.assertTrue(resp.get('Content-Disposition').startswith('attachment'))
def test_export_failure_top_level(self):
"""
Export failure.
"""
fake_xblock = ItemFactory.create(parent_location=self.course.location, category='aawefawef')
self.store.publish(fake_xblock.location, self.user.id)
self._verify_export_failure(u'/container/{}'.format(self.course.location))
def test_export_failure_subsection_level(self):
"""
Slightly different export failure.
"""
vertical = ItemFactory.create(parent_location=self.course.location, category='vertical', display_name='foo')
ItemFactory.create(
parent_location=vertical.location,
category='aawefawef'
)
for url in [self.import_url, self.export_url]:
resp = client.get(url)
self.assertEquals(resp.status_code, 403)
self._verify_export_failure(u'/container/{}'.format(vertical.location))
def _verify_export_failure(self, expected_text):
""" Export failure helper method. """
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
self.assertEquals(resp.status_code, 200)
self.assertIsNone(resp.get('Content-Disposition'))
self.assertContains(resp, 'Unable to create xml for module')
self.assertContains(resp, expected_text)
def test_library_export(self):
"""
Verify that useable library data can be exported.
"""
youtube_id = "qS4NO9MNC6w"
library = LibraryFactory.create(modulestore=self.store)
video_block = ItemFactory.create(
category="video",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
youtube_id_1_0=youtube_id
)
name = library.url_name
lib_key = library.location.library_key
root_dir = path(tempfile.mkdtemp())
try:
export_library_to_xml(self.store, contentstore(), lib_key, root_dir, name)
# pylint: disable=no-member
lib_xml = lxml.etree.XML(open(root_dir / name / LIBRARY_ROOT).read())
self.assertEqual(lib_xml.get('org'), lib_key.org)
self.assertEqual(lib_xml.get('library'), lib_key.library)
block = lib_xml.find('video')
self.assertIsNotNone(block)
self.assertEqual(block.get('url_name'), video_block.url_name)
# pylint: disable=no-member
video_xml = lxml.etree.XML(open(root_dir / name / 'video' / video_block.url_name + '.xml').read())
self.assertEqual(video_xml.tag, 'video')
self.assertEqual(video_xml.get('youtube_id_1_0'), youtube_id)
finally:
shutil.rmtree(root_dir / name)
def test_export_success_with_custom_tag(self):
"""
Verify that course export with customtag
"""
xml_string = '<impl>slides</impl>'
vertical = ItemFactory.create(
parent_location=self.course.location, category='vertical', display_name='foo'
)
ItemFactory.create(
parent_location=vertical.location,
category='customtag',
display_name='custom_tag_foo',
data=xml_string
)
self.test_export_targz_urlparam()

View File

@@ -318,19 +318,6 @@ SESSION_INACTIVITY_TIMEOUT_IN_SECONDS = AUTH_TOKENS.get("SESSION_INACTIVITY_TIME
##### X-Frame-Options response header settings #####
X_FRAME_OPTIONS = ENV_TOKENS.get('X_FRAME_OPTIONS', X_FRAME_OPTIONS)
##### OAUTH2 Provider ##############
if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
OAUTH_OIDC_ISSUER_PATH = ENV_TOKENS.get('OAUTH_OIDC_ISSUER_PATH', 'oauth2')
OAUTH_OIDC_ISSUER = ENV_TOKENS.get(
'OAUTH_OIDC_ISSUER',
'https://{0}/{1}'.format(
SITE_NAME,
OAUTH_OIDC_ISSUER_PATH
)
)
OAUTH_ENFORCE_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_SECURE', True)
OAUTH_ENFORCE_CLIENT_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_CLIENT_SECURE', True)
##### ADVANCED_SECURITY_CONFIG #####
ADVANCED_SECURITY_CONFIG = ENV_TOKENS.get('ADVANCED_SECURITY_CONFIG', {})

View File

@@ -71,9 +71,6 @@ FEATURES = {
'AUTH_USE_CERTIFICATES': False,
# Toggles OAuth2 authentication provider
'ENABLE_OAUTH2_PROVIDER': False,
# email address for studio staff (eg to request course creation)
'STUDIO_REQUEST_EMAIL': '',
@@ -206,29 +203,6 @@ sys.path.append(COMMON_ROOT / 'djangoapps')
GEOIP_PATH = REPO_ROOT / "common/static/data/geoip/GeoIP.dat"
GEOIPV6_PATH = REPO_ROOT / "common/static/data/geoip/GeoIPv6.dat"
############################ OAUTH2 Provider ###################################
# OpenID Connect issuer ID. Normally the URL of the authentication endpoint.
OAUTH_OIDC_ISSUER_PATH = 'oauth2'
OAUTH_OIDC_ISSUER = 'https:/example.com/oauth2'
# OpenID Connect claim handlers
OAUTH_OIDC_ID_TOKEN_HANDLERS = (
'oauth2_provider.oidc.handlers.BasicIDTokenHandler',
'oauth2_provider.oidc.handlers.ProfileHandler',
'oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.IDTokenHandler'
)
OAUTH_OIDC_USERINFO_HANDLERS = (
'oauth2_provider.oidc.handlers.BasicUserInfoHandler',
'oauth2_provider.oidc.handlers.ProfileHandler',
'oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.UserInfoHandler'
)
############################# WEB CONFIGURATION #############################
# This is where we stick our compiled template files.
import tempfile
@@ -274,8 +248,7 @@ LMS_BASE = None
# These are standard regexes for pulling out info like course_ids, usage_ids, etc.
# They are used so that URLs with deprecated-format strings still work.
from lms.envs.common import (
COURSE_KEY_PATTERN, COURSELIKE_KEY_PATTERN, COURSE_ID_PATTERN,
USAGE_KEY_PATTERN, ASSET_KEY_PATTERN
COURSE_KEY_PATTERN, COURSE_ID_PATTERN, USAGE_KEY_PATTERN, ASSET_KEY_PATTERN
)
######################### CSRF #########################################
@@ -751,11 +724,6 @@ INSTALLED_APPS = (
'static_replace',
'require',
# OAuth2 Provider
'provider',
'provider.oauth2',
'oauth2_provider',
# comment common
'django_comment_common',
@@ -792,10 +760,6 @@ INSTALLED_APPS = (
# Credit courses
'openedx.core.djangoapps.credit',
# Import/Export API
'rest_framework',
'openedx.core.djangoapps.import_export',
'xblock_django',
)

View File

@@ -33,30 +33,6 @@ else:
require(["js/factories/export"], function(ExportFactory) {
ExportFactory(hasUnit, editUnitUrl, courselikeHomeUrl, is_library, errMsg);
});
## Even though there isn't an export error, we should still show contextual
## error popups if supplied.
%elif raw_err_msg:
var errMsg = ${json.dumps(raw_err_msg)};
require(['gettext', 'js/views/feedback_prompt'], function(gettext, PromptView) {
dialog = new PromptView({
title: gettext('There has been an error.'),
message: errMsg,
intent: 'error',
actions: {
primary: {
text: gettext('Continue'),
click: function(view) {
view.hide();
}
}
}
});
$('body').addClass('js');
dialog.show();
});
%endif
</%block>

View File

@@ -53,7 +53,7 @@ else:
</div>
<form id="fileupload" method="post" action="${import_url}" enctype="multipart/form-data" class="import-form">
<form id="fileupload" method="post" enctype="multipart/form-data" class="import-form">
## Translators: ".tar.gz" is a file extension, and files with that extension are called "gzipped tar files": these terms should not be translated
<h2 class="title">

View File

@@ -7,6 +7,10 @@ admin.autodiscover()
# pylint: disable=bad-continuation
# Pattern to match a course key or a library key
COURSELIKE_KEY_PATTERN = r'(?P<course_key_string>({}|{}))'.format(
r'[^/]+/[^/]+/[^/]+', r'[^/:]+:[^/+]+\+[^/+]+(\+[^/]+)?'
)
# Pattern to match a library key only
LIBRARY_KEY_PATTERN = r'(?P<library_key_string>library-v1:[^/+]+\+[^/+]+)'
@@ -70,7 +74,7 @@ urlpatterns += patterns(
url(r'^signin$', 'login_page', name='login'),
url(r'^request_course_creator$', 'request_course_creator'),
url(r'^course_team/{}(?:/(?P<email>.+))?$'.format(settings.COURSELIKE_KEY_PATTERN), 'course_team_handler'),
url(r'^course_team/{}(?:/(?P<email>.+))?$'.format(COURSELIKE_KEY_PATTERN), 'course_team_handler'),
url(r'^course_info/{}$'.format(settings.COURSE_KEY_PATTERN), 'course_info_handler'),
url(
r'^course_info_update/{}/(?P<provided_id>\d+)?$'.format(settings.COURSE_KEY_PATTERN),
@@ -90,8 +94,9 @@ urlpatterns += patterns(
url(r'^checklists/{}/(?P<checklist_index>\d+)?$'.format(settings.COURSE_KEY_PATTERN), 'checklists_handler'),
url(r'^orphan/{}$'.format(settings.COURSE_KEY_PATTERN), 'orphan_handler'),
url(r'^assets/{}/{}?$'.format(settings.COURSE_KEY_PATTERN, settings.ASSET_KEY_PATTERN), 'assets_handler'),
url(r'^import/{}$'.format(settings.COURSELIKE_KEY_PATTERN), 'import_handler'),
url(r'^export/{}$'.format(settings.COURSELIKE_KEY_PATTERN), 'export_handler'),
url(r'^import/{}$'.format(COURSELIKE_KEY_PATTERN), 'import_handler'),
url(r'^import_status/{}/(?P<filename>.+)$'.format(COURSELIKE_KEY_PATTERN), 'import_status_handler'),
url(r'^export/{}$'.format(COURSELIKE_KEY_PATTERN), 'export_handler'),
url(r'^xblock/outline/{}$'.format(settings.USAGE_KEY_PATTERN), 'xblock_outline_handler'),
url(r'^xblock/container/{}$'.format(settings.USAGE_KEY_PATTERN), 'xblock_container_handler'),
url(r'^xblock/{}/(?P<view_name>[^/]+)$'.format(settings.USAGE_KEY_PATTERN), 'xblock_view_handler'),
@@ -107,11 +112,7 @@ urlpatterns += patterns(
url(r'^group_configurations/{}$'.format(settings.COURSE_KEY_PATTERN), 'group_configurations_list_handler'),
url(r'^group_configurations/{}/(?P<group_configuration_id>\d+)(/)?(?P<group_id>\d+)?$'.format(
settings.COURSE_KEY_PATTERN), 'group_configurations_detail_handler'),
url(r'^api/val/v0/', include('edxval.urls')),
# Import/Export API
url(r'^api/import_export/v1/', include('openedx.core.djangoapps.import_export.urls')),
)
JS_INFO_DICT = {
@@ -155,12 +156,6 @@ if settings.FEATURES.get('AUTH_USE_CAS'):
url(r'^cas-auth/logout/$', 'django_cas.views.logout', {'next_page': '/'}, name="cas-logout"),
)
if settings.FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
urlpatterns += (
url(r'^oauth2/', include('oauth2_provider.urls', namespace='oauth2')),
)
urlpatterns += patterns('', url(r'^admin/', include(admin.site.urls)),)
# enable automatic login