Merge pull request #1327 from MITx/bugfix/brian/openid_provider_post
Fix handling of openid provider requests that use POST
This commit is contained in:
0
common/djangoapps/external_auth/tests/__init__.py
Normal file
0
common/djangoapps/external_auth/tests/__init__.py
Normal file
209
common/djangoapps/external_auth/tests/test_openid_provider.py
Normal file
209
common/djangoapps/external_auth/tests/test_openid_provider.py
Normal file
@@ -0,0 +1,209 @@
|
||||
'''
|
||||
Created on Jan 18, 2013
|
||||
|
||||
@author: brian
|
||||
'''
|
||||
import openid
|
||||
from openid.fetchers import HTTPFetcher, HTTPResponse
|
||||
from urlparse import parse_qs
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import TestCase, LiveServerTestCase
|
||||
# from django.contrib.auth.models import User
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test.client import RequestFactory
|
||||
|
||||
class MyFetcher(HTTPFetcher):
|
||||
"""A fetcher that uses server-internal calls for performing HTTP
|
||||
requests.
|
||||
"""
|
||||
|
||||
def __init__(self, client):
|
||||
"""@param client: A test client object"""
|
||||
|
||||
super(MyFetcher, self).__init__()
|
||||
self.client = client
|
||||
|
||||
def fetch(self, url, body=None, headers=None):
|
||||
"""Perform an HTTP request
|
||||
|
||||
@raises Exception: Any exception that can be raised by Django
|
||||
|
||||
@see: C{L{HTTPFetcher.fetch}}
|
||||
"""
|
||||
if body:
|
||||
# method = 'POST'
|
||||
# undo the URL encoding of the POST arguments
|
||||
data = parse_qs(body)
|
||||
response = self.client.post(url, data)
|
||||
else:
|
||||
# method = 'GET'
|
||||
data = {}
|
||||
if headers and 'Accept' in headers:
|
||||
data['CONTENT_TYPE'] = headers['Accept']
|
||||
response = self.client.get(url, data)
|
||||
|
||||
# Translate the test client response to the fetcher's HTTP response abstraction
|
||||
content = response.content
|
||||
final_url = url
|
||||
response_headers = {}
|
||||
if 'Content-Type' in response:
|
||||
response_headers['content-type'] = response['Content-Type']
|
||||
if 'X-XRDS-Location' in response:
|
||||
response_headers['x-xrds-location'] = response['X-XRDS-Location']
|
||||
status = response.status_code
|
||||
|
||||
return HTTPResponse(
|
||||
body=content,
|
||||
final_url=final_url,
|
||||
headers=response_headers,
|
||||
status=status,
|
||||
)
|
||||
|
||||
class OpenIdProviderTest(TestCase):
|
||||
|
||||
# def setUp(self):
|
||||
# username = 'viewtest'
|
||||
# email = 'view@test.com'
|
||||
# password = 'foo'
|
||||
# user = User.objects.create_user(username, email, password)
|
||||
|
||||
def testBeginLoginWithXrdsUrl(self):
|
||||
# skip the test if openid is not enabled (as in cms.envs.test):
|
||||
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
|
||||
return
|
||||
|
||||
# the provider URL must be converted to an absolute URL in order to be
|
||||
# used as an openid provider.
|
||||
provider_url = reverse('openid-provider-xrds')
|
||||
factory = RequestFactory()
|
||||
request = factory.request()
|
||||
abs_provider_url = request.build_absolute_uri(location = provider_url)
|
||||
|
||||
# In order for this absolute URL to work (i.e. to get xrds, then authentication)
|
||||
# in the test environment, we either need a live server that works with the default
|
||||
# fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
|
||||
# Here we do the latter:
|
||||
fetcher = MyFetcher(self.client)
|
||||
openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
|
||||
|
||||
# now we can begin the login process by invoking a local openid client,
|
||||
# with a pointer to the (also-local) openid provider:
|
||||
with self.settings(OPENID_SSO_SERVER_URL = abs_provider_url):
|
||||
url = reverse('openid-login')
|
||||
resp = self.client.post(url)
|
||||
code = 200
|
||||
self.assertEqual(resp.status_code, code,
|
||||
"got code {0} for url '{1}'. Expected code {2}"
|
||||
.format(resp.status_code, url, code))
|
||||
|
||||
def testBeginLoginWithLoginUrl(self):
|
||||
# skip the test if openid is not enabled (as in cms.envs.test):
|
||||
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
|
||||
return
|
||||
|
||||
# the provider URL must be converted to an absolute URL in order to be
|
||||
# used as an openid provider.
|
||||
provider_url = reverse('openid-provider-login')
|
||||
factory = RequestFactory()
|
||||
request = factory.request()
|
||||
abs_provider_url = request.build_absolute_uri(location = provider_url)
|
||||
|
||||
# In order for this absolute URL to work (i.e. to get xrds, then authentication)
|
||||
# in the test environment, we either need a live server that works with the default
|
||||
# fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
|
||||
# Here we do the latter:
|
||||
fetcher = MyFetcher(self.client)
|
||||
openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
|
||||
|
||||
# now we can begin the login process by invoking a local openid client,
|
||||
# with a pointer to the (also-local) openid provider:
|
||||
with self.settings(OPENID_SSO_SERVER_URL = abs_provider_url):
|
||||
url = reverse('openid-login')
|
||||
resp = self.client.post(url)
|
||||
code = 200
|
||||
self.assertEqual(resp.status_code, code,
|
||||
"got code {0} for url '{1}'. Expected code {2}"
|
||||
.format(resp.status_code, url, code))
|
||||
self.assertContains(resp, '<input name="openid.mode" type="hidden" value="checkid_setup" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ns" type="hidden" value="http://specs.openid.net/auth/2.0" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.identity" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.claimed_id" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ns.ax" type="hidden" value="http://openid.net/srv/ax/1.0" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.mode" type="hidden" value="fetch_request" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.required" type="hidden" value="email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.fullname" type="hidden" value="http://axschema.org/namePerson" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.lastname" type="hidden" value="http://axschema.org/namePerson/last" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.firstname" type="hidden" value="http://axschema.org/namePerson/first" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.nickname" type="hidden" value="http://axschema.org/namePerson/friendly" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.email" type="hidden" value="http://axschema.org/contact/email" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.old_email" type="hidden" value="http://schema.openid.net/contact/email" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.old_nickname" type="hidden" value="http://schema.openid.net/namePerson/friendly" />', html=True)
|
||||
self.assertContains(resp, '<input name="openid.ax.type.old_fullname" type="hidden" value="http://schema.openid.net/namePerson" />', html=True)
|
||||
self.assertContains(resp, '<input type="submit" value="Continue" />', html=True)
|
||||
# this should work on the server:
|
||||
self.assertContains(resp, '<input name="openid.realm" type="hidden" value="http://testserver/" />', html=True)
|
||||
|
||||
# not included here are elements that will vary from run to run:
|
||||
# <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" />
|
||||
# <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" />
|
||||
|
||||
|
||||
def testOpenIdSetup(self):
|
||||
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
|
||||
return
|
||||
url = reverse('openid-provider-login')
|
||||
post_args = {
|
||||
"openid.mode" : "checkid_setup",
|
||||
"openid.return_to" : "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
|
||||
"openid.assoc_handle" : "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
|
||||
"openid.claimed_id" : "http://specs.openid.net/auth/2.0/identifier_select",
|
||||
"openid.ns" : "http://specs.openid.net/auth/2.0",
|
||||
"openid.realm" : "http://testserver/",
|
||||
"openid.identity" : "http://specs.openid.net/auth/2.0/identifier_select",
|
||||
"openid.ns.ax" : "http://openid.net/srv/ax/1.0",
|
||||
"openid.ax.mode" : "fetch_request",
|
||||
"openid.ax.required" : "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
|
||||
"openid.ax.type.fullname" : "http://axschema.org/namePerson",
|
||||
"openid.ax.type.lastname" : "http://axschema.org/namePerson/last",
|
||||
"openid.ax.type.firstname" : "http://axschema.org/namePerson/first",
|
||||
"openid.ax.type.nickname" : "http://axschema.org/namePerson/friendly",
|
||||
"openid.ax.type.email" : "http://axschema.org/contact/email",
|
||||
"openid.ax.type.old_email" : "http://schema.openid.net/contact/email",
|
||||
"openid.ax.type.old_nickname" : "http://schema.openid.net/namePerson/friendly",
|
||||
"openid.ax.type.old_fullname" : "http://schema.openid.net/namePerson",
|
||||
}
|
||||
resp = self.client.post(url, post_args)
|
||||
code = 200
|
||||
self.assertEqual(resp.status_code, code,
|
||||
"got code {0} for url '{1}'. Expected code {2}"
|
||||
.format(resp.status_code, url, code))
|
||||
|
||||
|
||||
# In order for this absolute URL to work (i.e. to get xrds, then authentication)
|
||||
# in the test environment, we either need a live server that works with the default
|
||||
# fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
|
||||
# Here we do the former.
|
||||
class OpenIdProviderLiveServerTest(LiveServerTestCase):
|
||||
|
||||
def testBeginLogin(self):
|
||||
# skip the test if openid is not enabled (as in cms.envs.test):
|
||||
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
|
||||
return
|
||||
|
||||
# the provider URL must be converted to an absolute URL in order to be
|
||||
# used as an openid provider.
|
||||
provider_url = reverse('openid-provider-xrds')
|
||||
factory = RequestFactory()
|
||||
request = factory.request()
|
||||
abs_provider_url = request.build_absolute_uri(location = provider_url)
|
||||
|
||||
# now we can begin the login process by invoking a local openid client,
|
||||
# with a pointer to the (also-local) openid provider:
|
||||
with self.settings(OPENID_SSO_SERVER_URL = abs_provider_url):
|
||||
url = reverse('openid-login')
|
||||
resp = self.client.post(url)
|
||||
code = 200
|
||||
self.assertEqual(resp.status_code, code,
|
||||
"got code {0} for url '{1}'. Expected code {2}"
|
||||
.format(resp.status_code, url, code))
|
||||
@@ -438,7 +438,9 @@ def provider_login(request):
|
||||
store = DjangoOpenIDStore()
|
||||
server = Server(store, endpoint)
|
||||
|
||||
# handle OpenID request
|
||||
# first check to see if the request is an OpenID request.
|
||||
# If so, the client will have specified an 'openid.mode' as part
|
||||
# of the request.
|
||||
querydict = dict(request.REQUEST.items())
|
||||
error = False
|
||||
if 'openid.mode' in request.GET or 'openid.mode' in request.POST:
|
||||
@@ -458,6 +460,8 @@ def provider_login(request):
|
||||
openid_request.answer(False), {})
|
||||
|
||||
# checkid_setup, so display login page
|
||||
# (by falling through to the provider_login at the
|
||||
# bottom of this method).
|
||||
elif openid_request.mode == 'checkid_setup':
|
||||
if openid_request.idSelect():
|
||||
# remember request and original path
|
||||
@@ -476,8 +480,10 @@ def provider_login(request):
|
||||
return provider_respond(server, openid_request,
|
||||
server.handleRequest(openid_request), {})
|
||||
|
||||
# handle login
|
||||
if request.method == 'POST' and 'openid_setup' in request.session:
|
||||
# handle login redirection: these are also sent to this view function,
|
||||
# but are distinguished by lacking the openid mode. We also know that
|
||||
# they are posts, because they come from the popup
|
||||
elif request.method == 'POST' and 'openid_setup' in request.session:
|
||||
# get OpenID request from session
|
||||
openid_setup = request.session['openid_setup']
|
||||
openid_request = openid_setup['request']
|
||||
@@ -489,6 +495,8 @@ def provider_login(request):
|
||||
return default_render_failure(request, "Invalid OpenID trust root")
|
||||
|
||||
# check if user with given email exists
|
||||
# Failure is redirected to this method (by using the original URL),
|
||||
# which will bring up the login dialog.
|
||||
email = request.POST.get('email', None)
|
||||
try:
|
||||
user = User.objects.get(email=email)
|
||||
@@ -498,7 +506,8 @@ def provider_login(request):
|
||||
log.warning(msg)
|
||||
return HttpResponseRedirect(openid_request_url)
|
||||
|
||||
# attempt to authenticate user
|
||||
# attempt to authenticate user (but not actually log them in...)
|
||||
# Failure is again redirected to the login dialog.
|
||||
username = user.username
|
||||
password = request.POST.get('password', None)
|
||||
user = authenticate(username=username, password=password)
|
||||
@@ -509,7 +518,8 @@ def provider_login(request):
|
||||
log.warning(msg)
|
||||
return HttpResponseRedirect(openid_request_url)
|
||||
|
||||
# authentication succeeded, so log user in
|
||||
# authentication succeeded, so fetch user information
|
||||
# that was requested
|
||||
if user is not None and user.is_active:
|
||||
# remove error from session since login succeeded
|
||||
if 'openid_error' in request.session:
|
||||
@@ -534,13 +544,19 @@ def provider_login(request):
|
||||
# break the CS50 client. Temporarily we will be returning
|
||||
# username filling in for fullname in addition to username
|
||||
# as sreg nickname.
|
||||
|
||||
# Note too that this is hardcoded, and not really responding to
|
||||
# the extensions that were registered in the first place.
|
||||
results = {
|
||||
'nickname': user.username,
|
||||
'email': user.email,
|
||||
'fullname': user.username
|
||||
}
|
||||
|
||||
# the request succeeded:
|
||||
return provider_respond(server, openid_request, response, results)
|
||||
|
||||
# the account is not active, so redirect back to the login page:
|
||||
request.session['openid_error'] = True
|
||||
msg = "Login failed - Account not active for user {0}".format(username)
|
||||
log.warning(msg)
|
||||
@@ -559,7 +575,7 @@ def provider_login(request):
|
||||
'return_to': return_to
|
||||
})
|
||||
|
||||
# custom XRDS header necessary for discovery process
|
||||
# add custom XRDS header necessary for discovery process
|
||||
response['X-XRDS-Location'] = get_xrds_url('xrds', request)
|
||||
return response
|
||||
|
||||
|
||||
@@ -125,8 +125,15 @@ SECRET_KEY = '85920908f28904ed733fe576320db18cabd7b6cd'
|
||||
################################## OPENID ######################################
|
||||
MITX_FEATURES['AUTH_USE_OPENID'] = True
|
||||
MITX_FEATURES['AUTH_USE_OPENID_PROVIDER'] = True
|
||||
|
||||
OPENID_CREATE_USERS = False
|
||||
OPENID_UPDATE_DETAILS_FROM_SREG = True
|
||||
OPENID_USE_AS_ADMIN_LOGIN = False
|
||||
OPENID_PROVIDER_TRUSTED_ROOTS = ['*']
|
||||
|
||||
INSTALLED_APPS += ('external_auth',)
|
||||
INSTALLED_APPS += ('django_openid_auth',)
|
||||
|
||||
############################ STATIC FILES #############################
|
||||
DEFAULT_FILE_STORAGE = 'django.core.files.storage.FileSystemStorage'
|
||||
MEDIA_ROOT = TEST_ROOT / "uploads"
|
||||
|
||||
Reference in New Issue
Block a user