Merge pull request #13706 from edx/jeskew/move_embargo_out_of_common

Move embargo from common to openedx/core/djangoapps.
This commit is contained in:
John Eskew
2016-11-08 14:38:24 -05:00
committed by GitHub
39 changed files with 77 additions and 58 deletions

View File

@@ -20,7 +20,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
from course_modes.models import CourseMode, Mode
from course_modes.tests.factories import CourseModeFactory
from embargo.test_utils import restrict_course
from openedx.core.djangoapps.embargo.test_utils import restrict_course
from student.models import CourseEnrollment
from student.tests.factories import CourseEnrollmentFactory, UserFactory
from util.testing import UrlResetMixin
@@ -409,7 +409,7 @@ class CourseModeViewTest(UrlResetMixin, ModuleStoreTestCase):
class TrackSelectionEmbargoTest(UrlResetMixin, ModuleStoreTestCase):
"""Test embargo restrictions on the track selection page. """
URLCONF_MODULES = ['embargo']
URLCONF_MODULES = ['openedx.core.djangoapps.embargo']
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def setUp(self):

View File

@@ -23,7 +23,7 @@ from lms.djangoapps.commerce.utils import EcommerceService
from course_modes.models import CourseMode
from courseware.access import has_access
from edxmako.shortcuts import render_to_response
from embargo import api as embargo_api
from openedx.core.djangoapps.embargo import api as embargo_api
from student.models import CourseEnrollment
from util.db import outer_atomic

View File

@@ -1,42 +0,0 @@
"""
Django admin page for embargo models
"""
from django.contrib import admin
import textwrap
from config_models.admin import ConfigurationModelAdmin
from embargo.models import IPFilter, CountryAccessRule, RestrictedCourse
from embargo.forms import IPFilterForm, RestrictedCourseForm
class IPFilterAdmin(ConfigurationModelAdmin):
"""Admin for blacklisting/whitelisting specific IP addresses"""
form = IPFilterForm
fieldsets = (
(None, {
'fields': ('enabled', 'whitelist', 'blacklist'),
'description': textwrap.dedent("""Enter specific IP addresses to explicitly
whitelist (not block) or blacklist (block) in the appropriate box below.
Separate IP addresses with a comma. Do not surround with quotes.
""")
}),
)
class CountryAccessRuleInline(admin.StackedInline):
"""Inline editor for country access rules. """
model = CountryAccessRule
extra = 1
def has_delete_permission(self, request, obj=None):
return True
class RestrictedCourseAdmin(admin.ModelAdmin):
"""Admin for configuring course restrictions. """
inlines = [CountryAccessRuleInline]
form = RestrictedCourseForm
admin.site.register(IPFilter, IPFilterAdmin)
admin.site.register(RestrictedCourse, RestrictedCourseAdmin)

View File

@@ -1,203 +0,0 @@
"""
The Python API layer of the country access settings. Essentially the middle tier of the project, responsible for all
business logic that is not directly tied to the data itself.
This API is exposed via the middleware(emabargo/middileware.py) layer but may be used directly in-process.
"""
import logging
import pygeoip
from django.core.cache import cache
from django.conf import settings
from rest_framework.response import Response
from rest_framework import status
from ipware.ip import get_ip
from student.auth import has_course_author_access
from embargo.models import CountryAccessRule, RestrictedCourse
log = logging.getLogger(__name__)
def redirect_if_blocked(course_key, access_point='enrollment', **kwargs):
"""Redirect if the user does not have access to the course. In case of blocked if access_point
is not enrollment and course has enabled is_disabled_access_check then user can view that course.
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
Same as `check_course_access` and `message_url_path`
"""
if settings.FEATURES.get('EMBARGO'):
is_blocked = not check_course_access(course_key, **kwargs)
if is_blocked:
if access_point == "courseware":
if not RestrictedCourse.is_disabled_access_check(course_key):
return message_url_path(course_key, access_point)
else:
return message_url_path(course_key, access_point)
def check_course_access(course_key, user=None, ip_address=None, url=None):
"""
Check is the user with this ip_address has access to the given course
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
user (User): The user making the request. Can be None, in which case
the user's profile country will not be checked.
ip_address (str): The IP address of the request.
url (str): The URL the user is trying to access. Used in
log messages.
Returns:
Boolean: True if the user has access to the course; False otherwise
"""
# No-op if the country access feature is not enabled
if not settings.FEATURES.get('EMBARGO'):
return True
# First, check whether there are any restrictions on the course.
# If not, then we do not need to do any further checks
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
if not course_is_restricted:
return True
# Always give global and course staff access, regardless of embargo settings.
if user is not None and has_course_author_access(user, course_key):
return True
if ip_address is not None:
# Retrieve the country code from the IP address
# and check it against the allowed countries list for a course
user_country_from_ip = _country_code_from_ip(ip_address)
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
log.info(
(
u"Blocking user %s from accessing course %s at %s "
u"because the user's IP address %s appears to be "
u"located in %s."
),
getattr(user, 'id', '<Not Authenticated>'),
course_key,
url,
ip_address,
user_country_from_ip
)
return False
if user is not None:
# Retrieve the country code from the user's profile
# and check it against the allowed countries list for a course.
user_country_from_profile = _get_user_country_from_profile(user)
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
log.info(
(
u"Blocking user %s from accessing course %s at %s "
u"because the user's profile country is %s."
),
user.id, course_key, url, user_country_from_profile
)
return False
return True
def message_url_path(course_key, access_point):
"""Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
"""
return RestrictedCourse.message_url_path(course_key, access_point)
def _get_user_country_from_profile(user):
"""
Check whether the user is embargoed based on the country code in the user's profile.
Args:
user (User): The user attempting to access courseware.
Returns:
user country from profile.
"""
cache_key = u'user.{user_id}.profile.country'.format(user_id=user.id)
profile_country = cache.get(cache_key)
if profile_country is None:
profile = getattr(user, 'profile', None)
if profile is not None and profile.country.code is not None:
profile_country = profile.country.code.upper()
else:
profile_country = ""
cache.set(cache_key, profile_country)
return profile_country
def _country_code_from_ip(ip_addr):
"""
Return the country code associated with an IP address.
Handles both IPv4 and IPv6 addresses.
Args:
ip_addr (str): The IP address to look up.
Returns:
str: A 2-letter country code.
"""
if ip_addr.find(':') >= 0:
return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
else:
return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
def get_embargo_response(request, course_id, user):
"""
Check whether any country access rules block the user from enrollment.
Args:
request (HttpRequest): The request object
course_id (str): The requested course ID
user (str): The current user object
Returns:
HttpResponse: Response of the embargo page if embargoed, None if not
"""
redirect_url = redirect_if_blocked(
course_id, user=user, ip_address=get_ip(request), url=request.path)
if redirect_url:
return Response(
status=status.HTTP_403_FORBIDDEN,
data={
"message": (
u"Users from this location cannot access the course '{course_id}'."
).format(course_id=course_id),
"user_message_url": request.build_absolute_uri(redirect_url)
}
)

View File

@@ -1,11 +0,0 @@
"""Exceptions for the embargo app."""
class InvalidAccessPoint(Exception):
"""The requested access point is not supported. """
def __init__(self, access_point, *args, **kwargs):
msg = (
u"Access point '{access_point}' should be either 'enrollment' or 'courseware'"
).format(access_point=access_point)
super(InvalidAccessPoint, self).__init__(msg, *args, **kwargs)

View File

@@ -1,25 +0,0 @@
"""
List of valid ISO 3166-1 Alpha-2 country codes, used for
validating entries on entered country codes on django-admin page.
"""
COUNTRY_CODES = set([
"AC", "AD", "AE", "AF", "AG", "AI", "AL", "AM", "AN", "AO", "AQ", "AR", "AS", "AT",
"AU", "AW", "AX", "AZ", "BA", "BB", "BD", "BE", "BF", "BG", "BH", "BI", "BJ", "BM",
"BN", "BO", "BR", "BS", "BT", "BV", "BW", "BY", "BZ", "CA", "CC", "CD", "CF", "CG",
"CH", "CI", "CK", "CL", "CM", "CN", "CO", "CR", "CU", "CV", "CX", "CY", "CZ", "DE",
"DJ", "DK", "DM", "DO", "DZ", "EC", "EE", "EG", "ER", "ES", "ET", "FI", "FJ", "FK",
"FM", "FO", "FR", "GA", "GB", "GD", "GE", "GF", "GG", "GH", "GI", "GL", "GM", "GN",
"GP", "GQ", "GR", "GS", "GT", "GU", "GW", "GY", "HK", "HM", "HN", "HR", "HT", "HU",
"ID", "IE", "IL", "IM", "IN", "IO", "IQ", "IR", "IS", "IT", "JE", "JM", "JO", "JP",
"KE", "KG", "KH", "KI", "KM", "KN", "KP", "KR", "KW", "KY", "KZ", "LA", "LB", "LC",
"LI", "LK", "LR", "LS", "LT", "LU", "LV", "LY", "MA", "MC", "MD", "ME", "MG", "MH",
"MK", "ML", "MM", "MN", "MO", "MP", "MQ", "MR", "MS", "MT", "MU", "MV", "MW", "MX",
"MY", "MZ", "NA", "NC", "NE", "NF", "NG", "NI", "NL", "NO", "NP", "NR", "NU", "NZ",
"OM", "PA", "PE", "PF", "PG", "PH", "PK", "PL", "PM", "PN", "PR", "PT", "PW", "PY",
"QA", "RE", "RO", "RS", "RU", "RW", "SA", "SB", "SC", "SD", "SE", "SG", "SH", "SI",
"SJ", "SK", "SL", "SM", "SN", "SO", "SR", "ST", "SV", "SY", "SZ", "TA", "TC", "TD",
"TF", "TG", "TH", "TJ", "TK", "TL", "TM", "TN", "TO", "TR", "TT", "TV", "TW", "TZ",
"UA", "UG", "UM", "US", "UY", "UZ", "VA", "VC", "VE", "VG", "VI", "VN", "VU", "WF",
"WS", "YE", "YT", "ZA", "ZM", "ZW"
])

