diff --git a/common/djangoapps/student/helpers.py b/common/djangoapps/student/helpers.py index e31b533cff..934096c129 100644 --- a/common/djangoapps/student/helpers.py +++ b/common/djangoapps/student/helpers.py @@ -6,10 +6,10 @@ Helpers for the student app. import json import logging import mimetypes +import urllib.parse from collections import OrderedDict from datetime import datetime -import six.moves.urllib.parse from completion.exceptions import UnavailableCompletionData from completion.utilities import get_key_to_last_completed_block from django.conf import settings @@ -234,23 +234,31 @@ def get_next_url_for_login_page(request): /account/finish_auth/ view following login, which will take care of auto-enrollment in the specified course. - Otherwise, we go to the ?next= query param or to the dashboard if nothing else is + Otherwise, we go to the `next` param or to the dashboard if nothing else is specified. If THIRD_PARTY_AUTH_HINT is set, then `tpa_hint=` is added as a query parameter. + + This works with both GET and POST requests. """ - redirect_to = _get_redirect_to(request) + request_params = request.GET if request.method == 'GET' else request.POST + redirect_to = _get_redirect_to( + request_host=request.get_host(), + request_headers=request.META, + request_params=request_params, + request_is_https=request.is_secure(), + ) if not redirect_to: try: redirect_to = reverse('dashboard') except NoReverseMatch: redirect_to = reverse('home') - if any(param in request.GET for param in POST_AUTH_PARAMS): + if any(param in request_params for param in POST_AUTH_PARAMS): # Before we redirect to next/dashboard, we need to handle auto-enrollment: - params = [(param, request.GET[param]) for param in POST_AUTH_PARAMS if param in request.GET] + params = [(param, request_params[param]) for param in POST_AUTH_PARAMS if param in request_params] params.append(('next', redirect_to)) # After auto-enrollment, user will be sent to payment page or to this URL - redirect_to = '{}?{}'.format(reverse('finish_auth'), six.moves.urllib.parse.urlencode(params)) + redirect_to = '{}?{}'.format(reverse('finish_auth'), urllib.parse.urlencode(params)) # Note: if we are resuming a third party auth pipeline, then the next URL will already # be saved in the session as part of the pipeline state. That URL will take priority # over this one. @@ -264,26 +272,35 @@ def get_next_url_for_login_page(request): # Don't add tpa_hint if we're already in the TPA pipeline (prevent infinite loop), # and don't overwrite any existing tpa_hint params (allow tpa_hint override). running_pipeline = third_party_auth.pipeline.get(request) - (scheme, netloc, path, query, fragment) = list(six.moves.urllib.parse.urlsplit(redirect_to)) + (scheme, netloc, path, query, fragment) = list(urllib.parse.urlsplit(redirect_to)) if not running_pipeline and 'tpa_hint' not in query: - params = six.moves.urllib.parse.parse_qs(query) + params = urllib.parse.parse_qs(query) params['tpa_hint'] = [tpa_hint] - query = six.moves.urllib.parse.urlencode(params, doseq=True) - redirect_to = six.moves.urllib.parse.urlunsplit((scheme, netloc, path, query, fragment)) + query = urllib.parse.urlencode(params, doseq=True) + redirect_to = urllib.parse.urlunsplit((scheme, netloc, path, query, fragment)) return redirect_to -def _get_redirect_to(request): +def _get_redirect_to(request_host, request_headers, request_params, request_is_https): """ Determine the redirect url and return if safe - :argument - request: request object - :returns: redirect url if safe else None + Arguments: + request_host (str) + request_headers (dict) + request_params (QueryDict) + request_is_https (bool) + + Returns: str + redirect url if safe else None """ - redirect_to = request.GET.get('next') - header_accept = request.META.get('HTTP_ACCEPT', '') + redirect_to = request_params.get('next') + header_accept = request_headers.get('HTTP_ACCEPT', '') + accepts_text_html = any( + mime_type in header_accept + for mime_type in {'*/*', 'text/*', 'text/html'} + ) # If we get a redirect parameter, make sure it's safe i.e. not redirecting outside our domain. # Also make sure that it is not redirecting to a static asset and redirected page is web page @@ -291,19 +308,25 @@ def _get_redirect_to(request): # get information about a user on edx.org. In any such case drop the parameter. if redirect_to: mime_type, _ = mimetypes.guess_type(redirect_to, strict=False) - if not is_safe_login_or_logout_redirect(request, redirect_to): + safe_redirect = is_safe_login_or_logout_redirect( + redirect_to=redirect_to, + request_host=request_host, + dot_client_id=request_params.get('client_id'), + require_https=request_is_https, + ) + if not safe_redirect: log.warning( u"Unsafe redirect parameter detected after login page: '%(redirect_to)s'", {"redirect_to": redirect_to} ) redirect_to = None - elif 'text/html' not in header_accept: + elif not accepts_text_html: log.info( u"Redirect to non html content '%(content_type)s' detected from '%(user_agent)s'" u" after login page: '%(redirect_to)s'", { "redirect_to": redirect_to, "content_type": header_accept, - "user_agent": request.META.get('HTTP_USER_AGENT', '') + "user_agent": request_headers.get('HTTP_USER_AGENT', '') } ) redirect_to = None @@ -321,7 +344,7 @@ def _get_redirect_to(request): redirect_to = None else: themes = get_themes() - next_path = six.moves.urllib.parse.urlparse(redirect_to).path + next_path = urllib.parse.urlparse(redirect_to).path for theme in themes: if theme.theme_dir_name in next_path: log.warning( @@ -539,7 +562,11 @@ def _cert_info(user, course_overview, cert_status): # We can add a log.warning here once we think it shouldn't happen. return default_info grades_input = [cert_grade_percent, persisted_grade_percent] - max_grade = None if all(grade is None for grade in grades_input) else max(filter(lambda x: x is not None, grades_input)) + max_grade = ( + None + if all(grade is None for grade in grades_input) + else max(filter(lambda x: x is not None, grades_input)) + ) status_dict['grade'] = text_type(max_grade) return status_dict diff --git a/common/djangoapps/student/tests/test_helpers.py b/common/djangoapps/student/tests/test_helpers.py index 01d64dd8e6..eb8289b749 100644 --- a/common/djangoapps/student/tests/test_helpers.py +++ b/common/djangoapps/student/tests/test_helpers.py @@ -56,30 +56,29 @@ class TestLoginHelper(TestCase): """ Test unsafe next parameter """ with LogCapture(LOGGER_NAME, level=log_level) as logger: req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=unsafe_url)) - req.META["HTTP_ACCEPT"] = http_accept # pylint: disable=no-member - req.META["HTTP_USER_AGENT"] = user_agent # pylint: disable=no-member + req.META["HTTP_ACCEPT"] = http_accept + req.META["HTTP_USER_AGENT"] = user_agent get_next_url_for_login_page(req) logger.check( (LOGGER_NAME, log_name, expected_log) ) @ddt.data( - ('/dashboard', 'testserver'), - ('https://edx.org/courses', 'edx.org'), - ('https://test.edx.org/courses', 'edx.org'), - ('https://test2.edx.org/courses', 'edx.org'), + ('/dashboard', 'text/html', 'testserver'), + ('https://edx.org/courses', 'text/*', 'edx.org'), + ('https://test.edx.org/courses', '*/*', 'edx.org'), + ('https://test2.edx.org/courses', 'image/webp, */*;q=0.8', 'edx.org'), ) @ddt.unpack @override_settings(LOGIN_REDIRECT_WHITELIST=['test.edx.org', 'test2.edx.org']) - def test_safe_next(self, next_url, host): + def test_safe_next(self, next_url, http_accept, host): """ Test safe next parameter """ req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url), HTTP_HOST=host) - req.META["HTTP_ACCEPT"] = "text/html" # pylint: disable=no-member + req.META["HTTP_ACCEPT"] = http_accept next_page = get_next_url_for_login_page(req) self.assertEqual(next_page, next_url) - @patch('student.helpers.third_party_auth.pipeline.get') - @ddt.data( + tpa_hint_test_cases = [ # Test requests outside the TPA pipeline - tpa_hint should be added. (None, '/dashboard', '/dashboard', False), ('', '/dashboard', '/dashboard', False), @@ -95,14 +94,36 @@ class TestLoginHelper(TestCase): ('saml-idp', '/dashboard', '/dashboard', True), # OK to leave tpa_hint overrides in place. ('saml-idp', '/dashboard?tpa_hint=oa2-google-oauth2', '/dashboard?tpa_hint=oa2-google-oauth2', True), - ) + ] + tpa_hint_test_cases_with_method = [ + (method, *test_case) + for test_case in tpa_hint_test_cases + for method in ['GET', 'POST'] + ] + + @patch('student.helpers.third_party_auth.pipeline.get') + @ddt.data(*tpa_hint_test_cases_with_method) @ddt.unpack - def test_third_party_auth_hint(self, tpa_hint, next_url, expected_url, running_pipeline, mock_running_pipeline): + def test_third_party_auth_hint( + self, + method, + tpa_hint, + next_url, + expected_url, + running_pipeline, + mock_running_pipeline, + ): mock_running_pipeline.return_value = running_pipeline def validate_login(): - req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url)) - req.META["HTTP_ACCEPT"] = "text/html" # pylint: disable=no-member + """ + Assert that get_next_url_for_login_page returns as expected. + """ + if method == 'GET': + req = self.request.get(settings.LOGIN_URL + "?next={url}".format(url=next_url)) + elif method == 'POST': + req = self.request.post(settings.LOGIN_URL, {'next': next_url}) + req.META["HTTP_ACCEPT"] = "text/html" self._add_session(req) next_page = get_next_url_for_login_page(req) self.assertEqual(next_page, expected_url) diff --git a/openedx/core/djangoapps/user_authn/tests/test_utils.py b/openedx/core/djangoapps/user_authn/tests/test_utils.py index d9d74207b7..710454a640 100644 --- a/openedx/core/djangoapps/user_authn/tests/test_utils.py +++ b/openedx/core/djangoapps/user_authn/tests/test_utils.py @@ -25,6 +25,15 @@ class TestRedirectUtils(TestCase): RedirectCase = namedtuple('RedirectCase', ['url', 'host', 'req_is_secure', 'expected_is_safe']) + @staticmethod + def _is_safe_redirect(req, url): + return is_safe_login_or_logout_redirect( + redirect_to=url, + request_host=req.get_host(), + dot_client_id=req.GET.get('client_id'), + require_https=req.is_secure(), + ) + @ddt.data( RedirectCase('/dashboard', 'testserver', req_is_secure=True, expected_is_safe=True), RedirectCase('https://test.edx.org/courses', 'edx.org', req_is_secure=True, expected_is_safe=True), @@ -44,7 +53,7 @@ class TestRedirectUtils(TestCase): """ Test safe next parameter """ req = self.request.get('/login', HTTP_HOST=host) req.is_secure = lambda: req_is_secure - actual_is_safe = is_safe_login_or_logout_redirect(req, url) + actual_is_safe = self._is_safe_redirect(req, url) self.assertEqual(actual_is_safe, expected_is_safe) @ddt.data( @@ -60,7 +69,7 @@ class TestRedirectUtils(TestCase): 'redirect_url': redirect_url, } req = self.request.get('/logout?{}'.format(urlencode(params)), HTTP_HOST=host) - actual_is_safe = is_safe_login_or_logout_redirect(req, redirect_url) + actual_is_safe = self._is_safe_redirect(req, redirect_url) self.assertEqual(actual_is_safe, expected_is_safe) diff --git a/openedx/core/djangoapps/user_authn/utils.py b/openedx/core/djangoapps/user_authn/utils.py index bce494f0bf..e9fddab58f 100644 --- a/openedx/core/djangoapps/user_authn/utils.py +++ b/openedx/core/djangoapps/user_authn/utils.py @@ -12,24 +12,36 @@ from oauth2_provider.models import Application from six.moves.urllib.parse import urlparse # pylint: disable=import-error -def is_safe_login_or_logout_redirect(request, redirect_to): +def is_safe_login_or_logout_redirect(redirect_to, request_host, dot_client_id, require_https): """ Determine if the given redirect URL/path is safe for redirection. + + Arguments: + redirect_to (str): + The URL in question. + request_host (str): + Originating hostname of the request. + This is always considered an acceptable redirect target. + dot_client_id (str|None): + ID of Django OAuth Toolkit client. + It is acceptable to redirect to any of the DOT client's redirct URIs. + This argument is ignored if it is None. + require_https (str): + Whether HTTPs should be required in the redirect URL. + + Returns: bool """ login_redirect_whitelist = set(getattr(settings, 'LOGIN_REDIRECT_WHITELIST', [])) - - request_host = request.get_host() # e.g. 'courses.edx.org' login_redirect_whitelist.add(request_host) # Allow OAuth2 clients to redirect back to their site after logout. - dot_client_id = request.GET.get('client_id') if dot_client_id: application = Application.objects.get(client_id=dot_client_id) if redirect_to in application.redirect_uris: login_redirect_whitelist.add(urlparse(redirect_to).netloc) is_safe_url = http.is_safe_url( - redirect_to, allowed_hosts=login_redirect_whitelist, require_https=request.is_secure(), + redirect_to, allowed_hosts=login_redirect_whitelist, require_https=require_https ) return is_safe_url diff --git a/openedx/core/djangoapps/user_authn/views/login.py b/openedx/core/djangoapps/user_authn/views/login.py index ccf45b9e85..b8c712aca1 100644 --- a/openedx/core/djangoapps/user_authn/views/login.py +++ b/openedx/core/djangoapps/user_authn/views/login.py @@ -5,7 +5,6 @@ Much of this file was broken out from views.py, previous history can be found th """ -from functools import wraps import json import logging @@ -39,6 +38,7 @@ from openedx.core.djangoapps.user_authn.views.password_reset import send_passwor from openedx.core.djangoapps.user_authn.config.waffle import ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY from openedx.core.djangolib.markup import HTML, Text from openedx.core.lib.api.view_utils import require_post_params +from student.helpers import get_next_url_for_login_page from student.models import LoginFailures, AllowedAuthUser, UserProfile from student.views import compose_and_send_activation_email from third_party_auth import pipeline, provider @@ -430,6 +430,8 @@ def login_user(request): if is_user_third_party_authenticated: running_pipeline = pipeline.get(request) redirect_url = pipeline.get_complete_url(backend_name=running_pipeline['backend']) + elif settings.FEATURES.get('ENABLE_LOGIN_MICROFRONTEND'): + redirect_url = get_next_url_for_login_page(request) response = JsonResponse({ 'success': True, diff --git a/openedx/core/djangoapps/user_authn/views/logout.py b/openedx/core/djangoapps/user_authn/views/logout.py index 093f1bab5f..a6684c96e5 100644 --- a/openedx/core/djangoapps/user_authn/views/logout.py +++ b/openedx/core/djangoapps/user_authn/views/logout.py @@ -7,7 +7,6 @@ import edx_oauth2_provider import six.moves.urllib.parse as parse # pylint: disable=import-error from django.conf import settings from django.contrib.auth import logout -from django.shortcuts import redirect from django.utils.http import urlencode from django.views.generic import TemplateView from provider.oauth2.models import Client @@ -61,10 +60,13 @@ class LogoutView(TemplateView): if target_url: target_url = parse.unquote(parse.quote_plus(target_url)) - if target_url and is_safe_login_or_logout_redirect(self.request, target_url): - return target_url - else: - return self.default_target + use_target_url = target_url and is_safe_login_or_logout_redirect( + redirect_to=target_url, + request_host=self.request.get_host(), + dot_client_id=self.request.GET.get('client_id'), + require_https=self.request.is_secure(), + ) + return target_url if use_target_url else self.default_target def dispatch(self, request, *args, **kwargs): # We do not log here, because we have a handler registered to perform logging on successful logouts. diff --git a/openedx/core/djangoapps/user_authn/views/tests/test_login.py b/openedx/core/djangoapps/user_authn/views/tests/test_login.py index 164f24a224..da77569898 100644 --- a/openedx/core/djangoapps/user_authn/views/tests/test_login.py +++ b/openedx/core/djangoapps/user_authn/views/tests/test_login.py @@ -14,8 +14,7 @@ from django.conf import settings from django.contrib.auth.models import User from django.core import mail from django.core.cache import cache -from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest -from django.test import TestCase +from django.http import HttpResponse from django.test.client import Client from django.test.utils import override_settings from django.urls import NoReverseMatch, reverse @@ -80,6 +79,80 @@ class LoginTest(SiteMixin, CacheIsolationTestCase): self._assert_response(response, success=True) self._assert_audit_log(mock_audit_log, 'info', [u'Login success', self.user_email]) + FEATURES_WITH_LOGIN_MFE_ENABLED = settings.FEATURES.copy() + FEATURES_WITH_LOGIN_MFE_ENABLED['ENABLE_LOGIN_MICROFRONTEND'] = True + + @ddt.data( + # Default redirect is dashboard. + { + 'next_url': None, + 'course_id': None, + 'expected_redirect': '/dashboard', + }, + # A relative path is an acceptable redirect. + { + 'next_url': '/harmless-relative-page', + 'course_id': None, + 'expected_redirect': '/harmless-relative-page', + }, + # Paths without trailing slashes are also considered relative. + { + 'next_url': 'courses', + 'course_id': None, + 'expected_redirect': 'courses', + }, + # An absolute URL to a non-whitelisted domain is not an acceptable redirect. + { + 'next_url': 'https://evil.sketchysite', + 'course_id': None, + 'expected_redirect': '/dashboard', + }, + # An absolute URL to a whitelisted domain is acceptable. + { + 'next_url': 'https://openedx.service/coolpage', + 'course_id': None, + 'expected_redirect': 'https://openedx.service/coolpage', + }, + # If course_id is provided, redirect to finish_auth with dashboard as next. + { + 'next_url': None, + 'course_id': 'coursekey', + 'expected_redirect': '/account/finish_auth?course_id=coursekey&next=%2Fdashboard', + }, + # If valid course_id AND next_url are provided, redirect to finish_auth with + # provided next URL. + { + 'next_url': 'freshpage', + 'course_id': 'coursekey', + 'expected_redirect': '/account/finish_auth?course_id=coursekey&next=freshpage', + }, + # If course_id is provided with invalid next_url, redirect to finish_auth with + # course_id and dashboard as next URL. + { + 'next_url': 'http://scam.scam', + 'course_id': 'coursekey', + 'expected_redirect': '/account/finish_auth?course_id=coursekey&next=%2Fdashboard', + }, + ) + @ddt.unpack + @override_settings(LOGIN_REDIRECT_WHITELIST=['openedx.service']) + @override_settings(FEATURES=FEATURES_WITH_LOGIN_MFE_ENABLED) + def test_login_success_with_redirect(self, next_url, course_id, expected_redirect): + # Default redirect is student dashboard. + post_params = {} + if next_url: + post_params['next'] = next_url + if course_id: + post_params['course_id'] = course_id + response, _ = self._login_response( + self.user_email, + self.password, + extra_post_params=post_params, + HTTP_ACCEPT='*/*', + ) + self._assert_response(response, success=True) + self._assert_redirect_url(response, expected_redirect) + @patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True}) def test_login_success_no_pii(self): response, mock_audit_log = self._login_response( @@ -484,7 +557,9 @@ class LoginTest(SiteMixin, CacheIsolationTestCase): response, _ = self._login_response(self.user.email, password_entered) self._assert_response(response, success=login_success) - def _login_response(self, email, password, patched_audit_log=None, extra_post_params=None): + def _login_response( + self, email, password, patched_audit_log=None, extra_post_params=None, **extra + ): """ Post the login info """ @@ -494,7 +569,7 @@ class LoginTest(SiteMixin, CacheIsolationTestCase): if extra_post_params is not None: post_params.update(extra_post_params) with patch(patched_audit_log) as mock_audit_log: - result = self.client.post(self.url, post_params) + result = self.client.post(self.url, post_params, **extra) return result, mock_audit_log def _assert_response(self, response, success=None, value=None, status_code=None): @@ -525,6 +600,21 @@ class LoginTest(SiteMixin, CacheIsolationTestCase): (six.text_type(response_dict['value']), six.text_type(value))) self.assertIn(value, response_dict['value'], msg) + def _assert_redirect_url(self, response, expected_redirect_url): + """ + Assert that the redirect URL is in the response and has the expected value. + + Assumes that response content is well-formed JSON + (you can call `_assert_response` first to assert this). + """ + response_dict = json.loads(response.content.decode('utf-8')) + assert 'redirect_url' in response_dict, ( + "Response JSON unexpectedly does not have redirect_url: {!r}".format( + response_dict + ) + ) + assert response_dict['redirect_url'] == expected_redirect_url + def _assert_audit_log(self, mock_audit_log, level, log_strings): """ Check that the audit log has received the expected call as its last call.