Merge pull request #23194 from edx/diana/remove-from-logout-flow

Remove more references to the old edx_oauth2_provider library.
This commit is contained in:
Diana Huang
2020-02-25 14:17:04 -05:00
committed by GitHub
5 changed files with 0 additions and 507 deletions

View File

@@ -1,4 +0,0 @@
""" Handlers for OpenID Connect provider. """
from oauth2_handler.handlers import IDTokenHandler, UserInfoHandler

View File

@@ -1,241 +0,0 @@
""" Handlers for OpenID Connect provider. """
import six
from django.conf import settings
from django.core.cache import cache
from lms.djangoapps.courseware.access import has_access
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
from openedx.core.djangoapps.lang_pref import LANGUAGE_KEY
from openedx.core.djangoapps.user_api.models import UserPreference
from student.models import UserProfile, anonymous_id_for_user
from student.roles import CourseInstructorRole, CourseStaffRole, GlobalStaff
class OpenIDHandler(object):
""" Basic OpenID Connect scope handler. """
def scope_openid(self, _data):
""" Only override the sub (subject) claim. """
return ['sub']
def claim_sub(self, data):
"""
Return the value of the sub (subject) claim. The value should be
unique for each user.
"""
# Use the anonymous ID without any course as unique identifier.
# Note that this ID is derived using the value of the `SECRET_KEY`
# setting, this means that users will have different sub
# values for different deployments.
value = anonymous_id_for_user(data['user'], None)
return value
class PermissionsHandler(object):
""" Permissions scope handler """
def scope_permissions(self, _data):
return ['administrator']
def claim_administrator(self, data):
"""
Return boolean indicating user's administrator status.
For our purposes an administrator is any user with is_staff set to True.
"""
return data['user'].is_staff
class ProfileHandler(object):
""" Basic OpenID Connect `profile` scope handler with `locale` claim. """
def scope_profile(self, _data):
""" Add specialized claims. """
return ['name', 'locale', 'user_tracking_id']
def claim_name(self, data):
""" User displayable full name. """
user = data['user']
profile = UserProfile.objects.get(user=user)
return profile.name
def claim_locale(self, data):
"""
Return the locale for the users based on their preferences.
Does not return a value if the users have not set their locale preferences.
"""
# Calling UserPreference directly because it is not clear which user made the request.
language = UserPreference.get_value(data['user'], LANGUAGE_KEY)
# If the user has no language specified, return the default one.
if not language:
language = settings.LANGUAGE_CODE
return language
def claim_user_tracking_id(self, data):
""" User tracking ID. """
return data['user'].id
class CourseAccessHandler(object):
"""
Defines two new scopes: `course_instructor` and `course_staff`. Each one is
valid only if the user is instructor or staff of at least one course.
Each new scope has a corresponding claim: `instructor_courses` and
`staff_courses` that lists the course_ids for which the user has instructor
or staff privileges.
The claims support claim request values: if there is no claim request, the
value of the claim is the list all the courses for which the user has the
corresponding privileges. If a claim request is used, then the value of the
claim the list of courses from the requested values that have the
corresponding privileges.
For example, if the user is staff of course_a and course_b but not
course_c, the claim corresponding to the scope request:
scope = openid course_staff
has the value:
{staff_courses: [course_a, course_b] }
For the claim request:
claims = {userinfo: {staff_courses: {values=[course_b, course_d]}}}
the corresponding claim will have the value:
{staff_courses: [course_b] }.
This is useful to quickly determine if a user has the right privileges for a
given course.
For a description of the function naming and arguments, see:
`edx_oauth2_provider/oidc/handlers.py`
"""
COURSE_CACHE_TIMEOUT = getattr(settings, 'OIDC_COURSE_HANDLER_CACHE_TIMEOUT', 60) # In seconds.
def __init__(self, *_args, **_kwargs):
self._course_cache = {}
def scope_course_instructor(self, data):
"""
Scope `course_instructor` valid only if the user is an instructor
of at least one course.
"""
# TODO: unfortunately there is not a faster and still correct way to
# check if a user is instructor of at least one course other than
# checking the access type against all known courses.
course_ids = self.find_courses(data['user'], CourseInstructorRole.ROLE)
return ['instructor_courses'] if course_ids else None
def scope_course_staff(self, data):
"""
Scope `course_staff` valid only if the user is an instructor of at
least one course.
"""
# TODO: see :method:CourseAccessHandler.scope_course_instructor
course_ids = self.find_courses(data['user'], CourseStaffRole.ROLE)
return ['staff_courses'] if course_ids else None
def claim_instructor_courses(self, data):
"""
Claim `instructor_courses` with list of course_ids for which the
user has instructor privileges.
"""
return self.find_courses(data['user'], CourseInstructorRole.ROLE, data.get('values'))
def claim_staff_courses(self, data):
"""
Claim `staff_courses` with list of course_ids for which the user
has staff privileges.
"""
return self.find_courses(data['user'], CourseStaffRole.ROLE, data.get('values'))
def find_courses(self, user, access_type, values=None):
"""
Find all courses for which the user has the specified access type. If
`values` is specified, check only the courses from `values`.
"""
# Check the instance cache and update if not present. The instance
# cache is useful since there are multiple scope and claims calls in the
# same request.
key = (user.id, access_type)
if key in self._course_cache:
course_ids = self._course_cache[key]
else:
course_ids = self._get_courses_with_access_type(user, access_type)
self._course_cache[key] = course_ids
# If values was specified, filter out other courses.
if values is not None:
course_ids = list(set(course_ids) & set(values))
return course_ids
# pylint: disable=missing-docstring
def _get_courses_with_access_type(self, user, access_type):
# Check the application cache and update if not present. The application
# cache is useful since there are calls to different endpoints in close
# succession, for example the id_token and user_info endpoints.
key = '-'.join([str(self.__class__), str(user.id), access_type])
course_ids = cache.get(key)
if not course_ids:
course_keys = CourseOverview.get_all_course_keys()
# Global staff have access to all courses. Filter courses for non-global staff.
if not GlobalStaff().has_user(user):
course_keys = [course_key for course_key in course_keys if has_access(user, access_type, course_key)]
course_ids = [six.text_type(course_key) for course_key in course_keys]
cache.set(key, course_ids, self.COURSE_CACHE_TIMEOUT)
return course_ids
class IDTokenHandler(OpenIDHandler, ProfileHandler, CourseAccessHandler, PermissionsHandler):
""" Configure the ID Token handler for the LMS. """
def claim_instructor_courses(self, data):
# Don't return list of courses unless they are requested as essential.
if data.get('essential'):
return super(IDTokenHandler, self).claim_instructor_courses(data)
else:
return None
def claim_staff_courses(self, data):
# Don't return list of courses unless they are requested as essential.
if data.get('essential'):
return super(IDTokenHandler, self).claim_staff_courses(data)
else:
return None
class UserInfoHandler(OpenIDHandler, ProfileHandler, CourseAccessHandler, PermissionsHandler):
""" Configure the UserInfo handler for the LMS. """
pass

