Merge pull request #19103 from edx/arch/refactor-oauth-scopes-tests

Shared mixin for testing OAuth Scopes and various Auth
This commit is contained in:
Nimisha Asthagiri
2018-10-15 11:32:12 -04:00
committed by GitHub
3 changed files with 289 additions and 291 deletions

View File

@@ -2,16 +2,12 @@
Tests for the Certificate REST APIs.
"""
# pylint: disable=missing-docstring
from datetime import datetime, timedelta
from enum import Enum
from itertools import product
import ddt
from mock import patch
from django.urls import reverse
from django.utils import timezone
from freezegun import freeze_time
from oauth2_provider import models as dot_models
from rest_framework import status
from rest_framework.test import APITestCase
@@ -19,32 +15,21 @@ from course_modes.models import CourseMode
from lms.djangoapps.certificates.apis.v0.views import CertificatesDetailView
from lms.djangoapps.certificates.models import CertificateStatuses
from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory
from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from openedx.core.djangoapps.user_authn.tests.utils import AuthType, AuthAndScopesTestMixin
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
USER_PASSWORD = 'test'
class AuthType(Enum):
session = 1
oauth = 2
jwt = 3
jwt_restricted = 4
JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted]
@ddt.ddt
class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
class CertificatesRestApiTest(AuthAndScopesTestMixin, SharedModuleStoreTestCase, APITestCase):
"""
Test for the Certificates REST APIs
"""
shard = 4
now = timezone.now()
default_scopes = CertificatesDetailView.required_scopes
@classmethod
def setUpClass(cls):
@@ -62,10 +47,6 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
super(CertificatesRestApiTest, self).setUp()
self.student = UserFactory.create(password=USER_PASSWORD)
self.student_no_cert = UserFactory.create(password=USER_PASSWORD)
self.staff_user = UserFactory.create(password=USER_PASSWORD, is_staff=True)
GeneratedCertificateFactory.create(
user=self.student,
course_id=self.course.id,
@@ -77,7 +58,18 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
self.namespaced_url = 'certificates_api:v0:certificates:detail'
def _assert_certificate_response(self, response):
def get_url(self, username):
""" This method is required by AuthAndScopesTestMixin. """
return reverse(
self.namespaced_url,
kwargs={
'course_id': self.course.id,
'username': username
}
)
def assert_success_response_for_student(self, response):
""" This method is required by AuthAndScopesTestMixin. """
self.assertEqual(
response.data,
{
@@ -92,86 +84,12 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
}
)
def _get_url(self, username):
"""
Helper function to create the url for certificates
"""
return reverse(
self.namespaced_url,
kwargs={
'course_id': self.course.id,
'username': username
}
)
def _create_oauth_token(self, user):
dot_app_user = UserFactory.create(password=USER_PASSWORD)
dot_app = dot_models.Application.objects.create(
name='test app',
user=dot_app_user,
client_type='confidential',
authorization_grant_type='authorization-code',
redirect_uris='http://localhost:8079/complete/edxorg/'
)
return dot_models.AccessToken.objects.create(
user=user,
application=dot_app,
expires=datetime.utcnow() + timedelta(weeks=1),
scope='read write',
token='test_token',
)
def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False):
filters = []
if include_org_filter:
filters += ['content_org:{}'.format(self.course.id.org)]
if include_me_filter:
filters += ['user:me']
if scopes is None:
scopes = CertificatesDetailView.required_scopes
return _create_jwt(
user,
scopes=scopes,
is_restricted=(auth_type == AuthType.jwt_restricted),
filters=filters,
)
def _get_response(self, requesting_user, auth_type, url=None, token=None):
auth_header = None
if auth_type == AuthType.session:
self.client.login(username=requesting_user.username, password=USER_PASSWORD)
elif auth_type == AuthType.oauth:
if not token:
token = self._create_oauth_token(requesting_user)
auth_header = "Bearer {0}".format(token)
else:
assert auth_type in JWT_AUTH_TYPES
if not token:
token = self._create_jwt_token(requesting_user, auth_type)
auth_header = "JWT {0}".format(token)
extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {}
return self.client.get(
url if url else self._get_url(self.student.username),
**extra
)
def _assert_in_log(self, text, mock_log_method):
self.assertTrue(mock_log_method.called)
self.assertIn(text, mock_log_method.call_args_list[0][0][0])
def test_anonymous_user(self):
resp = self.client.get(self._get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
@ddt.data(*list(AuthType))
def test_no_certificate(self, auth_type):
resp = self._get_response(
self.student_no_cert,
auth_type,
url=self._get_url(self.student_no_cert.username),
def test_no_certificate(self):
student_no_cert = UserFactory.create(password=self.user_password)
resp = self.get_response(
AuthType.session,
requesting_user=student_no_cert,
requested_user=student_no_cert,
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
self.assertIn('error_code', resp.data)
@@ -179,116 +97,3 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
resp.data['error_code'],
'no_certificate_for_user',
)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_self_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self._get_response(self.student, auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self._assert_certificate_response(resp)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_inactive_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
self.student.is_active = False
self.student.save()
resp = self._get_response(self.student, auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_staff_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self._get_response(self.staff_user, auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_another_user(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 for OAuth and Session auth with IsUserInUrl. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self._get_response(self.student_no_cert, auth_type)
# Restricted JWT tokens without the user:me filter have access to other users
expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(
resp.status_code,
status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN,
)
if not expected_jwt_access_granted:
self._assert_in_log("IsUserInUrl", mock_log.info)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasScope. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[])
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasScope", mock_log.warning)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False)
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning)
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student_no_cert, auth_type, include_me_filter=True)
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
if scopes_enforced and auth_type == AuthType.jwt_restricted:
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self._assert_in_log("IsUserInUrl", mock_log.info)
def test_valid_oauth_token(self):
resp = self._get_response(self.student, AuthType.oauth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_invalid_oauth_token(self):
resp = self._get_response(self.student, AuthType.oauth, token="fooooooooooToken")
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_expired_oauth_token(self):
token = self._create_oauth_token(self.student)
token.expires = datetime.utcnow() - timedelta(weeks=1)
token.save()
resp = self._get_response(self.student, AuthType.oauth, token=token)
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

View File

@@ -15,11 +15,13 @@ from rest_framework.test import APITestCase
from six import text_type
from lms.djangoapps.courseware.tests.factories import GlobalStaffFactory, StaffFactory
from lms.djangoapps.grades.api.v1.views import CourseGradesView
from lms.djangoapps.grades.config.waffle import waffle_flags, WRITABLE_GRADEBOOK
from lms.djangoapps.grades.course_data import CourseData
from lms.djangoapps.grades.course_grade import CourseGrade
from lms.djangoapps.grades.subsection_grade import ReadSubsectionGrade
from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory
from openedx.core.djangoapps.user_authn.tests.utils import AuthAndScopesTestMixin
from openedx.core.djangoapps.waffle_utils.testutils import override_waffle_flag
from student.tests.factories import CourseEnrollmentFactory, UserFactory
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
@@ -79,28 +81,23 @@ class GradeViewTestMixin(SharedModuleStoreTestCase):
)
cls.course_key = cls.course.id
cls.password = 'test'
cls.student = UserFactory(username='dummy', password=cls.password)
cls.other_student = UserFactory(username='foo', password=cls.password)
cls.other_user = UserFactory(username='bar', password=cls.password)
cls.staff = StaffFactory(course_key=cls.course_key, password=cls.password)
cls.global_staff = GlobalStaffFactory.create()
cls._create_user_enrollments(cls.course, cls.student, cls.other_student)
def setUp(self):
super(GradeViewTestMixin, self).setUp()
self.client.login(username=self.student.username, password=self.password)
@classmethod
def _create_user_enrollments(cls, course, *users):
def _create_user_enrollments(self, *users):
date = datetime(2013, 1, 22, tzinfo=UTC)
for user in users:
CourseEnrollmentFactory(
course_id=course.id,
course_id=self.course.id,
user=user,
created=date,
)
def setUp(self):
super(GradeViewTestMixin, self).setUp()
self.password = 'test'
self.global_staff = GlobalStaffFactory.create()
self.student = UserFactory(password=self.password)
self.other_student = UserFactory(password=self.password)
self._create_user_enrollments(self.student, self.other_student)
@classmethod
def _create_test_course_with_default_grading_policy(cls, display_name, run):
"""
@@ -141,12 +138,13 @@ class GradeViewTestMixin(SharedModuleStoreTestCase):
@ddt.ddt
class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
class SingleUserGradesTests(GradeViewTestMixin, AuthAndScopesTestMixin, APITestCase):
"""
Tests for grades related to a course and specific user
e.g. /api/grades/v1/courses/{course_id}/?username={username}
/api/grades/v1/courses/?course_id={course_id}&username={username}
"""
default_scopes = CourseGradesView.required_scopes
@classmethod
def setUpClass(cls):
@@ -154,9 +152,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
cls.namespaced_url = 'grades_api:v1:course_grades'
def get_url(self, username):
"""
Helper function to create the url
"""
""" This method is required by AuthAndScopesTestMixin. """
base_url = reverse(
self.namespaced_url,
kwargs={
@@ -165,48 +161,35 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
)
return "{0}?username={1}".format(base_url, username)
def test_anonymous(self):
"""
Test that an anonymous user cannot access the API and an error is received.
"""
self.client.logout()
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_self_get_grade(self):
"""
Test that a user can successfully request her own grade.
"""
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def assert_success_response_for_student(self, response):
""" This method is required by AuthAndScopesTestMixin. """
expected_data = [{
'username': self.student.username,
'email': self.student.email,
'letter_grade': None,
'percent': 0.0,
'course_id': str(self.course_key),
'passed': False
}]
self.assertEqual(response.data, expected_data)
def test_nonexistent_user(self):
"""
Test that a request for a nonexistent username returns an error.
"""
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
resp = self.client.get(self.get_url('IDoNotExist'))
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_other_get_grade(self):
"""
Test that if a user requests the grade for another user, she receives an error.
"""
self.client.logout()
self.client.login(username=self.other_student.username, password=self.password)
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_self_get_grade_not_enrolled(self):
"""
Test that a user receives an error if she requests
her own grade in a course where she is not enrolled.
"""
# a user not enrolled in the course cannot request her grade
self.client.logout()
self.client.login(username=self.other_user.username, password=self.password)
resp = self.client.get(self.get_url(self.other_user.username))
unenrolled_user = UserFactory(password=self.password)
self.client.login(username=unenrolled_user.username, password=self.password)
resp = self.client.get(self.get_url(unenrolled_user.username))
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
self.assertIn('error_code', resp.data)
self.assertEqual(
@@ -218,6 +201,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
"""
Test the grade for a user who has not answered any test.
"""
self.client.login(username=self.student.username, password=self.password)
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_200_OK)
expected_data = [{
@@ -239,6 +223,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
"""Mocked function to always raise an exception"""
raise InvalidKeyError('foo', 'bar')
self.client.login(username=self.student.username, password=self.password)
with patch('opaque_keys.edx.keys.CourseKey.from_string', side_effect=mock_from_string):
resp = self.client.get(self.get_url(self.student.username))
@@ -253,6 +238,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
"""
Test that requesting a valid, nonexistent course key returns an error as expected.
"""
self.client.login(username=self.student.username, password=self.password)
base_url = reverse(
self.namespaced_url,
kwargs={
@@ -276,6 +262,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
"""
Test that the user gets her grade in case she answered tests with an insufficient score.
"""
self.client.login(username=self.student.username, password=self.password)
with patch('lms.djangoapps.grades.course_grade_factory.CourseGradeFactory.read') as mock_grade:
grade_fields = {
'letter_grade': grade['letter_grade'],
@@ -296,24 +283,6 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
expected_data.update(grade)
self.assertEqual(resp.data, [expected_data])
def test_staff_can_see_student(self):
"""
Ensure that staff members can see her student's grades.
"""
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_200_OK)
expected_data = [{
'username': self.student.username,
'email': self.student.email,
'letter_grade': None,
'percent': 0.0,
'course_id': str(self.course_key),
'passed': False
}]
self.assertEqual(resp.data, expected_data)
@ddt.ddt
class CourseGradesViewTest(GradeViewTestMixin, APITestCase):
@@ -342,16 +311,15 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase):
return base_url
def test_anonymous(self):
self.client.logout()
resp = self.client.get(self.get_url())
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_student(self):
self.client.login(username=self.student.username, password=self.password)
resp = self.client.get(self.get_url())
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_course_does_not_exist(self):
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
resp = self.client.get(
self.get_url(course_key='course-v1:MITx+8.MechCX+2014_T1')
@@ -359,7 +327,6 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase):
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_course_no_enrollments(self):
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
resp = self.client.get(
self.get_url(course_key=self.empty_course.id)
@@ -368,7 +335,6 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase):
self.assertEqual(resp.data, [])
def test_staff_can_get_all_grades(self):
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
resp = self.client.get(self.get_url())
@@ -410,10 +376,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase):
cls.course = CourseFactory.create(display_name='test-course', run='run-1')
cls.course_overview = CourseOverviewFactory.create(id=cls.course.id)
# we re-assign cls.course from what's created in the parent class, so we have to
# re-create the enrollments, too.
cls._create_user_enrollments(cls.course, cls.student, cls.other_student)
cls.chapter_1 = ItemFactory.create(
category='chapter',
parent_location=cls.course.location,
@@ -538,7 +500,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase):
Helper function to login the global staff user, who has permissions to read from the
Gradebook API.
"""
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
def expected_subsection_grades(self, letter_grade=None):
@@ -637,7 +598,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase):
]
def test_feature_not_enabled(self):
self.client.logout()
self.client.login(username=self.global_staff.username, password=self.password)
with override_waffle_flag(self.waffle_flag, active=False):
resp = self.client.get(
@@ -646,12 +606,12 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase):
self.assertEqual(status.HTTP_403_FORBIDDEN, resp.status_code)
def test_anonymous(self):
self.client.logout()
with override_waffle_flag(self.waffle_flag, active=True):
resp = self.client.get(self.get_url())
self.assertEqual(status.HTTP_401_UNAUTHORIZED, resp.status_code)
def test_student(self):
self.client.login(username=self.student.username, password=self.password)
with override_waffle_flag(self.waffle_flag, active=True):
resp = self.client.get(self.get_url())
self.assertEqual(status.HTTP_403_FORBIDDEN, resp.status_code)

View File

@@ -1,9 +1,29 @@
""" Common utilities for tests in the user_authn app. """
from datetime import datetime, timedelta
from enum import Enum
from itertools import product
import ddt
from mock import patch
from django.conf import settings
from oauth2_provider import models as dot_models
from rest_framework import status
from openedx.core.djangoapps.oauth_dispatch.adapters.dot import DOTAdapter
from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.tests.factories import UserFactory
class AuthType(Enum):
session = 1
oauth = 2
jwt = 3
jwt_restricted = 4
JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted]
def setup_login_oauth_client():
"""
Sets up a test OAuth client for the login service.
@@ -15,3 +35,216 @@ def setup_login_oauth_client():
redirect_uri='',
client_id=settings.JWT_AUTH['JWT_LOGIN_CLIENT_ID'],
)
@ddt.ddt
class AuthAndScopesTestMixin(object):
"""
Mixin class to test authentication and oauth scopes for an API.
Test classes that use this Mixin need to define:
default_scopes - default list of scopes to include in created JWTs.
get_url(self, username) - method that returns the URL to call given
a username.
assert_success_response_for_student(resp) - method that verifies the
data returned in a successful response when accessing the URL for
self.student.
"""
default_scopes = None
user_password = 'test'
def setUp(self):
super(AuthAndScopesTestMixin, self).setUp()
self.student = UserFactory.create(password=self.user_password)
self.other_student = UserFactory.create(password=self.user_password)
self.global_staff = UserFactory.create(password=self.user_password, is_staff=True)
def get_response(self, auth_type, requesting_user=None, requested_user=None, url=None, token=None):
"""
Calls the url using the given auth_type.
Arguments:
- requesting_user is the user that is making the call to the url. Defaults to self.student.
- requested_user is user that is passed to the url. Defaults to self.student.
- url defaults to the response from calling self.get_url with requested_user.username.
- token defaults to the default creation of the token given the value of auth_type.
"""
requesting_user = requesting_user or self.student
requested_user = requested_user or self.student
auth_header = None
if auth_type == AuthType.session:
self.client.login(username=requesting_user.username, password=self.user_password)
elif auth_type == AuthType.oauth:
if not token:
token = self._create_oauth_token(requesting_user)
auth_header = "Bearer {0}".format(token)
else:
assert auth_type in JWT_AUTH_TYPES
if not token:
token = self._create_jwt_token(requesting_user, auth_type)
auth_header = "JWT {0}".format(token)
extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {}
return self.client.get(
url if url else self.get_url(requested_user.username),
**extra
)
def _create_oauth_token(self, user):
""" Creates and returns an OAuth token for the given user. """
dot_app_user = UserFactory.create(password=self.user_password)
dot_app = dot_models.Application.objects.create(
name='test app',
user=dot_app_user,
client_type='confidential',
authorization_grant_type='authorization-code',
redirect_uris='http://localhost:8079/complete/edxorg/'
)
return dot_models.AccessToken.objects.create(
user=user,
application=dot_app,
expires=datetime.utcnow() + timedelta(weeks=1),
scope='read write',
token='test_token',
)
def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False):
""" Creates and returns a JWT token for the given user with the given parameters. """
filters = []
if include_org_filter:
filters += ['content_org:{}'.format(self.course.id.org)]
if include_me_filter:
filters += ['user:me']
if scopes is None:
scopes = self.default_scopes
return _create_jwt(
user,
scopes=scopes,
is_restricted=(auth_type == AuthType.jwt_restricted),
filters=filters,
)
def _assert_in_log(self, text, mock_log_method):
self.assertTrue(mock_log_method.called)
self.assertIn(text, mock_log_method.call_args_list[0][0][0])
def test_anonymous_user(self):
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_self_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_staff_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.global_staff)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_inactive_user(self, auth_type, scopes_enforced):
self.student.is_active = False
self.student.save()
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_another_user(self, auth_type, scopes_enforced, mock_log):
"""
Returns 403 for OAuth, Session, and JWT auth with IsUserInUrl.
Returns 200 for jwt_restricted and user:me filter unset.
"""
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.other_student)
# Restricted JWT tokens without the user:me filter have access to other users
expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(
resp.status_code,
status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN,
)
if not expected_jwt_access_granted:
self._assert_in_log("IsUserInUrl", mock_log.info)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasScope. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[])
resp = self.get_response(AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasScope", mock_log.warning)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False)
resp = self.get_response(AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning)
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log):
""" Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
if scopes_enforced and auth_type == AuthType.jwt_restricted:
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self._assert_in_log("IsUserInUrl", mock_log.info)
def test_valid_oauth_token(self):
resp = self.get_response(AuthType.oauth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_invalid_oauth_token(self):
resp = self.get_response(AuthType.oauth, token="fooooooooooToken")
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_expired_oauth_token(self):
token = self._create_oauth_token(self.student)
token.expires = datetime.utcnow() - timedelta(weeks=1)
token.save()
resp = self.get_response(AuthType.oauth, token=token)
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)