View File

@@ -1,103 +0,0 @@
"""
Defines forms for providing validation of embargo admin details.
"""
from django import forms
from django.utils.translation import ugettext as _
import ipaddr
from xmodule.modulestore.django import modulestore
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from embargo.models import IPFilter, RestrictedCourse
class RestrictedCourseForm(forms.ModelForm):
"""Validate course keys for the RestrictedCourse model.
The default behavior in Django admin is to:
* Save course keys for courses that do not exist.
* Return a 500 response if the course key format is invalid.
Using this form ensures that we display a user-friendly
error message instead.
"""
class Meta(object):
model = RestrictedCourse
fields = '__all__'
def clean_course_key(self):
"""Validate the course key.
Checks that the key format is valid and that
the course exists. If not, displays an error message.
Arguments:
field_name (str): The name of the field to validate.
Returns:
CourseKey
"""
cleaned_id = self.cleaned_data['course_key']
error_msg = _('COURSE NOT FOUND. Please check that the course ID is valid.')
try:
course_key = CourseKey.from_string(cleaned_id)
except InvalidKeyError:
raise forms.ValidationError(error_msg)
if not modulestore().has_course(course_key):
raise forms.ValidationError(error_msg)
return course_key
class IPFilterForm(forms.ModelForm):
"""Form validating entry of IP addresses"""
class Meta(object):
model = IPFilter
fields = '__all__'
def _is_valid_ip(self, address):
"""Whether or not address is a valid ipv4 address or ipv6 address"""
try:
# Is this an valid ip address?
ipaddr.IPNetwork(address)
except ValueError:
return False
return True
def _valid_ip_addresses(self, addresses):
"""
Checks if a csv string of IP addresses contains valid values.
If not, raises a ValidationError.
"""
if addresses == '':
return ''
error_addresses = []
for addr in addresses.split(','):
address = addr.strip()
if not self._is_valid_ip(address):
error_addresses.append(address)
if error_addresses:
msg = 'Invalid IP Address(es): {0}'.format(error_addresses)
msg += ' Please fix the error(s) and try again.'
raise forms.ValidationError(msg)
return addresses
def clean_whitelist(self):
"""Validates the whitelist"""
whitelist = self.cleaned_data["whitelist"]
return self._valid_ip_addresses(whitelist)
def clean_blacklist(self):
"""Validates the blacklist"""
blacklist = self.cleaned_data["blacklist"]
return self._valid_ip_addresses(blacklist)

View File

@@ -1,40 +0,0 @@
"""Define messages for restricted courses.
These messages are displayed to users when they are blocked
from either enrolling in or accessing a course.
"""
from collections import namedtuple
BlockedMessage = namedtuple('BlockedMessage', [
# A user-facing description of the message
'description',
# The mako template used to render the message
'template',
])
ENROLL_MESSAGES = {
'default': BlockedMessage(
description='Default',
template='embargo/default_enrollment.html'
),
'embargo': BlockedMessage(
description='Embargo',
template='static_templates/embargo.html'
)
}
COURSEWARE_MESSAGES = {
'default': BlockedMessage(
description='Default',
template='embargo/default_courseware.html'
),
'embargo': BlockedMessage(
description='Embargo',
template='static_templates/embargo.html'
)
}

View File

@@ -1,153 +0,0 @@
"""Middleware for embargoing site and courses.
IMPORTANT NOTE: This code WILL NOT WORK if you have a misconfigured proxy
server. If you are configuring embargo functionality, or if you are
experiencing mysterious problems with embargoing, please check that your
reverse proxy is setting any of the well known client IP address headers (ex.,
HTTP_X_FORWARDED_FOR).
This middleware allows you to:
* Embargoing courses (access restriction by courses)
* Embargoing site (access restriction of the main site)
Embargo can restrict by states and whitelist/blacklist (IP Addresses
(ie. 10.0.0.0), Networks (ie. 10.0.0.0/24)), or the user profile country.
Usage:
1) Enable embargo by setting `settings.FEATURES['EMBARGO']` to True.
2) In Django admin, create a new `IPFilter` model to block or whitelist
an IP address from accessing the site.
3) In Django admin, create a new `RestrictedCourse` model and
configure a whitelist or blacklist of countries for that course.
"""
import logging
import re
from django.core.exceptions import MiddlewareNotUsed
from django.core.urlresolvers import reverse
from django.conf import settings
from django.shortcuts import redirect
from ipware.ip import get_ip
from util.request import course_id_from_url
from embargo.models import IPFilter
from embargo import api as embargo_api
log = logging.getLogger(__name__)
class EmbargoMiddleware(object):
"""Middleware for embargoing site and courses. """
ALLOW_URL_PATTERNS = [
# Don't block the embargo message pages; otherwise we'd
# end up in an infinite redirect loop.
re.compile(r'^/embargo/blocked-message/'),
# Don't block the Django admin pages. Otherwise, we might
# accidentally lock ourselves out of Django admin
# during testing.
re.compile(r'^/admin/'),
# Do not block access to course metadata. This information is needed for
# sever-to-server calls.
re.compile(r'^/api/course_structure/v[\d+]/courses/{}/$'.format(settings.COURSE_ID_PATTERN)),
]
def __init__(self):
# If embargoing is turned off, make this middleware do nothing
if not settings.FEATURES.get('EMBARGO'):
raise MiddlewareNotUsed()
def process_request(self, request):
"""Block requests based on embargo rules.
This will perform the following checks:
1) If the user's IP address is blacklisted, block.
2) If the user's IP address is whitelisted, allow.
3) If the user's country (inferred from their IP address) is blocked for
a courseware page, block.
4) If the user's country (retrieved from the user's profile) is blocked
for a courseware page, block.
5) Allow access.
"""
# Never block certain patterns by IP address
for pattern in self.ALLOW_URL_PATTERNS:
if pattern.match(request.path) is not None:
return None
ip_address = get_ip(request)
ip_filter = IPFilter.current()
if ip_filter.enabled and ip_address in ip_filter.blacklist_ips:
log.info(
(
u"User %s was blocked from accessing %s "
u"because IP address %s is blacklisted."
), request.user.id, request.path, ip_address
)
# If the IP is blacklisted, reject.
# This applies to any request, not just courseware URLs.
ip_blacklist_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'embargo'
}
)
return redirect(ip_blacklist_url)
elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips:
log.info(
(
u"User %s was allowed access to %s because "
u"IP address %s is whitelisted."
),
request.user.id, request.path, ip_address
)
# If the IP is whitelisted, then allow access,
# skipping later checks.
return None
else:
# Otherwise, perform the country access checks.
# This applies only to courseware URLs.
return self.country_access_rules(request.user, ip_address, request.path)
def country_access_rules(self, user, ip_address, url_path):
"""
Check the country access rules for a given course.
Applies only to courseware URLs.
Args:
user (User): The user making the current request.
ip_address (str): The IP address from which the request originated.
url_path (str): The request path.
Returns:
HttpResponse or None
"""
course_id = course_id_from_url(url_path)
if course_id:
redirect_url = embargo_api.redirect_if_blocked(
course_id,
user=user,
ip_address=ip_address,
url=url_path,
access_point='courseware'
)
if redirect_url:
return redirect(redirect_url)

View File