View File

@@ -1,256 +0,0 @@
# pylint: disable=missing-docstring
import mock
import six
from django.core.cache import cache
from django.test.utils import override_settings
# Will also run default tests for IDTokens and UserInfo
from edx_oauth2_provider.tests import IDTokenTestCase, UserInfoTestCase
from openedx.core.djangoapps.lang_pref import LANGUAGE_KEY
from openedx.core.djangoapps.user_api.preferences.api import set_user_preference
from student.models import UserProfile, anonymous_id_for_user
from student.roles import CourseInstructorRole, CourseStaffRole, GlobalStaff, OrgInstructorRole, OrgStaffRole
from student.tests.factories import UserFactory, UserProfileFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory, check_mongo_calls
class BaseTestMixin(ModuleStoreTestCase):
profile = None
ENABLED_SIGNALS = ['course_published']
def setUp(self):
super(BaseTestMixin, self).setUp()
self.course_key = CourseFactory.create(emit_signals=True).id
self.course_id = six.text_type(self.course_key)
self.user_factory = UserFactory
self.set_user(self.make_user())
def set_user(self, user):
super(BaseTestMixin, self).set_user(user)
self.profile = UserProfileFactory(user=self.user)
class IDTokenTest(BaseTestMixin, IDTokenTestCase):
def setUp(self):
super(IDTokenTest, self).setUp()
# CourseAccessHandler uses the application cache.
cache.clear()
def test_sub_claim(self):
scopes, claims = self.get_id_token_values('openid')
self.assertIn('openid', scopes)
sub = claims['sub']
expected_sub = anonymous_id_for_user(self.user, None)
self.assertEqual(sub, expected_sub)
def test_user_name_claim(self):
_scopes, claims = self.get_id_token_values('openid profile')
claim_name = claims['name']
user_profile = UserProfile.objects.get(user=self.user)
user_name = user_profile.name
self.assertEqual(claim_name, user_name)
@override_settings(LANGUAGE_CODE='en')
def test_user_without_locale_claim(self):
scopes, claims = self.get_id_token_values('openid profile')
self.assertIn('profile', scopes)
self.assertEqual(claims['locale'], 'en')
def test_user_with_locale_claim(self):
language = 'en'
set_user_preference(self.user, LANGUAGE_KEY, language)
scopes, claims = self.get_id_token_values('openid profile')
self.assertIn('profile', scopes)
locale = claims['locale']
self.assertEqual(language, locale)
def test_user_tracking_id_claim(self):
scopes, claims = self.get_id_token_values('openid profile')
self.assertIn('profile', scopes)
self.assertEqual(claims['user_tracking_id'], self.user.id)
def test_no_special_course_access(self):
with check_mongo_calls(0):
scopes, claims = self.get_id_token_values('openid course_instructor course_staff')
self.assertNotIn('course_staff', scopes)
self.assertNotIn('staff_courses', claims)
self.assertNotIn('course_instructor', scopes)
self.assertNotIn('instructor_courses', claims)
def test_course_staff_courses(self):
CourseStaffRole(self.course_key).add_users(self.user)
with check_mongo_calls(0):
scopes, claims = self.get_id_token_values('openid course_staff')
self.assertIn('course_staff', scopes)
self.assertNotIn('staff_courses', claims) # should not return courses in id_token
def test_course_instructor_courses(self):
with check_mongo_calls(0):
CourseInstructorRole(self.course_key).add_users(self.user)
scopes, claims = self.get_id_token_values('openid course_instructor')
self.assertIn('course_instructor', scopes)
self.assertNotIn('instructor_courses', claims) # should not return courses in id_token
def test_course_staff_courses_with_claims(self):
CourseStaffRole(self.course_key).add_users(self.user)
course_id = six.text_type(self.course_key)
nonexistent_course_id = 'some/other/course'
claims = {
'staff_courses': {
'values': [course_id, nonexistent_course_id],
'essential': True,
}
}
with check_mongo_calls(0):
scopes, claims = self.get_id_token_values(scope='openid course_staff', claims=claims)
self.assertIn('course_staff', scopes)
self.assertIn('staff_courses', claims)
self.assertEqual(len(claims['staff_courses']), 1)
self.assertIn(course_id, claims['staff_courses'])
self.assertNotIn(nonexistent_course_id, claims['staff_courses'])
def test_permissions_scope(self):
scopes, claims = self.get_id_token_values('openid profile permissions')
self.assertIn('permissions', scopes)
self.assertFalse(claims['administrator'])
self.user.is_staff = True
self.user.save()
_scopes, claims = self.get_id_token_values('openid profile permissions')
self.assertTrue(claims['administrator'])
def test_rate_limit_token(self):
response = self.get_access_token_response('openid profile permissions')
self.assertEqual(response.status_code, 200)
response = self.get_access_token_response('openid profile permissions')
self.assertEqual(response.status_code, 200)
response = self.get_access_token_response('openid profile permissions')
self.assertEqual(response.status_code, 403)
class UserInfoTest(BaseTestMixin, UserInfoTestCase):
def setUp(self):
super(UserInfoTest, self).setUp()
# create another course in the DB that only global staff have access to
CourseFactory.create(emit_signals=True)
def token_for_scope(self, scope):
full_scope = 'openid %s' % scope
self.set_access_token_scope(full_scope)
token = self.access_token.token
return full_scope, token
def get_with_scope(self, scope):
scope, token = self.token_for_scope(scope)
result, claims = self.get_userinfo(token, scope)
self.assertEqual(result.status_code, 200)
return claims
def get_with_claim_value(self, scope, claim, values):
_full_scope, token = self.token_for_scope(scope)
result, claims = self.get_userinfo(
token,
claims={claim: {'values': values}}
)
self.assertEqual(result.status_code, 200)
return claims
def _assert_role_using_scope(self, scope, claim, assert_one_course=True):
with check_mongo_calls(0):
claims = self.get_with_scope(scope)
self.assertEqual(len(claims), 2)
courses = claims[claim]
self.assertIn(self.course_id, courses)
if assert_one_course:
self.assertEqual(len(courses), 1)
def test_request_global_staff_courses_using_scope(self):
GlobalStaff().add_users(self.user)
self._assert_role_using_scope('course_staff', 'staff_courses', assert_one_course=False)
def test_request_org_staff_courses_using_scope(self):
OrgStaffRole(self.course_key.org).add_users(self.user)
self._assert_role_using_scope('course_staff', 'staff_courses')
def test_request_org_instructor_courses_using_scope(self):
OrgInstructorRole(self.course_key.org).add_users(self.user)
self._assert_role_using_scope('course_instructor', 'instructor_courses')
def test_request_staff_courses_using_scope(self):
CourseStaffRole(self.course_key).add_users(self.user)
self._assert_role_using_scope('course_staff', 'staff_courses')
def test_request_instructor_courses_using_scope(self):
CourseInstructorRole(self.course_key).add_users(self.user)
self._assert_role_using_scope('course_instructor', 'instructor_courses')
def _assert_role_using_claim(self, scope, claim):
values = [self.course_id, 'some_invalid_course']
with check_mongo_calls(0):
claims = self.get_with_claim_value(scope, claim, values)
self.assertEqual(len(claims), 2)
courses = claims[claim]
self.assertIn(self.course_id, courses)
self.assertEqual(len(courses), 1)
def test_request_global_staff_courses_with_claims(self):
GlobalStaff().add_users(self.user)
self._assert_role_using_claim('course_staff', 'staff_courses')
def test_request_org_staff_courses_with_claims(self):
OrgStaffRole(self.course_key.org).add_users(self.user)
self._assert_role_using_claim('course_staff', 'staff_courses')
def test_request_org_instructor_courses_with_claims(self):
OrgInstructorRole(self.course_key.org).add_users(self.user)
self._assert_role_using_claim('course_instructor', 'instructor_courses')
def test_request_staff_courses_with_claims(self):
CourseStaffRole(self.course_key).add_users(self.user)
self._assert_role_using_claim('course_staff', 'staff_courses')
def test_request_instructor_courses_with_claims(self):
CourseInstructorRole(self.course_key).add_users(self.user)
self._assert_role_using_claim('course_instructor', 'instructor_courses')
def test_permissions_scope(self):
claims = self.get_with_scope('permissions')
self.assertIn('administrator', claims)
self.assertFalse(claims['administrator'])
self.user.is_staff = True
self.user.save()
claims = self.get_with_scope('permissions')
self.assertTrue(claims['administrator'])
def test_profile_scope(self):
claims = self.get_with_scope('profile')
self.assertEqual(claims['name'], UserProfile.objects.get(user=self.user).name)
self.assertEqual(claims['user_tracking_id'], self.user.id)

