757 lines
29 KiB
Python
757 lines
29 KiB
Python
import logging
|
|
from cStringIO import StringIO
|
|
from math import exp, erf
|
|
from lxml import etree
|
|
from path import path # NOTE (THK): Only used for detecting presence of syllabus
|
|
import requests
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from xmodule.modulestore import Location
|
|
from xmodule.seq_module import SequenceDescriptor, SequenceModule
|
|
from xmodule.timeparse import parse_time
|
|
from xmodule.util.decorators import lazyproperty
|
|
from xmodule.graders import grader_from_conf
|
|
from datetime import datetime
|
|
import json
|
|
import logging
|
|
import requests
|
|
import time
|
|
import copy
|
|
|
|
from xblock.core import Scope, ModelType, List, String, Object, Boolean
|
|
from .fields import Date
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StringOrDate(Date):
|
|
def from_json(self, value):
|
|
"""
|
|
Parse an optional metadata key containing a time: if present, complain
|
|
if it doesn't parse.
|
|
Return None if not present or invalid.
|
|
"""
|
|
if value is None:
|
|
return None
|
|
|
|
try:
|
|
return time.strptime(value, self.time_format)
|
|
except ValueError:
|
|
return value
|
|
|
|
def to_json(self, value):
|
|
"""
|
|
Convert a time struct to a string
|
|
"""
|
|
if value is None:
|
|
return None
|
|
|
|
try:
|
|
return time.strftime(self.time_format, value)
|
|
except (ValueError, TypeError):
|
|
return value
|
|
|
|
|
|
|
|
edx_xml_parser = etree.XMLParser(dtd_validation=False, load_dtd=False,
|
|
remove_comments=True, remove_blank_text=True)
|
|
|
|
_cached_toc = {}
|
|
|
|
class Textbook(object):
|
|
def __init__(self, title, book_url):
|
|
self.title = title
|
|
self.book_url = book_url
|
|
self.start_page = int(self.table_of_contents[0].attrib['page'])
|
|
|
|
# The last page should be the last element in the table of contents,
|
|
# but it may be nested. So recurse all the way down the last element
|
|
last_el = self.table_of_contents[-1]
|
|
while last_el.getchildren():
|
|
last_el = last_el[-1]
|
|
|
|
self.end_page = int(last_el.attrib['page'])
|
|
|
|
@lazyproperty
|
|
def table_of_contents(self):
|
|
"""
|
|
Accesses the textbook's table of contents (default name "toc.xml") at the URL self.book_url
|
|
|
|
Returns XML tree representation of the table of contents
|
|
"""
|
|
toc_url = self.book_url + 'toc.xml'
|
|
|
|
# cdodge: I've added this caching of TOC because in Mongo-backed instances (but not Filesystem stores)
|
|
# course modules have a very short lifespan and are constantly being created and torn down.
|
|
# Since this module in the __init__() method does a synchronous call to AWS to get the TOC
|
|
# this is causing a big performance problem. So let's be a bit smarter about this and cache
|
|
# each fetch and store in-mem for 10 minutes.
|
|
# NOTE: I have to get this onto sandbox ASAP as we're having runtime failures. I'd like to swing back and
|
|
# rewrite to use the traditional Django in-memory cache.
|
|
try:
|
|
# see if we already fetched this
|
|
if toc_url in _cached_toc:
|
|
(table_of_contents, timestamp) = _cached_toc[toc_url]
|
|
age = datetime.now() - timestamp
|
|
# expire every 10 minutes
|
|
if age.seconds < 600:
|
|
return table_of_contents
|
|
except Exception as err:
|
|
pass
|
|
|
|
# Get the table of contents from S3
|
|
log.info("Retrieving textbook table of contents from %s" % toc_url)
|
|
try:
|
|
r = requests.get(toc_url)
|
|
except Exception as err:
|
|
msg = 'Error %s: Unable to retrieve textbook table of contents at %s' % (err, toc_url)
|
|
log.error(msg)
|
|
raise Exception(msg)
|
|
|
|
# TOC is XML. Parse it
|
|
try:
|
|
table_of_contents = etree.fromstring(r.text)
|
|
except Exception as err:
|
|
msg = 'Error %s: Unable to parse XML for textbook table of contents at %s' % (err, toc_url)
|
|
log.error(msg)
|
|
raise Exception(msg)
|
|
|
|
return table_of_contents
|
|
|
|
|
|
class TextbookList(List):
|
|
def from_json(self, values):
|
|
textbooks = []
|
|
for title, book_url in values:
|
|
try:
|
|
textbooks.append(Textbook(title, book_url))
|
|
except:
|
|
# If we can't get to S3 (e.g. on a train with no internet), don't break
|
|
# the rest of the courseware.
|
|
log.exception("Couldn't load textbook ({0}, {1})".format(title, book_url))
|
|
continue
|
|
|
|
return textbooks
|
|
|
|
def to_json(self, values):
|
|
json_data = []
|
|
for val in values:
|
|
if isinstance(val, Textbook):
|
|
json_data.append((val.title, val.book_url))
|
|
elif isinstance(val, tuple):
|
|
json_data.append(val)
|
|
else:
|
|
continue
|
|
return json_data
|
|
|
|
|
|
class CourseFields(object):
|
|
textbooks = TextbookList(help="List of pairs of (title, url) for textbooks used in this course", scope=Scope.content)
|
|
wiki_slug = String(help="Slug that points to the wiki for this course", scope=Scope.content)
|
|
enrollment_start = Date(help="Date that enrollment for this class is opened", scope=Scope.settings)
|
|
enrollment_end = Date(help="Date that enrollment for this class is closed", scope=Scope.settings)
|
|
start = Date(help="Start time when this module is visible", scope=Scope.settings)
|
|
end = Date(help="Date that this class ends", scope=Scope.settings)
|
|
advertised_start = StringOrDate(help="Date that this course is advertised to start", scope=Scope.settings)
|
|
grading_policy = Object(help="Grading policy definition for this class", scope=Scope.content)
|
|
show_calculator = Boolean(help="Whether to show the calculator in this course", default=False, scope=Scope.settings)
|
|
display_name = String(help="Display name for this module", scope=Scope.settings)
|
|
tabs = List(help="List of tabs to enable in this course", scope=Scope.settings)
|
|
end_of_course_survey_url = String(help="Url for the end-of-course survey", scope=Scope.settings)
|
|
discussion_blackouts = List(help="List of pairs of start/end dates for discussion blackouts", scope=Scope.settings)
|
|
discussion_topics = Object(
|
|
help="Map of topics names to ids",
|
|
scope=Scope.settings,
|
|
computed_default=lambda c: {'General': {'id': c.location.html_id()}},
|
|
)
|
|
testcenter_info = Object(help="Dictionary of Test Center info", scope=Scope.settings)
|
|
announcement = Date(help="Date this course is announced", scope=Scope.settings)
|
|
cohort_config = Object(help="Dictionary defining cohort configuration", scope=Scope.settings)
|
|
is_new = Boolean(help="Whether this course should be flagged as new", scope=Scope.settings)
|
|
no_grade = Boolean(help="True if this course isn't graded", default=False, scope=Scope.settings)
|
|
disable_progress_graph = Boolean(help="True if this course shouldn't display the progress graph", default=False, scope=Scope.settings)
|
|
pdf_textbooks = List(help="List of dictionaries containing pdf_textbook configuration", scope=Scope.settings)
|
|
html_textbooks = List(help="List of dictionaries containing html_textbook configuration", scope=Scope.settings)
|
|
remote_gradebook = Object(scope=Scope.settings)
|
|
allow_anonymous = Boolean(scope=Scope.settings, default=True)
|
|
allow_anonymous_to_peers = Boolean(scope=Scope.settings, default=False)
|
|
advanced_modules = List(help="Beta modules used in your course", scope=Scope.settings)
|
|
has_children = True
|
|
|
|
info_sidebar_name = String(scope=Scope.settings, default='Course Handouts')
|
|
|
|
# An extra property is used rather than the wiki_slug/number because
|
|
# there are courses that change the number for different runs. This allows
|
|
# courses to share the same css_class across runs even if they have
|
|
# different numbers.
|
|
#
|
|
# TODO get rid of this as soon as possible or potentially build in a robust
|
|
# way to add in course-specific styling. There needs to be a discussion
|
|
# about the right way to do this, but arjun will address this ASAP. Also
|
|
# note that the courseware template needs to change when this is removed.
|
|
css_class = String(help="DO NOT USE THIS", scope=Scope.settings)
|
|
|
|
# TODO: This is a quick kludge to allow CS50 (and other courses) to
|
|
# specify their own discussion forums as external links by specifying a
|
|
# "discussion_link" in their policy JSON file. This should later get
|
|
# folded in with Syllabus, Course Info, and additional Custom tabs in a
|
|
# more sensible framework later.
|
|
discussion_link = String(help="DO NOT USE THIS", scope=Scope.settings)
|
|
|
|
# TODO: same as above, intended to let internal CS50 hide the progress tab
|
|
# until we get grade integration set up.
|
|
# Explicit comparison to True because we always want to return a bool.
|
|
hide_progress_tab = Boolean(help="DO NOT USE THIS", scope=Scope.settings)
|
|
|
|
|
|
class CourseDescriptor(CourseFields, SequenceDescriptor):
|
|
module_class = SequenceModule
|
|
|
|
template_dir_name = 'course'
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(CourseDescriptor, self).__init__(*args, **kwargs)
|
|
|
|
if self.wiki_slug is None:
|
|
self.wiki_slug = self.location.course
|
|
|
|
msg = None
|
|
if self.start is None:
|
|
msg = "Course loaded without a valid start date. id = %s" % self.id
|
|
# hack it -- start in 1970
|
|
self.start = time.gmtime(0)
|
|
log.critical(msg)
|
|
self.system.error_tracker(msg)
|
|
|
|
# NOTE: relies on the modulestore to call set_grading_policy() right after
|
|
# init. (Modulestore is in charge of figuring out where to load the policy from)
|
|
|
|
# NOTE (THK): This is a last-minute addition for Fall 2012 launch to dynamically
|
|
# disable the syllabus content for courses that do not provide a syllabus
|
|
self.syllabus_present = self.system.resources_fs.exists(path('syllabus'))
|
|
self._grading_policy = {}
|
|
self.set_grading_policy(self.grading_policy)
|
|
|
|
self.test_center_exams = []
|
|
test_center_info = self.testcenter_info
|
|
if test_center_info is not None:
|
|
for exam_name in test_center_info:
|
|
try:
|
|
exam_info = test_center_info[exam_name]
|
|
self.test_center_exams.append(self.TestCenterExam(self.id, exam_name, exam_info))
|
|
except Exception as err:
|
|
# If we can't parse the test center exam info, don't break
|
|
# the rest of the courseware.
|
|
msg = 'Error %s: Unable to load test-center exam info for exam "%s" of course "%s"' % (err, exam_name, self.id)
|
|
log.error(msg)
|
|
continue
|
|
|
|
def default_grading_policy(self):
|
|
"""
|
|
Return a dict which is a copy of the default grading policy
|
|
"""
|
|
return {"GRADER": [
|
|
{
|
|
"type": "Homework",
|
|
"min_count": 12,
|
|
"drop_count": 2,
|
|
"short_label": "HW",
|
|
"weight": 0.15
|
|
},
|
|
{
|
|
"type": "Lab",
|
|
"min_count": 12,
|
|
"drop_count": 2,
|
|
"weight": 0.15
|
|
},
|
|
{
|
|
"type": "Midterm Exam",
|
|
"short_label": "Midterm",
|
|
"min_count": 1,
|
|
"drop_count": 0,
|
|
"weight": 0.3
|
|
},
|
|
{
|
|
"type": "Final Exam",
|
|
"short_label": "Final",
|
|
"min_count": 1,
|
|
"drop_count": 0,
|
|
"weight": 0.4
|
|
}
|
|
],
|
|
"GRADE_CUTOFFS": {
|
|
"Pass": 0.5
|
|
}}
|
|
|
|
def set_grading_policy(self, course_policy):
|
|
"""
|
|
The JSON object can have the keys GRADER and GRADE_CUTOFFS. If either is
|
|
missing, it reverts to the default.
|
|
"""
|
|
if course_policy is None:
|
|
course_policy = {}
|
|
|
|
# Load the global settings as a dictionary
|
|
grading_policy = self.default_grading_policy()
|
|
|
|
# Override any global settings with the course settings
|
|
grading_policy.update(course_policy)
|
|
|
|
# Here is where we should parse any configurations, so that we can fail early
|
|
# Use setters so that side effecting to .definitions works
|
|
self.raw_grader = grading_policy['GRADER'] # used for cms access
|
|
self.grade_cutoffs = grading_policy['GRADE_CUTOFFS']
|
|
|
|
@classmethod
|
|
def read_grading_policy(cls, paths, system):
|
|
"""Load a grading policy from the specified paths, in order, if it exists."""
|
|
# Default to a blank policy dict
|
|
policy_str = '{}'
|
|
|
|
for policy_path in paths:
|
|
if not system.resources_fs.exists(policy_path):
|
|
continue
|
|
log.debug("Loading grading policy from {0}".format(policy_path))
|
|
try:
|
|
with system.resources_fs.open(policy_path) as grading_policy_file:
|
|
policy_str = grading_policy_file.read()
|
|
# if we successfully read the file, stop looking at backups
|
|
break
|
|
except (IOError):
|
|
msg = "Unable to load course settings file from '{0}'".format(policy_path)
|
|
log.warning(msg)
|
|
|
|
return policy_str
|
|
|
|
@classmethod
|
|
def from_xml(cls, xml_data, system, org=None, course=None):
|
|
instance = super(CourseDescriptor, cls).from_xml(xml_data, system, org, course)
|
|
|
|
# bleh, have to parse the XML here to just pull out the url_name attribute
|
|
# I don't think it's stored anywhere in the instance.
|
|
course_file = StringIO(xml_data.encode('ascii', 'ignore'))
|
|
xml_obj = etree.parse(course_file, parser=edx_xml_parser).getroot()
|
|
|
|
policy_dir = None
|
|
url_name = xml_obj.get('url_name', xml_obj.get('slug'))
|
|
if url_name:
|
|
policy_dir = 'policies/' + url_name
|
|
|
|
# Try to load grading policy
|
|
paths = ['grading_policy.json']
|
|
if policy_dir:
|
|
paths = [policy_dir + '/grading_policy.json'] + paths
|
|
|
|
try:
|
|
policy = json.loads(cls.read_grading_policy(paths, system))
|
|
except ValueError:
|
|
system.error_tracker("Unable to decode grading policy as json")
|
|
policy = {}
|
|
|
|
# cdodge: import the grading policy information that is on disk and put into the
|
|
# descriptor 'definition' bucket as a dictionary so that it is persisted in the DB
|
|
instance.grading_policy = policy
|
|
|
|
# now set the current instance. set_grading_policy() will apply some inheritance rules
|
|
instance.set_grading_policy(policy)
|
|
|
|
return instance
|
|
|
|
@classmethod
|
|
def definition_from_xml(cls, xml_object, system):
|
|
textbooks = []
|
|
for textbook in xml_object.findall("textbook"):
|
|
textbooks.append((textbook.get('title'), textbook.get('book_url')))
|
|
xml_object.remove(textbook)
|
|
|
|
#Load the wiki tag if it exists
|
|
wiki_slug = None
|
|
wiki_tag = xml_object.find("wiki")
|
|
if wiki_tag is not None:
|
|
wiki_slug = wiki_tag.attrib.get("slug", default=None)
|
|
xml_object.remove(wiki_tag)
|
|
|
|
definition, children = super(CourseDescriptor, cls).definition_from_xml(xml_object, system)
|
|
|
|
definition['textbooks'] = textbooks
|
|
definition['wiki_slug'] = wiki_slug
|
|
|
|
return definition, children
|
|
|
|
def has_ended(self):
|
|
"""
|
|
Returns True if the current time is after the specified course end date.
|
|
Returns False if there is no end date specified.
|
|
"""
|
|
if self.end is None:
|
|
return False
|
|
|
|
return time.gmtime() > self.end
|
|
|
|
def has_started(self):
|
|
return time.gmtime() > self.start
|
|
|
|
@property
|
|
def grader(self):
|
|
return grader_from_conf(self.raw_grader)
|
|
|
|
@property
|
|
def raw_grader(self):
|
|
return self._grading_policy['RAW_GRADER']
|
|
|
|
@raw_grader.setter
|
|
def raw_grader(self, value):
|
|
# NOTE WELL: this change will not update the processed graders. If we need that, this needs to call grader_from_conf
|
|
self._grading_policy['RAW_GRADER'] = value
|
|
self.grading_policy['GRADER'] = value
|
|
|
|
@property
|
|
def grade_cutoffs(self):
|
|
return self._grading_policy['GRADE_CUTOFFS']
|
|
|
|
@grade_cutoffs.setter
|
|
def grade_cutoffs(self, value):
|
|
self._grading_policy['GRADE_CUTOFFS'] = value
|
|
|
|
# XBlock fields don't update after mutation
|
|
policy = self.grading_policy
|
|
policy['GRADE_CUTOFFS'] = value
|
|
self.grading_policy = policy
|
|
|
|
|
|
@property
|
|
def lowest_passing_grade(self):
|
|
return min(self._grading_policy['GRADE_CUTOFFS'].values())
|
|
|
|
@property
|
|
def is_cohorted(self):
|
|
"""
|
|
Return whether the course is cohorted.
|
|
"""
|
|
config = self.cohort_config
|
|
if config is None:
|
|
return False
|
|
|
|
return bool(config.get("cohorted"))
|
|
|
|
@property
|
|
def auto_cohort(self):
|
|
"""
|
|
Return whether the course is auto-cohorted.
|
|
"""
|
|
if not self.is_cohorted:
|
|
return False
|
|
|
|
return bool(self.cohort_config.get(
|
|
"auto_cohort", False))
|
|
|
|
@property
|
|
def auto_cohort_groups(self):
|
|
"""
|
|
Return the list of groups to put students into. Returns [] if not
|
|
specified. Returns specified list even if is_cohorted and/or auto_cohort are
|
|
false.
|
|
"""
|
|
if self.cohort_config is None:
|
|
return []
|
|
else:
|
|
return self.cohort_config.get("auto_cohort_groups", [])
|
|
|
|
|
|
@property
|
|
def top_level_discussion_topic_ids(self):
|
|
"""
|
|
Return list of topic ids defined in course policy.
|
|
"""
|
|
topics = self.discussion_topics
|
|
return [d["id"] for d in topics.values()]
|
|
|
|
|
|
@property
|
|
def cohorted_discussions(self):
|
|
"""
|
|
Return the set of discussions that is explicitly cohorted. It may be
|
|
the empty set. Note that all inline discussions are automatically
|
|
cohorted based on the course's is_cohorted setting.
|
|
"""
|
|
config = self.cohort_config
|
|
if config is None:
|
|
return set()
|
|
|
|
return set(config.get("cohorted_discussions", []))
|
|
|
|
|
|
|
|
@property
|
|
def is_newish(self):
|
|
"""
|
|
Returns if the course has been flagged as new. If
|
|
there is no flag, return a heuristic value considering the
|
|
announcement and the start dates.
|
|
"""
|
|
flag = self.is_new
|
|
if flag is None:
|
|
# Use a heuristic if the course has not been flagged
|
|
announcement, start, now = self._sorting_dates()
|
|
if announcement and (now - announcement).days < 30:
|
|
# The course has been announced for less that month
|
|
return True
|
|
elif (now - start).days < 1:
|
|
# The course has not started yet
|
|
return True
|
|
else:
|
|
return False
|
|
elif isinstance(flag, basestring):
|
|
return flag.lower() in ['true', 'yes', 'y']
|
|
else:
|
|
return bool(flag)
|
|
|
|
@property
|
|
def sorting_score(self):
|
|
"""
|
|
Returns a tuple that can be used to sort the courses according
|
|
the how "new" they are. The "newness" score is computed using a
|
|
heuristic that takes into account the announcement and
|
|
(advertized) start dates of the course if available.
|
|
|
|
The lower the number the "newer" the course.
|
|
"""
|
|
# Make courses that have an announcement date shave a lower
|
|
# score than courses than don't, older courses should have a
|
|
# higher score.
|
|
announcement, start, now = self._sorting_dates()
|
|
scale = 300.0 # about a year
|
|
if announcement:
|
|
days = (now - announcement).days
|
|
score = -exp(-days / scale)
|
|
else:
|
|
days = (now - start).days
|
|
score = exp(days / scale)
|
|
return score
|
|
|
|
def _sorting_dates(self):
|
|
# utility function to get datetime objects for dates used to
|
|
# compute the is_new flag and the sorting_score
|
|
def to_datetime(timestamp):
|
|
return datetime(*timestamp[:6])
|
|
|
|
announcement = self.announcement
|
|
if announcement is not None:
|
|
announcement = to_datetime(announcement)
|
|
if self.advertised_start is None or isinstance(self.advertised_start, basestring):
|
|
start = to_datetime(self.start)
|
|
else:
|
|
start = to_datetime(self.advertised_start)
|
|
now = to_datetime(time.gmtime())
|
|
|
|
return announcement, start, now
|
|
|
|
@lazyproperty
|
|
def grading_context(self):
|
|
"""
|
|
This returns a dictionary with keys necessary for quickly grading
|
|
a student. They are used by grades.grade()
|
|
|
|
The grading context has two keys:
|
|
graded_sections - This contains the sections that are graded, as
|
|
well as all possible children modules that can affect the
|
|
grading. This allows some sections to be skipped if the student
|
|
hasn't seen any part of it.
|
|
|
|
The format is a dictionary keyed by section-type. The values are
|
|
arrays of dictionaries containing
|
|
"section_descriptor" : The section descriptor
|
|
"xmoduledescriptors" : An array of xmoduledescriptors that
|
|
could possibly be in the section, for any student
|
|
|
|
all_descriptors - This contains a list of all xmodules that can
|
|
effect grading a student. This is used to efficiently fetch
|
|
all the xmodule state for a ModelDataCache without walking
|
|
the descriptor tree again.
|
|
|
|
|
|
"""
|
|
|
|
all_descriptors = []
|
|
graded_sections = {}
|
|
|
|
def yield_descriptor_descendents(module_descriptor):
|
|
for child in module_descriptor.get_children():
|
|
yield child
|
|
for module_descriptor in yield_descriptor_descendents(child):
|
|
yield module_descriptor
|
|
|
|
for c in self.get_children():
|
|
sections = []
|
|
for s in c.get_children():
|
|
if s.lms.graded:
|
|
xmoduledescriptors = list(yield_descriptor_descendents(s))
|
|
xmoduledescriptors.append(s)
|
|
|
|
# The xmoduledescriptors included here are only the ones that have scores.
|
|
section_description = {'section_descriptor': s, 'xmoduledescriptors': filter(lambda child: child.has_score, xmoduledescriptors)}
|
|
|
|
section_format = s.lms.format if s.lms.format is not None else ''
|
|
graded_sections[section_format] = graded_sections.get(section_format, []) + [section_description]
|
|
|
|
all_descriptors.extend(xmoduledescriptors)
|
|
all_descriptors.append(s)
|
|
|
|
return {'graded_sections': graded_sections,
|
|
'all_descriptors': all_descriptors, }
|
|
|
|
|
|
@staticmethod
|
|
def make_id(org, course, url_name):
|
|
return '/'.join([org, course, url_name])
|
|
|
|
@staticmethod
|
|
def id_to_location(course_id):
|
|
'''Convert the given course_id (org/course/name) to a location object.
|
|
Throws ValueError if course_id is of the wrong format.
|
|
'''
|
|
org, course, name = course_id.split('/')
|
|
return Location('i4x', org, course, 'course', name)
|
|
|
|
@staticmethod
|
|
def location_to_id(location):
|
|
'''Convert a location of a course to a course_id. If location category
|
|
is not "course", raise a ValueError.
|
|
|
|
location: something that can be passed to Location
|
|
'''
|
|
loc = Location(location)
|
|
if loc.category != "course":
|
|
raise ValueError("{0} is not a course location".format(loc))
|
|
return "/".join([loc.org, loc.course, loc.name])
|
|
|
|
@property
|
|
def id(self):
|
|
"""Return the course_id for this course"""
|
|
return self.location_to_id(self.location)
|
|
|
|
@property
|
|
def start_date_text(self):
|
|
if isinstance(self.advertised_start, basestring):
|
|
return self.advertised_start
|
|
elif self.advertised_start is None and self.start is None:
|
|
return 'TBD'
|
|
else:
|
|
return time.strftime("%b %d, %Y", self.advertised_start or self.start)
|
|
|
|
@property
|
|
def end_date_text(self):
|
|
return time.strftime("%b %d, %Y", self.end)
|
|
|
|
@property
|
|
def forum_posts_allowed(self):
|
|
try:
|
|
blackout_periods = [(parse_time(start), parse_time(end))
|
|
for start, end
|
|
in self.discussion_blackouts]
|
|
now = time.gmtime()
|
|
for start, end in blackout_periods:
|
|
if start <= now <= end:
|
|
return False
|
|
except:
|
|
log.exception("Error parsing discussion_blackouts for course {0}".format(self.id))
|
|
|
|
return True
|
|
|
|
class TestCenterExam(object):
|
|
def __init__(self, course_id, exam_name, exam_info):
|
|
self.course_id = course_id
|
|
self.exam_name = exam_name
|
|
self.exam_info = exam_info
|
|
self.exam_series_code = exam_info.get('Exam_Series_Code') or exam_name
|
|
self.display_name = exam_info.get('Exam_Display_Name') or self.exam_series_code
|
|
self.first_eligible_appointment_date = self._try_parse_time('First_Eligible_Appointment_Date')
|
|
if self.first_eligible_appointment_date is None:
|
|
raise ValueError("First appointment date must be specified")
|
|
# TODO: If defaulting the last appointment date, it should be the
|
|
# *end* of the same day, not the same time. It's going to be used as the
|
|
# end of the exam overall, so we don't want the exam to disappear too soon.
|
|
# It's also used optionally as the registration end date, so time matters there too.
|
|
self.last_eligible_appointment_date = self._try_parse_time('Last_Eligible_Appointment_Date') # or self.first_eligible_appointment_date
|
|
if self.last_eligible_appointment_date is None:
|
|
raise ValueError("Last appointment date must be specified")
|
|
self.registration_start_date = self._try_parse_time('Registration_Start_Date') or time.gmtime(0)
|
|
self.registration_end_date = self._try_parse_time('Registration_End_Date') or self.last_eligible_appointment_date
|
|
# do validation within the exam info:
|
|
if self.registration_start_date > self.registration_end_date:
|
|
raise ValueError("Registration start date must be before registration end date")
|
|
if self.first_eligible_appointment_date > self.last_eligible_appointment_date:
|
|
raise ValueError("First appointment date must be before last appointment date")
|
|
if self.registration_end_date > self.last_eligible_appointment_date:
|
|
raise ValueError("Registration end date must be before last appointment date")
|
|
self.exam_url = exam_info.get('Exam_URL')
|
|
|
|
def _try_parse_time(self, key):
|
|
"""
|
|
Parse an optional metadata key containing a time: if present, complain
|
|
if it doesn't parse.
|
|
Return None if not present or invalid.
|
|
"""
|
|
if key in self.exam_info:
|
|
try:
|
|
return parse_time(self.exam_info[key])
|
|
except ValueError as e:
|
|
msg = "Exam {0} in course {1} loaded with a bad exam_info key '{2}': '{3}'".format(self.exam_name, self.course_id, self.exam_info[key], e)
|
|
log.warning(msg)
|
|
return None
|
|
|
|
def has_started(self):
|
|
return time.gmtime() > self.first_eligible_appointment_date
|
|
|
|
def has_ended(self):
|
|
return time.gmtime() > self.last_eligible_appointment_date
|
|
|
|
def has_started_registration(self):
|
|
return time.gmtime() > self.registration_start_date
|
|
|
|
def has_ended_registration(self):
|
|
return time.gmtime() > self.registration_end_date
|
|
|
|
def is_registering(self):
|
|
now = time.gmtime()
|
|
return now >= self.registration_start_date and now <= self.registration_end_date
|
|
|
|
@property
|
|
def first_eligible_appointment_date_text(self):
|
|
return time.strftime("%b %d, %Y", self.first_eligible_appointment_date)
|
|
|
|
@property
|
|
def last_eligible_appointment_date_text(self):
|
|
return time.strftime("%b %d, %Y", self.last_eligible_appointment_date)
|
|
|
|
@property
|
|
def registration_end_date_text(self):
|
|
return time.strftime("%b %d, %Y at %H:%M UTC", self.registration_end_date)
|
|
|
|
@property
|
|
def current_test_center_exam(self):
|
|
exams = [exam for exam in self.test_center_exams if exam.has_started_registration() and not exam.has_ended()]
|
|
if len(exams) > 1:
|
|
# TODO: output some kind of warning. This should already be
|
|
# caught if we decide to do validation at load time.
|
|
return exams[0]
|
|
elif len(exams) == 1:
|
|
return exams[0]
|
|
else:
|
|
return None
|
|
|
|
def get_test_center_exam(self, exam_series_code):
|
|
exams = [exam for exam in self.test_center_exams if exam.exam_series_code == exam_series_code]
|
|
return exams[0] if len(exams) == 1 else None
|
|
|
|
@property
|
|
def number(self):
|
|
return self.location.course
|
|
|
|
@property
|
|
def org(self):
|
|
return self.location.org
|