Merge pull request #30295 from openedx/mikix/embargo-chain

fix: use safer ip-chain logic when checking ip for embargos
This commit is contained in:
Michael Terry
2022-04-25 11:41:12 -04:00
committed by GitHub
9 changed files with 343 additions and 192 deletions

View File

@@ -22,7 +22,6 @@ from django.utils.translation import get_language, to_locale
from django.utils.translation import gettext as _
from django.views.generic.base import View
from edx_django_utils.monitoring.utils import increment
from ipware.ip import get_client_ip
from opaque_keys.edx.keys import CourseKey
from urllib.parse import urljoin # lint-amnesty, pylint: disable=wrong-import-order
@@ -103,12 +102,7 @@ class ChooseModeView(View):
# Check whether the user has access to this course
# based on country access rules.
embargo_redirect = embargo_api.redirect_if_blocked(
course_key,
user=request.user,
ip_address=get_client_ip(request)[0],
url=request.path
)
embargo_redirect = embargo_api.redirect_if_blocked(request, course_key)
if embargo_redirect:
return redirect(embargo_redirect)

View File

@@ -31,7 +31,6 @@ from edx_django_utils import monitoring as monitoring_utils
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser # lint-amnesty, pylint: disable=wrong-import-order
from eventtracking import tracker
from ipware.ip import get_client_ip
# Note that this lives in LMS, so this dependency should be refactored.
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
@@ -365,10 +364,7 @@ def change_enrollment(request, check_access=True):
# This can occur if the user's IP is on a global blacklist
# or if the user is enrolling in a country in which the course
# is not available.
redirect_url = embargo_api.redirect_if_blocked(
course_id, user=user, ip_address=get_client_ip(request)[0],
url=request.path
)
redirect_url = embargo_api.redirect_if_blocked(request, course_id)
if redirect_url:
return HttpResponse(redirect_url)

View File

@@ -22,7 +22,6 @@ from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic.base import View
from edx_rest_api_client.exceptions import SlumberBaseException
from ipware.ip import get_client_ip
from opaque_keys.edx.keys import CourseKey
from rest_framework.response import Response
from rest_framework.views import APIView
@@ -235,12 +234,7 @@ class PayAndVerifyView(View):
# Check whether the user has access to this course
# based on country access rules.
redirect_url = embargo_api.redirect_if_blocked(
course_key,
user=request.user,
ip_address=get_client_ip(request)[0],
url=request.path
)
redirect_url = embargo_api.redirect_if_blocked(request, course_key)
if redirect_url:
return redirect(redirect_url)

View File

