Remove oauth2_handler djangoapp.
This commit is contained in:
@@ -1,4 +0,0 @@
|
||||
""" Handlers for OpenID Connect provider. """
|
||||
|
||||
|
||||
from oauth2_handler.handlers import IDTokenHandler, UserInfoHandler
|
||||
@@ -1,241 +0,0 @@
|
||||
""" Handlers for OpenID Connect provider. """
|
||||
|
||||
|
||||
import six
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
|
||||
from lms.djangoapps.courseware.access import has_access
|
||||
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
|
||||
from openedx.core.djangoapps.lang_pref import LANGUAGE_KEY
|
||||
from openedx.core.djangoapps.user_api.models import UserPreference
|
||||
from student.models import UserProfile, anonymous_id_for_user
|
||||
from student.roles import CourseInstructorRole, CourseStaffRole, GlobalStaff
|
||||
|
||||
|
||||
class OpenIDHandler(object):
|
||||
""" Basic OpenID Connect scope handler. """
|
||||
|
||||
def scope_openid(self, _data):
|
||||
""" Only override the sub (subject) claim. """
|
||||
return ['sub']
|
||||
|
||||
def claim_sub(self, data):
|
||||
"""
|
||||
Return the value of the sub (subject) claim. The value should be
|
||||
unique for each user.
|
||||
|
||||
"""
|
||||
|
||||
# Use the anonymous ID without any course as unique identifier.
|
||||
# Note that this ID is derived using the value of the `SECRET_KEY`
|
||||
# setting, this means that users will have different sub
|
||||
# values for different deployments.
|
||||
value = anonymous_id_for_user(data['user'], None)
|
||||
return value
|
||||
|
||||
|
||||
class PermissionsHandler(object):
|
||||
""" Permissions scope handler """
|
||||
|
||||
def scope_permissions(self, _data):
|
||||
return ['administrator']
|
||||
|
||||
def claim_administrator(self, data):
|
||||
"""
|
||||
Return boolean indicating user's administrator status.
|
||||
|
||||
For our purposes an administrator is any user with is_staff set to True.
|
||||
"""
|
||||
return data['user'].is_staff
|
||||
|
||||
|
||||
class ProfileHandler(object):
|
||||
""" Basic OpenID Connect `profile` scope handler with `locale` claim. """
|
||||
|
||||
def scope_profile(self, _data):
|
||||
""" Add specialized claims. """
|
||||
return ['name', 'locale', 'user_tracking_id']
|
||||
|
||||
def claim_name(self, data):
|
||||
""" User displayable full name. """
|
||||
user = data['user']
|
||||
profile = UserProfile.objects.get(user=user)
|
||||
return profile.name
|
||||
|
||||
def claim_locale(self, data):
|
||||
"""
|
||||
Return the locale for the users based on their preferences.
|
||||
Does not return a value if the users have not set their locale preferences.
|
||||
"""
|
||||
|
||||
# Calling UserPreference directly because it is not clear which user made the request.
|
||||
language = UserPreference.get_value(data['user'], LANGUAGE_KEY)
|
||||
|
||||
# If the user has no language specified, return the default one.
|
||||
if not language:
|
||||
language = settings.LANGUAGE_CODE
|
||||
|
||||
return language
|
||||
|
||||
def claim_user_tracking_id(self, data):
|
||||
""" User tracking ID. """
|
||||
return data['user'].id
|
||||
|
||||
|
||||
class CourseAccessHandler(object):
|
||||
"""
|
||||
Defines two new scopes: `course_instructor` and `course_staff`. Each one is
|
||||
valid only if the user is instructor or staff of at least one course.
|
||||
|
||||
Each new scope has a corresponding claim: `instructor_courses` and
|
||||
`staff_courses` that lists the course_ids for which the user has instructor
|
||||
or staff privileges.
|
||||
|
||||
The claims support claim request values: if there is no claim request, the
|
||||
value of the claim is the list all the courses for which the user has the
|
||||
corresponding privileges. If a claim request is used, then the value of the
|
||||
claim the list of courses from the requested values that have the
|
||||
corresponding privileges.
|
||||
|
||||
For example, if the user is staff of course_a and course_b but not
|
||||
course_c, the claim corresponding to the scope request:
|
||||
|
||||
scope = openid course_staff
|
||||
|
||||
has the value:
|
||||
|
||||
{staff_courses: [course_a, course_b] }
|
||||
|
||||
For the claim request:
|
||||
|
||||
claims = {userinfo: {staff_courses: {values=[course_b, course_d]}}}
|
||||
|
||||
the corresponding claim will have the value:
|
||||
|
||||
{staff_courses: [course_b] }.
|
||||
|
||||
This is useful to quickly determine if a user has the right privileges for a
|
||||
given course.
|
||||
|
||||
For a description of the function naming and arguments, see:
|
||||
|
||||
`edx_oauth2_provider/oidc/handlers.py`
|
||||
|
||||
"""
|
||||
|
||||
COURSE_CACHE_TIMEOUT = getattr(settings, 'OIDC_COURSE_HANDLER_CACHE_TIMEOUT', 60) # In seconds.
|
||||
|
||||
def __init__(self, *_args, **_kwargs):
|
||||
self._course_cache = {}
|
||||
|
||||
def scope_course_instructor(self, data):
|
||||
"""
|
||||
Scope `course_instructor` valid only if the user is an instructor
|
||||
of at least one course.
|
||||
|
||||
"""
|
||||
|
||||
# TODO: unfortunately there is not a faster and still correct way to
|
||||
# check if a user is instructor of at least one course other than
|
||||
# checking the access type against all known courses.
|
||||
course_ids = self.find_courses(data['user'], CourseInstructorRole.ROLE)
|
||||
return ['instructor_courses'] if course_ids else None
|
||||
|
||||
def scope_course_staff(self, data):
|
||||
"""
|
||||
Scope `course_staff` valid only if the user is an instructor of at
|
||||
least one course.
|
||||
|
||||
"""
|
||||
# TODO: see :method:CourseAccessHandler.scope_course_instructor
|
||||
course_ids = self.find_courses(data['user'], CourseStaffRole.ROLE)
|
||||
|
||||
return ['staff_courses'] if course_ids else None
|
||||
|
||||
def claim_instructor_courses(self, data):
|
||||
"""
|
||||
Claim `instructor_courses` with list of course_ids for which the
|
||||
user has instructor privileges.
|
||||
|
||||
"""
|
||||
|
||||
return self.find_courses(data['user'], CourseInstructorRole.ROLE, data.get('values'))
|
||||
|
||||
def claim_staff_courses(self, data):
|
||||
"""
|
||||
Claim `staff_courses` with list of course_ids for which the user
|
||||
has staff privileges.
|
||||
|
||||
"""
|
||||
|
||||
return self.find_courses(data['user'], CourseStaffRole.ROLE, data.get('values'))
|
||||
|
||||
def find_courses(self, user, access_type, values=None):
|
||||
"""
|
||||
Find all courses for which the user has the specified access type. If
|
||||
`values` is specified, check only the courses from `values`.
|
||||
|
||||
"""
|
||||
|
||||
# Check the instance cache and update if not present. The instance
|
||||
# cache is useful since there are multiple scope and claims calls in the
|
||||
# same request.
|
||||
|
||||
key = (user.id, access_type)
|
||||
if key in self._course_cache:
|
||||
course_ids = self._course_cache[key]
|
||||
else:
|
||||
course_ids = self._get_courses_with_access_type(user, access_type)
|
||||
self._course_cache[key] = course_ids
|
||||
|
||||
# If values was specified, filter out other courses.
|
||||
if values is not None:
|
||||
course_ids = list(set(course_ids) & set(values))
|
||||
|
||||
return course_ids
|
||||
|
||||
# pylint: disable=missing-docstring
|
||||
def _get_courses_with_access_type(self, user, access_type):
|
||||
# Check the application cache and update if not present. The application
|
||||
# cache is useful since there are calls to different endpoints in close
|
||||
# succession, for example the id_token and user_info endpoints.
|
||||
|
||||
key = '-'.join([str(self.__class__), str(user.id), access_type])
|
||||
course_ids = cache.get(key)
|
||||
|
||||
if not course_ids:
|
||||
course_keys = CourseOverview.get_all_course_keys()
|
||||
|
||||
# Global staff have access to all courses. Filter courses for non-global staff.
|
||||
if not GlobalStaff().has_user(user):
|
||||
course_keys = [course_key for course_key in course_keys if has_access(user, access_type, course_key)]
|
||||
|
||||
course_ids = [six.text_type(course_key) for course_key in course_keys]
|
||||
|
||||
cache.set(key, course_ids, self.COURSE_CACHE_TIMEOUT)
|
||||
|
||||
return course_ids
|
||||
|
||||
|
||||
class IDTokenHandler(OpenIDHandler, ProfileHandler, CourseAccessHandler, PermissionsHandler):
|
||||
""" Configure the ID Token handler for the LMS. """
|
||||
|
||||
def claim_instructor_courses(self, data):
|
||||
# Don't return list of courses unless they are requested as essential.
|
||||
if data.get('essential'):
|
||||
return super(IDTokenHandler, self).claim_instructor_courses(data)
|
||||
else:
|
||||
return None
|
||||
|
||||
def claim_staff_courses(self, data):
|
||||
# Don't return list of courses unless they are requested as essential.
|
||||
if data.get('essential'):
|
||||
return super(IDTokenHandler, self).claim_staff_courses(data)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
class UserInfoHandler(OpenIDHandler, ProfileHandler, CourseAccessHandler, PermissionsHandler):
|
||||
""" Configure the UserInfo handler for the LMS. """
|
||||
pass
|
||||
@@ -1,256 +0,0 @@
|
||||
# pylint: disable=missing-docstring
|
||||
|
||||
|
||||
import mock
|
||||
import six
|
||||
from django.core.cache import cache
|
||||
from django.test.utils import override_settings
|
||||
# Will also run default tests for IDTokens and UserInfo
|
||||
from edx_oauth2_provider.tests import IDTokenTestCase, UserInfoTestCase
|
||||
|
||||
from openedx.core.djangoapps.lang_pref import LANGUAGE_KEY
|
||||
from openedx.core.djangoapps.user_api.preferences.api import set_user_preference
|
||||
from student.models import UserProfile, anonymous_id_for_user
|
||||
from student.roles import CourseInstructorRole, CourseStaffRole, GlobalStaff, OrgInstructorRole, OrgStaffRole
|
||||
from student.tests.factories import UserFactory, UserProfileFactory
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory, check_mongo_calls
|
||||
|
||||
|
||||
class BaseTestMixin(ModuleStoreTestCase):
|
||||
profile = None
|
||||
ENABLED_SIGNALS = ['course_published']
|
||||
|
||||
def setUp(self):
|
||||
super(BaseTestMixin, self).setUp()
|
||||
self.course_key = CourseFactory.create(emit_signals=True).id
|
||||
self.course_id = six.text_type(self.course_key)
|
||||
self.user_factory = UserFactory
|
||||
self.set_user(self.make_user())
|
||||
|
||||
def set_user(self, user):
|
||||
super(BaseTestMixin, self).set_user(user)
|
||||
self.profile = UserProfileFactory(user=self.user)
|
||||
|
||||
|
||||
class IDTokenTest(BaseTestMixin, IDTokenTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(IDTokenTest, self).setUp()
|
||||
|
||||
# CourseAccessHandler uses the application cache.
|
||||
cache.clear()
|
||||
|
||||
def test_sub_claim(self):
|
||||
scopes, claims = self.get_id_token_values('openid')
|
||||
self.assertIn('openid', scopes)
|
||||
|
||||
sub = claims['sub']
|
||||
|
||||
expected_sub = anonymous_id_for_user(self.user, None)
|
||||
self.assertEqual(sub, expected_sub)
|
||||
|
||||
def test_user_name_claim(self):
|
||||
_scopes, claims = self.get_id_token_values('openid profile')
|
||||
claim_name = claims['name']
|
||||
|
||||
user_profile = UserProfile.objects.get(user=self.user)
|
||||
user_name = user_profile.name
|
||||
|
||||
self.assertEqual(claim_name, user_name)
|
||||
|
||||
@override_settings(LANGUAGE_CODE='en')
|
||||
def test_user_without_locale_claim(self):
|
||||
scopes, claims = self.get_id_token_values('openid profile')
|
||||
self.assertIn('profile', scopes)
|
||||
self.assertEqual(claims['locale'], 'en')
|
||||
|
||||
def test_user_with_locale_claim(self):
|
||||
language = 'en'
|
||||
set_user_preference(self.user, LANGUAGE_KEY, language)
|
||||
scopes, claims = self.get_id_token_values('openid profile')
|
||||
|
||||
self.assertIn('profile', scopes)
|
||||
|
||||
locale = claims['locale']
|
||||
self.assertEqual(language, locale)
|
||||
|
||||
def test_user_tracking_id_claim(self):
|
||||
scopes, claims = self.get_id_token_values('openid profile')
|
||||
self.assertIn('profile', scopes)
|
||||
self.assertEqual(claims['user_tracking_id'], self.user.id)
|
||||
|
||||
def test_no_special_course_access(self):
|
||||
with check_mongo_calls(0):
|
||||
scopes, claims = self.get_id_token_values('openid course_instructor course_staff')
|
||||
self.assertNotIn('course_staff', scopes)
|
||||
self.assertNotIn('staff_courses', claims)
|
||||
|
||||
self.assertNotIn('course_instructor', scopes)
|
||||
self.assertNotIn('instructor_courses', claims)
|
||||
|
||||
def test_course_staff_courses(self):
|
||||
CourseStaffRole(self.course_key).add_users(self.user)
|
||||
with check_mongo_calls(0):
|
||||
scopes, claims = self.get_id_token_values('openid course_staff')
|
||||
|
||||
self.assertIn('course_staff', scopes)
|
||||
self.assertNotIn('staff_courses', claims) # should not return courses in id_token
|
||||
|
||||
def test_course_instructor_courses(self):
|
||||
with check_mongo_calls(0):
|
||||
CourseInstructorRole(self.course_key).add_users(self.user)
|
||||
|
||||
scopes, claims = self.get_id_token_values('openid course_instructor')
|
||||
|
||||
self.assertIn('course_instructor', scopes)
|
||||
self.assertNotIn('instructor_courses', claims) # should not return courses in id_token
|
||||
|
||||
def test_course_staff_courses_with_claims(self):
|
||||
CourseStaffRole(self.course_key).add_users(self.user)
|
||||
|
||||
course_id = six.text_type(self.course_key)
|
||||
|
||||
nonexistent_course_id = 'some/other/course'
|
||||
|
||||
claims = {
|
||||
'staff_courses': {
|
||||
'values': [course_id, nonexistent_course_id],
|
||||
'essential': True,
|
||||
}
|
||||
}
|
||||
|
||||
with check_mongo_calls(0):
|
||||
scopes, claims = self.get_id_token_values(scope='openid course_staff', claims=claims)
|
||||
|
||||
self.assertIn('course_staff', scopes)
|
||||
self.assertIn('staff_courses', claims)
|
||||
self.assertEqual(len(claims['staff_courses']), 1)
|
||||
self.assertIn(course_id, claims['staff_courses'])
|
||||
self.assertNotIn(nonexistent_course_id, claims['staff_courses'])
|
||||
|
||||
def test_permissions_scope(self):
|
||||
scopes, claims = self.get_id_token_values('openid profile permissions')
|
||||
self.assertIn('permissions', scopes)
|
||||
self.assertFalse(claims['administrator'])
|
||||
|
||||
self.user.is_staff = True
|
||||
self.user.save()
|
||||
_scopes, claims = self.get_id_token_values('openid profile permissions')
|
||||
self.assertTrue(claims['administrator'])
|
||||
|
||||
def test_rate_limit_token(self):
|
||||
|
||||
response = self.get_access_token_response('openid profile permissions')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.get_access_token_response('openid profile permissions')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.get_access_token_response('openid profile permissions')
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
|
||||
class UserInfoTest(BaseTestMixin, UserInfoTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(UserInfoTest, self).setUp()
|
||||
# create another course in the DB that only global staff have access to
|
||||
CourseFactory.create(emit_signals=True)
|
||||
|
||||
def token_for_scope(self, scope):
|
||||
full_scope = 'openid %s' % scope
|
||||
self.set_access_token_scope(full_scope)
|
||||
|
||||
token = self.access_token.token
|
||||
return full_scope, token
|
||||
|
||||
def get_with_scope(self, scope):
|
||||
scope, token = self.token_for_scope(scope)
|
||||
result, claims = self.get_userinfo(token, scope)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
return claims
|
||||
|
||||
def get_with_claim_value(self, scope, claim, values):
|
||||
_full_scope, token = self.token_for_scope(scope)
|
||||
|
||||
result, claims = self.get_userinfo(
|
||||
token,
|
||||
claims={claim: {'values': values}}
|
||||
)
|
||||
|
||||
self.assertEqual(result.status_code, 200)
|
||||
return claims
|
||||
|
||||
def _assert_role_using_scope(self, scope, claim, assert_one_course=True):
|
||||
with check_mongo_calls(0):
|
||||
claims = self.get_with_scope(scope)
|
||||
self.assertEqual(len(claims), 2)
|
||||
courses = claims[claim]
|
||||
self.assertIn(self.course_id, courses)
|
||||
if assert_one_course:
|
||||
self.assertEqual(len(courses), 1)
|
||||
|
||||
def test_request_global_staff_courses_using_scope(self):
|
||||
GlobalStaff().add_users(self.user)
|
||||
self._assert_role_using_scope('course_staff', 'staff_courses', assert_one_course=False)
|
||||
|
||||
def test_request_org_staff_courses_using_scope(self):
|
||||
OrgStaffRole(self.course_key.org).add_users(self.user)
|
||||
self._assert_role_using_scope('course_staff', 'staff_courses')
|
||||
|
||||
def test_request_org_instructor_courses_using_scope(self):
|
||||
OrgInstructorRole(self.course_key.org).add_users(self.user)
|
||||
self._assert_role_using_scope('course_instructor', 'instructor_courses')
|
||||
|
||||
def test_request_staff_courses_using_scope(self):
|
||||
CourseStaffRole(self.course_key).add_users(self.user)
|
||||
self._assert_role_using_scope('course_staff', 'staff_courses')
|
||||
|
||||
def test_request_instructor_courses_using_scope(self):
|
||||
CourseInstructorRole(self.course_key).add_users(self.user)
|
||||
self._assert_role_using_scope('course_instructor', 'instructor_courses')
|
||||
|
||||
def _assert_role_using_claim(self, scope, claim):
|
||||
values = [self.course_id, 'some_invalid_course']
|
||||
with check_mongo_calls(0):
|
||||
claims = self.get_with_claim_value(scope, claim, values)
|
||||
self.assertEqual(len(claims), 2)
|
||||
|
||||
courses = claims[claim]
|
||||
self.assertIn(self.course_id, courses)
|
||||
self.assertEqual(len(courses), 1)
|
||||
|
||||
def test_request_global_staff_courses_with_claims(self):
|
||||
GlobalStaff().add_users(self.user)
|
||||
self._assert_role_using_claim('course_staff', 'staff_courses')
|
||||
|
||||
def test_request_org_staff_courses_with_claims(self):
|
||||
OrgStaffRole(self.course_key.org).add_users(self.user)
|
||||
self._assert_role_using_claim('course_staff', 'staff_courses')
|
||||
|
||||
def test_request_org_instructor_courses_with_claims(self):
|
||||
OrgInstructorRole(self.course_key.org).add_users(self.user)
|
||||
self._assert_role_using_claim('course_instructor', 'instructor_courses')
|
||||
|
||||
def test_request_staff_courses_with_claims(self):
|
||||
CourseStaffRole(self.course_key).add_users(self.user)
|
||||
self._assert_role_using_claim('course_staff', 'staff_courses')
|
||||
|
||||
def test_request_instructor_courses_with_claims(self):
|
||||
CourseInstructorRole(self.course_key).add_users(self.user)
|
||||
self._assert_role_using_claim('course_instructor', 'instructor_courses')
|
||||
|
||||
def test_permissions_scope(self):
|
||||
claims = self.get_with_scope('permissions')
|
||||
self.assertIn('administrator', claims)
|
||||
self.assertFalse(claims['administrator'])
|
||||
|
||||
self.user.is_staff = True
|
||||
self.user.save()
|
||||
claims = self.get_with_scope('permissions')
|
||||
self.assertTrue(claims['administrator'])
|
||||
|
||||
def test_profile_scope(self):
|
||||
claims = self.get_with_scope('profile')
|
||||
self.assertEqual(claims['name'], UserProfile.objects.get(user=self.user).name)
|
||||
self.assertEqual(claims['user_tracking_id'], self.user.id)
|
||||
@@ -566,14 +566,12 @@ OAUTH_OIDC_ID_TOKEN_HANDLERS = (
|
||||
'edx_oauth2_provider.oidc.handlers.BasicIDTokenHandler',
|
||||
'edx_oauth2_provider.oidc.handlers.ProfileHandler',
|
||||
'edx_oauth2_provider.oidc.handlers.EmailHandler',
|
||||
'oauth2_handler.IDTokenHandler'
|
||||
)
|
||||
|
||||
OAUTH_OIDC_USERINFO_HANDLERS = (
|
||||
'edx_oauth2_provider.oidc.handlers.BasicUserInfoHandler',
|
||||
'edx_oauth2_provider.oidc.handlers.ProfileHandler',
|
||||
'edx_oauth2_provider.oidc.handlers.EmailHandler',
|
||||
'oauth2_handler.UserInfoHandler'
|
||||
)
|
||||
|
||||
OAUTH_EXPIRE_CONFIDENTIAL_CLIENT_DAYS = 365
|
||||
|
||||
Reference in New Issue
Block a user