@@ -1,104 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django_countries.fields
import django.db.models.deletion
from django.conf import settings
from openedx.core.djangoapps.xmodule_django.models import CourseKeyField
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Country',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('country', django_countries.fields.CountryField(help_text='Two character ISO country code.', unique=True, max_length=2, db_index=True)),
],
options={
'ordering': ['country'],
},
),
migrations.CreateModel(
name='CountryAccessRule',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('rule_type', models.CharField(default=b'blacklist', help_text='Whether to include or exclude the given course. If whitelist countries are specified, then ONLY users from whitelisted countries will be able to access the course. If blacklist countries are specified, then users from blacklisted countries will NOT be able to access the course.', max_length=255, choices=[(b'whitelist', b'Whitelist (allow only these countries)'), (b'blacklist', b'Blacklist (block these countries)')])),
('country', models.ForeignKey(help_text='The country to which this rule applies.', to='embargo.Country')),
],
),
migrations.CreateModel(
name='CourseAccessRuleHistory',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('timestamp', models.DateTimeField(auto_now_add=True, db_index=True)),
('course_key', CourseKeyField(max_length=255, db_index=True)),
('snapshot', models.TextField(null=True, blank=True)),
],
options={
'get_latest_by': 'timestamp',
},
),
migrations.CreateModel(
name='EmbargoedCourse',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('course_id', CourseKeyField(unique=True, max_length=255, db_index=True)),
('embargoed', models.BooleanField(default=False)),
],
),
migrations.CreateModel(
name='EmbargoedState',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('change_date', models.DateTimeField(auto_now_add=True, verbose_name='Change date')),
('enabled', models.BooleanField(default=False, verbose_name='Enabled')),
('embargoed_countries', models.TextField(help_text=b'A comma-separated list of country codes that fall under U.S. embargo restrictions', blank=True)),
('changed_by', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, editable=False, to=settings.AUTH_USER_MODEL, null=True, verbose_name='Changed by')),
],
options={
'ordering': ('-change_date',),
'abstract': False,
},
),
migrations.CreateModel(
name='IPFilter',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('change_date', models.DateTimeField(auto_now_add=True, verbose_name='Change date')),
('enabled', models.BooleanField(default=False, verbose_name='Enabled')),
('whitelist', models.TextField(help_text=b'A comma-separated list of IP addresses that should not fall under embargo restrictions.', blank=True)),
('blacklist', models.TextField(help_text=b'A comma-separated list of IP addresses that should fall under embargo restrictions.', blank=True)),
('changed_by', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, editable=False, to=settings.AUTH_USER_MODEL, null=True, verbose_name='Changed by')),
],
options={
'ordering': ('-change_date',),
'abstract': False,
},
),
migrations.CreateModel(
name='RestrictedCourse',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('course_key', CourseKeyField(help_text='The course key for the restricted course.', unique=True, max_length=255, db_index=True)),
('enroll_msg_key', models.CharField(default=b'default', help_text='The message to show when a user is blocked from enrollment.', max_length=255, choices=[(b'default', b'Default'), (b'embargo', b'Embargo')])),
('access_msg_key', models.CharField(default=b'default', help_text='The message to show when a user is blocked from accessing a course.', max_length=255, choices=[(b'default', b'Default'), (b'embargo', b'Embargo')])),
('disable_access_check', models.BooleanField(default=False, help_text='Allow users who enrolled in an allowed country to access restricted courses from excluded countries.')),
],
),
migrations.AddField(
model_name='countryaccessrule',
name='restricted_course',
field=models.ForeignKey(help_text='The course to which this rule applies.', to='embargo.RestrictedCourse'),
),
migrations.AlterUniqueTogether(
name='countryaccessrule',
unique_together=set([('restricted_course', 'country')]),
),
]

View File

@@ -1,30 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
# Converted from the original South migration 0003_add_countries.py
from django.db import migrations, models
from django_countries import countries
def create_embargo_countries(apps, schema_editor):
"""Populate the available countries with all 2-character ISO country codes. """
country_model = apps.get_model("embargo", "Country")
for country_code, __ in list(countries):
country_model.objects.get_or_create(country=country_code)
def remove_embargo_countries(apps, schema_editor):
"""Clear all available countries. """
country_model = apps.get_model("embargo", "Country")
country_model.objects.all().delete()
class Migration(migrations.Migration):
dependencies = [
('embargo', '0001_initial'),
]
operations = [
migrations.RunPython(create_embargo_countries, remove_embargo_countries),
]

View File

