Error incase of disabled user social login

Updated status code and manage user func

update auth verification

Fixed auth condition

fixed test failuers

fixed style issues

fixed style issues

Created test for auth disabled use

Code refactor

Fixed form ain auth exchange

Fixed oauth apps with disabled user fail

applied quality fixes

Refactored tests

fixed quality issues

removed extra files

Fixed linter issues

Fixed linter issues
This commit is contained in:
Ahtisham Shahid
2020-07-16 19:22:10 +05:00
parent 285ec771bf
commit b2466c8c2b
8 changed files with 87 additions and 8 deletions

View File

@@ -71,6 +71,7 @@ import six
import social_django
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth import logout
from django.core.mail.message import EmailMessage
from django.http import HttpResponseBadRequest
from django.shortcuts import redirect
@@ -674,10 +675,11 @@ def set_logged_in_cookies(backend=None, user=None, strategy=None, auth_entry=Non
"""
if not is_api(auth_entry) and user is not None and user.is_authenticated:
request = strategy.request if strategy else None
if not user.has_usable_password():
msg = "Your account is disabled"
logout(request)
return JsonResponse(msg, status=403)
request = strategy.request if strategy else None
# n.b. for new users, user.is_active may be False at this point; set the cookie anyways.
if request is not None:
# Check that the cookie isn't already set.

View File

@@ -18,6 +18,8 @@ from openedx.core.djangoapps.user_api.accounts.serializers import AccountUserSer
from openedx.core.djangoapps.user_authn.utils import generate_password
from common.djangoapps.util.json_request import JsonResponse
from openedx.core.djangolib.oauth2_retirement_utils import retire_dot_oauth2_models
class ManageUserSupportView(View):
"""
@@ -73,6 +75,7 @@ class ManageUserDetailView(GenericAPIView):
UserPasswordToggleHistory.objects.create(
user=user, comment=comment, created_by=request.user, disabled=True
)
retire_dot_oauth2_models(request.user)
else:
user.set_password(generate_password(length=25))
UserPasswordToggleHistory.objects.create(

View File

@@ -187,6 +187,17 @@ class AccessTokenExchangeForm(forms.Form):
user = backend.do_auth(access_token, allow_inactive_user=True)
except (HTTPError, AuthException):
pass
# check if user is disabled
if isinstance(user, User) and not user.has_usable_password():
self.request.social_strategy.clean_partial_pipeline(access_token)
raise OAuthValidationError(
{
"error": "account_disabled",
"error_description": 'user account is disabled',
"error_code": 403
}
)
if user and isinstance(user, User):
self.cleaned_data["user"] = user
else:

View File

@@ -42,13 +42,16 @@ class AccessTokenExchangeViewTest(AccessTokenExchangeTestMixin):
super(AccessTokenExchangeViewTest, self).tearDown()
Partial.objects.all().delete()
def _assert_error(self, data, expected_error, expected_error_description):
def _assert_error(self, data, expected_error, expected_error_description, error_code=None):
response = self.csrf_client.post(self.url, data)
self.assertEqual(response.status_code, 400)
self.assertEqual(response.status_code, error_code if error_code else 400)
self.assertEqual(response["Content-Type"], "application/json")
expected_data = {u"error": expected_error, u"error_description": expected_error_description}
if error_code:
expected_data['error_code'] = error_code
self.assertEqual(
json.loads(response.content.decode('utf-8')),
{u"error": expected_error, u"error_description": expected_error_description}
expected_data
)
def _assert_success(self, data, expected_scopes):
@@ -127,6 +130,15 @@ class AccessTokenExchangeViewTest(AccessTokenExchangeTestMixin):
"""
pass
def test_disabled_user(self):
"""
Test if response status code is correct in case of disabled user.
"""
self.user.set_unusable_password()
self.user.save()
self._setup_provider_response(success=True)
self._assert_error(self.data, "account_disabled", "user account is disabled", 403)
@unittest.skipUnless(TPA_FEATURE_ENABLED, TPA_FEATURES_KEY + " not enabled")
@httpretty.activate

View File

@@ -28,7 +28,7 @@ class AccessTokenExchangeTestMixin(ThirdPartyOAuthTestMixin):
"client_id": self.client_id,
}
def _assert_error(self, _data, _expected_error, _expected_error_description):
def _assert_error(self, _data, _expected_error, _expected_error_description, error_code):
"""
Given request data, execute a test and check that the expected error
was returned (along with any other appropriate assertions).

View File

@@ -98,7 +98,8 @@ class DOTAccessTokenExchangeView(AccessTokenExchangeBase, DOTAccessTokenView):
"""
Return an error response consisting of the errors in the form
"""
return Response(status=400, data=form_errors, **kwargs)
error_code = form_errors.get('error_code', 400)
return Response(status=error_code, data=form_errors, **kwargs)
class LoginWithAccessTokenView(APIView):

View File

@@ -14,6 +14,7 @@ OAUTH2_TOKEN_ERROR_MALFORMED = 'token_malformed'
OAUTH2_TOKEN_ERROR_NONEXISTENT = 'token_nonexistent'
OAUTH2_TOKEN_ERROR_NOT_PROVIDED = 'token_not_provided'
OAUTH2_USER_NOT_ACTIVE_ERROR = 'user_not_active'
OAUTH2_USER_DISABLED_ERROR = 'user_is_disabled'
logger = logging.getLogger(__name__)
@@ -91,6 +92,13 @@ class BearerAuthentication(BaseAuthentication):
})
else:
user = token.user
has_application = dot_models.Application.objects.filter(user_id=user.id)
if not user.has_usable_password() and not has_application:
msg = 'User disabled by admin: %s' % user.get_username()
raise AuthenticationFailed({
'error_code': OAUTH2_USER_DISABLED_ERROR,
'developer_message': msg})
# Check to make sure the users have activated their account (by confirming their email)
if not self.allow_inactive_users and not user.is_active:
set_custom_attribute("BearerAuthentication_user_active", False)

