Extended Embargo feature to support site access restriction and ip network
This commit is contained in:
@@ -88,6 +88,9 @@ FEATURES = {
|
||||
# Toggles embargo functionality
|
||||
'EMBARGO': False,
|
||||
|
||||
# Toggle embargo site functionality
|
||||
'EMBARGO_SITE': False,
|
||||
|
||||
# Turn on/off Microsites feature
|
||||
'USE_MICROSITES': False,
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from django import forms
|
||||
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
|
||||
from embargo.fixtures.country_codes import COUNTRY_CODES
|
||||
|
||||
import socket
|
||||
import ipaddr
|
||||
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
@@ -76,21 +76,12 @@ class IPFilterForm(forms.ModelForm): # pylint: disable=incomplete-protocol
|
||||
class Meta: # pylint: disable=missing-docstring
|
||||
model = IPFilter
|
||||
|
||||
def _is_valid_ipv4(self, address):
|
||||
"""Whether or not address is a valid ipv4 address"""
|
||||
def _is_valid_ip(self, address):
|
||||
"""Whether or not address is a valid ipv4 address or ipv6 address"""
|
||||
try:
|
||||
# Is this an ipv4 address?
|
||||
socket.inet_pton(socket.AF_INET, address)
|
||||
except socket.error:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _is_valid_ipv6(self, address):
|
||||
"""Whether or not address is a valid ipv6 address"""
|
||||
try:
|
||||
# Is this an ipv6 address?
|
||||
socket.inet_pton(socket.AF_INET6, address)
|
||||
except socket.error:
|
||||
# Is this an valid ip address?
|
||||
ipaddr.IPNetwork(address)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -105,7 +96,7 @@ class IPFilterForm(forms.ModelForm): # pylint: disable=incomplete-protocol
|
||||
error_addresses = []
|
||||
for addr in addresses.split(','):
|
||||
address = addr.strip()
|
||||
if not (self._is_valid_ipv4(address) or self._is_valid_ipv6(address)):
|
||||
if not self._is_valid_ip(address):
|
||||
error_addresses.append(address)
|
||||
if error_addresses:
|
||||
msg = 'Invalid IP Address(es): {0}'.format(error_addresses)
|
||||
|
||||
@@ -1,11 +1,34 @@
|
||||
"""
|
||||
Middleware for embargoing courses.
|
||||
"""Middleware for embargoing site and courses.
|
||||
|
||||
IMPORTANT NOTE: This code WILL NOT WORK if you have a misconfigured proxy
|
||||
server. If you are configuring embargo functionality, or if you are
|
||||
experiencing mysterious problems with embargoing, please check that your
|
||||
reverse proxy is setting any of the well known client IP address headers (ex.,
|
||||
HTTP_X_FORWARDED_FOR).
|
||||
|
||||
This middleware allows you to:
|
||||
|
||||
* Embargoing courses (access restriction by courses)
|
||||
* Embargoing site (access restriction of the main site)
|
||||
|
||||
Embargo can restrict by states and whitelist/blacklist (IP Addresses
|
||||
(ie. 10.0.0.0) or Networks (ie. 10.0.0.0/24)).
|
||||
|
||||
Usage:
|
||||
|
||||
# Enable the middleware in your settings
|
||||
|
||||
# To enable Embargoing by courses, add:
|
||||
FEATURES['EMBARGO'] = True # blocked ip will be redirected to /embargo
|
||||
|
||||
# To enable Embargoing site:
|
||||
FEATURES['EMBARGO_SITE'] = True
|
||||
|
||||
# With EMBARGO_SITE, you can define an external to redirect with:
|
||||
EMBARGO_SITE_REDIRECT_URL = 'https://www.edx.org/'
|
||||
|
||||
# if EMBARGO_SITE_REDIRECT_URL is missing, a HttpResponseForbidden is returned.
|
||||
|
||||
"""
|
||||
import logging
|
||||
import pygeoip
|
||||
@@ -13,6 +36,7 @@ import pygeoip
|
||||
from django.core.exceptions import MiddlewareNotUsed
|
||||
from django.conf import settings
|
||||
from django.shortcuts import redirect
|
||||
from django.http import HttpResponseRedirect, HttpResponseForbidden
|
||||
from ipware.ip import get_ip
|
||||
from util.request import course_id_from_url
|
||||
|
||||
@@ -23,14 +47,16 @@ log = logging.getLogger(__name__)
|
||||
|
||||
class EmbargoMiddleware(object):
|
||||
"""
|
||||
Middleware for embargoing courses
|
||||
Middleware for embargoing site and courses
|
||||
|
||||
This is configured by creating ``EmbargoedCourse``, ``EmbargoedState``, and
|
||||
optionally ``IPFilter`` rows in the database, using the django admin site.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.site_enabled = settings.FEATURES.get('EMBARGO_SITE', False)
|
||||
# If embargoing is turned off, make this middleware do nothing
|
||||
if not settings.FEATURES.get('EMBARGO', False):
|
||||
if not settings.FEATURES.get('EMBARGO', False) and \
|
||||
not self.site_enabled:
|
||||
raise MiddlewareNotUsed()
|
||||
|
||||
def process_request(self, request):
|
||||
@@ -41,21 +67,28 @@ class EmbargoMiddleware(object):
|
||||
course_id = course_id_from_url(url)
|
||||
|
||||
# If they're trying to access a course that cares about embargoes
|
||||
if EmbargoedCourse.is_embargoed(course_id):
|
||||
if self.site_enabled or EmbargoedCourse.is_embargoed(course_id):
|
||||
response = redirect('embargo')
|
||||
# Set the proper response if site is enabled
|
||||
if self.site_enabled:
|
||||
redirect_url = getattr(settings, 'EMBARGO_SITE_REDIRECT_URL', None)
|
||||
response = HttpResponseRedirect(redirect_url) if redirect_url \
|
||||
else HttpResponseForbidden('Access Denied')
|
||||
|
||||
# If we're having performance issues, add caching here
|
||||
ip_addr = get_ip(request)
|
||||
|
||||
# if blacklisted, immediately fail
|
||||
if ip_addr in IPFilter.current().blacklist_ips:
|
||||
log.info("Embargo: Restricting IP address %s to course %s because IP is blacklisted.", ip_addr, course_id)
|
||||
return redirect('embargo')
|
||||
log.info("Embargo: Restricting IP address %s because IP is blacklisted.", ip_addr)
|
||||
return response
|
||||
|
||||
country_code_from_ip = pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
|
||||
is_embargoed = country_code_from_ip in EmbargoedState.current().embargoed_countries_list
|
||||
# Fail if country is embargoed and the ip address isn't explicitly whitelisted
|
||||
if is_embargoed and ip_addr not in IPFilter.current().whitelist_ips:
|
||||
log.info(
|
||||
"Embargo: Restricting IP address %s to course %s because IP is from country %s.",
|
||||
ip_addr, course_id, country_code_from_ip
|
||||
"Embargo: Restricting IP address %s because IP is from country %s.",
|
||||
ip_addr, country_code_from_ip
|
||||
)
|
||||
return redirect('embargo')
|
||||
return response
|
||||
|
||||
@@ -10,6 +10,9 @@ file and check it in at the same time as your model changes. To do that,
|
||||
2. ./manage.py lms schemamigration embargo --auto description_of_your_change
|
||||
3. Add the migration file created in edx-platform/common/djangoapps/embargo/migrations/
|
||||
"""
|
||||
|
||||
import ipaddr
|
||||
|
||||
from django.db import models
|
||||
|
||||
from config_models.models import ConfigurationModel
|
||||
@@ -79,6 +82,30 @@ class IPFilter(ConfigurationModel):
|
||||
help_text="A comma-separated list of IP addresses that should fall under embargo restrictions."
|
||||
)
|
||||
|
||||
class IPFilterList(object):
|
||||
"""
|
||||
Represent a list of IP addresses with support of networks.
|
||||
"""
|
||||
|
||||
def __init__(self, ips):
|
||||
self.networks = [ipaddr.IPNetwork(ip) for ip in ips]
|
||||
|
||||
def __iter__(self):
|
||||
for network in self.networks:
|
||||
yield network
|
||||
|
||||
def __contains__(self, ip):
|
||||
try:
|
||||
ip = ipaddr.IPAddress(ip)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
for network in self.networks:
|
||||
if network.Contains(ip):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def whitelist_ips(self):
|
||||
"""
|
||||
@@ -86,7 +113,7 @@ class IPFilter(ConfigurationModel):
|
||||
"""
|
||||
if self.whitelist == '':
|
||||
return []
|
||||
return [addr.strip() for addr in self.whitelist.split(',')] # pylint: disable=no-member
|
||||
return self.IPFilterList([addr.strip() for addr in self.whitelist.split(',')]) # pylint: disable=no-member
|
||||
|
||||
@property
|
||||
def blacklist_ips(self):
|
||||
@@ -95,4 +122,4 @@ class IPFilter(ConfigurationModel):
|
||||
"""
|
||||
if self.blacklist == '':
|
||||
return []
|
||||
return [addr.strip() for addr in self.blacklist.split(',')] # pylint: disable=no-member
|
||||
return self.IPFilterList([addr.strip() for addr in self.blacklist.split(',')]) # pylint: disable=no-member
|
||||
|
||||
@@ -156,8 +156,8 @@ class IPFilterFormTest(TestCase):
|
||||
# should be able to do both ipv4 and ipv6
|
||||
# spacing should not matter
|
||||
form_data = {
|
||||
'whitelist': '127.0.0.1, 2003:dead:beef:4dad:23:46:bb:101',
|
||||
'blacklist': ' 18.244.1.5 , 2002:c0a8:101::42, 18.36.22.1'
|
||||
'whitelist': '127.0.0.1, 2003:dead:beef:4dad:23:46:bb:101, 1.1.0.1/32, 1.0.0.0/24',
|
||||
'blacklist': ' 18.244.1.5 , 2002:c0a8:101::42, 18.36.22.1, 1.0.0.0/16'
|
||||
}
|
||||
form = IPFilterForm(data=form_data)
|
||||
self.assertTrue(form.is_valid())
|
||||
@@ -169,6 +169,20 @@ class IPFilterFormTest(TestCase):
|
||||
for addr in '18.244.1.5, 2002:c0a8:101::42, 18.36.22.1'.split(','):
|
||||
self.assertIn(addr.strip(), blacklist)
|
||||
|
||||
# Network tests
|
||||
# ips not in whitelist network
|
||||
for addr in '1.1.0.2, 1.0.1.0'.split(','):
|
||||
self.assertNotIn(addr.strip(), whitelist)
|
||||
# ips in whitelist network
|
||||
for addr in '1.1.0.1, 1.0.0.100'.split(','):
|
||||
self.assertIn(addr.strip(), whitelist)
|
||||
# ips not in blacklist network
|
||||
for addr in '2.0.0.0, 1.1.0.0'.split(','):
|
||||
self.assertNotIn(addr.strip(), blacklist)
|
||||
# ips in blacklist network
|
||||
for addr in '1.0.100.0, 1.0.0.10'.split(','):
|
||||
self.assertIn(addr.strip(), blacklist)
|
||||
|
||||
# Test clearing by adding an empty list is OK too
|
||||
form_data = {
|
||||
'whitelist': '',
|
||||
@@ -183,15 +197,15 @@ class IPFilterFormTest(TestCase):
|
||||
def test_add_invalid_ips(self):
|
||||
# test adding invalid ip addresses
|
||||
form_data = {
|
||||
'whitelist': '.0.0.1, :dead:beef:::',
|
||||
'blacklist': ' 18.244.* , 999999:c0a8:101::42'
|
||||
'whitelist': '.0.0.1, :dead:beef:::, 1.0.0.0/55',
|
||||
'blacklist': ' 18.244.* , 999999:c0a8:101::42, 1.0.0.0/'
|
||||
}
|
||||
form = IPFilterForm(data=form_data)
|
||||
self.assertFalse(form.is_valid())
|
||||
|
||||
wmsg = "Invalid IP Address(es): [u'.0.0.1', u':dead:beef:::'] Please fix the error(s) and try again."
|
||||
wmsg = "Invalid IP Address(es): [u'.0.0.1', u':dead:beef:::', u'1.0.0.0/55'] Please fix the error(s) and try again."
|
||||
self.assertEquals(wmsg, form._errors['whitelist'][0]) # pylint: disable=protected-access
|
||||
bmsg = "Invalid IP Address(es): [u'18.244.*', u'999999:c0a8:101::42'] Please fix the error(s) and try again."
|
||||
bmsg = "Invalid IP Address(es): [u'18.244.*', u'999999:c0a8:101::42', u'1.0.0.0/'] Please fix the error(s) and try again."
|
||||
self.assertEquals(bmsg, form._errors['blacklist'][0]) # pylint: disable=protected-access
|
||||
|
||||
with self.assertRaisesRegexp(ValueError, "The IPFilter could not be created because the data didn't validate."):
|
||||
|
||||
@@ -129,6 +129,62 @@ class EmbargoMiddlewareTests(TestCase):
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
def test_ip_network_exceptions(self):
|
||||
# Explicitly whitelist/blacklist some IP networks
|
||||
IPFilter(
|
||||
whitelist='1.0.0.1/24',
|
||||
blacklist='5.0.0.0/16,1.1.0.0/24',
|
||||
changed_by=self.user,
|
||||
enabled=True
|
||||
).save()
|
||||
|
||||
# Accessing an embargoed page from a blocked IP that's been whitelisted with a network
|
||||
# should succeed
|
||||
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='1.0.0.0', REMOTE_ADDR='1.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Accessing a regular course from a blocked IP that's been whitelisted with a network
|
||||
# should succeed
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='1.0.0.0', REMOTE_ADDR='1.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Accessing an embargoed course from non-embargoed IP that's been blacklisted with a network
|
||||
# should cause a redirect
|
||||
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='5.0.0.100', REMOTE_ADDR='5.0.0.100')
|
||||
self.assertEqual(response.status_code, 302)
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_page,
|
||||
HTTP_X_FORWARDED_FOR='5.0.0.0',
|
||||
REMOTE_ADDR='1.0.0.0',
|
||||
follow=True
|
||||
)
|
||||
self.assertIn(self.embargo_text, response.content)
|
||||
|
||||
# Accessing an embargoed course from non-embargoed IP that's been blaclisted with a network
|
||||
# should cause a redirect
|
||||
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='1.1.0.1', REMOTE_ADDR='1.1.0.1')
|
||||
self.assertEqual(response.status_code, 302)
|
||||
# Following the redirect should give us the embargo page
|
||||
response = self.client.get(
|
||||
self.embargoed_page,
|
||||
HTTP_X_FORWARDED_FOR='1.1.0.0',
|
||||
REMOTE_ADDR='1.1.0.0',
|
||||
follow=True
|
||||
)
|
||||
self.assertIn(self.embargo_text, response.content)
|
||||
|
||||
# Accessing an embargoed from a blocked IP that's not blacklisted by the network rule.
|
||||
# should succeed
|
||||
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='1.1.1.0', REMOTE_ADDR='1.1.1.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Accessing a regular course from a non-embargoed IP that's been blacklisted
|
||||
# should succeed
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False})
|
||||
def test_countries_embargo_off(self):
|
||||
@@ -157,3 +213,25 @@ class EmbargoMiddlewareTests(TestCase):
|
||||
# Accessing a regular course from a non-embargoed IP that's been blacklisted should succeed
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False, 'EMBARGO_SITE': True})
|
||||
def test_embargo_off_embargo_site_on(self):
|
||||
# When the middleware is turned on with SITE, main site access should be restricted
|
||||
# Accessing a regular page from a blocked IP is denied.
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='1.0.0.0', REMOTE_ADDR='1.0.0.0')
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
# Accessing a regular page from a non blocked IP should succeed
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False, 'EMBARGO_SITE': True})
|
||||
@override_settings(EMBARGO_SITE_REDIRECT_URL='https://www.edx.org/')
|
||||
def test_embargo_off_embargo_site_on_with_redirect_url(self):
|
||||
# When the middleware is turned on with EMBARGO_SITE, main site access
|
||||
# should be restricted. Accessing a regular page from a blocked IP is
|
||||
# denied, and redirected to EMBARGO_SITE_REDIRECT_URL rather than returning a 403.
|
||||
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='1.0.0.0', REMOTE_ADDR='1.0.0.0')
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
@@ -78,3 +78,19 @@ class EmbargoModelsTest(TestCase):
|
||||
self.assertTrue(whitelist in cwhitelist)
|
||||
cblacklist = IPFilter.current().blacklist_ips
|
||||
self.assertTrue(blacklist in cblacklist)
|
||||
|
||||
# network tests
|
||||
whitelist = '1.0.0.0/24'
|
||||
blacklist = '1.1.0.0/16'
|
||||
|
||||
IPFilter(whitelist=whitelist, blacklist=blacklist).save()
|
||||
|
||||
cwhitelist = IPFilter.current().whitelist_ips
|
||||
self.assertTrue('1.0.0.100' in cwhitelist)
|
||||
self.assertTrue('1.0.0.10' in cwhitelist)
|
||||
self.assertFalse('1.0.1.0' in cwhitelist)
|
||||
cblacklist = IPFilter.current().blacklist_ips
|
||||
self.assertTrue('1.1.0.0' in cblacklist)
|
||||
self.assertTrue('1.1.0.1' in cblacklist)
|
||||
self.assertTrue('1.1.1.0' in cblacklist)
|
||||
self.assertFalse('1.2.0.0' in cblacklist)
|
||||
|
||||
@@ -228,6 +228,9 @@ FEATURES = {
|
||||
# Toggle embargo functionality
|
||||
'EMBARGO': False,
|
||||
|
||||
# Toggle embargo site functionality
|
||||
'EMBARGO_SITE': False,
|
||||
|
||||
# Whether the Wiki subsystem should be accessible via the direct /wiki/ paths. Setting this to True means
|
||||
# that people can submit content and modify the Wiki in any arbitrary manner. We're leaving this as True in the
|
||||
# defaults, so that we maintain current behavior
|
||||
|
||||
@@ -91,6 +91,9 @@ sphinx_rtd_theme==0.1.5
|
||||
Babel==1.3
|
||||
transifex-client==0.10
|
||||
|
||||
# Ip network support for Embargo feature
|
||||
ipaddr==2.1.11
|
||||
|
||||
# We've tried several times to update the debug toolbar to version 1.0.1,
|
||||
# and had problems each time, resulting in us rolling back to 0.9.4. Before
|
||||
# submitting another pull request to do this update, check the following:
|
||||
|
||||
Reference in New Issue
Block a user