From 1e4f1b58891398089cdf132f993cd5a6a5b4f518 Mon Sep 17 00:00:00 2001 From: Diana Huang Date: Mon, 2 Mar 2015 14:46:49 -0500 Subject: [PATCH] Allow the enrollment API to be accessed via API keys. XCOM-107 --- .../djangoapps/enrollment/tests/test_views.py | 76 ++++++++++++------- common/djangoapps/enrollment/views.py | 34 +++++++-- openedx/core/lib/api/permissions.py | 13 ++++ 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/common/djangoapps/enrollment/tests/test_views.py b/common/djangoapps/enrollment/tests/test_views.py index 998ba0cec5..c1641c347c 100644 --- a/common/djangoapps/enrollment/tests/test_views.py +++ b/common/djangoapps/enrollment/tests/test_views.py @@ -16,11 +16,13 @@ from util.testing import UrlResetMixin from enrollment import api from enrollment.errors import CourseEnrollmentError from openedx.core.djangoapps.user_api.models import UserOrgTag +from django.test.utils import override_settings from student.tests.factories import UserFactory, CourseModeFactory from student.models import CourseEnrollment from embargo.test_utils import restrict_course +@override_settings(EDX_API_KEY="i am a key") @ddt.ddt @unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') class EnrollmentTest(ModuleStoreTestCase, APITestCase): @@ -30,12 +32,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): USERNAME = "Bob" EMAIL = "bob@example.com" PASSWORD = "edx" + API_KEY = "i am a key" def setUp(self): """ Create a course and user, then log in. """ super(EnrollmentTest, self).setUp() self.course = CourseFactory.create() self.user = UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD) + self.other_user = UserFactory.create() self.client.login(username=self.USERNAME, password=self.PASSWORD) @ddt.data( @@ -179,7 +183,10 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): mode_slug='honor', mode_display_name='Honor', ) - self._create_enrollment(username='not_the_user', expected_status=status.HTTP_404_NOT_FOUND) + self._create_enrollment(username=self.other_user.username, expected_status=status.HTTP_404_NOT_FOUND) + # Verify that the server still has access to this endpoint. + self.client.logout() + self._create_enrollment(username=self.other_user.username, as_server=True) def test_user_does_not_match_param_for_list(self): CourseModeFactory.create( @@ -187,8 +194,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): mode_slug='honor', mode_display_name='Honor', ) - resp = self.client.get(reverse('courseenrollments'), {"user": "not_the_user"}) + resp = self.client.get(reverse('courseenrollments'), {"user": self.other_user.username}) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) + # Verify that the server still has access to this endpoint. + self.client.logout() + resp = self.client.get( + reverse('courseenrollments'), {"user": self.other_user.username}, **{'HTTP_X_EDX_API_KEY': self.API_KEY} + ) + self.assertEqual(resp.status_code, status.HTTP_200_OK) def test_user_does_not_match_param(self): CourseModeFactory.create( @@ -197,9 +210,16 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): mode_display_name='Honor', ) resp = self.client.get( - reverse('courseenrollment', kwargs={"user": "not_the_user", "course_id": unicode(self.course.id)}) + reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(self.course.id)}) ) + # Verify that the server still has access to this endpoint. self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) + self.client.logout() + resp = self.client.get( + reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(self.course.id)}), + **{'HTTP_X_EDX_API_KEY': self.API_KEY} + ) + self.assertEqual(resp.status_code, status.HTTP_200_OK) def test_get_course_details(self): CourseModeFactory.create( @@ -237,29 +257,6 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): ) self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST) - def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None): - """Enroll in the course and verify the URL we are sent to. """ - course_id = unicode(self.course.id) if course_id is None else course_id - username = self.user.username if username is None else username - - params = { - 'course_details': { - 'course_id': course_id - }, - 'user': username - } - if email_opt_in is not None: - params['email_opt_in'] = email_opt_in - resp = self.client.post(reverse('courseenrollments'), params, format='json') - self.assertEqual(resp.status_code, expected_status) - - if expected_status == status.HTTP_200_OK: - data = json.loads(resp.content) - self.assertEqual(course_id, data['course_details']['course_id']) - self.assertEqual('honor', data['mode']) - self.assertTrue(data['is_active']) - return resp - def test_enrollment_already_enrolled(self): response = self._create_enrollment() repeat_response = self._create_enrollment() @@ -279,6 +276,33 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase): self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST) self.assertIn("No course ", resp.content) + def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None, as_server=False): + """Enroll in the course and verify the URL we are sent to. """ + course_id = unicode(self.course.id) if course_id is None else course_id + username = self.user.username if username is None else username + + params = { + 'course_details': { + 'course_id': course_id + }, + 'user': username + } + if email_opt_in is not None: + params['email_opt_in'] = email_opt_in + if as_server: + resp = self.client.post(reverse('courseenrollments'), params, format='json', **{'HTTP_X_EDX_API_KEY': self.API_KEY}) + else: + resp = self.client.post(reverse('courseenrollments'), params, format='json') + + self.assertEqual(resp.status_code, expected_status) + + if expected_status == status.HTTP_200_OK: + data = json.loads(resp.content) + self.assertEqual(course_id, data['course_details']['course_id']) + self.assertEqual('honor', data['mode']) + self.assertTrue(data['is_active']) + return resp + @unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms') class EnrollmentEmbargoTest(UrlResetMixin, ModuleStoreTestCase): diff --git a/common/djangoapps/enrollment/views.py b/common/djangoapps/enrollment/views.py index 37359a6145..bee302c4a4 100644 --- a/common/djangoapps/enrollment/views.py +++ b/common/djangoapps/enrollment/views.py @@ -8,6 +8,7 @@ from django.conf import settings from opaque_keys import InvalidKeyError from opaque_keys.edx.locator import CourseLocator from openedx.core.djangoapps.user_api import api as user_api +from openedx.core.lib.api.permissions import ApiKeyHeaderPermission, ApiKeyHeaderPermissionIsAuthenticated from rest_framework import status from rest_framework import permissions from rest_framework.response import Response @@ -30,8 +31,27 @@ class EnrollmentUserThrottle(UserRateThrottle): rate = '50/second' +class ApiKeyPermissionMixIn(object): + """ + This mixin is used to provide a convenience function for doing individual permission checks + for the presence of API keys. + """ + def has_api_key_permissions(self, request): + """ + Checks to see if the request was made by a server with an API key. + + Args: + request (Request): the request being made into the view + + Return: + True if the request has been made with a valid API key + False otherwise + """ + return ApiKeyHeaderPermission().has_permission(request, self) + + @can_disable_rate_limit -class EnrollmentView(APIView): +class EnrollmentView(APIView, ApiKeyPermissionMixIn): """ **Use Cases** @@ -73,7 +93,7 @@ class EnrollmentView(APIView): """ authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser - permission_classes = permissions.IsAuthenticated, + permission_classes = ApiKeyHeaderPermissionIsAuthenticated, throttle_classes = EnrollmentUserThrottle, def get(self, request, course_id=None, user=None): @@ -94,7 +114,7 @@ class EnrollmentView(APIView): """ user = user if user else request.user.username - if request.user.username != user: + if request.user.username != user and not self.has_api_key_permissions(request): # Return a 404 instead of a 403 (Unauthorized). If one user is looking up # other users, do not let them deduce the existence of an enrollment. return Response(status=status.HTTP_404_NOT_FOUND) @@ -182,7 +202,7 @@ class EnrollmentCourseDetailView(APIView): @can_disable_rate_limit -class EnrollmentListView(APIView): +class EnrollmentListView(APIView, ApiKeyPermissionMixIn): """ **Use Cases** @@ -245,7 +265,7 @@ class EnrollmentListView(APIView): """ authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser - permission_classes = permissions.IsAuthenticated, + permission_classes = ApiKeyHeaderPermissionIsAuthenticated, throttle_classes = EnrollmentUserThrottle, def get(self, request): @@ -253,7 +273,7 @@ class EnrollmentListView(APIView): Gets a list of all course enrollments for the currently logged in user. """ user = request.GET.get('user', request.user.username) - if request.user.username != user: + if request.user.username != user and not self.has_api_key_permissions(request): # Return a 404 instead of a 403 (Unauthorized). If one user is looking up # other users, do not let them deduce the existence of an enrollment. return Response(status=status.HTTP_404_NOT_FOUND) @@ -276,7 +296,7 @@ class EnrollmentListView(APIView): user = request.DATA.get('user', request.user.username) if not user: user = request.user.username - if user != request.user.username: + if user != request.user.username and not self.has_api_key_permissions(request): # Return a 404 instead of a 403 (Unauthorized). If one user is looking up # other users, do not let them deduce the existence of an enrollment. return Response(status=status.HTTP_404_NOT_FOUND) diff --git a/openedx/core/lib/api/permissions.py b/openedx/core/lib/api/permissions.py index 2b7c122665..129ae7dee2 100644 --- a/openedx/core/lib/api/permissions.py +++ b/openedx/core/lib/api/permissions.py @@ -21,6 +21,19 @@ class ApiKeyHeaderPermission(permissions.BasePermission): ) +class ApiKeyHeaderPermissionIsAuthenticated(ApiKeyHeaderPermission, permissions.IsAuthenticated): + """ + Allow someone to access the view if they have the API key OR they are authenticated. + + See ApiKeyHeaderPermission for more information how the API key portion is implemented. + """ + def has_permission(self, request, view): + #TODO We can optimize this later on when we know which of these methods is used more often. + api_permissions = ApiKeyHeaderPermission.has_permission(self, request, view) + is_authenticated_permissions = permissions.IsAuthenticated.has_permission(self, request, view) + return api_permissions or is_authenticated_permissions + + class IsAuthenticatedOrDebug(permissions.BasePermission): """ Allows access only to authenticated users, or anyone if debug mode is enabled.