""" Implements the Problem XBlock, which is built on top of the CAPA subsystem. """ import copy import datetime import hashlib import json import logging import os import re import struct import sys import traceback from bleach.sanitizer import Cleaner from django.conf import settings from django.core.exceptions import ImproperlyConfigured from django.utils.encoding import smart_str from django.utils.functional import cached_property from lxml import etree from pkg_resources import resource_string from pytz import utc from web_fragments.fragment import Fragment from xblock.core import XBlock from xblock.fields import Boolean, Dict, Float, Integer, Scope, String, XMLString from xblock.scorable import ScorableXBlockMixin, Score from capa import responsetypes from capa.capa_problem import LoncapaProblem, LoncapaSystem from capa.inputtypes import Status from capa.responsetypes import LoncapaProblemError, ResponseError, StudentInputError from capa.util import convert_files_to_filenames, get_inner_html_from_xpath from xmodule.contentstore.django import contentstore from xmodule.editing_module import EditingMixin from xmodule.exceptions import NotFoundError, ProcessingError from xmodule.graders import ShowCorrectness from xmodule.raw_module import RawMixin from xmodule.util.sandboxing import get_python_lib_zip from xmodule.util.xmodule_django import add_webpack_to_fragment from xmodule.x_module import ( HTMLSnippet, ResourceTemplates, XModuleMixin, XModuleToXBlockMixin, shim_xmodule_js ) from xmodule.xml_module import XmlMixin from common.djangoapps.xblock_django.constants import ( ATTR_KEY_ANONYMOUS_USER_ID, ATTR_KEY_USER_IS_STAFF, ATTR_KEY_USER_ID, ) from openedx.core.djangolib.markup import HTML, Text from .fields import Date, ScoreField, Timedelta from .progress import Progress log = logging.getLogger("edx.courseware") # Make '_' a no-op so we can scrape strings. Using lambda instead of # `django.utils.translation.ugettext_noop` because Django cannot be imported in this file _ = lambda text: text # Generate this many different variants of problems with rerandomize=per_student NUM_RANDOMIZATION_BINS = 20 # Never produce more than this many different seeds, no matter what. MAX_RANDOMIZATION_BINS = 1000 try: FEATURES = getattr(settings, 'FEATURES', {}) except ImproperlyConfigured: FEATURES = {} class SHOWANSWER: """ Constants for when to show answer """ ALWAYS = "always" ANSWERED = "answered" ATTEMPTED = "attempted" CLOSED = "closed" FINISHED = "finished" CORRECT_OR_PAST_DUE = "correct_or_past_due" PAST_DUE = "past_due" NEVER = "never" AFTER_SOME_NUMBER_OF_ATTEMPTS = "after_attempts" AFTER_ALL_ATTEMPTS = "after_all_attempts" AFTER_ALL_ATTEMPTS_OR_CORRECT = "after_all_attempts_or_correct" ATTEMPTED_NO_PAST_DUE = "attempted_no_past_due" class RANDOMIZATION: """ Constants for problem randomization """ ALWAYS = "always" ONRESET = "onreset" NEVER = "never" PER_STUDENT = "per_student" class Randomization(String): """ Define a field to store how to randomize a problem. """ def from_json(self, value): if value in ("", "true"): return RANDOMIZATION.ALWAYS elif value == "false": return RANDOMIZATION.PER_STUDENT return value to_json = from_json @XBlock.needs('user') @XBlock.needs('i18n') @XBlock.needs('mako') @XBlock.needs('cache') @XBlock.needs('sandbox') @XBlock.needs('replace_urls') # Studio doesn't provide XQueueService, but the LMS does. @XBlock.wants('xqueue') @XBlock.wants('call_to_action') class ProblemBlock( ScorableXBlockMixin, RawMixin, XmlMixin, EditingMixin, XModuleToXBlockMixin, HTMLSnippet, ResourceTemplates, XModuleMixin, ): """ An XBlock representing a "problem". A problem contains zero or more respondable items, such as multiple choice, numeric response, true/false, etc. See common/lib/capa/capa/responsetypes.py for the full ensemble. The rendering logic of a problem is largely encapsulated within LoncapaProblem, LoncapaSystem and related classes. This block serves to host the Loncapa system within the XBlock runtime and connect it to the greater LMS/CMS. As historical context: the acronym LON-CAPA references the "Learning Online - Computer-Assisted Personalized Approach" LMS, from which this system is inspired. """ INDEX_CONTENT_TYPE = 'CAPA' resources_dir = None has_score = True show_in_read_only_mode = True template_dir_name = 'problem' mako_template = "widgets/problem-edit.html" has_author_view = True icon_class = 'problem' uses_xmodule_styles_setup = True requires_per_student_anonymous_id = True preview_view_js = { 'js': [ resource_string(__name__, 'js/src/javascript_loader.js'), resource_string(__name__, 'js/src/capa/display.js'), resource_string(__name__, 'js/src/collapsible.js'), resource_string(__name__, 'js/src/capa/imageinput.js'), resource_string(__name__, 'js/src/capa/schematic.js'), ], 'xmodule_js': resource_string(__name__, 'js/src/xmodule.js') } preview_view_css = { 'scss': [ resource_string(__name__, 'css/capa/display.scss'), ], } studio_view_js = { 'js': [ resource_string(__name__, 'js/src/problem/edit.js'), ], 'xmodule_js': resource_string(__name__, 'js/src/xmodule.js'), } studio_view_css = { 'scss': [ resource_string(__name__, 'css/editor/edit.scss'), resource_string(__name__, 'css/problem/edit.scss'), ] } display_name = String( display_name=_("Display Name"), help=_("The display name for this component."), scope=Scope.settings, # it'd be nice to have a useful default but it screws up other things; so, # use display_name_with_default for those default=_("Blank Advanced Problem") ) attempts = Integer( help=_("Number of attempts taken by the student on this problem"), default=0, scope=Scope.user_state ) max_attempts = Integer( display_name=_("Maximum Attempts"), help=_("Defines the number of times a student can try to answer this problem. " "If the value is not set, infinite attempts are allowed."), values={"min": 0}, scope=Scope.settings ) due = Date(help=_("Date that this problem is due by"), scope=Scope.settings) graceperiod = Timedelta( help=_("Amount of time after the due date that submissions will be accepted"), scope=Scope.settings ) show_correctness = String( display_name=_("Show Results"), help=_("Defines when to show whether a learner's answer to the problem is correct. " "Configured on the subsection."), scope=Scope.settings, default=ShowCorrectness.ALWAYS, values=[ {"display_name": _("Always"), "value": ShowCorrectness.ALWAYS}, {"display_name": _("Never"), "value": ShowCorrectness.NEVER}, {"display_name": _("Past Due"), "value": ShowCorrectness.PAST_DUE}, ], ) showanswer = String( display_name=_("Show Answer"), help=_("Defines when to show the answer to the problem. " "A default value can be set in Advanced Settings."), scope=Scope.settings, default=SHOWANSWER.FINISHED, values=[ {"display_name": _("Always"), "value": SHOWANSWER.ALWAYS}, {"display_name": _("Answered"), "value": SHOWANSWER.ANSWERED}, {"display_name": _("Attempted or Past Due"), "value": SHOWANSWER.ATTEMPTED}, {"display_name": _("Closed"), "value": SHOWANSWER.CLOSED}, {"display_name": _("Finished"), "value": SHOWANSWER.FINISHED}, {"display_name": _("Correct or Past Due"), "value": SHOWANSWER.CORRECT_OR_PAST_DUE}, {"display_name": _("Past Due"), "value": SHOWANSWER.PAST_DUE}, {"display_name": _("Never"), "value": SHOWANSWER.NEVER}, {"display_name": _("After Some Number of Attempts"), "value": SHOWANSWER.AFTER_SOME_NUMBER_OF_ATTEMPTS}, {"display_name": _("After All Attempts"), "value": SHOWANSWER.AFTER_ALL_ATTEMPTS}, {"display_name": _("After All Attempts or Correct"), "value": SHOWANSWER.AFTER_ALL_ATTEMPTS_OR_CORRECT}, {"display_name": _("Attempted"), "value": SHOWANSWER.ATTEMPTED_NO_PAST_DUE}, ] ) attempts_before_showanswer_button = Integer( display_name=_("Show Answer: Number of Attempts"), help=_( "Number of times the student must attempt to answer the question before the Show Answer button appears." ), values={"min": 0}, default=0, scope=Scope.settings, ) force_save_button = Boolean( help=_("Whether to force the save button to appear on the page"), scope=Scope.settings, default=False ) show_reset_button = Boolean( display_name=_("Show Reset Button"), help=_("Determines whether a 'Reset' button is shown so the user may reset their answer. " "A default value can be set in Advanced Settings."), scope=Scope.settings, default=False ) rerandomize = Randomization( display_name=_("Randomization"), help=_( 'Defines when to randomize the variables specified in the associated Python script. ' 'For problems that do not randomize values, specify \"Never\". ' ), default=RANDOMIZATION.NEVER, scope=Scope.settings, values=[ {"display_name": _("Always"), "value": RANDOMIZATION.ALWAYS}, {"display_name": _("On Reset"), "value": RANDOMIZATION.ONRESET}, {"display_name": _("Never"), "value": RANDOMIZATION.NEVER}, {"display_name": _("Per Student"), "value": RANDOMIZATION.PER_STUDENT} ] ) data = XMLString( help=_("XML data for the problem"), scope=Scope.content, enforce_type=FEATURES.get('ENABLE_XBLOCK_XML_VALIDATION', True), default="" ) correct_map = Dict(help=_("Dictionary with the correctness of current student answers"), scope=Scope.user_state, default={}) input_state = Dict(help=_("Dictionary for maintaining the state of inputtypes"), scope=Scope.user_state) student_answers = Dict(help=_("Dictionary with the current student responses"), scope=Scope.user_state) # enforce_type is set to False here because this field is saved as a dict in the database. score = ScoreField(help=_("Dictionary with the current student score"), scope=Scope.user_state, enforce_type=False) has_saved_answers = Boolean(help=_("Whether or not the answers have been saved since last submit"), scope=Scope.user_state, default=False) done = Boolean(help=_("Whether the student has answered the problem"), scope=Scope.user_state, default=False) seed = Integer(help=_("Random seed for this student"), scope=Scope.user_state) last_submission_time = Date(help=_("Last submission time"), scope=Scope.user_state) submission_wait_seconds = Integer( display_name=_("Timer Between Attempts"), help=_("Seconds a student must wait between submissions for a problem with multiple attempts."), scope=Scope.settings, default=0) weight = Float( display_name=_("Problem Weight"), help=_("Defines the number of points each problem is worth. " "If the value is not set, each response field in the problem is worth one point."), values={"min": 0, "step": .1}, scope=Scope.settings ) markdown = String(help=_("Markdown source of this module"), default=None, scope=Scope.settings) source_code = String( help=_("Source code for LaTeX and Word problems. This feature is not well-supported."), scope=Scope.settings ) use_latex_compiler = Boolean( help=_("Enable LaTeX templates?"), default=False, scope=Scope.settings ) matlab_api_key = String( display_name=_("Matlab API key"), help=_("Enter the API key provided by MathWorks for accessing the MATLAB Hosted Service. " "This key is granted for exclusive use by this course for the specified duration. " "Please do not share the API key with other courses and notify MathWorks immediately " "if you believe the key is exposed or compromised. To obtain a key for your course, " "or to report an issue, please contact moocsupport@mathworks.com"), scope=Scope.settings ) def bind_for_student(self, *args, **kwargs): # lint-amnesty, pylint: disable=signature-differs super().bind_for_student(*args, **kwargs) # Capa was an XModule. When bind_for_student() was called on it with a new runtime, a new CapaModule object # was initialized when XModuleDescriptor._xmodule() was called next. self.lcp was constructed in CapaModule # init(). To keep the same behaviour, we delete self.lcp in bind_for_student(). if 'lcp' in self.__dict__: del self.__dict__['lcp'] def student_view(self, _context, show_detailed_errors=False): """ Return the student view. """ # self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first. try: self.lcp except Exception as err: # lint-amnesty, pylint: disable=broad-except html = self.handle_fatal_lcp_error(err if show_detailed_errors else None) else: html = self.get_html() fragment = Fragment(html) add_webpack_to_fragment(fragment, 'ProblemBlockPreview') shim_xmodule_js(fragment, 'Problem') return fragment def public_view(self, context): """ Return the view seen by users who aren't logged in or who aren't enrolled in the course. """ if getattr(self.runtime, 'suppports_state_for_anonymous_users', False): # The new XBlock runtime can generally support capa problems for users who aren't logged in, so show the # normal student_view. To prevent anonymous users from viewing specific problems, adjust course policies # and/or content groups. return self.student_view(context) else: # Show a message that this content requires users to login/enroll. return super().public_view(context) def author_view(self, context): """ Renders the Studio preview view. """ return self.student_view(context, show_detailed_errors=True) def studio_view(self, _context): """ Return the studio view. """ fragment = Fragment( self.runtime.service(self, 'mako').render_template(self.mako_template, self.get_context()) ) add_webpack_to_fragment(fragment, 'ProblemBlockStudio') shim_xmodule_js(fragment, 'MarkdownEditingDescriptor') return fragment def handle_ajax(self, dispatch, data): """ This is called by courseware.module_render, to handle an AJAX call. `data` is request.POST. Returns a json dictionary: { 'progress_changed' : True/False, 'progress' : 'none'/'in_progress'/'done', } """ # self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first. self.lcp # lint-amnesty, pylint: disable=pointless-statement handlers = { 'hint_button': self.hint_button, 'problem_get': self.get_problem, 'problem_check': self.submit_problem, 'problem_reset': self.reset_problem, 'problem_save': self.save_problem, 'problem_show': self.get_answer, 'score_update': self.update_score, 'input_ajax': self.handle_input_ajax, 'ungraded_response': self.handle_ungraded_response } _ = self.runtime.service(self, "i18n").ugettext generic_error_message = _( "We're sorry, there was an error with processing your request. " "Please try reloading your page and trying again." ) not_found_error_message = _( "The state of this problem has changed since you loaded this page. " "Please refresh your page." ) if dispatch not in handlers: return f'Error: {dispatch} is not a known capa action' before = self.get_progress() before_attempts = self.attempts try: result = handlers[dispatch](data) except NotFoundError as ex: log.info( "Unable to find data when dispatching %s to %s for user %s", dispatch, self.scope_ids.usage_id, self.scope_ids.user_id ) _, _, traceback_obj = sys.exc_info() raise ProcessingError(not_found_error_message).with_traceback(traceback_obj) from ex except Exception as ex: # lint-amnesty, pylint: disable=broad-except log.exception( "Unknown error when dispatching %s to %s for user %s", dispatch, self.scope_ids.usage_id, self.scope_ids.user_id ) _, _, traceback_obj = sys.exc_info() raise ProcessingError(generic_error_message).with_traceback(traceback_obj) from ex after = self.get_progress() after_attempts = self.attempts progress_changed = (after != before) or (after_attempts != before_attempts) curr_score, total_possible = self.get_display_progress() result.update({ 'progress_changed': progress_changed, 'current_score': curr_score, 'total_possible': total_possible, 'attempts_used': after_attempts, }) return json.dumps(result, cls=ComplexEncoder) @property def display_name_with_default(self): """ Constructs the display name for a CAPA problem. Default to the display_name if it isn't None or not an empty string, else fall back to problem category. """ if self.display_name is None or not self.display_name.strip(): return self.location.block_type return self.display_name @property def debug(self): """ If CAPA block fails to render, we want course authors to be able to see the error in Studio. At the same time, in production, we don't want to show errors to students. """ return getattr(self.runtime, 'is_author_mode', False) or settings.DEBUG @classmethod def filter_templates(cls, template, course): """ Filter template that contains 'latex' from templates. Show them only if use_latex_compiler is set to True in course settings. """ return 'latex' not in template['template_id'] or course.use_latex_compiler def get_context(self): _context = EditingMixin.get_context(self) _context.update({ 'markdown': self.markdown, 'enable_markdown': self.markdown is not None, 'enable_latex_compiler': self.use_latex_compiler, }) return _context # VS[compat] # TODO (cpennington): Delete this method once all fall 2012 course are being # edited in the cms @classmethod def backcompat_paths(cls, path): return [ 'problems/' + path[8:], path[8:], ] @property def non_editable_metadata_fields(self): non_editable_fields = super().non_editable_metadata_fields non_editable_fields.extend([ ProblemBlock.due, ProblemBlock.graceperiod, ProblemBlock.force_save_button, ProblemBlock.markdown, ProblemBlock.use_latex_compiler, ProblemBlock.show_correctness, ]) return non_editable_fields @property def problem_types(self): """ Low-level problem type introspection for content libraries filtering by problem type """ try: tree = etree.XML(self.data) except etree.XMLSyntaxError: log.error(f'Error parsing problem types from xml for capa module {self.display_name}') return None # short-term fix to prevent errors (TNL-5057). Will be more properly addressed in TNL-4525. registered_tags = responsetypes.registry.registered_tags() return {node.tag for node in tree.iter() if node.tag in registered_tags} def index_dictionary(self): """ Return dictionary prepared with module content and type for indexing. """ xblock_body = super().index_dictionary() # Make optioninput's options index friendly by replacing the actual tag with the values capa_content = re.sub(r'\s*|\S*<\/optioninput>', r'\1', self.data) # Remove the following tags with content that can leak hints or solutions: # - `solution` (with optional attributes) and `solutionset`. # - `targetedfeedback` (with optional attributes) and `targetedfeedbackset`. # - `answer` (with optional attributes). # - `script` (with optional attributes). # - `style` (with optional attributes). # - various types of hints (with optional attributes) and `hintpart`. capa_content = re.sub( re.compile( r""" .*? | .*? | .*? | .*? | .*? | <[a-z]*hint.*?>.*? """, re.DOTALL | re.VERBOSE), "", capa_content ) capa_content = re.sub( r"(\s| |//)+", " ", Cleaner(tags=[], strip=True).clean(capa_content) ) capa_body = { "capa_content": capa_content, "display_name": self.display_name, } if "content" in xblock_body: xblock_body["content"].update(capa_body) else: xblock_body["content"] = capa_body xblock_body["content_type"] = self.INDEX_CONTENT_TYPE xblock_body["problem_types"] = list(self.problem_types) return xblock_body def has_support(self, view, functionality): """ Override the XBlock.has_support method to return appropriate value for the multi-device functionality. Returns whether the given view has support for the given functionality. """ if functionality == "multi_device": types = self.problem_types # Avoid calculating this property twice return types is not None and all( responsetypes.registry.get_class_for_tag(tag).multi_device_support for tag in types ) return False def max_score(self): """ Return the problem's max score if problem is instantiated successfully, else return max score of 0. """ capa_system = LoncapaSystem( ajax_url=None, anonymous_student_id=None, cache=None, can_execute_unsafe_code=None, get_python_lib_zip=None, DEBUG=None, i18n=self.runtime.service(self, "i18n"), render_template=None, resources_fs=self.runtime.resources_fs, seed=None, STATIC_URL=None, xqueue=None, matlab_api_key=None, ) try: lcp = LoncapaProblem( problem_text=self.data, id=self.location.html_id(), capa_system=capa_system, capa_module=self, state={}, seed=1, minimal_init=True, ) except responsetypes.LoncapaProblemError: log.exception(f"LcpFatalError for block {str(self.location)} while getting max score") maximum_score = 0 else: maximum_score = lcp.get_max_score() return maximum_score def generate_report_data(self, user_state_iterator, limit_responses=None): """ Return a list of student responses to this block in a readable way. Arguments: user_state_iterator: iterator over UserStateClient objects. E.g. the result of user_state_client.iter_all_for_block(block_key) limit_responses (int|None): maximum number of responses to include. Set to None (default) to include all. Returns: each call returns a tuple like: ("username", { "Question": "2 + 2 equals how many?", "Answer": "Four", "Answer ID": "98e6a8e915904d5389821a94e48babcf_10_1" }) """ if self.category != 'problem': raise NotImplementedError() if limit_responses == 0: # Don't even start collecting answers return capa_system = LoncapaSystem( ajax_url=None, # TODO set anonymous_student_id to the anonymous ID of the user which answered each problem # Anonymous ID is required for Matlab, CodeResponse, and some custom problems that include # '$anonymous_student_id' in their XML. # For the purposes of this report, we don't need to support those use cases. anonymous_student_id=None, cache=None, can_execute_unsafe_code=lambda: None, get_python_lib_zip=(lambda: get_python_lib_zip(contentstore, self.runtime.course_id)), DEBUG=None, i18n=self.runtime.service(self, "i18n"), render_template=None, resources_fs=self.runtime.resources_fs, seed=1, STATIC_URL=None, xqueue=None, matlab_api_key=None, ) _ = capa_system.i18n.ugettext count = 0 for user_state in user_state_iterator: if 'student_answers' not in user_state.state: continue try: lcp = LoncapaProblem( problem_text=self.data, id=self.location.html_id(), capa_system=capa_system, # We choose to run without a fully initialized CapaModule capa_module=None, state={ 'done': user_state.state.get('done'), 'correct_map': user_state.state.get('correct_map'), 'student_answers': user_state.state.get('student_answers'), 'has_saved_answers': user_state.state.get('has_saved_answers'), 'input_state': user_state.state.get('input_state'), 'seed': user_state.state.get('seed'), }, seed=user_state.state.get('seed'), # extract_tree=False allows us to work without a fully initialized CapaModule # We'll still be able to find particular data in the XML when we need it extract_tree=False, ) for answer_id, orig_answers in lcp.student_answers.items(): # Some types of problems have data in lcp.student_answers that isn't in lcp.problem_data. # E.g. formulae do this to store the MathML version of the answer. # We exclude these rows from the report because we only need the text-only answer. if answer_id.endswith('_dynamath'): continue if limit_responses and count >= limit_responses: # End the iterator here return question_text = lcp.find_question_label(answer_id) answer_text = lcp.find_answer_text(answer_id, current_answer=orig_answers) correct_answer_text = lcp.find_correct_answer_text(answer_id) count += 1 report = { _("Answer ID"): answer_id, _("Question"): question_text, _("Answer"): answer_text, } if correct_answer_text is not None: report[_("Correct Answer")] = correct_answer_text yield (user_state.username, report) except LoncapaProblemError: # Capture a backtrace for errors from failed loncapa problems log.exception( "An error occurred generating a problem report on course %s, problem %s, and student %s", self.course_id, self.scope_ids.usage_id, self.scope_ids.user_id ) # Also input error in report report = { _("Answer ID"): "Python Error", _("Question"): "Generating a report on the problem failed.", _("Answer"): "Python Error: No Answer Retrieved", } yield (user_state.username, report) @property def close_date(self): """ Return the date submissions should be closed from. """ due_date = self.due if self.graceperiod is not None and due_date: return due_date + self.graceperiod else: return due_date def get_seed(self): """ Generate the seed if not set and return it. """ if self.seed is None: self.choose_new_seed() return self.seed @cached_property def lcp(self): # lint-amnesty, pylint: disable=method-hidden, missing-function-docstring try: lcp = self.new_lcp(self.get_state_for_lcp()) except Exception as err: # pylint: disable=broad-except msg = 'cannot create LoncapaProblem {loc}: {err}'.format( loc=str(self.location), err=err) raise LoncapaProblemError(msg).with_traceback(sys.exc_info()[2]) if self.score is None: self.set_score(self.score_from_lcp(lcp)) assert self.seed is not None return lcp def choose_new_seed(self): """ Choose a new seed. """ if self.rerandomize == RANDOMIZATION.NEVER: self.seed = 1 elif self.rerandomize == RANDOMIZATION.PER_STUDENT: user_id = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_ID) or 0 # see comment on randomization_bin self.seed = randomization_bin(user_id, str(self.location).encode('utf-8')) else: self.seed = struct.unpack('i', os.urandom(4))[0] # So that sandboxed code execution can be cached, but still have an interesting # number of possibilities, cap the number of different random seeds. self.seed %= MAX_RANDOMIZATION_BINS def new_lcp(self, state, text=None): """ Generate a new Loncapa Problem """ if text is None: text = self.data user_service = self.runtime.service(self, 'user') anonymous_student_id = user_service.get_current_user().opt_attrs.get(ATTR_KEY_ANONYMOUS_USER_ID) seed = user_service.get_current_user().opt_attrs.get(ATTR_KEY_USER_ID) or 0 sandbox_service = self.runtime.service(self, 'sandbox') cache_service = self.runtime.service(self, 'cache') capa_system = LoncapaSystem( ajax_url=self.ajax_url, anonymous_student_id=anonymous_student_id, cache=cache_service, can_execute_unsafe_code=sandbox_service.can_execute_unsafe_code, get_python_lib_zip=sandbox_service.get_python_lib_zip, DEBUG=self.debug, i18n=self.runtime.service(self, "i18n"), render_template=self.runtime.service(self, 'mako').render_template, resources_fs=self.runtime.resources_fs, seed=seed, # Why do we do this if we have self.seed? STATIC_URL=self.runtime.STATIC_URL, xqueue=self.runtime.service(self, 'xqueue'), matlab_api_key=self.matlab_api_key ) return LoncapaProblem( problem_text=text, id=self.location.html_id(), state=state, seed=self.get_seed(), capa_system=capa_system, capa_module=self, # njp ) def get_state_for_lcp(self): """ Give a dictionary holding the state of the module """ return { 'done': self.done, 'correct_map': self.correct_map, 'student_answers': self.student_answers, 'has_saved_answers': self.has_saved_answers, 'input_state': self.input_state, 'seed': self.get_seed(), } def set_state_from_lcp(self): """ Set the module's state from the settings in `self.lcp` """ lcp_state = self.lcp.get_state() self.done = lcp_state['done'] self.correct_map = lcp_state['correct_map'] self.input_state = lcp_state['input_state'] self.student_answers = lcp_state['student_answers'] self.has_saved_answers = lcp_state['has_saved_answers'] def set_last_submission_time(self): """ Set the module's last submission time (when the problem was submitted) """ self.last_submission_time = datetime.datetime.now(utc) def get_progress(self): """ For now, just return weighted earned / weighted possible """ if self.score: raw_earned = self.score.raw_earned raw_possible = self.score.raw_possible else: raw_earned = raw_possible = 0 if raw_possible > 0: if self.weight is not None: # Progress objects expect total > 0 if self.weight == 0: return None # scale score and total by weight/total: weighted_earned = raw_earned * self.weight / raw_possible weighted_possible = self.weight else: weighted_earned = raw_earned weighted_possible = raw_possible try: return Progress(weighted_earned, weighted_possible) except (TypeError, ValueError): log.exception("Got bad progress") return None return None def get_display_progress(self): """ Return (score, total) to be displayed to the learner. """ progress = self.get_progress() score, total = (progress.frac() if progress else (0, 0)) # Withhold the score if hiding correctness if not self.correctness_available(): score = None return score, total def get_html(self): """ Return some html with data about the module """ curr_score, total_possible = self.get_display_progress() return self.runtime.service(self, 'mako').render_template('problem_ajax.html', { 'element_id': self.location.html_id(), 'id': str(self.location), 'ajax_url': self.ajax_url, 'current_score': curr_score, 'total_possible': total_possible, 'attempts_used': self.attempts, 'content': self.get_problem_html(encapsulate=False), 'graded': self.graded, # pylint: disable=no-member }) def handle_fatal_lcp_error(self, error): # lint-amnesty, pylint: disable=missing-function-docstring log.exception(f"LcpFatalError Encountered for {str(self.location)}") if error: return( HTML('

