From d360c29714444317e4f4db1b848ed1eb6fce5b8a Mon Sep 17 00:00:00 2001 From: jaebradley Date: Wed, 25 Apr 2018 14:40:49 -0400 Subject: [PATCH] EDUCATOR-2770: Update logout endpoint --- .../user_api/accounts/tests/test_views.py | 142 +++++++++--------- .../djangoapps/user_api/accounts/views.py | 92 +++++++++--- 2 files changed, 137 insertions(+), 97 deletions(-) diff --git a/openedx/core/djangoapps/user_api/accounts/tests/test_views.py b/openedx/core/djangoapps/user_api/accounts/tests/test_views.py index 2ffd87f840..ca85a68805 100644 --- a/openedx/core/djangoapps/user_api/accounts/tests/test_views.py +++ b/openedx/core/djangoapps/user_api/accounts/tests/test_views.py @@ -926,80 +926,6 @@ class TestAccountDeactivation(TestCase): ) -@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS') -class TestDeactivateLogout(TestCase): - """ - Tests the account deactivation/logout endpoint. - """ - def setUp(self): - super(TestDeactivateLogout, self).setUp() - self.test_user = UserFactory() - self.test_superuser = SuperuserFactory() - self.test_service_user = UserFactory() - - UserSocialAuth.objects.create( - user=self.test_user, - provider='some_provider_name', - uid='xyz@gmail.com' - ) - UserSocialAuth.objects.create( - user=self.test_user, - provider='some_other_provider_name', - uid='xyz@gmail.com' - ) - - self.url = reverse('deactivate_logout') - - def build_jwt_headers(self, user): - """ - Helper function for creating headers for the JWT authentication. - """ - token = JwtBuilder(user).build_token([]) - headers = { - 'HTTP_AUTHORIZATION': 'JWT ' + token - } - return headers - - def build_post(self, username): - return {'username': username} - - def test_superuser_deactivates_user(self): - """ - Verify a superuser calling the deactivation endpoint logs out a user and deletes all their SSO tokens. - """ - headers = self.build_jwt_headers(self.test_superuser) - response = self.client.post(self.url, self.build_post(self.test_user.username), **headers) - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - updated_user = User.objects.get(id=self.test_user.id) - self.assertEqual(get_retired_email_by_email(self.test_user.email), updated_user.email) - self.assertFalse(updated_user.has_usable_password()) - self.assertEqual(list(UserSocialAuth.objects.filter(user=self.test_user)), []) - - def test_unauthorized_rejection(self): - """ - Verify unauthorized users cannot deactivate other users. - """ - headers = self.build_jwt_headers(self.test_user) - response = self.client.post(self.url, self.build_post(self.test_user.username), **headers) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - - def test_nonexistent_user(self): - """ - Verify that trying to deactivate a nonexistent user returns a 404. - """ - headers = self.build_jwt_headers(self.test_superuser) - response = self.client.post(self.url, self.build_post("made_up_username"), **headers) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - - def test_user_not_specified(self): - """ - Verify that not specifying a user to the deactivation endpoint results in a 404. - """ - headers = self.build_jwt_headers(self.test_superuser) - response = self.client.post(self.url, self.build_post(""), **headers) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - - class RetirementTestCase(TestCase): """ Test case with a helper methods for retirement @@ -1129,6 +1055,74 @@ class RetirementTestCase(TestCase): return [state for state in RetirementState.objects.filter(is_dead_end_state=True)] +@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS') +class TestDeactivateLogout(RetirementTestCase): + """ + Tests the account deactivation/logout endpoint. + """ + def setUp(self): + super(TestDeactivateLogout, self).setUp() + self.test_password = 'password' + self.test_user = UserFactory(password=self.test_password) + UserSocialAuth.objects.create( + user=self.test_user, + provider='some_provider_name', + uid='xyz@gmail.com' + ) + UserSocialAuth.objects.create( + user=self.test_user, + provider='some_other_provider_name', + uid='xyz@gmail.com' + ) + + self.url = reverse('deactivate_logout') + + def build_post(self, password): + return {'password': password} + + def test_user_can_deactivate_self(self): + """ + Verify a user calling the deactivation endpoint logs out the user, deletes all their SSO tokens, + and creates a user retirement row. + """ + self.client.login(username=self.test_user.username, password=self.test_password) + headers = self.build_jwt_headers(self.test_user) + response = self.client.post(self.url, self.build_post(self.test_password), **headers) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + # make sure the user model is as expected + updated_user = User.objects.get(id=self.test_user.id) + self.assertEqual(get_retired_email_by_email(self.test_user.email), updated_user.email) + self.assertFalse(updated_user.has_usable_password()) + self.assertEqual(list(UserSocialAuth.objects.filter(user=self.test_user)), []) + self.assertEqual(len(UserRetirementStatus.objects.filter(user_id=self.test_user.id)), 1) + # make sure the user cannot log in + self.assertFalse(self.client.login(username=self.test_user.username, password=self.test_password)) + + def test_password_mismatch(self): + """ + Verify that the user submitting a mismatched password results in + a rejection. + """ + self.client.login(username=self.test_user.username, password=self.test_password) + headers = self.build_jwt_headers(self.test_user) + response = self.client.post(self.url, self.build_post(self.test_password + "xxxx"), **headers) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_called_twice(self): + """ + Verify a user calling the deactivation endpoint a second time results in a "forbidden" + error, as the user will be logged out. + """ + self.client.login(username=self.test_user.username, password=self.test_password) + headers = self.build_jwt_headers(self.test_user) + response = self.client.post(self.url, self.build_post(self.test_password), **headers) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + self.client.login(username=self.test_user.username, password=self.test_password) + headers = self.build_jwt_headers(self.test_user) + response = self.client.post(self.url, self.build_post(self.test_password), **headers) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + @unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS') class TestAccountRetireMailings(RetirementTestCase): """ diff --git a/openedx/core/djangoapps/user_api/accounts/views.py b/openedx/core/djangoapps/user_api/accounts/views.py index 8906c35528..aa068ae446 100644 --- a/openedx/core/djangoapps/user_api/accounts/views.py +++ b/openedx/core/djangoapps/user_api/accounts/views.py @@ -6,8 +6,10 @@ https://openedx.atlassian.net/wiki/display/TNL/User+API """ import datetime -from django.contrib.auth import get_user_model +import pytz +from django.contrib.auth import get_user_model, authenticate, logout from django.db import transaction +from django.utils.translation import ugettext as _ from edx_rest_framework_extensions.authentication import JwtAuthentication from rest_framework import permissions from rest_framework import status @@ -16,7 +18,13 @@ from rest_framework.views import APIView from rest_framework.viewsets import ViewSet from six import text_type from social_django.models import UserSocialAuth -import pytz +from student.models import ( + User, + get_retired_email_by_email, + get_potentially_retired_user_by_username_and_hash, + get_potentially_retired_user_by_username +) +from student.views.login import AuthFailedError, LoginFailures from openedx.core.djangoapps.user_api.preferences.api import update_email_opt_in from openedx.core.lib.api.authentication import ( @@ -24,13 +32,6 @@ from openedx.core.lib.api.authentication import ( OAuth2AuthenticationAllowInactiveUser, ) from openedx.core.lib.api.parsers import MergePatchParser -from student.models import ( - User, - get_retired_email_by_email, - get_potentially_retired_user_by_username_and_hash, - get_potentially_retired_user_by_username -) - from .api import get_account_settings, update_account_settings from .permissions import CanDeactivateUser, CanRetireUser from .serializers import UserRetirementStatusSerializer @@ -305,14 +306,14 @@ class DeactivateLogoutView(APIView): """ POST /api/user/v1/accounts/deactivate_logout/ { - "username": "example_username", + "password": "example_password", } **POST Parameters** A POST request must include the following parameter. - * username: Required. The username of the user being deactivated. + * password: Required. The current password of the user being deactivated. **POST Response Values** @@ -329,13 +330,13 @@ class DeactivateLogoutView(APIView): If an unanticipated error occurs, the request returns an HTTP 500 "Internal Server Error" response. - Allows an administrative user to take the following actions - on behalf of an LMS user: + Allows an LMS user to take the following actions: - Change the user's password permanently to Django's unusable password - Log the user out + - Create a row in the retirement table for that user """ authentication_classes = (JwtAuthentication, ) - permission_classes = (permissions.IsAuthenticated, CanRetireUser) + permission_classes = (permissions.IsAuthenticated, ) def post(self, request): """ @@ -344,29 +345,74 @@ class DeactivateLogoutView(APIView): Marks the user as having no password set for deactivation purposes, and logs the user out. """ - username = None user_model = get_user_model() try: # Get the username from the request and check that it exists - username = request.data['username'] - user = user_model.objects.get(username=username) - + verify_user_password_response = self._verify_user_password(request) + if verify_user_password_response.status_code != status.HTTP_204_NO_CONTENT: + return verify_user_password_response with transaction.atomic(): # 1. Unlink LMS social auth accounts - UserSocialAuth.objects.filter(user_id=user.id).delete() + UserSocialAuth.objects.filter(user_id=request.user.id).delete() # 2. Change LMS password & email - user.email = get_retired_email_by_email(user.email) - user.save() - _set_unusable_password(user) + request.user.email = get_retired_email_by_email(request.user.email) + request.user.save() + _set_unusable_password(request.user) # 3. Unlink social accounts & change password on each IDA, still to be implemented + # 4. Add user to retirement queue + UserRetirementStatus.create_retirement(request.user) + # 5. Log the user out + logout(request) return Response(status=status.HTTP_204_NO_CONTENT) except KeyError: return Response(u'Username not specified.', status=status.HTTP_404_NOT_FOUND) except user_model.DoesNotExist: - return Response(u'The user "{}" does not exist.'.format(username), status=status.HTTP_404_NOT_FOUND) + return Response( + u'The user "{}" does not exist.'.format(request.user.username), status=status.HTTP_404_NOT_FOUND + ) except Exception as exc: # pylint: disable=broad-except return Response(text_type(exc), status=status.HTTP_500_INTERNAL_SERVER_ERROR) + def _verify_user_password(self, request): + """ + If the user is logged in and we want to verify that they have submitted the correct password + for a major account change (for example, retiring this user's account). + + Args: + request (HttpRequest): A request object where the password should be included in the POST fields. + """ + try: + self._check_excessive_login_attempts(request.user) + user = authenticate(username=request.user.username, password=request.POST['password'], request=request) + if user: + if LoginFailures.is_feature_enabled(): + LoginFailures.clear_lockout_counter(user) + return Response(status=status.HTTP_204_NO_CONTENT) + else: + self._handle_failed_authentication(request.user) + except AuthFailedError as err: + return Response(text_type(err), status=status.HTTP_403_FORBIDDEN) + except Exception as err: # pylint: disable=broad-except + return Response(u"Could not verify user password", status=status.HTTP_400_BAD_REQUEST) + + def _check_excessive_login_attempts(self, user): + """ + See if account has been locked out due to excessive login failures + """ + if user and LoginFailures.is_feature_enabled(): + if LoginFailures.is_user_locked_out(user): + raise AuthFailedError(_('This account has been temporarily locked due ' + 'to excessive login failures. Try again later.')) + + def _handle_failed_authentication(self, user): + """ + Handles updating the failed login count, inactive user notifications, and logging failed authentications. + """ + if user and LoginFailures.is_feature_enabled(): + LoginFailures.increment_lockout_counter(user) + + raise AuthFailedError(_('Email or password is incorrect.')) + def _set_unusable_password(user): """