Move LoginSessionView from user_api to user_authn

This commit is contained in:
Diana Huang
2019-10-28 16:36:21 -04:00
committed by Nimisha Asthagiri
parent 748398e57a
commit 6fb40586fb
9 changed files with 490 additions and 482 deletions

View File

@@ -13,7 +13,6 @@ from functools import wraps
import six
from django import forms
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpRequest, HttpResponseBadRequest
from django.utils.encoding import force_text
from django.utils.functional import Promise
@@ -89,34 +88,6 @@ def intercept_errors(api_error, ignore_errors=None):
return _decorator
def require_post_params(required_params):
"""
View decorator that ensures the required POST params are
present. If not, returns an HTTP response with status 400.
Args:
required_params (list): The required parameter keys.
Returns:
HttpResponse
"""
def _decorator(func): # pylint: disable=missing-docstring
@wraps(func)
def _wrapped(*args, **_kwargs):
request = args[0]
missing_params = set(required_params) - set(request.POST.keys())
if len(missing_params) > 0:
msg = u"Missing POST parameters: {missing}".format(
missing=", ".join(missing_params)
)
return HttpResponseBadRequest(msg)
else:
return func(request)
return _wrapped
return _decorator
class InvalidFieldError(Exception):
"""The provided field definition is not valid. """
@@ -383,149 +354,6 @@ class LocalizedJSONEncoder(DjangoJSONEncoder):
super(LocalizedJSONEncoder, self).default(obj)
def shim_student_view(view_func, check_logged_in=False):
"""Create a "shim" view for a view function from the student Django app.
Specifically, we need to:
* Strip out enrollment params, since the client for the new registration/login
page will communicate with the enrollment API to update enrollments.
* Return responses with HTTP status codes indicating success/failure
(instead of always using status 200, but setting "success" to False in
the JSON-serialized content of the response)
* Use status code 403 to indicate a login failure.
The shim will preserve any cookies set by the view.
Arguments:
view_func (function): The view function from the student Django app.
Keyword Args:
check_logged_in (boolean): If true, check whether the user successfully
authenticated and if not set the status to 403.
Returns:
function
"""
@wraps(view_func)
def _inner(request): # pylint: disable=missing-docstring
# Make a copy of the current POST request to modify.
modified_request = request.POST.copy()
if isinstance(request, HttpRequest):
# Works for an HttpRequest but not a rest_framework.request.Request.
request.POST = modified_request
else:
# The request must be a rest_framework.request.Request.
request._data = modified_request
# The login and registration handlers in student view try to change
# the user's enrollment status if these parameters are present.
# Since we want the JavaScript client to communicate directly with
# the enrollment API, we want to prevent the student views from
# updating enrollments.
if "enrollment_action" in modified_request:
del modified_request["enrollment_action"]
if "course_id" in modified_request:
del modified_request["course_id"]
# Include the course ID if it's specified in the analytics info
# so it can be included in analytics events.
if "analytics" in modified_request:
try:
analytics = json.loads(modified_request["analytics"])
if "enroll_course_id" in analytics:
modified_request["course_id"] = analytics.get("enroll_course_id")
except (ValueError, TypeError):
LOGGER.error(
u"Could not parse analytics object sent to user API: {analytics}".format(
analytics=analytics
)
)
# Call the original view to generate a response.
# We can safely modify the status code or content
# of the response, but to be safe we won't mess
# with the headers.
response = view_func(request)
# Most responses from this view are JSON-encoded
# dictionaries with keys "success", "value", and
# (sometimes) "redirect_url".
#
# We want to communicate some of this information
# using HTTP status codes instead.
#
# We ignore the "redirect_url" parameter, because we don't need it:
# 1) It's used to redirect on change enrollment, which
# our client will handle directly
# (that's why we strip out the enrollment params from the request)
# 2) It's used by third party auth when a user has already successfully
# authenticated and we're not sending login credentials. However,
# this case is never encountered in practice: on the old login page,
# the login form would be submitted directly, so third party auth
# would always be "trumped" by first party auth. If a user has
# successfully authenticated with us, we redirect them to the dashboard
# regardless of how they authenticated; and if a user is completing
# the third party auth pipeline, we redirect them from the pipeline
# completion end-point directly.
try:
response_dict = json.loads(response.content.decode('utf-8'))
msg = response_dict.get("value", u"")
success = response_dict.get("success")
except (ValueError, TypeError):
msg = response.content
success = True
# If the user is not authenticated when we expect them to be
# send the appropriate status code.
# We check whether the user attribute is set to make
# it easier to test this without necessarily running
# the request through authentication middleware.
is_authenticated = (
getattr(request, 'user', None) is not None
and request.user.is_authenticated
)
if check_logged_in and not is_authenticated:
# If we get a 403 status code from the student view
# this means we've successfully authenticated with a
# third party provider, but we don't have a linked
# EdX account. Send a helpful error code so the client
# knows this occurred.
if response.status_code == 403:
response.content = "third-party-auth"
# Otherwise, it's a general authentication failure.
# Ensure that the status code is a 403 and pass
# along the message from the view.
else:
response.status_code = 403
response.content = msg
# If an error condition occurs, send a status 400
elif response.status_code != 200 or not success:
# The student views tend to send status 200 even when an error occurs
# If the JSON-serialized content has a value "success" set to False,
# then we know an error occurred.
if response.status_code == 200:
response.status_code = 400
response.content = msg
# If the response is successful, then return the content
# of the response directly rather than including it
# in a JSON-serialized dictionary.
else:
response.content = msg
# Return the response, preserving the original headers.
# This is really important, since the student views set cookies
# that are used elsewhere in the system (such as the marketing site).
return response
return _inner
def serializer_is_dirty(preference_serializer):
"""
Return True if saving the supplied (Raw)UserPreferenceSerializer would change the database.

View File

@@ -38,8 +38,6 @@ urlpatterns = [
]
urlpatterns += [
url(r'^v1/account/login_session/$', user_api_views.LoginSessionView.as_view(),
name="user_api_login_session"),
url(r'^v1/account/password_reset/$', user_api_views.PasswordResetView.as_view(),
name="user_api_password_reset"),
]

View File

@@ -10,11 +10,10 @@ import ddt
import mock
import pytest
from django import forms
from django.http import HttpRequest, HttpResponse
from django.test import TestCase
from six import text_type
from ..helpers import FormDescription, InvalidFieldError, intercept_errors, shim_student_view
from ..helpers import FormDescription, InvalidFieldError, intercept_errors
class FakeInputException(Exception):
@@ -188,93 +187,6 @@ class FormDescriptionTest(TestCase):
)
@ddt.ddt
class StudentViewShimTest(TestCase):
"Tests of the student view shim."
def setUp(self):
super(StudentViewShimTest, self).setUp()
self.captured_request = None
def test_strip_enrollment_action(self):
view = self._shimmed_view(HttpResponse())
request = HttpRequest()
request.POST["enrollment_action"] = "enroll"
request.POST["course_id"] = "edx/101/demo"
view(request)
# Expect that the enrollment action and course ID
# were stripped out before reaching the wrapped view.
self.assertNotIn("enrollment_action", self.captured_request.POST)
self.assertNotIn("course_id", self.captured_request.POST)
def test_include_analytics_info(self):
view = self._shimmed_view(HttpResponse())
request = HttpRequest()
request.POST["analytics"] = json.dumps({
"enroll_course_id": "edX/DemoX/Fall"
})
view(request)
# Expect that the analytics course ID was passed to the view
self.assertEqual(self.captured_request.POST.get("course_id"), "edX/DemoX/Fall")
def test_third_party_auth_login_failure(self):
view = self._shimmed_view(
HttpResponse(status=403),
check_logged_in=True
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 403)
self.assertEqual(response.content, b"third-party-auth")
def test_non_json_response(self):
view = self._shimmed_view(HttpResponse(content="Not a JSON dict"))
response = view(HttpRequest())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"Not a JSON dict")
@ddt.data("redirect", "redirect_url")
def test_ignore_redirect_from_json(self, redirect_key):
view = self._shimmed_view(
HttpResponse(content=json.dumps({
"success": True,
redirect_key: "/redirect"
}))
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content.decode('utf-8'), "")
def test_error_from_json(self):
view = self._shimmed_view(
HttpResponse(content=json.dumps({
"success": False,
"value": "Error!"
}))
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 400)
self.assertEqual(response.content, b"Error!")
def test_preserve_headers(self):
view_response = HttpResponse()
view_response["test-header"] = "test"
view = self._shimmed_view(view_response)
response = view(HttpRequest())
self.assertEqual(response["test-header"], "test")
def test_check_logged_in(self):
view = self._shimmed_view(HttpResponse(), check_logged_in=True)
response = view(HttpRequest())
self.assertEqual(response.status_code, 403)
def _shimmed_view(self, response, check_logged_in=False):
def stub_view(request):
self.captured_request = request
return response
return shim_student_view(stub_view, check_logged_in=check_logged_in)
class DummyRegistrationExtensionModel(object):
"""
Dummy registration object

