From b6ea3f4e692909f71cf251edc4a946138533fef9 Mon Sep 17 00:00:00 2001
From: John Orr
Date: Tue, 14 Jan 2014 23:02:34 +0000
Subject: [PATCH] Factor the core Capa problem logic into a mixin. This mixin
can be used by either the current XModule, or by an XBlock. Leave the
XModule-specific code in capa_module and move the shared code to capa_base.
Plan to move caba_base into the capa package in the future.
---
common/lib/xmodule/xmodule/capa_base.py | 1089 +++++++++++++++++
common/lib/xmodule/xmodule/capa_module.py | 1072 +---------------
.../xmodule/xmodule/tests/test_capa_module.py | 6 +-
3 files changed, 1094 insertions(+), 1073 deletions(-)
create mode 100644 common/lib/xmodule/xmodule/capa_base.py
diff --git a/common/lib/xmodule/xmodule/capa_base.py b/common/lib/xmodule/xmodule/capa_base.py
new file mode 100644
index 0000000000..bf621fc396
--- /dev/null
+++ b/common/lib/xmodule/xmodule/capa_base.py
@@ -0,0 +1,1089 @@
+"""Implements basics of Capa, including class CapaModule."""
+import cgi
+import datetime
+import hashlib
+import json
+import logging
+import os
+import traceback
+import struct
+import sys
+
+from pkg_resources import resource_string
+
+from capa.capa_problem import LoncapaProblem
+from capa.responsetypes import StudentInputError, \
+ ResponseError, LoncapaProblemError
+from capa.util import convert_files_to_filenames
+from .progress import Progress
+from xmodule.exceptions import NotFoundError, ProcessingError
+from xblock.fields import Scope, String, Boolean, Dict, Integer, Float
+from .fields import Timedelta, Date
+from django.utils.timezone import UTC
+from .util.duedate import get_extended_due_date
+
+log = logging.getLogger("edx.courseware")
+
+
+# Generate this many different variants of problems with rerandomize=per_student
+NUM_RANDOMIZATION_BINS = 20
+# Never produce more than this many different seeds, no matter what.
+MAX_RANDOMIZATION_BINS = 1000
+
+
+def randomization_bin(seed, problem_id):
+ """
+ Pick a randomization bin for the problem given the user's seed and a problem id.
+
+ We do this because we only want e.g. 20 randomizations of a problem to make analytics
+ interesting. To avoid having sets of students that always get the same problems,
+ we'll combine the system's per-student seed with the problem id in picking the bin.
+ """
+ r_hash = hashlib.sha1()
+ r_hash.update(str(seed))
+ r_hash.update(str(problem_id))
+ # get the first few digits of the hash, convert to an int, then mod.
+ return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS
+
+
+class Randomization(String):
+ """
+ Define a field to store how to randomize a problem.
+ """
+ def from_json(self, value):
+ if value in ("", "true"):
+ return "always"
+ elif value == "false":
+ return "per_student"
+ return value
+
+ to_json = from_json
+
+
+class ComplexEncoder(json.JSONEncoder):
+ """
+ Extend the JSON encoder to correctly handle complex numbers
+ """
+ def default(self, obj):
+ """
+ Print a nicely formatted complex number, or default to the JSON encoder
+ """
+ if isinstance(obj, complex):
+ return u"{real:.7g}{imag:+.7g}*j".format(real=obj.real, imag=obj.imag)
+ return json.JSONEncoder.default(self, obj)
+
+
+class CapaFields(object):
+ """
+ Define the possible fields for a Capa problem
+ """
+ display_name = String(
+ display_name="Display Name",
+ help="This name appears in the horizontal navigation at the top of the page.",
+ scope=Scope.settings,
+ # it'd be nice to have a useful default but it screws up other things; so,
+ # use display_name_with_default for those
+ default="Blank Advanced Problem"
+ )
+ attempts = Integer(help="Number of attempts taken by the student on this problem",
+ default=0, scope=Scope.user_state)
+ max_attempts = Integer(
+ display_name="Maximum Attempts",
+ help=("Defines the number of times a student can try to answer this problem. "
+ "If the value is not set, infinite attempts are allowed."),
+ values={"min": 0}, scope=Scope.settings
+ )
+ due = Date(help="Date that this problem is due by", scope=Scope.settings)
+ extended_due = Date(
+ help="Date that this problem is due by for a particular student. This "
+ "can be set by an instructor, and will override the global due "
+ "date if it is set to a date that is later than the global due "
+ "date.",
+ default=None,
+ scope=Scope.user_state,
+ )
+ graceperiod = Timedelta(
+ help="Amount of time after the due date that submissions will be accepted",
+ scope=Scope.settings
+ )
+ showanswer = String(
+ display_name="Show Answer",
+ help=("Defines when to show the answer to the problem. "
+ "A default value can be set in Advanced Settings."),
+ scope=Scope.settings,
+ default="finished",
+ values=[
+ {"display_name": "Always", "value": "always"},
+ {"display_name": "Answered", "value": "answered"},
+ {"display_name": "Attempted", "value": "attempted"},
+ {"display_name": "Closed", "value": "closed"},
+ {"display_name": "Finished", "value": "finished"},
+ {"display_name": "Past Due", "value": "past_due"},
+ {"display_name": "Never", "value": "never"}]
+ )
+ force_save_button = Boolean(
+ help="Whether to force the save button to appear on the page",
+ scope=Scope.settings,
+ default=False
+ )
+ rerandomize = Randomization(
+ display_name="Randomization",
+ help="Defines how often inputs are randomized when a student loads the problem. "
+ "This setting only applies to problems that can have randomly generated numeric values. "
+ "A default value can be set in Advanced Settings.",
+ default="never",
+ scope=Scope.settings,
+ values=[
+ {"display_name": "Always", "value": "always"},
+ {"display_name": "On Reset", "value": "onreset"},
+ {"display_name": "Never", "value": "never"},
+ {"display_name": "Per Student", "value": "per_student"}
+ ]
+ )
+ data = String(help="XML data for the problem", scope=Scope.content, default="")
+ correct_map = Dict(help="Dictionary with the correctness of current student answers",
+ scope=Scope.user_state, default={})
+ input_state = Dict(help="Dictionary for maintaining the state of inputtypes", scope=Scope.user_state)
+ student_answers = Dict(help="Dictionary with the current student responses", scope=Scope.user_state)
+ done = Boolean(help="Whether the student has answered the problem", scope=Scope.user_state)
+ seed = Integer(help="Random seed for this student", scope=Scope.user_state)
+ weight = Float(
+ display_name="Problem Weight",
+ help=("Defines the number of points each problem is worth. "
+ "If the value is not set, each response field in the problem is worth one point."),
+ values={"min": 0, "step": .1},
+ scope=Scope.settings
+ )
+ markdown = String(help="Markdown source of this module", default=None, scope=Scope.settings)
+ source_code = String(
+ help="Source code for LaTeX and Word problems. This feature is not well-supported.",
+ scope=Scope.settings
+ )
+ text_customization = Dict(
+ help="String customization substitutions for particular locations",
+ scope=Scope.settings
+ # TODO: someday it should be possible to not duplicate this definition here
+ # and in inheritance.py
+ )
+ use_latex_compiler = Boolean(
+ help="Enable LaTeX templates?",
+ default=False,
+ scope=Scope.settings
+ )
+
+
+class CapaMixin(CapaFields):
+ """
+ Core logic for Capa Problem, which can be used by XModules or XBlocks.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(CapaMixin, self).__init__(*args, **kwargs)
+
+ due_date = get_extended_due_date(self)
+
+ if self.graceperiod is not None and due_date:
+ self.close_date = due_date + self.graceperiod
+ else:
+ self.close_date = due_date
+
+ if self.seed is None:
+ self.choose_new_seed()
+
+ # Need the problem location in openendedresponse to send out. Adding
+ # it to the system here seems like the least clunky way to get it
+ # there.
+ self.runtime.set('location', self.location.url())
+
+ try:
+ # TODO (vshnayder): move as much as possible of this work and error
+ # checking to descriptor load time
+ self.lcp = self.new_lcp(self.get_state_for_lcp())
+
+ # At this point, we need to persist the randomization seed
+ # so that when the problem is re-loaded (to check/view/save)
+ # it stays the same.
+ # However, we do not want to write to the database
+ # every time the module is loaded.
+ # So we set the seed ONLY when there is not one set already
+ if self.seed is None:
+ self.seed = self.lcp.seed
+
+ except Exception as err: # pylint: disable=broad-except
+ msg = u'cannot create LoncapaProblem {loc}: {err}'.format(
+ loc=self.location.url(), err=err)
+ # TODO (vshnayder): do modules need error handlers too?
+ # We shouldn't be switching on DEBUG.
+ if self.runtime.DEBUG:
+ log.warning(msg)
+ # TODO (vshnayder): This logic should be general, not here--and may
+ # want to preserve the data instead of replacing it.
+ # e.g. in the CMS
+ msg = u'{msg}
'.format(msg=cgi.escape(msg))
+ msg += u'{tb}
'.format(
+ tb=cgi.escape(traceback.format_exc()))
+ # create a dummy problem with error message instead of failing
+ problem_text = (u''
+ u'Problem {url} has an error:{msg}'.format(
+ url=self.location.url(),
+ msg=msg)
+ )
+ self.lcp = self.new_lcp(self.get_state_for_lcp(), text=problem_text)
+ else:
+ # add extra info and raise
+ raise Exception(msg), None, sys.exc_info()[2]
+
+ self.set_state_from_lcp()
+
+ assert self.seed is not None
+
+ def choose_new_seed(self):
+ """
+ Choose a new seed.
+ """
+ if self.rerandomize == 'never':
+ self.seed = 1
+ elif self.rerandomize == "per_student" and hasattr(self.runtime, 'seed'):
+ # see comment on randomization_bin
+ self.seed = randomization_bin(self.runtime.seed, self.location.url)
+ else:
+ self.seed = struct.unpack('i', os.urandom(4))[0]
+
+ # So that sandboxed code execution can be cached, but still have an interesting
+ # number of possibilities, cap the number of different random seeds.
+ self.seed %= MAX_RANDOMIZATION_BINS
+
+ def new_lcp(self, state, text=None):
+ """
+ Generate a new Loncapa Problem
+ """
+ if text is None:
+ text = self.data
+
+ return LoncapaProblem(
+ problem_text=text,
+ id=self.location.html_id(),
+ state=state,
+ seed=self.seed,
+ system=self.runtime,
+ )
+
+ def get_state_for_lcp(self):
+ """
+ Give a dictionary holding the state of the module
+ """
+ return {
+ 'done': self.done,
+ 'correct_map': self.correct_map,
+ 'student_answers': self.student_answers,
+ 'input_state': self.input_state,
+ 'seed': self.seed,
+ }
+
+ def set_state_from_lcp(self):
+ """
+ Set the module's state from the settings in `self.lcp`
+ """
+ lcp_state = self.lcp.get_state()
+ self.done = lcp_state['done']
+ self.correct_map = lcp_state['correct_map']
+ self.input_state = lcp_state['input_state']
+ self.student_answers = lcp_state['student_answers']
+ self.seed = lcp_state['seed']
+
+ def get_score(self):
+ """
+ Access the problem's score
+ """
+ return self.lcp.get_score()
+
+ def max_score(self):
+ """
+ Access the problem's max score
+ """
+ return self.lcp.get_max_score()
+
+ def get_progress(self):
+ """
+ For now, just return score / max_score
+ """
+ score_dict = self.get_score()
+ score = score_dict['score']
+ total = score_dict['total']
+
+ if total > 0:
+ if self.weight is not None:
+ # scale score and total by weight/total:
+ score = score * self.weight / total
+ total = self.weight
+
+ try:
+ return Progress(score, total)
+ except (TypeError, ValueError):
+ log.exception("Got bad progress")
+ return None
+ return None
+
+ def get_html(self):
+ """
+ Return some html with data about the module
+ """
+ progress = self.get_progress()
+ return self.runtime.render_template('problem_ajax.html', {
+ 'element_id': self.location.html_id(),
+ 'id': self.id,
+ 'ajax_url': self.runtime.ajax_url,
+ 'progress_status': Progress.to_js_status_str(progress),
+ 'progress_detail': Progress.to_js_detail_str(progress),
+ })
+
+ def check_button_name(self):
+ """
+ Determine the name for the "check" button.
+
+ Usually it is just "Check", but if this is the student's
+ final attempt, change the name to "Final Check".
+ The text can be customized by the text_customization setting.
+ """
+ # The logic flow is a little odd so that _('xxx') strings can be found for
+ # translation while also running _() just once for each string.
+ _ = self.runtime.service(self, "i18n").ugettext
+ check = _('Check')
+ final_check = _('Final Check')
+
+ # Apply customizations if present
+ if 'custom_check' in self.text_customization:
+ check = _(self.text_customization.get('custom_check'))
+ if 'custom_final_check' in self.text_customization:
+ final_check = _(self.text_customization.get('custom_final_check'))
+ # TODO: need a way to get the customized words into the list of
+ # words to be translated
+
+ if self.max_attempts is not None and self.attempts >= self.max_attempts - 1:
+ return final_check
+ else:
+ return check
+
+ def should_show_check_button(self):
+ """
+ Return True/False to indicate whether to show the "Check" button.
+ """
+ submitted_without_reset = (self.is_submitted() and self.rerandomize == "always")
+
+ # If the problem is closed (past due / too many attempts)
+ # then we do NOT show the "check" button
+ # Also, do not show the "check" button if we're waiting
+ # for the user to reset a randomized problem
+ if self.closed() or submitted_without_reset:
+ return False
+ else:
+ return True
+
+ def should_show_reset_button(self):
+ """
+ Return True/False to indicate whether to show the "Reset" button.
+ """
+ is_survey_question = (self.max_attempts == 0)
+
+ if self.rerandomize in ["always", "onreset"]:
+
+ # If the problem is closed (and not a survey question with max_attempts==0),
+ # then do NOT show the reset button.
+ # If the problem hasn't been submitted yet, then do NOT show
+ # the reset button.
+ if (self.closed() and not is_survey_question) or not self.is_submitted():
+ return False
+ else:
+ return True
+ # Only randomized problems need a "reset" button
+ else:
+ return False
+
+ def should_show_save_button(self):
+ """
+ Return True/False to indicate whether to show the "Save" button.
+ """
+
+ # If the user has forced the save button to display,
+ # then show it as long as the problem is not closed
+ # (past due / too many attempts)
+ if self.force_save_button:
+ return not self.closed()
+ else:
+ is_survey_question = (self.max_attempts == 0)
+ needs_reset = self.is_submitted() and self.rerandomize == "always"
+
+ # If the student has unlimited attempts, and their answers
+ # are not randomized, then we do not need a save button
+ # because they can use the "Check" button without consequences.
+ #
+ # The consequences we want to avoid are:
+ # * Using up an attempt (if max_attempts is set)
+ # * Changing the current problem, and no longer being
+ # able to view it (if rerandomize is "always")
+ #
+ # In those cases. the if statement below is false,
+ # and the save button can still be displayed.
+ #
+ if self.max_attempts is None and self.rerandomize != "always":
+ return False
+
+ # If the problem is closed (and not a survey question with max_attempts==0),
+ # then do NOT show the save button
+ # If we're waiting for the user to reset a randomized problem
+ # then do NOT show the save button
+ elif (self.closed() and not is_survey_question) or needs_reset:
+ return False
+ else:
+ return True
+
+ def handle_problem_html_error(self, err):
+ """
+ Create a dummy problem to represent any errors.
+
+ Change our problem to a dummy problem containing a warning message to
+ display to users. Returns the HTML to show to users
+
+ `err` is the Exception encountered while rendering the problem HTML.
+ """
+ log.exception(err.message)
+
+ # TODO (vshnayder): another switch on DEBUG.
+ if self.runtime.DEBUG:
+ msg = (
+ u'[courseware.capa.capa_module] '
+ u'Failed to generate HTML for problem {url}'.format(
+ url=cgi.escape(self.location.url()))
+ )
+ msg += u'Error:
{msg}'.format(msg=cgi.escape(err.message))
+ msg += u'{tb}'.format(tb=cgi.escape(traceback.format_exc()))
+ html = msg
+
+ else:
+ # We're in non-debug mode, and possibly even in production. We want
+ # to avoid bricking of problem as much as possible
+
+ # Presumably, student submission has corrupted LoncapaProblem HTML.
+ # First, pull down all student answers
+ student_answers = self.lcp.student_answers
+ answer_ids = student_answers.keys()
+
+ # Some inputtypes, such as dynamath, have additional "hidden" state that
+ # is not exposed to the student. Keep those hidden
+ # TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id
+ hidden_state_keywords = ['dynamath']
+ for answer_id in answer_ids:
+ for hidden_state_keyword in hidden_state_keywords:
+ if answer_id.find(hidden_state_keyword) >= 0:
+ student_answers.pop(answer_id)
+
+ # Next, generate a fresh LoncapaProblem
+ self.lcp = self.new_lcp(None)
+ self.set_state_from_lcp()
+
+ # Prepend a scary warning to the student
+ warning = ''\
+ '
Warning: The problem has been reset to its initial state!
'\
+ 'The problem\'s state was corrupted by an invalid submission. ' \
+ 'The submission consisted of:'\
+ '
'
+ for student_answer in student_answers.values():
+ if student_answer != '':
+ warning += '- ' + cgi.escape(student_answer) + '
'
+ warning += '
'\
+ 'If this error persists, please contact the course staff.'\
+ '
'
+
+ html = warning
+ try:
+ html += self.lcp.get_html()
+ except Exception: # Couldn't do it. Give up
+ log.exception("Unable to generate html from LoncapaProblem")
+ raise
+
+ return html
+
+ def get_problem_html(self, encapsulate=True):
+ """
+ Return html for the problem.
+
+ Adds check, reset, save buttons as necessary based on the problem config and state.
+ """
+
+ try:
+ html = self.lcp.get_html()
+
+ # If we cannot construct the problem HTML,
+ # then generate an error message instead.
+ except Exception as err: # pylint: disable=broad-except
+ html = self.handle_problem_html_error(err)
+
+ # The convention is to pass the name of the check button
+ # if we want to show a check button, and False otherwise
+ # This works because non-empty strings evaluate to True
+ if self.should_show_check_button():
+ check_button = self.check_button_name()
+ else:
+ check_button = False
+
+ content = {'name': self.display_name_with_default,
+ 'html': html,
+ 'weight': self.weight,
+ }
+
+ context = {'problem': content,
+ 'id': self.id,
+ 'check_button': check_button,
+ 'reset_button': self.should_show_reset_button(),
+ 'save_button': self.should_show_save_button(),
+ 'answer_available': self.answer_available(),
+ 'attempts_used': self.attempts,
+ 'attempts_allowed': self.max_attempts,
+ }
+
+ html = self.runtime.render_template('problem.html', context)
+
+ if encapsulate:
+ html = u''.format(
+ id=self.location.html_id(), ajax_url=self.runtime.ajax_url
+ ) + html + "
"
+
+ # now do all the substitutions which the LMS module_render normally does, but
+ # we need to do here explicitly since we can get called for our HTML via AJAX
+ html = self.runtime.replace_urls(html)
+ if self.runtime.replace_course_urls:
+ html = self.runtime.replace_course_urls(html)
+
+ if self.runtime.replace_jump_to_id_urls:
+ html = self.runtime.replace_jump_to_id_urls(html)
+
+ return html
+
+ def is_past_due(self):
+ """
+ Is it now past this problem's due date, including grace period?
+ """
+ return (self.close_date is not None and
+ datetime.datetime.now(UTC()) > self.close_date)
+
+ def closed(self):
+ """
+ Is the student still allowed to submit answers?
+ """
+ if self.max_attempts is not None and self.attempts >= self.max_attempts:
+ return True
+ if self.is_past_due():
+ return True
+
+ return False
+
+ def is_submitted(self):
+ """
+ Used to decide to show or hide RESET or CHECK buttons.
+
+ Means that student submitted problem and nothing more.
+ Problem can be completely wrong.
+ Pressing RESET button makes this function to return False.
+ """
+ # used by conditional module
+ return self.lcp.done
+
+ def is_attempted(self):
+ """
+ Has the problem been attempted?
+
+ used by conditional module
+ """
+ return self.attempts > 0
+
+ def is_correct(self):
+ """
+ True iff full points
+ """
+ score_dict = self.get_score()
+ return score_dict['score'] == score_dict['total']
+
+ def answer_available(self):
+ """
+ Is the user allowed to see an answer?
+ """
+ if self.showanswer == '':
+ return False
+ elif self.showanswer == "never":
+ return False
+ elif self.runtime.user_is_staff:
+ # This is after the 'never' check because admins can see the answer
+ # unless the problem explicitly prevents it
+ return True
+ elif self.showanswer == 'attempted':
+ return self.attempts > 0
+ elif self.showanswer == 'answered':
+ # NOTE: this is slightly different from 'attempted' -- resetting the problems
+ # makes lcp.done False, but leaves attempts unchanged.
+ return self.lcp.done
+ elif self.showanswer == 'closed':
+ return self.closed()
+ elif self.showanswer == 'finished':
+ return self.closed() or self.is_correct()
+
+ elif self.showanswer == 'past_due':
+ return self.is_past_due()
+ elif self.showanswer == 'always':
+ return True
+
+ return False
+
+ def update_score(self, data):
+ """
+ Delivers grading response (e.g. from asynchronous code checking) to
+ the capa problem, so its score can be updated
+
+ 'data' must have a key 'response' which is a string that contains the
+ grader's response
+
+ No ajax return is needed. Return empty dict.
+ """
+ queuekey = data['queuekey']
+ score_msg = data['xqueue_body']
+ self.lcp.update_score(score_msg, queuekey)
+ self.set_state_from_lcp()
+ self.publish_grade()
+
+ return dict() # No AJAX return is needed
+
+ def handle_ungraded_response(self, data):
+ """
+ Delivers a response from the XQueue to the capa problem
+
+ The score of the problem will not be updated
+
+ Args:
+ - data (dict) must contain keys:
+ queuekey - a key specific to this response
+ xqueue_body - the body of the response
+ Returns:
+ empty dictionary
+
+ No ajax return is needed, so an empty dict is returned
+ """
+ queuekey = data['queuekey']
+ score_msg = data['xqueue_body']
+
+ # pass along the xqueue message to the problem
+ self.lcp.ungraded_response(score_msg, queuekey)
+ self.set_state_from_lcp()
+ return dict()
+
+ def handle_input_ajax(self, data):
+ """
+ Handle ajax calls meant for a particular input in the problem
+
+ Args:
+ - data (dict) - data that should be passed to the input
+ Returns:
+ - dict containing the response from the input
+ """
+ response = self.lcp.handle_input_ajax(data)
+
+ # save any state changes that may occur
+ self.set_state_from_lcp()
+ return response
+
+ def get_answer(self, _data):
+ """
+ For the "show answer" button.
+
+ Returns the answers: {'answers' : answers}
+ """
+ event_info = dict()
+ event_info['problem_id'] = self.location.url()
+ self.runtime.track_function('showanswer', event_info)
+ if not self.answer_available():
+ raise NotFoundError('Answer is not available')
+ else:
+ answers = self.lcp.get_question_answers()
+ self.set_state_from_lcp()
+
+ # answers (eg ) may have embedded images
+ # but be careful, some problems are using non-string answer dicts
+ new_answers = dict()
+ for answer_id in answers:
+ try:
+ new_answer = {answer_id: self.runtime.replace_urls(answers[answer_id])}
+ except TypeError:
+ log.debug(u'Unable to perform URL substitution on answers[%s]: %s',
+ answer_id, answers[answer_id])
+ new_answer = {answer_id: answers[answer_id]}
+ new_answers.update(new_answer)
+
+ return {'answers': new_answers}
+
+ # Figure out if we should move these to capa_problem?
+ def get_problem(self, _data):
+ """
+ Return results of get_problem_html, as a simple dict for json-ing.
+ { 'html': }
+
+ Used if we want to reconfirm we have the right thing e.g. after
+ several AJAX calls.
+ """
+ return {'html': self.get_problem_html(encapsulate=False)}
+
+ @staticmethod
+ def make_dict_of_responses(data):
+ """
+ Make dictionary of student responses (aka "answers")
+
+ `data` is POST dictionary (webob.multidict.MultiDict).
+
+ The `data` dict has keys of the form 'x_y', which are mapped
+ to key 'y' in the returned dict. For example,
+ 'input_1_2_3' would be mapped to '1_2_3' in the returned dict.
+
+ Some inputs always expect a list in the returned dict
+ (e.g. checkbox inputs). The convention is that
+ keys in the `data` dict that end with '[]' will always
+ have list values in the returned dict.
+ For example, if the `data` dict contains {'input_1[]': 'test' }
+ then the output dict would contain {'1': ['test'] }
+ (the value is a list).
+
+ Some other inputs such as ChoiceTextInput expect a dict of values in the returned
+ dict If the key ends with '{}' then we will assume that the value is a json
+ encoded dict and deserialize it.
+ For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'}
+ then the output dict would contain {'1': {"1_2_1": 1} }
+ (the value is a dictionary)
+
+ Raises an exception if:
+
+ -A key in the `data` dictionary does not contain at least one underscore
+ (e.g. "input" is invalid, but "input_1" is valid)
+
+ -Two keys end up with the same name in the returned dict.
+ (e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1'
+ in the returned dict)
+ """
+ answers = dict()
+
+ # webob.multidict.MultiDict is a view of a list of tuples,
+ # so it will return a multi-value key once for each value.
+ # We only want to consider each key a single time, so we use set(data.keys())
+ for key in set(data.keys()):
+ # e.g. input_resistor_1 ==> resistor_1
+ _, _, name = key.partition('_') # pylint: disable=redefined-outer-name
+
+ # If key has no underscores, then partition
+ # will return (key, '', '')
+ # We detect this and raise an error
+ if not name:
+ raise ValueError(u"{key} must contain at least one underscore".format(key=key))
+
+ else:
+ # This allows for answers which require more than one value for
+ # the same form input (e.g. checkbox inputs). The convention is that
+ # if the name ends with '[]' (which looks like an array), then the
+ # answer will be an array.
+ # if the name ends with '{}' (Which looks like a dict),
+ # then the answer will be a dict
+ is_list_key = name.endswith('[]')
+ is_dict_key = name.endswith('{}')
+ name = name[:-2] if is_list_key or is_dict_key else name
+
+ if is_list_key:
+ val = data.getall(key)
+ elif is_dict_key:
+ try:
+ val = json.loads(data[key])
+ # If the submission wasn't deserializable, raise an error.
+ except(KeyError, ValueError):
+ raise ValueError(
+ u"Invalid submission: {val} for {key}".format(val=data[key], key=key)
+ )
+ else:
+ val = data[key]
+
+ # If the name already exists, then we don't want
+ # to override it. Raise an error instead
+ if name in answers:
+ raise ValueError(u"Key {name} already exists in answers dict".format(name=name))
+ else:
+ answers[name] = val
+
+ return answers
+
+ def publish_grade(self):
+ """
+ Publishes the student's current grade to the system as an event
+ """
+ score = self.lcp.get_score()
+ self.runtime.publish({
+ 'event_name': 'grade',
+ 'value': score['score'],
+ 'max_value': score['total'],
+ })
+
+ return {'grade': score['score'], 'max_grade': score['total']}
+
+ def check_problem(self, data):
+ """
+ Checks whether answers to a problem are correct
+
+ Returns a map of correct/incorrect answers:
+ {'success' : 'correct' | 'incorrect' | AJAX alert msg string,
+ 'contents' : html}
+ """
+ event_info = dict()
+ event_info['state'] = self.lcp.get_state()
+ event_info['problem_id'] = self.location.url()
+
+ answers = self.make_dict_of_responses(data)
+ event_info['answers'] = convert_files_to_filenames(answers)
+
+ # Too late. Cannot submit
+ if self.closed():
+ event_info['failure'] = 'closed'
+ self.runtime.track_function('problem_check_fail', event_info)
+ raise NotFoundError('Problem is closed')
+
+ # Problem submitted. Student should reset before checking again
+ if self.done and self.rerandomize == "always":
+ event_info['failure'] = 'unreset'
+ self.runtime.track_function('problem_check_fail', event_info)
+ raise NotFoundError('Problem must be reset before it can be checked again')
+
+ # Problem queued. Students must wait a specified waittime before they are allowed to submit
+ if self.lcp.is_queued():
+ current_time = datetime.datetime.now(UTC())
+ prev_submit_time = self.lcp.get_recentmost_queuetime()
+ waittime_between_requests = self.runtime.xqueue['waittime']
+ if (current_time - prev_submit_time).total_seconds() < waittime_between_requests:
+ msg = u'You must wait at least {wait} seconds between submissions'.format(
+ wait=waittime_between_requests)
+ return {'success': msg, 'html': ''} # Prompts a modal dialog in ajax callback
+
+ try:
+ correct_map = self.lcp.grade_answers(answers)
+ self.attempts = self.attempts + 1
+ self.lcp.done = True
+ self.set_state_from_lcp()
+
+ except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
+ log.warning("StudentInputError in capa_module:problem_check",
+ exc_info=True)
+
+ # Save the user's state before failing
+ self.set_state_from_lcp()
+
+ # If the user is a staff member, include
+ # the full exception, including traceback,
+ # in the response
+ if self.runtime.user_is_staff:
+ msg = u"Staff debug info: {tb}".format(tb=cgi.escape(traceback.format_exc()))
+
+ # Otherwise, display just an error message,
+ # without a stack trace
+ else:
+ msg = u"Error: {msg}".format(msg=inst.message)
+
+ return {'success': msg}
+
+ except Exception as err:
+ # Save the user's state before failing
+ self.set_state_from_lcp()
+
+ if self.runtime.DEBUG:
+ msg = u"Error checking problem: {}".format(err.message)
+ msg += u'\nTraceback:\n{}'.format(traceback.format_exc())
+ return {'success': msg}
+ raise
+
+ published_grade = self.publish_grade()
+
+ # success = correct if ALL questions in this problem are correct
+ success = 'correct'
+ for answer_id in correct_map:
+ if not correct_map.is_correct(answer_id):
+ success = 'incorrect'
+
+ # NOTE: We are logging both full grading and queued-grading submissions. In the latter,
+ # 'success' will always be incorrect
+ event_info['grade'] = published_grade['grade']
+ event_info['max_grade'] = published_grade['max_grade']
+ event_info['correct_map'] = correct_map.get_dict()
+ event_info['success'] = success
+ event_info['attempts'] = self.attempts
+ self.runtime.track_function('problem_check', event_info)
+
+ if hasattr(self.runtime, 'psychometrics_handler'): # update PsychometricsData using callback
+ self.runtime.psychometrics_handler(self.get_state_for_lcp())
+
+ # render problem into HTML
+ html = self.get_problem_html(encapsulate=False)
+
+ return {'success': success,
+ 'contents': html,
+ }
+
+ def rescore_problem(self):
+ """
+ Checks whether the existing answers to a problem are correct.
+
+ This is called when the correct answer to a problem has been changed,
+ and the grade should be re-evaluated.
+
+ Returns a dict with one key:
+ {'success' : 'correct' | 'incorrect' | AJAX alert msg string }
+
+ Raises NotFoundError if called on a problem that has not yet been
+ answered, or NotImplementedError if it's a problem that cannot be rescored.
+
+ Returns the error messages for exceptions occurring while performing
+ the rescoring, rather than throwing them.
+ """
+ event_info = {'state': self.lcp.get_state(), 'problem_id': self.location.url()}
+
+ if not self.lcp.supports_rescoring():
+ event_info['failure'] = 'unsupported'
+ self.runtime.track_function('problem_rescore_fail', event_info)
+ raise NotImplementedError("Problem's definition does not support rescoring")
+
+ if not self.done:
+ event_info['failure'] = 'unanswered'
+ self.runtime.track_function('problem_rescore_fail', event_info)
+ raise NotFoundError('Problem must be answered before it can be graded again')
+
+ # get old score, for comparison:
+ orig_score = self.lcp.get_score()
+ event_info['orig_score'] = orig_score['score']
+ event_info['orig_total'] = orig_score['total']
+
+ try:
+ correct_map = self.lcp.rescore_existing_answers()
+
+ except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
+ log.warning("Input error in capa_module:problem_rescore", exc_info=True)
+ event_info['failure'] = 'input_error'
+ self.runtime.track_function('problem_rescore_fail', event_info)
+ return {'success': u"Error: {0}".format(inst.message)}
+
+ except Exception as err:
+ event_info['failure'] = 'unexpected'
+ self.runtime.track_function('problem_rescore_fail', event_info)
+ if self.runtime.DEBUG:
+ msg = u"Error checking problem: {0}".format(err.message)
+ msg += u'\nTraceback:\n' + traceback.format_exc()
+ return {'success': msg}
+ raise
+
+ # rescoring should have no effect on attempts, so don't
+ # need to increment here, or mark done. Just save.
+ self.set_state_from_lcp()
+
+ self.publish_grade()
+
+ new_score = self.lcp.get_score()
+ event_info['new_score'] = new_score['score']
+ event_info['new_total'] = new_score['total']
+
+ # success = correct if ALL questions in this problem are correct
+ success = 'correct'
+ for answer_id in correct_map:
+ if not correct_map.is_correct(answer_id):
+ success = 'incorrect'
+
+ # NOTE: We are logging both full grading and queued-grading submissions. In the latter,
+ # 'success' will always be incorrect
+ event_info['correct_map'] = correct_map.get_dict()
+ event_info['success'] = success
+ event_info['attempts'] = self.attempts
+ self.runtime.track_function('problem_rescore', event_info)
+
+ # psychometrics should be called on rescoring requests in the same way as check-problem
+ if hasattr(self.runtime, 'psychometrics_handler'): # update PsychometricsData using callback
+ self.runtime.psychometrics_handler(self.get_state_for_lcp())
+
+ return {'success': success}
+
+ def save_problem(self, data):
+ """
+ Save the passed in answers.
+ Returns a dict { 'success' : bool, 'msg' : message }
+ The message is informative on success, and an error message on failure.
+ """
+ event_info = dict()
+ event_info['state'] = self.lcp.get_state()
+ event_info['problem_id'] = self.location.url()
+
+ answers = self.make_dict_of_responses(data)
+ event_info['answers'] = answers
+
+ # Too late. Cannot submit
+ if self.closed() and not self.max_attempts == 0:
+ event_info['failure'] = 'closed'
+ self.runtime.track_function('save_problem_fail', event_info)
+ return {'success': False,
+ 'msg': "Problem is closed"}
+
+ # Problem submitted. Student should reset before saving
+ # again.
+ if self.done and self.rerandomize == "always":
+ event_info['failure'] = 'done'
+ self.runtime.track_function('save_problem_fail', event_info)
+ return {'success': False,
+ 'msg': "Problem needs to be reset prior to save"}
+
+ self.lcp.student_answers = answers
+
+ self.set_state_from_lcp()
+
+ self.runtime.track_function('save_problem_success', event_info)
+ msg = "Your answers have been saved"
+ if not self.max_attempts == 0:
+ msg += " but not graded. Hit 'Check' to grade them."
+ return {'success': True,
+ 'msg': msg}
+
+ def reset_problem(self, _data):
+ """
+ Changes problem state to unfinished -- removes student answers,
+ and causes problem to rerender itself.
+
+ Returns a dictionary of the form:
+ {'success': True/False,
+ 'html': Problem HTML string }
+
+ If an error occurs, the dictionary will also have an
+ `error` key containing an error message.
+ """
+ event_info = dict()
+ event_info['old_state'] = self.lcp.get_state()
+ event_info['problem_id'] = self.location.url()
+
+ if self.closed():
+ event_info['failure'] = 'closed'
+ self.runtime.track_function('reset_problem_fail', event_info)
+ return {'success': False,
+ 'error': "Problem is closed"}
+
+ if not self.done:
+ event_info['failure'] = 'not_done'
+ self.runtime.track_function('reset_problem_fail', event_info)
+ return {'success': False,
+ 'error': "Refresh the page and make an attempt before resetting."}
+
+ if self.rerandomize in ["always", "onreset"]:
+ # Reset random number generator seed.
+ self.choose_new_seed()
+
+ # Generate a new problem with either the previous seed or a new seed
+ self.lcp = self.new_lcp(None)
+
+ # Pull in the new problem seed
+ self.set_state_from_lcp()
+
+ event_info['new_state'] = self.lcp.get_state()
+ self.runtime.track_function('reset_problem', event_info)
+
+ return {'success': True,
+ 'html': self.get_problem_html(encapsulate=False)}
diff --git a/common/lib/xmodule/xmodule/capa_module.py b/common/lib/xmodule/xmodule/capa_module.py
index 60bf55b9f3..d00fe6cf76 100644
--- a/common/lib/xmodule/xmodule/capa_module.py
+++ b/common/lib/xmodule/xmodule/capa_module.py
@@ -1,180 +1,20 @@
"""Implements basics of Capa, including class CapaModule."""
-import cgi
-import datetime
-import hashlib
import json
import logging
-import os
-import traceback
-import struct
import sys
from pkg_resources import resource_string
-from capa.capa_problem import LoncapaProblem
-from capa.responsetypes import StudentInputError, \
- ResponseError, LoncapaProblemError
-from capa.util import convert_files_to_filenames
+from .capa_base import CapaMixin, CapaFields, ComplexEncoder
from .progress import Progress
from xmodule.x_module import XModule, module_attr
from xmodule.raw_module import RawDescriptor
from xmodule.exceptions import NotFoundError, ProcessingError
-from xblock.fields import Scope, String, Boolean, Dict, Integer, Float
-from .fields import Timedelta, Date
-from django.utils.timezone import UTC
-from .util.duedate import get_extended_due_date
log = logging.getLogger("edx.courseware")
-# Generate this many different variants of problems with rerandomize=per_student
-NUM_RANDOMIZATION_BINS = 20
-# Never produce more than this many different seeds, no matter what.
-MAX_RANDOMIZATION_BINS = 1000
-
-
-def randomization_bin(seed, problem_id):
- """
- Pick a randomization bin for the problem given the user's seed and a problem id.
-
- We do this because we only want e.g. 20 randomizations of a problem to make analytics
- interesting. To avoid having sets of students that always get the same problems,
- we'll combine the system's per-student seed with the problem id in picking the bin.
- """
- r_hash = hashlib.sha1()
- r_hash.update(str(seed))
- r_hash.update(str(problem_id))
- # get the first few digits of the hash, convert to an int, then mod.
- return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS
-
-
-class Randomization(String):
- """
- Define a field to store how to randomize a problem.
- """
- def from_json(self, value):
- if value in ("", "true"):
- return "always"
- elif value == "false":
- return "per_student"
- return value
-
- to_json = from_json
-
-
-class ComplexEncoder(json.JSONEncoder):
- """
- Extend the JSON encoder to correctly handle complex numbers
- """
- def default(self, obj):
- """
- Print a nicely formatted complex number, or default to the JSON encoder
- """
- if isinstance(obj, complex):
- return u"{real:.7g}{imag:+.7g}*j".format(real=obj.real, imag=obj.imag)
- return json.JSONEncoder.default(self, obj)
-
-
-class CapaFields(object):
- """
- Define the possible fields for a Capa problem
- """
- display_name = String(
- display_name="Display Name",
- help="This name appears in the horizontal navigation at the top of the page.",
- scope=Scope.settings,
- # it'd be nice to have a useful default but it screws up other things; so,
- # use display_name_with_default for those
- default="Blank Advanced Problem"
- )
- attempts = Integer(help="Number of attempts taken by the student on this problem",
- default=0, scope=Scope.user_state)
- max_attempts = Integer(
- display_name="Maximum Attempts",
- help=("Defines the number of times a student can try to answer this problem. "
- "If the value is not set, infinite attempts are allowed."),
- values={"min": 0}, scope=Scope.settings
- )
- due = Date(help="Date that this problem is due by", scope=Scope.settings)
- extended_due = Date(
- help="Date that this problem is due by for a particular student. This "
- "can be set by an instructor, and will override the global due "
- "date if it is set to a date that is later than the global due "
- "date.",
- default=None,
- scope=Scope.user_state,
- )
- graceperiod = Timedelta(
- help="Amount of time after the due date that submissions will be accepted",
- scope=Scope.settings
- )
- showanswer = String(
- display_name="Show Answer",
- help=("Defines when to show the answer to the problem. "
- "A default value can be set in Advanced Settings."),
- scope=Scope.settings,
- default="finished",
- values=[
- {"display_name": "Always", "value": "always"},
- {"display_name": "Answered", "value": "answered"},
- {"display_name": "Attempted", "value": "attempted"},
- {"display_name": "Closed", "value": "closed"},
- {"display_name": "Finished", "value": "finished"},
- {"display_name": "Past Due", "value": "past_due"},
- {"display_name": "Never", "value": "never"}]
- )
- force_save_button = Boolean(
- help="Whether to force the save button to appear on the page",
- scope=Scope.settings,
- default=False
- )
- rerandomize = Randomization(
- display_name="Randomization",
- help="Defines how often inputs are randomized when a student loads the problem. "
- "This setting only applies to problems that can have randomly generated numeric values. "
- "A default value can be set in Advanced Settings.",
- default="never",
- scope=Scope.settings,
- values=[
- {"display_name": "Always", "value": "always"},
- {"display_name": "On Reset", "value": "onreset"},
- {"display_name": "Never", "value": "never"},
- {"display_name": "Per Student", "value": "per_student"}
- ]
- )
- data = String(help="XML data for the problem", scope=Scope.content, default="")
- correct_map = Dict(help="Dictionary with the correctness of current student answers",
- scope=Scope.user_state, default={})
- input_state = Dict(help="Dictionary for maintaining the state of inputtypes", scope=Scope.user_state)
- student_answers = Dict(help="Dictionary with the current student responses", scope=Scope.user_state)
- done = Boolean(help="Whether the student has answered the problem", scope=Scope.user_state)
- seed = Integer(help="Random seed for this student", scope=Scope.user_state)
- weight = Float(
- display_name="Problem Weight",
- help=("Defines the number of points each problem is worth. "
- "If the value is not set, each response field in the problem is worth one point."),
- values={"min": 0, "step": .1},
- scope=Scope.settings
- )
- markdown = String(help="Markdown source of this module", default=None, scope=Scope.settings)
- source_code = String(
- help="Source code for LaTeX and Word problems. This feature is not well-supported.",
- scope=Scope.settings
- )
- text_customization = Dict(
- help="String customization substitutions for particular locations",
- scope=Scope.settings
- # TODO: someday it should be possible to not duplicate this definition here
- # and in inheritance.py
- )
- use_latex_compiler = Boolean(
- help="Enable LaTeX templates?",
- default=False,
- scope=Scope.settings
- )
-
-
-class CapaModule(CapaFields, XModule):
+class CapaModule(CapaMixin, XModule):
"""
An XModule implementing LonCapa format problems, implemented by way of
capa.capa_problem.LoncapaProblem
@@ -200,385 +40,6 @@ class CapaModule(CapaFields, XModule):
"""
super(CapaModule, self).__init__(*args, **kwargs)
- due_date = get_extended_due_date(self)
-
- if self.graceperiod is not None and due_date:
- self.close_date = due_date + self.graceperiod
- else:
- self.close_date = due_date
-
- if self.seed is None:
- self.choose_new_seed()
-
- # Need the problem location in openendedresponse to send out. Adding
- # it to the system here seems like the least clunky way to get it
- # there.
- self.system.set('location', self.location.url())
-
- try:
- # TODO (vshnayder): move as much as possible of this work and error
- # checking to descriptor load time
- self.lcp = self.new_lcp(self.get_state_for_lcp())
-
- # At this point, we need to persist the randomization seed
- # so that when the problem is re-loaded (to check/view/save)
- # it stays the same.
- # However, we do not want to write to the database
- # every time the module is loaded.
- # So we set the seed ONLY when there is not one set already
- if self.seed is None:
- self.seed = self.lcp.seed
-
- except Exception as err: # pylint: disable=broad-except
- msg = u'cannot create LoncapaProblem {loc}: {err}'.format(
- loc=self.location.url(), err=err)
- # TODO (vshnayder): do modules need error handlers too?
- # We shouldn't be switching on DEBUG.
- if self.system.DEBUG:
- log.warning(msg)
- # TODO (vshnayder): This logic should be general, not here--and may
- # want to preserve the data instead of replacing it.
- # e.g. in the CMS
- msg = u'{msg}
'.format(msg=cgi.escape(msg))
- msg += u'{tb}'.format(
- tb=cgi.escape(traceback.format_exc()))
- # create a dummy problem with error message instead of failing
- problem_text = (u''
- u'Problem {url} has an error:{msg}'.format(
- url=self.location.url(),
- msg=msg)
- )
- self.lcp = self.new_lcp(self.get_state_for_lcp(), text=problem_text)
- else:
- # add extra info and raise
- raise Exception(msg), None, sys.exc_info()[2]
-
- self.set_state_from_lcp()
-
- assert self.seed is not None
-
- def choose_new_seed(self):
- """
- Choose a new seed.
- """
- if self.rerandomize == 'never':
- self.seed = 1
- elif self.rerandomize == "per_student" and hasattr(self.system, 'seed'):
- # see comment on randomization_bin
- self.seed = randomization_bin(self.system.seed, self.location.url)
- else:
- self.seed = struct.unpack('i', os.urandom(4))[0]
-
- # So that sandboxed code execution can be cached, but still have an interesting
- # number of possibilities, cap the number of different random seeds.
- self.seed %= MAX_RANDOMIZATION_BINS
-
- def new_lcp(self, state, text=None):
- """
- Generate a new Loncapa Problem
- """
- if text is None:
- text = self.data
-
- return LoncapaProblem(
- problem_text=text,
- id=self.location.html_id(),
- state=state,
- seed=self.seed,
- system=self.system,
- )
-
- def get_state_for_lcp(self):
- """
- Give a dictionary holding the state of the module
- """
- return {
- 'done': self.done,
- 'correct_map': self.correct_map,
- 'student_answers': self.student_answers,
- 'input_state': self.input_state,
- 'seed': self.seed,
- }
-
- def set_state_from_lcp(self):
- """
- Set the module's state from the settings in `self.lcp`
- """
- lcp_state = self.lcp.get_state()
- self.done = lcp_state['done']
- self.correct_map = lcp_state['correct_map']
- self.input_state = lcp_state['input_state']
- self.student_answers = lcp_state['student_answers']
- self.seed = lcp_state['seed']
-
- def get_score(self):
- """
- Access the problem's score
- """
- return self.lcp.get_score()
-
- def max_score(self):
- """
- Access the problem's max score
- """
- return self.lcp.get_max_score()
-
- def get_progress(self):
- """
- For now, just return score / max_score
- """
- score_dict = self.get_score()
- score = score_dict['score']
- total = score_dict['total']
-
- if total > 0:
- if self.weight is not None:
- # scale score and total by weight/total:
- score = score * self.weight / total
- total = self.weight
-
- try:
- return Progress(score, total)
- except (TypeError, ValueError):
- log.exception("Got bad progress")
- return None
- return None
-
- def get_html(self):
- """
- Return some html with data about the module
- """
- progress = self.get_progress()
- return self.system.render_template('problem_ajax.html', {
- 'element_id': self.location.html_id(),
- 'id': self.id,
- 'ajax_url': self.system.ajax_url,
- 'progress_status': Progress.to_js_status_str(progress),
- 'progress_detail': Progress.to_js_detail_str(progress),
- })
-
- def check_button_name(self):
- """
- Determine the name for the "check" button.
-
- Usually it is just "Check", but if this is the student's
- final attempt, change the name to "Final Check".
- The text can be customized by the text_customization setting.
- """
- # The logic flow is a little odd so that _('xxx') strings can be found for
- # translation while also running _() just once for each string.
- _ = self.runtime.service(self, "i18n").ugettext
- check = _('Check')
- final_check = _('Final Check')
-
- # Apply customizations if present
- if 'custom_check' in self.text_customization:
- check = _(self.text_customization.get('custom_check'))
- if 'custom_final_check' in self.text_customization:
- final_check = _(self.text_customization.get('custom_final_check'))
- # TODO: need a way to get the customized words into the list of
- # words to be translated
-
- if self.max_attempts is not None and self.attempts >= self.max_attempts - 1:
- return final_check
- else:
- return check
-
- def should_show_check_button(self):
- """
- Return True/False to indicate whether to show the "Check" button.
- """
- submitted_without_reset = (self.is_submitted() and self.rerandomize == "always")
-
- # If the problem is closed (past due / too many attempts)
- # then we do NOT show the "check" button
- # Also, do not show the "check" button if we're waiting
- # for the user to reset a randomized problem
- if self.closed() or submitted_without_reset:
- return False
- else:
- return True
-
- def should_show_reset_button(self):
- """
- Return True/False to indicate whether to show the "Reset" button.
- """
- is_survey_question = (self.max_attempts == 0)
-
- if self.rerandomize in ["always", "onreset"]:
-
- # If the problem is closed (and not a survey question with max_attempts==0),
- # then do NOT show the reset button.
- # If the problem hasn't been submitted yet, then do NOT show
- # the reset button.
- if (self.closed() and not is_survey_question) or not self.is_submitted():
- return False
- else:
- return True
- # Only randomized problems need a "reset" button
- else:
- return False
-
- def should_show_save_button(self):
- """
- Return True/False to indicate whether to show the "Save" button.
- """
-
- # If the user has forced the save button to display,
- # then show it as long as the problem is not closed
- # (past due / too many attempts)
- if self.force_save_button:
- return not self.closed()
- else:
- is_survey_question = (self.max_attempts == 0)
- needs_reset = self.is_submitted() and self.rerandomize == "always"
-
- # If the student has unlimited attempts, and their answers
- # are not randomized, then we do not need a save button
- # because they can use the "Check" button without consequences.
- #
- # The consequences we want to avoid are:
- # * Using up an attempt (if max_attempts is set)
- # * Changing the current problem, and no longer being
- # able to view it (if rerandomize is "always")
- #
- # In those cases. the if statement below is false,
- # and the save button can still be displayed.
- #
- if self.max_attempts is None and self.rerandomize != "always":
- return False
-
- # If the problem is closed (and not a survey question with max_attempts==0),
- # then do NOT show the save button
- # If we're waiting for the user to reset a randomized problem
- # then do NOT show the save button
- elif (self.closed() and not is_survey_question) or needs_reset:
- return False
- else:
- return True
-
- def handle_problem_html_error(self, err):
- """
- Create a dummy problem to represent any errors.
-
- Change our problem to a dummy problem containing a warning message to
- display to users. Returns the HTML to show to users
-
- `err` is the Exception encountered while rendering the problem HTML.
- """
- log.exception(err.message)
-
- # TODO (vshnayder): another switch on DEBUG.
- if self.system.DEBUG:
- msg = (
- u'[courseware.capa.capa_module] '
- u'Failed to generate HTML for problem {url}'.format(
- url=cgi.escape(self.location.url()))
- )
- msg += u'Error:
{msg}'.format(msg=cgi.escape(err.message))
- msg += u'{tb}'.format(tb=cgi.escape(traceback.format_exc()))
- html = msg
-
- else:
- # We're in non-debug mode, and possibly even in production. We want
- # to avoid bricking of problem as much as possible
-
- # Presumably, student submission has corrupted LoncapaProblem HTML.
- # First, pull down all student answers
- student_answers = self.lcp.student_answers
- answer_ids = student_answers.keys()
-
- # Some inputtypes, such as dynamath, have additional "hidden" state that
- # is not exposed to the student. Keep those hidden
- # TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id
- hidden_state_keywords = ['dynamath']
- for answer_id in answer_ids:
- for hidden_state_keyword in hidden_state_keywords:
- if answer_id.find(hidden_state_keyword) >= 0:
- student_answers.pop(answer_id)
-
- # Next, generate a fresh LoncapaProblem
- self.lcp = self.new_lcp(None)
- self.set_state_from_lcp()
-
- # Prepend a scary warning to the student
- warning = ''\
- '
Warning: The problem has been reset to its initial state!
'\
- 'The problem\'s state was corrupted by an invalid submission. ' \
- 'The submission consisted of:'\
- '
'
- for student_answer in student_answers.values():
- if student_answer != '':
- warning += '- ' + cgi.escape(student_answer) + '
'
- warning += '
'\
- 'If this error persists, please contact the course staff.'\
- '
'
-
- html = warning
- try:
- html += self.lcp.get_html()
- except Exception: # Couldn't do it. Give up
- log.exception("Unable to generate html from LoncapaProblem")
- raise
-
- return html
-
- def get_problem_html(self, encapsulate=True):
- """
- Return html for the problem.
-
- Adds check, reset, save buttons as necessary based on the problem config and state.
- """
-
- try:
- html = self.lcp.get_html()
-
- # If we cannot construct the problem HTML,
- # then generate an error message instead.
- except Exception as err: # pylint: disable=broad-except
- html = self.handle_problem_html_error(err)
-
- # The convention is to pass the name of the check button
- # if we want to show a check button, and False otherwise
- # This works because non-empty strings evaluate to True
- if self.should_show_check_button():
- check_button = self.check_button_name()
- else:
- check_button = False
-
- content = {'name': self.display_name_with_default,
- 'html': html,
- 'weight': self.weight,
- }
-
- context = {'problem': content,
- 'id': self.id,
- 'check_button': check_button,
- 'reset_button': self.should_show_reset_button(),
- 'save_button': self.should_show_save_button(),
- 'answer_available': self.answer_available(),
- 'attempts_used': self.attempts,
- 'attempts_allowed': self.max_attempts,
- }
-
- html = self.system.render_template('problem.html', context)
-
- if encapsulate:
- html = u''.format(
- id=self.location.html_id(), ajax_url=self.system.ajax_url
- ) + html + "
"
-
- # now do all the substitutions which the LMS module_render normally does, but
- # we need to do here explicitly since we can get called for our HTML via AJAX
- html = self.system.replace_urls(html)
- if self.system.replace_course_urls:
- html = self.system.replace_course_urls(html)
-
- if self.system.replace_jump_to_id_urls:
- html = self.system.replace_jump_to_id_urls(html)
-
- return html
-
def handle_ajax(self, dispatch, data):
"""
This is called by courseware.module_render, to handle an AJAX call.
@@ -637,535 +98,6 @@ class CapaModule(CapaFields, XModule):
return json.dumps(result, cls=ComplexEncoder)
- def is_past_due(self):
- """
- Is it now past this problem's due date, including grace period?
- """
- return (self.close_date is not None and
- datetime.datetime.now(UTC()) > self.close_date)
-
- def closed(self):
- """
- Is the student still allowed to submit answers?
- """
- if self.max_attempts is not None and self.attempts >= self.max_attempts:
- return True
- if self.is_past_due():
- return True
-
- return False
-
- def is_submitted(self):
- """
- Used to decide to show or hide RESET or CHECK buttons.
-
- Means that student submitted problem and nothing more.
- Problem can be completely wrong.
- Pressing RESET button makes this function to return False.
- """
- # used by conditional module
- return self.lcp.done
-
- def is_attempted(self):
- """
- Has the problem been attempted?
-
- used by conditional module
- """
- return self.attempts > 0
-
- def is_correct(self):
- """
- True iff full points
- """
- score_dict = self.get_score()
- return score_dict['score'] == score_dict['total']
-
- def answer_available(self):
- """
- Is the user allowed to see an answer?
- """
- if self.showanswer == '':
- return False
- elif self.showanswer == "never":
- return False
- elif self.system.user_is_staff:
- # This is after the 'never' check because admins can see the answer
- # unless the problem explicitly prevents it
- return True
- elif self.showanswer == 'attempted':
- return self.attempts > 0
- elif self.showanswer == 'answered':
- # NOTE: this is slightly different from 'attempted' -- resetting the problems
- # makes lcp.done False, but leaves attempts unchanged.
- return self.lcp.done
- elif self.showanswer == 'closed':
- return self.closed()
- elif self.showanswer == 'finished':
- return self.closed() or self.is_correct()
-
- elif self.showanswer == 'past_due':
- return self.is_past_due()
- elif self.showanswer == 'always':
- return True
-
- return False
-
- def update_score(self, data):
- """
- Delivers grading response (e.g. from asynchronous code checking) to
- the capa problem, so its score can be updated
-
- 'data' must have a key 'response' which is a string that contains the
- grader's response
-
- No ajax return is needed. Return empty dict.
- """
- queuekey = data['queuekey']
- score_msg = data['xqueue_body']
- self.lcp.update_score(score_msg, queuekey)
- self.set_state_from_lcp()
- self.publish_grade()
-
- return dict() # No AJAX return is needed
-
- def handle_ungraded_response(self, data):
- """
- Delivers a response from the XQueue to the capa problem
-
- The score of the problem will not be updated
-
- Args:
- - data (dict) must contain keys:
- queuekey - a key specific to this response
- xqueue_body - the body of the response
- Returns:
- empty dictionary
-
- No ajax return is needed, so an empty dict is returned
- """
- queuekey = data['queuekey']
- score_msg = data['xqueue_body']
-
- # pass along the xqueue message to the problem
- self.lcp.ungraded_response(score_msg, queuekey)
- self.set_state_from_lcp()
- return dict()
-
- def handle_input_ajax(self, data):
- """
- Handle ajax calls meant for a particular input in the problem
-
- Args:
- - data (dict) - data that should be passed to the input
- Returns:
- - dict containing the response from the input
- """
- response = self.lcp.handle_input_ajax(data)
-
- # save any state changes that may occur
- self.set_state_from_lcp()
- return response
-
- def get_answer(self, _data):
- """
- For the "show answer" button.
-
- Returns the answers: {'answers' : answers}
- """
- event_info = dict()
- event_info['problem_id'] = self.location.url()
- self.system.track_function('showanswer', event_info)
- if not self.answer_available():
- raise NotFoundError('Answer is not available')
- else:
- answers = self.lcp.get_question_answers()
- self.set_state_from_lcp()
-
- # answers (eg ) may have embedded images
- # but be careful, some problems are using non-string answer dicts
- new_answers = dict()
- for answer_id in answers:
- try:
- new_answer = {answer_id: self.system.replace_urls(answers[answer_id])}
- except TypeError:
- log.debug(u'Unable to perform URL substitution on answers[%s]: %s',
- answer_id, answers[answer_id])
- new_answer = {answer_id: answers[answer_id]}
- new_answers.update(new_answer)
-
- return {'answers': new_answers}
-
- # Figure out if we should move these to capa_problem?
- def get_problem(self, _data):
- """
- Return results of get_problem_html, as a simple dict for json-ing.
- { 'html': }
-
- Used if we want to reconfirm we have the right thing e.g. after
- several AJAX calls.
- """
- return {'html': self.get_problem_html(encapsulate=False)}
-
- @staticmethod
- def make_dict_of_responses(data):
- """
- Make dictionary of student responses (aka "answers")
-
- `data` is POST dictionary (webob.multidict.MultiDict).
-
- The `data` dict has keys of the form 'x_y', which are mapped
- to key 'y' in the returned dict. For example,
- 'input_1_2_3' would be mapped to '1_2_3' in the returned dict.
-
- Some inputs always expect a list in the returned dict
- (e.g. checkbox inputs). The convention is that
- keys in the `data` dict that end with '[]' will always
- have list values in the returned dict.
- For example, if the `data` dict contains {'input_1[]': 'test' }
- then the output dict would contain {'1': ['test'] }
- (the value is a list).
-
- Some other inputs such as ChoiceTextInput expect a dict of values in the returned
- dict If the key ends with '{}' then we will assume that the value is a json
- encoded dict and deserialize it.
- For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'}
- then the output dict would contain {'1': {"1_2_1": 1} }
- (the value is a dictionary)
-
- Raises an exception if:
-
- -A key in the `data` dictionary does not contain at least one underscore
- (e.g. "input" is invalid, but "input_1" is valid)
-
- -Two keys end up with the same name in the returned dict.
- (e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1'
- in the returned dict)
- """
- answers = dict()
-
- # webob.multidict.MultiDict is a view of a list of tuples,
- # so it will return a multi-value key once for each value.
- # We only want to consider each key a single time, so we use set(data.keys())
- for key in set(data.keys()):
- # e.g. input_resistor_1 ==> resistor_1
- _, _, name = key.partition('_') # pylint: disable=redefined-outer-name
-
- # If key has no underscores, then partition
- # will return (key, '', '')
- # We detect this and raise an error
- if not name:
- raise ValueError(u"{key} must contain at least one underscore".format(key=key))
-
- else:
- # This allows for answers which require more than one value for
- # the same form input (e.g. checkbox inputs). The convention is that
- # if the name ends with '[]' (which looks like an array), then the
- # answer will be an array.
- # if the name ends with '{}' (Which looks like a dict),
- # then the answer will be a dict
- is_list_key = name.endswith('[]')
- is_dict_key = name.endswith('{}')
- name = name[:-2] if is_list_key or is_dict_key else name
-
- if is_list_key:
- val = data.getall(key)
- elif is_dict_key:
- try:
- val = json.loads(data[key])
- # If the submission wasn't deserializable, raise an error.
- except(KeyError, ValueError):
- raise ValueError(
- u"Invalid submission: {val} for {key}".format(val=data[key], key=key)
- )
- else:
- val = data[key]
-
- # If the name already exists, then we don't want
- # to override it. Raise an error instead
- if name in answers:
- raise ValueError(u"Key {name} already exists in answers dict".format(name=name))
- else:
- answers[name] = val
-
- return answers
-
- def publish_grade(self):
- """
- Publishes the student's current grade to the system as an event
- """
- score = self.lcp.get_score()
- self.system.publish({
- 'event_name': 'grade',
- 'value': score['score'],
- 'max_value': score['total'],
- })
-
- return {'grade': score['score'], 'max_grade': score['total']}
-
- def check_problem(self, data):
- """
- Checks whether answers to a problem are correct
-
- Returns a map of correct/incorrect answers:
- {'success' : 'correct' | 'incorrect' | AJAX alert msg string,
- 'contents' : html}
- """
- event_info = dict()
- event_info['state'] = self.lcp.get_state()
- event_info['problem_id'] = self.location.url()
-
- answers = self.make_dict_of_responses(data)
- event_info['answers'] = convert_files_to_filenames(answers)
-
- # Too late. Cannot submit
- if self.closed():
- event_info['failure'] = 'closed'
- self.system.track_function('problem_check_fail', event_info)
- raise NotFoundError('Problem is closed')
-
- # Problem submitted. Student should reset before checking again
- if self.done and self.rerandomize == "always":
- event_info['failure'] = 'unreset'
- self.system.track_function('problem_check_fail', event_info)
- raise NotFoundError('Problem must be reset before it can be checked again')
-
- # Problem queued. Students must wait a specified waittime before they are allowed to submit
- if self.lcp.is_queued():
- current_time = datetime.datetime.now(UTC())
- prev_submit_time = self.lcp.get_recentmost_queuetime()
- waittime_between_requests = self.system.xqueue['waittime']
- if (current_time - prev_submit_time).total_seconds() < waittime_between_requests:
- msg = u'You must wait at least {wait} seconds between submissions'.format(
- wait=waittime_between_requests)
- return {'success': msg, 'html': ''} # Prompts a modal dialog in ajax callback
-
- try:
- correct_map = self.lcp.grade_answers(answers)
- self.attempts = self.attempts + 1
- self.lcp.done = True
- self.set_state_from_lcp()
-
- except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
- log.warning("StudentInputError in capa_module:problem_check",
- exc_info=True)
-
- # Save the user's state before failing
- self.set_state_from_lcp()
-
- # If the user is a staff member, include
- # the full exception, including traceback,
- # in the response
- if self.system.user_is_staff:
- msg = u"Staff debug info: {tb}".format(tb=cgi.escape(traceback.format_exc()))
-
- # Otherwise, display just an error message,
- # without a stack trace
- else:
- msg = u"Error: {msg}".format(msg=inst.message)
-
- return {'success': msg}
-
- except Exception as err:
- # Save the user's state before failing
- self.set_state_from_lcp()
-
- if self.system.DEBUG:
- msg = u"Error checking problem: {}".format(err.message)
- msg += u'\nTraceback:\n{}'.format(traceback.format_exc())
- return {'success': msg}
- raise
-
- published_grade = self.publish_grade()
-
- # success = correct if ALL questions in this problem are correct
- success = 'correct'
- for answer_id in correct_map:
- if not correct_map.is_correct(answer_id):
- success = 'incorrect'
-
- # NOTE: We are logging both full grading and queued-grading submissions. In the latter,
- # 'success' will always be incorrect
- event_info['grade'] = published_grade['grade']
- event_info['max_grade'] = published_grade['max_grade']
- event_info['correct_map'] = correct_map.get_dict()
- event_info['success'] = success
- event_info['attempts'] = self.attempts
- self.system.track_function('problem_check', event_info)
-
- if hasattr(self.system, 'psychometrics_handler'): # update PsychometricsData using callback
- self.system.psychometrics_handler(self.get_state_for_lcp())
-
- # render problem into HTML
- html = self.get_problem_html(encapsulate=False)
-
- return {'success': success,
- 'contents': html,
- }
-
- def rescore_problem(self):
- """
- Checks whether the existing answers to a problem are correct.
-
- This is called when the correct answer to a problem has been changed,
- and the grade should be re-evaluated.
-
- Returns a dict with one key:
- {'success' : 'correct' | 'incorrect' | AJAX alert msg string }
-
- Raises NotFoundError if called on a problem that has not yet been
- answered, or NotImplementedError if it's a problem that cannot be rescored.
-
- Returns the error messages for exceptions occurring while performing
- the rescoring, rather than throwing them.
- """
- event_info = {'state': self.lcp.get_state(), 'problem_id': self.location.url()}
-
- if not self.lcp.supports_rescoring():
- event_info['failure'] = 'unsupported'
- self.system.track_function('problem_rescore_fail', event_info)
- raise NotImplementedError("Problem's definition does not support rescoring")
-
- if not self.done:
- event_info['failure'] = 'unanswered'
- self.system.track_function('problem_rescore_fail', event_info)
- raise NotFoundError('Problem must be answered before it can be graded again')
-
- # get old score, for comparison:
- orig_score = self.lcp.get_score()
- event_info['orig_score'] = orig_score['score']
- event_info['orig_total'] = orig_score['total']
-
- try:
- correct_map = self.lcp.rescore_existing_answers()
-
- except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
- log.warning("Input error in capa_module:problem_rescore", exc_info=True)
- event_info['failure'] = 'input_error'
- self.system.track_function('problem_rescore_fail', event_info)
- return {'success': u"Error: {0}".format(inst.message)}
-
- except Exception as err:
- event_info['failure'] = 'unexpected'
- self.system.track_function('problem_rescore_fail', event_info)
- if self.system.DEBUG:
- msg = u"Error checking problem: {0}".format(err.message)
- msg += u'\nTraceback:\n' + traceback.format_exc()
- return {'success': msg}
- raise
-
- # rescoring should have no effect on attempts, so don't
- # need to increment here, or mark done. Just save.
- self.set_state_from_lcp()
-
- self.publish_grade()
-
- new_score = self.lcp.get_score()
- event_info['new_score'] = new_score['score']
- event_info['new_total'] = new_score['total']
-
- # success = correct if ALL questions in this problem are correct
- success = 'correct'
- for answer_id in correct_map:
- if not correct_map.is_correct(answer_id):
- success = 'incorrect'
-
- # NOTE: We are logging both full grading and queued-grading submissions. In the latter,
- # 'success' will always be incorrect
- event_info['correct_map'] = correct_map.get_dict()
- event_info['success'] = success
- event_info['attempts'] = self.attempts
- self.system.track_function('problem_rescore', event_info)
-
- # psychometrics should be called on rescoring requests in the same way as check-problem
- if hasattr(self.system, 'psychometrics_handler'): # update PsychometricsData using callback
- self.system.psychometrics_handler(self.get_state_for_lcp())
-
- return {'success': success}
-
- def save_problem(self, data):
- """
- Save the passed in answers.
- Returns a dict { 'success' : bool, 'msg' : message }
- The message is informative on success, and an error message on failure.
- """
- event_info = dict()
- event_info['state'] = self.lcp.get_state()
- event_info['problem_id'] = self.location.url()
-
- answers = self.make_dict_of_responses(data)
- event_info['answers'] = answers
-
- # Too late. Cannot submit
- if self.closed() and not self.max_attempts == 0:
- event_info['failure'] = 'closed'
- self.system.track_function('save_problem_fail', event_info)
- return {'success': False,
- 'msg': "Problem is closed"}
-
- # Problem submitted. Student should reset before saving
- # again.
- if self.done and self.rerandomize == "always":
- event_info['failure'] = 'done'
- self.system.track_function('save_problem_fail', event_info)
- return {'success': False,
- 'msg': "Problem needs to be reset prior to save"}
-
- self.lcp.student_answers = answers
-
- self.set_state_from_lcp()
-
- self.system.track_function('save_problem_success', event_info)
- msg = "Your answers have been saved"
- if not self.max_attempts == 0:
- msg += " but not graded. Hit 'Check' to grade them."
- return {'success': True,
- 'msg': msg}
-
- def reset_problem(self, _data):
- """
- Changes problem state to unfinished -- removes student answers,
- and causes problem to rerender itself.
-
- Returns a dictionary of the form:
- {'success': True/False,
- 'html': Problem HTML string }
-
- If an error occurs, the dictionary will also have an
- `error` key containing an error message.
- """
- event_info = dict()
- event_info['old_state'] = self.lcp.get_state()
- event_info['problem_id'] = self.location.url()
-
- if self.closed():
- event_info['failure'] = 'closed'
- self.system.track_function('reset_problem_fail', event_info)
- return {'success': False,
- 'error': "Problem is closed"}
-
- if not self.done:
- event_info['failure'] = 'not_done'
- self.system.track_function('reset_problem_fail', event_info)
- return {'success': False,
- 'error': "Refresh the page and make an attempt before resetting."}
-
- if self.rerandomize in ["always", "onreset"]:
- # Reset random number generator seed.
- self.choose_new_seed()
-
- # Generate a new problem with either the previous seed or a new seed
- self.lcp = self.new_lcp(None)
-
- # Pull in the new problem seed
- self.set_state_from_lcp()
-
- event_info['new_state'] = self.lcp.get_state()
- self.system.track_function('reset_problem', event_info)
-
- return {'success': True,
- 'html': self.get_problem_html(encapsulate=False)}
-
class CapaDescriptor(CapaFields, RawDescriptor):
"""
diff --git a/common/lib/xmodule/xmodule/tests/test_capa_module.py b/common/lib/xmodule/xmodule/tests/test_capa_module.py
index a76dc4eba7..6bb82f3d47 100644
--- a/common/lib/xmodule/xmodule/tests/test_capa_module.py
+++ b/common/lib/xmodule/xmodule/tests/test_capa_module.py
@@ -1335,8 +1335,8 @@ class CapaModuleTest(unittest.TestCase):
module = CapaFactory.create(rerandomize=rerandomize)
assert 0 <= module.seed < 1000
- @patch('xmodule.capa_module.log')
- @patch('xmodule.capa_module.Progress')
+ @patch('xmodule.capa_base.log')
+ @patch('xmodule.capa_base.Progress')
def test_get_progress_error(self, mock_progress, mock_log):
"""
Check that an exception given in `Progress` produces a `log.exception` call.
@@ -1349,7 +1349,7 @@ class CapaModuleTest(unittest.TestCase):
mock_log.exception.assert_called_once_with('Got bad progress')
mock_log.reset_mock()
- @patch('xmodule.capa_module.Progress')
+ @patch('xmodule.capa_base.Progress')
def test_get_progress_calculate_progress_fraction(self, mock_progress):
"""
Check that score and total are calculated correctly for the progress fraction.