diff --git a/lms/urls.py b/lms/urls.py
index 529f6591e6..02eeae6fc7 100644
--- a/lms/urls.py
+++ b/lms/urls.py
@@ -98,6 +98,7 @@ urlpatterns = (
url(r'^api/val/v0/', include('edxval.urls')),
url(r'^api/commerce/', include('commerce.api.urls', namespace='commerce_api')),
+ url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
)
if settings.FEATURES["ENABLE_COMBINED_LOGIN_REGISTRATION"]:
@@ -115,12 +116,6 @@ else:
url(r'^register$', 'student.views.register_user', name="register_user"),
)
-if settings.FEATURES.get("ENABLE_CREDIT_API"):
- # Credit API end-points
- urlpatterns += (
- url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
- )
-
if settings.FEATURES["ENABLE_MOBILE_REST_API"]:
urlpatterns += (
url(r'^api/mobile/v0.5/', include('mobile_api.urls')),
diff --git a/openedx/core/djangoapps/credit/api/eligibility.py b/openedx/core/djangoapps/credit/api/eligibility.py
index fd9c2bd826..9ad0abff9e 100644
--- a/openedx/core/djangoapps/credit/api/eligibility.py
+++ b/openedx/core/djangoapps/credit/api/eligibility.py
@@ -5,6 +5,8 @@ whether a user has satisfied those requirements.
import logging
+from opaque_keys.edx.keys import CourseKey
+
from openedx.core.djangoapps.credit.exceptions import InvalidCreditRequirements, InvalidCreditCourse
from openedx.core.djangoapps.credit.email_utils import send_credit_notifications
from openedx.core.djangoapps.credit.models import (
@@ -14,8 +16,7 @@ from openedx.core.djangoapps.credit.models import (
CreditEligibility,
)
-from opaque_keys.edx.keys import CourseKey
-
+# TODO: Cleanup this mess! ECOM-2908
log = logging.getLogger(__name__)
@@ -246,8 +247,7 @@ def set_credit_requirement_status(username, course_key, req_namespace, req_name,
# Find the requirement we're trying to set
req_to_update = next((
req for req in reqs
- if req.namespace == req_namespace
- and req.name == req_name
+ if req.namespace == req_namespace and req.name == req_name
), None)
# If we can't find the requirement, then the most likely explanation
diff --git a/openedx/core/djangoapps/credit/api/provider.py b/openedx/core/djangoapps/credit/api/provider.py
index ef2acc36c5..50dcd54be5 100644
--- a/openedx/core/djangoapps/credit/api/provider.py
+++ b/openedx/core/djangoapps/credit/api/provider.py
@@ -4,12 +4,12 @@ API for initiating and tracking requests for credit from a provider.
import datetime
import logging
-import pytz
import uuid
+import pytz
from django.db import transaction
-from lms.djangoapps.django_comment_client.utils import JsonResponse
+from lms.djangoapps.django_comment_client.utils import JsonResponse
from openedx.core.djangoapps.credit.exceptions import (
UserIsNotEligible,
CreditProviderNotConfigured,
@@ -28,6 +28,8 @@ from student.models import User
from util.date_utils import to_timestamp
+# TODO: Cleanup this mess! ECOM-2908
+
log = logging.getLogger(__name__)
@@ -257,12 +259,10 @@ def create_credit_request(course_key, provider_id, username):
final_grade = unicode(final_grade)
except (CreditRequirementStatus.DoesNotExist, TypeError, KeyError):
- log.exception(
- "Could not retrieve final grade from the credit eligibility table "
- "for user %s in course %s.",
- user.id, course_key
- )
- raise UserIsNotEligible
+ msg = 'Could not retrieve final grade from the credit eligibility table for ' \
+ 'user [{user_id}] in course [{course_key}].'.format(user_id=user.id, course_key=course_key)
+ log.exception(msg)
+ raise UserIsNotEligible(msg)
parameters = {
"request_uuid": credit_request.uuid,
diff --git a/openedx/core/djangoapps/credit/exceptions.py b/openedx/core/djangoapps/credit/exceptions.py
index cffa47e7d1..04d58c8c30 100644
--- a/openedx/core/djangoapps/credit/exceptions.py
+++ b/openedx/core/djangoapps/credit/exceptions.py
@@ -1,4 +1,10 @@
"""Exceptions raised by the credit API. """
+from __future__ import unicode_literals
+from django.utils.translation import ugettext_lazy as _
+from rest_framework import status
+from rest_framework.exceptions import APIException
+
+# TODO: Cleanup this mess! ECOM-2908
class CreditApiBadRequest(Exception):
@@ -56,3 +62,25 @@ class InvalidCreditStatus(CreditApiBadRequest):
The status is not either "approved" or "rejected".
"""
pass
+
+
+class InvalidCreditRequest(APIException):
+ """ API request is invalid. """
+ status_code = status.HTTP_400_BAD_REQUEST
+
+
+class UserNotEligibleException(InvalidCreditRequest):
+ """ User not eligible for credit for a given course. """
+
+ def __init__(self, course_key, username):
+ detail = _('[{username}] is not eligible for credit for [{course_key}].').format(username=username,
+ course_key=course_key)
+ super(UserNotEligibleException, self).__init__(detail)
+
+
+class InvalidCourseKey(InvalidCreditRequest):
+ """ Course key is invalid. """
+
+ def __init__(self, course_key):
+ detail = _('[{course_key}] is not a valid course key.').format(course_key=course_key)
+ super(InvalidCourseKey, self).__init__(detail)
diff --git a/openedx/core/djangoapps/credit/models.py b/openedx/core/djangoapps/credit/models.py
index 21a8d29263..7cde41b58f 100644
--- a/openedx/core/djangoapps/credit/models.py
+++ b/openedx/core/djangoapps/credit/models.py
@@ -25,6 +25,7 @@ from xmodule_django.models import CourseKeyField
from django.utils.translation import ugettext_lazy
+CREDIT_PROVIDER_ID_REGEX = r"[a-z,A-Z,0-9,\-]+"
log = logging.getLogger(__name__)
@@ -42,7 +43,7 @@ class CreditProvider(TimeStampedModel):
unique=True,
validators=[
RegexValidator(
- regex=r"^[a-z,A-Z,0-9,\-]+$",
+ regex=CREDIT_PROVIDER_ID_REGEX,
message="Only alphanumeric characters and hyphens (-) are allowed",
code="invalid_provider_id",
)
@@ -498,10 +499,7 @@ def default_deadline_for_credit_eligibility(): # pylint: disable=invalid-name
class CreditEligibility(TimeStampedModel):
- """
- A record of a user's eligibility for credit from a specific credit
- provider for a specific course.
- """
+ """ A record of a user's eligibility for credit for a specific course. """
username = models.CharField(max_length=255, db_index=True)
course = models.ForeignKey(CreditCourse, related_name="eligibilities")
diff --git a/openedx/core/djangoapps/credit/serializers.py b/openedx/core/djangoapps/credit/serializers.py
index 216aa1fa05..d2f8a07720 100644
--- a/openedx/core/djangoapps/credit/serializers.py
+++ b/openedx/core/djangoapps/credit/serializers.py
@@ -1,16 +1,25 @@
""" Credit API Serializers """
-from rest_framework import serializers
+from __future__ import unicode_literals
+import datetime
+import logging
-from opaque_keys.edx.keys import CourseKey
+from django.conf import settings
from opaque_keys import InvalidKeyError
-from openedx.core.djangoapps.credit.models import CreditCourse
+from opaque_keys.edx.keys import CourseKey
+import pytz
+from rest_framework import serializers
+from rest_framework.exceptions import PermissionDenied
+
+from openedx.core.djangoapps.credit.models import CreditCourse, CreditProvider, CreditEligibility, CreditRequest
+from openedx.core.djangoapps.credit.signature import get_shared_secret_key, signature
+from util.date_utils import from_timestamp
+
+log = logging.getLogger(__name__)
class CourseKeyField(serializers.Field):
- """
- Serializer field for a model CourseKey field.
- """
+ """ Serializer field for a model CourseKey field. """
def to_representation(self, data):
"""Convert a course key to unicode. """
@@ -32,3 +41,81 @@ class CreditCourseSerializer(serializers.ModelSerializer):
class Meta(object):
model = CreditCourse
exclude = ('id',)
+
+
+class CreditProviderSerializer(serializers.ModelSerializer):
+ """ CreditProvider """
+ id = serializers.CharField(source='provider_id') # pylint:disable=invalid-name
+ description = serializers.CharField(source='provider_description')
+ status_url = serializers.URLField(source='provider_status_url')
+ url = serializers.URLField(source='provider_url')
+
+ class Meta(object):
+ model = CreditProvider
+ fields = ('id', 'display_name', 'url', 'status_url', 'description', 'enable_integration',
+ 'fulfillment_instructions', 'thumbnail_url',)
+
+
+class CreditEligibilitySerializer(serializers.ModelSerializer):
+ """ CreditEligibility serializer. """
+ course_key = serializers.SerializerMethodField()
+
+ def get_course_key(self, obj):
+ """ Returns the course key associated with the course. """
+ return unicode(obj.course.course_key)
+
+ class Meta(object):
+ model = CreditEligibility
+ fields = ('username', 'course_key', 'deadline',)
+
+
+class CreditProviderCallbackSerializer(serializers.Serializer): # pylint:disable=abstract-method
+ """
+ Serializer for input to the CreditProviderCallback view.
+
+ This is used solely for validating the input.
+ """
+ request_uuid = serializers.CharField(required=True)
+ status = serializers.ChoiceField(required=True, choices=CreditRequest.REQUEST_STATUS_CHOICES)
+ timestamp = serializers.IntegerField(required=True)
+ signature = serializers.CharField(required=True)
+
+ def __init__(self, **kwargs):
+ self.provider = kwargs.pop('provider', None)
+ super(CreditProviderCallbackSerializer, self).__init__(**kwargs)
+
+ def validate_timestamp(self, value):
+ """ Ensure the request has been received in a timely manner. """
+ date_time = from_timestamp(value)
+
+ # Ensure we converted the timestamp to a datetime
+ if not date_time:
+ msg = '[{}] is not a valid timestamp'.format(value)
+ log.warning(msg)
+ raise serializers.ValidationError(msg)
+
+ elapsed = (datetime.datetime.now(pytz.UTC) - date_time).total_seconds()
+ if elapsed > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
+ msg = '[{value}] is too far in the past (over [{elapsed}] seconds).'.format(value=value, elapsed=elapsed)
+ log.warning(msg)
+ raise serializers.ValidationError(msg)
+
+ return value
+
+ def validate_signature(self, value):
+ """ Validate the signature and ensure the provider is setup properly. """
+ provider_id = self.provider.provider_id
+ secret_key = get_shared_secret_key(provider_id)
+ if secret_key is None:
+ msg = 'Could not retrieve secret key for credit provider [{}]. ' \
+ 'Unable to validate requests from provider.'.format(provider_id)
+ log.error(msg)
+ raise PermissionDenied(msg)
+
+ data = self.initial_data
+ actual_signature = data["signature"]
+ if signature(data, secret_key) != actual_signature:
+ msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id)
+ raise PermissionDenied(msg)
+
+ return value
diff --git a/openedx/core/djangoapps/credit/signature.py b/openedx/core/djangoapps/credit/signature.py
index ce5931a0b8..343a3c833b 100644
--- a/openedx/core/djangoapps/credit/signature.py
+++ b/openedx/core/djangoapps/credit/signature.py
@@ -59,5 +59,5 @@ def signature(params, shared_secret):
for key in sorted(params.keys())
if key != u"signature"
])
- hasher = hmac.new(shared_secret, encoded_params.encode('utf-8'), hashlib.sha256)
+ hasher = hmac.new(shared_secret.encode('utf-8'), encoded_params.encode('utf-8'), hashlib.sha256)
return hasher.hexdigest()
diff --git a/openedx/core/djangoapps/credit/tasks.py b/openedx/core/djangoapps/credit/tasks.py
index 654aaee98f..a963e4798e 100644
--- a/openedx/core/djangoapps/credit/tasks.py
+++ b/openedx/core/djangoapps/credit/tasks.py
@@ -2,13 +2,9 @@
This file contains celery tasks for credit course views.
"""
-import datetime
-from pytz import UTC
-
-from django.conf import settings
-
from celery import task
from celery.utils.log import get_task_logger
+from django.conf import settings
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey
diff --git a/openedx/core/djangoapps/credit/tests/factories.py b/openedx/core/djangoapps/credit/tests/factories.py
new file mode 100644
index 0000000000..ae6aa8398d
--- /dev/null
+++ b/openedx/core/djangoapps/credit/tests/factories.py
@@ -0,0 +1,72 @@
+# pylint:disable=missing-docstring,no-member
+import datetime
+import json
+import uuid # pylint:disable=unused-import
+
+from django.contrib.auth.models import User
+import factory
+from factory.fuzzy import FuzzyText
+import pytz
+
+from openedx.core.djangoapps.credit.models import CreditProvider, CreditEligibility, CreditCourse, CreditRequest
+from util.date_utils import to_timestamp
+
+
+class CreditCourseFactory(factory.DjangoModelFactory):
+ class Meta(object):
+ model = CreditCourse
+
+ course_key = FuzzyText(prefix='fake.org/', suffix='/fake.run')
+ enabled = True
+
+
+class CreditProviderFactory(factory.DjangoModelFactory):
+ class Meta(object):
+ model = CreditProvider
+
+ provider_id = FuzzyText(length=5)
+ provider_url = FuzzyText(prefix='http://')
+
+
+class CreditEligibilityFactory(factory.DjangoModelFactory):
+ class Meta(object):
+ model = CreditEligibility
+
+ course = factory.SubFactory(CreditCourseFactory)
+
+
+class CreditRequestFactory(factory.DjangoModelFactory):
+ class Meta(object):
+ model = CreditRequest
+
+ uuid = factory.LazyAttribute(lambda o: uuid.uuid4().hex) # pylint: disable=undefined-variable
+
+ # pylint: disable=access-member-before-definition,attribute-defined-outside-init,no-self-argument,unused-argument
+ @factory.post_generation
+ def post(obj, create, extracted, **kwargs):
+ """
+ Post-generation handler.
+
+ Sets up parameters field.
+ """
+ if not obj.parameters:
+ course_key = obj.course.course_key
+ user = User.objects.get(username=obj.username)
+ user_profile = user.profile
+
+ # pylint:disable=access-member-before-definition
+ obj.parameters = json.dumps({
+ "request_uuid": obj.uuid,
+ "timestamp": to_timestamp(datetime.datetime.now(pytz.UTC)),
+ "course_org": course_key.org,
+ "course_num": course_key.course,
+ "course_run": course_key.run,
+ "final_grade": '0.96',
+ "user_username": user.username,
+ "user_email": user.email,
+ "user_full_name": user_profile.name,
+ "user_mailing_address": "",
+ "user_country": user_profile.country.code or "",
+ })
+
+ obj.save()
diff --git a/openedx/core/djangoapps/credit/tests/test_api.py b/openedx/core/djangoapps/credit/tests/test_api.py
index 5328bd03f6..f8bdfda56f 100644
--- a/openedx/core/djangoapps/credit/tests/test_api.py
+++ b/openedx/core/djangoapps/credit/tests/test_api.py
@@ -2,17 +2,13 @@
Tests for the API functions in the credit app.
"""
import datetime
-import json
import unittest
import ddt
from django.conf import settings
from django.core import mail
-from django.test import TestCase
from django.test.utils import override_settings
from django.db import connection, transaction
-from django.core.urlresolvers import reverse, NoReverseMatch
-from mock import patch
from opaque_keys.edx.keys import CourseKey
import pytz
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
@@ -39,8 +35,6 @@ from student.tests.factories import UserFactory
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
-from util.testing import UrlResetMixin
-
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY,
@@ -880,119 +874,3 @@ class CreditProviderIntegrationApiTests(CreditApiTestBase):
"""Check the user's credit status. """
statuses = api.get_credit_requests_for_user(self.USER_INFO["username"])
self.assertEqual(statuses[0]["status"], expected_status)
-
-
-class CreditApiFeatureFlagTest(UrlResetMixin, TestCase):
- """
- Base class to test the credit api urls.
- """
-
- def setUp(self, **kwargs):
- enable_credit_api = kwargs.get('enable_credit_api', False)
- with patch.dict('django.conf.settings.FEATURES', {'ENABLE_CREDIT_API': enable_credit_api}):
- super(CreditApiFeatureFlagTest, self).setUp('lms.urls')
-
-
-@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
-class CreditApiFeatureFlagDisabledTests(CreditApiFeatureFlagTest):
- """
- Test Python API for credit provider api with feature flag
- 'ENABLE_CREDIT_API' disabled.
- """
- PROVIDER_ID = "hogwarts"
-
- def setUp(self):
- super(CreditApiFeatureFlagDisabledTests, self).setUp(enable_credit_api=False)
-
- def test_get_credit_provider_details(self):
- """
- Test that 'get_provider_info' api url not found.
- """
- with self.assertRaises(NoReverseMatch):
- reverse('credit:get_provider_info', args=[self.PROVIDER_ID])
-
-
-@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
-class CreditApiFeatureFlagEnabledTests(CreditApiFeatureFlagTest, CreditApiTestBase):
- """
- Test Python API for credit provider api with feature flag
- 'ENABLE_CREDIT_API' enabled.
- """
- USER_INFO = {
- "username": "bob",
- "email": "bob@example.com",
- "full_name": "Bob",
- "mailing_address": "123 Fake Street, Cambridge MA",
- "country": "US",
- }
-
- FINAL_GRADE = 0.95
-
- def setUp(self):
- super(CreditApiFeatureFlagEnabledTests, self).setUp(enable_credit_api=True)
- self.user = UserFactory(
- username=self.USER_INFO['username'],
- email=self.USER_INFO['email'],
- )
-
- self.user.profile.name = self.USER_INFO['full_name']
- self.user.profile.mailing_address = self.USER_INFO['mailing_address']
- self.user.profile.country = self.USER_INFO['country']
- self.user.profile.save()
-
- # By default, configure the database so that there is a single
- # credit requirement that the user has satisfied (minimum grade)
- self._configure_credit()
-
- def test_get_credit_provider_details(self):
- """Test that credit api method 'test_get_credit_provider_details'
- returns dictionary data related to provided credit provider.
- """
- expected_result = {
- "provider_id": self.PROVIDER_ID,
- "display_name": self.PROVIDER_NAME,
- "provider_url": self.PROVIDER_URL,
- "provider_status_url": self.PROVIDER_STATUS_URL,
- "provider_description": self.PROVIDER_DESCRIPTION,
- "enable_integration": self.ENABLE_INTEGRATION,
- "fulfillment_instructions": self.FULFILLMENT_INSTRUCTIONS,
- "thumbnail_url": self.THUMBNAIL_URL
- }
- path = reverse('credit:get_provider_info', kwargs={'provider_id': self.PROVIDER_ID})
- result = self.client.get(path)
- result = json.loads(result.content)
- self.assertEqual(result, expected_result)
-
- # now test that user gets empty dict for non existent credit provider
- path = reverse('credit:get_provider_info', kwargs={'provider_id': 'fake_provider_id'})
- result = self.client.get(path)
- result = json.loads(result.content)
- self.assertEqual(result, {})
-
- def _configure_credit(self):
- """
- Configure a credit course and its requirements.
-
- By default, add a single requirement (minimum grade)
- that the user has satisfied.
-
- """
- credit_course = self.add_credit_course()
- requirement = CreditRequirement.objects.create(
- course=credit_course,
- namespace="grade",
- name="grade",
- active=True
- )
- status = CreditRequirementStatus.objects.create(
- username=self.USER_INFO["username"],
- requirement=requirement,
- )
- status.status = "satisfied"
- status.reason = {"final_grade": self.FINAL_GRADE}
- status.save()
-
- CreditEligibility.objects.create(
- username=self.USER_INFO['username'],
- course=CreditCourse.objects.get(course_key=self.course_key)
- )
diff --git a/openedx/core/djangoapps/credit/tests/test_models.py b/openedx/core/djangoapps/credit/tests/test_models.py
index 9f0f14d160..733876f198 100644
--- a/openedx/core/djangoapps/credit/tests/test_models.py
+++ b/openedx/core/djangoapps/credit/tests/test_models.py
@@ -5,7 +5,6 @@ Tests for credit course models.
import ddt
from django.test import TestCase
-
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.credit.models import CreditCourse, CreditRequirement
@@ -17,7 +16,7 @@ class CreditEligibilityModelTests(TestCase):
Tests for credit models used to track credit eligibility.
"""
- def setUp(self, **kwargs):
+ def setUp(self):
super(CreditEligibilityModelTests, self).setUp()
self.course_key = CourseKey.from_string("edX/DemoX/Demo_Course")
diff --git a/openedx/core/djangoapps/credit/tests/test_serializers.py b/openedx/core/djangoapps/credit/tests/test_serializers.py
new file mode 100644
index 0000000000..9526627ac2
--- /dev/null
+++ b/openedx/core/djangoapps/credit/tests/test_serializers.py
@@ -0,0 +1,46 @@
+""" Tests for Credit API serializers. """
+
+# pylint: disable=no-member
+from __future__ import unicode_literals
+
+from django.test import TestCase
+
+from openedx.core.djangoapps.credit import serializers
+from openedx.core.djangoapps.credit.tests.factories import CreditProviderFactory, CreditEligibilityFactory
+from student.tests.factories import UserFactory
+
+
+class CreditProviderSerializerTests(TestCase):
+ """ CreditProviderSerializer tests. """
+
+ def test_data(self):
+ """ Verify the correct fields are serialized. """
+ provider = CreditProviderFactory(active=False)
+ serializer = serializers.CreditProviderSerializer(provider)
+ expected = {
+ 'id': provider.provider_id,
+ 'display_name': provider.display_name,
+ 'url': provider.provider_url,
+ 'status_url': provider.provider_status_url,
+ 'description': provider.provider_description,
+ 'enable_integration': provider.enable_integration,
+ 'fulfillment_instructions': provider.fulfillment_instructions,
+ 'thumbnail_url': provider.thumbnail_url,
+ }
+ self.assertDictEqual(serializer.data, expected)
+
+
+class CreditEligibilitySerializerTests(TestCase):
+ """ CreditEligibilitySerializer tests. """
+
+ def test_data(self):
+ """ Verify the correct fields are serialized. """
+ user = UserFactory()
+ eligibility = CreditEligibilityFactory(username=user.username)
+ serializer = serializers.CreditEligibilitySerializer(eligibility)
+ expected = {
+ 'course_key': unicode(eligibility.course.course_key),
+ 'deadline': eligibility.deadline.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
+ 'username': user.username,
+ }
+ self.assertDictEqual(serializer.data, expected)
diff --git a/openedx/core/djangoapps/credit/tests/test_views.py b/openedx/core/djangoapps/credit/tests/test_views.py
index c7e497d878..f687faea09 100644
--- a/openedx/core/djangoapps/credit/tests/test_views.py
+++ b/openedx/core/djangoapps/credit/tests/test_views.py
@@ -1,6 +1,10 @@
"""
Tests for credit app views.
"""
+
+# pylint: disable=no-member
+
+from __future__ import unicode_literals
import datetime
import json
import unittest
@@ -10,397 +14,98 @@ from django.conf import settings
from django.core.urlresolvers import reverse
from django.test import TestCase, Client
from django.test.utils import override_settings
-from mock import patch
from oauth2_provider.tests.factories import AccessTokenFactory, ClientFactory
from opaque_keys.edx.keys import CourseKey
import pytz
-from student.tests.factories import UserFactory
-from util.date_utils import to_timestamp
-from util.testing import UrlResetMixin
-from openedx.core.djangoapps.credit import api
from openedx.core.djangoapps.credit.signature import signature
+from openedx.core.djangoapps.credit.serializers import CreditProviderSerializer, CreditEligibilitySerializer
+from openedx.core.djangoapps.credit.tests.factories import (
+ CreditProviderFactory,
+ CreditEligibilityFactory,
+ CreditCourseFactory, CreditRequestFactory)
+from student.tests.factories import UserFactory, AdminFactory
from openedx.core.djangoapps.credit.models import (
CreditCourse,
- CreditProvider,
- CreditRequirement,
- CreditRequirementStatus,
- CreditEligibility,
- CreditRequest,
-)
+ CreditProvider, CreditRequest, CreditRequirement, CreditRequirementStatus)
+from util.date_utils import to_timestamp
JSON = 'application/json'
-TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
+
+
+class ApiTestCaseMixin(object):
+ """ Mixin to aid with API testing. """
+
+ def assert_error_response(self, response, msg, status_code=400):
+ """ Validate the response's status and detail message. """
+ self.assertEqual(response.status_code, status_code)
+ self.assertDictEqual(response.data, {'detail': msg})
+
+
+class UserMixin(object):
+ """ Test mixin that creates, and authenticates, a new user for every test. """
+ password = 'password'
+ list_path = None
+
+ def setUp(self):
+ super(UserMixin, self).setUp()
+
+ # This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
+ if self.list_path:
+ self.path = reverse(self.list_path)
+
+ # Create a user and login, so that we can use session auth for the
+ # tests that aren't specifically testing authentication or authorization.
+ self.user = UserFactory(password=self.password, is_staff=True)
+ self.client.login(username=self.user.username, password=self.password)
+
+
+class AuthMixin(object):
+ """ Test mixin with methods to test OAuth 2.0 and session authentication. """
+
+ def test_authentication_required(self):
+ """ Verify the endpoint requires authentication. """
+ self.client.logout()
+ response = self.client.get(self.path)
+ self.assertEqual(response.status_code, 401)
+
+ def test_oauth(self):
+ """ Verify the endpoint supports authentication via OAuth 2.0. """
+ access_token = AccessTokenFactory(user=self.user, client=ClientFactory()).token
+ headers = {
+ 'HTTP_AUTHORIZATION': 'Bearer ' + access_token
+ }
+ self.client.logout()
+ response = self.client.get(self.path, **headers)
+ self.assertEqual(response.status_code, 200)
+
+ def test_session_auth(self):
+ """ Verify the endpoint supports authentication via session. """
+ self.client.logout()
+ self.client.login(username=self.user.username, password=self.password)
+ response = self.client.get(self.path)
+ self.assertEqual(response.status_code, 200)
@ddt.ddt
-@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
- "hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY
-})
-@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
-class CreditProviderViewTests(UrlResetMixin, TestCase):
- """
- Tests for HTTP end-points used to issue requests to credit providers
- and receive responses approving or denying requests.
- """
+class ReadOnlyMixin(object):
+ """ Test mixin for read-only API endpoints. """
- USERNAME = "ron"
- USER_FULL_NAME = "Ron Weasley"
- PASSWORD = "password"
- PROVIDER_ID = "hogwarts"
- PROVIDER_URL = "https://credit.example.com/request"
- COURSE_KEY = CourseKey.from_string("edX/DemoX/Demo_Course")
- FINAL_GRADE = 0.95
-
- @patch.dict(settings.FEATURES, {"ENABLE_CREDIT_API": True})
- def setUp(self):
- """
- Configure a credit course.
- """
- super(CreditProviderViewTests, self).setUp()
-
- # Create the test user and log in
- self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
- self.user.profile.name = self.USER_FULL_NAME
- self.user.profile.save()
-
- success = self.client.login(username=self.USERNAME, password=self.PASSWORD)
- self.assertTrue(success, msg="Could not log in")
-
- # Enable the course for credit
- credit_course = CreditCourse.objects.create(
- course_key=self.COURSE_KEY,
- enabled=True,
- )
-
- # Configure a credit provider for the course
- CreditProvider.objects.create(
- provider_id=self.PROVIDER_ID,
- enable_integration=True,
- provider_url=self.PROVIDER_URL,
- )
-
- # Add a single credit requirement (final grade)
- requirement = CreditRequirement.objects.create(
- course=credit_course,
- namespace="grade",
- name="grade",
- )
-
- # Mark the user as having satisfied the requirement
- # and eligible for credit.
- CreditRequirementStatus.objects.create(
- username=self.USERNAME,
- requirement=requirement,
- status="satisfied",
- reason={"final_grade": self.FINAL_GRADE}
- )
- CreditEligibility.objects.create(
- username=self.USERNAME,
- course=credit_course,
- )
-
- def test_credit_request_and_response(self):
- # Initiate a request
- response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
- self.assertEqual(response.status_code, 200)
-
- # Check that the user's request status is pending
- requests = api.get_credit_requests_for_user(self.USERNAME)
- self.assertEqual(len(requests), 1)
- self.assertEqual(requests[0]["status"], "pending")
-
- # Check request parameters
- content = json.loads(response.content)
- self.assertEqual(content["url"], self.PROVIDER_URL)
- self.assertEqual(content["method"], "POST")
- self.assertEqual(len(content["parameters"]["request_uuid"]), 32)
- self.assertEqual(content["parameters"]["course_org"], "edX")
- self.assertEqual(content["parameters"]["course_num"], "DemoX")
- self.assertEqual(content["parameters"]["course_run"], "Demo_Course")
- self.assertEqual(content["parameters"]["final_grade"], unicode(self.FINAL_GRADE))
- self.assertEqual(content["parameters"]["user_username"], self.USERNAME)
- self.assertEqual(content["parameters"]["user_full_name"], self.USER_FULL_NAME)
- self.assertEqual(content["parameters"]["user_mailing_address"], "")
- self.assertEqual(content["parameters"]["user_country"], "")
-
- # The signature is going to change each test run because the request
- # is assigned a different UUID each time.
- # For this reason, we use the signature function directly
- # (the "signature" parameter will be ignored when calculating the signature).
- # Other unit tests verify that the signature function is working correctly.
- self.assertEqual(
- content["parameters"]["signature"],
- signature(content["parameters"], TEST_CREDIT_PROVIDER_SECRET_KEY)
- )
-
- # Simulate a response from the credit provider
- response = self._credit_provider_callback(
- content["parameters"]["request_uuid"],
- "approved"
- )
- self.assertEqual(response.status_code, 200)
-
- # Check that the user's status is approved
- requests = api.get_credit_requests_for_user(self.USERNAME)
- self.assertEqual(len(requests), 1)
- self.assertEqual(requests[0]["status"], "approved")
-
- def test_request_credit_anonymous_user(self):
- self.client.logout()
- response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
- self.assertEqual(response.status_code, 403)
-
- def test_request_credit_for_another_user(self):
- response = self._create_credit_request("another_user", self.COURSE_KEY)
- self.assertEqual(response.status_code, 403)
-
- @ddt.data(
- # Invalid JSON
- "{",
-
- # Missing required parameters
- json.dumps({"username": USERNAME}),
- json.dumps({"course_key": unicode(COURSE_KEY)}),
-
- # Invalid course key format
- json.dumps({"username": USERNAME, "course_key": "invalid"}),
- )
- def test_create_credit_request_invalid_parameters(self, request_data):
- url = reverse("credit:create_request", args=[self.PROVIDER_ID])
- response = self.client.post(url, data=request_data, content_type=JSON)
- self.assertEqual(response.status_code, 400)
-
- def test_credit_provider_callback_validates_signature(self):
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Simulate a callback from the credit provider with an invalid signature
- # Since the signature is invalid, we respond with a 403 Not Authorized.
- response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
- self.assertEqual(response.status_code, 403)
-
- def test_credit_provider_callback_validates_timestamp(self):
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Simulate a callback from the credit provider with a timestamp too far in the past
- # (slightly more than 15 minutes)
- # Since the message isn't timely, respond with a 403.
- timestamp = to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1))
- response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
- self.assertEqual(response.status_code, 403)
-
- def test_credit_provider_callback_handles_string_timestamp(self):
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Simulate a callback from the credit provider with a timestamp
- # encoded as a string instead of an integer.
- timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
- response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
- self.assertEqual(response.status_code, 200)
-
- def test_credit_provider_callback_is_idempotent(self):
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Initially, the status should be "pending"
- self._assert_request_status(request_uuid, "pending")
-
- # First call sets the status to approved
- self._credit_provider_callback(request_uuid, "approved")
- self._assert_request_status(request_uuid, "approved")
-
- # Second call succeeds as well; status is still approved
- self._credit_provider_callback(request_uuid, "approved")
- self._assert_request_status(request_uuid, "approved")
-
- @ddt.data(
- # Invalid JSON
- "{",
-
- # Not a dictionary
- "4",
-
- # Invalid timestamp format
- json.dumps({
- "request_uuid": "557168d0f7664fe59097106c67c3f847",
- "status": "approved",
- "timestamp": "invalid",
- "signature": "7685ae1c8f763597ee7ce526685c5ac24353317dbfe087f0ed32a699daf7dc63",
- }),
- )
- def test_credit_provider_callback_invalid_parameters(self, request_data):
- url = reverse("credit:provider_callback", args=[self.PROVIDER_ID])
- response = self.client.post(url, data=request_data, content_type=JSON)
- self.assertEqual(response.status_code, 400)
-
- def test_credit_provider_invalid_status(self):
- response = self._credit_provider_callback("557168d0f7664fe59097106c67c3f847", "invalid")
- self.assertEqual(response.status_code, 400)
-
- def test_credit_provider_key_not_configured(self):
- # Cannot initiate a request because we can't sign it
- with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
- response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
- self.assertEqual(response.status_code, 400)
-
- # Create the request with the secret key configured
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Callback from the provider is not authorized, because
- # the shared secret isn't configured.
- with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
- response = self._credit_provider_callback(request_uuid, "approved")
- self.assertEqual(response.status_code, 403)
-
- def test_request_associated_with_another_provider(self):
- other_provider_id = "other_provider"
- other_provider_secret_key = "1d01f067a5a54b0b8059f7095a7c636d"
-
- # Create an additional credit provider
- CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
-
- # Initiate a credit request with the first provider
- request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
-
- # Attempt to update the request status for a different provider
- with override_settings(CREDIT_PROVIDER_SECRET_KEYS={other_provider_id: other_provider_secret_key}):
- response = self._credit_provider_callback(
- request_uuid,
- "approved",
- provider_id=other_provider_id,
- secret_key=other_provider_secret_key,
- )
-
- # Response should be a 404 to avoid leaking request UUID values to other providers.
- self.assertEqual(response.status_code, 404)
-
- # Request status should still be "pending"
- self._assert_request_status(request_uuid, "pending")
-
- def _create_credit_request(self, username, course_key):
- """
- Initiate a request for credit.
- """
- url = reverse("credit:create_request", args=[self.PROVIDER_ID])
- return self.client.post(
- url,
- data=json.dumps({
- "username": username,
- "course_key": unicode(course_key),
- }),
- content_type=JSON,
- )
-
- def _create_credit_request_and_get_uuid(self, username, course_key):
- """
- Initiate a request for credit and return the request UUID.
- """
- response = self._create_credit_request(username, course_key)
- self.assertEqual(response.status_code, 200)
- return json.loads(response.content)["parameters"]["request_uuid"]
-
- def _credit_provider_callback(self, request_uuid, status, **kwargs):
- """
- Simulate a response from the credit provider approving
- or rejecting the credit request.
-
- Arguments:
- request_uuid (str): The UUID of the credit request.
- status (str): The status of the credit request.
-
- Keyword Arguments:
- provider_id (str): Identifier for the credit provider.
- secret_key (str): Shared secret key for signing messages.
- timestamp (datetime): Timestamp of the message.
- sig (str): Digital signature to use on messages.
-
- """
- provider_id = kwargs.get("provider_id", self.PROVIDER_ID)
- secret_key = kwargs.get("secret_key", TEST_CREDIT_PROVIDER_SECRET_KEY)
- timestamp = kwargs.get("timestamp", to_timestamp(datetime.datetime.now(pytz.UTC)))
-
- url = reverse("credit:provider_callback", args=[provider_id])
-
- parameters = {
- "request_uuid": request_uuid,
- "status": status,
- "timestamp": timestamp,
- }
- parameters["signature"] = kwargs.get("sig", signature(parameters, secret_key))
-
- return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
-
- def _assert_request_status(self, uuid, expected_status):
- """
- Check the status of a credit request.
- """
- request = CreditRequest.objects.get(uuid=uuid)
- self.assertEqual(request.status, expected_status)
-
- def test_get_providers_detail(self):
- """Verify that the method 'get_provider_detail' returns provider with
- the given provide in 'provider_ids'.
- """
- url = reverse("credit:providers_detail") + "?provider_ids=hogwarts"
- response = self.client.get(url)
- expected = [
- {
- 'enable_integration': True,
- 'description': '',
- 'url': 'https://credit.example.com/request',
- 'status_url': '',
- 'thumbnail_url': '',
- 'fulfillment_instructions': None,
- 'display_name': '',
- 'id': 'hogwarts'
- }
- ]
-
- self.assertListEqual(json.loads(response.content), expected)
-
- def test_get_providers_with_multiple_provider_ids(self):
- """Test that the method 'get_provider_detail' returns multiple
- providers with given 'provider_ids' or when no provider in
- 'provider_ids' is given.
- """
- # Add another credit provider for the course
- CreditProvider.objects.create(
- provider_id='dummy_id',
- enable_integration=True,
- provider_url='https://example.com',
- )
-
- # verify that all the matching providers are returned when provider ids
- # are given in parameter 'provider_ids'
- url = reverse("credit:providers_detail") + "?provider_ids=hogwarts,dummy_id"
- response = self.client.get(url)
- self.assertEquals(len(json.loads(response.content)), 2)
-
- # verify that all providers are returned when no provider id in
- # parameter 'provider_ids' is provided
- url = reverse("credit:providers_detail")
- response = self.client.get(url)
- self.assertEquals(len(json.loads(response.content)), 2)
+ @ddt.data('delete', 'post', 'put')
+ def test_readonly(self, method):
+ """ Verify the viewset does not allow CreditProvider objects to be created or modified. """
+ response = getattr(self.client, method)(self.path)
+ self.assertEqual(response.status_code, 405)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
-class CreditCourseViewSetTests(TestCase):
+class CreditCourseViewSetTests(UserMixin, TestCase):
""" Tests for the CreditCourse endpoints.
GET/POST /api/v1/credit/creditcourse/
GET/PUT /api/v1/credit/creditcourse/:course_id/
"""
- password = 'password'
-
- def setUp(self):
- super(CreditCourseViewSetTests, self).setUp()
-
- # This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
- self.path = reverse('credit:creditcourse-list')
-
- # Create a user and login, so that we can use session auth for the
- # tests that aren't specifically testing authentication or authorization.
- user = UserFactory(password=self.password, is_staff=True)
- self.client.login(username=user.username, password=self.password)
+ list_path = 'credit:creditcourse-list'
def _serialize_credit_course(self, credit_course):
""" Serializes a CreditCourse to a Python dict. """
@@ -442,7 +147,7 @@ class CreditCourseViewSetTests(TestCase):
self.assertIn('CSRF', response.content)
# Retrieve a CSRF token
- response = client.get('/dashboard')
+ response = client.get('/')
csrf_token = response.cookies[settings.CSRF_COOKIE_NAME].value # pylint: disable=no-member
self.assertGreater(len(csrf_token), 0)
@@ -552,3 +257,410 @@ class CreditCourseViewSetTests(TestCase):
# Verify the data was persisted
credit_course = CreditCourse.objects.get(course_key=credit_course.course_key)
self.assertTrue(credit_course.enabled)
+
+
+@ddt.ddt
+@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
+class CreditProviderViewSetTests(ApiTestCaseMixin, ReadOnlyMixin, AuthMixin, UserMixin, TestCase):
+ """ Tests for CreditProviderViewSet. """
+ list_path = 'credit:creditprovider-list'
+
+ @classmethod
+ def setUpClass(cls):
+ super(CreditProviderViewSetTests, cls).setUpClass()
+ cls.bayside = CreditProviderFactory(provider_id='bayside')
+ cls.hogwarts = CreditProviderFactory(provider_id='hogwarts')
+ cls.starfleet = CreditProviderFactory(provider_id='starfleet')
+
+ def test_list(self):
+ """ Verify the endpoint returns a list of all CreditProvider objects. """
+ response = self.client.get(self.path)
+ self.assertEqual(response.status_code, 200)
+
+ expected = CreditProviderSerializer(CreditProvider.objects.all(), many=True).data
+ self.assertEqual(response.data, expected)
+
+ @ddt.data(
+ ('bayside',),
+ ('hogwarts', 'starfleet')
+ )
+ def test_list_filtering(self, provider_ids):
+ """ Verify the endpoint returns a list of all CreditProvider objects, filtered to contain only those objects
+ associated with the given IDs. """
+ url = '{}?provider_ids={}'.format(self.path, ','.join(provider_ids))
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ expected = CreditProviderSerializer(CreditProvider.objects.filter(provider_id__in=provider_ids),
+ many=True).data
+ self.assertEqual(response.data, expected)
+
+ def test_retrieve(self):
+ """ Verify the endpoint returns the details for a single CreditProvider. """
+ url = reverse('credit:creditprovider-detail', kwargs={'provider_id': self.bayside.provider_id})
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.data, CreditProviderSerializer(self.bayside).data)
+
+
+@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
+class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase):
+ """ Tests for CreditProviderRequestCreateView. """
+
+ @classmethod
+ def setUpClass(cls):
+ super(CreditProviderRequestCreateViewTests, cls).setUpClass()
+ cls.provider = CreditProviderFactory()
+
+ def setUp(self):
+ super(CreditProviderRequestCreateViewTests, self).setUp()
+ self.path = reverse('credit:create_request', kwargs={'provider_id': self.provider.provider_id})
+ self.eligibility = CreditEligibilityFactory(username=self.user.username)
+
+ def post_credit_request(self, username, course_id):
+ """ Create a credit request for the given user and course. """
+ data = {
+ 'username': username,
+ 'course_key': unicode(course_id)
+ }
+ return self.client.post(self.path, json.dumps(data), content_type=JSON)
+
+ def test_post_with_provider_integration(self):
+ """ Verify the endpoint can create a new credit request. """
+ username = self.user.username
+ course = self.eligibility.course
+ course_key = course.course_key
+ final_grade = 0.95
+
+ # Enable provider integration
+ self.provider.enable_integration = True
+ self.provider.save()
+
+ # Add a single credit requirement (final grade)
+ requirement = CreditRequirement.objects.create(
+ course=course,
+ namespace='grade',
+ name='grade',
+ )
+
+ # Mark the user as having satisfied the requirement and eligible for credit.
+ CreditRequirementStatus.objects.create(
+ username=username,
+ requirement=requirement,
+ status='satisfied',
+ reason={'final_grade': final_grade}
+ )
+
+ secret_key = 'secret'
+ with override_settings(CREDIT_PROVIDER_SECRET_KEYS={self.provider.provider_id: secret_key}):
+ response = self.post_credit_request(username, course_key)
+ self.assertEqual(response.status_code, 200)
+
+ # Check that the user's request status is pending
+ request = CreditRequest.objects.get(username=username, course__course_key=course_key)
+ self.assertEqual(request.status, 'pending')
+
+ # Check request parameters
+ content = json.loads(response.content)
+ parameters = content['parameters']
+
+ self.assertEqual(content['url'], self.provider.provider_url)
+ self.assertEqual(content['method'], 'POST')
+ self.assertEqual(len(parameters['request_uuid']), 32)
+ self.assertEqual(parameters['course_org'], course_key.org)
+ self.assertEqual(parameters['course_num'], course_key.course)
+ self.assertEqual(parameters['course_run'], course_key.run)
+ self.assertEqual(parameters['final_grade'], unicode(final_grade))
+ self.assertEqual(parameters['user_username'], username)
+ self.assertEqual(parameters['user_full_name'], self.user.get_full_name())
+ self.assertEqual(parameters['user_mailing_address'], '')
+ self.assertEqual(parameters['user_country'], '')
+
+ # The signature is going to change each test run because the request
+ # is assigned a different UUID each time.
+ # For this reason, we use the signature function directly
+ # (the "signature" parameter will be ignored when calculating the signature).
+ # Other unit tests verify that the signature function is working correctly.
+ self.assertEqual(parameters['signature'], signature(parameters, secret_key))
+
+ def test_post_invalid_provider(self):
+ """ Verify the endpoint returns HTTP 404 if the credit provider is not valid. """
+ path = reverse('credit:create_request', kwargs={'provider_id': 'fake'})
+ response = self.client.post(path, {})
+ self.assertEqual(response.status_code, 404)
+
+ def test_post_no_username(self):
+ """ Verify the endpoint returns HTTP 400 if no username is supplied. """
+ response = self.post_credit_request(None, 'a/b/c')
+ self.assert_error_response(response, 'A username must be specified.')
+
+ def test_post_invalid_course_key(self):
+ """ Verify the endpoint returns HTTP 400 if the course is not a valid course key. """
+ course_key = 'not-a-course-id'
+ response = self.post_credit_request(self.user.username, course_key)
+ self.assert_error_response(response, '[{}] is not a valid course key.'.format(course_key))
+
+ def test_post_user_not_eligible(self):
+ """ Verify the endpoint returns HTTP 400 if the user is not eligible for credit for the course. """
+ credit_course = CreditCourseFactory()
+ username = 'ineligible-user'
+ course_key = credit_course.course_key
+
+ response = self.post_credit_request(username, course_key)
+ msg = '[{username}] is not eligible for credit for [{course_key}].'.format(username=username,
+ course_key=course_key)
+ self.assert_error_response(response, msg)
+
+ def test_post_permissions_staff(self):
+ """ Verify staff users can create requests for any user. """
+ admin = AdminFactory(password=self.password)
+ self.client.logout()
+ self.client.login(username=admin.username, password=self.password)
+ response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
+ self.assertEqual(response.status_code, 200)
+
+ def test_post_other_user(self):
+ """ Verify non-staff users cannot create requests for other users. """
+ user = UserFactory(password=self.password)
+ self.client.logout()
+ self.client.login(username=user.username, password=self.password)
+ response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
+ self.assertEqual(response.status_code, 403)
+
+ def test_post_no_provider_integration(self):
+ """ Verify the endpoint returns the provider URL if provider integration is not enabled. """
+ response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
+ self.assertEqual(response.status_code, 200)
+ expected = {
+ 'url': self.provider.provider_url,
+ 'method': 'GET',
+ 'parameters': {},
+ }
+ self.assertEqual(response.data, expected)
+
+ def test_post_secret_key_not_set(self):
+ """ Verify the endpoint returns HTTP 400 if we attempt to create a
+ request for a provider with no secret key set. """
+ # Enable provider integration
+ self.provider.enable_integration = True
+ self.provider.save()
+
+ # Cannot initiate a request because we cannot sign it
+ with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
+ response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
+ self.assertEqual(response.status_code, 400)
+
+
+@ddt.ddt
+@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
+class CreditProviderCallbackViewTests(UserMixin, TestCase):
+ """ Tests for CreditProviderCallbackView. """
+
+ def setUp(self):
+ super(CreditProviderCallbackViewTests, self).setUp()
+
+ # Authentication should NOT be required for this endpoint.
+ self.client.logout()
+
+ self.provider = CreditProviderFactory()
+ self.path = reverse('credit:provider_callback', args=[self.provider.provider_id])
+ self.eligibility = CreditEligibilityFactory(username=self.user.username)
+
+ def _credit_provider_callback(self, request_uuid, status, **kwargs):
+ """
+ Simulate a response from the credit provider approving
+ or rejecting the credit request.
+
+ Arguments:
+ request_uuid (str): The UUID of the credit request.
+ status (str): The status of the credit request.
+
+ Keyword Arguments:
+ provider_id (str): Identifier for the credit provider.
+ secret_key (str): Shared secret key for signing messages.
+ timestamp (datetime): Timestamp of the message.
+ sig (str): Digital signature to use on messages.
+ keys (dict): Override for CREDIT_PROVIDER_SECRET_KEYS setting.
+
+ """
+ provider_id = kwargs.get('provider_id', self.provider.provider_id)
+ secret_key = kwargs.get('secret_key', '931433d583c84ca7ba41784bad3232e6')
+ timestamp = kwargs.get('timestamp', to_timestamp(datetime.datetime.now(pytz.UTC)))
+ keys = kwargs.get('keys', {self.provider.provider_id: secret_key})
+
+ url = reverse('credit:provider_callback', args=[provider_id])
+
+ parameters = {
+ 'request_uuid': request_uuid,
+ 'status': status,
+ 'timestamp': timestamp,
+ }
+ parameters['signature'] = kwargs.get('sig', signature(parameters, secret_key))
+
+ with override_settings(CREDIT_PROVIDER_SECRET_KEYS=keys):
+ return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
+
+ def _create_credit_request_and_get_uuid(self, username=None, course_key=None):
+ """ Initiate a request for credit and return the request UUID. """
+ username = username or self.user.username
+ course = CreditCourse.objects.get(course_key=course_key) if course_key else self.eligibility.course
+ credit_request = CreditRequestFactory(username=username, course=course, provider=self.provider)
+ return credit_request.uuid
+
+ def _assert_request_status(self, uuid, expected_status):
+ """ Check the status of a credit request. """
+ request = CreditRequest.objects.get(uuid=uuid)
+ self.assertEqual(request.status, expected_status)
+
+ def test_post_invalid_provider_id(self):
+ """ Verify the endpoint returns HTTP 404 if the provider does not exist. """
+ provider_id = 'fakey-provider'
+ self.assertFalse(CreditProvider.objects.filter(provider_id=provider_id).exists())
+
+ path = reverse('credit:provider_callback', args=[provider_id])
+ response = self.client.post(path, {})
+ self.assertEqual(response.status_code, 404)
+
+ def test_post_with_invalid_signature(self):
+ """ Verify the endpoint returns HTTP 403 if a request is received with an invalid signature. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+
+ # Simulate a callback from the credit provider with an invalid signature
+ # Since the signature is invalid, we respond with a 403 Not Authorized.
+ response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
+ self.assertEqual(response.status_code, 403)
+
+ @ddt.data(
+ to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1)),
+ 'invalid'
+ )
+ def test_post_with_invalid_timestamp(self, timestamp):
+ """ Verify HTTP 400 is returned for requests with an invalid timestamp. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+ response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
+ self.assertEqual(response.status_code, 400)
+
+ def test_post_with_string_timestamp(self):
+ """ Verify the endpoint supports timestamps transmitted as strings instead of integers. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+ timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
+ response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
+ self.assertEqual(response.status_code, 200)
+
+ def test_credit_provider_callback_is_idempotent(self):
+ """ Verify clients can make subsequent calls with the same status. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+
+ # Initially, the status should be "pending"
+ self._assert_request_status(request_uuid, "pending")
+
+ # First call sets the status to approved
+ self._credit_provider_callback(request_uuid, 'approved')
+ self._assert_request_status(request_uuid, "approved")
+
+ # Second call succeeds as well; status is still approved
+ self._credit_provider_callback(request_uuid, 'approved')
+ self._assert_request_status(request_uuid, "approved")
+
+ def test_credit_provider_invalid_status(self):
+ """ Verify requests with an invalid status value return HTTP 400. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+ response = self._credit_provider_callback(request_uuid, 'invalid')
+ self.assertEqual(response.status_code, 400)
+
+ def test_request_associated_with_another_provider(self):
+ """ Verify the endpoint returns HTTP 404 if a request is received for the incorrect provider. """
+ other_provider_id = 'other-provider'
+ other_provider_secret_key = '1d01f067a5a54b0b8059f7095a7c636d'
+
+ # Create an additional credit provider
+ CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
+
+ # Initiate a credit request with the first provider
+ request_uuid = self._create_credit_request_and_get_uuid()
+
+ # Attempt to update the request status for a different provider
+ response = self._credit_provider_callback(
+ request_uuid,
+ 'approved',
+ provider_id=other_provider_id,
+ secret_key=other_provider_secret_key,
+ keys={other_provider_id: other_provider_secret_key}
+ )
+
+ # Response should be a 404 to avoid leaking request UUID values to other providers.
+ self.assertEqual(response.status_code, 404)
+
+ # Request status should still be 'pending'
+ self._assert_request_status(request_uuid, 'pending')
+
+ def test_credit_provider_key_not_configured(self):
+ """ Verify the endpoint returns HTTP 403 if the provider has no key configured. """
+ request_uuid = self._create_credit_request_and_get_uuid()
+
+ # Callback from the provider is not authorized, because the shared secret isn't configured.
+ with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
+ response = self._credit_provider_callback(request_uuid, 'approved', keys={})
+ self.assertEqual(response.status_code, 403)
+
+
+@ddt.ddt
+@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
+class CreditEligibilityViewTests(AuthMixin, UserMixin, ReadOnlyMixin, TestCase):
+ """ Tests for CreditEligibilityView. """
+ view_name = 'credit:eligibility_details'
+
+ def setUp(self):
+ super(CreditEligibilityViewTests, self).setUp()
+ self.eligibility = CreditEligibilityFactory(username=self.user.username)
+ self.path = self.create_url(self.eligibility)
+
+ def create_url(self, eligibility):
+ """ Returns a URL that can be used to view eligibility data. """
+ return '{path}?username={username}&course_key={course_key}'.format(
+ path=reverse(self.view_name),
+ username=eligibility.username,
+ course_key=eligibility.course.course_key
+ )
+
+ def assert_valid_get_response(self, eligibility):
+ """ Ensure the endpoint returns the correct eligibility data. """
+ url = self.create_url(eligibility)
+ response = self.client.get(url)
+
+ self.assertEqual(response.status_code, 200)
+ self.assertListEqual(response.data, CreditEligibilitySerializer([eligibility], many=True).data)
+
+ def test_get(self):
+ """ Verify the endpoint returns eligibility information for the give user and course. """
+ self.assert_valid_get_response(self.eligibility)
+
+ def test_get_with_missing_parameters(self):
+ """ Verify the endpoint returns HTTP status 400 if either the username or course_key querystring argument
+ is not provided. """
+ response = self.client.get(reverse(self.view_name))
+ self.assertEqual(response.status_code, 400)
+ self.assertDictEqual(response.data,
+ {'detail': 'Both the course_key and username querystring parameters must be supplied.'})
+
+ def test_get_with_invalid_course_key(self):
+ """ Verify the endpoint returns HTTP status 400 if the provided course_key is not an actual CourseKey. """
+ url = '{}?username=edx&course_key=a'.format(reverse(self.view_name))
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 400)
+ self.assertDictEqual(response.data, {'detail': '[a] is not a valid course key.'})
+
+ def test_staff_can_view_all(self):
+ """ Verify that staff users can view eligibility data for all users. """
+ staff = AdminFactory(password=self.password)
+ self.client.logout()
+ self.client.login(username=staff.username, password=self.password)
+ self.assert_valid_get_response(self.eligibility)
+
+ def test_nonstaff_can_only_view_own_data(self):
+ """ Verify that non-staff users can only view their own eligibility data. """
+ user = UserFactory(password=self.password)
+ self.client.logout()
+ self.client.login(username=user.username, password=self.password)
+ response = self.client.get(self.path)
+ self.assertEqual(response.status_code, 403)
diff --git a/openedx/core/djangoapps/credit/urls.py b/openedx/core/djangoapps/credit/urls.py
index a5e92ffc88..1f495acb3a 100644
--- a/openedx/core/djangoapps/credit/urls.py
+++ b/openedx/core/djangoapps/credit/urls.py
@@ -3,47 +3,25 @@ URLs for the credit app.
"""
from django.conf.urls import patterns, url, include
-from openedx.core.djangoapps.credit import views, routers
-from openedx.core.djangoapps.credit.api.provider import get_credit_provider_info
+from openedx.core.djangoapps.credit import views, routers, models
-PROVIDER_ID_PATTERN = r'(?P[^/]+)'
+PROVIDER_ID_PATTERN = r'(?P{})'.format(models.CREDIT_PROVIDER_ID_REGEX)
+
+PROVIDER_URLS = patterns(
+ '',
+ url(r'^request/$', views.CreditProviderRequestCreateView.as_view(), name='create_request'),
+ url(r'^callback/?$', views.CreditProviderCallbackView.as_view(), name='provider_callback'),
+)
V1_URLS = patterns(
'',
-
- url(
- r'^providers/$',
- views.get_providers_detail,
- name='providers_detail'
- ),
-
- url(
- r'^providers/{provider_id}/$'.format(provider_id=PROVIDER_ID_PATTERN),
- get_credit_provider_info,
- name='get_provider_info'
- ),
-
- url(
- r'^providers/{provider_id}/request/$'.format(provider_id=PROVIDER_ID_PATTERN),
- views.create_credit_request,
- name='create_request'
- ),
-
- url(
- r'^providers/{provider_id}/callback/?$'.format(provider_id=PROVIDER_ID_PATTERN),
- views.credit_provider_callback,
- name='provider_callback'
- ),
-
- url(
- r'^eligibility/$',
- views.get_eligibility_for_user,
- name='eligibility_details'
- ),
+ url(r'^providers/{}/'.format(PROVIDER_ID_PATTERN), include(PROVIDER_URLS)),
+ url(r'^eligibility/$', views.CreditEligibilityView.as_view(), name='eligibility_details'),
)
router = routers.SimpleRouter() # pylint: disable=invalid-name
router.register(r'courses', views.CreditCourseViewSet)
+router.register(r'providers', views.CreditProviderViewSet)
V1_URLS += router.urls
urlpatterns = patterns(
diff --git a/openedx/core/djangoapps/credit/views.py b/openedx/core/djangoapps/credit/views.py
index 03ebf8f19f..1be3f90cfe 100644
--- a/openedx/core/djangoapps/credit/views.py
+++ b/openedx/core/djangoapps/credit/views.py
@@ -1,376 +1,157 @@
"""
Views for the credit Django app.
"""
-import datetime
-import json
+from __future__ import unicode_literals
import logging
+import datetime
from django.conf import settings
-from django.http import (
- HttpResponse,
- HttpResponseBadRequest,
- HttpResponseForbidden,
- Http404
-)
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
-from django.views.decorators.http import require_POST, require_GET
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
import pytz
-from rest_framework import viewsets, mixins, permissions
+from rest_framework import viewsets, mixins, permissions, views, generics
from rest_framework.authentication import SessionAuthentication
+from rest_framework.exceptions import ValidationError
+from rest_framework.response import Response
from rest_framework_oauth.authentication import OAuth2Authentication
-from openedx.core.djangoapps.credit import api
-from openedx.core.djangoapps.credit.exceptions import CreditApiBadRequest, CreditRequestNotFound
-from openedx.core.djangoapps.credit.models import CreditCourse
-from openedx.core.djangoapps.credit.serializers import CreditCourseSerializer
-from openedx.core.djangoapps.credit.signature import signature, get_shared_secret_key
+from openedx.core.djangoapps.credit.api import create_credit_request
+from openedx.core.djangoapps.credit.exceptions import (UserNotEligibleException, InvalidCourseKey, CreditApiBadRequest,
+ InvalidCreditRequest)
+from openedx.core.djangoapps.credit.models import (CreditCourse, CreditProvider, CREDIT_PROVIDER_ID_REGEX,
+ CreditEligibility, CreditRequest)
+from openedx.core.djangoapps.credit.serializers import (CreditCourseSerializer, CreditProviderSerializer,
+ CreditEligibilitySerializer, CreditProviderCallbackSerializer)
from openedx.core.lib.api.mixins import PutAsCreateMixin
-from util.date_utils import from_timestamp
-from util.json_request import JsonResponse
+from openedx.core.lib.api.permissions import IsStaffOrOwner
log = logging.getLogger(__name__)
-@require_GET
-def get_providers_detail(request):
- """
+class CreditProviderViewSet(viewsets.ReadOnlyModelViewSet):
+ """ Credit provider endpoints. """
- **User Cases**
+ lookup_field = 'provider_id'
+ lookup_value_regex = CREDIT_PROVIDER_ID_REGEX
+ authentication_classes = (OAuth2Authentication, SessionAuthentication,)
+ pagination_class = None
+ permission_classes = (permissions.IsAuthenticated,)
+ queryset = CreditProvider.objects.all()
+ serializer_class = CreditProviderSerializer
- Returns details of the credit providers filtered by provided query parameters.
+ def filter_queryset(self, queryset):
+ queryset = super(CreditProviderViewSet, self).filter_queryset(queryset)
- **Parameters:**
+ # Filter by provider ID
+ provider_ids = self.request.GET.get('provider_ids', None)
- * provider_id (list of provider ids separated with ","): The identifiers for the providers for which
- user requested
+ if provider_ids:
+ provider_ids = provider_ids.split(',')
+ queryset = queryset.filter(provider_id__in=provider_ids)
- **Example Usage:**
-
- GET /api/credit/v1/providers?provider_id=asu,hogwarts
- "response": [
- "id": "hogwarts",
- "display_name": "Hogwarts School of Witchcraft and Wizardry",
- "url": "https://credit.example.com/",
- "status_url": "https://credit.example.com/status/",
- "description": "A new model for the Witchcraft and Wizardry School System.",
- "enable_integration": false,
- "fulfillment_instructions": "
-
In order to fulfill credit, Hogwarts School of Witchcraft and Wizardry requires learners to:
-
-
Sample instruction abc
-
Sample instruction xyz
-
",
- },
- ...
- ]
-
- **Responses:**
-
- * 200 OK: The request was created successfully. Returned content
- is a JSON-encoded dictionary describing what the client should
- send to the credit provider.
-
- * 404 Not Found: The provider does not exist.
-
- """
- provider_ids = request.GET.get("provider_ids", None)
- providers_list = provider_ids.split(",") if provider_ids else None
- providers = api.get_credit_providers(providers_list)
- return JsonResponse(providers)
+ return queryset
-@require_POST
-def create_credit_request(request, provider_id):
- """
- Initiate a request for credit in a course.
+class CreditProviderRequestCreateView(views.APIView):
+ """ Creates a credit request for the given user and course, if the user is eligible for credit."""
- This end-point will get-or-create a record in the database to track
- the request. It will then calculate the parameters to send to
- the credit provider and digitally sign the parameters, using a secret
- key shared with the credit provider.
+ authentication_classes = (OAuth2Authentication, SessionAuthentication,)
+ permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner,)
- The user's browser is responsible for POSTing these parameters
- directly to the credit provider.
+ def post(self, request, provider_id):
+ """ POST handler. """
+ # Get the provider, or return HTTP 404 if it doesn't exist
+ provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
- **Example Usage:**
+ # Validate the course key
+ course_key = request.data.get('course_key')
+ try:
+ course_key = CourseKey.from_string(course_key)
+ except InvalidKeyError:
+ raise InvalidCourseKey(course_key)
- POST /api/credit/v1/providers/hogwarts/request/
- {
- "username": "ron",
- "course_key": "edX/DemoX/Demo_Course"
- }
+ # Validate the username
+ username = request.data.get('username')
+ if not username:
+ raise ValidationError({'detail': 'A username must be specified.'})
- Response: 200 OK
- Content-Type: application/json
- {
- "url": "http://example.com/request-credit",
- "method": "POST",
- "parameters": {
- request_uuid: "557168d0f7664fe59097106c67c3f847"
- timestamp: 1434631630,
- course_org: "ASUx"
- course_num: "DemoX"
- course_run: "1T2015"
- final_grade: "0.95",
- user_username: "john",
- user_email: "john@example.com"
- user_full_name: "John Smith"
- user_mailing_address: "",
- user_country: "US",
- signature: "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
- }
- }
+ # Ensure the user is actually eligible to receive credit
+ if not CreditEligibility.is_user_eligible_for_credit(course_key, username):
+ raise UserNotEligibleException(course_key, username)
- **Parameters:**
+ try:
+ credit_request = create_credit_request(course_key, provider.provider_id, username)
+ return Response(credit_request)
+ except CreditApiBadRequest as ex:
+ raise InvalidCreditRequest(ex.message)
- * username (unicode): The username of the user requesting credit.
- * course_key (unicode): The identifier for the course for which the user
- is requesting credit.
+class CreditProviderCallbackView(views.APIView):
+ """ Callback used by credit providers to update credit request status. """
- **Responses:**
+ # This endpoint should be open to all external credit providers.
+ authentication_classes = ()
+ permission_classes = ()
- * 200 OK: The request was created successfully. Returned content
- is a JSON-encoded dictionary describing what the client should
- send to the credit provider.
+ @method_decorator(csrf_exempt)
+ def dispatch(self, request, *args, **kwargs):
+ return super(CreditProviderCallbackView, self).dispatch(request, *args, **kwargs)
- * 400 Bad Request:
- - The provided course key did not correspond to a valid credit course.
- - The user already has a completed credit request for this course and provider.
+ def post(self, request, provider_id):
+ """ POST handler. """
+ provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
+ data = request.data
- * 403 Not Authorized:
- - The username does not match the name of the logged in user.
- - The user is not eligible for credit in the course.
+ # Ensure the input data is valid
+ serializer = CreditProviderCallbackSerializer(data=data, provider=provider)
+ serializer.is_valid(raise_exception=True)
- * 404 Not Found:
- - The provider does not exist.
+ # Update the credit request status
+ request_uuid = data['request_uuid']
+ new_status = data['status']
+ credit_request = generics.get_object_or_404(CreditRequest, uuid=request_uuid, provider=provider)
+ old_status = credit_request.status
+ credit_request.status = new_status
+ credit_request.save()
- """
- response, parameters = _validate_json_parameters(request.body, ["username", "course_key"])
- if response is not None:
- return response
-
- try:
- course_key = CourseKey.from_string(parameters["course_key"])
- except InvalidKeyError:
- return HttpResponseBadRequest(
- u'Could not parse "{course_key}" as a course key'.format(
- course_key=parameters["course_key"]
- )
+ log.info(
+ 'Updated [%s] CreditRequest [%s] from status [%s] to [%s].',
+ provider_id, request_uuid, old_status, new_status
)
- # Check user authorization
- if not (request.user and request.user.username == parameters["username"]):
- log.warning(
- u'User with ID %s attempted to initiate a credit request for user with username "%s"',
- request.user.id if request.user else "[Anonymous]",
- parameters["username"]
+ return Response()
+
+
+class CreditEligibilityView(generics.ListAPIView):
+ """ Returns eligibility for a user-course combination. """
+
+ authentication_classes = (OAuth2Authentication, SessionAuthentication,)
+ pagination_class = None
+ permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner)
+ serializer_class = CreditEligibilitySerializer
+ queryset = CreditEligibility.objects.all()
+
+ def filter_queryset(self, queryset):
+ username = self.request.GET.get('username')
+ course_key = self.request.GET.get('course_key')
+
+ if not (username and course_key):
+ raise ValidationError(
+ {'detail': 'Both the course_key and username querystring parameters must be supplied.'})
+
+ course_key = unicode(course_key)
+
+ try:
+ course_key = CourseKey.from_string(course_key)
+ except InvalidKeyError:
+ raise ValidationError({'detail': '[{}] is not a valid course key.'.format(course_key)})
+ return queryset.filter(
+ username=username,
+ course__course_key=course_key,
+ deadline__gt=datetime.datetime.now(pytz.UTC)
)
- return HttpResponseForbidden("Users are not allowed to initiate credit requests for other users.")
-
- # Initiate the request
- try:
- credit_request = api.create_credit_request(course_key, provider_id, parameters["username"])
- except CreditApiBadRequest as ex:
- return HttpResponseBadRequest(ex)
- else:
- return JsonResponse(credit_request)
-
-
-@require_POST
-@csrf_exempt
-def credit_provider_callback(request, provider_id):
- """
- Callback end-point used by credit providers to approve or reject
- a request for credit.
-
- **Example Usage:**
-
- POST /api/credit/v1/providers/{provider-id}/callback
- {
- "request_uuid": "557168d0f7664fe59097106c67c3f847",
- "status": "approved",
- "timestamp": 1434631630,
- "signature": "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
- }
-
- Response: 200 OK
-
- **Parameters:**
-
- * request_uuid (string): The UUID of the request.
-
- * status (string): Either "approved" or "rejected".
-
- * timestamp (int or string): The datetime at which the POST request was made, represented
- as the number of seconds since January 1, 1970 00:00:00 UTC.
- If the timestamp is a string, it will be converted to an integer.
-
- * signature (string): A digital signature of the request parameters,
- created using a secret key shared with the credit provider.
-
- **Responses:**
-
- * 200 OK: The user's status was updated successfully.
-
- * 400 Bad request: The provided parameters were not valid.
- Response content will be a JSON-encoded string describing the error.
-
- * 403 Forbidden: Signature was invalid or timestamp was too far in the past.
-
- * 404 Not Found: Could not find a request with the specified UUID associated with this provider.
-
- """
- response, parameters = _validate_json_parameters(request.body, [
- "request_uuid", "status", "timestamp", "signature"
- ])
- if response is not None:
- return response
-
- # Validate the digital signature of the request.
- # This ensures that the message came from the credit provider
- # and hasn't been tampered with.
- response = _validate_signature(parameters, provider_id)
- if response is not None:
- return response
-
- # Validate the timestamp to ensure that the request is timely.
- response = _validate_timestamp(parameters["timestamp"], provider_id)
- if response is not None:
- return response
-
- # Update the credit request status
- try:
- api.update_credit_request_status(parameters["request_uuid"], provider_id, parameters["status"])
- except CreditRequestNotFound:
- raise Http404
- except CreditApiBadRequest as ex:
- return HttpResponseBadRequest(ex)
- else:
- return HttpResponse()
-
-
-@require_GET
-def get_eligibility_for_user(request):
- """
-
- **User Cases**
-
- Retrieve user eligibility against course.
-
- **Parameters:**
-
- * course_key (unicode): Identifier of course.
- * username (unicode): Username of current User.
-
- **Example Usage:**
-
- GET /api/credit/v1/eligibility?username=user&course_key=edX/Demo_101/Fall
- "response": {
- "course_key": "edX/Demo_101/Fall",
- "deadline": "2015-10-23"
- }
-
- **Responses:**
-
- * 200 OK: The request was created successfully.
-
- * 404 Not Found: The provider does not exist.
-
- """
- course_key = request.GET.get("course_key", None)
- username = request.GET.get("username", None)
- return JsonResponse(api.get_eligibilities_for_user(username=username, course_key=course_key))
-
-
-def _validate_json_parameters(params_string, expected_parameters):
- """
- Load the request parameters as a JSON dictionary and check that
- all required paramters are present.
-
- Arguments:
- params_string (unicode): The JSON-encoded parameter dictionary.
- expected_parameters (list): Required keys of the parameters dictionary.
-
- Returns: tuple of (HttpResponse, dict)
-
- """
- try:
- parameters = json.loads(params_string)
- except (TypeError, ValueError):
- return HttpResponseBadRequest("Could not parse the request body as JSON."), None
-
- if not isinstance(parameters, dict):
- return HttpResponseBadRequest("Request parameters must be a JSON-encoded dictionary."), None
-
- missing_params = set(expected_parameters) - set(parameters.keys())
- if missing_params:
- msg = u"Required parameters are missing: {missing}".format(missing=u", ".join(missing_params))
- return HttpResponseBadRequest(msg), None
-
- return None, parameters
-
-
-def _validate_signature(parameters, provider_id):
- """
- Check that the signature from the credit provider is valid.
-
- Arguments:
- parameters (dict): Parameters received from the credit provider.
- provider_id (unicode): Identifier for the credit provider.
-
- Returns:
- HttpResponseForbidden or None
-
- """
- secret_key = get_shared_secret_key(provider_id)
- if secret_key is None:
- log.error(
- (
- u'Could not retrieve secret key for credit provider with ID "%s". '
- u'Since no key has been configured, we cannot validate requests from the credit provider.'
- ), provider_id
- )
- return HttpResponseForbidden("Credit provider credentials have not been configured.")
-
- if signature(parameters, secret_key) != parameters["signature"]:
- log.warning(u'Request from credit provider with ID "%s" had an invalid signature', parameters["signature"])
- return HttpResponseForbidden("Invalid signature.")
-
-
-def _validate_timestamp(timestamp_value, provider_id):
- """
- Check that the timestamp of the request is recent.
-
- Arguments:
- timestamp (int or string): Number of seconds since Jan. 1, 1970 UTC.
- If specified as a string, it will be converted to an integer.
- provider_id (unicode): Identifier for the credit provider.
-
- Returns:
- HttpResponse or None
-
- """
- timestamp = from_timestamp(timestamp_value)
- if timestamp is None:
- msg = u'"{timestamp}" is not a valid timestamp'.format(timestamp=timestamp_value)
- log.warning(msg)
- return HttpResponseBadRequest(msg)
-
- # Check that the timestamp is recent
- elapsed_seconds = (datetime.datetime.now(pytz.UTC) - timestamp).total_seconds()
- if elapsed_seconds > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
- log.warning(
- (
- u'Timestamp %s is too far in the past (%s seconds), '
- u'so we are rejecting the notification from the credit provider "%s".'
- ),
- timestamp_value, elapsed_seconds, provider_id,
- )
- return HttpResponseForbidden(u"Timestamp is too far in the past.")
class CreditCourseViewSet(PutAsCreateMixin, mixins.UpdateModelMixin, viewsets.ReadOnlyModelViewSet):
diff --git a/openedx/core/lib/api/permissions.py b/openedx/core/lib/api/permissions.py
index dc37143186..2ec8cd9625 100644
--- a/openedx/core/lib/api/permissions.py
+++ b/openedx/core/lib/api/permissions.py
@@ -80,3 +80,16 @@ class IsStaffOrReadOnly(permissions.BasePermission):
return (request.user.is_staff or
CourseStaffRole(obj.course_id).has_user(request.user) or
request.method in permissions.SAFE_METHODS)
+
+
+class IsStaffOrOwner(permissions.BasePermission):
+ """
+ Permission that allows access to admin users or the owner of an object.
+ The owner is considered the User object represented by obj.user.
+ """
+ def has_object_permission(self, request, view, obj):
+ return request.user.is_staff or obj.user == request.user
+
+ def has_permission(self, request, view):
+ user = request.user
+ return user.is_staff or (user.username == request.data.get('username'))
diff --git a/openedx/core/lib/api/tests/test_permissions.py b/openedx/core/lib/api/tests/test_permissions.py
new file mode 100644
index 0000000000..cc2436085a
--- /dev/null
+++ b/openedx/core/lib/api/tests/test_permissions.py
@@ -0,0 +1,65 @@
+""" Tests for API permissions classes. """
+from django.test import TestCase, RequestFactory
+
+from openedx.core.lib.api.permissions import IsStaffOrOwner
+from student.tests.factories import UserFactory
+
+
+class TestObject(object):
+ """ Fake class for object permission tests. """
+ user = None
+
+
+class IsStaffOrOwnerTests(TestCase):
+ """ Tests for IsStaffOrOwner permission class. """
+ def setUp(self):
+ super(IsStaffOrOwnerTests, self).setUp()
+ self.permission = IsStaffOrOwner()
+ self.request = RequestFactory().get('/')
+ self.obj = TestObject()
+
+ def assert_user_has_object_permission(self, user, permitted):
+ """
+ Asserts whether or not the user has permission to access an object.
+
+ Arguments
+ user (User)
+ permitted (boolean)
+ """
+ self.request.user = user
+ self.assertEqual(self.permission.has_object_permission(self.request, None, self.obj), permitted)
+
+ def test_staff_user(self):
+ """ Staff users should be permitted. """
+ user = UserFactory.create(is_staff=True)
+ self.assert_user_has_object_permission(user, True)
+
+ def test_owner(self):
+ """ Owners should be permitted. """
+ user = UserFactory.create()
+ self.obj.user = user
+ self.assert_user_has_object_permission(user, True)
+
+ def test_non_staff_test_non_owner_or_staff_user(self):
+ """ Non-staff and non-owner users should not be permitted. """
+ user = UserFactory.create()
+ self.assert_user_has_object_permission(user, False)
+
+ def test_has_permission_as_staff(self):
+ """ Staff users always have permission. """
+ self.request.user = UserFactory.create(is_staff=True)
+ self.assertTrue(self.permission.has_permission(self.request, None))
+
+ def test_has_permission_as_owner(self):
+ """ Owners always have permission. """
+ user = UserFactory.create()
+ request = RequestFactory().get('/?username={}'.format(user.username))
+ request.user = user
+ self.assertTrue(self.permission.has_permission(request, None))
+
+ def test_has_permission_as_non_owner(self):
+ """ Non-owners should not have permission. """
+ user = UserFactory.create()
+ request = RequestFactory().get('/?username={}'.format(user.username))
+ request.user = UserFactory.create()
+ self.assertFalse(self.permission.has_permission(request, None))