Merge pull request #6943 from edx/will/country-access-api-changes
Finish Country Access (Part 1 of 3)
This commit is contained in:
@@ -10,12 +10,114 @@ import pygeoip
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.conf import settings
|
||||
|
||||
from embargo.models import CountryAccessRule, RestrictedCourse
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_user_country_from_profile(user):
|
||||
def redirect_if_blocked(course_key, access_point='enrollment', **kwargs):
|
||||
"""Redirect if the user does not have access to the course.
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): Location of the course the user is trying to access.
|
||||
|
||||
Keyword Arguments:
|
||||
Same as `check_course_access` and `message_url_path`
|
||||
|
||||
"""
|
||||
if settings.FEATURES.get('ENABLE_COUNTRY_ACCESS'):
|
||||
is_blocked = not check_course_access(course_key, **kwargs)
|
||||
if is_blocked:
|
||||
return message_url_path(course_key, access_point)
|
||||
|
||||
|
||||
def check_course_access(course_key, user=None, ip_address=None, url=None):
|
||||
"""
|
||||
Check is the user with this ip_address has access to the given course
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): Location of the course the user is trying to access.
|
||||
|
||||
Keyword Arguments:
|
||||
user (User): The user making the request. Can be None, in which case
|
||||
the user's profile country will not be checked.
|
||||
ip_address (str): The IP address of the request.
|
||||
url (str): The URL the user is trying to access. Used in
|
||||
log messages.
|
||||
|
||||
Returns:
|
||||
Boolean: True if the user has access to the course; False otherwise
|
||||
|
||||
"""
|
||||
# First, check whether there are any restrictions on the course.
|
||||
# If not, then we do not need to do any further checks
|
||||
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
|
||||
|
||||
if not course_is_restricted:
|
||||
return True
|
||||
|
||||
if ip_address is not None:
|
||||
# Retrieve the country code from the IP address
|
||||
# and check it against the allowed countries list for a course
|
||||
user_country_from_ip = _country_code_from_ip(ip_address)
|
||||
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
|
||||
log.info(
|
||||
(
|
||||
u"Blocking user %s from accessing course %s at %s "
|
||||
u"because the user's IP address %s appears to be "
|
||||
u"located in %s."
|
||||
),
|
||||
getattr(user, 'id', '<Not Authenticated>'),
|
||||
course_key,
|
||||
url,
|
||||
ip_address,
|
||||
user_country_from_ip
|
||||
)
|
||||
return False
|
||||
|
||||
if user is not None:
|
||||
# Retrieve the country code from the user's profile
|
||||
# and check it against the allowed countries list for a course.
|
||||
user_country_from_profile = _get_user_country_from_profile(user)
|
||||
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
|
||||
log.info(
|
||||
(
|
||||
u"Blocking user %s from accessing course %s at %s "
|
||||
u"because the user's profile country is %s."
|
||||
),
|
||||
user.id, course_key, url, user_country_from_profile
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def message_url_path(course_key, access_point):
|
||||
"""Determine the URL path for the message explaining why the user was blocked.
|
||||
|
||||
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
|
||||
module for more details.
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): The location of the course.
|
||||
access_point (str): How the user was trying to access the course.
|
||||
Can be either "enrollment" or "courseware".
|
||||
|
||||
Returns:
|
||||
unicode: The URL path to a page explaining why the user was blocked.
|
||||
|
||||
Raises:
|
||||
InvalidAccessPoint: Raised if access_point is not a supported value.
|
||||
|
||||
"""
|
||||
return RestrictedCourse.message_url_path(course_key, access_point)
|
||||
|
||||
|
||||
def _get_user_country_from_profile(user):
|
||||
"""
|
||||
Check whether the user is embargoed based on the country code in the user's profile.
|
||||
|
||||
@@ -55,40 +157,3 @@ def _country_code_from_ip(ip_addr):
|
||||
return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
|
||||
else:
|
||||
return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
|
||||
|
||||
|
||||
def check_course_access(user, ip_address, course_key):
|
||||
"""
|
||||
Check is the user with this ip_address has access to the given course
|
||||
|
||||
Params:
|
||||
user (User): Currently logged in user object
|
||||
ip_address (str): The ip_address of user
|
||||
course_key (CourseLocator): CourseLocator object the user is trying to access
|
||||
|
||||
Returns:
|
||||
The return will be True if the user has access on the course.
|
||||
if any constraints fails it will return the False
|
||||
"""
|
||||
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
|
||||
# If they're trying to access a course that cares about embargoes
|
||||
|
||||
# If course is not restricted then return immediately return True
|
||||
# no need for further checking
|
||||
if not course_is_restricted:
|
||||
return True
|
||||
|
||||
# Retrieve the country code from the IP address
|
||||
# and check it against the allowed countries list for a course
|
||||
user_country_from_ip = _country_code_from_ip(ip_address)
|
||||
# if user country has access to course return True
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
|
||||
return False
|
||||
|
||||
# Retrieve the country code from the user profile.
|
||||
user_country_from_profile = get_user_country_from_profile(user)
|
||||
# if profile country has access return True
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
11
common/djangoapps/embargo/exceptions.py
Normal file
11
common/djangoapps/embargo/exceptions.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Exceptions for the embargo app."""
|
||||
|
||||
|
||||
class InvalidAccessPoint(Exception):
|
||||
"""The requested access point is not supported. """
|
||||
|
||||
def __init__(self, access_point, *args, **kwargs):
|
||||
msg = (
|
||||
u"Access point '{access_point}' should be either 'enrollment' or 'courseware'"
|
||||
).format(access_point=access_point)
|
||||
super(InvalidAccessPoint, self).__init__(msg, *args, **kwargs)
|
||||
@@ -32,11 +32,13 @@ EMBARGO_SITE_REDIRECT_URL = 'https://www.edx.org/'
|
||||
"""
|
||||
from functools import partial
|
||||
import logging
|
||||
import re
|
||||
import pygeoip
|
||||
from lazy import lazy
|
||||
|
||||
from django.core.exceptions import MiddlewareNotUsed
|
||||
from django.core.cache import cache
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
from django.shortcuts import redirect
|
||||
from django.http import HttpResponseRedirect, HttpResponseForbidden
|
||||
@@ -45,7 +47,7 @@ from util.request import course_id_from_url
|
||||
|
||||
from student.models import unique_id_for_user
|
||||
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
|
||||
from embargo.api import check_course_access
|
||||
from embargo import api as embargo_api
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -58,6 +60,17 @@ class EmbargoMiddleware(object):
|
||||
optionally ``IPFilter`` rows in the database, using the django admin site.
|
||||
"""
|
||||
|
||||
ALLOW_URL_PATTERNS = [
|
||||
# Don't block the embargo message pages; otherwise we'd
|
||||
# end up in an infinite redirect loop.
|
||||
re.compile(r'^/embargo/blocked-message/'),
|
||||
|
||||
# Don't block the Django admin pages. Otherwise, we might
|
||||
# accidentally lock ourselves out of Django admin
|
||||
# during testing.
|
||||
re.compile(r'^/admin/'),
|
||||
]
|
||||
|
||||
# Reasons a user might be blocked.
|
||||
# These are used to generate info messages in the logs.
|
||||
REASONS = {
|
||||
@@ -71,20 +84,81 @@ class EmbargoMiddleware(object):
|
||||
|
||||
def __init__(self):
|
||||
self.site_enabled = settings.FEATURES.get('SITE_EMBARGOED', False)
|
||||
# If embargoing is turned off, make this middleware do nothing
|
||||
if not settings.FEATURES.get('EMBARGO', False) and not self.site_enabled:
|
||||
raise MiddlewareNotUsed()
|
||||
self.enable_country_access = settings.FEATURES.get('ENABLE_COUNTRY_ACCESS', False)
|
||||
|
||||
# If embargoing is turned off, make this middleware do nothing
|
||||
disable_middleware = not (
|
||||
settings.FEATURES.get('EMBARGO') or
|
||||
self.site_enabled or
|
||||
self.enable_country_access
|
||||
)
|
||||
if disable_middleware:
|
||||
raise MiddlewareNotUsed()
|
||||
|
||||
def process_request(self, request):
|
||||
"""Block requests based on embargo rules.
|
||||
|
||||
In the new ENABLE_COUNTRY_ACCESS implmentation,
|
||||
this will perform the following checks:
|
||||
|
||||
1) If the user's IP address is blacklisted, block.
|
||||
2) If the user's IP address is whitelisted, allow.
|
||||
3) If the user's country (inferred from their IP address) is blocked for
|
||||
a courseware page, block.
|
||||
4) If the user's country (retrieved from the user's profile) is blocked
|
||||
for a courseware page, block.
|
||||
5) Allow access.
|
||||
|
||||
"""
|
||||
Processes embargo requests.
|
||||
"""
|
||||
# If the feature flag is set, use the new "country access" implementation.
|
||||
# This is a more flexible implementation of the embargo feature that allows
|
||||
# per-course country access rules.
|
||||
if self.enable_country_access:
|
||||
if self.country_access_rules(request):
|
||||
|
||||
# Never block certain patterns by IP address
|
||||
for pattern in self.ALLOW_URL_PATTERNS:
|
||||
if pattern.match(request.path) is not None:
|
||||
return None
|
||||
|
||||
ip_address = get_ip(request)
|
||||
ip_filter = IPFilter.current()
|
||||
|
||||
if ip_filter.enabled and ip_address in ip_filter.blacklist_ips:
|
||||
log.info(
|
||||
(
|
||||
u"User %s was blocked from accessing %s "
|
||||
u"because IP address %s is blacklisted."
|
||||
), request.user.id, request.path, ip_address
|
||||
)
|
||||
|
||||
# If the IP is blacklisted, reject.
|
||||
# This applies to any request, not just courseware URLs.
|
||||
ip_blacklist_url = reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': 'courseware',
|
||||
'message_key': 'embargo'
|
||||
}
|
||||
)
|
||||
return redirect(ip_blacklist_url)
|
||||
|
||||
elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips:
|
||||
log.info(
|
||||
(
|
||||
u"User %s was allowed access to %s because "
|
||||
u"IP address %s is whitelisted."
|
||||
),
|
||||
request.user.id, request.path, ip_address
|
||||
)
|
||||
|
||||
# If the IP is whitelisted, then allow access,
|
||||
# skipping later checks.
|
||||
return None
|
||||
|
||||
else:
|
||||
return self._embargo_redirect_response
|
||||
# Otherwise, perform the country access checks.
|
||||
# This applies only to courseware URLs.
|
||||
return self.country_access_rules(request.user, ip_address, request.path)
|
||||
|
||||
url = request.path
|
||||
course_id = course_id_from_url(url)
|
||||
@@ -306,19 +380,30 @@ class EmbargoMiddleware(object):
|
||||
|
||||
return _inner
|
||||
|
||||
def country_access_rules(self, request):
|
||||
def country_access_rules(self, user, ip_address, url_path):
|
||||
"""
|
||||
check the country access rules for a given course.
|
||||
if course id is invalid return True
|
||||
Check the country access rules for a given course.
|
||||
Applies only to courseware URLs.
|
||||
|
||||
Args:
|
||||
request
|
||||
user (User): The user making the current request.
|
||||
ip_address (str): The IP address from which the request originated.
|
||||
url_path (str): The request path.
|
||||
|
||||
Return:
|
||||
boolean: True if the user has access else false.
|
||||
Returns:
|
||||
HttpResponse or None
|
||||
|
||||
"""
|
||||
url = request.path
|
||||
course_id = course_id_from_url(url)
|
||||
if course_id is None:
|
||||
return True
|
||||
return check_course_access(request.user, get_ip(request), course_id)
|
||||
course_id = course_id_from_url(url_path)
|
||||
|
||||
if course_id:
|
||||
redirect_url = embargo_api.redirect_if_blocked(
|
||||
course_id,
|
||||
user=user,
|
||||
ip_address=ip_address,
|
||||
url=url_path,
|
||||
access_point='courseware'
|
||||
)
|
||||
|
||||
if redirect_url:
|
||||
return redirect(redirect_url)
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import datetime
|
||||
from south.db import db
|
||||
from south.v2 import SchemaMigration
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Migration(SchemaMigration):
|
||||
|
||||
def forwards(self, orm):
|
||||
# Adding model 'CourseAccessRuleHistory'
|
||||
db.create_table('embargo_courseaccessrulehistory', (
|
||||
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
|
||||
('timestamp', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, db_index=True, blank=True)),
|
||||
('course_key', self.gf('xmodule_django.models.CourseKeyField')(max_length=255, db_index=True)),
|
||||
('snapshot', self.gf('django.db.models.fields.TextField')(null=True, blank=True)),
|
||||
))
|
||||
db.send_create_signal('embargo', ['CourseAccessRuleHistory'])
|
||||
|
||||
|
||||
def backwards(self, orm):
|
||||
# Deleting model 'CourseAccessRuleHistory'
|
||||
db.delete_table('embargo_courseaccessrulehistory')
|
||||
|
||||
|
||||
models = {
|
||||
'auth.group': {
|
||||
'Meta': {'object_name': 'Group'},
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
|
||||
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
|
||||
},
|
||||
'auth.permission': {
|
||||
'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
|
||||
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
|
||||
},
|
||||
'auth.user': {
|
||||
'Meta': {'object_name': 'User'},
|
||||
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
|
||||
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
|
||||
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
|
||||
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
|
||||
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
|
||||
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
|
||||
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
|
||||
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
|
||||
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
|
||||
},
|
||||
'contenttypes.contenttype': {
|
||||
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
|
||||
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
|
||||
},
|
||||
'embargo.country': {
|
||||
'Meta': {'ordering': "['country']", 'object_name': 'Country'},
|
||||
'country': ('django_countries.fields.CountryField', [], {'unique': 'True', 'max_length': '2', 'db_index': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
|
||||
},
|
||||
'embargo.countryaccessrule': {
|
||||
'Meta': {'unique_together': "(('restricted_course', 'country'),)", 'object_name': 'CountryAccessRule'},
|
||||
'country': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.Country']"}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'restricted_course': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.RestrictedCourse']"}),
|
||||
'rule_type': ('django.db.models.fields.CharField', [], {'default': "'blacklist'", 'max_length': '255'})
|
||||
},
|
||||
'embargo.courseaccessrulehistory': {
|
||||
'Meta': {'object_name': 'CourseAccessRuleHistory'},
|
||||
'course_key': ('xmodule_django.models.CourseKeyField', [], {'max_length': '255', 'db_index': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'snapshot': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
|
||||
'timestamp': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'db_index': 'True', 'blank': 'True'})
|
||||
},
|
||||
'embargo.embargoedcourse': {
|
||||
'Meta': {'object_name': 'EmbargoedCourse'},
|
||||
'course_id': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}),
|
||||
'embargoed': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
|
||||
},
|
||||
'embargo.embargoedstate': {
|
||||
'Meta': {'object_name': 'EmbargoedState'},
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'embargoed_countries': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
|
||||
},
|
||||
'embargo.ipfilter': {
|
||||
'Meta': {'object_name': 'IPFilter'},
|
||||
'blacklist': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'whitelist': ('django.db.models.fields.TextField', [], {'blank': 'True'})
|
||||
},
|
||||
'embargo.restrictedcourse': {
|
||||
'Meta': {'object_name': 'RestrictedCourse'},
|
||||
'access_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}),
|
||||
'course_key': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}),
|
||||
'enroll_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
|
||||
}
|
||||
}
|
||||
|
||||
complete_apps = ['embargo']
|
||||
@@ -12,10 +12,14 @@ file and check it in at the same time as your model changes. To do that,
|
||||
"""
|
||||
|
||||
import ipaddr
|
||||
import json
|
||||
import logging
|
||||
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext as _, ugettext_lazy
|
||||
from django.core.cache import cache
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.db.models.signals import post_save, post_delete
|
||||
|
||||
from django_countries.fields import CountryField
|
||||
from django_countries import countries
|
||||
@@ -23,11 +27,11 @@ from django_countries import countries
|
||||
from config_models.models import ConfigurationModel
|
||||
from xmodule_django.models import CourseKeyField, NoneToEmptyManager
|
||||
|
||||
from embargo.exceptions import InvalidAccessPoint
|
||||
from embargo.messages import ENROLL_MESSAGES, COURSEWARE_MESSAGES
|
||||
|
||||
|
||||
WHITE_LIST = 'whitelist'
|
||||
BLACK_LIST = 'blacklist'
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmbargoedCourse(models.Model):
|
||||
@@ -100,6 +104,9 @@ class RestrictedCourse(models.Model):
|
||||
These displayed on pages served by the embargo app.
|
||||
|
||||
"""
|
||||
COURSE_LIST_CACHE_KEY = 'embargo.restricted_courses'
|
||||
MESSAGE_URL_CACHE_KEY = 'embargo.message_url_path.{access_point}.{course_key}'
|
||||
|
||||
ENROLL_MSG_KEY_CHOICES = tuple([
|
||||
(msg_key, msg.description)
|
||||
for msg_key, msg in ENROLL_MESSAGES.iteritems()
|
||||
@@ -129,11 +136,6 @@ class RestrictedCourse(models.Model):
|
||||
help_text=ugettext_lazy(u"The message to show when a user is blocked from accessing a course.")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def cache_key_name(cls):
|
||||
"""Return the name of the key to use to cache the current restricted course list"""
|
||||
return 'embargo/RestrictedCourse/courses'
|
||||
|
||||
@classmethod
|
||||
def is_restricted_course(cls, course_id):
|
||||
"""
|
||||
@@ -153,25 +155,172 @@ class RestrictedCourse(models.Model):
|
||||
"""
|
||||
Cache all restricted courses and returns the list of course_keys that are restricted
|
||||
"""
|
||||
restricted_courses = cache.get(cls.cache_key_name())
|
||||
restricted_courses = cache.get(cls.COURSE_LIST_CACHE_KEY)
|
||||
if not restricted_courses:
|
||||
restricted_courses = list(RestrictedCourse.objects.values_list('course_key', flat=True))
|
||||
cache.set(cls.cache_key_name(), restricted_courses)
|
||||
cache.set(cls.COURSE_LIST_CACHE_KEY, restricted_courses)
|
||||
return restricted_courses
|
||||
|
||||
def snapshot(self):
|
||||
"""Return a snapshot of all access rules for this course.
|
||||
|
||||
This is useful for recording an audit trail of rule changes.
|
||||
The returned dictionary is JSON-serializable.
|
||||
|
||||
Returns:
|
||||
dict
|
||||
|
||||
Example Usage:
|
||||
>>> restricted_course.snapshot()
|
||||
{
|
||||
'enroll_msg': 'default',
|
||||
'access_msg': 'default',
|
||||
'country_rules': [
|
||||
{'country': 'IR', 'rule_type': 'blacklist'},
|
||||
{'country': 'CU', 'rule_type': 'blacklist'}
|
||||
]
|
||||
}
|
||||
|
||||
"""
|
||||
country_rules_for_course = (
|
||||
CountryAccessRule.objects
|
||||
).select_related('restricted_country').filter(restricted_course=self)
|
||||
|
||||
return {
|
||||
'enroll_msg': self.enroll_msg_key,
|
||||
'access_msg': self.access_msg_key,
|
||||
'country_rules': [
|
||||
{
|
||||
'country': unicode(rule.country.country),
|
||||
'rule_type': rule.rule_type
|
||||
}
|
||||
for rule in country_rules_for_course
|
||||
]
|
||||
}
|
||||
|
||||
def message_key_for_access_point(self, access_point):
|
||||
"""Determine which message to show the user.
|
||||
|
||||
The message can be configured per-course and depends
|
||||
on how the user is trying to access the course
|
||||
(trying to enroll or accessing courseware).
|
||||
|
||||
Arguments:
|
||||
access_point (str): Either "courseware" or "enrollment"
|
||||
|
||||
Returns:
|
||||
str: The message key. If the access point is not valid,
|
||||
returns None instead.
|
||||
|
||||
"""
|
||||
if access_point == 'enrollment':
|
||||
return self.enroll_msg_key
|
||||
elif access_point == 'courseware':
|
||||
return self.access_msg_key
|
||||
|
||||
def __unicode__(self):
|
||||
return unicode(self.course_key)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""
|
||||
Clear the cached value when saving a RestrictedCourse entry
|
||||
"""
|
||||
super(RestrictedCourse, self).save(*args, **kwargs)
|
||||
cache.delete(self.cache_key_name())
|
||||
@classmethod
|
||||
def message_url_path(cls, course_key, access_point):
|
||||
"""Determine the URL path for the message explaining why the user was blocked.
|
||||
|
||||
def delete(self, using=None):
|
||||
super(RestrictedCourse, self).delete()
|
||||
cache.delete(self.cache_key_name())
|
||||
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
|
||||
module for more details.
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): The location of the course.
|
||||
access_point (str): How the user was trying to access the course.
|
||||
Can be either "enrollment" or "courseware".
|
||||
|
||||
Returns:
|
||||
unicode: The URL path to a page explaining why the user was blocked.
|
||||
|
||||
Raises:
|
||||
InvalidAccessPoint: Raised if access_point is not a supported value.
|
||||
|
||||
"""
|
||||
if access_point not in ['enrollment', 'courseware']:
|
||||
raise InvalidAccessPoint(access_point)
|
||||
|
||||
# First check the cache to see if we already have
|
||||
# a URL for this (course_key, access_point) tuple
|
||||
cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
|
||||
access_point=access_point,
|
||||
course_key=course_key
|
||||
)
|
||||
url = cache.get(cache_key)
|
||||
|
||||
# If there's a cache miss, we'll need to retrieve the message
|
||||
# configuration from the database
|
||||
if url is None:
|
||||
url = cls._get_message_url_path_from_db(course_key, access_point)
|
||||
cache.set(cache_key, url)
|
||||
|
||||
return url
|
||||
|
||||
@classmethod
|
||||
def _get_message_url_path_from_db(cls, course_key, access_point):
|
||||
"""Retrieve the "blocked" message from the database.
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): The location of the course.
|
||||
access_point (str): How the user was trying to access the course.
|
||||
Can be either "enrollment" or "courseware".
|
||||
|
||||
Returns:
|
||||
unicode: The URL path to a page explaining why the user was blocked.
|
||||
|
||||
"""
|
||||
# Fallback in case we're not able to find a message path
|
||||
# Presumably if the caller is requesting a URL, the caller
|
||||
# has already determined that the user should be blocked.
|
||||
# We use generic messaging unless we find something more specific,
|
||||
# but *always* return a valid URL path.
|
||||
default_path = reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': 'courseware',
|
||||
'message_key': 'default'
|
||||
}
|
||||
)
|
||||
|
||||
# First check whether this is a restricted course.
|
||||
# The list of restricted courses is cached, so this does
|
||||
# not require a database query.
|
||||
if not cls.is_restricted_course(course_key):
|
||||
return default_path
|
||||
|
||||
# Retrieve the message key from the restricted course
|
||||
# for this access point, then determine the URL.
|
||||
try:
|
||||
course = cls.objects.get(course_key=course_key)
|
||||
msg_key = course.message_key_for_access_point(access_point)
|
||||
return reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': access_point,
|
||||
'message_key': msg_key
|
||||
}
|
||||
)
|
||||
except cls.DoesNotExist:
|
||||
# This occurs only if there's a race condition
|
||||
# between cache invalidation and database access.
|
||||
return default_path
|
||||
|
||||
@classmethod
|
||||
def invalidate_cache_for_course(cls, course_key):
|
||||
"""Invalidate the caches for the restricted course. """
|
||||
cache.delete(cls.COURSE_LIST_CACHE_KEY)
|
||||
log.info("Invalidated cached list of restricted courses.")
|
||||
|
||||
for access_point in ['enrollment', 'courseware']:
|
||||
msg_cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
|
||||
access_point=access_point,
|
||||
course_key=course_key
|
||||
)
|
||||
cache.delete(msg_cache_key)
|
||||
log.info("Invalidated cached messaging URLs ")
|
||||
|
||||
|
||||
class Country(models.Model):
|
||||
@@ -216,15 +365,18 @@ class CountryAccessRule(models.Model):
|
||||
|
||||
"""
|
||||
|
||||
WHITELIST_RULE = 'whitelist'
|
||||
BLACKLIST_RULE = 'blacklist'
|
||||
|
||||
RULE_TYPE_CHOICES = (
|
||||
(WHITE_LIST, 'Whitelist (allow only these countries)'),
|
||||
(BLACK_LIST, 'Blacklist (block these countries)'),
|
||||
(WHITELIST_RULE, 'Whitelist (allow only these countries)'),
|
||||
(BLACKLIST_RULE, 'Blacklist (block these countries)'),
|
||||
)
|
||||
|
||||
rule_type = models.CharField(
|
||||
max_length=255,
|
||||
choices=RULE_TYPE_CHOICES,
|
||||
default=BLACK_LIST,
|
||||
default=BLACKLIST_RULE,
|
||||
help_text=ugettext_lazy(
|
||||
u"Whether to include or exclude the given course. "
|
||||
u"If whitelist countries are specified, then ONLY users from whitelisted countries "
|
||||
@@ -243,15 +395,7 @@ class CountryAccessRule(models.Model):
|
||||
help_text=ugettext_lazy(u"The country to which this rule applies.")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def cache_key_for_consolidated_countries(cls, course_id):
|
||||
"""
|
||||
Args:
|
||||
course_id (str): course_id to look for
|
||||
Returns:
|
||||
Consolidated list of accessible countries for given course
|
||||
"""
|
||||
return "{}/allowed/countries".format(course_id)
|
||||
CACHE_KEY = u"embargo.allowed_countries.{course_key}"
|
||||
|
||||
@classmethod
|
||||
def check_country_access(cls, course_id, country):
|
||||
@@ -267,10 +411,11 @@ class CountryAccessRule(models.Model):
|
||||
True if country found in allowed country
|
||||
otherwise check given country exists in list
|
||||
"""
|
||||
allowed_countries = cache.get(cls.cache_key_for_consolidated_countries(course_id))
|
||||
cache_key = cls.CACHE_KEY.format(course_key=course_id)
|
||||
allowed_countries = cache.get(cache_key)
|
||||
if not allowed_countries:
|
||||
allowed_countries = cls._get_country_access_list(course_id)
|
||||
cache.set(cls.cache_key_for_consolidated_countries(course_id), allowed_countries)
|
||||
cache.set(cache_key, allowed_countries)
|
||||
|
||||
return country == '' or country in allowed_countries
|
||||
|
||||
@@ -298,9 +443,9 @@ class CountryAccessRule(models.Model):
|
||||
|
||||
# Filter the rules into a whitelist and blacklist in one pass
|
||||
for rule in rules_for_course:
|
||||
if rule.rule_type == 'whitelist':
|
||||
if rule.rule_type == cls.WHITELIST_RULE:
|
||||
whitelist_countries.add(rule.country.country.code)
|
||||
elif rule.rule_type == 'blacklist':
|
||||
elif rule.rule_type == cls.BLACKLIST_RULE:
|
||||
blacklist_countries.add(rule.country.country.code)
|
||||
|
||||
# If there are no whitelist countries, default to all countries
|
||||
@@ -312,30 +457,23 @@ class CountryAccessRule(models.Model):
|
||||
return list(whitelist_countries - blacklist_countries)
|
||||
|
||||
def __unicode__(self):
|
||||
if self.rule_type == WHITE_LIST:
|
||||
if self.rule_type == self.WHITELIST_RULE:
|
||||
return _(u"Whitelist {country} for {course}").format(
|
||||
course=unicode(self.restricted_course.course_key),
|
||||
country=unicode(self.country),
|
||||
)
|
||||
elif self.rule_type == BLACK_LIST:
|
||||
elif self.rule_type == self.BLACKLIST_RULE:
|
||||
return _(u"Blacklist {country} for {course}").format(
|
||||
course=unicode(self.restricted_course.course_key),
|
||||
country=unicode(self.country),
|
||||
)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""
|
||||
Clear the cached value when saving a entry
|
||||
"""
|
||||
super(CountryAccessRule, self).save(*args, **kwargs)
|
||||
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
|
||||
|
||||
def delete(self, using=None):
|
||||
"""
|
||||
clear the cached value when saving a entry
|
||||
"""
|
||||
super(CountryAccessRule, self).delete()
|
||||
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
|
||||
@classmethod
|
||||
def invalidate_cache_for_course(cls, course_key):
|
||||
"""Invalidate the cache. """
|
||||
cache_key = cls.CACHE_KEY.format(course_key=course_key)
|
||||
cache.delete(cache_key)
|
||||
log.info("Invalidated country access list for course %s", course_key)
|
||||
|
||||
class Meta:
|
||||
"""a course can be added with either black or white list. """
|
||||
@@ -347,6 +485,132 @@ class CountryAccessRule(models.Model):
|
||||
)
|
||||
|
||||
|
||||
def invalidate_country_rule_cache(sender, instance, **kwargs): # pylint: disable=unused-argument
|
||||
"""Invalidate cached rule information on changes to the rule models.
|
||||
|
||||
We need to handle this in a Django receiver, because Django admin
|
||||
doesn't always call the model's `delete()` method directly during
|
||||
a bulk delete operation.
|
||||
|
||||
Arguments:
|
||||
sender: Not used, but required by Django receivers.
|
||||
instance (RestrictedCourse or CountryAccessRule): The instance
|
||||
being saved or deleted.
|
||||
|
||||
"""
|
||||
if isinstance(instance, RestrictedCourse):
|
||||
# If a restricted course changed, we need to update the list
|
||||
# of which courses are restricted as well as any rules
|
||||
# associated with the course.
|
||||
RestrictedCourse.invalidate_cache_for_course(instance.course_key)
|
||||
CountryAccessRule.invalidate_cache_for_course(instance.course_key)
|
||||
if isinstance(instance, CountryAccessRule):
|
||||
try:
|
||||
restricted_course = instance.restricted_course
|
||||
except RestrictedCourse.DoesNotExist:
|
||||
# If the restricted course and its rules are being deleted,
|
||||
# the restricted course may not exist at this point.
|
||||
# However, the cache should have been invalidated
|
||||
# when the restricted course was deleted.
|
||||
pass
|
||||
else:
|
||||
# Invalidate the cache of countries for the course.
|
||||
CountryAccessRule.invalidate_cache_for_course(restricted_course.course_key)
|
||||
|
||||
|
||||
# Hook up the cache invalidation receivers to the appropriate
|
||||
# post_save and post_delete signals.
|
||||
post_save.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
|
||||
post_save.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
|
||||
post_delete.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
|
||||
post_delete.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
|
||||
|
||||
|
||||
class CourseAccessRuleHistory(models.Model):
|
||||
"""History of course access rule changes. """
|
||||
|
||||
timestamp = models.DateTimeField(db_index=True, auto_now_add=True)
|
||||
course_key = CourseKeyField(max_length=255, db_index=True)
|
||||
snapshot = models.TextField(null=True, blank=True)
|
||||
|
||||
DELETED_PLACEHOLDER = "DELETED"
|
||||
|
||||
@classmethod
|
||||
def save_snapshot(cls, restricted_course, deleted=False):
|
||||
"""Save a snapshot of access rules for a course.
|
||||
|
||||
Arguments:
|
||||
restricted_course (RestrictedCourse)
|
||||
|
||||
Keyword Arguments:
|
||||
deleted (boolean): If True, the restricted course
|
||||
is about to be deleted. Create a placeholder
|
||||
snapshot recording that the course and all its
|
||||
rules was deleted.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
"""
|
||||
course_key = restricted_course.course_key
|
||||
|
||||
# At the point this is called, the access rules may not have
|
||||
# been deleted yet. When the rules *are* deleted, the
|
||||
# restricted course entry may no longer exist, so we
|
||||
# won't be able to take a snapshot of the rules.
|
||||
# To handle this, we save a placeholder "DELETED" entry
|
||||
# so that it's clear in the audit that the restricted
|
||||
# course (along with all its rules) was deleted.
|
||||
snapshot = (
|
||||
CourseAccessRuleHistory.DELETED_PLACEHOLDER if deleted
|
||||
else json.dumps(restricted_course.snapshot())
|
||||
)
|
||||
|
||||
cls.objects.create(
|
||||
course_key=course_key,
|
||||
snapshot=snapshot
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def snapshot_post_save_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
|
||||
"""Create a snapshot of course access rules when the rules are updated. """
|
||||
if isinstance(instance, RestrictedCourse):
|
||||
CourseAccessRuleHistory.save_snapshot(instance)
|
||||
elif isinstance(instance, CountryAccessRule):
|
||||
CourseAccessRuleHistory.save_snapshot(instance.restricted_course)
|
||||
|
||||
@staticmethod
|
||||
def snapshot_post_delete_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
|
||||
"""Create a snapshot of course access rules when rules are deleted. """
|
||||
if isinstance(instance, RestrictedCourse):
|
||||
CourseAccessRuleHistory.save_snapshot(instance, deleted=True)
|
||||
elif isinstance(instance, CountryAccessRule):
|
||||
try:
|
||||
restricted_course = instance.restricted_course
|
||||
except RestrictedCourse.DoesNotExist:
|
||||
# When Django admin deletes a restricted course, it will
|
||||
# also delete the rules associated with that course.
|
||||
# At this point, we can't access the restricted course
|
||||
# from the rule beause it may already have been deleted.
|
||||
# If this happens, we don't need to record anything,
|
||||
# since we already record a placeholder "DELETED"
|
||||
# entry when the restricted course record is deleted.
|
||||
pass
|
||||
else:
|
||||
CourseAccessRuleHistory.save_snapshot(restricted_course)
|
||||
|
||||
class Meta: # pylint: disable=missing-docstring,old-style-class
|
||||
get_latest_by = 'timestamp'
|
||||
|
||||
|
||||
# Connect the signals to the receivers so we record a history
|
||||
# of changes to the course access rules.
|
||||
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=RestrictedCourse)
|
||||
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=CountryAccessRule)
|
||||
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=RestrictedCourse)
|
||||
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=CountryAccessRule)
|
||||
|
||||
|
||||
class IPFilter(ConfigurationModel):
|
||||
"""
|
||||
Register specific IP addresses to explicitly block or unblock.
|
||||
|
||||
82
common/djangoapps/embargo/test_utils.py
Normal file
82
common/djangoapps/embargo/test_utils.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Utilities for writing unit tests that involve course embargos. """
|
||||
import contextlib
|
||||
import mock
|
||||
|
||||
import pygeoip
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.core.cache import cache
|
||||
from embargo.models import Country, CountryAccessRule, RestrictedCourse
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def restrict_course(course_key, access_point="enrollment"):
|
||||
"""Simulate that a course is restricted.
|
||||
|
||||
This does two things:
|
||||
1) Configures country access rules so that the course is restricted.
|
||||
2) Mocks the GeoIP call so the user appears to be coming
|
||||
from a country that's blocked from the course.
|
||||
|
||||
This is useful for tests that need to verify
|
||||
that restricted users won't be able to access
|
||||
particular views.
|
||||
|
||||
Arguments:
|
||||
course_key (CourseKey): The location of the course to block.
|
||||
|
||||
Keyword Arguments:
|
||||
access_point (str): Either "courseware" or "enrollment"
|
||||
|
||||
Yields:
|
||||
str: A URL to the page in the embargo app that explains
|
||||
why the user was blocked.
|
||||
|
||||
Example Usage:
|
||||
>>> with restrict_course(course_key) as redirect_url:
|
||||
>>> # The client will appear to be coming from
|
||||
>>> # an IP address that is blocked.
|
||||
>>> resp = self.client.get(url)
|
||||
>>> self.assertRedirects(resp, redirect_url)
|
||||
|
||||
"""
|
||||
# Clear the cache to ensure that previous tests don't interfere
|
||||
# with this test.
|
||||
cache.clear()
|
||||
|
||||
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
|
||||
|
||||
# Remove all existing rules for the course
|
||||
CountryAccessRule.objects.all().delete()
|
||||
|
||||
# Create the country object
|
||||
# Ordinarily, we'd create models for every country,
|
||||
# but that would slow down the test suite.
|
||||
country, __ = Country.objects.get_or_create(country='IR')
|
||||
|
||||
# Create a model for the restricted course
|
||||
restricted_course, __ = RestrictedCourse.objects.get_or_create(course_key=course_key)
|
||||
restricted_course.enroll_msg_key = 'default'
|
||||
restricted_course.access_msg_key = 'default'
|
||||
restricted_course.save()
|
||||
|
||||
# Ensure that there is a blacklist rule for the country
|
||||
CountryAccessRule.objects.get_or_create(
|
||||
restricted_course=restricted_course,
|
||||
country=country,
|
||||
rule_type='blacklist'
|
||||
)
|
||||
|
||||
# Simulate that the user is coming from the blacklisted country
|
||||
mock_ip.return_value = 'IR'
|
||||
|
||||
# Yield the redirect url so the tests don't need to know
|
||||
# the embargo messaging URL structure.
|
||||
redirect_url = reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': access_point,
|
||||
'message_key': 'default'
|
||||
}
|
||||
)
|
||||
yield redirect_url
|
||||
251
common/djangoapps/embargo/tests/test_api.py
Normal file
251
common/djangoapps/embargo/tests/test_api.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""
|
||||
Tests for EmbargoMiddleware
|
||||
"""
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
import pygeoip
|
||||
import ddt
|
||||
|
||||
from django.conf import settings
|
||||
from django.test.utils import override_settings
|
||||
from django.core.cache import cache
|
||||
from django.db import connection, transaction
|
||||
|
||||
from student.tests.factories import UserFactory
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from xmodule.modulestore.tests.django_utils import (
|
||||
ModuleStoreTestCase, mixed_store_config
|
||||
)
|
||||
|
||||
from embargo.models import (
|
||||
RestrictedCourse, Country, CountryAccessRule,
|
||||
)
|
||||
|
||||
from util.testing import UrlResetMixin
|
||||
from embargo import api as embargo_api
|
||||
from embargo.exceptions import InvalidAccessPoint
|
||||
from mock import patch
|
||||
|
||||
|
||||
# Since we don't need any XML course fixtures, use a modulestore configuration
|
||||
# that disables the XML modulestore.
|
||||
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
|
||||
"""Test the embargo API calls to determine whether a user has access. """
|
||||
|
||||
def setUp(self):
|
||||
super(EmbargoCheckAccessApiTests, self).setUp()
|
||||
self.course = CourseFactory.create()
|
||||
self.user = UserFactory.create()
|
||||
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course.id)
|
||||
Country.objects.create(country='US')
|
||||
Country.objects.create(country='IR')
|
||||
Country.objects.create(country='CU')
|
||||
|
||||
# Clear the cache to prevent interference between tests
|
||||
cache.clear()
|
||||
|
||||
@ddt.data(
|
||||
# IP country, profile_country, blacklist, whitelist, allow_access
|
||||
('US', None, [], [], True),
|
||||
('IR', None, ['IR', 'CU'], [], False),
|
||||
('US', 'IR', ['IR', 'CU'], [], False),
|
||||
('IR', 'IR', ['IR', 'CU'], [], False),
|
||||
('US', None, [], ['US'], True),
|
||||
('IR', None, [], ['US'], False),
|
||||
('US', 'IR', [], ['US'], False),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_country_access_rules(self, ip_country, profile_country, blacklist, whitelist, allow_access):
|
||||
# Configure the access rules
|
||||
for whitelist_country in whitelist:
|
||||
CountryAccessRule.objects.create(
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE,
|
||||
restricted_course=self.restricted_course,
|
||||
country=Country.objects.get(country=whitelist_country)
|
||||
)
|
||||
|
||||
for blacklist_country in blacklist:
|
||||
CountryAccessRule.objects.create(
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
restricted_course=self.restricted_course,
|
||||
country=Country.objects.get(country=blacklist_country)
|
||||
)
|
||||
|
||||
# Configure the user's profile country
|
||||
if profile_country is not None:
|
||||
self.user.profile.country = profile_country
|
||||
self.user.profile.save()
|
||||
|
||||
# Appear to make a request from an IP in a particular country
|
||||
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
|
||||
mock_ip.return_value = ip_country
|
||||
|
||||
# Call the API. Note that the IP address we pass in doesn't
|
||||
# matter, since we're injecting a mock for geo-location
|
||||
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
|
||||
|
||||
# Verify that the access rules were applied correctly
|
||||
self.assertEqual(result, allow_access)
|
||||
|
||||
def test_no_user_has_access(self):
|
||||
CountryAccessRule.objects.create(
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
restricted_course=self.restricted_course,
|
||||
country=Country.objects.get(country='US')
|
||||
)
|
||||
|
||||
# The user is set to None, because the user has not been authenticated.
|
||||
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_no_user_blocked(self):
|
||||
CountryAccessRule.objects.create(
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
restricted_course=self.restricted_course,
|
||||
country=Country.objects.get(country='US')
|
||||
)
|
||||
|
||||
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
|
||||
mock_ip.return_value = 'US'
|
||||
|
||||
# The user is set to None, because the user has not been authenticated.
|
||||
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_course_not_restricted(self):
|
||||
# No restricted course model for this course key,
|
||||
# so all access checks should be skipped.
|
||||
unrestricted_course = CourseFactory.create()
|
||||
with self.assertNumQueries(1):
|
||||
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
|
||||
|
||||
# The second check should require no database queries
|
||||
with self.assertNumQueries(0):
|
||||
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
|
||||
|
||||
def test_ip_v6(self):
|
||||
# Test the scenario that will go through every check
|
||||
# (restricted course, but pass all the checks)
|
||||
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='FE80::0202:B3FF:FE1E:8329')
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_profile_country_db_null(self):
|
||||
# Django country fields treat NULL values inconsistently.
|
||||
# When saving a profile with country set to None, Django saves an empty string to the database.
|
||||
# However, when the country field loads a NULL value from the database, it sets
|
||||
# `country.code` to `None`. This caused a bug in which country values created by
|
||||
# the original South schema migration -- which defaulted to NULL -- caused a runtime
|
||||
# exception when the embargo middleware treated the value as a string.
|
||||
# In order to simulate this behavior, we can't simply set `profile.country = None`.
|
||||
# (because when we save it, it will set the database field to an empty string instead of NULL)
|
||||
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
|
||||
connection.cursor().execute(query, [str(self.user.profile.id)])
|
||||
transaction.commit_unless_managed()
|
||||
|
||||
# Verify that we can check the user's access without error
|
||||
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_caching(self):
|
||||
# Test the scenario that will go through every check
|
||||
# (restricted course, but pass all the checks)
|
||||
# This is the worst case, so it will hit all of the
|
||||
# caching code.
|
||||
with self.assertNumQueries(3):
|
||||
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
|
||||
|
||||
with self.assertNumQueries(0):
|
||||
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EmbargoMessageUrlApiTests(UrlResetMixin, ModuleStoreTestCase):
|
||||
"""Test the embargo API calls for retrieving the blocking message URLs. """
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def setUp(self):
|
||||
super(EmbargoMessageUrlApiTests, self).setUp('embargo')
|
||||
self.course = CourseFactory.create()
|
||||
|
||||
def tearDown(self):
|
||||
cache.clear()
|
||||
|
||||
@ddt.data(
|
||||
('enrollment', '/embargo/blocked-message/enrollment/embargo/'),
|
||||
('courseware', '/embargo/blocked-message/courseware/embargo/')
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_message_url_path(self, access_point, expected_url_path):
|
||||
self._restrict_course(self.course.id)
|
||||
|
||||
# Retrieve the URL to the blocked message page
|
||||
url_path = embargo_api.message_url_path(self.course.id, access_point)
|
||||
self.assertEqual(url_path, expected_url_path)
|
||||
|
||||
def test_message_url_path_caching(self):
|
||||
self._restrict_course(self.course.id)
|
||||
|
||||
# The first time we retrieve the message, we'll need
|
||||
# to hit the database.
|
||||
with self.assertNumQueries(2):
|
||||
embargo_api.message_url_path(self.course.id, "enrollment")
|
||||
|
||||
# The second time, we should be using cached values
|
||||
with self.assertNumQueries(0):
|
||||
embargo_api.message_url_path(self.course.id, "enrollment")
|
||||
|
||||
@ddt.data('enrollment', 'courseware')
|
||||
def test_message_url_path_no_restrictions_for_course(self, access_point):
|
||||
# No restrictions for the course
|
||||
url_path = embargo_api.message_url_path(self.course.id, access_point)
|
||||
|
||||
# Use a default path
|
||||
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
|
||||
|
||||
def test_invalid_access_point(self):
|
||||
with self.assertRaises(InvalidAccessPoint):
|
||||
embargo_api.message_url_path(self.course.id, "invalid")
|
||||
|
||||
def test_message_url_stale_cache(self):
|
||||
# Retrieve the URL once, populating the cache with the list
|
||||
# of restricted courses.
|
||||
self._restrict_course(self.course.id)
|
||||
embargo_api.message_url_path(self.course.id, 'courseware')
|
||||
|
||||
# Delete the restricted course entry
|
||||
RestrictedCourse.objects.get(course_key=self.course.id).delete()
|
||||
|
||||
# Clear the message URL cache
|
||||
message_cache_key = (
|
||||
'embargo.message_url_path.courseware.{course_key}'
|
||||
).format(course_key=self.course.id)
|
||||
cache.delete(message_cache_key)
|
||||
|
||||
# Try again. Even though the cache results are stale,
|
||||
# we should still get a valid URL.
|
||||
url_path = embargo_api.message_url_path(self.course.id, 'courseware')
|
||||
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
|
||||
|
||||
def _restrict_course(self, course_key):
|
||||
"""Restrict the user from accessing the course. """
|
||||
country = Country.objects.create(country='us')
|
||||
restricted_course = RestrictedCourse.objects.create(
|
||||
course_key=course_key,
|
||||
enroll_msg_key='embargo',
|
||||
access_msg_key='embargo'
|
||||
)
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course,
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
country=country
|
||||
)
|
||||
@@ -2,27 +2,24 @@
|
||||
Tests for EmbargoMiddleware with CountryAccessRules
|
||||
"""
|
||||
|
||||
import mock
|
||||
import pygeoip
|
||||
import unittest
|
||||
|
||||
from django.db import connection, transaction
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
from mock import patch
|
||||
import ddt
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache as django_cache
|
||||
|
||||
from util.testing import UrlResetMixin
|
||||
from student.tests.factories import UserFactory
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from xmodule.modulestore.tests.django_utils import (
|
||||
ModuleStoreTestCase, mixed_store_config
|
||||
)
|
||||
from config_models.models import cache as config_cache
|
||||
|
||||
# Explicitly import the cache from ConfigurationModel so we can reset it after each test
|
||||
from config_models.models import cache
|
||||
from embargo.models import (
|
||||
RestrictedCourse, Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
|
||||
)
|
||||
from django_countries import countries
|
||||
from embargo.models import RestrictedCourse, IPFilter
|
||||
from embargo.test_utils import restrict_course
|
||||
|
||||
|
||||
# Since we don't need any XML course fixtures, use a modulestore configuration
|
||||
@@ -32,256 +29,147 @@ MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, incl
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EmbargoCountryAccessRulesTests(ModuleStoreTestCase):
|
||||
"""
|
||||
Tests of EmbargoApi
|
||||
"""
|
||||
class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
|
||||
"""Tests of embargo middleware country access rules.
|
||||
|
||||
There are detailed unit tests for the rule logic in
|
||||
`test_api.py`; here, we're mainly testing the integration
|
||||
with middleware
|
||||
|
||||
"""
|
||||
USERNAME = 'fred'
|
||||
PASSWORD = 'secret'
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def setUp(self):
|
||||
super(EmbargoCountryAccessRulesTests, self).setUp()
|
||||
self.user = UserFactory(username='fred', password='secret')
|
||||
self.client.login(username='fred', password='secret')
|
||||
self.embargo_course1 = CourseFactory.create()
|
||||
self.embargo_course1.save()
|
||||
self.embargo_course2 = CourseFactory.create()
|
||||
self.embargo_course2.save()
|
||||
self.regular_course = CourseFactory.create(org="Regular")
|
||||
self.regular_course.save()
|
||||
self.embargoed_course_whitelisted = '/courses/' + self.embargo_course1.id.to_deprecated_string() + '/info'
|
||||
self.embargoed_course_blacklisted = '/courses/' + self.embargo_course2.id.to_deprecated_string() + '/info'
|
||||
self.regular_page = '/courses/' + self.regular_course.id.to_deprecated_string() + '/info'
|
||||
super(EmbargoMiddlewareAccessTests, self).setUp('embargo')
|
||||
self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
|
||||
self.course = CourseFactory.create()
|
||||
self.client.login(username=self.USERNAME, password=self.PASSWORD)
|
||||
|
||||
restricted_course_1 = RestrictedCourse.objects.create(course_key=self.embargo_course1.id)
|
||||
restricted_course_2 = RestrictedCourse.objects.create(course_key=self.embargo_course2.id)
|
||||
self.courseware_url = reverse(
|
||||
'course_root',
|
||||
kwargs={'course_id': unicode(self.course.id)}
|
||||
)
|
||||
self.non_courseware_url = reverse('dashboard')
|
||||
|
||||
all_countries = [Country(country=code[0]) for code in list(countries)]
|
||||
Country.objects.bulk_create(all_countries)
|
||||
# Clear the cache to avoid interference between tests
|
||||
django_cache.clear()
|
||||
config_cache.clear()
|
||||
|
||||
country_access_white_rules = [
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=Country.objects.get(country='US')
|
||||
),
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=Country.objects.get(country='NZ')
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_blocked(self):
|
||||
with restrict_course(self.course.id, access_point='courseware') as redirect_url:
|
||||
response = self.client.get(self.courseware_url)
|
||||
self.assertRedirects(response, redirect_url)
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_allowed(self):
|
||||
# Add the course to the list of restricted courses
|
||||
# but don't create any access rules
|
||||
RestrictedCourse.objects.create(course_key=self.course.id)
|
||||
|
||||
# Expect that we can access courseware
|
||||
response = self.client.get(self.courseware_url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_non_courseware_url(self):
|
||||
with restrict_course(self.course.id):
|
||||
response = self.client.get(self.non_courseware_url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data(
|
||||
# request_ip, blacklist, whitelist, is_enabled, allow_access
|
||||
('173.194.123.35', ['173.194.123.35'], [], True, False),
|
||||
('173.194.123.35', ['173.194.0.0/16'], [], True, False),
|
||||
('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False),
|
||||
('173.195.10.20', ['173.194.0.0/16'], [], True, True),
|
||||
('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False),
|
||||
('173.194.123.35', [], ['173.194.0.0/16'], True, True),
|
||||
('192.178.2.3', [], ['173.194.0.0/16'], True, True),
|
||||
('173.194.123.35', ['173.194.123.35'], [], False, True),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access):
|
||||
# Ensure that IP blocking works for anonymous users
|
||||
self.client.logout()
|
||||
|
||||
# Set up the IP rules
|
||||
IPFilter.objects.create(
|
||||
blacklist=", ".join(blacklist),
|
||||
whitelist=", ".join(whitelist),
|
||||
enabled=is_enabled
|
||||
)
|
||||
|
||||
# Check that access is enforced
|
||||
response = self.client.get(
|
||||
"/",
|
||||
HTTP_X_FORWARDED_FOR=request_ip,
|
||||
REMOTE_ADDR=request_ip
|
||||
)
|
||||
|
||||
if allow_access:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
redirect_url = reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': 'courseware',
|
||||
'message_key': 'embargo'
|
||||
}
|
||||
)
|
||||
]
|
||||
CountryAccessRule.objects.bulk_create(country_access_white_rules)
|
||||
self.assertRedirects(response, redirect_url)
|
||||
|
||||
country_access_black_rules = [
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_2,
|
||||
rule_type=BLACK_LIST,
|
||||
country=Country.objects.get(country='CU')
|
||||
),
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_2,
|
||||
rule_type=BLACK_LIST,
|
||||
country=Country.objects.get(country='IR')
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data(
|
||||
('courseware', 'default'),
|
||||
('courseware', 'embargo'),
|
||||
('enrollment', 'default'),
|
||||
('enrollment', 'embargo')
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_always_allow_access_to_embargo_messages(self, access_point, msg_key):
|
||||
# Blacklist an IP address
|
||||
IPFilter.objects.create(
|
||||
blacklist="192.168.10.20",
|
||||
enabled=True
|
||||
)
|
||||
|
||||
url = reverse(
|
||||
'embargo_blocked_message',
|
||||
kwargs={
|
||||
'access_point': access_point,
|
||||
'message_key': msg_key
|
||||
}
|
||||
)
|
||||
response = self.client.get(
|
||||
url,
|
||||
HTTP_X_FORWARDED_FOR="192.168.10.20",
|
||||
REMOTE_ADDR="192.168.10.20"
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_whitelist_ip_skips_country_access_checks(self):
|
||||
# Whitelist an IP address
|
||||
IPFilter.objects.create(
|
||||
whitelist="192.168.10.20",
|
||||
enabled=True
|
||||
)
|
||||
|
||||
# Set up country access rules so the user would
|
||||
# be restricted from the course.
|
||||
with restrict_course(self.course.id):
|
||||
# Make a request from the whitelisted IP address
|
||||
response = self.client.get(
|
||||
self.courseware_url,
|
||||
HTTP_X_FORWARDED_FOR="192.168.10.20",
|
||||
REMOTE_ADDR="192.168.10.20"
|
||||
)
|
||||
]
|
||||
CountryAccessRule.objects.bulk_create(country_access_black_rules)
|
||||
|
||||
# Text from lms/templates/static_templates/embargo.html
|
||||
self.embargo_text = "Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this course." # pylint: disable=line-too-long
|
||||
self.patcher = mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr', self.mock_country_code_by_addr)
|
||||
self.patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
# Explicitly clear ConfigurationModel's cache so tests have a clear cache
|
||||
# and don't interfere with each other
|
||||
cache.clear()
|
||||
self.patcher.stop()
|
||||
|
||||
def mock_country_code_by_addr(self, ip_addr):
|
||||
"""
|
||||
making a lists of countries which will be use in country access rules.
|
||||
if incoming request's ip belongs to this dict then related country will return.
|
||||
for one course CU and IR added as blacklist in course access rules.
|
||||
for one course US and NZ added as whitelist in course access rules.
|
||||
"""
|
||||
ip_dict = {
|
||||
'1.0.0.0': 'CU',
|
||||
'2.0.0.0': 'IR',
|
||||
'3.0.0.0': 'US',
|
||||
'4.0.0.0': 'NZ'
|
||||
}
|
||||
return ip_dict.get(ip_addr, 'FR')
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data('1.0.0.0', '2.0.0.0')
|
||||
def test_course_access_rules_with_black_rule_country_by_user_ip(self, ip_address):
|
||||
# Accessing an embargoed page from a user ip whose origin is added as
|
||||
# blacklist in course access rules should be redirected.
|
||||
# any other IP should be success
|
||||
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address,
|
||||
follow=True
|
||||
)
|
||||
self.assertIn(self.embargo_text, response.content)
|
||||
|
||||
# accesssing blacklist course from any other country ip should be success
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR='5.0.0.1',
|
||||
REMOTE_ADDR='5.0.0.1'
|
||||
)
|
||||
# Expect that we were still able to access the page,
|
||||
# even though we would have been blocked by country
|
||||
# access rules.
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# accesssing whitelist course from these should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# Accessing a regular page from these IP should be success
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data('3.0.0.0', '4.0.0.0', "7.0.0.1", "2001:250::")
|
||||
def test_course_access_rules_with_white_rule_country_by_user_ip(self, ip_address):
|
||||
# Accessing an embargoed page from a user ip whose origin is added as
|
||||
# white in course access rules should succeed. any other ip should be fail
|
||||
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
if ip_address in ['3.0.0.0', '4.0.0.0']:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# access the blacklisted course should give success
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Accessing a regular page should success
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_embargo_profile_country_db_null(self):
|
||||
# Django country fields treat NULL values inconsistently.
|
||||
# When saving a profile with country set to None, Django saves an empty string to the database.
|
||||
# However, when the country field loads a NULL value from the database, it sets
|
||||
# `country.code` to `None`. This caused a bug in which country values created by
|
||||
# the original South schema migration -- which defaulted to NULL -- caused a runtime
|
||||
# exception when the embargo middleware treated the value as a string.
|
||||
# In order to simulate this behavior, we can't simply set `profile.country = None`.
|
||||
# (because when we save it, it will set the database field to an empty string instead of NULL)
|
||||
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
|
||||
connection.cursor().execute(query, [str(self.user.profile.id)])
|
||||
transaction.commit_unless_managed()
|
||||
|
||||
# Attempt to access an embargoed course
|
||||
# Verify that the student can access the page without an error
|
||||
response = self.client.get(self.embargoed_course_blacklisted)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
|
||||
def test_regular_course_accessible_from_every_where(self, profile_country):
|
||||
# regular course is accessible even when ENABLE_COUNTRY_ACCESS flag is true
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
response = self.client.get(self.regular_page)
|
||||
# Course is accessible from all countries.
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
|
||||
def test_embargo_course_whitelisted_with_profile_country(self, profile_country):
|
||||
# course is emabargoed and has white list countries.
|
||||
# but user ip belongs to US but profile country is blacklist
|
||||
# only white list country can access the course.
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
# adding the US IP so the _country_code_from_ip() get passed
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR='3.0.0.0',
|
||||
REMOTE_ADDR='3.0.0.0'
|
||||
)
|
||||
# Course is whitelisted against US,NZ so all other countries will be disallowed
|
||||
if profile_country in ["CA", "AF", "IR"]:
|
||||
self.assertRedirects(response, reverse('embargo'))
|
||||
self.assertEqual(response.status_code, 302)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
|
||||
def test_embargo_course_blacklisted_with_profile_country(self, profile_country):
|
||||
# if course is emabargoed and has black list countries ( CU , IR ).
|
||||
# then users from these countries can't access this course.
|
||||
# any user from other than these countries can access.
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
response = self.client.get(self.embargoed_course_blacklisted)
|
||||
if profile_country in ["", "US", "CA", "NZ"]:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
embargo_url = reverse('embargo')
|
||||
self.assertRedirects(response, embargo_url)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
|
||||
def test_embargo_course_without_any_rules_list(self, profile_country):
|
||||
# if course is emabargoed but without whitelist and blacklist
|
||||
# then course can be accessible from any where
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
embargo_course3 = CourseFactory.create()
|
||||
embargo_course3.save()
|
||||
RestrictedCourse(course_key=embargo_course3.id).save()
|
||||
embargoed_course_page = '/courses/' + embargo_course3.id.to_deprecated_string() + '/info'
|
||||
|
||||
response = self.client.get(embargoed_course_page)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_embargo_profile_country_cache(self):
|
||||
# Warm the cache
|
||||
with self.assertNumQueries(24):
|
||||
self.client.get(self.embargoed_course_blacklisted)
|
||||
|
||||
# Access the page multiple times, but expect that we hit
|
||||
# the database to check the user's profile only once
|
||||
with self.assertNumQueries(9):
|
||||
self.client.get(self.embargoed_course_blacklisted)
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
"""Test of models for embargo middleware app"""
|
||||
"""Test of models for embargo app"""
|
||||
import json
|
||||
from django.test import TestCase
|
||||
from django.db.utils import IntegrityError
|
||||
from opaque_keys.edx.locations import SlashSeparatedCourseKey
|
||||
from opaque_keys.edx.locator import CourseLocator
|
||||
from embargo.models import (
|
||||
EmbargoedCourse, EmbargoedState, IPFilter, RestrictedCourse,
|
||||
Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
|
||||
Country, CountryAccessRule, CourseAccessRuleHistory
|
||||
)
|
||||
|
||||
|
||||
class EmbargoModelsTest(TestCase):
|
||||
"""Test each of the 3 models in embargo.models"""
|
||||
def test_course_embargo(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
# Test that course is not authorized by default
|
||||
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
|
||||
|
||||
@@ -22,8 +23,8 @@ class EmbargoModelsTest(TestCase):
|
||||
# Now, course should be embargoed
|
||||
self.assertTrue(EmbargoedCourse.is_embargoed(course_id))
|
||||
self.assertEquals(
|
||||
cauth.__unicode__(),
|
||||
"Course 'abc/123/doremi' is Embargoed"
|
||||
unicode(cauth),
|
||||
u"Course '{course_id}' is Embargoed".format(course_id=course_id)
|
||||
)
|
||||
|
||||
# Unauthorize by explicitly setting email_enabled to False
|
||||
@@ -32,8 +33,8 @@ class EmbargoModelsTest(TestCase):
|
||||
# Test that course is now unauthorized
|
||||
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
|
||||
self.assertEquals(
|
||||
cauth.__unicode__(),
|
||||
"Course 'abc/123/doremi' is Not Embargoed"
|
||||
unicode(cauth),
|
||||
u"Course '{course_id}' is Not Embargoed".format(course_id=course_id)
|
||||
)
|
||||
|
||||
def test_state_embargo(self):
|
||||
@@ -101,18 +102,18 @@ class EmbargoModelsTest(TestCase):
|
||||
|
||||
|
||||
class RestrictedCourseTest(TestCase):
|
||||
"""Test unicode values tests and cache functionality"""
|
||||
"""Test RestrictedCourse model. """
|
||||
|
||||
def test_unicode_values(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
restricted_course = RestrictedCourse.objects.create(course_key=course_id)
|
||||
self.assertEquals(
|
||||
restricted_course.__unicode__(),
|
||||
"abc/123/doremi"
|
||||
unicode(restricted_course),
|
||||
unicode(course_id)
|
||||
)
|
||||
|
||||
def test_restricted_course_cache_with_save_delete(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
# Warm the cache
|
||||
@@ -124,7 +125,7 @@ class RestrictedCourseTest(TestCase):
|
||||
RestrictedCourse.is_restricted_course(course_id)
|
||||
|
||||
# add new the course so the cache must get delete and again hit the db
|
||||
new_course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
|
||||
new_course_id = CourseLocator('def', '123', 'doremi')
|
||||
RestrictedCourse.objects.create(course_key=new_course_id)
|
||||
with self.assertNumQueries(1):
|
||||
RestrictedCourse.is_restricted_course(new_course_id)
|
||||
@@ -146,45 +147,42 @@ class RestrictedCourseTest(TestCase):
|
||||
|
||||
|
||||
class CountryTest(TestCase):
|
||||
"""Test unicode values test"""
|
||||
"""Test Country model. """
|
||||
|
||||
def test_unicode_values(self):
|
||||
country = Country.objects.create(country='NZ')
|
||||
self.assertEquals(
|
||||
country.__unicode__(),
|
||||
"New Zealand (NZ)"
|
||||
)
|
||||
self.assertEquals(unicode(country), "New Zealand (NZ)")
|
||||
|
||||
|
||||
class CountryAccessRuleTest(TestCase):
|
||||
"""Test unicode values tests and unique-together contraint"""
|
||||
"""Test CountryAccessRule model. """
|
||||
|
||||
def test_unicode_values(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
access_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE,
|
||||
country=country
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
access_rule.__unicode__(),
|
||||
"Whitelist New Zealand (NZ) for abc/123/doremi"
|
||||
unicode(access_rule),
|
||||
u"Whitelist New Zealand (NZ) for {course_key}".format(course_key=course_id)
|
||||
)
|
||||
|
||||
course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
|
||||
course_id = CourseLocator('def', '123', 'doremi')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
access_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=BLACK_LIST,
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
country=country
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
access_rule.__unicode__(),
|
||||
"Blacklist New Zealand (NZ) for def/123/doremi"
|
||||
unicode(access_rule),
|
||||
u"Blacklist New Zealand (NZ) for {course_key}".format(course_key=course_id)
|
||||
)
|
||||
|
||||
def test_unique_together_constraint(self):
|
||||
@@ -192,31 +190,31 @@ class CountryAccessRuleTest(TestCase):
|
||||
Course with specific country can be added either as whitelist or blacklist
|
||||
trying to add with both types will raise error
|
||||
"""
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE,
|
||||
country=country
|
||||
)
|
||||
|
||||
with self.assertRaises(IntegrityError):
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=BLACK_LIST,
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE,
|
||||
country=country
|
||||
)
|
||||
|
||||
def test_country_access_list_cache_with_save_delete(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
course_id = CourseLocator('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
course = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE,
|
||||
country=country
|
||||
)
|
||||
|
||||
@@ -227,8 +225,123 @@ class CountryAccessRuleTest(TestCase):
|
||||
with self.assertNumQueries(0):
|
||||
CountryAccessRule.check_country_access(course_id, 'NZ')
|
||||
|
||||
# deleting an object will delete cache also.and hit db on
|
||||
# get the country access lists for course
|
||||
# Deleting an object will invalidate the cache
|
||||
course.delete()
|
||||
with self.assertNumQueries(1):
|
||||
CountryAccessRule.check_country_access(course_id, 'NZ')
|
||||
|
||||
|
||||
class CourseAccessRuleHistoryTest(TestCase):
|
||||
"""Test course access rule history. """
|
||||
|
||||
def setUp(self):
|
||||
self.course_key = CourseLocator('edx', 'DemoX', 'Demo_Course')
|
||||
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course_key)
|
||||
self.countries = {
|
||||
'US': Country.objects.create(country='US'),
|
||||
'AU': Country.objects.create(country='AU')
|
||||
}
|
||||
|
||||
def test_course_access_history_no_rules(self):
|
||||
self._assert_history([])
|
||||
self.restricted_course.delete()
|
||||
self._assert_history_deleted()
|
||||
|
||||
def test_course_access_history_with_rules(self):
|
||||
# Add one rule
|
||||
us_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=self.restricted_course,
|
||||
country=self.countries['US'],
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE
|
||||
)
|
||||
self._assert_history([('US', 'whitelist')])
|
||||
|
||||
# Add another rule
|
||||
au_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=self.restricted_course,
|
||||
country=self.countries['AU'],
|
||||
rule_type=CountryAccessRule.BLACKLIST_RULE
|
||||
)
|
||||
self._assert_history([
|
||||
('US', 'whitelist'),
|
||||
('AU', 'blacklist')
|
||||
])
|
||||
|
||||
# Delete the first rule
|
||||
us_rule.delete()
|
||||
self._assert_history([('AU', 'blacklist')])
|
||||
|
||||
# Delete the second rule
|
||||
au_rule.delete()
|
||||
self._assert_history([])
|
||||
|
||||
def test_course_access_history_delete_all(self):
|
||||
# Create a rule
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=self.restricted_course,
|
||||
country=self.countries['US'],
|
||||
rule_type=CountryAccessRule.WHITELIST_RULE
|
||||
)
|
||||
|
||||
# Delete the course (and, implicitly, all the rules)
|
||||
self.restricted_course.delete()
|
||||
self._assert_history_deleted()
|
||||
|
||||
def test_course_access_history_change_message(self):
|
||||
# Change the message key
|
||||
self.restricted_course.enroll_msg_key = 'embargo'
|
||||
self.restricted_course.access_msg_key = 'embargo'
|
||||
self.restricted_course.save()
|
||||
|
||||
# Expect a history entry with the changed keys
|
||||
self._assert_history([], enroll_msg='embargo', access_msg='embargo')
|
||||
|
||||
def _assert_history(self, country_rules, enroll_msg='default', access_msg='default'):
|
||||
"""Check the latest history entry.
|
||||
|
||||
Arguments:
|
||||
country_rules (list): List of rules, each of which are tuples
|
||||
of the form `(country_code, rule_type)`.
|
||||
|
||||
Keyword Arguments:
|
||||
enroll_msg (str): The expected enrollment message key.
|
||||
access_msg (str): The expected access message key.
|
||||
|
||||
Raises:
|
||||
AssertionError
|
||||
|
||||
"""
|
||||
record = CourseAccessRuleHistory.objects.latest()
|
||||
|
||||
# Check that the record is for the correct course
|
||||
self.assertEqual(record.course_key, self.course_key)
|
||||
|
||||
# Load the history entry and verify the message keys
|
||||
snapshot = json.loads(record.snapshot)
|
||||
self.assertEqual(snapshot['enroll_msg'], enroll_msg)
|
||||
self.assertEqual(snapshot['access_msg'], access_msg)
|
||||
|
||||
# For each rule, check that there is an entry
|
||||
# in the history record.
|
||||
for (country, rule_type) in country_rules:
|
||||
self.assertIn(
|
||||
{
|
||||
'country': country,
|
||||
'rule_type': rule_type
|
||||
},
|
||||
snapshot['country_rules']
|
||||
)
|
||||
|
||||
# Check that there are no duplicate entries
|
||||
self.assertEqual(len(snapshot['country_rules']), len(country_rules))
|
||||
|
||||
def _assert_history_deleted(self):
|
||||
"""Check the latest history entry for a 'DELETED' placeholder.
|
||||
|
||||
Raises:
|
||||
AssertionError
|
||||
|
||||
"""
|
||||
record = CourseAccessRuleHistory.objects.latest()
|
||||
self.assertEqual(record.course_key, self.course_key)
|
||||
self.assertEqual(record.snapshot, "DELETED")
|
||||
|
||||
Reference in New Issue
Block a user