Add audit log definition, and use for logging of logins in external_auth and student apps. Move test_login to student app. Improve conditional tests for Shibboleth login logic. (Does not include reconfiguring log settings.)
This commit is contained in:
@@ -5,6 +5,8 @@ These are notable changes in edx-platform. This is a rolling list of changes,
|
||||
in roughly chronological order, most recent first. Add your entries at or near
|
||||
the top. Include a label indicating the component affected.
|
||||
|
||||
Common: Add additional logging to cover login attempts and logouts.
|
||||
|
||||
Studio: Send e-mails to new Studio users (on edge only) when their course creator
|
||||
status has changed. This will not be in use until the course creator table
|
||||
is enabled.
|
||||
|
||||
@@ -3,6 +3,7 @@ Tests for Shibboleth Authentication
|
||||
@jbau
|
||||
"""
|
||||
import unittest
|
||||
from mock import patch
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponseRedirect
|
||||
@@ -10,7 +11,6 @@ from django.test.client import RequestFactory, Client as DjangoTestClient
|
||||
from django.test.utils import override_settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.contrib.auth.models import AnonymousUser, User
|
||||
from django.contrib.sessions.backends.base import SessionBase
|
||||
from django.utils.importlib import import_module
|
||||
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
@@ -27,11 +27,11 @@ from student.views import create_account, change_enrollment
|
||||
from student.models import UserProfile, Registration, CourseEnrollment
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
#Shib is supposed to provide 'REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider'
|
||||
#attributes via request.META. We can count on 'Shib-Identity-Provider', and 'REMOTE_USER' being present
|
||||
#b/c of how mod_shib works but should test the behavior with the rest of the attributes present/missing
|
||||
# Shib is supposed to provide 'REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider'
|
||||
# attributes via request.META. We can count on 'Shib-Identity-Provider', and 'REMOTE_USER' being present
|
||||
# b/c of how mod_shib works but should test the behavior with the rest of the attributes present/missing
|
||||
|
||||
#For the sake of python convention we'll make all of these variable names ALL_CAPS
|
||||
# For the sake of python convention we'll make all of these variable names ALL_CAPS
|
||||
IDP = 'https://idp.stanford.edu/'
|
||||
REMOTE_USER = 'test_user@stanford.edu'
|
||||
MAILS = [None, '', 'test_user@stanford.edu']
|
||||
@@ -93,6 +93,13 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
self.assertEqual(no_idp_response.status_code, 403)
|
||||
self.assertIn("identity server did not return your ID information", no_idp_response.content)
|
||||
|
||||
def _assert_shib_login_is_logged(self, audit_log_call, remote_user):
|
||||
"""Asserts that shibboleth login attempt is being logged"""
|
||||
method_name, args, _kwargs = audit_log_call
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 2)
|
||||
self.assertIn(u'logged in via Shibboleth', args[0])
|
||||
self.assertEquals(remote_user, args[1])
|
||||
|
||||
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
|
||||
def test_shib_login(self):
|
||||
@@ -140,26 +147,57 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
'REMOTE_USER': remote_user,
|
||||
'mail': remote_user})
|
||||
request.user = AnonymousUser()
|
||||
response = shib_login(request)
|
||||
with patch('external_auth.views.AUDIT_LOG') as mock_audit_log:
|
||||
response = shib_login(request)
|
||||
audit_log_calls = mock_audit_log.method_calls
|
||||
|
||||
if idp == "https://idp.stanford.edu/" and remote_user == 'withmap@stanford.edu':
|
||||
self.assertIsInstance(response, HttpResponseRedirect)
|
||||
self.assertEqual(request.user, user_w_map)
|
||||
self.assertEqual(response['Location'], '/')
|
||||
# verify logging:
|
||||
self.assertEquals(len(audit_log_calls), 2)
|
||||
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
|
||||
method_name, args, _kwargs = audit_log_calls[1]
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 3)
|
||||
self.assertIn(u'Login success', args[0])
|
||||
self.assertEquals(remote_user, args[2])
|
||||
elif idp == "https://idp.stanford.edu/" and remote_user == 'inactive@stanford.edu':
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn("Account not yet activated: please look for link in your email", response.content)
|
||||
# verify logging:
|
||||
self.assertEquals(len(audit_log_calls), 2)
|
||||
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
|
||||
method_name, args, _kwargs = audit_log_calls[1]
|
||||
self.assertEquals(method_name, 'warning')
|
||||
self.assertEquals(len(args), 2)
|
||||
self.assertIn(u'is not active after external login', args[0])
|
||||
# self.assertEquals(remote_user, args[1])
|
||||
elif idp == "https://idp.stanford.edu/" and remote_user == 'womap@stanford.edu':
|
||||
self.assertIsNotNone(ExternalAuthMap.objects.get(user=user_wo_map))
|
||||
self.assertIsInstance(response, HttpResponseRedirect)
|
||||
self.assertEqual(request.user, user_wo_map)
|
||||
self.assertEqual(response['Location'], '/')
|
||||
# verify logging:
|
||||
self.assertEquals(len(audit_log_calls), 2)
|
||||
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
|
||||
method_name, args, _kwargs = audit_log_calls[1]
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 3)
|
||||
self.assertIn(u'Login success', args[0])
|
||||
self.assertEquals(remote_user, args[2])
|
||||
elif idp == "https://someother.idp.com/" and remote_user in \
|
||||
['withmap@stanford.edu', 'womap@stanford.edu', 'inactive@stanford.edu']:
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn("You have already created an account using an external login", response.content)
|
||||
# no audit logging calls
|
||||
self.assertEquals(len(audit_log_calls), 0)
|
||||
else:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertContains(response, "<title>Register for")
|
||||
# no audit logging calls
|
||||
self.assertEquals(len(audit_log_calls), 0)
|
||||
|
||||
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
|
||||
def test_registration_form(self):
|
||||
@@ -187,7 +225,7 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
else:
|
||||
self.assertNotContains(response, fullname_input_HTML)
|
||||
|
||||
#clean up b/c we don't want existing ExternalAuthMap for the next run
|
||||
# clean up b/c we don't want existing ExternalAuthMap for the next run
|
||||
client.session['ExternalAuthMap'].delete()
|
||||
|
||||
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
|
||||
@@ -200,25 +238,47 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
Uses django test client for its session support
|
||||
"""
|
||||
for identity in gen_all_identities():
|
||||
#First we pop the registration form
|
||||
# First we pop the registration form
|
||||
client = DjangoTestClient()
|
||||
response1 = client.get(path='/shib-login/', data={}, follow=False, **identity)
|
||||
#Then we have the user answer the registration form
|
||||
# Then we have the user answer the registration form
|
||||
postvars = {'email': 'post_email@stanford.edu',
|
||||
'username': 'post_username',
|
||||
'password': 'post_password',
|
||||
'name': 'post_name',
|
||||
'terms_of_service': 'true',
|
||||
'honor_code': 'true'}
|
||||
#use RequestFactory instead of TestClient here because we want access to request.user
|
||||
# use RequestFactory instead of TestClient here because we want access to request.user
|
||||
request2 = self.request_factory.post('/create_account', data=postvars)
|
||||
request2.session = client.session
|
||||
request2.user = AnonymousUser()
|
||||
response2 = create_account(request2)
|
||||
with patch('student.views.AUDIT_LOG') as mock_audit_log:
|
||||
_response2 = create_account(request2)
|
||||
|
||||
user = request2.user
|
||||
mail = identity.get('mail')
|
||||
#check that the created user has the right email, either taken from shib or user input
|
||||
|
||||
# verify logging of login happening during account creation:
|
||||
audit_log_calls = mock_audit_log.method_calls
|
||||
self.assertEquals(len(audit_log_calls), 3)
|
||||
method_name, args, _kwargs = audit_log_calls[0]
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 1)
|
||||
self.assertIn(u'Login success on new account creation', args[0])
|
||||
self.assertIn(u'post_username', args[0])
|
||||
method_name, args, _kwargs = audit_log_calls[1]
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 2)
|
||||
self.assertIn(u'User registered with external_auth', args[0])
|
||||
self.assertEquals(u'post_username', args[1])
|
||||
method_name, args, _kwargs = audit_log_calls[2]
|
||||
self.assertEquals(method_name, 'info')
|
||||
self.assertEquals(len(args), 3)
|
||||
self.assertIn(u'Updated ExternalAuthMap for ', args[0])
|
||||
self.assertEquals(u'post_username', args[1])
|
||||
self.assertEquals(u'test_user@stanford.edu', args[2].external_id)
|
||||
|
||||
# check that the created user has the right email, either taken from shib or user input
|
||||
if mail:
|
||||
self.assertEqual(user.email, mail)
|
||||
self.assertEqual(list(User.objects.filter(email=postvars['email'])), [])
|
||||
@@ -228,7 +288,7 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
self.assertEqual(list(User.objects.filter(email=mail)), [])
|
||||
self.assertIsNotNone(User.objects.get(email=postvars['email'])) # get enforces only 1 such user
|
||||
|
||||
#check that the created user profile has the right name, either taken from shib or user input
|
||||
# check that the created user profile has the right name, either taken from shib or user input
|
||||
profile = UserProfile.objects.get(user=user)
|
||||
sn_empty = not identity.get('sn')
|
||||
given_name_empty = not identity.get('givenName')
|
||||
@@ -236,7 +296,7 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
self.assertEqual(profile.name, postvars['name'])
|
||||
else:
|
||||
self.assertEqual(profile.name, request2.session['ExternalAuthMap'].external_name)
|
||||
#clean up for next loop
|
||||
# clean up for next loop
|
||||
request2.session['ExternalAuthMap'].delete()
|
||||
UserProfile.objects.filter(user=user).delete()
|
||||
Registration.objects.filter(user=user).delete()
|
||||
@@ -251,17 +311,17 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
|
||||
# Test for cases where course is found
|
||||
for domain in ["", "shib:https://idp.stanford.edu/"]:
|
||||
#set domains
|
||||
# set domains
|
||||
course.enrollment_domain = domain
|
||||
metadata = own_metadata(course)
|
||||
metadata['enrollment_domain'] = domain
|
||||
self.store.update_metadata(course.location.url(), metadata)
|
||||
|
||||
#setting location to test that GET params get passed through
|
||||
# setting location to test that GET params get passed through
|
||||
login_request = self.request_factory.get('/course_specific_login/MITx/999/Robot_Super_Course' +
|
||||
'?course_id=MITx/999/Robot_Super_Course' +
|
||||
'&enrollment_action=enroll')
|
||||
reg_request = self.request_factory.get('/course_specific_register/MITx/999/Robot_Super_Course' +
|
||||
_reg_request = self.request_factory.get('/course_specific_register/MITx/999/Robot_Super_Course' +
|
||||
'?course_id=MITx/999/course/Robot_Super_Course' +
|
||||
'&enrollment_action=enroll')
|
||||
|
||||
@@ -292,11 +352,11 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
'&enrollment_action=enroll')
|
||||
|
||||
# Now test for non-existent course
|
||||
#setting location to test that GET params get passed through
|
||||
# setting location to test that GET params get passed through
|
||||
login_request = self.request_factory.get('/course_specific_login/DNE/DNE/DNE' +
|
||||
'?course_id=DNE/DNE/DNE' +
|
||||
'&enrollment_action=enroll')
|
||||
reg_request = self.request_factory.get('/course_specific_register/DNE/DNE/DNE' +
|
||||
_reg_request = self.request_factory.get('/course_specific_register/DNE/DNE/DNE' +
|
||||
'?course_id=DNE/DNE/DNE/Robot_Super_Course' +
|
||||
'&enrollment_action=enroll')
|
||||
|
||||
@@ -321,7 +381,7 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
the proper external auth
|
||||
"""
|
||||
|
||||
#create 2 course, one with limited enrollment one without
|
||||
# create 2 course, one with limited enrollment one without
|
||||
shib_course = CourseFactory.create(org='Stanford', number='123', display_name='Shib Only')
|
||||
shib_course.enrollment_domain = 'shib:https://idp.stanford.edu/'
|
||||
metadata = own_metadata(shib_course)
|
||||
@@ -360,7 +420,7 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
int_student.email = "teststudent3@gmail.com"
|
||||
int_student.save()
|
||||
|
||||
#Tests the two case for courses, limited and not
|
||||
# Tests the two case for courses, limited and not
|
||||
for course in [shib_course, open_enroll_course]:
|
||||
for student in [shib_student, other_ext_student, int_student]:
|
||||
request = self.request_factory.post('/change_enrollment')
|
||||
@@ -368,11 +428,11 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
'course_id': course.id})
|
||||
request.user = student
|
||||
response = change_enrollment(request)
|
||||
#if course is not limited or student has correct shib extauth then enrollment should be allowed
|
||||
# If course is not limited or student has correct shib extauth then enrollment should be allowed
|
||||
if course is open_enroll_course or student is shib_student:
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 1)
|
||||
#clean up
|
||||
# Clean up
|
||||
CourseEnrollment.objects.filter(user=student, course_id=course.id).delete()
|
||||
else:
|
||||
self.assertEqual(response.status_code, 400)
|
||||
@@ -383,9 +443,6 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
"""
|
||||
A functionality test that a student with an existing shib login can auto-enroll in a class with GET params
|
||||
"""
|
||||
if not settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
|
||||
return
|
||||
|
||||
student = UserFactory.create()
|
||||
extauth = ExternalAuthMap(external_id='testuser@stanford.edu',
|
||||
external_email='',
|
||||
@@ -403,8 +460,8 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
metadata['enrollment_domain'] = course.enrollment_domain
|
||||
self.store.update_metadata(course.location.url(), metadata)
|
||||
|
||||
#use django test client for sessions and url processing
|
||||
#no enrollment before trying
|
||||
# use django test client for sessions and url processing
|
||||
# no enrollment before trying
|
||||
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 0)
|
||||
self.client.logout()
|
||||
request_kwargs = {'path': '/shib-login/',
|
||||
@@ -413,8 +470,8 @@ class ShibSPTest(ModuleStoreTestCase):
|
||||
'REMOTE_USER': 'testuser@stanford.edu',
|
||||
'Shib-Identity-Provider': 'https://idp.stanford.edu/'}
|
||||
response = self.client.get(**request_kwargs)
|
||||
#successful login is a redirect to "/"
|
||||
# successful login is a redirect to "/"
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response['location'], 'http://testserver/')
|
||||
#now there is enrollment
|
||||
# now there is enrollment
|
||||
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 1)
|
||||
|
||||
@@ -17,9 +17,9 @@ from django.core.urlresolvers import reverse
|
||||
from django.core.validators import validate_email
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from student.models import UserProfile, TestCenterUser, TestCenterRegistration
|
||||
from student.models import TestCenterUser, TestCenterRegistration
|
||||
|
||||
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest
|
||||
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, HttpResponseForbidden
|
||||
from django.utils.http import urlquote
|
||||
from django.shortcuts import redirect
|
||||
from django.utils.translation import ugettext as _
|
||||
@@ -50,7 +50,10 @@ from xmodule.modulestore import Location
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError
|
||||
|
||||
log = logging.getLogger("mitx.external_auth")
|
||||
AUDIT_LOG = logging.getLogger("audit")
|
||||
|
||||
SHIBBOLETH_DOMAIN_PREFIX = 'shib:'
|
||||
OPENID_DOMAIN_PREFIX = 'openid:'
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# OpenID Common
|
||||
@@ -81,7 +84,7 @@ def default_render_failure(request,
|
||||
def generate_password(length=12, chars=string.letters + string.digits):
|
||||
"""Generate internal password for externally authenticated user"""
|
||||
choice = random.SystemRandom().choice
|
||||
return ''.join([choice(chars) for i in range(length)])
|
||||
return ''.join([choice(chars) for _i in range(length)])
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@@ -105,27 +108,29 @@ def openid_login_complete(request,
|
||||
log.debug('openid success, details=%s', details)
|
||||
|
||||
url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
|
||||
external_domain = "openid:%s" % url
|
||||
external_domain = "{0}{1}".format(OPENID_DOMAIN_PREFIX, url)
|
||||
fullname = '%s %s' % (details.get('first_name', ''),
|
||||
details.get('last_name', ''))
|
||||
|
||||
return external_login_or_signup(request,
|
||||
external_id,
|
||||
external_domain,
|
||||
details,
|
||||
details.get('email', ''),
|
||||
fullname)
|
||||
return _external_login_or_signup(
|
||||
request,
|
||||
external_id,
|
||||
external_domain,
|
||||
details,
|
||||
details.get('email', ''),
|
||||
fullname
|
||||
)
|
||||
|
||||
return render_failure(request, 'Openid failure')
|
||||
|
||||
|
||||
def external_login_or_signup(request,
|
||||
external_id,
|
||||
external_domain,
|
||||
credentials,
|
||||
email,
|
||||
fullname,
|
||||
retfun=None):
|
||||
def _external_login_or_signup(request,
|
||||
external_id,
|
||||
external_domain,
|
||||
credentials,
|
||||
email,
|
||||
fullname,
|
||||
retfun=None):
|
||||
"""Generic external auth login or signup"""
|
||||
|
||||
# see if we have a map from this external_id to an edX username
|
||||
@@ -142,13 +147,13 @@ def external_login_or_signup(request,
|
||||
eamap.external_name = fullname
|
||||
eamap.internal_password = generate_password()
|
||||
log.debug('Created eamap=%s', eamap)
|
||||
|
||||
eamap.save()
|
||||
|
||||
log.info(u"External_Auth login_or_signup for %s : %s : %s : %s", external_domain, external_id, email, fullname)
|
||||
uses_shibboleth = settings.MITX_FEATURES.get('AUTH_USE_SHIB') and external_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX)
|
||||
internal_user = eamap.user
|
||||
if internal_user is None:
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
|
||||
if uses_shibboleth:
|
||||
# if we are using shib, try to link accounts using email
|
||||
try:
|
||||
link_user = User.objects.get(email=eamap.external_email)
|
||||
@@ -169,14 +174,14 @@ def external_login_or_signup(request,
|
||||
return default_render_failure(request, failure_msg)
|
||||
except User.DoesNotExist:
|
||||
log.info('SHIB: No user for %s yet, doing signup', eamap.external_email)
|
||||
return signup(request, eamap)
|
||||
return _signup(request, eamap)
|
||||
else:
|
||||
log.info('No user for %s yet. doing signup', eamap.external_email)
|
||||
return signup(request, eamap)
|
||||
return _signup(request, eamap)
|
||||
|
||||
# We trust shib's authentication, so no need to authenticate using the password again
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
|
||||
uname = internal_user.username
|
||||
uname = internal_user.username
|
||||
if uses_shibboleth:
|
||||
user = internal_user
|
||||
# Assuming this 'AUTHENTICATION_BACKENDS' is set in settings, which I think is safe
|
||||
if settings.AUTHENTICATION_BACKENDS:
|
||||
@@ -184,32 +189,32 @@ def external_login_or_signup(request,
|
||||
else:
|
||||
auth_backend = 'django.contrib.auth.backends.ModelBackend'
|
||||
user.backend = auth_backend
|
||||
log.info('SHIB: Logging in linked user %s', user.email)
|
||||
AUDIT_LOG.info('Linked user "%s" logged in via Shibboleth', user.email)
|
||||
else:
|
||||
uname = internal_user.username
|
||||
user = authenticate(username=uname, password=eamap.internal_password)
|
||||
if user is None:
|
||||
log.warning("External Auth Login failed for %s / %s",
|
||||
uname, eamap.internal_password)
|
||||
return signup(request, eamap)
|
||||
# we want to log the failure, but don't want to log the password attempted:
|
||||
AUDIT_LOG.warning('External Auth Login failed for "%s"', uname)
|
||||
return _signup(request, eamap)
|
||||
|
||||
if not user.is_active:
|
||||
log.warning("User %s is not active", uname)
|
||||
AUDIT_LOG.warning('User "%s" is not active after external login', uname)
|
||||
# TODO: improve error page
|
||||
msg = 'Account not yet activated: please look for link in your email'
|
||||
return default_render_failure(request, msg)
|
||||
|
||||
login(request, user)
|
||||
request.session.set_expiry(0)
|
||||
|
||||
# Now to try enrollment
|
||||
# Need to special case Shibboleth here because it logs in via a GET.
|
||||
# testing request.method for extra paranoia
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in external_domain and request.method == 'GET':
|
||||
enroll_request = make_shib_enrollment_request(request)
|
||||
if uses_shibboleth and request.method == 'GET':
|
||||
enroll_request = _make_shib_enrollment_request(request)
|
||||
student_views.try_change_enrollment(enroll_request)
|
||||
else:
|
||||
student_views.try_change_enrollment(request)
|
||||
log.info("Login success - %s (%s)", user.username, user.email)
|
||||
AUDIT_LOG.info("Login success - %s (%s)", user.username, user.email)
|
||||
if retfun is None:
|
||||
return redirect('/')
|
||||
return retfun()
|
||||
@@ -217,20 +222,16 @@ def external_login_or_signup(request,
|
||||
|
||||
@ensure_csrf_cookie
|
||||
@cache_if_anonymous
|
||||
def signup(request, eamap=None):
|
||||
def _signup(request, eamap):
|
||||
"""
|
||||
Present form to complete for signup via external authentication.
|
||||
Even though the user has external credentials, he/she still needs
|
||||
to create an account on the edX system, and fill in the user
|
||||
registration form.
|
||||
|
||||
eamap is an ExteralAuthMap object, specifying the external user
|
||||
eamap is an ExternalAuthMap object, specifying the external user
|
||||
for which to complete the signup.
|
||||
"""
|
||||
|
||||
if eamap is None:
|
||||
pass
|
||||
|
||||
# save this for use by student.views.create_account
|
||||
request.session['ExternalAuthMap'] = eamap
|
||||
|
||||
@@ -248,8 +249,9 @@ def signup(request, eamap=None):
|
||||
|
||||
# Some openEdX instances can't have terms of service for shib users, like
|
||||
# according to Stanford's Office of General Counsel
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and settings.MITX_FEATURES.get('SHIB_DISABLE_TOS') and \
|
||||
('shib' in eamap.external_domain):
|
||||
uses_shibboleth = (settings.MITX_FEATURES.get('AUTH_USE_SHIB') and
|
||||
eamap.external_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX))
|
||||
if uses_shibboleth and settings.MITX_FEATURES.get('SHIB_DISABLE_TOS'):
|
||||
context['ask_for_tos'] = False
|
||||
|
||||
# detect if full name is blank and ask for it from user
|
||||
@@ -272,19 +274,19 @@ def signup(request, eamap=None):
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def ssl_dn_extract_info(dn):
|
||||
def _ssl_dn_extract_info(dn_string):
|
||||
"""
|
||||
Extract username, email address (may be anyuser@anydomain.com) and
|
||||
full name from the SSL DN string. Return (user,email,fullname) if
|
||||
successful, and None otherwise.
|
||||
"""
|
||||
ss = re.search('/emailAddress=(.*)@([^/]+)', dn)
|
||||
ss = re.search('/emailAddress=(.*)@([^/]+)', dn_string)
|
||||
if ss:
|
||||
user = ss.group(1)
|
||||
email = "%s@%s" % (user, ss.group(2))
|
||||
else:
|
||||
return None
|
||||
ss = re.search('/CN=([^/]+)/', dn)
|
||||
ss = re.search('/CN=([^/]+)/', dn_string)
|
||||
if ss:
|
||||
fullname = ss.group(1)
|
||||
else:
|
||||
@@ -292,7 +294,7 @@ def ssl_dn_extract_info(dn):
|
||||
return (user, email, fullname)
|
||||
|
||||
|
||||
def ssl_get_cert_from_request(request):
|
||||
def _ssl_get_cert_from_request(request):
|
||||
"""
|
||||
Extract user information from certificate, if it exists, returning (user, email, fullname).
|
||||
Else return None.
|
||||
@@ -311,9 +313,6 @@ def ssl_get_cert_from_request(request):
|
||||
|
||||
return cert
|
||||
|
||||
(user, email, fullname) = ssl_dn_extract_info(cert)
|
||||
return (user, email, fullname)
|
||||
|
||||
|
||||
def ssl_login_shortcut(fn):
|
||||
"""
|
||||
@@ -324,24 +323,26 @@ def ssl_login_shortcut(fn):
|
||||
if not settings.MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES']:
|
||||
return fn(*args, **kwargs)
|
||||
request = args[0]
|
||||
cert = ssl_get_cert_from_request(request)
|
||||
cert = _ssl_get_cert_from_request(request)
|
||||
if not cert: # no certificate information - show normal login window
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
(user, email, fullname) = ssl_dn_extract_info(cert)
|
||||
return external_login_or_signup(request,
|
||||
external_id=email,
|
||||
external_domain="ssl:MIT",
|
||||
credentials=cert,
|
||||
email=email,
|
||||
fullname=fullname)
|
||||
(_user, email, fullname) = _ssl_dn_extract_info(cert)
|
||||
return _external_login_or_signup(
|
||||
request,
|
||||
external_id=email,
|
||||
external_domain="ssl:MIT",
|
||||
credentials=cert,
|
||||
email=email,
|
||||
fullname=fullname
|
||||
)
|
||||
return wrapped
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
def ssl_login(request):
|
||||
"""
|
||||
This is called by student.views.index when
|
||||
This is called by branding.views.index when
|
||||
MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES'] = True
|
||||
|
||||
Used for MIT user authentication. This presumes the web server
|
||||
@@ -355,22 +356,28 @@ def ssl_login(request):
|
||||
|
||||
Else continues on with student.views.index, and no authentication.
|
||||
"""
|
||||
cert = ssl_get_cert_from_request(request)
|
||||
# Just to make sure we're calling this only at MIT:
|
||||
if not settings.MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES']:
|
||||
return HttpResponseForbidden()
|
||||
|
||||
cert = _ssl_get_cert_from_request(request)
|
||||
|
||||
if not cert:
|
||||
# no certificate information - go onward to main index
|
||||
return student_views.index(request)
|
||||
|
||||
(user, email, fullname) = ssl_dn_extract_info(cert)
|
||||
(_user, email, fullname) = _ssl_dn_extract_info(cert)
|
||||
|
||||
retfun = functools.partial(student_views.index, request)
|
||||
return external_login_or_signup(request,
|
||||
external_id=email,
|
||||
external_domain="ssl:MIT",
|
||||
credentials=cert,
|
||||
email=email,
|
||||
fullname=fullname,
|
||||
retfun=retfun)
|
||||
return _external_login_or_signup(
|
||||
request,
|
||||
external_id=email,
|
||||
external_domain="ssl:MIT",
|
||||
credentials=cert,
|
||||
email=email,
|
||||
fullname=fullname,
|
||||
retfun=retfun
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -396,28 +403,30 @@ def shib_login(request):
|
||||
log.error("SHIB: no Shib-Identity-Provider in request.META")
|
||||
return default_render_failure(request, shib_error_msg)
|
||||
else:
|
||||
#if we get here, the user has authenticated properly
|
||||
# If we get here, the user has authenticated properly
|
||||
shib = {attr: request.META.get(attr, '')
|
||||
for attr in ['REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider']}
|
||||
|
||||
#Clean up first name, last name, and email address
|
||||
#TODO: Make this less hardcoded re: format, but split will work
|
||||
#even if ";" is not present since we are accessing 1st element
|
||||
# Clean up first name, last name, and email address
|
||||
# TODO: Make this less hardcoded re: format, but split will work
|
||||
# even if ";" is not present, since we are accessing 1st element
|
||||
shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8')
|
||||
shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8')
|
||||
|
||||
# TODO: should we be logging creds here, at info level?
|
||||
log.info("SHIB creds returned: %r", shib)
|
||||
|
||||
return external_login_or_signup(request,
|
||||
external_id=shib['REMOTE_USER'],
|
||||
external_domain="shib:" + shib['Shib-Identity-Provider'],
|
||||
credentials=shib,
|
||||
email=shib['mail'],
|
||||
fullname=u'%s %s' % (shib['givenName'], shib['sn']),
|
||||
)
|
||||
return _external_login_or_signup(
|
||||
request,
|
||||
external_id=shib['REMOTE_USER'],
|
||||
external_domain=SHIBBOLETH_DOMAIN_PREFIX + shib['Shib-Identity-Provider'],
|
||||
credentials=shib,
|
||||
email=shib['mail'],
|
||||
fullname=u'%s %s' % (shib['givenName'], shib['sn']),
|
||||
)
|
||||
|
||||
|
||||
def make_shib_enrollment_request(request):
|
||||
def _make_shib_enrollment_request(request):
|
||||
"""
|
||||
Need this hack function because shibboleth logins don't happen over POST
|
||||
but change_enrollment expects its request to be a POST, with
|
||||
@@ -452,14 +461,14 @@ def course_specific_login(request, course_id):
|
||||
try:
|
||||
course = course_from_id(course_id)
|
||||
except ItemNotFoundError:
|
||||
#couldn't find the course, will just return vanilla signin page
|
||||
# couldn't find the course, will just return vanilla signin page
|
||||
return redirect_with_querystring('signin_user', query_string)
|
||||
|
||||
#now the dispatching conditionals. Only shib for now
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in course.enrollment_domain:
|
||||
# now the dispatching conditionals. Only shib for now
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and course.enrollment_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX):
|
||||
return redirect_with_querystring('shib-login', query_string)
|
||||
|
||||
#Default fallthrough to normal signin page
|
||||
# Default fallthrough to normal signin page
|
||||
return redirect_with_querystring('signin_user', query_string)
|
||||
|
||||
|
||||
@@ -473,15 +482,15 @@ def course_specific_register(request, course_id):
|
||||
try:
|
||||
course = course_from_id(course_id)
|
||||
except ItemNotFoundError:
|
||||
#couldn't find the course, will just return vanilla registration page
|
||||
# couldn't find the course, will just return vanilla registration page
|
||||
return redirect_with_querystring('register_user', query_string)
|
||||
|
||||
#now the dispatching conditionals. Only shib for now
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in course.enrollment_domain:
|
||||
#shib-login takes care of both registration and login flows
|
||||
# now the dispatching conditionals. Only shib for now
|
||||
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and course.enrollment_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX):
|
||||
# shib-login takes care of both registration and login flows
|
||||
return redirect_with_querystring('shib-login', query_string)
|
||||
|
||||
#Default fallthrough to normal registration page
|
||||
# Default fallthrough to normal registration page
|
||||
return redirect_with_querystring('register_user', query_string)
|
||||
|
||||
|
||||
@@ -702,7 +711,7 @@ def provider_login(request):
|
||||
except User.DoesNotExist:
|
||||
request.session['openid_error'] = True
|
||||
msg = "OpenID login failed - Unknown user email: %s"
|
||||
log.warning(msg, email)
|
||||
AUDIT_LOG.warning(msg, email)
|
||||
return HttpResponseRedirect(openid_request_url)
|
||||
|
||||
# attempt to authenticate user (but not actually log them in...)
|
||||
@@ -713,7 +722,7 @@ def provider_login(request):
|
||||
if user is None:
|
||||
request.session['openid_error'] = True
|
||||
msg = "OpenID login failed - password for %s is invalid"
|
||||
log.warning(msg, email)
|
||||
AUDIT_LOG.warning(msg, email)
|
||||
return HttpResponseRedirect(openid_request_url)
|
||||
|
||||
# authentication succeeded, so fetch user information
|
||||
@@ -723,8 +732,8 @@ def provider_login(request):
|
||||
if 'openid_error' in request.session:
|
||||
del request.session['openid_error']
|
||||
|
||||
log.info("OpenID login success - %s (%s)",
|
||||
user.username, user.email)
|
||||
AUDIT_LOG.info("OpenID login success - %s (%s)",
|
||||
user.username, user.email)
|
||||
|
||||
# redirect user to return_to location
|
||||
url = endpoint + urlquote(user.username)
|
||||
@@ -755,7 +764,7 @@ def provider_login(request):
|
||||
# the account is not active, so redirect back to the login page:
|
||||
request.session['openid_error'] = True
|
||||
msg = "Login failed - Account not active for user %s"
|
||||
log.warning(msg, username)
|
||||
AUDIT_LOG.warning(msg, username)
|
||||
return HttpResponseRedirect(openid_request_url)
|
||||
|
||||
# determine consumer domain if applicable
|
||||
@@ -856,9 +865,11 @@ def test_center_login(request):
|
||||
try:
|
||||
testcenteruser = TestCenterUser.objects.get(client_candidate_id=client_candidate_id)
|
||||
except TestCenterUser.DoesNotExist:
|
||||
log.error("not able to find demographics for cand ID {}".format(client_candidate_id))
|
||||
AUDIT_LOG.error("not able to find demographics for cand ID {}".format(client_candidate_id))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "invalidClientCandidateID"))
|
||||
|
||||
AUDIT_LOG.info("Attempting to log in test-center user '{}' for test of cand {}".format(testcenteruser.user.username, client_candidate_id))
|
||||
|
||||
# find testcenter_registration that matches the provided exam code:
|
||||
# Note that we could rely in future on either the registrationId or the exam code,
|
||||
# or possibly both. But for now we know what to do with an ExamSeriesCode,
|
||||
@@ -867,13 +878,13 @@ def test_center_login(request):
|
||||
# we are not allowed to make up a new error code, according to Pearson,
|
||||
# so instead of "missingExamSeriesCode", we use a valid one that is
|
||||
# inaccurate but at least distinct. (Sigh.)
|
||||
log.error("missing exam series code for cand ID {}".format(client_candidate_id))
|
||||
AUDIT_LOG.error("missing exam series code for cand ID {}".format(client_candidate_id))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "missingPartnerID"))
|
||||
exam_series_code = request.POST.get('vueExamSeriesCode')
|
||||
|
||||
registrations = TestCenterRegistration.objects.filter(testcenter_user=testcenteruser, exam_series_code=exam_series_code)
|
||||
if not registrations:
|
||||
log.error("not able to find exam registration for exam {} and cand ID {}".format(exam_series_code, client_candidate_id))
|
||||
AUDIT_LOG.error("not able to find exam registration for exam {} and cand ID {}".format(exam_series_code, client_candidate_id))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "noTestsAssigned"))
|
||||
|
||||
# TODO: figure out what to do if there are more than one registrations....
|
||||
@@ -883,14 +894,14 @@ def test_center_login(request):
|
||||
course_id = registration.course_id
|
||||
course = course_from_id(course_id) # assume it will be found....
|
||||
if not course:
|
||||
log.error("not able to find course from ID {} for cand ID {}".format(course_id, client_candidate_id))
|
||||
AUDIT_LOG.error("not able to find course from ID {} for cand ID {}".format(course_id, client_candidate_id))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "incorrectCandidateTests"))
|
||||
exam = course.get_test_center_exam(exam_series_code)
|
||||
if not exam:
|
||||
log.error("not able to find exam {} for course ID {} and cand ID {}".format(exam_series_code, course_id, client_candidate_id))
|
||||
AUDIT_LOG.error("not able to find exam {} for course ID {} and cand ID {}".format(exam_series_code, course_id, client_candidate_id))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "incorrectCandidateTests"))
|
||||
location = exam.exam_url
|
||||
log.info("proceeding with test of cand {} on exam {} for course {}: URL = {}".format(client_candidate_id, exam_series_code, course_id, location))
|
||||
log.info("Proceeding with test of cand {} on exam {} for course {}: URL = {}".format(client_candidate_id, exam_series_code, course_id, location))
|
||||
|
||||
# check if the test has already been taken
|
||||
timelimit_descriptor = modulestore().get_instance(course_id, Location(location))
|
||||
@@ -907,7 +918,7 @@ def test_center_login(request):
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "missingClientProgram"))
|
||||
|
||||
if timelimit_module and timelimit_module.has_ended:
|
||||
log.warning("cand {} on exam {} for course {}: test already over at {}".format(client_candidate_id, exam_series_code, course_id, timelimit_module.ending_at))
|
||||
AUDIT_LOG.warning("cand {} on exam {} for course {}: test already over at {}".format(client_candidate_id, exam_series_code, course_id, timelimit_module.ending_at))
|
||||
return HttpResponseRedirect(makeErrorURL(error_url, "allTestsTaken"))
|
||||
|
||||
# check if we need to provide an accommodation:
|
||||
@@ -922,7 +933,7 @@ def test_center_login(request):
|
||||
|
||||
if time_accommodation_code:
|
||||
timelimit_module.accommodation_code = time_accommodation_code
|
||||
log.info("cand {} on exam {} for course {}: receiving accommodation {}".format(client_candidate_id, exam_series_code, course_id, time_accommodation_code))
|
||||
AUDIT_LOG.info("cand {} on exam {} for course {}: receiving accommodation {}".format(client_candidate_id, exam_series_code, course_id, time_accommodation_code))
|
||||
|
||||
# UGLY HACK!!!
|
||||
# Login assumes that authentication has occurred, and that there is a
|
||||
@@ -936,6 +947,7 @@ def test_center_login(request):
|
||||
# testcenteruser.user.backend = "%s.%s" % (backend.__module__, backend.__class__.__name__)
|
||||
testcenteruser.user.backend = "%s.%s" % ("TestcenterAuthenticationModule", "TestcenterAuthenticationClass")
|
||||
login(request, testcenteruser.user)
|
||||
AUDIT_LOG.info("Logged in user '{}' for test of cand {} on exam {} for course {}: URL = {}".format(testcenteruser.user.username, client_candidate_id, exam_series_code, course_id, location))
|
||||
|
||||
# And start the test:
|
||||
return jump_to(request, course_id, location)
|
||||
|
||||
@@ -20,6 +20,7 @@ from random import randint
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.signals import user_logged_in, user_logged_out
|
||||
from django.db import models
|
||||
from django.db.models.signals import post_save
|
||||
from django.dispatch import receiver
|
||||
@@ -30,6 +31,7 @@ from pytz import UTC
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
AUDIT_LOG = logging.getLogger("audit")
|
||||
|
||||
|
||||
class UserProfile(models.Model):
|
||||
@@ -779,3 +781,20 @@ def update_user_information(sender, instance, created, **kwargs):
|
||||
log = logging.getLogger("mitx.discussion")
|
||||
log.error(unicode(e))
|
||||
log.error("update user info to discussion failed for user with id: " + str(instance.id))
|
||||
|
||||
# Define login and logout handlers here in the models file, instead of the views file,
|
||||
# so that they are more likely to be loaded when a Studio user brings up the Studio admin
|
||||
# page to login. These are currently the only signals available, so we need to continue
|
||||
# identifying and logging failures separately (in views).
|
||||
|
||||
|
||||
@receiver(user_logged_in)
|
||||
def log_successful_login(sender, request, user, **kwargs):
|
||||
"""Handler to log when logins have occurred successfully."""
|
||||
AUDIT_LOG.info(u"Login success - {0} ({1})".format(user.username, user.email))
|
||||
|
||||
|
||||
@receiver(user_logged_out)
|
||||
def log_successful_logout(sender, request, user, **kwargs):
|
||||
"""Handler to log when logouts have occurred successfully."""
|
||||
AUDIT_LOG.info(u"Logout - {0}".format(request.user))
|
||||
|
||||
143
common/djangoapps/student/tests/test_login.py
Normal file
143
common/djangoapps/student/tests/test_login.py
Normal file
@@ -0,0 +1,143 @@
|
||||
'''
|
||||
Tests for student activation and login
|
||||
'''
|
||||
import json
|
||||
from mock import patch
|
||||
|
||||
from django.test import TestCase
|
||||
from django.test.client import Client
|
||||
from django.core.urlresolvers import reverse, NoReverseMatch
|
||||
from student.tests.factories import UserFactory, RegistrationFactory, UserProfileFactory
|
||||
|
||||
|
||||
class LoginTest(TestCase):
|
||||
'''
|
||||
Test student.views.login_user() view
|
||||
'''
|
||||
|
||||
def setUp(self):
|
||||
# Create one user and save it to the database
|
||||
self.user = UserFactory.build(username='test', email='test@edx.org')
|
||||
self.user.set_password('test_password')
|
||||
self.user.save()
|
||||
|
||||
# Create a registration for the user
|
||||
RegistrationFactory(user=self.user)
|
||||
|
||||
# Create a profile for the user
|
||||
UserProfileFactory(user=self.user)
|
||||
|
||||
# Create the test client
|
||||
self.client = Client()
|
||||
|
||||
# Store the login url
|
||||
try:
|
||||
self.url = reverse('login_post')
|
||||
except NoReverseMatch:
|
||||
self.url = reverse('login')
|
||||
|
||||
def test_login_success(self):
|
||||
response, mock_audit_log = self._login_response('test@edx.org', 'test_password', patched_audit_log='student.models.AUDIT_LOG')
|
||||
self._assert_response(response, success=True)
|
||||
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', u'test@edx.org'])
|
||||
|
||||
def test_login_success_unicode_email(self):
|
||||
unicode_email = u'test' + unichr(40960) + u'@edx.org'
|
||||
self.user.email = unicode_email
|
||||
self.user.save()
|
||||
|
||||
response, mock_audit_log = self._login_response(unicode_email, 'test_password', patched_audit_log='student.models.AUDIT_LOG')
|
||||
self._assert_response(response, success=True)
|
||||
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', unicode_email])
|
||||
|
||||
def test_login_fail_no_user_exists(self):
|
||||
nonexistent_email = u'not_a_user@edx.org'
|
||||
response, mock_audit_log = self._login_response(nonexistent_email, 'test_password')
|
||||
self._assert_response(response, success=False,
|
||||
value='Email or password is incorrect')
|
||||
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Unknown user email', nonexistent_email])
|
||||
|
||||
def test_login_fail_wrong_password(self):
|
||||
response, mock_audit_log = self._login_response('test@edx.org', 'wrong_password')
|
||||
self._assert_response(response, success=False,
|
||||
value='Email or password is incorrect')
|
||||
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'test@edx.org', u'invalid'])
|
||||
|
||||
def test_login_not_activated(self):
|
||||
# De-activate the user
|
||||
self.user.is_active = False
|
||||
self.user.save()
|
||||
|
||||
# Should now be unable to login
|
||||
response, mock_audit_log = self._login_response('test@edx.org', 'test_password')
|
||||
self._assert_response(response, success=False,
|
||||
value="This account has not been activated")
|
||||
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Account not active for user'])
|
||||
|
||||
def test_login_unicode_email(self):
|
||||
unicode_email = u'test@edx.org' + unichr(40960)
|
||||
response, mock_audit_log = self._login_response(unicode_email, 'test_password')
|
||||
self._assert_response(response, success=False)
|
||||
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', unicode_email])
|
||||
|
||||
def test_login_unicode_password(self):
|
||||
unicode_password = u'test_password' + unichr(1972)
|
||||
response, mock_audit_log = self._login_response('test@edx.org', unicode_password)
|
||||
self._assert_response(response, success=False)
|
||||
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'test@edx.org', u'invalid'])
|
||||
|
||||
def test_logout_logging(self):
|
||||
response, _ = self._login_response('test@edx.org', 'test_password')
|
||||
self._assert_response(response, success=True)
|
||||
logout_url = reverse('logout')
|
||||
with patch('student.models.AUDIT_LOG') as mock_audit_log:
|
||||
response = self.client.post(logout_url)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self._assert_audit_log(mock_audit_log, 'info', [u'Logout', u'test'])
|
||||
|
||||
def _login_response(self, email, password, patched_audit_log='student.views.AUDIT_LOG'):
|
||||
''' Post the login info '''
|
||||
post_params = {'email': email, 'password': password}
|
||||
with patch(patched_audit_log) as mock_audit_log:
|
||||
result = self.client.post(self.url, post_params)
|
||||
return result, mock_audit_log
|
||||
|
||||
def _assert_response(self, response, success=None, value=None):
|
||||
'''
|
||||
Assert that the response had status 200 and returned a valid
|
||||
JSON-parseable dict.
|
||||
|
||||
If success is provided, assert that the response had that
|
||||
value for 'success' in the JSON dict.
|
||||
|
||||
If value is provided, assert that the response contained that
|
||||
value for 'value' in the JSON dict.
|
||||
'''
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
try:
|
||||
response_dict = json.loads(response.content)
|
||||
except ValueError:
|
||||
self.fail("Could not parse response content as JSON: %s"
|
||||
% str(response.content))
|
||||
|
||||
if success is not None:
|
||||
self.assertEqual(response_dict['success'], success)
|
||||
|
||||
if value is not None:
|
||||
msg = ("'%s' did not contain '%s'" %
|
||||
(str(response_dict['value']), str(value)))
|
||||
self.assertTrue(value in response_dict['value'], msg)
|
||||
|
||||
def _assert_audit_log(self, mock_audit_log, level, log_strings):
|
||||
"""
|
||||
Check that the audit log has received the expected call.
|
||||
"""
|
||||
method_calls = mock_audit_log.method_calls
|
||||
self.assertEquals(len(method_calls), 1)
|
||||
name, args, _kwargs = method_calls[0]
|
||||
self.assertEquals(name, level)
|
||||
self.assertEquals(len(args), 1)
|
||||
format_string = args[0]
|
||||
for log_string in log_strings:
|
||||
self.assertIn(log_string, format_string)
|
||||
@@ -56,6 +56,8 @@ from statsd import statsd
|
||||
from pytz import UTC
|
||||
|
||||
log = logging.getLogger("mitx.student")
|
||||
AUDIT_LOG = logging.getLogger("audit")
|
||||
|
||||
Article = namedtuple('Article', 'title url author image deck publication publish_date')
|
||||
|
||||
|
||||
@@ -107,8 +109,7 @@ day_pattern = re.compile(r'\s\d+,\s')
|
||||
multimonth_pattern = re.compile(r'\s?\-\s?\S+\s')
|
||||
|
||||
|
||||
def get_date_for_press(publish_date):
|
||||
import datetime
|
||||
def _get_date_for_press(publish_date):
|
||||
# strip off extra months, and just use the first:
|
||||
date = re.sub(multimonth_pattern, ", ", publish_date)
|
||||
if re.search(day_pattern, date):
|
||||
@@ -129,7 +130,7 @@ def press(request):
|
||||
json_articles = json.loads(content)
|
||||
cache.set("student_press_json_articles", json_articles)
|
||||
articles = [Article(**article) for article in json_articles]
|
||||
articles.sort(key=lambda item: get_date_for_press(item.publish_date), reverse=True)
|
||||
articles.sort(key=lambda item: _get_date_for_press(item.publish_date), reverse=True)
|
||||
return render_to_response('static_templates/press.html', {'articles': articles})
|
||||
|
||||
|
||||
@@ -233,7 +234,7 @@ def signin_user(request):
|
||||
|
||||
|
||||
@ensure_csrf_cookie
|
||||
def register_user(request, extra_context={}):
|
||||
def register_user(request, extra_context=None):
|
||||
"""
|
||||
This view will display the non-modal registration form
|
||||
"""
|
||||
@@ -244,7 +245,8 @@ def register_user(request, extra_context={}):
|
||||
'course_id': request.GET.get('course_id'),
|
||||
'enrollment_action': request.GET.get('enrollment_action')
|
||||
}
|
||||
context.update(extra_context)
|
||||
if extra_context is not None:
|
||||
context.update(extra_context)
|
||||
|
||||
return render_to_response('register.html', context)
|
||||
|
||||
@@ -381,7 +383,7 @@ def change_enrollment(request):
|
||||
"run:{0}".format(run)])
|
||||
|
||||
try:
|
||||
enrollment, created = CourseEnrollment.objects.get_or_create(user=user, course_id=course.id)
|
||||
enrollment, _created = CourseEnrollment.objects.get_or_create(user=user, course_id=course.id)
|
||||
except IntegrityError:
|
||||
# If we've already created this enrollment in a separate transaction,
|
||||
# then just continue
|
||||
@@ -425,19 +427,21 @@ def login_user(request, error=""):
|
||||
try:
|
||||
user = User.objects.get(email=email)
|
||||
except User.DoesNotExist:
|
||||
log.warning(u"Login failed - Unknown user email: {0}".format(email))
|
||||
AUDIT_LOG.warning(u"Login failed - Unknown user email: {0}".format(email))
|
||||
return HttpResponse(json.dumps({'success': False,
|
||||
'value': _('Email or password is incorrect.')})) # TODO: User error message
|
||||
|
||||
username = user.username
|
||||
user = authenticate(username=username, password=password)
|
||||
if user is None:
|
||||
log.warning(u"Login failed - password for {0} is invalid".format(email))
|
||||
AUDIT_LOG.warning(u"Login failed - password for {0} is invalid".format(email))
|
||||
return HttpResponse(json.dumps({'success': False,
|
||||
'value': _('Email or password is incorrect.')}))
|
||||
|
||||
if user is not None and user.is_active:
|
||||
try:
|
||||
# We do not log here, because we have a handler registered
|
||||
# to perform logging on successful logins.
|
||||
login(request, user)
|
||||
if request.POST.get('remember') == 'true':
|
||||
request.session.set_expiry(604800)
|
||||
@@ -445,14 +449,14 @@ def login_user(request, error=""):
|
||||
else:
|
||||
request.session.set_expiry(0)
|
||||
except Exception as e:
|
||||
AUDIT_LOG.critical("Login failed - Could not create session. Is memcached running?")
|
||||
log.critical("Login failed - Could not create session. Is memcached running?")
|
||||
log.exception(e)
|
||||
|
||||
log.info(u"Login success - {0} ({1})".format(username, email))
|
||||
raise
|
||||
|
||||
try_change_enrollment(request)
|
||||
|
||||
statsd.increment(_("common.student.successful_login"))
|
||||
statsd.increment("common.student.successful_login")
|
||||
response = HttpResponse(json.dumps({'success': True}))
|
||||
|
||||
# set the login cookie for the edx marketing site
|
||||
@@ -476,7 +480,7 @@ def login_user(request, error=""):
|
||||
|
||||
return response
|
||||
|
||||
log.warning(u"Login failed - Account not active for user {0}, resending activation".format(username))
|
||||
AUDIT_LOG.warning(u"Login failed - Account not active for user {0}, resending activation".format(username))
|
||||
|
||||
reactivation_email_for_user(user)
|
||||
not_activated_msg = _("This account has not been activated. We have sent another activation message. Please check your e-mail for the activation instructions.")
|
||||
@@ -491,7 +495,8 @@ def logout_user(request):
|
||||
Deletes both the CSRF and sessionid cookies so the marketing
|
||||
site can determine the logged in state of the user
|
||||
'''
|
||||
|
||||
# We do not log here, because we have a handler registered
|
||||
# to perform logging on successful logouts.
|
||||
logout(request)
|
||||
response = redirect('/')
|
||||
response.delete_cookie(settings.EDXMKTG_COOKIE_NAME,
|
||||
@@ -598,7 +603,7 @@ def create_account(request, post_override=None):
|
||||
password = eamap.internal_password
|
||||
post_vars = dict(post_vars.items())
|
||||
post_vars.update(dict(email=email, name=name, password=password))
|
||||
log.info('In create_account with external_auth: post_vars = %s' % post_vars)
|
||||
log.debug(u'In create_account with external_auth: user = %s, email=%s', name, email)
|
||||
|
||||
# Confirm we have a properly formed request
|
||||
for a in ['username', 'email', 'password', 'name']:
|
||||
@@ -631,7 +636,7 @@ def create_account(request, post_override=None):
|
||||
|
||||
required_post_vars = ['username', 'email', 'name', 'password', 'terms_of_service', 'honor_code']
|
||||
if tos_not_required:
|
||||
required_post_vars = ['username', 'email', 'name', 'password', 'honor_code']
|
||||
required_post_vars = ['username', 'email', 'name', 'password', 'honor_code']
|
||||
|
||||
for a in required_post_vars:
|
||||
if len(post_vars[a]) < 2:
|
||||
@@ -684,7 +689,7 @@ def create_account(request, post_override=None):
|
||||
'-' * 80 + '\n\n' + message)
|
||||
send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [dest_addr], fail_silently=False)
|
||||
else:
|
||||
res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
|
||||
_res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
|
||||
except:
|
||||
log.warning('Unable to send activation email to user', exc_info=True)
|
||||
js['value'] = _('Could not send activation e-mail.')
|
||||
@@ -697,17 +702,23 @@ def create_account(request, post_override=None):
|
||||
login(request, login_user)
|
||||
request.session.set_expiry(0)
|
||||
|
||||
# TODO: there is no error checking here to see that the user actually logged in successfully,
|
||||
# and is not yet an active user.
|
||||
if login_user is not None:
|
||||
AUDIT_LOG.info(u"Login success on new account creation - {0}".format(login_user.username))
|
||||
|
||||
if DoExternalAuth:
|
||||
eamap.user = login_user
|
||||
eamap.dtsignup = datetime.datetime.now(UTC)
|
||||
eamap.save()
|
||||
log.info("User registered with external_auth %s" % post_vars['username'])
|
||||
log.info('Updated ExternalAuthMap for %s to be %s' % (post_vars['username'], eamap))
|
||||
AUDIT_LOG.info("User registered with external_auth %s", post_vars['username'])
|
||||
AUDIT_LOG.info('Updated ExternalAuthMap for %s to be %s', post_vars['username'], eamap)
|
||||
|
||||
if settings.MITX_FEATURES.get('BYPASS_ACTIVATION_EMAIL_FOR_EXTAUTH'):
|
||||
log.info('bypassing activation email')
|
||||
login_user.is_active = True
|
||||
login_user.save()
|
||||
AUDIT_LOG.info(u"Login activated on extauth account - {0} ({1})".format(login_user.username, login_user.email))
|
||||
|
||||
try_change_enrollment(request)
|
||||
|
||||
@@ -964,14 +975,14 @@ def activate_account(request, key):
|
||||
r[0].activate()
|
||||
already_active = False
|
||||
|
||||
#Enroll student in any pending courses he/she may have if auto_enroll flag is set
|
||||
# Enroll student in any pending courses he/she may have if auto_enroll flag is set
|
||||
student = User.objects.filter(id=r[0].user_id)
|
||||
if student:
|
||||
ceas = CourseEnrollmentAllowed.objects.filter(email=student[0].email)
|
||||
for cea in ceas:
|
||||
if cea.auto_enroll:
|
||||
course_id = cea.course_id
|
||||
enrollment, created = CourseEnrollment.objects.get_or_create(user_id=student[0].id, course_id=course_id)
|
||||
_enrollment, _created = CourseEnrollment.objects.get_or_create(user_id=student[0].id, course_id=course_id)
|
||||
|
||||
resp = render_to_response("registration/activation_complete.html", {'user_logged_in': user_logged_in, 'already_active': already_active})
|
||||
return resp
|
||||
@@ -1003,7 +1014,7 @@ def password_reset_confirm_wrapper(request, uidb36=None, token=None):
|
||||
''' A wrapper around django.contrib.auth.views.password_reset_confirm.
|
||||
Needed because we want to set the user as active at this step.
|
||||
'''
|
||||
#cribbed from django.contrib.auth.views.password_reset_confirm
|
||||
# cribbed from django.contrib.auth.views.password_reset_confirm
|
||||
try:
|
||||
uid_int = base36_to_int(uidb36)
|
||||
user = User.objects.get(id=uid_int)
|
||||
@@ -1029,7 +1040,7 @@ def reactivation_email_for_user(user):
|
||||
message = render_to_string('emails/activation_email.txt', d)
|
||||
|
||||
try:
|
||||
res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
|
||||
_res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
|
||||
except:
|
||||
log.warning('Unable to send reactivation email', exc_info=True)
|
||||
return HttpResponse(json.dumps({'success': False, 'error': _('Unable to send reactivation email')}))
|
||||
@@ -1087,7 +1098,7 @@ def change_email_request(request):
|
||||
subject = ''.join(subject.splitlines())
|
||||
message = render_to_string('emails/email_change.txt', d)
|
||||
|
||||
res = send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [pec.new_email])
|
||||
_res = send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [pec.new_email])
|
||||
|
||||
return HttpResponse(json.dumps({'success': True}))
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ def index(request):
|
||||
from external_auth.views import ssl_login
|
||||
return ssl_login(request)
|
||||
if settings.MITX_FEATURES.get('ENABLE_MKTG_SITE'):
|
||||
return redirect(settings.MKTG_URLS.get('ROOT'))
|
||||
return redirect(settings.MKTG_URLS.get('ROOT'))
|
||||
|
||||
university = branding.get_university(request.META.get('HTTP_HOST'))
|
||||
if university is None:
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
'''
|
||||
Tests for student activation and login
|
||||
'''
|
||||
from django.test import TestCase
|
||||
from django.test.client import Client
|
||||
from django.core.urlresolvers import reverse
|
||||
from courseware.tests.factories import UserFactory, RegistrationFactory, UserProfileFactory
|
||||
import json
|
||||
|
||||
|
||||
class LoginTest(TestCase):
|
||||
'''
|
||||
Test student.views.login_user() view
|
||||
'''
|
||||
|
||||
def setUp(self):
|
||||
# Create one user and save it to the database
|
||||
self.user = UserFactory.build(username='test', email='test@edx.org')
|
||||
self.user.set_password('test_password')
|
||||
self.user.save()
|
||||
|
||||
# Create a registration for the user
|
||||
RegistrationFactory(user=self.user)
|
||||
|
||||
# Create a profile for the user
|
||||
UserProfileFactory(user=self.user)
|
||||
|
||||
# Create the test client
|
||||
self.client = Client()
|
||||
|
||||
# Store the login url
|
||||
self.url = reverse('login')
|
||||
|
||||
def test_login_success(self):
|
||||
response = self._login_response('test@edx.org', 'test_password')
|
||||
self._assert_response(response, success=True)
|
||||
|
||||
def test_login_success_unicode_email(self):
|
||||
unicode_email = u'test@edx.org' + unichr(40960)
|
||||
|
||||
self.user.email = unicode_email
|
||||
self.user.save()
|
||||
|
||||
response = self._login_response(unicode_email, 'test_password')
|
||||
self._assert_response(response, success=True)
|
||||
|
||||
def test_login_fail_no_user_exists(self):
|
||||
response = self._login_response('not_a_user@edx.org', 'test_password')
|
||||
self._assert_response(response, success=False,
|
||||
value='Email or password is incorrect')
|
||||
|
||||
def test_login_fail_wrong_password(self):
|
||||
response = self._login_response('test@edx.org', 'wrong_password')
|
||||
self._assert_response(response, success=False,
|
||||
value='Email or password is incorrect')
|
||||
|
||||
def test_login_not_activated(self):
|
||||
# De-activate the user
|
||||
self.user.is_active = False
|
||||
self.user.save()
|
||||
|
||||
# Should now be unable to login
|
||||
response = self._login_response('test@edx.org', 'test_password')
|
||||
self._assert_response(response, success=False,
|
||||
value="This account has not been activated")
|
||||
|
||||
def test_login_unicode_email(self):
|
||||
unicode_email = u'test@edx.org' + unichr(40960)
|
||||
response = self._login_response(unicode_email, 'test_password')
|
||||
self._assert_response(response, success=False)
|
||||
|
||||
def test_login_unicode_password(self):
|
||||
unicode_password = u'test_password' + unichr(1972)
|
||||
response = self._login_response('test@edx.org', unicode_password)
|
||||
self._assert_response(response, success=False)
|
||||
|
||||
def _login_response(self, email, password):
|
||||
''' Post the login info '''
|
||||
post_params = {'email': email, 'password': password}
|
||||
return self.client.post(self.url, post_params)
|
||||
|
||||
def _assert_response(self, response, success=None, value=None):
|
||||
'''
|
||||
Assert that the response had status 200 and returned a valid
|
||||
JSON-parseable dict.
|
||||
|
||||
If success is provided, assert that the response had that
|
||||
value for 'success' in the JSON dict.
|
||||
|
||||
If value is provided, assert that the response contained that
|
||||
value for 'value' in the JSON dict.
|
||||
'''
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
try:
|
||||
response_dict = json.loads(response.content)
|
||||
except ValueError:
|
||||
self.fail("Could not parse response content as JSON: %s"
|
||||
% str(response.content))
|
||||
|
||||
if success is not None:
|
||||
self.assertEqual(response_dict['success'], success)
|
||||
|
||||
if value is not None:
|
||||
msg = ("'%s' did not contain '%s'" %
|
||||
(str(response_dict['value']), str(value)))
|
||||
self.assertTrue(value in response_dict['value'], msg)
|
||||
Reference in New Issue
Block a user