Merge pull request #18458 from edx/arch/oauth-scopes-apis
Enable Grades and Certificates APIs to enforce OAuth Scopes
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
"""
|
||||
Tests for the Certificate REST APIs.
|
||||
"""
|
||||
# pylint: disable=missing-docstring
|
||||
from datetime import datetime, timedelta
|
||||
from enum import Enum
|
||||
from itertools import product
|
||||
import ddt
|
||||
from mock import patch
|
||||
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
@@ -10,9 +15,12 @@ from oauth2_provider import models as dot_models
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from course_modes.models import CourseMode
|
||||
from lms.djangoapps.certificates.apis.v0.views import CertificatesDetailView
|
||||
from lms.djangoapps.certificates.models import CertificateStatuses
|
||||
from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory
|
||||
from course_modes.models import CourseMode
|
||||
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
|
||||
from openedx.core.lib.token_utils import JwtBuilder
|
||||
from student.tests.factories import UserFactory
|
||||
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
@@ -20,6 +28,17 @@ from xmodule.modulestore.tests.factories import CourseFactory
|
||||
USER_PASSWORD = 'test'
|
||||
|
||||
|
||||
class AuthType(Enum):
|
||||
session = 1
|
||||
oauth = 2
|
||||
jwt = 3
|
||||
jwt_restricted = 4
|
||||
|
||||
|
||||
JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted]
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
|
||||
"""
|
||||
Test for the Certificates REST APIs
|
||||
@@ -58,129 +77,9 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
|
||||
|
||||
self.namespaced_url = 'certificates_api:v0:certificates:detail'
|
||||
|
||||
# create a configuration for django-oauth-toolkit (DOT)
|
||||
dot_app_user = UserFactory.create(password=USER_PASSWORD)
|
||||
dot_app = dot_models.Application.objects.create(
|
||||
name='test app',
|
||||
user=dot_app_user,
|
||||
client_type='confidential',
|
||||
authorization_grant_type='authorization-code',
|
||||
redirect_uris='http://localhost:8079/complete/edxorg/'
|
||||
)
|
||||
self.dot_access_token = dot_models.AccessToken.objects.create(
|
||||
user=self.student,
|
||||
application=dot_app,
|
||||
expires=datetime.utcnow() + timedelta(weeks=1),
|
||||
scope='read write',
|
||||
token='16MGyP3OaQYHmpT1lK7Q6MMNAZsjwF'
|
||||
)
|
||||
|
||||
def get_url(self, username):
|
||||
"""
|
||||
Helper function to create the url for certificates
|
||||
"""
|
||||
return reverse(
|
||||
self.namespaced_url,
|
||||
kwargs={
|
||||
'course_id': self.course.id,
|
||||
'username': username
|
||||
}
|
||||
)
|
||||
|
||||
def assert_oauth_status(self, access_token, expected_status):
|
||||
"""
|
||||
Helper method for requests with OAUTH token
|
||||
"""
|
||||
self.client.logout()
|
||||
auth_header = "Bearer {0}".format(access_token)
|
||||
response = self.client.get(self.get_url(self.student.username), HTTP_AUTHORIZATION=auth_header)
|
||||
self.assertEqual(response.status_code, expected_status)
|
||||
|
||||
def test_permissions(self):
|
||||
"""
|
||||
Test that only the owner of the certificate can access the url
|
||||
"""
|
||||
# anonymous user
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
# another student
|
||||
self.client.login(username=self.student_no_cert.username, password=USER_PASSWORD)
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
# gets 404 instead of 403 for security reasons
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.assertEqual(resp.data, {u'detail': u'Not found.'})
|
||||
self.client.logout()
|
||||
|
||||
# same student of the certificate
|
||||
self.client.login(username=self.student.username, password=USER_PASSWORD)
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self.client.logout()
|
||||
|
||||
# staff user
|
||||
self.client.login(username=self.staff_user.username, password=USER_PASSWORD)
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_inactive_user_access(self):
|
||||
"""
|
||||
Verify inactive users - those who have not verified their email addresses -
|
||||
are allowed to access the endpoint.
|
||||
"""
|
||||
self.client.login(username=self.student.username, password=USER_PASSWORD)
|
||||
|
||||
self.student.is_active = False
|
||||
self.student.save()
|
||||
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_dot_valid_accesstoken(self):
|
||||
"""
|
||||
Verify access with a valid Django Oauth Toolkit access token.
|
||||
"""
|
||||
self.assert_oauth_status(self.dot_access_token, status.HTTP_200_OK)
|
||||
|
||||
def test_dot_invalid_accesstoken(self):
|
||||
"""
|
||||
Verify the endpoint is inaccessible for authorization
|
||||
attempts made with an invalid OAuth access token.
|
||||
"""
|
||||
self.assert_oauth_status("fooooooooooToken", status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
def test_dot_expired_accesstoken(self):
|
||||
"""
|
||||
Verify the endpoint is inaccessible for authorization
|
||||
attempts made with an expired OAuth access token.
|
||||
"""
|
||||
# set the expiration date in the past
|
||||
self.dot_access_token.expires = datetime.utcnow() - timedelta(weeks=1)
|
||||
self.dot_access_token.save()
|
||||
self.assert_oauth_status(self.dot_access_token, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
def test_no_certificate_for_user(self):
|
||||
"""
|
||||
Test for case with no certificate available
|
||||
"""
|
||||
self.client.login(username=self.student_no_cert.username, password=USER_PASSWORD)
|
||||
resp = self.client.get(self.get_url(self.student_no_cert.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.assertIn('error_code', resp.data)
|
||||
def _assert_certificate_response(self, response):
|
||||
self.assertEqual(
|
||||
resp.data['error_code'],
|
||||
'no_certificate_for_user'
|
||||
)
|
||||
|
||||
def test_certificate_for_user(self):
|
||||
"""
|
||||
Tests case user that pulls her own certificate
|
||||
"""
|
||||
self.client.login(username=self.student.username, password=USER_PASSWORD)
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(
|
||||
resp.data,
|
||||
response.data,
|
||||
{
|
||||
'username': self.student.username,
|
||||
'status': CertificateStatuses.downloadable,
|
||||
@@ -192,3 +91,205 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase):
|
||||
'created_date': self.now,
|
||||
}
|
||||
)
|
||||
|
||||
def _get_url(self, username):
|
||||
"""
|
||||
Helper function to create the url for certificates
|
||||
"""
|
||||
return reverse(
|
||||
self.namespaced_url,
|
||||
kwargs={
|
||||
'course_id': self.course.id,
|
||||
'username': username
|
||||
}
|
||||
)
|
||||
|
||||
def _create_oauth_token(self, user):
|
||||
dot_app_user = UserFactory.create(password=USER_PASSWORD)
|
||||
dot_app = dot_models.Application.objects.create(
|
||||
name='test app',
|
||||
user=dot_app_user,
|
||||
client_type='confidential',
|
||||
authorization_grant_type='authorization-code',
|
||||
redirect_uris='http://localhost:8079/complete/edxorg/'
|
||||
)
|
||||
return dot_models.AccessToken.objects.create(
|
||||
user=user,
|
||||
application=dot_app,
|
||||
expires=datetime.utcnow() + timedelta(weeks=1),
|
||||
scope='read write',
|
||||
token='test_token',
|
||||
)
|
||||
|
||||
def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False):
|
||||
filters = []
|
||||
if include_org_filter:
|
||||
filters += ['content_org:{}'.format(self.course.id.org)]
|
||||
if include_me_filter:
|
||||
filters += ['user:me']
|
||||
|
||||
if scopes is None:
|
||||
scopes = CertificatesDetailView.required_scopes
|
||||
|
||||
return JwtBuilder(user).build_token(
|
||||
scopes,
|
||||
additional_claims=dict(
|
||||
is_restricted=(auth_type == AuthType.jwt_restricted),
|
||||
filters=filters,
|
||||
),
|
||||
)
|
||||
|
||||
def _get_response(self, requesting_user, auth_type, url=None, token=None):
|
||||
auth_header = None
|
||||
if auth_type == AuthType.session:
|
||||
self.client.login(username=requesting_user.username, password=USER_PASSWORD)
|
||||
elif auth_type == AuthType.oauth:
|
||||
if not token:
|
||||
token = self._create_oauth_token(requesting_user)
|
||||
auth_header = "Bearer {0}".format(token)
|
||||
else:
|
||||
assert auth_type in JWT_AUTH_TYPES
|
||||
if not token:
|
||||
token = self._create_jwt_token(requesting_user, auth_type)
|
||||
auth_header = "JWT {0}".format(token)
|
||||
|
||||
extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {}
|
||||
return self.client.get(
|
||||
url if url else self._get_url(self.student.username),
|
||||
**extra
|
||||
)
|
||||
|
||||
def _assert_in_log(self, text, mock_log_method):
|
||||
self.assertTrue(mock_log_method.called)
|
||||
self.assertIn(text, mock_log_method.call_args_list[0][0][0])
|
||||
|
||||
def test_anonymous_user(self):
|
||||
resp = self.client.get(self._get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
@ddt.data(*list(AuthType))
|
||||
def test_no_certificate(self, auth_type):
|
||||
resp = self._get_response(
|
||||
self.student_no_cert,
|
||||
auth_type,
|
||||
url=self._get_url(self.student_no_cert.username),
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.assertIn('error_code', resp.data)
|
||||
self.assertEqual(
|
||||
resp.data['error_code'],
|
||||
'no_certificate_for_user',
|
||||
)
|
||||
|
||||
@ddt.data(*product(list(AuthType), (True, False)))
|
||||
@ddt.unpack
|
||||
def test_self_user(self, auth_type, scopes_enforced):
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
resp = self._get_response(self.student, auth_type)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self._assert_certificate_response(resp)
|
||||
|
||||
@ddt.data(*product(list(AuthType), (True, False)))
|
||||
@ddt.unpack
|
||||
def test_inactive_user(self, auth_type, scopes_enforced):
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
self.student.is_active = False
|
||||
self.student.save()
|
||||
|
||||
resp = self._get_response(self.student, auth_type)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
@ddt.data(*product(list(AuthType), (True, False)))
|
||||
@ddt.unpack
|
||||
def test_staff_user(self, auth_type, scopes_enforced):
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
resp = self._get_response(self.staff_user, auth_type)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
@patch('edx_rest_framework_extensions.permissions.log')
|
||||
@ddt.data(*product(list(AuthType), (True, False)))
|
||||
@ddt.unpack
|
||||
def test_another_user(self, auth_type, scopes_enforced, mock_log):
|
||||
""" Returns 403 for OAuth and Session auth with IsUserInUrl. """
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
resp = self._get_response(self.student_no_cert, auth_type)
|
||||
|
||||
# Restricted JWT tokens without the user:me filter have access to other users
|
||||
expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted
|
||||
|
||||
self.assertEqual(
|
||||
resp.status_code,
|
||||
status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN,
|
||||
)
|
||||
if not expected_jwt_access_granted:
|
||||
self._assert_in_log("IsUserInUrl", mock_log.info)
|
||||
|
||||
@patch('edx_rest_framework_extensions.permissions.log')
|
||||
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
|
||||
@ddt.unpack
|
||||
def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log):
|
||||
""" Returns 403 when scopes are enforced with JwtHasScope. """
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[])
|
||||
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
|
||||
|
||||
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
|
||||
|
||||
if is_enforced:
|
||||
self._assert_in_log("JwtHasScope", mock_log.warning)
|
||||
|
||||
@patch('edx_rest_framework_extensions.permissions.log')
|
||||
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
|
||||
@ddt.unpack
|
||||
def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log):
|
||||
""" Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False)
|
||||
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
|
||||
|
||||
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
|
||||
|
||||
if is_enforced:
|
||||
self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning)
|
||||
|
||||
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
|
||||
@ddt.unpack
|
||||
def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced):
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
|
||||
|
||||
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
@patch('edx_rest_framework_extensions.permissions.log')
|
||||
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
|
||||
@ddt.unpack
|
||||
def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log):
|
||||
""" Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """
|
||||
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
|
||||
jwt_token = self._create_jwt_token(self.student_no_cert, auth_type, include_me_filter=True)
|
||||
resp = self._get_response(self.student, AuthType.jwt, token=jwt_token)
|
||||
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
if scopes_enforced and auth_type == AuthType.jwt_restricted:
|
||||
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
|
||||
else:
|
||||
self._assert_in_log("IsUserInUrl", mock_log.info)
|
||||
|
||||
def test_valid_oauth_token(self):
|
||||
resp = self._get_response(self.student, AuthType.oauth)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_invalid_oauth_token(self):
|
||||
resp = self._get_response(self.student, AuthType.oauth, token="fooooooooooToken")
|
||||
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
def test_expired_oauth_token(self):
|
||||
token = self._create_oauth_token(self.student)
|
||||
token.expires = datetime.utcnow() - timedelta(weeks=1)
|
||||
token.save()
|
||||
resp = self._get_response(self.student, AuthType.oauth, token=token)
|
||||
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
""" API v0 views. """
|
||||
import logging
|
||||
|
||||
from edx_rest_framework_extensions.authentication import JwtAuthentication
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from rest_framework.generics import GenericAPIView
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
|
||||
from edx_rest_framework_extensions import permissions
|
||||
from edx_rest_framework_extensions.authentication import JwtAuthentication
|
||||
from lms.djangoapps.certificates.api import get_certificate_for_user
|
||||
from openedx.core.lib.api import authentication, permissions
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from openedx.core.lib.api.authentication import (
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
SessionAuthenticationAllowInactiveUser
|
||||
)
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -70,14 +74,14 @@ class CertificatesDetailView(GenericAPIView):
|
||||
"""
|
||||
|
||||
authentication_classes = (
|
||||
authentication.OAuth2AuthenticationAllowInactiveUser,
|
||||
authentication.SessionAuthenticationAllowInactiveUser,
|
||||
JwtAuthentication,
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
SessionAuthenticationAllowInactiveUser,
|
||||
)
|
||||
permission_classes = (
|
||||
IsAuthenticated,
|
||||
permissions.IsUserInUrlOrStaff
|
||||
)
|
||||
|
||||
permission_classes = (permissions.JWT_RESTRICTED_APPLICATION_OR_USER_ACCESS,)
|
||||
|
||||
required_scopes = ['certificates:read']
|
||||
|
||||
def get(self, request, username, course_id):
|
||||
"""
|
||||
|
||||
@@ -185,7 +185,7 @@ class SingleUserGradesTests(GradeViewTestMixin, APITestCase):
|
||||
self.client.logout()
|
||||
self.client.login(username=self.other_student.username, password=self.password)
|
||||
resp = self.client.get(self.get_url(self.student.username))
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_self_get_grade_not_enrolled(self):
|
||||
"""
|
||||
@@ -337,7 +337,7 @@ class CourseGradesViewTest(GradeViewTestMixin, APITestCase):
|
||||
|
||||
def test_student(self):
|
||||
resp = self.client.get(self.get_url())
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_course_does_not_exist(self):
|
||||
self.client.logout()
|
||||
|
||||
@@ -2,25 +2,30 @@
|
||||
import logging
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from rest_framework import status
|
||||
from rest_framework.exceptions import AuthenticationFailed
|
||||
from rest_framework.generics import GenericAPIView
|
||||
from rest_framework.response import Response
|
||||
|
||||
from edx_rest_framework_extensions import permissions
|
||||
from edx_rest_framework_extensions.authentication import JwtAuthentication
|
||||
from enrollment import data as enrollment_data
|
||||
from student.models import CourseEnrollment
|
||||
from lms.djangoapps.grades.course_grade_factory import CourseGradeFactory
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
|
||||
from openedx.core.lib.api.permissions import IsUserInUrlOrStaff
|
||||
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, view_auth_classes
|
||||
from openedx.core.lib.api.authentication import (
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
SessionAuthenticationAllowInactiveUser
|
||||
)
|
||||
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin
|
||||
from student.models import CourseEnrollment
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
USER_MODEL = get_user_model()
|
||||
|
||||
|
||||
@view_auth_classes()
|
||||
class GradeViewMixin(DeveloperErrorViewMixin):
|
||||
"""
|
||||
Mixin class for Grades related views.
|
||||
@@ -147,7 +152,15 @@ class CourseGradesView(GradeViewMixin, GenericAPIView):
|
||||
"letter_grade": null,
|
||||
}]
|
||||
"""
|
||||
permission_classes = (IsUserInUrlOrStaff,)
|
||||
authentication_classes = (
|
||||
JwtAuthentication,
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
SessionAuthenticationAllowInactiveUser,
|
||||
)
|
||||
|
||||
permission_classes = (permissions.JWT_RESTRICTED_APPLICATION_OR_USER_ACCESS,)
|
||||
|
||||
required_scopes = ['grades:read']
|
||||
|
||||
def get(self, request, course_id=None):
|
||||
"""
|
||||
|
||||
@@ -116,7 +116,7 @@ class MobileAuthUserTestMixin(MobileAuthTestMixin):
|
||||
"""
|
||||
def test_invalid_user(self):
|
||||
self.login_and_enroll()
|
||||
self.api_response(expected_response_code=404, username='no_user')
|
||||
self.api_response(expected_response_code=403, username='no_user')
|
||||
|
||||
def test_other_user(self):
|
||||
# login and enroll as the test user
|
||||
@@ -131,7 +131,7 @@ class MobileAuthUserTestMixin(MobileAuthTestMixin):
|
||||
|
||||
# now login and call the API as the test user
|
||||
self.login()
|
||||
self.api_response(expected_response_code=404, username=other.username)
|
||||
self.api_response(expected_response_code=403, username=other.username)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
|
||||
@@ -17,7 +17,6 @@ from .test_api import BookmarkApiEventTestMixin
|
||||
from .test_models import BookmarksTestsBase
|
||||
|
||||
|
||||
# pylint: disable=no-member
|
||||
class BookmarksViewsTestsBase(BookmarksTestsBase, BookmarkApiEventTestMixin):
|
||||
"""
|
||||
Base class for bookmarks views tests.
|
||||
@@ -397,15 +396,28 @@ class BookmarksDetailViewTests(BookmarksViewsTestsBase):
|
||||
|
||||
def test_get_bookmark_that_belongs_to_other_user(self):
|
||||
"""
|
||||
Test that requesting bookmark that belongs to other user returns 404 status code.
|
||||
Test that requesting bookmark that belongs to other user returns 403 status code.
|
||||
"""
|
||||
self.send_get(
|
||||
client=self.client,
|
||||
url=reverse(
|
||||
'bookmarks_detail',
|
||||
kwargs={'username': 'other', 'usage_id': unicode(self.vertical_1.location)}
|
||||
kwargs={'username': self.other_user.username, 'usage_id': unicode(self.vertical_1.location)}
|
||||
),
|
||||
expected_status=404
|
||||
expected_status=403
|
||||
)
|
||||
|
||||
def test_get_bookmark_that_belongs_to_nonexistent_user(self):
|
||||
"""
|
||||
Test that requesting bookmark that belongs to a non-existent user also returns 403 status code.
|
||||
"""
|
||||
self.send_get(
|
||||
client=self.client,
|
||||
url=reverse(
|
||||
'bookmarks_detail',
|
||||
kwargs={'username': 'non-existent', 'usage_id': unicode(self.vertical_1.location)}
|
||||
),
|
||||
expected_status=403
|
||||
)
|
||||
|
||||
def test_get_bookmark_that_does_not_exist(self):
|
||||
@@ -482,7 +494,7 @@ class BookmarksDetailViewTests(BookmarksViewsTestsBase):
|
||||
|
||||
def test_delete_bookmark_that_belongs_to_other_user(self):
|
||||
"""
|
||||
Test that delete bookmark that belongs to other user returns 404.
|
||||
Test that delete bookmark that belongs to other user returns 403.
|
||||
"""
|
||||
self.send_delete(
|
||||
client=self.client,
|
||||
@@ -490,7 +502,7 @@ class BookmarksDetailViewTests(BookmarksViewsTestsBase):
|
||||
'bookmarks_detail',
|
||||
kwargs={'username': 'other', 'usage_id': unicode(self.vertical_1.location)}
|
||||
),
|
||||
expected_status=404
|
||||
expected_status=403
|
||||
)
|
||||
|
||||
def test_delete_bookmark_that_does_not_exist(self):
|
||||
|
||||
@@ -243,6 +243,18 @@ class ProfileImageViewPostTestCase(ProfileImageEndpointMixin, APITestCase):
|
||||
self.assertFalse(mock_log.info.called)
|
||||
self.assert_no_events_were_emitted()
|
||||
|
||||
def test_upload_nonexistent_user(self, mock_log):
|
||||
"""
|
||||
Test that an authenticated user who POSTs to a non-existent user's upload
|
||||
endpoint gets an indistinguishable 403.
|
||||
"""
|
||||
nonexistent_user_url = reverse(self._view_name, kwargs={'username': 'nonexistent'})
|
||||
|
||||
with make_image_file() as image_file:
|
||||
response = self.client.post(nonexistent_user_url, {'file': image_file}, format='multipart')
|
||||
self.check_response(response, 403)
|
||||
self.assertFalse(mock_log.info.called)
|
||||
|
||||
def test_upload_other(self, mock_log):
|
||||
"""
|
||||
Test that an authenticated user cannot POST to another user's upload
|
||||
@@ -255,7 +267,7 @@ class ProfileImageViewPostTestCase(ProfileImageEndpointMixin, APITestCase):
|
||||
different_client.login(username=different_user.username, password=TEST_PASSWORD)
|
||||
with make_image_file() as image_file:
|
||||
response = different_client.post(self.url, {'file': image_file}, format='multipart')
|
||||
self.check_response(response, 404)
|
||||
self.check_response(response, 403)
|
||||
self.check_images(False)
|
||||
self.check_has_profile_image(False)
|
||||
self.assertFalse(mock_log.info.called)
|
||||
@@ -407,7 +419,7 @@ class ProfileImageViewDeleteTestCase(ProfileImageEndpointMixin, APITestCase):
|
||||
different_client = APIClient()
|
||||
different_client.login(username=different_user.username, password=TEST_PASSWORD)
|
||||
response = different_client.delete(self.url)
|
||||
self.check_response(response, 404)
|
||||
self.check_response(response, 403)
|
||||
self.check_images(True) # thumbnails should remain intact.
|
||||
self.check_has_profile_image(True)
|
||||
self.assertFalse(mock_log.info.called)
|
||||
|
||||
@@ -315,12 +315,13 @@ def _get_authorized_user(requesting_user, username=None, allow_staff=False):
|
||||
# Otherwise, treat this as a request against a separate user
|
||||
username = requesting_user.username
|
||||
|
||||
_check_authorized(requesting_user, username, allow_staff)
|
||||
|
||||
try:
|
||||
existing_user = User.objects.get(username=username)
|
||||
except ObjectDoesNotExist:
|
||||
raise UserNotFound()
|
||||
|
||||
_check_authorized(requesting_user, username, allow_staff)
|
||||
return existing_user
|
||||
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ class TestPreferenceAPI(CacheIsolationTestCase):
|
||||
"""
|
||||
Verifies that get_user_preference returns appropriate errors.
|
||||
"""
|
||||
with self.assertRaises(UserNotFound):
|
||||
with self.assertRaises(UserNotAuthorized):
|
||||
get_user_preference(self.user, self.test_preference_key, username="no_such_user")
|
||||
|
||||
with self.assertRaises(UserNotFound):
|
||||
@@ -100,7 +100,7 @@ class TestPreferenceAPI(CacheIsolationTestCase):
|
||||
"""
|
||||
Verifies that get_user_preferences returns appropriate errors.
|
||||
"""
|
||||
with self.assertRaises(UserNotFound):
|
||||
with self.assertRaises(UserNotAuthorized):
|
||||
get_user_preferences(self.user, username="no_such_user")
|
||||
|
||||
with self.assertRaises(UserNotFound):
|
||||
@@ -125,7 +125,7 @@ class TestPreferenceAPI(CacheIsolationTestCase):
|
||||
"""
|
||||
Verifies that set_user_preference returns appropriate errors.
|
||||
"""
|
||||
with self.assertRaises(UserNotFound):
|
||||
with self.assertRaises(UserNotAuthorized):
|
||||
set_user_preference(self.user, self.test_preference_key, "new_value", username="no_such_user")
|
||||
|
||||
with self.assertRaises(UserNotFound):
|
||||
@@ -227,7 +227,7 @@ class TestPreferenceAPI(CacheIsolationTestCase):
|
||||
update_data = {
|
||||
self.test_preference_key: "new_value"
|
||||
}
|
||||
with self.assertRaises(UserNotFound):
|
||||
with self.assertRaises(UserNotAuthorized):
|
||||
update_user_preferences(self.user, update_data, user="no_such_user")
|
||||
|
||||
with self.assertRaises(UserNotFound):
|
||||
@@ -303,7 +303,7 @@ class TestPreferenceAPI(CacheIsolationTestCase):
|
||||
"""
|
||||
Verifies that delete_user_preference returns appropriate errors.
|
||||
"""
|
||||
with self.assertRaises(UserNotFound):
|
||||
with self.assertRaises(UserNotAuthorized):
|
||||
delete_user_preference(self.user, self.test_preference_key, username="no_such_user")
|
||||
|
||||
with self.assertRaises(UserNotFound):
|
||||
|
||||
@@ -52,7 +52,7 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
Test that a client (logged in) cannot get the preferences information for a different client.
|
||||
"""
|
||||
self.different_client.login(username=self.different_user.username, password=TEST_PASSWORD)
|
||||
self.send_get(self.different_client, expected_status=404)
|
||||
self.send_get(self.different_client, expected_status=403)
|
||||
|
||||
@ddt.data(
|
||||
("client", "user"),
|
||||
@@ -61,11 +61,11 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
@ddt.unpack
|
||||
def test_get_unknown_user(self, api_client, username):
|
||||
"""
|
||||
Test that requesting a user who does not exist returns a 404.
|
||||
Test that requesting a user who does not exist returns a 404 for staff users, but 403 for others.
|
||||
"""
|
||||
client = self.login_client(api_client, username)
|
||||
response = client.get(reverse(self.url_endpoint_name, kwargs={'username': "does_not_exist"}))
|
||||
self.assertEqual(404, response.status_code)
|
||||
self.assertEqual(404 if username == "staff_user" else 403, response.status_code)
|
||||
|
||||
def test_get_preferences_default(self):
|
||||
"""
|
||||
@@ -83,8 +83,7 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
@ddt.unpack
|
||||
def test_get_preferences(self, api_client, user):
|
||||
"""
|
||||
Test that a client (logged in) can get her own preferences information. Also verifies that a "is_staff"
|
||||
user can get the preferences information for other users.
|
||||
Test that a client (logged in) can get her own preferences information.
|
||||
"""
|
||||
# Create some test preferences values.
|
||||
set_user_preference(self.user, "dict_pref", {"int_key": 10})
|
||||
@@ -104,14 +103,14 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
@ddt.unpack
|
||||
def test_patch_unknown_user(self, api_client, user):
|
||||
"""
|
||||
Test that trying to update preferences for a user who does not exist returns a 404.
|
||||
Test that trying to update preferences for a user who does not exist returns a 403.
|
||||
"""
|
||||
client = self.login_client(api_client, user)
|
||||
response = client.patch(
|
||||
reverse(self.url_endpoint_name, kwargs={'username': "does_not_exist"}),
|
||||
data=json.dumps({"string_pref": "value"}), content_type="application/merge-patch+json"
|
||||
)
|
||||
self.assertEqual(404, response.status_code)
|
||||
self.assertEqual(403, response.status_code)
|
||||
|
||||
def test_patch_bad_content_type(self):
|
||||
"""
|
||||
@@ -168,7 +167,7 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
"dict_pref": {"int_key": 10},
|
||||
"string_pref": "value",
|
||||
},
|
||||
expected_status=403 if user == "staff_user" else 404,
|
||||
expected_status=403,
|
||||
)
|
||||
|
||||
def test_update_preferences(self):
|
||||
@@ -311,7 +310,7 @@ class TestPreferencesAPI(UserAPITestCase):
|
||||
"new_pref": "new_value",
|
||||
"extra_pref": None,
|
||||
},
|
||||
expected_status=403 if user == "staff_user" else 404
|
||||
expected_status=403
|
||||
)
|
||||
|
||||
|
||||
@@ -405,9 +404,9 @@ class TestPreferencesDetailAPI(UserAPITestCase):
|
||||
Test that a client (logged in) cannot manipulate a preference for a different client.
|
||||
"""
|
||||
self.different_client.login(username=self.different_user.username, password=TEST_PASSWORD)
|
||||
self.send_get(self.different_client, expected_status=404)
|
||||
self.send_put(self.different_client, "new_value", expected_status=404)
|
||||
self.send_delete(self.different_client, expected_status=404)
|
||||
self.send_get(self.different_client, expected_status=403)
|
||||
self.send_put(self.different_client, "new_value", expected_status=403)
|
||||
self.send_delete(self.different_client, expected_status=403)
|
||||
|
||||
@ddt.data(
|
||||
("client", "user"),
|
||||
@@ -416,13 +415,13 @@ class TestPreferencesDetailAPI(UserAPITestCase):
|
||||
@ddt.unpack
|
||||
def test_get_unknown_user(self, api_client, username):
|
||||
"""
|
||||
Test that requesting a user who does not exist returns a 404.
|
||||
Test that requesting a user who does not exist returns a 404 for staff users, but 403 for others.
|
||||
"""
|
||||
client = self.login_client(api_client, username)
|
||||
response = client.get(
|
||||
reverse(self.url_endpoint_name, kwargs={'username': "does_not_exist", 'preference_key': self.test_pref_key})
|
||||
)
|
||||
self.assertEqual(404, response.status_code)
|
||||
self.assertEqual(404 if username == "staff_user" else 403, response.status_code)
|
||||
|
||||
def test_get_preference_does_not_exist(self):
|
||||
"""
|
||||
@@ -532,7 +531,7 @@ class TestPreferencesDetailAPI(UserAPITestCase):
|
||||
self._set_url("new_key")
|
||||
client = self.login_client(api_client, user)
|
||||
new_value = "new value"
|
||||
self.send_put(client, new_value, expected_status=403 if user == "staff_user" else 404)
|
||||
self.send_put(client, new_value, expected_status=403)
|
||||
|
||||
@ddt.data(
|
||||
(u"new value",),
|
||||
@@ -560,7 +559,7 @@ class TestPreferencesDetailAPI(UserAPITestCase):
|
||||
"""
|
||||
client = self.login_client(api_client, user)
|
||||
new_value = "new value"
|
||||
self.send_put(client, new_value, expected_status=403 if user == "staff_user" else 404)
|
||||
self.send_put(client, new_value, expected_status=403)
|
||||
|
||||
@ddt.data(
|
||||
(None,),
|
||||
@@ -607,4 +606,4 @@ class TestPreferencesDetailAPI(UserAPITestCase):
|
||||
Test that a client (logged in) cannot delete a preference for another user.
|
||||
"""
|
||||
client = self.login_client(api_client, user)
|
||||
self.send_delete(client, expected_status=403 if user == "staff_user" else 404)
|
||||
self.send_delete(client, expected_status=403)
|
||||
|
||||
@@ -6,8 +6,10 @@ from django.conf import settings
|
||||
from django.http import Http404
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from rest_condition import C
|
||||
from rest_framework import permissions
|
||||
|
||||
from edx_rest_framework_extensions.permissions import IsStaff, IsUserInUrl
|
||||
from openedx.core.lib.log_utils import audit_log
|
||||
from student.roles import CourseInstructorRole, CourseStaffRole
|
||||
|
||||
@@ -20,7 +22,6 @@ class ApiKeyHeaderPermission(permissions.BasePermission):
|
||||
def has_permission(self, request, view):
|
||||
"""
|
||||
Check for permissions by matching the configured API key and header
|
||||
|
||||
Allow the request if and only if settings.EDX_API_KEY is set and
|
||||
the X-Edx-Api-Key HTTP header is present in the request and
|
||||
matches the setting.
|
||||
@@ -39,7 +40,6 @@ class ApiKeyHeaderPermission(permissions.BasePermission):
|
||||
class ApiKeyHeaderPermissionIsAuthenticated(ApiKeyHeaderPermission, permissions.IsAuthenticated):
|
||||
"""
|
||||
Allow someone to access the view if they have the API key OR they are authenticated.
|
||||
|
||||
See ApiKeyHeaderPermission for more information how the API key portion is implemented.
|
||||
"""
|
||||
|
||||
@@ -50,29 +50,6 @@ class ApiKeyHeaderPermissionIsAuthenticated(ApiKeyHeaderPermission, permissions.
|
||||
return api_permissions or is_authenticated_permissions
|
||||
|
||||
|
||||
class IsUserInUrl(permissions.BasePermission):
|
||||
"""
|
||||
Permission that checks to see if the request user matches the user in the URL.
|
||||
"""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""
|
||||
Returns true if the current request is by the user themselves.
|
||||
|
||||
Note: a 404 is returned for non-staff instead of a 403. This is to prevent
|
||||
users from being able to detect the existence of accounts.
|
||||
"""
|
||||
url_username = (
|
||||
request.parser_context.get('kwargs', {}).get('username') or
|
||||
request.GET.get('username', '')
|
||||
)
|
||||
if request.user.username.lower() != url_username.lower():
|
||||
if request.user.is_staff:
|
||||
return False # staff gets 403
|
||||
raise Http404()
|
||||
return True
|
||||
|
||||
|
||||
class IsCourseStaffInstructor(permissions.BasePermission):
|
||||
"""
|
||||
Permission to check that user is a course instructor or staff of
|
||||
@@ -118,26 +95,9 @@ class IsMasterCourseStaffInstructor(permissions.BasePermission):
|
||||
return False
|
||||
|
||||
|
||||
class IsStaff(permissions.BasePermission):
|
||||
"""
|
||||
Permission that checks to see if the request user has is_staff access.
|
||||
"""
|
||||
|
||||
class IsUserInUrlOrStaff(permissions.BasePermission):
|
||||
def has_permission(self, request, view):
|
||||
if request.user.is_staff:
|
||||
return True
|
||||
|
||||
|
||||
class IsUserInUrlOrStaff(IsUserInUrl):
|
||||
"""
|
||||
Permission that checks to see if the request user matches the user in the URL or has is_staff access.
|
||||
"""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
if request.user.is_staff:
|
||||
return True
|
||||
|
||||
return super(IsUserInUrlOrStaff, self).has_permission(request, view)
|
||||
return C(IsStaff) | IsUserInUrl
|
||||
|
||||
|
||||
class IsStaffOrReadOnly(permissions.BasePermission):
|
||||
|
||||
@@ -125,6 +125,7 @@ python-openid
|
||||
python-saml==2.4.0
|
||||
pyuca==1.1 # For more accurate sorting of translated country names in django-countries
|
||||
reportlab==3.1.44 # Used for shopping cart's pdf invoice/receipt generation
|
||||
rest-condition # DRF's recommendation for supporting complex permissions
|
||||
rfc6266-parser # Used to generate Content-Disposition headers.
|
||||
social-auth-app-django
|
||||
social-auth-core
|
||||
|
||||
@@ -115,7 +115,7 @@ edx-completion==0.1.7
|
||||
edx-django-oauth2-provider==1.2.5
|
||||
edx-django-release-util==0.3.1
|
||||
edx-django-sites-extensions==2.3.1
|
||||
edx-drf-extensions==1.5.1
|
||||
edx-drf-extensions==1.5.2
|
||||
edx-enterprise==0.70.1
|
||||
edx-i18n-tools==0.4.5
|
||||
edx-milestones==0.1.13
|
||||
@@ -209,7 +209,7 @@ redis==2.10.6
|
||||
reportlab==3.1.44
|
||||
requests-oauthlib==0.6.1
|
||||
requests==2.9.1
|
||||
rest-condition==1.0.3 # via edx-drf-extensions
|
||||
rest-condition==1.0.3
|
||||
rfc6266-parser==0.0.5.post2
|
||||
rules==1.3
|
||||
s3transfer==0.1.13 # via boto3
|
||||
|
||||
@@ -135,7 +135,7 @@ edx-completion==0.1.7
|
||||
edx-django-oauth2-provider==1.2.5
|
||||
edx-django-release-util==0.3.1
|
||||
edx-django-sites-extensions==2.3.1
|
||||
edx-drf-extensions==1.5.1
|
||||
edx-drf-extensions==1.5.2
|
||||
edx-enterprise==0.70.1
|
||||
edx-i18n-tools==0.4.5
|
||||
edx-lint==0.5.5
|
||||
|
||||
@@ -130,7 +130,7 @@ edx-completion==0.1.7
|
||||
edx-django-oauth2-provider==1.2.5
|
||||
edx-django-release-util==0.3.1
|
||||
edx-django-sites-extensions==2.3.1
|
||||
edx-drf-extensions==1.5.1
|
||||
edx-drf-extensions==1.5.2
|
||||
edx-enterprise==0.70.1
|
||||
edx-i18n-tools==0.4.5
|
||||
edx-lint==0.5.5
|
||||
@@ -228,8 +228,8 @@ pluggy==0.6.0 # via pytest, tox
|
||||
polib==1.1.0
|
||||
psutil==1.2.1
|
||||
py2neo==3.1.2
|
||||
py==1.5.3 # via pytest, tox
|
||||
pyasn1-modules==0.2.1 # via service-identity
|
||||
py==1.5.4 # via pytest, tox
|
||||
pyasn1-modules==0.2.2 # via service-identity
|
||||
pyasn1==0.4.3 # via pyasn1-modules, service-identity
|
||||
pycodestyle==2.3.1
|
||||
pycontracts==1.7.1
|
||||
|
||||
Reference in New Issue
Block a user