405 lines
17 KiB
Python
405 lines
17 KiB
Python
"""
|
|
Adds crowdsourced hinting functionality to lon-capa numerical response problems.
|
|
|
|
Currently experimental - not for instructor use, yet.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import random
|
|
import copy
|
|
|
|
from pkg_resources import resource_string
|
|
|
|
from lxml import etree
|
|
|
|
from xmodule.x_module import XModule, STUDENT_VIEW
|
|
from xmodule.raw_module import RawDescriptor
|
|
from xblock.fields import Scope, String, Integer, Boolean, Dict, List
|
|
|
|
from capa.responsetypes import FormulaResponse
|
|
|
|
from django.utils.html import escape
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class CrowdsourceHinterFields(object):
    """Field declarations shared by the crowdsource hinter module and descriptor."""
    has_children = True

    # --- Content-scoped settings -------------------------------------------
    # Both flags are stored as the literal strings "True" / "False".
    moderate = String(
        help='String "True"/"False" - activates moderation',
        scope=Scope.content,
        default='False',
    )
    debug = String(
        help='String "True"/"False" - allows multiple voting',
        scope=Scope.content,
        default='False',
    )

    # --- Content-scoped hint storage ---------------------------------------
    # Layout: hints[answer] = {str(pk): [hint_text, #votes]}
    # The outer dictionary is keyed by answer.  Each value is a dictionary
    # keyed by hint_pk (as a string) whose values are [hint text, #votes] pairs.
    hints = Dict(
        help='A dictionary containing all the active hints.',
        scope=Scope.content,
        default={},
    )
    mod_queue = Dict(
        help='A dictionary containing hints still awaiting approval',
        scope=Scope.content,
        default={},
    )
    hint_pk = Integer(
        help='Used to index hints.',
        scope=Scope.content,
        default=0,
    )

    # --- Per-user state ------------------------------------------------------
    # Despite its name, previous_answers records the hints a student has been
    # shown, as a list of [answer, [hint_pk_1, ...]] entries.
    previous_answers = List(
        help='A list of hints viewed.',
        scope=Scope.user_state,
        default=[],
    )

    # user_submissions is the list of answers the student actually submitted.
    # (Originally previous_answers did this job, hence the confusing name.)
    user_submissions = List(
        help='A list of previous submissions',
        scope=Scope.user_state,
        default=[],
    )
    user_voted = Boolean(
        help='Specifies if the user has voted on this problem or not.',
        scope=Scope.user_state,
        default=False,
    )
|
|
|
|
|
|
class CrowdsourceHinterModule(CrowdsourceHinterFields, XModule):
    """
    An Xmodule that makes crowdsourced hints.

    Currently, only works on capa problems with exactly one numerical response,
    and no other parts.

    Example usage:
    <crowdsource_hinter>
        <problem blah blah />
    </crowdsource_hinter>

    XML attributes:
    -moderate="True" will not display hints until staff approve them in the hint manager.
    -debug="True" will let users vote as often as they want.
    """
    icon_class = 'crowdsource_hinter'
    css = {'scss': [resource_string(__name__, 'css/crowdsource_hinter/display.scss')]}
    js = {'coffee': [resource_string(__name__, 'js/src/crowdsource_hinter/display.coffee')],
          'js': []}
    js_module_name = "Hinter"

    def __init__(self, *args, **kwargs):
        """
        Initialize the module and bind the answer helpers of the child problem.

        Looks up the first responder of the first display item.  If it exposes
        both `compare_answer` and `validate_answer`, those are bound onto this
        module; otherwise the unsupported response type is logged.

        If no capa problem child can be found, the failure is logged and none
        of `is_formula`, `answer_to_str`, `compare_answer` or `validate_answer`
        is set.
        """
        super(CrowdsourceHinterModule, self).__init__(*args, **kwargs)
        # We need to know whether we are working with a FormulaResponse problem.
        try:
            # list(...) keeps this working where dict.values() returns a view,
            # and still raises IndexError when there are no responders.
            responder = list(self.get_display_items()[0].lcp.responders.values())[0]
        except (IndexError, AttributeError):
            log.exception('Unable to find a capa problem child.')
            return

        # The check must be made on the responder; this module itself is never
        # a FormulaResponse.
        self.is_formula = isinstance(responder, FormulaResponse)
        if self.is_formula:
            self.answer_to_str = self.formula_answer_to_str
        else:
            self.answer_to_str = self.numerical_answer_to_str
        # compare_answer is expected to return whether its two inputs are close enough
        # to be equal, or raise a StudentInputError if one of the inputs is malformatted.
        if hasattr(responder, 'compare_answer') and hasattr(responder, 'validate_answer'):
            self.compare_answer = responder.compare_answer
            self.validate_answer = responder.validate_answer
        else:
            # This response type is not supported!
            # (log.error, not log.exception: there is no active exception here.)
            log.error('Response type not supported for hinting: ' + str(responder))

    def get_html(self):
        """
        Puts a wrapper around the problem html.  This wrapper includes ajax urls of the
        hinter and of the problem.
        - Dependent on lon-capa problem.

        Returns the child's rendered content (or an error message if there is
        no child) followed by an empty <section> carrying the data attributes.
        """
        if self.debug == 'True':
            # Reset the user vote, for debugging only!
            self.user_voted = False
        if self.hints == {}:
            # Force self.hints to be written into the database.  (When an xmodule is initialized,
            # fields are not added to the db until explicitly changed at least once.)
            self.hints = {}

        try:
            child = self.get_display_items()[0]
            out = child.render(STUDENT_VIEW).content
            # The event listener uses the ajax url to find the child.
            child_id = child.id
        except IndexError:
            out = u"Error in loading crowdsourced hinter - can't find child problem."
            child_id = ''

        # Wrap the module in a <section>.  This lets us pass data attributes to the javascript.
        out += u'<section class="crowdsource-wrapper" data-url="{ajax_url}" data-child-id="{child_id}"> </section>'.format(
            ajax_url=self.runtime.ajax_url,
            child_id=child_id
        )

        return out

    def numerical_answer_to_str(self, answer):
        """
        Converts capa numerical answer format to a string representation
        of the answer.
        -Lon-capa dependent.
        -Assumes that the problem only has one part.

        `answer` must provide `.values()`; the first value is stringified.
        Raises IndexError if `answer` is empty.
        """
        return str(list(answer.values())[0])

    def formula_answer_to_str(self, answer):
        """
        Converts capa formula answer into a string.
        -Lon-capa dependent.
        -Assumes that the problem only has one part.

        `answer` must provide `.values()`; the first value is stringified.
        Raises IndexError if `answer` is empty.
        """
        return str(list(answer.values())[0])

    def get_matching_answers(self, answer):
        """
        Look in self.hints, and find all answer keys that are "equal with tolerance"
        to the input answer.

        Returns the list of keys of self.hints for which
        `self.compare_answer(key, answer)` is truthy.
        """
        return [key for key in self.hints if self.compare_answer(key, answer)]

    def handle_ajax(self, dispatch, data):
        """
        This is the landing method for AJAX calls.

        Args:
            `dispatch` -- one of 'get_hint', 'get_feedback', 'vote', 'submit_hint'.
            `data` -- passed through unchanged to the selected handler.

        Returns a JSON string with a single 'contents' key.  For a known
        operation the contents are the rendered 'hinter_display.html' template;
        the template context gets an 'op' key of 'empty' (handler returned
        None), 'error' (handler result contains 'error') or the dispatch name.
        """
        handlers = {
            'get_hint': self.get_hint,
            'get_feedback': self.get_feedback,
            'vote': self.tally_vote,
            'submit_hint': self.submit_hint,
        }
        handler = handlers.get(dispatch)
        if handler is None:
            return json.dumps({'contents': 'Error - invalid operation.'})

        out = handler(data)
        if out is None:
            out = {'op': 'empty'}
        elif 'error' in out:
            # Error in processing.
            out.update({'op': 'error'})
        else:
            out.update({'op': dispatch})
        return json.dumps({'contents': self.runtime.render_template('hinter_display.html', out)})

    def get_hint(self, data):
        """
        The student got the incorrect answer found in data.  Give him a hint.

        Called by hinter javascript after a problem is graded as incorrect.
        Args:
            `data` -- must be interpretable by answer_to_str.
        Output keys:
            - 'hints' is a list of hint strings to show to the user.
            - 'answer' is the parsed answer that was submitted.
        Will record the user's wrong answer in user_submissions, and the hints shown
        in previous_answers.

        Returns None when the answer cannot be parsed, is not valid, or has no
        matching hints.  At most three hints are returned: the one with the
        most votes, plus up to two others chosen at random whose text has not
        already been selected.
        """
        # First, validate our inputs.
        try:
            answer = self.answer_to_str(data)
        except (ValueError, AttributeError, IndexError):
            # Sometimes, we get an answer that's just not parsable.  Do nothing.
            # (IndexError covers an answer dict with no entries.)
            log.exception('Answer not parsable: ' + str(data))
            return
        if not self.validate_answer(answer):
            # Answer is not in the right form.
            log.error('Answer not valid: ' + str(answer))
            return
        if answer not in self.user_submissions:
            self.user_submissions += [answer]

        # For all answers similar enough to our own, accumulate all hints together.
        # Also track the original answer of each hint.  List concatenation builds
        # a new list per hint, so the stored hints are left untouched.
        matching_hints = {}
        for matching_answer in self.get_matching_answers(answer):
            for pk, value in self.hints[matching_answer].items():
                # Each value now has hint, votes, matching_answer.
                matching_hints[pk] = value + [matching_answer]
        # matching_hints now maps pk's to lists of [hint, votes, matching_answer]

        if not matching_hints:
            # No hints to give.  Return.
            return

        # Get the top hint, plus two random hints.
        # max(dict) iterates over the keys; the key function maps each pk to
        # the number of votes for the hint with that pk.
        best_hint_index = max(matching_hints, key=lambda pk: matching_hints[pk][1])
        best_hint_text, _, best_hint_answer = matching_hints[best_hint_index]
        hints = [best_hint_text]
        # The brackets surrounding the index are for backwards compatability purposes.
        # (It used to be that each answer was paired with multiple hints in a list.)
        self.previous_answers += [[best_hint_answer, [best_hint_index]]]

        # Visit the remaining hints once, in random order.  A retry-until-new
        # loop would never terminate when every remaining hint repeats text
        # that has already been chosen.
        other_indices = [pk for pk in matching_hints if pk != best_hint_index]
        random.shuffle(other_indices)
        for hint_index in other_indices:
            if len(hints) >= 3:
                break
            rand_hint, _, hint_answer = matching_hints[hint_index]
            if rand_hint in hints:
                continue
            hints.append(rand_hint)
            self.previous_answers += [[hint_answer, [hint_index]]]
        return {'hints': hints,
                'answer': answer}

    def get_feedback(self, data):
        """
        The student got it correct.  Ask him to vote on hints, or submit a hint.

        Args:
            `data` -- not actually used.  (It is assumed that the answer is correct.)
        Output keys:
            - 'answer_to_hints': a nested dictionary.
              answer_to_hints[answer][hint_pk] returns the text of the hint.
            - 'user_submissions': the same thing as self.user_submissions.  A list of
              the answers that the user previously submitted.

        Returns None if the student never submitted a wrong answer.
        """
        # The student got it right.
        # Did he submit at least one wrong answer?
        if not self.user_submissions:
            # No.  Nothing to do here.
            return
        # Make a hint-voting interface for each wrong answer.  The student will only
        # be allowed to make one vote / submission, but he can choose which wrong answer
        # he wants to look at.
        answer_to_hints = {}     # answer_to_hints[answer text][hint pk] -> hint text

        # Go through each hint-viewing record and collect the text of every hint shown.
        for answer, hints_offered in self.previous_answers:
            hints_for_answer = answer_to_hints.setdefault(answer, {})
            if answer not in self.hints:
                continue
            for hint_id in hints_offered:
                if hint_id is None or hint_id in hints_for_answer:
                    continue
                try:
                    hints_for_answer[hint_id] = self.hints[answer][str(hint_id)][0]
                except KeyError:
                    # Sometimes, the hint that a user saw will have been deleted by the instructor.
                    continue
        return {'answer_to_hints': answer_to_hints,
                'user_submissions': self.user_submissions}

    def tally_vote(self, data):
        """
        Tally a user's vote on his favorite hint.

        Args:
        `data` -- expected to have the following keys:
            'answer': text of answer we're voting on
            'hint': hint_pk
            'pk_list': A list of [answer, pk] pairs, each of which representing a hint.
                       We will return a list of how many votes each hint in the list has so far.
                       It's up to the browser to specify which hints to return vote counts for.
                       (Passed as a JSON-encoded string.)

        Returns key 'hint_and_votes', a list of (hint_text, #votes) pairs sorted
        by descending vote count, or a dict with an 'error' key if the user has
        already voted, the answer is not valid, or the hint does not exist.
        On success the user is marked as having voted and previous_answers and
        user_submissions are cleared.
        """
        if self.user_voted:
            return {'error': 'Sorry, but you have already voted!'}
        ans = data['answer']
        if not self.validate_answer(ans):
            # Uh oh.  Invalid answer.
            log.error('Failure in hinter tally_vote: Unable to parse answer: {ans}'.format(ans=ans))
            return {'error': 'Failure in voting!'}
        hint_pk = str(data['hint'])
        # We use temp_dict because we need to do a direct write for the database to update.
        temp_dict = self.hints
        try:
            temp_dict[ans][hint_pk][1] += 1
        except KeyError:
            log.exception(
                'Failure in hinter tally_vote: User voted for non-existant hint:\n'
                'Answer={ans} pk={hint_pk}'.format(ans=ans, hint_pk=hint_pk)
            )
            return {'error': 'Failure in voting!'}
        self.hints = temp_dict
        # Don't let the user vote again!
        self.user_voted = True

        # Return a list of how many votes each hint got.
        pk_list = json.loads(data['pk_list'])
        hint_and_votes = []
        for answer, vote_pk in pk_list:
            if not self.validate_answer(answer):
                log.error('In hinter tally_vote, couldn\'t parse {ans}'.format(ans=answer))
                continue
            try:
                hint_and_votes.append(temp_dict[answer][str(vote_pk)])
            except KeyError:
                log.exception('In hinter tally_vote, couldn\'t find: {ans}, {vote_pk}'.format(
                    ans=answer, vote_pk=str(vote_pk)))

        hint_and_votes.sort(key=lambda pair: pair[1], reverse=True)
        # Reset self.previous_answers and user_submissions.
        self.previous_answers = []
        self.user_submissions = []
        return {'hint_and_votes': hint_and_votes}

    def submit_hint(self, data):
        """
        Take a hint submission and add it to the database.

        Args:
        `data` -- expected to have the following keys:
            'answer': text of answer
            'hint': text of the new hint that the user is adding
        Returns a thank-you message under the 'message' key, a 'message' saying
        the user has already voted, or an 'error' if the answer is not valid.

        The hint is stored in self.mod_queue when moderation is on
        (moderate == 'True') and in self.hints otherwise, with one initial vote.
        """
        # Do html escaping.  Perhaps in the future do profanity filtering, etc. as well.
        hint = escape(data['hint'])
        answer = data['answer']
        if not self.validate_answer(answer):
            log.error('Failure in hinter submit_hint: Unable to parse answer: {ans}'.format(
                ans=answer))
            return {'error': 'Could not submit answer'}
        # Only allow a student to vote or submit a hint once.
        if self.user_voted:
            return {'message': 'Sorry, but you have already voted!'}
        # Add the new hint to self.hints or self.mod_queue.  (Awkward because a direct write
        # is necessary.)
        if self.moderate == 'True':
            temp_dict = self.mod_queue
        else:
            temp_dict = self.hints
        if answer in temp_dict:
            temp_dict[answer][str(self.hint_pk)] = [hint, 1]     # With one vote (the user himself).
        else:
            temp_dict[answer] = {str(self.hint_pk): [hint, 1]}
        self.hint_pk += 1
        if self.moderate == 'True':
            self.mod_queue = temp_dict
        else:
            self.hints = temp_dict
        # Mark the user has having voted; reset previous_answers
        self.user_voted = True
        self.previous_answers = []
        self.user_submissions = []
        return {'message': 'Thank you for your hint!'}
|
|
|
|
|
|
class CrowdsourceHinterDescriptor(CrowdsourceHinterFields, RawDescriptor):
    """Descriptor for the crowdsource hinter; its module class is CrowdsourceHinterModule."""
    module_class = CrowdsourceHinterModule
    stores_state = True

    @classmethod
    def definition_from_xml(cls, xml_object, system):
        """
        Build the definition from the child elements of `xml_object`.

        Each child element is serialized and handed to `system.process_xml`;
        the usage id of every block that loads is collected.  A child that
        fails to load is logged, reported to `system.error_tracker` when one
        is set, and skipped.

        Returns a tuple of (empty definition dict, list of child usage ids).
        """
        child_ids = []
        for node in xml_object:
            try:
                node_xml = etree.tostring(node, encoding='unicode')
                block = system.process_xml(node_xml)
                child_ids.append(block.scope_ids.usage_id)
            except Exception as err:
                # Best effort: one broken child must not stop the others from loading.
                log.exception("Unable to load child when parsing CrowdsourceHinter. Continuing...")
                tracker = system.error_tracker
                if tracker is not None:
                    tracker(u"ERROR: {0}".format(err))
        return {}, child_ids

    def definition_to_xml(self, resource_fs):
        """
        Serialize this descriptor as a <crowdsource_hinter> element.

        Every child is added to the element through
        `self.runtime.add_block_as_child_node`.  `resource_fs` is not used.
        """
        root = etree.Element('crowdsource_hinter')
        for child_block in self.get_children():
            self.runtime.add_block_as_child_node(child_block, root)
        return root
|