Merge pull request #22468 from edx/arch/account-activation-cleanup
Account Activation cleanup
This commit is contained in:
@@ -5,6 +5,7 @@ import unittest
|
||||
from uuid import uuid4
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import TestCase, override_settings
|
||||
from django.urls import reverse
|
||||
from mock import patch
|
||||
@@ -103,6 +104,10 @@ class TestActivateAccount(TestCase):
|
||||
response = self.client.get(reverse('dashboard'))
|
||||
self.assertNotContains(response, expected_message)
|
||||
|
||||
def _assert_user_active_state(self, expected_active_state):
|
||||
user = User.objects.get(username=self.user.username)
|
||||
self.assertEqual(user.is_active, expected_active_state)
|
||||
|
||||
def test_account_activation_notification_on_logistration(self):
|
||||
"""
|
||||
Verify that logistration page displays success/error/info messages
|
||||
@@ -112,15 +117,19 @@ class TestActivateAccount(TestCase):
|
||||
login_url=reverse('signin_user'),
|
||||
redirect_url=reverse('dashboard'),
|
||||
)
|
||||
self._assert_user_active_state(expected_active_state=False)
|
||||
|
||||
# Access activation link, message should say that account has been activated.
|
||||
response = self.client.get(reverse('activate', args=[self.registration.activation_key]), follow=True)
|
||||
self.assertRedirects(response, login_page_url)
|
||||
self.assertContains(response, 'Success! You have activated your account.')
|
||||
self._assert_user_active_state(expected_active_state=True)
|
||||
|
||||
# Access activation link again, message should say that account is already active.
|
||||
response = self.client.get(reverse('activate', args=[self.registration.activation_key]), follow=True)
|
||||
self.assertRedirects(response, login_page_url)
|
||||
self.assertContains(response, 'This account has already been activated.')
|
||||
self._assert_user_active_state(expected_active_state=True)
|
||||
|
||||
# Open account activation page with an invalid activation link,
|
||||
# there should be an error message displayed.
|
||||
@@ -137,4 +146,4 @@ class TestActivateAccount(TestCase):
|
||||
response = self.client.get(reverse('activate', args=[self.registration.activation_key]), follow=True)
|
||||
self.assertRedirects(response, login_page_url)
|
||||
self.assertContains(response, SYSTEM_MAINTENANCE_MSG)
|
||||
assert not self.user.is_active
|
||||
self._assert_user_active_state(expected_active_state=False)
|
||||
|
||||
@@ -18,7 +18,6 @@ urlpatterns = [
|
||||
url(r'^accounts/disable_account_ajax$', views.disable_account_ajax, name="disable_account_ajax"),
|
||||
url(r'^accounts/manage_user_standing', views.manage_user_standing, name='manage_user_standing'),
|
||||
|
||||
url(r'^change_setting$', views.change_setting, name='change_setting'),
|
||||
url(r'^change_email_settings$', views.change_email_settings, name='change_email_settings'),
|
||||
|
||||
url(r'^course_run/{}/refund_status$'.format(settings.COURSE_ID_PATTERN),
|
||||
|
||||
@@ -468,24 +468,6 @@ def disable_account_ajax(request):
|
||||
return JsonResponse(context)
|
||||
|
||||
|
||||
@login_required
|
||||
@ensure_csrf_cookie
|
||||
def change_setting(request):
|
||||
"""
|
||||
JSON call to change a profile setting: Right now, location
|
||||
"""
|
||||
# TODO (vshnayder): location is no longer used
|
||||
u_prof = UserProfile.objects.get(user=request.user) # request.user.profile_cache
|
||||
if 'location' in request.POST:
|
||||
u_prof.location = request.POST['location']
|
||||
u_prof.save()
|
||||
|
||||
return JsonResponse({
|
||||
"success": True,
|
||||
"location": u_prof.location,
|
||||
})
|
||||
|
||||
|
||||
@receiver(post_save, sender=User)
|
||||
def user_signup_handler(sender, **kwargs): # pylint: disable=unused-argument
|
||||
"""
|
||||
|
||||
@@ -331,35 +331,6 @@ def _send_email_change_requests_if_needed(data, user):
|
||||
)
|
||||
|
||||
|
||||
@helpers.intercept_errors(errors.UserAPIInternalError, ignore_errors=[errors.UserAPIRequestError])
|
||||
def activate_account(activation_key):
|
||||
"""Activate a user's account.
|
||||
|
||||
Args:
|
||||
activation_key (unicode): The activation key the user received via email.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Raises:
|
||||
errors.UserNotAuthorized
|
||||
errors.UserAPIInternalError: the operation failed due to an unexpected error.
|
||||
|
||||
"""
|
||||
# TODO: Confirm this `activate_account` is only used for tests. If so, this should not be used for tests, and we
|
||||
# should instead use the `activate_account` used for /activate.
|
||||
set_custom_metric('user_api_activate_account', 'True')
|
||||
if waffle().is_enabled(PREVENT_AUTH_USER_WRITES):
|
||||
raise errors.UserAPIInternalError(SYSTEM_MAINTENANCE_MSG)
|
||||
try:
|
||||
registration = Registration.objects.get(activation_key=activation_key)
|
||||
except Registration.DoesNotExist:
|
||||
raise errors.UserNotAuthorized
|
||||
else:
|
||||
# This implicitly saves the registration
|
||||
registration.activate()
|
||||
|
||||
|
||||
def get_name_validation_error(name):
|
||||
"""Get the built-in validation error message for when
|
||||
the user's real name is invalid in some way (we wonder how).
|
||||
|
||||
@@ -28,7 +28,6 @@ from openedx.core.djangoapps.ace_common.tests.mixins import EmailTemplateTagMixi
|
||||
from openedx.core.djangoapps.site_configuration.tests.factories import SiteFactory
|
||||
from openedx.core.djangoapps.user_api.accounts import PRIVATE_VISIBILITY, USERNAME_MAX_LENGTH
|
||||
from openedx.core.djangoapps.user_api.accounts.api import (
|
||||
activate_account,
|
||||
get_account_settings,
|
||||
update_account_settings
|
||||
)
|
||||
@@ -530,64 +529,3 @@ class AccountSettingsOnCreationTest(CreateAccountMixin, TestCase):
|
||||
|
||||
expected_user_password = make_password(unicodedata.normalize('NFKC', u'Ṗŕệṿïệẅ Ṯệẍt'), salt_val)
|
||||
self.assertEqual(expected_user_password, user.password)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class AccountActivationAndPasswordChangeTest(CreateAccountMixin, TestCase):
|
||||
"""
|
||||
Test cases to cover the account initialization workflow
|
||||
"""
|
||||
USERNAME = u'claire-underwood'
|
||||
PASSWORD = u'ṕáśśẃőŕd'
|
||||
EMAIL = u'claire+underwood@example.com'
|
||||
|
||||
IS_SECURE = False
|
||||
|
||||
def get_activation_key(self, user):
|
||||
registration = Registration.objects.get(user=user)
|
||||
return registration.activation_key
|
||||
|
||||
@skip_unless_lms
|
||||
def test_activate_account(self):
|
||||
# Create the account, which is initially inactive
|
||||
self.create_account(self.USERNAME, self.PASSWORD, self.EMAIL)
|
||||
user = User.objects.get(username=self.USERNAME)
|
||||
activation_key = self.get_activation_key(user)
|
||||
|
||||
request = RequestFactory().get("/api/user/v1/accounts/")
|
||||
request.user = user
|
||||
account = get_account_settings(request)[0]
|
||||
self.assertEqual(self.USERNAME, account["username"])
|
||||
self.assertEqual(self.EMAIL, account["email"])
|
||||
self.assertFalse(account["is_active"])
|
||||
|
||||
# Activate the account and verify that it is now active
|
||||
activate_account(activation_key)
|
||||
account = get_account_settings(request)[0]
|
||||
self.assertTrue(account['is_active'])
|
||||
|
||||
def test_activate_account_invalid_key(self):
|
||||
with pytest.raises(UserNotAuthorized):
|
||||
activate_account(u'invalid')
|
||||
|
||||
def test_activate_account_prevent_auth_user_writes(self):
|
||||
self.create_account(self.USERNAME, self.PASSWORD, self.EMAIL)
|
||||
user = User.objects.get(username=self.USERNAME)
|
||||
activation_key = self.get_activation_key(user)
|
||||
|
||||
with pytest.raises(UserAPIInternalError, message=SYSTEM_MAINTENANCE_MSG):
|
||||
with waffle().override(PREVENT_AUTH_USER_WRITES, True):
|
||||
activate_account(activation_key)
|
||||
|
||||
def _assert_is_datetime(self, timestamp):
|
||||
"""
|
||||
Internal helper to validate the type of the provided timestamp
|
||||
"""
|
||||
if not timestamp:
|
||||
return False
|
||||
try:
|
||||
parse_datetime(timestamp)
|
||||
except ValueError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
@@ -1,334 +1,41 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Tests for user authn views. """
|
||||
""" Tests for Logistration views. """
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from http.cookies import SimpleCookie
|
||||
from unittest import skipUnless
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
from django.conf import settings
|
||||
from django.contrib import messages
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.contrib.messages.middleware import MessageMiddleware
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.core import mail
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.test import TestCase
|
||||
from django.test.client import RequestFactory
|
||||
from django.test.utils import override_settings
|
||||
from django.urls import reverse
|
||||
from django.utils.translation import ugettext as _
|
||||
from edx_oauth2_provider.tests.factories import AccessTokenFactory, ClientFactory, RefreshTokenFactory
|
||||
from oauth2_provider.models import AccessToken as dot_access_token
|
||||
from oauth2_provider.models import RefreshToken as dot_refresh_token
|
||||
from provider.oauth2.models import AccessToken as dop_access_token
|
||||
from provider.oauth2.models import RefreshToken as dop_refresh_token
|
||||
from six.moves import range
|
||||
from six.moves.urllib.parse import urlencode # pylint: disable=import-error
|
||||
from testfixtures import LogCapture
|
||||
from waffle.models import Switch
|
||||
|
||||
from course_modes.models import CourseMode
|
||||
from openedx.core.djangoapps.oauth_dispatch.tests import factories as dot_factories
|
||||
from openedx.core.djangoapps.site_configuration.tests.mixins import SiteMixin
|
||||
from openedx.core.djangoapps.theming.tests.test_util import with_comprehensive_theme_context
|
||||
from openedx.core.djangoapps.user_api.accounts.api import activate_account
|
||||
from openedx.core.djangoapps.user_api.accounts.utils import ENABLE_SECONDARY_EMAIL_FEATURE_SWITCH
|
||||
from openedx.core.djangoapps.user_api.errors import UserAPIInternalError
|
||||
from openedx.core.djangoapps.user_authn.views.login_form import login_and_registration_form
|
||||
from openedx.core.djangolib.js_utils import dump_js_escaped_json
|
||||
from openedx.core.djangolib.markup import HTML, Text
|
||||
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
|
||||
from student.models import Registration
|
||||
from student.tests.factories import AccountRecoveryFactory
|
||||
from openedx.core.djangolib.testing.utils import skip_unless_lms
|
||||
from third_party_auth.tests.testutil import ThirdPartyAuthTestMixin, simulate_running_pipeline
|
||||
from util.testing import UrlResetMixin
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
|
||||
LOGGER_NAME = 'audit'
|
||||
User = get_user_model() # pylint:disable=invalid-name
|
||||
|
||||
FEATURES_WITH_FAILED_PASSWORD_RESET_EMAIL = settings.FEATURES.copy()
|
||||
FEATURES_WITH_FAILED_PASSWORD_RESET_EMAIL['ENABLE_PASSWORD_RESET_FAILURE_EMAIL'] = True
|
||||
|
||||
|
||||
@skip_unless_lms
|
||||
@ddt.ddt
|
||||
class UserAccountUpdateTest(CacheIsolationTestCase, UrlResetMixin):
|
||||
""" Tests for views that update the user's account information. """
|
||||
|
||||
USERNAME = u"heisenberg"
|
||||
ALTERNATE_USERNAME = u"walt"
|
||||
OLD_PASSWORD = u"ḅḷüëṡḳÿ"
|
||||
NEW_PASSWORD = u"B🄸🄶B🄻🅄🄴"
|
||||
OLD_EMAIL = u"walter@graymattertech.com"
|
||||
NEW_EMAIL = u"walt@savewalterwhite.com"
|
||||
|
||||
INVALID_KEY = u"123abc"
|
||||
|
||||
URLCONF_MODULES = ['student_accounts.urls']
|
||||
|
||||
ENABLED_CACHES = ['default']
|
||||
|
||||
def _create_account(self, username, password, email):
|
||||
# pylint: disable=missing-docstring
|
||||
registration_url = reverse('user_api_registration')
|
||||
resp = self.client.post(registration_url, {
|
||||
'username': username,
|
||||
'email': email,
|
||||
'password': password,
|
||||
'name': username,
|
||||
'honor_code': 'true',
|
||||
})
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
def setUp(self):
|
||||
super(UserAccountUpdateTest, self).setUp()
|
||||
|
||||
# Create/activate a new account
|
||||
self._create_account(self.USERNAME, self.OLD_PASSWORD, self.OLD_EMAIL)
|
||||
mail.outbox = []
|
||||
user = User.objects.get(username=self.USERNAME)
|
||||
registration = Registration.objects.get(user=user)
|
||||
activate_account(registration.activation_key)
|
||||
|
||||
self.account_recovery = AccountRecoveryFactory.create(user=User.objects.get(email=self.OLD_EMAIL))
|
||||
self.enable_account_recovery_switch = Switch.objects.create(
|
||||
name=ENABLE_SECONDARY_EMAIL_FEATURE_SWITCH,
|
||||
active=True
|
||||
)
|
||||
|
||||
# Login
|
||||
result = self.client.login(username=self.USERNAME, password=self.OLD_PASSWORD)
|
||||
self.assertTrue(result)
|
||||
|
||||
@skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in LMS')
|
||||
def test_password_change(self):
|
||||
# Request a password change while logged in, simulating
|
||||
# use of the password reset link from the account page
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that an email was sent
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
# Retrieve the activation link from the email body
|
||||
email_body = mail.outbox[0].body
|
||||
result = re.search(r'(?P<url>https?://[^\s]+)', email_body)
|
||||
self.assertIsNot(result, None)
|
||||
activation_link = result.group('url')
|
||||
|
||||
# Visit the activation link
|
||||
response = self.client.get(activation_link)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Submit a new password and follow the redirect to the success page
|
||||
response = self.client.post(
|
||||
activation_link,
|
||||
# These keys are from the form on the current password reset confirmation page.
|
||||
{'new_password1': self.NEW_PASSWORD, 'new_password2': self.NEW_PASSWORD},
|
||||
follow=True
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, "Your password has been reset.")
|
||||
|
||||
# Log the user out to clear session data
|
||||
self.client.logout()
|
||||
|
||||
# Verify that the new password can be used to log in
|
||||
login_api_url = reverse('login_api')
|
||||
response = self.client.post(login_api_url, {'email': self.OLD_EMAIL, 'password': self.NEW_PASSWORD})
|
||||
assert response.status_code == 200
|
||||
response_dict = json.loads(response.content.decode('utf-8'))
|
||||
assert response_dict['success']
|
||||
|
||||
# Try reusing the activation link to change the password again
|
||||
# Visit the activation link again.
|
||||
response = self.client.get(activation_link)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, "This password reset link is invalid. It may have been used already.")
|
||||
|
||||
self.client.logout()
|
||||
|
||||
# Verify that the old password cannot be used to log in
|
||||
result = self.client.login(username=self.USERNAME, password=self.OLD_PASSWORD)
|
||||
self.assertFalse(result)
|
||||
|
||||
# Verify that the new password continues to be valid
|
||||
response = self.client.post(login_api_url, {'email': self.OLD_EMAIL, 'password': self.NEW_PASSWORD})
|
||||
assert response.status_code == 200
|
||||
response_dict = json.loads(response.content.decode('utf-8'))
|
||||
assert response_dict['success']
|
||||
|
||||
def test_password_change_failure(self):
|
||||
with mock.patch('openedx.core.djangoapps.user_authn.views.password_reset.request_password_change',
|
||||
side_effect=UserAPIInternalError):
|
||||
self._change_password()
|
||||
self.assertRaises(UserAPIInternalError)
|
||||
|
||||
@override_settings(FEATURES=FEATURES_WITH_FAILED_PASSWORD_RESET_EMAIL)
|
||||
def test_password_reset_failure_email(self):
|
||||
"""Test that a password reset failure email notification is sent, when enabled."""
|
||||
# Log the user out
|
||||
self.client.logout()
|
||||
|
||||
bad_email = 'doesnotexist@example.com'
|
||||
response = self._change_password(email=bad_email)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that an email was sent
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
# Verify that the body contains the failed password reset message
|
||||
sent_message = mail.outbox[0]
|
||||
text_body = sent_message.body
|
||||
html_body = sent_message.alternatives[0][0]
|
||||
|
||||
for email_body in [text_body, html_body]:
|
||||
msg = u'However, there is currently no user account associated with your email address: {email}'.format(
|
||||
email=bad_email
|
||||
)
|
||||
|
||||
assert u'reset for your user account at {}'.format(settings.PLATFORM_NAME) in email_body
|
||||
assert 'password_reset_confirm' not in email_body, 'The link should not be added if user was not found'
|
||||
assert msg in email_body
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_password_change_logged_out(self, send_email):
|
||||
# Log the user out
|
||||
self.client.logout()
|
||||
|
||||
# Request a password change while logged out, simulating
|
||||
# use of the password reset link from the login page
|
||||
if send_email:
|
||||
response = self._change_password(email=self.OLD_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
# Don't send an email in the POST data, simulating
|
||||
# its (potentially accidental) omission in the POST
|
||||
# data sent from the login page
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_access_token_invalidation_logged_out(self):
|
||||
self.client.logout()
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
self._create_dop_tokens(user)
|
||||
self._create_dot_tokens(user)
|
||||
response = self._change_password(email=self.OLD_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assert_access_token_destroyed(user)
|
||||
|
||||
def test_access_token_invalidation_logged_in(self):
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
self._create_dop_tokens(user)
|
||||
self._create_dot_tokens(user)
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assert_access_token_destroyed(user)
|
||||
|
||||
def test_password_change_inactive_user(self):
|
||||
# Log out the user created during test setup
|
||||
self.client.logout()
|
||||
|
||||
# Create a second user, but do not activate it
|
||||
self._create_account(self.ALTERNATE_USERNAME, self.OLD_PASSWORD, self.NEW_EMAIL)
|
||||
mail.outbox = []
|
||||
|
||||
# Send the view the email address tied to the inactive user
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
|
||||
# Expect that the activation email is still sent,
|
||||
# since the user may have lost the original activation email.
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
def test_password_change_no_user(self):
|
||||
# Log out the user created during test setup
|
||||
self.client.logout()
|
||||
|
||||
with LogCapture(LOGGER_NAME, level=logging.INFO) as logger:
|
||||
# Send the view an email address not tied to any user
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
logger.check((LOGGER_NAME, 'INFO', 'Invalid password reset attempt'))
|
||||
|
||||
def test_password_change_rate_limited(self):
|
||||
"""
|
||||
Tests that consective password reset requests are rate limited.
|
||||
"""
|
||||
# Log out the user created during test setup, to prevent the view from
|
||||
# selecting the logged-in user's email address over the email provided
|
||||
# in the POST data
|
||||
self.client.logout()
|
||||
for status in [200, 403]:
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, status)
|
||||
|
||||
with mock.patch(
|
||||
'util.request_rate_limiter.PasswordResetEmailRateLimiter.is_rate_limit_exceeded',
|
||||
return_value=False
|
||||
):
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@ddt.data(
|
||||
('post', 'password_change_request', []),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_require_http_method(self, correct_method, url_name, args):
|
||||
wrong_methods = {'get', 'put', 'post', 'head', 'options', 'delete'} - {correct_method}
|
||||
url = reverse(url_name, args=args)
|
||||
|
||||
for method in wrong_methods:
|
||||
response = getattr(self.client, method)(url)
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
def _change_password(self, email=None):
|
||||
"""Request to change the user's password. """
|
||||
data = {}
|
||||
|
||||
if email:
|
||||
data['email'] = email
|
||||
|
||||
return self.client.post(path=reverse('password_change_request'), data=data)
|
||||
|
||||
def _create_dop_tokens(self, user=None):
|
||||
"""Create dop access token for given user if user provided else for default user."""
|
||||
if not user:
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
|
||||
client = ClientFactory()
|
||||
access_token = AccessTokenFactory(user=user, client=client)
|
||||
RefreshTokenFactory(user=user, client=client, access_token=access_token)
|
||||
|
||||
def _create_dot_tokens(self, user=None):
|
||||
"""Create dop access token for given user if user provided else for default user."""
|
||||
if not user:
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
|
||||
application = dot_factories.ApplicationFactory(user=user)
|
||||
access_token = dot_factories.AccessTokenFactory(user=user, application=application)
|
||||
dot_factories.RefreshTokenFactory(user=user, application=application, access_token=access_token)
|
||||
|
||||
def assert_access_token_destroyed(self, user):
|
||||
"""Assert all access tokens are destroyed."""
|
||||
self.assertFalse(dot_access_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dot_refresh_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dop_access_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dop_refresh_token.objects.filter(user=user).exists())
|
||||
|
||||
|
||||
@skip_unless_lms
|
||||
@ddt.ddt
|
||||
class LoginAndRegistrationTest(ThirdPartyAuthTestMixin, UrlResetMixin, ModuleStoreTestCase):
|
||||
""" Tests for the student account views that update the user's account information. """
|
||||
""" Tests for Login and Registration. """
|
||||
USERNAME = "bob"
|
||||
EMAIL = "bob@example.com"
|
||||
PASSWORD = u"password"
|
||||
@@ -2,51 +2,54 @@
|
||||
"""
|
||||
Tests for user authorization password-related functionality.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from mock import Mock, patch
|
||||
|
||||
import ddt
|
||||
from django.core import mail
|
||||
from django.contrib.auth.models import User
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.test import TestCase
|
||||
from django.test.client import RequestFactory
|
||||
from django.urls import reverse
|
||||
from testfixtures import LogCapture
|
||||
|
||||
from edx_oauth2_provider.tests.factories import AccessTokenFactory, ClientFactory, RefreshTokenFactory
|
||||
from openedx.core.djangoapps.site_configuration.tests.factories import SiteFactory
|
||||
from openedx.core.djangoapps.oauth_dispatch.tests import factories as dot_factories
|
||||
from openedx.core.djangoapps.user_api.accounts.tests.test_api import CreateAccountMixin
|
||||
from openedx.core.djangoapps.user_api.accounts.api import (
|
||||
activate_account,
|
||||
)
|
||||
from openedx.core.djangoapps.user_api.errors import UserNotFound
|
||||
from openedx.core.djangoapps.user_api.errors import UserNotFound, UserAPIInternalError
|
||||
from openedx.core.djangoapps.user_authn.views.password_reset import request_password_change
|
||||
from openedx.core.djangolib.testing.utils import skip_unless_lms
|
||||
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
|
||||
from oauth2_provider.models import AccessToken as dot_access_token
|
||||
from oauth2_provider.models import RefreshToken as dot_refresh_token
|
||||
from provider.oauth2.models import AccessToken as dop_access_token
|
||||
from provider.oauth2.models import RefreshToken as dop_refresh_token
|
||||
|
||||
from student.models import Registration
|
||||
|
||||
LOGGER_NAME = 'audit'
|
||||
User = get_user_model() # pylint:disable=invalid-name
|
||||
|
||||
|
||||
class TestRequestPasswordChange(CreateAccountMixin, TestCase):
|
||||
"""
|
||||
Tests for users who request a password change.
|
||||
"""
|
||||
|
||||
USERNAME = u'claire-underwood'
|
||||
PASSWORD = u'ṕáśśẃőŕd'
|
||||
EMAIL = u'claire+underwood@example.com'
|
||||
|
||||
IS_SECURE = False
|
||||
|
||||
def get_activation_key(self, user):
|
||||
registration = Registration.objects.get(user=user)
|
||||
return registration.activation_key
|
||||
|
||||
@skip_unless_lms
|
||||
def test_request_password_change(self):
|
||||
# Create and activate an account
|
||||
self.create_account(self.USERNAME, self.PASSWORD, self.EMAIL)
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
user = User.objects.get(username=self.USERNAME)
|
||||
activation_key = self.get_activation_key(user)
|
||||
activate_account(activation_key)
|
||||
|
||||
request = RequestFactory().post('/password')
|
||||
request.user = Mock()
|
||||
request.site = SiteFactory()
|
||||
@@ -87,3 +90,245 @@ class TestRequestPasswordChange(CreateAccountMixin, TestCase):
|
||||
|
||||
# Verify that the password change email was still sent
|
||||
self.assertEqual(len(mail.outbox), 2)
|
||||
|
||||
|
||||
@skip_unless_lms
|
||||
@ddt.ddt
|
||||
class TestPasswordChange(CreateAccountMixin, CacheIsolationTestCase):
|
||||
""" Tests for views that change the user's password. """
|
||||
|
||||
USERNAME = u"heisenberg"
|
||||
ALTERNATE_USERNAME = u"walt"
|
||||
OLD_PASSWORD = u"ḅḷüëṡḳÿ"
|
||||
NEW_PASSWORD = u"B🄸🄶B🄻🅄🄴"
|
||||
OLD_EMAIL = u"walter@graymattertech.com"
|
||||
NEW_EMAIL = u"walt@savewalterwhite.com"
|
||||
|
||||
INVALID_KEY = u"123abc"
|
||||
|
||||
ENABLED_CACHES = ['default']
|
||||
|
||||
def setUp(self):
|
||||
super(TestPasswordChange, self).setUp()
|
||||
|
||||
self.create_account(self.USERNAME, self.OLD_PASSWORD, self.OLD_EMAIL)
|
||||
result = self.client.login(username=self.USERNAME, password=self.OLD_PASSWORD)
|
||||
self.assertTrue(result)
|
||||
mail.outbox = []
|
||||
|
||||
def test_password_change(self):
|
||||
# Request a password change while logged in, simulating
|
||||
# use of the password reset link from the account page
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that an email was sent
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
# Retrieve the activation link from the email body
|
||||
email_body = mail.outbox[0].body
|
||||
result = re.search(r'(?P<url>https?://[^\s]+)', email_body)
|
||||
self.assertIsNot(result, None)
|
||||
activation_link = result.group('url')
|
||||
|
||||
# Visit the activation link
|
||||
response = self.client.get(activation_link)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Submit a new password and follow the redirect to the success page
|
||||
response = self.client.post(
|
||||
activation_link,
|
||||
# These keys are from the form on the current password reset confirmation page.
|
||||
{'new_password1': self.NEW_PASSWORD, 'new_password2': self.NEW_PASSWORD},
|
||||
follow=True
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, "Your password has been reset.")
|
||||
|
||||
# Log the user out to clear session data
|
||||
self.client.logout()
|
||||
|
||||
# Verify that the new password can be used to log in
|
||||
login_api_url = reverse('login_api')
|
||||
response = self.client.post(login_api_url, {'email': self.OLD_EMAIL, 'password': self.NEW_PASSWORD})
|
||||
assert response.status_code == 200
|
||||
response_dict = json.loads(response.content.decode('utf-8'))
|
||||
assert response_dict['success']
|
||||
|
||||
# Try reusing the activation link to change the password again
|
||||
# Visit the activation link again.
|
||||
response = self.client.get(activation_link)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, "This password reset link is invalid. It may have been used already.")
|
||||
|
||||
self.client.logout()
|
||||
|
||||
# Verify that the old password cannot be used to log in
|
||||
result = self.client.login(username=self.USERNAME, password=self.OLD_PASSWORD)
|
||||
self.assertFalse(result)
|
||||
|
||||
# Verify that the new password continues to be valid
|
||||
response = self.client.post(login_api_url, {'email': self.OLD_EMAIL, 'password': self.NEW_PASSWORD})
|
||||
assert response.status_code == 200
|
||||
response_dict = json.loads(response.content.decode('utf-8'))
|
||||
assert response_dict['success']
|
||||
|
||||
def test_password_change_failure(self):
|
||||
with patch(
|
||||
'openedx.core.djangoapps.user_authn.views.password_reset.request_password_change',
|
||||
side_effect=UserAPIInternalError,
|
||||
):
|
||||
self._change_password()
|
||||
self.assertRaises(UserAPIInternalError)
|
||||
|
||||
@patch.dict(settings.FEATURES, {'ENABLE_PASSWORD_RESET_FAILURE_EMAIL': True})
|
||||
def test_password_reset_failure_email(self):
|
||||
"""Test that a password reset failure email notification is sent, when enabled."""
|
||||
# Log the user out
|
||||
self.client.logout()
|
||||
|
||||
bad_email = 'doesnotexist@example.com'
|
||||
response = self._change_password(email=bad_email)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that an email was sent
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
# Verify that the body contains the failed password reset message
|
||||
sent_message = mail.outbox[0]
|
||||
text_body = sent_message.body
|
||||
html_body = sent_message.alternatives[0][0]
|
||||
|
||||
for email_body in [text_body, html_body]:
|
||||
msg = u'However, there is currently no user account associated with your email address: {email}'.format(
|
||||
email=bad_email
|
||||
)
|
||||
|
||||
assert u'reset for your user account at {}'.format(settings.PLATFORM_NAME) in email_body
|
||||
assert 'password_reset_confirm' not in email_body, 'The link should not be added if user was not found'
|
||||
assert msg in email_body
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_password_change_logged_out(self, send_email):
|
||||
# Log the user out
|
||||
self.client.logout()
|
||||
|
||||
# Request a password change while logged out, simulating
|
||||
# use of the password reset link from the login page
|
||||
if send_email:
|
||||
response = self._change_password(email=self.OLD_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
else:
|
||||
# Don't send an email in the POST data, simulating
|
||||
# its (potentially accidental) omission in the POST
|
||||
# data sent from the login page
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_access_token_invalidation_logged_out(self):
|
||||
self.client.logout()
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
self._create_dop_tokens(user)
|
||||
self._create_dot_tokens(user)
|
||||
response = self._change_password(email=self.OLD_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self._assert_access_token_destroyed(user)
|
||||
|
||||
def test_access_token_invalidation_logged_in(self):
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
self._create_dop_tokens(user)
|
||||
self._create_dot_tokens(user)
|
||||
response = self._change_password()
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self._assert_access_token_destroyed(user)
|
||||
|
||||
def test_password_change_inactive_user(self):
|
||||
# Log out the user created during test setup
|
||||
self.client.logout()
|
||||
|
||||
# Create a second user, but do not activate it
|
||||
self.create_account(self.ALTERNATE_USERNAME, self.OLD_PASSWORD, self.NEW_EMAIL)
|
||||
mail.outbox = []
|
||||
|
||||
# Send the view the email address tied to the inactive user
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
|
||||
# Expect that the activation email is still sent,
|
||||
# since the user may have lost the original activation email.
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(len(mail.outbox), 1)
|
||||
|
||||
def test_password_change_no_user(self):
|
||||
# Log out the user created during test setup
|
||||
self.client.logout()
|
||||
|
||||
with LogCapture(LOGGER_NAME, level=logging.INFO) as logger:
|
||||
# Send the view an email address not tied to any user
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
logger.check((LOGGER_NAME, 'INFO', 'Invalid password reset attempt'))
|
||||
|
||||
def test_password_change_rate_limited(self):
|
||||
"""
|
||||
Tests that consecutive password reset requests are rate limited.
|
||||
"""
|
||||
# Log out the user created during test setup, to prevent the view from
|
||||
# selecting the logged-in user's email address over the email provided
|
||||
# in the POST data
|
||||
self.client.logout()
|
||||
for status in [200, 403]:
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, status)
|
||||
|
||||
with patch(
|
||||
'util.request_rate_limiter.PasswordResetEmailRateLimiter.is_rate_limit_exceeded',
|
||||
return_value=False
|
||||
):
|
||||
response = self._change_password(email=self.NEW_EMAIL)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@ddt.data(
|
||||
('post', 'password_change_request', []),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_require_http_method(self, correct_method, url_name, args):
|
||||
wrong_methods = {'get', 'put', 'post', 'head', 'options', 'delete'} - {correct_method}
|
||||
url = reverse(url_name, args=args)
|
||||
|
||||
for method in wrong_methods:
|
||||
response = getattr(self.client, method)(url)
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
def _change_password(self, email=None):
|
||||
"""Request to change the user's password. """
|
||||
data = {}
|
||||
|
||||
if email:
|
||||
data['email'] = email
|
||||
|
||||
return self.client.post(path=reverse('password_change_request'), data=data)
|
||||
|
||||
def _create_dop_tokens(self, user=None):
|
||||
"""Create dop access token for given user if user provided else for default user."""
|
||||
if not user:
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
|
||||
client = ClientFactory()
|
||||
access_token = AccessTokenFactory(user=user, client=client)
|
||||
RefreshTokenFactory(user=user, client=client, access_token=access_token)
|
||||
|
||||
def _create_dot_tokens(self, user=None):
|
||||
"""Create dot access token for given user if user provided else for default user."""
|
||||
if not user:
|
||||
user = User.objects.get(email=self.OLD_EMAIL)
|
||||
|
||||
application = dot_factories.ApplicationFactory(user=user)
|
||||
access_token = dot_factories.AccessTokenFactory(user=user, application=application)
|
||||
dot_factories.RefreshTokenFactory(user=user, application=application, access_token=access_token)
|
||||
|
||||
def _assert_access_token_destroyed(self, user):
|
||||
"""Assert all access tokens are destroyed."""
|
||||
self.assertFalse(dot_access_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dot_refresh_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dop_access_token.objects.filter(user=user).exists())
|
||||
self.assertFalse(dop_refresh_token.objects.filter(user=user).exists())
|
||||
|
||||
Reference in New Issue
Block a user