View File

@@ -2,7 +2,6 @@
from __future__ import absolute_import
import datetime
import json
from unittest import skipUnless
@@ -37,7 +36,6 @@ from third_party_auth.tests.utils import (
ThirdPartyOAuthTestMixinGoogle
)
from util.password_policy_validators import (
DEFAULT_MAX_PASSWORD_LENGTH,
create_validator_config,
password_validators_instruction_texts,
password_validators_restrictions
@@ -566,155 +564,6 @@ class PreferenceUsersListViewTest(UserApiTestCase):
self.assertEqual(len(set(all_user_uris)), 2)
@ddt.ddt
@skip_unless_lms
class LoginSessionViewTest(UserAPITestCase):
"""Tests for the login end-points of the user API. """
USERNAME = "bob"
EMAIL = "bob@example.com"
PASSWORD = "password"
def setUp(self):
super(LoginSessionViewTest, self).setUp()
self.url = reverse("user_api_login_session")
@ddt.data("get", "post")
def test_auth_disabled(self, method):
self.assertAuthDisabled(method, self.url)
def test_allowed_methods(self):
self.assertAllowedMethods(self.url, ["GET", "POST", "HEAD", "OPTIONS"])
def test_put_not_allowed(self):
response = self.client.put(self.url)
self.assertHttpMethodNotAllowed(response)
def test_delete_not_allowed(self):
response = self.client.delete(self.url)
self.assertHttpMethodNotAllowed(response)
def test_patch_not_allowed(self):
response = self.client.patch(self.url)
self.assertHttpMethodNotAllowed(response)
def test_login_form(self):
# Retrieve the login form
response = self.client.get(self.url, content_type="application/json")
self.assertHttpOK(response)
# Verify that the form description matches what we expect
form_desc = json.loads(response.content.decode('utf-8'))
self.assertEqual(form_desc["method"], "post")
self.assertEqual(form_desc["submit_url"], self.url)
self.assertEqual(form_desc["fields"], [
{
"name": "email",
"defaultValue": "",
"type": "email",
"required": True,
"label": "Email",
"placeholder": "username@domain.com",
"instructions": u"The email address you used to register with {platform_name}".format(
platform_name=settings.PLATFORM_NAME
),
"restrictions": {
"min_length": EMAIL_MIN_LENGTH,
"max_length": EMAIL_MAX_LENGTH
},
"errorMessages": {},
"supplementalText": "",
"supplementalLink": "",
},
{
"name": "password",
"defaultValue": "",
"type": "password",
"required": True,
"label": "Password",
"placeholder": "",
"instructions": "",
"restrictions": {
"max_length": DEFAULT_MAX_PASSWORD_LENGTH,
},
"errorMessages": {},
"supplementalText": "",
"supplementalLink": "",
},
])
def test_login(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Login
response = self.client.post(self.url, {
"email": self.EMAIL,
"password": self.PASSWORD,
})
self.assertHttpOK(response)
# Verify that we logged in successfully by accessing
# a page that requires authentication.
response = self.client.get(reverse("dashboard"))
self.assertHttpOK(response)
def test_session_cookie_expiry(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Login and remember me
data = {
"email": self.EMAIL,
"password": self.PASSWORD,
}
response = self.client.post(self.url, data)
self.assertHttpOK(response)
# Verify that the session expiration was set correctly
cookie = self.client.cookies[settings.SESSION_COOKIE_NAME]
expected_expiry = datetime.datetime.utcnow() + datetime.timedelta(weeks=4)
self.assertIn(expected_expiry.strftime('%d-%b-%Y'), cookie.get('expires'))
def test_invalid_credentials(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Invalid password
response = self.client.post(self.url, {
"email": self.EMAIL,
"password": "invalid"
})
self.assertHttpForbidden(response)
# Invalid email address
response = self.client.post(self.url, {
"email": "invalid@example.com",
"password": self.PASSWORD,
})
self.assertHttpForbidden(response)
def test_missing_login_params(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Missing password
response = self.client.post(self.url, {
"email": self.EMAIL,
})
self.assertHttpBadRequest(response)
# Missing email
response = self.client.post(self.url, {
"password": self.PASSWORD,
})
self.assertHttpBadRequest(response)
# Missing both email and password
response = self.client.post(self.url, {})
@ddt.ddt
@skip_unless_lms
class PasswordResetViewTest(UserAPITestCase):

View File

@@ -8,8 +8,7 @@ from django.db import transaction
from django.http import HttpResponse, HttpResponseForbidden
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt, csrf_protect, ensure_csrf_cookie
from django.views.decorators.debug import sensitive_post_parameters
from django.views.decorators.csrf import ensure_csrf_cookie
from django_filters.rest_framework import DjangoFilterBackend
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
from opaque_keys import InvalidKeyError
@@ -25,11 +24,10 @@ from openedx.core.djangoapps.django_comment_common.models import Role
from openedx.core.djangoapps.user_api import accounts
from openedx.core.djangoapps.user_api.accounts.api import check_account_exists
from openedx.core.djangoapps.user_api.api import (
RegistrationFormFactory,
get_login_session_form,
get_password_reset_form
)
from openedx.core.djangoapps.user_api.helpers import require_post_params, shim_student_view
from openedx.core.lib.api.view_utils import require_post_params
from openedx.core.djangoapps.user_api.models import UserPreference
from openedx.core.djangoapps.user_api.preferences.api import get_country_time_zones, update_email_opt_in
from openedx.core.djangoapps.user_api.serializers import (
@@ -42,59 +40,6 @@ from student.helpers import AccountValidationError
from util.json_request import JsonResponse
class LoginSessionView(APIView):
"""HTTP end-points for logging in users. """
# This end-point is available to anonymous users,
# so do not require authentication.
authentication_classes = []
@method_decorator(ensure_csrf_cookie)
def get(self, request):
return HttpResponse(get_login_session_form(request).to_json(), content_type="application/json")
@method_decorator(require_post_params(["email", "password"]))
@method_decorator(csrf_protect)
def post(self, request):
"""Log in a user.
You must send all required form fields with the request.
You can optionally send an `analytics` param with a JSON-encoded
object with additional info to include in the login analytics event.
Currently, the only supported field is "enroll_course_id" to indicate
that the user logged in while enrolling in a particular course.
Arguments:
request (HttpRequest)
Returns:
HttpResponse: 200 on success
HttpResponse: 400 if the request is not valid.
HttpResponse: 403 if authentication failed.
403 with content "third-party-auth" if the user
has successfully authenticated with a third party provider
but does not have a linked account.
HttpResponse: 302 if redirecting to another page.
Example Usage:
POST /user_api/v1/login_session
with POST params `email`, `password`, and `remember`.
200 OK
"""
# For the initial implementation, shim the existing login view
# from the student Django app.
from openedx.core.djangoapps.user_authn.views.login import login_user
return shim_student_view(login_user, check_logged_in=True)(request)
@method_decorator(sensitive_post_parameters("password"))
def dispatch(self, request, *args, **kwargs):
return super(LoginSessionView, self).dispatch(request, *args, **kwargs)
class PasswordResetView(APIView):
"""HTTP end-point for GETting a description of the password reset form. """

View File

@@ -11,18 +11,24 @@ from __future__ import absolute_import
from django.conf import settings
from django.conf.urls import url
from .views.register import RegistrationView
from .views import auto_auth, login, logout
from .views import auto_auth, login, logout, register
urlpatterns = [
url(r'^create_account$', RegistrationView.as_view(), name='create_account'),
# Registration
url(r'^create_account$', register.RegistrationView.as_view(), name='create_account'),
url(r'^user_api/v1/account/registration/$', register.RegistrationView.as_view(),
name="user_api_registration"),
# Login
url(r'^login_post$', login.login_user, name='login_post'),
url(r'^login_ajax$', login.login_user, name="login"),
url(r'^login_ajax/(?P<error>[^/]*)$', login.login_user),
url(r'^user_api/v1/account/login_session/$', login.LoginSessionView.as_view(),
name="user_api_login_session"),
# Login Refresh of JWT Cookies
url(r'^login_refresh$', login.login_refresh, name="login_refresh"),
url(r'^user_api/v1/account/registration/$', RegistrationView.as_view(),
name="user_api_registration"),
url(r'^logout$', logout.LogoutView.as_view(), name='logout'),
]

View File

@@ -6,6 +6,8 @@ Much of this file was broken out from views.py, previous history can be found th
from __future__ import absolute_import
from functools import wraps
import json
import logging
import six
@@ -14,24 +16,29 @@ from django.contrib.auth import authenticate
from django.contrib.auth import login as django_login
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.http import HttpResponse
from django.http import HttpRequest, HttpResponse
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
from django.views.decorators.csrf import csrf_exempt, csrf_protect, ensure_csrf_cookie
from django.views.decorators.debug import sensitive_post_parameters
from django.views.decorators.http import require_http_methods
from ratelimitbackend.exceptions import RateLimitException
from rest_framework.views import APIView
import third_party_auth
from edxmako.shortcuts import render_to_response
from openedx.core.djangoapps.password_policy import compliance as password_policy_compliance
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from openedx.core.djangoapps.user_api.api import get_login_session_form
from openedx.core.djangoapps.user_authn.cookies import refresh_jwt_cookies, set_logged_in_cookies
from openedx.core.djangoapps.user_authn.exceptions import AuthFailedError
from openedx.core.djangoapps.util.user_messages import PageLevelMessages
from openedx.core.djangolib.markup import HTML, Text
from openedx.core.lib.api.view_utils import require_post_params
from student.forms import send_password_reset_email_for_user
from student.models import LoginFailures
from student.views import send_reactivation_email_for_user
from third_party_auth import pipeline, provider
import third_party_auth
from track import segment
from util.json_request import JsonResponse
from util.password_policy_validators import normalize_password
@@ -374,3 +381,196 @@ def login_refresh(request):
except AuthFailedError as error:
log.exception(error.get_response())
return JsonResponse(error.get_response(), status=400)
class LoginSessionView(APIView):
"""HTTP end-points for logging in users. """
# This end-point is available to anonymous users,
# so do not require authentication.
authentication_classes = []
@method_decorator(ensure_csrf_cookie)
def get(self, request):
return HttpResponse(get_login_session_form(request).to_json(), content_type="application/json")
@method_decorator(require_post_params(["email", "password"]))
@method_decorator(csrf_protect)
def post(self, request):
"""Log in a user.
You must send all required form fields with the request.
You can optionally send an `analytics` param with a JSON-encoded
object with additional info to include in the login analytics event.
Currently, the only supported field is "enroll_course_id" to indicate
that the user logged in while enrolling in a particular course.
Arguments:
request (HttpRequest)
Returns:
HttpResponse: 200 on success
HttpResponse: 400 if the request is not valid.
HttpResponse: 403 if authentication failed.
403 with content "third-party-auth" if the user
has successfully authenticated with a third party provider
but does not have a linked account.
HttpResponse: 302 if redirecting to another page.
Example Usage:
POST /user_api/v1/login_session
with POST params `email`, `password`, and `remember`.
200 OK
"""
return shim_student_view(login_user, check_logged_in=True)(request)
@method_decorator(sensitive_post_parameters("password"))
def dispatch(self, request, *args, **kwargs):
return super(LoginSessionView, self).dispatch(request, *args, **kwargs)
def shim_student_view(view_func, check_logged_in=False):
"""Create a "shim" view for a view function from the student Django app.
Specifically, we need to:
* Strip out enrollment params, since the client for the new registration/login
page will communicate with the enrollment API to update enrollments.
* Return responses with HTTP status codes indicating success/failure
(instead of always using status 200, but setting "success" to False in
the JSON-serialized content of the response)
* Use status code 403 to indicate a login failure.
The shim will preserve any cookies set by the view.
Arguments:
view_func (function): The view function from the student Django app.
Keyword Args:
check_logged_in (boolean): If true, check whether the user successfully
authenticated and if not set the status to 403.
Returns:
function
"""
@wraps(view_func)
def _inner(request): # pylint: disable=missing-docstring
# Make a copy of the current POST request to modify.
modified_request = request.POST.copy()
if isinstance(request, HttpRequest):
# Works for an HttpRequest but not a rest_framework.request.Request.
request.POST = modified_request
else:
# The request must be a rest_framework.request.Request.
request._data = modified_request # pylint: disable=protected-access
# The login and registration handlers in student view try to change
# the user's enrollment status if these parameters are present.
# Since we want the JavaScript client to communicate directly with
# the enrollment API, we want to prevent the student views from
# updating enrollments.
if "enrollment_action" in modified_request:
del modified_request["enrollment_action"]
if "course_id" in modified_request:
del modified_request["course_id"]
# Include the course ID if it's specified in the analytics info
# so it can be included in analytics events.
if "analytics" in modified_request:
try:
analytics = json.loads(modified_request["analytics"])
if "enroll_course_id" in analytics:
modified_request["course_id"] = analytics.get("enroll_course_id")
except (ValueError, TypeError):
log.error(
u"Could not parse analytics object sent to user API: {analytics}".format(
analytics=analytics
)
)
# Call the original view to generate a response.
# We can safely modify the status code or content
# of the response, but to be safe we won't mess
# with the headers.
response = view_func(request)
# Most responses from this view are JSON-encoded
# dictionaries with keys "success", "value", and
# (sometimes) "redirect_url".
#
# We want to communicate some of this information
# using HTTP status codes instead.
#
# We ignore the "redirect_url" parameter, because we don't need it:
# 1) It's used to redirect on change enrollment, which
# our client will handle directly
# (that's why we strip out the enrollment params from the request)
# 2) It's used by third party auth when a user has already successfully
# authenticated and we're not sending login credentials. However,
# this case is never encountered in practice: on the old login page,
# the login form would be submitted directly, so third party auth
# would always be "trumped" by first party auth. If a user has
# successfully authenticated with us, we redirect them to the dashboard
# regardless of how they authenticated; and if a user is completing
# the third party auth pipeline, we redirect them from the pipeline
# completion end-point directly.
try:
response_dict = json.loads(response.content.decode('utf-8'))
msg = response_dict.get("value", u"")
success = response_dict.get("success")
except (ValueError, TypeError):
msg = response.content
success = True
# If the user is not authenticated when we expect them to be
# send the appropriate status code.
# We check whether the user attribute is set to make
# it easier to test this without necessarily running
# the request through authentication middleware.
is_authenticated = (
getattr(request, 'user', None) is not None
and request.user.is_authenticated
)
if check_logged_in and not is_authenticated:
# If we get a 403 status code from the student view
# this means we've successfully authenticated with a
# third party provider, but we don't have a linked
# EdX account. Send a helpful error code so the client
# knows this occurred.
if response.status_code == 403:
response.content = "third-party-auth"
# Otherwise, it's a general authentication failure.
# Ensure that the status code is a 403 and pass
# along the message from the view.
else:
response.status_code = 403
response.content = msg
# If an error condition occurs, send a status 400
elif response.status_code != 200 or not success:
# The student views tend to send status 200 even when an error occurs
# If the JSON-serialized content has a value "success" set to False,
# then we know an error occurred.
if response.status_code == 200:
response.status_code = 400
response.content = msg
# If the response is successful, then return the content
# of the response directly rather than including it
# in a JSON-serialized dictionary.
else:
response.content = msg
# Return the response, preserving the original headers.
# This is really important, since the student views set cookies
# that are used elsewhere in the system (such as the marketing site).
return response
return _inner

View File

@@ -4,6 +4,7 @@ Tests for student activation and login
"""
from __future__ import absolute_import
import datetime
import json
import unicodedata
@@ -13,7 +14,8 @@ from django.conf import settings
from django.contrib.auth.models import User
from django.core import mail
from django.core.cache import cache
from django.http import HttpResponse, HttpResponseBadRequest
from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest
from django.test import TestCase
from django.test.client import Client
from django.test.utils import override_settings
from django.urls import NoReverseMatch, reverse
@@ -25,10 +27,14 @@ from openedx.core.djangoapps.password_policy.compliance import (
NonCompliantPasswordWarning
)
from openedx.core.djangoapps.user_api.config.waffle import PREVENT_AUTH_USER_WRITES, waffle
from openedx.core.djangoapps.user_api.accounts import EMAIL_MIN_LENGTH, EMAIL_MAX_LENGTH
from openedx.core.djangoapps.user_authn.cookies import jwt_cookies
from openedx.core.djangoapps.user_authn.views.login import shim_student_view
from openedx.core.djangoapps.user_authn.tests.utils import setup_login_oauth_client
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
from openedx.core.lib.api.test_utils import ApiTestCase
from student.tests.factories import RegistrationFactory, UserFactory, UserProfileFactory
from util.password_policy_validators import DEFAULT_MAX_PASSWORD_LENGTH
@ddt.ddt
@@ -566,3 +572,239 @@ class LoginTest(CacheIsolationTestCase):
format_string = args[0]
for log_string in log_strings:
self.assertNotIn(log_string, format_string)
@ddt.ddt
@skip_unless_lms
class LoginSessionViewTest(ApiTestCase):
"""Tests for the login end-points of the user API. """
USERNAME = "bob"
EMAIL = "bob@example.com"
PASSWORD = "password"
def setUp(self):
super(LoginSessionViewTest, self).setUp()
self.url = reverse("user_api_login_session")
@ddt.data("get", "post")
def test_auth_disabled(self, method):
self.assertAuthDisabled(method, self.url)
def test_allowed_methods(self):
self.assertAllowedMethods(self.url, ["GET", "POST", "HEAD", "OPTIONS"])
def test_put_not_allowed(self):
response = self.client.put(self.url)
self.assertHttpMethodNotAllowed(response)
def test_delete_not_allowed(self):
response = self.client.delete(self.url)
self.assertHttpMethodNotAllowed(response)
def test_patch_not_allowed(self):
response = self.client.patch(self.url)
self.assertHttpMethodNotAllowed(response)
def test_login_form(self):
# Retrieve the login form
response = self.client.get(self.url, content_type="application/json")
self.assertHttpOK(response)
# Verify that the form description matches what we expect
form_desc = json.loads(response.content.decode('utf-8'))
self.assertEqual(form_desc["method"], "post")
self.assertEqual(form_desc["submit_url"], self.url)
self.assertEqual(form_desc["fields"], [
{
"name": "email",
"defaultValue": "",
"type": "email",
"required": True,
"label": "Email",
"placeholder": "username@domain.com",
"instructions": u"The email address you used to register with {platform_name}".format(
platform_name=settings.PLATFORM_NAME
),
"restrictions": {
"min_length": EMAIL_MIN_LENGTH,
"max_length": EMAIL_MAX_LENGTH
},
"errorMessages": {},
"supplementalText": "",
"supplementalLink": "",
},
{
"name": "password",
"defaultValue": "",
"type": "password",
"required": True,
"label": "Password",
"placeholder": "",
"instructions": "",
"restrictions": {
"max_length": DEFAULT_MAX_PASSWORD_LENGTH,
},
"errorMessages": {},
"supplementalText": "",
"supplementalLink": "",
},
])
def test_login(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Login
response = self.client.post(self.url, {
"email": self.EMAIL,
"password": self.PASSWORD,
})
self.assertHttpOK(response)
# Verify that we logged in successfully by accessing
# a page that requires authentication.
response = self.client.get(reverse("dashboard"))
self.assertHttpOK(response)
def test_session_cookie_expiry(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Login and remember me
data = {
"email": self.EMAIL,
"password": self.PASSWORD,
}
response = self.client.post(self.url, data)
self.assertHttpOK(response)
# Verify that the session expiration was set correctly
cookie = self.client.cookies[settings.SESSION_COOKIE_NAME]
expected_expiry = datetime.datetime.utcnow() + datetime.timedelta(weeks=4)
self.assertIn(expected_expiry.strftime('%d-%b-%Y'), cookie.get('expires'))
def test_invalid_credentials(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Invalid password
response = self.client.post(self.url, {
"email": self.EMAIL,
"password": "invalid"
})
self.assertHttpForbidden(response)
# Invalid email address
response = self.client.post(self.url, {
"email": "invalid@example.com",
"password": self.PASSWORD,
})
self.assertHttpForbidden(response)
def test_missing_login_params(self):
# Create a test user
UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
# Missing password
response = self.client.post(self.url, {
"email": self.EMAIL,
})
self.assertHttpBadRequest(response)
# Missing email
response = self.client.post(self.url, {
"password": self.PASSWORD,
})
self.assertHttpBadRequest(response)
# Missing both email and password
response = self.client.post(self.url, {})
@ddt.ddt
class StudentViewShimTest(TestCase):
"Tests of the student view shim."
def setUp(self):
super(StudentViewShimTest, self).setUp()
self.captured_request = None
def test_strip_enrollment_action(self):
view = self._shimmed_view(HttpResponse())
request = HttpRequest()
request.POST["enrollment_action"] = "enroll"
request.POST["course_id"] = "edx/101/demo"
view(request)
# Expect that the enrollment action and course ID
# were stripped out before reaching the wrapped view.
self.assertNotIn("enrollment_action", self.captured_request.POST)
self.assertNotIn("course_id", self.captured_request.POST)
def test_include_analytics_info(self):
view = self._shimmed_view(HttpResponse())
request = HttpRequest()
request.POST["analytics"] = json.dumps({
"enroll_course_id": "edX/DemoX/Fall"
})
view(request)
# Expect that the analytics course ID was passed to the view
self.assertEqual(self.captured_request.POST.get("course_id"), "edX/DemoX/Fall")
def test_third_party_auth_login_failure(self):
view = self._shimmed_view(
HttpResponse(status=403),
check_logged_in=True
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 403)
self.assertEqual(response.content, b"third-party-auth")
def test_non_json_response(self):
view = self._shimmed_view(HttpResponse(content="Not a JSON dict"))
response = view(HttpRequest())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"Not a JSON dict")
@ddt.data("redirect", "redirect_url")
def test_ignore_redirect_from_json(self, redirect_key):
view = self._shimmed_view(
HttpResponse(content=json.dumps({
"success": True,
redirect_key: "/redirect"
}))
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content.decode('utf-8'), "")
def test_error_from_json(self):
view = self._shimmed_view(
HttpResponse(content=json.dumps({
"success": False,
"value": "Error!"
}))
)
response = view(HttpRequest())
self.assertEqual(response.status_code, 400)
self.assertEqual(response.content, b"Error!")
def test_preserve_headers(self):
view_response = HttpResponse()
view_response["test-header"] = "test"
view = self._shimmed_view(view_response)
response = view(HttpRequest())
self.assertEqual(response["test-header"], "test")
def test_check_logged_in(self):
view = self._shimmed_view(HttpResponse(), check_logged_in=True)
response = view(HttpRequest())
self.assertEqual(response.status_code, 403)
def _shimmed_view(self, response, check_logged_in=False):
def stub_view(request):
self.captured_request = request
return response
return shim_student_view(stub_view, check_logged_in=check_logged_in)

View File

@@ -6,7 +6,7 @@ from collections import Sequence
from functools import wraps
from django.core.exceptions import NON_FIELD_ERRORS, ObjectDoesNotExist, ValidationError
from django.http import Http404
from django.http import Http404, HttpResponseBadRequest
from django.utils.translation import ugettext as _
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
@@ -345,6 +345,34 @@ class PaginatedAPIView(APIView):
return self.paginator.get_paginated_response(data, *args, **kwargs)
def require_post_params(required_params):
"""
View decorator that ensures the required POST params are
present. If not, returns an HTTP response with status 400.
Args:
required_params (list): The required parameter keys.
Returns:
HttpResponse
"""
def _decorator(func): # pylint: disable=missing-docstring
@wraps(func)
def _wrapped(*args, **_kwargs):
request = args[0]
missing_params = set(required_params) - set(request.POST.keys())
if missing_params:
msg = u"Missing POST parameters: {missing}".format(
missing=", ".join(missing_params)
)
return HttpResponseBadRequest(msg)
else:
return func(request)
return _wrapped
return _decorator
def get_course_key(request, course_id=None):
if not course_id:
return CourseKey.from_string(request.GET.get('course_id'))