Merge pull request #14552 from edx/jmbowman/async_course_import
PLAT-1104 Import courses asynchronously
.gitignore (vendored): 4 lines changed
@@ -75,6 +75,10 @@ jscover.log.*
.tddium*
common/test/data/test_unicode/static/
test_root/courses/
test_root/data/test_bare.git/
test_root/export_course_repos/
test_root/paver_logs/
test_root/uploads/
django-pyfs

### Installation artifacts

cms/djangoapps/contentstore/storage.py (new file): 23 lines
@@ -0,0 +1,23 @@
"""
Storage backend for course import and export.
"""
from __future__ import absolute_import

from django.conf import settings
from django.core.files.storage import get_storage_class

from storages.backends.s3boto import S3BotoStorage
from storages.utils import setting


class ImportExportS3Storage(S3BotoStorage):  # pylint: disable=abstract-method
    """
    S3 backend for course import and export OLX files.
    """

    def __init__(self):
        bucket = setting('COURSE_IMPORT_EXPORT_BUCKET', settings.AWS_STORAGE_BUCKET_NAME)
        super(ImportExportS3Storage, self).__init__(bucket=bucket, querystring_auth=True)


# pylint: disable=invalid-name
course_import_export_storage = get_storage_class(settings.COURSE_IMPORT_EXPORT_STORAGE)()
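The backend above is only used when a deployment opts in. A minimal sketch (based on the setting names that appear in the cms/envs changes later in this diff; the bucket name here is hypothetical) of how `course_import_export_storage` ends up pointing at S3:

# Hedged sketch of the settings wiring this module relies on; bucket name is hypothetical.
COURSE_IMPORT_EXPORT_BUCKET = 'example-olx-imports'
COURSE_IMPORT_EXPORT_STORAGE = 'contentstore.storage.ImportExportS3Storage'

# At import time, storage.py resolves the dotted path and instantiates it:
#   course_import_export_storage = get_storage_class(settings.COURSE_IMPORT_EXPORT_STORAGE)()
# When COURSE_IMPORT_EXPORT_BUCKET is left unset, COURSE_IMPORT_EXPORT_STORAGE falls back to
# the default file storage instead (see cms/envs/aws.py below).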
@@ -1,24 +1,52 @@
"""
This file contains celery tasks for contentstore views
"""
from __future__ import absolute_import

import base64
import json
import logging
import os
import shutil
import tarfile
from datetime import datetime

from celery.task import task
from celery.utils.log import get_task_logger
from datetime import datetime
from path import Path as path
from pytz import UTC
from six import iteritems, text_type

from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import SuspiciousOperation
from django.test import RequestFactory
from django.utils.text import get_valid_filename
from django.utils.translation import ugettext as _

from djcelery.common import respect_language
from user_tasks.tasks import UserTask

import dogstats_wrapper as dog_stats_api
from contentstore.courseware_index import CoursewareSearchIndexer, LibrarySearchIndexer, SearchIndexingError
from contentstore.storage import course_import_export_storage
from contentstore.utils import initialize_permissions
from course_action_state.models import CourseRerunState
from models.settings.course_metadata import CourseMetadata
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator
from openedx.core.lib.extract_tar import safetar_extractall
from student.auth import has_course_author_access
from xmodule.contentstore.django import contentstore
from xmodule.course_module import CourseFields
from xmodule.modulestore import COURSE_ROOT, LIBRARY_ROOT
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import DuplicateCourseError, ItemNotFoundError
from xmodule.modulestore.xml_importer import import_course_from_xml, import_library_from_xml


LOGGER = get_task_logger(__name__)
FILE_READ_CHUNK = 1024  # bytes
FULL_COURSE_REINDEX_THRESHOLD = 1

@@ -30,10 +58,10 @@ def rerun_course(source_course_key_string, destination_course_key_string, user_i
    # import here, at top level this import prevents the celery workers from starting up correctly
    from edxval.api import copy_course_videos

    source_course_key = CourseKey.from_string(source_course_key_string)
    destination_course_key = CourseKey.from_string(destination_course_key_string)
    try:
        # deserialize the payload
        source_course_key = CourseKey.from_string(source_course_key_string)
        destination_course_key = CourseKey.from_string(destination_course_key_string)
        fields = deserialize_fields(fields) if fields else None

        # use the split modulestore as the store for the rerun course,
@@ -53,17 +81,17 @@ def rerun_course(source_course_key_string, destination_course_key_string, user_i

        return "succeeded"

    except DuplicateCourseError as exc:
    except DuplicateCourseError:
        # do NOT delete the original course, only update the status
        CourseRerunState.objects.failed(course_key=destination_course_key)
        logging.exception(u'Course Rerun Error')
        LOGGER.exception(u'Course Rerun Error')
        return "duplicate course"

    # catch all exceptions so we can update the state and properly cleanup the course.
    except Exception as exc:  # pylint: disable=broad-except
        # update state: Failed
        CourseRerunState.objects.failed(course_key=destination_course_key)
        logging.exception(u'Course Rerun Error')
        LOGGER.exception(u'Course Rerun Error')

        try:
            # cleanup any remnants of the course
@@ -72,12 +100,12 @@ def rerun_course(source_course_key_string, destination_course_key_string, user_i
            # it's possible there was an error even before the course module was created
            pass

        return "exception: " + unicode(exc)
        return u"exception: " + text_type(exc)


def deserialize_fields(json_fields):
    fields = json.loads(json_fields)
    for field_name, value in fields.iteritems():
    for field_name, value in iteritems(fields):
        fields[field_name] = getattr(CourseFields, field_name).from_json(value)
    return fields

@@ -99,9 +127,9 @@ def update_search_index(course_id, triggered_time_isoformat):
        CoursewareSearchIndexer.index(modulestore(), course_key, triggered_at=(_parse_time(triggered_time_isoformat)))

    except SearchIndexingError as exc:
        LOGGER.error('Search indexing error for complete course %s - %s', course_id, unicode(exc))
        LOGGER.error(u'Search indexing error for complete course %s - %s', course_id, text_type(exc))
    else:
        LOGGER.debug('Search indexing successful for complete course %s', course_id)
        LOGGER.debug(u'Search indexing successful for complete course %s', course_id)


@task()
@@ -112,9 +140,9 @@ def update_library_index(library_id, triggered_time_isoformat):
        LibrarySearchIndexer.index(modulestore(), library_key, triggered_at=(_parse_time(triggered_time_isoformat)))

    except SearchIndexingError as exc:
        LOGGER.error('Search indexing error for library %s - %s', library_id, unicode(exc))
        LOGGER.error(u'Search indexing error for library %s - %s', library_id, text_type(exc))
    else:
        LOGGER.debug('Search indexing successful for library %s', library_id)
        LOGGER.debug(u'Search indexing successful for library %s', library_id)


@task()
@@ -125,3 +153,218 @@ def push_course_update_task(course_key_string, course_subscription_id, course_di
    # TODO Use edx-notifications library instead (MA-638).
    from .push_notification import send_push_course_update
    send_push_course_update(course_key_string, course_subscription_id, course_display_name)


class CourseImportTask(UserTask):  # pylint: disable=abstract-method
    """
    Base class for course and library import tasks.
    """

    @staticmethod
    def calculate_total_steps(arguments_dict):
        """
        Get the number of in-progress steps in the import process, as shown in the UI.

        For reference, these are:

        1. Unpacking
        2. Verifying
        3. Updating
        """
        return 3

    @classmethod
    def generate_name(cls, arguments_dict):
        """
        Create a name for this particular import task instance.

        Arguments:
            arguments_dict (dict): The arguments given to the task function

        Returns:
            text_type: The generated name
        """
        key = arguments_dict[u'course_key_string']
        filename = arguments_dict[u'archive_name']
        return u'Import of {} from {}'.format(key, filename)


@task(base=CourseImportTask, bind=True)
def import_olx(self, user_id, course_key_string, archive_path, archive_name, language):
    """
    Import a course or library from a provided OLX .tar.gz archive.
    """
    courselike_key = CourseKey.from_string(course_key_string)
    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        with respect_language(language):
            self.status.fail(_(u'Unknown User ID: {0}').format(user_id))
        return
    if not has_course_author_access(user, courselike_key):
        with respect_language(language):
            self.status.fail(_(u'Permission denied'))
        return

    is_library = isinstance(courselike_key, LibraryLocator)
    is_course = not is_library
    if is_library:
        root_name = LIBRARY_ROOT
        courselike_module = modulestore().get_library(courselike_key)
        import_func = import_library_from_xml
    else:
        root_name = COURSE_ROOT
        courselike_module = modulestore().get_course(courselike_key)
        import_func = import_course_from_xml

    # Locate the uploaded OLX archive (and download it from S3 if necessary)
    # Do everything in a try-except block to make sure everything is properly cleaned up.
    data_root = path(settings.GITHUB_REPO_ROOT)
    subdir = base64.urlsafe_b64encode(repr(courselike_key))
    course_dir = data_root / subdir
    try:
        self.status.set_state(u'Unpacking')

        if not archive_name.endswith(u'.tar.gz'):
            with respect_language(language):
                self.status.fail(_(u'We only support uploading a .tar.gz file.'))
            return

        temp_filepath = course_dir / get_valid_filename(archive_name)
        if not course_dir.isdir():  # pylint: disable=no-value-for-parameter
            os.mkdir(course_dir)

        LOGGER.debug(u'importing course to {0}'.format(temp_filepath))

        # Copy the OLX archive from where it was uploaded to (S3, Swift, file system, etc.)
        if not course_import_export_storage.exists(archive_path):
            LOGGER.info(u'Course import %s: Uploaded file %s not found', courselike_key, archive_path)
            with respect_language(language):
                self.status.fail(_(u'Tar file not found'))
            return
        with course_import_export_storage.open(archive_path, 'rb') as source:
            with open(temp_filepath, 'wb') as destination:
                def read_chunk():
                    """
                    Read and return a sequence of bytes from the source file.
                    """
                    return source.read(FILE_READ_CHUNK)
                for chunk in iter(read_chunk, b''):
                    destination.write(chunk)
        LOGGER.info(u'Course import %s: Download from storage complete', courselike_key)
        # Delete from source location
        course_import_export_storage.delete(archive_path)

        # If the course has an entrance exam then remove it and its corresponding milestone.
        # current course state before import.
        if is_course:
            if courselike_module.entrance_exam_enabled:
                fake_request = RequestFactory().get(u'/')
                fake_request.user = user
                from contentstore.views.entrance_exam import remove_entrance_exam_milestone_reference
                # TODO: Is this really ok? Seems dangerous for a live course
                remove_entrance_exam_milestone_reference(fake_request, courselike_key)
                LOGGER.info(
                    u'entrance exam milestone content reference for course %s has been removed',
                    courselike_module.id
                )
    # Send errors to client with stage at which error occurred.
    except Exception as exception:  # pylint: disable=broad-except
        if course_dir.isdir():  # pylint: disable=no-value-for-parameter
            shutil.rmtree(course_dir)
            LOGGER.info(u'Course import %s: Temp data cleared', courselike_key)

        LOGGER.exception(u'Error importing course %s', courselike_key)
        self.status.fail(text_type(exception))
        return

    # try-finally block for proper clean up after receiving file.
    try:
        tar_file = tarfile.open(temp_filepath)
        try:
            safetar_extractall(tar_file, (course_dir + u'/').encode(u'utf-8'))
        except SuspiciousOperation as exc:
            LOGGER.info(u'Course import %s: Unsafe tar file - %s', courselike_key, exc.args[0])
            with respect_language(language):
                self.status.fail(_(u'Unsafe tar file. Aborting import.'))
            return
        finally:
            tar_file.close()

        LOGGER.info(u'Course import %s: Uploaded file extracted', courselike_key)
        self.status.set_state(u'Verifying')
        self.status.increment_completed_steps()

        # find the 'course.xml' file
        def get_all_files(directory):
            """
            For each file in the directory, yield a 2-tuple of (file-name,
            directory-path)
            """
            for directory_path, _dirnames, filenames in os.walk(directory):
                for filename in filenames:
                    yield (filename, directory_path)

        def get_dir_for_filename(directory, filename):
            """
            Returns the directory path for the first file found in the directory
            with the given name.  If there is no file in the directory with
            the specified name, return None.
            """
            for name, directory_path in get_all_files(directory):
                if name == filename:
                    return directory_path
            return None

        dirpath = get_dir_for_filename(course_dir, root_name)
        if not dirpath:
            with respect_language(language):
                self.status.fail(_(u'Could not find the {0} file in the package.').format(root_name))
            return

        dirpath = os.path.relpath(dirpath, data_root)
        LOGGER.debug(u'found %s at %s', root_name, dirpath)

        LOGGER.info(u'Course import %s: Extracted file verified', courselike_key)
        self.status.set_state(u'Updating')
        self.status.increment_completed_steps()

        with dog_stats_api.timer(
            u'courselike_import.time',
            tags=[u"courselike:{}".format(courselike_key)]
        ):
            courselike_items = import_func(
                modulestore(), user.id,
                settings.GITHUB_REPO_ROOT, [dirpath],
                load_error_modules=False,
                static_content_store=contentstore(),
                target_id=courselike_key
            )

        new_location = courselike_items[0].location
        LOGGER.debug(u'new course at %s', new_location)

        LOGGER.info(u'Course import %s: Course import successful', courselike_key)
    except Exception as exception:  # pylint: disable=broad-except
        LOGGER.exception(u'error importing course')
        self.status.fail(text_type(exception))
    finally:
        if course_dir.isdir():  # pylint: disable=no-value-for-parameter
            shutil.rmtree(course_dir)
            LOGGER.info(u'Course import %s: Temp data cleared', courselike_key)

        if self.status.state == u'Updating' and is_course:
            # Reload the course so we have the latest state
            course = modulestore().get_course(courselike_key)
            if course.entrance_exam_enabled:
                entrance_exam_chapter = modulestore().get_items(
                    course.id,
                    qualifiers={u'category': u'chapter'},
                    settings={u'is_entrance_exam': True}
                )[0]

                metadata = {u'entrance_exam_id': text_type(entrance_exam_chapter.location)}
                CourseMetadata.update_from_dict(metadata, course, user)
                from contentstore.views.entrance_exam import add_entrance_exam_milestone
                add_entrance_exam_milestone(course.id, entrance_exam_chapter)
                LOGGER.info(u'Course %s Entrance exam imported', course.id)

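For context, a minimal sketch of how this task is meant to be enqueued; it mirrors the `import_olx.delay(...)` call added to `_write_chunk` in the view changes later in this diff (the archive is first saved to `course_import_export_storage`, and only the storage path is handed to the worker):

# Hedged sketch of enqueuing the task from a view; names match the _write_chunk call below.
storage_path = course_import_export_storage.save(u'olx_import/' + filename, django_file)
import_olx.delay(
    request.user.id,            # user performing the import
    text_type(courselike_key),  # course or library key, serialized as a string
    storage_path,               # where the uploaded .tar.gz was stored
    filename,                   # original archive name, used to build the task name
    request.LANGUAGE_CODE,      # language for translated failure messages
)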
@@ -11,37 +11,36 @@ import tarfile
from path import Path as path
from tempfile import mkdtemp

from six import text_type

from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import SuspiciousOperation, PermissionDenied
from django.core.exceptions import PermissionDenied
from django.core.files import File
from django.core.files.temp import NamedTemporaryFile
from django.core.servers.basehttp import FileWrapper
from django.db import transaction
from django.http import HttpResponse, HttpResponseNotFound, Http404
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.http import require_http_methods, require_GET

import dogstats_wrapper as dog_stats_api
from edxmako.shortcuts import render_to_response
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import SerializationError
from xmodule.modulestore.django import modulestore
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator
from xmodule.modulestore.xml_importer import import_course_from_xml, import_library_from_xml
from user_tasks.conf import settings as user_tasks_settings
from user_tasks.models import UserTaskStatus
from xmodule.modulestore.xml_exporter import export_course_to_xml, export_library_to_xml
from xmodule.modulestore import COURSE_ROOT, LIBRARY_ROOT

from student.auth import has_course_author_access

from openedx.core.lib.extract_tar import safetar_extractall
from util.json_request import JsonResponse
from util.views import ensure_valid_course_key
from models.settings.course_metadata import CourseMetadata
from contentstore.views.entrance_exam import (
    add_entrance_exam_milestone,
    remove_entrance_exam_milestone_reference
)
from contentstore.storage import course_import_export_storage
from contentstore.tasks import CourseImportTask, import_olx

from contentstore.utils import reverse_course_url, reverse_usage_url, reverse_library_url

@@ -58,7 +57,10 @@ log = logging.getLogger(__name__)
# Regex to capture Content-Range header ranges.
CONTENT_RE = re.compile(r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})")

STATUS_FILTERS = user_tasks_settings.USER_TASKS_STATUS_FILTERS


@transaction.non_atomic_requests
@login_required
@ensure_csrf_cookie
@require_http_methods(("GET", "POST", "PUT"))
@@ -76,26 +78,13 @@ def import_handler(request, course_key_string):
    courselike_key = CourseKey.from_string(course_key_string)
    library = isinstance(courselike_key, LibraryLocator)
    if library:
        root_name = LIBRARY_ROOT
        successful_url = reverse_library_url('library_handler', courselike_key)
        context_name = 'context_library'
        courselike_module = modulestore().get_library(courselike_key)
        import_func = import_library_from_xml
    else:
        root_name = COURSE_ROOT
        successful_url = reverse_course_url('course_handler', courselike_key)
        context_name = 'context_course'
        courselike_module = modulestore().get_course(courselike_key)
        import_func = import_course_from_xml
    return _import_handler(
        request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func
    )


def _import_handler(request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func):
    """
    Parameterized function containing the meat of import_handler.
    """
    if not has_course_author_access(request.user, courselike_key):
        raise PermissionDenied()

@@ -103,235 +92,7 @@ def _import_handler(request, courselike_key, root_name, successful_url, context_
        if request.method == 'GET':
            raise NotImplementedError('coming soon')
        else:
            # Do everything in a try-except block to make sure everything is properly cleaned up.
            try:
                data_root = path(settings.GITHUB_REPO_ROOT)
                subdir = base64.urlsafe_b64encode(repr(courselike_key))
                course_dir = data_root / subdir
                filename = request.FILES['course-data'].name

                # Use sessions to keep info about import progress
                session_status = request.session.setdefault("import_status", {})
                courselike_string = unicode(courselike_key) + filename
                _save_request_status(request, courselike_string, 0)

                # If the course has an entrance exam then remove it and its corresponding milestone.
                # current course state before import.
                if root_name == COURSE_ROOT:
                    if courselike_module.entrance_exam_enabled:
                        remove_entrance_exam_milestone_reference(request, courselike_key)
                        log.info(
                            "entrance exam milestone content reference for course %s has been removed",
                            courselike_module.id
                        )

                if not filename.endswith('.tar.gz'):
                    _save_request_status(request, courselike_string, -1)
                    return JsonResponse(
                        {
                            'ErrMsg': _('We only support uploading a .tar.gz file.'),
                            'Stage': -1
                        },
                        status=415
                    )

                temp_filepath = course_dir / filename
                if not course_dir.isdir():
                    os.mkdir(course_dir)

                logging.debug('importing course to {0}'.format(temp_filepath))

                # Get upload chunks byte ranges
                try:
                    matches = CONTENT_RE.search(request.META["HTTP_CONTENT_RANGE"])
                    content_range = matches.groupdict()
                except KeyError:  # Single chunk
                    # no Content-Range header, so make one that will work
                    content_range = {'start': 0, 'stop': 1, 'end': 2}

                # stream out the uploaded files in chunks to disk
                if int(content_range['start']) == 0:
                    mode = "wb+"
                else:
                    mode = "ab+"
                    size = os.path.getsize(temp_filepath)
                    # Check to make sure we haven't missed a chunk
                    # This shouldn't happen, even if different instances are handling
                    # the same session, but it's always better to catch errors earlier.
                    if size < int(content_range['start']):
                        _save_request_status(request, courselike_string, -1)
                        log.warning(
                            "Reported range %s does not match size downloaded so far %s",
                            content_range['start'],
                            size
                        )
                        return JsonResponse(
                            {
                                'ErrMsg': _('File upload corrupted. Please try again'),
                                'Stage': -1
                            },
                            status=409
                        )
                    # The last request sometimes comes twice. This happens because
                    # nginx sends a 499 error code when the response takes too long.
                    elif size > int(content_range['stop']) and size == int(content_range['end']):
                        return JsonResponse({'ImportStatus': 1})

                with open(temp_filepath, mode) as temp_file:
                    for chunk in request.FILES['course-data'].chunks():
                        temp_file.write(chunk)

                size = os.path.getsize(temp_filepath)

                if int(content_range['stop']) != int(content_range['end']) - 1:
                    # More chunks coming
                    return JsonResponse({
                        "files": [{
                            "name": filename,
                            "size": size,
                            "deleteUrl": "",
                            "deleteType": "",
                            "url": reverse_course_url('import_handler', courselike_key),
                            "thumbnailUrl": ""
                        }]
                    })
            # Send errors to client with stage at which error occurred.
            except Exception as exception:  # pylint: disable=broad-except
                _save_request_status(request, courselike_string, -1)
                if course_dir.isdir():
                    shutil.rmtree(course_dir)
                    log.info("Course import %s: Temp data cleared", courselike_key)

                log.exception(
                    "error importing course"
                )
                return JsonResponse(
                    {
                        'ErrMsg': str(exception),
                        'Stage': -1
                    },
                    status=400
                )

            # try-finally block for proper clean up after receiving last chunk.
            try:
                # This was the last chunk.
                log.info("Course import %s: Upload complete", courselike_key)
                _save_request_status(request, courselike_string, 1)

                tar_file = tarfile.open(temp_filepath)
                try:
                    safetar_extractall(tar_file, (course_dir + '/').encode('utf-8'))
                except SuspiciousOperation as exc:
                    _save_request_status(request, courselike_string, -1)
                    return JsonResponse(
                        {
                            'ErrMsg': 'Unsafe tar file. Aborting import.',
                            'SuspiciousFileOperationMsg': exc.args[0],
                            'Stage': -1
                        },
                        status=400
                    )
                finally:
                    tar_file.close()

                log.info("Course import %s: Uploaded file extracted", courselike_key)
                _save_request_status(request, courselike_string, 2)

                # find the 'course.xml' file
                def get_all_files(directory):
                    """
                    For each file in the directory, yield a 2-tuple of (file-name,
                    directory-path)
                    """
                    for dirpath, _dirnames, filenames in os.walk(directory):
                        for filename in filenames:
                            yield (filename, dirpath)

                def get_dir_for_fname(directory, filename):
                    """
                    Returns the dirpath for the first file found in the directory
                    with the given name.  If there is no file in the directory with
                    the specified name, return None.
                    """
                    for fname, dirpath in get_all_files(directory):
                        if fname == filename:
                            return dirpath
                    return None

                dirpath = get_dir_for_fname(course_dir, root_name)
                if not dirpath:
                    _save_request_status(request, courselike_string, -2)
                    return JsonResponse(
                        {
                            'ErrMsg': _('Could not find the {0} file in the package.').format(root_name),
                            'Stage': -2
                        },
                        status=415
                    )

                dirpath = os.path.relpath(dirpath, data_root)
                logging.debug('found %s at %s', root_name, dirpath)

                log.info("Course import %s: Extracted file verified", courselike_key)
                _save_request_status(request, courselike_string, 3)

                with dog_stats_api.timer(
                    'courselike_import.time',
                    tags=[u"courselike:{}".format(courselike_key)]
                ):
                    courselike_items = import_func(
                        modulestore(), request.user.id,
                        settings.GITHUB_REPO_ROOT, [dirpath],
                        load_error_modules=False,
                        static_content_store=contentstore(),
                        target_id=courselike_key
                    )

                new_location = courselike_items[0].location
                logging.debug('new course at %s', new_location)

                log.info("Course import %s: Course import successful", courselike_key)
                _save_request_status(request, courselike_string, 4)

            # Send errors to client with stage at which error occurred.
            except Exception as exception:  # pylint: disable=broad-except
                log.exception(
                    "error importing course"
                )
                return JsonResponse(
                    {
                        'ErrMsg': str(exception),
                        'Stage': -session_status[courselike_string]
                    },
                    status=400
                )

            finally:
                if course_dir.isdir():
                    shutil.rmtree(course_dir)
                    log.info("Course import %s: Temp data cleared", courselike_key)
                # set failed stage number with negative sign in case of unsuccessful import
                if session_status[courselike_string] != 4:
                    _save_request_status(request, courselike_string, -abs(session_status[courselike_string]))

                # status == 4 represents that course has been imported successfully.
                if session_status[courselike_string] == 4 and root_name == COURSE_ROOT:
                    # Reload the course so we have the latest state
                    course = modulestore().get_course(courselike_key)
                    if course.entrance_exam_enabled:
                        entrance_exam_chapter = modulestore().get_items(
                            course.id,
                            qualifiers={'category': 'chapter'},
                            settings={'is_entrance_exam': True}
                        )[0]

                        metadata = {'entrance_exam_id': unicode(entrance_exam_chapter.location)}
                        CourseMetadata.update_from_dict(metadata, course, request.user)
                        add_entrance_exam_milestone(course.id, entrance_exam_chapter)
                        log.info("Course %s Entrance exam imported", course.id)

            return JsonResponse({'Status': 'OK'})
            return _write_chunk(request, courselike_key)
    elif request.method == 'GET':  # assume html
        status_url = reverse_course_url(
            "import_status_handler", courselike_key, kwargs={'filename': "fillerName"}
@@ -358,6 +119,122 @@ def _save_request_status(request, key, status):
    request.session.save()


def _write_chunk(request, courselike_key):
    """
    Write the OLX file data chunk from the given request to the local filesystem.
    """
    # Upload .tar.gz to local filesystem for one-server installations not using S3 or Swift
    data_root = path(settings.GITHUB_REPO_ROOT)
    subdir = base64.urlsafe_b64encode(repr(courselike_key))
    course_dir = data_root / subdir
    filename = request.FILES['course-data'].name

    courselike_string = text_type(courselike_key) + filename
    # Do everything in a try-except block to make sure everything is properly cleaned up.
    try:
        # Use sessions to keep info about import progress
        _save_request_status(request, courselike_string, 0)

        if not filename.endswith('.tar.gz'):
            _save_request_status(request, courselike_string, -1)
            return JsonResponse(
                {
                    'ErrMsg': _('We only support uploading a .tar.gz file.'),
                    'Stage': -1
                },
                status=415
            )

        temp_filepath = course_dir / filename
        if not course_dir.isdir():  # pylint: disable=no-value-for-parameter
            os.mkdir(course_dir)

        logging.debug('importing course to {0}'.format(temp_filepath))

        # Get upload chunks byte ranges
        try:
            matches = CONTENT_RE.search(request.META["HTTP_CONTENT_RANGE"])
            content_range = matches.groupdict()
        except KeyError:  # Single chunk
            # no Content-Range header, so make one that will work
            content_range = {'start': 0, 'stop': 1, 'end': 2}

        # stream out the uploaded files in chunks to disk
        if int(content_range['start']) == 0:
            mode = "wb+"
        else:
            mode = "ab+"
            size = os.path.getsize(temp_filepath)
            # Check to make sure we haven't missed a chunk
            # This shouldn't happen, even if different instances are handling
            # the same session, but it's always better to catch errors earlier.
            if size < int(content_range['start']):
                _save_request_status(request, courselike_string, -1)
                log.warning(
                    "Reported range %s does not match size downloaded so far %s",
                    content_range['start'],
                    size
                )
                return JsonResponse(
                    {
                        'ErrMsg': _('File upload corrupted. Please try again'),
                        'Stage': -1
                    },
                    status=409
                )
            # The last request sometimes comes twice. This happens because
            # nginx sends a 499 error code when the response takes too long.
            elif size > int(content_range['stop']) and size == int(content_range['end']):
                return JsonResponse({'ImportStatus': 1})

        with open(temp_filepath, mode) as temp_file:
            for chunk in request.FILES['course-data'].chunks():
                temp_file.write(chunk)

        size = os.path.getsize(temp_filepath)

        if int(content_range['stop']) != int(content_range['end']) - 1:
            # More chunks coming
            return JsonResponse({
                "files": [{
                    "name": filename,
                    "size": size,
                    "deleteUrl": "",
                    "deleteType": "",
                    "url": reverse_course_url('import_handler', courselike_key),
                    "thumbnailUrl": ""
                }]
            })

        log.info("Course import %s: Upload complete", courselike_key)
        with open(temp_filepath, 'rb') as local_file:
            django_file = File(local_file)
            storage_path = course_import_export_storage.save(u'olx_import/' + filename, django_file)
        import_olx.delay(
            request.user.id, text_type(courselike_key), storage_path, filename, request.LANGUAGE_CODE)

    # Send errors to client with stage at which error occurred.
    except Exception as exception:  # pylint: disable=broad-except
        _save_request_status(request, courselike_string, -1)
        if course_dir.isdir():  # pylint: disable=no-value-for-parameter
            shutil.rmtree(course_dir)
            log.info("Course import %s: Temp data cleared", courselike_key)

        log.exception(
            "error importing course"
        )
        return JsonResponse(
            {
                'ErrMsg': str(exception),
                'Stage': -1
            },
            status=400
        )

    return JsonResponse({'ImportStatus': 1})


@transaction.non_atomic_requests
@require_GET
@ensure_csrf_cookie
@login_required
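The chunked-upload handling above keys off the Content-Range request header; a small sketch (with made-up byte offsets) of what `CONTENT_RE` extracts from it:

# Hedged sketch; the header value below is hypothetical.
import re

CONTENT_RE = re.compile(r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})")

header = "bytes 1048576-2097151/5242880"  # second 1 MiB chunk of a 5 MiB upload
content_range = CONTENT_RE.search(header).groupdict()
# {'start': '1048576', 'stop': '2097151', 'end': '5242880'}
# start == 0 opens the temp file with "wb+"; later chunks append with "ab+",
# and stop == end - 1 marks the final chunk that triggers import_olx.delay().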
@@ -368,9 +245,9 @@ def import_status_handler(request, course_key_string, filename=None):

        -X : Import unsuccessful due to some error with X as stage [0-3]
        0 : No status info found (import done or upload still in progress)
        1 : Extracting file
        2 : Validating.
        3 : Importing to mongo
        1 : Unpacking
        2 : Verifying
        3 : Updating
        4 : Import successful

    """
@@ -378,11 +255,26 @@ def import_status_handler(request, course_key_string, filename=None):
    if not has_course_author_access(request.user, course_key):
        raise PermissionDenied()

    try:
        session_status = request.session["import_status"]
        status = session_status[course_key_string + filename]
    except KeyError:
        status = 0
    # The task status record is authoritative once it's been created
    args = {u'course_key_string': course_key_string, u'archive_name': filename}
    name = CourseImportTask.generate_name(args)
    task_status = UserTaskStatus.objects.filter(name=name)
    for status_filter in STATUS_FILTERS:
        task_status = status_filter().filter_queryset(request, task_status, import_status_handler)
    task_status = task_status.order_by(u'-created').first()
    if task_status is None:
        # The task hasn't been initialized yet; did we store info in the session already?
        try:
            session_status = request.session["import_status"]
            status = session_status[course_key_string + filename]
        except KeyError:
            status = 0
    elif task_status.state == UserTaskStatus.SUCCEEDED:
        status = 4
    elif task_status.state in (UserTaskStatus.FAILED, UserTaskStatus.CANCELED):
        status = max(-(task_status.completed_steps + 1), -3)
    else:
        status = min(task_status.completed_steps + 1, 3)

    return JsonResponse({"ImportStatus": status})

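A short sketch of how a client might interpret the numeric `ImportStatus` payload returned above; the helper function is purely illustrative (it is not part of this change) and simply restates the mapping documented in the docstring:

# Hedged, hypothetical helper for interpreting the ImportStatus value.
def describe_import_status(status):
    """Map the numeric ImportStatus payload to a human-readable label."""
    if status == 0:
        return 'No status info found (import done or upload still in progress)'
    if status == 4:
        return 'Import successful'
    if status < 0:
        return 'Failed during step {}'.format(-status)  # 1=Unpacking, 2=Verifying, 3=Updating
    return 'In progress: step {}'.format(status)        # 1=Unpacking, 2=Verifying, 3=Updating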
@@ -456,6 +348,7 @@ def send_tarball(tarball):
    return response


@transaction.non_atomic_requests
@ensure_csrf_cookie
@login_required
@require_http_methods(("GET",))

@@ -184,7 +184,7 @@ class ImportTestCase(CourseTestCase):
            "name": self.bad_tar,
            "course-data": [btar]
        })
        self.assertEquals(resp.status_code, 415)
        self.assertEquals(resp.status_code, 200)
        # Check that `import_status` returns the appropriate stage (i.e., the
        # stage at which import failed).
        resp_status = self.client.get(
@@ -336,8 +336,16 @@ class ImportTestCase(CourseTestCase):
            with open(tarpath) as tar:
                args = {"name": tarpath, "course-data": [tar]}
                resp = self.client.post(self.url, args)
                self.assertEquals(resp.status_code, 400)
                self.assertIn("SuspiciousFileOperation", resp.content)
                self.assertEquals(resp.status_code, 200)
                resp = self.client.get(
                    reverse_course_url(
                        'import_status_handler',
                        self.course.id,
                        kwargs={'filename': os.path.split(tarpath)[1]}
                    )
                )
                status = json.loads(resp.content)["ImportStatus"]
                self.assertEqual(status, -1)

        try_tar(self._fifo_tar())
        try_tar(self._symlink_tar())

@@ -299,10 +299,17 @@ AWS_SECRET_ACCESS_KEY = AUTH_TOKENS["AWS_SECRET_ACCESS_KEY"]
if AWS_SECRET_ACCESS_KEY == "":
    AWS_SECRET_ACCESS_KEY = None

AWS_STORAGE_BUCKET_NAME = AUTH_TOKENS.get('AWS_STORAGE_BUCKET_NAME', 'edxuploads')

# Disabling querystring auth instructs Boto to exclude the querystring parameters (e.g. signature, access key) it
# normally appends to every returned URL.
AWS_QUERYSTRING_AUTH = AUTH_TOKENS.get('AWS_QUERYSTRING_AUTH', True)

AWS_DEFAULT_ACL = 'private'
AWS_BUCKET_ACL = AWS_DEFAULT_ACL
AWS_QUERYSTRING_EXPIRE = 7 * 24 * 60 * 60  # 7 days
AWS_S3_CUSTOM_DOMAIN = AUTH_TOKENS.get('AWS_S3_CUSTOM_DOMAIN', 'edxuploads.s3.amazonaws.com')

if AUTH_TOKENS.get('DEFAULT_FILE_STORAGE'):
    DEFAULT_FILE_STORAGE = AUTH_TOKENS.get('DEFAULT_FILE_STORAGE')
elif AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
@@ -310,6 +317,15 @@ elif AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
else:
    DEFAULT_FILE_STORAGE = 'django.core.files.storage.FileSystemStorage'

COURSE_IMPORT_EXPORT_BUCKET = ENV_TOKENS.get('COURSE_IMPORT_EXPORT_BUCKET', '')

if COURSE_IMPORT_EXPORT_BUCKET:
    COURSE_IMPORT_EXPORT_STORAGE = 'contentstore.storage.ImportExportS3Storage'
else:
    COURSE_IMPORT_EXPORT_STORAGE = DEFAULT_FILE_STORAGE

USER_TASKS_ARTIFACT_STORAGE = COURSE_IMPORT_EXPORT_STORAGE

DATABASES = AUTH_TOKENS['DATABASES']

# The normal database user does not have enough permissions to run migrations.

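A minimal sketch of the deployment configuration these settings expect; `ENV_TOKENS` is populated from the CMS environment JSON file in typical edX deployments, and the bucket name below is hypothetical:

# Hedged sketch of an env-file fragment (assumed to feed ENV_TOKENS); bucket name is hypothetical.
# {
#     "COURSE_IMPORT_EXPORT_BUCKET": "example-course-import-export"
# }
#
# With the bucket set, imports and exports go through contentstore.storage.ImportExportS3Storage;
# left at the default '', COURSE_IMPORT_EXPORT_STORAGE falls back to DEFAULT_FILE_STORAGE.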
@@ -555,6 +555,8 @@ LOCALE_PATHS = (REPO_ROOT + '/conf/locale',)  # edx-platform/conf/locale/
# Messages
MESSAGE_STORAGE = 'django.contrib.messages.storage.session.SessionStorage'

COURSE_IMPORT_EXPORT_STORAGE = 'django.core.files.storage.FileSystemStorage'

##### EMBARGO #####
EMBARGO_SITE_REDIRECT_URL = None

@@ -8,6 +8,8 @@ from .aws import *  # pylint: disable=wildcard-import, unused-wildcard-import

# Don't use S3 in devstack, fall back to filesystem
del DEFAULT_FILE_STORAGE
COURSE_IMPORT_EXPORT_STORAGE = 'django.core.files.storage.FileSystemStorage'
USER_TASKS_ARTIFACT_STORAGE = COURSE_IMPORT_EXPORT_STORAGE
MEDIA_ROOT = "/edx/var/edxapp/uploads"

DEBUG = True

@@ -81,7 +81,7 @@ define(
        */
        var initEventListeners = function() {
            $(window).on('beforeunload.import', function() {
                if (current.stage <= STAGE.UNPACKING) {
                if (current.stage < STAGE.UNPACKING) {
                    return gettext('Your import is in progress; navigating away will abort it.');
                }
            });

@@ -118,7 +118,7 @@ else:
            <li class="item-progresspoint item-progresspoint-unpack is-started">
                <span class="deco status-visual">
                    <span class="icon fa fa-cog" aria-hidden="true"></span>
                    <span class="icon fa fa-warning" aria-hidden="true"v></span>
                    <span class="icon fa fa-warning" aria-hidden="true"></span>
                </span>

                <div class="status-detail">

@@ -2176,6 +2176,9 @@ CSRF_COOKIE_SECURE = False

REST_FRAMEWORK = {
    'DEFAULT_PAGINATION_CLASS': 'openedx.core.lib.api.paginators.DefaultPagination',
    'DEFAULT_RENDERER_CLASSES': (
        'rest_framework.renderers.JSONRenderer',
    ),
    'PAGE_SIZE': 10,
    'URL_FORMAT_OVERRIDE': None,
    'DEFAULT_THROTTLE_RATES': {

@@ -141,7 +141,7 @@ class TestPaverServerTasks(PaverTestCase):
        """
        Test the "celery" task.
        """
        settings = options.get("settings", "dev_with_worker")
        settings = options.get("settings", "devstack_with_worker")
        call_task("pavelib.servers.celery", options=options)
        self.assertEquals(self.task_messages, [EXPECTED_CELERY_COMMAND.format(settings=settings)])

@@ -292,7 +292,7 @@ class TestPaverServerTasks(PaverTestCase):
                port=8001,
            )
        )
        expected_messages.append(EXPECTED_CELERY_COMMAND.format(settings="dev_with_worker"))
        expected_messages.append(EXPECTED_CELERY_COMMAND.format(settings="devstack_with_worker"))
        self.assertEquals(self.task_messages, expected_messages)

    def expected_sass_commands(self, system=None, asset_settings=u"test_static_optimized"):

@@ -157,7 +157,7 @@ def celery(options):
    """
    Runs Celery workers.
    """
    settings = getattr(options, 'settings', 'dev_with_worker')
    settings = getattr(options, 'settings', 'devstack_with_worker')
    run_process(django_cmd('lms', settings, 'celery', 'worker', '--beat', '--loglevel=INFO', '--pythonpath=.'))


@@ -187,7 +187,7 @@ def run_all_servers(options):
    """
    settings = getattr(options, 'settings', DEFAULT_SETTINGS)
    asset_settings = getattr(options, 'asset_settings', settings)
    worker_settings = getattr(options, 'worker_settings', 'dev_with_worker')
    worker_settings = getattr(options, 'worker_settings', 'devstack_with_worker')
    fast = getattr(options, 'fast', False)
    optimized = getattr(options, 'optimized', False)

@@ -34,7 +34,7 @@ django-simple-history==1.6.3
django-statici18n==1.1.5
django-storages==1.4.1
django-method-override==0.1.0
django-user-tasks==0.1.2
django-user-tasks==0.1.4
# We need a fix to DRF 3.2.x, for now use it from our own cherry-picked repo
#djangorestframework>=3.1,<3.2
git+https://github.com/edx/django-rest-framework.git@3c72cb5ee5baebc4328947371195eae2077197b0#egg=djangorestframework==3.2.3