From b8ecfed67dc0520b8c4d95de3096b35acc083611 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Thu, 21 Apr 2022 15:21:28 -0400 Subject: [PATCH] fix: use safer ip-chain logic when checking ip for embargos Specifically: - check ALL ip addresses in the client ip chain for blocking - check RIGHTMOST ip address in the client ip chain for allowing Before, we always checked the LEFTMOST ip address in both cases. AA-1234 --- common/djangoapps/course_modes/views.py | 8 +- common/djangoapps/student/views/management.py | 6 +- lms/djangoapps/verify_student/views.py | 8 +- openedx/core/djangoapps/embargo/api.py | 127 +++++++------ openedx/core/djangoapps/embargo/middleware.py | 46 +++-- openedx/core/djangoapps/embargo/models.py | 70 ++++--- .../core/djangoapps/embargo/tests/test_api.py | 85 +++++++-- .../embargo/tests/test_middleware.py | 174 ++++++++++++++---- openedx/core/djangoapps/embargo/views.py | 11 +- 9 files changed, 343 insertions(+), 192 deletions(-) diff --git a/common/djangoapps/course_modes/views.py b/common/djangoapps/course_modes/views.py index 7958a480ed..7569f4d80c 100644 --- a/common/djangoapps/course_modes/views.py +++ b/common/djangoapps/course_modes/views.py @@ -22,7 +22,6 @@ from django.utils.translation import get_language, to_locale from django.utils.translation import gettext as _ from django.views.generic.base import View from edx_django_utils.monitoring.utils import increment -from ipware.ip import get_client_ip from opaque_keys.edx.keys import CourseKey from urllib.parse import urljoin # lint-amnesty, pylint: disable=wrong-import-order @@ -103,12 +102,7 @@ class ChooseModeView(View): # Check whether the user has access to this course # based on country access rules. - embargo_redirect = embargo_api.redirect_if_blocked( - course_key, - user=request.user, - ip_address=get_client_ip(request)[0], - url=request.path - ) + embargo_redirect = embargo_api.redirect_if_blocked(request, course_key) if embargo_redirect: return redirect(embargo_redirect) diff --git a/common/djangoapps/student/views/management.py b/common/djangoapps/student/views/management.py index 66b9d61094..1d918ac57a 100644 --- a/common/djangoapps/student/views/management.py +++ b/common/djangoapps/student/views/management.py @@ -31,7 +31,6 @@ from edx_django_utils import monitoring as monitoring_utils from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser # lint-amnesty, pylint: disable=wrong-import-order from eventtracking import tracker -from ipware.ip import get_client_ip # Note that this lives in LMS, so this dependency should be refactored. from opaque_keys import InvalidKeyError from opaque_keys.edx.keys import CourseKey @@ -365,10 +364,7 @@ def change_enrollment(request, check_access=True): # This can occur if the user's IP is on a global blacklist # or if the user is enrolling in a country in which the course # is not available. - redirect_url = embargo_api.redirect_if_blocked( - course_id, user=user, ip_address=get_client_ip(request)[0], - url=request.path - ) + redirect_url = embargo_api.redirect_if_blocked(request, course_id) if redirect_url: return HttpResponse(redirect_url) diff --git a/lms/djangoapps/verify_student/views.py b/lms/djangoapps/verify_student/views.py index 62c0b75293..5602816f39 100644 --- a/lms/djangoapps/verify_student/views.py +++ b/lms/djangoapps/verify_student/views.py @@ -22,7 +22,6 @@ from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST from django.views.generic.base import View from edx_rest_api_client.exceptions import SlumberBaseException -from ipware.ip import get_client_ip from opaque_keys.edx.keys import CourseKey from rest_framework.response import Response from rest_framework.views import APIView @@ -235,12 +234,7 @@ class PayAndVerifyView(View): # Check whether the user has access to this course # based on country access rules. - redirect_url = embargo_api.redirect_if_blocked( - course_key, - user=request.user, - ip_address=get_client_ip(request)[0], - url=request.path - ) + redirect_url = embargo_api.redirect_if_blocked(request, course_key) if redirect_url: return redirect(redirect_url) diff --git a/openedx/core/djangoapps/embargo/api.py b/openedx/core/djangoapps/embargo/api.py index ba101f9b79..9713e73ed7 100644 --- a/openedx/core/djangoapps/embargo/api.py +++ b/openedx/core/djangoapps/embargo/api.py @@ -7,34 +7,53 @@ This API is exposed via the middleware(emabargo/middileware.py) layer but may be """ import logging +from typing import List, Optional from django.conf import settings from django.core.cache import cache -from ipware.ip import get_client_ip +from opaque_keys.edx.keys import CourseKey from rest_framework import status +from rest_framework.request import Request from rest_framework.response import Response from common.djangoapps.student.auth import has_course_author_access +from openedx.core import types from openedx.core.djangoapps.geoinfo.api import country_code_from_ip +from openedx.core.djangoapps.util import ip from .models import CountryAccessRule, RestrictedCourse log = logging.getLogger(__name__) -def redirect_if_blocked(course_key, access_point='enrollment', **kwargs): - """Redirect if the user does not have access to the course. In case of blocked if access_point - is not enrollment and course has enabled is_disabled_access_check then user can view that course. +def redirect_if_blocked( + request: Request, + course_key: CourseKey, + access_point: str = 'enrollment', + user: Optional[types.User] = None, +) -> Optional[str]: + """ + Redirect if the user does not have access to the course. + + Even if the user would normally be blocked, if the given access_point is 'courseware' and the course has enabled + the `is_disabled_access_check` flag, then the user can still view that course. Arguments: - course_key (CourseKey): Location of the course the user is trying to access. - - Keyword Arguments: - Same as `check_course_access` and `message_url_path` + request: The current request to be checked. + course_key: Location of the course the user is trying to access. + access_point: Type of page being accessed (e.g. 'courseware', 'enrollment', etc) + user: User to check for (uses request.user if None) + Returns: + If blocked, a URL path to a page explaining why the user was blocked. Else None. """ if settings.FEATURES.get('EMBARGO'): - is_blocked = not check_course_access(course_key, **kwargs) + if ip.USE_LEGACY_IP.is_enabled(): + client_ips = [ip.get_legacy_ip(request)] + else: + client_ips = ip.get_all_client_ips(request) + user = user or request.user + is_blocked = not check_course_access(course_key, user=user, ip_addresses=client_ips, url=request.path) if is_blocked: if access_point == "courseware": if not RestrictedCourse.is_disabled_access_check(course_key): @@ -43,22 +62,23 @@ def redirect_if_blocked(course_key, access_point='enrollment', **kwargs): return message_url_path(course_key, access_point) -def check_course_access(course_key, user=None, ip_address=None, url=None): +def check_course_access( + course_key: CourseKey, + user: Optional[types.User] = None, + ip_addresses: Optional[List[str]] = None, + url: Optional[str] = None, +) -> bool: """ - Check is the user with this ip_address has access to the given course + Check is the user with this ip_addresses chain has access to the given course Arguments: - course_key (CourseKey): Location of the course the user is trying to access. - - Keyword Arguments: - user (User): The user making the request. Can be None, in which case - the user's profile country will not be checked. - ip_address (str): The IP address of the request. - url (str): The URL the user is trying to access. Used in - log messages. + course_key: Location of the course the user is trying to access. + user: The user making the request. Can be None, in which case the user's profile country will not be checked. + ip_addresses: The full external chain of IP addresses of the request. + url: The URL the user is trying to access. Used in log messages. Returns: - Boolean: True if the user has access to the course; False otherwise + True if the user has access to the course; False otherwise """ # No-op if the country access feature is not enabled @@ -76,25 +96,27 @@ def check_course_access(course_key, user=None, ip_address=None, url=None): if user is not None and has_course_author_access(user, course_key): return True - if ip_address is not None: - # Retrieve the country code from the IP address - # and check it against the allowed countries list for a course - user_country_from_ip = country_code_from_ip(ip_address) + if ip_addresses is not None: + # Check every IP address provided and deny access if ANY of them fail our country checks + for ip_address in ip_addresses: + # Retrieve the country code from the IP address + # and check it against the allowed countries list for a course + user_country_from_ip = country_code_from_ip(ip_address) - if not CountryAccessRule.check_country_access(course_key, user_country_from_ip): - log.info( - ( - "Blocking user %s from accessing course %s at %s " - "because the user's IP address %s appears to be " - "located in %s." - ), - getattr(user, 'id', ''), - course_key, - url, - ip_address, - user_country_from_ip - ) - return False + if not CountryAccessRule.check_country_access(course_key, user_country_from_ip): + log.info( + ( + "Blocking user %s from accessing course %s at %s " + "because the user's IP address %s appears to be " + "located in %s." + ), + getattr(user, 'id', ''), + course_key, + url, + ip_address, + user_country_from_ip + ) + return False if user is not None: # Retrieve the country code from the user's profile @@ -114,19 +136,19 @@ def check_course_access(course_key, user=None, ip_address=None, url=None): return True -def message_url_path(course_key, access_point): - """Determine the URL path for the message explaining why the user was blocked. +def message_url_path(course_key: CourseKey, access_point: str) -> str: + """ + Determine the URL path for the message explaining why the user was blocked. This is configured per-course. See `RestrictedCourse` in the `embargo.models` module for more details. Arguments: - course_key (CourseKey): The location of the course. - access_point (str): How the user was trying to access the course. - Can be either "enrollment" or "courseware". + course_key: The location of the course. + access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware". Returns: - unicode: The URL path to a page explaining why the user was blocked. + The URL path to a page explaining why the user was blocked. Raises: InvalidAccessPoint: Raised if access_point is not a supported value. @@ -135,7 +157,7 @@ def message_url_path(course_key, access_point): return RestrictedCourse.message_url_path(course_key, access_point) -def _get_user_country_from_profile(user): +def _get_user_country_from_profile(user: types.User) -> str: """ Check whether the user is embargoed based on the country code in the user's profile. @@ -159,28 +181,27 @@ def _get_user_country_from_profile(user): return profile_country -def get_embargo_response(request, course_id, user): +def get_embargo_response(request: Request, course_key: CourseKey, user: types.User) -> Optional[Response]: """ Check whether any country access rules block the user from enrollment. Args: - request (HttpRequest): The request object - course_id (str): The requested course ID - user (str): The current user object + request: The request object + course_key: The requested course ID + user: The current user object Returns: - HttpResponse: Response of the embargo page if embargoed, None if not + Response of the embargo page if embargoed, None if not """ - redirect_url = redirect_if_blocked( - course_id, user=user, ip_address=get_client_ip(request)[0], url=request.path) + redirect_url = redirect_if_blocked(request, course_key, user=user) if redirect_url: return Response( status=status.HTTP_403_FORBIDDEN, data={ "message": ( "Users from this location cannot access the course '{course_id}'." - ).format(course_id=course_id), + ).format(course_id=str(course_key)), "user_message_url": request.build_absolute_uri(redirect_url) } ) diff --git a/openedx/core/djangoapps/embargo/middleware.py b/openedx/core/djangoapps/embargo/middleware.py index d431a36071..4d5e799cf6 100644 --- a/openedx/core/djangoapps/embargo/middleware.py +++ b/openedx/core/djangoapps/embargo/middleware.py @@ -28,14 +28,17 @@ Usage: import logging import re +from typing import Optional from django.conf import settings from django.core.exceptions import MiddlewareNotUsed from django.urls import reverse from django.utils.deprecation import MiddlewareMixin from django.shortcuts import redirect -from ipware.ip import get_client_ip +from rest_framework.request import Request +from rest_framework.response import Response +from openedx.core.djangoapps.util import ip from openedx.core.lib.request_utils import course_id_from_url from . import api as embargo_api @@ -64,7 +67,7 @@ class EmbargoMiddleware(MiddlewareMixin): raise MiddlewareNotUsed() super().__init__(*args, **kwargs) - def process_request(self, request): + def process_request(self, request: Request) -> Optional[Response]: """Block requests based on embargo rules. This will perform the following checks: @@ -83,15 +86,24 @@ class EmbargoMiddleware(MiddlewareMixin): if pattern.match(request.path) is not None: return None - ip_address = get_client_ip(request)[0] + if ip.USE_LEGACY_IP.is_enabled(): + safest_ip_address = ip.get_legacy_ip(request) + all_ip_addresses = [safest_ip_address] + else: + safest_ip_address = ip.get_safest_client_ip(request) + all_ip_addresses = ip.get_all_client_ips(request) + ip_filter = IPFilter.current() - if ip_filter.enabled and ip_address in ip_filter.blacklist_ips: + # When checking if a request is block-listed, we need to check EVERY client IP address in the chain, in case + # a blocked ip tried to hop through other proxies. + blocked_ips = [x for x in all_ip_addresses if x in ip_filter.blacklist_ips] + if ip_filter.enabled and blocked_ips: log.info( ( "User %s was blocked from accessing %s " "because IP address %s is blacklisted." - ), request.user.id, request.path, ip_address + ), request.user.id, request.path, blocked_ips[0] ) # If the IP is blacklisted, reject. @@ -105,13 +117,15 @@ class EmbargoMiddleware(MiddlewareMixin): ) return redirect(ip_blacklist_url) - elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips: + # When checking if a request is allow-listed, we should only look at the safest client IP address, as the + # others in the chain might be spoofed. + elif ip_filter.enabled and safest_ip_address in ip_filter.whitelist_ips: log.info( ( "User %s was allowed access to %s because " "IP address %s is whitelisted." ), - request.user.id, request.path, ip_address + request.user.id, request.path, safest_ip_address ) # If the IP is whitelisted, then allow access, @@ -121,32 +135,24 @@ class EmbargoMiddleware(MiddlewareMixin): else: # Otherwise, perform the country access checks. # This applies only to courseware URLs. - return self.country_access_rules(request.user, ip_address, request.path) + return self.country_access_rules(request) - def country_access_rules(self, user, ip_address, url_path): + def country_access_rules(self, request: Request) -> Optional[Response]: """ Check the country access rules for a given course. Applies only to courseware URLs. Args: - user (User): The user making the current request. - ip_address (str): The IP address from which the request originated. - url_path (str): The request path. + request: The request to validate against the embargo rules Returns: HttpResponse or None """ - course_id = course_id_from_url(url_path) + course_id = course_id_from_url(request.path) if course_id: - redirect_url = embargo_api.redirect_if_blocked( - course_id, - user=user, - ip_address=ip_address, - url=url_path, - access_point='courseware' - ) + redirect_url = embargo_api.redirect_if_blocked(request, course_id, access_point='courseware') if redirect_url: return redirect(redirect_url) diff --git a/openedx/core/djangoapps/embargo/models.py b/openedx/core/djangoapps/embargo/models.py index 653012cedc..3e72699404 100644 --- a/openedx/core/djangoapps/embargo/models.py +++ b/openedx/core/djangoapps/embargo/models.py @@ -15,6 +15,7 @@ file and check it in at the same time as your model changes. To do that, import ipaddress import json import logging +from typing import List, Optional from config_models.models import ConfigurationModel from django.core.cache import cache @@ -26,6 +27,7 @@ from django.utils.translation import gettext_lazy from django_countries import countries from django_countries.fields import CountryField from opaque_keys.edx.django.models import CourseKeyField +from opaque_keys.edx.keys import CourseKey from openedx.core.djangoapps.xmodule_django.models import NoneToEmptyManager @@ -158,37 +160,35 @@ class RestrictedCourse(models.Model): ) @classmethod - def is_restricted_course(cls, course_id): + def is_restricted_course(cls, course_key: CourseKey) -> bool: """ Check if the course is in restricted list Args: - course_id (str): course_id to look for + course_key: course to look for Returns: - Boolean - True if course is in restricted course list. + True if the course is in the restricted course list. """ - return str(course_id) in cls._get_restricted_courses_from_cache() + return str(course_key) in cls._get_restricted_courses_from_cache() @classmethod - def is_disabled_access_check(cls, course_id): + def is_disabled_access_check(cls, course_key: CourseKey) -> bool: """ Check if the course is in restricted list has disabled_access_check Args: - course_id (str): course_id to look for + course_key: course to look for Returns: - Boolean disabled_access_check attribute of restricted course """ # checking is_restricted_course method also here to make sure course exists in the list otherwise in case of # no course found it will throw the key not found error on 'disable_access_check' return ( - cls.is_restricted_course(str(course_id)) - and cls._get_restricted_courses_from_cache().get(str(course_id))["disable_access_check"] + cls.is_restricted_course(course_key) + and cls._get_restricted_courses_from_cache().get(str(course_key))["disable_access_check"] ) @classmethod @@ -244,7 +244,7 @@ class RestrictedCourse(models.Model): ] } - def message_key_for_access_point(self, access_point): + def message_key_for_access_point(self, access_point: str) -> Optional[str]: """Determine which message to show the user. The message can be configured per-course and depends @@ -252,11 +252,10 @@ class RestrictedCourse(models.Model): (trying to enroll or accessing courseware). Arguments: - access_point (str): Either "courseware" or "enrollment" + access_point: Either "courseware" or "enrollment" Returns: - str: The message key. If the access point is not valid, - returns None instead. + The message key. If the access point is not valid, returns None instead. """ if access_point == 'enrollment': @@ -268,19 +267,18 @@ class RestrictedCourse(models.Model): return str(self.course_key) @classmethod - def message_url_path(cls, course_key, access_point): + def message_url_path(cls, course_key: CourseKey, access_point: str) -> str: """Determine the URL path for the message explaining why the user was blocked. This is configured per-course. See `RestrictedCourse` in the `embargo.models` module for more details. Arguments: - course_key (CourseKey): The location of the course. - access_point (str): How the user was trying to access the course. - Can be either "enrollment" or "courseware". + course_key: The location of the course. + access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware". Returns: - unicode: The URL path to a page explaining why the user was blocked. + The URL path to a page explaining why the user was blocked. Raises: InvalidAccessPoint: Raised if access_point is not a supported value. @@ -306,16 +304,15 @@ class RestrictedCourse(models.Model): return url @classmethod - def _get_message_url_path_from_db(cls, course_key, access_point): + def _get_message_url_path_from_db(cls, course_key: CourseKey, access_point: str) -> str: """Retrieve the "blocked" message from the database. Arguments: - course_key (CourseKey): The location of the course. - access_point (str): How the user was trying to access the course. - Can be either "enrollment" or "courseware". + course_key: The location of the course. + access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware". Returns: - unicode: The URL path to a page explaining why the user was blocked. + The URL path to a page explaining why the user was blocked. """ # Fallback in case we're not able to find a message path @@ -355,7 +352,7 @@ class RestrictedCourse(models.Model): return default_path @classmethod - def invalidate_cache_for_course(cls, course_key): + def invalidate_cache_for_course(cls, course_key: CourseKey) -> None: """Invalidate the caches for the restricted course. """ cache.delete(cls.COURSE_LIST_CACHE_KEY) log.info("Invalidated cached list of restricted courses.") @@ -450,18 +447,16 @@ class CountryAccessRule(models.Model): ALL_COUNTRIES = {code[0] for code in list(countries)} @classmethod - def check_country_access(cls, course_id, country): + def check_country_access(cls, course_key: CourseKey, country: str) -> bool: """ Check if the country is either in whitelist or blacklist of countries for the course_id Args: - course_id (str): course_id to look for - country (str): A 2 characters code of country + course_key: course to look for + country: A 2 characters code of country Returns: - Boolean - True if country found in allowed country - otherwise check given country exists in list + True if country found in allowed country, otherwise check given country exists in list """ # If the country code is not in the list of all countries, # we don't want to automatically exclude the user. @@ -471,25 +466,24 @@ class CountryAccessRule(models.Model): if country not in cls.ALL_COUNTRIES: return True - cache_key = cls.CACHE_KEY.format(course_key=course_id) + cache_key = cls.CACHE_KEY.format(course_key=course_key) allowed_countries = cache.get(cache_key) if allowed_countries is None: - allowed_countries = cls._get_country_access_list(course_id) + allowed_countries = cls._get_country_access_list(course_key) cache.set(cache_key, allowed_countries) return country == '' or country in allowed_countries @classmethod - def _get_country_access_list(cls, course_id): + def _get_country_access_list(cls, course_key: CourseKey) -> List[str]: """ if a course is blacklist for two countries then course can be accessible from any where except these two countries. if a course is whitelist for two countries then course can be accessible from these countries only. Args: - course_id (str): course_id to look for + course_key: course to look for Returns: - List Consolidated list of accessible countries for given course """ @@ -498,7 +492,7 @@ class CountryAccessRule(models.Model): # Retrieve all rules in one database query, performing the "join" with the Country table rules_for_course = CountryAccessRule.objects.select_related('country').filter( - restricted_course__course_key=course_id + restricted_course__course_key=course_key ) # Filter the rules into a whitelist and blacklist in one pass @@ -529,7 +523,7 @@ class CountryAccessRule(models.Model): ) @classmethod - def invalidate_cache_for_course(cls, course_key): + def invalidate_cache_for_course(cls, course_key: CourseKey) -> None: """Invalidate the cache. """ cache_key = cls.CACHE_KEY.format(course_key=course_key) cache.delete(cache_key) diff --git a/openedx/core/djangoapps/embargo/tests/test_api.py b/openedx/core/djangoapps/embargo/tests/test_api.py index 67b9c02628..4f8b603f31 100644 --- a/openedx/core/djangoapps/embargo/tests/test_api.py +++ b/openedx/core/djangoapps/embargo/tests/test_api.py @@ -12,23 +12,24 @@ import ddt import pytest from django.conf import settings -from django.test.utils import override_settings from django.core.cache import cache from django.db import connection +from django.test.client import RequestFactory +from django.test.utils import override_settings +from edx_toggles.toggles.testutils import override_waffle_switch +from xmodule.modulestore.tests.factories import CourseFactory +from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, mixed_store_config from openedx.core.djangolib.testing.utils import skip_unless_lms from common.djangoapps.student.tests.factories import UserFactory -from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order -from xmodule.modulestore.tests.django_utils import ( # lint-amnesty, pylint: disable=wrong-import-order - ModuleStoreTestCase, mixed_store_config -) from common.djangoapps.student.roles import ( GlobalStaff, CourseRole, OrgRole, CourseStaffRole, CourseInstructorRole, OrgStaffRole, OrgInstructorRole ) - from common.djangoapps.util.testing import UrlResetMixin +from openedx.core.djangoapps.util.ip import USE_LEGACY_IP + from ..models import ( RestrictedCourse, Country, CountryAccessRule, ) @@ -96,7 +97,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): with self._mock_geoip(ip_country): # Call the API. Note that the IP address we pass in doesn't # matter, since we're injecting a mock for geo-location - result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) # Verify that the access rules were applied correctly assert result == allow_access @@ -110,7 +111,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # The user is set to None, because the user has not been authenticated. with self._mock_geoip(""): - result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, ip_addresses=['0.0.0.0']) assert result def test_no_user_blocked(self): @@ -122,7 +123,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): with self._mock_geoip('US'): # The user is set to None, because the user has not been authenticated. - result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, ip_addresses=['0.0.0.0']) assert not result def test_course_not_restricted(self): @@ -130,18 +131,18 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # so all access checks should be skipped. unrestricted_course = CourseFactory.create() with self.assertNumQueries(1): - embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_addresses=['0.0.0.0']) # The second check should require no database queries with self.assertNumQueries(0): - embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_addresses=['0.0.0.0']) def test_ip_v6(self): # Test the scenario that will go through every check # (restricted course, but pass all the checks) with self._mock_geoip('US'): result = embargo_api.check_course_access(self.course.id, user=self.user, - ip_address='FE80::0202:B3FF:FE1E:8329') + ip_addresses=['FE80::0202:B3FF:FE1E:8329']) assert result def test_country_access_fallback_to_continent_code(self): @@ -149,7 +150,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # instead of a country code. In this case, we should # allow the user access. with self._mock_geoip('EU'): - result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) assert result @mock.patch.dict(settings.FEATURES, {'EMBARGO': True}) @@ -167,7 +168,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # Verify that we can check the user's access without error with self._mock_geoip('US'): - result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) assert result def test_caching(self): @@ -177,20 +178,20 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # This is the worst case, so it will hit all of the # caching code. with self.assertNumQueries(3): - embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) with self.assertNumQueries(0): - embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) def test_caching_no_restricted_courses(self): RestrictedCourse.objects.all().delete() cache.clear() with self.assertNumQueries(1): - embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) with self.assertNumQueries(0): - embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) @ddt.data( GlobalStaff, @@ -209,7 +210,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # Appear to make a request from an IP in the blocked country with self._mock_geoip('US'): - result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) # Expect that the user is blocked, because the user isn't staff assert not result, "User should not have access because the user isn't staff." @@ -227,10 +228,54 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase): # Now the user should have access with self._mock_geoip('US'): - result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0') + result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0']) assert result, 'User should have access because the user is staff.' + @ddt.data( + # (Note that any '0.x.x.x' IP _should_ be blocked in this test.) + # ips, legacy, allow access + (['0.0.0.0', '1.1.1.1'], True, False), # legacy looks at first IP and blocks + (['1.1.1.1', '0.0.0.0'], True, True), # legacy fails to look at later IPs and allows + (['1.1.1.1', '2.2.2.2'], False, True), # normal chain of access + (['1.1.1.1', '0.0.0.0', '2.2.2.2'], False, False), # tried to sneak a blocked IP in, but we caught it + ) + @ddt.unpack + @mock.patch('openedx.core.djangoapps.embargo.api.country_code_from_ip') + def test_redirect_if_blocked_ips(self, ips, use_legacy, allow_access, mock_country): + # Block the US + CountryAccessRule.objects.create( + rule_type=CountryAccessRule.BLACKLIST_RULE, + restricted_course=self.restricted_course, + country=Country.objects.get(country='US') + ) + + # Treat any ip that starts with zero as from the US + mock_country.side_effect = lambda x: 'US' if x.startswith('0.') else 'XX' + + request = RequestFactory().get('', HTTP_X_FORWARDED_FOR=','.join(ips)) + request.user = self.user + + with override_waffle_switch(USE_LEGACY_IP, use_legacy): + assert (embargo_api.redirect_if_blocked(request, self.course.id) is None) == allow_access + + @ddt.data( + # access point, check disabled, allow access + ('enrollment', False, False), # 'enrollment' never changes blocked status + ('enrollment', True, False), # 'enrollment' never changes blocked status + ('courseware', False, False), # 'courseware' normally also leaves blocked status in place + ('courseware', True, True), # Unless the access check has been disabled, then we allow them + ) + @ddt.unpack + @mock.patch('openedx.core.djangoapps.embargo.api.check_course_access', return_value=False) + def test_redirect_if_blocked_courseware(self, access_point, check_disabled, allow_access, _mock_access): + self.restricted_course.disable_access_check = check_disabled + self.restricted_course.save() + + request = RequestFactory().get('') + maybe_url = embargo_api.redirect_if_blocked(request, self.course.id, access_point=access_point, user=self.user) + assert (maybe_url is None) == allow_access + @contextmanager def _mock_geoip(self, country_code): """ diff --git a/openedx/core/djangoapps/embargo/tests/test_middleware.py b/openedx/core/djangoapps/embargo/tests/test_middleware.py index bed219bb50..8debf60967 100644 --- a/openedx/core/djangoapps/embargo/tests/test_middleware.py +++ b/openedx/core/djangoapps/embargo/tests/test_middleware.py @@ -9,12 +9,14 @@ from config_models.models import cache as config_cache from django.conf import settings from django.core.cache import cache as django_cache from django.urls import reverse +from edx_toggles.toggles.testutils import override_waffle_switch from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory -from openedx.core.djangolib.testing.utils import skip_unless_lms from common.djangoapps.student.tests.factories import UserFactory from common.djangoapps.util.testing import UrlResetMixin +from openedx.core.djangoapps.util.ip import USE_LEGACY_IP +from openedx.core.djangolib.testing.utils import skip_unless_lms from ..models import IPFilter, RestrictedCourse from ..test_utils import restrict_course @@ -77,18 +79,18 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase): @patch.dict(settings.FEATURES, {'EMBARGO': True}) @ddt.data( - # request_ip, blacklist, whitelist, is_enabled, allow_access - ('173.194.123.35', ['173.194.123.35'], [], True, False), - ('173.194.123.35', ['173.194.0.0/16'], [], True, False), - ('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False), - ('173.195.10.20', ['173.194.0.0/16'], [], True, True), - ('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False), - ('173.194.123.35', [], ['173.194.0.0/16'], True, True), - ('192.178.2.3', [], ['173.194.0.0/16'], True, True), - ('173.194.123.35', ['173.194.123.35'], [], False, True), + # request ip chain, blacklist, whitelist, is_enabled, allow_access + (['192.178.2.3'], [], [], True, True), # confirm that test setup & no config allows users by default + (['173.194.123.35'], ['173.194.123.35'], [], True, False), + (['173.194.123.35'], ['173.194.0.0/16'], [], True, False), + (['173.194.123.35'], ['127.0.0.0/32', '173.194.0.0/16'], [], True, False), + (['173.195.10.20'], ['173.194.0.0/16'], [], True, True), + (['173.194.123.35'], ['173.194.0.0/16'], ['173.194.0.0/16'], True, False), # blacklist checked before whitelist + (['173.194.123.35', '192.178.2.3'], ['173.194.123.35'], [], True, False), # earlier ip can still be blocked + (['173.194.123.35'], ['173.194.123.35'], [], False, True), # blacklist disabled ) @ddt.unpack - def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access): + def test_ip_blacklist_rules(self, request_ips, blacklist, whitelist, is_enabled, allow_access): # Ensure that IP blocking works for anonymous users self.client.logout() @@ -101,9 +103,9 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase): # Check that access is enforced response = self.client.get( - "/", - HTTP_X_FORWARDED_FOR=request_ip, - REMOTE_ADDR=request_ip + self.courseware_url, + HTTP_X_FORWARDED_FOR=','.join(request_ips), + REMOTE_ADDR=request_ips[-1], ) if allow_access: @@ -118,6 +120,127 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase): ) self.assertRedirects(response, redirect_url) + @patch.dict(settings.FEATURES, {'EMBARGO': True}) + @ddt.data( + # request ip chain, blacklist, whitelist, is_enabled, allow_access + (['192.178.2.3'], [], [], True, False), # confirm that test setup & no config blocks users by default + (['173.194.123.35', '192.178.2.3'], [], ['173.194.123.35'], True, False), # whitelist only looks at last ip + (['192.178.2.3', '173.194.123.35'], [], ['173.194.0.0/16'], True, True), + (['192.178.2.3'], [], ['173.194.0.0/16'], True, False), + (['173.194.123.35'], [], ['173.194.123.35'], False, False), # whitelist disabled + ) + @ddt.unpack + def test_ip_whitelist_rules(self, request_ips, blacklist, whitelist, is_enabled, allow_access): + # Ensure that IP blocking works for anonymous users + self.client.logout() + + # Set up the IP rules + IPFilter.objects.create( + blacklist=", ".join(blacklist), + whitelist=", ".join(whitelist), + enabled=is_enabled + ) + + # Check that access is enforced (restrict course by default, so that allow-list logic is actually tested) + with restrict_course(self.course.id): + response = self.client.get( + self.courseware_url, + HTTP_X_FORWARDED_FOR=','.join(request_ips), + REMOTE_ADDR=request_ips[-1], + ) + + if allow_access: + assert response.status_code == 200 + else: + redirect_url = reverse( + 'embargo:blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'default' + } + ) + self.assertRedirects(response, redirect_url) + + @patch.dict(settings.FEATURES, {'EMBARGO': True}) + @override_waffle_switch(USE_LEGACY_IP, True) + @ddt.data( + # request ip chain, blacklist, whitelist, allow_access + (['192.178.2.3'], [], [], False), # confirm that test setup & no config blocks users by default + (['173.194.123.35', '192.178.2.3'], [], ['192.178.2.3'], False), # whitelist ignores last (safest) ip + (['173.194.123.35', '192.178.2.3'], [], ['173.194.0.0/16'], True), # whitelist does look at first ip though + ) + @ddt.unpack + def test_ip_legacy_whitelist_rules(self, request_ips, blacklist, whitelist, allow_access): + # Ensure that IP blocking works for anonymous users + self.client.logout() + + # Set up the IP rules + IPFilter.objects.create( + blacklist=", ".join(blacklist), + whitelist=", ".join(whitelist), + enabled=True, + ) + + # Check that access is enforced (restrict course by default, so that allow-list logic is actually tested) + with restrict_course(self.course.id): + response = self.client.get( + self.courseware_url, + HTTP_X_FORWARDED_FOR=','.join(request_ips), + REMOTE_ADDR=request_ips[-1], + ) + + if allow_access: + assert response.status_code == 200 + else: + redirect_url = reverse( + 'embargo:blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'default', + } + ) + self.assertRedirects(response, redirect_url) + + @patch.dict(settings.FEATURES, {'EMBARGO': True}) + @override_waffle_switch(USE_LEGACY_IP, True) + @ddt.data( + # request ip chain, blacklist, whitelist, allow_access + (['192.178.2.3'], [], [], True), # confirm that test setup & no config allows users by default + (['173.194.123.35', '192.178.2.3'], ['192.178.2.3'], [], True), # blacklist ignores last (safest) ip + (['173.194.123.35', '192.178.2.3'], ['173.194.123.35'], [], False), # blacklist looks at first though + (['192.178.2.3'], ['192.178.2.3'], ['192.178.2.3'], False), # blacklist overrides whitelist + ) + @ddt.unpack + def test_ip_legacy_blacklist_rules(self, request_ips, blacklist, whitelist, allow_access): + # Ensure that IP blocking works for anonymous users + self.client.logout() + + # Set up the IP rules + IPFilter.objects.create( + blacklist=", ".join(blacklist), + whitelist=", ".join(whitelist), + enabled=True, + ) + + # Check that access is enforced + response = self.client.get( + self.courseware_url, + HTTP_X_FORWARDED_FOR=','.join(request_ips), + REMOTE_ADDR=request_ips[-1], + ) + + if allow_access: + assert response.status_code == 200 + else: + redirect_url = reverse( + 'embargo:blocked_message', + kwargs={ + 'access_point': 'courseware', + 'message_key': 'embargo', + } + ) + self.assertRedirects(response, redirect_url) + @patch.dict(settings.FEATURES, {'EMBARGO': True}) @ddt.data( ('courseware', 'default'), @@ -146,26 +269,3 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase): REMOTE_ADDR="192.168.10.20" ) assert response.status_code == 200 - - @patch.dict(settings.FEATURES, {'EMBARGO': True}) - def test_whitelist_ip_skips_country_access_checks(self): - # Whitelist an IP address - IPFilter.objects.create( - whitelist="192.168.10.20", - enabled=True - ) - - # Set up country access rules so the user would - # be restricted from the course. - with restrict_course(self.course.id): - # Make a request from the whitelisted IP address - response = self.client.get( - self.courseware_url, - HTTP_X_FORWARDED_FOR="192.168.10.20", - REMOTE_ADDR="192.168.10.20" - ) - - # Expect that we were still able to access the page, - # even though we would have been blocked by country - # access rules. - assert response.status_code == 200 diff --git a/openedx/core/djangoapps/embargo/views.py b/openedx/core/djangoapps/embargo/views.py index e801d1b129..d0cb56e06d 100644 --- a/openedx/core/djangoapps/embargo/views.py +++ b/openedx/core/djangoapps/embargo/views.py @@ -45,11 +45,12 @@ class CheckCourseAccessView(APIView): # lint-amnesty, pylint: disable=missing-c if course_ids and user and user_ip_address: for course_id in course_ids: try: - if not check_course_access(CourseKey.from_string(course_id), user, user_ip_address): - response['access'] = False - break - except InvalidKeyError: - raise ValidationError('Invalid course_ids') # lint-amnesty, pylint: disable=raise-missing-from + course_key = CourseKey.from_string(course_id) + except InvalidKeyError as exc: + raise ValidationError('Invalid course_ids') from exc + if not check_course_access(course_key, user=user, ip_addresses=[user_ip_address]): + response['access'] = False + break else: raise ValidationError('Missing parameters')