EDUCATOR-2770: Update logout endpoint
This commit is contained in:
committed by
Sanford Student
parent
b9092c80e1
commit
d360c29714
@@ -926,80 +926,6 @@ class TestAccountDeactivation(TestCase):
|
||||
)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS')
|
||||
class TestDeactivateLogout(TestCase):
|
||||
"""
|
||||
Tests the account deactivation/logout endpoint.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestDeactivateLogout, self).setUp()
|
||||
self.test_user = UserFactory()
|
||||
self.test_superuser = SuperuserFactory()
|
||||
self.test_service_user = UserFactory()
|
||||
|
||||
UserSocialAuth.objects.create(
|
||||
user=self.test_user,
|
||||
provider='some_provider_name',
|
||||
uid='xyz@gmail.com'
|
||||
)
|
||||
UserSocialAuth.objects.create(
|
||||
user=self.test_user,
|
||||
provider='some_other_provider_name',
|
||||
uid='xyz@gmail.com'
|
||||
)
|
||||
|
||||
self.url = reverse('deactivate_logout')
|
||||
|
||||
def build_jwt_headers(self, user):
|
||||
"""
|
||||
Helper function for creating headers for the JWT authentication.
|
||||
"""
|
||||
token = JwtBuilder(user).build_token([])
|
||||
headers = {
|
||||
'HTTP_AUTHORIZATION': 'JWT ' + token
|
||||
}
|
||||
return headers
|
||||
|
||||
def build_post(self, username):
|
||||
return {'username': username}
|
||||
|
||||
def test_superuser_deactivates_user(self):
|
||||
"""
|
||||
Verify a superuser calling the deactivation endpoint logs out a user and deletes all their SSO tokens.
|
||||
"""
|
||||
headers = self.build_jwt_headers(self.test_superuser)
|
||||
response = self.client.post(self.url, self.build_post(self.test_user.username), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
updated_user = User.objects.get(id=self.test_user.id)
|
||||
self.assertEqual(get_retired_email_by_email(self.test_user.email), updated_user.email)
|
||||
self.assertFalse(updated_user.has_usable_password())
|
||||
self.assertEqual(list(UserSocialAuth.objects.filter(user=self.test_user)), [])
|
||||
|
||||
def test_unauthorized_rejection(self):
|
||||
"""
|
||||
Verify unauthorized users cannot deactivate other users.
|
||||
"""
|
||||
headers = self.build_jwt_headers(self.test_user)
|
||||
response = self.client.post(self.url, self.build_post(self.test_user.username), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_nonexistent_user(self):
|
||||
"""
|
||||
Verify that trying to deactivate a nonexistent user returns a 404.
|
||||
"""
|
||||
headers = self.build_jwt_headers(self.test_superuser)
|
||||
response = self.client.post(self.url, self.build_post("made_up_username"), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
def test_user_not_specified(self):
|
||||
"""
|
||||
Verify that not specifying a user to the deactivation endpoint results in a 404.
|
||||
"""
|
||||
headers = self.build_jwt_headers(self.test_superuser)
|
||||
response = self.client.post(self.url, self.build_post(""), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
|
||||
class RetirementTestCase(TestCase):
|
||||
"""
|
||||
Test case with a helper methods for retirement
|
||||
@@ -1129,6 +1055,74 @@ class RetirementTestCase(TestCase):
|
||||
return [state for state in RetirementState.objects.filter(is_dead_end_state=True)]
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS')
|
||||
class TestDeactivateLogout(RetirementTestCase):
|
||||
"""
|
||||
Tests the account deactivation/logout endpoint.
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestDeactivateLogout, self).setUp()
|
||||
self.test_password = 'password'
|
||||
self.test_user = UserFactory(password=self.test_password)
|
||||
UserSocialAuth.objects.create(
|
||||
user=self.test_user,
|
||||
provider='some_provider_name',
|
||||
uid='xyz@gmail.com'
|
||||
)
|
||||
UserSocialAuth.objects.create(
|
||||
user=self.test_user,
|
||||
provider='some_other_provider_name',
|
||||
uid='xyz@gmail.com'
|
||||
)
|
||||
|
||||
self.url = reverse('deactivate_logout')
|
||||
|
||||
def build_post(self, password):
|
||||
return {'password': password}
|
||||
|
||||
def test_user_can_deactivate_self(self):
|
||||
"""
|
||||
Verify a user calling the deactivation endpoint logs out the user, deletes all their SSO tokens,
|
||||
and creates a user retirement row.
|
||||
"""
|
||||
self.client.login(username=self.test_user.username, password=self.test_password)
|
||||
headers = self.build_jwt_headers(self.test_user)
|
||||
response = self.client.post(self.url, self.build_post(self.test_password), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
# make sure the user model is as expected
|
||||
updated_user = User.objects.get(id=self.test_user.id)
|
||||
self.assertEqual(get_retired_email_by_email(self.test_user.email), updated_user.email)
|
||||
self.assertFalse(updated_user.has_usable_password())
|
||||
self.assertEqual(list(UserSocialAuth.objects.filter(user=self.test_user)), [])
|
||||
self.assertEqual(len(UserRetirementStatus.objects.filter(user_id=self.test_user.id)), 1)
|
||||
# make sure the user cannot log in
|
||||
self.assertFalse(self.client.login(username=self.test_user.username, password=self.test_password))
|
||||
|
||||
def test_password_mismatch(self):
|
||||
"""
|
||||
Verify that the user submitting a mismatched password results in
|
||||
a rejection.
|
||||
"""
|
||||
self.client.login(username=self.test_user.username, password=self.test_password)
|
||||
headers = self.build_jwt_headers(self.test_user)
|
||||
response = self.client.post(self.url, self.build_post(self.test_password + "xxxx"), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_called_twice(self):
|
||||
"""
|
||||
Verify a user calling the deactivation endpoint a second time results in a "forbidden"
|
||||
error, as the user will be logged out.
|
||||
"""
|
||||
self.client.login(username=self.test_user.username, password=self.test_password)
|
||||
headers = self.build_jwt_headers(self.test_user)
|
||||
response = self.client.post(self.url, self.build_post(self.test_password), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
self.client.login(username=self.test_user.username, password=self.test_password)
|
||||
headers = self.build_jwt_headers(self.test_user)
|
||||
response = self.client.post(self.url, self.build_post(self.test_password), **headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Account APIs are only supported in LMS')
|
||||
class TestAccountRetireMailings(RetirementTestCase):
|
||||
"""
|
||||
|
||||
@@ -6,8 +6,10 @@ https://openedx.atlassian.net/wiki/display/TNL/User+API
|
||||
"""
|
||||
import datetime
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
import pytz
|
||||
from django.contrib.auth import get_user_model, authenticate, logout
|
||||
from django.db import transaction
|
||||
from django.utils.translation import ugettext as _
|
||||
from edx_rest_framework_extensions.authentication import JwtAuthentication
|
||||
from rest_framework import permissions
|
||||
from rest_framework import status
|
||||
@@ -16,7 +18,13 @@ from rest_framework.views import APIView
|
||||
from rest_framework.viewsets import ViewSet
|
||||
from six import text_type
|
||||
from social_django.models import UserSocialAuth
|
||||
import pytz
|
||||
from student.models import (
|
||||
User,
|
||||
get_retired_email_by_email,
|
||||
get_potentially_retired_user_by_username_and_hash,
|
||||
get_potentially_retired_user_by_username
|
||||
)
|
||||
from student.views.login import AuthFailedError, LoginFailures
|
||||
|
||||
from openedx.core.djangoapps.user_api.preferences.api import update_email_opt_in
|
||||
from openedx.core.lib.api.authentication import (
|
||||
@@ -24,13 +32,6 @@ from openedx.core.lib.api.authentication import (
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
)
|
||||
from openedx.core.lib.api.parsers import MergePatchParser
|
||||
from student.models import (
|
||||
User,
|
||||
get_retired_email_by_email,
|
||||
get_potentially_retired_user_by_username_and_hash,
|
||||
get_potentially_retired_user_by_username
|
||||
)
|
||||
|
||||
from .api import get_account_settings, update_account_settings
|
||||
from .permissions import CanDeactivateUser, CanRetireUser
|
||||
from .serializers import UserRetirementStatusSerializer
|
||||
@@ -305,14 +306,14 @@ class DeactivateLogoutView(APIView):
|
||||
"""
|
||||
POST /api/user/v1/accounts/deactivate_logout/
|
||||
{
|
||||
"username": "example_username",
|
||||
"password": "example_password",
|
||||
}
|
||||
|
||||
**POST Parameters**
|
||||
|
||||
A POST request must include the following parameter.
|
||||
|
||||
* username: Required. The username of the user being deactivated.
|
||||
* password: Required. The current password of the user being deactivated.
|
||||
|
||||
**POST Response Values**
|
||||
|
||||
@@ -329,13 +330,13 @@ class DeactivateLogoutView(APIView):
|
||||
If an unanticipated error occurs, the request returns an
|
||||
HTTP 500 "Internal Server Error" response.
|
||||
|
||||
Allows an administrative user to take the following actions
|
||||
on behalf of an LMS user:
|
||||
Allows an LMS user to take the following actions:
|
||||
- Change the user's password permanently to Django's unusable password
|
||||
- Log the user out
|
||||
- Create a row in the retirement table for that user
|
||||
"""
|
||||
authentication_classes = (JwtAuthentication, )
|
||||
permission_classes = (permissions.IsAuthenticated, CanRetireUser)
|
||||
permission_classes = (permissions.IsAuthenticated, )
|
||||
|
||||
def post(self, request):
|
||||
"""
|
||||
@@ -344,29 +345,74 @@ class DeactivateLogoutView(APIView):
|
||||
Marks the user as having no password set for deactivation purposes,
|
||||
and logs the user out.
|
||||
"""
|
||||
username = None
|
||||
user_model = get_user_model()
|
||||
try:
|
||||
# Get the username from the request and check that it exists
|
||||
username = request.data['username']
|
||||
user = user_model.objects.get(username=username)
|
||||
|
||||
verify_user_password_response = self._verify_user_password(request)
|
||||
if verify_user_password_response.status_code != status.HTTP_204_NO_CONTENT:
|
||||
return verify_user_password_response
|
||||
with transaction.atomic():
|
||||
# 1. Unlink LMS social auth accounts
|
||||
UserSocialAuth.objects.filter(user_id=user.id).delete()
|
||||
UserSocialAuth.objects.filter(user_id=request.user.id).delete()
|
||||
# 2. Change LMS password & email
|
||||
user.email = get_retired_email_by_email(user.email)
|
||||
user.save()
|
||||
_set_unusable_password(user)
|
||||
request.user.email = get_retired_email_by_email(request.user.email)
|
||||
request.user.save()
|
||||
_set_unusable_password(request.user)
|
||||
# 3. Unlink social accounts & change password on each IDA, still to be implemented
|
||||
# 4. Add user to retirement queue
|
||||
UserRetirementStatus.create_retirement(request.user)
|
||||
# 5. Log the user out
|
||||
logout(request)
|
||||
return Response(status=status.HTTP_204_NO_CONTENT)
|
||||
except KeyError:
|
||||
return Response(u'Username not specified.', status=status.HTTP_404_NOT_FOUND)
|
||||
except user_model.DoesNotExist:
|
||||
return Response(u'The user "{}" does not exist.'.format(username), status=status.HTTP_404_NOT_FOUND)
|
||||
return Response(
|
||||
u'The user "{}" does not exist.'.format(request.user.username), status=status.HTTP_404_NOT_FOUND
|
||||
)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
return Response(text_type(exc), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
|
||||
def _verify_user_password(self, request):
|
||||
"""
|
||||
If the user is logged in and we want to verify that they have submitted the correct password
|
||||
for a major account change (for example, retiring this user's account).
|
||||
|
||||
Args:
|
||||
request (HttpRequest): A request object where the password should be included in the POST fields.
|
||||
"""
|
||||
try:
|
||||
self._check_excessive_login_attempts(request.user)
|
||||
user = authenticate(username=request.user.username, password=request.POST['password'], request=request)
|
||||
if user:
|
||||
if LoginFailures.is_feature_enabled():
|
||||
LoginFailures.clear_lockout_counter(user)
|
||||
return Response(status=status.HTTP_204_NO_CONTENT)
|
||||
else:
|
||||
self._handle_failed_authentication(request.user)
|
||||
except AuthFailedError as err:
|
||||
return Response(text_type(err), status=status.HTTP_403_FORBIDDEN)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
return Response(u"Could not verify user password", status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def _check_excessive_login_attempts(self, user):
|
||||
"""
|
||||
See if account has been locked out due to excessive login failures
|
||||
"""
|
||||
if user and LoginFailures.is_feature_enabled():
|
||||
if LoginFailures.is_user_locked_out(user):
|
||||
raise AuthFailedError(_('This account has been temporarily locked due '
|
||||
'to excessive login failures. Try again later.'))
|
||||
|
||||
def _handle_failed_authentication(self, user):
|
||||
"""
|
||||
Handles updating the failed login count, inactive user notifications, and logging failed authentications.
|
||||
"""
|
||||
if user and LoginFailures.is_feature_enabled():
|
||||
LoginFailures.increment_lockout_counter(user)
|
||||
|
||||
raise AuthFailedError(_('Email or password is incorrect.'))
|
||||
|
||||
|
||||
def _set_unusable_password(user):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user