Add new models to embargo to support country access
Add Django admin UI for configuring country access Migrate existing embargo rules into the new tables. ECOM-996: updated the middleware to use new models and access rules ECOM-996: added the flag to support old and new formats ECOM-996: added the api layer for country access settings ECOM-996: added the api layer for country access settings ECOM-996 implementing the white and blacklist checks. ECOM-996 minor re-factoring in api. ECOM-996 minor re-factoring in api. ECOM-1025 refactoring the code according to PR feedback. ECOM-1025 refactoring the code according to PR feedback. ECOM-1025 deleting cache in model save and delete methods ECOM-1025 adding basic api test cases file. ECOM-1025 refactoring the code according to PR feedback. ECOM-1025 refactoring the code according to PR feedback. ECOM-1025 refactoring the code according to PR feedback. adding the test cases. ECOM-1025 removing extra line ECOM-1025 removing un-used function. ECOM-1025 removing un-used function. ECOM-1025 re-factor the code. ECOM-1025 re-name the test file to test_middleware_access_rules.py. we already had old test_middleware.py ECOM-1025 adding test cases for newly added models. ECOM-1025 adding test cases and resolve conflicts. ECOM-1025 fixing the quality and pep-8 issues. ECOM-1025 re-factoring the code according to the PR feedback. ECOM-1025 re-name the variable name. ECOM-1025 removing the _check_ip_lists and its test cases. also added few missing scenarios test cases. ECOM-1025 removing un-used line.
This commit is contained in:
94
common/djangoapps/embargo/api.py
Normal file
94
common/djangoapps/embargo/api.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""
|
||||
The Python API layer of the country access settings. Essentially the middle tier of the project, responsible for all
|
||||
business logic that is not directly tied to the data itself.
|
||||
|
||||
This API is exposed via the middleware(emabargo/middileware.py) layer but may be used directly in-process.
|
||||
|
||||
"""
|
||||
import logging
|
||||
import pygeoip
|
||||
|
||||
from django.core.cache import cache
|
||||
from django.conf import settings
|
||||
from embargo.models import CountryAccessRule, RestrictedCourse
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_user_country_from_profile(user):
|
||||
"""
|
||||
Check whether the user is embargoed based on the country code in the user's profile.
|
||||
|
||||
Args:
|
||||
user (User): The user attempting to access courseware.
|
||||
|
||||
Returns:
|
||||
user country from profile.
|
||||
|
||||
"""
|
||||
cache_key = u'user.{user_id}.profile.country'.format(user_id=user.id)
|
||||
profile_country = cache.get(cache_key)
|
||||
if profile_country is None:
|
||||
profile = getattr(user, 'profile', None)
|
||||
if profile is not None and profile.country.code is not None:
|
||||
profile_country = profile.country.code.upper()
|
||||
else:
|
||||
profile_country = ""
|
||||
cache.set(cache_key, profile_country)
|
||||
|
||||
return profile_country
|
||||
|
||||
|
||||
def _country_code_from_ip(ip_addr):
|
||||
"""
|
||||
Return the country code associated with an IP address.
|
||||
Handles both IPv4 and IPv6 addresses.
|
||||
|
||||
Args:
|
||||
ip_addr (str): The IP address to look up.
|
||||
|
||||
Returns:
|
||||
str: A 2-letter country code.
|
||||
|
||||
"""
|
||||
if ip_addr.find(':') >= 0:
|
||||
return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
|
||||
else:
|
||||
return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
|
||||
|
||||
|
||||
def check_course_access(user, ip_address, course_key):
|
||||
"""
|
||||
Check is the user with this ip_address has access to the given course
|
||||
|
||||
Params:
|
||||
user (User): Currently logged in user object
|
||||
ip_address (str): The ip_address of user
|
||||
course_key (CourseLocator): CourseLocator object the user is trying to access
|
||||
|
||||
Returns:
|
||||
The return will be True if the user has access on the course.
|
||||
if any constraints fails it will return the False
|
||||
"""
|
||||
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
|
||||
# If they're trying to access a course that cares about embargoes
|
||||
|
||||
# If course is not restricted then return immediately return True
|
||||
# no need for further checking
|
||||
if not course_is_restricted:
|
||||
return True
|
||||
|
||||
# Retrieve the country code from the IP address
|
||||
# and check it against the allowed countries list for a course
|
||||
user_country_from_ip = _country_code_from_ip(ip_address)
|
||||
# if user country has access to course return True
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
|
||||
return False
|
||||
|
||||
# Retrieve the country code from the user profile.
|
||||
user_country_from_profile = get_user_country_from_profile(user)
|
||||
# if profile country has access return True
|
||||
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -45,6 +45,7 @@ from util.request import course_id_from_url
|
||||
|
||||
from student.models import unique_id_for_user
|
||||
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
|
||||
from embargo.api import check_course_access
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -73,11 +74,18 @@ class EmbargoMiddleware(object):
|
||||
# If embargoing is turned off, make this middleware do nothing
|
||||
if not settings.FEATURES.get('EMBARGO', False) and not self.site_enabled:
|
||||
raise MiddlewareNotUsed()
|
||||
self.enable_country_access = settings.FEATURES.get('ENABLE_COUNTRY_ACCESS', False)
|
||||
|
||||
def process_request(self, request):
|
||||
"""
|
||||
Processes embargo requests.
|
||||
"""
|
||||
if self.enable_country_access:
|
||||
if self.country_access_rules(request):
|
||||
return None
|
||||
else:
|
||||
return self._embargo_redirect_response
|
||||
|
||||
url = request.path
|
||||
course_id = course_id_from_url(url)
|
||||
course_is_embargoed = EmbargoedCourse.is_embargoed(course_id)
|
||||
@@ -297,3 +305,20 @@ class EmbargoMiddleware(object):
|
||||
return True
|
||||
|
||||
return _inner
|
||||
|
||||
def country_access_rules(self, request):
|
||||
"""
|
||||
check the country access rules for a given course.
|
||||
if course id is invalid return True
|
||||
Args:
|
||||
request
|
||||
|
||||
Return:
|
||||
boolean: True if the user has access else false.
|
||||
|
||||
"""
|
||||
url = request.path
|
||||
course_id = course_id_from_url(url)
|
||||
if course_id is None:
|
||||
return True
|
||||
return check_course_access(request.user, get_ip(request), course_id)
|
||||
|
||||
@@ -15,8 +15,10 @@ import ipaddr
|
||||
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext as _, ugettext_lazy
|
||||
from django.core.cache import cache
|
||||
|
||||
from django_countries.fields import CountryField
|
||||
from django_countries import countries
|
||||
|
||||
from config_models.models import ConfigurationModel
|
||||
from xmodule_django.models import CourseKeyField, NoneToEmptyManager
|
||||
@@ -24,6 +26,10 @@ from xmodule_django.models import CourseKeyField, NoneToEmptyManager
|
||||
from embargo.messages import ENROLL_MESSAGES, COURSEWARE_MESSAGES
|
||||
|
||||
|
||||
WHITE_LIST = 'whitelist'
|
||||
BLACK_LIST = 'blacklist'
|
||||
|
||||
|
||||
class EmbargoedCourse(models.Model):
|
||||
"""
|
||||
Enable course embargo on a course-by-course basis.
|
||||
@@ -123,9 +129,50 @@ class RestrictedCourse(models.Model):
|
||||
help_text=ugettext_lazy(u"The message to show when a user is blocked from accessing a course.")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def cache_key_name(cls):
|
||||
"""Return the name of the key to use to cache the current restricted course list"""
|
||||
return 'embargo/RestrictedCourse/courses'
|
||||
|
||||
@classmethod
|
||||
def is_restricted_course(cls, course_id):
|
||||
"""
|
||||
Check if the course is in restricted list
|
||||
|
||||
Args:
|
||||
course_id (str): course_id to look for
|
||||
|
||||
Returns:
|
||||
Boolean
|
||||
True if course is in restricted course list.
|
||||
"""
|
||||
return unicode(course_id) in cls._get_restricted_courses_from_cache()
|
||||
|
||||
@classmethod
|
||||
def _get_restricted_courses_from_cache(cls):
|
||||
"""
|
||||
Cache all restricted courses and returns the list of course_keys that are restricted
|
||||
"""
|
||||
restricted_courses = cache.get(cls.cache_key_name())
|
||||
if not restricted_courses:
|
||||
restricted_courses = list(RestrictedCourse.objects.values_list('course_key', flat=True))
|
||||
cache.set(cls.cache_key_name(), restricted_courses)
|
||||
return restricted_courses
|
||||
|
||||
def __unicode__(self):
|
||||
return unicode(self.course_key)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""
|
||||
Clear the cached value when saving a RestrictedCourse entry
|
||||
"""
|
||||
super(RestrictedCourse, self).save(*args, **kwargs)
|
||||
cache.delete(self.cache_key_name())
|
||||
|
||||
def delete(self, using=None):
|
||||
super(RestrictedCourse, self).delete()
|
||||
cache.delete(self.cache_key_name())
|
||||
|
||||
|
||||
class Country(models.Model):
|
||||
"""Representation of a country.
|
||||
@@ -147,7 +194,7 @@ class Country(models.Model):
|
||||
)
|
||||
|
||||
class Meta:
|
||||
# Default ordering is ascending by country code
|
||||
"""Default ordering is ascending by country code """
|
||||
ordering = ['country']
|
||||
|
||||
|
||||
@@ -170,14 +217,14 @@ class CountryAccessRule(models.Model):
|
||||
"""
|
||||
|
||||
RULE_TYPE_CHOICES = (
|
||||
('whitelist', 'Whitelist (allow only these countries)'),
|
||||
('blacklist', 'Blacklist (block these countries)'),
|
||||
(WHITE_LIST, 'Whitelist (allow only these countries)'),
|
||||
(BLACK_LIST, 'Blacklist (block these countries)'),
|
||||
)
|
||||
|
||||
rule_type = models.CharField(
|
||||
max_length=255,
|
||||
choices=RULE_TYPE_CHOICES,
|
||||
default='blacklist',
|
||||
default=BLACK_LIST,
|
||||
help_text=ugettext_lazy(
|
||||
u"Whether to include or exclude the given course. "
|
||||
u"If whitelist countries are specified, then ONLY users from whitelisted countries "
|
||||
@@ -196,19 +243,102 @@ class CountryAccessRule(models.Model):
|
||||
help_text=ugettext_lazy(u"The country to which this rule applies.")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def cache_key_for_consolidated_countries(cls, course_id):
|
||||
"""
|
||||
Args:
|
||||
course_id (str): course_id to look for
|
||||
Returns:
|
||||
Consolidated list of accessible countries for given course
|
||||
"""
|
||||
return "{}/allowed/countries".format(course_id)
|
||||
|
||||
@classmethod
|
||||
def check_country_access(cls, course_id, country):
|
||||
"""
|
||||
Check if the country is either in whitelist or blacklist of countries for the course_id
|
||||
|
||||
Args:
|
||||
course_id (str): course_id to look for
|
||||
country (str): A 2 characters code of country
|
||||
|
||||
Returns:
|
||||
Boolean
|
||||
True if country found in allowed country
|
||||
otherwise check given country exists in list
|
||||
"""
|
||||
allowed_countries = cache.get(cls.cache_key_for_consolidated_countries(course_id))
|
||||
if not allowed_countries:
|
||||
allowed_countries = cls._get_country_access_list(course_id)
|
||||
cache.set(cls.cache_key_for_consolidated_countries(course_id), allowed_countries)
|
||||
|
||||
return country == '' or country in allowed_countries
|
||||
|
||||
@classmethod
|
||||
def _get_country_access_list(cls, course_id):
|
||||
"""
|
||||
if a course is blacklist for two countries then course can be accessible from
|
||||
any where except these two countries.
|
||||
if a course is whitelist for two countries then course can be accessible from
|
||||
these countries only.
|
||||
Args:
|
||||
course_id (str): course_id to look for
|
||||
Returns:
|
||||
List
|
||||
Consolidated list of accessible countries for given course
|
||||
"""
|
||||
|
||||
whitelist_countries = set()
|
||||
blacklist_countries = set()
|
||||
|
||||
# Retrieve all rules in one database query, performing the "join" with the Country table
|
||||
rules_for_course = CountryAccessRule.objects.select_related('country').filter(
|
||||
restricted_course__course_key=course_id
|
||||
)
|
||||
|
||||
# Filter the rules into a whitelist and blacklist in one pass
|
||||
for rule in rules_for_course:
|
||||
if rule.rule_type == 'whitelist':
|
||||
whitelist_countries.add(rule.country.country.code)
|
||||
elif rule.rule_type == 'blacklist':
|
||||
blacklist_countries.add(rule.country.country.code)
|
||||
|
||||
# If there are no whitelist countries, default to all countries
|
||||
if not whitelist_countries:
|
||||
whitelist_countries = set(code[0] for code in list(countries))
|
||||
|
||||
# Consolidate the rules into a single list of countries
|
||||
# that have access to the course.
|
||||
return list(whitelist_countries - blacklist_countries)
|
||||
|
||||
def __unicode__(self):
|
||||
if self.rule_type == 'whitelist':
|
||||
if self.rule_type == WHITE_LIST:
|
||||
return _(u"Whitelist {country} for {course}").format(
|
||||
course=unicode(self.restricted_course.course_key),
|
||||
country=unicode(self.country),
|
||||
)
|
||||
elif self.rule_type == 'blacklist':
|
||||
elif self.rule_type == BLACK_LIST:
|
||||
return _(u"Blacklist {country} for {course}").format(
|
||||
course=unicode(self.restricted_course.course_key),
|
||||
country=unicode(self.country),
|
||||
)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""
|
||||
Clear the cached value when saving a entry
|
||||
"""
|
||||
super(CountryAccessRule, self).save(*args, **kwargs)
|
||||
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
|
||||
|
||||
def delete(self, using=None):
|
||||
"""
|
||||
clear the cached value when saving a entry
|
||||
"""
|
||||
super(CountryAccessRule, self).delete()
|
||||
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
|
||||
|
||||
class Meta:
|
||||
"""a course can be added with either black or white list. """
|
||||
unique_together = (
|
||||
# This restriction ensures that a country is on
|
||||
# either the whitelist or the blacklist, but
|
||||
|
||||
287
common/djangoapps/embargo/tests/test_middleware_access_rules.py
Normal file
287
common/djangoapps/embargo/tests/test_middleware_access_rules.py
Normal file
@@ -0,0 +1,287 @@
|
||||
"""
|
||||
Tests for EmbargoMiddleware with CountryAccessRules
|
||||
"""
|
||||
|
||||
import mock
|
||||
import pygeoip
|
||||
import unittest
|
||||
|
||||
from django.db import connection, transaction
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
import ddt
|
||||
|
||||
from student.tests.factories import UserFactory
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from xmodule.modulestore.tests.django_utils import (
|
||||
ModuleStoreTestCase, mixed_store_config
|
||||
)
|
||||
|
||||
# Explicitly import the cache from ConfigurationModel so we can reset it after each test
|
||||
from config_models.models import cache
|
||||
from embargo.models import (
|
||||
RestrictedCourse, Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
|
||||
)
|
||||
from django_countries import countries
|
||||
|
||||
|
||||
# Since we don't need any XML course fixtures, use a modulestore configuration
|
||||
# that disables the XML modulestore.
|
||||
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EmbargoCountryAccessRulesTests(ModuleStoreTestCase):
|
||||
"""
|
||||
Tests of EmbargoApi
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(EmbargoCountryAccessRulesTests, self).setUp()
|
||||
self.user = UserFactory(username='fred', password='secret')
|
||||
self.client.login(username='fred', password='secret')
|
||||
self.embargo_course1 = CourseFactory.create()
|
||||
self.embargo_course1.save()
|
||||
self.embargo_course2 = CourseFactory.create()
|
||||
self.embargo_course2.save()
|
||||
self.regular_course = CourseFactory.create(org="Regular")
|
||||
self.regular_course.save()
|
||||
self.embargoed_course_whitelisted = '/courses/' + self.embargo_course1.id.to_deprecated_string() + '/info'
|
||||
self.embargoed_course_blacklisted = '/courses/' + self.embargo_course2.id.to_deprecated_string() + '/info'
|
||||
self.regular_page = '/courses/' + self.regular_course.id.to_deprecated_string() + '/info'
|
||||
|
||||
restricted_course_1 = RestrictedCourse.objects.create(course_key=self.embargo_course1.id)
|
||||
restricted_course_2 = RestrictedCourse.objects.create(course_key=self.embargo_course2.id)
|
||||
|
||||
all_countries = [Country(country=code[0]) for code in list(countries)]
|
||||
Country.objects.bulk_create(all_countries)
|
||||
|
||||
country_access_white_rules = [
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=Country.objects.get(country='US')
|
||||
),
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=Country.objects.get(country='NZ')
|
||||
)
|
||||
]
|
||||
CountryAccessRule.objects.bulk_create(country_access_white_rules)
|
||||
|
||||
country_access_black_rules = [
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_2,
|
||||
rule_type=BLACK_LIST,
|
||||
country=Country.objects.get(country='CU')
|
||||
),
|
||||
CountryAccessRule(
|
||||
restricted_course=restricted_course_2,
|
||||
rule_type=BLACK_LIST,
|
||||
country=Country.objects.get(country='IR')
|
||||
)
|
||||
]
|
||||
CountryAccessRule.objects.bulk_create(country_access_black_rules)
|
||||
|
||||
# Text from lms/templates/static_templates/embargo.html
|
||||
self.embargo_text = "Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this course." # pylint: disable=line-too-long
|
||||
self.patcher = mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr', self.mock_country_code_by_addr)
|
||||
self.patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
# Explicitly clear ConfigurationModel's cache so tests have a clear cache
|
||||
# and don't interfere with each other
|
||||
cache.clear()
|
||||
self.patcher.stop()
|
||||
|
||||
def mock_country_code_by_addr(self, ip_addr):
|
||||
"""
|
||||
making a lists of countries which will be use in country access rules.
|
||||
if incoming request's ip belongs to this dict then related country will return.
|
||||
for one course CU and IR added as blacklist in course access rules.
|
||||
for one course US and NZ added as whitelist in course access rules.
|
||||
"""
|
||||
ip_dict = {
|
||||
'1.0.0.0': 'CU',
|
||||
'2.0.0.0': 'IR',
|
||||
'3.0.0.0': 'US',
|
||||
'4.0.0.0': 'NZ'
|
||||
}
|
||||
return ip_dict.get(ip_addr, 'FR')
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data('1.0.0.0', '2.0.0.0')
|
||||
def test_course_access_rules_with_black_rule_country_by_user_ip(self, ip_address):
|
||||
# Accessing an embargoed page from a user ip whose origin is added as
|
||||
# blacklist in course access rules should be redirected.
|
||||
# any other IP should be success
|
||||
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address,
|
||||
follow=True
|
||||
)
|
||||
self.assertIn(self.embargo_text, response.content)
|
||||
|
||||
# accesssing blacklist course from any other country ip should be success
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR='5.0.0.1',
|
||||
REMOTE_ADDR='5.0.0.1'
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# accesssing whitelist course from these should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# Accessing a regular page from these IP should be success
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data('3.0.0.0', '4.0.0.0', "7.0.0.1", "2001:250::")
|
||||
def test_course_access_rules_with_white_rule_country_by_user_ip(self, ip_address):
|
||||
# Accessing an embargoed page from a user ip whose origin is added as
|
||||
# white in course access rules should succeed. any other ip should be fail
|
||||
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
if ip_address in ['3.0.0.0', '4.0.0.0']:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
# access the blacklisted course should give success
|
||||
response = self.client.get(
|
||||
self.embargoed_course_blacklisted,
|
||||
HTTP_X_FORWARDED_FOR=ip_address,
|
||||
REMOTE_ADDR=ip_address
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Accessing a regular page should success
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_embargo_profile_country_db_null(self):
|
||||
# Django country fields treat NULL values inconsistently.
|
||||
# When saving a profile with country set to None, Django saves an empty string to the database.
|
||||
# However, when the country field loads a NULL value from the database, it sets
|
||||
# `country.code` to `None`. This caused a bug in which country values created by
|
||||
# the original South schema migration -- which defaulted to NULL -- caused a runtime
|
||||
# exception when the embargo middleware treated the value as a string.
|
||||
# In order to simulate this behavior, we can't simply set `profile.country = None`.
|
||||
# (because when we save it, it will set the database field to an empty string instead of NULL)
|
||||
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
|
||||
connection.cursor().execute(query, [str(self.user.profile.id)])
|
||||
transaction.commit_unless_managed()
|
||||
|
||||
# Attempt to access an embargoed course
|
||||
# Verify that the student can access the page without an error
|
||||
response = self.client.get(self.embargoed_course_blacklisted)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
|
||||
def test_regular_course_accessible_from_every_where(self, profile_country):
|
||||
# regular course is accessible even when ENABLE_COUNTRY_ACCESS flag is true
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
response = self.client.get(self.regular_page)
|
||||
# Course is accessible from all countries.
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
|
||||
def test_embargo_course_whitelisted_with_profile_country(self, profile_country):
|
||||
# course is emabargoed and has white list countries.
|
||||
# but user ip belongs to US but profile country is blacklist
|
||||
# only white list country can access the course.
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
# adding the US IP so the _country_code_from_ip() get passed
|
||||
response = self.client.get(
|
||||
self.embargoed_course_whitelisted,
|
||||
HTTP_X_FORWARDED_FOR='3.0.0.0',
|
||||
REMOTE_ADDR='3.0.0.0'
|
||||
)
|
||||
# Course is whitelisted against US,NZ so all other countries will be disallowed
|
||||
if profile_country in ["CA", "AF", "IR"]:
|
||||
self.assertRedirects(response, reverse('embargo'))
|
||||
self.assertEqual(response.status_code, 302)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
|
||||
def test_embargo_course_blacklisted_with_profile_country(self, profile_country):
|
||||
# if course is emabargoed and has black list countries ( CU , IR ).
|
||||
# then users from these countries can't access this course.
|
||||
# any user from other than these countries can access.
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
response = self.client.get(self.embargoed_course_blacklisted)
|
||||
if profile_country in ["", "US", "CA", "NZ"]:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
embargo_url = reverse('embargo')
|
||||
self.assertRedirects(response, embargo_url)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
|
||||
def test_embargo_course_without_any_rules_list(self, profile_country):
|
||||
# if course is emabargoed but without whitelist and blacklist
|
||||
# then course can be accessible from any where
|
||||
|
||||
profile = self.user.profile
|
||||
profile.country = profile_country
|
||||
profile.save()
|
||||
|
||||
embargo_course3 = CourseFactory.create()
|
||||
embargo_course3.save()
|
||||
RestrictedCourse(course_key=embargo_course3.id).save()
|
||||
embargoed_course_page = '/courses/' + embargo_course3.id.to_deprecated_string() + '/info'
|
||||
|
||||
response = self.client.get(embargoed_course_page)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
|
||||
def test_embargo_profile_country_cache(self):
|
||||
# Warm the cache
|
||||
with self.assertNumQueries(24):
|
||||
self.client.get(self.embargoed_course_blacklisted)
|
||||
|
||||
# Access the page multiple times, but expect that we hit
|
||||
# the database to check the user's profile only once
|
||||
with self.assertNumQueries(9):
|
||||
self.client.get(self.embargoed_course_blacklisted)
|
||||
@@ -1,8 +1,11 @@
|
||||
"""Test of models for embargo middleware app"""
|
||||
from django.test import TestCase
|
||||
|
||||
from django.db.utils import IntegrityError
|
||||
from opaque_keys.edx.locations import SlashSeparatedCourseKey
|
||||
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
|
||||
from embargo.models import (
|
||||
EmbargoedCourse, EmbargoedState, IPFilter, RestrictedCourse,
|
||||
Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
|
||||
)
|
||||
|
||||
|
||||
class EmbargoModelsTest(TestCase):
|
||||
@@ -95,3 +98,137 @@ class EmbargoModelsTest(TestCase):
|
||||
self.assertTrue('1.1.0.1' in cblacklist)
|
||||
self.assertTrue('1.1.1.0' in cblacklist)
|
||||
self.assertFalse('1.2.0.0' in cblacklist)
|
||||
|
||||
|
||||
class RestrictedCourseTest(TestCase):
|
||||
"""Test unicode values tests and cache functionality"""
|
||||
|
||||
def test_unicode_values(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
restricted_course = RestrictedCourse.objects.create(course_key=course_id)
|
||||
self.assertEquals(
|
||||
restricted_course.__unicode__(),
|
||||
"abc/123/doremi"
|
||||
)
|
||||
|
||||
def test_restricted_course_cache_with_save_delete(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
# Warm the cache
|
||||
with self.assertNumQueries(1):
|
||||
RestrictedCourse.is_restricted_course(course_id)
|
||||
|
||||
# it should come from cache
|
||||
with self.assertNumQueries(0):
|
||||
RestrictedCourse.is_restricted_course(course_id)
|
||||
|
||||
# add new the course so the cache must get delete and again hit the db
|
||||
new_course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
|
||||
RestrictedCourse.objects.create(course_key=new_course_id)
|
||||
with self.assertNumQueries(1):
|
||||
RestrictedCourse.is_restricted_course(new_course_id)
|
||||
|
||||
# it should come from cache
|
||||
with self.assertNumQueries(0):
|
||||
RestrictedCourse.is_restricted_course(new_course_id)
|
||||
|
||||
# deleting an object will delete cache also.and hit db on
|
||||
# get the is_restricted course
|
||||
abc = RestrictedCourse.objects.get(course_key=new_course_id)
|
||||
abc.delete()
|
||||
with self.assertNumQueries(1):
|
||||
RestrictedCourse.is_restricted_course(new_course_id)
|
||||
|
||||
# it should come from cache
|
||||
with self.assertNumQueries(0):
|
||||
RestrictedCourse.is_restricted_course(new_course_id)
|
||||
|
||||
|
||||
class CountryTest(TestCase):
|
||||
"""Test unicode values test"""
|
||||
|
||||
def test_unicode_values(self):
|
||||
country = Country.objects.create(country='NZ')
|
||||
self.assertEquals(
|
||||
country.__unicode__(),
|
||||
"New Zealand (NZ)"
|
||||
)
|
||||
|
||||
|
||||
class CountryAccessRuleTest(TestCase):
|
||||
"""Test unicode values tests and unique-together contraint"""
|
||||
|
||||
def test_unicode_values(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
access_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=country
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
access_rule.__unicode__(),
|
||||
"Whitelist New Zealand (NZ) for abc/123/doremi"
|
||||
)
|
||||
|
||||
course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
access_rule = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=BLACK_LIST,
|
||||
country=country
|
||||
)
|
||||
|
||||
self.assertEquals(
|
||||
access_rule.__unicode__(),
|
||||
"Blacklist New Zealand (NZ) for def/123/doremi"
|
||||
)
|
||||
|
||||
def test_unique_together_constraint(self):
|
||||
"""
|
||||
Course with specific country can be added either as whitelist or blacklist
|
||||
trying to add with both types will raise error
|
||||
"""
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=country
|
||||
)
|
||||
|
||||
with self.assertRaises(IntegrityError):
|
||||
CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=BLACK_LIST,
|
||||
country=country
|
||||
)
|
||||
|
||||
def test_country_access_list_cache_with_save_delete(self):
|
||||
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
|
||||
country = Country.objects.create(country='NZ')
|
||||
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
|
||||
|
||||
course = CountryAccessRule.objects.create(
|
||||
restricted_course=restricted_course1,
|
||||
rule_type=WHITE_LIST,
|
||||
country=country
|
||||
)
|
||||
|
||||
# Warm the cache
|
||||
with self.assertNumQueries(1):
|
||||
CountryAccessRule.check_country_access(course_id, 'NZ')
|
||||
|
||||
with self.assertNumQueries(0):
|
||||
CountryAccessRule.check_country_access(course_id, 'NZ')
|
||||
|
||||
# deleting an object will delete cache also.and hit db on
|
||||
# get the country access lists for course
|
||||
course.delete()
|
||||
with self.assertNumQueries(1):
|
||||
CountryAccessRule.check_country_access(course_id, 'NZ')
|
||||
|
||||
Reference in New Issue
Block a user