Error formatting HTML for problem:

{msg}

').format( msg=str(error)) ) else: return HTML( '

Could not format HTML for problem. ' 'Contact course staff in the discussion forum for assistance.

' ) def submit_button_name(self): """ Determine the name for the "submit" button. """ # The logic flow is a little odd so that _('xxx') strings can be found for # translation while also running _() just once for each string. _ = self.runtime.service(self, "i18n").ugettext submit = _('Submit') return submit def submit_button_submitting_name(self): """ Return the "Submitting" text for the "submit" button. After the user presses the "submit" button, the button will briefly display the value returned by this function until a response is received by the server. """ _ = self.runtime.service(self, "i18n").ugettext return _('Submitting') def should_enable_submit_button(self): """ Return True/False to indicate whether to enable the "Submit" button. """ submitted_without_reset = (self.is_submitted() and self.rerandomize == RANDOMIZATION.ALWAYS) # If the problem is closed (past due / too many attempts) # then we disable the "submit" button # Also, disable the "submit" button if we're waiting # for the user to reset a randomized problem if self.closed() or submitted_without_reset: return False else: return True def should_show_reset_button(self): """ Return True/False to indicate whether to show the "Reset" button. """ is_survey_question = (self.max_attempts == 0) # If the problem is closed (and not a survey question with max_attempts==0), # then do NOT show the reset button. if self.closed() and not is_survey_question: return False # Button only shows up for randomized problems if the question has been submitted if self.rerandomize in [RANDOMIZATION.ALWAYS, RANDOMIZATION.ONRESET] and self.is_submitted(): return True else: # Do NOT show the button if the problem is correct if self.is_correct(): return False else: return self.show_reset_button def should_show_save_button(self): """ Return True/False to indicate whether to show the "Save" button. """ # If the user has forced the save button to display, # then show it as long as the problem is not closed # (past due / too many attempts) if self.force_save_button: return not self.closed() else: is_survey_question = (self.max_attempts == 0) needs_reset = self.is_submitted() and self.rerandomize == RANDOMIZATION.ALWAYS # If the student has unlimited attempts, and their answers # are not randomized, then we do not need a save button # because they can use the "Check" button without consequences. # # The consequences we want to avoid are: # * Using up an attempt (if max_attempts is set) # * Changing the current problem, and no longer being # able to view it (if rerandomize is "always") # # In those cases. the if statement below is false, # and the save button can still be displayed. # if self.max_attempts is None and self.rerandomize != RANDOMIZATION.ALWAYS: return False # If the problem is closed (and not a survey question with max_attempts==0), # then do NOT show the save button # If we're waiting for the user to reset a randomized problem # then do NOT show the save button elif (self.closed() and not is_survey_question) or needs_reset: return False else: return True def handle_problem_html_error(self, err): """ Create a dummy problem to represent any errors. Change our problem to a dummy problem containing a warning message to display to users. Returns the HTML to show to users `err` is the Exception encountered while rendering the problem HTML. """ problem_display_name = self.display_name_with_default problem_location = str(self.location) log.exception( "ProblemGetHtmlError: %r, %r, %s", problem_display_name, problem_location, str(err) ) if self.debug: msg = HTML( '[courseware.capa.capa_module] ' 'Failed to generate HTML for problem {url}' ).format( url=str(self.location) ) msg += HTML('