@@ -7,34 +7,53 @@ This API is exposed via the middleware(emabargo/middileware.py) layer but may be
"""
import logging
from typing import List, Optional
from django.conf import settings
from django.core.cache import cache
from ipware.ip import get_client_ip
from opaque_keys.edx.keys import CourseKey
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from common.djangoapps.student.auth import has_course_author_access
from openedx.core import types
from openedx.core.djangoapps.geoinfo.api import country_code_from_ip
from openedx.core.djangoapps.util import ip
from .models import CountryAccessRule, RestrictedCourse
log = logging.getLogger(__name__)
def redirect_if_blocked(course_key, access_point='enrollment', **kwargs):
"""Redirect if the user does not have access to the course. In case of blocked if access_point
is not enrollment and course has enabled is_disabled_access_check then user can view that course.
def redirect_if_blocked(
request: Request,
course_key: CourseKey,
access_point: str = 'enrollment',
user: Optional[types.User] = None,
) -> Optional[str]:
"""
Redirect if the user does not have access to the course.
Even if the user would normally be blocked, if the given access_point is 'courseware' and the course has enabled
the `is_disabled_access_check` flag, then the user can still view that course.
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
Same as `check_course_access` and `message_url_path`
request: The current request to be checked.
course_key: Location of the course the user is trying to access.
access_point: Type of page being accessed (e.g. 'courseware', 'enrollment', etc)
user: User to check for (uses request.user if None)
Returns:
If blocked, a URL path to a page explaining why the user was blocked. Else None.
"""
if settings.FEATURES.get('EMBARGO'):
is_blocked = not check_course_access(course_key, **kwargs)
if ip.USE_LEGACY_IP.is_enabled():
client_ips = [ip.get_legacy_ip(request)]
else:
client_ips = ip.get_all_client_ips(request)
user = user or request.user
is_blocked = not check_course_access(course_key, user=user, ip_addresses=client_ips, url=request.path)
if is_blocked:
if access_point == "courseware":
if not RestrictedCourse.is_disabled_access_check(course_key):
@@ -43,22 +62,23 @@ def redirect_if_blocked(course_key, access_point='enrollment', **kwargs):
return message_url_path(course_key, access_point)
def check_course_access(course_key, user=None, ip_address=None, url=None):
def check_course_access(
course_key: CourseKey,
user: Optional[types.User] = None,
ip_addresses: Optional[List[str]] = None,
url: Optional[str] = None,
) -> bool:
"""
Check is the user with this ip_address has access to the given course
Check is the user with this ip_addresses chain has access to the given course
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
user (User): The user making the request. Can be None, in which case
the user's profile country will not be checked.
ip_address (str): The IP address of the request.
url (str): The URL the user is trying to access. Used in
log messages.
course_key: Location of the course the user is trying to access.
user: The user making the request. Can be None, in which case the user's profile country will not be checked.
ip_addresses: The full external chain of IP addresses of the request.
url: The URL the user is trying to access. Used in log messages.
Returns:
Boolean: True if the user has access to the course; False otherwise
True if the user has access to the course; False otherwise
"""
# No-op if the country access feature is not enabled
@@ -76,25 +96,27 @@ def check_course_access(course_key, user=None, ip_address=None, url=None):
if user is not None and has_course_author_access(user, course_key):
return True
if ip_address is not None:
# Retrieve the country code from the IP address
# and check it against the allowed countries list for a course
user_country_from_ip = country_code_from_ip(ip_address)
if ip_addresses is not None:
# Check every IP address provided and deny access if ANY of them fail our country checks
for ip_address in ip_addresses:
# Retrieve the country code from the IP address
# and check it against the allowed countries list for a course
user_country_from_ip = country_code_from_ip(ip_address)
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
log.info(
(
"Blocking user %s from accessing course %s at %s "
"because the user's IP address %s appears to be "
"located in %s."
),
getattr(user, 'id', '<Not Authenticated>'),
course_key,
url,
ip_address,
user_country_from_ip
)
return False
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
log.info(
(
"Blocking user %s from accessing course %s at %s "
"because the user's IP address %s appears to be "
"located in %s."
),
getattr(user, 'id', '<Not Authenticated>'),
course_key,
url,
ip_address,
user_country_from_ip
)
return False
if user is not None:
# Retrieve the country code from the user's profile
@@ -114,19 +136,19 @@ def check_course_access(course_key, user=None, ip_address=None, url=None):
return True
def message_url_path(course_key, access_point):
"""Determine the URL path for the message explaining why the user was blocked.
def message_url_path(course_key: CourseKey, access_point: str) -> str:
"""
Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
course_key: The location of the course.
access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
@@ -135,7 +157,7 @@ def message_url_path(course_key, access_point):
return RestrictedCourse.message_url_path(course_key, access_point)
def _get_user_country_from_profile(user):
def _get_user_country_from_profile(user: types.User) -> str:
"""
Check whether the user is embargoed based on the country code in the user's profile.
@@ -159,28 +181,27 @@ def _get_user_country_from_profile(user):
return profile_country
def get_embargo_response(request, course_id, user):
def get_embargo_response(request: Request, course_key: CourseKey, user: types.User) -> Optional[Response]:
"""
Check whether any country access rules block the user from enrollment.
Args:
request (HttpRequest): The request object
course_id (str): The requested course ID
user (str): The current user object
request: The request object
course_key: The requested course ID
user: The current user object
Returns:
HttpResponse: Response of the embargo page if embargoed, None if not
Response of the embargo page if embargoed, None if not
"""
redirect_url = redirect_if_blocked(
course_id, user=user, ip_address=get_client_ip(request)[0], url=request.path)
redirect_url = redirect_if_blocked(request, course_key, user=user)
if redirect_url:
return Response(
status=status.HTTP_403_FORBIDDEN,
data={
"message": (
"Users from this location cannot access the course '{course_id}'."
).format(course_id=course_id),
).format(course_id=str(course_key)),
"user_message_url": request.build_absolute_uri(redirect_url)
}
)

View File

@@ -28,14 +28,17 @@ Usage:
import logging
import re
from typing import Optional
from django.conf import settings
from django.core.exceptions import MiddlewareNotUsed
from django.urls import reverse
from django.utils.deprecation import MiddlewareMixin
from django.shortcuts import redirect
from ipware.ip import get_client_ip
from rest_framework.request import Request
from rest_framework.response import Response
from openedx.core.djangoapps.util import ip
from openedx.core.lib.request_utils import course_id_from_url
from . import api as embargo_api
@@ -64,7 +67,7 @@ class EmbargoMiddleware(MiddlewareMixin):
raise MiddlewareNotUsed()
super().__init__(*args, **kwargs)
def process_request(self, request):
def process_request(self, request: Request) -> Optional[Response]:
"""Block requests based on embargo rules.
This will perform the following checks:
@@ -83,15 +86,24 @@ class EmbargoMiddleware(MiddlewareMixin):
if pattern.match(request.path) is not None:
return None
ip_address = get_client_ip(request)[0]
if ip.USE_LEGACY_IP.is_enabled():
safest_ip_address = ip.get_legacy_ip(request)
all_ip_addresses = [safest_ip_address]
else:
safest_ip_address = ip.get_safest_client_ip(request)
all_ip_addresses = ip.get_all_client_ips(request)
ip_filter = IPFilter.current()
if ip_filter.enabled and ip_address in ip_filter.blacklist_ips:
# When checking if a request is block-listed, we need to check EVERY client IP address in the chain, in case
# a blocked ip tried to hop through other proxies.
blocked_ips = [x for x in all_ip_addresses if x in ip_filter.blacklist_ips]
if ip_filter.enabled and blocked_ips:
log.info(
(
"User %s was blocked from accessing %s "
"because IP address %s is blacklisted."
), request.user.id, request.path, ip_address
), request.user.id, request.path, blocked_ips[0]
)
# If the IP is blacklisted, reject.
@@ -105,13 +117,15 @@ class EmbargoMiddleware(MiddlewareMixin):
)
return redirect(ip_blacklist_url)
elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips:
# When checking if a request is allow-listed, we should only look at the safest client IP address, as the
# others in the chain might be spoofed.
elif ip_filter.enabled and safest_ip_address in ip_filter.whitelist_ips:
log.info(
(
"User %s was allowed access to %s because "
"IP address %s is whitelisted."
),
request.user.id, request.path, ip_address
request.user.id, request.path, safest_ip_address
)
# If the IP is whitelisted, then allow access,
@@ -121,32 +135,24 @@ class EmbargoMiddleware(MiddlewareMixin):
else:
# Otherwise, perform the country access checks.
# This applies only to courseware URLs.
return self.country_access_rules(request.user, ip_address, request.path)
return self.country_access_rules(request)
def country_access_rules(self, user, ip_address, url_path):
def country_access_rules(self, request: Request) -> Optional[Response]:
"""
Check the country access rules for a given course.
Applies only to courseware URLs.
Args:
user (User): The user making the current request.
ip_address (str): The IP address from which the request originated.
url_path (str): The request path.
request: The request to validate against the embargo rules
Returns:
HttpResponse or None
"""
course_id = course_id_from_url(url_path)
course_id = course_id_from_url(request.path)
if course_id:
redirect_url = embargo_api.redirect_if_blocked(
course_id,
user=user,
ip_address=ip_address,
url=url_path,
access_point='courseware'
)
redirect_url = embargo_api.redirect_if_blocked(request, course_id, access_point='courseware')
if redirect_url:
return redirect(redirect_url)