@@ -1,715 +0,0 @@
"""
Models for embargoing visits to certain courses by IP address.
WE'RE USING MIGRATIONS!
If you make changes to this model, be sure to create an appropriate migration
file and check it in at the same time as your model changes. To do that,
1. Go to the edx-platform dir
2. ./manage.py lms schemamigration embargo --auto description_of_your_change
3. Add the migration file created in edx-platform/common/djangoapps/embargo/migrations/
"""
import ipaddr
import json
import logging
from django.db import models
from django.utils.translation import ugettext as _, ugettext_lazy
from django.core.cache import cache
from django.core.urlresolvers import reverse
from django.db.models.signals import post_save, post_delete
from django_countries.fields import CountryField
from django_countries import countries
from config_models.models import ConfigurationModel
from openedx.core.djangoapps.xmodule_django.models import CourseKeyField, NoneToEmptyManager
from embargo.exceptions import InvalidAccessPoint
from embargo.messages import ENROLL_MESSAGES, COURSEWARE_MESSAGES
log = logging.getLogger(__name__)
class EmbargoedCourse(models.Model):
"""
Enable course embargo on a course-by-course basis.
Deprecated by `RestrictedCourse`
"""
objects = NoneToEmptyManager()
# The course to embargo
course_id = CourseKeyField(max_length=255, db_index=True, unique=True)
# Whether or not to embargo
embargoed = models.BooleanField(default=False)
@classmethod
def is_embargoed(cls, course_id):
"""
Returns whether or not the given course id is embargoed.
If course has not been explicitly embargoed, returns False.
"""
try:
record = cls.objects.get(course_id=course_id)
return record.embargoed
except cls.DoesNotExist:
return False
def __unicode__(self):
not_em = "Not "
if self.embargoed:
not_em = ""
# pylint: disable=no-member
return u"Course '{}' is {}Embargoed".format(self.course_id.to_deprecated_string(), not_em)
class EmbargoedState(ConfigurationModel):
"""
Register countries to be embargoed.
Deprecated by `Country`.
"""
# The countries to embargo
embargoed_countries = models.TextField(
blank=True,
help_text="A comma-separated list of country codes that fall under U.S. embargo restrictions"
)
@property
def embargoed_countries_list(self):
"""
Return a list of upper case country codes
"""
if self.embargoed_countries == '':
return []
return [country.strip().upper() for country in self.embargoed_countries.split(',')]
class RestrictedCourse(models.Model):
"""Course with access restrictions.
Restricted courses can block users at two points:
1) When enrolling in a course.
2) When attempting to access a course the user is already enrolled in.
The second case can occur when new restrictions
are put into place; for example, when new countries
are embargoed.
Restricted courses can be configured to display
messages to users when they are blocked.
These displayed on pages served by the embargo app.
"""
COURSE_LIST_CACHE_KEY = 'embargo.restricted_courses'
MESSAGE_URL_CACHE_KEY = 'embargo.message_url_path.{access_point}.{course_key}'
ENROLL_MSG_KEY_CHOICES = tuple([
(msg_key, msg.description)
for msg_key, msg in ENROLL_MESSAGES.iteritems()
])
COURSEWARE_MSG_KEY_CHOICES = tuple([
(msg_key, msg.description)
for msg_key, msg in COURSEWARE_MESSAGES.iteritems()
])
course_key = CourseKeyField(
max_length=255, db_index=True, unique=True,
help_text=ugettext_lazy(u"The course key for the restricted course.")
)
enroll_msg_key = models.CharField(
max_length=255,
choices=ENROLL_MSG_KEY_CHOICES,
default='default',
help_text=ugettext_lazy(u"The message to show when a user is blocked from enrollment.")
)
access_msg_key = models.CharField(
max_length=255,
choices=COURSEWARE_MSG_KEY_CHOICES,
default='default',
help_text=ugettext_lazy(u"The message to show when a user is blocked from accessing a course.")
)
disable_access_check = models.BooleanField(
default=False,
help_text=ugettext_lazy(
u"Allow users who enrolled in an allowed country "
u"to access restricted courses from excluded countries."
)
)
@classmethod
def is_restricted_course(cls, course_id):
"""
Check if the course is in restricted list
Args:
course_id (str): course_id to look for
Returns:
Boolean
True if course is in restricted course list.
"""
return unicode(course_id) in cls._get_restricted_courses_from_cache()
@classmethod
def is_disabled_access_check(cls, course_id):
"""
Check if the course is in restricted list has disabled_access_check
Args:
course_id (str): course_id to look for
Returns:
Boolean
disabled_access_check attribute of restricted course
"""
# checking is_restricted_course method also here to make sure course exists in the list otherwise in case of
# no course found it will throw the key not found error on 'disable_access_check'
return (
cls.is_restricted_course(unicode(course_id))
and cls._get_restricted_courses_from_cache().get(unicode(course_id))["disable_access_check"]
)
@classmethod
def _get_restricted_courses_from_cache(cls):
"""
Cache all restricted courses and returns the dict of course_keys and disable_access_check that are restricted
"""
restricted_courses = cache.get(cls.COURSE_LIST_CACHE_KEY)
if restricted_courses is None:
restricted_courses = {
unicode(course.course_key): {
'disable_access_check': course.disable_access_check
}
for course in RestrictedCourse.objects.all()
}
cache.set(cls.COURSE_LIST_CACHE_KEY, restricted_courses)
return restricted_courses
def snapshot(self):
"""Return a snapshot of all access rules for this course.
This is useful for recording an audit trail of rule changes.
The returned dictionary is JSON-serializable.
Returns:
dict
Example Usage:
>>> restricted_course.snapshot()
{
'enroll_msg': 'default',
'access_msg': 'default',
'country_rules': [
{'country': 'IR', 'rule_type': 'blacklist'},
{'country': 'CU', 'rule_type': 'blacklist'}
]
}
"""
country_rules_for_course = (
CountryAccessRule.objects
).select_related('country').filter(restricted_course=self)
return {
'enroll_msg': self.enroll_msg_key,
'access_msg': self.access_msg_key,
'country_rules': [
{
'country': unicode(rule.country.country),
'rule_type': rule.rule_type
}
for rule in country_rules_for_course
]
}
def message_key_for_access_point(self, access_point):
"""Determine which message to show the user.
The message can be configured per-course and depends
on how the user is trying to access the course
(trying to enroll or accessing courseware).
Arguments:
access_point (str): Either "courseware" or "enrollment"
Returns:
str: The message key. If the access point is not valid,
returns None instead.
"""
if access_point == 'enrollment':
return self.enroll_msg_key
elif access_point == 'courseware':
return self.access_msg_key
def __unicode__(self):
return unicode(self.course_key)
@classmethod
def message_url_path(cls, course_key, access_point):
"""Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
"""
if access_point not in ['enrollment', 'courseware']:
raise InvalidAccessPoint(access_point)
# First check the cache to see if we already have
# a URL for this (course_key, access_point) tuple
cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
access_point=access_point,
course_key=course_key
)
url = cache.get(cache_key)
# If there's a cache miss, we'll need to retrieve the message
# configuration from the database
if url is None:
url = cls._get_message_url_path_from_db(course_key, access_point)
cache.set(cache_key, url)
return url
@classmethod
def _get_message_url_path_from_db(cls, course_key, access_point):
"""Retrieve the "blocked" message from the database.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
"""
# Fallback in case we're not able to find a message path
# Presumably if the caller is requesting a URL, the caller
# has already determined that the user should be blocked.
# We use generic messaging unless we find something more specific,
# but *always* return a valid URL path.
default_path = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'default'
}
)
# First check whether this is a restricted course.
# The list of restricted courses is cached, so this does
# not require a database query.
if not cls.is_restricted_course(course_key):
return default_path
# Retrieve the message key from the restricted course
# for this access point, then determine the URL.
try:
course = cls.objects.get(course_key=course_key)
msg_key = course.message_key_for_access_point(access_point)
return reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': msg_key
}
)
except cls.DoesNotExist:
# This occurs only if there's a race condition
# between cache invalidation and database access.
return default_path
@classmethod
def invalidate_cache_for_course(cls, course_key):
"""Invalidate the caches for the restricted course. """
cache.delete(cls.COURSE_LIST_CACHE_KEY)
log.info("Invalidated cached list of restricted courses.")
for access_point in ['enrollment', 'courseware']:
msg_cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
access_point=access_point,
course_key=course_key
)
cache.delete(msg_cache_key)
log.info("Invalidated cached messaging URLs ")
class Country(models.Model):
"""Representation of a country.
This is used to define country-based access rules.
There is a data migration that creates entries for
each country code.
"""
country = CountryField(
db_index=True, unique=True,
help_text=ugettext_lazy(u"Two character ISO country code.")
)
def __unicode__(self):
return u"{name} ({code})".format(
name=unicode(self.country.name),
code=unicode(self.country)
)
class Meta(object):
"""Default ordering is ascending by country code """
ordering = ['country']
class CountryAccessRule(models.Model):
"""Course access rule based on the user's country.
The rule applies to a particular course-country pair.
Countries can either be whitelisted or blacklisted,
but not both.
To determine whether a user has access to a course
based on the user's country:
1) Retrieve the list of whitelisted countries for the course.
(If there aren't any, then include every possible country.)
2) From the initial list, remove all blacklisted countries
for the course.
"""
WHITELIST_RULE = 'whitelist'
BLACKLIST_RULE = 'blacklist'
RULE_TYPE_CHOICES = (
(WHITELIST_RULE, 'Whitelist (allow only these countries)'),
(BLACKLIST_RULE, 'Blacklist (block these countries)'),
)
rule_type = models.CharField(
max_length=255,
choices=RULE_TYPE_CHOICES,
default=BLACKLIST_RULE,
help_text=ugettext_lazy(
u"Whether to include or exclude the given course. "
u"If whitelist countries are specified, then ONLY users from whitelisted countries "
u"will be able to access the course. If blacklist countries are specified, then "
u"users from blacklisted countries will NOT be able to access the course."
)
)
restricted_course = models.ForeignKey(
"RestrictedCourse",
help_text=ugettext_lazy(u"The course to which this rule applies.")
)
country = models.ForeignKey(
"Country",
help_text=ugettext_lazy(u"The country to which this rule applies.")
)
CACHE_KEY = u"embargo.allowed_countries.{course_key}"
ALL_COUNTRIES = set(code[0] for code in list(countries))
@classmethod
def check_country_access(cls, course_id, country):
"""
Check if the country is either in whitelist or blacklist of countries for the course_id
Args:
course_id (str): course_id to look for
country (str): A 2 characters code of country
Returns:
Boolean
True if country found in allowed country
otherwise check given country exists in list
"""
# If the country code is not in the list of all countries,
# we don't want to automatically exclude the user.
# This can happen, for example, when GeoIP falls back
# to using a continent code because it cannot determine
# the specific country.
if country not in cls.ALL_COUNTRIES:
return True
cache_key = cls.CACHE_KEY.format(course_key=course_id)
allowed_countries = cache.get(cache_key)
if allowed_countries is None:
allowed_countries = cls._get_country_access_list(course_id)
cache.set(cache_key, allowed_countries)
return country == '' or country in allowed_countries
@classmethod
def _get_country_access_list(cls, course_id):
"""
if a course is blacklist for two countries then course can be accessible from
any where except these two countries.
if a course is whitelist for two countries then course can be accessible from
these countries only.
Args:
course_id (str): course_id to look for
Returns:
List
Consolidated list of accessible countries for given course
"""
whitelist_countries = set()
blacklist_countries = set()
# Retrieve all rules in one database query, performing the "join" with the Country table
rules_for_course = CountryAccessRule.objects.select_related('country').filter(
restricted_course__course_key=course_id
)
# Filter the rules into a whitelist and blacklist in one pass
for rule in rules_for_course:
if rule.rule_type == cls.WHITELIST_RULE:
whitelist_countries.add(rule.country.country.code)
elif rule.rule_type == cls.BLACKLIST_RULE:
blacklist_countries.add(rule.country.country.code)
# If there are no whitelist countries, default to all countries
if not whitelist_countries:
whitelist_countries = cls.ALL_COUNTRIES
# Consolidate the rules into a single list of countries
# that have access to the course.
return list(whitelist_countries - blacklist_countries)
def __unicode__(self):
if self.rule_type == self.WHITELIST_RULE:
return _(u"Whitelist {country} for {course}").format(
course=unicode(self.restricted_course.course_key),
country=unicode(self.country),
)
elif self.rule_type == self.BLACKLIST_RULE:
return _(u"Blacklist {country} for {course}").format(
course=unicode(self.restricted_course.course_key),
country=unicode(self.country),
)
@classmethod
def invalidate_cache_for_course(cls, course_key):
"""Invalidate the cache. """
cache_key = cls.CACHE_KEY.format(course_key=course_key)
cache.delete(cache_key)
log.info("Invalidated country access list for course %s", course_key)
class Meta(object):
"""a course can be added with either black or white list. """
unique_together = (
# This restriction ensures that a country is on
# either the whitelist or the blacklist, but
# not both (for a particular course).
("restricted_course", "country")
)
def invalidate_country_rule_cache(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Invalidate cached rule information on changes to the rule models.
We need to handle this in a Django receiver, because Django admin
doesn't always call the model's `delete()` method directly during
a bulk delete operation.
Arguments:
sender: Not used, but required by Django receivers.
instance (RestrictedCourse or CountryAccessRule): The instance
being saved or deleted.
"""
if isinstance(instance, RestrictedCourse):
# If a restricted course changed, we need to update the list
# of which courses are restricted as well as any rules
# associated with the course.
RestrictedCourse.invalidate_cache_for_course(instance.course_key)
CountryAccessRule.invalidate_cache_for_course(instance.course_key)
if isinstance(instance, CountryAccessRule):
try:
restricted_course = instance.restricted_course
except RestrictedCourse.DoesNotExist:
# If the restricted course and its rules are being deleted,
# the restricted course may not exist at this point.
# However, the cache should have been invalidated
# when the restricted course was deleted.
pass
else:
# Invalidate the cache of countries for the course.
CountryAccessRule.invalidate_cache_for_course(restricted_course.course_key)
# Hook up the cache invalidation receivers to the appropriate
# post_save and post_delete signals.
post_save.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
post_save.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
post_delete.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
post_delete.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
class CourseAccessRuleHistory(models.Model):
"""History of course access rule changes. """
timestamp = models.DateTimeField(db_index=True, auto_now_add=True)
course_key = CourseKeyField(max_length=255, db_index=True)
snapshot = models.TextField(null=True, blank=True)
DELETED_PLACEHOLDER = "DELETED"
@classmethod
def save_snapshot(cls, restricted_course, deleted=False):
"""Save a snapshot of access rules for a course.
Arguments:
restricted_course (RestrictedCourse)
Keyword Arguments:
deleted (boolean): If True, the restricted course
is about to be deleted. Create a placeholder
snapshot recording that the course and all its
rules was deleted.
Returns:
None
"""
course_key = restricted_course.course_key
# At the point this is called, the access rules may not have
# been deleted yet. When the rules *are* deleted, the
# restricted course entry may no longer exist, so we
# won't be able to take a snapshot of the rules.
# To handle this, we save a placeholder "DELETED" entry
# so that it's clear in the audit that the restricted
# course (along with all its rules) was deleted.
snapshot = (
CourseAccessRuleHistory.DELETED_PLACEHOLDER if deleted
else json.dumps(restricted_course.snapshot())
)
cls.objects.create(
course_key=course_key,
snapshot=snapshot
)
@staticmethod
def snapshot_post_save_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Create a snapshot of course access rules when the rules are updated. """
if isinstance(instance, RestrictedCourse):
CourseAccessRuleHistory.save_snapshot(instance)
elif isinstance(instance, CountryAccessRule):
CourseAccessRuleHistory.save_snapshot(instance.restricted_course)
@staticmethod
def snapshot_post_delete_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Create a snapshot of course access rules when rules are deleted. """
if isinstance(instance, RestrictedCourse):
CourseAccessRuleHistory.save_snapshot(instance, deleted=True)
elif isinstance(instance, CountryAccessRule):
try:
restricted_course = instance.restricted_course
except RestrictedCourse.DoesNotExist:
# When Django admin deletes a restricted course, it will
# also delete the rules associated with that course.
# At this point, we can't access the restricted course
# from the rule beause it may already have been deleted.
# If this happens, we don't need to record anything,
# since we already record a placeholder "DELETED"
# entry when the restricted course record is deleted.
pass
else:
CourseAccessRuleHistory.save_snapshot(restricted_course)
class Meta(object):
get_latest_by = 'timestamp'
# Connect the signals to the receivers so we record a history
# of changes to the course access rules.
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=RestrictedCourse)
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=CountryAccessRule)
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=RestrictedCourse)
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=CountryAccessRule)
class IPFilter(ConfigurationModel):
"""
Register specific IP addresses to explicitly block or unblock.
"""
whitelist = models.TextField(
blank=True,
help_text="A comma-separated list of IP addresses that should not fall under embargo restrictions."
)
blacklist = models.TextField(
blank=True,
help_text="A comma-separated list of IP addresses that should fall under embargo restrictions."
)
class IPFilterList(object):
"""
Represent a list of IP addresses with support of networks.
"""
def __init__(self, ips):
self.networks = [ipaddr.IPNetwork(ip) for ip in ips]
def __iter__(self):
for network in self.networks:
yield network
def __contains__(self, ip):
try:
ip = ipaddr.IPAddress(ip)
except ValueError:
return False
for network in self.networks:
if network.Contains(ip):
return True
return False
@property
def whitelist_ips(self):
"""
Return a list of valid IP addresses to whitelist
"""
if self.whitelist == '':
return []
return self.IPFilterList([addr.strip() for addr in self.whitelist.split(',')])
@property
def blacklist_ips(self):
"""
Return a list of valid IP addresses to blacklist
"""
if self.blacklist == '':
return []
return self.IPFilterList([addr.strip() for addr in self.blacklist.split(',')])

View File

@@ -1,83 +0,0 @@
"""Utilities for writing unit tests that involve course embargos. """
import contextlib
import mock
import pygeoip
from django.core.urlresolvers import reverse
from django.core.cache import cache
from embargo.models import Country, CountryAccessRule, RestrictedCourse
@contextlib.contextmanager
def restrict_course(course_key, access_point="enrollment", disable_access_check=False):
"""Simulate that a course is restricted.
This does two things:
1) Configures country access rules so that the course is restricted.
2) Mocks the GeoIP call so the user appears to be coming
from a country that's blocked from the course.
This is useful for tests that need to verify
that restricted users won't be able to access
particular views.
Arguments:
course_key (CourseKey): The location of the course to block.
Keyword Arguments:
access_point (str): Either "courseware" or "enrollment"
Yields:
str: A URL to the page in the embargo app that explains
why the user was blocked.
Example Usage:
>>> with restrict_course(course_key) as redirect_url:
>>> # The client will appear to be coming from
>>> # an IP address that is blocked.
>>> resp = self.client.get(url)
>>> self.assertRedirects(resp, redirect_url)
"""
# Clear the cache to ensure that previous tests don't interfere
# with this test.
cache.clear()
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
# Remove all existing rules for the course
CountryAccessRule.objects.all().delete()
# Create the country object
# Ordinarily, we'd create models for every country,
# but that would slow down the test suite.
country, __ = Country.objects.get_or_create(country='IR')
# Create a model for the restricted course
restricted_course, __ = RestrictedCourse.objects.get_or_create(course_key=course_key)
restricted_course.enroll_msg_key = 'default'
restricted_course.access_msg_key = 'default'
restricted_course.disable_access_check = disable_access_check
restricted_course.save()
# Ensure that there is a blacklist rule for the country
CountryAccessRule.objects.get_or_create(
restricted_course=restricted_course,
country=country,
rule_type='blacklist'
)
# Simulate that the user is coming from the blacklisted country
mock_ip.return_value = 'IR'
# Yield the redirect url so the tests don't need to know
# the embargo messaging URL structure.
redirect_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': 'default'
}
)
yield redirect_url

View File

@@ -1,319 +0,0 @@
"""
Tests for EmbargoMiddleware
"""
from contextlib import contextmanager
import mock
from nose.plugins.attrib import attr
import unittest
import pygeoip
import ddt
from django.conf import settings
from django.test.utils import override_settings
from django.core.cache import cache
from django.db import connection
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import (
ModuleStoreTestCase, mixed_store_config
)
from student.roles import (
GlobalStaff, CourseRole, OrgRole,
CourseStaffRole, CourseInstructorRole,
OrgStaffRole, OrgInstructorRole
)
from embargo.models import (
RestrictedCourse, Country, CountryAccessRule,
)
from util.testing import UrlResetMixin
from embargo import api as embargo_api
from embargo.exceptions import InvalidAccessPoint
from mock import patch
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {})
@attr(shard=3)
@ddt.ddt
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@mock.patch.dict(settings.FEATURES, {'EMBARGO': True})
class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
"""Test the embargo API calls to determine whether a user has access. """
ENABLED_CACHES = ['default', 'mongo_metadata_inheritance', 'loc_cache']
def setUp(self):
super(EmbargoCheckAccessApiTests, self).setUp()
self.course = CourseFactory.create()
self.user = UserFactory.create()
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course.id)
Country.objects.create(country='US')
Country.objects.create(country='IR')
Country.objects.create(country='CU')
# Clear the cache to prevent interference between tests
cache.clear()
@ddt.data(
# IP country, profile_country, blacklist, whitelist, allow_access
('US', None, [], [], True),
('IR', None, ['IR', 'CU'], [], False),
('US', 'IR', ['IR', 'CU'], [], False),
('IR', 'IR', ['IR', 'CU'], [], False),
('US', None, [], ['US'], True),
('IR', None, [], ['US'], False),
('US', 'IR', [], ['US'], False),
)
@ddt.unpack
def test_country_access_rules(self, ip_country, profile_country, blacklist, whitelist, allow_access):
# Configure the access rules
for whitelist_country in whitelist:
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.WHITELIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country=whitelist_country)
)
for blacklist_country in blacklist:
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country=blacklist_country)
)
# Configure the user's profile country
if profile_country is not None:
self.user.profile.country = profile_country
self.user.profile.save()
# Appear to make a request from an IP in a particular country
with self._mock_geoip(ip_country):
# Call the API. Note that the IP address we pass in doesn't
# matter, since we're injecting a mock for geo-location
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
# Verify that the access rules were applied correctly
self.assertEqual(result, allow_access)
def test_no_user_has_access(self):
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
# The user is set to None, because the user has not been authenticated.
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
self.assertTrue(result)
def test_no_user_blocked(self):
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
with self._mock_geoip('US'):
# The user is set to None, because the user has not been authenticated.
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
self.assertFalse(result)
def test_course_not_restricted(self):
# No restricted course model for this course key,
# so all access checks should be skipped.
unrestricted_course = CourseFactory.create()
with self.assertNumQueries(1):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
# The second check should require no database queries
with self.assertNumQueries(0):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
def test_ip_v6(self):
# Test the scenario that will go through every check
# (restricted course, but pass all the checks)
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='FE80::0202:B3FF:FE1E:8329')
self.assertTrue(result)
def test_country_access_fallback_to_continent_code(self):
# Simulate PyGeoIP falling back to a continent code
# instead of a country code. In this case, we should
# allow the user access.
with self._mock_geoip('EU'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
self.assertTrue(result)
@mock.patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_profile_country_db_null(self):
# Django country fields treat NULL values inconsistently.
# When saving a profile with country set to None, Django saves an empty string to the database.
# However, when the country field loads a NULL value from the database, it sets
# `country.code` to `None`. This caused a bug in which country values created by
# the original South schema migration -- which defaulted to NULL -- caused a runtime
# exception when the embargo middleware treated the value as a string.
# In order to simulate this behavior, we can't simply set `profile.country = None`.
# (because when we save it, it will set the database field to an empty string instead of NULL)
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
connection.cursor().execute(query, [str(self.user.profile.id)])
# Verify that we can check the user's access without error
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
self.assertTrue(result)
def test_caching(self):
with self._mock_geoip('US'):
# Test the scenario that will go through every check
# (restricted course, but pass all the checks)
# This is the worst case, so it will hit all of the
# caching code.
with self.assertNumQueries(3):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
with self.assertNumQueries(0):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
def test_caching_no_restricted_courses(self):
RestrictedCourse.objects.all().delete()
cache.clear()
with self.assertNumQueries(1):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
with self.assertNumQueries(0):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
@ddt.data(
GlobalStaff,
CourseStaffRole,
CourseInstructorRole,
OrgStaffRole,
OrgInstructorRole,
)
def test_staff_access_country_block(self, staff_role_cls):
# Add a country to the blacklist
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
# Appear to make a request from an IP in the blocked country
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
# Expect that the user is blocked, because the user isn't staff
self.assertFalse(result, msg="User should not have access because the user isn't staff.")
# Instantiate the role, configuring it for this course or org
if issubclass(staff_role_cls, CourseRole):
staff_role = staff_role_cls(self.course.id)
elif issubclass(staff_role_cls, OrgRole):
staff_role = staff_role_cls(self.course.id.org)
else:
staff_role = staff_role_cls()
# Add the user to the role
staff_role.add_users(self.user)
# Now the user should have access
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
self.assertTrue(result, msg="User should have access because the user is staff.")
@contextmanager
def _mock_geoip(self, country_code):
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
mock_ip.return_value = country_code
yield
@ddt.ddt
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoMessageUrlApiTests(UrlResetMixin, ModuleStoreTestCase):
"""Test the embargo API calls for retrieving the blocking message URLs. """
URLCONF_MODULES = ['embargo']
ENABLED_CACHES = ['default', 'mongo_metadata_inheritance', 'loc_cache']
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def setUp(self):
super(EmbargoMessageUrlApiTests, self).setUp()
self.course = CourseFactory.create()
@ddt.data(
('enrollment', '/embargo/blocked-message/enrollment/embargo/'),
('courseware', '/embargo/blocked-message/courseware/embargo/')
)
@ddt.unpack
def test_message_url_path(self, access_point, expected_url_path):
self._restrict_course(self.course.id)
# Retrieve the URL to the blocked message page
url_path = embargo_api.message_url_path(self.course.id, access_point)
self.assertEqual(url_path, expected_url_path)
def test_message_url_path_caching(self):
self._restrict_course(self.course.id)
# The first time we retrieve the message, we'll need
# to hit the database.
with self.assertNumQueries(2):
embargo_api.message_url_path(self.course.id, "enrollment")
# The second time, we should be using cached values
with self.assertNumQueries(0):
embargo_api.message_url_path(self.course.id, "enrollment")
@ddt.data('enrollment', 'courseware')
def test_message_url_path_no_restrictions_for_course(self, access_point):
# No restrictions for the course
url_path = embargo_api.message_url_path(self.course.id, access_point)
# Use a default path
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
def test_invalid_access_point(self):
with self.assertRaises(InvalidAccessPoint):
embargo_api.message_url_path(self.course.id, "invalid")
def test_message_url_stale_cache(self):
# Retrieve the URL once, populating the cache with the list
# of restricted courses.
self._restrict_course(self.course.id)
embargo_api.message_url_path(self.course.id, 'courseware')
# Delete the restricted course entry
RestrictedCourse.objects.get(course_key=self.course.id).delete()
# Clear the message URL cache
message_cache_key = (
'embargo.message_url_path.courseware.{course_key}'
).format(course_key=self.course.id)
cache.delete(message_cache_key)
# Try again. Even though the cache results are stale,
# we should still get a valid URL.
url_path = embargo_api.message_url_path(self.course.id, 'courseware')
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
def _restrict_course(self, course_key):
"""Restrict the user from accessing the course. """
country = Country.objects.create(country='us')
restricted_course = RestrictedCourse.objects.create(
course_key=course_key,
enroll_msg_key='embargo',
access_msg_key='embargo'
)
CountryAccessRule.objects.create(
restricted_course=restricted_course,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)

View File

@@ -1,120 +0,0 @@
# -*- coding: utf-8 -*-
"""
Unit tests for embargo app admin forms.
"""
from django.test import TestCase
from opaque_keys.edx.locator import CourseLocator
# Explicitly import the cache from ConfigurationModel so we can reset it after each test
from config_models.models import cache
from embargo.models import IPFilter
from embargo.forms import RestrictedCourseForm, IPFilterForm
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
class RestrictedCourseFormTest(ModuleStoreTestCase):
"""Test the course form properly validates course IDs"""
def test_save_valid_data(self):
course = CourseFactory.create()
data = {
'course_key': unicode(course.id),
'enroll_msg_key': 'default',
'access_msg_key': 'default'
}
form = RestrictedCourseForm(data=data)
self.assertTrue(form.is_valid())
def test_invalid_course_key(self):
# Invalid format for the course key
form = RestrictedCourseForm(data={'course_key': 'not/valid'})
self._assert_course_field_error(form)
def test_course_not_found(self):
course_key = CourseLocator(org='test', course='test', run='test')
form = RestrictedCourseForm(data={'course_key': course_key})
self._assert_course_field_error(form)
def _assert_course_field_error(self, form):
# Validation shouldn't work
self.assertFalse(form.is_valid())
msg = 'COURSE NOT FOUND'
self.assertIn(msg, form._errors['course_key'][0]) # pylint: disable=protected-access
with self.assertRaisesRegexp(ValueError, "The RestrictedCourse could not be created because the data didn't validate."):
form.save()
class IPFilterFormTest(TestCase):
"""Test form for adding [black|white]list IP addresses"""
def tearDown(self):
super(IPFilterFormTest, self).tearDown()
# Explicitly clear ConfigurationModel's cache so tests have a clear cache
# and don't interfere with each other
cache.clear()
def test_add_valid_ips(self):
# test adding valid ip addresses
# should be able to do both ipv4 and ipv6
# spacing should not matter
form_data = {
'whitelist': '127.0.0.1, 2003:dead:beef:4dad:23:46:bb:101, 1.1.0.1/32, 1.0.0.0/24',
'blacklist': ' 18.244.1.5 , 2002:c0a8:101::42, 18.36.22.1, 1.0.0.0/16'
}
form = IPFilterForm(data=form_data)
self.assertTrue(form.is_valid())
form.save()
whitelist = IPFilter.current().whitelist_ips
blacklist = IPFilter.current().blacklist_ips
for addr in '127.0.0.1, 2003:dead:beef:4dad:23:46:bb:101'.split(','):
self.assertIn(addr.strip(), whitelist)
for addr in '18.244.1.5, 2002:c0a8:101::42, 18.36.22.1'.split(','):
self.assertIn(addr.strip(), blacklist)
# Network tests
# ips not in whitelist network
for addr in ['1.1.0.2', '1.0.1.0']:
self.assertNotIn(addr.strip(), whitelist)
# ips in whitelist network
for addr in ['1.1.0.1', '1.0.0.100']:
self.assertIn(addr.strip(), whitelist)
# ips not in blacklist network
for addr in ['2.0.0.0', '1.1.0.0']:
self.assertNotIn(addr.strip(), blacklist)
# ips in blacklist network
for addr in ['1.0.100.0', '1.0.0.10']:
self.assertIn(addr.strip(), blacklist)
# Test clearing by adding an empty list is OK too
form_data = {
'whitelist': '',
'blacklist': ''
}
form = IPFilterForm(data=form_data)
self.assertTrue(form.is_valid())
form.save()
self.assertEqual(len(IPFilter.current().whitelist), 0)
self.assertEqual(len(IPFilter.current().blacklist), 0)
def test_add_invalid_ips(self):
# test adding invalid ip addresses
form_data = {
'whitelist': '.0.0.1, :dead:beef:::, 1.0.0.0/55',
'blacklist': ' 18.244.* , 999999:c0a8:101::42, 1.0.0.0/'
}
form = IPFilterForm(data=form_data)
self.assertFalse(form.is_valid())
wmsg = "Invalid IP Address(es): [u'.0.0.1', u':dead:beef:::', u'1.0.0.0/55'] Please fix the error(s) and try again."
self.assertEquals(wmsg, form._errors['whitelist'][0]) # pylint: disable=protected-access
bmsg = "Invalid IP Address(es): [u'18.244.*', u'999999:c0a8:101::42', u'1.0.0.0/'] Please fix the error(s) and try again."
self.assertEquals(bmsg, form._errors['blacklist'][0]) # pylint: disable=protected-access
with self.assertRaisesRegexp(ValueError, "The IPFilter could not be created because the data didn't validate."):
form.save()

View File

@@ -1,207 +0,0 @@
"""
Tests for EmbargoMiddleware with CountryAccessRules
"""
import unittest
from mock import patch
from nose.plugins.attrib import attr
import ddt
from django.core.urlresolvers import reverse
from django.conf import settings
from django.core.cache import cache as django_cache
from util.testing import UrlResetMixin
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from config_models.models import cache as config_cache
from embargo.models import RestrictedCourse, IPFilter
from embargo.test_utils import restrict_course
@attr(shard=3)
@ddt.ddt
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
"""Tests of embargo middleware country access rules.
There are detailed unit tests for the rule logic in
`test_api.py`; here, we're mainly testing the integration
with middleware
"""
USERNAME = 'fred'
PASSWORD = 'secret'
URLCONF_MODULES = ['embargo']
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def setUp(self):
super(EmbargoMiddlewareAccessTests, self).setUp()
self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
self.course = CourseFactory.create()
self.client.login(username=self.USERNAME, password=self.PASSWORD)
self.courseware_url = reverse(
'course_root',
kwargs={'course_id': unicode(self.course.id)}
)
self.non_courseware_url = reverse('dashboard')
# Clear the cache to avoid interference between tests
django_cache.clear()
config_cache.clear()
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(True, False)
def test_blocked(self, disable_access_check):
with restrict_course(self.course.id, access_point='courseware', disable_access_check=disable_access_check) as redirect_url: # pylint: disable=line-too-long
response = self.client.get(self.courseware_url)
if disable_access_check:
self.assertEqual(response.status_code, 200)
else:
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_allowed(self):
# Add the course to the list of restricted courses
# but don't create any access rules
RestrictedCourse.objects.create(course_key=self.course.id)
# Expect that we can access courseware
response = self.client.get(self.courseware_url)
self.assertEqual(response.status_code, 200)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_non_courseware_url(self):
with restrict_course(self.course.id):
response = self.client.get(self.non_courseware_url)
self.assertEqual(response.status_code, 200)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(
# request_ip, blacklist, whitelist, is_enabled, allow_access
('173.194.123.35', ['173.194.123.35'], [], True, False),
('173.194.123.35', ['173.194.0.0/16'], [], True, False),
('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False),
('173.195.10.20', ['173.194.0.0/16'], [], True, True),
('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False),
('173.194.123.35', [], ['173.194.0.0/16'], True, True),
('192.178.2.3', [], ['173.194.0.0/16'], True, True),
('173.194.123.35', ['173.194.123.35'], [], False, True),
)
@ddt.unpack
def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
# Set up the IP rules
IPFilter.objects.create(
blacklist=", ".join(blacklist),
whitelist=", ".join(whitelist),
enabled=is_enabled
)
# Check that access is enforced
response = self.client.get(
"/",
HTTP_X_FORWARDED_FOR=request_ip,
REMOTE_ADDR=request_ip
)
if allow_access:
self.assertEqual(response.status_code, 200)
else:
redirect_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'embargo'
}
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(
('courseware', 'default'),
('courseware', 'embargo'),
('enrollment', 'default'),
('enrollment', 'embargo')
)
@ddt.unpack
def test_always_allow_access_to_embargo_messages(self, access_point, msg_key):
# Blacklist an IP address
IPFilter.objects.create(
blacklist="192.168.10.20",
enabled=True
)
url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': msg_key
}
)
response = self.client.get(
url,
HTTP_X_FORWARDED_FOR="192.168.10.20",
REMOTE_ADDR="192.168.10.20"
)
self.assertEqual(response.status_code, 200)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_whitelist_ip_skips_country_access_checks(self):
# Whitelist an IP address
IPFilter.objects.create(
whitelist="192.168.10.20",
enabled=True
)
# Set up country access rules so the user would
# be restricted from the course.
with restrict_course(self.course.id):
# Make a request from the whitelisted IP address
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR="192.168.10.20",
REMOTE_ADDR="192.168.10.20"
)
# Expect that we were still able to access the page,
# even though we would have been blocked by country
# access rules.
self.assertEqual(response.status_code, 200)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_always_allow_course_detail_access(self):
""" Access to the Course Structure API's course detail endpoint should always be granted. """
# Make the user staff so that it has permissions to access the views.
self.user.is_staff = True
self.user.save() # pylint: disable=no-member
# Blacklist an IP address
ip_address = "192.168.10.20"
IPFilter.objects.create(
blacklist=ip_address,
enabled=True
)
url = reverse('course_structure_api:v0:detail', kwargs={'course_id': unicode(self.course.id)})
response = self.client.get(
url,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
)
self.assertEqual(response.status_code, 200)
# Test with a fully-restricted course
with restrict_course(self.course.id):
response = self.client.get(
url,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
)
self.assertEqual(response.status_code, 200)

