The XQueueService is used only by the ProblemBlock. Therefore, we are moving it out of the runtime, and into the ProblemBlock, where it's initialized only when it's going to be used.
2265 lines
92 KiB
Python
2265 lines
92 KiB
Python
"""
|
|
Implements the Problem XBlock, which is built on top of the CAPA subsystem.
|
|
"""
|
|
|
|
import copy
|
|
import datetime
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import struct
|
|
import sys
|
|
import traceback
|
|
|
|
from bleach.sanitizer import Cleaner
|
|
from django.conf import settings
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.utils.encoding import smart_str
|
|
from django.utils.functional import cached_property
|
|
from lxml import etree
|
|
from pkg_resources import resource_filename
|
|
from pytz import utc
|
|
from web_fragments.fragment import Fragment
|
|
from xblock.core import XBlock
|
|
from xblock.fields import Boolean, Dict, Float, Integer, Scope, String, XMLString
|
|
from xblock.scorable import ScorableXBlockMixin, Score
|
|
|
|
from xmodule.capa import responsetypes
|
|
from xmodule.capa.capa_problem import LoncapaProblem, LoncapaSystem
|
|
from xmodule.capa.inputtypes import Status
|
|
from xmodule.capa.responsetypes import LoncapaProblemError, ResponseError, StudentInputError
|
|
from xmodule.capa.util import convert_files_to_filenames, get_inner_html_from_xpath
|
|
from xmodule.contentstore.django import contentstore
|
|
from xmodule.editing_block import EditingMixin
|
|
from xmodule.exceptions import NotFoundError, ProcessingError
|
|
from xmodule.graders import ShowCorrectness
|
|
from xmodule.raw_block import RawMixin
|
|
from xmodule.util.sandboxing import SandboxService
|
|
from xmodule.util.xmodule_django import add_webpack_to_fragment
|
|
from xmodule.x_module import (
|
|
HTMLSnippet,
|
|
ResourceTemplates,
|
|
XModuleMixin,
|
|
XModuleToXBlockMixin,
|
|
shim_xmodule_js
|
|
)
|
|
from xmodule.xml_block import XmlMixin
|
|
from common.djangoapps.xblock_django.constants import (
|
|
ATTR_KEY_DEPRECATED_ANONYMOUS_USER_ID,
|
|
ATTR_KEY_USER_IS_STAFF,
|
|
ATTR_KEY_USER_ID,
|
|
)
|
|
from openedx.core.djangolib.markup import HTML, Text
|
|
from .capa.xqueue_interface import XQueueService
|
|
|
|
from .fields import Date, ScoreField, Timedelta
|
|
from .progress import Progress
|
|
|
|
log = logging.getLogger("edx.courseware")
|
|
|
|
|
|
# Make '_' a no-op so we can scrape strings. Using lambda instead of
|
|
# `django.utils.translation.ugettext_noop` because Django cannot be imported in this file
|
|
_ = lambda text: text
|
|
|
|
# Generate this many different variants of problems with rerandomize=per_student
|
|
NUM_RANDOMIZATION_BINS = 20
|
|
# Never produce more than this many different seeds, no matter what.
|
|
MAX_RANDOMIZATION_BINS = 1000
|
|
|
|
|
|
try:
|
|
FEATURES = getattr(settings, 'FEATURES', {})
|
|
except ImproperlyConfigured:
|
|
FEATURES = {}
|
|
|
|
|
|
class SHOWANSWER:
|
|
"""
|
|
Constants for when to show answer
|
|
"""
|
|
ALWAYS = "always"
|
|
ANSWERED = "answered"
|
|
ATTEMPTED = "attempted"
|
|
CLOSED = "closed"
|
|
FINISHED = "finished"
|
|
CORRECT_OR_PAST_DUE = "correct_or_past_due"
|
|
PAST_DUE = "past_due"
|
|
NEVER = "never"
|
|
AFTER_SOME_NUMBER_OF_ATTEMPTS = "after_attempts"
|
|
AFTER_ALL_ATTEMPTS = "after_all_attempts"
|
|
AFTER_ALL_ATTEMPTS_OR_CORRECT = "after_all_attempts_or_correct"
|
|
ATTEMPTED_NO_PAST_DUE = "attempted_no_past_due"
|
|
|
|
|
|
class RANDOMIZATION:
|
|
"""
|
|
Constants for problem randomization
|
|
"""
|
|
ALWAYS = "always"
|
|
ONRESET = "onreset"
|
|
NEVER = "never"
|
|
PER_STUDENT = "per_student"
|
|
|
|
|
|
class Randomization(String):
|
|
"""
|
|
Define a field to store how to randomize a problem.
|
|
"""
|
|
def from_json(self, value):
|
|
if value in ("", "true"):
|
|
return RANDOMIZATION.ALWAYS
|
|
elif value == "false":
|
|
return RANDOMIZATION.PER_STUDENT
|
|
return value
|
|
|
|
to_json = from_json
|
|
|
|
|
|
@XBlock.needs('user')
|
|
@XBlock.needs('i18n')
|
|
@XBlock.needs('mako')
|
|
@XBlock.needs('cache')
|
|
@XBlock.needs('sandbox')
|
|
@XBlock.needs('replace_urls')
|
|
@XBlock.wants('call_to_action')
|
|
class ProblemBlock(
|
|
ScorableXBlockMixin,
|
|
RawMixin,
|
|
XmlMixin,
|
|
EditingMixin,
|
|
XModuleToXBlockMixin,
|
|
HTMLSnippet,
|
|
ResourceTemplates,
|
|
XModuleMixin,
|
|
):
|
|
"""
|
|
An XBlock representing a "problem".
|
|
|
|
A problem contains zero or more respondable items, such as multiple choice,
|
|
numeric response, true/false, etc. See xmodule/capa/responsetypes.py
|
|
for the full ensemble.
|
|
|
|
The rendering logic of a problem is largely encapsulated within
|
|
LoncapaProblem, LoncapaSystem and related classes. This block serves to
|
|
host the Loncapa system within the XBlock runtime and connect it to the
|
|
greater LMS/CMS.
|
|
|
|
As historical context: the acronym LON-CAPA references the "Learning
|
|
Online - Computer-Assisted Personalized Approach" LMS, from which this
|
|
system is inspired.
|
|
"""
|
|
INDEX_CONTENT_TYPE = 'CAPA'
|
|
|
|
resources_dir = None
|
|
|
|
has_score = True
|
|
show_in_read_only_mode = True
|
|
template_dir_name = 'problem'
|
|
mako_template = "widgets/problem-edit.html"
|
|
has_author_view = True
|
|
|
|
icon_class = 'problem'
|
|
|
|
uses_xmodule_styles_setup = True
|
|
|
|
preview_view_js = {
|
|
'js': [
|
|
resource_filename(__name__, 'js/src/javascript_loader.js'),
|
|
resource_filename(__name__, 'js/src/capa/display.js'),
|
|
resource_filename(__name__, 'js/src/collapsible.js'),
|
|
resource_filename(__name__, 'js/src/capa/imageinput.js'),
|
|
resource_filename(__name__, 'js/src/capa/schematic.js'),
|
|
],
|
|
'xmodule_js': resource_filename(__name__, 'js/src/xmodule.js')
|
|
}
|
|
|
|
studio_view_js = {
|
|
'js': [
|
|
resource_filename(__name__, 'js/src/problem/edit.js'),
|
|
],
|
|
'xmodule_js': resource_filename(__name__, 'js/src/xmodule.js'),
|
|
}
|
|
|
|
display_name = String(
|
|
display_name=_("Display Name"),
|
|
help=_("The display name for this component."),
|
|
scope=Scope.settings,
|
|
# it'd be nice to have a useful default but it screws up other things; so,
|
|
# use display_name_with_default for those
|
|
default=_("Blank Advanced Problem")
|
|
)
|
|
attempts = Integer(
|
|
help=_("Number of attempts taken by the student on this problem"),
|
|
default=0,
|
|
scope=Scope.user_state
|
|
)
|
|
max_attempts = Integer(
|
|
display_name=_("Maximum Attempts"),
|
|
help=_("Defines the number of times a student can try to answer this problem. "
|
|
"If the value is not set, infinite attempts are allowed."),
|
|
values={"min": 0}, scope=Scope.settings
|
|
)
|
|
due = Date(help=_("Date that this problem is due by"), scope=Scope.settings)
|
|
graceperiod = Timedelta(
|
|
help=_("Amount of time after the due date that submissions will be accepted"),
|
|
scope=Scope.settings
|
|
)
|
|
show_correctness = String(
|
|
display_name=_("Show Results"),
|
|
help=_("Defines when to show whether a learner's answer to the problem is correct. "
|
|
"Configured on the subsection."),
|
|
scope=Scope.settings,
|
|
default=ShowCorrectness.ALWAYS,
|
|
values=[
|
|
{"display_name": _("Always"), "value": ShowCorrectness.ALWAYS},
|
|
{"display_name": _("Never"), "value": ShowCorrectness.NEVER},
|
|
{"display_name": _("Past Due"), "value": ShowCorrectness.PAST_DUE},
|
|
],
|
|
)
|
|
showanswer = String(
|
|
display_name=_("Show Answer"),
|
|
help=_("Defines when to show the answer to the problem. "
|
|
"A default value can be set in Advanced Settings."),
|
|
scope=Scope.settings,
|
|
default=SHOWANSWER.FINISHED,
|
|
values=[
|
|
{"display_name": _("Always"), "value": SHOWANSWER.ALWAYS},
|
|
{"display_name": _("Answered"), "value": SHOWANSWER.ANSWERED},
|
|
{"display_name": _("Attempted or Past Due"), "value": SHOWANSWER.ATTEMPTED},
|
|
{"display_name": _("Closed"), "value": SHOWANSWER.CLOSED},
|
|
{"display_name": _("Finished"), "value": SHOWANSWER.FINISHED},
|
|
{"display_name": _("Correct or Past Due"), "value": SHOWANSWER.CORRECT_OR_PAST_DUE},
|
|
{"display_name": _("Past Due"), "value": SHOWANSWER.PAST_DUE},
|
|
{"display_name": _("Never"), "value": SHOWANSWER.NEVER},
|
|
{"display_name": _("After Some Number of Attempts"), "value": SHOWANSWER.AFTER_SOME_NUMBER_OF_ATTEMPTS},
|
|
{"display_name": _("After All Attempts"), "value": SHOWANSWER.AFTER_ALL_ATTEMPTS},
|
|
{"display_name": _("After All Attempts or Correct"), "value": SHOWANSWER.AFTER_ALL_ATTEMPTS_OR_CORRECT},
|
|
{"display_name": _("Attempted"), "value": SHOWANSWER.ATTEMPTED_NO_PAST_DUE},
|
|
]
|
|
)
|
|
attempts_before_showanswer_button = Integer(
|
|
display_name=_("Show Answer: Number of Attempts"),
|
|
help=_(
|
|
"Number of times the student must attempt to answer the question before the Show Answer button appears."
|
|
),
|
|
values={"min": 0},
|
|
default=0,
|
|
scope=Scope.settings,
|
|
)
|
|
force_save_button = Boolean(
|
|
help=_("Whether to force the save button to appear on the page"),
|
|
scope=Scope.settings,
|
|
default=False
|
|
)
|
|
show_reset_button = Boolean(
|
|
display_name=_("Show Reset Button"),
|
|
help=_("Determines whether a 'Reset' button is shown so the user may reset their answer. "
|
|
"A default value can be set in Advanced Settings."),
|
|
scope=Scope.settings,
|
|
default=False
|
|
)
|
|
rerandomize = Randomization(
|
|
display_name=_("Randomization"),
|
|
help=_(
|
|
'Defines when to randomize the variables specified in the associated Python script. '
|
|
'For problems that do not randomize values, specify \"Never\". '
|
|
),
|
|
default=RANDOMIZATION.NEVER,
|
|
scope=Scope.settings,
|
|
values=[
|
|
{"display_name": _("Always"), "value": RANDOMIZATION.ALWAYS},
|
|
{"display_name": _("On Reset"), "value": RANDOMIZATION.ONRESET},
|
|
{"display_name": _("Never"), "value": RANDOMIZATION.NEVER},
|
|
{"display_name": _("Per Student"), "value": RANDOMIZATION.PER_STUDENT}
|
|
]
|
|
)
|
|
data = XMLString(
|
|
help=_("XML data for the problem"),
|
|
scope=Scope.content,
|
|
enforce_type=FEATURES.get('ENABLE_XBLOCK_XML_VALIDATION', True),
|
|
default="<problem></problem>"
|
|
)
|
|
correct_map = Dict(help=_("Dictionary with the correctness of current student answers"),
|
|
scope=Scope.user_state, default={})
|
|
input_state = Dict(help=_("Dictionary for maintaining the state of inputtypes"), scope=Scope.user_state)
|
|
student_answers = Dict(help=_("Dictionary with the current student responses"), scope=Scope.user_state)
|
|
|
|
# enforce_type is set to False here because this field is saved as a dict in the database.
|
|
score = ScoreField(help=_("Dictionary with the current student score"), scope=Scope.user_state, enforce_type=False)
|
|
has_saved_answers = Boolean(help=_("Whether or not the answers have been saved since last submit"),
|
|
scope=Scope.user_state, default=False)
|
|
done = Boolean(help=_("Whether the student has answered the problem"), scope=Scope.user_state, default=False)
|
|
seed = Integer(help=_("Random seed for this student"), scope=Scope.user_state)
|
|
last_submission_time = Date(help=_("Last submission time"), scope=Scope.user_state)
|
|
submission_wait_seconds = Integer(
|
|
display_name=_("Timer Between Attempts"),
|
|
help=_("Seconds a student must wait between submissions for a problem with multiple attempts."),
|
|
scope=Scope.settings,
|
|
default=0)
|
|
weight = Float(
|
|
display_name=_("Problem Weight"),
|
|
help=_("Defines the number of points each problem is worth. "
|
|
"If the value is not set, each response field in the problem is worth one point."),
|
|
values={"min": 0, "step": .1},
|
|
scope=Scope.settings
|
|
)
|
|
markdown = String(help=_("Markdown source of this module"), default=None, scope=Scope.settings)
|
|
source_code = String(
|
|
help=_("Source code for LaTeX and Word problems. This feature is not well-supported."),
|
|
scope=Scope.settings
|
|
)
|
|
use_latex_compiler = Boolean(
|
|
help=_("Enable LaTeX templates?"),
|
|
default=False,
|
|
scope=Scope.settings
|
|
)
|
|
matlab_api_key = String(
|
|
display_name=_("Matlab API key"),
|
|
help=_("Enter the API key provided by MathWorks for accessing the MATLAB Hosted Service. "
|
|
"This key is granted for exclusive use by this course for the specified duration. "
|
|
"Please do not share the API key with other courses and notify MathWorks immediately "
|
|
"if you believe the key is exposed or compromised. To obtain a key for your course, "
|
|
"or to report an issue, please contact moocsupport@mathworks.com"),
|
|
scope=Scope.settings
|
|
)
|
|
|
|
def bind_for_student(self, *args, **kwargs): # lint-amnesty, pylint: disable=signature-differs
|
|
super().bind_for_student(*args, **kwargs)
|
|
|
|
# Capa was an XModule. When bind_for_student() was called on it with a new runtime, a new CapaModule object
|
|
# was initialized when XModuleDescriptor._xmodule() was called next. self.lcp was constructed in CapaModule
|
|
# init(). To keep the same behaviour, we delete self.lcp in bind_for_student().
|
|
if 'lcp' in self.__dict__:
|
|
del self.__dict__['lcp']
|
|
|
|
def student_view(self, _context, show_detailed_errors=False):
|
|
"""
|
|
Return the student view.
|
|
"""
|
|
# self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first.
|
|
try:
|
|
self.lcp
|
|
except Exception as err: # lint-amnesty, pylint: disable=broad-except
|
|
html = self.handle_fatal_lcp_error(err if show_detailed_errors else None)
|
|
else:
|
|
html = self.get_html()
|
|
fragment = Fragment(html)
|
|
add_webpack_to_fragment(fragment, 'ProblemBlockPreview')
|
|
shim_xmodule_js(fragment, 'Problem')
|
|
return fragment
|
|
|
|
def public_view(self, context):
|
|
"""
|
|
Return the view seen by users who aren't logged in or who aren't
|
|
enrolled in the course.
|
|
"""
|
|
if getattr(self.runtime, 'suppports_state_for_anonymous_users', False):
|
|
# The new XBlock runtime can generally support capa problems for users who aren't logged in, so show the
|
|
# normal student_view. To prevent anonymous users from viewing specific problems, adjust course policies
|
|
# and/or content groups.
|
|
return self.student_view(context)
|
|
else:
|
|
# Show a message that this content requires users to login/enroll.
|
|
return super().public_view(context)
|
|
|
|
def author_view(self, context):
|
|
"""
|
|
Renders the Studio preview view.
|
|
"""
|
|
return self.student_view(context, show_detailed_errors=True)
|
|
|
|
def studio_view(self, _context):
|
|
"""
|
|
Return the studio view.
|
|
"""
|
|
fragment = Fragment(
|
|
self.runtime.service(self, 'mako').render_template(self.mako_template, self.get_context())
|
|
)
|
|
add_webpack_to_fragment(fragment, 'ProblemBlockStudio')
|
|
shim_xmodule_js(fragment, 'MarkdownEditingDescriptor')
|
|
return fragment
|
|
|
|
def handle_ajax(self, dispatch, data):
|
|
"""
|
|
This is called by courseware.block_render, to handle an AJAX call.
|
|
|
|
`data` is request.POST.
|
|
|
|
Returns a json dictionary:
|
|
{ 'progress_changed' : True/False,
|
|
'progress' : 'none'/'in_progress'/'done',
|
|
<other request-specific values here > }
|
|
"""
|
|
# self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first.
|
|
self.lcp # lint-amnesty, pylint: disable=pointless-statement
|
|
handlers = {
|
|
'hint_button': self.hint_button,
|
|
'problem_get': self.get_problem,
|
|
'problem_check': self.submit_problem,
|
|
'problem_reset': self.reset_problem,
|
|
'problem_save': self.save_problem,
|
|
'problem_show': self.get_answer,
|
|
'score_update': self.update_score,
|
|
'input_ajax': self.handle_input_ajax,
|
|
'ungraded_response': self.handle_ungraded_response
|
|
}
|
|
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
generic_error_message = _(
|
|
"We're sorry, there was an error with processing your request. "
|
|
"Please try reloading your page and trying again."
|
|
)
|
|
|
|
not_found_error_message = _(
|
|
"The state of this problem has changed since you loaded this page. "
|
|
"Please refresh your page."
|
|
)
|
|
|
|
if dispatch not in handlers:
|
|
return f'Error: {dispatch} is not a known capa action'
|
|
|
|
before = self.get_progress()
|
|
before_attempts = self.attempts
|
|
|
|
try:
|
|
result = handlers[dispatch](data)
|
|
|
|
except NotFoundError as ex:
|
|
log.info(
|
|
"Unable to find data when dispatching %s to %s for user %s",
|
|
dispatch,
|
|
self.scope_ids.usage_id,
|
|
self.scope_ids.user_id
|
|
)
|
|
_, _, traceback_obj = sys.exc_info()
|
|
raise ProcessingError(not_found_error_message).with_traceback(traceback_obj) from ex
|
|
|
|
except Exception as ex: # lint-amnesty, pylint: disable=broad-except
|
|
log.exception(
|
|
"Unknown error when dispatching %s to %s for user %s",
|
|
dispatch,
|
|
self.scope_ids.usage_id,
|
|
self.scope_ids.user_id
|
|
)
|
|
_, _, traceback_obj = sys.exc_info()
|
|
raise ProcessingError(generic_error_message).with_traceback(traceback_obj) from ex
|
|
|
|
after = self.get_progress()
|
|
after_attempts = self.attempts
|
|
progress_changed = (after != before) or (after_attempts != before_attempts)
|
|
curr_score, total_possible = self.get_display_progress()
|
|
|
|
result.update({
|
|
'progress_changed': progress_changed,
|
|
'current_score': curr_score,
|
|
'total_possible': total_possible,
|
|
'attempts_used': after_attempts,
|
|
})
|
|
|
|
return json.dumps(result, cls=ComplexEncoder)
|
|
|
|
@property
|
|
def display_name_with_default(self):
|
|
"""
|
|
Constructs the display name for a CAPA problem.
|
|
|
|
Default to the display_name if it isn't None or not an empty string,
|
|
else fall back to problem category.
|
|
"""
|
|
if self.display_name is None or not self.display_name.strip():
|
|
return self.location.block_type
|
|
|
|
return self.display_name
|
|
|
|
@property
|
|
def debug(self):
|
|
"""
|
|
If CAPA block fails to render, we want course authors to be able to see
|
|
the error in Studio. At the same time, in production, we don't want
|
|
to show errors to students.
|
|
"""
|
|
return getattr(self.runtime, 'is_author_mode', False) or settings.DEBUG
|
|
|
|
@classmethod
|
|
def filter_templates(cls, template, course):
|
|
"""
|
|
Filter template that contains 'latex' from templates.
|
|
|
|
Show them only if use_latex_compiler is set to True in
|
|
course settings.
|
|
"""
|
|
return 'latex' not in template['template_id'] or course.use_latex_compiler
|
|
|
|
def get_context(self):
|
|
_context = EditingMixin.get_context(self)
|
|
_context.update({
|
|
'markdown': self.markdown,
|
|
'enable_markdown': self.markdown is not None,
|
|
'enable_latex_compiler': self.use_latex_compiler,
|
|
})
|
|
return _context
|
|
|
|
# VS[compat]
|
|
# TODO (cpennington): Delete this method once all fall 2012 course are being
|
|
# edited in the cms
|
|
@classmethod
|
|
def backcompat_paths(cls, path):
|
|
return [
|
|
'problems/' + path[8:],
|
|
path[8:],
|
|
]
|
|
|
|
@property
|
|
def non_editable_metadata_fields(self):
|
|
non_editable_fields = super().non_editable_metadata_fields
|
|
non_editable_fields.extend([
|
|
ProblemBlock.due,
|
|
ProblemBlock.graceperiod,
|
|
ProblemBlock.force_save_button,
|
|
ProblemBlock.markdown,
|
|
ProblemBlock.use_latex_compiler,
|
|
ProblemBlock.show_correctness,
|
|
])
|
|
return non_editable_fields
|
|
|
|
@property
|
|
def problem_types(self):
|
|
""" Low-level problem type introspection for content libraries filtering by problem type """
|
|
try:
|
|
tree = etree.XML(self.data)
|
|
except etree.XMLSyntaxError:
|
|
log.error(f'Error parsing problem types from xml for capa block {self.display_name}')
|
|
return None # short-term fix to prevent errors (TNL-5057). Will be more properly addressed in TNL-4525.
|
|
registered_tags = responsetypes.registry.registered_tags()
|
|
return {node.tag for node in tree.iter() if node.tag in registered_tags}
|
|
|
|
def index_dictionary(self):
|
|
"""
|
|
Return dictionary prepared with module content and type for indexing.
|
|
"""
|
|
xblock_body = super().index_dictionary()
|
|
|
|
# Make optioninput's options index friendly by replacing the actual tag with the values
|
|
capa_content = re.sub(r'<optioninput options="\(([^"]+)\)".*?>\s*|\S*<\/optioninput>', r'\1', self.data)
|
|
|
|
# Remove the following tags with content that can leak hints or solutions:
|
|
# - `solution` (with optional attributes) and `solutionset`.
|
|
# - `targetedfeedback` (with optional attributes) and `targetedfeedbackset`.
|
|
# - `answer` (with optional attributes).
|
|
# - `script` (with optional attributes).
|
|
# - `style` (with optional attributes).
|
|
# - various types of hints (with optional attributes) and `hintpart`.
|
|
capa_content = re.sub(
|
|
re.compile(
|
|
r"""
|
|
<solution.*?>.*?</solution.*?> |
|
|
<targetedfeedback.*?>.*?</targetedfeedback.*?> |
|
|
<answer.*?>.*?</answer> |
|
|
<script.*?>.*?</script> |
|
|
<style.*?>.*?</style> |
|
|
<[a-z]*hint.*?>.*?</[a-z]*hint.*?>
|
|
""",
|
|
re.DOTALL |
|
|
re.VERBOSE),
|
|
"",
|
|
capa_content
|
|
)
|
|
capa_content = re.sub(
|
|
r"(\s| |//)+",
|
|
" ",
|
|
Cleaner(tags=[], strip=True).clean(capa_content)
|
|
)
|
|
|
|
capa_body = {
|
|
"capa_content": capa_content,
|
|
"display_name": self.display_name,
|
|
}
|
|
if "content" in xblock_body:
|
|
xblock_body["content"].update(capa_body)
|
|
else:
|
|
xblock_body["content"] = capa_body
|
|
xblock_body["content_type"] = self.INDEX_CONTENT_TYPE
|
|
xblock_body["problem_types"] = list(self.problem_types)
|
|
return xblock_body
|
|
|
|
def has_support(self, view, functionality):
|
|
"""
|
|
Override the XBlock.has_support method to return appropriate
|
|
value for the multi-device functionality.
|
|
Returns whether the given view has support for the given functionality.
|
|
"""
|
|
if functionality == "multi_device":
|
|
types = self.problem_types # Avoid calculating this property twice
|
|
return types is not None and all(
|
|
responsetypes.registry.get_class_for_tag(tag).multi_device_support
|
|
for tag in types
|
|
)
|
|
return False
|
|
|
|
def max_score(self):
|
|
"""
|
|
Return the problem's max score if problem is instantiated successfully, else return max score of 0.
|
|
"""
|
|
capa_system = LoncapaSystem(
|
|
ajax_url=None,
|
|
anonymous_student_id=None,
|
|
cache=None,
|
|
can_execute_unsafe_code=None,
|
|
get_python_lib_zip=None,
|
|
DEBUG=None,
|
|
i18n=self.runtime.service(self, "i18n"),
|
|
render_template=None,
|
|
resources_fs=self.runtime.resources_fs,
|
|
seed=None,
|
|
xqueue=None,
|
|
matlab_api_key=None,
|
|
)
|
|
try:
|
|
lcp = LoncapaProblem(
|
|
problem_text=self.data,
|
|
id=self.location.html_id(),
|
|
capa_system=capa_system,
|
|
capa_block=self,
|
|
state={},
|
|
seed=1,
|
|
minimal_init=True,
|
|
)
|
|
except responsetypes.LoncapaProblemError:
|
|
log.exception(f"LcpFatalError for block {str(self.location)} while getting max score")
|
|
maximum_score = 0
|
|
else:
|
|
maximum_score = lcp.get_max_score()
|
|
return maximum_score
|
|
|
|
def generate_report_data(self, user_state_iterator, limit_responses=None):
|
|
"""
|
|
Return a list of student responses to this block in a readable way.
|
|
|
|
Arguments:
|
|
user_state_iterator: iterator over UserStateClient objects.
|
|
E.g. the result of user_state_client.iter_all_for_block(block_key)
|
|
|
|
limit_responses (int|None): maximum number of responses to include.
|
|
Set to None (default) to include all.
|
|
|
|
Returns:
|
|
each call returns a tuple like:
|
|
("username", {
|
|
"Question": "2 + 2 equals how many?",
|
|
"Answer": "Four",
|
|
"Answer ID": "98e6a8e915904d5389821a94e48babcf_10_1"
|
|
})
|
|
"""
|
|
if self.category != 'problem':
|
|
raise NotImplementedError()
|
|
|
|
if limit_responses == 0:
|
|
# Don't even start collecting answers
|
|
return
|
|
capa_system = LoncapaSystem(
|
|
ajax_url=None,
|
|
# TODO set anonymous_student_id to the anonymous ID of the user which answered each problem
|
|
# Anonymous ID is required for Matlab, CodeResponse, and some custom problems that include
|
|
# '$anonymous_student_id' in their XML.
|
|
# For the purposes of this report, we don't need to support those use cases.
|
|
anonymous_student_id=None,
|
|
cache=None,
|
|
can_execute_unsafe_code=lambda: None,
|
|
get_python_lib_zip=(
|
|
lambda: SandboxService(contentstore, self.scope_ids.usage_id.context_key).get_python_lib_zip()
|
|
),
|
|
DEBUG=None,
|
|
i18n=self.runtime.service(self, "i18n"),
|
|
render_template=None,
|
|
resources_fs=self.runtime.resources_fs,
|
|
seed=1,
|
|
xqueue=None,
|
|
matlab_api_key=None,
|
|
)
|
|
_ = capa_system.i18n.ugettext
|
|
|
|
count = 0
|
|
for user_state in user_state_iterator:
|
|
|
|
if 'student_answers' not in user_state.state:
|
|
continue
|
|
try:
|
|
lcp = LoncapaProblem(
|
|
problem_text=self.data,
|
|
id=self.location.html_id(),
|
|
capa_system=capa_system,
|
|
# We choose to run without a fully initialized CapaModule
|
|
capa_block=None,
|
|
state={
|
|
'done': user_state.state.get('done'),
|
|
'correct_map': user_state.state.get('correct_map'),
|
|
'student_answers': user_state.state.get('student_answers'),
|
|
'has_saved_answers': user_state.state.get('has_saved_answers'),
|
|
'input_state': user_state.state.get('input_state'),
|
|
'seed': user_state.state.get('seed'),
|
|
},
|
|
seed=user_state.state.get('seed'),
|
|
# extract_tree=False allows us to work without a fully initialized CapaModule
|
|
# We'll still be able to find particular data in the XML when we need it
|
|
extract_tree=False,
|
|
)
|
|
|
|
for answer_id, orig_answers in lcp.student_answers.items():
|
|
# Some types of problems have data in lcp.student_answers that isn't in lcp.problem_data.
|
|
# E.g. formulae do this to store the MathML version of the answer.
|
|
# We exclude these rows from the report because we only need the text-only answer.
|
|
if answer_id.endswith('_dynamath'):
|
|
continue
|
|
|
|
if limit_responses and count >= limit_responses:
|
|
# End the iterator here
|
|
return
|
|
|
|
question_text = lcp.find_question_label(answer_id)
|
|
answer_text = lcp.find_answer_text(answer_id, current_answer=orig_answers)
|
|
correct_answer_text = lcp.find_correct_answer_text(answer_id)
|
|
|
|
count += 1
|
|
report = {
|
|
_("Answer ID"): answer_id,
|
|
_("Question"): question_text,
|
|
_("Answer"): answer_text,
|
|
}
|
|
if correct_answer_text is not None:
|
|
report[_("Correct Answer")] = correct_answer_text
|
|
yield (user_state.username, report)
|
|
except LoncapaProblemError:
|
|
# Capture a backtrace for errors from failed loncapa problems
|
|
log.exception(
|
|
"An error occurred generating a problem report on course %s, problem %s, and student %s",
|
|
self.course_id, self.scope_ids.usage_id,
|
|
self.scope_ids.user_id
|
|
)
|
|
# Also input error in report
|
|
report = {
|
|
_("Answer ID"): "Python Error",
|
|
_("Question"): "Generating a report on the problem failed.",
|
|
_("Answer"): "Python Error: No Answer Retrieved",
|
|
}
|
|
yield (user_state.username, report)
|
|
|
|
@property
|
|
def close_date(self):
|
|
"""
|
|
Return the date submissions should be closed from.
|
|
"""
|
|
due_date = self.due
|
|
|
|
if self.graceperiod is not None and due_date:
|
|
return due_date + self.graceperiod
|
|
else:
|
|
return due_date
|
|
|
|
def get_seed(self):
|
|
"""
|
|
Generate the seed if not set and return it.
|
|
"""
|
|
if self.seed is None:
|
|
self.choose_new_seed()
|
|
return self.seed
|
|
|
|
@cached_property
|
|
def lcp(self): # lint-amnesty, pylint: disable=method-hidden, missing-function-docstring
|
|
try:
|
|
lcp = self.new_lcp(self.get_state_for_lcp())
|
|
except Exception as err: # pylint: disable=broad-except
|
|
msg = 'cannot create LoncapaProblem {loc}: {err}'.format(
|
|
loc=str(self.location), err=err)
|
|
raise LoncapaProblemError(msg).with_traceback(sys.exc_info()[2])
|
|
|
|
if self.score is None:
|
|
self.set_score(self.score_from_lcp(lcp))
|
|
|
|
assert self.seed is not None
|
|
return lcp
|
|
|
|
def choose_new_seed(self):
|
|
"""
|
|
Choose a new seed.
|
|
"""
|
|
if self.rerandomize == RANDOMIZATION.NEVER:
|
|
self.seed = 1
|
|
elif self.rerandomize == RANDOMIZATION.PER_STUDENT:
|
|
user_id = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_ID) or 0
|
|
# see comment on randomization_bin
|
|
self.seed = randomization_bin(user_id, str(self.location).encode('utf-8'))
|
|
else:
|
|
self.seed = struct.unpack('i', os.urandom(4))[0]
|
|
|
|
# So that sandboxed code execution can be cached, but still have an interesting
|
|
# number of possibilities, cap the number of different random seeds.
|
|
self.seed %= MAX_RANDOMIZATION_BINS
|
|
|
|
def new_lcp(self, state, text=None):
|
|
"""
|
|
Generate a new Loncapa Problem
|
|
"""
|
|
if text is None:
|
|
text = self.data
|
|
|
|
user_service = self.runtime.service(self, 'user')
|
|
anonymous_student_id = user_service.get_current_user().opt_attrs.get(ATTR_KEY_DEPRECATED_ANONYMOUS_USER_ID)
|
|
seed = user_service.get_current_user().opt_attrs.get(ATTR_KEY_USER_ID) or 0
|
|
|
|
sandbox_service = self.runtime.service(self, 'sandbox')
|
|
cache_service = self.runtime.service(self, 'cache')
|
|
|
|
is_studio = getattr(self.runtime, 'is_author_mode', False)
|
|
|
|
capa_system = LoncapaSystem(
|
|
ajax_url=self.ajax_url,
|
|
anonymous_student_id=anonymous_student_id,
|
|
cache=cache_service,
|
|
can_execute_unsafe_code=sandbox_service.can_execute_unsafe_code,
|
|
get_python_lib_zip=sandbox_service.get_python_lib_zip,
|
|
DEBUG=self.debug,
|
|
i18n=self.runtime.service(self, "i18n"),
|
|
render_template=self.runtime.service(self, 'mako').render_template,
|
|
resources_fs=self.runtime.resources_fs,
|
|
seed=seed, # Why do we do this if we have self.seed?
|
|
xqueue=None if is_studio else XQueueService(self),
|
|
matlab_api_key=self.matlab_api_key
|
|
)
|
|
|
|
return LoncapaProblem(
|
|
problem_text=text,
|
|
id=self.location.html_id(),
|
|
state=state,
|
|
seed=self.get_seed(),
|
|
capa_system=capa_system,
|
|
capa_block=self, # njp
|
|
)
|
|
|
|
def get_state_for_lcp(self):
|
|
"""
|
|
Give a dictionary holding the state of the module
|
|
"""
|
|
return {
|
|
'done': self.done,
|
|
'correct_map': self.correct_map,
|
|
'student_answers': self.student_answers,
|
|
'has_saved_answers': self.has_saved_answers,
|
|
'input_state': self.input_state,
|
|
'seed': self.get_seed(),
|
|
}
|
|
|
|
def set_state_from_lcp(self):
|
|
"""
|
|
Set the module's state from the settings in `self.lcp`
|
|
"""
|
|
lcp_state = self.lcp.get_state()
|
|
self.done = lcp_state['done']
|
|
self.correct_map = lcp_state['correct_map']
|
|
self.input_state = lcp_state['input_state']
|
|
self.student_answers = lcp_state['student_answers']
|
|
self.has_saved_answers = lcp_state['has_saved_answers']
|
|
|
|
def set_last_submission_time(self):
|
|
"""
|
|
Set the module's last submission time (when the problem was submitted)
|
|
"""
|
|
self.last_submission_time = datetime.datetime.now(utc)
|
|
|
|
def get_progress(self):
|
|
"""
|
|
For now, just return weighted earned / weighted possible
|
|
"""
|
|
if self.score:
|
|
raw_earned = self.score.raw_earned
|
|
raw_possible = self.score.raw_possible
|
|
else:
|
|
raw_earned = raw_possible = 0
|
|
|
|
if raw_possible > 0:
|
|
if self.weight is not None:
|
|
# Progress objects expect total > 0
|
|
if self.weight == 0:
|
|
return None
|
|
|
|
# scale score and total by weight/total:
|
|
weighted_earned = raw_earned * self.weight / raw_possible
|
|
weighted_possible = self.weight
|
|
else:
|
|
weighted_earned = raw_earned
|
|
weighted_possible = raw_possible
|
|
try:
|
|
return Progress(weighted_earned, weighted_possible)
|
|
except (TypeError, ValueError):
|
|
log.exception("Got bad progress")
|
|
return None
|
|
return None
|
|
|
|
def get_display_progress(self):
|
|
"""
|
|
Return (score, total) to be displayed to the learner.
|
|
"""
|
|
progress = self.get_progress()
|
|
score, total = (progress.frac() if progress else (0, 0))
|
|
|
|
# Withhold the score if hiding correctness
|
|
if not self.correctness_available():
|
|
score = None
|
|
|
|
return score, total
|
|
|
|
def get_html(self):
|
|
"""
|
|
Return some html with data about the module
|
|
"""
|
|
curr_score, total_possible = self.get_display_progress()
|
|
|
|
return self.runtime.service(self, 'mako').render_template('problem_ajax.html', {
|
|
'element_id': self.location.html_id(),
|
|
'id': str(self.location),
|
|
'ajax_url': self.ajax_url,
|
|
'current_score': curr_score,
|
|
'total_possible': total_possible,
|
|
'attempts_used': self.attempts,
|
|
'content': self.get_problem_html(encapsulate=False),
|
|
'graded': self.graded, # pylint: disable=no-member
|
|
})
|
|
|
|
def handle_fatal_lcp_error(self, error): # lint-amnesty, pylint: disable=missing-function-docstring
|
|
log.exception(f"LcpFatalError Encountered for {str(self.location)}")
|
|
if error:
|
|
return(
|
|
HTML('<p>Error formatting HTML for problem:</p><p><pre style="color:red">{msg}</pre></p>').format(
|
|
msg=str(error))
|
|
)
|
|
else:
|
|
return HTML(
|
|
'<p>Could not format HTML for problem. '
|
|
'Contact course staff in the discussion forum for assistance.</p>'
|
|
)
|
|
|
|
def submit_button_name(self):
|
|
"""
|
|
Determine the name for the "submit" button.
|
|
"""
|
|
# The logic flow is a little odd so that _('xxx') strings can be found for
|
|
# translation while also running _() just once for each string.
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
submit = _('Submit')
|
|
|
|
return submit
|
|
|
|
def submit_button_submitting_name(self):
|
|
"""
|
|
Return the "Submitting" text for the "submit" button.
|
|
|
|
After the user presses the "submit" button, the button will briefly
|
|
display the value returned by this function until a response is
|
|
received by the server.
|
|
"""
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
return _('Submitting')
|
|
|
|
def should_enable_submit_button(self):
|
|
"""
|
|
Return True/False to indicate whether to enable the "Submit" button.
|
|
"""
|
|
submitted_without_reset = (self.is_submitted() and self.rerandomize == RANDOMIZATION.ALWAYS)
|
|
|
|
# If the problem is closed (past due / too many attempts)
|
|
# then we disable the "submit" button
|
|
# Also, disable the "submit" button if we're waiting
|
|
# for the user to reset a randomized problem
|
|
if self.closed() or submitted_without_reset:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def should_show_reset_button(self):
|
|
"""
|
|
Return True/False to indicate whether to show the "Reset" button.
|
|
"""
|
|
is_survey_question = (self.max_attempts == 0)
|
|
|
|
# If the problem is closed (and not a survey question with max_attempts==0),
|
|
# then do NOT show the reset button.
|
|
if self.closed() and not is_survey_question:
|
|
return False
|
|
|
|
# Button only shows up for randomized problems if the question has been submitted
|
|
if self.rerandomize in [RANDOMIZATION.ALWAYS, RANDOMIZATION.ONRESET] and self.is_submitted():
|
|
return True
|
|
else:
|
|
# Do NOT show the button if the problem is correct
|
|
if self.is_correct():
|
|
return False
|
|
else:
|
|
return self.show_reset_button
|
|
|
|
def should_show_save_button(self):
|
|
"""
|
|
Return True/False to indicate whether to show the "Save" button.
|
|
"""
|
|
|
|
# If the user has forced the save button to display,
|
|
# then show it as long as the problem is not closed
|
|
# (past due / too many attempts)
|
|
if self.force_save_button:
|
|
return not self.closed()
|
|
else:
|
|
is_survey_question = (self.max_attempts == 0)
|
|
needs_reset = self.is_submitted() and self.rerandomize == RANDOMIZATION.ALWAYS
|
|
|
|
# If the student has unlimited attempts, and their answers
|
|
# are not randomized, then we do not need a save button
|
|
# because they can use the "Check" button without consequences.
|
|
#
|
|
# The consequences we want to avoid are:
|
|
# * Using up an attempt (if max_attempts is set)
|
|
# * Changing the current problem, and no longer being
|
|
# able to view it (if rerandomize is "always")
|
|
#
|
|
# In those cases. the if statement below is false,
|
|
# and the save button can still be displayed.
|
|
#
|
|
if self.max_attempts is None and self.rerandomize != RANDOMIZATION.ALWAYS:
|
|
return False
|
|
|
|
# If the problem is closed (and not a survey question with max_attempts==0),
|
|
# then do NOT show the save button
|
|
# If we're waiting for the user to reset a randomized problem
|
|
# then do NOT show the save button
|
|
elif (self.closed() and not is_survey_question) or needs_reset:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def handle_problem_html_error(self, err):
|
|
"""
|
|
Create a dummy problem to represent any errors.
|
|
|
|
Change our problem to a dummy problem containing a warning message to
|
|
display to users. Returns the HTML to show to users
|
|
|
|
`err` is the Exception encountered while rendering the problem HTML.
|
|
"""
|
|
problem_display_name = self.display_name_with_default
|
|
problem_location = str(self.location)
|
|
log.exception(
|
|
"ProblemGetHtmlError: %r, %r, %s",
|
|
problem_display_name,
|
|
problem_location,
|
|
str(err)
|
|
)
|
|
|
|
if self.debug:
|
|
msg = HTML(
|
|
'[courseware.capa.capa_block] '
|
|
'Failed to generate HTML for problem {url}'
|
|
).format(
|
|
url=str(self.location)
|
|
)
|
|
msg += HTML('<p>Error:</p><p><pre>{msg}</pre></p>').format(msg=str(err))
|
|
msg += HTML('<p><pre>{tb}</pre></p>').format(tb=traceback.format_exc())
|
|
html = msg
|
|
|
|
else:
|
|
# We're in non-debug mode, and possibly even in production. We want
|
|
# to avoid bricking of problem as much as possible
|
|
|
|
# Presumably, student submission has corrupted LoncapaProblem HTML.
|
|
# First, pull down all student answers
|
|
|
|
student_answers = self.lcp.student_answers
|
|
answer_ids = list(student_answers.keys())
|
|
|
|
# Some inputtypes, such as dynamath, have additional "hidden" state that
|
|
# is not exposed to the student. Keep those hidden
|
|
# TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id
|
|
hidden_state_keywords = ['dynamath']
|
|
for answer_id in answer_ids:
|
|
for hidden_state_keyword in hidden_state_keywords:
|
|
if answer_id.find(hidden_state_keyword) >= 0:
|
|
student_answers.pop(answer_id)
|
|
|
|
# Next, generate a fresh LoncapaProblem
|
|
self.lcp = self.new_lcp(None)
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
# Prepend a scary warning to the student
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
warning_msg = Text(_("Warning: The problem has been reset to its initial state!"))
|
|
warning = HTML('<div class="capa_reset"> <h2>{}</h2>').format(warning_msg)
|
|
|
|
# Translators: Following this message, there will be a bulleted list of items.
|
|
warning_msg = _("The problem's state was corrupted by an invalid submission. The submission consisted of:")
|
|
warning += HTML('{}<ul>').format(warning_msg)
|
|
|
|
for student_answer in student_answers.values():
|
|
if student_answer != '':
|
|
warning += HTML('<li>{}</li>').format(student_answer)
|
|
|
|
warning_msg = _('If this error persists, please contact the course staff.')
|
|
warning += HTML('</ul>{}</div>').format(warning_msg)
|
|
|
|
html = warning
|
|
try:
|
|
html += self.lcp.get_html()
|
|
except Exception as error:
|
|
# Couldn't do it. Give up.
|
|
log.exception(
|
|
"ProblemGetHtmlError: Unable to generate html from LoncapaProblem: %r, %r, %s",
|
|
problem_display_name,
|
|
problem_location,
|
|
str(error)
|
|
)
|
|
raise
|
|
|
|
return html
|
|
|
|
def _should_enable_demand_hint(self, demand_hints, hint_index=None):
|
|
"""
|
|
Should the demand hint option be enabled?
|
|
|
|
Arguments:
|
|
hint_index (int): The current hint index, or None (default value) if no hint is currently being shown.
|
|
demand_hints (list): List of hints.
|
|
Returns:
|
|
bool: True is the demand hint is possible.
|
|
bool: True is demand hint should be enabled.
|
|
"""
|
|
# hint_index is the index of the last hint that will be displayed in this rendering,
|
|
# so add 1 to check if others exist.
|
|
if hint_index is None:
|
|
should_enable = len(demand_hints) > 0
|
|
else:
|
|
should_enable = len(demand_hints) > 0 and hint_index + 1 < len(demand_hints)
|
|
return len(demand_hints) > 0, should_enable
|
|
|
|
def get_demand_hint(self, hint_index):
|
|
"""
|
|
Return html for the problem, including demand hints.
|
|
|
|
hint_index (int): (None is the default) if not None, this is the index of the next demand
|
|
hint to show.
|
|
"""
|
|
demand_hints = self.lcp.tree.xpath("//problem/demandhint/hint")
|
|
hint_index = hint_index % len(demand_hints)
|
|
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
counter = 0
|
|
total_text = ''
|
|
while counter <= hint_index:
|
|
# Translators: {previous_hints} is the HTML of hints that have already been generated, {hint_number_prefix}
|
|
# is a header for this hint, and {hint_text} is the text of the hint itself.
|
|
# This string is being passed to translation only for possible reordering of the placeholders.
|
|
total_text = HTML(_('{previous_hints}{list_start_tag}{strong_text}{hint_text}</li>')).format(
|
|
previous_hints=HTML(total_text),
|
|
list_start_tag=HTML('<li class="hint-index-{counter}" tabindex="-1">').format(counter=counter),
|
|
strong_text=HTML('<strong>{hint_number_prefix}</strong>').format(
|
|
# Translators: e.g. "Hint 1 of 3: " meaning we are showing the first of three hints.
|
|
# This text is shown in bold before the accompanying hint text.
|
|
hint_number_prefix=Text(_("Hint ({hint_num} of {hints_count}): ")).format(
|
|
hint_num=counter + 1, hints_count=len(demand_hints)
|
|
)
|
|
),
|
|
# Course-authored HTML demand hints are supported.
|
|
hint_text=HTML(self.runtime.service(self, "replace_urls").replace_urls(
|
|
get_inner_html_from_xpath(demand_hints[counter])
|
|
))
|
|
)
|
|
counter += 1
|
|
|
|
total_text = HTML('<ol>{hints}</ol>').format(hints=total_text)
|
|
|
|
# Log this demand-hint request. Note that this only logs the last hint requested (although now
|
|
# all previously shown hints are still displayed).
|
|
event_info = {}
|
|
event_info['module_id'] = str(self.location)
|
|
event_info['hint_index'] = hint_index
|
|
event_info['hint_len'] = len(demand_hints)
|
|
event_info['hint_text'] = get_inner_html_from_xpath(demand_hints[hint_index])
|
|
self.runtime.publish(self, 'edx.problem.hint.demandhint_displayed', event_info)
|
|
|
|
_, should_enable_next_hint = self._should_enable_demand_hint(demand_hints=demand_hints, hint_index=hint_index)
|
|
|
|
# We report the index of this hint, the client works out what index to use to get the next hint
|
|
return {
|
|
'success': True,
|
|
'hint_index': hint_index,
|
|
'should_enable_next_hint': should_enable_next_hint,
|
|
'msg': total_text,
|
|
}
|
|
|
|
def get_problem_html(self, encapsulate=True, submit_notification=False):
|
|
"""
|
|
Return html for the problem.
|
|
|
|
Adds submit, reset, save, and hint buttons as necessary based on the problem config
|
|
and state.
|
|
|
|
encapsulate (bool): if True (the default) embed the html in a problem <div>
|
|
submit_notification (bool): True if the submit notification should be added
|
|
"""
|
|
try:
|
|
html = self.lcp.get_html()
|
|
|
|
# If we cannot construct the problem HTML,
|
|
# then generate an error message instead.
|
|
except Exception as err: # pylint: disable=broad-except
|
|
html = self.handle_problem_html_error(err)
|
|
|
|
html = self.remove_tags_from_html(html)
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
# Enable/Disable Submit button if should_enable_submit_button returns True/False.
|
|
submit_button = self.submit_button_name()
|
|
submit_button_submitting = self.submit_button_submitting_name()
|
|
should_enable_submit_button = self.should_enable_submit_button()
|
|
submit_disabled_ctas = None
|
|
if not should_enable_submit_button:
|
|
cta_service = self.runtime.service(self, "call_to_action")
|
|
if cta_service:
|
|
submit_disabled_ctas = cta_service.get_ctas(self, 'capa_submit_disabled')
|
|
|
|
content = {
|
|
'name': self.display_name_with_default,
|
|
'html': smart_str(html),
|
|
'weight': self.weight,
|
|
}
|
|
|
|
# If demand hints are available, emit hint button and div.
|
|
demand_hints = self.lcp.tree.xpath("//problem/demandhint/hint")
|
|
demand_hint_possible, should_enable_next_hint = self._should_enable_demand_hint(demand_hints=demand_hints)
|
|
|
|
answer_notification_type, answer_notification_message = self._get_answer_notification(
|
|
render_notifications=submit_notification)
|
|
|
|
save_message = None
|
|
if self.has_saved_answers:
|
|
save_message = _(
|
|
"Your answers were previously saved. Click '{button_name}' to grade them."
|
|
).format(button_name=self.submit_button_name())
|
|
|
|
context = {
|
|
'problem': content,
|
|
'id': str(self.location),
|
|
'short_id': self.location.html_id(),
|
|
'submit_button': submit_button,
|
|
'submit_button_submitting': submit_button_submitting,
|
|
'should_enable_submit_button': should_enable_submit_button,
|
|
'reset_button': self.should_show_reset_button(),
|
|
'save_button': self.should_show_save_button(),
|
|
'answer_available': self.answer_available(),
|
|
'attempts_used': self.attempts,
|
|
'attempts_allowed': self.max_attempts,
|
|
'demand_hint_possible': demand_hint_possible,
|
|
'should_enable_next_hint': should_enable_next_hint,
|
|
'answer_notification_type': answer_notification_type,
|
|
'answer_notification_message': answer_notification_message,
|
|
'has_saved_answers': self.has_saved_answers,
|
|
'save_message': save_message,
|
|
'submit_disabled_cta': submit_disabled_ctas[0] if submit_disabled_ctas else None,
|
|
}
|
|
|
|
html = self.runtime.service(self, 'mako').render_template('problem.html', context)
|
|
|
|
if encapsulate:
|
|
html = HTML('<div id="problem_{id}" class="problem" data-url="{ajax_url}">{html}</div>').format(
|
|
id=self.location.html_id(), ajax_url=self.ajax_url, html=HTML(html)
|
|
)
|
|
|
|
# Now do all the substitutions which the LMS block_render normally does, but
|
|
# we need to do here explicitly since we can get called for our HTML via AJAX
|
|
html = self.runtime.service(self, "replace_urls").replace_urls(html)
|
|
|
|
return html
|
|
|
|
def _get_answer_notification(self, render_notifications):
|
|
"""
|
|
Generate the answer notification type and message from the current problem status.
|
|
|
|
Arguments:
|
|
render_notifications (bool): If false the method will return an None for type and message
|
|
"""
|
|
answer_notification_message = None
|
|
answer_notification_type = None
|
|
|
|
if render_notifications:
|
|
progress = self.get_progress()
|
|
id_list = list(self.lcp.correct_map.keys())
|
|
|
|
# Show only a generic message if hiding correctness
|
|
if not self.correctness_available():
|
|
answer_notification_type = 'submitted'
|
|
elif len(id_list) == 1:
|
|
# Only one answer available
|
|
answer_notification_type = self.lcp.correct_map.get_correctness(id_list[0])
|
|
elif len(id_list) > 1:
|
|
# Check the multiple answers that are available
|
|
answer_notification_type = self.lcp.correct_map.get_correctness(id_list[0])
|
|
for answer_id in id_list[1:]:
|
|
if self.lcp.correct_map.get_correctness(answer_id) != answer_notification_type:
|
|
# There is at least 1 of the following combinations of correctness states
|
|
# Correct and incorrect, Correct and partially correct, or Incorrect and partially correct
|
|
# which all should have a message type of Partially Correct
|
|
answer_notification_type = 'partially-correct'
|
|
break
|
|
|
|
# Build the notification message based on the notification type and translate it.
|
|
ungettext = self.runtime.service(self, "i18n").ungettext
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
if answer_notification_type == 'incorrect':
|
|
if progress is not None:
|
|
answer_notification_message = ungettext(
|
|
"Incorrect ({progress} point)",
|
|
"Incorrect ({progress} points)",
|
|
progress.frac()[1]
|
|
).format(progress=str(progress))
|
|
else:
|
|
answer_notification_message = _('Incorrect')
|
|
elif answer_notification_type == 'correct':
|
|
if progress is not None:
|
|
answer_notification_message = ungettext(
|
|
"Correct ({progress} point)",
|
|
"Correct ({progress} points)",
|
|
progress.frac()[1]
|
|
).format(progress=str(progress))
|
|
else:
|
|
answer_notification_message = _('Correct')
|
|
elif answer_notification_type == 'partially-correct':
|
|
if progress is not None:
|
|
answer_notification_message = ungettext(
|
|
"Partially correct ({progress} point)",
|
|
"Partially correct ({progress} points)",
|
|
progress.frac()[1]
|
|
).format(progress=str(progress))
|
|
else:
|
|
answer_notification_message = _('Partially Correct')
|
|
elif answer_notification_type == 'submitted':
|
|
answer_notification_message = _("Answer submitted.")
|
|
|
|
return answer_notification_type, answer_notification_message
|
|
|
|
def remove_tags_from_html(self, html):
|
|
"""
|
|
The capa xml includes many tags such as <additional_answer> or <demandhint> which are not
|
|
meant to be part of the client html. We strip them all and return the resulting html.
|
|
"""
|
|
tags = ['demandhint', 'choicehint', 'optionhint', 'stringhint', 'numerichint', 'optionhint',
|
|
'correcthint', 'regexphint', 'additional_answer', 'stringequalhint', 'compoundhint',
|
|
'stringequalhint']
|
|
for tag in tags:
|
|
html = re.sub(fr'<{tag}.*?>.*?</{tag}>', '', html, flags=re.DOTALL) # xss-lint: disable=python-interpolate-html # lint-amnesty, pylint: disable=line-too-long
|
|
# Some of these tags span multiple lines
|
|
# Note: could probably speed this up by calling sub() once with a big regex
|
|
# vs. simply calling sub() many times as we have here.
|
|
return html
|
|
|
|
def hint_button(self, data):
|
|
"""
|
|
Hint button handler, returns new html using hint_index from the client.
|
|
"""
|
|
hint_index = int(data['hint_index'])
|
|
return self.get_demand_hint(hint_index)
|
|
|
|
def used_all_attempts(self):
|
|
""" All attempts have been used """
|
|
return self.max_attempts is not None and self.attempts >= self.max_attempts
|
|
|
|
def is_past_due(self):
|
|
"""
|
|
Is it now past this problem's due date, including grace period?
|
|
"""
|
|
return (self.close_date is not None and
|
|
datetime.datetime.now(utc) > self.close_date)
|
|
|
|
def closed(self):
|
|
"""
|
|
Is the student still allowed to submit answers?
|
|
"""
|
|
if self.used_all_attempts():
|
|
return True
|
|
if self.is_past_due():
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_submitted(self):
|
|
"""
|
|
Used to decide to show or hide RESET or CHECK buttons.
|
|
|
|
Means that student submitted problem and nothing more.
|
|
Problem can be completely wrong.
|
|
Pressing RESET button makes this function to return False.
|
|
"""
|
|
# used by conditional block
|
|
return self.lcp.done
|
|
|
|
def is_attempted(self):
|
|
"""
|
|
Has the problem been attempted?
|
|
|
|
used by conditional block
|
|
"""
|
|
return self.attempts > 0
|
|
|
|
def is_correct(self):
|
|
"""
|
|
True iff full points
|
|
"""
|
|
# self.score is initialized in self.lcp but in this method is accessed before self.lcp so just call it first.
|
|
self.lcp # pylint: disable=pointless-statement
|
|
return self.score.raw_earned == self.score.raw_possible
|
|
|
|
def answer_available(self):
|
|
"""
|
|
Is the user allowed to see an answer?
|
|
"""
|
|
user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF)
|
|
if not self.correctness_available():
|
|
# If correctness is being withheld, then don't show answers either.
|
|
return False
|
|
elif self.showanswer == '':
|
|
return False
|
|
elif self.showanswer == SHOWANSWER.NEVER:
|
|
return False
|
|
elif user_is_staff:
|
|
# This is after the 'never' check because admins can see the answer
|
|
# unless the problem explicitly prevents it
|
|
return True
|
|
elif self.showanswer == SHOWANSWER.ATTEMPTED:
|
|
return self.is_attempted() or self.is_past_due()
|
|
elif self.showanswer == SHOWANSWER.ANSWERED:
|
|
# NOTE: this is slightly different from 'attempted' -- resetting the problems
|
|
# makes lcp.done False, but leaves attempts unchanged.
|
|
return self.is_correct()
|
|
elif self.showanswer == SHOWANSWER.CLOSED:
|
|
return self.closed()
|
|
elif self.showanswer == SHOWANSWER.FINISHED:
|
|
return self.closed() or self.is_correct()
|
|
|
|
elif self.showanswer == SHOWANSWER.CORRECT_OR_PAST_DUE:
|
|
return self.is_correct() or self.is_past_due()
|
|
elif self.showanswer == SHOWANSWER.PAST_DUE:
|
|
return self.is_past_due()
|
|
elif self.showanswer == SHOWANSWER.AFTER_SOME_NUMBER_OF_ATTEMPTS:
|
|
required_attempts = self.attempts_before_showanswer_button
|
|
if self.max_attempts and required_attempts >= self.max_attempts:
|
|
required_attempts = self.max_attempts
|
|
return self.attempts >= required_attempts
|
|
elif self.showanswer == SHOWANSWER.ALWAYS:
|
|
return True
|
|
elif self.showanswer == SHOWANSWER.AFTER_ALL_ATTEMPTS:
|
|
return self.used_all_attempts()
|
|
elif self.showanswer == SHOWANSWER.AFTER_ALL_ATTEMPTS_OR_CORRECT:
|
|
return self.used_all_attempts() or self.is_correct()
|
|
elif self.showanswer == SHOWANSWER.ATTEMPTED_NO_PAST_DUE:
|
|
return self.is_attempted()
|
|
return False
|
|
|
|
def correctness_available(self):
|
|
"""
|
|
Is the user allowed to see whether she's answered correctly?
|
|
|
|
Limits access to the correct/incorrect flags, messages, and problem score.
|
|
"""
|
|
user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF)
|
|
return ShowCorrectness.correctness_available(
|
|
show_correctness=self.show_correctness,
|
|
due_date=self.close_date,
|
|
has_staff_access=user_is_staff,
|
|
)
|
|
|
|
def update_score(self, data):
|
|
"""
|
|
Delivers grading response (e.g. from asynchronous code checking) to
|
|
the capa problem, so its score can be updated
|
|
|
|
'data' must have a key 'response' which is a string that contains the
|
|
grader's response
|
|
|
|
No ajax return is needed. Return empty dict.
|
|
"""
|
|
queuekey = data['queuekey']
|
|
score_msg = data['xqueue_body']
|
|
self.lcp.update_score(score_msg, queuekey)
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
self.publish_grade(grader_response=True)
|
|
|
|
return {} # No AJAX return is needed
|
|
|
|
def handle_ungraded_response(self, data):
|
|
"""
|
|
Delivers a response from the XQueue to the capa problem
|
|
|
|
The score of the problem will not be updated
|
|
|
|
Args:
|
|
- data (dict) must contain keys:
|
|
queuekey - a key specific to this response
|
|
xqueue_body - the body of the response
|
|
Returns:
|
|
empty dictionary
|
|
|
|
No ajax return is needed, so an empty dict is returned
|
|
"""
|
|
queuekey = data['queuekey']
|
|
score_msg = data['xqueue_body']
|
|
|
|
# pass along the xqueue message to the problem
|
|
self.lcp.ungraded_response(score_msg, queuekey)
|
|
self.set_state_from_lcp()
|
|
return {}
|
|
|
|
def handle_input_ajax(self, data):
|
|
"""
|
|
Handle ajax calls meant for a particular input in the problem
|
|
|
|
Args:
|
|
- data (dict) - data that should be passed to the input
|
|
Returns:
|
|
- dict containing the response from the input
|
|
"""
|
|
response = self.lcp.handle_input_ajax(data)
|
|
|
|
# save any state changes that may occur
|
|
self.set_state_from_lcp()
|
|
return response
|
|
|
|
def get_answer(self, _data):
|
|
"""
|
|
For the "show answer" button.
|
|
|
|
Returns the answers and rendered "correct status span" HTML:
|
|
{'answers' : answers, 'correct_status_html': correct_status_span_html}.
|
|
The "correct status span" HTML is injected beside the correct answers
|
|
for radio button and checkmark problems, so that there is a visual
|
|
indication of the correct answers that is not solely based on color
|
|
(and also screen reader text).
|
|
"""
|
|
event_info = {}
|
|
event_info['problem_id'] = str(self.location)
|
|
self.publish_unmasked('showanswer', event_info)
|
|
if not self.answer_available(): # lint-amnesty, pylint: disable=no-else-raise
|
|
raise NotFoundError('Answer is not available')
|
|
else:
|
|
answers = self.lcp.get_question_answers()
|
|
self.set_state_from_lcp()
|
|
|
|
# answers (eg <solution>) may have embedded images
|
|
# but be careful, some problems are using non-string answer dicts
|
|
new_answers = {}
|
|
for answer_id in answers:
|
|
try:
|
|
answer_content = self.runtime.service(self, "replace_urls").replace_urls(answers[answer_id])
|
|
new_answer = {answer_id: answer_content}
|
|
except TypeError:
|
|
log.debug('Unable to perform URL substitution on answers[%s]: %s',
|
|
answer_id, answers[answer_id])
|
|
new_answer = {answer_id: answers[answer_id]}
|
|
new_answers.update(new_answer)
|
|
|
|
return {
|
|
'answers': new_answers,
|
|
'correct_status_html': self.runtime.service(self, 'mako').render_template(
|
|
'status_span.html',
|
|
{'status': Status('correct', self.runtime.service(self, "i18n").ugettext)}
|
|
)
|
|
}
|
|
|
|
# Figure out if we should move these to capa_problem?
|
|
def get_problem(self, _data):
|
|
"""
|
|
Return results of get_problem_html, as a simple dict for json-ing.
|
|
{ 'html': <the-html> }
|
|
|
|
Used if we want to reconfirm we have the right thing e.g. after
|
|
several AJAX calls.
|
|
"""
|
|
return {'html': self.get_problem_html(encapsulate=False, submit_notification=True)}
|
|
|
|
@staticmethod
|
|
def make_dict_of_responses(data):
|
|
"""
|
|
Make dictionary of student responses (aka "answers")
|
|
|
|
`data` is POST dictionary (webob.multidict.MultiDict).
|
|
|
|
The `data` dict has keys of the form 'x_y', which are mapped
|
|
to key 'y' in the returned dict. For example,
|
|
'input_1_2_3' would be mapped to '1_2_3' in the returned dict.
|
|
|
|
Some inputs always expect a list in the returned dict
|
|
(e.g. checkbox inputs). The convention is that
|
|
keys in the `data` dict that end with '[]' will always
|
|
have list values in the returned dict.
|
|
For example, if the `data` dict contains {'input_1[]': 'test' }
|
|
then the output dict would contain {'1': ['test'] }
|
|
(the value is a list).
|
|
|
|
Some other inputs such as ChoiceTextInput expect a dict of values in the returned
|
|
dict If the key ends with '{}' then we will assume that the value is a json
|
|
encoded dict and deserialize it.
|
|
For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'}
|
|
then the output dict would contain {'1': {"1_2_1": 1} }
|
|
(the value is a dictionary)
|
|
|
|
Raises an exception if:
|
|
|
|
-A key in the `data` dictionary does not contain at least one underscore
|
|
(e.g. "input" is invalid, but "input_1" is valid)
|
|
|
|
-Two keys end up with the same name in the returned dict.
|
|
(e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1'
|
|
in the returned dict)
|
|
"""
|
|
answers = {}
|
|
|
|
# webob.multidict.MultiDict is a view of a list of tuples,
|
|
# so it will return a multi-value key once for each value.
|
|
# We only want to consider each key a single time, so we use set(data.keys())
|
|
for key in set(data.keys()):
|
|
# e.g. input_resistor_1 ==> resistor_1
|
|
_, _, name = key.partition('_')
|
|
|
|
# If key has no underscores, then partition
|
|
# will return (key, '', '')
|
|
# We detect this and raise an error
|
|
if not name: # lint-amnesty, pylint: disable=no-else-raise
|
|
raise ValueError(f"{key} must contain at least one underscore")
|
|
|
|
else:
|
|
# This allows for answers which require more than one value for
|
|
# the same form input (e.g. checkbox inputs). The convention is that
|
|
# if the name ends with '[]' (which looks like an array), then the
|
|
# answer will be an array.
|
|
# if the name ends with '{}' (Which looks like a dict),
|
|
# then the answer will be a dict
|
|
is_list_key = name.endswith('[]')
|
|
is_dict_key = name.endswith('{}')
|
|
name = name[:-2] if is_list_key or is_dict_key else name
|
|
|
|
if is_list_key:
|
|
val = data.getall(key)
|
|
elif is_dict_key:
|
|
try:
|
|
val = json.loads(data[key])
|
|
# If the submission wasn't deserializable, raise an error.
|
|
except(KeyError, ValueError):
|
|
raise ValueError( # lint-amnesty, pylint: disable=raise-missing-from
|
|
f"Invalid submission: {data[key]} for {key}"
|
|
)
|
|
else:
|
|
val = data[key]
|
|
|
|
# If the name already exists, then we don't want
|
|
# to override it. Raise an error instead
|
|
if name in answers: # lint-amnesty, pylint: disable=no-else-raise
|
|
raise ValueError(f"Key {name} already exists in answers dict")
|
|
else:
|
|
answers[name] = val
|
|
|
|
return answers
|
|
|
|
def publish_grade(self, score=None, only_if_higher=None, **kwargs):
|
|
"""
|
|
Publishes the student's current grade to the system as an event
|
|
"""
|
|
if not score:
|
|
score = self.score
|
|
event = {
|
|
'value': score.raw_earned,
|
|
'max_value': score.raw_possible,
|
|
'only_if_higher': only_if_higher,
|
|
}
|
|
if kwargs.get('grader_response'):
|
|
event['grader_response'] = kwargs['grader_response']
|
|
|
|
self.runtime.publish(self, 'grade', event)
|
|
|
|
return {'grade': self.score.raw_earned, 'max_grade': self.score.raw_possible}
|
|
|
|
# pylint: disable=too-many-statements
|
|
def submit_problem(self, data, override_time=False):
|
|
"""
|
|
Checks whether answers to a problem are correct
|
|
|
|
Returns a map of correct/incorrect answers:
|
|
{'success' : 'correct' | 'incorrect' | AJAX alert msg string,
|
|
'contents' : html}
|
|
"""
|
|
event_info = {}
|
|
event_info['state'] = self.lcp.get_state()
|
|
event_info['problem_id'] = str(self.location)
|
|
|
|
self.lcp.has_saved_answers = False
|
|
answers = self.make_dict_of_responses(data)
|
|
answers_without_files = convert_files_to_filenames(answers)
|
|
event_info['answers'] = answers_without_files
|
|
|
|
metric_name = 'xmodule.capa.check_problem.{}'.format # lint-amnesty, pylint: disable=unused-variable
|
|
# Can override current time
|
|
current_time = datetime.datetime.now(utc)
|
|
if override_time is not False:
|
|
current_time = override_time
|
|
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
# Too late. Cannot submit
|
|
if self.closed():
|
|
log.error(
|
|
'ProblemClosedError: Problem %s, close date: %s, due:%s, is_past_due: %s, attempts: %s/%s,',
|
|
str(self.location),
|
|
self.close_date,
|
|
self.due,
|
|
self.is_past_due(),
|
|
self.attempts,
|
|
self.max_attempts,
|
|
)
|
|
event_info['failure'] = 'closed'
|
|
self.publish_unmasked('problem_check_fail', event_info)
|
|
raise NotFoundError(_("Problem is closed."))
|
|
|
|
# Problem submitted. Student should reset before checking again
|
|
if self.done and self.rerandomize == RANDOMIZATION.ALWAYS:
|
|
event_info['failure'] = 'unreset'
|
|
self.publish_unmasked('problem_check_fail', event_info)
|
|
raise NotFoundError(_("Problem must be reset before it can be submitted again."))
|
|
|
|
# Problem queued. Students must wait a specified waittime before they are allowed to submit
|
|
# IDEA: consider stealing code from below: pretty-print of seconds, cueing of time remaining
|
|
if self.lcp.is_queued():
|
|
prev_submit_time = self.lcp.get_recentmost_queuetime()
|
|
|
|
xqueue_service = self.lcp.capa_system.xqueue
|
|
waittime_between_requests = xqueue_service.waittime if xqueue_service else 0
|
|
if (current_time - prev_submit_time).total_seconds() < waittime_between_requests:
|
|
msg = _("You must wait at least {wait} seconds between submissions.").format(
|
|
wait=waittime_between_requests)
|
|
return {'success': msg, 'html': ''}
|
|
|
|
# Wait time between resets: check if is too soon for submission.
|
|
if self.last_submission_time is not None and self.submission_wait_seconds not in [0, None]:
|
|
seconds_since_submission = (current_time - self.last_submission_time).total_seconds()
|
|
if seconds_since_submission < self.submission_wait_seconds:
|
|
remaining_secs = int(self.submission_wait_seconds - seconds_since_submission)
|
|
msg = _('You must wait at least {wait_secs} between submissions. {remaining_secs} remaining.').format(
|
|
wait_secs=self.pretty_print_seconds(self.submission_wait_seconds),
|
|
remaining_secs=self.pretty_print_seconds(remaining_secs))
|
|
return {
|
|
'success': msg,
|
|
'html': ''
|
|
}
|
|
|
|
try:
|
|
# expose the attempt number to a potential python custom grader
|
|
# self.lcp.context['attempt'] refers to the attempt number (1-based)
|
|
self.lcp.context['attempt'] = self.attempts + 1
|
|
correct_map = self.lcp.grade_answers(answers)
|
|
# self.attempts refers to the number of attempts that did not
|
|
# raise an error (0-based)
|
|
self.attempts = self.attempts + 1
|
|
self.lcp.done = True
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
self.set_last_submission_time()
|
|
|
|
except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
|
|
if self.debug:
|
|
log.warning(
|
|
"StudentInputError in capa_block:problem_check",
|
|
exc_info=True
|
|
)
|
|
|
|
# Save the user's state before failing
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
|
|
# If the user is a staff member, include
|
|
# the full exception, including traceback,
|
|
# in the response
|
|
user_is_staff = self.runtime.service(self, 'user').get_current_user().opt_attrs.get(ATTR_KEY_USER_IS_STAFF)
|
|
if user_is_staff:
|
|
msg = f"Staff debug info: {traceback.format_exc()}"
|
|
|
|
# Otherwise, display just an error message,
|
|
# without a stack trace
|
|
else:
|
|
full_error = inst.args[0]
|
|
try:
|
|
# only return the error value of the exception
|
|
msg = full_error.split("\\n")[-2].split(": ", 1)[1]
|
|
except IndexError:
|
|
msg = full_error
|
|
|
|
return {'success': msg}
|
|
|
|
except Exception as err:
|
|
# Save the user's state before failing
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
|
|
if self.debug:
|
|
msg = f"Error checking problem: {str(err)}"
|
|
msg += f'\nTraceback:\n{traceback.format_exc()}'
|
|
return {'success': msg}
|
|
raise
|
|
published_grade = self.publish_grade()
|
|
|
|
# success = correct if ALL questions in this problem are correct
|
|
success = 'correct'
|
|
for answer_id in correct_map:
|
|
if not correct_map.is_correct(answer_id):
|
|
success = 'incorrect'
|
|
|
|
# NOTE: We are logging both full grading and queued-grading submissions. In the latter,
|
|
# 'success' will always be incorrect
|
|
event_info['grade'] = published_grade['grade']
|
|
event_info['max_grade'] = published_grade['max_grade']
|
|
event_info['correct_map'] = correct_map.get_dict()
|
|
event_info['success'] = success
|
|
event_info['attempts'] = self.attempts
|
|
event_info['submission'] = self.get_submission_metadata_safe(answers_without_files, correct_map)
|
|
self.publish_unmasked('problem_check', event_info)
|
|
|
|
# render problem into HTML
|
|
html = self.get_problem_html(encapsulate=False, submit_notification=True)
|
|
|
|
# Withhold success indicator if hiding correctness
|
|
if not self.correctness_available():
|
|
success = 'submitted'
|
|
|
|
return {
|
|
'success': success,
|
|
'contents': html
|
|
}
|
|
# pylint: enable=too-many-statements
|
|
|
|
def publish_unmasked(self, title, event_info):
|
|
"""
|
|
All calls to runtime.publish route through here so that the
|
|
choice names can be unmasked.
|
|
"""
|
|
# Do the unmask translates on a copy of event_info,
|
|
# avoiding problems where an event_info is unmasked twice.
|
|
event_unmasked = copy.deepcopy(event_info)
|
|
self.unmask_event(event_unmasked)
|
|
self.runtime.publish(self, title, event_unmasked)
|
|
|
|
def unmask_event(self, event_info):
|
|
"""
|
|
Translates in-place the event_info to account for masking
|
|
and adds information about permutation options in force.
|
|
"""
|
|
# answers is like: {u'i4x-Stanford-CS99-problem-dada976e76f34c24bc8415039dee1300_2_1': u'mask_0'}
|
|
# Each response values has an answer_id which matches the key in answers.
|
|
for response in self.lcp.responders.values():
|
|
# Un-mask choice names in event_info for masked responses.
|
|
if response.has_mask():
|
|
# We don't assume much about the structure of event_info,
|
|
# but check for the existence of the things we need to un-mask.
|
|
|
|
# Look for answers/id
|
|
answer = event_info.get('answers', {}).get(response.answer_id)
|
|
if answer is not None:
|
|
event_info['answers'][response.answer_id] = response.unmask_name(answer)
|
|
|
|
# Look for state/student_answers/id
|
|
answer = event_info.get('state', {}).get('student_answers', {}).get(response.answer_id)
|
|
if answer is not None:
|
|
event_info['state']['student_answers'][response.answer_id] = response.unmask_name(answer)
|
|
|
|
# Look for old_state/student_answers/id -- parallel to the above case, happens on reset
|
|
answer = event_info.get('old_state', {}).get('student_answers', {}).get(response.answer_id)
|
|
if answer is not None:
|
|
event_info['old_state']['student_answers'][response.answer_id] = response.unmask_name(answer)
|
|
|
|
# Add 'permutation' to event_info for permuted responses.
|
|
permutation_option = None
|
|
if response.has_shuffle():
|
|
permutation_option = 'shuffle'
|
|
elif response.has_answerpool():
|
|
permutation_option = 'answerpool'
|
|
|
|
if permutation_option is not None:
|
|
# Add permutation record tuple: (one of:'shuffle'/'answerpool', [as-displayed list])
|
|
if 'permutation' not in event_info:
|
|
event_info['permutation'] = {}
|
|
event_info['permutation'][response.answer_id] = (permutation_option, response.unmask_order())
|
|
|
|
def pretty_print_seconds(self, num_seconds):
|
|
"""
|
|
Returns time duration nicely formated, e.g. "3 minutes 4 seconds"
|
|
"""
|
|
# Here _ is the N variant ungettext that does pluralization with a 3-arg call
|
|
ungettext = self.runtime.service(self, "i18n").ungettext
|
|
hours = num_seconds // 3600
|
|
sub_hour = num_seconds % 3600
|
|
minutes = sub_hour // 60
|
|
seconds = sub_hour % 60
|
|
display = ""
|
|
if hours > 0:
|
|
display += ungettext("{num_hour} hour", "{num_hour} hours", hours).format(num_hour=hours)
|
|
if minutes > 0:
|
|
if display != "":
|
|
display += " "
|
|
# translators: "minute" refers to a minute of time
|
|
display += ungettext("{num_minute} minute", "{num_minute} minutes", minutes).format(num_minute=minutes)
|
|
# Taking care to make "0 seconds" instead of "" for 0 time
|
|
if seconds > 0 or (hours == 0 and minutes == 0):
|
|
if display != "":
|
|
display += " "
|
|
# translators: "second" refers to a second of time
|
|
display += ungettext("{num_second} second", "{num_second} seconds", seconds).format(num_second=seconds)
|
|
return display
|
|
|
|
def get_submission_metadata_safe(self, answers, correct_map):
|
|
"""
|
|
Ensures that no exceptions are thrown while generating input metadata summaries. Returns the
|
|
summary if it is successfully created, otherwise an empty dictionary.
|
|
"""
|
|
try:
|
|
return self.get_submission_metadata(answers, correct_map)
|
|
except Exception: # pylint: disable=broad-except
|
|
# NOTE: The above process requires deep inspection of capa structures that may break for some
|
|
# uncommon problem types. Ensure that it does not prevent answer submission in those
|
|
# cases. Any occurrences of errors in this block should be investigated and resolved.
|
|
log.exception('Unable to gather submission metadata, it will not be included in the event.')
|
|
|
|
return {}
|
|
|
|
def get_submission_metadata(self, answers, correct_map):
|
|
"""
|
|
Return a map of inputs to their corresponding summarized metadata.
|
|
|
|
Returns:
|
|
A map whose keys are a unique identifier for the input (in this case a capa input_id) and
|
|
whose values are:
|
|
|
|
question (str): Is the prompt that was presented to the student. It corresponds to the
|
|
label of the input.
|
|
answer (mixed): Is the answer the student provided. This may be a rich structure,
|
|
however it must be json serializable.
|
|
response_type (str): The XML tag of the capa response type.
|
|
input_type (str): The XML tag of the capa input type.
|
|
correct (bool): Whether or not the provided answer is correct. Will be an empty
|
|
string if correctness could not be determined.
|
|
variant (str): In some cases the same question can have several different variants.
|
|
This string should uniquely identify the variant of the question that was answered.
|
|
In the capa context this corresponds to the `seed`.
|
|
|
|
This function attempts to be very conservative and make very few assumptions about the structure
|
|
of the problem. If problem related metadata cannot be located it should be replaced with empty
|
|
strings ''.
|
|
"""
|
|
input_metadata = {}
|
|
for input_id, internal_answer in answers.items():
|
|
answer_input = self.lcp.inputs.get(input_id)
|
|
|
|
if answer_input is None:
|
|
log.warning('Input id %s is not mapped to an input type.', input_id)
|
|
|
|
answer_response = None
|
|
for responder in self.lcp.responders.values():
|
|
if input_id in responder.answer_ids:
|
|
answer_response = responder
|
|
|
|
if answer_response is None:
|
|
log.warning('Answer responder could not be found for input_id %s.', input_id)
|
|
|
|
user_visible_answer = internal_answer
|
|
if hasattr(answer_input, 'get_user_visible_answer'):
|
|
user_visible_answer = answer_input.get_user_visible_answer(internal_answer)
|
|
|
|
# If this problem has rerandomize enabled, then it will generate N variants of the
|
|
# question, one per unique seed value. In this case we would like to know which
|
|
# variant was selected. Ideally it would be nice to have the exact question that
|
|
# was presented to the user, with values interpolated etc, but that can be done
|
|
# later if necessary.
|
|
variant = ''
|
|
if self.rerandomize != RANDOMIZATION.NEVER:
|
|
variant = self.get_seed()
|
|
|
|
is_correct = correct_map.is_correct(input_id)
|
|
if is_correct is None:
|
|
is_correct = ''
|
|
|
|
response_data = getattr(answer_input, 'response_data', {})
|
|
input_metadata[input_id] = {
|
|
'question': response_data.get('label', ''),
|
|
'answer': user_visible_answer,
|
|
'response_type': getattr(getattr(answer_response, 'xml', None), 'tag', ''),
|
|
'input_type': getattr(answer_input, 'tag', ''),
|
|
'correct': is_correct,
|
|
'variant': variant,
|
|
'group_label': response_data.get('group_label', ''),
|
|
}
|
|
|
|
return input_metadata
|
|
|
|
def save_problem(self, data):
|
|
"""
|
|
Save the passed in answers.
|
|
Returns a dict { 'success' : bool, 'msg' : message }
|
|
The message is informative on success, and an error message on failure.
|
|
"""
|
|
event_info = {}
|
|
event_info['state'] = self.lcp.get_state()
|
|
event_info['problem_id'] = str(self.location)
|
|
|
|
answers = self.make_dict_of_responses(data)
|
|
event_info['answers'] = answers
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
# Too late. Cannot submit
|
|
if self.closed() and not self.max_attempts == 0:
|
|
event_info['failure'] = 'closed'
|
|
self.publish_unmasked('save_problem_fail', event_info)
|
|
return {
|
|
'success': False,
|
|
# pylint: disable=line-too-long
|
|
# Translators: 'closed' means the problem's due date has passed. You may no longer attempt to solve the problem.
|
|
'msg': _("Problem is closed."),
|
|
# pylint: enable=line-too-long
|
|
}
|
|
|
|
# Problem submitted. Student should reset before saving
|
|
# again.
|
|
if self.done and self.rerandomize == RANDOMIZATION.ALWAYS:
|
|
event_info['failure'] = 'done'
|
|
self.publish_unmasked('save_problem_fail', event_info)
|
|
return {
|
|
'success': False,
|
|
'msg': _("Problem needs to be reset prior to save.")
|
|
}
|
|
|
|
self.lcp.student_answers = answers
|
|
self.lcp.has_saved_answers = True
|
|
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
|
|
self.publish_unmasked('save_problem_success', event_info)
|
|
msg = _("Your answers have been saved.")
|
|
if not self.max_attempts == 0:
|
|
msg = _(
|
|
"Your answers have been saved but not graded. Click '{button_name}' to grade them."
|
|
).format(button_name=self.submit_button_name())
|
|
return {
|
|
'success': True,
|
|
'msg': msg,
|
|
'html': self.get_problem_html(encapsulate=False)
|
|
}
|
|
|
|
def reset_problem(self, _data):
|
|
"""
|
|
Changes problem state to unfinished -- removes student answers,
|
|
Causes problem to rerender itself if randomization is enabled.
|
|
|
|
Returns a dictionary of the form:
|
|
{'success': True/False,
|
|
'html': Problem HTML string }
|
|
|
|
If an error occurs, the dictionary will also have an
|
|
`error` key containing an error message.
|
|
"""
|
|
event_info = {}
|
|
event_info['old_state'] = self.lcp.get_state()
|
|
event_info['problem_id'] = str(self.location)
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
if self.closed():
|
|
event_info['failure'] = 'closed'
|
|
self.publish_unmasked('reset_problem_fail', event_info)
|
|
return {
|
|
'success': False,
|
|
# pylint: disable=line-too-long
|
|
# Translators: 'closed' means the problem's due date has passed. You may no longer attempt to solve the problem.
|
|
'msg': _("You cannot select Reset for a problem that is closed."),
|
|
# pylint: enable=line-too-long
|
|
}
|
|
|
|
if not self.is_submitted():
|
|
event_info['failure'] = 'not_done'
|
|
self.publish_unmasked('reset_problem_fail', event_info)
|
|
return {
|
|
'success': False,
|
|
'msg': _("You must submit an answer before you can select Reset."),
|
|
}
|
|
|
|
if self.is_submitted() and self.rerandomize in [RANDOMIZATION.ALWAYS, RANDOMIZATION.ONRESET]:
|
|
# Reset random number generator seed.
|
|
self.choose_new_seed()
|
|
|
|
# Generate a new problem with either the previous seed or a new seed
|
|
self.lcp = self.new_lcp(None)
|
|
|
|
# Pull in the new problem seed
|
|
self.set_state_from_lcp()
|
|
self.set_score(self.score_from_lcp(self.lcp))
|
|
|
|
# Grade may have changed, so publish new value
|
|
self.publish_grade()
|
|
|
|
event_info['new_state'] = self.lcp.get_state()
|
|
self.publish_unmasked('reset_problem', event_info)
|
|
|
|
return {
|
|
'success': True,
|
|
'html': self.get_problem_html(encapsulate=False),
|
|
}
|
|
|
|
# ScorableXBlockMixin methods
|
|
|
|
def rescore(self, only_if_higher=False):
|
|
"""
|
|
Checks whether the existing answers to a problem are correct.
|
|
|
|
This is called when the correct answer to a problem has been changed,
|
|
and the grade should be re-evaluated.
|
|
|
|
If only_if_higher is True, the answer and grade are updated
|
|
only if the resulting score is higher than before.
|
|
|
|
Returns a dict with one key:
|
|
{'success' : 'correct' | 'incorrect' | AJAX alert msg string }
|
|
|
|
Raises NotFoundError if called on a problem that has not yet been
|
|
answered, or NotImplementedError if it's a problem that cannot be rescored.
|
|
|
|
Returns the error messages for exceptions occurring while performing
|
|
the rescoring, rather than throwing them.
|
|
"""
|
|
event_info = {'state': self.lcp.get_state(), 'problem_id': str(self.location)}
|
|
|
|
_ = self.runtime.service(self, "i18n").ugettext
|
|
|
|
if not self.lcp.supports_rescoring():
|
|
event_info['failure'] = 'unsupported'
|
|
self.publish_unmasked('problem_rescore_fail', event_info)
|
|
# pylint: disable=line-too-long
|
|
# Translators: 'rescoring' refers to the act of re-submitting a student's solution so it can get a new score.
|
|
raise NotImplementedError(_("Problem's definition does not support rescoring."))
|
|
# pylint: enable=line-too-long
|
|
|
|
if not self.done:
|
|
event_info['failure'] = 'unanswered'
|
|
self.publish_unmasked('problem_rescore_fail', event_info)
|
|
raise NotFoundError(_("Problem must be answered before it can be graded again."))
|
|
|
|
# get old score, for comparison:
|
|
orig_score = self.get_score()
|
|
event_info['orig_score'] = orig_score.raw_earned
|
|
event_info['orig_total'] = orig_score.raw_possible
|
|
try:
|
|
self.update_correctness()
|
|
calculated_score = self.calculate_score()
|
|
except (StudentInputError, ResponseError, LoncapaProblemError) as inst: # lint-amnesty, pylint: disable=unused-variable
|
|
log.warning("Input error in capa_block:problem_rescore", exc_info=True)
|
|
event_info['failure'] = 'input_error'
|
|
self.publish_unmasked('problem_rescore_fail', event_info)
|
|
raise
|
|
|
|
except Exception:
|
|
event_info['failure'] = 'unexpected'
|
|
self.publish_unmasked('problem_rescore_fail', event_info)
|
|
raise
|
|
|
|
# rescoring should have no effect on attempts, so don't
|
|
# need to increment here, or mark done. Just save.
|
|
self.set_state_from_lcp()
|
|
self.publish_grade(score=calculated_score, only_if_higher=only_if_higher)
|
|
|
|
event_info['new_score'] = calculated_score.raw_earned
|
|
event_info['new_total'] = calculated_score.raw_possible
|
|
|
|
# success = correct if ALL questions in this problem are correct
|
|
success = 'correct'
|
|
for answer_id in self.lcp.correct_map:
|
|
if not self.lcp.correct_map.is_correct(answer_id):
|
|
success = 'incorrect'
|
|
|
|
# NOTE: We are logging both full grading and queued-grading submissions. In the latter,
|
|
# 'success' will always be incorrect
|
|
event_info['correct_map'] = self.lcp.correct_map.get_dict()
|
|
event_info['success'] = success
|
|
event_info['attempts'] = self.attempts
|
|
self.publish_unmasked('problem_rescore', event_info)
|
|
|
|
def has_submitted_answer(self):
|
|
return self.done
|
|
|
|
def set_score(self, score):
|
|
"""
|
|
Sets the internal score for the problem. This is not derived directly
|
|
from the internal LCP in keeping with the ScorableXBlock spec.
|
|
"""
|
|
self.score = score
|
|
|
|
def get_score(self):
|
|
"""
|
|
Returns the score currently set on the block.
|
|
"""
|
|
return self.score
|
|
|
|
def update_correctness(self):
|
|
"""
|
|
Updates correct map of the LCP.
|
|
Operates by creating a new correctness map based on the current
|
|
state of the LCP, and updating the old correctness map of the LCP.
|
|
"""
|
|
# Make sure that the attempt number is always at least 1 for grading purposes,
|
|
# even if the number of attempts have been reset and this problem is regraded.
|
|
self.lcp.context['attempt'] = max(self.attempts, 1)
|
|
new_correct_map = self.lcp.get_grade_from_current_answers(None)
|
|
self.lcp.correct_map.update(new_correct_map)
|
|
|
|
def calculate_score(self):
|
|
"""
|
|
Returns the score calculated from the current problem state.
|
|
"""
|
|
new_score = self.lcp.calculate_score()
|
|
return Score(raw_earned=new_score['score'], raw_possible=new_score['total'])
|
|
|
|
def score_from_lcp(self, lcp):
|
|
"""
|
|
Returns the score associated with the correctness map
|
|
currently stored by the LCP.
|
|
"""
|
|
lcp_score = lcp.calculate_score()
|
|
return Score(raw_earned=lcp_score['score'], raw_possible=lcp_score['total'])
|
|
|
|
|
|
class ComplexEncoder(json.JSONEncoder):
|
|
"""
|
|
Extend the JSON encoder to correctly handle complex numbers
|
|
"""
|
|
def default(self, obj): # lint-amnesty, pylint: disable=arguments-differ, method-hidden
|
|
"""
|
|
Print a nicely formatted complex number, or default to the JSON encoder
|
|
"""
|
|
if isinstance(obj, complex):
|
|
return f"{obj.real:.7g}{obj.imag:+.7g}*j"
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
def randomization_bin(seed, problem_id):
|
|
"""
|
|
Pick a randomization bin for the problem given the user's seed and a problem id.
|
|
|
|
We do this because we only want e.g. 20 randomizations of a problem to make analytics
|
|
interesting. To avoid having sets of students that always get the same problems,
|
|
we'll combine the system's per-student seed with the problem id in picking the bin.
|
|
"""
|
|
r_hash = hashlib.sha1()
|
|
r_hash.update(str(seed).encode())
|
|
r_hash.update(str(problem_id).encode())
|
|
# get the first few digits of the hash, convert to an int, then mod.
|
|
return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS
|