View File

@@ -3,7 +3,6 @@ Tests for OAuth2. This module is copied from django-rest-framework-oauth
(tests/test_authentication.py) and updated to use our subclass of BearerAuthentication.
"""
import itertools
import json
import unittest
@@ -60,7 +59,6 @@ urlpatterns = [
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
@override_settings(ROOT_URLCONF=__name__)
class OAuth2AllowInActiveUsersTests(TestCase):
OAUTH2_BASE_TESTING_URL = '/oauth2-inactive-test/'
def setUp(self):
@@ -230,3 +228,47 @@ class BearerAuthenticationTests(OAuth2AllowInActiveUsersTests): # pylint: disab
# Since this is testing back to previous version, user should be set to true
self.user.is_active = True
self.user.save()
class OAuthDenyDisabledUsers(OAuth2AllowInActiveUsersTests): # pylint: disable=test-inherits-tests
"""
To test OAuth on disabled user.
"""
OAUTH2_BASE_TESTING_URL = '/oauth2-test/'
def setUp(self):
super().setUp()
# User is active but has have disabled status
self.user.is_active = True
self.user.set_unusable_password()
self.user.save()
def test_get_form_passing_auth_with_dot(self):
"""
Asserts response with disabled user with DOT App
"""
response = self.get_with_bearer_token(self.OAUTH2_BASE_TESTING_URL, token=self.dot_access_token.token)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_post_form_passing_auth(self):
"""
Asserts response with disabled user with DOT App
"""
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_get_form_without_oauth_app(self):
"""
Asserts response with disabled user and without DOT APP
"""
dot_models.Application.objects.filter(user_id=self.user.id).delete()
response = self.get_with_bearer_token(self.OAUTH2_BASE_TESTING_URL, token=self.dot_access_token.token)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_post_form_without_oauth_app(self):
"""
Asserts response with disabled user and without DOT APP
"""
dot_models.Application.objects.filter(user_id=self.user.id).delete()
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)