View File

@@ -1,364 +0,0 @@
"""Test of models for embargo app"""
import json
from django.test import TestCase
from django.db.utils import IntegrityError
from opaque_keys.edx.locator import CourseLocator
from embargo.models import (
EmbargoedCourse, EmbargoedState, IPFilter, RestrictedCourse,
Country, CountryAccessRule, CourseAccessRuleHistory
)
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
class EmbargoModelsTest(CacheIsolationTestCase):
"""Test each of the 3 models in embargo.models"""
ENABLED_CACHES = ['default']
def test_course_embargo(self):
course_id = CourseLocator('abc', '123', 'doremi')
# Test that course is not authorized by default
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
# Authorize
cauth = EmbargoedCourse(course_id=course_id, embargoed=True)
cauth.save()
# Now, course should be embargoed
self.assertTrue(EmbargoedCourse.is_embargoed(course_id))
self.assertEquals(
unicode(cauth),
u"Course '{course_id}' is Embargoed".format(course_id=course_id)
)
# Unauthorize by explicitly setting email_enabled to False
cauth.embargoed = False
cauth.save()
# Test that course is now unauthorized
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
self.assertEquals(
unicode(cauth),
u"Course '{course_id}' is Not Embargoed".format(course_id=course_id)
)
def test_state_embargo(self):
# Azerbaijan and France should not be blocked
good_states = ['AZ', 'FR']
# Gah block USA and Antartica
blocked_states = ['US', 'AQ']
currently_blocked = EmbargoedState.current().embargoed_countries_list
for state in blocked_states + good_states:
self.assertNotIn(state, currently_blocked)
# Block
cauth = EmbargoedState(embargoed_countries='US, AQ')
cauth.save()
currently_blocked = EmbargoedState.current().embargoed_countries_list
for state in good_states:
self.assertNotIn(state, currently_blocked)
for state in blocked_states:
self.assertIn(state, currently_blocked)
# Change embargo - block Isle of Man too
blocked_states.append('IM')
cauth.embargoed_countries = 'US, AQ, IM'
cauth.save()
currently_blocked = EmbargoedState.current().embargoed_countries_list
for state in good_states:
self.assertNotIn(state, currently_blocked)
for state in blocked_states:
self.assertIn(state, currently_blocked)
def test_ip_blocking(self):
whitelist = '127.0.0.1'
blacklist = '18.244.51.3'
cwhitelist = IPFilter.current().whitelist_ips
self.assertNotIn(whitelist, cwhitelist)
cblacklist = IPFilter.current().blacklist_ips
self.assertNotIn(blacklist, cblacklist)
IPFilter(whitelist=whitelist, blacklist=blacklist).save()
cwhitelist = IPFilter.current().whitelist_ips
self.assertIn(whitelist, cwhitelist)
cblacklist = IPFilter.current().blacklist_ips
self.assertIn(blacklist, cblacklist)
def test_ip_network_blocking(self):
whitelist = '1.0.0.0/24'
blacklist = '1.1.0.0/16'
IPFilter(whitelist=whitelist, blacklist=blacklist).save()
cwhitelist = IPFilter.current().whitelist_ips
self.assertIn('1.0.0.100', cwhitelist)
self.assertIn('1.0.0.10', cwhitelist)
self.assertNotIn('1.0.1.0', cwhitelist)
cblacklist = IPFilter.current().blacklist_ips
self.assertIn('1.1.0.0', cblacklist)
self.assertIn('1.1.0.1', cblacklist)
self.assertIn('1.1.1.0', cblacklist)
self.assertNotIn('1.2.0.0', cblacklist)
class RestrictedCourseTest(CacheIsolationTestCase):
"""Test RestrictedCourse model. """
ENABLED_CACHES = ['default']
def test_unicode_values(self):
course_id = CourseLocator('abc', '123', 'doremi')
restricted_course = RestrictedCourse.objects.create(course_key=course_id)
self.assertEquals(
unicode(restricted_course),
unicode(course_id)
)
def test_restricted_course_cache_with_save_delete(self):
course_id = CourseLocator('abc', '123', 'doremi')
RestrictedCourse.objects.create(course_key=course_id)
# Warm the cache
with self.assertNumQueries(1):
RestrictedCourse.is_restricted_course(course_id)
RestrictedCourse.is_disabled_access_check(course_id)
# it should come from cache
with self.assertNumQueries(0):
RestrictedCourse.is_restricted_course(course_id)
RestrictedCourse.is_disabled_access_check(course_id)
self.assertFalse(RestrictedCourse.is_disabled_access_check(course_id))
# add new the course so the cache must get delete and again hit the db
new_course_id = CourseLocator('def', '123', 'doremi')
RestrictedCourse.objects.create(course_key=new_course_id, disable_access_check=True)
with self.assertNumQueries(1):
RestrictedCourse.is_restricted_course(new_course_id)
RestrictedCourse.is_disabled_access_check(new_course_id)
# it should come from cache
with self.assertNumQueries(0):
RestrictedCourse.is_restricted_course(new_course_id)
RestrictedCourse.is_disabled_access_check(new_course_id)
self.assertTrue(RestrictedCourse.is_disabled_access_check(new_course_id))
# deleting an object will delete cache also.and hit db on
# get the is_restricted course
abc = RestrictedCourse.objects.get(course_key=new_course_id)
abc.delete()
with self.assertNumQueries(1):
RestrictedCourse.is_restricted_course(new_course_id)
# it should come from cache
with self.assertNumQueries(0):
RestrictedCourse.is_restricted_course(new_course_id)
class CountryTest(TestCase):
"""Test Country model. """
def test_unicode_values(self):
country = Country.objects.create(country='NZ')
self.assertEquals(unicode(country), "New Zealand (NZ)")
class CountryAccessRuleTest(CacheIsolationTestCase):
"""Test CountryAccessRule model. """
ENABLED_CACHES = ['default']
def test_unicode_values(self):
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
access_rule = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
self.assertEquals(
unicode(access_rule),
u"Whitelist New Zealand (NZ) for {course_key}".format(course_key=course_id)
)
course_id = CourseLocator('def', '123', 'doremi')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
access_rule = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)
self.assertEquals(
unicode(access_rule),
u"Blacklist New Zealand (NZ) for {course_key}".format(course_key=course_id)
)
def test_unique_together_constraint(self):
"""
Course with specific country can be added either as whitelist or blacklist
trying to add with both types will raise error
"""
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
with self.assertRaises(IntegrityError):
CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)
def test_country_access_list_cache_with_save_delete(self):
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
course = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
# Warm the cache
with self.assertNumQueries(1):
CountryAccessRule.check_country_access(course_id, 'NZ')
with self.assertNumQueries(0):
CountryAccessRule.check_country_access(course_id, 'NZ')
# Deleting an object will invalidate the cache
course.delete()
with self.assertNumQueries(1):
CountryAccessRule.check_country_access(course_id, 'NZ')
class CourseAccessRuleHistoryTest(TestCase):
"""Test course access rule history. """
def setUp(self):
super(CourseAccessRuleHistoryTest, self).setUp()
self.course_key = CourseLocator('edx', 'DemoX', 'Demo_Course')
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course_key)
self.countries = {
'US': Country.objects.create(country='US'),
'AU': Country.objects.create(country='AU')
}
def test_course_access_history_no_rules(self):
self._assert_history([])
self.restricted_course.delete()
self._assert_history_deleted()
def test_course_access_history_with_rules(self):
# Add one rule
us_rule = CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['US'],
rule_type=CountryAccessRule.WHITELIST_RULE
)
self._assert_history([('US', 'whitelist')])
# Add another rule
au_rule = CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['AU'],
rule_type=CountryAccessRule.BLACKLIST_RULE
)
self._assert_history([
('US', 'whitelist'),
('AU', 'blacklist')
])
# Delete the first rule
us_rule.delete()
self._assert_history([('AU', 'blacklist')])
# Delete the second rule
au_rule.delete()
self._assert_history([])
def test_course_access_history_delete_all(self):
# Create a rule
CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['US'],
rule_type=CountryAccessRule.WHITELIST_RULE
)
# Delete the course (and, implicitly, all the rules)
self.restricted_course.delete()
self._assert_history_deleted()
def test_course_access_history_change_message(self):
# Change the message key
self.restricted_course.enroll_msg_key = 'embargo'
self.restricted_course.access_msg_key = 'embargo'
self.restricted_course.save()
# Expect a history entry with the changed keys
self._assert_history([], enroll_msg='embargo', access_msg='embargo')
def _assert_history(self, country_rules, enroll_msg='default', access_msg='default'):
"""Check the latest history entry.
Arguments:
country_rules (list): List of rules, each of which are tuples
of the form `(country_code, rule_type)`.
Keyword Arguments:
enroll_msg (str): The expected enrollment message key.
access_msg (str): The expected access message key.
Raises:
AssertionError
"""
record = CourseAccessRuleHistory.objects.latest()
# Check that the record is for the correct course
self.assertEqual(record.course_key, self.course_key)
# Load the history entry and verify the message keys
snapshot = json.loads(record.snapshot)
self.assertEqual(snapshot['enroll_msg'], enroll_msg)
self.assertEqual(snapshot['access_msg'], access_msg)
# For each rule, check that there is an entry
# in the history record.
for (country, rule_type) in country_rules:
self.assertIn(
{
'country': country,
'rule_type': rule_type
},
snapshot['country_rules']
)
# Check that there are no duplicate entries
self.assertEqual(len(snapshot['country_rules']), len(country_rules))
def _assert_history_deleted(self):
"""Check the latest history entry for a 'DELETED' placeholder.
Raises:
AssertionError
"""
record = CourseAccessRuleHistory.objects.latest()
self.assertEqual(record.course_key, self.course_key)
self.assertEqual(record.snapshot, "DELETED")