Error:

{msg}

').format(msg=str(err)) msg += HTML('

{tb}

').format(tb=traceback.format_exc()) html = msg else: # We're in non-debug mode, and possibly even in production. We want # to avoid bricking of problem as much as possible # Presumably, student submission has corrupted LoncapaProblem HTML. # First, pull down all student answers student_answers = self.lcp.student_answers answer_ids = list(student_answers.keys()) # Some inputtypes, such as dynamath, have additional "hidden" state that # is not exposed to the student. Keep those hidden # TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id hidden_state_keywords = ['dynamath'] for answer_id in answer_ids: for hidden_state_keyword in hidden_state_keywords: if answer_id.find(hidden_state_keyword) >= 0: student_answers.pop(answer_id) # Next, generate a fresh LoncapaProblem self.lcp = self.new_lcp(None) self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) # Prepend a scary warning to the student _ = self.runtime.service(self, "i18n").ugettext warning_msg = Text(_("Warning: The problem has been reset to its initial state!")) warning = HTML('

{}

').format(warning_msg) # Translators: Following this message, there will be a bulleted list of items. warning_msg = _("The problem's state was corrupted by an invalid submission. The submission consisted of:") warning += HTML('{}
    ').format(warning_msg) for student_answer in student_answers.values(): if student_answer != '': warning += HTML('
  • {}
  • ').format(student_answer) warning_msg = _('If this error persists, please contact the course staff.') warning += HTML('
{}
').format(warning_msg) html = warning try: html += self.lcp.get_html() except Exception as error: # Couldn't do it. Give up. log.exception( "ProblemGetHtmlError: Unable to generate html from LoncapaProblem: %r, %r, %s", problem_display_name, problem_location, str(error) ) raise return html def _should_enable_demand_hint(self, demand_hints, hint_index=None): """ Should the demand hint option be enabled? Arguments: hint_index (int): The current hint index, or None (default value) if no hint is currently being shown. demand_hints (list): List of hints. Returns: bool: True is the demand hint is possible. bool: True is demand hint should be enabled. """ # hint_index is the index of the last hint that will be displayed in this rendering, # so add 1 to check if others exist. if hint_index is None: should_enable = len(demand_hints) > 0 else: should_enable = len(demand_hints) > 0 and hint_index + 1 < len(demand_hints) return len(demand_hints) > 0, should_enable def get_demand_hint(self, hint_index): """ Return html for the problem, including demand hints. hint_index (int): (None is the default) if not None, this is the index of the next demand hint to show. """ demand_hints = self.lcp.tree.xpath("//problem/demandhint/hint") hint_index = hint_index % len(demand_hints) _ = self.runtime.service(self, "i18n").ugettext counter = 0 total_text = '' while counter <= hint_index: # Translators: {previous_hints} is the HTML of hints that have already been generated, {hint_number_prefix} # is a header for this hint, and {hint_text} is the text of the hint itself. # This string is being passed to translation only for possible reordering of the placeholders. total_text = HTML(_('{previous_hints}{list_start_tag}{strong_text}{hint_text}')).format( previous_hints=HTML(total_text), list_start_tag=HTML('
  • ').format(counter=counter), strong_text=HTML('{hint_number_prefix}').format( # Translators: e.g. "Hint 1 of 3: " meaning we are showing the first of three hints. # This text is shown in bold before the accompanying hint text. hint_number_prefix=Text(_("Hint ({hint_num} of {hints_count}): ")).format( hint_num=counter + 1, hints_count=len(demand_hints) ) ), # Course-authored HTML demand hints are supported. hint_text=HTML(self.runtime.service(self, "replace_urls").replace_urls( get_inner_html_from_xpath(demand_hints[counter]) )) ) counter += 1 total_text = HTML('
      {hints}
    ').format(hints=total_text) # Log this demand-hint request. Note that this only logs the last hint requested (although now # all previously shown hints are still displayed). event_info = {} event_info['module_id'] = str(self.location) event_info['hint_index'] = hint_index event_info['hint_len'] = len(demand_hints) event_info['hint_text'] = get_inner_html_from_xpath(demand_hints[hint_index]) self.runtime.publish(self, 'edx.problem.hint.demandhint_displayed', event_info) _, should_enable_next_hint = self._should_enable_demand_hint(demand_hints=demand_hints, hint_index=hint_index) # We report the index of this hint, the client works out what index to use to get the next hint return { 'success': True, 'hint_index': hint_index, 'should_enable_next_hint': should_enable_next_hint, 'msg': total_text, } def get_problem_html(self, encapsulate=True, submit_notification=False): """ Return html for the problem. Adds submit, reset, save, and hint buttons as necessary based on the problem config and state. encapsulate (bool): if True (the default) embed the html in a problem
    submit_notification (bool): True if the submit notification should be added """ try: html = self.lcp.get_html() # If we cannot construct the problem HTML, # then generate an error message instead. except Exception as err: # pylint: disable=broad-except html = self.handle_problem_html_error(err) html = self.remove_tags_from_html(html) _ = self.runtime.service(self, "i18n").ugettext # Enable/Disable Submit button if should_enable_submit_button returns True/False. submit_button = self.submit_button_name() submit_button_submitting = self.submit_button_submitting_name() should_enable_submit_button = self.should_enable_submit_button() submit_disabled_ctas = None if not should_enable_submit_button: cta_service = self.runtime.service(self, "call_to_action") if cta_service: submit_disabled_ctas = cta_service.get_ctas(self, 'capa_submit_disabled') content = { 'name': self.display_name_with_default, 'html': smart_str(html), 'weight': self.weight, } # If demand hints are available, emit hint button and div. demand_hints = self.lcp.tree.xpath("//problem/demandhint/hint") demand_hint_possible, should_enable_next_hint = self._should_enable_demand_hint(demand_hints=demand_hints) answer_notification_type, answer_notification_message = self._get_answer_notification( render_notifications=submit_notification) save_message = None if self.has_saved_answers: save_message = _( "Your answers were previously saved. Click '{button_name}' to grade them." ).format(button_name=self.submit_button_name()) context = { 'problem': content, 'id': str(self.location), 'short_id': self.location.html_id(), 'submit_button': submit_button, 'submit_button_submitting': submit_button_submitting, 'should_enable_submit_button': should_enable_submit_button, 'reset_button': self.should_show_reset_button(), 'save_button': self.should_show_save_button(), 'answer_available': self.answer_available(), 'attempts_used': self.attempts, 'attempts_allowed': self.max_attempts, 'demand_hint_possible': demand_hint_possible, 'should_enable_next_hint': should_enable_next_hint, 'answer_notification_type': answer_notification_type, 'answer_notification_message': answer_notification_message, 'has_saved_answers': self.has_saved_answers, 'save_message': save_message, 'submit_disabled_cta': submit_disabled_ctas[0] if submit_disabled_ctas else None, } html = self.runtime.service(self, 'mako').render_template('problem.html', context) if encapsulate: html = HTML('
    {html}
    ').format( id=self.location.html_id(), ajax_url=self.ajax_url, html=HTML(html) ) # Now do all the substitutions which the LMS module_render normally does, but # we need to do here explicitly since we can get called for our HTML via AJAX html = self.runtime.service(self, "replace_urls").replace_urls(html) return html def _get_answer_notification(self, render_notifications): """ Generate the answer notification type and message from the current problem status. Arguments: render_notifications (bool): If false the method will return an None for type and message """ answer_notification_message = None answer_notification_type = None if render_notifications: progress = self.get_progress() id_list = list(self.lcp.correct_map.keys()) # Show only a generic message if hiding correctness if not self.correctness_available(): answer_notification_type = 'submitted' elif len(id_list) == 1: # Only one answer available answer_notification_type = self.lcp.correct_map.get_correctness(id_list[0]) elif len(id_list) > 1: # Check the multiple answers that are available answer_notification_type = self.lcp.correct_map.get_correctness(id_list[0]) for answer_id in id_list[1:]: if self.lcp.correct_map.get_correctness(answer_id) != answer_notification_type: # There is at least 1 of the following combinations of correctness states # Correct and incorrect, Correct and partially correct, or Incorrect and partially correct # which all should have a message type of Partially Correct answer_notification_type = 'partially-correct' break # Build the notification message based on the notification type and translate it. ungettext = self.runtime.service(self, "i18n").ungettext _ = self.runtime.service(self, "i18n").ugettext if answer_notification_type == 'incorrect': if progress is not None: answer_notification_message = ungettext( "Incorrect ({progress} point)", "Incorrect ({progress} points)", progress.frac()[1] ).format(progress=str(progress)) else: answer_notification_message = _('Incorrect') elif answer_notification_type == 'correct': if progress is not None: answer_notification_message = ungettext( "Correct ({progress} point)", "Correct ({progress} points)", progress.frac()[1] ).format(progress=str(progress)) else: answer_notification_message = _('Correct') elif answer_notification_type == 'partially-correct': if progress is not None: answer_notification_message = ungettext( "Partially correct ({progress} point)", "Partially correct ({progress} points)", progress.frac()[1] ).format(progress=str(progress)) else: answer_notification_message = _('Partially Correct') elif answer_notification_type == 'submitted': answer_notification_message = _("Answer submitted.") return answer_notification_type, answer_notification_message def remove_tags_from_html(self, html): """ The capa xml includes many tags such as or which are not meant to be part of the client html. We strip them all and return the resulting html. """ tags = ['demandhint', 'choicehint', 'optionhint', 'stringhint', 'numerichint', 'optionhint', 'correcthint', 'regexphint', 'additional_answer', 'stringequalhint', 'compoundhint', 'stringequalhint'] for tag in tags: html = re.sub(fr'<{tag}.*?>.*?', '', html, flags=re.DOTALL) # xss-lint: disable=python-interpolate-html # lint-amnesty, pylint: disable=line-too-long # Some of these tags span multiple lines # Note: could probably speed this up by calling sub() once with a big regex # vs. simply calling sub() many times as we have here. return html def hint_button(self, data): """ Hint button handler, returns new html using hint_index from the client. """ hint_index = int(data['hint_index']) return self.get_demand_hint(hint_index) def used_all_attempts(self): """ All attempts have been used """ return self.max_attempts is not None and self.attempts >= self.max_attempts def is_past_due(self): """ Is it now past this problem's due date, including grace period? """ return (self.close_date is not None and datetime.datetime.now(utc) > self.close_date) def closed(self): """ Is the student still allowed to submit answers? """ if self.used_all_attempts(): return True if self.is_past_due(): return True return False def is_submitted(self): """ Used to decide to show or hide RESET or CHECK buttons. Means that student submitted problem and nothing more. Problem can be completely wrong. Pressing RESET button makes this function to return False. """ # used by conditional module return self.lcp.done def is_attempted(self): """ Has the problem been attempted? used by conditional module """ return self.attempts > 0 def is_correct(self): """ True iff full points """ # self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first. self.lcp # pylint: disable=pointless-statement return self.score.raw_earned == self.score.raw_possible def answer_available(self): """ Is the user allowed to see an answer? """ user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF) if not self.correctness_available(): # If correctness is being withheld, then don't show answers either. return False elif self.showanswer == '': return False elif self.showanswer == SHOWANSWER.NEVER: return False elif user_is_staff: # This is after the 'never' check because admins can see the answer # unless the problem explicitly prevents it return True elif self.showanswer == SHOWANSWER.ATTEMPTED: return self.is_attempted() or self.is_past_due() elif self.showanswer == SHOWANSWER.ANSWERED: # NOTE: this is slightly different from 'attempted' -- resetting the problems # makes lcp.done False, but leaves attempts unchanged. return self.is_correct() elif self.showanswer == SHOWANSWER.CLOSED: return self.closed() elif self.showanswer == SHOWANSWER.FINISHED: return self.closed() or self.is_correct() elif self.showanswer == SHOWANSWER.CORRECT_OR_PAST_DUE: return self.is_correct() or self.is_past_due() elif self.showanswer == SHOWANSWER.PAST_DUE: return self.is_past_due() elif self.showanswer == SHOWANSWER.AFTER_SOME_NUMBER_OF_ATTEMPTS: required_attempts = self.attempts_before_showanswer_button if self.max_attempts and required_attempts >= self.max_attempts: required_attempts = self.max_attempts return self.attempts >= required_attempts elif self.showanswer == SHOWANSWER.ALWAYS: return True elif self.showanswer == SHOWANSWER.AFTER_ALL_ATTEMPTS: return self.used_all_attempts() elif self.showanswer == SHOWANSWER.AFTER_ALL_ATTEMPTS_OR_CORRECT: return self.used_all_attempts() or self.is_correct() elif self.showanswer == SHOWANSWER.ATTEMPTED_NO_PAST_DUE: return self.is_attempted() return False def correctness_available(self): """ Is the user allowed to see whether she's answered correctly? Limits access to the correct/incorrect flags, messages, and problem score. """ user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF) return ShowCorrectness.correctness_available( show_correctness=self.show_correctness, due_date=self.close_date, has_staff_access=user_is_staff, ) def update_score(self, data): """ Delivers grading response (e.g. from asynchronous code checking) to the capa problem, so its score can be updated 'data' must have a key 'response' which is a string that contains the grader's response No ajax return is needed. Return empty dict. """ queuekey = data['queuekey'] score_msg = data['xqueue_body'] self.lcp.update_score(score_msg, queuekey) self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) self.publish_grade(grader_response=True) return {} # No AJAX return is needed def handle_ungraded_response(self, data): """ Delivers a response from the XQueue to the capa problem The score of the problem will not be updated Args: - data (dict) must contain keys: queuekey - a key specific to this response xqueue_body - the body of the response Returns: empty dictionary No ajax return is needed, so an empty dict is returned """ queuekey = data['queuekey'] score_msg = data['xqueue_body'] # pass along the xqueue message to the problem self.lcp.ungraded_response(score_msg, queuekey) self.set_state_from_lcp() return {} def handle_input_ajax(self, data): """ Handle ajax calls meant for a particular input in the problem Args: - data (dict) - data that should be passed to the input Returns: - dict containing the response from the input """ response = self.lcp.handle_input_ajax(data) # save any state changes that may occur self.set_state_from_lcp() return response def get_answer(self, _data): """ For the "show answer" button. Returns the answers and rendered "correct status span" HTML: {'answers' : answers, 'correct_status_html': correct_status_span_html}. The "correct status span" HTML is injected beside the correct answers for radio button and checkmark problems, so that there is a visual indication of the correct answers that is not solely based on color (and also screen reader text). """ event_info = {} event_info['problem_id'] = str(self.location) self.track_function_unmask('showanswer', event_info) if not self.answer_available(): # lint-amnesty, pylint: disable=no-else-raise raise NotFoundError('Answer is not available') else: answers = self.lcp.get_question_answers() self.set_state_from_lcp() # answers (eg ) may have embedded images # but be careful, some problems are using non-string answer dicts new_answers = {} for answer_id in answers: try: answer_content = self.runtime.service(self, "replace_urls").replace_urls(answers[answer_id]) new_answer = {answer_id: answer_content} except TypeError: log.debug('Unable to perform URL substitution on answers[%s]: %s', answer_id, answers[answer_id]) new_answer = {answer_id: answers[answer_id]} new_answers.update(new_answer) return { 'answers': new_answers, 'correct_status_html': self.runtime.service(self, 'mako').render_template( 'status_span.html', {'status': Status('correct', self.runtime.service(self, "i18n").ugettext)} ) } # Figure out if we should move these to capa_problem? def get_problem(self, _data): """ Return results of get_problem_html, as a simple dict for json-ing. { 'html': } Used if we want to reconfirm we have the right thing e.g. after several AJAX calls. """ return {'html': self.get_problem_html(encapsulate=False, submit_notification=True)} @staticmethod def make_dict_of_responses(data): """ Make dictionary of student responses (aka "answers") `data` is POST dictionary (webob.multidict.MultiDict). The `data` dict has keys of the form 'x_y', which are mapped to key 'y' in the returned dict. For example, 'input_1_2_3' would be mapped to '1_2_3' in the returned dict. Some inputs always expect a list in the returned dict (e.g. checkbox inputs). The convention is that keys in the `data` dict that end with '[]' will always have list values in the returned dict. For example, if the `data` dict contains {'input_1[]': 'test' } then the output dict would contain {'1': ['test'] } (the value is a list). Some other inputs such as ChoiceTextInput expect a dict of values in the returned dict If the key ends with '{}' then we will assume that the value is a json encoded dict and deserialize it. For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'} then the output dict would contain {'1': {"1_2_1": 1} } (the value is a dictionary) Raises an exception if: -A key in the `data` dictionary does not contain at least one underscore (e.g. "input" is invalid, but "input_1" is valid) -Two keys end up with the same name in the returned dict. (e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1' in the returned dict) """ answers = {} # webob.multidict.MultiDict is a view of a list of tuples, # so it will return a multi-value key once for each value. # We only want to consider each key a single time, so we use set(data.keys()) for key in set(data.keys()): # e.g. input_resistor_1 ==> resistor_1 _, _, name = key.partition('_') # If key has no underscores, then partition # will return (key, '', '') # We detect this and raise an error if not name: # lint-amnesty, pylint: disable=no-else-raise raise ValueError(f"{key} must contain at least one underscore") else: # This allows for answers which require more than one value for # the same form input (e.g. checkbox inputs). The convention is that # if the name ends with '[]' (which looks like an array), then the # answer will be an array. # if the name ends with '{}' (Which looks like a dict), # then the answer will be a dict is_list_key = name.endswith('[]') is_dict_key = name.endswith('{}') name = name[:-2] if is_list_key or is_dict_key else name if is_list_key: val = data.getall(key) elif is_dict_key: try: val = json.loads(data[key]) # If the submission wasn't deserializable, raise an error. except(KeyError, ValueError): raise ValueError( # lint-amnesty, pylint: disable=raise-missing-from f"Invalid submission: {data[key]} for {key}" ) else: val = data[key] # If the name already exists, then we don't want # to override it. Raise an error instead if name in answers: # lint-amnesty, pylint: disable=no-else-raise raise ValueError(f"Key {name} already exists in answers dict") else: answers[name] = val return answers def publish_grade(self, score=None, only_if_higher=None, **kwargs): """ Publishes the student's current grade to the system as an event """ if not score: score = self.score event = { 'value': score.raw_earned, 'max_value': score.raw_possible, 'only_if_higher': only_if_higher, } if kwargs.get('grader_response'): event['grader_response'] = kwargs['grader_response'] self.runtime.publish(self, 'grade', event) return {'grade': self.score.raw_earned, 'max_grade': self.score.raw_possible} # pylint: disable=too-many-statements def submit_problem(self, data, override_time=False): """ Checks whether answers to a problem are correct Returns a map of correct/incorrect answers: {'success' : 'correct' | 'incorrect' | AJAX alert msg string, 'contents' : html} """ event_info = {} event_info['state'] = self.lcp.get_state() event_info['problem_id'] = str(self.location) self.lcp.has_saved_answers = False answers = self.make_dict_of_responses(data) answers_without_files = convert_files_to_filenames(answers) event_info['answers'] = answers_without_files metric_name = 'capa.check_problem.{}'.format # lint-amnesty, pylint: disable=unused-variable # Can override current time current_time = datetime.datetime.now(utc) if override_time is not False: current_time = override_time _ = self.runtime.service(self, "i18n").ugettext # Too late. Cannot submit if self.closed(): log.error( 'ProblemClosedError: Problem %s, close date: %s, due:%s, is_past_due: %s, attempts: %s/%s,', str(self.location), self.close_date, self.due, self.is_past_due(), self.attempts, self.max_attempts, ) event_info['failure'] = 'closed' self.track_function_unmask('problem_check_fail', event_info) raise NotFoundError(_("Problem is closed.")) # Problem submitted. Student should reset before checking again if self.done and self.rerandomize == RANDOMIZATION.ALWAYS: event_info['failure'] = 'unreset' self.track_function_unmask('problem_check_fail', event_info) raise NotFoundError(_("Problem must be reset before it can be submitted again.")) # Problem queued. Students must wait a specified waittime before they are allowed to submit # IDEA: consider stealing code from below: pretty-print of seconds, cueing of time remaining if self.lcp.is_queued(): prev_submit_time = self.lcp.get_recentmost_queuetime() xqueue_service = self.runtime.service(self, 'xqueue') waittime_between_requests = xqueue_service.waittime if xqueue_service else 0 if (current_time - prev_submit_time).total_seconds() < waittime_between_requests: msg = _("You must wait at least {wait} seconds between submissions.").format( wait=waittime_between_requests) return {'success': msg, 'html': ''} # Wait time between resets: check if is too soon for submission. if self.last_submission_time is not None and self.submission_wait_seconds not in [0, None]: seconds_since_submission = (current_time - self.last_submission_time).total_seconds() if seconds_since_submission < self.submission_wait_seconds: remaining_secs = int(self.submission_wait_seconds - seconds_since_submission) msg = _('You must wait at least {wait_secs} between submissions. {remaining_secs} remaining.').format( wait_secs=self.pretty_print_seconds(self.submission_wait_seconds), remaining_secs=self.pretty_print_seconds(remaining_secs)) return { 'success': msg, 'html': '' } try: # expose the attempt number to a potential python custom grader # self.lcp.context['attempt'] refers to the attempt number (1-based) self.lcp.context['attempt'] = self.attempts + 1 correct_map = self.lcp.grade_answers(answers) # self.attempts refers to the number of attempts that did not # raise an error (0-based) self.attempts = self.attempts + 1 self.lcp.done = True self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) self.set_last_submission_time() except (StudentInputError, ResponseError, LoncapaProblemError) as inst: if self.debug: log.warning( "StudentInputError in capa_module:problem_check", exc_info=True ) # Save the user's state before failing self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) # If the user is a staff member, include # the full exception, including traceback, # in the response user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF) if user_is_staff: msg = f"Staff debug info: {traceback.format_exc()}" # Otherwise, display just an error message, # without a stack trace else: full_error = inst.args[0] try: # only return the error value of the exception msg = full_error.split("\\n")[-2].split(": ", 1)[1] except IndexError: msg = full_error return {'success': msg} except Exception as err: # Save the user's state before failing self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) if self.debug: msg = f"Error checking problem: {str(err)}" msg += f'\nTraceback:\n{traceback.format_exc()}' return {'success': msg} raise published_grade = self.publish_grade() # success = correct if ALL questions in this problem are correct success = 'correct' for answer_id in correct_map: if not correct_map.is_correct(answer_id): success = 'incorrect' # NOTE: We are logging both full grading and queued-grading submissions. In the latter, # 'success' will always be incorrect event_info['grade'] = published_grade['grade'] event_info['max_grade'] = published_grade['max_grade'] event_info['correct_map'] = correct_map.get_dict() event_info['success'] = success event_info['attempts'] = self.attempts event_info['submission'] = self.get_submission_metadata_safe(answers_without_files, correct_map) self.track_function_unmask('problem_check', event_info) # render problem into HTML html = self.get_problem_html(encapsulate=False, submit_notification=True) # Withhold success indicator if hiding correctness if not self.correctness_available(): success = 'submitted' return { 'success': success, 'contents': html } # pylint: enable=too-many-statements def track_function_unmask(self, title, event_info): """ All calls to runtime.track_function route through here so that the choice names can be unmasked. """ # Do the unmask translates on a copy of event_info, # avoiding problems where an event_info is unmasked twice. event_unmasked = copy.deepcopy(event_info) self.unmask_event(event_unmasked) self.runtime.publish(self, title, event_unmasked) def unmask_event(self, event_info): """ Translates in-place the event_info to account for masking and adds information about permutation options in force. """ # answers is like: {u'i4x-Stanford-CS99-problem-dada976e76f34c24bc8415039dee1300_2_1': u'mask_0'} # Each response values has an answer_id which matches the key in answers. for response in self.lcp.responders.values(): # Un-mask choice names in event_info for masked responses. if response.has_mask(): # We don't assume much about the structure of event_info, # but check for the existence of the things we need to un-mask. # Look for answers/id answer = event_info.get('answers', {}).get(response.answer_id) if answer is not None: event_info['answers'][response.answer_id] = response.unmask_name(answer) # Look for state/student_answers/id answer = event_info.get('state', {}).get('student_answers', {}).get(response.answer_id) if answer is not None: event_info['state']['student_answers'][response.answer_id] = response.unmask_name(answer) # Look for old_state/student_answers/id -- parallel to the above case, happens on reset answer = event_info.get('old_state', {}).get('student_answers', {}).get(response.answer_id) if answer is not None: event_info['old_state']['student_answers'][response.answer_id] = response.unmask_name(answer) # Add 'permutation' to event_info for permuted responses. permutation_option = None if response.has_shuffle(): permutation_option = 'shuffle' elif response.has_answerpool(): permutation_option = 'answerpool' if permutation_option is not None: # Add permutation record tuple: (one of:'shuffle'/'answerpool', [as-displayed list]) if 'permutation' not in event_info: event_info['permutation'] = {} event_info['permutation'][response.answer_id] = (permutation_option, response.unmask_order()) def pretty_print_seconds(self, num_seconds): """ Returns time duration nicely formated, e.g. "3 minutes 4 seconds" """ # Here _ is the N variant ungettext that does pluralization with a 3-arg call ungettext = self.runtime.service(self, "i18n").ungettext hours = num_seconds // 3600 sub_hour = num_seconds % 3600 minutes = sub_hour // 60 seconds = sub_hour % 60 display = "" if hours > 0: display += ungettext("{num_hour} hour", "{num_hour} hours", hours).format(num_hour=hours) if minutes > 0: if display != "": display += " " # translators: "minute" refers to a minute of time display += ungettext("{num_minute} minute", "{num_minute} minutes", minutes).format(num_minute=minutes) # Taking care to make "0 seconds" instead of "" for 0 time if seconds > 0 or (hours == 0 and minutes == 0): if display != "": display += " " # translators: "second" refers to a second of time display += ungettext("{num_second} second", "{num_second} seconds", seconds).format(num_second=seconds) return display def get_submission_metadata_safe(self, answers, correct_map): """ Ensures that no exceptions are thrown while generating input metadata summaries. Returns the summary if it is successfully created, otherwise an empty dictionary. """ try: return self.get_submission_metadata(answers, correct_map) except Exception: # pylint: disable=broad-except # NOTE: The above process requires deep inspection of capa structures that may break for some # uncommon problem types. Ensure that it does not prevent answer submission in those # cases. Any occurrences of errors in this block should be investigated and resolved. log.exception('Unable to gather submission metadata, it will not be included in the event.') return {} def get_submission_metadata(self, answers, correct_map): """ Return a map of inputs to their corresponding summarized metadata. Returns: A map whose keys are a unique identifier for the input (in this case a capa input_id) and whose values are: question (str): Is the prompt that was presented to the student. It corresponds to the label of the input. answer (mixed): Is the answer the student provided. This may be a rich structure, however it must be json serializable. response_type (str): The XML tag of the capa response type. input_type (str): The XML tag of the capa input type. correct (bool): Whether or not the provided answer is correct. Will be an empty string if correctness could not be determined. variant (str): In some cases the same question can have several different variants. This string should uniquely identify the variant of the question that was answered. In the capa context this corresponds to the `seed`. This function attempts to be very conservative and make very few assumptions about the structure of the problem. If problem related metadata cannot be located it should be replaced with empty strings ''. """ input_metadata = {} for input_id, internal_answer in answers.items(): answer_input = self.lcp.inputs.get(input_id) if answer_input is None: log.warning('Input id %s is not mapped to an input type.', input_id) answer_response = None for responder in self.lcp.responders.values(): if input_id in responder.answer_ids: answer_response = responder if answer_response is None: log.warning('Answer responder could not be found for input_id %s.', input_id) user_visible_answer = internal_answer if hasattr(answer_input, 'get_user_visible_answer'): user_visible_answer = answer_input.get_user_visible_answer(internal_answer) # If this problem has rerandomize enabled, then it will generate N variants of the # question, one per unique seed value. In this case we would like to know which # variant was selected. Ideally it would be nice to have the exact question that # was presented to the user, with values interpolated etc, but that can be done # later if necessary. variant = '' if self.rerandomize != RANDOMIZATION.NEVER: variant = self.get_seed() is_correct = correct_map.is_correct(input_id) if is_correct is None: is_correct = '' response_data = getattr(answer_input, 'response_data', {}) input_metadata[input_id] = { 'question': response_data.get('label', ''), 'answer': user_visible_answer, 'response_type': getattr(getattr(answer_response, 'xml', None), 'tag', ''), 'input_type': getattr(answer_input, 'tag', ''), 'correct': is_correct, 'variant': variant, 'group_label': response_data.get('group_label', ''), } return input_metadata def save_problem(self, data): """ Save the passed in answers. Returns a dict { 'success' : bool, 'msg' : message } The message is informative on success, and an error message on failure. """ event_info = {} event_info['state'] = self.lcp.get_state() event_info['problem_id'] = str(self.location) answers = self.make_dict_of_responses(data) event_info['answers'] = answers _ = self.runtime.service(self, "i18n").ugettext # Too late. Cannot submit if self.closed() and not self.max_attempts == 0: event_info['failure'] = 'closed' self.track_function_unmask('save_problem_fail', event_info) return { 'success': False, # pylint: disable=line-too-long # Translators: 'closed' means the problem's due date has passed. You may no longer attempt to solve the problem. 'msg': _("Problem is closed."), # pylint: enable=line-too-long } # Problem submitted. Student should reset before saving # again. if self.done and self.rerandomize == RANDOMIZATION.ALWAYS: event_info['failure'] = 'done' self.track_function_unmask('save_problem_fail', event_info) return { 'success': False, 'msg': _("Problem needs to be reset prior to save.") } self.lcp.student_answers = answers self.lcp.has_saved_answers = True self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) self.track_function_unmask('save_problem_success', event_info) msg = _("Your answers have been saved.") if not self.max_attempts == 0: msg = _( "Your answers have been saved but not graded. Click '{button_name}' to grade them." ).format(button_name=self.submit_button_name()) return { 'success': True, 'msg': msg, 'html': self.get_problem_html(encapsulate=False) } def reset_problem(self, _data): """ Changes problem state to unfinished -- removes student answers, Causes problem to rerender itself if randomization is enabled. Returns a dictionary of the form: {'success': True/False, 'html': Problem HTML string } If an error occurs, the dictionary will also have an `error` key containing an error message. """ event_info = {} event_info['old_state'] = self.lcp.get_state() event_info['problem_id'] = str(self.location) _ = self.runtime.service(self, "i18n").ugettext if self.closed(): event_info['failure'] = 'closed' self.track_function_unmask('reset_problem_fail', event_info) return { 'success': False, # pylint: disable=line-too-long # Translators: 'closed' means the problem's due date has passed. You may no longer attempt to solve the problem. 'msg': _("You cannot select Reset for a problem that is closed."), # pylint: enable=line-too-long } if not self.is_submitted(): event_info['failure'] = 'not_done' self.track_function_unmask('reset_problem_fail', event_info) return { 'success': False, 'msg': _("You must submit an answer before you can select Reset."), } if self.is_submitted() and self.rerandomize in [RANDOMIZATION.ALWAYS, RANDOMIZATION.ONRESET]: # Reset random number generator seed. self.choose_new_seed() # Generate a new problem with either the previous seed or a new seed self.lcp = self.new_lcp(None) # Pull in the new problem seed self.set_state_from_lcp() self.set_score(self.score_from_lcp(self.lcp)) # Grade may have changed, so publish new value self.publish_grade() event_info['new_state'] = self.lcp.get_state() self.track_function_unmask('reset_problem', event_info) return { 'success': True, 'html': self.get_problem_html(encapsulate=False), } # ScorableXBlockMixin methods def rescore(self, only_if_higher=False): """ Checks whether the existing answers to a problem are correct. This is called when the correct answer to a problem has been changed, and the grade should be re-evaluated. If only_if_higher is True, the answer and grade are updated only if the resulting score is higher than before. Returns a dict with one key: {'success' : 'correct' | 'incorrect' | AJAX alert msg string } Raises NotFoundError if called on a problem that has not yet been answered, or NotImplementedError if it's a problem that cannot be rescored. Returns the error messages for exceptions occurring while performing the rescoring, rather than throwing them. """ event_info = {'state': self.lcp.get_state(), 'problem_id': str(self.location)} _ = self.runtime.service(self, "i18n").ugettext if not self.lcp.supports_rescoring(): event_info['failure'] = 'unsupported' self.track_function_unmask('problem_rescore_fail', event_info) # pylint: disable=line-too-long # Translators: 'rescoring' refers to the act of re-submitting a student's solution so it can get a new score. raise NotImplementedError(_("Problem's definition does not support rescoring.")) # pylint: enable=line-too-long if not self.done: event_info['failure'] = 'unanswered' self.track_function_unmask('problem_rescore_fail', event_info) raise NotFoundError(_("Problem must be answered before it can be graded again.")) # get old score, for comparison: orig_score = self.get_score() event_info['orig_score'] = orig_score.raw_earned event_info['orig_total'] = orig_score.raw_possible try: self.update_correctness() calculated_score = self.calculate_score() except (StudentInputError, ResponseError, LoncapaProblemError) as inst: # lint-amnesty, pylint: disable=unused-variable log.warning("Input error in capa_module:problem_rescore", exc_info=True) event_info['failure'] = 'input_error' self.track_function_unmask('problem_rescore_fail', event_info) raise except Exception: event_info['failure'] = 'unexpected' self.track_function_unmask('problem_rescore_fail', event_info) raise # rescoring should have no effect on attempts, so don't # need to increment here, or mark done. Just save. self.set_state_from_lcp() self.publish_grade(score=calculated_score, only_if_higher=only_if_higher) event_info['new_score'] = calculated_score.raw_earned event_info['new_total'] = calculated_score.raw_possible # success = correct if ALL questions in this problem are correct success = 'correct' for answer_id in self.lcp.correct_map: if not self.lcp.correct_map.is_correct(answer_id): success = 'incorrect' # NOTE: We are logging both full grading and queued-grading submissions. In the latter, # 'success' will always be incorrect event_info['correct_map'] = self.lcp.correct_map.get_dict() event_info['success'] = success event_info['attempts'] = self.attempts self.track_function_unmask('problem_rescore', event_info) def has_submitted_answer(self): return self.done def set_score(self, score): """ Sets the internal score for the problem. This is not derived directly from the internal LCP in keeping with the ScorableXBlock spec. """ self.score = score def get_score(self): """ Returns the score currently set on the block. """ return self.score def update_correctness(self): """ Updates correct map of the LCP. Operates by creating a new correctness map based on the current state of the LCP, and updating the old correctness map of the LCP. """ # Make sure that the attempt number is always at least 1 for grading purposes, # even if the number of attempts have been reset and this problem is regraded. self.lcp.context['attempt'] = max(self.attempts, 1) new_correct_map = self.lcp.get_grade_from_current_answers(None) self.lcp.correct_map.update(new_correct_map) def calculate_score(self): """ Returns the score calculated from the current problem state. """ new_score = self.lcp.calculate_score() return Score(raw_earned=new_score['score'], raw_possible=new_score['total']) def score_from_lcp(self, lcp): """ Returns the score associated with the correctness map currently stored by the LCP. """ lcp_score = lcp.calculate_score() return Score(raw_earned=lcp_score['score'], raw_possible=lcp_score['total']) class ComplexEncoder(json.JSONEncoder): """ Extend the JSON encoder to correctly handle complex numbers """ def default(self, obj): # lint-amnesty, pylint: disable=arguments-differ, method-hidden """ Print a nicely formatted complex number, or default to the JSON encoder """ if isinstance(obj, complex): return f"{obj.real:.7g}{obj.imag:+.7g}*j" return json.JSONEncoder.default(self, obj) def randomization_bin(seed, problem_id): """ Pick a randomization bin for the problem given the user's seed and a problem id. We do this because we only want e.g. 20 randomizations of a problem to make analytics interesting. To avoid having sets of students that always get the same problems, we'll combine the system's per-student seed with the problem id in picking the bin. """ r_hash = hashlib.sha1() r_hash.update(str(seed).encode()) r_hash.update(str(problem_id).encode()) # get the first few digits of the hash, convert to an int, then mod. return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS