diff --git a/common/lib/xmodule/xmodule/capa_base.py b/common/lib/xmodule/xmodule/capa_base.py new file mode 100644 index 0000000000..bf621fc396 --- /dev/null +++ b/common/lib/xmodule/xmodule/capa_base.py @@ -0,0 +1,1089 @@ +"""Implements basics of Capa, including class CapaModule.""" +import cgi +import datetime +import hashlib +import json +import logging +import os +import traceback +import struct +import sys + +from pkg_resources import resource_string + +from capa.capa_problem import LoncapaProblem +from capa.responsetypes import StudentInputError, \ + ResponseError, LoncapaProblemError +from capa.util import convert_files_to_filenames +from .progress import Progress +from xmodule.exceptions import NotFoundError, ProcessingError +from xblock.fields import Scope, String, Boolean, Dict, Integer, Float +from .fields import Timedelta, Date +from django.utils.timezone import UTC +from .util.duedate import get_extended_due_date + +log = logging.getLogger("edx.courseware") + + +# Generate this many different variants of problems with rerandomize=per_student +NUM_RANDOMIZATION_BINS = 20 +# Never produce more than this many different seeds, no matter what. +MAX_RANDOMIZATION_BINS = 1000 + + +def randomization_bin(seed, problem_id): + """ + Pick a randomization bin for the problem given the user's seed and a problem id. + + We do this because we only want e.g. 20 randomizations of a problem to make analytics + interesting. To avoid having sets of students that always get the same problems, + we'll combine the system's per-student seed with the problem id in picking the bin. + """ + r_hash = hashlib.sha1() + r_hash.update(str(seed)) + r_hash.update(str(problem_id)) + # get the first few digits of the hash, convert to an int, then mod. + return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS + + +class Randomization(String): + """ + Define a field to store how to randomize a problem. + """ + def from_json(self, value): + if value in ("", "true"): + return "always" + elif value == "false": + return "per_student" + return value + + to_json = from_json + + +class ComplexEncoder(json.JSONEncoder): + """ + Extend the JSON encoder to correctly handle complex numbers + """ + def default(self, obj): + """ + Print a nicely formatted complex number, or default to the JSON encoder + """ + if isinstance(obj, complex): + return u"{real:.7g}{imag:+.7g}*j".format(real=obj.real, imag=obj.imag) + return json.JSONEncoder.default(self, obj) + + +class CapaFields(object): + """ + Define the possible fields for a Capa problem + """ + display_name = String( + display_name="Display Name", + help="This name appears in the horizontal navigation at the top of the page.", + scope=Scope.settings, + # it'd be nice to have a useful default but it screws up other things; so, + # use display_name_with_default for those + default="Blank Advanced Problem" + ) + attempts = Integer(help="Number of attempts taken by the student on this problem", + default=0, scope=Scope.user_state) + max_attempts = Integer( + display_name="Maximum Attempts", + help=("Defines the number of times a student can try to answer this problem. " + "If the value is not set, infinite attempts are allowed."), + values={"min": 0}, scope=Scope.settings + ) + due = Date(help="Date that this problem is due by", scope=Scope.settings) + extended_due = Date( + help="Date that this problem is due by for a particular student. This " + "can be set by an instructor, and will override the global due " + "date if it is set to a date that is later than the global due " + "date.", + default=None, + scope=Scope.user_state, + ) + graceperiod = Timedelta( + help="Amount of time after the due date that submissions will be accepted", + scope=Scope.settings + ) + showanswer = String( + display_name="Show Answer", + help=("Defines when to show the answer to the problem. " + "A default value can be set in Advanced Settings."), + scope=Scope.settings, + default="finished", + values=[ + {"display_name": "Always", "value": "always"}, + {"display_name": "Answered", "value": "answered"}, + {"display_name": "Attempted", "value": "attempted"}, + {"display_name": "Closed", "value": "closed"}, + {"display_name": "Finished", "value": "finished"}, + {"display_name": "Past Due", "value": "past_due"}, + {"display_name": "Never", "value": "never"}] + ) + force_save_button = Boolean( + help="Whether to force the save button to appear on the page", + scope=Scope.settings, + default=False + ) + rerandomize = Randomization( + display_name="Randomization", + help="Defines how often inputs are randomized when a student loads the problem. " + "This setting only applies to problems that can have randomly generated numeric values. " + "A default value can be set in Advanced Settings.", + default="never", + scope=Scope.settings, + values=[ + {"display_name": "Always", "value": "always"}, + {"display_name": "On Reset", "value": "onreset"}, + {"display_name": "Never", "value": "never"}, + {"display_name": "Per Student", "value": "per_student"} + ] + ) + data = String(help="XML data for the problem", scope=Scope.content, default="") + correct_map = Dict(help="Dictionary with the correctness of current student answers", + scope=Scope.user_state, default={}) + input_state = Dict(help="Dictionary for maintaining the state of inputtypes", scope=Scope.user_state) + student_answers = Dict(help="Dictionary with the current student responses", scope=Scope.user_state) + done = Boolean(help="Whether the student has answered the problem", scope=Scope.user_state) + seed = Integer(help="Random seed for this student", scope=Scope.user_state) + weight = Float( + display_name="Problem Weight", + help=("Defines the number of points each problem is worth. " + "If the value is not set, each response field in the problem is worth one point."), + values={"min": 0, "step": .1}, + scope=Scope.settings + ) + markdown = String(help="Markdown source of this module", default=None, scope=Scope.settings) + source_code = String( + help="Source code for LaTeX and Word problems. This feature is not well-supported.", + scope=Scope.settings + ) + text_customization = Dict( + help="String customization substitutions for particular locations", + scope=Scope.settings + # TODO: someday it should be possible to not duplicate this definition here + # and in inheritance.py + ) + use_latex_compiler = Boolean( + help="Enable LaTeX templates?", + default=False, + scope=Scope.settings + ) + + +class CapaMixin(CapaFields): + """ + Core logic for Capa Problem, which can be used by XModules or XBlocks. + """ + + def __init__(self, *args, **kwargs): + super(CapaMixin, self).__init__(*args, **kwargs) + + due_date = get_extended_due_date(self) + + if self.graceperiod is not None and due_date: + self.close_date = due_date + self.graceperiod + else: + self.close_date = due_date + + if self.seed is None: + self.choose_new_seed() + + # Need the problem location in openendedresponse to send out. Adding + # it to the system here seems like the least clunky way to get it + # there. + self.runtime.set('location', self.location.url()) + + try: + # TODO (vshnayder): move as much as possible of this work and error + # checking to descriptor load time + self.lcp = self.new_lcp(self.get_state_for_lcp()) + + # At this point, we need to persist the randomization seed + # so that when the problem is re-loaded (to check/view/save) + # it stays the same. + # However, we do not want to write to the database + # every time the module is loaded. + # So we set the seed ONLY when there is not one set already + if self.seed is None: + self.seed = self.lcp.seed + + except Exception as err: # pylint: disable=broad-except + msg = u'cannot create LoncapaProblem {loc}: {err}'.format( + loc=self.location.url(), err=err) + # TODO (vshnayder): do modules need error handlers too? + # We shouldn't be switching on DEBUG. + if self.runtime.DEBUG: + log.warning(msg) + # TODO (vshnayder): This logic should be general, not here--and may + # want to preserve the data instead of replacing it. + # e.g. in the CMS + msg = u'

{msg}

'.format(msg=cgi.escape(msg)) + msg += u'

{tb}

'.format( + tb=cgi.escape(traceback.format_exc())) + # create a dummy problem with error message instead of failing + problem_text = (u'' + u'Problem {url} has an error:{msg}'.format( + url=self.location.url(), + msg=msg) + ) + self.lcp = self.new_lcp(self.get_state_for_lcp(), text=problem_text) + else: + # add extra info and raise + raise Exception(msg), None, sys.exc_info()[2] + + self.set_state_from_lcp() + + assert self.seed is not None + + def choose_new_seed(self): + """ + Choose a new seed. + """ + if self.rerandomize == 'never': + self.seed = 1 + elif self.rerandomize == "per_student" and hasattr(self.runtime, 'seed'): + # see comment on randomization_bin + self.seed = randomization_bin(self.runtime.seed, self.location.url) + else: + self.seed = struct.unpack('i', os.urandom(4))[0] + + # So that sandboxed code execution can be cached, but still have an interesting + # number of possibilities, cap the number of different random seeds. + self.seed %= MAX_RANDOMIZATION_BINS + + def new_lcp(self, state, text=None): + """ + Generate a new Loncapa Problem + """ + if text is None: + text = self.data + + return LoncapaProblem( + problem_text=text, + id=self.location.html_id(), + state=state, + seed=self.seed, + system=self.runtime, + ) + + def get_state_for_lcp(self): + """ + Give a dictionary holding the state of the module + """ + return { + 'done': self.done, + 'correct_map': self.correct_map, + 'student_answers': self.student_answers, + 'input_state': self.input_state, + 'seed': self.seed, + } + + def set_state_from_lcp(self): + """ + Set the module's state from the settings in `self.lcp` + """ + lcp_state = self.lcp.get_state() + self.done = lcp_state['done'] + self.correct_map = lcp_state['correct_map'] + self.input_state = lcp_state['input_state'] + self.student_answers = lcp_state['student_answers'] + self.seed = lcp_state['seed'] + + def get_score(self): + """ + Access the problem's score + """ + return self.lcp.get_score() + + def max_score(self): + """ + Access the problem's max score + """ + return self.lcp.get_max_score() + + def get_progress(self): + """ + For now, just return score / max_score + """ + score_dict = self.get_score() + score = score_dict['score'] + total = score_dict['total'] + + if total > 0: + if self.weight is not None: + # scale score and total by weight/total: + score = score * self.weight / total + total = self.weight + + try: + return Progress(score, total) + except (TypeError, ValueError): + log.exception("Got bad progress") + return None + return None + + def get_html(self): + """ + Return some html with data about the module + """ + progress = self.get_progress() + return self.runtime.render_template('problem_ajax.html', { + 'element_id': self.location.html_id(), + 'id': self.id, + 'ajax_url': self.runtime.ajax_url, + 'progress_status': Progress.to_js_status_str(progress), + 'progress_detail': Progress.to_js_detail_str(progress), + }) + + def check_button_name(self): + """ + Determine the name for the "check" button. + + Usually it is just "Check", but if this is the student's + final attempt, change the name to "Final Check". + The text can be customized by the text_customization setting. + """ + # The logic flow is a little odd so that _('xxx') strings can be found for + # translation while also running _() just once for each string. + _ = self.runtime.service(self, "i18n").ugettext + check = _('Check') + final_check = _('Final Check') + + # Apply customizations if present + if 'custom_check' in self.text_customization: + check = _(self.text_customization.get('custom_check')) + if 'custom_final_check' in self.text_customization: + final_check = _(self.text_customization.get('custom_final_check')) + # TODO: need a way to get the customized words into the list of + # words to be translated + + if self.max_attempts is not None and self.attempts >= self.max_attempts - 1: + return final_check + else: + return check + + def should_show_check_button(self): + """ + Return True/False to indicate whether to show the "Check" button. + """ + submitted_without_reset = (self.is_submitted() and self.rerandomize == "always") + + # If the problem is closed (past due / too many attempts) + # then we do NOT show the "check" button + # Also, do not show the "check" button if we're waiting + # for the user to reset a randomized problem + if self.closed() or submitted_without_reset: + return False + else: + return True + + def should_show_reset_button(self): + """ + Return True/False to indicate whether to show the "Reset" button. + """ + is_survey_question = (self.max_attempts == 0) + + if self.rerandomize in ["always", "onreset"]: + + # If the problem is closed (and not a survey question with max_attempts==0), + # then do NOT show the reset button. + # If the problem hasn't been submitted yet, then do NOT show + # the reset button. + if (self.closed() and not is_survey_question) or not self.is_submitted(): + return False + else: + return True + # Only randomized problems need a "reset" button + else: + return False + + def should_show_save_button(self): + """ + Return True/False to indicate whether to show the "Save" button. + """ + + # If the user has forced the save button to display, + # then show it as long as the problem is not closed + # (past due / too many attempts) + if self.force_save_button: + return not self.closed() + else: + is_survey_question = (self.max_attempts == 0) + needs_reset = self.is_submitted() and self.rerandomize == "always" + + # If the student has unlimited attempts, and their answers + # are not randomized, then we do not need a save button + # because they can use the "Check" button without consequences. + # + # The consequences we want to avoid are: + # * Using up an attempt (if max_attempts is set) + # * Changing the current problem, and no longer being + # able to view it (if rerandomize is "always") + # + # In those cases. the if statement below is false, + # and the save button can still be displayed. + # + if self.max_attempts is None and self.rerandomize != "always": + return False + + # If the problem is closed (and not a survey question with max_attempts==0), + # then do NOT show the save button + # If we're waiting for the user to reset a randomized problem + # then do NOT show the save button + elif (self.closed() and not is_survey_question) or needs_reset: + return False + else: + return True + + def handle_problem_html_error(self, err): + """ + Create a dummy problem to represent any errors. + + Change our problem to a dummy problem containing a warning message to + display to users. Returns the HTML to show to users + + `err` is the Exception encountered while rendering the problem HTML. + """ + log.exception(err.message) + + # TODO (vshnayder): another switch on DEBUG. + if self.runtime.DEBUG: + msg = ( + u'[courseware.capa.capa_module] ' + u'Failed to generate HTML for problem {url}'.format( + url=cgi.escape(self.location.url())) + ) + msg += u'

Error:

{msg}

'.format(msg=cgi.escape(err.message)) + msg += u'

{tb}

'.format(tb=cgi.escape(traceback.format_exc())) + html = msg + + else: + # We're in non-debug mode, and possibly even in production. We want + # to avoid bricking of problem as much as possible + + # Presumably, student submission has corrupted LoncapaProblem HTML. + # First, pull down all student answers + student_answers = self.lcp.student_answers + answer_ids = student_answers.keys() + + # Some inputtypes, such as dynamath, have additional "hidden" state that + # is not exposed to the student. Keep those hidden + # TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id + hidden_state_keywords = ['dynamath'] + for answer_id in answer_ids: + for hidden_state_keyword in hidden_state_keywords: + if answer_id.find(hidden_state_keyword) >= 0: + student_answers.pop(answer_id) + + # Next, generate a fresh LoncapaProblem + self.lcp = self.new_lcp(None) + self.set_state_from_lcp() + + # Prepend a scary warning to the student + warning = '
'\ + '

Warning: The problem has been reset to its initial state!

'\ + 'The problem\'s state was corrupted by an invalid submission. ' \ + 'The submission consisted of:'\ + ''\ + 'If this error persists, please contact the course staff.'\ + '
' + + html = warning + try: + html += self.lcp.get_html() + except Exception: # Couldn't do it. Give up + log.exception("Unable to generate html from LoncapaProblem") + raise + + return html + + def get_problem_html(self, encapsulate=True): + """ + Return html for the problem. + + Adds check, reset, save buttons as necessary based on the problem config and state. + """ + + try: + html = self.lcp.get_html() + + # If we cannot construct the problem HTML, + # then generate an error message instead. + except Exception as err: # pylint: disable=broad-except + html = self.handle_problem_html_error(err) + + # The convention is to pass the name of the check button + # if we want to show a check button, and False otherwise + # This works because non-empty strings evaluate to True + if self.should_show_check_button(): + check_button = self.check_button_name() + else: + check_button = False + + content = {'name': self.display_name_with_default, + 'html': html, + 'weight': self.weight, + } + + context = {'problem': content, + 'id': self.id, + 'check_button': check_button, + 'reset_button': self.should_show_reset_button(), + 'save_button': self.should_show_save_button(), + 'answer_available': self.answer_available(), + 'attempts_used': self.attempts, + 'attempts_allowed': self.max_attempts, + } + + html = self.runtime.render_template('problem.html', context) + + if encapsulate: + html = u'
'.format( + id=self.location.html_id(), ajax_url=self.runtime.ajax_url + ) + html + "
" + + # now do all the substitutions which the LMS module_render normally does, but + # we need to do here explicitly since we can get called for our HTML via AJAX + html = self.runtime.replace_urls(html) + if self.runtime.replace_course_urls: + html = self.runtime.replace_course_urls(html) + + if self.runtime.replace_jump_to_id_urls: + html = self.runtime.replace_jump_to_id_urls(html) + + return html + + def is_past_due(self): + """ + Is it now past this problem's due date, including grace period? + """ + return (self.close_date is not None and + datetime.datetime.now(UTC()) > self.close_date) + + def closed(self): + """ + Is the student still allowed to submit answers? + """ + if self.max_attempts is not None and self.attempts >= self.max_attempts: + return True + if self.is_past_due(): + return True + + return False + + def is_submitted(self): + """ + Used to decide to show or hide RESET or CHECK buttons. + + Means that student submitted problem and nothing more. + Problem can be completely wrong. + Pressing RESET button makes this function to return False. + """ + # used by conditional module + return self.lcp.done + + def is_attempted(self): + """ + Has the problem been attempted? + + used by conditional module + """ + return self.attempts > 0 + + def is_correct(self): + """ + True iff full points + """ + score_dict = self.get_score() + return score_dict['score'] == score_dict['total'] + + def answer_available(self): + """ + Is the user allowed to see an answer? + """ + if self.showanswer == '': + return False + elif self.showanswer == "never": + return False + elif self.runtime.user_is_staff: + # This is after the 'never' check because admins can see the answer + # unless the problem explicitly prevents it + return True + elif self.showanswer == 'attempted': + return self.attempts > 0 + elif self.showanswer == 'answered': + # NOTE: this is slightly different from 'attempted' -- resetting the problems + # makes lcp.done False, but leaves attempts unchanged. + return self.lcp.done + elif self.showanswer == 'closed': + return self.closed() + elif self.showanswer == 'finished': + return self.closed() or self.is_correct() + + elif self.showanswer == 'past_due': + return self.is_past_due() + elif self.showanswer == 'always': + return True + + return False + + def update_score(self, data): + """ + Delivers grading response (e.g. from asynchronous code checking) to + the capa problem, so its score can be updated + + 'data' must have a key 'response' which is a string that contains the + grader's response + + No ajax return is needed. Return empty dict. + """ + queuekey = data['queuekey'] + score_msg = data['xqueue_body'] + self.lcp.update_score(score_msg, queuekey) + self.set_state_from_lcp() + self.publish_grade() + + return dict() # No AJAX return is needed + + def handle_ungraded_response(self, data): + """ + Delivers a response from the XQueue to the capa problem + + The score of the problem will not be updated + + Args: + - data (dict) must contain keys: + queuekey - a key specific to this response + xqueue_body - the body of the response + Returns: + empty dictionary + + No ajax return is needed, so an empty dict is returned + """ + queuekey = data['queuekey'] + score_msg = data['xqueue_body'] + + # pass along the xqueue message to the problem + self.lcp.ungraded_response(score_msg, queuekey) + self.set_state_from_lcp() + return dict() + + def handle_input_ajax(self, data): + """ + Handle ajax calls meant for a particular input in the problem + + Args: + - data (dict) - data that should be passed to the input + Returns: + - dict containing the response from the input + """ + response = self.lcp.handle_input_ajax(data) + + # save any state changes that may occur + self.set_state_from_lcp() + return response + + def get_answer(self, _data): + """ + For the "show answer" button. + + Returns the answers: {'answers' : answers} + """ + event_info = dict() + event_info['problem_id'] = self.location.url() + self.runtime.track_function('showanswer', event_info) + if not self.answer_available(): + raise NotFoundError('Answer is not available') + else: + answers = self.lcp.get_question_answers() + self.set_state_from_lcp() + + # answers (eg ) may have embedded images + # but be careful, some problems are using non-string answer dicts + new_answers = dict() + for answer_id in answers: + try: + new_answer = {answer_id: self.runtime.replace_urls(answers[answer_id])} + except TypeError: + log.debug(u'Unable to perform URL substitution on answers[%s]: %s', + answer_id, answers[answer_id]) + new_answer = {answer_id: answers[answer_id]} + new_answers.update(new_answer) + + return {'answers': new_answers} + + # Figure out if we should move these to capa_problem? + def get_problem(self, _data): + """ + Return results of get_problem_html, as a simple dict for json-ing. + { 'html': } + + Used if we want to reconfirm we have the right thing e.g. after + several AJAX calls. + """ + return {'html': self.get_problem_html(encapsulate=False)} + + @staticmethod + def make_dict_of_responses(data): + """ + Make dictionary of student responses (aka "answers") + + `data` is POST dictionary (webob.multidict.MultiDict). + + The `data` dict has keys of the form 'x_y', which are mapped + to key 'y' in the returned dict. For example, + 'input_1_2_3' would be mapped to '1_2_3' in the returned dict. + + Some inputs always expect a list in the returned dict + (e.g. checkbox inputs). The convention is that + keys in the `data` dict that end with '[]' will always + have list values in the returned dict. + For example, if the `data` dict contains {'input_1[]': 'test' } + then the output dict would contain {'1': ['test'] } + (the value is a list). + + Some other inputs such as ChoiceTextInput expect a dict of values in the returned + dict If the key ends with '{}' then we will assume that the value is a json + encoded dict and deserialize it. + For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'} + then the output dict would contain {'1': {"1_2_1": 1} } + (the value is a dictionary) + + Raises an exception if: + + -A key in the `data` dictionary does not contain at least one underscore + (e.g. "input" is invalid, but "input_1" is valid) + + -Two keys end up with the same name in the returned dict. + (e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1' + in the returned dict) + """ + answers = dict() + + # webob.multidict.MultiDict is a view of a list of tuples, + # so it will return a multi-value key once for each value. + # We only want to consider each key a single time, so we use set(data.keys()) + for key in set(data.keys()): + # e.g. input_resistor_1 ==> resistor_1 + _, _, name = key.partition('_') # pylint: disable=redefined-outer-name + + # If key has no underscores, then partition + # will return (key, '', '') + # We detect this and raise an error + if not name: + raise ValueError(u"{key} must contain at least one underscore".format(key=key)) + + else: + # This allows for answers which require more than one value for + # the same form input (e.g. checkbox inputs). The convention is that + # if the name ends with '[]' (which looks like an array), then the + # answer will be an array. + # if the name ends with '{}' (Which looks like a dict), + # then the answer will be a dict + is_list_key = name.endswith('[]') + is_dict_key = name.endswith('{}') + name = name[:-2] if is_list_key or is_dict_key else name + + if is_list_key: + val = data.getall(key) + elif is_dict_key: + try: + val = json.loads(data[key]) + # If the submission wasn't deserializable, raise an error. + except(KeyError, ValueError): + raise ValueError( + u"Invalid submission: {val} for {key}".format(val=data[key], key=key) + ) + else: + val = data[key] + + # If the name already exists, then we don't want + # to override it. Raise an error instead + if name in answers: + raise ValueError(u"Key {name} already exists in answers dict".format(name=name)) + else: + answers[name] = val + + return answers + + def publish_grade(self): + """ + Publishes the student's current grade to the system as an event + """ + score = self.lcp.get_score() + self.runtime.publish({ + 'event_name': 'grade', + 'value': score['score'], + 'max_value': score['total'], + }) + + return {'grade': score['score'], 'max_grade': score['total']} + + def check_problem(self, data): + """ + Checks whether answers to a problem are correct + + Returns a map of correct/incorrect answers: + {'success' : 'correct' | 'incorrect' | AJAX alert msg string, + 'contents' : html} + """ + event_info = dict() + event_info['state'] = self.lcp.get_state() + event_info['problem_id'] = self.location.url() + + answers = self.make_dict_of_responses(data) + event_info['answers'] = convert_files_to_filenames(answers) + + # Too late. Cannot submit + if self.closed(): + event_info['failure'] = 'closed' + self.runtime.track_function('problem_check_fail', event_info) + raise NotFoundError('Problem is closed') + + # Problem submitted. Student should reset before checking again + if self.done and self.rerandomize == "always": + event_info['failure'] = 'unreset' + self.runtime.track_function('problem_check_fail', event_info) + raise NotFoundError('Problem must be reset before it can be checked again') + + # Problem queued. Students must wait a specified waittime before they are allowed to submit + if self.lcp.is_queued(): + current_time = datetime.datetime.now(UTC()) + prev_submit_time = self.lcp.get_recentmost_queuetime() + waittime_between_requests = self.runtime.xqueue['waittime'] + if (current_time - prev_submit_time).total_seconds() < waittime_between_requests: + msg = u'You must wait at least {wait} seconds between submissions'.format( + wait=waittime_between_requests) + return {'success': msg, 'html': ''} # Prompts a modal dialog in ajax callback + + try: + correct_map = self.lcp.grade_answers(answers) + self.attempts = self.attempts + 1 + self.lcp.done = True + self.set_state_from_lcp() + + except (StudentInputError, ResponseError, LoncapaProblemError) as inst: + log.warning("StudentInputError in capa_module:problem_check", + exc_info=True) + + # Save the user's state before failing + self.set_state_from_lcp() + + # If the user is a staff member, include + # the full exception, including traceback, + # in the response + if self.runtime.user_is_staff: + msg = u"Staff debug info: {tb}".format(tb=cgi.escape(traceback.format_exc())) + + # Otherwise, display just an error message, + # without a stack trace + else: + msg = u"Error: {msg}".format(msg=inst.message) + + return {'success': msg} + + except Exception as err: + # Save the user's state before failing + self.set_state_from_lcp() + + if self.runtime.DEBUG: + msg = u"Error checking problem: {}".format(err.message) + msg += u'\nTraceback:\n{}'.format(traceback.format_exc()) + return {'success': msg} + raise + + published_grade = self.publish_grade() + + # success = correct if ALL questions in this problem are correct + success = 'correct' + for answer_id in correct_map: + if not correct_map.is_correct(answer_id): + success = 'incorrect' + + # NOTE: We are logging both full grading and queued-grading submissions. In the latter, + # 'success' will always be incorrect + event_info['grade'] = published_grade['grade'] + event_info['max_grade'] = published_grade['max_grade'] + event_info['correct_map'] = correct_map.get_dict() + event_info['success'] = success + event_info['attempts'] = self.attempts + self.runtime.track_function('problem_check', event_info) + + if hasattr(self.runtime, 'psychometrics_handler'): # update PsychometricsData using callback + self.runtime.psychometrics_handler(self.get_state_for_lcp()) + + # render problem into HTML + html = self.get_problem_html(encapsulate=False) + + return {'success': success, + 'contents': html, + } + + def rescore_problem(self): + """ + Checks whether the existing answers to a problem are correct. + + This is called when the correct answer to a problem has been changed, + and the grade should be re-evaluated. + + Returns a dict with one key: + {'success' : 'correct' | 'incorrect' | AJAX alert msg string } + + Raises NotFoundError if called on a problem that has not yet been + answered, or NotImplementedError if it's a problem that cannot be rescored. + + Returns the error messages for exceptions occurring while performing + the rescoring, rather than throwing them. + """ + event_info = {'state': self.lcp.get_state(), 'problem_id': self.location.url()} + + if not self.lcp.supports_rescoring(): + event_info['failure'] = 'unsupported' + self.runtime.track_function('problem_rescore_fail', event_info) + raise NotImplementedError("Problem's definition does not support rescoring") + + if not self.done: + event_info['failure'] = 'unanswered' + self.runtime.track_function('problem_rescore_fail', event_info) + raise NotFoundError('Problem must be answered before it can be graded again') + + # get old score, for comparison: + orig_score = self.lcp.get_score() + event_info['orig_score'] = orig_score['score'] + event_info['orig_total'] = orig_score['total'] + + try: + correct_map = self.lcp.rescore_existing_answers() + + except (StudentInputError, ResponseError, LoncapaProblemError) as inst: + log.warning("Input error in capa_module:problem_rescore", exc_info=True) + event_info['failure'] = 'input_error' + self.runtime.track_function('problem_rescore_fail', event_info) + return {'success': u"Error: {0}".format(inst.message)} + + except Exception as err: + event_info['failure'] = 'unexpected' + self.runtime.track_function('problem_rescore_fail', event_info) + if self.runtime.DEBUG: + msg = u"Error checking problem: {0}".format(err.message) + msg += u'\nTraceback:\n' + traceback.format_exc() + return {'success': msg} + raise + + # rescoring should have no effect on attempts, so don't + # need to increment here, or mark done. Just save. + self.set_state_from_lcp() + + self.publish_grade() + + new_score = self.lcp.get_score() + event_info['new_score'] = new_score['score'] + event_info['new_total'] = new_score['total'] + + # success = correct if ALL questions in this problem are correct + success = 'correct' + for answer_id in correct_map: + if not correct_map.is_correct(answer_id): + success = 'incorrect' + + # NOTE: We are logging both full grading and queued-grading submissions. In the latter, + # 'success' will always be incorrect + event_info['correct_map'] = correct_map.get_dict() + event_info['success'] = success + event_info['attempts'] = self.attempts + self.runtime.track_function('problem_rescore', event_info) + + # psychometrics should be called on rescoring requests in the same way as check-problem + if hasattr(self.runtime, 'psychometrics_handler'): # update PsychometricsData using callback + self.runtime.psychometrics_handler(self.get_state_for_lcp()) + + return {'success': success} + + def save_problem(self, data): + """ + Save the passed in answers. + Returns a dict { 'success' : bool, 'msg' : message } + The message is informative on success, and an error message on failure. + """ + event_info = dict() + event_info['state'] = self.lcp.get_state() + event_info['problem_id'] = self.location.url() + + answers = self.make_dict_of_responses(data) + event_info['answers'] = answers + + # Too late. Cannot submit + if self.closed() and not self.max_attempts == 0: + event_info['failure'] = 'closed' + self.runtime.track_function('save_problem_fail', event_info) + return {'success': False, + 'msg': "Problem is closed"} + + # Problem submitted. Student should reset before saving + # again. + if self.done and self.rerandomize == "always": + event_info['failure'] = 'done' + self.runtime.track_function('save_problem_fail', event_info) + return {'success': False, + 'msg': "Problem needs to be reset prior to save"} + + self.lcp.student_answers = answers + + self.set_state_from_lcp() + + self.runtime.track_function('save_problem_success', event_info) + msg = "Your answers have been saved" + if not self.max_attempts == 0: + msg += " but not graded. Hit 'Check' to grade them." + return {'success': True, + 'msg': msg} + + def reset_problem(self, _data): + """ + Changes problem state to unfinished -- removes student answers, + and causes problem to rerender itself. + + Returns a dictionary of the form: + {'success': True/False, + 'html': Problem HTML string } + + If an error occurs, the dictionary will also have an + `error` key containing an error message. + """ + event_info = dict() + event_info['old_state'] = self.lcp.get_state() + event_info['problem_id'] = self.location.url() + + if self.closed(): + event_info['failure'] = 'closed' + self.runtime.track_function('reset_problem_fail', event_info) + return {'success': False, + 'error': "Problem is closed"} + + if not self.done: + event_info['failure'] = 'not_done' + self.runtime.track_function('reset_problem_fail', event_info) + return {'success': False, + 'error': "Refresh the page and make an attempt before resetting."} + + if self.rerandomize in ["always", "onreset"]: + # Reset random number generator seed. + self.choose_new_seed() + + # Generate a new problem with either the previous seed or a new seed + self.lcp = self.new_lcp(None) + + # Pull in the new problem seed + self.set_state_from_lcp() + + event_info['new_state'] = self.lcp.get_state() + self.runtime.track_function('reset_problem', event_info) + + return {'success': True, + 'html': self.get_problem_html(encapsulate=False)} diff --git a/common/lib/xmodule/xmodule/capa_module.py b/common/lib/xmodule/xmodule/capa_module.py index 60bf55b9f3..d00fe6cf76 100644 --- a/common/lib/xmodule/xmodule/capa_module.py +++ b/common/lib/xmodule/xmodule/capa_module.py @@ -1,180 +1,20 @@ """Implements basics of Capa, including class CapaModule.""" -import cgi -import datetime -import hashlib import json import logging -import os -import traceback -import struct import sys from pkg_resources import resource_string -from capa.capa_problem import LoncapaProblem -from capa.responsetypes import StudentInputError, \ - ResponseError, LoncapaProblemError -from capa.util import convert_files_to_filenames +from .capa_base import CapaMixin, CapaFields, ComplexEncoder from .progress import Progress from xmodule.x_module import XModule, module_attr from xmodule.raw_module import RawDescriptor from xmodule.exceptions import NotFoundError, ProcessingError -from xblock.fields import Scope, String, Boolean, Dict, Integer, Float -from .fields import Timedelta, Date -from django.utils.timezone import UTC -from .util.duedate import get_extended_due_date log = logging.getLogger("edx.courseware") -# Generate this many different variants of problems with rerandomize=per_student -NUM_RANDOMIZATION_BINS = 20 -# Never produce more than this many different seeds, no matter what. -MAX_RANDOMIZATION_BINS = 1000 - - -def randomization_bin(seed, problem_id): - """ - Pick a randomization bin for the problem given the user's seed and a problem id. - - We do this because we only want e.g. 20 randomizations of a problem to make analytics - interesting. To avoid having sets of students that always get the same problems, - we'll combine the system's per-student seed with the problem id in picking the bin. - """ - r_hash = hashlib.sha1() - r_hash.update(str(seed)) - r_hash.update(str(problem_id)) - # get the first few digits of the hash, convert to an int, then mod. - return int(r_hash.hexdigest()[:7], 16) % NUM_RANDOMIZATION_BINS - - -class Randomization(String): - """ - Define a field to store how to randomize a problem. - """ - def from_json(self, value): - if value in ("", "true"): - return "always" - elif value == "false": - return "per_student" - return value - - to_json = from_json - - -class ComplexEncoder(json.JSONEncoder): - """ - Extend the JSON encoder to correctly handle complex numbers - """ - def default(self, obj): - """ - Print a nicely formatted complex number, or default to the JSON encoder - """ - if isinstance(obj, complex): - return u"{real:.7g}{imag:+.7g}*j".format(real=obj.real, imag=obj.imag) - return json.JSONEncoder.default(self, obj) - - -class CapaFields(object): - """ - Define the possible fields for a Capa problem - """ - display_name = String( - display_name="Display Name", - help="This name appears in the horizontal navigation at the top of the page.", - scope=Scope.settings, - # it'd be nice to have a useful default but it screws up other things; so, - # use display_name_with_default for those - default="Blank Advanced Problem" - ) - attempts = Integer(help="Number of attempts taken by the student on this problem", - default=0, scope=Scope.user_state) - max_attempts = Integer( - display_name="Maximum Attempts", - help=("Defines the number of times a student can try to answer this problem. " - "If the value is not set, infinite attempts are allowed."), - values={"min": 0}, scope=Scope.settings - ) - due = Date(help="Date that this problem is due by", scope=Scope.settings) - extended_due = Date( - help="Date that this problem is due by for a particular student. This " - "can be set by an instructor, and will override the global due " - "date if it is set to a date that is later than the global due " - "date.", - default=None, - scope=Scope.user_state, - ) - graceperiod = Timedelta( - help="Amount of time after the due date that submissions will be accepted", - scope=Scope.settings - ) - showanswer = String( - display_name="Show Answer", - help=("Defines when to show the answer to the problem. " - "A default value can be set in Advanced Settings."), - scope=Scope.settings, - default="finished", - values=[ - {"display_name": "Always", "value": "always"}, - {"display_name": "Answered", "value": "answered"}, - {"display_name": "Attempted", "value": "attempted"}, - {"display_name": "Closed", "value": "closed"}, - {"display_name": "Finished", "value": "finished"}, - {"display_name": "Past Due", "value": "past_due"}, - {"display_name": "Never", "value": "never"}] - ) - force_save_button = Boolean( - help="Whether to force the save button to appear on the page", - scope=Scope.settings, - default=False - ) - rerandomize = Randomization( - display_name="Randomization", - help="Defines how often inputs are randomized when a student loads the problem. " - "This setting only applies to problems that can have randomly generated numeric values. " - "A default value can be set in Advanced Settings.", - default="never", - scope=Scope.settings, - values=[ - {"display_name": "Always", "value": "always"}, - {"display_name": "On Reset", "value": "onreset"}, - {"display_name": "Never", "value": "never"}, - {"display_name": "Per Student", "value": "per_student"} - ] - ) - data = String(help="XML data for the problem", scope=Scope.content, default="") - correct_map = Dict(help="Dictionary with the correctness of current student answers", - scope=Scope.user_state, default={}) - input_state = Dict(help="Dictionary for maintaining the state of inputtypes", scope=Scope.user_state) - student_answers = Dict(help="Dictionary with the current student responses", scope=Scope.user_state) - done = Boolean(help="Whether the student has answered the problem", scope=Scope.user_state) - seed = Integer(help="Random seed for this student", scope=Scope.user_state) - weight = Float( - display_name="Problem Weight", - help=("Defines the number of points each problem is worth. " - "If the value is not set, each response field in the problem is worth one point."), - values={"min": 0, "step": .1}, - scope=Scope.settings - ) - markdown = String(help="Markdown source of this module", default=None, scope=Scope.settings) - source_code = String( - help="Source code for LaTeX and Word problems. This feature is not well-supported.", - scope=Scope.settings - ) - text_customization = Dict( - help="String customization substitutions for particular locations", - scope=Scope.settings - # TODO: someday it should be possible to not duplicate this definition here - # and in inheritance.py - ) - use_latex_compiler = Boolean( - help="Enable LaTeX templates?", - default=False, - scope=Scope.settings - ) - - -class CapaModule(CapaFields, XModule): +class CapaModule(CapaMixin, XModule): """ An XModule implementing LonCapa format problems, implemented by way of capa.capa_problem.LoncapaProblem @@ -200,385 +40,6 @@ class CapaModule(CapaFields, XModule): """ super(CapaModule, self).__init__(*args, **kwargs) - due_date = get_extended_due_date(self) - - if self.graceperiod is not None and due_date: - self.close_date = due_date + self.graceperiod - else: - self.close_date = due_date - - if self.seed is None: - self.choose_new_seed() - - # Need the problem location in openendedresponse to send out. Adding - # it to the system here seems like the least clunky way to get it - # there. - self.system.set('location', self.location.url()) - - try: - # TODO (vshnayder): move as much as possible of this work and error - # checking to descriptor load time - self.lcp = self.new_lcp(self.get_state_for_lcp()) - - # At this point, we need to persist the randomization seed - # so that when the problem is re-loaded (to check/view/save) - # it stays the same. - # However, we do not want to write to the database - # every time the module is loaded. - # So we set the seed ONLY when there is not one set already - if self.seed is None: - self.seed = self.lcp.seed - - except Exception as err: # pylint: disable=broad-except - msg = u'cannot create LoncapaProblem {loc}: {err}'.format( - loc=self.location.url(), err=err) - # TODO (vshnayder): do modules need error handlers too? - # We shouldn't be switching on DEBUG. - if self.system.DEBUG: - log.warning(msg) - # TODO (vshnayder): This logic should be general, not here--and may - # want to preserve the data instead of replacing it. - # e.g. in the CMS - msg = u'

{msg}

'.format(msg=cgi.escape(msg)) - msg += u'

{tb}

'.format( - tb=cgi.escape(traceback.format_exc())) - # create a dummy problem with error message instead of failing - problem_text = (u'' - u'Problem {url} has an error:{msg}'.format( - url=self.location.url(), - msg=msg) - ) - self.lcp = self.new_lcp(self.get_state_for_lcp(), text=problem_text) - else: - # add extra info and raise - raise Exception(msg), None, sys.exc_info()[2] - - self.set_state_from_lcp() - - assert self.seed is not None - - def choose_new_seed(self): - """ - Choose a new seed. - """ - if self.rerandomize == 'never': - self.seed = 1 - elif self.rerandomize == "per_student" and hasattr(self.system, 'seed'): - # see comment on randomization_bin - self.seed = randomization_bin(self.system.seed, self.location.url) - else: - self.seed = struct.unpack('i', os.urandom(4))[0] - - # So that sandboxed code execution can be cached, but still have an interesting - # number of possibilities, cap the number of different random seeds. - self.seed %= MAX_RANDOMIZATION_BINS - - def new_lcp(self, state, text=None): - """ - Generate a new Loncapa Problem - """ - if text is None: - text = self.data - - return LoncapaProblem( - problem_text=text, - id=self.location.html_id(), - state=state, - seed=self.seed, - system=self.system, - ) - - def get_state_for_lcp(self): - """ - Give a dictionary holding the state of the module - """ - return { - 'done': self.done, - 'correct_map': self.correct_map, - 'student_answers': self.student_answers, - 'input_state': self.input_state, - 'seed': self.seed, - } - - def set_state_from_lcp(self): - """ - Set the module's state from the settings in `self.lcp` - """ - lcp_state = self.lcp.get_state() - self.done = lcp_state['done'] - self.correct_map = lcp_state['correct_map'] - self.input_state = lcp_state['input_state'] - self.student_answers = lcp_state['student_answers'] - self.seed = lcp_state['seed'] - - def get_score(self): - """ - Access the problem's score - """ - return self.lcp.get_score() - - def max_score(self): - """ - Access the problem's max score - """ - return self.lcp.get_max_score() - - def get_progress(self): - """ - For now, just return score / max_score - """ - score_dict = self.get_score() - score = score_dict['score'] - total = score_dict['total'] - - if total > 0: - if self.weight is not None: - # scale score and total by weight/total: - score = score * self.weight / total - total = self.weight - - try: - return Progress(score, total) - except (TypeError, ValueError): - log.exception("Got bad progress") - return None - return None - - def get_html(self): - """ - Return some html with data about the module - """ - progress = self.get_progress() - return self.system.render_template('problem_ajax.html', { - 'element_id': self.location.html_id(), - 'id': self.id, - 'ajax_url': self.system.ajax_url, - 'progress_status': Progress.to_js_status_str(progress), - 'progress_detail': Progress.to_js_detail_str(progress), - }) - - def check_button_name(self): - """ - Determine the name for the "check" button. - - Usually it is just "Check", but if this is the student's - final attempt, change the name to "Final Check". - The text can be customized by the text_customization setting. - """ - # The logic flow is a little odd so that _('xxx') strings can be found for - # translation while also running _() just once for each string. - _ = self.runtime.service(self, "i18n").ugettext - check = _('Check') - final_check = _('Final Check') - - # Apply customizations if present - if 'custom_check' in self.text_customization: - check = _(self.text_customization.get('custom_check')) - if 'custom_final_check' in self.text_customization: - final_check = _(self.text_customization.get('custom_final_check')) - # TODO: need a way to get the customized words into the list of - # words to be translated - - if self.max_attempts is not None and self.attempts >= self.max_attempts - 1: - return final_check - else: - return check - - def should_show_check_button(self): - """ - Return True/False to indicate whether to show the "Check" button. - """ - submitted_without_reset = (self.is_submitted() and self.rerandomize == "always") - - # If the problem is closed (past due / too many attempts) - # then we do NOT show the "check" button - # Also, do not show the "check" button if we're waiting - # for the user to reset a randomized problem - if self.closed() or submitted_without_reset: - return False - else: - return True - - def should_show_reset_button(self): - """ - Return True/False to indicate whether to show the "Reset" button. - """ - is_survey_question = (self.max_attempts == 0) - - if self.rerandomize in ["always", "onreset"]: - - # If the problem is closed (and not a survey question with max_attempts==0), - # then do NOT show the reset button. - # If the problem hasn't been submitted yet, then do NOT show - # the reset button. - if (self.closed() and not is_survey_question) or not self.is_submitted(): - return False - else: - return True - # Only randomized problems need a "reset" button - else: - return False - - def should_show_save_button(self): - """ - Return True/False to indicate whether to show the "Save" button. - """ - - # If the user has forced the save button to display, - # then show it as long as the problem is not closed - # (past due / too many attempts) - if self.force_save_button: - return not self.closed() - else: - is_survey_question = (self.max_attempts == 0) - needs_reset = self.is_submitted() and self.rerandomize == "always" - - # If the student has unlimited attempts, and their answers - # are not randomized, then we do not need a save button - # because they can use the "Check" button without consequences. - # - # The consequences we want to avoid are: - # * Using up an attempt (if max_attempts is set) - # * Changing the current problem, and no longer being - # able to view it (if rerandomize is "always") - # - # In those cases. the if statement below is false, - # and the save button can still be displayed. - # - if self.max_attempts is None and self.rerandomize != "always": - return False - - # If the problem is closed (and not a survey question with max_attempts==0), - # then do NOT show the save button - # If we're waiting for the user to reset a randomized problem - # then do NOT show the save button - elif (self.closed() and not is_survey_question) or needs_reset: - return False - else: - return True - - def handle_problem_html_error(self, err): - """ - Create a dummy problem to represent any errors. - - Change our problem to a dummy problem containing a warning message to - display to users. Returns the HTML to show to users - - `err` is the Exception encountered while rendering the problem HTML. - """ - log.exception(err.message) - - # TODO (vshnayder): another switch on DEBUG. - if self.system.DEBUG: - msg = ( - u'[courseware.capa.capa_module] ' - u'Failed to generate HTML for problem {url}'.format( - url=cgi.escape(self.location.url())) - ) - msg += u'

