Merge pull request #9903 from open-craft/sso-custom-auth-entry-points
Allow using a custom login/register form with third_party_auth
This commit is contained in:
@@ -57,6 +57,10 @@ rather than spreading them across two functions in the pipeline.
|
||||
See http://psa.matiasaguirre.net/docs/pipeline.html for more docs.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import random
|
||||
import string
|
||||
from collections import OrderedDict
|
||||
@@ -104,6 +108,18 @@ AUTH_ENTRY_ACCOUNT_SETTINGS = 'account_settings'
|
||||
AUTH_ENTRY_LOGIN_API = 'login_api'
|
||||
AUTH_ENTRY_REGISTER_API = 'register_api'
|
||||
|
||||
# AUTH_ENTRY_CUSTOM: Custom auth entry point for post-auth integrations.
|
||||
# This should be a dict where the key is a word passed via ?auth_entry=, and the
|
||||
# value is a dict with an arbitrary 'secret_key' and a 'url'.
|
||||
# This can be used as an extension point to inject custom behavior into the auth
|
||||
# process, replacing the registration/login form that would normally be seen
|
||||
# immediately after the user has authenticated with the third party provider.
|
||||
# If a custom 'auth_entry' query parameter is used, then once the user has
|
||||
# authenticated with a specific backend/provider, they will be redirected to the
|
||||
# URL specified with this setting, rather than to the built-in
|
||||
# registration/login form/logic.
|
||||
AUTH_ENTRY_CUSTOM = getattr(settings, 'THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS', {})
|
||||
|
||||
|
||||
def is_api(auth_entry):
|
||||
"""Returns whether the auth entry point is via an API call."""
|
||||
@@ -128,7 +144,7 @@ _AUTH_ENTRY_CHOICES = frozenset([
|
||||
AUTH_ENTRY_ACCOUNT_SETTINGS,
|
||||
AUTH_ENTRY_LOGIN_API,
|
||||
AUTH_ENTRY_REGISTER_API,
|
||||
])
|
||||
] + AUTH_ENTRY_CUSTOM.keys())
|
||||
|
||||
_DEFAULT_RANDOM_PASSWORD_LENGTH = 12
|
||||
_PASSWORD_CHARSET = string.letters + string.digits
|
||||
@@ -445,6 +461,33 @@ def set_pipeline_timeout(strategy, user, *args, **kwargs):
|
||||
# choice of the user.
|
||||
|
||||
|
||||
def redirect_to_custom_form(request, auth_entry, user_details):
|
||||
"""
|
||||
If auth_entry is found in AUTH_ENTRY_CUSTOM, this is used to send provider
|
||||
data to an external server's registration/login page.
|
||||
|
||||
The data is sent as a base64-encoded values in a POST request and includes
|
||||
a cryptographic checksum in case the integrity of the data is important.
|
||||
"""
|
||||
form_info = AUTH_ENTRY_CUSTOM[auth_entry]
|
||||
secret_key = form_info['secret_key']
|
||||
if isinstance(secret_key, unicode):
|
||||
secret_key = secret_key.encode('utf-8')
|
||||
custom_form_url = form_info['url']
|
||||
data_str = json.dumps({
|
||||
"user_details": user_details
|
||||
})
|
||||
digest = hmac.new(secret_key, msg=data_str, digestmod=hashlib.sha256).digest()
|
||||
# Store the data in the session temporarily, then redirect to a page that will POST it to
|
||||
# the custom login/register page.
|
||||
request.session['tpa_custom_auth_entry_data'] = {
|
||||
'data': base64.b64encode(data_str),
|
||||
'hmac': base64.b64encode(digest),
|
||||
'post_url': custom_form_url,
|
||||
}
|
||||
return redirect(reverse('tpa_post_to_custom_auth_form'))
|
||||
|
||||
|
||||
@partial.partial
|
||||
def ensure_user_information(strategy, auth_entry, backend=None, user=None, social=None,
|
||||
allow_inactive_user=False, *args, **kwargs):
|
||||
@@ -492,6 +535,9 @@ def ensure_user_information(strategy, auth_entry, backend=None, user=None, socia
|
||||
return dispatch_to_register()
|
||||
elif auth_entry == AUTH_ENTRY_ACCOUNT_SETTINGS:
|
||||
raise AuthEntryError(backend, 'auth_entry is wrong. Settings requires a user.')
|
||||
elif auth_entry in AUTH_ENTRY_CUSTOM:
|
||||
# Pass the username, email, etc. via query params to the custom entry page:
|
||||
return redirect_to_custom_form(strategy.request, auth_entry, kwargs['details'])
|
||||
else:
|
||||
raise AuthEntryError(backend, 'auth_entry invalid')
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ A custom Strategy for python-social-auth that allows us to fetch configuration f
|
||||
ConfigurationModels rather than django.settings
|
||||
"""
|
||||
from .models import OAuth2ProviderConfig
|
||||
from .pipeline import AUTH_ENTRY_CUSTOM
|
||||
from social.backends.oauth import OAuthAuth
|
||||
from social.strategies.django_strategy import DjangoStrategy
|
||||
|
||||
@@ -31,6 +32,15 @@ class ConfigurationModelStrategy(DjangoStrategy):
|
||||
return provider_config.get_setting(name)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# special case handling of login error URL if we're using a custom auth entry point:
|
||||
if name == 'LOGIN_ERROR_URL':
|
||||
auth_entry = self.request.session.get('auth_entry')
|
||||
if auth_entry and auth_entry in AUTH_ENTRY_CUSTOM:
|
||||
error_url = AUTH_ENTRY_CUSTOM[auth_entry].get('error_url')
|
||||
if error_url:
|
||||
return error_url
|
||||
|
||||
# At this point, we know 'name' is not set in a [OAuth2|LTI|SAML]ProviderConfig row.
|
||||
# It's probably a global Django setting like 'FIELDS_STORED_IN_SESSION':
|
||||
return super(ConfigurationModelStrategy, self).setting(name, default, backend)
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
{% load i18n %}
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<title>{% trans "Please wait" %}</title>
|
||||
<style type="text/css">
|
||||
#djDebug {display:none;}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<form id="sso-data-form" action="{{post_url}}" method="post">
|
||||
{% csrf_token %}
|
||||
<input type="hidden" name="sso_data" value="{{data}}">
|
||||
<input type="hidden" name="sso_data_hmac" value="{{hmac}}">
|
||||
<noscript>
|
||||
<input id="submit-button" type="submit" value="Click to continue" autofocus>
|
||||
</noscript>
|
||||
</form>
|
||||
<script>
|
||||
document.getElementById('sso-data-form').submit();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -1,5 +1,14 @@
|
||||
"""Integration tests for Google providers."""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
import json
|
||||
from mock import patch
|
||||
from social.exceptions import AuthException
|
||||
from student.tests.factories import UserFactory
|
||||
from third_party_auth import pipeline
|
||||
from third_party_auth.tests.specs import base
|
||||
|
||||
|
||||
@@ -34,3 +43,90 @@ class GoogleOauth2IntegrationTest(base.Oauth2IntegrationTest):
|
||||
|
||||
def get_username(self):
|
||||
return self.get_response_data().get('email').split('@')[0]
|
||||
|
||||
def assert_redirect_to_provider_looks_correct(self, response):
|
||||
super(GoogleOauth2IntegrationTest, self).assert_redirect_to_provider_looks_correct(response)
|
||||
self.assertIn('google.com', response['Location'])
|
||||
|
||||
def test_custom_form(self):
|
||||
"""
|
||||
Use the Google provider to test the custom login/register form feature.
|
||||
"""
|
||||
# The pipeline starts by a user GETting /auth/login/google-oauth2/?auth_entry=custom1
|
||||
# Synthesize that request and check that it redirects to the correct
|
||||
# provider page.
|
||||
auth_entry = 'custom1' # See definition in lms/envs/test.py
|
||||
login_url = pipeline.get_login_url(self.provider.provider_id, auth_entry)
|
||||
login_url += "&next=/misc/final-destination"
|
||||
self.assert_redirect_to_provider_looks_correct(self.client.get(login_url))
|
||||
|
||||
def fake_auth_complete(inst, *args, **kwargs):
|
||||
""" Mock the backend's auth_complete() method """
|
||||
kwargs.update({'response': self.get_response_data(), 'backend': inst})
|
||||
return inst.strategy.authenticate(*args, **kwargs)
|
||||
|
||||
# Next, the provider makes a request against /auth/complete/<provider>.
|
||||
complete_url = pipeline.get_complete_url(self.provider.backend_name)
|
||||
with patch.object(self.provider.backend_class, 'auth_complete', fake_auth_complete):
|
||||
response = self.client.get(complete_url)
|
||||
# This should redirect to the custom login/register form:
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response['Location'], 'http://example.none/auth/custom_auth_entry')
|
||||
|
||||
response = self.client.get(response['Location'])
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn('action="/misc/my-custom-registration-form" method="post"', response.content)
|
||||
data_decoded = base64.b64decode(response.context['data']) # pylint: disable=no-member
|
||||
data_parsed = json.loads(data_decoded)
|
||||
# The user's details get passed to the custom page as a base64 encoded query parameter:
|
||||
self.assertEqual(data_parsed, {
|
||||
'user_details': {
|
||||
'username': 'email_value',
|
||||
'email': 'email_value@example.com',
|
||||
'fullname': 'name_value',
|
||||
'first_name': 'given_name_value',
|
||||
'last_name': 'family_name_value',
|
||||
}
|
||||
})
|
||||
# Check the hash that is used to confirm the user's data in the GET parameter is correct
|
||||
secret_key = settings.THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS['custom1']['secret_key']
|
||||
hmac_expected = hmac.new(secret_key, msg=data_decoded, digestmod=hashlib.sha256).digest()
|
||||
self.assertEqual(base64.b64decode(response.context['hmac']), hmac_expected) # pylint: disable=no-member
|
||||
|
||||
# Now our custom registration form creates or logs in the user:
|
||||
email, password = data_parsed['user_details']['email'], 'random_password'
|
||||
created_user = UserFactory(email=email, password=password)
|
||||
login_response = self.client.post(reverse('login'), {'email': email, 'password': password})
|
||||
self.assertEqual(login_response.status_code, 200)
|
||||
|
||||
# Now our custom login/registration page must resume the pipeline:
|
||||
response = self.client.get(complete_url)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response['Location'], 'http://example.none/misc/final-destination')
|
||||
|
||||
_, strategy = self.get_request_and_strategy()
|
||||
self.assert_social_auth_exists_for_user(created_user, strategy)
|
||||
|
||||
def test_custom_form_error(self):
|
||||
"""
|
||||
Use the Google provider to test the custom login/register failure redirects.
|
||||
"""
|
||||
# The pipeline starts by a user GETting /auth/login/google-oauth2/?auth_entry=custom1
|
||||
# Synthesize that request and check that it redirects to the correct
|
||||
# provider page.
|
||||
auth_entry = 'custom1' # See definition in lms/envs/test.py
|
||||
login_url = pipeline.get_login_url(self.provider.provider_id, auth_entry)
|
||||
login_url += "&next=/misc/final-destination"
|
||||
self.assert_redirect_to_provider_looks_correct(self.client.get(login_url))
|
||||
|
||||
def fake_auth_complete_error(_inst, *_args, **_kwargs):
|
||||
""" Mock the backend's auth_complete() method """
|
||||
raise AuthException("Mock login failed")
|
||||
|
||||
# Next, the provider makes a request against /auth/complete/<provider>.
|
||||
complete_url = pipeline.get_complete_url(self.provider.backend_name)
|
||||
with patch.object(self.provider.backend_class, 'auth_complete', fake_auth_complete_error):
|
||||
response = self.client.get(complete_url)
|
||||
# This should redirect to the custom error URL
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response['Location'], 'http://example.none/misc/my-custom-sso-error-page')
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
|
||||
from django.conf.urls import include, patterns, url
|
||||
|
||||
from .views import inactive_user_view, saml_metadata_view, lti_login_and_complete_view
|
||||
from .views import inactive_user_view, saml_metadata_view, lti_login_and_complete_view, post_to_custom_auth_form
|
||||
|
||||
urlpatterns = patterns(
|
||||
'',
|
||||
url(r'^auth/inactive', inactive_user_view, name="third_party_inactive_redirect"),
|
||||
url(r'^auth/custom_auth_entry', post_to_custom_auth_form, name='tpa_post_to_custom_auth_form'),
|
||||
url(r'^auth/saml/metadata.xml', saml_metadata_view),
|
||||
url(r'^auth/login/(?P<backend>lti)/$', lti_login_and_complete_view),
|
||||
url(r'^auth/', include('social.apps.django_app.urls', namespace='social')),
|
||||
|
||||
@@ -4,7 +4,7 @@ Extra views required for SSO
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import HttpResponse, HttpResponseServerError, Http404, HttpResponseNotAllowed
|
||||
from django.shortcuts import redirect
|
||||
from django.shortcuts import redirect, render
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
import social
|
||||
from social.apps.django_app.views import complete
|
||||
@@ -59,3 +59,26 @@ def lti_login_and_complete_view(request, backend, *args, **kwargs):
|
||||
|
||||
request.backend.start()
|
||||
return complete(request, backend, *args, **kwargs)
|
||||
|
||||
|
||||
def post_to_custom_auth_form(request):
|
||||
"""
|
||||
Redirect to a custom login/register page.
|
||||
|
||||
Since we can't do a redirect-to-POST, this view is used to pass SSO data from
|
||||
the third_party_auth pipeline to a custom login/register form (possibly on another server).
|
||||
"""
|
||||
pipeline_data = request.session.pop('tpa_custom_auth_entry_data', None)
|
||||
if not pipeline_data:
|
||||
raise Http404
|
||||
# Verify the format of pipeline_data:
|
||||
data = {
|
||||
'post_url': pipeline_data['post_url'],
|
||||
# The user's name, email, etc. as base64 encoded JSON
|
||||
# It's base64 encoded because it's signed cryptographically and we don't want whitespace
|
||||
# or ordering issues affecting the hash/signature.
|
||||
'data': pipeline_data['data'],
|
||||
# The cryptographic hash of user_data:
|
||||
'hmac': pipeline_data['hmac'],
|
||||
}
|
||||
return render(request, 'third_party_auth/post_custom_auth_entry.html', data)
|
||||
|
||||
@@ -592,6 +592,11 @@ if FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
|
||||
'schedule': datetime.timedelta(hours=ENV_TOKENS.get('THIRD_PARTY_AUTH_SAML_FETCH_PERIOD_HOURS', 24)),
|
||||
}
|
||||
|
||||
# The following can be used to integrate a custom login form with third_party_auth.
|
||||
# It should be a dict where the key is a word passed via ?auth_entry=, and the value is a
|
||||
# dict with an arbitrary 'secret_key' and a 'url'.
|
||||
THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS = AUTH_TOKENS.get('THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS', {})
|
||||
|
||||
##### OAUTH2 Provider ##############
|
||||
if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
|
||||
OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER']
|
||||
|
||||
@@ -273,6 +273,14 @@ AUTHENTICATION_BACKENDS = (
|
||||
'third_party_auth.lti.LTIAuthBackend',
|
||||
) + AUTHENTICATION_BACKENDS
|
||||
|
||||
THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS = {
|
||||
'custom1': {
|
||||
'secret_key': 'opensesame',
|
||||
'url': '/misc/my-custom-registration-form',
|
||||
'error_url': '/misc/my-custom-sso-error-page'
|
||||
},
|
||||
}
|
||||
|
||||
################################## OPENID #####################################
|
||||
FEATURES['AUTH_USE_OPENID'] = True
|
||||
FEATURES['AUTH_USE_OPENID_PROVIDER'] = True
|
||||
|
||||
Reference in New Issue
Block a user