From 98ee3a53779491a7f353c56f4abd4f2ddf52b74a Mon Sep 17 00:00:00 2001 From: Will Daly Date: Tue, 10 Feb 2015 08:02:42 -0500 Subject: [PATCH] Implement IP filtering in embargo middleware. Add history table for course access rule changes. Provide test utility for simulating restricted access. Provide `redirect_if_blocked` method for integration with other parts of the system (will be used for blocking enrollment). Add info-level logging explaining when and why users are blocked. --- common/djangoapps/embargo/api.py | 141 +++++-- common/djangoapps/embargo/exceptions.py | 11 + common/djangoapps/embargo/middleware.py | 123 +++++- .../0005_add_courseaccessrulehistory.py | 114 +++++ common/djangoapps/embargo/models.py | 362 +++++++++++++--- common/djangoapps/embargo/test_utils.py | 82 ++++ common/djangoapps/embargo/tests/test_api.py | 251 +++++++++++ .../tests/test_middleware_access_rules.py | 398 +++++++----------- .../djangoapps/embargo/tests/test_models.py | 183 ++++++-- 9 files changed, 1269 insertions(+), 396 deletions(-) create mode 100644 common/djangoapps/embargo/exceptions.py create mode 100644 common/djangoapps/embargo/migrations/0005_add_courseaccessrulehistory.py create mode 100644 common/djangoapps/embargo/test_utils.py create mode 100644 common/djangoapps/embargo/tests/test_api.py diff --git a/common/djangoapps/embargo/api.py b/common/djangoapps/embargo/api.py index e9c79d9fbd..a922e3904b 100644 --- a/common/djangoapps/embargo/api.py +++ b/common/djangoapps/embargo/api.py @@ -10,12 +10,114 @@ import pygeoip from django.core.cache import cache from django.conf import settings + from embargo.models import CountryAccessRule, RestrictedCourse + log = logging.getLogger(__name__) -def get_user_country_from_profile(user): +def redirect_if_blocked(course_key, access_point='enrollment', **kwargs): + """Redirect if the user does not have access to the course. + + Arguments: + course_key (CourseKey): Location of the course the user is trying to access. + + Keyword Arguments: + Same as `check_course_access` and `message_url_path` + + """ + if settings.FEATURES.get('ENABLE_COUNTRY_ACCESS'): + is_blocked = not check_course_access(course_key, **kwargs) + if is_blocked: + return message_url_path(course_key, access_point) + + +def check_course_access(course_key, user=None, ip_address=None, url=None): + """ + Check is the user with this ip_address has access to the given course + + Arguments: + course_key (CourseKey): Location of the course the user is trying to access. + + Keyword Arguments: + user (User): The user making the request. Can be None, in which case + the user's profile country will not be checked. + ip_address (str): The IP address of the request. + url (str): The URL the user is trying to access. Used in + log messages. + + Returns: + Boolean: True if the user has access to the course; False otherwise + + """ + # First, check whether there are any restrictions on the course. + # If not, then we do not need to do any further checks + course_is_restricted = RestrictedCourse.is_restricted_course(course_key) + + if not course_is_restricted: + return True + + if ip_address is not None: + # Retrieve the country code from the IP address + # and check it against the allowed countries list for a course + user_country_from_ip = _country_code_from_ip(ip_address) + + if not CountryAccessRule.check_country_access(course_key, user_country_from_ip): + log.info( + ( + u"Blocking user %s from accessing course %s at %s " + u"because the user's IP address %s appears to be " + u"located in %s." + ), + getattr(user, 'id', ''), + course_key, + url, + ip_address, + user_country_from_ip + ) + return False + + if user is not None: + # Retrieve the country code from the user's profile + # and check it against the allowed countries list for a course. + user_country_from_profile = _get_user_country_from_profile(user) + + if not CountryAccessRule.check_country_access(course_key, user_country_from_profile): + log.info( + ( + u"Blocking user %s from accessing course %s at %s " + u"because the user's profile country is %s." + ), + user.id, course_key, url, user_country_from_profile + ) + return False + + return True + + +def message_url_path(course_key, access_point): + """Determine the URL path for the message explaining why the user was blocked. + + This is configured per-course. See `RestrictedCourse` in the `embargo.models` + module for more details. + + Arguments: + course_key (CourseKey): The location of the course. + access_point (str): How the user was trying to access the course. + Can be either "enrollment" or "courseware". + + Returns: + unicode: The URL path to a page explaining why the user was blocked. + + Raises: + InvalidAccessPoint: Raised if access_point is not a supported value. + + """ + return RestrictedCourse.message_url_path(course_key, access_point) + + +def _get_user_country_from_profile(user): """ Check whether the user is embargoed based on the country code in the user's profile. @@ -55,40 +157,3 @@ def _country_code_from_ip(ip_addr): return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr) else: return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr) - - -def check_course_access(user, ip_address, course_key): - """ - Check is the user with this ip_address has access to the given course - - Params: - user (User): Currently logged in user object - ip_address (str): The ip_address of user - course_key (CourseLocator): CourseLocator object the user is trying to access - - Returns: - The return will be True if the user has access on the course. - if any constraints fails it will return the False - """ - course_is_restricted = RestrictedCourse.is_restricted_course(course_key) - # If they're trying to access a course that cares about embargoes - - # If course is not restricted then return immediately return True - # no need for further checking - if not course_is_restricted: - return True - - # Retrieve the country code from the IP address - # and check it against the allowed countries list for a course - user_country_from_ip = _country_code_from_ip(ip_address) - # if user country has access to course return True - if not CountryAccessRule.check_country_access(course_key, user_country_from_ip): - return False - - # Retrieve the country code from the user profile. - user_country_from_profile = get_user_country_from_profile(user) - # if profile country has access return True - if not CountryAccessRule.check_country_access(course_key, user_country_from_profile): - return False - - return True diff --git a/common/djangoapps/embargo/exceptions.py b/common/djangoapps/embargo/exceptions.py new file mode 100644 index 0000000000..39441eb99d --- /dev/null +++ b/common/djangoapps/embargo/exceptions.py @@ -0,0 +1,11 @@ +"""Exceptions for the embargo app.""" + + +class InvalidAccessPoint(Exception): + """The requested access point is not supported. """ + + def __init__(self, access_point, *args, **kwargs): + msg = ( + u"Access point '{access_point}' should be either 'enrollment' or 'courseware'" + ).format(access_point=access_point) + super(InvalidAccessPoint, self).__init__(msg, *args, **kwargs) diff --git a/common/djangoapps/embargo/middleware.py b/common/djangoapps/embargo/middleware.py index 8c0801655d..556c3b37f7 100644 --- a/common/djangoapps/embargo/middleware.py +++ b/common/djangoapps/embargo/middleware.py @@ -32,11 +32,13 @@ EMBARGO_SITE_REDIRECT_URL = 'https://www.edx.org/' """ from functools import partial import logging +import re import pygeoip from lazy import lazy from django.core.exceptions import MiddlewareNotUsed from django.core.cache import cache +from django.core.urlresolvers import reverse from django.conf import settings from django.shortcuts import redirect from django.http import HttpResponseRedirect, HttpResponseForbidden @@ -45,7 +47,7 @@ from util.request import course_id_from_url from student.models import unique_id_for_user from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter -from embargo.api import check_course_access +from embargo import api as embargo_api log = logging.getLogger(__name__) @@ -58,6 +60,17 @@ class EmbargoMiddleware(object): optionally ``IPFilter`` rows in the database, using the django admin site. """ + ALLOW_URL_PATTERNS = [ + # Don't block the embargo message pages; otherwise we'd + # end up in an infinite redirect loop. + re.compile(r'^/embargo/blocked-message/'), + + # Don't block the Django admin pages. Otherwise, we might + # accidentally lock ourselves out of Django admin + # during testing. + re.compile(r'^/admin/'), + ] + # Reasons a user might be blocked. # These are used to generate info messages in the logs. REASONS = { @@ -71,20 +84,81 @@ class EmbargoMiddleware(object): def __init__(self): self.site_enabled = settings.FEATURES.get('SITE_EMBARGOED', False) - # If embargoing is turned off, make this middleware do nothing - if not settings.FEATURES.get('EMBARGO', False) and not self.site_enabled: - raise MiddlewareNotUsed() self.enable_country_access = settings.FEATURES.get('ENABLE_COUNTRY_ACCESS', False) + # If embargoing is turned off, make this middleware do nothing + disable_middleware = not ( + settings.FEATURES.get('EMBARGO') or + self.site_enabled or + self.enable_country_access + ) + if disable_middleware: + raise MiddlewareNotUsed() + def process_request(self, request): + """Block requests based on embargo rules. + + In the new ENABLE_COUNTRY_ACCESS implmentation, + this will perform the following checks: + + 1) If the user's IP address is blacklisted, block. + 2) If the user's IP address is whitelisted, allow. + 3) If the user's country (inferred from their IP address) is blocked for + a courseware page, block. + 4) If the user's country (retrieved from the user's profile) is blocked + for a courseware page, block. + 5) Allow access. + """ - Processes embargo requests. - """ + # If the feature flag is set, use the new "country access" implementation. + # This is a more flexible implementation of the embargo feature that allows + # per-course country access rules. if self.enable_country_access: - if self.country_access_rules(request): + + # Never block certain patterns by IP address + for pattern in self.ALLOW_URL_PATTERNS: + if pattern.match(request.path) is not None: + return None + + ip_address = get_ip(request) + ip_filter = IPFilter.current() + + if ip_filter.enabled and ip_address in ip_filter.blacklist_ips: + log.info( + ( + u"User %s was blocked from accessing %s " + u"because IP address %s is blacklisted." + ), request.user.id, request.path, ip_address + ) + + # If the IP is blacklisted, reject. + # This applies to any request, not just courseware URLs. + ip_blacklist_url = reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'embargo' + } + ) + return redirect(ip_blacklist_url) + + elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips: + log.info( + ( + u"User %s was allowed access to %s because " + u"IP address %s is whitelisted." + ), + request.user.id, request.path, ip_address + ) + + # If the IP is whitelisted, then allow access, + # skipping later checks. return None + else: - return self._embargo_redirect_response + # Otherwise, perform the country access checks. + # This applies only to courseware URLs. + return self.country_access_rules(request.user, ip_address, request.path) url = request.path course_id = course_id_from_url(url) @@ -306,19 +380,30 @@ class EmbargoMiddleware(object): return _inner - def country_access_rules(self, request): + def country_access_rules(self, user, ip_address, url_path): """ - check the country access rules for a given course. - if course id is invalid return True + Check the country access rules for a given course. + Applies only to courseware URLs. + Args: - request + user (User): The user making the current request. + ip_address (str): The IP address from which the request originated. + url_path (str): The request path. - Return: - boolean: True if the user has access else false. + Returns: + HttpResponse or None """ - url = request.path - course_id = course_id_from_url(url) - if course_id is None: - return True - return check_course_access(request.user, get_ip(request), course_id) + course_id = course_id_from_url(url_path) + + if course_id: + redirect_url = embargo_api.redirect_if_blocked( + course_id, + user=user, + ip_address=ip_address, + url=url_path, + access_point='courseware' + ) + + if redirect_url: + return redirect(redirect_url) diff --git a/common/djangoapps/embargo/migrations/0005_add_courseaccessrulehistory.py b/common/djangoapps/embargo/migrations/0005_add_courseaccessrulehistory.py new file mode 100644 index 0000000000..1f99a113be --- /dev/null +++ b/common/djangoapps/embargo/migrations/0005_add_courseaccessrulehistory.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'CourseAccessRuleHistory' + db.create_table('embargo_courseaccessrulehistory', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('timestamp', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, db_index=True, blank=True)), + ('course_key', self.gf('xmodule_django.models.CourseKeyField')(max_length=255, db_index=True)), + ('snapshot', self.gf('django.db.models.fields.TextField')(null=True, blank=True)), + )) + db.send_create_signal('embargo', ['CourseAccessRuleHistory']) + + + def backwards(self, orm): + # Deleting model 'CourseAccessRuleHistory' + db.delete_table('embargo_courseaccessrulehistory') + + + models = { + 'auth.group': { + 'Meta': {'object_name': 'Group'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}), + 'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}) + }, + 'auth.permission': { + 'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'}, + 'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '50'}) + }, + 'auth.user': { + 'Meta': {'object_name': 'User'}, + 'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}), + 'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}), + 'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}), + 'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}), + 'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}), + 'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}), + 'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}), + 'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}), + 'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'}) + }, + 'contenttypes.contenttype': { + 'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"}, + 'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}) + }, + 'embargo.country': { + 'Meta': {'ordering': "['country']", 'object_name': 'Country'}, + 'country': ('django_countries.fields.CountryField', [], {'unique': 'True', 'max_length': '2', 'db_index': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}) + }, + 'embargo.countryaccessrule': { + 'Meta': {'unique_together': "(('restricted_course', 'country'),)", 'object_name': 'CountryAccessRule'}, + 'country': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.Country']"}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'restricted_course': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.RestrictedCourse']"}), + 'rule_type': ('django.db.models.fields.CharField', [], {'default': "'blacklist'", 'max_length': '255'}) + }, + 'embargo.courseaccessrulehistory': { + 'Meta': {'object_name': 'CourseAccessRuleHistory'}, + 'course_key': ('xmodule_django.models.CourseKeyField', [], {'max_length': '255', 'db_index': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'snapshot': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'timestamp': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'db_index': 'True', 'blank': 'True'}) + }, + 'embargo.embargoedcourse': { + 'Meta': {'object_name': 'EmbargoedCourse'}, + 'course_id': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}), + 'embargoed': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}) + }, + 'embargo.embargoedstate': { + 'Meta': {'object_name': 'EmbargoedState'}, + 'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}), + 'embargoed_countries': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}) + }, + 'embargo.ipfilter': { + 'Meta': {'object_name': 'IPFilter'}, + 'blacklist': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), + 'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}), + 'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'whitelist': ('django.db.models.fields.TextField', [], {'blank': 'True'}) + }, + 'embargo.restrictedcourse': { + 'Meta': {'object_name': 'RestrictedCourse'}, + 'access_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}), + 'course_key': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}), + 'enroll_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}) + } + } + + complete_apps = ['embargo'] \ No newline at end of file diff --git a/common/djangoapps/embargo/models.py b/common/djangoapps/embargo/models.py index b0a1171920..c254c45194 100644 --- a/common/djangoapps/embargo/models.py +++ b/common/djangoapps/embargo/models.py @@ -12,10 +12,14 @@ file and check it in at the same time as your model changes. To do that, """ import ipaddr +import json +import logging from django.db import models from django.utils.translation import ugettext as _, ugettext_lazy from django.core.cache import cache +from django.core.urlresolvers import reverse +from django.db.models.signals import post_save, post_delete from django_countries.fields import CountryField from django_countries import countries @@ -23,11 +27,11 @@ from django_countries import countries from config_models.models import ConfigurationModel from xmodule_django.models import CourseKeyField, NoneToEmptyManager +from embargo.exceptions import InvalidAccessPoint from embargo.messages import ENROLL_MESSAGES, COURSEWARE_MESSAGES -WHITE_LIST = 'whitelist' -BLACK_LIST = 'blacklist' +log = logging.getLogger(__name__) class EmbargoedCourse(models.Model): @@ -100,6 +104,9 @@ class RestrictedCourse(models.Model): These displayed on pages served by the embargo app. """ + COURSE_LIST_CACHE_KEY = 'embargo.restricted_courses' + MESSAGE_URL_CACHE_KEY = 'embargo.message_url_path.{access_point}.{course_key}' + ENROLL_MSG_KEY_CHOICES = tuple([ (msg_key, msg.description) for msg_key, msg in ENROLL_MESSAGES.iteritems() @@ -129,11 +136,6 @@ class RestrictedCourse(models.Model): help_text=ugettext_lazy(u"The message to show when a user is blocked from accessing a course.") ) - @classmethod - def cache_key_name(cls): - """Return the name of the key to use to cache the current restricted course list""" - return 'embargo/RestrictedCourse/courses' - @classmethod def is_restricted_course(cls, course_id): """ @@ -153,25 +155,172 @@ class RestrictedCourse(models.Model): """ Cache all restricted courses and returns the list of course_keys that are restricted """ - restricted_courses = cache.get(cls.cache_key_name()) + restricted_courses = cache.get(cls.COURSE_LIST_CACHE_KEY) if not restricted_courses: restricted_courses = list(RestrictedCourse.objects.values_list('course_key', flat=True)) - cache.set(cls.cache_key_name(), restricted_courses) + cache.set(cls.COURSE_LIST_CACHE_KEY, restricted_courses) return restricted_courses + def snapshot(self): + """Return a snapshot of all access rules for this course. + + This is useful for recording an audit trail of rule changes. + The returned dictionary is JSON-serializable. + + Returns: + dict + + Example Usage: + >>> restricted_course.snapshot() + { + 'enroll_msg': 'default', + 'access_msg': 'default', + 'country_rules': [ + {'country': 'IR', 'rule_type': 'blacklist'}, + {'country': 'CU', 'rule_type': 'blacklist'} + ] + } + + """ + country_rules_for_course = ( + CountryAccessRule.objects + ).select_related('restricted_country').filter(restricted_course=self) + + return { + 'enroll_msg': self.enroll_msg_key, + 'access_msg': self.access_msg_key, + 'country_rules': [ + { + 'country': unicode(rule.country.country), + 'rule_type': rule.rule_type + } + for rule in country_rules_for_course + ] + } + + def message_key_for_access_point(self, access_point): + """Determine which message to show the user. + + The message can be configured per-course and depends + on how the user is trying to access the course + (trying to enroll or accessing courseware). + + Arguments: + access_point (str): Either "courseware" or "enrollment" + + Returns: + str: The message key. If the access point is not valid, + returns None instead. + + """ + if access_point == 'enrollment': + return self.enroll_msg_key + elif access_point == 'courseware': + return self.access_msg_key + def __unicode__(self): return unicode(self.course_key) - def save(self, *args, **kwargs): - """ - Clear the cached value when saving a RestrictedCourse entry - """ - super(RestrictedCourse, self).save(*args, **kwargs) - cache.delete(self.cache_key_name()) + @classmethod + def message_url_path(cls, course_key, access_point): + """Determine the URL path for the message explaining why the user was blocked. - def delete(self, using=None): - super(RestrictedCourse, self).delete() - cache.delete(self.cache_key_name()) + This is configured per-course. See `RestrictedCourse` in the `embargo.models` + module for more details. + + Arguments: + course_key (CourseKey): The location of the course. + access_point (str): How the user was trying to access the course. + Can be either "enrollment" or "courseware". + + Returns: + unicode: The URL path to a page explaining why the user was blocked. + + Raises: + InvalidAccessPoint: Raised if access_point is not a supported value. + + """ + if access_point not in ['enrollment', 'courseware']: + raise InvalidAccessPoint(access_point) + + # First check the cache to see if we already have + # a URL for this (course_key, access_point) tuple + cache_key = cls.MESSAGE_URL_CACHE_KEY.format( + access_point=access_point, + course_key=course_key + ) + url = cache.get(cache_key) + + # If there's a cache miss, we'll need to retrieve the message + # configuration from the database + if url is None: + url = cls._get_message_url_path_from_db(course_key, access_point) + cache.set(cache_key, url) + + return url + + @classmethod + def _get_message_url_path_from_db(cls, course_key, access_point): + """Retrieve the "blocked" message from the database. + + Arguments: + course_key (CourseKey): The location of the course. + access_point (str): How the user was trying to access the course. + Can be either "enrollment" or "courseware". + + Returns: + unicode: The URL path to a page explaining why the user was blocked. + + """ + # Fallback in case we're not able to find a message path + # Presumably if the caller is requesting a URL, the caller + # has already determined that the user should be blocked. + # We use generic messaging unless we find something more specific, + # but *always* return a valid URL path. + default_path = reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'default' + } + ) + + # First check whether this is a restricted course. + # The list of restricted courses is cached, so this does + # not require a database query. + if not cls.is_restricted_course(course_key): + return default_path + + # Retrieve the message key from the restricted course + # for this access point, then determine the URL. + try: + course = cls.objects.get(course_key=course_key) + msg_key = course.message_key_for_access_point(access_point) + return reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': access_point, + 'message_key': msg_key + } + ) + except cls.DoesNotExist: + # This occurs only if there's a race condition + # between cache invalidation and database access. + return default_path + + @classmethod + def invalidate_cache_for_course(cls, course_key): + """Invalidate the caches for the restricted course. """ + cache.delete(cls.COURSE_LIST_CACHE_KEY) + log.info("Invalidated cached list of restricted courses.") + + for access_point in ['enrollment', 'courseware']: + msg_cache_key = cls.MESSAGE_URL_CACHE_KEY.format( + access_point=access_point, + course_key=course_key + ) + cache.delete(msg_cache_key) + log.info("Invalidated cached messaging URLs ") class Country(models.Model): @@ -216,15 +365,18 @@ class CountryAccessRule(models.Model): """ + WHITELIST_RULE = 'whitelist' + BLACKLIST_RULE = 'blacklist' + RULE_TYPE_CHOICES = ( - (WHITE_LIST, 'Whitelist (allow only these countries)'), - (BLACK_LIST, 'Blacklist (block these countries)'), + (WHITELIST_RULE, 'Whitelist (allow only these countries)'), + (BLACKLIST_RULE, 'Blacklist (block these countries)'), ) rule_type = models.CharField( max_length=255, choices=RULE_TYPE_CHOICES, - default=BLACK_LIST, + default=BLACKLIST_RULE, help_text=ugettext_lazy( u"Whether to include or exclude the given course. " u"If whitelist countries are specified, then ONLY users from whitelisted countries " @@ -243,15 +395,7 @@ class CountryAccessRule(models.Model): help_text=ugettext_lazy(u"The country to which this rule applies.") ) - @classmethod - def cache_key_for_consolidated_countries(cls, course_id): - """ - Args: - course_id (str): course_id to look for - Returns: - Consolidated list of accessible countries for given course - """ - return "{}/allowed/countries".format(course_id) + CACHE_KEY = u"embargo.allowed_countries.{course_key}" @classmethod def check_country_access(cls, course_id, country): @@ -267,10 +411,11 @@ class CountryAccessRule(models.Model): True if country found in allowed country otherwise check given country exists in list """ - allowed_countries = cache.get(cls.cache_key_for_consolidated_countries(course_id)) + cache_key = cls.CACHE_KEY.format(course_key=course_id) + allowed_countries = cache.get(cache_key) if not allowed_countries: allowed_countries = cls._get_country_access_list(course_id) - cache.set(cls.cache_key_for_consolidated_countries(course_id), allowed_countries) + cache.set(cache_key, allowed_countries) return country == '' or country in allowed_countries @@ -298,9 +443,9 @@ class CountryAccessRule(models.Model): # Filter the rules into a whitelist and blacklist in one pass for rule in rules_for_course: - if rule.rule_type == 'whitelist': + if rule.rule_type == cls.WHITELIST_RULE: whitelist_countries.add(rule.country.country.code) - elif rule.rule_type == 'blacklist': + elif rule.rule_type == cls.BLACKLIST_RULE: blacklist_countries.add(rule.country.country.code) # If there are no whitelist countries, default to all countries @@ -312,30 +457,23 @@ class CountryAccessRule(models.Model): return list(whitelist_countries - blacklist_countries) def __unicode__(self): - if self.rule_type == WHITE_LIST: + if self.rule_type == self.WHITELIST_RULE: return _(u"Whitelist {country} for {course}").format( course=unicode(self.restricted_course.course_key), country=unicode(self.country), ) - elif self.rule_type == BLACK_LIST: + elif self.rule_type == self.BLACKLIST_RULE: return _(u"Blacklist {country} for {course}").format( course=unicode(self.restricted_course.course_key), country=unicode(self.country), ) - def save(self, *args, **kwargs): - """ - Clear the cached value when saving a entry - """ - super(CountryAccessRule, self).save(*args, **kwargs) - cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key))) - - def delete(self, using=None): - """ - clear the cached value when saving a entry - """ - super(CountryAccessRule, self).delete() - cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key))) + @classmethod + def invalidate_cache_for_course(cls, course_key): + """Invalidate the cache. """ + cache_key = cls.CACHE_KEY.format(course_key=course_key) + cache.delete(cache_key) + log.info("Invalidated country access list for course %s", course_key) class Meta: """a course can be added with either black or white list. """ @@ -347,6 +485,132 @@ class CountryAccessRule(models.Model): ) +def invalidate_country_rule_cache(sender, instance, **kwargs): # pylint: disable=unused-argument + """Invalidate cached rule information on changes to the rule models. + + We need to handle this in a Django receiver, because Django admin + doesn't always call the model's `delete()` method directly during + a bulk delete operation. + + Arguments: + sender: Not used, but required by Django receivers. + instance (RestrictedCourse or CountryAccessRule): The instance + being saved or deleted. + + """ + if isinstance(instance, RestrictedCourse): + # If a restricted course changed, we need to update the list + # of which courses are restricted as well as any rules + # associated with the course. + RestrictedCourse.invalidate_cache_for_course(instance.course_key) + CountryAccessRule.invalidate_cache_for_course(instance.course_key) + if isinstance(instance, CountryAccessRule): + try: + restricted_course = instance.restricted_course + except RestrictedCourse.DoesNotExist: + # If the restricted course and its rules are being deleted, + # the restricted course may not exist at this point. + # However, the cache should have been invalidated + # when the restricted course was deleted. + pass + else: + # Invalidate the cache of countries for the course. + CountryAccessRule.invalidate_cache_for_course(restricted_course.course_key) + + +# Hook up the cache invalidation receivers to the appropriate +# post_save and post_delete signals. +post_save.connect(invalidate_country_rule_cache, sender=CountryAccessRule) +post_save.connect(invalidate_country_rule_cache, sender=RestrictedCourse) +post_delete.connect(invalidate_country_rule_cache, sender=CountryAccessRule) +post_delete.connect(invalidate_country_rule_cache, sender=RestrictedCourse) + + +class CourseAccessRuleHistory(models.Model): + """History of course access rule changes. """ + + timestamp = models.DateTimeField(db_index=True, auto_now_add=True) + course_key = CourseKeyField(max_length=255, db_index=True) + snapshot = models.TextField(null=True, blank=True) + + DELETED_PLACEHOLDER = "DELETED" + + @classmethod + def save_snapshot(cls, restricted_course, deleted=False): + """Save a snapshot of access rules for a course. + + Arguments: + restricted_course (RestrictedCourse) + + Keyword Arguments: + deleted (boolean): If True, the restricted course + is about to be deleted. Create a placeholder + snapshot recording that the course and all its + rules was deleted. + + Returns: + None + + """ + course_key = restricted_course.course_key + + # At the point this is called, the access rules may not have + # been deleted yet. When the rules *are* deleted, the + # restricted course entry may no longer exist, so we + # won't be able to take a snapshot of the rules. + # To handle this, we save a placeholder "DELETED" entry + # so that it's clear in the audit that the restricted + # course (along with all its rules) was deleted. + snapshot = ( + CourseAccessRuleHistory.DELETED_PLACEHOLDER if deleted + else json.dumps(restricted_course.snapshot()) + ) + + cls.objects.create( + course_key=course_key, + snapshot=snapshot + ) + + @staticmethod + def snapshot_post_save_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument + """Create a snapshot of course access rules when the rules are updated. """ + if isinstance(instance, RestrictedCourse): + CourseAccessRuleHistory.save_snapshot(instance) + elif isinstance(instance, CountryAccessRule): + CourseAccessRuleHistory.save_snapshot(instance.restricted_course) + + @staticmethod + def snapshot_post_delete_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument + """Create a snapshot of course access rules when rules are deleted. """ + if isinstance(instance, RestrictedCourse): + CourseAccessRuleHistory.save_snapshot(instance, deleted=True) + elif isinstance(instance, CountryAccessRule): + try: + restricted_course = instance.restricted_course + except RestrictedCourse.DoesNotExist: + # When Django admin deletes a restricted course, it will + # also delete the rules associated with that course. + # At this point, we can't access the restricted course + # from the rule beause it may already have been deleted. + # If this happens, we don't need to record anything, + # since we already record a placeholder "DELETED" + # entry when the restricted course record is deleted. + pass + else: + CourseAccessRuleHistory.save_snapshot(restricted_course) + + class Meta: # pylint: disable=missing-docstring,old-style-class + get_latest_by = 'timestamp' + + +# Connect the signals to the receivers so we record a history +# of changes to the course access rules. +post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=RestrictedCourse) +post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=CountryAccessRule) +post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=RestrictedCourse) +post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=CountryAccessRule) + + class IPFilter(ConfigurationModel): """ Register specific IP addresses to explicitly block or unblock. diff --git a/common/djangoapps/embargo/test_utils.py b/common/djangoapps/embargo/test_utils.py new file mode 100644 index 0000000000..a278e5d49d --- /dev/null +++ b/common/djangoapps/embargo/test_utils.py @@ -0,0 +1,82 @@ +"""Utilities for writing unit tests that involve course embargos. """ +import contextlib +import mock + +import pygeoip + +from django.core.urlresolvers import reverse +from django.core.cache import cache +from embargo.models import Country, CountryAccessRule, RestrictedCourse + + +@contextlib.contextmanager +def restrict_course(course_key, access_point="enrollment"): + """Simulate that a course is restricted. + + This does two things: + 1) Configures country access rules so that the course is restricted. + 2) Mocks the GeoIP call so the user appears to be coming + from a country that's blocked from the course. + + This is useful for tests that need to verify + that restricted users won't be able to access + particular views. + + Arguments: + course_key (CourseKey): The location of the course to block. + + Keyword Arguments: + access_point (str): Either "courseware" or "enrollment" + + Yields: + str: A URL to the page in the embargo app that explains + why the user was blocked. + + Example Usage: + >>> with restrict_course(course_key) as redirect_url: + >>> # The client will appear to be coming from + >>> # an IP address that is blocked. + >>> resp = self.client.get(url) + >>> self.assertRedirects(resp, redirect_url) + + """ + # Clear the cache to ensure that previous tests don't interfere + # with this test. + cache.clear() + + with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip: + + # Remove all existing rules for the course + CountryAccessRule.objects.all().delete() + + # Create the country object + # Ordinarily, we'd create models for every country, + # but that would slow down the test suite. + country, __ = Country.objects.get_or_create(country='IR') + + # Create a model for the restricted course + restricted_course, __ = RestrictedCourse.objects.get_or_create(course_key=course_key) + restricted_course.enroll_msg_key = 'default' + restricted_course.access_msg_key = 'default' + restricted_course.save() + + # Ensure that there is a blacklist rule for the country + CountryAccessRule.objects.get_or_create( + restricted_course=restricted_course, + country=country, + rule_type='blacklist' + ) + + # Simulate that the user is coming from the blacklisted country + mock_ip.return_value = 'IR' + + # Yield the redirect url so the tests don't need to know + # the embargo messaging URL structure. + redirect_url = reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': access_point, + 'message_key': 'default' + } + ) + yield redirect_url diff --git a/common/djangoapps/embargo/tests/test_api.py b/common/djangoapps/embargo/tests/test_api.py new file mode 100644 index 0000000000..0eeb3d7b75 --- /dev/null +++ b/common/djangoapps/embargo/tests/test_api.py @@ -0,0 +1,251 @@ +""" +Tests for EmbargoMiddleware +""" + +import mock +import unittest +import pygeoip +import ddt + +from django.conf import settings +from django.test.utils import override_settings +from django.core.cache import cache +from django.db import connection, transaction + +from student.tests.factories import UserFactory +from xmodule.modulestore.tests.factories import CourseFactory +from xmodule.modulestore.tests.django_utils import ( + ModuleStoreTestCase, mixed_store_config +) + +from embargo.models import ( + RestrictedCourse, Country, CountryAccessRule, +) + +from util.testing import UrlResetMixin +from embargo import api as embargo_api +from embargo.exceptions import InvalidAccessPoint +from mock import patch + + +# Since we don't need any XML course fixtures, use a modulestore configuration +# that disables the XML modulestore. +MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False) + + +@ddt.ddt +@override_settings(MODULESTORE=MODULESTORE_CONFIG) +@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') +class EmbargoCheckAccessApiTests(ModuleStoreTestCase): + """Test the embargo API calls to determine whether a user has access. """ + + def setUp(self): + super(EmbargoCheckAccessApiTests, self).setUp() + self.course = CourseFactory.create() + self.user = UserFactory.create() + self.restricted_course = RestrictedCourse.objects.create(course_key=self.course.id) + Country.objects.create(country='US') + Country.objects.create(country='IR') + Country.objects.create(country='CU') + + # Clear the cache to prevent interference between tests + cache.clear() + + @ddt.data( + # IP country, profile_country, blacklist, whitelist, allow_access + ('US', None, [], [], True), + ('IR', None, ['IR', 'CU'], [], False), + ('US', 'IR', ['IR', 'CU'], [], False), + ('IR', 'IR', ['IR', 'CU'], [], False), + ('US', None, [], ['US'], True), + ('IR', None, [], ['US'], False), + ('US', 'IR', [], ['US'], False), + ) + @ddt.unpack + def test_country_access_rules(self, ip_country, profile_country, blacklist, whitelist, allow_access): + # Configure the access rules + for whitelist_country in whitelist: + CountryAccessRule.objects.create( + rule_type=CountryAccessRule.WHITELIST_RULE, + restricted_course=self.restricted_course, + country=Country.objects.get(country=whitelist_country) + ) + + for blacklist_country in blacklist: + CountryAccessRule.objects.create( + rule_type=CountryAccessRule.BLACKLIST_RULE, + restricted_course=self.restricted_course, + country=Country.objects.get(country=blacklist_country) + ) + + # Configure the user's profile country + if profile_country is not None: + self.user.profile.country = profile_country + self.user.profile.save() + + # Appear to make a request from an IP in a particular country + with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip: + mock_ip.return_value = ip_country + + # Call the API. Note that the IP address we pass in doesn't + # matter, since we're injecting a mock for geo-location + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + + # Verify that the access rules were applied correctly + self.assertEqual(result, allow_access) + + def test_no_user_has_access(self): + CountryAccessRule.objects.create( + rule_type=CountryAccessRule.BLACKLIST_RULE, + restricted_course=self.restricted_course, + country=Country.objects.get(country='US') + ) + + # The user is set to None, because the user has not been authenticated. + result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0') + self.assertTrue(result) + + def test_no_user_blocked(self): + CountryAccessRule.objects.create( + rule_type=CountryAccessRule.BLACKLIST_RULE, + restricted_course=self.restricted_course, + country=Country.objects.get(country='US') + ) + + with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip: + mock_ip.return_value = 'US' + + # The user is set to None, because the user has not been authenticated. + result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0') + self.assertFalse(result) + + def test_course_not_restricted(self): + # No restricted course model for this course key, + # so all access checks should be skipped. + unrestricted_course = CourseFactory.create() + with self.assertNumQueries(1): + embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0') + + # The second check should require no database queries + with self.assertNumQueries(0): + embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0') + + def test_ip_v6(self): + # Test the scenario that will go through every check + # (restricted course, but pass all the checks) + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='FE80::0202:B3FF:FE1E:8329') + self.assertTrue(result) + + @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def test_profile_country_db_null(self): + # Django country fields treat NULL values inconsistently. + # When saving a profile with country set to None, Django saves an empty string to the database. + # However, when the country field loads a NULL value from the database, it sets + # `country.code` to `None`. This caused a bug in which country values created by + # the original South schema migration -- which defaulted to NULL -- caused a runtime + # exception when the embargo middleware treated the value as a string. + # In order to simulate this behavior, we can't simply set `profile.country = None`. + # (because when we save it, it will set the database field to an empty string instead of NULL) + query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s" + connection.cursor().execute(query, [str(self.user.profile.id)]) + transaction.commit_unless_managed() + + # Verify that we can check the user's access without error + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + self.assertTrue(result) + + def test_caching(self): + # Test the scenario that will go through every check + # (restricted course, but pass all the checks) + # This is the worst case, so it will hit all of the + # caching code. + with self.assertNumQueries(3): + embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + + with self.assertNumQueries(0): + embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + + +@ddt.ddt +@override_settings(MODULESTORE=MODULESTORE_CONFIG) +@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') +class EmbargoMessageUrlApiTests(UrlResetMixin, ModuleStoreTestCase): + """Test the embargo API calls for retrieving the blocking message URLs. """ + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def setUp(self): + super(EmbargoMessageUrlApiTests, self).setUp('embargo') + self.course = CourseFactory.create() + + def tearDown(self): + cache.clear() + + @ddt.data( + ('enrollment', '/embargo/blocked-message/enrollment/embargo/'), + ('courseware', '/embargo/blocked-message/courseware/embargo/') + ) + @ddt.unpack + def test_message_url_path(self, access_point, expected_url_path): + self._restrict_course(self.course.id) + + # Retrieve the URL to the blocked message page + url_path = embargo_api.message_url_path(self.course.id, access_point) + self.assertEqual(url_path, expected_url_path) + + def test_message_url_path_caching(self): + self._restrict_course(self.course.id) + + # The first time we retrieve the message, we'll need + # to hit the database. + with self.assertNumQueries(2): + embargo_api.message_url_path(self.course.id, "enrollment") + + # The second time, we should be using cached values + with self.assertNumQueries(0): + embargo_api.message_url_path(self.course.id, "enrollment") + + @ddt.data('enrollment', 'courseware') + def test_message_url_path_no_restrictions_for_course(self, access_point): + # No restrictions for the course + url_path = embargo_api.message_url_path(self.course.id, access_point) + + # Use a default path + self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/') + + def test_invalid_access_point(self): + with self.assertRaises(InvalidAccessPoint): + embargo_api.message_url_path(self.course.id, "invalid") + + def test_message_url_stale_cache(self): + # Retrieve the URL once, populating the cache with the list + # of restricted courses. + self._restrict_course(self.course.id) + embargo_api.message_url_path(self.course.id, 'courseware') + + # Delete the restricted course entry + RestrictedCourse.objects.get(course_key=self.course.id).delete() + + # Clear the message URL cache + message_cache_key = ( + 'embargo.message_url_path.courseware.{course_key}' + ).format(course_key=self.course.id) + cache.delete(message_cache_key) + + # Try again. Even though the cache results are stale, + # we should still get a valid URL. + url_path = embargo_api.message_url_path(self.course.id, 'courseware') + self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/') + + def _restrict_course(self, course_key): + """Restrict the user from accessing the course. """ + country = Country.objects.create(country='us') + restricted_course = RestrictedCourse.objects.create( + course_key=course_key, + enroll_msg_key='embargo', + access_msg_key='embargo' + ) + CountryAccessRule.objects.create( + restricted_course=restricted_course, + rule_type=CountryAccessRule.BLACKLIST_RULE, + country=country + ) diff --git a/common/djangoapps/embargo/tests/test_middleware_access_rules.py b/common/djangoapps/embargo/tests/test_middleware_access_rules.py index 990e28f255..e4a4e2d1e6 100644 --- a/common/djangoapps/embargo/tests/test_middleware_access_rules.py +++ b/common/djangoapps/embargo/tests/test_middleware_access_rules.py @@ -2,27 +2,24 @@ Tests for EmbargoMiddleware with CountryAccessRules """ -import mock -import pygeoip import unittest - -from django.db import connection, transaction -from django.core.urlresolvers import reverse -from django.conf import settings +from mock import patch import ddt +from django.core.urlresolvers import reverse +from django.conf import settings +from django.core.cache import cache as django_cache + +from util.testing import UrlResetMixin from student.tests.factories import UserFactory from xmodule.modulestore.tests.factories import CourseFactory from xmodule.modulestore.tests.django_utils import ( ModuleStoreTestCase, mixed_store_config ) +from config_models.models import cache as config_cache -# Explicitly import the cache from ConfigurationModel so we can reset it after each test -from config_models.models import cache -from embargo.models import ( - RestrictedCourse, Country, CountryAccessRule, WHITE_LIST, BLACK_LIST -) -from django_countries import countries +from embargo.models import RestrictedCourse, IPFilter +from embargo.test_utils import restrict_course # Since we don't need any XML course fixtures, use a modulestore configuration @@ -32,256 +29,147 @@ MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, incl @ddt.ddt @unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') -class EmbargoCountryAccessRulesTests(ModuleStoreTestCase): - """ - Tests of EmbargoApi - """ +class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase): + """Tests of embargo middleware country access rules. + There are detailed unit tests for the rule logic in + `test_api.py`; here, we're mainly testing the integration + with middleware + + """ + USERNAME = 'fred' + PASSWORD = 'secret' + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) def setUp(self): - super(EmbargoCountryAccessRulesTests, self).setUp() - self.user = UserFactory(username='fred', password='secret') - self.client.login(username='fred', password='secret') - self.embargo_course1 = CourseFactory.create() - self.embargo_course1.save() - self.embargo_course2 = CourseFactory.create() - self.embargo_course2.save() - self.regular_course = CourseFactory.create(org="Regular") - self.regular_course.save() - self.embargoed_course_whitelisted = '/courses/' + self.embargo_course1.id.to_deprecated_string() + '/info' - self.embargoed_course_blacklisted = '/courses/' + self.embargo_course2.id.to_deprecated_string() + '/info' - self.regular_page = '/courses/' + self.regular_course.id.to_deprecated_string() + '/info' + super(EmbargoMiddlewareAccessTests, self).setUp('embargo') + self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD) + self.course = CourseFactory.create() + self.client.login(username=self.USERNAME, password=self.PASSWORD) - restricted_course_1 = RestrictedCourse.objects.create(course_key=self.embargo_course1.id) - restricted_course_2 = RestrictedCourse.objects.create(course_key=self.embargo_course2.id) + self.courseware_url = reverse( + 'course_root', + kwargs={'course_id': unicode(self.course.id)} + ) + self.non_courseware_url = reverse('dashboard') - all_countries = [Country(country=code[0]) for code in list(countries)] - Country.objects.bulk_create(all_countries) + # Clear the cache to avoid interference between tests + django_cache.clear() + config_cache.clear() - country_access_white_rules = [ - CountryAccessRule( - restricted_course=restricted_course_1, - rule_type=WHITE_LIST, - country=Country.objects.get(country='US') - ), - CountryAccessRule( - restricted_course=restricted_course_1, - rule_type=WHITE_LIST, - country=Country.objects.get(country='NZ') + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def test_blocked(self): + with restrict_course(self.course.id, access_point='courseware') as redirect_url: + response = self.client.get(self.courseware_url) + self.assertRedirects(response, redirect_url) + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def test_allowed(self): + # Add the course to the list of restricted courses + # but don't create any access rules + RestrictedCourse.objects.create(course_key=self.course.id) + + # Expect that we can access courseware + response = self.client.get(self.courseware_url) + self.assertEqual(response.status_code, 200) + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def test_non_courseware_url(self): + with restrict_course(self.course.id): + response = self.client.get(self.non_courseware_url) + self.assertEqual(response.status_code, 200) + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + @ddt.data( + # request_ip, blacklist, whitelist, is_enabled, allow_access + ('173.194.123.35', ['173.194.123.35'], [], True, False), + ('173.194.123.35', ['173.194.0.0/16'], [], True, False), + ('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False), + ('173.195.10.20', ['173.194.0.0/16'], [], True, True), + ('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False), + ('173.194.123.35', [], ['173.194.0.0/16'], True, True), + ('192.178.2.3', [], ['173.194.0.0/16'], True, True), + ('173.194.123.35', ['173.194.123.35'], [], False, True), + ) + @ddt.unpack + def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access): + # Ensure that IP blocking works for anonymous users + self.client.logout() + + # Set up the IP rules + IPFilter.objects.create( + blacklist=", ".join(blacklist), + whitelist=", ".join(whitelist), + enabled=is_enabled + ) + + # Check that access is enforced + response = self.client.get( + "/", + HTTP_X_FORWARDED_FOR=request_ip, + REMOTE_ADDR=request_ip + ) + + if allow_access: + self.assertEqual(response.status_code, 200) + else: + redirect_url = reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'embargo' + } ) - ] - CountryAccessRule.objects.bulk_create(country_access_white_rules) + self.assertRedirects(response, redirect_url) - country_access_black_rules = [ - CountryAccessRule( - restricted_course=restricted_course_2, - rule_type=BLACK_LIST, - country=Country.objects.get(country='CU') - ), - CountryAccessRule( - restricted_course=restricted_course_2, - rule_type=BLACK_LIST, - country=Country.objects.get(country='IR') + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + @ddt.data( + ('courseware', 'default'), + ('courseware', 'embargo'), + ('enrollment', 'default'), + ('enrollment', 'embargo') + ) + @ddt.unpack + def test_always_allow_access_to_embargo_messages(self, access_point, msg_key): + # Blacklist an IP address + IPFilter.objects.create( + blacklist="192.168.10.20", + enabled=True + ) + + url = reverse( + 'embargo_blocked_message', + kwargs={ + 'access_point': access_point, + 'message_key': msg_key + } + ) + response = self.client.get( + url, + HTTP_X_FORWARDED_FOR="192.168.10.20", + REMOTE_ADDR="192.168.10.20" + ) + self.assertEqual(response.status_code, 200) + + @patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) + def test_whitelist_ip_skips_country_access_checks(self): + # Whitelist an IP address + IPFilter.objects.create( + whitelist="192.168.10.20", + enabled=True + ) + + # Set up country access rules so the user would + # be restricted from the course. + with restrict_course(self.course.id): + # Make a request from the whitelisted IP address + response = self.client.get( + self.courseware_url, + HTTP_X_FORWARDED_FOR="192.168.10.20", + REMOTE_ADDR="192.168.10.20" ) - ] - CountryAccessRule.objects.bulk_create(country_access_black_rules) - # Text from lms/templates/static_templates/embargo.html - self.embargo_text = "Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this course." # pylint: disable=line-too-long - self.patcher = mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr', self.mock_country_code_by_addr) - self.patcher.start() - - def tearDown(self): - # Explicitly clear ConfigurationModel's cache so tests have a clear cache - # and don't interfere with each other - cache.clear() - self.patcher.stop() - - def mock_country_code_by_addr(self, ip_addr): - """ - making a lists of countries which will be use in country access rules. - if incoming request's ip belongs to this dict then related country will return. - for one course CU and IR added as blacklist in course access rules. - for one course US and NZ added as whitelist in course access rules. - """ - ip_dict = { - '1.0.0.0': 'CU', - '2.0.0.0': 'IR', - '3.0.0.0': 'US', - '4.0.0.0': 'NZ' - } - return ip_dict.get(ip_addr, 'FR') - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data('1.0.0.0', '2.0.0.0') - def test_course_access_rules_with_black_rule_country_by_user_ip(self, ip_address): - # Accessing an embargoed page from a user ip whose origin is added as - # blacklist in course access rules should be redirected. - # any other IP should be success - - # Following the redirect should give us the embargo page - response = self.client.get( - self.embargoed_course_blacklisted, - HTTP_X_FORWARDED_FOR=ip_address, - REMOTE_ADDR=ip_address - ) - self.assertEqual(response.status_code, 302) - - # Following the redirect should give us the embargo page - response = self.client.get( - self.embargoed_course_blacklisted, - HTTP_X_FORWARDED_FOR=ip_address, - REMOTE_ADDR=ip_address, - follow=True - ) - self.assertIn(self.embargo_text, response.content) - - # accesssing blacklist course from any other country ip should be success - response = self.client.get( - self.embargoed_course_blacklisted, - HTTP_X_FORWARDED_FOR='5.0.0.1', - REMOTE_ADDR='5.0.0.1' - ) + # Expect that we were still able to access the page, + # even though we would have been blocked by country + # access rules. self.assertEqual(response.status_code, 200) - - # accesssing whitelist course from these should give us the embargo page - response = self.client.get( - self.embargoed_course_whitelisted, - HTTP_X_FORWARDED_FOR=ip_address, - REMOTE_ADDR=ip_address - ) - self.assertEqual(response.status_code, 302) - - # Accessing a regular page from these IP should be success - response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address) - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data('3.0.0.0', '4.0.0.0', "7.0.0.1", "2001:250::") - def test_course_access_rules_with_white_rule_country_by_user_ip(self, ip_address): - # Accessing an embargoed page from a user ip whose origin is added as - # white in course access rules should succeed. any other ip should be fail - - response = self.client.get( - self.embargoed_course_whitelisted, - HTTP_X_FORWARDED_FOR=ip_address, - REMOTE_ADDR=ip_address - ) - if ip_address in ['3.0.0.0', '4.0.0.0']: - self.assertEqual(response.status_code, 200) - else: - self.assertEqual(response.status_code, 302) - - # access the blacklisted course should give success - response = self.client.get( - self.embargoed_course_blacklisted, - HTTP_X_FORWARDED_FOR=ip_address, - REMOTE_ADDR=ip_address - ) - self.assertEqual(response.status_code, 200) - - # Accessing a regular page should success - response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address) - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - def test_embargo_profile_country_db_null(self): - # Django country fields treat NULL values inconsistently. - # When saving a profile with country set to None, Django saves an empty string to the database. - # However, when the country field loads a NULL value from the database, it sets - # `country.code` to `None`. This caused a bug in which country values created by - # the original South schema migration -- which defaulted to NULL -- caused a runtime - # exception when the embargo middleware treated the value as a string. - # In order to simulate this behavior, we can't simply set `profile.country = None`. - # (because when we save it, it will set the database field to an empty string instead of NULL) - query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s" - connection.cursor().execute(query, [str(self.user.profile.id)]) - transaction.commit_unless_managed() - - # Attempt to access an embargoed course - # Verify that the student can access the page without an error - response = self.client.get(self.embargoed_course_blacklisted) - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data("", "US", "CA", "AF", "NZ", "IR") - def test_regular_course_accessible_from_every_where(self, profile_country): - # regular course is accessible even when ENABLE_COUNTRY_ACCESS flag is true - profile = self.user.profile - profile.country = profile_country - profile.save() - - response = self.client.get(self.regular_page) - # Course is accessible from all countries. - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data("", "US", "CA", "AF", "NZ", "IR") - def test_embargo_course_whitelisted_with_profile_country(self, profile_country): - # course is emabargoed and has white list countries. - # but user ip belongs to US but profile country is blacklist - # only white list country can access the course. - - profile = self.user.profile - profile.country = profile_country - profile.save() - - # adding the US IP so the _country_code_from_ip() get passed - response = self.client.get( - self.embargoed_course_whitelisted, - HTTP_X_FORWARDED_FOR='3.0.0.0', - REMOTE_ADDR='3.0.0.0' - ) - # Course is whitelisted against US,NZ so all other countries will be disallowed - if profile_country in ["CA", "AF", "IR"]: - self.assertRedirects(response, reverse('embargo')) - self.assertEqual(response.status_code, 302) - else: - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data("", "US", "CA", "NZ", "IR", "CU") - def test_embargo_course_blacklisted_with_profile_country(self, profile_country): - # if course is emabargoed and has black list countries ( CU , IR ). - # then users from these countries can't access this course. - # any user from other than these countries can access. - - profile = self.user.profile - profile.country = profile_country - profile.save() - - response = self.client.get(self.embargoed_course_blacklisted) - if profile_country in ["", "US", "CA", "NZ"]: - self.assertEqual(response.status_code, 200) - else: - embargo_url = reverse('embargo') - self.assertRedirects(response, embargo_url) - self.assertEqual(response.status_code, 302) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - @ddt.data("", "US", "CA", "NZ", "IR", "CU") - def test_embargo_course_without_any_rules_list(self, profile_country): - # if course is emabargoed but without whitelist and blacklist - # then course can be accessible from any where - - profile = self.user.profile - profile.country = profile_country - profile.save() - - embargo_course3 = CourseFactory.create() - embargo_course3.save() - RestrictedCourse(course_key=embargo_course3.id).save() - embargoed_course_page = '/courses/' + embargo_course3.id.to_deprecated_string() + '/info' - - response = self.client.get(embargoed_course_page) - self.assertEqual(response.status_code, 200) - - @mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True}) - def test_embargo_profile_country_cache(self): - # Warm the cache - with self.assertNumQueries(24): - self.client.get(self.embargoed_course_blacklisted) - - # Access the page multiple times, but expect that we hit - # the database to check the user's profile only once - with self.assertNumQueries(9): - self.client.get(self.embargoed_course_blacklisted) diff --git a/common/djangoapps/embargo/tests/test_models.py b/common/djangoapps/embargo/tests/test_models.py index a424440c7b..caf8422c70 100644 --- a/common/djangoapps/embargo/tests/test_models.py +++ b/common/djangoapps/embargo/tests/test_models.py @@ -1,17 +1,18 @@ -"""Test of models for embargo middleware app""" +"""Test of models for embargo app""" +import json from django.test import TestCase from django.db.utils import IntegrityError -from opaque_keys.edx.locations import SlashSeparatedCourseKey +from opaque_keys.edx.locator import CourseLocator from embargo.models import ( EmbargoedCourse, EmbargoedState, IPFilter, RestrictedCourse, - Country, CountryAccessRule, WHITE_LIST, BLACK_LIST + Country, CountryAccessRule, CourseAccessRuleHistory ) class EmbargoModelsTest(TestCase): """Test each of the 3 models in embargo.models""" def test_course_embargo(self): - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') # Test that course is not authorized by default self.assertFalse(EmbargoedCourse.is_embargoed(course_id)) @@ -22,8 +23,8 @@ class EmbargoModelsTest(TestCase): # Now, course should be embargoed self.assertTrue(EmbargoedCourse.is_embargoed(course_id)) self.assertEquals( - cauth.__unicode__(), - "Course 'abc/123/doremi' is Embargoed" + unicode(cauth), + u"Course '{course_id}' is Embargoed".format(course_id=course_id) ) # Unauthorize by explicitly setting email_enabled to False @@ -32,8 +33,8 @@ class EmbargoModelsTest(TestCase): # Test that course is now unauthorized self.assertFalse(EmbargoedCourse.is_embargoed(course_id)) self.assertEquals( - cauth.__unicode__(), - "Course 'abc/123/doremi' is Not Embargoed" + unicode(cauth), + u"Course '{course_id}' is Not Embargoed".format(course_id=course_id) ) def test_state_embargo(self): @@ -101,18 +102,18 @@ class EmbargoModelsTest(TestCase): class RestrictedCourseTest(TestCase): - """Test unicode values tests and cache functionality""" + """Test RestrictedCourse model. """ def test_unicode_values(self): - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') restricted_course = RestrictedCourse.objects.create(course_key=course_id) self.assertEquals( - restricted_course.__unicode__(), - "abc/123/doremi" + unicode(restricted_course), + unicode(course_id) ) def test_restricted_course_cache_with_save_delete(self): - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') RestrictedCourse.objects.create(course_key=course_id) # Warm the cache @@ -124,7 +125,7 @@ class RestrictedCourseTest(TestCase): RestrictedCourse.is_restricted_course(course_id) # add new the course so the cache must get delete and again hit the db - new_course_id = SlashSeparatedCourseKey('def', '123', 'doremi') + new_course_id = CourseLocator('def', '123', 'doremi') RestrictedCourse.objects.create(course_key=new_course_id) with self.assertNumQueries(1): RestrictedCourse.is_restricted_course(new_course_id) @@ -146,45 +147,42 @@ class RestrictedCourseTest(TestCase): class CountryTest(TestCase): - """Test unicode values test""" + """Test Country model. """ def test_unicode_values(self): country = Country.objects.create(country='NZ') - self.assertEquals( - country.__unicode__(), - "New Zealand (NZ)" - ) + self.assertEquals(unicode(country), "New Zealand (NZ)") class CountryAccessRuleTest(TestCase): - """Test unicode values tests and unique-together contraint""" + """Test CountryAccessRule model. """ def test_unicode_values(self): - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') country = Country.objects.create(country='NZ') restricted_course1 = RestrictedCourse.objects.create(course_key=course_id) access_rule = CountryAccessRule.objects.create( restricted_course=restricted_course1, - rule_type=WHITE_LIST, + rule_type=CountryAccessRule.WHITELIST_RULE, country=country ) self.assertEquals( - access_rule.__unicode__(), - "Whitelist New Zealand (NZ) for abc/123/doremi" + unicode(access_rule), + u"Whitelist New Zealand (NZ) for {course_key}".format(course_key=course_id) ) - course_id = SlashSeparatedCourseKey('def', '123', 'doremi') + course_id = CourseLocator('def', '123', 'doremi') restricted_course1 = RestrictedCourse.objects.create(course_key=course_id) access_rule = CountryAccessRule.objects.create( restricted_course=restricted_course1, - rule_type=BLACK_LIST, + rule_type=CountryAccessRule.BLACKLIST_RULE, country=country ) self.assertEquals( - access_rule.__unicode__(), - "Blacklist New Zealand (NZ) for def/123/doremi" + unicode(access_rule), + u"Blacklist New Zealand (NZ) for {course_key}".format(course_key=course_id) ) def test_unique_together_constraint(self): @@ -192,31 +190,31 @@ class CountryAccessRuleTest(TestCase): Course with specific country can be added either as whitelist or blacklist trying to add with both types will raise error """ - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') country = Country.objects.create(country='NZ') restricted_course1 = RestrictedCourse.objects.create(course_key=course_id) CountryAccessRule.objects.create( restricted_course=restricted_course1, - rule_type=WHITE_LIST, + rule_type=CountryAccessRule.WHITELIST_RULE, country=country ) with self.assertRaises(IntegrityError): CountryAccessRule.objects.create( restricted_course=restricted_course1, - rule_type=BLACK_LIST, + rule_type=CountryAccessRule.BLACKLIST_RULE, country=country ) def test_country_access_list_cache_with_save_delete(self): - course_id = SlashSeparatedCourseKey('abc', '123', 'doremi') + course_id = CourseLocator('abc', '123', 'doremi') country = Country.objects.create(country='NZ') restricted_course1 = RestrictedCourse.objects.create(course_key=course_id) course = CountryAccessRule.objects.create( restricted_course=restricted_course1, - rule_type=WHITE_LIST, + rule_type=CountryAccessRule.WHITELIST_RULE, country=country ) @@ -227,8 +225,123 @@ class CountryAccessRuleTest(TestCase): with self.assertNumQueries(0): CountryAccessRule.check_country_access(course_id, 'NZ') - # deleting an object will delete cache also.and hit db on - # get the country access lists for course + # Deleting an object will invalidate the cache course.delete() with self.assertNumQueries(1): CountryAccessRule.check_country_access(course_id, 'NZ') + + +class CourseAccessRuleHistoryTest(TestCase): + """Test course access rule history. """ + + def setUp(self): + self.course_key = CourseLocator('edx', 'DemoX', 'Demo_Course') + self.restricted_course = RestrictedCourse.objects.create(course_key=self.course_key) + self.countries = { + 'US': Country.objects.create(country='US'), + 'AU': Country.objects.create(country='AU') + } + + def test_course_access_history_no_rules(self): + self._assert_history([]) + self.restricted_course.delete() + self._assert_history_deleted() + + def test_course_access_history_with_rules(self): + # Add one rule + us_rule = CountryAccessRule.objects.create( + restricted_course=self.restricted_course, + country=self.countries['US'], + rule_type=CountryAccessRule.WHITELIST_RULE + ) + self._assert_history([('US', 'whitelist')]) + + # Add another rule + au_rule = CountryAccessRule.objects.create( + restricted_course=self.restricted_course, + country=self.countries['AU'], + rule_type=CountryAccessRule.BLACKLIST_RULE + ) + self._assert_history([ + ('US', 'whitelist'), + ('AU', 'blacklist') + ]) + + # Delete the first rule + us_rule.delete() + self._assert_history([('AU', 'blacklist')]) + + # Delete the second rule + au_rule.delete() + self._assert_history([]) + + def test_course_access_history_delete_all(self): + # Create a rule + CountryAccessRule.objects.create( + restricted_course=self.restricted_course, + country=self.countries['US'], + rule_type=CountryAccessRule.WHITELIST_RULE + ) + + # Delete the course (and, implicitly, all the rules) + self.restricted_course.delete() + self._assert_history_deleted() + + def test_course_access_history_change_message(self): + # Change the message key + self.restricted_course.enroll_msg_key = 'embargo' + self.restricted_course.access_msg_key = 'embargo' + self.restricted_course.save() + + # Expect a history entry with the changed keys + self._assert_history([], enroll_msg='embargo', access_msg='embargo') + + def _assert_history(self, country_rules, enroll_msg='default', access_msg='default'): + """Check the latest history entry. + + Arguments: + country_rules (list): List of rules, each of which are tuples + of the form `(country_code, rule_type)`. + + Keyword Arguments: + enroll_msg (str): The expected enrollment message key. + access_msg (str): The expected access message key. + + Raises: + AssertionError + + """ + record = CourseAccessRuleHistory.objects.latest() + + # Check that the record is for the correct course + self.assertEqual(record.course_key, self.course_key) + + # Load the history entry and verify the message keys + snapshot = json.loads(record.snapshot) + self.assertEqual(snapshot['enroll_msg'], enroll_msg) + self.assertEqual(snapshot['access_msg'], access_msg) + + # For each rule, check that there is an entry + # in the history record. + for (country, rule_type) in country_rules: + self.assertIn( + { + 'country': country, + 'rule_type': rule_type + }, + snapshot['country_rules'] + ) + + # Check that there are no duplicate entries + self.assertEqual(len(snapshot['country_rules']), len(country_rules)) + + def _assert_history_deleted(self): + """Check the latest history entry for a 'DELETED' placeholder. + + Raises: + AssertionError + + """ + record = CourseAccessRuleHistory.objects.latest() + self.assertEqual(record.course_key, self.course_key) + self.assertEqual(record.snapshot, "DELETED")