View File

@@ -1,88 +0,0 @@
"""Tests for embargo app views. """
import unittest
from mock import patch
from django.core.urlresolvers import reverse
from django.conf import settings
import ddt
from util.testing import UrlResetMixin
from embargo import messages
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
from openedx.core.djangoapps.theming.tests.test_util import with_comprehensive_theme
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@ddt.ddt
class CourseAccessMessageViewTest(CacheIsolationTestCase, UrlResetMixin):
"""Tests for the courseware access message view.
These end-points serve static content.
While we *could* check the text on each page,
this will require changes to the test every time
the text on the page changes.
Instead, we load each page we expect to be available
(based on the configuration in `embargo.messages`)
and verify that we get the correct status code.
This will catch errors in the message configuration
(for example, moving a template and forgetting to
update the configuration appropriately).
"""
ENABLED_CACHES = ['default']
URLCONF_MODULES = ['embargo']
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def setUp(self):
super(CourseAccessMessageViewTest, self).setUp()
@ddt.data(*messages.ENROLL_MESSAGES.keys())
def test_enrollment_messages(self, msg_key):
self._load_page('enrollment', msg_key)
@ddt.data(*messages.COURSEWARE_MESSAGES.keys())
def test_courseware_messages(self, msg_key):
self._load_page('courseware', msg_key)
@ddt.data('enrollment', 'courseware')
def test_invalid_message_key(self, access_point):
self._load_page(access_point, 'invalid', expected_status=404)
@with_comprehensive_theme("test-theme")
@ddt.data('enrollment', 'courseware')
def test_custom_theme_override(self, access_point):
# Custom override specified for the "embargo" message
# for backwards compatibility with previous versions
# of the embargo app.
url = reverse('embargo_blocked_message', kwargs={
'access_point': access_point,
'message_key': "embargo"
})
response = self.client.get(url)
self.assertContains(
response,
"This is a test template to test embargo message override for theming."
)
def _load_page(self, access_point, message_key, expected_status=200):
"""Load the message page and check the status code. """
url = reverse('embargo_blocked_message', kwargs={
'access_point': access_point,
'message_key': message_key
})
response = self.client.get(url)
self.assertEqual(
response.status_code, expected_status,
msg=(
u"Unexpected status code when loading '{url}': "
u"expected {expected} but got {actual}"
).format(
url=url,
expected=expected_status,
actual=response.status_code
)
)