Error:

{msg}

'.format(msg=cgi.escape(err.message)) - msg += u'

{tb}

'.format(tb=cgi.escape(traceback.format_exc())) - html = msg - - else: - # We're in non-debug mode, and possibly even in production. We want - # to avoid bricking of problem as much as possible - - # Presumably, student submission has corrupted LoncapaProblem HTML. - # First, pull down all student answers - student_answers = self.lcp.student_answers - answer_ids = student_answers.keys() - - # Some inputtypes, such as dynamath, have additional "hidden" state that - # is not exposed to the student. Keep those hidden - # TODO: Use regex, e.g. 'dynamath' is suffix at end of answer_id - hidden_state_keywords = ['dynamath'] - for answer_id in answer_ids: - for hidden_state_keyword in hidden_state_keywords: - if answer_id.find(hidden_state_keyword) >= 0: - student_answers.pop(answer_id) - - # Next, generate a fresh LoncapaProblem - self.lcp = self.new_lcp(None) - self.set_state_from_lcp() - - # Prepend a scary warning to the student - warning = '
'\ - '

Warning: The problem has been reset to its initial state!

'\ - 'The problem\'s state was corrupted by an invalid submission. ' \ - 'The submission consisted of:'\ - '
    ' - for student_answer in student_answers.values(): - if student_answer != '': - warning += '
  • ' + cgi.escape(student_answer) + '
  • ' - warning += '