View File

@@ -566,14 +566,12 @@ OAUTH_OIDC_ID_TOKEN_HANDLERS = (
'edx_oauth2_provider.oidc.handlers.BasicIDTokenHandler',
'edx_oauth2_provider.oidc.handlers.ProfileHandler',
'edx_oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.IDTokenHandler'
)
OAUTH_OIDC_USERINFO_HANDLERS = (
'edx_oauth2_provider.oidc.handlers.BasicUserInfoHandler',
'edx_oauth2_provider.oidc.handlers.ProfileHandler',
'edx_oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.UserInfoHandler'
)
OAUTH_EXPIRE_CONFIDENTIAL_CLIENT_DAYS = 365

View File

@@ -3,7 +3,6 @@
import re
import edx_oauth2_provider
import six.moves.urllib.parse as parse # pylint: disable=import-error
from django.conf import settings
from django.contrib.auth import logout
@@ -75,9 +74,6 @@ class LogoutView(TemplateView):
# Get third party auth provider's logout url
self.tpa_logout_url = tpa_pipeline.get_idp_logout_url_from_running_pipeline(request)
# Get the list of authorized clients before we clear the session.
self.oauth_client_ids = request.session.get(edx_oauth2_provider.constants.AUTHORIZED_CLIENTS_SESSION_KEY, [])
logout(request)
response = super(LogoutView, self).dispatch(request, *args, **kwargs)