View File

@@ -15,6 +15,7 @@ file and check it in at the same time as your model changes. To do that,
import ipaddress
import json
import logging
from typing import List, Optional
from config_models.models import ConfigurationModel
from django.core.cache import cache
@@ -26,6 +27,7 @@ from django.utils.translation import gettext_lazy
from django_countries import countries
from django_countries.fields import CountryField
from opaque_keys.edx.django.models import CourseKeyField
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.xmodule_django.models import NoneToEmptyManager
@@ -158,37 +160,35 @@ class RestrictedCourse(models.Model):
)
@classmethod
def is_restricted_course(cls, course_id):
def is_restricted_course(cls, course_key: CourseKey) -> bool:
"""
Check if the course is in restricted list
Args:
course_id (str): course_id to look for
course_key: course to look for
Returns:
Boolean
True if course is in restricted course list.
True if the course is in the restricted course list.
"""
return str(course_id) in cls._get_restricted_courses_from_cache()
return str(course_key) in cls._get_restricted_courses_from_cache()
@classmethod
def is_disabled_access_check(cls, course_id):
def is_disabled_access_check(cls, course_key: CourseKey) -> bool:
"""
Check if the course is in restricted list has disabled_access_check
Args:
course_id (str): course_id to look for
course_key: course to look for
Returns:
Boolean
disabled_access_check attribute of restricted course
"""
# checking is_restricted_course method also here to make sure course exists in the list otherwise in case of
# no course found it will throw the key not found error on 'disable_access_check'
return (
cls.is_restricted_course(str(course_id))
and cls._get_restricted_courses_from_cache().get(str(course_id))["disable_access_check"]
cls.is_restricted_course(course_key)
and cls._get_restricted_courses_from_cache().get(str(course_key))["disable_access_check"]
)
@classmethod
@@ -244,7 +244,7 @@ class RestrictedCourse(models.Model):
]
}
def message_key_for_access_point(self, access_point):
def message_key_for_access_point(self, access_point: str) -> Optional[str]:
"""Determine which message to show the user.
The message can be configured per-course and depends
@@ -252,11 +252,10 @@ class RestrictedCourse(models.Model):
(trying to enroll or accessing courseware).
Arguments:
access_point (str): Either "courseware" or "enrollment"
access_point: Either "courseware" or "enrollment"
Returns:
str: The message key. If the access point is not valid,
returns None instead.
The message key. If the access point is not valid, returns None instead.
"""
if access_point == 'enrollment':
@@ -268,19 +267,18 @@ class RestrictedCourse(models.Model):
return str(self.course_key)
@classmethod
def message_url_path(cls, course_key, access_point):
def message_url_path(cls, course_key: CourseKey, access_point: str) -> str:
"""Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
course_key: The location of the course.
access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
@@ -306,16 +304,15 @@ class RestrictedCourse(models.Model):
return url
@classmethod
def _get_message_url_path_from_db(cls, course_key, access_point):
def _get_message_url_path_from_db(cls, course_key: CourseKey, access_point: str) -> str:
"""Retrieve the "blocked" message from the database.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
course_key: The location of the course.
access_point: How the user was trying to access the course. Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
The URL path to a page explaining why the user was blocked.
"""
# Fallback in case we're not able to find a message path
@@ -355,7 +352,7 @@ class RestrictedCourse(models.Model):
return default_path
@classmethod
def invalidate_cache_for_course(cls, course_key):
def invalidate_cache_for_course(cls, course_key: CourseKey) -> None:
"""Invalidate the caches for the restricted course. """
cache.delete(cls.COURSE_LIST_CACHE_KEY)
log.info("Invalidated cached list of restricted courses.")
@@ -450,18 +447,16 @@ class CountryAccessRule(models.Model):
ALL_COUNTRIES = {code[0] for code in list(countries)}
@classmethod
def check_country_access(cls, course_id, country):
def check_country_access(cls, course_key: CourseKey, country: str) -> bool:
"""
Check if the country is either in whitelist or blacklist of countries for the course_id
Args:
course_id (str): course_id to look for
country (str): A 2 characters code of country
course_key: course to look for
country: A 2 characters code of country
Returns:
Boolean
True if country found in allowed country
otherwise check given country exists in list
True if country found in allowed country, otherwise check given country exists in list
"""
# If the country code is not in the list of all countries,
# we don't want to automatically exclude the user.
@@ -471,25 +466,24 @@ class CountryAccessRule(models.Model):
if country not in cls.ALL_COUNTRIES:
return True
cache_key = cls.CACHE_KEY.format(course_key=course_id)
cache_key = cls.CACHE_KEY.format(course_key=course_key)
allowed_countries = cache.get(cache_key)
if allowed_countries is None:
allowed_countries = cls._get_country_access_list(course_id)
allowed_countries = cls._get_country_access_list(course_key)
cache.set(cache_key, allowed_countries)
return country == '' or country in allowed_countries
@classmethod
def _get_country_access_list(cls, course_id):
def _get_country_access_list(cls, course_key: CourseKey) -> List[str]:
"""
if a course is blacklist for two countries then course can be accessible from
any where except these two countries.
if a course is whitelist for two countries then course can be accessible from
these countries only.
Args:
course_id (str): course_id to look for
course_key: course to look for
Returns:
List
Consolidated list of accessible countries for given course
"""
@@ -498,7 +492,7 @@ class CountryAccessRule(models.Model):
# Retrieve all rules in one database query, performing the "join" with the Country table
rules_for_course = CountryAccessRule.objects.select_related('country').filter(
restricted_course__course_key=course_id
restricted_course__course_key=course_key
)
# Filter the rules into a whitelist and blacklist in one pass
@@ -529,7 +523,7 @@ class CountryAccessRule(models.Model):
)
@classmethod
def invalidate_cache_for_course(cls, course_key):
def invalidate_cache_for_course(cls, course_key: CourseKey) -> None:
"""Invalidate the cache. """
cache_key = cls.CACHE_KEY.format(course_key=course_key)
cache.delete(cache_key)

View File

@@ -12,23 +12,24 @@ import ddt
import pytest
from django.conf import settings
from django.test.utils import override_settings
from django.core.cache import cache
from django.db import connection
from django.test.client import RequestFactory
from django.test.utils import override_settings
from edx_toggles.toggles.testutils import override_waffle_switch
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, mixed_store_config
from openedx.core.djangolib.testing.utils import skip_unless_lms
from common.djangoapps.student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.tests.django_utils import ( # lint-amnesty, pylint: disable=wrong-import-order
ModuleStoreTestCase, mixed_store_config
)
from common.djangoapps.student.roles import (
GlobalStaff, CourseRole, OrgRole,
CourseStaffRole, CourseInstructorRole,
OrgStaffRole, OrgInstructorRole
)
from common.djangoapps.util.testing import UrlResetMixin
from openedx.core.djangoapps.util.ip import USE_LEGACY_IP
from ..models import (
RestrictedCourse, Country, CountryAccessRule,
)
@@ -96,7 +97,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
with self._mock_geoip(ip_country):
# Call the API. Note that the IP address we pass in doesn't
# matter, since we're injecting a mock for geo-location
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
# Verify that the access rules were applied correctly
assert result == allow_access
@@ -110,7 +111,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# The user is set to None, because the user has not been authenticated.
with self._mock_geoip(""):
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, ip_addresses=['0.0.0.0'])
assert result
def test_no_user_blocked(self):
@@ -122,7 +123,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
with self._mock_geoip('US'):
# The user is set to None, because the user has not been authenticated.
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, ip_addresses=['0.0.0.0'])
assert not result
def test_course_not_restricted(self):
@@ -130,18 +131,18 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# so all access checks should be skipped.
unrestricted_course = CourseFactory.create()
with self.assertNumQueries(1):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_addresses=['0.0.0.0'])
# The second check should require no database queries
with self.assertNumQueries(0):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_addresses=['0.0.0.0'])
def test_ip_v6(self):
# Test the scenario that will go through every check
# (restricted course, but pass all the checks)
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user,
ip_address='FE80::0202:B3FF:FE1E:8329')
ip_addresses=['FE80::0202:B3FF:FE1E:8329'])
assert result
def test_country_access_fallback_to_continent_code(self):
@@ -149,7 +150,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# instead of a country code. In this case, we should
# allow the user access.
with self._mock_geoip('EU'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
assert result
@mock.patch.dict(settings.FEATURES, {'EMBARGO': True})
@@ -167,7 +168,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# Verify that we can check the user's access without error
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
assert result
def test_caching(self):
@@ -177,20 +178,20 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# This is the worst case, so it will hit all of the
# caching code.
with self.assertNumQueries(3):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
with self.assertNumQueries(0):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
def test_caching_no_restricted_courses(self):
RestrictedCourse.objects.all().delete()
cache.clear()
with self.assertNumQueries(1):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
with self.assertNumQueries(0):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
@ddt.data(
GlobalStaff,
@@ -209,7 +210,7 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# Appear to make a request from an IP in the blocked country
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
# Expect that the user is blocked, because the user isn't staff
assert not result, "User should not have access because the user isn't staff."
@@ -227,10 +228,54 @@ class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
# Now the user should have access
with self._mock_geoip('US'):
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_addresses=['0.0.0.0'])
assert result, 'User should have access because the user is staff.'
@ddt.data(
# (Note that any '0.x.x.x' IP _should_ be blocked in this test.)
# ips, legacy, allow access
(['0.0.0.0', '1.1.1.1'], True, False), # legacy looks at first IP and blocks
(['1.1.1.1', '0.0.0.0'], True, True), # legacy fails to look at later IPs and allows
(['1.1.1.1', '2.2.2.2'], False, True), # normal chain of access
(['1.1.1.1', '0.0.0.0', '2.2.2.2'], False, False), # tried to sneak a blocked IP in, but we caught it
)
@ddt.unpack
@mock.patch('openedx.core.djangoapps.embargo.api.country_code_from_ip')
def test_redirect_if_blocked_ips(self, ips, use_legacy, allow_access, mock_country):
# Block the US
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
# Treat any ip that starts with zero as from the US
mock_country.side_effect = lambda x: 'US' if x.startswith('0.') else 'XX'
request = RequestFactory().get('', HTTP_X_FORWARDED_FOR=','.join(ips))
request.user = self.user
with override_waffle_switch(USE_LEGACY_IP, use_legacy):
assert (embargo_api.redirect_if_blocked(request, self.course.id) is None) == allow_access
@ddt.data(
# access point, check disabled, allow access
('enrollment', False, False), # 'enrollment' never changes blocked status
('enrollment', True, False), # 'enrollment' never changes blocked status
('courseware', False, False), # 'courseware' normally also leaves blocked status in place
('courseware', True, True), # Unless the access check has been disabled, then we allow them
)
@ddt.unpack
@mock.patch('openedx.core.djangoapps.embargo.api.check_course_access', return_value=False)
def test_redirect_if_blocked_courseware(self, access_point, check_disabled, allow_access, _mock_access):
self.restricted_course.disable_access_check = check_disabled
self.restricted_course.save()
request = RequestFactory().get('')
maybe_url = embargo_api.redirect_if_blocked(request, self.course.id, access_point=access_point, user=self.user)
assert (maybe_url is None) == allow_access
@contextmanager
def _mock_geoip(self, country_code):
"""

