Handle next and course_id in /login_ajax

Currently, the /login_ajax endpoint does not regard
any `next` or `course_id` parameters. This commit changes
that, sharing the logic that /login (which the current
templated login page uses) employs to cacluate
a redirect-after-login URL based on `next` and `course_id`.

The new functionality is behind ENABLE_LOGIN_MICROFRONTEND.
This commit is contained in:
Kyle McCormick
2020-02-12 13:33:31 -05:00
committed by Kyle McCormick
parent 9d81f2a320
commit 234eedd8c6
7 changed files with 215 additions and 52 deletions

View File

@@ -6,10 +6,10 @@ Helpers for the student app.
import json
import logging
import mimetypes
import urllib.parse
from collections import OrderedDict
from datetime import datetime
import six.moves.urllib.parse
from completion.exceptions import UnavailableCompletionData
from completion.utilities import get_key_to_last_completed_block
from django.conf import settings
@@ -234,23 +234,31 @@ def get_next_url_for_login_page(request):
/account/finish_auth/ view following login, which will take care of auto-enrollment in
the specified course.
Otherwise, we go to the ?next= query param or to the dashboard if nothing else is
Otherwise, we go to the `next` param or to the dashboard if nothing else is
specified.
If THIRD_PARTY_AUTH_HINT is set, then `tpa_hint=<hint>` is added as a query parameter.
This works with both GET and POST requests.
"""
redirect_to = _get_redirect_to(request)
request_params = request.GET if request.method == 'GET' else request.POST
redirect_to = _get_redirect_to(
request_host=request.get_host(),
request_headers=request.META,
request_params=request_params,
request_is_https=request.is_secure(),
)
if not redirect_to:
try:
redirect_to = reverse('dashboard')
except NoReverseMatch:
redirect_to = reverse('home')
if any(param in request.GET for param in POST_AUTH_PARAMS):
if any(param in request_params for param in POST_AUTH_PARAMS):
# Before we redirect to next/dashboard, we need to handle auto-enrollment:
params = [(param, request.GET[param]) for param in POST_AUTH_PARAMS if param in request.GET]
params = [(param, request_params[param]) for param in POST_AUTH_PARAMS if param in request_params]
params.append(('next', redirect_to)) # After auto-enrollment, user will be sent to payment page or to this URL
redirect_to = '{}?{}'.format(reverse('finish_auth'), six.moves.urllib.parse.urlencode(params))
redirect_to = '{}?{}'.format(reverse('finish_auth'), urllib.parse.urlencode(params))
# Note: if we are resuming a third party auth pipeline, then the next URL will already
# be saved in the session as part of the pipeline state. That URL will take priority
# over this one.
@@ -264,26 +272,35 @@ def get_next_url_for_login_page(request):
# Don't add tpa_hint if we're already in the TPA pipeline (prevent infinite loop),
# and don't overwrite any existing tpa_hint params (allow tpa_hint override).
running_pipeline = third_party_auth.pipeline.get(request)
(scheme, netloc, path, query, fragment) = list(six.moves.urllib.parse.urlsplit(redirect_to))
(scheme, netloc, path, query, fragment) = list(urllib.parse.urlsplit(redirect_to))
if not running_pipeline and 'tpa_hint' not in query:
params = six.moves.urllib.parse.parse_qs(query)
params = urllib.parse.parse_qs(query)
params['tpa_hint'] = [tpa_hint]
query = six.moves.urllib.parse.urlencode(params, doseq=True)
redirect_to = six.moves.urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
query = urllib.parse.urlencode(params, doseq=True)
redirect_to = urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
return redirect_to
def _get_redirect_to(request):
def _get_redirect_to(request_host, request_headers, request_params, request_is_https):
"""
Determine the redirect url and return if safe
:argument
request: request object
:returns: redirect url if safe else None
Arguments:
request_host (str)
request_headers (dict)
request_params (QueryDict)
request_is_https (bool)
Returns: str
redirect url if safe else None
"""
redirect_to = request.GET.get('next')
header_accept = request.META.get('HTTP_ACCEPT', '')
redirect_to = request_params.get('next')
header_accept = request_headers.get('HTTP_ACCEPT', '')
accepts_text_html = any(
mime_type in header_accept
for mime_type in {'*/*', 'text/*', 'text/html'}
)
# If we get a redirect parameter, make sure it's safe i.e. not redirecting outside our domain.
# Also make sure that it is not redirecting to a static asset and redirected page is web page
@@ -291,19 +308,25 @@ def _get_redirect_to(request):
# get information about a user on edx.org. In any such case drop the parameter.
if redirect_to:
mime_type, _ = mimetypes.guess_type(redirect_to, strict=False)
if not is_safe_login_or_logout_redirect(request, redirect_to):
safe_redirect = is_safe_login_or_logout_redirect(
redirect_to=redirect_to,
request_host=request_host,
dot_client_id=request_params.get('client_id'),
require_https=request_is_https,
)
if not safe_redirect:
log.warning(
u"Unsafe redirect parameter detected after login page: '%(redirect_to)s'",
{"redirect_to": redirect_to}
)
redirect_to = None
elif 'text/html' not in header_accept:
elif not accepts_text_html:
log.info(
u"Redirect to non html content '%(content_type)s' detected from '%(user_agent)s'"
u" after login page: '%(redirect_to)s'",
{
"redirect_to": redirect_to, "content_type": header_accept,
"user_agent": request.META.get('HTTP_USER_AGENT', '')
"user_agent": request_headers.get('HTTP_USER_AGENT', '')
}
)
redirect_to = None
@@ -321,7 +344,7 @@ def _get_redirect_to(request):
redirect_to = None
else:
themes = get_themes()
next_path = six.moves.urllib.parse.urlparse(redirect_to).path
next_path = urllib.parse.urlparse(redirect_to).path
for theme in themes:
if theme.theme_dir_name in next_path:
log.warning(
@@ -539,7 +562,11 @@ def _cert_info(user, course_overview, cert_status):
# We can add a log.warning here once we think it shouldn't happen.
return default_info
grades_input = [cert_grade_percent, persisted_grade_percent]
max_grade = None if all(grade is None for grade in grades_input) else max(filter(lambda x: x is not None, grades_input))
max_grade = (
None
if all(grade is None for grade in grades_input)
else max(filter(lambda x: x is not None, grades_input))
)
status_dict['grade'] = text_type(max_grade)
return status_dict

View File

@@ -56,30 +56,29 @@ class TestLoginHelper(TestCase):
""" Test unsafe next parameter """
with LogCapture(LOGGER_NAME, level=log_level) as logger:
req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=unsafe_url))
req.META["HTTP_ACCEPT"] = http_accept # pylint: disable=no-member
req.META["HTTP_USER_AGENT"] = user_agent # pylint: disable=no-member
req.META["HTTP_ACCEPT"] = http_accept
req.META["HTTP_USER_AGENT"] = user_agent
get_next_url_for_login_page(req)
logger.check(
(LOGGER_NAME, log_name, expected_log)
)
@ddt.data(
('/dashboard', 'testserver'),
('https://edx.org/courses', 'edx.org'),
('https://test.edx.org/courses', 'edx.org'),
('https://test2.edx.org/courses', 'edx.org'),
('/dashboard', 'text/html', 'testserver'),
('https://edx.org/courses', 'text/*', 'edx.org'),
('https://test.edx.org/courses', '*/*', 'edx.org'),
('https://test2.edx.org/courses', 'image/webp, */*;q=0.8', 'edx.org'),
)
@ddt.unpack
@override_settings(LOGIN_REDIRECT_WHITELIST=['test.edx.org', 'test2.edx.org'])
def test_safe_next(self, next_url, host):
def test_safe_next(self, next_url, http_accept, host):
""" Test safe next parameter """
req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url), HTTP_HOST=host)
req.META["HTTP_ACCEPT"] = "text/html" # pylint: disable=no-member
req.META["HTTP_ACCEPT"] = http_accept
next_page = get_next_url_for_login_page(req)
self.assertEqual(next_page, next_url)
@patch('student.helpers.third_party_auth.pipeline.get')
@ddt.data(
tpa_hint_test_cases = [
# Test requests outside the TPA pipeline - tpa_hint should be added.
(None, '/dashboard', '/dashboard', False),
('', '/dashboard', '/dashboard', False),
@@ -95,14 +94,36 @@ class TestLoginHelper(TestCase):
('saml-idp', '/dashboard', '/dashboard', True),
# OK to leave tpa_hint overrides in place.
('saml-idp', '/dashboard?tpa_hint=oa2-google-oauth2', '/dashboard?tpa_hint=oa2-google-oauth2', True),
)
]
tpa_hint_test_cases_with_method = [
(method, *test_case)
for test_case in tpa_hint_test_cases
for method in ['GET', 'POST']
]
@patch('student.helpers.third_party_auth.pipeline.get')
@ddt.data(*tpa_hint_test_cases_with_method)
@ddt.unpack
def test_third_party_auth_hint(self, tpa_hint, next_url, expected_url, running_pipeline, mock_running_pipeline):
def test_third_party_auth_hint(
self,
method,
tpa_hint,
next_url,
expected_url,
running_pipeline,
mock_running_pipeline,
):
mock_running_pipeline.return_value = running_pipeline
def validate_login():
req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url))
req.META["HTTP_ACCEPT"] = "text/html" # pylint: disable=no-member
"""
Assert that get_next_url_for_login_page returns as expected.
"""
if method == 'GET':
req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url))
elif method == 'POST':
req = self.request.post(settings.LOGIN_URL, {'next': next_url})
req.META["HTTP_ACCEPT"] = "text/html"
self._add_session(req)
next_page = get_next_url_for_login_page(req)
self.assertEqual(next_page, expected_url)

View File

@@ -25,6 +25,15 @@ class TestRedirectUtils(TestCase):
RedirectCase = namedtuple('RedirectCase', ['url', 'host', 'req_is_secure', 'expected_is_safe'])
@staticmethod
def _is_safe_redirect(req, url):
return is_safe_login_or_logout_redirect(
redirect_to=url,
request_host=req.get_host(),
dot_client_id=req.GET.get('client_id'),
require_https=req.is_secure(),
)
@ddt.data(
RedirectCase('/dashboard', 'testserver', req_is_secure=True, expected_is_safe=True),
RedirectCase('https://test.edx.org/courses', 'edx.org', req_is_secure=True, expected_is_safe=True),
@@ -44,7 +53,7 @@ class TestRedirectUtils(TestCase):
""" Test safe next parameter """
req = self.request.get('/login', HTTP_HOST=host)
req.is_secure = lambda: req_is_secure
actual_is_safe = is_safe_login_or_logout_redirect(req, url)
actual_is_safe = self._is_safe_redirect(req, url)
self.assertEqual(actual_is_safe, expected_is_safe)
@ddt.data(
@@ -60,7 +69,7 @@ class TestRedirectUtils(TestCase):
'redirect_url': redirect_url,
}
req = self.request.get('/logout?{}'.format(urlencode(params)), HTTP_HOST=host)
actual_is_safe = is_safe_login_or_logout_redirect(req, redirect_url)
actual_is_safe = self._is_safe_redirect(req, redirect_url)
self.assertEqual(actual_is_safe, expected_is_safe)

View File

@@ -12,24 +12,36 @@ from oauth2_provider.models import Application
from six.moves.urllib.parse import urlparse # pylint: disable=import-error
def is_safe_login_or_logout_redirect(request, redirect_to):
def is_safe_login_or_logout_redirect(redirect_to, request_host, dot_client_id, require_https):
"""
Determine if the given redirect URL/path is safe for redirection.
Arguments:
redirect_to (str):
The URL in question.
request_host (str):
Originating hostname of the request.
This is always considered an acceptable redirect target.
dot_client_id (str|None):
ID of Django OAuth Toolkit client.
It is acceptable to redirect to any of the DOT client's redirct URIs.
This argument is ignored if it is None.
require_https (str):
Whether HTTPs should be required in the redirect URL.
Returns: bool
"""
login_redirect_whitelist = set(getattr(settings, 'LOGIN_REDIRECT_WHITELIST', []))
request_host = request.get_host() # e.g. 'courses.edx.org'
login_redirect_whitelist.add(request_host)
# Allow OAuth2 clients to redirect back to their site after logout.
dot_client_id = request.GET.get('client_id')
if dot_client_id:
application = Application.objects.get(client_id=dot_client_id)
if redirect_to in application.redirect_uris:
login_redirect_whitelist.add(urlparse(redirect_to).netloc)
is_safe_url = http.is_safe_url(
redirect_to, allowed_hosts=login_redirect_whitelist, require_https=request.is_secure(),
redirect_to, allowed_hosts=login_redirect_whitelist, require_https=require_https
)
return is_safe_url

View File

@@ -5,7 +5,6 @@ Much of this file was broken out from views.py, previous history can be found th
"""
from functools import wraps
import json
import logging
@@ -39,6 +38,7 @@ from openedx.core.djangoapps.user_authn.views.password_reset import send_passwor
from openedx.core.djangoapps.user_authn.config.waffle import ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY
from openedx.core.djangolib.markup import HTML, Text
from openedx.core.lib.api.view_utils import require_post_params
from student.helpers import get_next_url_for_login_page
from student.models import LoginFailures, AllowedAuthUser, UserProfile
from student.views import compose_and_send_activation_email
from third_party_auth import pipeline, provider
@@ -430,6 +430,8 @@ def login_user(request):
if is_user_third_party_authenticated:
running_pipeline = pipeline.get(request)
redirect_url = pipeline.get_complete_url(backend_name=running_pipeline['backend'])
elif settings.FEATURES.get('ENABLE_LOGIN_MICROFRONTEND'):
redirect_url = get_next_url_for_login_page(request)
response = JsonResponse({
'success': True,

View File

@@ -7,7 +7,6 @@ import edx_oauth2_provider
import six.moves.urllib.parse as parse # pylint: disable=import-error
from django.conf import settings
from django.contrib.auth import logout
from django.shortcuts import redirect
from django.utils.http import urlencode
from django.views.generic import TemplateView
from provider.oauth2.models import Client
@@ -61,10 +60,13 @@ class LogoutView(TemplateView):
if target_url:
target_url = parse.unquote(parse.quote_plus(target_url))
if target_url and is_safe_login_or_logout_redirect(self.request, target_url):
return target_url
else:
return self.default_target
use_target_url = target_url and is_safe_login_or_logout_redirect(
redirect_to=target_url,
request_host=self.request.get_host(),
dot_client_id=self.request.GET.get('client_id'),
require_https=self.request.is_secure(),
)
return target_url if use_target_url else self.default_target
def dispatch(self, request, *args, **kwargs):
# We do not log here, because we have a handler registered to perform logging on successful logouts.

View File

@@ -14,8 +14,7 @@ from django.conf import settings
from django.contrib.auth.models import User
from django.core import mail
from django.core.cache import cache
from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest
from django.test import TestCase
from django.http import HttpResponse
from django.test.client import Client
from django.test.utils import override_settings
from django.urls import NoReverseMatch, reverse
@@ -80,6 +79,80 @@ class LoginTest(SiteMixin, CacheIsolationTestCase):
self._assert_response(response, success=True)
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', self.user_email])
FEATURES_WITH_LOGIN_MFE_ENABLED = settings.FEATURES.copy()
FEATURES_WITH_LOGIN_MFE_ENABLED['ENABLE_LOGIN_MICROFRONTEND'] = True
@ddt.data(
# Default redirect is dashboard.
{
'next_url': None,
'course_id': None,
'expected_redirect': '/dashboard',
},
# A relative path is an acceptable redirect.
{
'next_url': '/harmless-relative-page',
'course_id': None,
'expected_redirect': '/harmless-relative-page',
},
# Paths without trailing slashes are also considered relative.
{
'next_url': 'courses',
'course_id': None,
'expected_redirect': 'courses',
},
# An absolute URL to a non-whitelisted domain is not an acceptable redirect.
{
'next_url': 'https://evil.sketchysite',
'course_id': None,
'expected_redirect': '/dashboard',
},
# An absolute URL to a whitelisted domain is acceptable.
{
'next_url': 'https://openedx.service/coolpage',
'course_id': None,
'expected_redirect': 'https://openedx.service/coolpage',
},
# If course_id is provided, redirect to finish_auth with dashboard as next.
{
'next_url': None,
'course_id': 'coursekey',
'expected_redirect': '/account/finish_auth?course_id=coursekey&next=%2Fdashboard',
},
# If valid course_id AND next_url are provided, redirect to finish_auth with
# provided next URL.
{
'next_url': 'freshpage',
'course_id': 'coursekey',
'expected_redirect': '/account/finish_auth?course_id=coursekey&next=freshpage',
},
# If course_id is provided with invalid next_url, redirect to finish_auth with
# course_id and dashboard as next URL.
{
'next_url': 'http://scam.scam',
'course_id': 'coursekey',
'expected_redirect': '/account/finish_auth?course_id=coursekey&next=%2Fdashboard',
},
)
@ddt.unpack
@override_settings(LOGIN_REDIRECT_WHITELIST=['openedx.service'])
@override_settings(FEATURES=FEATURES_WITH_LOGIN_MFE_ENABLED)
def test_login_success_with_redirect(self, next_url, course_id, expected_redirect):
# Default redirect is student dashboard.
post_params = {}
if next_url:
post_params['next'] = next_url
if course_id:
post_params['course_id'] = course_id
response, _ = self._login_response(
self.user_email,
self.password,
extra_post_params=post_params,
HTTP_ACCEPT='*/*',
)
self._assert_response(response, success=True)
self._assert_redirect_url(response, expected_redirect)
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_login_success_no_pii(self):
response, mock_audit_log = self._login_response(
@@ -484,7 +557,9 @@ class LoginTest(SiteMixin, CacheIsolationTestCase):
response, _ = self._login_response(self.user.email, password_entered)
self._assert_response(response, success=login_success)
def _login_response(self, email, password, patched_audit_log=None, extra_post_params=None):
def _login_response(
self, email, password, patched_audit_log=None, extra_post_params=None, **extra
):
"""
Post the login info
"""
@@ -494,7 +569,7 @@ class LoginTest(SiteMixin, CacheIsolationTestCase):
if extra_post_params is not None:
post_params.update(extra_post_params)
with patch(patched_audit_log) as mock_audit_log:
result = self.client.post(self.url, post_params)
result = self.client.post(self.url, post_params, **extra)
return result, mock_audit_log
def _assert_response(self, response, success=None, value=None, status_code=None):
@@ -525,6 +600,21 @@ class LoginTest(SiteMixin, CacheIsolationTestCase):
(six.text_type(response_dict['value']), six.text_type(value)))
self.assertIn(value, response_dict['value'], msg)
def _assert_redirect_url(self, response, expected_redirect_url):
"""
Assert that the redirect URL is in the response and has the expected value.
Assumes that response content is well-formed JSON
(you can call `_assert_response` first to assert this).
"""
response_dict = json.loads(response.content.decode('utf-8'))
assert 'redirect_url' in response_dict, (
"Response JSON unexpectedly does not have redirect_url: {!r}".format(
response_dict
)
)
assert response_dict['redirect_url'] == expected_redirect_url
def _assert_audit_log(self, mock_audit_log, level, log_strings):
"""
Check that the audit log has received the expected call as its last call.