'\ - 'If this error persists, please contact the course staff.'\ - '
' - - html = warning - try: - html += self.lcp.get_html() - except Exception: # Couldn't do it. Give up - log.exception("Unable to generate html from LoncapaProblem") - raise - - return html - - def get_problem_html(self, encapsulate=True): - """ - Return html for the problem. - - Adds check, reset, save buttons as necessary based on the problem config and state. - """ - - try: - html = self.lcp.get_html() - - # If we cannot construct the problem HTML, - # then generate an error message instead. - except Exception as err: # pylint: disable=broad-except - html = self.handle_problem_html_error(err) - - # The convention is to pass the name of the check button - # if we want to show a check button, and False otherwise - # This works because non-empty strings evaluate to True - if self.should_show_check_button(): - check_button = self.check_button_name() - else: - check_button = False - - content = {'name': self.display_name_with_default, - 'html': html, - 'weight': self.weight, - } - - context = {'problem': content, - 'id': self.id, - 'check_button': check_button, - 'reset_button': self.should_show_reset_button(), - 'save_button': self.should_show_save_button(), - 'answer_available': self.answer_available(), - 'attempts_used': self.attempts, - 'attempts_allowed': self.max_attempts, - } - - html = self.system.render_template('problem.html', context) - - if encapsulate: - html = u'
'.format( - id=self.location.html_id(), ajax_url=self.system.ajax_url - ) + html + "
" - - # now do all the substitutions which the LMS module_render normally does, but - # we need to do here explicitly since we can get called for our HTML via AJAX - html = self.system.replace_urls(html) - if self.system.replace_course_urls: - html = self.system.replace_course_urls(html) - - if self.system.replace_jump_to_id_urls: - html = self.system.replace_jump_to_id_urls(html) - - return html - def handle_ajax(self, dispatch, data): """ This is called by courseware.module_render, to handle an AJAX call. @@ -637,535 +98,6 @@ class CapaModule(CapaFields, XModule): return json.dumps(result, cls=ComplexEncoder) - def is_past_due(self): - """ - Is it now past this problem's due date, including grace period? - """ - return (self.close_date is not None and - datetime.datetime.now(UTC()) > self.close_date) - - def closed(self): - """ - Is the student still allowed to submit answers? - """ - if self.max_attempts is not None and self.attempts >= self.max_attempts: - return True - if self.is_past_due(): - return True - - return False - - def is_submitted(self): - """ - Used to decide to show or hide RESET or CHECK buttons. - - Means that student submitted problem and nothing more. - Problem can be completely wrong. - Pressing RESET button makes this function to return False. - """ - # used by conditional module - return self.lcp.done - - def is_attempted(self): - """ - Has the problem been attempted? - - used by conditional module - """ - return self.attempts > 0 - - def is_correct(self): - """ - True iff full points - """ - score_dict = self.get_score() - return score_dict['score'] == score_dict['total'] - - def answer_available(self): - """ - Is the user allowed to see an answer? - """ - if self.showanswer == '': - return False - elif self.showanswer == "never": - return False - elif self.system.user_is_staff: - # This is after the 'never' check because admins can see the answer - # unless the problem explicitly prevents it - return True - elif self.showanswer == 'attempted': - return self.attempts > 0 - elif self.showanswer == 'answered': - # NOTE: this is slightly different from 'attempted' -- resetting the problems - # makes lcp.done False, but leaves attempts unchanged. - return self.lcp.done - elif self.showanswer == 'closed': - return self.closed() - elif self.showanswer == 'finished': - return self.closed() or self.is_correct() - - elif self.showanswer == 'past_due': - return self.is_past_due() - elif self.showanswer == 'always': - return True - - return False - - def update_score(self, data): - """ - Delivers grading response (e.g. from asynchronous code checking) to - the capa problem, so its score can be updated - - 'data' must have a key 'response' which is a string that contains the - grader's response - - No ajax return is needed. Return empty dict. - """ - queuekey = data['queuekey'] - score_msg = data['xqueue_body'] - self.lcp.update_score(score_msg, queuekey) - self.set_state_from_lcp() - self.publish_grade() - - return dict() # No AJAX return is needed - - def handle_ungraded_response(self, data): - """ - Delivers a response from the XQueue to the capa problem - - The score of the problem will not be updated - - Args: - - data (dict) must contain keys: - queuekey - a key specific to this response - xqueue_body - the body of the response - Returns: - empty dictionary - - No ajax return is needed, so an empty dict is returned - """ - queuekey = data['queuekey'] - score_msg = data['xqueue_body'] - - # pass along the xqueue message to the problem - self.lcp.ungraded_response(score_msg, queuekey) - self.set_state_from_lcp() - return dict() - - def handle_input_ajax(self, data): - """ - Handle ajax calls meant for a particular input in the problem - - Args: - - data (dict) - data that should be passed to the input - Returns: - - dict containing the response from the input - """ - response = self.lcp.handle_input_ajax(data) - - # save any state changes that may occur - self.set_state_from_lcp() - return response - - def get_answer(self, _data): - """ - For the "show answer" button. - - Returns the answers: {'answers' : answers} - """ - event_info = dict() - event_info['problem_id'] = self.location.url() - self.system.track_function('showanswer', event_info) - if not self.answer_available(): - raise NotFoundError('Answer is not available') - else: - answers = self.lcp.get_question_answers() - self.set_state_from_lcp() - - # answers (eg ) may have embedded images - # but be careful, some problems are using non-string answer dicts - new_answers = dict() - for answer_id in answers: - try: - new_answer = {answer_id: self.system.replace_urls(answers[answer_id])} - except TypeError: - log.debug(u'Unable to perform URL substitution on answers[%s]: %s', - answer_id, answers[answer_id]) - new_answer = {answer_id: answers[answer_id]} - new_answers.update(new_answer) - - return {'answers': new_answers} - - # Figure out if we should move these to capa_problem? - def get_problem(self, _data): - """ - Return results of get_problem_html, as a simple dict for json-ing. - { 'html': } - - Used if we want to reconfirm we have the right thing e.g. after - several AJAX calls. - """ - return {'html': self.get_problem_html(encapsulate=False)} - - @staticmethod - def make_dict_of_responses(data): - """ - Make dictionary of student responses (aka "answers") - - `data` is POST dictionary (webob.multidict.MultiDict). - - The `data` dict has keys of the form 'x_y', which are mapped - to key 'y' in the returned dict. For example, - 'input_1_2_3' would be mapped to '1_2_3' in the returned dict. - - Some inputs always expect a list in the returned dict - (e.g. checkbox inputs). The convention is that - keys in the `data` dict that end with '[]' will always - have list values in the returned dict. - For example, if the `data` dict contains {'input_1[]': 'test' } - then the output dict would contain {'1': ['test'] } - (the value is a list). - - Some other inputs such as ChoiceTextInput expect a dict of values in the returned - dict If the key ends with '{}' then we will assume that the value is a json - encoded dict and deserialize it. - For example, if the `data` dict contains {'input_1{}': '{"1_2_1": 1}'} - then the output dict would contain {'1': {"1_2_1": 1} } - (the value is a dictionary) - - Raises an exception if: - - -A key in the `data` dictionary does not contain at least one underscore - (e.g. "input" is invalid, but "input_1" is valid) - - -Two keys end up with the same name in the returned dict. - (e.g. 'input_1' and 'input_1[]', which both get mapped to 'input_1' - in the returned dict) - """ - answers = dict() - - # webob.multidict.MultiDict is a view of a list of tuples, - # so it will return a multi-value key once for each value. - # We only want to consider each key a single time, so we use set(data.keys()) - for key in set(data.keys()): - # e.g. input_resistor_1 ==> resistor_1 - _, _, name = key.partition('_') # pylint: disable=redefined-outer-name - - # If key has no underscores, then partition - # will return (key, '', '') - # We detect this and raise an error - if not name: - raise ValueError(u"{key} must contain at least one underscore".format(key=key)) - - else: - # This allows for answers which require more than one value for - # the same form input (e.g. checkbox inputs). The convention is that - # if the name ends with '[]' (which looks like an array), then the - # answer will be an array. - # if the name ends with '{}' (Which looks like a dict), - # then the answer will be a dict - is_list_key = name.endswith('[]') - is_dict_key = name.endswith('{}') - name = name[:-2] if is_list_key or is_dict_key else name - - if is_list_key: - val = data.getall(key) - elif is_dict_key: - try: - val = json.loads(data[key]) - # If the submission wasn't deserializable, raise an error. - except(KeyError, ValueError): - raise ValueError( - u"Invalid submission: {val} for {key}".format(val=data[key], key=key) - ) - else: - val = data[key] - - # If the name already exists, then we don't want - # to override it. Raise an error instead - if name in answers: - raise ValueError(u"Key {name} already exists in answers dict".format(name=name)) - else: - answers[name] = val - - return answers - - def publish_grade(self): - """ - Publishes the student's current grade to the system as an event - """ - score = self.lcp.get_score() - self.system.publish({ - 'event_name': 'grade', - 'value': score['score'], - 'max_value': score['total'], - }) - - return {'grade': score['score'], 'max_grade': score['total']} - - def check_problem(self, data): - """ - Checks whether answers to a problem are correct - - Returns a map of correct/incorrect answers: - {'success' : 'correct' | 'incorrect' | AJAX alert msg string, - 'contents' : html} - """ - event_info = dict() - event_info['state'] = self.lcp.get_state() - event_info['problem_id'] = self.location.url() - - answers = self.make_dict_of_responses(data) - event_info['answers'] = convert_files_to_filenames(answers) - - # Too late. Cannot submit - if self.closed(): - event_info['failure'] = 'closed' - self.system.track_function('problem_check_fail', event_info) - raise NotFoundError('Problem is closed') - - # Problem submitted. Student should reset before checking again - if self.done and self.rerandomize == "always": - event_info['failure'] = 'unreset' - self.system.track_function('problem_check_fail', event_info) - raise NotFoundError('Problem must be reset before it can be checked again') - - # Problem queued. Students must wait a specified waittime before they are allowed to submit - if self.lcp.is_queued(): - current_time = datetime.datetime.now(UTC()) - prev_submit_time = self.lcp.get_recentmost_queuetime() - waittime_between_requests = self.system.xqueue['waittime'] - if (current_time - prev_submit_time).total_seconds() < waittime_between_requests: - msg = u'You must wait at least {wait} seconds between submissions'.format( - wait=waittime_between_requests) - return {'success': msg, 'html': ''} # Prompts a modal dialog in ajax callback - - try: - correct_map = self.lcp.grade_answers(answers) - self.attempts = self.attempts + 1 - self.lcp.done = True - self.set_state_from_lcp() - - except (StudentInputError, ResponseError, LoncapaProblemError) as inst: - log.warning("StudentInputError in capa_module:problem_check", - exc_info=True) - - # Save the user's state before failing - self.set_state_from_lcp() - - # If the user is a staff member, include - # the full exception, including traceback, - # in the response - if self.system.user_is_staff: - msg = u"Staff debug info: {tb}".format(tb=cgi.escape(traceback.format_exc())) - - # Otherwise, display just an error message, - # without a stack trace - else: - msg = u"Error: {msg}".format(msg=inst.message) - - return {'success': msg} - - except Exception as err: - # Save the user's state before failing - self.set_state_from_lcp() - - if self.system.DEBUG: - msg = u"Error checking problem: {}".format(err.message) - msg += u'\nTraceback:\n{}'.format(traceback.format_exc()) - return {'success': msg} - raise - - published_grade = self.publish_grade() - - # success = correct if ALL questions in this problem are correct - success = 'correct' - for answer_id in correct_map: - if not correct_map.is_correct(answer_id): - success = 'incorrect' - - # NOTE: We are logging both full grading and queued-grading submissions. In the latter, - # 'success' will always be incorrect - event_info['grade'] = published_grade['grade'] - event_info['max_grade'] = published_grade['max_grade'] - event_info['correct_map'] = correct_map.get_dict() - event_info['success'] = success - event_info['attempts'] = self.attempts - self.system.track_function('problem_check', event_info) - - if hasattr(self.system, 'psychometrics_handler'): # update PsychometricsData using callback - self.system.psychometrics_handler(self.get_state_for_lcp()) - - # render problem into HTML - html = self.get_problem_html(encapsulate=False) - - return {'success': success, - 'contents': html, - } - - def rescore_problem(self): - """ - Checks whether the existing answers to a problem are correct. - - This is called when the correct answer to a problem has been changed, - and the grade should be re-evaluated. - - Returns a dict with one key: - {'success' : 'correct' | 'incorrect' | AJAX alert msg string } - - Raises NotFoundError if called on a problem that has not yet been - answered, or NotImplementedError if it's a problem that cannot be rescored. - - Returns the error messages for exceptions occurring while performing - the rescoring, rather than throwing them. - """ - event_info = {'state': self.lcp.get_state(), 'problem_id': self.location.url()} - - if not self.lcp.supports_rescoring(): - event_info['failure'] = 'unsupported' - self.system.track_function('problem_rescore_fail', event_info) - raise NotImplementedError("Problem's definition does not support rescoring") - - if not self.done: - event_info['failure'] = 'unanswered' - self.system.track_function('problem_rescore_fail', event_info) - raise NotFoundError('Problem must be answered before it can be graded again') - - # get old score, for comparison: - orig_score = self.lcp.get_score() - event_info['orig_score'] = orig_score['score'] - event_info['orig_total'] = orig_score['total'] - - try: - correct_map = self.lcp.rescore_existing_answers() - - except (StudentInputError, ResponseError, LoncapaProblemError) as inst: - log.warning("Input error in capa_module:problem_rescore", exc_info=True) - event_info['failure'] = 'input_error' - self.system.track_function('problem_rescore_fail', event_info) - return {'success': u"Error: {0}".format(inst.message)} - - except Exception as err: - event_info['failure'] = 'unexpected' - self.system.track_function('problem_rescore_fail', event_info) - if self.system.DEBUG: - msg = u"Error checking problem: {0}".format(err.message) - msg += u'\nTraceback:\n' + traceback.format_exc() - return {'success': msg} - raise - - # rescoring should have no effect on attempts, so don't - # need to increment here, or mark done. Just save. - self.set_state_from_lcp() - - self.publish_grade() - - new_score = self.lcp.get_score() - event_info['new_score'] = new_score['score'] - event_info['new_total'] = new_score['total'] - - # success = correct if ALL questions in this problem are correct - success = 'correct' - for answer_id in correct_map: - if not correct_map.is_correct(answer_id): - success = 'incorrect' - - # NOTE: We are logging both full grading and queued-grading submissions. In the latter, - # 'success' will always be incorrect - event_info['correct_map'] = correct_map.get_dict() - event_info['success'] = success - event_info['attempts'] = self.attempts - self.system.track_function('problem_rescore', event_info) - - # psychometrics should be called on rescoring requests in the same way as check-problem - if hasattr(self.system, 'psychometrics_handler'): # update PsychometricsData using callback - self.system.psychometrics_handler(self.get_state_for_lcp()) - - return {'success': success} - - def save_problem(self, data): - """ - Save the passed in answers. - Returns a dict { 'success' : bool, 'msg' : message } - The message is informative on success, and an error message on failure. - """ - event_info = dict() - event_info['state'] = self.lcp.get_state() - event_info['problem_id'] = self.location.url() - - answers = self.make_dict_of_responses(data) - event_info['answers'] = answers - - # Too late. Cannot submit - if self.closed() and not self.max_attempts == 0: - event_info['failure'] = 'closed' - self.system.track_function('save_problem_fail', event_info) - return {'success': False, - 'msg': "Problem is closed"} - - # Problem submitted. Student should reset before saving - # again. - if self.done and self.rerandomize == "always": - event_info['failure'] = 'done' - self.system.track_function('save_problem_fail', event_info) - return {'success': False, - 'msg': "Problem needs to be reset prior to save"} - - self.lcp.student_answers = answers - - self.set_state_from_lcp() - - self.system.track_function('save_problem_success', event_info) - msg = "Your answers have been saved" - if not self.max_attempts == 0: - msg += " but not graded. Hit 'Check' to grade them." - return {'success': True, - 'msg': msg} - - def reset_problem(self, _data): - """ - Changes problem state to unfinished -- removes student answers, - and causes problem to rerender itself. - - Returns a dictionary of the form: - {'success': True/False, - 'html': Problem HTML string } - - If an error occurs, the dictionary will also have an - `error` key containing an error message. - """ - event_info = dict() - event_info['old_state'] = self.lcp.get_state() - event_info['problem_id'] = self.location.url() - - if self.closed(): - event_info['failure'] = 'closed' - self.system.track_function('reset_problem_fail', event_info) - return {'success': False, - 'error': "Problem is closed"} - - if not self.done: - event_info['failure'] = 'not_done' - self.system.track_function('reset_problem_fail', event_info) - return {'success': False, - 'error': "Refresh the page and make an attempt before resetting."} - - if self.rerandomize in ["always", "onreset"]: - # Reset random number generator seed. - self.choose_new_seed() - - # Generate a new problem with either the previous seed or a new seed - self.lcp = self.new_lcp(None) - - # Pull in the new problem seed - self.set_state_from_lcp() - - event_info['new_state'] = self.lcp.get_state() - self.system.track_function('reset_problem', event_info) - - return {'success': True, - 'html': self.get_problem_html(encapsulate=False)} - class CapaDescriptor(CapaFields, RawDescriptor): """ diff --git a/common/lib/xmodule/xmodule/tests/test_capa_module.py b/common/lib/xmodule/xmodule/tests/test_capa_module.py index a76dc4eba7..6bb82f3d47 100644 --- a/common/lib/xmodule/xmodule/tests/test_capa_module.py +++ b/common/lib/xmodule/xmodule/tests/test_capa_module.py @@ -1335,8 +1335,8 @@ class CapaModuleTest(unittest.TestCase): module = CapaFactory.create(rerandomize=rerandomize) assert 0 <= module.seed < 1000 - @patch('xmodule.capa_module.log') - @patch('xmodule.capa_module.Progress') + @patch('xmodule.capa_base.log') + @patch('xmodule.capa_base.Progress') def test_get_progress_error(self, mock_progress, mock_log): """ Check that an exception given in `Progress` produces a `log.exception` call. @@ -1349,7 +1349,7 @@ class CapaModuleTest(unittest.TestCase): mock_log.exception.assert_called_once_with('Got bad progress') mock_log.reset_mock() - @patch('xmodule.capa_module.Progress') + @patch('xmodule.capa_base.Progress') def test_get_progress_calculate_progress_fraction(self, mock_progress): """ Check that score and total are calculated correctly for the progress fraction.