feat!: Delete all non-legacy IP code (#31000)

The IP chain code has moved to edx-django-utils—except for the legacy-IP code,
which is now in a new module. This will allow other IDAs to use the IP code.

Commit includes some adjustments to the toggle annotation.

Part of <https://github.com/openedx/edx-django-utils/issues/241>.
This commit is contained in:
Tim McCormack
2022-09-21 14:11:34 +00:00
committed by GitHub
parent 7d27864299
commit 4ecd9fe683
15 changed files with 126 additions and 855 deletions

View File

@@ -11,6 +11,7 @@ from typing import List, Optional
from django.conf import settings
from django.core.cache import cache
from edx_django_utils import ip
from opaque_keys.edx.keys import CourseKey
from rest_framework import status
from rest_framework.request import Request
@@ -19,7 +20,7 @@ from rest_framework.response import Response
from common.djangoapps.student.auth import has_course_author_access
from openedx.core import types
from openedx.core.djangoapps.geoinfo.api import country_code_from_ip
from openedx.core.djangoapps.util import ip
from openedx.core.djangoapps.util import legacy_ip
from .models import CountryAccessRule, RestrictedCourse
@@ -48,8 +49,8 @@ def redirect_if_blocked(
If blocked, a URL path to a page explaining why the user was blocked. Else None.
"""
if settings.FEATURES.get('EMBARGO'):
if ip.USE_LEGACY_IP.is_enabled():
client_ips = [ip.get_legacy_ip(request)]
if legacy_ip.USE_LEGACY_IP.is_enabled():
client_ips = [legacy_ip.get_legacy_ip(request)]
else:
client_ips = ip.get_all_client_ips(request)
user = user or request.user

View File

@@ -35,10 +35,11 @@ from django.core.exceptions import MiddlewareNotUsed
from django.urls import reverse
from django.utils.deprecation import MiddlewareMixin
from django.shortcuts import redirect
from edx_django_utils import ip
from rest_framework.request import Request
from rest_framework.response import Response
from openedx.core.djangoapps.util import ip
from openedx.core.djangoapps.util import legacy_ip
from openedx.core.lib.request_utils import course_id_from_url
from . import api as embargo_api
@@ -86,8 +87,8 @@ class EmbargoMiddleware(MiddlewareMixin):
if pattern.match(request.path) is not None:
return None
if ip.USE_LEGACY_IP.is_enabled():
safest_ip_address = ip.get_legacy_ip(request)
if legacy_ip.USE_LEGACY_IP.is_enabled():
safest_ip_address = legacy_ip.get_legacy_ip(request)
all_ip_addresses = [safest_ip_address]
else:
safest_ip_address = ip.get_safest_client_ip(request)

View File

@@ -28,7 +28,7 @@ from common.djangoapps.student.roles import (
OrgStaffRole, OrgInstructorRole
)
from common.djangoapps.util.testing import UrlResetMixin
from openedx.core.djangoapps.util.ip import USE_LEGACY_IP
from openedx.core.djangoapps.util.legacy_ip import USE_LEGACY_IP
from ..models import (
RestrictedCourse, Country, CountryAccessRule,

View File

@@ -15,7 +15,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
from common.djangoapps.student.tests.factories import UserFactory
from common.djangoapps.util.testing import UrlResetMixin
from openedx.core.djangoapps.util.ip import USE_LEGACY_IP
from openedx.core.djangoapps.util.legacy_ip import USE_LEGACY_IP
from openedx.core.djangolib.testing.utils import skip_unless_lms
from ..models import IPFilter, RestrictedCourse

View File

@@ -1,479 +0,0 @@
"""
Utilities for determining the IP address of a request.
Summary
=======
For developers:
- Call ``get_safest_client_ip`` whenever you want to know the caller's IP address
- Make sure ``init_client_ips`` is called as early as possible in the middleware stack
- See the "Guidance for developers" section for more advanced usage
For site operators:
- See the "Configuration" section for important information and guidance
For everyone:
- Background information is available in the "Concepts" section
Concepts
========
- The *IP chain* is the list of IPs in the ``X-Forwarded-For`` (XFF) header followed
by the ``REMOTE_ADDR`` value. If all involved parties are telling the truth, this
is the list of IP addresses that have relayed the HTTP request. However, due to
the possibility of spoofing, this raw data cannot be used directly for all
purposes:
- The rightmost IP in the chain is the IP that has directly connected with the
server and sent or relayed the request. In most deployments, this is likely
to be a reverse proxy such as nginx. In any case it is the "closest" IP (in
the sense of the request chain, not in terms of geographic proximity.)
- The next closest IP, if present, is the one that the closest IP *claims*
sent the request to it. Each IP in the chain can only vouch for the
correctness of the IP immediately to its left in the list.
- In a normal, unspoofed request, the leftmost IP is the "real" client IP, the
IP of the computer that made the original request.
- However, clients can send a fake XFF header, so the leftmost IP in the chain
cannot be trusted in the general case. In fact, the only IP that can be
trusted absolutely is the rightmost one.
- The challenge is to determine what the leftmost *trusted* IP is, as this is
the most accurate we can get without compromising on security.
- The *external chain* is some prefix of the IP chain that stops before the
(recognized) edge of the deployment's infrastructure. That is, the external
chain is the portion of the IP chain that is to the left of some trust
boundary, as determined by configuration or some fallback method. This is the
list of IPs that can all plausibly be considered the "real" IP of the client.
If the server is configured correctly this may contain, in order: Any IPs
spoofed by the client, the client's own IP, IPs of any forwarding HTTP proxies
specified by the client, and then IPs of any reverse HTTP proxies the
request passed through *before* reaching the deployment's own infrastructure
(CDN, load balancer, etc.)
- Caveat: In the case where the request is being sent through an anonymizing
proxy such as a VPN, the VPN's exit node IP is considered the "real" client
IP.
- Despite the name, this chain may contain private-range IP addresses, in
particular if a request originates from another server in the same
datacenter.
Guidance for developers
=======================
Almost anywhere you care about IP address, just call ``get_safest_client_ip``.
This will get you the *rightmost* IP of the external chain (defined above).
Because it cannot be easily spoofed by the caller, it is suitable for adversarial
use-cases such as:
- Rate-limiting
- Only allowing certain IPs to access a resource (or alternatively, blocking them)
In some less common situations where you need the entire external chain, you
can call ``get_all_client_ips`. This returns a list of IP addresses, although for
the great majority of normal requests this will be a list of length 1. This list is
appropriate for when you're recording IPs for manual review or need to make a
decision based on all of the IPs (no matter which one is the "real" one. This might
include:
- Audit logs
- Telling a user about other active sessions on their account
- Georestriction
In some very rare cases you might want just a single IP that isn't rightmost. In
some cases you might ask for the entire external chain and then take the leftmost
IP. This should only be used in non-adversarial situations, and is usually the wrong
choice, but may be appropriate for:
- Localization (if other HTTP headers aren't sufficient)
- Analytics
Configuration
=============
Configuration is via ``CLOSEST_CLIENT_IP_FROM_HEADERS``, which allows specifying
an HTTP header that will be trusted to report the rightmost IP in the external chain.
See setting annotation for details, but guidance on common configurations is provided
here:
- If you use a CDN as your outermost proxy:
- Find what header your CDN sends to its origin that indicates the remote address it
sees on inbound connections. For example, with Cloudflare this is ``CF-Connecting-IP``.
- Ensure that your CDN always overrides this header if it exists in the inbound request,
and never accepts a value provided by the client. Some CDNs are better than others
about this.
- Recommended setting, using Cloudflare as the example::
CLOSEST_CLIENT_IP_FROM_HEADERS:
- name: CF-Connecting-IP
index: 0
It would be equivalent to use ``-1`` as the index since there is always one and only
one IP in this header, and Python list indexing rules are used here.
- As a general rule, you should also ensure that traffic cannot bypass the CDN and reach
your origin directly, since otherwise attackers will be able to spoof their IP address
(and bypass protections your CDN provides). You may need to arrange for your CDN to set
a header containing a shared secret.
- If your outermost proxy is an AWS ELB or other proxy on the same local network as your
server, or you have any other configuration in which your proxies and application speak
to each other using private-range IP addresses:
- You can rely on the rightmost public IP in the IP chain to be the safest client IP.
To do this, set your configuration for zero trusted headers::
CLOSEST_CLIENT_IP_FROM_HEADERS: []
- This assumes that 1) your outermost proxy always appends to ``X-Forwarded-For``, and
2) any further proxies between that one and your application either append to it
(ideal) or pass it along unchanged (not ideal, but workable). This is true by default
for most proxy software.
- If you have any reverse proxy that will be seen by the next proxy or your application as
having a public IP:
- You'll need to rely on having a consistent *number* of proxies in front of your
application, and you'll need to know which ones append to the ``X-Forwarded-For``
header instead of just passing it unchanged.
- Once you know the number of your proxies in the chain that append, you can use this
count to say that the Nth-from-last IP in the ``X-Forwarded-For`` is the closest client
IP. For example, if you had two, you would use ``-2`` (note the negative sign) to
indicate the second-from-last IP::
CLOSEST_CLIENT_IP_FROM_HEADERS:
- name: X-Forwarded-For
index: -2
- This is fragile in the face of network configuration changes, so having your outermost
proxy set a special header is preferred.
- Configuring the proxy count too low will result in rate-limiting your own proxies;
configuring it too high will allow attackers to bypass rate-limiting.
- Side note: Even if you don't use it for ``CLOSEST_CLIENT_IP_FROM_HEADERS``, this
proxy-counting approach will be required for configuring django-rest-framework's
``NUM_PROXIES`` setting.
- If your application is directly exposed to the public internet, without even a local proxy:
- This is an unusual configuration, but simple to configure; with no proxies, just indicate
that there are no trusted headers and therefore the closest public IP should be used::
CLOSEST_CLIENT_IP_FROM_HEADERS: []
"""
import ipaddress
import warnings
from django.conf import settings
from edx_toggles.toggles import WaffleSwitch
# .. toggle_name: ip.legacy
# .. toggle_implementation: WaffleSwitch
# .. toggle_default: False
# .. toggle_description: Emergency switch to revert to use the older, less secure method for
# IP determination. When enabled, instructs switch's callers to revert to using the *leftmost*
# IP from the X-Forwarded-For header. When disabled (the default), callers should use the new
# code path for IP determination, which has callers retrieve the entire external chain or pick
# the leftmost or rightmost IP from it. The construction of the external chain is configurable
# via ``CLOSEST_CLIENT_IP_FROM_HEADERS``.
# This toggle, as well as any other legacy IP references, should be deleted (in the off
# position) when the new IP code is well-tested and all IP-reliant code has been switched over.
# .. toggle_warning: This switch does not control the behavior of this module. Callers must
# opt into querying this switch, and can call ``get_legacy_ip`` if the switch is enabled.
# .. toggle_use_cases: temporary
# .. toggle_creation_date: 2022-03-24
# .. toggle_target_removal_date: 2022-07-01
# .. toggle_tickets: https://openedx.atlassian.net/browse/ARCHBOM-2056 (internal only)
USE_LEGACY_IP = WaffleSwitch('ip.legacy', module_name=__name__)
def get_legacy_ip(request):
"""
Return a client IP selected using an old, insecure method.
Always picks the leftmost IP in the X-Forwarded-For header, if present,
otherwise returns the original REMOTE_ADDR.
"""
if xff := request.META.get('HTTP_X_FORWARDED_FOR'):
return xff.split(',')[0].strip()
else:
# Might run before or after XForwardedForMiddleware.
return request.META.get('ORIGINAL_REMOTE_ADDR', request.META['REMOTE_ADDR'])
def _get_meta_ip_strs(request, header_name):
"""
Get a list of IPs from a header in the given request.
Return the list of IPs the request is carrying on this header, which is
expected to be comma-delimited if it contains more than one. Response
may be an empty list for missing or empty header. List items may not be
valid IPs.
"""
if not header_name:
return []
field_name = 'HTTP_' + header_name.replace('-', '_').upper()
header_value = request.META.get(field_name, '').strip()
if header_value:
return [s.strip() for s in header_value.split(',')]
else:
return []
def get_raw_ip_chain(request):
"""
Retrieve the full IP chain from this request, as list of raw strings.
This is uninterpreted and unparsed, except for splitting on commas and
removing extraneous whitespace.
"""
return _get_meta_ip_strs(request, 'X-Forwarded-For') + [request.META['REMOTE_ADDR']]
def _get_usable_ip_chain(request):
"""
Retrieve the full IP chain from this request, as parsed addresses.
The IP chain is the X-Forwarded-For header, followed by the REMOTE_ADDR.
This list is then narrowed to the largest suffix that can be parsed as
IP addresses.
"""
parsed = []
for ip_str in reversed(get_raw_ip_chain(request)):
try:
parsed.append(ipaddress.ip_address(ip_str))
except ValueError:
break
return list(reversed(parsed))
def _remove_tail(elements, f_discard):
"""
Remove items from the tail of the given list until f_discard returns false.
- elements is a list
- f_discard is a function that accepts an item from the list and returns
true if it should be discarded from the tail
Returns a new list that is a possibly-empty prefix of the input list.
(This is basically itertools.dropwhile on a reversed list.)
"""
prefix = elements[:]
while prefix and f_discard(prefix[-1]):
prefix.pop()
return prefix
def _get_client_ips_via_xff(request):
"""
Get the external chain of the request by discarding private IPs.
This is a strategy used by ``get_all_client_ips`` and should not be used
directly.
Returns a list of *parsed* IP addresses, one of:
- A list ending in a publicly routable IP
- A list with a single, private-range IP
- An empty list, if REMOTE_ADDR was unparseable as an IP address. This
would be very unusual but could possibly happen if a local reverse proxy
used a domain socket rather than a TCP connection.
"""
ip_chain = _get_usable_ip_chain(request)
external_chain = _remove_tail(ip_chain, lambda ip: not ip.is_global)
# If the external_chain is in fact all private, everything will have been
# removed. In that case, just return the leftmost IP it would have
# considered, even though it must be a private IP.
return external_chain or ip_chain[:1]
# .. setting_name: CLOSEST_CLIENT_IP_FROM_HEADERS
# .. setting_default: []
# .. setting_description: A list of header/index pairs to use for determining the IP in the
# IP chain that is just outside of this deployment's infrastructure boundary -- that is,
# the rightmost address in the IP chain that is *not* owned by the deployment. (See module
# docstring for background and definitions, as well as guidance on configuration.)
# Each list entry is a dict containing a header name and an index into that header. This will
# control how the client's IP addresses are determined for attribution, tracking, rate-limiting,
# or other general-purpose needs.
# The named header must contain a list of IP addresses separated by commas, with whitespace
# tolerated around each address. The index is used for a Python list lookup, e.g. 0 is the first
# element and -2 is the second from the end.
# Header/index pairs will be tried in turn until the first one that yields a usable IP, which
# will then be used to determine the end of the external chain.
# If the setting is an empty list, or if none of the entries yields a usable IP (header is
# missing, index out of range, IP not in IP chain), then a fallback strategy will be used
# instead: Private-range IPs will be discarded from the right of the IP chain until a public
# IP is found, or the chain shrinks to one IP. This entry will then be considered the rightmost
# end of the external chain.
# Migrations from one network configuration to another may be accomplished by first adding the
# new header to the list, making the networking change, and then removing the old one.
# .. setting_warnings: Changes to the networking configuration that are not coordinated with
# this setting may allow callers to spoof their IP address.
def _get_trusted_header_ip(request, header_name, index):
"""
Read a parsed IP address from a header at the specified position.
Helper function for ``_get_client_ips_via_trusted_header``.
Returns None if header is missing, index is out of range, or the located
entry can't be parsed as an IP address.
"""
ip_strs = _get_meta_ip_strs(request, header_name)
if not ip_strs:
warnings.warn(f"Configured IP address header was missing: {header_name!r}", UserWarning)
return None
try:
trusted_ip_str = ip_strs[index]
except IndexError:
warnings.warn(
"Configured index into IP address header is out of range: "
f"{header_name!r}:{index!r} "
f"(actual length {len(ip_strs)})",
UserWarning
)
return None
try:
return ipaddress.ip_address(trusted_ip_str)
except ValueError:
warnings.warn(
"Configured trusted IP address header contained invalid IP: "
f"{header_name!r}:{index!r}",
UserWarning
)
def _get_client_ips_via_trusted_header(request):
"""
Get the external chain by reading the trust boundary from a header.
This is a strategy used by ``get_all_client_ips`` and should not be used
directly. It does not implement any fallback in case of misconfiguration.
Uses ``CLOSEST_CLIENT_IP_FROM_HEADERS`` to identify the IP just outside of
the deployment's infrastructure boundary, and uses the rightmost position
of this to determine where the external chain stops. See setting docs for
more details.
Returns one of the following:
- A non-empty list of *parsed* IP addresses, where the rightmost IP is the
same as the one identified in the trusted header.
- None if no headers configured or all headers are unusable.
A configured header can be unusable if it's missing from the request, the
index is out of range, the indicated entry in the header can't be parsed
as an IP address, or the IP in the header can't be found in the IP chain.
"""
header_entries = getattr(settings, 'CLOSEST_CLIENT_IP_FROM_HEADERS', [])
full_chain = _get_usable_ip_chain(request)
external_chain = []
for entry in header_entries:
header_name = entry['name']
index = entry['index']
if closest_client_ip := _get_trusted_header_ip(request, header_name, index):
# The equality check in this predicate is why we use parsed IP
# addresses -- ::1 should compare as equal to 0:0:0:0:0:0:0:1.
external_chain = _remove_tail(full_chain, lambda ip: ip != closest_client_ip) # pylint: disable=cell-var-from-loop
if external_chain:
break
else:
warnings.warn(
f"Ignoring trusted header IP {header_name!r}:{index!r} "
"because it was not found in the actual IP chain.",
UserWarning
)
return external_chain
def _compute_client_ips(request):
"""
Get the request's external chain, a non-empty list of IP address strings.
Warning: should only be called once and cached by ``init_client_ips``.
Prefer to use ``get_all_client_ips`` to retrieve the value stored on the
request, unless you are sure that later middleware has not modified
the REMOTE_ADDR in-place.
This function will attempt several strategies to determine the external chain:
- If ``CLOSEST_CLIENT_IP_FROM_HEADERS`` is configured and usable, it will be
used to determine the rightmost end of the external chain (by reading a
trusted HTTP header).
- If that does not yield a result, fall back to assuming that the rightmost
public IP address in the IP chain is the end of the external chain. (For an
in-datacenter HTTP request, may instead yield a list with a private IP.)
"""
# In practice the fallback to REMOTE_ADDR should never happen, since that
# would require that value to be present and malformed but with no XFF
# present.
ips = _get_client_ips_via_trusted_header(request) \
or _get_client_ips_via_xff(request) \
or [request.META['REMOTE_ADDR']]
return [str(ip) for ip in ips]
def init_client_ips(request):
"""
Compute the request's external chain and store it in the request.
This should be called early in the middleware stack in order to avoid
being called after another middleware that overwrites ``REMOTE_ADDR``,
which is a pattern some apps use.
If called multiple times or if ``CLIENT_IPS`` is already present in
``request.META``, will just warn.
"""
if 'CLIENT_IPS' in request.META:
warnings.warn("init_client_ips refusing to overwrite existing CLIENT_IPS")
else:
request.META['CLIENT_IPS'] = _compute_client_ips(request)
def get_all_client_ips(request):
"""
Get the request's external chain, a non-empty list of IP address strings.
Most consumers of IP addresses should just use ``get_safest_client_ip``.
Calls ``init_client_ips`` if needed.
"""
if 'CLIENT_IPS' not in request.META:
init_client_ips(request)
return request.META['CLIENT_IPS']
def get_safest_client_ip(request):
"""
Get the safest choice of client IP.
Returns a single string containing the IP address that most likely
represents the originator of the HTTP call, without compromising on
safety.
This is always the rightmost value in the external IP chain that
is returned by ``get_all_client_ips``. See module docstring for
more details.
"""
return get_all_client_ips(request)[-1]

View File

@@ -0,0 +1,48 @@
"""
Utilities for migrating to safer IP address determination.
This module used to contain utilities for reading the IP addresses of
a request, but those have since moved ``edx_django_utils.ip``.
What remains are the "legacy IP" utils, which should be used only
temporarily when switching a piece of code from using the leftmost IP
(legacy IP) to using the safest IP or full public IP chain (using
edx-django-utils).
"""
from edx_toggles.toggles import WaffleSwitch
# .. toggle_name: ip.legacy
# .. toggle_implementation: WaffleSwitch
# .. toggle_default: False
# .. toggle_description: Emergency switch to revert to use an older, less secure method for
# IP determination (instead of the newer, safer code in ``edx_django_utils.ip``).
# When enabled, instructs switch's callers to revert to using the *leftmost*
# IP from the X-Forwarded-For header. When disabled (the default), callers should use the new
# code path for IP determination, which has callers retrieve the entire external chain or pick
# the leftmost or rightmost IP from it. The construction of the external chain is configurable
# via ``CLOSEST_CLIENT_IP_FROM_HEADERS``.
# This toggle, as well as any other legacy IP references, should be deleted (in the off
# position) when the new IP code is well-tested and all IP-reliant code has been switched over
# to using ``edx_django_utils.ip``.
# .. toggle_warning: This switch does not globally control handling of IP addresses; it only
# affects code that is explicitly querying the switch and using ``get_legacy_ip``.
# .. toggle_use_cases: temporary
# .. toggle_creation_date: 2022-03-24
# .. toggle_target_removal_date: 2023-01-01
# .. toggle_tickets: https://2u-internal.atlassian.net/browse/ARCHBOM-2056 (internal only)
USE_LEGACY_IP = WaffleSwitch('ip.legacy', module_name=__name__)
def get_legacy_ip(request):
"""
Return a client IP selected using an old, insecure method.
Always picks the leftmost IP in the X-Forwarded-For header, if present,
otherwise returns the original REMOTE_ADDR.
"""
if xff := request.META.get('HTTP_X_FORWARDED_FOR'):
return xff.split(',')[0].strip()
else:
# Might run before or after XForwardedForMiddleware.
return request.META.get('ORIGINAL_REMOTE_ADDR', request.META['REMOTE_ADDR'])

View File

@@ -3,7 +3,9 @@ Code to get ip from request.
"""
from uuid import uuid4
from openedx.core.djangoapps.util import ip
from edx_django_utils import ip
from openedx.core.djangoapps.util import legacy_ip
def real_ip(group, request): # pylint: disable=unused-argument
@@ -15,8 +17,8 @@ def real_ip(group, request): # pylint: disable=unused-argument
(Intended to be called by ``django-ratelimit``, hence the unused argument.)
"""
if ip.USE_LEGACY_IP.is_enabled():
return ip.get_legacy_ip(request)
if legacy_ip.USE_LEGACY_IP.is_enabled():
return legacy_ip.get_legacy_ip(request)
else:
return ip.get_safest_client_ip(request)

View File

@@ -1,355 +0,0 @@
"""
Tests for IP determination.
Fake data used in these tests, for consistency:
- 1.2.3.4 -- a "real" client IP, e.g. the IP of a laptop or phone.
- 127.0.0.2 -- a local reverse proxy (e.g. nginx or caddy)
- 10.0.3.0 -- our load balancer
- 5.5.5.5 -- our CDN
- 6.6.6.6 -- a malicious CDN configuration
- 7.8.9.0 -- something beyond the real client in the IP chain, probably a spoofed header
...as well as IPv6 versions of these, e.g. 1:2:3:4:: and ::1.
XXXXXXXXX is used as a standin for anything unparseable (some kind of garbage).
"""
import warnings
from contextlib import contextmanager
import ddt
from django.test import TestCase
from django.test.client import RequestFactory
from django.test.utils import override_settings
import openedx.core.djangoapps.util.ip as ip
from openedx.core.lib.x_forwarded_for.middleware import XForwardedForMiddleware
@contextmanager
def warning_messages():
"""
Context manager which produces a list of warning messages as the context
value (only populated after block ends).
"""
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always')
messages = []
yield messages
# Converted to message strings for easier debugging
messages.extend(str(w.message) for w in caught_warnings)
@ddt.ddt
class TestClientIP(TestCase):
"""Tests for get_client_ip and helpers."""
def setUp(self):
super().setUp()
self.request = RequestFactory().get('/somewhere')
@ddt.unpack
@ddt.data(
(
{'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 10.0.3.0', 'REMOTE_ADDR': '0:0:0:0::1'},
'7.8.9.0',
),
# XFF is not required
({'REMOTE_ADDR': '127.0.0.2'}, '127.0.0.2'),
)
def test_get_legacy_ip(self, request_meta, expected):
self.request.META = request_meta
assert ip.get_legacy_ip(self.request) == expected
# Check that it still works after the XFF middleware has done its dirty work
XForwardedForMiddleware().process_request(self.request)
assert ip.get_legacy_ip(self.request) == expected
@ddt.unpack
@ddt.data(
({}, 'Some-Thing', []),
({'HTTP_SOME_THING': 'stuff'}, 'Some-Thing', ['stuff']),
({'HTTP_SOME_THING': 'stuff'}, 'Some-Thing', ['stuff']),
({'HTTP_SOME_THING': ' so,much , stuff '}, 'Some-Thing', ['so', 'much', 'stuff']),
)
def test_get_meta_ip_strs(self, add_meta, header_name, expected):
self.request.META.update(add_meta)
assert ip._get_meta_ip_strs(self.request, header_name) == expected # pylint: disable=protected-access
@ddt.unpack
@ddt.data(
# Form the IP chain and parse it (notice the IPv6 is canonicalized)
(
{'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 10.0.3.0', 'REMOTE_ADDR': '0:0:0:0::1'},
['7.8.9.0', '1.2.3.4', '10.0.3.0', '::1'],
),
# XFF is not required
({'REMOTE_ADDR': '127.0.0.2'}, ['127.0.0.2']),
# Strips off junk and anything to the left of it
(
{
'HTTP_X_FORWARDED_FOR': '6.6.6.6, XXXXXXXXX, 7.8.9.0, XXXXXXXXX, 1.2.3.4',
'REMOTE_ADDR': '10.0.3.0'
},
['1.2.3.4', '10.0.3.0'],
),
)
def test_get_usable_ip_chain(self, request_meta, expected_strs):
self.request.META = request_meta
actual_ips = ip._get_usable_ip_chain(self.request) # pylint: disable=protected-access
assert [str(ip) for ip in actual_ips] == expected_strs
@ddt.unpack
@ddt.data(
# Empty returns empty
([], lambda x: False, []),
([], lambda x: True, []),
# Can return whole list or remove all of it
([1, 2, 3, 4], lambda x: False, [1, 2, 3, 4]),
([1, 2, 3, 4], lambda x: True, []),
# Walks from right, stops on first false, keeps that element
([2, 0, 1, 0, 3, 5], lambda x: x != 0, [2, 0, 1, 0]),
)
def test_remove_tail(self, elements, f_discard, expected):
assert ip._remove_tail(elements, f_discard) == expected # pylint: disable=protected-access
@ddt.unpack
@ddt.data(
# Walk from right, dropping private-range IPs
('7:8:9:0::, 1.2.3.4, 10.0.3.0', '::1', ['7:8:9::', '1.2.3.4']),
('7.8.9.0', '1.2.3.4', ['7.8.9.0', '1.2.3.4']),
# Publicly exposed server (no XFF added by proxies)
(None, '1.2.3.4', ['1.2.3.4']),
# If XFF is missing, just accept a private IP (maybe an inter-service call
# from another server in the same datacenter)
(None, '127.0.0.2', ['127.0.0.2']),
# If we reach a public IP, don't worry about junk farther on
('XXXXXXXXX, 1:2:3:4::', '10.0.0.1', ['1:2:3:4::']),
# If we find junk or we run out of IPs before finding a public
# one, the best we can do is a private IP. (Should never
# happen for a public IP, but here for completeness.)
('7.8.9.0, XXXXXXXXX, 10.0.3.0', '127.0.0.2', ['10.0.3.0']),
(None, '::1', ['::1']),
# Nothing usable (again, should never happen)
(None, '', []),
(None, 'XXXXXXXXX', []),
('1.2.3.4', 'XXXXXXXXX', []),
)
def test_get_client_ips_via_xff(self, xff, remote_addr, expected_strs):
request_meta = {'REMOTE_ADDR': remote_addr, 'HTTP_X_FORWARDED_FOR': xff}
request_meta = {k: v for k, v in request_meta.items() if v is not None}
self.request.META = request_meta
assert [str(ip) for ip in ip._get_client_ips_via_xff(self.request)] == expected_strs # pylint: disable=protected-access
@ddt.unpack
@ddt.data(
# Happy path
('Some-Thing', 0, {'HTTP_SOME_THING': '1.2.3.4'}, '1.2.3.4', None),
('some-thing', -1, {'HTTP_SOME_THING': '1:2:3:4::, 0:0::1'}, '::1', None),
# Warning: Header present, index out of range
('Some-Thing', 1, {'HTTP_SOME_THING': '1.2.3.4'}, None, "out of range"),
('Some-Thing', -2, {'HTTP_SOME_THING': '1.2.3.4'}, None, "out of range"),
# Warning: Header missing entirely
('Some-Thing', 0, {}, None, "missing"),
# Warning: Bad IP address
('Some-Thing', 0, {'HTTP_SOME_THING': 'XXXXXXXXX'}, None, "invalid IP"),
)
def test_get_trusted_header_ip(self, header_name, index, add_meta, expected, warning_substr):
self.request.META.update(add_meta)
with warning_messages() as caught_warnings:
actual = ip._get_trusted_header_ip(self.request, header_name, index) # pylint: disable=protected-access
if expected is None:
assert actual is None
else:
assert str(actual) == expected # Stringify again for comparison
if warning_substr is None:
assert len(caught_warnings) == 0
else:
assert len(caught_warnings) == 1
assert warning_substr in caught_warnings[0]
@ddt.unpack
@ddt.data(
# Most common case: One header in config, and header does exist
(
[{'name': 'CF-Connecting-IP', 'index': 0}],
{
'HTTP_X_FORWARDED_FOR': '1.2.3.4',
'REMOTE_ADDR': '10.0.3.0',
'HTTP_CF_CONNECTING_IP': '1.2.3.4',
},
['1.2.3.4'],
0,
),
# More complicated version with intervening proxies and a spoofed IP
(
[{'name': 'CF-Connecting-IP', 'index': 0}],
{
'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 5.5.5.5, 10.0.3.0',
'REMOTE_ADDR': '127.0.0.2',
'HTTP_CF_CONNECTING_IP': ' 1.2.3.4 ', # tests that whitespace is stripped
},
['7.8.9.0', '1.2.3.4'],
0,
),
# Uses *rightmost* position of identified IP if multiple are present
# (prevent spoofing when client calling through a trustworthy proxy)
(
[{'name': 'CF-Connecting-IP', 'index': 0}],
{
'HTTP_X_FORWARDED_FOR': '6.6.6.6, 1.2.3.4, 7.8.9.0, 1.2.3.4, 5.5.5.5',
'REMOTE_ADDR': '10.0.3.0',
'HTTP_CF_CONNECTING_IP': '1.2.3.4',
},
['6.6.6.6', '1.2.3.4', '7.8.9.0', '1.2.3.4'],
0,
),
# No config? Empty list.
([], {'REMOTE_ADDR': '1.2.3.4'}, [], 0),
(None, {'REMOTE_ADDR': '1.2.3.4'}, [], 0),
# One lookup failure (a warning) before finding a usable header
(
[
{'name': 'X-Real-IP', 'index': 0},
{'name': 'CF-Connecting-IP', 'index': 0},
],
{
'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 5.5.5.5',
'REMOTE_ADDR': '10.0.3.0',
'HTTP_CF_CONNECTING_IP': '1.2.3.4',
},
['7.8.9.0', '1.2.3.4'],
1,
),
# Configured, but none of the headers are present
(
[
{'name': 'CF-Connecting-IP', 'index': 0},
{'name': 'X-Forwarded-For', 'index': -2},
],
{'REMOTE_ADDR': '1.2.3.4'},
[],
2,
),
# Can configure to use far end of XFF if needed for some reason
(
[
{'name': 'X-Forwarded-For', 'index': 0},
],
{
'HTTP_X_FORWARDED_FOR': '1.2.3.4, 5.5.5.5',
'REMOTE_ADDR': '10.0.3.0',
},
['1.2.3.4'],
0,
),
)
def test_get_client_ips_via_trusted_header(self, cnf_headers, add_meta, expected, warning_count):
self.request.META.update(add_meta)
if cnf_headers is None:
overrides = {}
else:
overrides = {'CLOSEST_CLIENT_IP_FROM_HEADERS': cnf_headers}
with override_settings(**overrides):
with warning_messages() as caught_warnings:
actual = ip._get_client_ips_via_trusted_header(self.request) # pylint: disable=protected-access
assert [str(ip) for ip in actual] == expected
assert len(caught_warnings) == warning_count
@ddt.unpack
@ddt.data(
# Using headers setting
(
[{'name': 'CF-Connecting-IP', 'index': 0}],
{
'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 5.5.5.5, 10.0.3.0',
'HTTP_CF_CONNECTING_IP': '1.2.3.4',
'REMOTE_ADDR': '127.0.0.2',
},
['7.8.9.0', '1.2.3.4'],
0,
),
# Fall back when all override headers are unusable, with warnings
(
[
{'name': 'CF-Connecting-IP', 'index': 0}, # not actually passed in this request
{'name': 'X-Real-IP', 'index': 2}, # index out of range, so unusable
],
{
'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 10.0.3.0, 127.0.0.2',
'HTTP_X_REAL_IP': '10.0.3.0',
'REMOTE_ADDR': '127.0.0.2',
},
['7.8.9.0', '1.2.3.4'],
2,
),
# By default, with the least possible information, do something useful.
# (And no warnings when no override headers are specified.)
([], {'REMOTE_ADDR': '127.0.0.2'}, ['127.0.0.2'], 0),
)
def test_compute_client_ips(self, cnf_headers, add_meta, expected, expect_warnings):
"""
Just a few tests to confirm that the correct branch is taken, basically.
"""
if cnf_headers is None:
overrides = {}
else:
overrides = {'CLOSEST_CLIENT_IP_FROM_HEADERS': cnf_headers}
self.request.META.update(add_meta)
with override_settings(**overrides):
with warning_messages() as caught_warnings:
actual = ip._compute_client_ips(self.request) # pylint: disable=protected-access
assert actual == expected
assert len(caught_warnings) == expect_warnings
def test_get_all_client_ips(self):
"""
Test getter for all IPs.
"""
self.request.META.update({
'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4',
'REMOTE_ADDR': '127.0.0.2',
})
assert ip.get_all_client_ips(self.request) == ['7.8.9.0', '1.2.3.4']
def test_get_safest_client_ip(self):
"""
Test convenience wrapper for rightmost IP.
"""
self.request.META.update({
'HTTP_X_FORWARDED_FOR': '7.8.9.0',
'REMOTE_ADDR': '1.2.3.4',
})
assert ip.get_safest_client_ip(self.request) == '1.2.3.4'

View File

@@ -0,0 +1,50 @@
"""
Tests for IP determination.
Fake data used in these tests, for consistency:
- 1.2.3.4 -- a "real" client IP, e.g. the IP of a laptop or phone.
- 127.0.0.2 -- a local reverse proxy (e.g. nginx or caddy)
- 10.0.3.0 -- our load balancer
- 5.5.5.5 -- our CDN
- 6.6.6.6 -- a malicious CDN configuration
- 7.8.9.0 -- something beyond the real client in the IP chain, probably a spoofed header
...as well as IPv6 versions of these, e.g. 1:2:3:4:: and ::1.
XXXXXXXXX is used as a standin for anything unparseable (some kind of garbage).
"""
import ddt
from django.test import TestCase
from django.test.client import RequestFactory
from openedx.core.djangoapps.util import legacy_ip
from openedx.core.lib.x_forwarded_for.middleware import XForwardedForMiddleware
@ddt.ddt
class TestClientIP(TestCase):
"""Tests for get_client_ip and helpers."""
def setUp(self):
super().setUp()
self.request = RequestFactory().get('/somewhere')
@ddt.unpack
@ddt.data(
(
{'HTTP_X_FORWARDED_FOR': '7.8.9.0, 1.2.3.4, 10.0.3.0', 'REMOTE_ADDR': '0:0:0:0::1'},
'7.8.9.0',
),
# XFF is not required
({'REMOTE_ADDR': '127.0.0.2'}, '127.0.0.2'),
)
def test_get_legacy_ip(self, request_meta, expected):
self.request.META = request_meta
assert legacy_ip.get_legacy_ip(self.request) == expected
# Check that it still works after the XFF middleware has done its dirty work
XForwardedForMiddleware().process_request(self.request)
assert legacy_ip.get_legacy_ip(self.request) == expected

View File

@@ -8,6 +8,7 @@ from django.test.client import RequestFactory
from edx_toggles.toggles.testutils import override_waffle_switch
import openedx.core.djangoapps.util.ratelimit as ratelimit
from openedx.core.djangoapps.util.legacy_ip import USE_LEGACY_IP
from openedx.core.lib.x_forwarded_for.middleware import XForwardedForMiddleware
@@ -35,11 +36,11 @@ class TestRateLimiting(TestCase):
XForwardedForMiddleware().process_request(self.request)
assert ratelimit.real_ip(None, self.request) == '1.2.3.4'
@override_waffle_switch(ratelimit.ip.USE_LEGACY_IP, True)
@override_waffle_switch(USE_LEGACY_IP, True)
def test_legacy_switch(self):
assert ratelimit.real_ip(None, self.request) == '7.8.9.0'
@override_waffle_switch(ratelimit.ip.USE_LEGACY_IP, True)
@override_waffle_switch(USE_LEGACY_IP, True)
def test_legacy_switch_after_xff_middleware(self):
"""
Again, but with XFF Middleware running first.

View File

@@ -5,9 +5,10 @@ import ipaddress
import warnings
from django.utils.deprecation import MiddlewareMixin
from edx_django_utils import ip
from edx_django_utils.monitoring import set_custom_attribute
import openedx.core.djangoapps.util.ip as ip
from openedx.core.djangoapps.util import legacy_ip
def _ip_type(ip_str):
@@ -60,7 +61,7 @@ class XForwardedForMiddleware(MiddlewareMixin):
set_custom_attribute('ip_chain.count', len(ip_chain))
set_custom_attribute('ip_chain.types', '-'.join(_ip_type(s) for s in ip_chain))
set_custom_attribute('ip_chain.use_legacy', ip.USE_LEGACY_IP.is_enabled())
set_custom_attribute('ip_chain.use_legacy', legacy_ip.USE_LEGACY_IP.is_enabled())
external_chain = ip.get_all_client_ips(request)
set_custom_attribute('ip_chain.external.count', len(external_chain))
@@ -103,7 +104,7 @@ class XForwardedForMiddleware(MiddlewareMixin):
# without knowing about ORIGINAL_REMOTE_ADDR. (The less code that
# is aware of that, the better, and the ip code should be lifted
# out into a library anyhow.)
if ip.USE_LEGACY_IP.is_enabled():
request.META['REMOTE_ADDR'] = ip.get_legacy_ip(request)
if legacy_ip.USE_LEGACY_IP.is_enabled():
request.META['REMOTE_ADDR'] = legacy_ip.get_legacy_ip(request)
else:
request.META['REMOTE_ADDR'] = ip.get_safest_client_ip(request)

View File

@@ -81,7 +81,8 @@ edx-celeryutils
edx-completion
edx-django-release-util # Release utils for the edx release pipeline
edx-django-sites-extensions
edx-django-utils>=5.0.0 # Utilities for cache, monitoring, and plugins
# Need IP address code that was moved to edx-django-utils 5.1.0
edx-django-utils>=5.1.0 # Utilities for cache, monitoring, and plugins
edx-drf-extensions
edx-enterprise
# 0.6.2 introduces topic prefixing

View File

@@ -453,7 +453,7 @@ edx-django-release-util==1.2.0
# blockstore
edx-django-sites-extensions==4.0.0
# via -r requirements/edx/base.in
edx-django-utils==5.0.1
edx-django-utils==5.1.0
# via
# -r requirements/edx/base.in
# blockstore

View File

@@ -567,7 +567,7 @@ edx-django-release-util==1.2.0
# blockstore
edx-django-sites-extensions==4.0.0
# via -r requirements/edx/testing.txt
edx-django-utils==5.0.1
edx-django-utils==5.1.0
# via
# -r requirements/edx/testing.txt
# blockstore

View File

@@ -547,7 +547,7 @@ edx-django-release-util==1.2.0
# blockstore
edx-django-sites-extensions==4.0.0
# via -r requirements/edx/base.txt
edx-django-utils==5.0.1
edx-django-utils==5.1.0
# via
# -r requirements/edx/base.txt
# blockstore