View File

@@ -1,15 +0,0 @@
"""URLs served by the embargo app. """
from django.conf.urls import patterns, url
from embargo.views import CourseAccessMessageView
urlpatterns = patterns(
'embargo.views',
url(
r'^blocked-message/(?P<access_point>enrollment|courseware)/(?P<message_key>.+)/$',
CourseAccessMessageView.as_view(),
name='embargo_blocked_message',
),
)

View File

@@ -1,67 +0,0 @@
"""Views served by the embargo app. """
from django.http import Http404
from django.views.generic.base import View
from edxmako.shortcuts import render_to_response
from embargo import messages
class CourseAccessMessageView(View):
"""Show a message explaining that the user was blocked from a course. """
ENROLLMENT_ACCESS_POINT = 'enrollment'
COURSEWARE_ACCESS_POINT = 'courseware'
def get(self, request, access_point=None, message_key=None):
"""Show a message explaining that the user was blocked.
Arguments:
request (HttpRequest)
Keyword Arguments:
access_point (str): Either 'enrollment' or 'courseware',
indicating how the user is trying to access the restricted
content.
message_key (str): An identifier for which message to show.
See `embargo.messages` for more information.
Returns:
HttpResponse
Raises:
Http404: If no message is configured for the specified message key.
"""
blocked_message = self._message(access_point, message_key)
if blocked_message is None:
raise Http404
return render_to_response(blocked_message.template, {})
def _message(self, access_point, message_key):
"""Retrieve message information.
Arguments:
access_point (str): Either 'enrollment' or 'courseware'
message_key (str): The identifier for which message to show.
Returns:
embargo.messages.BlockedMessage or None
"""
message_dict = dict()
# The access point determines which set of messages to use.
# This allows us to show different messages to students who
# are enrolling in a course than we show to students
# who are enrolled and accessing courseware.
if access_point == self.ENROLLMENT_ACCESS_POINT:
message_dict = messages.ENROLL_MESSAGES
elif access_point == self.COURSEWARE_ACCESS_POINT:
message_dict = messages.COURSEWARE_MESSAGES
return message_dict.get(message_key)

