Files
edx-platform/common/djangoapps/third_party_auth/tests/specs/base.py
Deborah Kaplan 29de9b2dc4 feat!: Legacy account, profile, order history removal (#36219)
* feat!: Legacy account, profile, order history removal

This removes the legacy account and profile applications, and the order
history page. This is primarily a reapplication of #31893, which was
rolled back due to prior blockers.

FIXES: APER-3884
FIXES: openedx/public-engineering#71


Co-authored-by: Muhammad Abdullah Waheed <42172960+abdullahwaheed@users.noreply.github.com>
Co-authored-by: Bilal Qamar <59555732+BilalQamar95@users.noreply.github.com>
2025-02-10 14:39:13 -05:00

1159 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Base integration test for provider implementations.
"""
import json
import unittest
from contextlib import contextmanager
from unittest import mock
import pytest
from django import test
from django.conf import settings
from django.contrib import auth, messages
from django.contrib.auth import models as auth_models
from django.contrib.messages.storage import fallback
from django.contrib.sessions.backends import cache
from django.urls import reverse
from django.test import utils as django_utils
from django.conf import settings as django_settings # lint-amnesty, pylint: disable=reimported
from social_core import actions, exceptions
from social_django import utils as social_utils
from social_django import views as social_views
from lms.djangoapps.commerce.tests import TEST_API_URL
from openedx.core.djangoapps.user_authn.views.login import login_user
from openedx.core.djangoapps.user_authn.views.login_form import login_and_registration_form
from openedx.core.djangoapps.user_authn.views.register import RegistrationView
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from openedx.core.djangoapps.site_configuration.tests.factories import SiteFactory
from common.djangoapps.student import models as student_models
from common.djangoapps.student.tests.factories import UserFactory
from common.djangoapps.third_party_auth import middleware, pipeline
from common.djangoapps.third_party_auth.tests import testutil
def create_account(request):
    """Hand ``request`` to a new ``RegistrationView`` and return what its ``post`` returns."""
    registration_view = RegistrationView()
    return registration_view.post(request)
class HelperMixin:
    """
    Contains helper methods for IntegrationTestMixin and IntegrationTest classes below.

    The helpers read ``self.provider`` and, in places, ``self.request_factory``,
    ``self.get_response_data()`` and ``self.assertContains``. None of those are
    defined here, so the host test class has to supply them.
    """

    provider = None

    def _check_registration_form_username(self, form_data, test_username, expected):
        """
        DRY method to check the username in the registration form.

        ``form_data`` is modified in place: its ``"username"`` entry is
        overwritten with ``test_username`` before the form data is generated.

        Args:
            form_data (dict): data to initialize form with.
            test_username (str): username to check the form initialization with.
            expected (str): expected cleaned username after the form initialization.
        """
        form_data["username"] = test_username
        form_field_data = self.provider.get_register_form_data(form_data)
        assert form_field_data["username"] == expected

    def assert_redirect_to_provider_looks_correct(self, response):
        """Asserts the redirect to the provider's site looks correct.

        When we hit /auth/login/<provider>, we should be redirected to the
        provider's site. Here we check that we're redirected, but we don't know
        enough about the provider to check what we're redirected to. Child test
        implementations may optionally strengthen this assertion with, for
        example, more details about the format of the Location header.
        """
        assert 302 == response.status_code
        assert response.has_header("Location")

    def assert_register_response_in_pipeline_looks_correct(
        self, response, pipeline_kwargs, required_fields
    ):  # lint-amnesty, pylint: disable=invalid-name
        """Performs spot checks of the rendered register.html page.

        When we display the new account registration form after the user signs
        in with a third party, we prepopulate the form with values sent back
        from the provider. The exact set of values varies on a provider-by-
        provider basis and is generated by
        provider.BaseProvider.get_register_form_data. We provide some stock
        assertions based on the provider's implementation; if you want more
        assertions in your test, override this method.

        Args:
            response: rendered registration page response to inspect.
            pipeline_kwargs (dict): pipeline data handed to
                ``self.provider.get_register_form_data``.
            required_fields: names of the form fields whose prepopulated value
                must appear in ``response``.
        """
        # Check that no error is reported and the correct provider was selected.
        self.assertContains(response, '"errorMessage": null')
        self.assertContains(
            response,
            f'"currentProvider": "{self.provider.name}"',
        )
        # Every prepopulated field that is also listed in required_fields must
        # have its value present in the response.
        form_field_data = self.provider.get_register_form_data(pipeline_kwargs)
        for prepopulated_form_data in form_field_data:
            if prepopulated_form_data in required_fields:
                self.assertContains(response, form_field_data[prepopulated_form_data])

    def _get_user_providers_state(self, request):
        """
        Return provider user states and duplicated providers.

        The result is a dict with a ``"duplicate_provider"`` entry and an
        ``"auth"`` entry whose ``"providers"`` list holds one
        ``{"name": ..., "connected": ...}`` dict per provider that is either
        displayed for login or already linked to ``request.user``.
        """
        data = {
            "auth": {},
        }
        data["duplicate_provider"] = pipeline.get_duplicate_provider(messages.get_messages(request))
        auth_states = pipeline.get_provider_user_states(request.user)
        data["auth"]["providers"] = [
            {
                "name": state.provider.name,
                "connected": state.has_account,
            }
            for state in auth_states
            if state.provider.display_for_login or state.has_account
        ]
        return data

    def assert_third_party_accounts_state(self, request, duplicate=False, linked=None):
        """
        Asserts the user's third party account in the expected state.

        If duplicate is True, we expect data['duplicate_provider'] to contain
        the duplicate provider backend name. If linked is passed, we conditionally
        check that the provider is included in data['auth']['providers'] and
        its connected state is correct.
        """
        data = self._get_user_providers_state(request)
        if duplicate:
            assert data["duplicate_provider"] == self.provider.backend_name
        else:
            assert data["duplicate_provider"] is None
        if linked is not None:
            # Use next() with a default rather than indexing [0]: a missing
            # provider then fails the assertion below instead of raising
            # IndexError before the assertion is ever reached.
            expected_provider = next(
                (entry for entry in data["auth"]["providers"] if entry["name"] == self.provider.name),
                None,
            )
            assert expected_provider is not None, (
                f"provider {self.provider.name!r} is missing from the third party auth state"
            )
            assert expected_provider["connected"] == linked

    def assert_register_form_populates_unicode_username_correctly(
        self, request
    ):  # lint-amnesty, pylint: disable=invalid-name
        """
        Check the registration form username field behaviour with unicode values.

        The field could be empty or prefilled depending on whether ENABLE_UNICODE_USERNAME feature is disabled/enabled.
        """
        unicode_username = "Червона_Калина"
        ascii_substring = "untouchable"
        partial_unicode_username = unicode_username + ascii_substring
        pipeline_kwargs = pipeline.get(request)["kwargs"]

        # With the feature off, the unicode part is expected to be dropped.
        assert settings.FEATURES["ENABLE_UNICODE_USERNAME"] is False
        self._check_registration_form_username(pipeline_kwargs, unicode_username, "")
        self._check_registration_form_username(pipeline_kwargs, partial_unicode_username, ascii_substring)

        # With the feature on, the unicode username is expected to be kept as is.
        with mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_UNICODE_USERNAME": True}):
            self._check_registration_form_username(pipeline_kwargs, unicode_username, unicode_username)

    def assert_exception_redirect_looks_correct(self, expected_uri, auth_entry=None):
        """Tests middleware conditional redirection.

        middleware.ExceptionMiddleware makes sure the user ends up in the right
        place when they cancel authentication via the provider's UX.
        """
        exception_middleware = middleware.ExceptionMiddleware(get_response=lambda request: None)
        request, _ = self.get_request_and_strategy(auth_entry=auth_entry)
        response = exception_middleware.process_exception(request, exceptions.AuthCanceled(request.backend))
        location = response.get("Location")
        assert 302 == response.status_code
        assert "canceled" in location
        assert self.backend_name in location
        assert location.startswith(expected_uri + "?")

    def assert_json_failure_response_is_inactive_account(self, response):
        """Asserts failure on /login for inactive account looks right."""
        assert 400 == response.status_code
        payload = json.loads(response.content.decode("utf-8"))
        context = {
            "platformName": configuration_helpers.get_value("PLATFORM_NAME", settings.PLATFORM_NAME),
            "supportLink": configuration_helpers.get_value("SUPPORT_SITE_LINK", settings.SUPPORT_SITE_LINK),
        }
        assert not payload.get("success")
        assert "inactive-user" in payload.get("error_code")
        assert context == payload.get("context")

    def assert_json_failure_response_is_missing_social_auth(self, response):
        """Asserts failure on /login for missing social auth looks right."""
        assert 403 == response.status_code
        payload = json.loads(response.content.decode("utf-8"))
        assert not payload.get("success")
        assert payload.get("error_code") == "third-party-auth-with-no-linked-account"

    def assert_json_failure_response_is_username_collision(self, response):
        """Asserts the json response indicates a username collision."""
        assert 409 == response.status_code
        payload = json.loads(response.content.decode("utf-8"))
        assert not payload.get("success")
        assert "It looks like this username is already taken" == payload["username"][0]["user_message"]

    def assert_json_success_response_looks_correct(self, response, verify_redirect_url):
        """Asserts the json response indicates success and redirection.

        The ``redirect_url`` in the payload is only compared against the
        provider's complete URL when ``verify_redirect_url`` is truthy.
        """
        assert 200 == response.status_code
        payload = json.loads(response.content.decode("utf-8"))
        assert payload.get("success")
        if verify_redirect_url:
            assert pipeline.get_complete_url(self.provider.backend_name) == payload.get("redirect_url")

    def assert_login_response_before_pipeline_looks_correct(self, response):
        """Asserts a GET of /login not in the pipeline looks correct."""
        # The combined login/registration page dynamically generates the login button,
        # but we can still check that the provider name is passed in the data attribute
        # for the container element.
        self.assertContains(response, self.provider.name)

    def assert_login_response_in_pipeline_looks_correct(self, response):
        """Asserts a GET of /login in the pipeline looks correct."""
        assert 200 == response.status_code

    def assert_password_overridden_by_pipeline(self, username, password):
        """Verifies that the given password is not correct.

        The pipeline overrides POST['password'], if any, with random data.
        """
        assert auth.authenticate(password=password, username=username) is None

    def assert_pipeline_running(self, request):
        """Makes sure the given request is running an auth pipeline."""
        assert pipeline.running(request)

    def assert_redirect_after_pipeline_completes(self, response, expected_redirect_url=None):
        """Asserts a response would redirect to the expected_redirect_url or SOCIAL_AUTH_LOGIN_REDIRECT_URL."""
        assert 302 == response.status_code
        # NOTE: Ideally we should use assertRedirects(), however it errors out due to the hostname, testserver,
        # not being properly set. This may be an issue with the call made by PSA, but we are not certain.
        assert response.get("Location").endswith(
            expected_redirect_url or django_settings.SOCIAL_AUTH_LOGIN_REDIRECT_URL
        )

    def assert_redirect_to_login_looks_correct(self, response):
        """Asserts a response would redirect to /login."""
        assert 302 == response.status_code
        assert "/login" == response.get("Location")

    def assert_redirect_to_register_looks_correct(self, response):
        """Asserts a response would redirect to /register."""
        assert 302 == response.status_code
        assert "/register" == response.get("Location")

    def assert_register_response_before_pipeline_looks_correct(self, response):
        """Asserts a GET of /register not in the pipeline looks correct."""
        # The combined login/registration page dynamically generates the register button,
        # but we can still check that the provider name is passed in the data attribute
        # for the container element.
        self.assertContains(response, self.provider.name)

    def assert_social_auth_does_not_exist_for_user(self, user, strategy):
        """Asserts a user does not have an auth with the expected provider."""
        social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name)
        assert 0 == len(social_auths)

    def assert_social_auth_exists_for_user(self, user, strategy):
        """Asserts a user has a social auth with the expected provider."""
        social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name)
        assert 1 == len(social_auths)
        assert self.backend_name == social_auths[0].provider

    def assert_logged_in_cookie_redirect(self, response):
        """Verify that the user was redirected in order to set the logged in cookie."""
        assert response.status_code == 302
        assert response["Location"] == pipeline.get_complete_url(self.provider.backend_name)
        assert response.cookies[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME].value == "true"
        assert django_settings.EDXMKTG_USER_INFO_COOKIE_NAME in response.cookies

    @property
    def backend_name(self):
        """Shortcut for the backend name"""
        return self.provider.backend_name

    def get_registration_post_vars(self, overrides=None):
        """POST vars generated by the registration form.

        Args:
            overrides (dict | None): entries that replace or extend the stock values.

        Returns:
            dict: the stock registration values with ``overrides`` applied.
        """
        defaults = {
            "username": "username",
            "name": "First Last",
            "gender": "",
            "year_of_birth": "",
            "level_of_education": "",
            "goals": "",
            "honor_code": "true",
            "terms_of_service": "true",
            "password": "password",
            "mailing_address": "",
            "email": "user@email.com",
        }
        if overrides:
            defaults.update(overrides)
        return defaults

    def get_request_and_strategy(self, auth_entry=None, redirect_uri=None):
        """Gets a fully-configured GET request and strategy.

        These two objects contain circular references, so we create them
        together. The references themselves are a mixture of normal __init__
        stuff and monkey-patching done by python-social-auth. See, for example,
        social_django.utils.strategy().

        Args:
            auth_entry: stored in the session under ``pipeline.AUTH_ENTRY_KEY`` when truthy.
            redirect_uri: passed through to ``social_utils.load_backend``.

        Returns:
            tuple: ``(request, strategy)``.
        """
        request = self.request_factory.get(
            pipeline.get_complete_url(self.backend_name)
            + "?redirect_state=redirect_state_value&code=code_value&state=state_value"
        )
        request.site = SiteFactory.create()
        request.user = auth_models.AnonymousUser()
        request.session = cache.SessionStore()
        # Same value as the ``state`` query parameter above.
        request.session[self.backend_name + "_state"] = "state_value"
        if auth_entry:
            request.session[pipeline.AUTH_ENTRY_KEY] = auth_entry
        strategy = social_utils.load_strategy(request=request)
        request.social_strategy = strategy
        request.backend = social_utils.load_backend(strategy, self.backend_name, redirect_uri)
        return request, strategy

    def _get_login_post_request(self, strategy):
        """Gets a fully-configured login POST request given a strategy and pipeline."""
        request = self.request_factory.post(reverse("login_api"))
        # Note: The shared GET request can't be used for login, which is now POST-only,
        # so this POST request is given a copy of all configuration from the GET request
        # with the active third-party auth pipeline and strategy.
        request.site = strategy.request.site
        request.social_strategy = strategy
        request.user = strategy.request.user
        request.session = strategy.request.session
        request.backend = strategy.request.backend
        return request

    @contextmanager
    def _patch_edxmako_current_request(self, request):
        """Make ``request`` be the current request for edxmako template rendering."""
        with mock.patch("common.djangoapps.edxmako.request_context.get_current_request", return_value=request):
            yield

    def get_user_by_email(self, strategy, email):
        """Gets a user by email, using the given strategy."""
        return strategy.storage.user.user_model().objects.get(email=email)

    def set_logged_in_cookies(self, request):
        """Simulate setting the marketing site cookie on the request."""
        request.COOKIES[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME] = "true"
        request.COOKIES[django_settings.EDXMKTG_USER_INFO_COOKIE_NAME] = json.dumps(
            {
                "version": django_settings.EDXMKTG_USER_INFO_COOKIE_VERSION,
            }
        )

    def create_user_models_for_existing_account(self, strategy, email, password, username, skip_social_auth=False):
        """Creates user, profile, registration, and (usually) social auth.

        This synthesizes what happens during /register.

        Args:
            strategy: strategy whose request backend computes the social uid.
            email, password, username: credentials for the new user.
            skip_social_auth (bool): when True, no social auth entry is created.

        Returns:
            The newly created user.
        """
        response_data = self.get_response_data()
        uid = strategy.request.backend.get_user_id(response_data, response_data)
        user = social_utils.Storage.user.create_user(email=email, password=password, username=username)
        profile = student_models.UserProfile(user=user)
        profile.save()
        registration = student_models.Registration()
        registration.register(user)
        registration.save()
        if not skip_social_auth:
            social_utils.Storage.user.create_social_auth(user, uid, self.provider.backend_name)
        return user

    def fake_auth_complete(self, strategy):
        """Fake implementation of social_core.backends.BaseAuth.auth_complete.

        Unlike what the docs say, it does not need to return a user instance.
        Sometimes (like when directing users to the /register form) it instead
        returns a response that 302s to /register.
        """
        kwargs = {
            "request": strategy.request,
            "backend": strategy.request.backend,
            "user": None,
            "response": self.get_response_data(),
        }
        return strategy.authenticate(**kwargs)
class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
    """
    Mixin base class for third_party_auth integration tests.

    Newer and simpler than the 'IntegrationTest' alternative in this module, but
    currently less comprehensive. Some providers are tested with this class,
    others with IntegrationTest.
    """

    # Provider information:
    PROVIDER_NAME = "override"
    PROVIDER_BACKEND = "override"
    PROVIDER_ID = "override"
    # Information about the user expected from the provider:
    USER_EMAIL = "override"
    USER_NAME = "override"
    USER_USERNAME = "override"

    def setUp(self):
        super().setUp()
        self.request_factory = test.RequestFactory()
        self.login_page_url = reverse("signin_user")
        self.register_page_url = reverse("register_user")
        mako_patcher = testutil.patch_mako_templates()
        mako_patcher.start()
        self.addCleanup(mako_patcher.stop)
        # Override this method in a subclass and enable at least one provider.

    def _test_register(self, **extra_defaults):
        """
        Walk through registering via the provider, starting from the register page.

        ``extra_defaults`` maps additional form field names to the default value
        each is expected to be prefilled with.
        """
        register_via_provider_url = self._check_register_page()

        # Clicking the provider's button must redirect to the provider's login page.
        redirect_to_provider = self.client.get(register_via_provider_url)
        assert redirect_to_provider.status_code == 302

        # The account is not linked to an edX account yet, so after the provider
        # login we are sent to the register screen.
        after_provider = self.do_provider_login(redirect_to_provider["Location"])
        assert after_provider.status_code == 302
        assert after_provider["Location"] == self.register_page_url

        register_page = self.client.get(self.register_page_url)
        tpa_context = register_page.context["data"]["third_party_auth"]
        assert tpa_context["errorMessage"] is None
        # The "You've successfully signed into [PROVIDER_NAME]" message is driven by this value.
        assert tpa_context["currentProvider"] == self.PROVIDER_NAME

        # Data (e.g. email) from the provider must be prefilled in the form.
        form_description = register_page.context["data"]["registration_form_desc"]
        fields_by_name = {}
        for field in form_description["fields"]:
            fields_by_name[field["name"]] = field
        assert fields_by_name["email"]["defaultValue"] == self.USER_EMAIL
        assert fields_by_name["name"]["defaultValue"] == self.USER_NAME
        assert fields_by_name["username"]["defaultValue"] == self.USER_USERNAME
        for extra_name, extra_value in extra_defaults.items():
            assert fields_by_name[extra_name]["defaultValue"] == extra_value

        # Complete the form with edited values.
        edited_email = "email-edited@tpa-test.none"
        registration_values = {
            "email": edited_email,
            "name": "My Customized Name",
            "username": "new_username",
            "honor_code": True,
        }
        registration_response = self.client.post(reverse("user_api_registration"), registration_values)
        assert registration_response.status_code == 200

        # The AJAX then finishes the third party auth, which lands on the dashboard.
        finish_response = self.client.get(tpa_context["finishAuthUrl"])
        assert finish_response.status_code == 302
        assert finish_response["Location"] == reverse("dashboard")

        # Logging in again must work both before and after the account is verified.
        self.client.logout()
        self._test_return_login(user_is_activated=False)
        self.client.logout()
        self.verify_user_email(edited_email)
        self._test_return_login(user_is_activated=True)

    def _test_login(self):
        """
        Walk through linking an existing user via the provider, starting from the login page.
        """
        self.user = UserFactory.create()
        login_via_provider_url = self._check_login_page()

        # Clicking the provider's button must redirect to the provider's login page.
        redirect_to_provider = self.client.get(login_via_provider_url)
        assert redirect_to_provider.status_code == 302

        # The account is not linked to an edX account yet, so after the provider
        # login we are sent to the login screen.
        after_provider = self.do_provider_login(redirect_to_provider["Location"])
        assert after_provider.status_code == 302
        assert after_provider["Location"] == self.login_page_url

        login_page = self.client.get(self.login_page_url)
        tpa_context = login_page.context["data"]["third_party_auth"]
        assert tpa_context["errorMessage"] is None
        # The "You've successfully signed into [PROVIDER_NAME]" message is driven by this value.
        assert tpa_context["currentProvider"] == self.PROVIDER_NAME

        # The user enters their credentials and the page's AJAX logs them in.
        session_login_url = reverse("user_api_login_session", kwargs={"api_version": "v1"})
        credentials = {"email": self.user.email, "password": "Password1234"}
        session_login_response = self.client.post(session_login_url, credentials)
        assert session_login_response.status_code == 200

        # The AJAX then finishes the third party auth, which lands on the dashboard.
        finish_response = self.client.get(tpa_context["finishAuthUrl"])
        assert finish_response.status_code == 302
        assert finish_response["Location"] == reverse("dashboard")

        # Logging in again must work.
        self.client.logout()
        self._test_return_login()

    def do_provider_login(self, provider_redirect_url):
        """
        Mock logging in to the provider.

        Implementations should end by loading self.complete_url and return that response.
        """
        raise NotImplementedError

    def _test_return_login(self, user_is_activated=True, previous_session_timed_out=False):
        """Test logging in to an account that is already linked."""
        dashboard_url = reverse("dashboard")

        # Not logged in yet: the dashboard redirects.
        anonymous_dashboard = self.client.get(dashboard_url)
        assert anonymous_dashboard.status_code == 302

        # The login page offers a button for this provider; clicking it redirects to the provider.
        login_via_provider_url = self._check_login_page()
        redirect_to_provider = self.client.get(login_via_provider_url)
        assert redirect_to_provider.status_code == 302
        login_response = self.do_provider_login(redirect_to_provider["Location"])

        # If the previous session was manually logged out, there will be one weird redirect
        # required to set the login cookie (it sticks around if the main session times out):
        if not previous_session_timed_out:
            assert login_response.status_code == 302
            assert login_response["Location"] == (self.complete_url + "?")
            # Following it should then redirect us onwards.
            login_response = self.client.get(login_response["Location"])
            assert login_response.status_code == 302

        # Activated users go to the dashboard; others via the inactive-redirect page.
        url_expected = (
            reverse("dashboard")
            if user_is_activated
            else reverse("third_party_inactive_redirect") + "?next=" + reverse("dashboard")
        )
        assert login_response["Location"] == url_expected

        # Now logged in: the dashboard renders.
        logged_in_dashboard = self.client.get(reverse("dashboard"))
        assert logged_in_dashboard.status_code == 200

    def _check_login_page(self):
        """
        Load the login form, check it offers the provider, and return the provider's login URL.
        """
        return self._check_login_or_register_page(self.login_page_url, "loginUrl")

    def _check_register_page(self):
        """
        Load the registration form, check it offers the provider, and return the provider's register URL.
        """
        return self._check_login_or_register_page(self.register_page_url, "registerUrl")

    def _check_login_or_register_page(self, url, url_to_return):
        """Shared logic for _check_login_page() and _check_register_page()"""
        page = self.client.get(url)
        self.assertContains(page, self.PROVIDER_NAME)
        tpa_data = page.context["data"]["third_party_auth"]
        # Index the requested URL kind by provider id.
        urls_by_provider_id = {}
        for provider_entry in tpa_data["providers"]:
            urls_by_provider_id[provider_entry["id"]] = provider_entry[url_to_return]
        assert self.PROVIDER_ID in urls_by_provider_id
        return urls_by_provider_id[self.PROVIDER_ID]

    @property
    def complete_url(self):
        """Get the auth completion URL for this provider"""
        url_kwargs = {"backend": self.PROVIDER_BACKEND}
        return reverse("social:complete", kwargs=url_kwargs)
@unittest.skipUnless(
testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + " not in settings.FEATURES"
)
@django_utils.override_settings() # For settings reversion on a method-by-method basis.
class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
"""Abstract base class for provider integration tests."""
def setUp(self):
    # Parent set-up runs first; afterwards each test gets a fresh RequestFactory.
    super().setUp()
    factory = test.RequestFactory()
    self.request_factory = factory
# Actual tests, executed once per child.
def test_canceling_authentication_redirects_to_login_when_auth_entry_login(self):
    # Cancelling while the auth entry is "login" must land on /login.
    expected_uri = "/login"
    self.assert_exception_redirect_looks_correct(expected_uri, auth_entry=pipeline.AUTH_ENTRY_LOGIN)
def test_canceling_authentication_redirects_to_register_when_auth_entry_register(self):
    # Cancelling while the auth entry is "register" must land on /register.
    expected_uri = "/register"
    self.assert_exception_redirect_looks_correct(expected_uri, auth_entry=pipeline.AUTH_ENTRY_REGISTER)
def test_canceling_authentication_redirects_to_account_settings_when_auth_entry_account_settings(self):
    # Cancelling while the auth entry is "account settings" must land on /account/settings.
    expected_uri = "/account/settings"
    self.assert_exception_redirect_looks_correct(expected_uri, auth_entry=pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS)
def test_canceling_authentication_redirects_to_root_when_auth_entry_not_set(self):
    # With no auth entry recorded, cancelling must land on the site root.
    expected_uri = "/"
    self.assert_exception_redirect_looks_correct(expected_uri)
@mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track")
def test_full_pipeline_succeeds_for_linking_account(self, _mock_segment_track):
    # Build the GET request and strategy that store pipeline state, configure
    # the backend, and mock out wire traffic.
    pipeline_request, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    faked_auth_result = self.fake_auth_complete(strategy)
    pipeline_request.backend.auth_complete = mock.MagicMock(return_value=faked_auth_result)
    # The existing account deliberately starts without a social auth entry.
    pipeline_request.user = self.create_user_models_for_existing_account(
        strategy, "user@example.com", "password", self.get_username(), skip_social_auth=True
    )
    partial_token = strategy.session_get("partial_pipeline_token")
    partial_data = strategy.storage.partial.load(partial_token)

    # Instrument the pipeline to get to the dashboard with the full
    # expected state.
    provider_login_url = pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)
    self.client.get(provider_login_url)
    actions.do_complete(
        pipeline_request.backend, social_views._do_login, request=pipeline_request  # pylint: disable=protected-access
    )
    login_request = self._get_login_post_request(strategy)
    login_user(login_request)
    actions.do_complete(
        login_request.backend,
        social_views._do_login,  # pylint: disable=protected-access, no-member
        request=login_request,
    )

    # At this point the account must still be unlinked, with no association
    # in the backend.
    self.assert_third_party_accounts_state(pipeline_request, linked=False)
    self.assert_social_auth_does_not_exist_for_user(pipeline_request.user, strategy)

    # The first completion redirects back to the complete page and sets the
    # "logged in" cookie for the marketing site.
    cookie_redirect = self.do_complete(strategy, pipeline_request, partial_token, partial_data)
    self.assert_logged_in_cookie_redirect(cookie_redirect)

    # Set the cookie and try again
    self.set_logged_in_cookies(pipeline_request)

    # Fire off the auth pipeline to link.
    linking_redirect = self.do_complete(strategy, pipeline_request, partial_token, partial_data)
    self.assert_redirect_after_pipeline_completes(linking_redirect)

    # Now the account must be linked, with a backend entry.
    self.assert_social_auth_exists_for_user(pipeline_request.user, strategy)
    self.assert_third_party_accounts_state(pipeline_request, linked=True)
def test_full_pipeline_succeeds_for_unlinking_account(self):
    # Build the GET request and strategy that store pipeline state, configure
    # the backend, and mock out wire traffic.
    pipeline_request, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    faked_auth_result = self.fake_auth_complete(strategy)
    pipeline_request.backend.auth_complete = mock.MagicMock(return_value=faked_auth_result)
    user = self.create_user_models_for_existing_account(
        strategy, "user@example.com", "password", self.get_username()
    )
    self.assert_social_auth_exists_for_user(user, strategy)

    # We're already logged in, so simulate that the cookie is set correctly
    self.set_logged_in_cookies(pipeline_request)

    # Instrument the pipeline to get to the dashboard with the full
    # expected state.
    provider_login_url = pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)
    self.client.get(provider_login_url)
    actions.do_complete(
        pipeline_request.backend, social_views._do_login, request=pipeline_request  # pylint: disable=protected-access
    )
    login_request = self._get_login_post_request(strategy)
    with self._patch_edxmako_current_request(login_request):
        login_user(login_request)
        actions.do_complete(
            login_request.backend,
            social_views._do_login,  # pylint: disable=protected-access
            user=user,  # pylint: disable=no-member
            request=login_request,
        )

    # Copy the user that was set on the POST request back onto the original GET request.
    pipeline_request.user = login_request.user

    # Before disconnecting, the account must be linked, with a backend entry.
    self.assert_third_party_accounts_state(pipeline_request, linked=True)
    self.assert_social_auth_exists_for_user(pipeline_request.user, strategy)

    # Fire off the disconnect pipeline to unlink.
    disconnect_response = actions.do_disconnect(
        pipeline_request.backend, pipeline_request.user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME
    )
    self.assert_redirect_after_pipeline_completes(disconnect_response)

    # Afterwards the account must be unlinked, with no backend entry.
    self.assert_third_party_accounts_state(pipeline_request, linked=False)
    self.assert_social_auth_does_not_exist_for_user(user, strategy)
def test_linking_already_associated_account_raises_auth_already_associated(self):
    # Companion to
    # test_already_associated_exception_populates_dashboard_with_error: this
    # test verifies the exception is raised when expected, while that one
    # covers how the exception is handled.
    email, password = "user@example.com", "password"
    username = self.get_username()
    _, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    backend = strategy.request.backend
    faked_auth_result = self.fake_auth_complete(strategy)
    backend.auth_complete = mock.MagicMock(return_value=faked_auth_result)

    # One user already owns the social auth; a second user has none.
    linked_user = self.create_user_models_for_existing_account(strategy, email, password, username)
    unlinked_user = social_utils.Storage.user.create_user(
        email="other_" + email, password=password, username="other_" + username
    )
    self.assert_social_auth_exists_for_user(linked_user, strategy)
    self.assert_social_auth_does_not_exist_for_user(unlinked_user, strategy)

    # Completing the pipeline on behalf of the second user must be rejected.
    with pytest.raises(exceptions.AuthAlreadyAssociated):
        # pylint: disable=protected-access
        actions.do_complete(backend, social_views._do_login, user=unlinked_user, request=strategy.request)
def test_already_associated_exception_populates_dashboard_with_error(self):
# Instrument the pipeline with an exception. We test that the
# exception is raised correctly separately, so it's ok that we're
# raising it artificially here. This makes the linked=True artificial
# in the final assert because in practice the account would be
# unlinked, but getting that behavior is cumbersome here and already
# covered in other tests. Using linked=True does, however, let us test
# that the duplicate error has no effect on the state of the controls.
get_request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
)
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# Existing account; the assertion below confirms it has a social auth entry.
user = self.create_user_models_for_existing_account(
strategy, "user@example.com", "password", self.get_username()
)
self.assert_social_auth_exists_for_user(user, strategy)
# Drive the login flow (login page, provider login URL, pipeline
# completion for the GET request). The responses are not inspected here.
self.client.get("/login")
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
actions.do_complete(
get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access
)
# Submit the login POST and complete the pipeline again, this time for
# the existing user.
post_request = self._get_login_post_request(strategy)
with self._patch_edxmako_current_request(post_request):
login_user(post_request)
actions.do_complete(
post_request.backend,
social_views._do_login, # pylint: disable=protected-access, no-member
user=user,
request=post_request,
)
# Monkey-patch storage for messaging; pylint: disable=protected-access
post_request._messages = fallback.FallbackStorage(post_request)
# Hand an AuthAlreadyAssociated instance directly to the exception
# middleware rather than provoking it through the pipeline.
middleware.ExceptionMiddleware(get_response=lambda request: None).process_exception(
post_request, exceptions.AuthAlreadyAssociated(self.provider.backend_name, "account is already in use.")
)
# The duplicate error is expected to be reported while the account is
# still shown as linked.
self.assert_third_party_accounts_state(post_request, duplicate=True, linked=True)
# segment.track is patched for the duration of this test; the mock that is
# passed in (_mock_segment_track) is not inspected.
@mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track")
def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, _mock_segment_track):
# First, create the GET request and strategy that store pipeline state,
# configure the backend, and mock out wire traffic.
get_request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
)
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
user = self.create_user_models_for_existing_account(
strategy, "user@example.com", "password", self.get_username()
)
# Capture the partial pipeline token and its stored data now; both are
# handed to self.do_complete() at the end of the test.
partial_pipeline_token = strategy.session_get("partial_pipeline_token")
partial_data = strategy.storage.partial.load(partial_pipeline_token)
self.assert_social_auth_exists_for_user(user, strategy)
assert user.is_active
# Begin! Ensure that the login form contains expected controls before
# the user starts the pipeline.
self.assert_login_response_before_pipeline_looks_correct(self.client.get("/login"))
# The pipeline starts by a user GETting /auth/login/<provider>.
# Synthesize that request and check that it redirects to the correct
# provider page.
self.assert_redirect_to_provider_looks_correct(
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
)
# Next, the provider makes a request against /auth/complete/<provider>
# to resume the pipeline.
# pylint: disable=protected-access
self.assert_redirect_to_login_looks_correct(
actions.do_complete(get_request.backend, social_views._do_login, request=get_request)
)
# At this point we know the pipeline has resumed correctly. Next we
# fire off the view that displays the login form and posts it via JS.
with self._patch_edxmako_current_request(strategy.request):
self.assert_login_response_in_pipeline_looks_correct(login_and_registration_form(strategy.request))
# Next, we invoke the view that handles the POST, and expect it
# redirects to /auth/complete. In the browser ajax handlers will
# redirect the user to the dashboard; we invoke it manually here.
post_request = self._get_login_post_request(strategy)
self.assert_json_success_response_looks_correct(login_user(post_request), verify_redirect_url=True)
# We should be redirected back to the complete page, setting
# the "logged in" cookie for the marketing site.
self.assert_logged_in_cookie_redirect(
actions.do_complete(
post_request.backend,
social_views._do_login,
post_request.user,
None, # pylint: disable=protected-access, no-member
redirect_field_name=auth.REDIRECT_FIELD_NAME,
request=post_request,
)
)
# Set the cookie and try again
self.set_logged_in_cookies(get_request)
# Copy the user that was set on the post_request object back to the original get_request object.
get_request.user = post_request.user
# Complete the pipeline once more through the do_complete() helper, passing
# the partial token and data captured at the start of the test.
self.assert_redirect_after_pipeline_completes(
self.do_complete(strategy, get_request, partial_pipeline_token, partial_data, user)
)
self.assert_third_party_accounts_state(get_request)
def test_signin_fails_if_account_not_active(self):
    # An account that has been marked inactive must be rejected by the
    # login view with the "inactive account" JSON failure.
    _, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    fake_completion = self.fake_auth_complete(strategy)
    strategy.request.backend.auth_complete = mock.MagicMock(return_value=fake_completion)

    account = self.create_user_models_for_existing_account(
        strategy, "user@example.com", "password", self.get_username()
    )
    # Deactivate the account and persist the change before logging in.
    account.is_active = False
    account.save()

    login_post = self._get_login_post_request(strategy)
    with self._patch_edxmako_current_request(login_post):
        login_response = login_user(login_post)
        self.assert_json_failure_response_is_inactive_account(login_response)
def test_signin_fails_if_no_account_associated(self):
    # The account is created with skip_social_auth=True, and the login view
    # is expected to answer with the "missing social auth" JSON failure.
    _, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    fake_completion = self.fake_auth_complete(strategy)
    strategy.request.backend.auth_complete = mock.MagicMock(return_value=fake_completion)

    self.create_user_models_for_existing_account(
        strategy,
        "user@example.com",
        "password",
        self.get_username(),
        skip_social_auth=True,
    )

    login_post = self._get_login_post_request(strategy)
    login_response = login_user(login_post)
    self.assert_json_failure_response_is_missing_social_auth(login_response)
def test_signin_associates_user_if_oauth_provider_and_tpa_is_required(self):
# The account is created without a social auth entry
# (skip_social_auth=True). get_associated_user_by_email_response is
# patched to return that account, and the login POST is then expected to
# produce a JSON success response.
username, email, password = self.get_username(), "user@example.com", "password"
_, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
)
user = self.create_user_models_for_existing_account(strategy, email, password, username, skip_social_auth=True)
# The patched function returns a two-item list: a dict carrying the user,
# and True.
with mock.patch(
"common.djangoapps.third_party_auth.pipeline.get_associated_user_by_email_response",
return_value=[{"user": user}, True],
):
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
post_request = self._get_login_post_request(strategy)
self.assert_json_success_response_looks_correct(login_user(post_request), verify_redirect_url=True)
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_email_in_request(self):
    # Only the email is supplied; success keeps its default of None, which
    # the shared helper treats as "request expected to be invalid".
    run_check = self.assert_first_party_auth_trumps_third_party_auth
    run_check(email="user@example.com")
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_password_in_request(self):
    # Only the password is supplied; success keeps its default of None,
    # which the shared helper treats as "request expected to be invalid".
    run_check = self.assert_first_party_auth_trumps_third_party_auth
    run_check(password="password")
def test_first_party_auth_trumps_third_party_auth_and_fails_when_credentials_bad(self):
    # Both fields are supplied and success=False asks the shared helper to
    # expect a rejected login.
    credentials = {"email": "user@example.com", "password": "password"}
    self.assert_first_party_auth_trumps_third_party_auth(success=False, **credentials)
def test_first_party_auth_trumps_third_party_auth_and_succeeds_when_credentials_good(self):
    # Both fields are supplied and success=True asks the shared helper to
    # expect a successful login.
    credentials = {"email": "user@example.com", "password": "password"}
    self.assert_first_party_auth_trumps_third_party_auth(success=True, **credentials)
def test_pipeline_redirects_to_requested_url(self):
    # A redirect target stored in the session under AUTH_REDIRECT_KEY should
    # be where the completed pipeline sends the user.
    target_url = "foo"  # something different from '/dashboard'
    request, strategy = self.get_request_and_strategy(redirect_uri="social:complete")
    fake_completion = self.fake_auth_complete(strategy)
    strategy.request.backend.auth_complete = mock.MagicMock(return_value=fake_completion)
    request.session[pipeline.AUTH_REDIRECT_KEY] = target_url

    account = self.create_user_models_for_existing_account(
        strategy, "user@foo.com", "password", self.get_username()
    )
    self.set_logged_in_cookies(request)

    # pylint: disable=protected-access
    completion_response = actions.do_complete(
        request.backend, social_views._do_login, user=account, request=request
    )
    self.assert_redirect_after_pipeline_completes(completion_response, target_url)
def test_full_pipeline_succeeds_registering_new_account(self):
# First, create the request and strategy that store pipeline state.
# Mock out wire traffic.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
)
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# Capture the partial pipeline token and its stored data now; both are
# handed to self.do_complete() further down.
partial_pipeline_token = strategy.session_get("partial_pipeline_token")
partial_data = strategy.storage.partial.load(partial_pipeline_token)
# Begin! Grab the registration page and check the login control on it.
self.assert_register_response_before_pipeline_looks_correct(self.client.get("/register"))
# The pipeline starts by a user GETting /auth/login/<provider>.
# Synthesize that request and check that it redirects to the correct
# provider page.
# NOTE(review): this URL is built with AUTH_ENTRY_LOGIN although the
# strategy above was created with AUTH_ENTRY_REGISTER -- confirm that
# this is intentional.
self.assert_redirect_to_provider_looks_correct(
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
)
# Next, the provider makes a request against /auth/complete/<provider>.
# pylint: disable=protected-access
self.assert_redirect_to_register_looks_correct(
actions.do_complete(request.backend, social_views._do_login, request=request)
)
# At this point we know the pipeline has resumed correctly. Next we
# fire off the view that displays the registration form.
with self._patch_edxmako_current_request(request):
self.assert_register_form_populates_unicode_username_correctly(request)
self.assert_register_response_in_pipeline_looks_correct(
login_and_registration_form(strategy.request, initial_mode="register"),
pipeline.get(request)["kwargs"],
["name", "username", "email"],
)
# Next, we invoke the view that handles the POST. Not all providers
# supply email. Manually add it as the user would have to; this
# also serves as a test of overriding provider values. Always provide a
# password for us to check that we override it properly.
overridden_password = strategy.request.POST.get("password")
email = "new@example.com"
# Only replace the POST data when it does not already carry an email.
if not strategy.request.POST.get("email"):
strategy.request.POST = self.get_registration_post_vars({"email": email})
# The user must not exist yet...
with pytest.raises(auth_models.User.DoesNotExist):
self.get_user_by_email(strategy, email)
# ...but when we invoke create_account the existing edX view will make
# it, but not social auths. The pipeline creates those later.
with self._patch_edxmako_current_request(strategy.request):
self.assert_json_success_response_looks_correct(create_account(strategy.request), verify_redirect_url=False)
# We've overridden the user's password, so authenticate() with the old
# value won't work:
created_user = self.get_user_by_email(strategy, email)
self.assert_password_overridden_by_pipeline(overridden_password, created_user.username)
# At this point the user object exists, but there is no associated
# social auth.
self.assert_social_auth_does_not_exist_for_user(created_user, strategy)
# We should be redirected back to the complete page, setting
# the "logged in" cookie for the marketing site.
self.assert_logged_in_cookie_redirect(self.do_complete(strategy, request, partial_pipeline_token, partial_data))
# Set the cookie and try again
self.set_logged_in_cookies(request)
self.assert_redirect_after_pipeline_completes(
self.do_complete(strategy, request, partial_pipeline_token, partial_data, created_user)
)
# Their third party account should now be linked.
self.assert_social_auth_exists_for_user(created_user, strategy)
self.assert_third_party_accounts_state(request, linked=True)
def test_new_account_registration_assigns_distinct_username_on_collision(self):
    # Remember the username the provider would initially suggest so the
    # final assertion can prove a different one was assigned.
    provider_username = self.get_username()
    request, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
    )

    # Create a colliding username in the backend, then proceed with
    # assignment via pipeline to make sure a distinct username is created.
    strategy.storage.user.create_user(
        username=self.get_username(),
        email="user@email.com",
        password="password",
    )
    provider_backend = strategy.request.backend
    provider_backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))

    # pylint: disable=protected-access
    completion = actions.do_complete(provider_backend, social_views._do_login, request=request)
    assert completion.status_code == 302

    # The account-creation view answers with JSON that carries the username.
    raw_body = create_account(strategy.request).content
    account_payload = json.loads(raw_body.decode("utf-8"))
    assert account_payload["username"] != provider_username
def test_new_account_registration_fails_if_email_exists(self):
# Runs the registration flow and then submits the same registration POST
# twice; the second submission is expected to fail.
# NOTE(review): the test name mentions an existing email, while the final
# assertion checks for a username collision -- confirm which one is
# intended.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
)
backend = strategy.request.backend
backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# pylint: disable=protected-access
self.assert_redirect_to_register_looks_correct(
actions.do_complete(backend, social_views._do_login, request=request)
)
# Check the registration form rendered while the pipeline is running.
with self._patch_edxmako_current_request(request):
self.assert_register_response_in_pipeline_looks_correct(
login_and_registration_form(strategy.request, initial_mode="register"),
pipeline.get(request)["kwargs"],
["name", "username", "email"],
)
with self._patch_edxmako_current_request(strategy.request):
strategy.request.POST = self.get_registration_post_vars()
# Create twice: once successfully, and once causing a collision.
create_account(strategy.request)
self.assert_json_failure_response_is_username_collision(create_account(strategy.request))
def test_pipeline_raises_auth_entry_error_if_auth_entry_invalid(self):
    bogus_entry = "invalid"
    # Guard: make sure the value really is outside the accepted choices.
    assert bogus_entry not in pipeline._AUTH_ENTRY_CHOICES  # pylint: disable=protected-access
    _, strategy = self.get_request_and_strategy(auth_entry=bogus_entry, redirect_uri="social:complete")

    # AuthEntryError is expected while the fake completion is being built
    # (presumably raised from fake_auth_complete -- confirm).
    with pytest.raises(pipeline.AuthEntryError):
        fake_completion = self.fake_auth_complete(strategy)
        strategy.request.backend.auth_complete = mock.MagicMock(return_value=fake_completion)
def test_pipeline_assumes_login_if_auth_entry_missing(self):
    # With auth_entry=None the fake completion should point at the
    # "signin_user" URL.
    _, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri="social:complete")
    completion = self.fake_auth_complete(strategy)
    signin_url = reverse("signin_user")
    assert completion.url == signin_url
def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=None, success=None):
    """Check that first-party credentials win over a running third-party pipeline.

    An account without a social auth entry is created, a login POST is
    built while the pipeline is running, and the JSON payload returned by
    login_user is inspected.

    Args:
        email: string or None. Added to the POST data when truthy.
        password: string or None. Added to the POST data when truthy. When
            success is False it is prefixed with "bad_" before being
            submitted.
        success: None or bool. True means the login must succeed; False
            means it must be rejected as incorrect. None means the request
            itself is expected to be invalid (one of email or password is
            missing).
    """
    _, strategy = self.get_request_and_strategy(
        auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
    )
    fake_completion = self.fake_auth_complete(strategy)
    strategy.request.backend.auth_complete = mock.MagicMock(return_value=fake_completion)
    self.create_user_models_for_existing_account(
        strategy, email, password, self.get_username(), skip_social_auth=True
    )

    post_request = self._get_login_post_request(strategy)
    # Replace the POST data with a plain dict copy before assigning fields.
    form_data = dict(post_request.POST)
    post_request.POST = form_data
    if email:
        form_data["email"] = email
    if password:
        if success is False:
            form_data["password"] = "bad_" + password
        else:
            form_data["password"] = password

    self.assert_pipeline_running(post_request)
    raw_body = login_user(post_request).content
    payload = json.loads(raw_body.decode("utf-8"))

    if success:
        # Request well-formed and credentials good.
        assert payload.get("success")
        return

    assert not payload.get("success")
    if success is None:
        # Request malformed -- just one of email/password given.
        expected_fragment = "There was an error receiving your login information"
    else:
        # Request well-formed but credentials bad.
        expected_fragment = "incorrect"
    assert expected_fragment in payload.get("value")
def get_response_data(self):
    """Return a dict of response data shaped the way the provider returns it.

    Subclasses must override this. To find out what a provider actually
    returns, drop into a debugger inside that provider's do_auth
    implementation. Providers may merge different kinds of data, for
    example data about the user and data about the user's credentials.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def get_username(self):
    """Return the username derived from the provider's response data.

    Subclasses must override this. Every provider generates usernames with
    its own logic, and python-social-auth does not expose that logic as a
    separate method, so a getter has to be supplied here.

    The value returned is only the *initial* username the framework will
    try. If it collides, the pipeline generates a new one. Exposing it here
    lets tests force collisions in a polymorphic way.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def do_complete(self, strategy, request, partial_pipeline_token, partial_data, user=None):
    """
    Store the partial data object in the strategy's storage, then call
    actions.do_complete and return its result.

    When `user` is falsy, request.user is used instead.
    """
    # The partial data must be in the strategy store before completing.
    strategy.storage.partial.store(partial_data)
    acting_user = user or request.user
    completion_kwargs = {
        "redirect_field_name": auth.REDIRECT_FIELD_NAME,
        "request": request,
        "partial_token": partial_pipeline_token,
    }
    # pylint: disable=protected-access
    return actions.do_complete(
        request.backend,
        social_views._do_login,
        acting_user,
        None,
        **completion_kwargs,
    )
# pylint: disable=abstract-method
@django_utils.override_settings(ECOMMERCE_API_URL=TEST_API_URL)
class Oauth2IntegrationTest(IntegrationTest):  # lint-amnesty, pylint: disable=test-inherits-tests
    """Shared base class for integration tests that exercise Oauth2 providers."""

    # Dict of string -> object describing the token granted to the user.
    # Subclasses supply test values; leaving it as None makes
    # get_response_data() throw.
    TOKEN_RESPONSE_DATA = None

    # Dict of string -> object describing the user themself. Subclasses
    # supply test values; leaving it as None makes get_response_data() throw.
    USER_RESPONSE_DATA = None

    def get_response_data(self):
        """Return a new dict (string -> object) merging token and user data.

        Entries from USER_RESPONSE_DATA overwrite entries from
        TOKEN_RESPONSE_DATA that share a key.
        """
        merged = {}
        merged.update(self.TOKEN_RESPONSE_DATA)
        merged.update(self.USER_RESPONSE_DATA)
        return merged