398 lines
17 KiB
Python
398 lines
17 KiB
Python
"""
|
|
Base test classes for LMS instructor-initiated background tasks
|
|
|
|
"""
|
|
|
|
|
|
import json
|
|
# pylint: disable=attribute-defined-outside-init
|
|
import os
|
|
import shutil
|
|
from tempfile import mkdtemp
|
|
from unittest.mock import Mock, patch
|
|
from uuid import uuid4
|
|
|
|
import unicodecsv
|
|
from celery.states import FAILURE, SUCCESS
|
|
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
|
|
from django.urls import reverse
|
|
from opaque_keys.edx.keys import CourseKey
|
|
from opaque_keys.edx.locations import Location
|
|
|
|
from xmodule.modulestore import ModuleStoreEnum
|
|
from xmodule.modulestore.django import modulestore
|
|
from xmodule.modulestore.tests.django_utils import TEST_DATA_SPLIT_MODULESTORE, ModuleStoreTestCase
|
|
from xmodule.modulestore.tests.factories import CourseFactory, BlockFactory
|
|
from xmodule.capa.tests.response_xml_factory import OptionResponseXMLFactory
|
|
from common.djangoapps.student.tests.factories import CourseEnrollmentFactory, UserFactory
|
|
from lms.djangoapps.courseware.model_data import StudentModule
|
|
from lms.djangoapps.courseware.tests.tests import LoginEnrollmentTestCase
|
|
from lms.djangoapps.instructor_task.api_helper import encode_problem_and_student_input
|
|
from lms.djangoapps.instructor_task.models import PROGRESS, QUEUING, ReportStore
|
|
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory
|
|
from lms.djangoapps.instructor_task.views import instructor_task_status
|
|
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
|
|
from openedx.core.lib.url_utils import quote_slashes
|
|
|
|
TEST_COURSE_ORG = 'edx'
|
|
TEST_COURSE_NAME = 'test_course'
|
|
TEST_COURSE_NUMBER = '1.23x'
|
|
TEST_COURSE_KEY = CourseKey.from_string(f'course-v1:{TEST_COURSE_ORG}+{TEST_COURSE_NUMBER}+{TEST_COURSE_NAME}')
|
|
TEST_CHAPTER_NAME = "Section"
|
|
TEST_SECTION_NAME = "Subsection"
|
|
|
|
TEST_FAILURE_MESSAGE = 'task failed horribly'
|
|
TEST_FAILURE_EXCEPTION = 'RandomCauseError'
|
|
|
|
OPTION_1 = 'Option 1'
|
|
OPTION_2 = 'Option 2'
|
|
|
|
|
|
class InstructorTaskTestCase(CacheIsolationTestCase):
|
|
"""
|
|
Tests API and view methods that involve the reporting of status for background tasks.
|
|
"""
|
|
def setUp(self):
|
|
super().setUp()
|
|
|
|
self.student = UserFactory.create(username="student", email="student@edx.org")
|
|
self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
|
|
self.problem_url = InstructorTaskTestCase.problem_location("test_urlname")
|
|
|
|
@staticmethod
|
|
def problem_location(problem_url_name):
|
|
"""
|
|
Create an internal location for a test problem.
|
|
"""
|
|
return TEST_COURSE_KEY.make_usage_key('problem', problem_url_name)
|
|
|
|
def _create_entry(self, task_state=QUEUING, task_output=None, student=None):
|
|
"""Creates a InstructorTask entry for testing."""
|
|
task_id = str(uuid4())
|
|
progress_json = json.dumps(task_output) if task_output is not None else None
|
|
task_input, task_key = encode_problem_and_student_input(self.problem_url, student)
|
|
|
|
instructor_task = InstructorTaskFactory.create(course_id=TEST_COURSE_KEY,
|
|
requester=self.instructor,
|
|
task_input=json.dumps(task_input),
|
|
task_key=task_key,
|
|
task_id=task_id,
|
|
task_state=task_state,
|
|
task_output=progress_json)
|
|
return instructor_task
|
|
|
|
def _create_failure_entry(self):
|
|
"""Creates a InstructorTask entry representing a failed task."""
|
|
# view task entry for task failure
|
|
progress = {'message': TEST_FAILURE_MESSAGE,
|
|
'exception': TEST_FAILURE_EXCEPTION,
|
|
}
|
|
return self._create_entry(task_state=FAILURE, task_output=progress)
|
|
|
|
def _create_success_entry(self, student=None):
|
|
"""Creates a InstructorTask entry representing a successful task."""
|
|
return self._create_progress_entry(student, task_state=SUCCESS)
|
|
|
|
def _create_progress_entry(self, student=None, task_state=PROGRESS):
|
|
"""Creates a InstructorTask entry representing a task in progress."""
|
|
progress = {'attempted': 3,
|
|
'succeeded': 2,
|
|
'total': 5,
|
|
'action_name': 'rescored',
|
|
}
|
|
return self._create_entry(task_state=task_state, task_output=progress, student=student)
|
|
|
|
|
|
class InstructorTaskCourseTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
|
|
"""
|
|
Base test class for InstructorTask-related tests that require
|
|
the setup of a course.
|
|
"""
|
|
MODULESTORE = TEST_DATA_SPLIT_MODULESTORE
|
|
course = None
|
|
current_user = None
|
|
|
|
def initialize_course(self, course_factory_kwargs=None):
|
|
"""
|
|
Create a course in the store, with a chapter and section.
|
|
|
|
Arguments:
|
|
course_factory_kwargs (dict): kwargs dict to pass to
|
|
CourseFactory.create()
|
|
"""
|
|
self.module_store = modulestore()
|
|
|
|
# Create the course
|
|
course_args = {
|
|
"org": TEST_COURSE_ORG,
|
|
"number": TEST_COURSE_NUMBER,
|
|
"display_name": TEST_COURSE_NAME
|
|
}
|
|
if course_factory_kwargs is not None:
|
|
course_args.update(course_factory_kwargs)
|
|
self.course = CourseFactory.create(**course_args)
|
|
self.add_course_content()
|
|
|
|
def add_course_content(self):
|
|
"""
|
|
Add a chapter and a sequential to the current course.
|
|
"""
|
|
# Add a chapter to the course
|
|
self.chapter = BlockFactory.create(
|
|
parent_location=self.course.location,
|
|
display_name=TEST_CHAPTER_NAME,
|
|
)
|
|
|
|
# add a sequence to the course to which the problems can be added
|
|
self.problem_section = BlockFactory.create(
|
|
parent_location=self.chapter.location,
|
|
category='sequential',
|
|
metadata={'graded': True, 'format': 'Homework'},
|
|
display_name=TEST_SECTION_NAME,
|
|
)
|
|
|
|
@staticmethod
|
|
def get_user_email(username):
|
|
"""Generate email address based on username"""
|
|
return f'{username}@test.com'
|
|
|
|
def login_username(self, username):
|
|
"""Login the user, given the `username`."""
|
|
if self.current_user != username:
|
|
self.logout()
|
|
user_email = User.objects.get(username=username).email
|
|
self.login(user_email, self.TEST_PASSWORD)
|
|
self.current_user = username
|
|
|
|
def _create_user(self, username, email=None, is_staff=False, mode='honor', enrollment_active=True):
|
|
"""Creates a user and enrolls them in the test course."""
|
|
if email is None:
|
|
email = InstructorTaskCourseTestCase.get_user_email(username)
|
|
thisuser = UserFactory.create(username=username, email=email, is_staff=is_staff)
|
|
CourseEnrollmentFactory.create(user=thisuser, course_id=self.course.id, mode=mode, is_active=enrollment_active)
|
|
return thisuser
|
|
|
|
def create_instructor(self, username, email=None):
|
|
"""Creates an instructor for the test course."""
|
|
return self._create_user(username, email, is_staff=True)
|
|
|
|
def create_student(self, username, email=None, mode='honor', enrollment_active=True):
|
|
"""Creates a student for the test course."""
|
|
return self._create_user(username, email, is_staff=False, mode=mode, enrollment_active=enrollment_active)
|
|
|
|
@staticmethod
|
|
def get_task_status(task_id):
|
|
"""Use api method to fetch task status, using mock request."""
|
|
mock_request = Mock()
|
|
mock_request.GET = mock_request.POST = {'task_id': task_id}
|
|
response = instructor_task_status(mock_request)
|
|
status = json.loads(response.content.decode('utf-8'))
|
|
return status
|
|
|
|
def create_task_request(self, requester_username):
|
|
"""Generate request that can be used for submitting tasks"""
|
|
request = Mock()
|
|
request.user = User.objects.get(username=requester_username)
|
|
request.get_host = Mock(return_value="testhost")
|
|
request.META = {'REMOTE_ADDR': '0:0:0:0', 'SERVER_NAME': 'testhost'}
|
|
request.is_secure = Mock(return_value=False)
|
|
return request
|
|
|
|
|
|
class InstructorTaskModuleTestCase(InstructorTaskCourseTestCase):
|
|
"""
|
|
Base test class for InstructorTask-related tests that require
|
|
the setup of a course and problem in order to access StudentModule state.
|
|
"""
|
|
@staticmethod
|
|
def problem_location(problem_url_name, course_key=None):
|
|
"""
|
|
Create an internal location for a test problem.
|
|
"""
|
|
if "i4x:" in problem_url_name:
|
|
return Location.from_string(problem_url_name)
|
|
elif course_key:
|
|
return course_key.make_usage_key('problem', problem_url_name)
|
|
else:
|
|
return TEST_COURSE_KEY.make_usage_key('problem', problem_url_name)
|
|
|
|
def _option_problem_factory_args(self, correct_answer=OPTION_1, num_inputs=1, num_responses=2):
|
|
"""
|
|
Returns the factory args for the option problem type.
|
|
"""
|
|
return {
|
|
'question_text': f'The correct answer is {correct_answer}',
|
|
'options': [OPTION_1, OPTION_2],
|
|
'correct_option': correct_answer,
|
|
'num_responses': num_responses,
|
|
'num_inputs': num_inputs,
|
|
}
|
|
|
|
def define_option_problem(self, problem_url_name, parent=None, **kwargs):
|
|
"""Create the problem definition so the answer is Option 1"""
|
|
if parent is None:
|
|
parent = self.problem_section
|
|
factory = OptionResponseXMLFactory()
|
|
factory_args = self._option_problem_factory_args()
|
|
problem_xml = factory.build_xml(**factory_args)
|
|
return BlockFactory.create(parent_location=parent.location,
|
|
parent=parent,
|
|
category="problem",
|
|
display_name=problem_url_name,
|
|
data=problem_xml,
|
|
**kwargs)
|
|
|
|
def redefine_option_problem(self, problem_url_name, correct_answer=OPTION_1, num_inputs=1, num_responses=2):
|
|
"""Change the problem definition so the answer is Option 2"""
|
|
factory = OptionResponseXMLFactory()
|
|
factory_args = self._option_problem_factory_args(correct_answer, num_inputs, num_responses)
|
|
problem_xml = factory.build_xml(**factory_args)
|
|
location = InstructorTaskTestCase.problem_location(problem_url_name)
|
|
item = self.module_store.get_item(location)
|
|
with self.module_store.branch_setting(ModuleStoreEnum.Branch.draft_preferred, location.course_key):
|
|
item.data = problem_xml
|
|
self.module_store.update_item(item, self.user.id)
|
|
self.module_store.publish(location, self.user.id)
|
|
|
|
def get_student_module(self, username, block):
|
|
"""Get StudentModule object for test course, given the `username` and the problem's `block`."""
|
|
return StudentModule.objects.get(course_id=self.course.id,
|
|
student=User.objects.get(username=username),
|
|
module_type=block.location.block_type,
|
|
module_state_key=block.location,
|
|
)
|
|
|
|
def submit_student_answer(self, username, problem_url_name, responses):
|
|
"""
|
|
Use ajax interface to submit a student answer.
|
|
|
|
Assumes the input list of responses has two values.
|
|
"""
|
|
def get_input_id(response_id):
|
|
"""Creates input id using information about the test course and the current problem."""
|
|
# Note that this is a capa-specific convention. The form is a version of the problem's
|
|
# URL, modified so that it can be easily stored in html, prepended with "input-" and
|
|
# appended with a sequence identifier for the particular response the input goes to.
|
|
return 'input_{}_{}'.format(
|
|
problem_url_name,
|
|
response_id
|
|
)
|
|
|
|
# make sure that the requested user is logged in, so that the ajax call works
|
|
# on the right problem:
|
|
self.login_username(username)
|
|
# make ajax call:
|
|
modx_url = reverse('xblock_handler', kwargs={
|
|
'course_id': str(self.course.id),
|
|
'usage_id': quote_slashes(
|
|
str(InstructorTaskModuleTestCase.problem_location(problem_url_name, self.course.id))
|
|
),
|
|
'handler': 'xmodule_handler',
|
|
'suffix': 'problem_check',
|
|
})
|
|
|
|
# assign correct identifier to each response.
|
|
resp = self.client.post(modx_url, {
|
|
get_input_id('{}_1').format(index): response for index, response in enumerate(responses, 2)
|
|
})
|
|
return resp
|
|
|
|
|
|
class TestReportMixin:
|
|
"""
|
|
Cleans up after tests that place files in the reports directory.
|
|
"""
|
|
|
|
def setUp(self):
|
|
|
|
def clean_up_tmpdir():
|
|
"""Remove temporary directory created for instructor task models."""
|
|
if os.path.exists(self.tmp_dir):
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
super().setUp()
|
|
|
|
# Ensure that working with the temp directories in tests is thread safe
|
|
# by creating a unique temporary directory for each testcase.
|
|
self.tmp_dir = mkdtemp()
|
|
|
|
mock_grades_download = {'STORAGE_TYPE': 'localfs', 'BUCKET': 'test-grades', 'ROOT_PATH': self.tmp_dir}
|
|
self.grades_patch = patch.dict('django.conf.settings.GRADES_DOWNLOAD', mock_grades_download)
|
|
self.grades_patch.start()
|
|
self.addCleanup(self.grades_patch.stop)
|
|
|
|
mock_fin_report = {'STORAGE_TYPE': 'localfs', 'BUCKET': 'test-financial-reports', 'ROOT_PATH': self.tmp_dir}
|
|
self.reports_patch = patch.dict('django.conf.settings.FINANCIAL_REPORTS', mock_fin_report)
|
|
self.reports_patch.start()
|
|
self.addCleanup(self.reports_patch.stop)
|
|
|
|
self.addCleanup(clean_up_tmpdir)
|
|
|
|
def verify_rows_in_csv(self, expected_rows, file_index=0, verify_order=True, ignore_other_columns=False):
|
|
"""
|
|
Verify that the last ReportStore CSV contains the expected content.
|
|
|
|
Arguments:
|
|
expected_rows (iterable): An iterable of dictionaries,
|
|
where each dict represents a row of data in the last
|
|
ReportStore CSV. Each dict maps keys from the CSV
|
|
header to values in that row's corresponding cell.
|
|
file_index (int): Describes which report store file to
|
|
open. Files are ordered by last modified date, and 0
|
|
corresponds to the most recently modified file.
|
|
verify_order (boolean): When True (default), we verify that
|
|
both the content and order of `expected_rows` matches
|
|
the actual csv rows. When False, we only verify that
|
|
the content matches.
|
|
ignore_other_columns (boolean): When True, we verify that `expected_rows`
|
|
contain data which is the subset of actual csv rows.
|
|
"""
|
|
report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD')
|
|
report_csv_filename = report_store.links_for(self.course.id)[file_index][0]
|
|
report_path = report_store.path_to(self.course.id, report_csv_filename)
|
|
with report_store.storage.open(report_path) as csv_file:
|
|
# Expand the dict reader generator so we don't lose it's content
|
|
csv_rows = list(unicodecsv.DictReader(csv_file, encoding='utf-8-sig'))
|
|
|
|
if ignore_other_columns:
|
|
csv_rows = [
|
|
{key: row.get(key) for key in expected_rows[index].keys()} for index, row in enumerate(csv_rows)
|
|
]
|
|
|
|
numeric_csv_rows = [self._extract_and_round_numeric_items(row) for row in csv_rows]
|
|
numeric_expected_rows = [self._extract_and_round_numeric_items(row) for row in expected_rows]
|
|
|
|
if verify_order:
|
|
assert csv_rows == expected_rows
|
|
assert numeric_csv_rows == numeric_expected_rows
|
|
else:
|
|
self.assertCountEqual(csv_rows, expected_rows)
|
|
self.assertCountEqual(numeric_csv_rows, numeric_expected_rows)
|
|
|
|
@staticmethod
|
|
def _extract_and_round_numeric_items(dictionary):
|
|
"""
|
|
csv data may contain numeric values that are converted to strings, and fractional
|
|
numbers can be imprecise (e.g. 1 / 6 is sometimes '0.16666666666666666' and other times
|
|
'0.166666666667'). This function returns a new dictionary that contains only the
|
|
numerically-valued items from it, rounded to four decimal places.
|
|
"""
|
|
extracted = {}
|
|
for key in list(dictionary):
|
|
try:
|
|
extracted[key] = round(float(dictionary[key]), 4)
|
|
except ValueError:
|
|
pass
|
|
return extracted
|
|
|
|
def get_csv_row_with_headers(self):
|
|
"""
|
|
Helper function to return list with the column names from the CSV file (the first row)
|
|
"""
|
|
report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD')
|
|
report_csv_filename = report_store.links_for(self.course.id)[0][0]
|
|
report_path = report_store.path_to(self.course.id, report_csv_filename)
|
|
with report_store.storage.open(report_path) as csv_file:
|
|
rows = unicodecsv.reader(csv_file, encoding='utf-8-sig')
|
|
return next(rows)
|