View File

@@ -22,7 +22,6 @@ from django.test.utils import override_settings
import pytz
from course_modes.models import CourseMode
from embargo.models import CountryAccessRule, Country, RestrictedCourse
from enrollment.views import EnrollmentUserThrottle
from util.models import RateLimitConfiguration
from util.testing import UrlResetMixin
@@ -34,7 +33,8 @@ from openedx.core.lib.django_test_client_utils import get_absolute_url
from student.models import CourseEnrollment
from student.roles import CourseStaffRole
from student.tests.factories import AdminFactory, CourseModeFactory, UserFactory
from embargo.test_utils import restrict_course
from openedx.core.djangoapps.embargo.models import CountryAccessRule, Country, RestrictedCourse
from openedx.core.djangoapps.embargo.test_utils import restrict_course
class EnrollmentTestMixin(object):
@@ -925,7 +925,7 @@ class EnrollmentEmbargoTest(EnrollmentTestMixin, UrlResetMixin, ModuleStoreTestC
EMAIL = "bob@example.com"
PASSWORD = "edx"
URLCONF_MODULES = ['embargo']
URLCONF_MODULES = ['openedx.core.djangoapps.embargo']
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def setUp(self):

View File

@@ -17,7 +17,7 @@ from rest_framework.response import Response
from rest_framework.throttling import UserRateThrottle
from rest_framework.views import APIView
from opaque_keys.edx.keys import CourseKey
from embargo import api as embargo_api
from openedx.core.djangoapps.embargo import api as embargo_api
from openedx.core.djangoapps.cors_csrf.authentication import SessionAuthenticationCrossDomainCsrf
from openedx.core.djangoapps.cors_csrf.decorators import ensure_csrf_cookie_cross_domain
from openedx.core.lib.api.authentication import (

View File

@@ -12,7 +12,7 @@ from course_modes.models import CourseMode
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from util.testing import UrlResetMixin
from embargo.test_utils import restrict_course
from openedx.core.djangoapps.embargo.test_utils import restrict_course
from student.tests.factories import UserFactory, CourseModeFactory
from student.models import CourseEnrollment, CourseFullError
from student.roles import (
@@ -32,7 +32,7 @@ class EnrollmentTest(UrlResetMixin, SharedModuleStoreTestCase):
USERNAME = "Bob"
EMAIL = "bob@example.com"
PASSWORD = "edx"
URLCONF_MODULES = ['embargo']
URLCONF_MODULES = ['openedx.core.djangoapps.embargo']
@classmethod
def setUpClass(cls):

View File

@@ -111,7 +111,7 @@ from student.cookies import set_logged_in_cookies, delete_logged_in_cookies
from student.models import anonymous_id_for_user, UserAttribute, EnrollStatusChange
from shoppingcart.models import DonationConfiguration, CourseRegistrationCode
from embargo import api as embargo_api
from openedx.core.djangoapps.embargo import api as embargo_api
import analytics
from eventtracking import tracker