Merge pull request #7193 from edx/diana/enrollment-api-permissions
Allow the enrollment API to be accessed via API keys.
This commit is contained in:
@@ -16,11 +16,13 @@ from util.testing import UrlResetMixin
|
||||
from enrollment import api
|
||||
from enrollment.errors import CourseEnrollmentError
|
||||
from openedx.core.djangoapps.user_api.models import UserOrgTag
|
||||
from django.test.utils import override_settings
|
||||
from student.tests.factories import UserFactory, CourseModeFactory
|
||||
from student.models import CourseEnrollment
|
||||
from embargo.test_utils import restrict_course
|
||||
|
||||
|
||||
@override_settings(EDX_API_KEY="i am a key")
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
@@ -30,12 +32,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
USERNAME = "Bob"
|
||||
EMAIL = "bob@example.com"
|
||||
PASSWORD = "edx"
|
||||
API_KEY = "i am a key"
|
||||
|
||||
def setUp(self):
|
||||
""" Create a course and user, then log in. """
|
||||
super(EnrollmentTest, self).setUp()
|
||||
self.course = CourseFactory.create()
|
||||
self.user = UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
|
||||
self.other_user = UserFactory.create()
|
||||
self.client.login(username=self.USERNAME, password=self.PASSWORD)
|
||||
|
||||
@ddt.data(
|
||||
@@ -179,7 +183,10 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
mode_slug='honor',
|
||||
mode_display_name='Honor',
|
||||
)
|
||||
self._create_enrollment(username='not_the_user', expected_status=status.HTTP_404_NOT_FOUND)
|
||||
self._create_enrollment(username=self.other_user.username, expected_status=status.HTTP_404_NOT_FOUND)
|
||||
# Verify that the server still has access to this endpoint.
|
||||
self.client.logout()
|
||||
self._create_enrollment(username=self.other_user.username, as_server=True)
|
||||
|
||||
def test_user_does_not_match_param_for_list(self):
|
||||
CourseModeFactory.create(
|
||||
@@ -187,8 +194,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
mode_slug='honor',
|
||||
mode_display_name='Honor',
|
||||
)
|
||||
resp = self.client.get(reverse('courseenrollments'), {"user": "not_the_user"})
|
||||
resp = self.client.get(reverse('courseenrollments'), {"user": self.other_user.username})
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
# Verify that the server still has access to this endpoint.
|
||||
self.client.logout()
|
||||
resp = self.client.get(
|
||||
reverse('courseenrollments'), {"user": self.other_user.username}, **{'HTTP_X_EDX_API_KEY': self.API_KEY}
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_user_does_not_match_param(self):
|
||||
CourseModeFactory.create(
|
||||
@@ -197,9 +210,16 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
mode_display_name='Honor',
|
||||
)
|
||||
resp = self.client.get(
|
||||
reverse('courseenrollment', kwargs={"user": "not_the_user", "course_id": unicode(self.course.id)})
|
||||
reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(self.course.id)})
|
||||
)
|
||||
# Verify that the server still has access to this endpoint.
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
self.client.logout()
|
||||
resp = self.client.get(
|
||||
reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(self.course.id)}),
|
||||
**{'HTTP_X_EDX_API_KEY': self.API_KEY}
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_get_course_details(self):
|
||||
CourseModeFactory.create(
|
||||
@@ -237,29 +257,6 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None):
|
||||
"""Enroll in the course and verify the URL we are sent to. """
|
||||
course_id = unicode(self.course.id) if course_id is None else course_id
|
||||
username = self.user.username if username is None else username
|
||||
|
||||
params = {
|
||||
'course_details': {
|
||||
'course_id': course_id
|
||||
},
|
||||
'user': username
|
||||
}
|
||||
if email_opt_in is not None:
|
||||
params['email_opt_in'] = email_opt_in
|
||||
resp = self.client.post(reverse('courseenrollments'), params, format='json')
|
||||
self.assertEqual(resp.status_code, expected_status)
|
||||
|
||||
if expected_status == status.HTTP_200_OK:
|
||||
data = json.loads(resp.content)
|
||||
self.assertEqual(course_id, data['course_details']['course_id'])
|
||||
self.assertEqual('honor', data['mode'])
|
||||
self.assertTrue(data['is_active'])
|
||||
return resp
|
||||
|
||||
def test_enrollment_already_enrolled(self):
|
||||
response = self._create_enrollment()
|
||||
repeat_response = self._create_enrollment()
|
||||
@@ -279,6 +276,33 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
|
||||
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn("No course ", resp.content)
|
||||
|
||||
def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None, as_server=False):
|
||||
"""Enroll in the course and verify the URL we are sent to. """
|
||||
course_id = unicode(self.course.id) if course_id is None else course_id
|
||||
username = self.user.username if username is None else username
|
||||
|
||||
params = {
|
||||
'course_details': {
|
||||
'course_id': course_id
|
||||
},
|
||||
'user': username
|
||||
}
|
||||
if email_opt_in is not None:
|
||||
params['email_opt_in'] = email_opt_in
|
||||
if as_server:
|
||||
resp = self.client.post(reverse('courseenrollments'), params, format='json', **{'HTTP_X_EDX_API_KEY': self.API_KEY})
|
||||
else:
|
||||
resp = self.client.post(reverse('courseenrollments'), params, format='json')
|
||||
|
||||
self.assertEqual(resp.status_code, expected_status)
|
||||
|
||||
if expected_status == status.HTTP_200_OK:
|
||||
data = json.loads(resp.content)
|
||||
self.assertEqual(course_id, data['course_details']['course_id'])
|
||||
self.assertEqual('honor', data['mode'])
|
||||
self.assertTrue(data['is_active'])
|
||||
return resp
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class EnrollmentEmbargoTest(UrlResetMixin, ModuleStoreTestCase):
|
||||
|
||||
@@ -8,6 +8,7 @@ from django.conf import settings
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.locator import CourseLocator
|
||||
from openedx.core.djangoapps.user_api import api as user_api
|
||||
from openedx.core.lib.api.permissions import ApiKeyHeaderPermission, ApiKeyHeaderPermissionIsAuthenticated
|
||||
from rest_framework import status
|
||||
from rest_framework import permissions
|
||||
from rest_framework.response import Response
|
||||
@@ -30,8 +31,27 @@ class EnrollmentUserThrottle(UserRateThrottle):
|
||||
rate = '50/second'
|
||||
|
||||
|
||||
class ApiKeyPermissionMixIn(object):
|
||||
"""
|
||||
This mixin is used to provide a convenience function for doing individual permission checks
|
||||
for the presence of API keys.
|
||||
"""
|
||||
def has_api_key_permissions(self, request):
|
||||
"""
|
||||
Checks to see if the request was made by a server with an API key.
|
||||
|
||||
Args:
|
||||
request (Request): the request being made into the view
|
||||
|
||||
Return:
|
||||
True if the request has been made with a valid API key
|
||||
False otherwise
|
||||
"""
|
||||
return ApiKeyHeaderPermission().has_permission(request, self)
|
||||
|
||||
|
||||
@can_disable_rate_limit
|
||||
class EnrollmentView(APIView):
|
||||
class EnrollmentView(APIView, ApiKeyPermissionMixIn):
|
||||
"""
|
||||
**Use Cases**
|
||||
|
||||
@@ -73,7 +93,7 @@ class EnrollmentView(APIView):
|
||||
"""
|
||||
|
||||
authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser
|
||||
permission_classes = permissions.IsAuthenticated,
|
||||
permission_classes = ApiKeyHeaderPermissionIsAuthenticated,
|
||||
throttle_classes = EnrollmentUserThrottle,
|
||||
|
||||
def get(self, request, course_id=None, user=None):
|
||||
@@ -94,7 +114,7 @@ class EnrollmentView(APIView):
|
||||
|
||||
"""
|
||||
user = user if user else request.user.username
|
||||
if request.user.username != user:
|
||||
if request.user.username != user and not self.has_api_key_permissions(request):
|
||||
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
|
||||
# other users, do not let them deduce the existence of an enrollment.
|
||||
return Response(status=status.HTTP_404_NOT_FOUND)
|
||||
@@ -182,7 +202,7 @@ class EnrollmentCourseDetailView(APIView):
|
||||
|
||||
|
||||
@can_disable_rate_limit
|
||||
class EnrollmentListView(APIView):
|
||||
class EnrollmentListView(APIView, ApiKeyPermissionMixIn):
|
||||
"""
|
||||
**Use Cases**
|
||||
|
||||
@@ -245,7 +265,7 @@ class EnrollmentListView(APIView):
|
||||
"""
|
||||
|
||||
authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser
|
||||
permission_classes = permissions.IsAuthenticated,
|
||||
permission_classes = ApiKeyHeaderPermissionIsAuthenticated,
|
||||
throttle_classes = EnrollmentUserThrottle,
|
||||
|
||||
def get(self, request):
|
||||
@@ -253,7 +273,7 @@ class EnrollmentListView(APIView):
|
||||
Gets a list of all course enrollments for the currently logged in user.
|
||||
"""
|
||||
user = request.GET.get('user', request.user.username)
|
||||
if request.user.username != user:
|
||||
if request.user.username != user and not self.has_api_key_permissions(request):
|
||||
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
|
||||
# other users, do not let them deduce the existence of an enrollment.
|
||||
return Response(status=status.HTTP_404_NOT_FOUND)
|
||||
@@ -276,7 +296,7 @@ class EnrollmentListView(APIView):
|
||||
user = request.DATA.get('user', request.user.username)
|
||||
if not user:
|
||||
user = request.user.username
|
||||
if user != request.user.username:
|
||||
if user != request.user.username and not self.has_api_key_permissions(request):
|
||||
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
|
||||
# other users, do not let them deduce the existence of an enrollment.
|
||||
return Response(status=status.HTTP_404_NOT_FOUND)
|
||||
|
||||
@@ -21,6 +21,19 @@ class ApiKeyHeaderPermission(permissions.BasePermission):
|
||||
)
|
||||
|
||||
|
||||
class ApiKeyHeaderPermissionIsAuthenticated(ApiKeyHeaderPermission, permissions.IsAuthenticated):
|
||||
"""
|
||||
Allow someone to access the view if they have the API key OR they are authenticated.
|
||||
|
||||
See ApiKeyHeaderPermission for more information how the API key portion is implemented.
|
||||
"""
|
||||
def has_permission(self, request, view):
|
||||
#TODO We can optimize this later on when we know which of these methods is used more often.
|
||||
api_permissions = ApiKeyHeaderPermission.has_permission(self, request, view)
|
||||
is_authenticated_permissions = permissions.IsAuthenticated.has_permission(self, request, view)
|
||||
return api_permissions or is_authenticated_permissions
|
||||
|
||||
|
||||
class IsAuthenticatedOrDebug(permissions.BasePermission):
|
||||
"""
|
||||
Allows access only to authenticated users, or anyone if debug mode is enabled.
|
||||
|
||||
Reference in New Issue
Block a user