diff --git a/lms/djangoapps/certificates/apis/v0/tests/test_views.py b/lms/djangoapps/certificates/apis/v0/tests/test_views.py index b474c6143f..be35679485 100644 --- a/lms/djangoapps/certificates/apis/v0/tests/test_views.py +++ b/lms/djangoapps/certificates/apis/v0/tests/test_views.py @@ -2,16 +2,12 @@ Tests for the Certificate REST APIs. """ # pylint: disable=missing-docstring -from datetime import datetime, timedelta -from enum import Enum from itertools import product import ddt -from mock import patch from django.urls import reverse from django.utils import timezone from freezegun import freeze_time -from oauth2_provider import models as dot_models from rest_framework import status from rest_framework.test import APITestCase @@ -19,32 +15,21 @@ from course_modes.models import CourseMode from lms.djangoapps.certificates.apis.v0.views import CertificatesDetailView from lms.djangoapps.certificates.models import CertificateStatuses from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory -from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES +from openedx.core.djangoapps.user_authn.tests.utils import AuthType, AuthAndScopesTestMixin from student.tests.factories import UserFactory from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory -USER_PASSWORD = 'test' - - -class AuthType(Enum): - session = 1 - oauth = 2 - jwt = 3 - jwt_restricted = 4 - - -JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted] - @ddt.ddt -class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): +class CertificatesRestApiTest(AuthAndScopesTestMixin, SharedModuleStoreTestCase, APITestCase): """ Test for the Certificates REST APIs """ shard = 4 now = timezone.now() + default_scopes = CertificatesDetailView.required_scopes @classmethod def setUpClass(cls): @@ -62,10 +47,6 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): super(CertificatesRestApiTest, self).setUp() - self.student = UserFactory.create(password=USER_PASSWORD) - self.student_no_cert = UserFactory.create(password=USER_PASSWORD) - self.staff_user = UserFactory.create(password=USER_PASSWORD, is_staff=True) - GeneratedCertificateFactory.create( user=self.student, course_id=self.course.id, @@ -77,7 +58,18 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): self.namespaced_url = 'certificates_api:v0:certificates:detail' - def _assert_certificate_response(self, response): + def get_url(self, username): + """ This method is required by AuthAndScopesTestMixin. """ + return reverse( + self.namespaced_url, + kwargs={ + 'course_id': self.course.id, + 'username': username + } + ) + + def assert_success_response_for_student(self, response): + """ This method is required by AuthAndScopesTestMixin. """ self.assertEqual( response.data, { @@ -92,86 +84,12 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): } ) - def _get_url(self, username): - """ - Helper function to create the url for certificates - """ - return reverse( - self.namespaced_url, - kwargs={ - 'course_id': self.course.id, - 'username': username - } - ) - - def _create_oauth_token(self, user): - dot_app_user = UserFactory.create(password=USER_PASSWORD) - dot_app = dot_models.Application.objects.create( - name='test app', - user=dot_app_user, - client_type='confidential', - authorization_grant_type='authorization-code', - redirect_uris='http://localhost:8079/complete/edxorg/' - ) - return dot_models.AccessToken.objects.create( - user=user, - application=dot_app, - expires=datetime.utcnow() + timedelta(weeks=1), - scope='read write', - token='test_token', - ) - - def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False): - filters = [] - if include_org_filter: - filters += ['content_org:{}'.format(self.course.id.org)] - if include_me_filter: - filters += ['user:me'] - - if scopes is None: - scopes = CertificatesDetailView.required_scopes - - return _create_jwt( - user, - scopes=scopes, - is_restricted=(auth_type == AuthType.jwt_restricted), - filters=filters, - ) - - def _get_response(self, requesting_user, auth_type, url=None, token=None): - auth_header = None - if auth_type == AuthType.session: - self.client.login(username=requesting_user.username, password=USER_PASSWORD) - elif auth_type == AuthType.oauth: - if not token: - token = self._create_oauth_token(requesting_user) - auth_header = "Bearer {0}".format(token) - else: - assert auth_type in JWT_AUTH_TYPES - if not token: - token = self._create_jwt_token(requesting_user, auth_type) - auth_header = "JWT {0}".format(token) - - extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {} - return self.client.get( - url if url else self._get_url(self.student.username), - **extra - ) - - def _assert_in_log(self, text, mock_log_method): - self.assertTrue(mock_log_method.called) - self.assertIn(text, mock_log_method.call_args_list[0][0][0]) - - def test_anonymous_user(self): - resp = self.client.get(self._get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) - - @ddt.data(*list(AuthType)) - def test_no_certificate(self, auth_type): - resp = self._get_response( - self.student_no_cert, - auth_type, - url=self._get_url(self.student_no_cert.username), + def test_no_certificate(self): + student_no_cert = UserFactory.create(password=self.user_password) + resp = self.get_response( + AuthType.session, + requesting_user=student_no_cert, + requested_user=student_no_cert, ) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('error_code', resp.data) @@ -179,116 +97,3 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): resp.data['error_code'], 'no_certificate_for_user', ) - - @ddt.data(*product(list(AuthType), (True, False))) - @ddt.unpack - def test_self_user(self, auth_type, scopes_enforced): - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - resp = self._get_response(self.student, auth_type) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - self._assert_certificate_response(resp) - - @ddt.data(*product(list(AuthType), (True, False))) - @ddt.unpack - def test_inactive_user(self, auth_type, scopes_enforced): - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - self.student.is_active = False - self.student.save() - - resp = self._get_response(self.student, auth_type) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - - @ddt.data(*product(list(AuthType), (True, False))) - @ddt.unpack - def test_staff_user(self, auth_type, scopes_enforced): - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - resp = self._get_response(self.staff_user, auth_type) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - - @patch('edx_rest_framework_extensions.permissions.log') - @ddt.data(*product(list(AuthType), (True, False))) - @ddt.unpack - def test_another_user(self, auth_type, scopes_enforced, mock_log): - """ Returns 403 for OAuth and Session auth with IsUserInUrl. """ - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - resp = self._get_response(self.student_no_cert, auth_type) - - # Restricted JWT tokens without the user:me filter have access to other users - expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted - - self.assertEqual( - resp.status_code, - status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN, - ) - if not expected_jwt_access_granted: - self._assert_in_log("IsUserInUrl", mock_log.info) - - @patch('edx_rest_framework_extensions.permissions.log') - @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) - @ddt.unpack - def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log): - """ Returns 403 when scopes are enforced with JwtHasScope. """ - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[]) - resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) - - is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted - self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) - - if is_enforced: - self._assert_in_log("JwtHasScope", mock_log.warning) - - @patch('edx_rest_framework_extensions.permissions.log') - @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) - @ddt.unpack - def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log): - """ Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """ - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False) - resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) - - is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted - self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) - - if is_enforced: - self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning) - - @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) - @ddt.unpack - def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced): - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True) - - resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - - @patch('edx_rest_framework_extensions.permissions.log') - @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) - @ddt.unpack - def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log): - """ Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """ - with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): - jwt_token = self._create_jwt_token(self.student_no_cert, auth_type, include_me_filter=True) - resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) - - self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) - - if scopes_enforced and auth_type == AuthType.jwt_restricted: - self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning) - else: - self._assert_in_log("IsUserInUrl", mock_log.info) - - def test_valid_oauth_token(self): - resp = self._get_response(self.student, AuthType.oauth) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - - def test_invalid_oauth_token(self): - resp = self._get_response(self.student, AuthType.oauth, token="fooooooooooToken") - self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) - - def test_expired_oauth_token(self): - token = self._create_oauth_token(self.student) - token.expires = datetime.utcnow() - timedelta(weeks=1) - token.save() - resp = self._get_response(self.student, AuthType.oauth, token=token) - self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) diff --git a/lms/djangoapps/grades/api/v1/tests/test_views.py b/lms/djangoapps/grades/api/v1/tests/test_views.py index 1812f90197..23ec64b79e 100644 --- a/lms/djangoapps/grades/api/v1/tests/test_views.py +++ b/lms/djangoapps/grades/api/v1/tests/test_views.py @@ -15,11 +15,13 @@ from rest_framework.test import APITestCase from six import text_type from lms.djangoapps.courseware.tests.factories import GlobalStaffFactory, StaffFactory +from lms.djangoapps.grades.api.v1.views import CourseGradesView from lms.djangoapps.grades.config.waffle import waffle_flags, WRITABLE_GRADEBOOK from lms.djangoapps.grades.course_data import CourseData from lms.djangoapps.grades.course_grade import CourseGrade from lms.djangoapps.grades.subsection_grade import ReadSubsectionGrade from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory +from openedx.core.djangoapps.user_authn.tests.utils import AuthAndScopesTestMixin from openedx.core.djangoapps.waffle_utils.testutils import override_waffle_flag from student.tests.factories import CourseEnrollmentFactory, UserFactory from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory @@ -79,28 +81,23 @@ class GradeViewTestMixin(SharedModuleStoreTestCase): ) cls.course_key = cls.course.id - cls.password = 'test' - cls.student = UserFactory(username='dummy', password=cls.password) - cls.other_student = UserFactory(username='foo', password=cls.password) - cls.other_user = UserFactory(username='bar', password=cls.password) - cls.staff = StaffFactory(course_key=cls.course_key, password=cls.password) - cls.global_staff = GlobalStaffFactory.create() - cls._create_user_enrollments(cls.course, cls.student, cls.other_student) - - def setUp(self): - super(GradeViewTestMixin, self).setUp() - self.client.login(username=self.student.username, password=self.password) - - @classmethod - def _create_user_enrollments(cls, course, *users): + def _create_user_enrollments(self, *users): date = datetime(2013, 1, 22, tzinfo=UTC) for user in users: CourseEnrollmentFactory( - course_id=course.id, + course_id=self.course.id, user=user, created=date, ) + def setUp(self): + super(GradeViewTestMixin, self).setUp() + self.password = 'test' + self.global_staff = GlobalStaffFactory.create() + self.student = UserFactory(password=self.password) + self.other_student = UserFactory(password=self.password) + self._create_user_enrollments(self.student, self.other_student) + @classmethod def _create_test_course_with_default_grading_policy(cls, display_name, run): """ @@ -141,12 +138,13 @@ class GradeViewTestMixin(SharedModuleStoreTestCase): @ddt.ddt -class SingleUserGradesTests(GradeViewTestMixin, APITestCase): +class SingleUserGradesTests(GradeViewTestMixin, AuthAndScopesTestMixin, APITestCase): """ Tests for grades related to a course and specific user e.g. /api/grades/v1/courses/{course_id}/?username={username} /api/grades/v1/courses/?course_id={course_id}&username={username} """ + default_scopes = CourseGradesView.required_scopes @classmethod def setUpClass(cls): @@ -154,9 +152,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): cls.namespaced_url = 'grades_api:v1:course_grades' def get_url(self, username): - """ - Helper function to create the url - """ + """ This method is required by AuthAndScopesTestMixin. """ base_url = reverse( self.namespaced_url, kwargs={ @@ -165,48 +161,35 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): ) return "{0}?username={1}".format(base_url, username) - def test_anonymous(self): - """ - Test that an anonymous user cannot access the API and an error is received. - """ - self.client.logout() - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) - - def test_self_get_grade(self): - """ - Test that a user can successfully request her own grade. - """ - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_200_OK) + def assert_success_response_for_student(self, response): + """ This method is required by AuthAndScopesTestMixin. """ + expected_data = [{ + 'username': self.student.username, + 'email': self.student.email, + 'letter_grade': None, + 'percent': 0.0, + 'course_id': str(self.course_key), + 'passed': False + }] + self.assertEqual(response.data, expected_data) def test_nonexistent_user(self): """ Test that a request for a nonexistent username returns an error. """ - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) resp = self.client.get(self.get_url('IDoNotExist')) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) - def test_other_get_grade(self): - """ - Test that if a user requests the grade for another user, she receives an error. - """ - self.client.logout() - self.client.login(username=self.other_student.username, password=self.password) - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) - def test_self_get_grade_not_enrolled(self): """ Test that a user receives an error if she requests her own grade in a course where she is not enrolled. """ # a user not enrolled in the course cannot request her grade - self.client.logout() - self.client.login(username=self.other_user.username, password=self.password) - resp = self.client.get(self.get_url(self.other_user.username)) + unenrolled_user = UserFactory(password=self.password) + self.client.login(username=unenrolled_user.username, password=self.password) + resp = self.client.get(self.get_url(unenrolled_user.username)) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('error_code', resp.data) self.assertEqual( @@ -218,6 +201,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): """ Test the grade for a user who has not answered any test. """ + self.client.login(username=self.student.username, password=self.password) resp = self.client.get(self.get_url(self.student.username)) self.assertEqual(resp.status_code, status.HTTP_200_OK) expected_data = [{ @@ -239,6 +223,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): """Mocked function to always raise an exception""" raise InvalidKeyError('foo', 'bar') + self.client.login(username=self.student.username, password=self.password) with patch('opaque_keys.edx.keys.CourseKey.from_string', side_effect=mock_from_string): resp = self.client.get(self.get_url(self.student.username)) @@ -253,6 +238,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): """ Test that requesting a valid, nonexistent course key returns an error as expected. """ + self.client.login(username=self.student.username, password=self.password) base_url = reverse( self.namespaced_url, kwargs={ @@ -276,6 +262,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): """ Test that the user gets her grade in case she answered tests with an insufficient score. """ + self.client.login(username=self.student.username, password=self.password) with patch('lms.djangoapps.grades.course_grade_factory.CourseGradeFactory.read') as mock_grade: grade_fields = { 'letter_grade': grade['letter_grade'], @@ -296,24 +283,6 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase): expected_data.update(grade) self.assertEqual(resp.data, [expected_data]) - def test_staff_can_see_student(self): - """ - Ensure that staff members can see her student's grades. - """ - self.client.logout() - self.client.login(username=self.global_staff.username, password=self.password) - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - expected_data = [{ - 'username': self.student.username, - 'email': self.student.email, - 'letter_grade': None, - 'percent': 0.0, - 'course_id': str(self.course_key), - 'passed': False - }] - self.assertEqual(resp.data, expected_data) - @ddt.ddt class CourseGradesViewTest(GradeViewTestMixin, APITestCase): @@ -342,16 +311,15 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase): return base_url def test_anonymous(self): - self.client.logout() resp = self.client.get(self.get_url()) self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) def test_student(self): + self.client.login(username=self.student.username, password=self.password) resp = self.client.get(self.get_url()) self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) def test_course_does_not_exist(self): - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) resp = self.client.get( self.get_url(course_key='course-v1:MITx+8.MechCX+2014_T1') @@ -359,7 +327,6 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase): self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) def test_course_no_enrollments(self): - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) resp = self.client.get( self.get_url(course_key=self.empty_course.id) @@ -368,7 +335,6 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase): self.assertEqual(resp.data, []) def test_staff_can_get_all_grades(self): - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) resp = self.client.get(self.get_url()) @@ -410,10 +376,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase): cls.course = CourseFactory.create(display_name='test-course', run='run-1') cls.course_overview = CourseOverviewFactory.create(id=cls.course.id) - # we re-assign cls.course from what's created in the parent class, so we have to - # re-create the enrollments, too. - cls._create_user_enrollments(cls.course, cls.student, cls.other_student) - cls.chapter_1 = ItemFactory.create( category='chapter', parent_location=cls.course.location, @@ -538,7 +500,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase): Helper function to login the global staff user, who has permissions to read from the Gradebook API. """ - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) def expected_subsection_grades(self, letter_grade=None): @@ -637,7 +598,6 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase): ] def test_feature_not_enabled(self): - self.client.logout() self.client.login(username=self.global_staff.username, password=self.password) with override_waffle_flag(self.waffle_flag, active=False): resp = self.client.get( @@ -646,12 +606,12 @@ class GradebookViewTest(GradeViewTestMixin, APITestCase): self.assertEqual(status.HTTP_403_FORBIDDEN, resp.status_code) def test_anonymous(self): - self.client.logout() with override_waffle_flag(self.waffle_flag, active=True): resp = self.client.get(self.get_url()) self.assertEqual(status.HTTP_401_UNAUTHORIZED, resp.status_code) def test_student(self): + self.client.login(username=self.student.username, password=self.password) with override_waffle_flag(self.waffle_flag, active=True): resp = self.client.get(self.get_url()) self.assertEqual(status.HTTP_403_FORBIDDEN, resp.status_code) diff --git a/openedx/core/djangoapps/user_authn/tests/utils.py b/openedx/core/djangoapps/user_authn/tests/utils.py index c0de8113bf..82318cce8f 100644 --- a/openedx/core/djangoapps/user_authn/tests/utils.py +++ b/openedx/core/djangoapps/user_authn/tests/utils.py @@ -1,9 +1,29 @@ """ Common utilities for tests in the user_authn app. """ +from datetime import datetime, timedelta +from enum import Enum +from itertools import product + +import ddt +from mock import patch from django.conf import settings +from oauth2_provider import models as dot_models +from rest_framework import status + from openedx.core.djangoapps.oauth_dispatch.adapters.dot import DOTAdapter +from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt +from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES from student.tests.factories import UserFactory +class AuthType(Enum): + session = 1 + oauth = 2 + jwt = 3 + jwt_restricted = 4 + +JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted] + + def setup_login_oauth_client(): """ Sets up a test OAuth client for the login service. @@ -15,3 +35,216 @@ def setup_login_oauth_client(): redirect_uri='', client_id=settings.JWT_AUTH['JWT_LOGIN_CLIENT_ID'], ) + + +@ddt.ddt +class AuthAndScopesTestMixin(object): + """ + Mixin class to test authentication and oauth scopes for an API. + Test classes that use this Mixin need to define: + default_scopes - default list of scopes to include in created JWTs. + get_url(self, username) - method that returns the URL to call given + a username. + assert_success_response_for_student(resp) - method that verifies the + data returned in a successful response when accessing the URL for + self.student. + """ + default_scopes = None + user_password = 'test' + + def setUp(self): + super(AuthAndScopesTestMixin, self).setUp() + self.student = UserFactory.create(password=self.user_password) + self.other_student = UserFactory.create(password=self.user_password) + self.global_staff = UserFactory.create(password=self.user_password, is_staff=True) + + def get_response(self, auth_type, requesting_user=None, requested_user=None, url=None, token=None): + """ + Calls the url using the given auth_type. + Arguments: + - requesting_user is the user that is making the call to the url. Defaults to self.student. + - requested_user is user that is passed to the url. Defaults to self.student. + - url defaults to the response from calling self.get_url with requested_user.username. + - token defaults to the default creation of the token given the value of auth_type. + """ + requesting_user = requesting_user or self.student + requested_user = requested_user or self.student + + auth_header = None + if auth_type == AuthType.session: + self.client.login(username=requesting_user.username, password=self.user_password) + elif auth_type == AuthType.oauth: + if not token: + token = self._create_oauth_token(requesting_user) + auth_header = "Bearer {0}".format(token) + else: + assert auth_type in JWT_AUTH_TYPES + if not token: + token = self._create_jwt_token(requesting_user, auth_type) + auth_header = "JWT {0}".format(token) + + extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {} + return self.client.get( + url if url else self.get_url(requested_user.username), + **extra + ) + + def _create_oauth_token(self, user): + """ Creates and returns an OAuth token for the given user. """ + dot_app_user = UserFactory.create(password=self.user_password) + dot_app = dot_models.Application.objects.create( + name='test app', + user=dot_app_user, + client_type='confidential', + authorization_grant_type='authorization-code', + redirect_uris='http://localhost:8079/complete/edxorg/' + ) + return dot_models.AccessToken.objects.create( + user=user, + application=dot_app, + expires=datetime.utcnow() + timedelta(weeks=1), + scope='read write', + token='test_token', + ) + + def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False): + """ Creates and returns a JWT token for the given user with the given parameters. """ + filters = [] + if include_org_filter: + filters += ['content_org:{}'.format(self.course.id.org)] + if include_me_filter: + filters += ['user:me'] + + if scopes is None: + scopes = self.default_scopes + + return _create_jwt( + user, + scopes=scopes, + is_restricted=(auth_type == AuthType.jwt_restricted), + filters=filters, + ) + + def _assert_in_log(self, text, mock_log_method): + self.assertTrue(mock_log_method.called) + self.assertIn(text, mock_log_method.call_args_list[0][0][0]) + + def test_anonymous_user(self): + resp = self.client.get(self.get_url(self.student.username)) + self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) + + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_self_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self.get_response(auth_type) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + self.assert_success_response_for_student(resp) + + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_staff_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self.get_response(auth_type, requesting_user=self.global_staff) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + self.assert_success_response_for_student(resp) + + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_inactive_user(self, auth_type, scopes_enforced): + self.student.is_active = False + self.student.save() + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self.get_response(auth_type) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_another_user(self, auth_type, scopes_enforced, mock_log): + """ + Returns 403 for OAuth, Session, and JWT auth with IsUserInUrl. + Returns 200 for jwt_restricted and user:me filter unset. + """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self.get_response(auth_type, requesting_user=self.other_student) + + # Restricted JWT tokens without the user:me filter have access to other users + expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted + + self.assertEqual( + resp.status_code, + status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN, + ) + if not expected_jwt_access_granted: + self._assert_in_log("IsUserInUrl", mock_log.info) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasScope. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[]) + resp = self.get_response(AuthType.jwt, token=jwt_token) + + is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) + + if is_enforced: + self._assert_in_log("JwtHasScope", mock_log.warning) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False) + resp = self.get_response(AuthType.jwt, token=jwt_token) + + is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) + + if is_enforced: + self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning) + + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True) + + resp = self.get_response(AuthType.jwt, token=jwt_token) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True) + resp = self.get_response(AuthType.jwt, token=jwt_token) + + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) + + if scopes_enforced and auth_type == AuthType.jwt_restricted: + self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning) + else: + self._assert_in_log("IsUserInUrl", mock_log.info) + + def test_valid_oauth_token(self): + resp = self.get_response(AuthType.oauth) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + + def test_invalid_oauth_token(self): + resp = self.get_response(AuthType.oauth, token="fooooooooooToken") + self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_expired_oauth_token(self): + token = self._create_oauth_token(self.student) + token.expires = datetime.utcnow() - timedelta(weeks=1) + token.save() + resp = self.get_response(AuthType.oauth, token=token) + self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)