View File

@@ -9,12 +9,14 @@ from config_models.models import cache as config_cache
from django.conf import settings
from django.core.cache import cache as django_cache
from django.urls import reverse
from edx_toggles.toggles.testutils import override_waffle_switch
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from openedx.core.djangolib.testing.utils import skip_unless_lms
from common.djangoapps.student.tests.factories import UserFactory
from common.djangoapps.util.testing import UrlResetMixin
from openedx.core.djangoapps.util.ip import USE_LEGACY_IP
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import IPFilter, RestrictedCourse
from ..test_utils import restrict_course
@@ -77,18 +79,18 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(
# request_ip, blacklist, whitelist, is_enabled, allow_access
('173.194.123.35', ['173.194.123.35'], [], True, False),
('173.194.123.35', ['173.194.0.0/16'], [], True, False),
('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False),
('173.195.10.20', ['173.194.0.0/16'], [], True, True),
('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False),
('173.194.123.35', [], ['173.194.0.0/16'], True, True),
('192.178.2.3', [], ['173.194.0.0/16'], True, True),
('173.194.123.35', ['173.194.123.35'], [], False, True),
# request ip chain, blacklist, whitelist, is_enabled, allow_access
(['192.178.2.3'], [], [], True, True), # confirm that test setup & no config allows users by default
(['173.194.123.35'], ['173.194.123.35'], [], True, False),
(['173.194.123.35'], ['173.194.0.0/16'], [], True, False),
(['173.194.123.35'], ['127.0.0.0/32', '173.194.0.0/16'], [], True, False),
(['173.195.10.20'], ['173.194.0.0/16'], [], True, True),
(['173.194.123.35'], ['173.194.0.0/16'], ['173.194.0.0/16'], True, False), # blacklist checked before whitelist
(['173.194.123.35', '192.178.2.3'], ['173.194.123.35'], [], True, False), # earlier ip can still be blocked
(['173.194.123.35'], ['173.194.123.35'], [], False, True), # blacklist disabled
)
@ddt.unpack
def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access):
def test_ip_blacklist_rules(self, request_ips, blacklist, whitelist, is_enabled, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
@@ -101,9 +103,9 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
# Check that access is enforced
response = self.client.get(
"/",
HTTP_X_FORWARDED_FOR=request_ip,
REMOTE_ADDR=request_ip
self.courseware_url,
HTTP_X_FORWARDED_FOR=','.join(request_ips),
REMOTE_ADDR=request_ips[-1],
)
if allow_access:
@@ -118,6 +120,127 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(
# request ip chain, blacklist, whitelist, is_enabled, allow_access
(['192.178.2.3'], [], [], True, False), # confirm that test setup & no config blocks users by default
(['173.194.123.35', '192.178.2.3'], [], ['173.194.123.35'], True, False), # whitelist only looks at last ip
(['192.178.2.3', '173.194.123.35'], [], ['173.194.0.0/16'], True, True),
(['192.178.2.3'], [], ['173.194.0.0/16'], True, False),
(['173.194.123.35'], [], ['173.194.123.35'], False, False), # whitelist disabled
)
@ddt.unpack
def test_ip_whitelist_rules(self, request_ips, blacklist, whitelist, is_enabled, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
# Set up the IP rules
IPFilter.objects.create(
blacklist=", ".join(blacklist),
whitelist=", ".join(whitelist),
enabled=is_enabled
)
# Check that access is enforced (restrict course by default, so that allow-list logic is actually tested)
with restrict_course(self.course.id):
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR=','.join(request_ips),
REMOTE_ADDR=request_ips[-1],
)
if allow_access:
assert response.status_code == 200
else:
redirect_url = reverse(
'embargo:blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'default'
}
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@override_waffle_switch(USE_LEGACY_IP, True)
@ddt.data(
# request ip chain, blacklist, whitelist, allow_access
(['192.178.2.3'], [], [], False), # confirm that test setup & no config blocks users by default
(['173.194.123.35', '192.178.2.3'], [], ['192.178.2.3'], False), # whitelist ignores last (safest) ip
(['173.194.123.35', '192.178.2.3'], [], ['173.194.0.0/16'], True), # whitelist does look at first ip though
)
@ddt.unpack
def test_ip_legacy_whitelist_rules(self, request_ips, blacklist, whitelist, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
# Set up the IP rules
IPFilter.objects.create(
blacklist=", ".join(blacklist),
whitelist=", ".join(whitelist),
enabled=True,
)
# Check that access is enforced (restrict course by default, so that allow-list logic is actually tested)
with restrict_course(self.course.id):
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR=','.join(request_ips),
REMOTE_ADDR=request_ips[-1],
)
if allow_access:
assert response.status_code == 200
else:
redirect_url = reverse(
'embargo:blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'default',
}
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@override_waffle_switch(USE_LEGACY_IP, True)
@ddt.data(
# request ip chain, blacklist, whitelist, allow_access
(['192.178.2.3'], [], [], True), # confirm that test setup & no config allows users by default
(['173.194.123.35', '192.178.2.3'], ['192.178.2.3'], [], True), # blacklist ignores last (safest) ip
(['173.194.123.35', '192.178.2.3'], ['173.194.123.35'], [], False), # blacklist looks at first though
(['192.178.2.3'], ['192.178.2.3'], ['192.178.2.3'], False), # blacklist overrides whitelist
)
@ddt.unpack
def test_ip_legacy_blacklist_rules(self, request_ips, blacklist, whitelist, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
# Set up the IP rules
IPFilter.objects.create(
blacklist=", ".join(blacklist),
whitelist=", ".join(whitelist),
enabled=True,
)
# Check that access is enforced
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR=','.join(request_ips),
REMOTE_ADDR=request_ips[-1],
)
if allow_access:
assert response.status_code == 200
else:
redirect_url = reverse(
'embargo:blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'embargo',
}
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'EMBARGO': True})
@ddt.data(
('courseware', 'default'),
@@ -146,26 +269,3 @@ class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
REMOTE_ADDR="192.168.10.20"
)
assert response.status_code == 200
@patch.dict(settings.FEATURES, {'EMBARGO': True})
def test_whitelist_ip_skips_country_access_checks(self):
# Whitelist an IP address
IPFilter.objects.create(
whitelist="192.168.10.20",
enabled=True
)
# Set up country access rules so the user would
# be restricted from the course.
with restrict_course(self.course.id):
# Make a request from the whitelisted IP address
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR="192.168.10.20",
REMOTE_ADDR="192.168.10.20"
)
# Expect that we were still able to access the page,
# even though we would have been blocked by country
# access rules.
assert response.status_code == 200

View File

@@ -45,11 +45,12 @@ class CheckCourseAccessView(APIView): # lint-amnesty, pylint: disable=missing-c
if course_ids and user and user_ip_address:
for course_id in course_ids:
try:
if not check_course_access(CourseKey.from_string(course_id), user, user_ip_address):
response['access'] = False
break
except InvalidKeyError:
raise ValidationError('Invalid course_ids') # lint-amnesty, pylint: disable=raise-missing-from
course_key = CourseKey.from_string(course_id)
except InvalidKeyError as exc:
raise ValidationError('Invalid course_ids') from exc
if not check_course_access(course_key, user=user, ip_addresses=[user_ip_address]):
response['access'] = False
break
else:
raise ValidationError('Missing parameters')