feat!: Legacy account, profile, order history removal (#36219)
* feat!: Legacy account, profile, order history removal This removes the legacy account and profile applications, and the order history page. This is primarily a reapplication of #31893, which was rolled back due to prior blockers. FIXES: APER-3884 FIXES: openedx/public-engineering#71 Co-authored-by: Muhammad Abdullah Waheed <42172960+abdullahwaheed@users.noreply.github.com> Co-authored-by: Bilal Qamar <59555732+BilalQamar95@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
"""
|
||||
Test that various filters are fired for models/views in the student app.
|
||||
"""
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from django.test import override_settings
|
||||
from django.urls import reverse
|
||||
@@ -421,7 +422,7 @@ class StudentDashboardFiltersTest(ModuleStoreTestCase):
|
||||
response = self.client.get(self.dashboard_url)
|
||||
|
||||
self.assertEqual(status.HTTP_302_FOUND, response.status_code)
|
||||
self.assertEqual(reverse("account_settings"), response.url)
|
||||
self.assertEqual(settings.ACCOUNT_MICROFRONTEND_URL, response.url)
|
||||
|
||||
@override_settings(
|
||||
OPEN_EDX_FILTERS_CONFIG={
|
||||
|
||||
@@ -233,7 +233,7 @@ class StudentDashboardTests(SharedModuleStoreTestCase, MilestonesTestCaseMixin,
|
||||
"""
|
||||
UserProfile.objects.get(user=self.user).delete()
|
||||
response = self.client.get(self.path)
|
||||
self.assertRedirects(response, reverse('account_settings'))
|
||||
self.assertRedirects(response, settings.ACCOUNT_MICROFRONTEND_URL, target_status_code=302)
|
||||
|
||||
@patch('common.djangoapps.student.views.dashboard.learner_home_mfe_enabled')
|
||||
def test_redirect_to_learner_home(self, mock_learner_home_mfe_enabled):
|
||||
|
||||
@@ -518,7 +518,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem
|
||||
"""
|
||||
user = request.user
|
||||
if not UserProfile.objects.filter(user=user).exists():
|
||||
return redirect(reverse('account_settings'))
|
||||
return redirect(settings.ACCOUNT_MICROFRONTEND_URL)
|
||||
|
||||
if learner_home_mfe_enabled():
|
||||
return redirect(settings.LEARNER_HOME_MICROFRONTEND_URL)
|
||||
@@ -623,7 +623,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem
|
||||
"Go to {link_start}your Account Settings{link_end}.")
|
||||
).format(
|
||||
link_start=HTML("<a href='{account_setting_page}'>").format(
|
||||
account_setting_page=reverse('account_settings'),
|
||||
account_setting_page=settings.ACCOUNT_MICROFRONTEND_URL,
|
||||
),
|
||||
link_end=HTML("</a>")
|
||||
)
|
||||
@@ -892,7 +892,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem
|
||||
except DashboardRenderStarted.RenderInvalidDashboard as exc:
|
||||
response = render_to_response(exc.dashboard_template, exc.template_context)
|
||||
except DashboardRenderStarted.RedirectToPage as exc:
|
||||
response = HttpResponseRedirect(exc.redirect_to or reverse('account_settings'))
|
||||
response = HttpResponseRedirect(exc.redirect_to or settings.ACCOUNT_MICROFRONTEND_URL)
|
||||
except DashboardRenderStarted.RenderCustomResponse as exc:
|
||||
response = exc.response
|
||||
else:
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
Tests for the Third Party Auth REST API
|
||||
"""
|
||||
|
||||
import urllib
|
||||
from unittest.mock import patch
|
||||
|
||||
import ddt
|
||||
import six
|
||||
from django.conf import settings
|
||||
from django.http import QueryDict
|
||||
from django.test.utils import override_settings
|
||||
from django.urls import reverse
|
||||
@@ -219,7 +220,7 @@ class UserViewV2APITests(UserViewsMixin, TpaAPITestCase):
|
||||
"""
|
||||
return '?'.join([
|
||||
reverse('third_party_auth_users_api_v2'),
|
||||
six.moves.urllib.parse.urlencode(identifier)
|
||||
urllib.parse.urlencode(identifier)
|
||||
])
|
||||
|
||||
|
||||
@@ -377,11 +378,12 @@ class TestThirdPartyAuthUserStatusView(ThirdPartyAuthTestMixin, APITestCase):
|
||||
"""
|
||||
self.client.login(username=self.user.username, password=PASSWORD)
|
||||
response = self.client.get(self.url, content_type="application/json")
|
||||
next_url = urllib.parse.quote(settings.ACCOUNT_MICROFRONTEND_URL, safe="")
|
||||
assert response.status_code == 200
|
||||
assert (response.data ==
|
||||
[{
|
||||
'accepts_logins': True, 'name': 'Google',
|
||||
'disconnect_url': '/auth/disconnect/google-oauth2/?',
|
||||
'connect_url': '/auth/login/google-oauth2/?auth_entry=account_settings&next=%2Faccount%2Fsettings',
|
||||
'connect_url': f'/auth/login/google-oauth2/?auth_entry=account_settings&next={next_url}',
|
||||
'connected': False, 'id': 'oa2-google-oauth2'
|
||||
}])
|
||||
|
||||
@@ -9,7 +9,6 @@ from django.conf import settings
|
||||
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
|
||||
from django.db.models import Q
|
||||
from django.http import Http404
|
||||
from django.urls import reverse
|
||||
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
|
||||
from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser
|
||||
from rest_framework import exceptions, permissions, status, throttling
|
||||
@@ -425,7 +424,7 @@ class ThirdPartyAuthUserStatusView(APIView):
|
||||
state.provider.provider_id,
|
||||
pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS,
|
||||
# The url the user should be directed to after the auth process has completed.
|
||||
redirect_url=reverse('account_settings'),
|
||||
redirect_url=settings.ACCOUNT_MICROFRONTEND_URL,
|
||||
),
|
||||
'accepts_logins': state.provider.accepts_logins,
|
||||
# If the user is connected, sending a POST request to this url removes the connection
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
Base integration test for provider implementations.
|
||||
"""
|
||||
|
||||
|
||||
import json
|
||||
import unittest
|
||||
from contextlib import contextmanager
|
||||
@@ -11,7 +10,7 @@ from unittest import mock
|
||||
import pytest
|
||||
from django import test
|
||||
from django.conf import settings
|
||||
from django.contrib import auth
|
||||
from django.contrib import auth, messages
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.contrib.messages.storage import fallback
|
||||
from django.contrib.sessions.backends import cache
|
||||
@@ -28,7 +27,6 @@ from openedx.core.djangoapps.user_authn.views.login_form import login_and_regist
|
||||
from openedx.core.djangoapps.user_authn.views.register import RegistrationView
|
||||
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
|
||||
from openedx.core.djangoapps.site_configuration.tests.factories import SiteFactory
|
||||
from openedx.core.djangoapps.user_api.accounts.settings_views import account_settings_context
|
||||
from common.djangoapps.student import models as student_models
|
||||
from common.djangoapps.student.tests.factories import UserFactory
|
||||
|
||||
@@ -56,9 +54,9 @@ class HelperMixin:
|
||||
test_username (str): username to check the form initialization with.
|
||||
expected (str): expected cleaned username after the form initialization.
|
||||
"""
|
||||
form_data['username'] = test_username
|
||||
form_data["username"] = test_username
|
||||
form_field_data = self.provider.get_register_form_data(form_data)
|
||||
assert form_field_data['username'] == expected
|
||||
assert form_field_data["username"] == expected
|
||||
|
||||
def assert_redirect_to_provider_looks_correct(self, response):
|
||||
"""Asserts the redirect to the provider's site looks correct.
|
||||
@@ -70,9 +68,11 @@ class HelperMixin:
|
||||
example, more details about the format of the Location header.
|
||||
"""
|
||||
assert 302 == response.status_code
|
||||
assert response.has_header('Location')
|
||||
assert response.has_header("Location")
|
||||
|
||||
def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_kwargs, required_fields): # lint-amnesty, pylint: disable=invalid-name
|
||||
def assert_register_response_in_pipeline_looks_correct(
|
||||
self, response, pipeline_kwargs, required_fields
|
||||
): # lint-amnesty, pylint: disable=invalid-name
|
||||
"""Performs spot checks of the rendered register.html page.
|
||||
|
||||
When we display the new account registration form after the user signs
|
||||
@@ -84,10 +84,7 @@ class HelperMixin:
|
||||
assertions in your test, override this method.
|
||||
"""
|
||||
# Check that the correct provider was selected.
|
||||
self.assertContains(
|
||||
response,
|
||||
'"errorMessage": null'
|
||||
)
|
||||
self.assertContains(response, '"errorMessage": null')
|
||||
self.assertContains(
|
||||
response,
|
||||
f'"currentProvider": "{self.provider.name}"',
|
||||
@@ -99,46 +96,68 @@ class HelperMixin:
|
||||
if prepopulated_form_data in required_fields:
|
||||
self.assertContains(response, form_field_data[prepopulated_form_data])
|
||||
|
||||
def assert_register_form_populates_unicode_username_correctly(self, request): # lint-amnesty, pylint: disable=invalid-name
|
||||
def _get_user_providers_state(self, request):
|
||||
"""
|
||||
Return provider user states and duplicated providers.
|
||||
"""
|
||||
data = {
|
||||
"auth": {},
|
||||
}
|
||||
data["duplicate_provider"] = pipeline.get_duplicate_provider(messages.get_messages(request))
|
||||
auth_states = pipeline.get_provider_user_states(request.user)
|
||||
data["auth"]["providers"] = [
|
||||
{
|
||||
"name": state.provider.name,
|
||||
"connected": state.has_account,
|
||||
}
|
||||
for state in auth_states
|
||||
if state.provider.display_for_login or state.has_account
|
||||
]
|
||||
return data
|
||||
|
||||
def assert_third_party_accounts_state(self, request, duplicate=False, linked=None):
|
||||
"""
|
||||
Asserts the user's third party account in the expected state.
|
||||
|
||||
If duplicate is True, we expect data['duplicate_provider'] to contain
|
||||
the duplicate provider backend name. If linked is passed, we conditionally
|
||||
check that the provider is included in data['auth']['providers'] and
|
||||
its connected state is correct.
|
||||
"""
|
||||
data = self._get_user_providers_state(request)
|
||||
if duplicate:
|
||||
assert data["duplicate_provider"] == self.provider.backend_name
|
||||
else:
|
||||
assert data["duplicate_provider"] is None
|
||||
|
||||
if linked is not None:
|
||||
expected_provider = [
|
||||
provider for provider in data["auth"]["providers"] if provider["name"] == self.provider.name
|
||||
][0]
|
||||
assert expected_provider is not None
|
||||
assert expected_provider["connected"] == linked
|
||||
|
||||
def assert_register_form_populates_unicode_username_correctly(
|
||||
self, request
|
||||
): # lint-amnesty, pylint: disable=invalid-name
|
||||
"""
|
||||
Check the registration form username field behaviour with unicode values.
|
||||
|
||||
The field could be empty or prefilled depending on whether ENABLE_UNICODE_USERNAME feature is disabled/enabled.
|
||||
"""
|
||||
unicode_username = 'Червона_Калина'
|
||||
ascii_substring = 'untouchable'
|
||||
unicode_username = "Червона_Калина"
|
||||
ascii_substring = "untouchable"
|
||||
partial_unicode_username = unicode_username + ascii_substring
|
||||
pipeline_kwargs = pipeline.get(request)['kwargs']
|
||||
pipeline_kwargs = pipeline.get(request)["kwargs"]
|
||||
|
||||
assert settings.FEATURES['ENABLE_UNICODE_USERNAME'] is False
|
||||
assert settings.FEATURES["ENABLE_UNICODE_USERNAME"] is False
|
||||
|
||||
self._check_registration_form_username(pipeline_kwargs, unicode_username, '')
|
||||
self._check_registration_form_username(pipeline_kwargs, unicode_username, "")
|
||||
self._check_registration_form_username(pipeline_kwargs, partial_unicode_username, ascii_substring)
|
||||
|
||||
with mock.patch.dict('django.conf.settings.FEATURES', {'ENABLE_UNICODE_USERNAME': True}):
|
||||
with mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_UNICODE_USERNAME": True}):
|
||||
self._check_registration_form_username(pipeline_kwargs, unicode_username, unicode_username)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def assert_account_settings_context_looks_correct(self, context, duplicate=False, linked=None):
|
||||
"""Asserts the user's account settings page context is in the expected state.
|
||||
|
||||
If duplicate is True, we expect context['duplicate_provider'] to contain
|
||||
the duplicate provider backend name. If linked is passed, we conditionally
|
||||
check that the provider is included in context['auth']['providers'] and
|
||||
its connected state is correct.
|
||||
"""
|
||||
if duplicate:
|
||||
assert context['duplicate_provider'] == self.provider.backend_name
|
||||
else:
|
||||
assert context['duplicate_provider'] is None
|
||||
|
||||
if linked is not None:
|
||||
expected_provider = [
|
||||
provider for provider in context['auth']['providers'] if provider['name'] == self.provider.name
|
||||
][0]
|
||||
assert expected_provider is not None
|
||||
assert expected_provider['connected'] == linked
|
||||
|
||||
def assert_exception_redirect_looks_correct(self, expected_uri, auth_entry=None):
|
||||
"""Tests middleware conditional redirection.
|
||||
|
||||
@@ -147,49 +166,48 @@ class HelperMixin:
|
||||
"""
|
||||
exception_middleware = middleware.ExceptionMiddleware(get_response=lambda request: None)
|
||||
request, _ = self.get_request_and_strategy(auth_entry=auth_entry)
|
||||
response = exception_middleware.process_exception(
|
||||
request, exceptions.AuthCanceled(request.backend))
|
||||
location = response.get('Location')
|
||||
response = exception_middleware.process_exception(request, exceptions.AuthCanceled(request.backend))
|
||||
location = response.get("Location")
|
||||
|
||||
assert 302 == response.status_code
|
||||
assert 'canceled' in location
|
||||
assert "canceled" in location
|
||||
assert self.backend_name in location
|
||||
assert location.startswith(expected_uri + '?')
|
||||
assert location.startswith(expected_uri + "?")
|
||||
|
||||
def assert_json_failure_response_is_inactive_account(self, response):
|
||||
"""Asserts failure on /login for inactive account looks right."""
|
||||
assert 400 == response.status_code
|
||||
payload = json.loads(response.content.decode('utf-8'))
|
||||
payload = json.loads(response.content.decode("utf-8"))
|
||||
context = {
|
||||
'platformName': configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME),
|
||||
'supportLink': configuration_helpers.get_value('SUPPORT_SITE_LINK', settings.SUPPORT_SITE_LINK)
|
||||
"platformName": configuration_helpers.get_value("PLATFORM_NAME", settings.PLATFORM_NAME),
|
||||
"supportLink": configuration_helpers.get_value("SUPPORT_SITE_LINK", settings.SUPPORT_SITE_LINK),
|
||||
}
|
||||
|
||||
assert not payload.get('success')
|
||||
assert 'inactive-user' in payload.get('error_code')
|
||||
assert context == payload.get('context')
|
||||
assert not payload.get("success")
|
||||
assert "inactive-user" in payload.get("error_code")
|
||||
assert context == payload.get("context")
|
||||
|
||||
def assert_json_failure_response_is_missing_social_auth(self, response):
|
||||
"""Asserts failure on /login for missing social auth looks right."""
|
||||
assert 403 == response.status_code
|
||||
payload = json.loads(response.content.decode('utf-8'))
|
||||
assert not payload.get('success')
|
||||
assert payload.get('error_code') == 'third-party-auth-with-no-linked-account'
|
||||
payload = json.loads(response.content.decode("utf-8"))
|
||||
assert not payload.get("success")
|
||||
assert payload.get("error_code") == "third-party-auth-with-no-linked-account"
|
||||
|
||||
def assert_json_failure_response_is_username_collision(self, response):
|
||||
"""Asserts the json response indicates a username collision."""
|
||||
assert 409 == response.status_code
|
||||
payload = json.loads(response.content.decode('utf-8'))
|
||||
assert not payload.get('success')
|
||||
assert 'It looks like this username is already taken' == payload['username'][0]['user_message']
|
||||
payload = json.loads(response.content.decode("utf-8"))
|
||||
assert not payload.get("success")
|
||||
assert "It looks like this username is already taken" == payload["username"][0]["user_message"]
|
||||
|
||||
def assert_json_success_response_looks_correct(self, response, verify_redirect_url):
|
||||
"""Asserts the json response indicates success and redirection."""
|
||||
assert 200 == response.status_code
|
||||
payload = json.loads(response.content.decode('utf-8'))
|
||||
assert payload.get('success')
|
||||
payload = json.loads(response.content.decode("utf-8"))
|
||||
assert payload.get("success")
|
||||
if verify_redirect_url:
|
||||
assert pipeline.get_complete_url(self.provider.backend_name) == payload.get('redirect_url')
|
||||
assert pipeline.get_complete_url(self.provider.backend_name) == payload.get("redirect_url")
|
||||
|
||||
def assert_login_response_before_pipeline_looks_correct(self, response):
|
||||
"""Asserts a GET of /login not in the pipeline looks correct."""
|
||||
@@ -218,19 +236,19 @@ class HelperMixin:
|
||||
assert 302 == response.status_code
|
||||
# NOTE: Ideally we should use assertRedirects(), however it errors out due to the hostname, testserver,
|
||||
# not being properly set. This may be an issue with the call made by PSA, but we are not certain.
|
||||
assert response.get('Location').endswith(
|
||||
assert response.get("Location").endswith(
|
||||
expected_redirect_url or django_settings.SOCIAL_AUTH_LOGIN_REDIRECT_URL
|
||||
)
|
||||
|
||||
def assert_redirect_to_login_looks_correct(self, response):
|
||||
"""Asserts a response would redirect to /login."""
|
||||
assert 302 == response.status_code
|
||||
assert '/login' == response.get('Location')
|
||||
assert "/login" == response.get("Location")
|
||||
|
||||
def assert_redirect_to_register_looks_correct(self, response):
|
||||
"""Asserts a response would redirect to /register."""
|
||||
assert 302 == response.status_code
|
||||
assert '/register' == response.get('Location')
|
||||
assert "/register" == response.get("Location")
|
||||
|
||||
def assert_register_response_before_pipeline_looks_correct(self, response):
|
||||
"""Asserts a GET of /register not in the pipeline looks correct."""
|
||||
@@ -241,43 +259,41 @@ class HelperMixin:
|
||||
|
||||
def assert_social_auth_does_not_exist_for_user(self, user, strategy):
|
||||
"""Asserts a user does not have an auth with the expected provider."""
|
||||
social_auths = strategy.storage.user.get_social_auth_for_user(
|
||||
user, provider=self.provider.backend_name)
|
||||
social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name)
|
||||
assert 0 == len(social_auths)
|
||||
|
||||
def assert_social_auth_exists_for_user(self, user, strategy):
|
||||
"""Asserts a user has a social auth with the expected provider."""
|
||||
social_auths = strategy.storage.user.get_social_auth_for_user(
|
||||
user, provider=self.provider.backend_name)
|
||||
social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name)
|
||||
assert 1 == len(social_auths)
|
||||
assert self.backend_name == social_auths[0].provider
|
||||
|
||||
def assert_logged_in_cookie_redirect(self, response):
|
||||
"""Verify that the user was redirected in order to set the logged in cookie. """
|
||||
"""Verify that the user was redirected in order to set the logged in cookie."""
|
||||
assert response.status_code == 302
|
||||
assert response['Location'] == pipeline.get_complete_url(self.provider.backend_name)
|
||||
assert response.cookies[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME].value == 'true'
|
||||
assert response["Location"] == pipeline.get_complete_url(self.provider.backend_name)
|
||||
assert response.cookies[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME].value == "true"
|
||||
assert django_settings.EDXMKTG_USER_INFO_COOKIE_NAME in response.cookies
|
||||
|
||||
@property
|
||||
def backend_name(self):
|
||||
""" Shortcut for the backend name """
|
||||
"""Shortcut for the backend name"""
|
||||
return self.provider.backend_name
|
||||
|
||||
def get_registration_post_vars(self, overrides=None):
|
||||
"""POST vars generated by the registration form."""
|
||||
defaults = {
|
||||
'username': 'username',
|
||||
'name': 'First Last',
|
||||
'gender': '',
|
||||
'year_of_birth': '',
|
||||
'level_of_education': '',
|
||||
'goals': '',
|
||||
'honor_code': 'true',
|
||||
'terms_of_service': 'true',
|
||||
'password': 'password',
|
||||
'mailing_address': '',
|
||||
'email': 'user@email.com',
|
||||
"username": "username",
|
||||
"name": "First Last",
|
||||
"gender": "",
|
||||
"year_of_birth": "",
|
||||
"level_of_education": "",
|
||||
"goals": "",
|
||||
"honor_code": "true",
|
||||
"terms_of_service": "true",
|
||||
"password": "password",
|
||||
"mailing_address": "",
|
||||
"email": "user@email.com",
|
||||
}
|
||||
|
||||
if overrides:
|
||||
@@ -294,12 +310,13 @@ class HelperMixin:
|
||||
social_django.utils.strategy().
|
||||
"""
|
||||
request = self.request_factory.get(
|
||||
pipeline.get_complete_url(self.backend_name) +
|
||||
'?redirect_state=redirect_state_value&code=code_value&state=state_value')
|
||||
pipeline.get_complete_url(self.backend_name)
|
||||
+ "?redirect_state=redirect_state_value&code=code_value&state=state_value"
|
||||
)
|
||||
request.site = SiteFactory.create()
|
||||
request.user = auth_models.AnonymousUser()
|
||||
request.session = cache.SessionStore()
|
||||
request.session[self.backend_name + '_state'] = 'state_value'
|
||||
request.session[self.backend_name + "_state"] = "state_value"
|
||||
|
||||
if auth_entry:
|
||||
request.session[pipeline.AUTH_ENTRY_KEY] = auth_entry
|
||||
@@ -312,7 +329,7 @@ class HelperMixin:
|
||||
|
||||
def _get_login_post_request(self, strategy):
|
||||
"""Gets a fully-configured login POST request given a strategy and pipeline."""
|
||||
request = self.request_factory.post(reverse('login_api'))
|
||||
request = self.request_factory.post(reverse("login_api"))
|
||||
|
||||
# Note: The shared GET request can't be used for login, which is now POST-only,
|
||||
# so this POST request is given a copy of all configuration from the GET request
|
||||
@@ -329,7 +346,7 @@ class HelperMixin:
|
||||
def _patch_edxmako_current_request(self, request):
|
||||
"""Make ``request`` be the current request for edxmako template rendering."""
|
||||
|
||||
with mock.patch('common.djangoapps.edxmako.request_context.get_current_request', return_value=request):
|
||||
with mock.patch("common.djangoapps.edxmako.request_context.get_current_request", return_value=request):
|
||||
yield
|
||||
|
||||
def get_user_by_email(self, strategy, email):
|
||||
@@ -337,11 +354,13 @@ class HelperMixin:
|
||||
return strategy.storage.user.user_model().objects.get(email=email)
|
||||
|
||||
def set_logged_in_cookies(self, request):
|
||||
"""Simulate setting the marketing site cookie on the request. """
|
||||
request.COOKIES[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME] = 'true'
|
||||
request.COOKIES[django_settings.EDXMKTG_USER_INFO_COOKIE_NAME] = json.dumps({
|
||||
'version': django_settings.EDXMKTG_USER_INFO_COOKIE_VERSION,
|
||||
})
|
||||
"""Simulate setting the marketing site cookie on the request."""
|
||||
request.COOKIES[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME] = "true"
|
||||
request.COOKIES[django_settings.EDXMKTG_USER_INFO_COOKIE_NAME] = json.dumps(
|
||||
{
|
||||
"version": django_settings.EDXMKTG_USER_INFO_COOKIE_VERSION,
|
||||
}
|
||||
)
|
||||
|
||||
def create_user_models_for_existing_account(self, strategy, email, password, username, skip_social_auth=False):
|
||||
"""Creates user, profile, registration, and (usually) social auth.
|
||||
@@ -371,10 +390,10 @@ class HelperMixin:
|
||||
"""
|
||||
args = ()
|
||||
kwargs = {
|
||||
'request': strategy.request,
|
||||
'backend': strategy.request.backend,
|
||||
'user': None,
|
||||
'response': self.get_response_data(),
|
||||
"request": strategy.request,
|
||||
"backend": strategy.request.backend,
|
||||
"user": None,
|
||||
"response": self.get_response_data(),
|
||||
}
|
||||
return strategy.authenticate(*args, **kwargs)
|
||||
|
||||
@@ -386,6 +405,7 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
currently less comprehensive. Some providers are tested with this, others with
|
||||
IntegrationTest.
|
||||
"""
|
||||
|
||||
# Provider information:
|
||||
PROVIDER_NAME = "override"
|
||||
PROVIDER_BACKEND = "override"
|
||||
@@ -399,8 +419,8 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
super().setUp()
|
||||
|
||||
self.request_factory = test.RequestFactory()
|
||||
self.login_page_url = reverse('signin_user')
|
||||
self.register_page_url = reverse('register_user')
|
||||
self.login_page_url = reverse("signin_user")
|
||||
self.register_page_url = reverse("register_user")
|
||||
patcher = testutil.patch_mako_templates()
|
||||
patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
@@ -415,47 +435,44 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
try_login_response = self.client.get(provider_register_url)
|
||||
# The user should be redirected to the provider's login page:
|
||||
assert try_login_response.status_code == 302
|
||||
provider_response = self.do_provider_login(try_login_response['Location'])
|
||||
provider_response = self.do_provider_login(try_login_response["Location"])
|
||||
# We should be redirected to the register screen since this account is not linked to an edX account:
|
||||
assert provider_response.status_code == 302
|
||||
assert provider_response['Location'] == self.register_page_url
|
||||
assert provider_response["Location"] == self.register_page_url
|
||||
register_response = self.client.get(self.register_page_url)
|
||||
tpa_context = register_response.context["data"]["third_party_auth"]
|
||||
assert tpa_context['errorMessage'] is None
|
||||
assert tpa_context["errorMessage"] is None
|
||||
# Check that the "You've successfully signed into [PROVIDER_NAME]" message is shown.
|
||||
assert tpa_context['currentProvider'] == self.PROVIDER_NAME
|
||||
assert tpa_context["currentProvider"] == self.PROVIDER_NAME
|
||||
# Check that the data (e.g. email) from the provider is displayed in the form:
|
||||
form_data = register_response.context['data']['registration_form_desc']
|
||||
form_fields = {field['name']: field for field in form_data['fields']}
|
||||
assert form_fields['email']['defaultValue'] == self.USER_EMAIL
|
||||
assert form_fields['name']['defaultValue'] == self.USER_NAME
|
||||
assert form_fields['username']['defaultValue'] == self.USER_USERNAME
|
||||
form_data = register_response.context["data"]["registration_form_desc"]
|
||||
form_fields = {field["name"]: field for field in form_data["fields"]}
|
||||
assert form_fields["email"]["defaultValue"] == self.USER_EMAIL
|
||||
assert form_fields["name"]["defaultValue"] == self.USER_NAME
|
||||
assert form_fields["username"]["defaultValue"] == self.USER_USERNAME
|
||||
for field_name, value in extra_defaults.items():
|
||||
assert form_fields[field_name]['defaultValue'] == value
|
||||
assert form_fields[field_name]["defaultValue"] == value
|
||||
registration_values = {
|
||||
'email': 'email-edited@tpa-test.none',
|
||||
'name': 'My Customized Name',
|
||||
'username': 'new_username',
|
||||
'honor_code': True,
|
||||
"email": "email-edited@tpa-test.none",
|
||||
"name": "My Customized Name",
|
||||
"username": "new_username",
|
||||
"honor_code": True,
|
||||
}
|
||||
# Now complete the form:
|
||||
ajax_register_response = self.client.post(
|
||||
reverse('user_api_registration'),
|
||||
registration_values
|
||||
)
|
||||
ajax_register_response = self.client.post(reverse("user_api_registration"), registration_values)
|
||||
assert ajax_register_response.status_code == 200
|
||||
# Then the AJAX will finish the third party auth:
|
||||
continue_response = self.client.get(tpa_context["finishAuthUrl"])
|
||||
# And we should be redirected to the dashboard:
|
||||
assert continue_response.status_code == 302
|
||||
assert continue_response['Location'] == reverse('dashboard')
|
||||
assert continue_response["Location"] == reverse("dashboard")
|
||||
|
||||
# Now check that we can login again, whether or not we have yet verified the account:
|
||||
self.client.logout()
|
||||
self._test_return_login(user_is_activated=False)
|
||||
|
||||
self.client.logout()
|
||||
self.verify_user_email('email-edited@tpa-test.none')
|
||||
self.verify_user_email("email-edited@tpa-test.none")
|
||||
self._test_return_login(user_is_activated=True)
|
||||
|
||||
def _test_login(self):
|
||||
@@ -468,27 +485,27 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
try_login_response = self.client.get(provider_login_url)
|
||||
# The user should be redirected to the provider's login page:
|
||||
assert try_login_response.status_code == 302
|
||||
complete_response = self.do_provider_login(try_login_response['Location'])
|
||||
complete_response = self.do_provider_login(try_login_response["Location"])
|
||||
# We should be redirected to the login screen since this account is not linked to an edX account:
|
||||
assert complete_response.status_code == 302
|
||||
assert complete_response['Location'] == self.login_page_url
|
||||
assert complete_response["Location"] == self.login_page_url
|
||||
login_response = self.client.get(self.login_page_url)
|
||||
tpa_context = login_response.context["data"]["third_party_auth"]
|
||||
assert tpa_context['errorMessage'] is None
|
||||
assert tpa_context["errorMessage"] is None
|
||||
# Check that the "You've successfully signed into [PROVIDER_NAME]" message is shown.
|
||||
assert tpa_context['currentProvider'] == self.PROVIDER_NAME
|
||||
assert tpa_context["currentProvider"] == self.PROVIDER_NAME
|
||||
# Now the user enters their username and password.
|
||||
# The AJAX on the page will log them in:
|
||||
ajax_login_response = self.client.post(
|
||||
reverse('user_api_login_session', kwargs={'api_version': 'v1'}),
|
||||
{'email': self.user.email, 'password': 'Password1234'}
|
||||
reverse("user_api_login_session", kwargs={"api_version": "v1"}),
|
||||
{"email": self.user.email, "password": "Password1234"},
|
||||
)
|
||||
assert ajax_login_response.status_code == 200
|
||||
# Then the AJAX will finish the third party auth:
|
||||
continue_response = self.client.get(tpa_context["finishAuthUrl"])
|
||||
# And we should be redirected to the dashboard:
|
||||
assert continue_response.status_code == 302
|
||||
assert continue_response['Location'] == reverse('dashboard')
|
||||
assert continue_response["Location"] == reverse("dashboard")
|
||||
|
||||
# Now check that we can login again:
|
||||
self.client.logout()
|
||||
@@ -502,9 +519,9 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
raise NotImplementedError
|
||||
|
||||
def _test_return_login(self, user_is_activated=True, previous_session_timed_out=False):
|
||||
""" Test logging in to an account that is already linked. """
|
||||
"""Test logging in to an account that is already linked."""
|
||||
# Make sure we're not logged in:
|
||||
dashboard_response = self.client.get(reverse('dashboard'))
|
||||
dashboard_response = self.client.get(reverse("dashboard"))
|
||||
assert dashboard_response.status_code == 302
|
||||
# The user goes to the login page, and sees a button to login with this provider:
|
||||
provider_login_url = self._check_login_page()
|
||||
@@ -512,22 +529,22 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
try_login_response = self.client.get(provider_login_url)
|
||||
# The user should be redirected to the provider:
|
||||
assert try_login_response.status_code == 302
|
||||
login_response = self.do_provider_login(try_login_response['Location'])
|
||||
login_response = self.do_provider_login(try_login_response["Location"])
|
||||
# If the previous session was manually logged out, there will be one weird redirect
|
||||
# required to set the login cookie (it sticks around if the main session times out):
|
||||
if not previous_session_timed_out:
|
||||
assert login_response.status_code == 302
|
||||
assert login_response['Location'] == (self.complete_url + '?')
|
||||
assert login_response["Location"] == (self.complete_url + "?")
|
||||
# And then we should be redirected to the dashboard:
|
||||
login_response = self.client.get(login_response['Location'])
|
||||
login_response = self.client.get(login_response["Location"])
|
||||
assert login_response.status_code == 302
|
||||
if user_is_activated:
|
||||
url_expected = reverse('dashboard')
|
||||
url_expected = reverse("dashboard")
|
||||
else:
|
||||
url_expected = reverse('third_party_inactive_redirect') + '?next=' + reverse('dashboard')
|
||||
assert login_response['Location'] == url_expected
|
||||
url_expected = reverse("third_party_inactive_redirect") + "?next=" + reverse("dashboard")
|
||||
assert login_response["Location"] == url_expected
|
||||
# Now we are logged in:
|
||||
dashboard_response = self.client.get(reverse('dashboard'))
|
||||
dashboard_response = self.client.get(reverse("dashboard"))
|
||||
assert dashboard_response.status_code == 200
|
||||
|
||||
def _check_login_page(self):
|
||||
@@ -545,22 +562,23 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
return self._check_login_or_register_page(self.register_page_url, "registerUrl")
|
||||
|
||||
def _check_login_or_register_page(self, url, url_to_return):
|
||||
""" Shared logic for _check_login_page() and _check_register_page() """
|
||||
"""Shared logic for _check_login_page() and _check_register_page()"""
|
||||
response = self.client.get(url)
|
||||
self.assertContains(response, self.PROVIDER_NAME)
|
||||
context_data = response.context['data']['third_party_auth']
|
||||
provider_urls = {provider['id']: provider[url_to_return] for provider in context_data['providers']}
|
||||
context_data = response.context["data"]["third_party_auth"]
|
||||
provider_urls = {provider["id"]: provider[url_to_return] for provider in context_data["providers"]}
|
||||
assert self.PROVIDER_ID in provider_urls
|
||||
return provider_urls[self.PROVIDER_ID]
|
||||
|
||||
@property
|
||||
def complete_url(self):
|
||||
""" Get the auth completion URL for this provider """
|
||||
return reverse('social:complete', kwargs={'backend': self.PROVIDER_BACKEND})
|
||||
"""Get the auth completion URL for this provider"""
|
||||
return reverse("social:complete", kwargs={"backend": self.PROVIDER_BACKEND})
|
||||
|
||||
|
||||
@unittest.skipUnless(
|
||||
testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
|
||||
testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + " not in settings.FEATURES"
|
||||
)
|
||||
@django_utils.override_settings() # For settings reversion on a method-by-method basis.
|
||||
class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
"""Abstract base class for provider integration tests."""
|
||||
@@ -572,46 +590,51 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
# Actual tests, executed once per child.
|
||||
|
||||
def test_canceling_authentication_redirects_to_login_when_auth_entry_login(self):
|
||||
self.assert_exception_redirect_looks_correct('/login', auth_entry=pipeline.AUTH_ENTRY_LOGIN)
|
||||
self.assert_exception_redirect_looks_correct("/login", auth_entry=pipeline.AUTH_ENTRY_LOGIN)
|
||||
|
||||
def test_canceling_authentication_redirects_to_register_when_auth_entry_register(self):
|
||||
self.assert_exception_redirect_looks_correct('/register', auth_entry=pipeline.AUTH_ENTRY_REGISTER)
|
||||
self.assert_exception_redirect_looks_correct("/register", auth_entry=pipeline.AUTH_ENTRY_REGISTER)
|
||||
|
||||
def test_canceling_authentication_redirects_to_account_settings_when_auth_entry_account_settings(self):
|
||||
self.assert_exception_redirect_looks_correct(
|
||||
'/account/settings', auth_entry=pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS
|
||||
"/account/settings", auth_entry=pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS
|
||||
)
|
||||
|
||||
def test_canceling_authentication_redirects_to_root_when_auth_entry_not_set(self):
|
||||
self.assert_exception_redirect_looks_correct('/')
|
||||
self.assert_exception_redirect_looks_correct("/")
|
||||
|
||||
@mock.patch('common.djangoapps.third_party_auth.pipeline.segment.track')
|
||||
@mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track")
|
||||
def test_full_pipeline_succeeds_for_linking_account(self, _mock_segment_track):
|
||||
# First, create, the GET request and strategy that store pipeline state,
|
||||
# configure the backend, and mock out wire traffic.
|
||||
get_request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
get_request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
get_request.user = self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)
|
||||
partial_pipeline_token = strategy.session_get('partial_pipeline_token')
|
||||
strategy, "user@example.com", "password", self.get_username(), skip_social_auth=True
|
||||
)
|
||||
partial_pipeline_token = strategy.session_get("partial_pipeline_token")
|
||||
partial_data = strategy.storage.partial.load(partial_pipeline_token)
|
||||
|
||||
# Instrument the pipeline to get to the dashboard with the full
|
||||
# expected state.
|
||||
self.client.get(
|
||||
pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access
|
||||
request=get_request)
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
actions.do_complete(
|
||||
get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access
|
||||
)
|
||||
|
||||
post_request = self._get_login_post_request(strategy)
|
||||
login_user(post_request)
|
||||
actions.do_complete(post_request.backend, social_views._do_login, # pylint: disable=protected-access, no-member
|
||||
request=post_request)
|
||||
actions.do_complete(
|
||||
post_request.backend,
|
||||
social_views._do_login, # pylint: disable=protected-access, no-member
|
||||
request=post_request,
|
||||
)
|
||||
|
||||
# First we expect that we're in the unlinked state, and that there
|
||||
# really is no association in the backend.
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=False)
|
||||
self.assert_third_party_accounts_state(get_request, linked=False)
|
||||
self.assert_social_auth_does_not_exist_for_user(get_request.user, strategy)
|
||||
|
||||
# We should be redirected back to the complete page, setting
|
||||
@@ -630,16 +653,18 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
|
||||
# Now we expect to be in the linked state, with a backend entry.
|
||||
self.assert_social_auth_exists_for_user(get_request.user, strategy)
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=True)
|
||||
self.assert_third_party_accounts_state(get_request, linked=True)
|
||||
|
||||
def test_full_pipeline_succeeds_for_unlinking_account(self):
|
||||
# First, create, the GET request and strategy that store pipeline state,
|
||||
# configure the backend, and mock out wire traffic.
|
||||
get_request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
get_request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
user = self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username())
|
||||
strategy, "user@example.com", "password", self.get_username()
|
||||
)
|
||||
self.assert_social_auth_exists_for_user(user, strategy)
|
||||
|
||||
# We're already logged in, so simulate that the cookie is set correctly
|
||||
@@ -647,36 +672,37 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
|
||||
# Instrument the pipeline to get to the dashboard with the full
|
||||
# expected state.
|
||||
self.client.get(
|
||||
pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access
|
||||
request=get_request)
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
actions.do_complete(
|
||||
get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access
|
||||
)
|
||||
|
||||
post_request = self._get_login_post_request(strategy)
|
||||
with self._patch_edxmako_current_request(post_request):
|
||||
login_user(post_request)
|
||||
actions.do_complete(post_request.backend, social_views._do_login, user=user, # pylint: disable=protected-access, no-member
|
||||
request=post_request)
|
||||
actions.do_complete(
|
||||
post_request.backend,
|
||||
social_views._do_login, # pylint: disable=protected-access
|
||||
user=user, # pylint: disable=no-member
|
||||
request=post_request,
|
||||
)
|
||||
|
||||
# Copy the user that was set on the post_request object back to the original get_request object.
|
||||
get_request.user = post_request.user
|
||||
|
||||
# First we expect that we're in the linked state, with a backend entry.
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=True)
|
||||
self.assert_third_party_accounts_state(get_request, linked=True)
|
||||
self.assert_social_auth_exists_for_user(get_request.user, strategy)
|
||||
|
||||
# Fire off the disconnect pipeline to unlink.
|
||||
self.assert_redirect_after_pipeline_completes(
|
||||
actions.do_disconnect(
|
||||
get_request.backend,
|
||||
get_request.user,
|
||||
None,
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME
|
||||
get_request.backend, get_request.user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME
|
||||
)
|
||||
)
|
||||
|
||||
# Now we expect to be in the unlinked state, with no backend entry.
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=False)
|
||||
self.assert_third_party_accounts_state(get_request, linked=False)
|
||||
self.assert_social_auth_does_not_exist_for_user(user, strategy)
|
||||
|
||||
def test_linking_already_associated_account_raises_auth_already_associated(self):
|
||||
@@ -684,16 +710,18 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
# test_already_associated_exception_populates_dashboard_with_error. It
|
||||
# verifies the exception gets raised when we expect; the latter test
|
||||
# covers exception handling.
|
||||
email = 'user@example.com'
|
||||
password = 'password'
|
||||
email = "user@example.com"
|
||||
password = "password"
|
||||
username = self.get_username()
|
||||
_, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
backend = strategy.request.backend
|
||||
backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
linked_user = self.create_user_models_for_existing_account(strategy, email, password, username)
|
||||
unlinked_user = social_utils.Storage.user.create_user(
|
||||
email='other_' + email, password=password, username='other_' + username)
|
||||
email="other_" + email, password=password, username="other_" + username
|
||||
)
|
||||
|
||||
self.assert_social_auth_exists_for_user(linked_user, strategy)
|
||||
self.assert_social_auth_does_not_exist_for_user(unlinked_user, strategy)
|
||||
@@ -711,42 +739,50 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
# covered in other tests. Using linked=True does, however, let us test
|
||||
# that the duplicate error has no effect on the state of the controls.
|
||||
get_request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
user = self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username())
|
||||
strategy, "user@example.com", "password", self.get_username()
|
||||
)
|
||||
self.assert_social_auth_exists_for_user(user, strategy)
|
||||
|
||||
self.client.get('/login')
|
||||
self.client.get("/login")
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access
|
||||
request=get_request)
|
||||
actions.do_complete(
|
||||
get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access
|
||||
)
|
||||
|
||||
post_request = self._get_login_post_request(strategy)
|
||||
with self._patch_edxmako_current_request(post_request):
|
||||
login_user(post_request)
|
||||
actions.do_complete(post_request.backend, social_views._do_login, # pylint: disable=protected-access, no-member
|
||||
user=user, request=post_request)
|
||||
actions.do_complete(
|
||||
post_request.backend,
|
||||
social_views._do_login, # pylint: disable=protected-access, no-member
|
||||
user=user,
|
||||
request=post_request,
|
||||
)
|
||||
|
||||
# Monkey-patch storage for messaging; pylint: disable=protected-access
|
||||
post_request._messages = fallback.FallbackStorage(post_request)
|
||||
middleware.ExceptionMiddleware(get_response=lambda request: None).process_exception(
|
||||
post_request,
|
||||
exceptions.AuthAlreadyAssociated(self.provider.backend_name, 'account is already in use.'))
|
||||
post_request, exceptions.AuthAlreadyAssociated(self.provider.backend_name, "account is already in use.")
|
||||
)
|
||||
|
||||
self.assert_account_settings_context_looks_correct(
|
||||
account_settings_context(post_request), duplicate=True, linked=True)
|
||||
self.assert_third_party_accounts_state(post_request, duplicate=True, linked=True)
|
||||
|
||||
@mock.patch('common.djangoapps.third_party_auth.pipeline.segment.track')
|
||||
@mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track")
|
||||
def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, _mock_segment_track):
|
||||
# First, create, the GET request and strategy that store pipeline state,
|
||||
# configure the backend, and mock out wire traffic.
|
||||
get_request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
user = self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username())
|
||||
partial_pipeline_token = strategy.session_get('partial_pipeline_token')
|
||||
strategy, "user@example.com", "password", self.get_username()
|
||||
)
|
||||
partial_pipeline_token = strategy.session_get("partial_pipeline_token")
|
||||
partial_data = strategy.storage.partial.load(partial_pipeline_token)
|
||||
|
||||
self.assert_social_auth_exists_for_user(user, strategy)
|
||||
@@ -754,19 +790,21 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
|
||||
# Begin! Ensure that the login form contains expected controls before
|
||||
# the user starts the pipeline.
|
||||
self.assert_login_response_before_pipeline_looks_correct(self.client.get('/login'))
|
||||
self.assert_login_response_before_pipeline_looks_correct(self.client.get("/login"))
|
||||
|
||||
# The pipeline starts by a user GETting /auth/login/<provider>.
|
||||
# Synthesize that request and check that it redirects to the correct
|
||||
# provider page.
|
||||
self.assert_redirect_to_provider_looks_correct(self.client.get(
|
||||
pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)))
|
||||
self.assert_redirect_to_provider_looks_correct(
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
)
|
||||
|
||||
# Next, the provider makes a request against /auth/complete/<provider>
|
||||
# to resume the pipeline.
|
||||
# pylint: disable=protected-access
|
||||
self.assert_redirect_to_login_looks_correct(actions.do_complete(get_request.backend, social_views._do_login,
|
||||
request=get_request))
|
||||
self.assert_redirect_to_login_looks_correct(
|
||||
actions.do_complete(get_request.backend, social_views._do_login, request=get_request)
|
||||
)
|
||||
|
||||
# At this point we know the pipeline has resumed correctly. Next we
|
||||
# fire off the view that displays the login form and posts it via JS.
|
||||
@@ -781,10 +819,16 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
|
||||
# We should be redirected back to the complete page, setting
|
||||
# the "logged in" cookie for the marketing site.
|
||||
self.assert_logged_in_cookie_redirect(actions.do_complete(
|
||||
post_request.backend, social_views._do_login, post_request.user, None, # pylint: disable=protected-access, no-member
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME, request=post_request
|
||||
))
|
||||
self.assert_logged_in_cookie_redirect(
|
||||
actions.do_complete(
|
||||
post_request.backend,
|
||||
social_views._do_login,
|
||||
post_request.user,
|
||||
None, # pylint: disable=protected-access, no-member
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME,
|
||||
request=post_request,
|
||||
)
|
||||
)
|
||||
|
||||
# Set the cookie and try again
|
||||
self.set_logged_in_cookies(get_request)
|
||||
@@ -795,14 +839,16 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
self.assert_redirect_after_pipeline_completes(
|
||||
self.do_complete(strategy, get_request, partial_pipeline_token, partial_data, user)
|
||||
)
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(get_request))
|
||||
self.assert_third_party_accounts_state(get_request)
|
||||
|
||||
def test_signin_fails_if_account_not_active(self):
|
||||
_, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
user = self.create_user_models_for_existing_account(strategy, 'user@example.com', 'password',
|
||||
self.get_username())
|
||||
user = self.create_user_models_for_existing_account(
|
||||
strategy, "user@example.com", "password", self.get_username()
|
||||
)
|
||||
|
||||
user.is_active = False
|
||||
user.save()
|
||||
@@ -813,25 +859,28 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
|
||||
def test_signin_fails_if_no_account_associated(self):
|
||||
_, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)
|
||||
strategy, "user@example.com", "password", self.get_username(), skip_social_auth=True
|
||||
)
|
||||
|
||||
post_request = self._get_login_post_request(strategy)
|
||||
self.assert_json_failure_response_is_missing_social_auth(login_user(post_request))
|
||||
|
||||
def test_signin_associates_user_if_oauth_provider_and_tpa_is_required(self):
|
||||
username, email, password = self.get_username(), 'user@example.com', 'password'
|
||||
username, email, password = self.get_username(), "user@example.com", "password"
|
||||
|
||||
_, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
|
||||
user = self.create_user_models_for_existing_account(strategy, email, password, username, skip_social_auth=True)
|
||||
|
||||
with mock.patch(
|
||||
'common.djangoapps.third_party_auth.pipeline.get_associated_user_by_email_response',
|
||||
return_value=[{'user': user}, True],
|
||||
"common.djangoapps.third_party_auth.pipeline.get_associated_user_by_email_response",
|
||||
return_value=[{"user": user}, True],
|
||||
):
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
|
||||
@@ -839,30 +888,37 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
self.assert_json_success_response_looks_correct(login_user(post_request), verify_redirect_url=True)
|
||||
|
||||
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_email_in_request(self):
|
||||
self.assert_first_party_auth_trumps_third_party_auth(email='user@example.com')
|
||||
self.assert_first_party_auth_trumps_third_party_auth(email="user@example.com")
|
||||
|
||||
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_password_in_request(self):
|
||||
self.assert_first_party_auth_trumps_third_party_auth(password='password')
|
||||
self.assert_first_party_auth_trumps_third_party_auth(password="password")
|
||||
|
||||
def test_first_party_auth_trumps_third_party_auth_and_fails_when_credentials_bad(self):
|
||||
self.assert_first_party_auth_trumps_third_party_auth(
|
||||
email='user@example.com', password='password', success=False)
|
||||
email="user@example.com", password="password", success=False
|
||||
)
|
||||
|
||||
def test_first_party_auth_trumps_third_party_auth_and_succeeds_when_credentials_good(self):
|
||||
self.assert_first_party_auth_trumps_third_party_auth(
|
||||
email='user@example.com', password='password', success=True)
|
||||
email="user@example.com", password="password", success=True
|
||||
)
|
||||
|
||||
def test_pipeline_redirects_to_requested_url(self):
|
||||
requested_redirect_url = 'foo' # something different from '/dashboard'
|
||||
request, strategy = self.get_request_and_strategy(redirect_uri='social:complete')
|
||||
requested_redirect_url = "foo" # something different from '/dashboard'
|
||||
request, strategy = self.get_request_and_strategy(redirect_uri="social:complete")
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
request.session[pipeline.AUTH_REDIRECT_KEY] = requested_redirect_url
|
||||
|
||||
user = self.create_user_models_for_existing_account(strategy, 'user@foo.com', 'password', self.get_username())
|
||||
user = self.create_user_models_for_existing_account(strategy, "user@foo.com", "password", self.get_username())
|
||||
self.set_logged_in_cookies(request)
|
||||
|
||||
self.assert_redirect_after_pipeline_completes(
|
||||
actions.do_complete(request.backend, social_views._do_login, user=user, request=request), # pylint: disable=protected-access
|
||||
actions.do_complete(
|
||||
request.backend,
|
||||
social_views._do_login, # pylint: disable=protected-access
|
||||
user=user,
|
||||
request=request,
|
||||
),
|
||||
requested_redirect_url,
|
||||
)
|
||||
|
||||
@@ -870,44 +926,47 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
# First, create, the request and strategy that store pipeline state.
|
||||
# Mock out wire traffic.
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
partial_pipeline_token = strategy.session_get('partial_pipeline_token')
|
||||
partial_pipeline_token = strategy.session_get("partial_pipeline_token")
|
||||
partial_data = strategy.storage.partial.load(partial_pipeline_token)
|
||||
|
||||
# Begin! Grab the registration page and check the login control on it.
|
||||
self.assert_register_response_before_pipeline_looks_correct(self.client.get('/register'))
|
||||
self.assert_register_response_before_pipeline_looks_correct(self.client.get("/register"))
|
||||
|
||||
# The pipeline starts by a user GETting /auth/login/<provider>.
|
||||
# Synthesize that request and check that it redirects to the correct
|
||||
# provider page.
|
||||
self.assert_redirect_to_provider_looks_correct(self.client.get(
|
||||
pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)))
|
||||
self.assert_redirect_to_provider_looks_correct(
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
)
|
||||
|
||||
# Next, the provider makes a request against /auth/complete/<provider>.
|
||||
# pylint: disable=protected-access
|
||||
self.assert_redirect_to_register_looks_correct(actions.do_complete(request.backend, social_views._do_login,
|
||||
request=request))
|
||||
self.assert_redirect_to_register_looks_correct(
|
||||
actions.do_complete(request.backend, social_views._do_login, request=request)
|
||||
)
|
||||
|
||||
# At this point we know the pipeline has resumed correctly. Next we
|
||||
# fire off the view that displays the registration form.
|
||||
with self._patch_edxmako_current_request(request):
|
||||
self.assert_register_form_populates_unicode_username_correctly(request)
|
||||
self.assert_register_response_in_pipeline_looks_correct(
|
||||
login_and_registration_form(strategy.request, initial_mode='register'),
|
||||
pipeline.get(request)['kwargs'],
|
||||
['name', 'username', 'email']
|
||||
login_and_registration_form(strategy.request, initial_mode="register"),
|
||||
pipeline.get(request)["kwargs"],
|
||||
["name", "username", "email"],
|
||||
)
|
||||
|
||||
# Next, we invoke the view that handles the POST. Not all providers
|
||||
# supply email. Manually add it as the user would have to; this
|
||||
# also serves as a test of overriding provider values. Always provide a
|
||||
# password for us to check that we override it properly.
|
||||
overridden_password = strategy.request.POST.get('password')
|
||||
email = 'new@example.com'
|
||||
overridden_password = strategy.request.POST.get("password")
|
||||
email = "new@example.com"
|
||||
|
||||
if not strategy.request.POST.get('email'):
|
||||
strategy.request.POST = self.get_registration_post_vars({'email': email})
|
||||
if not strategy.request.POST.get("email"):
|
||||
strategy.request.POST = self.get_registration_post_vars({"email": email})
|
||||
|
||||
# The user must not exist yet...
|
||||
with pytest.raises(auth_models.User.DoesNotExist):
|
||||
@@ -935,41 +994,44 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
self.assert_redirect_after_pipeline_completes(
|
||||
self.do_complete(strategy, request, partial_pipeline_token, partial_data, created_user)
|
||||
)
|
||||
# Now the user has been redirected to the dashboard. Their third party account should now be linked.
|
||||
# Their third party account should now be linked.
|
||||
self.assert_social_auth_exists_for_user(created_user, strategy)
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=True)
|
||||
self.assert_third_party_accounts_state(request, linked=True)
|
||||
|
||||
def test_new_account_registration_assigns_distinct_username_on_collision(self):
|
||||
original_username = self.get_username()
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
|
||||
)
|
||||
|
||||
# Create a colliding username in the backend, then proceed with
|
||||
# assignment via pipeline to make sure a distinct username is created.
|
||||
strategy.storage.user.create_user(username=self.get_username(), email='user@email.com', password='password')
|
||||
strategy.storage.user.create_user(username=self.get_username(), email="user@email.com", password="password")
|
||||
backend = strategy.request.backend
|
||||
backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
# pylint: disable=protected-access
|
||||
response = actions.do_complete(backend, social_views._do_login, request=request)
|
||||
assert response.status_code == 302
|
||||
|
||||
response = json.loads(create_account(strategy.request).content.decode('utf-8'))
|
||||
assert response['username'] != original_username
|
||||
response = json.loads(create_account(strategy.request).content.decode("utf-8"))
|
||||
assert response["username"] != original_username
|
||||
|
||||
def test_new_account_registration_fails_if_email_exists(self):
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete"
|
||||
)
|
||||
backend = strategy.request.backend
|
||||
backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
# pylint: disable=protected-access
|
||||
self.assert_redirect_to_register_looks_correct(actions.do_complete(backend, social_views._do_login,
|
||||
request=request))
|
||||
self.assert_redirect_to_register_looks_correct(
|
||||
actions.do_complete(backend, social_views._do_login, request=request)
|
||||
)
|
||||
|
||||
with self._patch_edxmako_current_request(request):
|
||||
self.assert_register_response_in_pipeline_looks_correct(
|
||||
login_and_registration_form(strategy.request, initial_mode='register'),
|
||||
pipeline.get(request)['kwargs'],
|
||||
['name', 'username', 'email']
|
||||
login_and_registration_form(strategy.request, initial_mode="register"),
|
||||
pipeline.get(request)["kwargs"],
|
||||
["name", "username", "email"],
|
||||
)
|
||||
|
||||
with self._patch_edxmako_current_request(strategy.request):
|
||||
@@ -979,18 +1041,18 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
self.assert_json_failure_response_is_username_collision(create_account(strategy.request))
|
||||
|
||||
def test_pipeline_raises_auth_entry_error_if_auth_entry_invalid(self):
|
||||
auth_entry = 'invalid'
|
||||
auth_entry = "invalid"
|
||||
assert auth_entry not in pipeline._AUTH_ENTRY_CHOICES # pylint: disable=protected-access
|
||||
|
||||
_, strategy = self.get_request_and_strategy(auth_entry=auth_entry, redirect_uri='social:complete')
|
||||
_, strategy = self.get_request_and_strategy(auth_entry=auth_entry, redirect_uri="social:complete")
|
||||
|
||||
with pytest.raises(pipeline.AuthEntryError):
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
|
||||
def test_pipeline_assumes_login_if_auth_entry_missing(self):
|
||||
_, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri='social:complete')
|
||||
_, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri="social:complete")
|
||||
response = self.fake_auth_complete(strategy)
|
||||
assert response.url == reverse('signin_user')
|
||||
assert response.url == reverse("signin_user")
|
||||
|
||||
def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=None, success=None):
|
||||
"""Asserts first party auth was used in place of third party auth.
|
||||
@@ -1004,33 +1066,35 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
one of username or password will be missing).
|
||||
"""
|
||||
_, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
self.create_user_models_for_existing_account(
|
||||
strategy, email, password, self.get_username(), skip_social_auth=True)
|
||||
strategy, email, password, self.get_username(), skip_social_auth=True
|
||||
)
|
||||
|
||||
post_request = self._get_login_post_request(strategy)
|
||||
post_request.POST = dict(post_request.POST)
|
||||
|
||||
if email:
|
||||
post_request.POST['email'] = email
|
||||
post_request.POST["email"] = email
|
||||
if password:
|
||||
post_request.POST['password'] = 'bad_' + password if success is False else password
|
||||
post_request.POST["password"] = "bad_" + password if success is False else password
|
||||
|
||||
self.assert_pipeline_running(post_request)
|
||||
payload = json.loads(login_user(post_request).content.decode('utf-8'))
|
||||
payload = json.loads(login_user(post_request).content.decode("utf-8"))
|
||||
|
||||
if success is None:
|
||||
# Request malformed -- just one of email/password given.
|
||||
assert not payload.get('success')
|
||||
assert 'There was an error receiving your login information' in payload.get('value')
|
||||
assert not payload.get("success")
|
||||
assert "There was an error receiving your login information" in payload.get("value")
|
||||
elif success:
|
||||
# Request well-formed and credentials good.
|
||||
assert payload.get('success')
|
||||
assert payload.get("success")
|
||||
else:
|
||||
# Request well-formed but credentials bad.
|
||||
assert not payload.get('success')
|
||||
assert 'incorrect' in payload.get('value')
|
||||
assert not payload.get("success")
|
||||
assert "incorrect" in payload.get("value")
|
||||
|
||||
def get_response_data(self):
|
||||
"""Gets a dict of response data of the form given by the provider.
|
||||
@@ -1064,8 +1128,13 @@ class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin):
|
||||
if not user:
|
||||
user = request.user
|
||||
return actions.do_complete(
|
||||
request.backend, social_views._do_login, user, None, # pylint: disable=protected-access
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request, partial_token=partial_pipeline_token
|
||||
request.backend,
|
||||
social_views._do_login, # pylint: disable=protected-access
|
||||
user,
|
||||
None,
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME,
|
||||
request=request,
|
||||
partial_token=partial_pipeline_token,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
Third_party_auth integration tests using a mock version of the TestShib provider
|
||||
"""
|
||||
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
@@ -27,16 +26,15 @@ from common.djangoapps.third_party_auth.saml import SapSuccessFactorsIdentityPro
|
||||
from common.djangoapps.third_party_auth.saml import log as saml_log
|
||||
from common.djangoapps.third_party_auth.tasks import fetch_saml_metadata
|
||||
from common.djangoapps.third_party_auth.tests import testutil, utils
|
||||
from openedx.core.djangoapps.user_api.accounts.settings_views import account_settings_context
|
||||
from openedx.core.djangoapps.user_authn.views.login import login_user
|
||||
from openedx.features.enterprise_support.tests.factories import EnterpriseCustomerFactory
|
||||
|
||||
from .base import IntegrationTestMixin
|
||||
|
||||
TESTSHIB_ENTITY_ID = 'https://idp.testshib.org/idp/shibboleth'
|
||||
TESTSHIB_METADATA_URL = 'https://mock.testshib.org/metadata/testshib-providers.xml'
|
||||
TESTSHIB_METADATA_URL_WITH_CACHE_DURATION = 'https://mock.testshib.org/metadata/testshib-providers-cache.xml'
|
||||
TESTSHIB_SSO_URL = 'https://idp.testshib.org/idp/profile/SAML2/Redirect/SSO'
|
||||
TESTSHIB_ENTITY_ID = "https://idp.testshib.org/idp/shibboleth"
|
||||
TESTSHIB_METADATA_URL = "https://mock.testshib.org/metadata/testshib-providers.xml"
|
||||
TESTSHIB_METADATA_URL_WITH_CACHE_DURATION = "https://mock.testshib.org/metadata/testshib-providers-cache.xml"
|
||||
TESTSHIB_SSO_URL = "https://idp.testshib.org/idp/profile/SAML2/Redirect/SSO"
|
||||
|
||||
|
||||
class SamlIntegrationTestUtilities:
|
||||
@@ -44,6 +42,7 @@ class SamlIntegrationTestUtilities:
|
||||
Class contains methods particular to SAML integration testing so that they
|
||||
can be separated out from the actual test methods.
|
||||
"""
|
||||
|
||||
PROVIDER_ID = "saml-testshib"
|
||||
PROVIDER_NAME = "TestShib"
|
||||
PROVIDER_BACKEND = "tpa-saml"
|
||||
@@ -67,51 +66,59 @@ class SamlIntegrationTestUtilities:
|
||||
self.addCleanup(httpretty.disable) # lint-amnesty, pylint: disable=no-member
|
||||
|
||||
def metadata_callback(_request, _uri, headers):
|
||||
""" Return a cached copy of TestShib's metadata by reading it from disk """
|
||||
return (200, headers, self.read_data_file('testshib_metadata.xml')) # lint-amnesty, pylint: disable=no-member
|
||||
"""Return a cached copy of TestShib's metadata by reading it from disk"""
|
||||
return (
|
||||
200,
|
||||
headers,
|
||||
self.read_data_file("testshib_metadata.xml"),
|
||||
) # lint-amnesty, pylint: disable=no-member
|
||||
|
||||
httpretty.register_uri(httpretty.GET, TESTSHIB_METADATA_URL, content_type='text/xml', body=metadata_callback)
|
||||
httpretty.register_uri(httpretty.GET, TESTSHIB_METADATA_URL, content_type="text/xml", body=metadata_callback)
|
||||
|
||||
def cache_duration_metadata_callback(_request, _uri, headers):
|
||||
"""Return a cached copy of TestShib's metadata with a cacheDuration attribute"""
|
||||
return (200, headers, self.read_data_file('testshib_metadata_with_cache_duration.xml')) # lint-amnesty, pylint: disable=no-member
|
||||
return (
|
||||
200,
|
||||
headers,
|
||||
self.read_data_file("testshib_metadata_with_cache_duration.xml"),
|
||||
) # lint-amnesty, pylint: disable=no-member
|
||||
|
||||
httpretty.register_uri(
|
||||
httpretty.GET,
|
||||
TESTSHIB_METADATA_URL_WITH_CACHE_DURATION,
|
||||
content_type='text/xml',
|
||||
body=cache_duration_metadata_callback
|
||||
content_type="text/xml",
|
||||
body=cache_duration_metadata_callback,
|
||||
)
|
||||
|
||||
# Configure the SAML library to use the same request ID for every request.
|
||||
# Doing this and freezing the time allows us to play back recorded request/response pairs
|
||||
uid_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.generate_unique_id', return_value='TESTID')
|
||||
uid_patch = patch("onelogin.saml2.utils.OneLogin_Saml2_Utils.generate_unique_id", return_value="TESTID")
|
||||
uid_patch.start()
|
||||
self.addCleanup(uid_patch.stop) # lint-amnesty, pylint: disable=no-member
|
||||
self._freeze_time(timestamp=1434326820) # This is the time when the saved request/response was recorded.
|
||||
|
||||
def _freeze_time(self, timestamp):
|
||||
""" Mock the current time for SAML, so we can replay canned requests/responses """
|
||||
now_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.now', return_value=timestamp)
|
||||
"""Mock the current time for SAML, so we can replay canned requests/responses"""
|
||||
now_patch = patch("onelogin.saml2.utils.OneLogin_Saml2_Utils.now", return_value=timestamp)
|
||||
now_patch.start()
|
||||
self.addCleanup(now_patch.stop) # lint-amnesty, pylint: disable=no-member
|
||||
|
||||
def _configure_testshib_provider(self, **kwargs):
|
||||
""" Enable and configure the TestShib SAML IdP as a third_party_auth provider """
|
||||
fetch_metadata = kwargs.pop('fetch_metadata', True)
|
||||
assert_metadata_updates = kwargs.pop('assert_metadata_updates', True)
|
||||
kwargs.setdefault('name', self.PROVIDER_NAME)
|
||||
kwargs.setdefault('enabled', True)
|
||||
kwargs.setdefault('visible', True)
|
||||
"""Enable and configure the TestShib SAML IdP as a third_party_auth provider"""
|
||||
fetch_metadata = kwargs.pop("fetch_metadata", True)
|
||||
assert_metadata_updates = kwargs.pop("assert_metadata_updates", True)
|
||||
kwargs.setdefault("name", self.PROVIDER_NAME)
|
||||
kwargs.setdefault("enabled", True)
|
||||
kwargs.setdefault("visible", True)
|
||||
kwargs.setdefault("backend_name", "tpa-saml")
|
||||
kwargs.setdefault('slug', self.PROVIDER_IDP_SLUG)
|
||||
kwargs.setdefault('entity_id', TESTSHIB_ENTITY_ID)
|
||||
kwargs.setdefault('metadata_source', TESTSHIB_METADATA_URL)
|
||||
kwargs.setdefault('icon_class', 'fa-university')
|
||||
kwargs.setdefault('attr_email', 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6') # eduPersonPrincipalName
|
||||
kwargs.setdefault('max_session_length', None)
|
||||
kwargs.setdefault('send_to_registration_first', False)
|
||||
kwargs.setdefault('skip_email_verification', False)
|
||||
kwargs.setdefault("slug", self.PROVIDER_IDP_SLUG)
|
||||
kwargs.setdefault("entity_id", TESTSHIB_ENTITY_ID)
|
||||
kwargs.setdefault("metadata_source", TESTSHIB_METADATA_URL)
|
||||
kwargs.setdefault("icon_class", "fa-university")
|
||||
kwargs.setdefault("attr_email", "urn:oid:1.3.6.1.4.1.5923.1.1.1.6") # eduPersonPrincipalName
|
||||
kwargs.setdefault("max_session_length", None)
|
||||
kwargs.setdefault("send_to_registration_first", False)
|
||||
kwargs.setdefault("skip_email_verification", False)
|
||||
saml_provider = self.configure_saml_provider(**kwargs) # pylint: disable=no-member
|
||||
|
||||
if fetch_metadata:
|
||||
@@ -127,17 +134,17 @@ class SamlIntegrationTestUtilities:
|
||||
return saml_provider
|
||||
|
||||
def do_provider_login(self, provider_redirect_url):
|
||||
""" Mocked: the user logs in to TestShib and then gets redirected back """
|
||||
"""Mocked: the user logs in to TestShib and then gets redirected back"""
|
||||
# The SAML provider (TestShib) will authenticate the user, then get the browser to POST a response:
|
||||
assert provider_redirect_url.startswith(TESTSHIB_SSO_URL) # lint-amnesty, pylint: disable=no-member
|
||||
|
||||
saml_response_xml = utils.read_and_pre_process_xml(
|
||||
os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'testshib_saml_response.xml')
|
||||
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "testshib_saml_response.xml")
|
||||
)
|
||||
|
||||
return self.client.post( # lint-amnesty, pylint: disable=no-member
|
||||
self.complete_url, # lint-amnesty, pylint: disable=no-member
|
||||
content_type='application/x-www-form-urlencoded',
|
||||
content_type="application/x-www-form-urlencoded",
|
||||
data=utils.prepare_saml_response_from_xml(saml_response_xml),
|
||||
)
|
||||
|
||||
@@ -150,16 +157,16 @@ class TestIndexExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin,
|
||||
"""
|
||||
|
||||
TOKEN_RESPONSE_DATA = {
|
||||
'access_token': 'access_token_value',
|
||||
'expires_in': 'expires_in_value',
|
||||
"access_token": "access_token_value",
|
||||
"expires_in": "expires_in_value",
|
||||
}
|
||||
USER_RESPONSE_DATA = {
|
||||
'lastName': 'lastName_value',
|
||||
'id': 'id_value',
|
||||
'firstName': 'firstName_value',
|
||||
'idp_name': 'testshib',
|
||||
'attributes': {'urn:oid:0.9.2342.19200300.100.1.1': [], 'name_id': '1'},
|
||||
'session_index': '1',
|
||||
"lastName": "lastName_value",
|
||||
"id": "id_value",
|
||||
"firstName": "firstName_value",
|
||||
"idp_name": "testshib",
|
||||
"attributes": {"urn:oid:0.9.2342.19200300.100.1.1": [], "name_id": "1"},
|
||||
"session_index": "1",
|
||||
}
|
||||
|
||||
def test_index_error_from_empty_list_saml_attribute(self):
|
||||
@@ -169,7 +176,8 @@ class TestIndexExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin,
|
||||
"""
|
||||
self.provider = self._configure_testshib_provider()
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
with self.assertRaises(IncorrectConfigurationException):
|
||||
request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
|
||||
@@ -188,16 +196,16 @@ class TestKeyExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin, t
|
||||
"""
|
||||
|
||||
TOKEN_RESPONSE_DATA = {
|
||||
'access_token': 'access_token_value',
|
||||
'expires_in': 'expires_in_value',
|
||||
"access_token": "access_token_value",
|
||||
"expires_in": "expires_in_value",
|
||||
}
|
||||
USER_RESPONSE_DATA = {
|
||||
'lastName': 'lastName_value',
|
||||
'id': 'id_value',
|
||||
'firstName': 'firstName_value',
|
||||
'idp_name': 'testshib',
|
||||
'attributes': {'name_id': '1'},
|
||||
'session_index': '1',
|
||||
"lastName": "lastName_value",
|
||||
"id": "id_value",
|
||||
"firstName": "firstName_value",
|
||||
"idp_name": "testshib",
|
||||
"attributes": {"name_id": "1"},
|
||||
"session_index": "1",
|
||||
}
|
||||
|
||||
def test_key_error_from_missing_saml_attributes(self):
|
||||
@@ -207,7 +215,8 @@ class TestKeyExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin, t
|
||||
"""
|
||||
self.provider = self._configure_testshib_provider()
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
with self.assertRaises(IncorrectConfigurationException):
|
||||
request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
|
||||
@@ -226,25 +235,23 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
"""
|
||||
|
||||
TOKEN_RESPONSE_DATA = {
|
||||
'access_token': 'access_token_value',
|
||||
'expires_in': 'expires_in_value',
|
||||
"access_token": "access_token_value",
|
||||
"expires_in": "expires_in_value",
|
||||
}
|
||||
USER_RESPONSE_DATA = {
|
||||
'lastName': 'lastName_value',
|
||||
'id': 'id_value',
|
||||
'firstName': 'firstName_value',
|
||||
'idp_name': 'testshib',
|
||||
'attributes': {'urn:oid:0.9.2342.19200300.100.1.1': ['myself'], 'name_id': '1'},
|
||||
'session_index': '1',
|
||||
"lastName": "lastName_value",
|
||||
"id": "id_value",
|
||||
"firstName": "firstName_value",
|
||||
"idp_name": "testshib",
|
||||
"attributes": {"urn:oid:0.9.2342.19200300.100.1.1": ["myself"], "name_id": "1"},
|
||||
"session_index": "1",
|
||||
}
|
||||
|
||||
@patch('openedx.features.enterprise_support.api.enterprise_customer_for_request')
|
||||
@patch('openedx.core.djangoapps.user_api.accounts.settings_views.enterprise_customer_for_request')
|
||||
@patch('openedx.features.enterprise_support.utils.third_party_auth.provider.Registry.get')
|
||||
@patch("openedx.features.enterprise_support.api.enterprise_customer_for_request")
|
||||
@patch("openedx.features.enterprise_support.utils.third_party_auth.provider.Registry.get")
|
||||
def test_full_pipeline_succeeds_for_unlinking_testshib_account(
|
||||
self,
|
||||
mock_auth_provider,
|
||||
mock_enterprise_customer_for_request_settings_view,
|
||||
mock_enterprise_customer_for_request,
|
||||
):
|
||||
|
||||
@@ -252,10 +259,12 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
# configure the backend, and mock out wire traffic.
|
||||
self.provider = self._configure_testshib_provider()
|
||||
request, strategy = self.get_request_and_strategy(
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
|
||||
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete"
|
||||
)
|
||||
request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy))
|
||||
user = self.create_user_models_for_existing_account(
|
||||
strategy, 'user@example.com', 'password', self.get_username())
|
||||
strategy, "user@example.com", "password", self.get_username()
|
||||
)
|
||||
self.assert_social_auth_exists_for_user(user, strategy)
|
||||
|
||||
request.user = user
|
||||
@@ -267,70 +276,67 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
enterprise_customer = EnterpriseCustomerFactory()
|
||||
assert EnterpriseCustomerUser.objects.count() == 0, "Precondition check: no link records should exist"
|
||||
EnterpriseCustomerUser.objects.link_user(enterprise_customer, user.email)
|
||||
assert (EnterpriseCustomerUser.objects
|
||||
.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 1)
|
||||
EnterpriseCustomerIdentityProvider.objects.get_or_create(enterprise_customer=enterprise_customer,
|
||||
provider_id=self.provider.provider_id)
|
||||
assert (
|
||||
EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 1
|
||||
)
|
||||
EnterpriseCustomerIdentityProvider.objects.get_or_create(
|
||||
enterprise_customer=enterprise_customer, provider_id=self.provider.provider_id
|
||||
)
|
||||
|
||||
enterprise_customer_data = {
|
||||
'uuid': enterprise_customer.uuid,
|
||||
'name': enterprise_customer.name,
|
||||
'identity_provider': 'saml-default',
|
||||
'identity_providers': [
|
||||
"uuid": enterprise_customer.uuid,
|
||||
"name": enterprise_customer.name,
|
||||
"identity_provider": "saml-default",
|
||||
"identity_providers": [
|
||||
{
|
||||
"provider_id": "saml-default",
|
||||
}
|
||||
],
|
||||
}
|
||||
mock_auth_provider.return_value.backend_name = 'tpa-saml'
|
||||
mock_auth_provider.return_value.backend_name = "tpa-saml"
|
||||
mock_enterprise_customer_for_request.return_value = enterprise_customer_data
|
||||
mock_enterprise_customer_for_request_settings_view.return_value = enterprise_customer_data
|
||||
|
||||
# Instrument the pipeline to get to the dashboard with the full expected state.
|
||||
self.client.get(
|
||||
pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))
|
||||
|
||||
actions.do_complete(request.backend, social_views._do_login, # pylint: disable=protected-access
|
||||
request=request)
|
||||
actions.do_complete(
|
||||
request.backend, social_views._do_login, request=request # pylint: disable=protected-access
|
||||
)
|
||||
|
||||
with self._patch_edxmako_current_request(strategy.request):
|
||||
login_user(strategy.request)
|
||||
actions.do_complete(request.backend, social_views._do_login, user=user, # pylint: disable=protected-access
|
||||
request=request)
|
||||
actions.do_complete(
|
||||
request.backend, social_views._do_login, user=user, request=request # pylint: disable=protected-access
|
||||
)
|
||||
|
||||
# First we expect that we're in the linked state, with a backend entry.
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=True)
|
||||
self.assert_social_auth_exists_for_user(request.user, strategy)
|
||||
|
||||
FEATURES_WITH_ENTERPRISE_ENABLED = settings.FEATURES.copy()
|
||||
FEATURES_WITH_ENTERPRISE_ENABLED['ENABLE_ENTERPRISE_INTEGRATION'] = True
|
||||
FEATURES_WITH_ENTERPRISE_ENABLED["ENABLE_ENTERPRISE_INTEGRATION"] = True
|
||||
with patch.dict("django.conf.settings.FEATURES", FEATURES_WITH_ENTERPRISE_ENABLED):
|
||||
# Fire off the disconnect pipeline without the user information.
|
||||
actions.do_disconnect(
|
||||
request.backend,
|
||||
None,
|
||||
None,
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME,
|
||||
request=request
|
||||
request.backend, None, None, redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request
|
||||
)
|
||||
assert (
|
||||
EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count()
|
||||
!= 0
|
||||
)
|
||||
assert EnterpriseCustomerUser.objects\
|
||||
.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() != 0
|
||||
|
||||
# Fire off the disconnect pipeline to unlink.
|
||||
self.assert_redirect_after_pipeline_completes(
|
||||
actions.do_disconnect(
|
||||
request.backend,
|
||||
user,
|
||||
None,
|
||||
redirect_field_name=auth.REDIRECT_FIELD_NAME,
|
||||
request=request
|
||||
request.backend, user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request
|
||||
)
|
||||
)
|
||||
# Now we expect to be in the unlinked state, with no backend entry.
|
||||
self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=False)
|
||||
self.assert_third_party_accounts_state(request, linked=False)
|
||||
self.assert_social_auth_does_not_exist_for_user(user, strategy)
|
||||
assert EnterpriseCustomerUser.objects\
|
||||
.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 0
|
||||
assert (
|
||||
EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count()
|
||||
== 0
|
||||
)
|
||||
|
||||
def get_response_data(self):
|
||||
"""Gets dict (string -> object) of merged data about the user."""
|
||||
@@ -340,7 +346,7 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
|
||||
def get_username(self):
|
||||
response_data = self.get_response_data()
|
||||
return response_data.get('idp_name')
|
||||
return response_data.get("idp_name")
|
||||
|
||||
def test_login_before_metadata_fetched(self):
|
||||
self._configure_testshib_provider(fetch_metadata=False)
|
||||
@@ -350,18 +356,18 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
try_login_response = self.client.get(testshib_login_url)
|
||||
# The user should be redirected to back to the login page:
|
||||
assert try_login_response.status_code == 302
|
||||
assert try_login_response['Location'] == self.login_page_url
|
||||
assert try_login_response["Location"] == self.login_page_url
|
||||
# When loading the login page, the user will see an error message:
|
||||
response = self.client.get(self.login_page_url)
|
||||
self.assertContains(response, 'Authentication with TestShib is currently unavailable.')
|
||||
self.assertContains(response, "Authentication with TestShib is currently unavailable.")
|
||||
|
||||
def test_login(self):
|
||||
""" Configure TestShib before running the login test """
|
||||
"""Configure TestShib before running the login test"""
|
||||
self._configure_testshib_provider()
|
||||
self._test_login()
|
||||
|
||||
def test_register(self):
|
||||
""" Configure TestShib before running the register test """
|
||||
"""Configure TestShib before running the register test"""
|
||||
self._configure_testshib_provider()
|
||||
self._test_register()
|
||||
|
||||
@@ -374,17 +380,17 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
user=self.user, provider=self.PROVIDER_BACKEND, uid__startswith=self.PROVIDER_IDP_SLUG
|
||||
)
|
||||
attributes = record.extra_data
|
||||
assert attributes.get('urn:oid:1.3.6.1.4.1.5923.1.1.1.9') == ['Member@testshib.org', 'Staff@testshib.org']
|
||||
assert attributes.get('urn:oid:2.5.4.3') == ['Me Myself And I']
|
||||
assert attributes.get('urn:oid:0.9.2342.19200300.100.1.1') == ['myself']
|
||||
assert attributes.get('urn:oid:2.5.4.20') == ['555-5555']
|
||||
assert attributes.get("urn:oid:1.3.6.1.4.1.5923.1.1.1.9") == ["Member@testshib.org", "Staff@testshib.org"]
|
||||
assert attributes.get("urn:oid:2.5.4.3") == ["Me Myself And I"]
|
||||
assert attributes.get("urn:oid:0.9.2342.19200300.100.1.1") == ["myself"]
|
||||
assert attributes.get("urn:oid:2.5.4.20") == ["555-5555"]
|
||||
# Phone number
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_debug_mode_login(self, debug_mode_enabled):
|
||||
""" Test SAML login logs with debug mode enabled or not """
|
||||
"""Test SAML login logs with debug mode enabled or not"""
|
||||
self._configure_testshib_provider(debug_mode=debug_mode_enabled)
|
||||
with patch.object(saml_log, 'info') as mock_log:
|
||||
with patch.object(saml_log, "info") as mock_log:
|
||||
self._test_login()
|
||||
if debug_mode_enabled:
|
||||
# We expect that test_login() does two full logins, and each attempt generates two
|
||||
@@ -393,38 +399,37 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin
|
||||
|
||||
expected_next_url = "/dashboard"
|
||||
(msg, action_type, idp_name, request_data, next_url, xml), _kwargs = mock_log.call_args_list[0]
|
||||
assert msg.startswith('SAML login %s')
|
||||
assert action_type == 'request'
|
||||
assert msg.startswith("SAML login %s")
|
||||
assert action_type == "request"
|
||||
assert idp_name == self.PROVIDER_IDP_SLUG
|
||||
self.assertDictContainsSubset(
|
||||
{"idp": idp_name, "auth_entry": "login", "next": expected_next_url},
|
||||
request_data
|
||||
{"idp": idp_name, "auth_entry": "login", "next": expected_next_url}, request_data
|
||||
)
|
||||
assert next_url == expected_next_url
|
||||
assert '<samlp:AuthnRequest' in xml
|
||||
assert "<samlp:AuthnRequest" in xml
|
||||
|
||||
(msg, action_type, idp_name, response_data, next_url, xml), _kwargs = mock_log.call_args_list[1]
|
||||
assert msg.startswith('SAML login %s')
|
||||
assert action_type == 'response'
|
||||
assert msg.startswith("SAML login %s")
|
||||
assert action_type == "response"
|
||||
assert idp_name == self.PROVIDER_IDP_SLUG
|
||||
self.assertDictContainsSubset({"RelayState": idp_name}, response_data)
|
||||
assert 'SAMLResponse' in response_data
|
||||
assert "SAMLResponse" in response_data
|
||||
assert next_url == expected_next_url
|
||||
assert '<saml2p:Response' in xml
|
||||
assert "<saml2p:Response" in xml
|
||||
else:
|
||||
assert not mock_log.called
|
||||
|
||||
def test_configure_testshib_provider_with_cache_duration(self):
|
||||
""" Enable and configure the TestShib SAML IdP as a third_party_auth provider """
|
||||
"""Enable and configure the TestShib SAML IdP as a third_party_auth provider"""
|
||||
kwargs = {}
|
||||
kwargs.setdefault('name', self.PROVIDER_NAME)
|
||||
kwargs.setdefault('enabled', True)
|
||||
kwargs.setdefault('visible', True)
|
||||
kwargs.setdefault('slug', self.PROVIDER_IDP_SLUG)
|
||||
kwargs.setdefault('entity_id', TESTSHIB_ENTITY_ID)
|
||||
kwargs.setdefault('metadata_source', TESTSHIB_METADATA_URL_WITH_CACHE_DURATION)
|
||||
kwargs.setdefault('icon_class', 'fa-university')
|
||||
kwargs.setdefault('attr_email', 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6') # eduPersonPrincipalName
|
||||
kwargs.setdefault("name", self.PROVIDER_NAME)
|
||||
kwargs.setdefault("enabled", True)
|
||||
kwargs.setdefault("visible", True)
|
||||
kwargs.setdefault("slug", self.PROVIDER_IDP_SLUG)
|
||||
kwargs.setdefault("entity_id", TESTSHIB_ENTITY_ID)
|
||||
kwargs.setdefault("metadata_source", TESTSHIB_METADATA_URL_WITH_CACHE_DURATION)
|
||||
kwargs.setdefault("icon_class", "fa-university")
|
||||
kwargs.setdefault("attr_email", "urn:oid:1.3.6.1.4.1.5923.1.1.1.6") # eduPersonPrincipalName
|
||||
self.configure_saml_provider(**kwargs)
|
||||
assert httpretty.is_enabled()
|
||||
num_total, num_skipped, num_attempted, num_updated, num_failed, failure_messages = fetch_saml_metadata()
|
||||
@@ -476,70 +481,72 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
super().setUp()
|
||||
|
||||
# Mock the call to the SAP SuccessFactors assertion endpoint
|
||||
SAPSF_ASSERTION_URL = 'http://successfactors.com/oauth/idp'
|
||||
SAPSF_ASSERTION_URL = "http://successfactors.com/oauth/idp"
|
||||
|
||||
def assertion_callback(_request, _uri, headers):
|
||||
"""
|
||||
Return a fake assertion after checking that the input is what we expect.
|
||||
"""
|
||||
assert b'private_key=fake_private_key_here' in _request.body
|
||||
assert b'user_id=myself' in _request.body
|
||||
assert b'token_url=http%3A%2F%2Fsuccessfactors.com%2Foauth%2Ftoken' in _request.body
|
||||
assert b'client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB' in _request.body
|
||||
return (200, headers, 'fake_saml_assertion')
|
||||
assert b"private_key=fake_private_key_here" in _request.body
|
||||
assert b"user_id=myself" in _request.body
|
||||
assert b"token_url=http%3A%2F%2Fsuccessfactors.com%2Foauth%2Ftoken" in _request.body
|
||||
assert b"client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB" in _request.body
|
||||
return (200, headers, "fake_saml_assertion")
|
||||
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_ASSERTION_URL, content_type='text/plain', body=assertion_callback)
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_ASSERTION_URL, content_type="text/plain", body=assertion_callback)
|
||||
|
||||
SAPSF_BAD_ASSERTION_URL = 'http://successfactors.com/oauth-fake/idp'
|
||||
SAPSF_BAD_ASSERTION_URL = "http://successfactors.com/oauth-fake/idp"
|
||||
|
||||
def bad_callback(_request, _uri, headers):
|
||||
"""
|
||||
Return a 404 error when someone tries to call the URL.
|
||||
"""
|
||||
return (404, headers, 'NOT AN ASSERTION')
|
||||
return (404, headers, "NOT AN ASSERTION")
|
||||
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_BAD_ASSERTION_URL, content_type='text/plain', body=bad_callback)
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_BAD_ASSERTION_URL, content_type="text/plain", body=bad_callback)
|
||||
|
||||
# Mock the call to the SAP SuccessFactors token endpoint
|
||||
SAPSF_TOKEN_URL = 'http://successfactors.com/oauth/token'
|
||||
SAPSF_TOKEN_URL = "http://successfactors.com/oauth/token"
|
||||
|
||||
def token_callback(_request, _uri, headers):
|
||||
"""
|
||||
Return a fake assertion after checking that the input is what we expect.
|
||||
"""
|
||||
assert b'assertion=fake_saml_assertion' in _request.body
|
||||
assert b'company_id=NCC1701D' in _request.body
|
||||
assert b'grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Asaml2-bearer' in _request.body
|
||||
assert b'client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB' in _request.body
|
||||
assert b"assertion=fake_saml_assertion" in _request.body
|
||||
assert b"company_id=NCC1701D" in _request.body
|
||||
assert b"grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Asaml2-bearer" in _request.body
|
||||
assert b"client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB" in _request.body
|
||||
return (200, headers, '{"access_token": "faketoken"}')
|
||||
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_TOKEN_URL, content_type='application/json', body=token_callback)
|
||||
httpretty.register_uri(httpretty.POST, SAPSF_TOKEN_URL, content_type="application/json", body=token_callback)
|
||||
|
||||
# Mock the call to the SAP SuccessFactors OData user endpoint
|
||||
ODATA_USER_URL = (
|
||||
'http://api.successfactors.com/odata/v2/User(userId=\'myself\')'
|
||||
'?$select=firstName,lastName,defaultFullName,email'
|
||||
"http://api.successfactors.com/odata/v2/User(userId='myself')"
|
||||
"?$select=firstName,lastName,defaultFullName,email"
|
||||
)
|
||||
|
||||
def user_callback(request, _uri, headers):
|
||||
auth_header = request.headers.get('Authorization')
|
||||
assert auth_header == 'Bearer faketoken'
|
||||
auth_header = request.headers.get("Authorization")
|
||||
assert auth_header == "Bearer faketoken"
|
||||
return (
|
||||
200,
|
||||
headers,
|
||||
json.dumps({
|
||||
'd': {
|
||||
'username': 'jsmith',
|
||||
'firstName': 'John',
|
||||
'lastName': 'Smith',
|
||||
'defaultFullName': 'John Smith',
|
||||
'email': 'john@smith.com',
|
||||
'country': 'United States',
|
||||
json.dumps(
|
||||
{
|
||||
"d": {
|
||||
"username": "jsmith",
|
||||
"firstName": "John",
|
||||
"lastName": "Smith",
|
||||
"defaultFullName": "John Smith",
|
||||
"email": "john@smith.com",
|
||||
"country": "United States",
|
||||
}
|
||||
}
|
||||
})
|
||||
),
|
||||
)
|
||||
|
||||
httpretty.register_uri(httpretty.GET, ODATA_USER_URL, content_type='application/json', body=user_callback)
|
||||
httpretty.register_uri(httpretty.GET, ODATA_USER_URL, content_type="application/json", body=user_callback)
|
||||
|
||||
def _mock_odata_api_for_error(self, odata_api_root_url, username):
|
||||
"""
|
||||
@@ -550,17 +557,17 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
"""
|
||||
Return a 500 error when someone tries to call the URL.
|
||||
"""
|
||||
headers['CorrelationId'] = 'aefd38b7-c92c-445a-8c7a-487a3f0c7a9d'
|
||||
headers['RequestNo'] = '[787177]' # This is the format SAPSF returns for the transaction request number
|
||||
return 500, headers, 'Failure!'
|
||||
headers["CorrelationId"] = "aefd38b7-c92c-445a-8c7a-487a3f0c7a9d"
|
||||
headers["RequestNo"] = "[787177]" # This is the format SAPSF returns for the transaction request number
|
||||
return 500, headers, "Failure!"
|
||||
|
||||
fields = ','.join(SapSuccessFactorsIdentityProvider.default_field_mapping.copy())
|
||||
url = '{root_url}User(userId=\'{user_id}\')?$select={fields}'.format(
|
||||
fields = ",".join(SapSuccessFactorsIdentityProvider.default_field_mapping.copy())
|
||||
url = "{root_url}User(userId='{user_id}')?$select={fields}".format(
|
||||
root_url=odata_api_root_url,
|
||||
user_id=username,
|
||||
fields=fields,
|
||||
)
|
||||
httpretty.register_uri(httpretty.GET, url, body=callback, content_type='application/json')
|
||||
httpretty.register_uri(httpretty.GET, url, body=callback, content_type="application/json")
|
||||
return url
|
||||
|
||||
def test_register_insufficient_sapsf_metadata(self):
|
||||
@@ -569,7 +576,7 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
SuccessFactors API, and test that it falls back to the data it receives from the SAML assertion.
|
||||
"""
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings='{"key_i_dont_need":"value_i_also_dont_need"}',
|
||||
)
|
||||
@@ -579,7 +586,7 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
self.USER_USERNAME = "myself"
|
||||
self._test_register()
|
||||
|
||||
@patch.dict('django.conf.settings.REGISTRATION_EXTRA_FIELDS', country='optional')
|
||||
@patch.dict("django.conf.settings.REGISTRATION_EXTRA_FIELDS", country="optional")
|
||||
def test_register_sapsf_metadata_present(self):
|
||||
"""
|
||||
Configure the provider such that it can talk to a mocked-out version of the SAP SuccessFactors
|
||||
@@ -589,19 +596,19 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
what we're looking for, and when an empty override is provided (expected behavior is that
|
||||
existing value maps will be left alone).
|
||||
"""
|
||||
expected_country = 'US'
|
||||
expected_country = "US"
|
||||
provider_settings = {
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps(provider_settings)
|
||||
other_settings=json.dumps(provider_settings),
|
||||
)
|
||||
self._test_register(country=expected_country)
|
||||
|
||||
@@ -616,47 +623,49 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
"""
|
||||
# Mock the call to the SAP SuccessFactors OData user endpoint
|
||||
ODATA_USER_URL = (
|
||||
'http://api.successfactors.com/odata/v2/User(userId=\'myself\')'
|
||||
'?$select=firstName,country,lastName,defaultFullName,email'
|
||||
"http://api.successfactors.com/odata/v2/User(userId='myself')"
|
||||
"?$select=firstName,country,lastName,defaultFullName,email"
|
||||
)
|
||||
|
||||
def user_callback(request, _uri, headers):
|
||||
auth_header = request.headers.get('Authorization')
|
||||
assert auth_header == 'Bearer faketoken'
|
||||
auth_header = request.headers.get("Authorization")
|
||||
assert auth_header == "Bearer faketoken"
|
||||
return (
|
||||
200,
|
||||
headers,
|
||||
json.dumps({
|
||||
'd': {
|
||||
'username': 'jsmith',
|
||||
'firstName': 'John',
|
||||
'lastName': 'Smith',
|
||||
'defaultFullName': 'John Smith',
|
||||
'country': 'United States'
|
||||
json.dumps(
|
||||
{
|
||||
"d": {
|
||||
"username": "jsmith",
|
||||
"firstName": "John",
|
||||
"lastName": "Smith",
|
||||
"defaultFullName": "John Smith",
|
||||
"country": "United States",
|
||||
}
|
||||
}
|
||||
})
|
||||
),
|
||||
)
|
||||
|
||||
httpretty.register_uri(httpretty.GET, ODATA_USER_URL, content_type='application/json', body=user_callback)
|
||||
httpretty.register_uri(httpretty.GET, ODATA_USER_URL, content_type="application/json", body=user_callback)
|
||||
|
||||
provider_settings = {
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps(provider_settings),
|
||||
default_email='default@testshib.org'
|
||||
default_email="default@testshib.org",
|
||||
)
|
||||
self.USER_EMAIL = 'default@testshib.org'
|
||||
self.USER_EMAIL = "default@testshib.org"
|
||||
self._test_register()
|
||||
|
||||
@patch.dict('django.conf.settings.REGISTRATION_EXTRA_FIELDS', country='optional')
|
||||
@patch.dict("django.conf.settings.REGISTRATION_EXTRA_FIELDS", country="optional")
|
||||
def test_register_sapsf_metadata_present_override_relevant_value(self):
|
||||
"""
|
||||
Configure the provider such that it can talk to a mocked-out version of the SAP SuccessFactors
|
||||
@@ -666,26 +675,26 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
what we're looking for, and when an empty override is provided (expected behavior is that
|
||||
existing value maps will be left alone).
|
||||
"""
|
||||
value_map = {'country': {'United States': 'NZ'}}
|
||||
expected_country = 'NZ'
|
||||
value_map = {"country": {"United States": "NZ"}}
|
||||
expected_country = "NZ"
|
||||
provider_settings = {
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
if value_map:
|
||||
provider_settings['sapsf_value_mappings'] = value_map
|
||||
provider_settings["sapsf_value_mappings"] = value_map
|
||||
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps(provider_settings)
|
||||
other_settings=json.dumps(provider_settings),
|
||||
)
|
||||
self._test_register(country=expected_country)
|
||||
|
||||
@patch.dict('django.conf.settings.REGISTRATION_EXTRA_FIELDS', country='optional')
|
||||
@patch.dict("django.conf.settings.REGISTRATION_EXTRA_FIELDS", country="optional")
|
||||
def test_register_sapsf_metadata_present_override_other_value(self):
|
||||
"""
|
||||
Configure the provider such that it can talk to a mocked-out version of the SAP SuccessFactors
|
||||
@@ -695,26 +704,26 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
what we're looking for, and when an empty override is provided (expected behavior is that
|
||||
existing value maps will be left alone).
|
||||
"""
|
||||
value_map = {'country': {'Australia': 'blahfake'}}
|
||||
expected_country = 'US'
|
||||
value_map = {"country": {"Australia": "blahfake"}}
|
||||
expected_country = "US"
|
||||
provider_settings = {
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
if value_map:
|
||||
provider_settings['sapsf_value_mappings'] = value_map
|
||||
provider_settings["sapsf_value_mappings"] = value_map
|
||||
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps(provider_settings)
|
||||
other_settings=json.dumps(provider_settings),
|
||||
)
|
||||
self._test_register(country=expected_country)
|
||||
|
||||
@patch.dict('django.conf.settings.REGISTRATION_EXTRA_FIELDS', country='optional')
|
||||
@patch.dict("django.conf.settings.REGISTRATION_EXTRA_FIELDS", country="optional")
|
||||
def test_register_sapsf_metadata_present_empty_value_override(self):
|
||||
"""
|
||||
Configure the provider such that it can talk to a mocked-out version of the SAP SuccessFactors
|
||||
@@ -725,22 +734,22 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
existing value maps will be left alone).
|
||||
"""
|
||||
|
||||
value_map = {'country': {}}
|
||||
expected_country = 'US'
|
||||
value_map = {"country": {}}
|
||||
expected_country = "US"
|
||||
provider_settings = {
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
if value_map:
|
||||
provider_settings['sapsf_value_mappings'] = value_map
|
||||
provider_settings["sapsf_value_mappings"] = value_map
|
||||
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps(provider_settings)
|
||||
other_settings=json.dumps(provider_settings),
|
||||
)
|
||||
self._test_register(country=expected_country)
|
||||
|
||||
@@ -750,15 +759,17 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
metadata from the SAML assertion.
|
||||
"""
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps({
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth-fake/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
|
||||
'odata_company_id': 'NCC1701D',
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
})
|
||||
other_settings=json.dumps(
|
||||
{
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth-fake/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": "http://api.successfactors.com/odata/v2/",
|
||||
"odata_company_id": "NCC1701D",
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
),
|
||||
)
|
||||
# Because we're getting details from the assertion, fall back to the initial set of details.
|
||||
self.USER_EMAIL = "myself@testshib.org"
|
||||
@@ -776,39 +787,41 @@ class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTes
|
||||
self.USER_NAME = "Me Myself And I"
|
||||
self.USER_USERNAME = "myself"
|
||||
|
||||
odata_company_id = 'NCC1701D'
|
||||
odata_api_root_url = 'http://api.successfactors.com/odata/v2/'
|
||||
odata_company_id = "NCC1701D"
|
||||
odata_api_root_url = "http://api.successfactors.com/odata/v2/"
|
||||
mocked_odata_api_url = self._mock_odata_api_for_error(odata_api_root_url, self.USER_USERNAME)
|
||||
self._configure_testshib_provider(
|
||||
identity_provider_type='sap_success_factors',
|
||||
identity_provider_type="sap_success_factors",
|
||||
metadata_source=TESTSHIB_METADATA_URL,
|
||||
other_settings=json.dumps({
|
||||
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
|
||||
'sapsf_private_key': 'fake_private_key_here',
|
||||
'odata_api_root_url': odata_api_root_url,
|
||||
'odata_company_id': odata_company_id,
|
||||
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
|
||||
})
|
||||
other_settings=json.dumps(
|
||||
{
|
||||
"sapsf_oauth_root_url": "http://successfactors.com/oauth/",
|
||||
"sapsf_private_key": "fake_private_key_here",
|
||||
"odata_api_root_url": odata_api_root_url,
|
||||
"odata_company_id": odata_company_id,
|
||||
"odata_client_id": "TatVotSEiCMteSNWtSOnLanCtBGwNhGB",
|
||||
}
|
||||
),
|
||||
)
|
||||
with LogCapture(level=logging.WARNING) as log_capture:
|
||||
self._test_register()
|
||||
logging_messages = str([log_msg.getMessage() for log_msg in log_capture.records]).replace('\\', '')
|
||||
logging_messages = str([log_msg.getMessage() for log_msg in log_capture.records]).replace("\\", "")
|
||||
assert odata_company_id in logging_messages
|
||||
assert mocked_odata_api_url in logging_messages
|
||||
assert self.USER_USERNAME in logging_messages
|
||||
assert 'SAPSuccessFactors' in logging_messages
|
||||
assert 'Error message' in logging_messages
|
||||
assert 'System message' in logging_messages
|
||||
assert 'Headers' in logging_messages
|
||||
assert "SAPSuccessFactors" in logging_messages
|
||||
assert "Error message" in logging_messages
|
||||
assert "System message" in logging_messages
|
||||
assert "Headers" in logging_messages
|
||||
|
||||
@skip('Test not necessary for this subclass')
|
||||
@skip("Test not necessary for this subclass")
|
||||
def test_get_saml_idp_class_with_fake_identifier(self):
|
||||
pass
|
||||
|
||||
@skip('Test not necessary for this subclass')
|
||||
@skip("Test not necessary for this subclass")
|
||||
def test_login(self):
|
||||
pass
|
||||
|
||||
@skip('Test not necessary for this subclass')
|
||||
@skip("Test not necessary for this subclass")
|
||||
def test_register(self):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user