""" Tests for file.py """ import os from datetime import datetime from io import StringIO from unittest.mock import Mock, patch import pytest import ddt from django.core import exceptions from django.core.files.uploadedfile import SimpleUploadedFile from django.http import HttpRequest from django.test import TestCase from opaque_keys.edx.keys import CourseKey from opaque_keys.edx.locations import CourseLocator from pytz import UTC import common.djangoapps.util.file from common.djangoapps.util.file import ( FileValidationException, UniversalNewlineIterator, course_and_time_based_filename_generator, course_filename_prefix_generator, store_uploaded_file ) @ddt.ddt class FilenamePrefixGeneratorTestCase(TestCase): """ Tests for course_filename_prefix_generator """ @ddt.data(CourseLocator(org='foo', course='bar', run='baz'), CourseKey.from_string('foo/bar/baz')) def test_locators(self, course_key): assert course_filename_prefix_generator(course_key) == 'foo_bar_baz' @ddt.data(CourseLocator(org='foo', course='bar', run='baz'), CourseKey.from_string('foo/bar/baz')) def test_custom_separator(self, course_key): assert course_filename_prefix_generator(course_key, separator='-') == 'foo-bar-baz' @ddt.ddt class FilenameGeneratorTestCase(TestCase): """ Tests for course_and_time_based_filename_generator """ NOW = datetime.strptime('1974-06-22T01:02:03', '%Y-%m-%dT%H:%M:%S').replace(tzinfo=UTC) def setUp(self): super().setUp() datetime_patcher = patch.object( common.djangoapps.util.file, 'datetime', Mock(wraps=datetime) ) mocked_datetime = datetime_patcher.start() mocked_datetime.now.return_value = self.NOW self.addCleanup(datetime_patcher.stop) @ddt.data(CourseLocator(org='foo', course='bar', run='baz'), CourseKey.from_string('foo/bar/baz')) def test_filename_generator(self, course_key): """ Tests that the generator creates names based on course_id, base name, and date. """ assert 'foo_bar_baz_file_1974-06-22-010203' == course_and_time_based_filename_generator(course_key, 'file') assert 'foo_bar_baz_base_name_ø_1974-06-22-010203' ==\ course_and_time_based_filename_generator(course_key, ' base` name ø ') class StoreUploadedFileTestCase(TestCase): """ Tests for store_uploaded_file. """ def setUp(self): super().setUp() self.request = Mock(spec=HttpRequest) self.file_content = b"test file content" self.stored_file_name = None self.file_storage = None self.default_max_size = 2000000 def tearDown(self): super().tearDown() if self.file_storage and self.stored_file_name: self.file_storage.delete(self.stored_file_name) def verify_exception(self, expected_message, error): """ Helper method to verify exception text. """ assert expected_message == str(error.value) def test_error_conditions(self): """ Verifies that exceptions are thrown in the expected cases. """ with pytest.raises(ValueError) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file(self.request, "wrong_key", [".txt", ".csv"], "stored_file", self.default_max_size) self.verify_exception("No file uploaded with key 'wrong_key'.", error) with pytest.raises(exceptions.PermissionDenied) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file(self.request, "uploaded_file", [], "stored_file", self.default_max_size) self.verify_exception("The file must end with one of the following extensions: ''.", error) with pytest.raises(exceptions.PermissionDenied) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file(self.request, "uploaded_file", [".bar"], "stored_file", self.default_max_size) self.verify_exception("The file must end with the extension '.bar'.", error) with pytest.raises(exceptions.PermissionDenied) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file(self.request, "uploaded_file", [".xxx", ".bar"], "stored_file", self.default_max_size) self.verify_exception("The file must end with one of the following extensions: '.xxx', '.bar'.", error) with pytest.raises(exceptions.PermissionDenied) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file(self.request, "uploaded_file", [".csv"], "stored_file", 2) self.verify_exception("Maximum upload file size is 2 bytes.", error) def test_validator(self): """ Verify that a validator function can throw an exception. """ validator_data = {} def verify_file_presence(should_exist): """ Verify whether or not the stored file, passed to the validator, exists. """ assert should_exist == validator_data['storage'].exists(validator_data['filename']) def store_file_data(storage, filename): """ Stores file validator data for testing after validation is complete. """ validator_data["storage"] = storage validator_data["filename"] = filename verify_file_presence(True) def exception_validator(storage, filename): """ Validation test function that throws an exception """ assert 'error_file.csv' == os.path.basename(filename) with storage.open(filename, 'rb') as f: assert self.file_content == f.read() store_file_data(storage, filename) raise FileValidationException("validation failed") def success_validator(storage, filename): """ Validation test function that is a no-op """ assert 'success_file' in os.path.basename(filename) store_file_data(storage, filename) with pytest.raises(FileValidationException) as error: self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file( self.request, "uploaded_file", [".csv"], "error_file", self.default_max_size, validator=exception_validator ) self.verify_exception("validation failed", error) # Verify the file was deleted. verify_file_presence(False) self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} store_uploaded_file( self.request, "uploaded_file", [".csv"], "success_file", self.default_max_size, validator=success_validator ) # Verify the file still exists verify_file_presence(True) def test_file_upload_lower_case_extension(self): """ Tests uploading a file with lower case extension. Verifies that the stored file contents are correct. """ self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.csv", self.file_content)} file_storage, stored_file_name = store_uploaded_file( self.request, "uploaded_file", [".csv"], "stored_file", self.default_max_size ) self._verify_successful_upload(file_storage, stored_file_name, self.file_content) def test_file_upload_upper_case_extension(self): """ Tests uploading a file with upper case extension. Verifies that the stored file contents are correct. """ file_content = b"uppercase" self.request.FILES = {"uploaded_file": SimpleUploadedFile("tempfile.CSV", file_content)} file_storage, stored_file_name = store_uploaded_file( self.request, "uploaded_file", [".gif", ".csv"], "second_stored_file", self.default_max_size ) self._verify_successful_upload(file_storage, stored_file_name, file_content) def test_unique_filenames(self): """ Test that the file storage method will create a unique filename if the file already exists. """ requested_file_name = "nonunique_store" file_content = b"copy" self.request.FILES = {"nonunique_file": SimpleUploadedFile("nonunique.txt", file_content)} _, first_stored_file_name = store_uploaded_file( self.request, "nonunique_file", [".txt"], requested_file_name, self.default_max_size ) self.request.FILES = {"nonunique_file": SimpleUploadedFile("nonunique.txt", file_content)} file_storage, second_stored_file_name = store_uploaded_file( self.request, "nonunique_file", [".txt"], requested_file_name, self.default_max_size ) assert first_stored_file_name != second_stored_file_name assert requested_file_name in second_stored_file_name self._verify_successful_upload(file_storage, second_stored_file_name, file_content) def _verify_successful_upload(self, storage, file_name, expected_content): """ Helper method that checks that the stored version of the uploaded file has the correct content """ assert storage.exists(file_name) with storage.open(file_name, 'rb') as f: assert expected_content == f.read() @ddt.ddt class TestUniversalNewlineIterator(TestCase): """ Tests for the UniversalNewlineIterator class. """ @ddt.data(1, 2, 999) def test_line_feeds(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo\nbar\n'), buffer_size=buffer_size)] == ['foo\n', 'bar\n'] @ddt.data(1, 2, 999) def test_carriage_returns(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo\rbar\r'), buffer_size=buffer_size)] == ['foo\n', 'bar\n'] @ddt.data(1, 2, 999) def test_carriage_returns_and_line_feeds(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo\r\nbar\r\n'), buffer_size=buffer_size)] == ['foo\n', 'bar\n'] @ddt.data(1, 2, 999) def test_no_trailing_newline(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo\nbar'), buffer_size=buffer_size)] == ['foo\n', 'bar'] @ddt.data(1, 2, 999) def test_only_one_line(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo\n'), buffer_size=buffer_size)] == ['foo\n'] @ddt.data(1, 2, 999) def test_only_one_line_no_trailing_newline(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('foo'), buffer_size=buffer_size)] == ['foo'] @ddt.data(1, 2, 999) def test_empty_file(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO(''), buffer_size=buffer_size)] == [] @ddt.data(1, 2, 999) def test_unicode_data(self, buffer_size): assert [thing.decode('utf-8') for thing in UniversalNewlineIterator(StringIO('héllø wo®ld'), buffer_size=buffer_size)] == ['héllø wo®ld']