Rewrote Credit API
- API built atop Django REST Framework - Added support for OAuth 2.0 and session authentication - Added permissions around eligibility data ECOM-2609
This commit is contained in:
@@ -393,9 +393,6 @@ FEATURES = {
|
||||
# Enable OpenBadge support. See the BADGR_* settings later in this file.
|
||||
'ENABLE_OPENBADGES': False,
|
||||
|
||||
# Credit course API
|
||||
'ENABLE_CREDIT_API': True,
|
||||
|
||||
# The block types to disable need to be specified in "x block disable config" in django admin.
|
||||
'ENABLE_DISABLING_XBLOCK_TYPES': True,
|
||||
|
||||
|
||||
@@ -68,8 +68,6 @@ FEATURES['ENABLE_SHOPPING_CART'] = True
|
||||
|
||||
FEATURES['ENABLE_VERIFIED_CERTIFICATES'] = True
|
||||
|
||||
FEATURES['ENABLE_CREDIT_API'] = True
|
||||
|
||||
# Enable this feature for course staff grade downloads, to enable acceptance tests
|
||||
FEATURES['ENABLE_S3_GRADE_DOWNLOADS'] = True
|
||||
FEATURES['ALLOW_COURSE_STAFF_GRADE_DOWNLOADS'] = True
|
||||
|
||||
@@ -16,6 +16,8 @@ var edx = edx || {};
|
||||
headers: {
|
||||
'X-CSRFToken': $.cookie('csrftoken')
|
||||
},
|
||||
dataType: 'json',
|
||||
contentType: 'application/json',
|
||||
data: JSON.stringify({
|
||||
'course_key': courseKey,
|
||||
'username': username
|
||||
|
||||
@@ -140,7 +140,11 @@ var edx = edx || {};
|
||||
return $.ajax({
|
||||
url: _.sprintf(providerUrl, providerId),
|
||||
type: 'GET',
|
||||
dataType: 'json'
|
||||
dataType: 'json',
|
||||
contentType: 'application/json',
|
||||
headers: {
|
||||
'X-CSRFToken': $.cookie('csrftoken')
|
||||
}
|
||||
}).retry({times: 5, timeout: 2000, statusCodes: [404]});
|
||||
},
|
||||
/**
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
</div>
|
||||
<div class="provider-more-info">
|
||||
<%= interpolate(
|
||||
gettext("To finalize course credit, %(provider_id)s requires %(platform_name)s learners to submit a credit request."),
|
||||
{ provider_id: provider_id.toUpperCase(), platform_name: platformName }, true
|
||||
gettext("To finalize course credit, %(display_name)s requires %(platform_name)s learners to submit a credit request."),
|
||||
{ display_name: display_name, platform_name: platformName }, true
|
||||
) %>
|
||||
</div>
|
||||
<div class="provider-instructions">
|
||||
@@ -21,7 +21,7 @@
|
||||
<%= interpolate("<img src='%s' alt='%s'></image>", [thumbnail_url, display_name]) %>
|
||||
</div>
|
||||
<div class="complete-order">
|
||||
<%= interpolate('<button data-provider="%s" data-course-key="%s" data-username="%s" class="complete-course" onClick=completeOrder(this)>%s</button>', [provider_id, course_key, username,
|
||||
<%= interpolate('<button data-provider="%s" data-course-key="%s" data-username="%s" class="complete-course" onClick=completeOrder(this)>%s</button>', [id, course_key, username,
|
||||
gettext( "Get Credit")]) %>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -98,6 +98,7 @@ urlpatterns = (
|
||||
url(r'^api/val/v0/', include('edxval.urls')),
|
||||
|
||||
url(r'^api/commerce/', include('commerce.api.urls', namespace='commerce_api')),
|
||||
url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
|
||||
)
|
||||
|
||||
if settings.FEATURES["ENABLE_COMBINED_LOGIN_REGISTRATION"]:
|
||||
@@ -115,12 +116,6 @@ else:
|
||||
url(r'^register$', 'student.views.register_user', name="register_user"),
|
||||
)
|
||||
|
||||
if settings.FEATURES.get("ENABLE_CREDIT_API"):
|
||||
# Credit API end-points
|
||||
urlpatterns += (
|
||||
url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
|
||||
)
|
||||
|
||||
if settings.FEATURES["ENABLE_MOBILE_REST_API"]:
|
||||
urlpatterns += (
|
||||
url(r'^api/mobile/v0.5/', include('mobile_api.urls')),
|
||||
|
||||
@@ -5,6 +5,8 @@ whether a user has satisfied those requirements.
|
||||
|
||||
import logging
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
|
||||
from openedx.core.djangoapps.credit.exceptions import InvalidCreditRequirements, InvalidCreditCourse
|
||||
from openedx.core.djangoapps.credit.email_utils import send_credit_notifications
|
||||
from openedx.core.djangoapps.credit.models import (
|
||||
@@ -14,8 +16,7 @@ from openedx.core.djangoapps.credit.models import (
|
||||
CreditEligibility,
|
||||
)
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
|
||||
# TODO: Cleanup this mess! ECOM-2908
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -246,8 +247,7 @@ def set_credit_requirement_status(username, course_key, req_namespace, req_name,
|
||||
# Find the requirement we're trying to set
|
||||
req_to_update = next((
|
||||
req for req in reqs
|
||||
if req.namespace == req_namespace
|
||||
and req.name == req_name
|
||||
if req.namespace == req_namespace and req.name == req_name
|
||||
), None)
|
||||
|
||||
# If we can't find the requirement, then the most likely explanation
|
||||
|
||||
@@ -4,12 +4,12 @@ API for initiating and tracking requests for credit from a provider.
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import pytz
|
||||
import uuid
|
||||
|
||||
import pytz
|
||||
from django.db import transaction
|
||||
from lms.djangoapps.django_comment_client.utils import JsonResponse
|
||||
|
||||
from lms.djangoapps.django_comment_client.utils import JsonResponse
|
||||
from openedx.core.djangoapps.credit.exceptions import (
|
||||
UserIsNotEligible,
|
||||
CreditProviderNotConfigured,
|
||||
@@ -28,6 +28,8 @@ from student.models import User
|
||||
from util.date_utils import to_timestamp
|
||||
|
||||
|
||||
# TODO: Cleanup this mess! ECOM-2908
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -257,12 +259,10 @@ def create_credit_request(course_key, provider_id, username):
|
||||
final_grade = unicode(final_grade)
|
||||
|
||||
except (CreditRequirementStatus.DoesNotExist, TypeError, KeyError):
|
||||
log.exception(
|
||||
"Could not retrieve final grade from the credit eligibility table "
|
||||
"for user %s in course %s.",
|
||||
user.id, course_key
|
||||
)
|
||||
raise UserIsNotEligible
|
||||
msg = 'Could not retrieve final grade from the credit eligibility table for ' \
|
||||
'user [{user_id}] in course [{course_key}].'.format(user_id=user.id, course_key=course_key)
|
||||
log.exception(msg)
|
||||
raise UserIsNotEligible(msg)
|
||||
|
||||
parameters = {
|
||||
"request_uuid": credit_request.uuid,
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
"""Exceptions raised by the credit API. """
|
||||
from __future__ import unicode_literals
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from rest_framework import status
|
||||
from rest_framework.exceptions import APIException
|
||||
|
||||
# TODO: Cleanup this mess! ECOM-2908
|
||||
|
||||
|
||||
class CreditApiBadRequest(Exception):
|
||||
@@ -56,3 +62,25 @@ class InvalidCreditStatus(CreditApiBadRequest):
|
||||
The status is not either "approved" or "rejected".
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class InvalidCreditRequest(APIException):
|
||||
""" API request is invalid. """
|
||||
status_code = status.HTTP_400_BAD_REQUEST
|
||||
|
||||
|
||||
class UserNotEligibleException(InvalidCreditRequest):
|
||||
""" User not eligible for credit for a given course. """
|
||||
|
||||
def __init__(self, course_key, username):
|
||||
detail = _('[{username}] is not eligible for credit for [{course_key}].').format(username=username,
|
||||
course_key=course_key)
|
||||
super(UserNotEligibleException, self).__init__(detail)
|
||||
|
||||
|
||||
class InvalidCourseKey(InvalidCreditRequest):
|
||||
""" Course key is invalid. """
|
||||
|
||||
def __init__(self, course_key):
|
||||
detail = _('[{course_key}] is not a valid course key.').format(course_key=course_key)
|
||||
super(InvalidCourseKey, self).__init__(detail)
|
||||
|
||||
@@ -25,6 +25,7 @@ from xmodule_django.models import CourseKeyField
|
||||
from django.utils.translation import ugettext_lazy
|
||||
|
||||
|
||||
CREDIT_PROVIDER_ID_REGEX = r"[a-z,A-Z,0-9,\-]+"
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -42,7 +43,7 @@ class CreditProvider(TimeStampedModel):
|
||||
unique=True,
|
||||
validators=[
|
||||
RegexValidator(
|
||||
regex=r"^[a-z,A-Z,0-9,\-]+$",
|
||||
regex=CREDIT_PROVIDER_ID_REGEX,
|
||||
message="Only alphanumeric characters and hyphens (-) are allowed",
|
||||
code="invalid_provider_id",
|
||||
)
|
||||
@@ -498,10 +499,7 @@ def default_deadline_for_credit_eligibility(): # pylint: disable=invalid-name
|
||||
|
||||
|
||||
class CreditEligibility(TimeStampedModel):
|
||||
"""
|
||||
A record of a user's eligibility for credit from a specific credit
|
||||
provider for a specific course.
|
||||
"""
|
||||
""" A record of a user's eligibility for credit for a specific course. """
|
||||
username = models.CharField(max_length=255, db_index=True)
|
||||
course = models.ForeignKey(CreditCourse, related_name="eligibilities")
|
||||
|
||||
|
||||
@@ -1,16 +1,25 @@
|
||||
""" Credit API Serializers """
|
||||
|
||||
from rest_framework import serializers
|
||||
from __future__ import unicode_literals
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
from django.conf import settings
|
||||
from opaque_keys import InvalidKeyError
|
||||
from openedx.core.djangoapps.credit.models import CreditCourse
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
import pytz
|
||||
from rest_framework import serializers
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
|
||||
from openedx.core.djangoapps.credit.models import CreditCourse, CreditProvider, CreditEligibility, CreditRequest
|
||||
from openedx.core.djangoapps.credit.signature import get_shared_secret_key, signature
|
||||
from util.date_utils import from_timestamp
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CourseKeyField(serializers.Field):
|
||||
"""
|
||||
Serializer field for a model CourseKey field.
|
||||
"""
|
||||
""" Serializer field for a model CourseKey field. """
|
||||
|
||||
def to_representation(self, data):
|
||||
"""Convert a course key to unicode. """
|
||||
@@ -32,3 +41,81 @@ class CreditCourseSerializer(serializers.ModelSerializer):
|
||||
class Meta(object):
|
||||
model = CreditCourse
|
||||
exclude = ('id',)
|
||||
|
||||
|
||||
class CreditProviderSerializer(serializers.ModelSerializer):
|
||||
""" CreditProvider """
|
||||
id = serializers.CharField(source='provider_id') # pylint:disable=invalid-name
|
||||
description = serializers.CharField(source='provider_description')
|
||||
status_url = serializers.URLField(source='provider_status_url')
|
||||
url = serializers.URLField(source='provider_url')
|
||||
|
||||
class Meta(object):
|
||||
model = CreditProvider
|
||||
fields = ('id', 'display_name', 'url', 'status_url', 'description', 'enable_integration',
|
||||
'fulfillment_instructions', 'thumbnail_url',)
|
||||
|
||||
|
||||
class CreditEligibilitySerializer(serializers.ModelSerializer):
|
||||
""" CreditEligibility serializer. """
|
||||
course_key = serializers.SerializerMethodField()
|
||||
|
||||
def get_course_key(self, obj):
|
||||
""" Returns the course key associated with the course. """
|
||||
return unicode(obj.course.course_key)
|
||||
|
||||
class Meta(object):
|
||||
model = CreditEligibility
|
||||
fields = ('username', 'course_key', 'deadline',)
|
||||
|
||||
|
||||
class CreditProviderCallbackSerializer(serializers.Serializer): # pylint:disable=abstract-method
|
||||
"""
|
||||
Serializer for input to the CreditProviderCallback view.
|
||||
|
||||
This is used solely for validating the input.
|
||||
"""
|
||||
request_uuid = serializers.CharField(required=True)
|
||||
status = serializers.ChoiceField(required=True, choices=CreditRequest.REQUEST_STATUS_CHOICES)
|
||||
timestamp = serializers.IntegerField(required=True)
|
||||
signature = serializers.CharField(required=True)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.provider = kwargs.pop('provider', None)
|
||||
super(CreditProviderCallbackSerializer, self).__init__(**kwargs)
|
||||
|
||||
def validate_timestamp(self, value):
|
||||
""" Ensure the request has been received in a timely manner. """
|
||||
date_time = from_timestamp(value)
|
||||
|
||||
# Ensure we converted the timestamp to a datetime
|
||||
if not date_time:
|
||||
msg = '[{}] is not a valid timestamp'.format(value)
|
||||
log.warning(msg)
|
||||
raise serializers.ValidationError(msg)
|
||||
|
||||
elapsed = (datetime.datetime.now(pytz.UTC) - date_time).total_seconds()
|
||||
if elapsed > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
|
||||
msg = '[{value}] is too far in the past (over [{elapsed}] seconds).'.format(value=value, elapsed=elapsed)
|
||||
log.warning(msg)
|
||||
raise serializers.ValidationError(msg)
|
||||
|
||||
return value
|
||||
|
||||
def validate_signature(self, value):
|
||||
""" Validate the signature and ensure the provider is setup properly. """
|
||||
provider_id = self.provider.provider_id
|
||||
secret_key = get_shared_secret_key(provider_id)
|
||||
if secret_key is None:
|
||||
msg = 'Could not retrieve secret key for credit provider [{}]. ' \
|
||||
'Unable to validate requests from provider.'.format(provider_id)
|
||||
log.error(msg)
|
||||
raise PermissionDenied(msg)
|
||||
|
||||
data = self.initial_data
|
||||
actual_signature = data["signature"]
|
||||
if signature(data, secret_key) != actual_signature:
|
||||
msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id)
|
||||
raise PermissionDenied(msg)
|
||||
|
||||
return value
|
||||
|
||||
@@ -59,5 +59,5 @@ def signature(params, shared_secret):
|
||||
for key in sorted(params.keys())
|
||||
if key != u"signature"
|
||||
])
|
||||
hasher = hmac.new(shared_secret, encoded_params.encode('utf-8'), hashlib.sha256)
|
||||
hasher = hmac.new(shared_secret.encode('utf-8'), encoded_params.encode('utf-8'), hashlib.sha256)
|
||||
return hasher.hexdigest()
|
||||
|
||||
@@ -2,13 +2,9 @@
|
||||
This file contains celery tasks for credit course views.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
from pytz import UTC
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from celery import task
|
||||
from celery.utils.log import get_task_logger
|
||||
from django.conf import settings
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey, UsageKey
|
||||
|
||||
|
||||
72
openedx/core/djangoapps/credit/tests/factories.py
Normal file
72
openedx/core/djangoapps/credit/tests/factories.py
Normal file
@@ -0,0 +1,72 @@
|
||||
# pylint:disable=missing-docstring,no-member
|
||||
import datetime
|
||||
import json
|
||||
import uuid # pylint:disable=unused-import
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
import factory
|
||||
from factory.fuzzy import FuzzyText
|
||||
import pytz
|
||||
|
||||
from openedx.core.djangoapps.credit.models import CreditProvider, CreditEligibility, CreditCourse, CreditRequest
|
||||
from util.date_utils import to_timestamp
|
||||
|
||||
|
||||
class CreditCourseFactory(factory.DjangoModelFactory):
|
||||
class Meta(object):
|
||||
model = CreditCourse
|
||||
|
||||
course_key = FuzzyText(prefix='fake.org/', suffix='/fake.run')
|
||||
enabled = True
|
||||
|
||||
|
||||
class CreditProviderFactory(factory.DjangoModelFactory):
|
||||
class Meta(object):
|
||||
model = CreditProvider
|
||||
|
||||
provider_id = FuzzyText(length=5)
|
||||
provider_url = FuzzyText(prefix='http://')
|
||||
|
||||
|
||||
class CreditEligibilityFactory(factory.DjangoModelFactory):
|
||||
class Meta(object):
|
||||
model = CreditEligibility
|
||||
|
||||
course = factory.SubFactory(CreditCourseFactory)
|
||||
|
||||
|
||||
class CreditRequestFactory(factory.DjangoModelFactory):
|
||||
class Meta(object):
|
||||
model = CreditRequest
|
||||
|
||||
uuid = factory.LazyAttribute(lambda o: uuid.uuid4().hex) # pylint: disable=undefined-variable
|
||||
|
||||
# pylint: disable=access-member-before-definition,attribute-defined-outside-init,no-self-argument,unused-argument
|
||||
@factory.post_generation
|
||||
def post(obj, create, extracted, **kwargs):
|
||||
"""
|
||||
Post-generation handler.
|
||||
|
||||
Sets up parameters field.
|
||||
"""
|
||||
if not obj.parameters:
|
||||
course_key = obj.course.course_key
|
||||
user = User.objects.get(username=obj.username)
|
||||
user_profile = user.profile
|
||||
|
||||
# pylint:disable=access-member-before-definition
|
||||
obj.parameters = json.dumps({
|
||||
"request_uuid": obj.uuid,
|
||||
"timestamp": to_timestamp(datetime.datetime.now(pytz.UTC)),
|
||||
"course_org": course_key.org,
|
||||
"course_num": course_key.course,
|
||||
"course_run": course_key.run,
|
||||
"final_grade": '0.96',
|
||||
"user_username": user.username,
|
||||
"user_email": user.email,
|
||||
"user_full_name": user_profile.name,
|
||||
"user_mailing_address": "",
|
||||
"user_country": user_profile.country.code or "",
|
||||
})
|
||||
|
||||
obj.save()
|
||||
@@ -2,17 +2,13 @@
|
||||
Tests for the API functions in the credit app.
|
||||
"""
|
||||
import datetime
|
||||
import json
|
||||
import unittest
|
||||
|
||||
import ddt
|
||||
from django.conf import settings
|
||||
from django.core import mail
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
from django.db import connection, transaction
|
||||
from django.core.urlresolvers import reverse, NoReverseMatch
|
||||
from mock import patch
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
import pytz
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
@@ -39,8 +35,6 @@ from student.tests.factories import UserFactory
|
||||
|
||||
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
|
||||
|
||||
from util.testing import UrlResetMixin
|
||||
|
||||
|
||||
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
|
||||
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY,
|
||||
@@ -880,119 +874,3 @@ class CreditProviderIntegrationApiTests(CreditApiTestBase):
|
||||
"""Check the user's credit status. """
|
||||
statuses = api.get_credit_requests_for_user(self.USER_INFO["username"])
|
||||
self.assertEqual(statuses[0]["status"], expected_status)
|
||||
|
||||
|
||||
class CreditApiFeatureFlagTest(UrlResetMixin, TestCase):
|
||||
"""
|
||||
Base class to test the credit api urls.
|
||||
"""
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
enable_credit_api = kwargs.get('enable_credit_api', False)
|
||||
with patch.dict('django.conf.settings.FEATURES', {'ENABLE_CREDIT_API': enable_credit_api}):
|
||||
super(CreditApiFeatureFlagTest, self).setUp('lms.urls')
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditApiFeatureFlagDisabledTests(CreditApiFeatureFlagTest):
|
||||
"""
|
||||
Test Python API for credit provider api with feature flag
|
||||
'ENABLE_CREDIT_API' disabled.
|
||||
"""
|
||||
PROVIDER_ID = "hogwarts"
|
||||
|
||||
def setUp(self):
|
||||
super(CreditApiFeatureFlagDisabledTests, self).setUp(enable_credit_api=False)
|
||||
|
||||
def test_get_credit_provider_details(self):
|
||||
"""
|
||||
Test that 'get_provider_info' api url not found.
|
||||
"""
|
||||
with self.assertRaises(NoReverseMatch):
|
||||
reverse('credit:get_provider_info', args=[self.PROVIDER_ID])
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditApiFeatureFlagEnabledTests(CreditApiFeatureFlagTest, CreditApiTestBase):
|
||||
"""
|
||||
Test Python API for credit provider api with feature flag
|
||||
'ENABLE_CREDIT_API' enabled.
|
||||
"""
|
||||
USER_INFO = {
|
||||
"username": "bob",
|
||||
"email": "bob@example.com",
|
||||
"full_name": "Bob",
|
||||
"mailing_address": "123 Fake Street, Cambridge MA",
|
||||
"country": "US",
|
||||
}
|
||||
|
||||
FINAL_GRADE = 0.95
|
||||
|
||||
def setUp(self):
|
||||
super(CreditApiFeatureFlagEnabledTests, self).setUp(enable_credit_api=True)
|
||||
self.user = UserFactory(
|
||||
username=self.USER_INFO['username'],
|
||||
email=self.USER_INFO['email'],
|
||||
)
|
||||
|
||||
self.user.profile.name = self.USER_INFO['full_name']
|
||||
self.user.profile.mailing_address = self.USER_INFO['mailing_address']
|
||||
self.user.profile.country = self.USER_INFO['country']
|
||||
self.user.profile.save()
|
||||
|
||||
# By default, configure the database so that there is a single
|
||||
# credit requirement that the user has satisfied (minimum grade)
|
||||
self._configure_credit()
|
||||
|
||||
def test_get_credit_provider_details(self):
|
||||
"""Test that credit api method 'test_get_credit_provider_details'
|
||||
returns dictionary data related to provided credit provider.
|
||||
"""
|
||||
expected_result = {
|
||||
"provider_id": self.PROVIDER_ID,
|
||||
"display_name": self.PROVIDER_NAME,
|
||||
"provider_url": self.PROVIDER_URL,
|
||||
"provider_status_url": self.PROVIDER_STATUS_URL,
|
||||
"provider_description": self.PROVIDER_DESCRIPTION,
|
||||
"enable_integration": self.ENABLE_INTEGRATION,
|
||||
"fulfillment_instructions": self.FULFILLMENT_INSTRUCTIONS,
|
||||
"thumbnail_url": self.THUMBNAIL_URL
|
||||
}
|
||||
path = reverse('credit:get_provider_info', kwargs={'provider_id': self.PROVIDER_ID})
|
||||
result = self.client.get(path)
|
||||
result = json.loads(result.content)
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
# now test that user gets empty dict for non existent credit provider
|
||||
path = reverse('credit:get_provider_info', kwargs={'provider_id': 'fake_provider_id'})
|
||||
result = self.client.get(path)
|
||||
result = json.loads(result.content)
|
||||
self.assertEqual(result, {})
|
||||
|
||||
def _configure_credit(self):
|
||||
"""
|
||||
Configure a credit course and its requirements.
|
||||
|
||||
By default, add a single requirement (minimum grade)
|
||||
that the user has satisfied.
|
||||
|
||||
"""
|
||||
credit_course = self.add_credit_course()
|
||||
requirement = CreditRequirement.objects.create(
|
||||
course=credit_course,
|
||||
namespace="grade",
|
||||
name="grade",
|
||||
active=True
|
||||
)
|
||||
status = CreditRequirementStatus.objects.create(
|
||||
username=self.USER_INFO["username"],
|
||||
requirement=requirement,
|
||||
)
|
||||
status.status = "satisfied"
|
||||
status.reason = {"final_grade": self.FINAL_GRADE}
|
||||
status.save()
|
||||
|
||||
CreditEligibility.objects.create(
|
||||
username=self.USER_INFO['username'],
|
||||
course=CreditCourse.objects.get(course_key=self.course_key)
|
||||
)
|
||||
|
||||
@@ -5,7 +5,6 @@ Tests for credit course models.
|
||||
|
||||
import ddt
|
||||
from django.test import TestCase
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
|
||||
from openedx.core.djangoapps.credit.models import CreditCourse, CreditRequirement
|
||||
@@ -17,7 +16,7 @@ class CreditEligibilityModelTests(TestCase):
|
||||
Tests for credit models used to track credit eligibility.
|
||||
"""
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
def setUp(self):
|
||||
super(CreditEligibilityModelTests, self).setUp()
|
||||
self.course_key = CourseKey.from_string("edX/DemoX/Demo_Course")
|
||||
|
||||
|
||||
46
openedx/core/djangoapps/credit/tests/test_serializers.py
Normal file
46
openedx/core/djangoapps/credit/tests/test_serializers.py
Normal file
@@ -0,0 +1,46 @@
|
||||
""" Tests for Credit API serializers. """
|
||||
|
||||
# pylint: disable=no-member
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from openedx.core.djangoapps.credit import serializers
|
||||
from openedx.core.djangoapps.credit.tests.factories import CreditProviderFactory, CreditEligibilityFactory
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
|
||||
class CreditProviderSerializerTests(TestCase):
|
||||
""" CreditProviderSerializer tests. """
|
||||
|
||||
def test_data(self):
|
||||
""" Verify the correct fields are serialized. """
|
||||
provider = CreditProviderFactory(active=False)
|
||||
serializer = serializers.CreditProviderSerializer(provider)
|
||||
expected = {
|
||||
'id': provider.provider_id,
|
||||
'display_name': provider.display_name,
|
||||
'url': provider.provider_url,
|
||||
'status_url': provider.provider_status_url,
|
||||
'description': provider.provider_description,
|
||||
'enable_integration': provider.enable_integration,
|
||||
'fulfillment_instructions': provider.fulfillment_instructions,
|
||||
'thumbnail_url': provider.thumbnail_url,
|
||||
}
|
||||
self.assertDictEqual(serializer.data, expected)
|
||||
|
||||
|
||||
class CreditEligibilitySerializerTests(TestCase):
|
||||
""" CreditEligibilitySerializer tests. """
|
||||
|
||||
def test_data(self):
|
||||
""" Verify the correct fields are serialized. """
|
||||
user = UserFactory()
|
||||
eligibility = CreditEligibilityFactory(username=user.username)
|
||||
serializer = serializers.CreditEligibilitySerializer(eligibility)
|
||||
expected = {
|
||||
'course_key': unicode(eligibility.course.course_key),
|
||||
'deadline': eligibility.deadline.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
|
||||
'username': user.username,
|
||||
}
|
||||
self.assertDictEqual(serializer.data, expected)
|
||||
@@ -1,6 +1,10 @@
|
||||
"""
|
||||
Tests for credit app views.
|
||||
"""
|
||||
|
||||
# pylint: disable=no-member
|
||||
|
||||
from __future__ import unicode_literals
|
||||
import datetime
|
||||
import json
|
||||
import unittest
|
||||
@@ -10,397 +14,98 @@ from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import TestCase, Client
|
||||
from django.test.utils import override_settings
|
||||
from mock import patch
|
||||
from oauth2_provider.tests.factories import AccessTokenFactory, ClientFactory
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
import pytz
|
||||
|
||||
from student.tests.factories import UserFactory
|
||||
from util.date_utils import to_timestamp
|
||||
from util.testing import UrlResetMixin
|
||||
from openedx.core.djangoapps.credit import api
|
||||
from openedx.core.djangoapps.credit.signature import signature
|
||||
from openedx.core.djangoapps.credit.serializers import CreditProviderSerializer, CreditEligibilitySerializer
|
||||
from openedx.core.djangoapps.credit.tests.factories import (
|
||||
CreditProviderFactory,
|
||||
CreditEligibilityFactory,
|
||||
CreditCourseFactory, CreditRequestFactory)
|
||||
from student.tests.factories import UserFactory, AdminFactory
|
||||
from openedx.core.djangoapps.credit.models import (
|
||||
CreditCourse,
|
||||
CreditProvider,
|
||||
CreditRequirement,
|
||||
CreditRequirementStatus,
|
||||
CreditEligibility,
|
||||
CreditRequest,
|
||||
)
|
||||
CreditProvider, CreditRequest, CreditRequirement, CreditRequirementStatus)
|
||||
from util.date_utils import to_timestamp
|
||||
|
||||
JSON = 'application/json'
|
||||
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
|
||||
|
||||
|
||||
class ApiTestCaseMixin(object):
|
||||
""" Mixin to aid with API testing. """
|
||||
|
||||
def assert_error_response(self, response, msg, status_code=400):
|
||||
""" Validate the response's status and detail message. """
|
||||
self.assertEqual(response.status_code, status_code)
|
||||
self.assertDictEqual(response.data, {'detail': msg})
|
||||
|
||||
|
||||
class UserMixin(object):
|
||||
""" Test mixin that creates, and authenticates, a new user for every test. """
|
||||
password = 'password'
|
||||
list_path = None
|
||||
|
||||
def setUp(self):
|
||||
super(UserMixin, self).setUp()
|
||||
|
||||
# This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
|
||||
if self.list_path:
|
||||
self.path = reverse(self.list_path)
|
||||
|
||||
# Create a user and login, so that we can use session auth for the
|
||||
# tests that aren't specifically testing authentication or authorization.
|
||||
self.user = UserFactory(password=self.password, is_staff=True)
|
||||
self.client.login(username=self.user.username, password=self.password)
|
||||
|
||||
|
||||
class AuthMixin(object):
|
||||
""" Test mixin with methods to test OAuth 2.0 and session authentication. """
|
||||
|
||||
def test_authentication_required(self):
|
||||
""" Verify the endpoint requires authentication. """
|
||||
self.client.logout()
|
||||
response = self.client.get(self.path)
|
||||
self.assertEqual(response.status_code, 401)
|
||||
|
||||
def test_oauth(self):
|
||||
""" Verify the endpoint supports authentication via OAuth 2.0. """
|
||||
access_token = AccessTokenFactory(user=self.user, client=ClientFactory()).token
|
||||
headers = {
|
||||
'HTTP_AUTHORIZATION': 'Bearer ' + access_token
|
||||
}
|
||||
self.client.logout()
|
||||
response = self.client.get(self.path, **headers)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_session_auth(self):
|
||||
""" Verify the endpoint supports authentication via session. """
|
||||
self.client.logout()
|
||||
self.client.login(username=self.user.username, password=self.password)
|
||||
response = self.client.get(self.path)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
|
||||
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY
|
||||
})
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditProviderViewTests(UrlResetMixin, TestCase):
|
||||
"""
|
||||
Tests for HTTP end-points used to issue requests to credit providers
|
||||
and receive responses approving or denying requests.
|
||||
"""
|
||||
class ReadOnlyMixin(object):
|
||||
""" Test mixin for read-only API endpoints. """
|
||||
|
||||
USERNAME = "ron"
|
||||
USER_FULL_NAME = "Ron Weasley"
|
||||
PASSWORD = "password"
|
||||
PROVIDER_ID = "hogwarts"
|
||||
PROVIDER_URL = "https://credit.example.com/request"
|
||||
COURSE_KEY = CourseKey.from_string("edX/DemoX/Demo_Course")
|
||||
FINAL_GRADE = 0.95
|
||||
|
||||
@patch.dict(settings.FEATURES, {"ENABLE_CREDIT_API": True})
|
||||
def setUp(self):
|
||||
"""
|
||||
Configure a credit course.
|
||||
"""
|
||||
super(CreditProviderViewTests, self).setUp()
|
||||
|
||||
# Create the test user and log in
|
||||
self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
|
||||
self.user.profile.name = self.USER_FULL_NAME
|
||||
self.user.profile.save()
|
||||
|
||||
success = self.client.login(username=self.USERNAME, password=self.PASSWORD)
|
||||
self.assertTrue(success, msg="Could not log in")
|
||||
|
||||
# Enable the course for credit
|
||||
credit_course = CreditCourse.objects.create(
|
||||
course_key=self.COURSE_KEY,
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
# Configure a credit provider for the course
|
||||
CreditProvider.objects.create(
|
||||
provider_id=self.PROVIDER_ID,
|
||||
enable_integration=True,
|
||||
provider_url=self.PROVIDER_URL,
|
||||
)
|
||||
|
||||
# Add a single credit requirement (final grade)
|
||||
requirement = CreditRequirement.objects.create(
|
||||
course=credit_course,
|
||||
namespace="grade",
|
||||
name="grade",
|
||||
)
|
||||
|
||||
# Mark the user as having satisfied the requirement
|
||||
# and eligible for credit.
|
||||
CreditRequirementStatus.objects.create(
|
||||
username=self.USERNAME,
|
||||
requirement=requirement,
|
||||
status="satisfied",
|
||||
reason={"final_grade": self.FINAL_GRADE}
|
||||
)
|
||||
CreditEligibility.objects.create(
|
||||
username=self.USERNAME,
|
||||
course=credit_course,
|
||||
)
|
||||
|
||||
def test_credit_request_and_response(self):
|
||||
# Initiate a request
|
||||
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that the user's request status is pending
|
||||
requests = api.get_credit_requests_for_user(self.USERNAME)
|
||||
self.assertEqual(len(requests), 1)
|
||||
self.assertEqual(requests[0]["status"], "pending")
|
||||
|
||||
# Check request parameters
|
||||
content = json.loads(response.content)
|
||||
self.assertEqual(content["url"], self.PROVIDER_URL)
|
||||
self.assertEqual(content["method"], "POST")
|
||||
self.assertEqual(len(content["parameters"]["request_uuid"]), 32)
|
||||
self.assertEqual(content["parameters"]["course_org"], "edX")
|
||||
self.assertEqual(content["parameters"]["course_num"], "DemoX")
|
||||
self.assertEqual(content["parameters"]["course_run"], "Demo_Course")
|
||||
self.assertEqual(content["parameters"]["final_grade"], unicode(self.FINAL_GRADE))
|
||||
self.assertEqual(content["parameters"]["user_username"], self.USERNAME)
|
||||
self.assertEqual(content["parameters"]["user_full_name"], self.USER_FULL_NAME)
|
||||
self.assertEqual(content["parameters"]["user_mailing_address"], "")
|
||||
self.assertEqual(content["parameters"]["user_country"], "")
|
||||
|
||||
# The signature is going to change each test run because the request
|
||||
# is assigned a different UUID each time.
|
||||
# For this reason, we use the signature function directly
|
||||
# (the "signature" parameter will be ignored when calculating the signature).
|
||||
# Other unit tests verify that the signature function is working correctly.
|
||||
self.assertEqual(
|
||||
content["parameters"]["signature"],
|
||||
signature(content["parameters"], TEST_CREDIT_PROVIDER_SECRET_KEY)
|
||||
)
|
||||
|
||||
# Simulate a response from the credit provider
|
||||
response = self._credit_provider_callback(
|
||||
content["parameters"]["request_uuid"],
|
||||
"approved"
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that the user's status is approved
|
||||
requests = api.get_credit_requests_for_user(self.USERNAME)
|
||||
self.assertEqual(len(requests), 1)
|
||||
self.assertEqual(requests[0]["status"], "approved")
|
||||
|
||||
def test_request_credit_anonymous_user(self):
|
||||
self.client.logout()
|
||||
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_request_credit_for_another_user(self):
|
||||
response = self._create_credit_request("another_user", self.COURSE_KEY)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@ddt.data(
|
||||
# Invalid JSON
|
||||
"{",
|
||||
|
||||
# Missing required parameters
|
||||
json.dumps({"username": USERNAME}),
|
||||
json.dumps({"course_key": unicode(COURSE_KEY)}),
|
||||
|
||||
# Invalid course key format
|
||||
json.dumps({"username": USERNAME, "course_key": "invalid"}),
|
||||
)
|
||||
def test_create_credit_request_invalid_parameters(self, request_data):
|
||||
url = reverse("credit:create_request", args=[self.PROVIDER_ID])
|
||||
response = self.client.post(url, data=request_data, content_type=JSON)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_credit_provider_callback_validates_signature(self):
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Simulate a callback from the credit provider with an invalid signature
|
||||
# Since the signature is invalid, we respond with a 403 Not Authorized.
|
||||
response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_credit_provider_callback_validates_timestamp(self):
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Simulate a callback from the credit provider with a timestamp too far in the past
|
||||
# (slightly more than 15 minutes)
|
||||
# Since the message isn't timely, respond with a 403.
|
||||
timestamp = to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1))
|
||||
response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_credit_provider_callback_handles_string_timestamp(self):
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Simulate a callback from the credit provider with a timestamp
|
||||
# encoded as a string instead of an integer.
|
||||
timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
|
||||
response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_credit_provider_callback_is_idempotent(self):
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Initially, the status should be "pending"
|
||||
self._assert_request_status(request_uuid, "pending")
|
||||
|
||||
# First call sets the status to approved
|
||||
self._credit_provider_callback(request_uuid, "approved")
|
||||
self._assert_request_status(request_uuid, "approved")
|
||||
|
||||
# Second call succeeds as well; status is still approved
|
||||
self._credit_provider_callback(request_uuid, "approved")
|
||||
self._assert_request_status(request_uuid, "approved")
|
||||
|
||||
@ddt.data(
|
||||
# Invalid JSON
|
||||
"{",
|
||||
|
||||
# Not a dictionary
|
||||
"4",
|
||||
|
||||
# Invalid timestamp format
|
||||
json.dumps({
|
||||
"request_uuid": "557168d0f7664fe59097106c67c3f847",
|
||||
"status": "approved",
|
||||
"timestamp": "invalid",
|
||||
"signature": "7685ae1c8f763597ee7ce526685c5ac24353317dbfe087f0ed32a699daf7dc63",
|
||||
}),
|
||||
)
|
||||
def test_credit_provider_callback_invalid_parameters(self, request_data):
|
||||
url = reverse("credit:provider_callback", args=[self.PROVIDER_ID])
|
||||
response = self.client.post(url, data=request_data, content_type=JSON)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_credit_provider_invalid_status(self):
|
||||
response = self._credit_provider_callback("557168d0f7664fe59097106c67c3f847", "invalid")
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_credit_provider_key_not_configured(self):
|
||||
# Cannot initiate a request because we can't sign it
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
|
||||
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
# Create the request with the secret key configured
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Callback from the provider is not authorized, because
|
||||
# the shared secret isn't configured.
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
|
||||
response = self._credit_provider_callback(request_uuid, "approved")
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_request_associated_with_another_provider(self):
|
||||
other_provider_id = "other_provider"
|
||||
other_provider_secret_key = "1d01f067a5a54b0b8059f7095a7c636d"
|
||||
|
||||
# Create an additional credit provider
|
||||
CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
|
||||
|
||||
# Initiate a credit request with the first provider
|
||||
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
|
||||
|
||||
# Attempt to update the request status for a different provider
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={other_provider_id: other_provider_secret_key}):
|
||||
response = self._credit_provider_callback(
|
||||
request_uuid,
|
||||
"approved",
|
||||
provider_id=other_provider_id,
|
||||
secret_key=other_provider_secret_key,
|
||||
)
|
||||
|
||||
# Response should be a 404 to avoid leaking request UUID values to other providers.
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
# Request status should still be "pending"
|
||||
self._assert_request_status(request_uuid, "pending")
|
||||
|
||||
def _create_credit_request(self, username, course_key):
|
||||
"""
|
||||
Initiate a request for credit.
|
||||
"""
|
||||
url = reverse("credit:create_request", args=[self.PROVIDER_ID])
|
||||
return self.client.post(
|
||||
url,
|
||||
data=json.dumps({
|
||||
"username": username,
|
||||
"course_key": unicode(course_key),
|
||||
}),
|
||||
content_type=JSON,
|
||||
)
|
||||
|
||||
def _create_credit_request_and_get_uuid(self, username, course_key):
|
||||
"""
|
||||
Initiate a request for credit and return the request UUID.
|
||||
"""
|
||||
response = self._create_credit_request(username, course_key)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
return json.loads(response.content)["parameters"]["request_uuid"]
|
||||
|
||||
def _credit_provider_callback(self, request_uuid, status, **kwargs):
|
||||
"""
|
||||
Simulate a response from the credit provider approving
|
||||
or rejecting the credit request.
|
||||
|
||||
Arguments:
|
||||
request_uuid (str): The UUID of the credit request.
|
||||
status (str): The status of the credit request.
|
||||
|
||||
Keyword Arguments:
|
||||
provider_id (str): Identifier for the credit provider.
|
||||
secret_key (str): Shared secret key for signing messages.
|
||||
timestamp (datetime): Timestamp of the message.
|
||||
sig (str): Digital signature to use on messages.
|
||||
|
||||
"""
|
||||
provider_id = kwargs.get("provider_id", self.PROVIDER_ID)
|
||||
secret_key = kwargs.get("secret_key", TEST_CREDIT_PROVIDER_SECRET_KEY)
|
||||
timestamp = kwargs.get("timestamp", to_timestamp(datetime.datetime.now(pytz.UTC)))
|
||||
|
||||
url = reverse("credit:provider_callback", args=[provider_id])
|
||||
|
||||
parameters = {
|
||||
"request_uuid": request_uuid,
|
||||
"status": status,
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
parameters["signature"] = kwargs.get("sig", signature(parameters, secret_key))
|
||||
|
||||
return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
|
||||
|
||||
def _assert_request_status(self, uuid, expected_status):
|
||||
"""
|
||||
Check the status of a credit request.
|
||||
"""
|
||||
request = CreditRequest.objects.get(uuid=uuid)
|
||||
self.assertEqual(request.status, expected_status)
|
||||
|
||||
def test_get_providers_detail(self):
|
||||
"""Verify that the method 'get_provider_detail' returns provider with
|
||||
the given provide in 'provider_ids'.
|
||||
"""
|
||||
url = reverse("credit:providers_detail") + "?provider_ids=hogwarts"
|
||||
response = self.client.get(url)
|
||||
expected = [
|
||||
{
|
||||
'enable_integration': True,
|
||||
'description': '',
|
||||
'url': 'https://credit.example.com/request',
|
||||
'status_url': '',
|
||||
'thumbnail_url': '',
|
||||
'fulfillment_instructions': None,
|
||||
'display_name': '',
|
||||
'id': 'hogwarts'
|
||||
}
|
||||
]
|
||||
|
||||
self.assertListEqual(json.loads(response.content), expected)
|
||||
|
||||
def test_get_providers_with_multiple_provider_ids(self):
|
||||
"""Test that the method 'get_provider_detail' returns multiple
|
||||
providers with given 'provider_ids' or when no provider in
|
||||
'provider_ids' is given.
|
||||
"""
|
||||
# Add another credit provider for the course
|
||||
CreditProvider.objects.create(
|
||||
provider_id='dummy_id',
|
||||
enable_integration=True,
|
||||
provider_url='https://example.com',
|
||||
)
|
||||
|
||||
# verify that all the matching providers are returned when provider ids
|
||||
# are given in parameter 'provider_ids'
|
||||
url = reverse("credit:providers_detail") + "?provider_ids=hogwarts,dummy_id"
|
||||
response = self.client.get(url)
|
||||
self.assertEquals(len(json.loads(response.content)), 2)
|
||||
|
||||
# verify that all providers are returned when no provider id in
|
||||
# parameter 'provider_ids' is provided
|
||||
url = reverse("credit:providers_detail")
|
||||
response = self.client.get(url)
|
||||
self.assertEquals(len(json.loads(response.content)), 2)
|
||||
@ddt.data('delete', 'post', 'put')
|
||||
def test_readonly(self, method):
|
||||
""" Verify the viewset does not allow CreditProvider objects to be created or modified. """
|
||||
response = getattr(self.client, method)(self.path)
|
||||
self.assertEqual(response.status_code, 405)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditCourseViewSetTests(TestCase):
|
||||
class CreditCourseViewSetTests(UserMixin, TestCase):
|
||||
""" Tests for the CreditCourse endpoints.
|
||||
|
||||
GET/POST /api/v1/credit/creditcourse/
|
||||
GET/PUT /api/v1/credit/creditcourse/:course_id/
|
||||
"""
|
||||
password = 'password'
|
||||
|
||||
def setUp(self):
|
||||
super(CreditCourseViewSetTests, self).setUp()
|
||||
|
||||
# This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
|
||||
self.path = reverse('credit:creditcourse-list')
|
||||
|
||||
# Create a user and login, so that we can use session auth for the
|
||||
# tests that aren't specifically testing authentication or authorization.
|
||||
user = UserFactory(password=self.password, is_staff=True)
|
||||
self.client.login(username=user.username, password=self.password)
|
||||
list_path = 'credit:creditcourse-list'
|
||||
|
||||
def _serialize_credit_course(self, credit_course):
|
||||
""" Serializes a CreditCourse to a Python dict. """
|
||||
@@ -442,7 +147,7 @@ class CreditCourseViewSetTests(TestCase):
|
||||
self.assertIn('CSRF', response.content)
|
||||
|
||||
# Retrieve a CSRF token
|
||||
response = client.get('/dashboard')
|
||||
response = client.get('/')
|
||||
csrf_token = response.cookies[settings.CSRF_COOKIE_NAME].value # pylint: disable=no-member
|
||||
self.assertGreater(len(csrf_token), 0)
|
||||
|
||||
@@ -552,3 +257,410 @@ class CreditCourseViewSetTests(TestCase):
|
||||
# Verify the data was persisted
|
||||
credit_course = CreditCourse.objects.get(course_key=credit_course.course_key)
|
||||
self.assertTrue(credit_course.enabled)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditProviderViewSetTests(ApiTestCaseMixin, ReadOnlyMixin, AuthMixin, UserMixin, TestCase):
|
||||
""" Tests for CreditProviderViewSet. """
|
||||
list_path = 'credit:creditprovider-list'
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(CreditProviderViewSetTests, cls).setUpClass()
|
||||
cls.bayside = CreditProviderFactory(provider_id='bayside')
|
||||
cls.hogwarts = CreditProviderFactory(provider_id='hogwarts')
|
||||
cls.starfleet = CreditProviderFactory(provider_id='starfleet')
|
||||
|
||||
def test_list(self):
|
||||
""" Verify the endpoint returns a list of all CreditProvider objects. """
|
||||
response = self.client.get(self.path)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
expected = CreditProviderSerializer(CreditProvider.objects.all(), many=True).data
|
||||
self.assertEqual(response.data, expected)
|
||||
|
||||
@ddt.data(
|
||||
('bayside',),
|
||||
('hogwarts', 'starfleet')
|
||||
)
|
||||
def test_list_filtering(self, provider_ids):
|
||||
""" Verify the endpoint returns a list of all CreditProvider objects, filtered to contain only those objects
|
||||
associated with the given IDs. """
|
||||
url = '{}?provider_ids={}'.format(self.path, ','.join(provider_ids))
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
expected = CreditProviderSerializer(CreditProvider.objects.filter(provider_id__in=provider_ids),
|
||||
many=True).data
|
||||
self.assertEqual(response.data, expected)
|
||||
|
||||
def test_retrieve(self):
|
||||
""" Verify the endpoint returns the details for a single CreditProvider. """
|
||||
url = reverse('credit:creditprovider-detail', kwargs={'provider_id': self.bayside.provider_id})
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.data, CreditProviderSerializer(self.bayside).data)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase):
|
||||
""" Tests for CreditProviderRequestCreateView. """
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(CreditProviderRequestCreateViewTests, cls).setUpClass()
|
||||
cls.provider = CreditProviderFactory()
|
||||
|
||||
def setUp(self):
|
||||
super(CreditProviderRequestCreateViewTests, self).setUp()
|
||||
self.path = reverse('credit:create_request', kwargs={'provider_id': self.provider.provider_id})
|
||||
self.eligibility = CreditEligibilityFactory(username=self.user.username)
|
||||
|
||||
def post_credit_request(self, username, course_id):
|
||||
""" Create a credit request for the given user and course. """
|
||||
data = {
|
||||
'username': username,
|
||||
'course_key': unicode(course_id)
|
||||
}
|
||||
return self.client.post(self.path, json.dumps(data), content_type=JSON)
|
||||
|
||||
def test_post_with_provider_integration(self):
|
||||
""" Verify the endpoint can create a new credit request. """
|
||||
username = self.user.username
|
||||
course = self.eligibility.course
|
||||
course_key = course.course_key
|
||||
final_grade = 0.95
|
||||
|
||||
# Enable provider integration
|
||||
self.provider.enable_integration = True
|
||||
self.provider.save()
|
||||
|
||||
# Add a single credit requirement (final grade)
|
||||
requirement = CreditRequirement.objects.create(
|
||||
course=course,
|
||||
namespace='grade',
|
||||
name='grade',
|
||||
)
|
||||
|
||||
# Mark the user as having satisfied the requirement and eligible for credit.
|
||||
CreditRequirementStatus.objects.create(
|
||||
username=username,
|
||||
requirement=requirement,
|
||||
status='satisfied',
|
||||
reason={'final_grade': final_grade}
|
||||
)
|
||||
|
||||
secret_key = 'secret'
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={self.provider.provider_id: secret_key}):
|
||||
response = self.post_credit_request(username, course_key)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# Check that the user's request status is pending
|
||||
request = CreditRequest.objects.get(username=username, course__course_key=course_key)
|
||||
self.assertEqual(request.status, 'pending')
|
||||
|
||||
# Check request parameters
|
||||
content = json.loads(response.content)
|
||||
parameters = content['parameters']
|
||||
|
||||
self.assertEqual(content['url'], self.provider.provider_url)
|
||||
self.assertEqual(content['method'], 'POST')
|
||||
self.assertEqual(len(parameters['request_uuid']), 32)
|
||||
self.assertEqual(parameters['course_org'], course_key.org)
|
||||
self.assertEqual(parameters['course_num'], course_key.course)
|
||||
self.assertEqual(parameters['course_run'], course_key.run)
|
||||
self.assertEqual(parameters['final_grade'], unicode(final_grade))
|
||||
self.assertEqual(parameters['user_username'], username)
|
||||
self.assertEqual(parameters['user_full_name'], self.user.get_full_name())
|
||||
self.assertEqual(parameters['user_mailing_address'], '')
|
||||
self.assertEqual(parameters['user_country'], '')
|
||||
|
||||
# The signature is going to change each test run because the request
|
||||
# is assigned a different UUID each time.
|
||||
# For this reason, we use the signature function directly
|
||||
# (the "signature" parameter will be ignored when calculating the signature).
|
||||
# Other unit tests verify that the signature function is working correctly.
|
||||
self.assertEqual(parameters['signature'], signature(parameters, secret_key))
|
||||
|
||||
def test_post_invalid_provider(self):
|
||||
""" Verify the endpoint returns HTTP 404 if the credit provider is not valid. """
|
||||
path = reverse('credit:create_request', kwargs={'provider_id': 'fake'})
|
||||
response = self.client.post(path, {})
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_post_no_username(self):
|
||||
""" Verify the endpoint returns HTTP 400 if no username is supplied. """
|
||||
response = self.post_credit_request(None, 'a/b/c')
|
||||
self.assert_error_response(response, 'A username must be specified.')
|
||||
|
||||
def test_post_invalid_course_key(self):
|
||||
""" Verify the endpoint returns HTTP 400 if the course is not a valid course key. """
|
||||
course_key = 'not-a-course-id'
|
||||
response = self.post_credit_request(self.user.username, course_key)
|
||||
self.assert_error_response(response, '[{}] is not a valid course key.'.format(course_key))
|
||||
|
||||
def test_post_user_not_eligible(self):
|
||||
""" Verify the endpoint returns HTTP 400 if the user is not eligible for credit for the course. """
|
||||
credit_course = CreditCourseFactory()
|
||||
username = 'ineligible-user'
|
||||
course_key = credit_course.course_key
|
||||
|
||||
response = self.post_credit_request(username, course_key)
|
||||
msg = '[{username}] is not eligible for credit for [{course_key}].'.format(username=username,
|
||||
course_key=course_key)
|
||||
self.assert_error_response(response, msg)
|
||||
|
||||
def test_post_permissions_staff(self):
|
||||
""" Verify staff users can create requests for any user. """
|
||||
admin = AdminFactory(password=self.password)
|
||||
self.client.logout()
|
||||
self.client.login(username=admin.username, password=self.password)
|
||||
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_post_other_user(self):
|
||||
""" Verify non-staff users cannot create requests for other users. """
|
||||
user = UserFactory(password=self.password)
|
||||
self.client.logout()
|
||||
self.client.login(username=user.username, password=self.password)
|
||||
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_post_no_provider_integration(self):
|
||||
""" Verify the endpoint returns the provider URL if provider integration is not enabled. """
|
||||
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
expected = {
|
||||
'url': self.provider.provider_url,
|
||||
'method': 'GET',
|
||||
'parameters': {},
|
||||
}
|
||||
self.assertEqual(response.data, expected)
|
||||
|
||||
def test_post_secret_key_not_set(self):
|
||||
""" Verify the endpoint returns HTTP 400 if we attempt to create a
|
||||
request for a provider with no secret key set. """
|
||||
# Enable provider integration
|
||||
self.provider.enable_integration = True
|
||||
self.provider.save()
|
||||
|
||||
# Cannot initiate a request because we cannot sign it
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
|
||||
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditProviderCallbackViewTests(UserMixin, TestCase):
|
||||
""" Tests for CreditProviderCallbackView. """
|
||||
|
||||
def setUp(self):
|
||||
super(CreditProviderCallbackViewTests, self).setUp()
|
||||
|
||||
# Authentication should NOT be required for this endpoint.
|
||||
self.client.logout()
|
||||
|
||||
self.provider = CreditProviderFactory()
|
||||
self.path = reverse('credit:provider_callback', args=[self.provider.provider_id])
|
||||
self.eligibility = CreditEligibilityFactory(username=self.user.username)
|
||||
|
||||
def _credit_provider_callback(self, request_uuid, status, **kwargs):
|
||||
"""
|
||||
Simulate a response from the credit provider approving
|
||||
or rejecting the credit request.
|
||||
|
||||
Arguments:
|
||||
request_uuid (str): The UUID of the credit request.
|
||||
status (str): The status of the credit request.
|
||||
|
||||
Keyword Arguments:
|
||||
provider_id (str): Identifier for the credit provider.
|
||||
secret_key (str): Shared secret key for signing messages.
|
||||
timestamp (datetime): Timestamp of the message.
|
||||
sig (str): Digital signature to use on messages.
|
||||
keys (dict): Override for CREDIT_PROVIDER_SECRET_KEYS setting.
|
||||
|
||||
"""
|
||||
provider_id = kwargs.get('provider_id', self.provider.provider_id)
|
||||
secret_key = kwargs.get('secret_key', '931433d583c84ca7ba41784bad3232e6')
|
||||
timestamp = kwargs.get('timestamp', to_timestamp(datetime.datetime.now(pytz.UTC)))
|
||||
keys = kwargs.get('keys', {self.provider.provider_id: secret_key})
|
||||
|
||||
url = reverse('credit:provider_callback', args=[provider_id])
|
||||
|
||||
parameters = {
|
||||
'request_uuid': request_uuid,
|
||||
'status': status,
|
||||
'timestamp': timestamp,
|
||||
}
|
||||
parameters['signature'] = kwargs.get('sig', signature(parameters, secret_key))
|
||||
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS=keys):
|
||||
return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
|
||||
|
||||
def _create_credit_request_and_get_uuid(self, username=None, course_key=None):
|
||||
""" Initiate a request for credit and return the request UUID. """
|
||||
username = username or self.user.username
|
||||
course = CreditCourse.objects.get(course_key=course_key) if course_key else self.eligibility.course
|
||||
credit_request = CreditRequestFactory(username=username, course=course, provider=self.provider)
|
||||
return credit_request.uuid
|
||||
|
||||
def _assert_request_status(self, uuid, expected_status):
|
||||
""" Check the status of a credit request. """
|
||||
request = CreditRequest.objects.get(uuid=uuid)
|
||||
self.assertEqual(request.status, expected_status)
|
||||
|
||||
def test_post_invalid_provider_id(self):
|
||||
""" Verify the endpoint returns HTTP 404 if the provider does not exist. """
|
||||
provider_id = 'fakey-provider'
|
||||
self.assertFalse(CreditProvider.objects.filter(provider_id=provider_id).exists())
|
||||
|
||||
path = reverse('credit:provider_callback', args=[provider_id])
|
||||
response = self.client.post(path, {})
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_post_with_invalid_signature(self):
|
||||
""" Verify the endpoint returns HTTP 403 if a request is received with an invalid signature. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
|
||||
# Simulate a callback from the credit provider with an invalid signature
|
||||
# Since the signature is invalid, we respond with a 403 Not Authorized.
|
||||
response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@ddt.data(
|
||||
to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1)),
|
||||
'invalid'
|
||||
)
|
||||
def test_post_with_invalid_timestamp(self, timestamp):
|
||||
""" Verify HTTP 400 is returned for requests with an invalid timestamp. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_post_with_string_timestamp(self):
|
||||
""" Verify the endpoint supports timestamps transmitted as strings instead of integers. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
|
||||
response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_credit_provider_callback_is_idempotent(self):
|
||||
""" Verify clients can make subsequent calls with the same status. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
|
||||
# Initially, the status should be "pending"
|
||||
self._assert_request_status(request_uuid, "pending")
|
||||
|
||||
# First call sets the status to approved
|
||||
self._credit_provider_callback(request_uuid, 'approved')
|
||||
self._assert_request_status(request_uuid, "approved")
|
||||
|
||||
# Second call succeeds as well; status is still approved
|
||||
self._credit_provider_callback(request_uuid, 'approved')
|
||||
self._assert_request_status(request_uuid, "approved")
|
||||
|
||||
def test_credit_provider_invalid_status(self):
|
||||
""" Verify requests with an invalid status value return HTTP 400. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
response = self._credit_provider_callback(request_uuid, 'invalid')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_request_associated_with_another_provider(self):
|
||||
""" Verify the endpoint returns HTTP 404 if a request is received for the incorrect provider. """
|
||||
other_provider_id = 'other-provider'
|
||||
other_provider_secret_key = '1d01f067a5a54b0b8059f7095a7c636d'
|
||||
|
||||
# Create an additional credit provider
|
||||
CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
|
||||
|
||||
# Initiate a credit request with the first provider
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
|
||||
# Attempt to update the request status for a different provider
|
||||
response = self._credit_provider_callback(
|
||||
request_uuid,
|
||||
'approved',
|
||||
provider_id=other_provider_id,
|
||||
secret_key=other_provider_secret_key,
|
||||
keys={other_provider_id: other_provider_secret_key}
|
||||
)
|
||||
|
||||
# Response should be a 404 to avoid leaking request UUID values to other providers.
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
# Request status should still be 'pending'
|
||||
self._assert_request_status(request_uuid, 'pending')
|
||||
|
||||
def test_credit_provider_key_not_configured(self):
|
||||
""" Verify the endpoint returns HTTP 403 if the provider has no key configured. """
|
||||
request_uuid = self._create_credit_request_and_get_uuid()
|
||||
|
||||
# Callback from the provider is not authorized, because the shared secret isn't configured.
|
||||
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
|
||||
response = self._credit_provider_callback(request_uuid, 'approved', keys={})
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CreditEligibilityViewTests(AuthMixin, UserMixin, ReadOnlyMixin, TestCase):
|
||||
""" Tests for CreditEligibilityView. """
|
||||
view_name = 'credit:eligibility_details'
|
||||
|
||||
def setUp(self):
|
||||
super(CreditEligibilityViewTests, self).setUp()
|
||||
self.eligibility = CreditEligibilityFactory(username=self.user.username)
|
||||
self.path = self.create_url(self.eligibility)
|
||||
|
||||
def create_url(self, eligibility):
|
||||
""" Returns a URL that can be used to view eligibility data. """
|
||||
return '{path}?username={username}&course_key={course_key}'.format(
|
||||
path=reverse(self.view_name),
|
||||
username=eligibility.username,
|
||||
course_key=eligibility.course.course_key
|
||||
)
|
||||
|
||||
def assert_valid_get_response(self, eligibility):
|
||||
""" Ensure the endpoint returns the correct eligibility data. """
|
||||
url = self.create_url(eligibility)
|
||||
response = self.client.get(url)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertListEqual(response.data, CreditEligibilitySerializer([eligibility], many=True).data)
|
||||
|
||||
def test_get(self):
|
||||
""" Verify the endpoint returns eligibility information for the give user and course. """
|
||||
self.assert_valid_get_response(self.eligibility)
|
||||
|
||||
def test_get_with_missing_parameters(self):
|
||||
""" Verify the endpoint returns HTTP status 400 if either the username or course_key querystring argument
|
||||
is not provided. """
|
||||
response = self.client.get(reverse(self.view_name))
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertDictEqual(response.data,
|
||||
{'detail': 'Both the course_key and username querystring parameters must be supplied.'})
|
||||
|
||||
def test_get_with_invalid_course_key(self):
|
||||
""" Verify the endpoint returns HTTP status 400 if the provided course_key is not an actual CourseKey. """
|
||||
url = '{}?username=edx&course_key=a'.format(reverse(self.view_name))
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertDictEqual(response.data, {'detail': '[a] is not a valid course key.'})
|
||||
|
||||
def test_staff_can_view_all(self):
|
||||
""" Verify that staff users can view eligibility data for all users. """
|
||||
staff = AdminFactory(password=self.password)
|
||||
self.client.logout()
|
||||
self.client.login(username=staff.username, password=self.password)
|
||||
self.assert_valid_get_response(self.eligibility)
|
||||
|
||||
def test_nonstaff_can_only_view_own_data(self):
|
||||
""" Verify that non-staff users can only view their own eligibility data. """
|
||||
user = UserFactory(password=self.password)
|
||||
self.client.logout()
|
||||
self.client.login(username=user.username, password=self.password)
|
||||
response = self.client.get(self.path)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@@ -3,47 +3,25 @@ URLs for the credit app.
|
||||
"""
|
||||
from django.conf.urls import patterns, url, include
|
||||
|
||||
from openedx.core.djangoapps.credit import views, routers
|
||||
from openedx.core.djangoapps.credit.api.provider import get_credit_provider_info
|
||||
from openedx.core.djangoapps.credit import views, routers, models
|
||||
|
||||
PROVIDER_ID_PATTERN = r'(?P<provider_id>[^/]+)'
|
||||
PROVIDER_ID_PATTERN = r'(?P<provider_id>{})'.format(models.CREDIT_PROVIDER_ID_REGEX)
|
||||
|
||||
PROVIDER_URLS = patterns(
|
||||
'',
|
||||
url(r'^request/$', views.CreditProviderRequestCreateView.as_view(), name='create_request'),
|
||||
url(r'^callback/?$', views.CreditProviderCallbackView.as_view(), name='provider_callback'),
|
||||
)
|
||||
|
||||
V1_URLS = patterns(
|
||||
'',
|
||||
|
||||
url(
|
||||
r'^providers/$',
|
||||
views.get_providers_detail,
|
||||
name='providers_detail'
|
||||
),
|
||||
|
||||
url(
|
||||
r'^providers/{provider_id}/$'.format(provider_id=PROVIDER_ID_PATTERN),
|
||||
get_credit_provider_info,
|
||||
name='get_provider_info'
|
||||
),
|
||||
|
||||
url(
|
||||
r'^providers/{provider_id}/request/$'.format(provider_id=PROVIDER_ID_PATTERN),
|
||||
views.create_credit_request,
|
||||
name='create_request'
|
||||
),
|
||||
|
||||
url(
|
||||
r'^providers/{provider_id}/callback/?$'.format(provider_id=PROVIDER_ID_PATTERN),
|
||||
views.credit_provider_callback,
|
||||
name='provider_callback'
|
||||
),
|
||||
|
||||
url(
|
||||
r'^eligibility/$',
|
||||
views.get_eligibility_for_user,
|
||||
name='eligibility_details'
|
||||
),
|
||||
url(r'^providers/{}/'.format(PROVIDER_ID_PATTERN), include(PROVIDER_URLS)),
|
||||
url(r'^eligibility/$', views.CreditEligibilityView.as_view(), name='eligibility_details'),
|
||||
)
|
||||
|
||||
router = routers.SimpleRouter() # pylint: disable=invalid-name
|
||||
router.register(r'courses', views.CreditCourseViewSet)
|
||||
router.register(r'providers', views.CreditProviderViewSet)
|
||||
V1_URLS += router.urls
|
||||
|
||||
urlpatterns = patterns(
|
||||
|
||||
@@ -1,376 +1,157 @@
|
||||
"""
|
||||
Views for the credit Django app.
|
||||
"""
|
||||
import datetime
|
||||
import json
|
||||
from __future__ import unicode_literals
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import (
|
||||
HttpResponse,
|
||||
HttpResponseBadRequest,
|
||||
HttpResponseForbidden,
|
||||
Http404
|
||||
)
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.decorators.http import require_POST, require_GET
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
import pytz
|
||||
from rest_framework import viewsets, mixins, permissions
|
||||
from rest_framework import viewsets, mixins, permissions, views, generics
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.response import Response
|
||||
from rest_framework_oauth.authentication import OAuth2Authentication
|
||||
|
||||
from openedx.core.djangoapps.credit import api
|
||||
from openedx.core.djangoapps.credit.exceptions import CreditApiBadRequest, CreditRequestNotFound
|
||||
from openedx.core.djangoapps.credit.models import CreditCourse
|
||||
from openedx.core.djangoapps.credit.serializers import CreditCourseSerializer
|
||||
from openedx.core.djangoapps.credit.signature import signature, get_shared_secret_key
|
||||
from openedx.core.djangoapps.credit.api import create_credit_request
|
||||
from openedx.core.djangoapps.credit.exceptions import (UserNotEligibleException, InvalidCourseKey, CreditApiBadRequest,
|
||||
InvalidCreditRequest)
|
||||
from openedx.core.djangoapps.credit.models import (CreditCourse, CreditProvider, CREDIT_PROVIDER_ID_REGEX,
|
||||
CreditEligibility, CreditRequest)
|
||||
from openedx.core.djangoapps.credit.serializers import (CreditCourseSerializer, CreditProviderSerializer,
|
||||
CreditEligibilitySerializer, CreditProviderCallbackSerializer)
|
||||
from openedx.core.lib.api.mixins import PutAsCreateMixin
|
||||
from util.date_utils import from_timestamp
|
||||
from util.json_request import JsonResponse
|
||||
from openedx.core.lib.api.permissions import IsStaffOrOwner
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@require_GET
|
||||
def get_providers_detail(request):
|
||||
"""
|
||||
class CreditProviderViewSet(viewsets.ReadOnlyModelViewSet):
|
||||
""" Credit provider endpoints. """
|
||||
|
||||
**User Cases**
|
||||
lookup_field = 'provider_id'
|
||||
lookup_value_regex = CREDIT_PROVIDER_ID_REGEX
|
||||
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
|
||||
pagination_class = None
|
||||
permission_classes = (permissions.IsAuthenticated,)
|
||||
queryset = CreditProvider.objects.all()
|
||||
serializer_class = CreditProviderSerializer
|
||||
|
||||
Returns details of the credit providers filtered by provided query parameters.
|
||||
def filter_queryset(self, queryset):
|
||||
queryset = super(CreditProviderViewSet, self).filter_queryset(queryset)
|
||||
|
||||
**Parameters:**
|
||||
# Filter by provider ID
|
||||
provider_ids = self.request.GET.get('provider_ids', None)
|
||||
|
||||
* provider_id (list of provider ids separated with ","): The identifiers for the providers for which
|
||||
user requested
|
||||
if provider_ids:
|
||||
provider_ids = provider_ids.split(',')
|
||||
queryset = queryset.filter(provider_id__in=provider_ids)
|
||||
|
||||
**Example Usage:**
|
||||
|
||||
GET /api/credit/v1/providers?provider_id=asu,hogwarts
|
||||
"response": [
|
||||
"id": "hogwarts",
|
||||
"display_name": "Hogwarts School of Witchcraft and Wizardry",
|
||||
"url": "https://credit.example.com/",
|
||||
"status_url": "https://credit.example.com/status/",
|
||||
"description": "A new model for the Witchcraft and Wizardry School System.",
|
||||
"enable_integration": false,
|
||||
"fulfillment_instructions": "
|
||||
<p>In order to fulfill credit, Hogwarts School of Witchcraft and Wizardry requires learners to:</p>
|
||||
<ul>
|
||||
<li>Sample instruction abc</li>
|
||||
<li>Sample instruction xyz</li>
|
||||
</ul>",
|
||||
},
|
||||
...
|
||||
]
|
||||
|
||||
**Responses:**
|
||||
|
||||
* 200 OK: The request was created successfully. Returned content
|
||||
is a JSON-encoded dictionary describing what the client should
|
||||
send to the credit provider.
|
||||
|
||||
* 404 Not Found: The provider does not exist.
|
||||
|
||||
"""
|
||||
provider_ids = request.GET.get("provider_ids", None)
|
||||
providers_list = provider_ids.split(",") if provider_ids else None
|
||||
providers = api.get_credit_providers(providers_list)
|
||||
return JsonResponse(providers)
|
||||
return queryset
|
||||
|
||||
|
||||
@require_POST
|
||||
def create_credit_request(request, provider_id):
|
||||
"""
|
||||
Initiate a request for credit in a course.
|
||||
class CreditProviderRequestCreateView(views.APIView):
|
||||
""" Creates a credit request for the given user and course, if the user is eligible for credit."""
|
||||
|
||||
This end-point will get-or-create a record in the database to track
|
||||
the request. It will then calculate the parameters to send to
|
||||
the credit provider and digitally sign the parameters, using a secret
|
||||
key shared with the credit provider.
|
||||
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
|
||||
permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner,)
|
||||
|
||||
The user's browser is responsible for POSTing these parameters
|
||||
directly to the credit provider.
|
||||
def post(self, request, provider_id):
|
||||
""" POST handler. """
|
||||
# Get the provider, or return HTTP 404 if it doesn't exist
|
||||
provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
|
||||
|
||||
**Example Usage:**
|
||||
# Validate the course key
|
||||
course_key = request.data.get('course_key')
|
||||
try:
|
||||
course_key = CourseKey.from_string(course_key)
|
||||
except InvalidKeyError:
|
||||
raise InvalidCourseKey(course_key)
|
||||
|
||||
POST /api/credit/v1/providers/hogwarts/request/
|
||||
{
|
||||
"username": "ron",
|
||||
"course_key": "edX/DemoX/Demo_Course"
|
||||
}
|
||||
# Validate the username
|
||||
username = request.data.get('username')
|
||||
if not username:
|
||||
raise ValidationError({'detail': 'A username must be specified.'})
|
||||
|
||||
Response: 200 OK
|
||||
Content-Type: application/json
|
||||
{
|
||||
"url": "http://example.com/request-credit",
|
||||
"method": "POST",
|
||||
"parameters": {
|
||||
request_uuid: "557168d0f7664fe59097106c67c3f847"
|
||||
timestamp: 1434631630,
|
||||
course_org: "ASUx"
|
||||
course_num: "DemoX"
|
||||
course_run: "1T2015"
|
||||
final_grade: "0.95",
|
||||
user_username: "john",
|
||||
user_email: "john@example.com"
|
||||
user_full_name: "John Smith"
|
||||
user_mailing_address: "",
|
||||
user_country: "US",
|
||||
signature: "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
|
||||
}
|
||||
}
|
||||
# Ensure the user is actually eligible to receive credit
|
||||
if not CreditEligibility.is_user_eligible_for_credit(course_key, username):
|
||||
raise UserNotEligibleException(course_key, username)
|
||||
|
||||
**Parameters:**
|
||||
try:
|
||||
credit_request = create_credit_request(course_key, provider.provider_id, username)
|
||||
return Response(credit_request)
|
||||
except CreditApiBadRequest as ex:
|
||||
raise InvalidCreditRequest(ex.message)
|
||||
|
||||
* username (unicode): The username of the user requesting credit.
|
||||
|
||||
* course_key (unicode): The identifier for the course for which the user
|
||||
is requesting credit.
|
||||
class CreditProviderCallbackView(views.APIView):
|
||||
""" Callback used by credit providers to update credit request status. """
|
||||
|
||||
**Responses:**
|
||||
# This endpoint should be open to all external credit providers.
|
||||
authentication_classes = ()
|
||||
permission_classes = ()
|
||||
|
||||
* 200 OK: The request was created successfully. Returned content
|
||||
is a JSON-encoded dictionary describing what the client should
|
||||
send to the credit provider.
|
||||
@method_decorator(csrf_exempt)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
return super(CreditProviderCallbackView, self).dispatch(request, *args, **kwargs)
|
||||
|
||||
* 400 Bad Request:
|
||||
- The provided course key did not correspond to a valid credit course.
|
||||
- The user already has a completed credit request for this course and provider.
|
||||
def post(self, request, provider_id):
|
||||
""" POST handler. """
|
||||
provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
|
||||
data = request.data
|
||||
|
||||
* 403 Not Authorized:
|
||||
- The username does not match the name of the logged in user.
|
||||
- The user is not eligible for credit in the course.
|
||||
# Ensure the input data is valid
|
||||
serializer = CreditProviderCallbackSerializer(data=data, provider=provider)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
* 404 Not Found:
|
||||
- The provider does not exist.
|
||||
# Update the credit request status
|
||||
request_uuid = data['request_uuid']
|
||||
new_status = data['status']
|
||||
credit_request = generics.get_object_or_404(CreditRequest, uuid=request_uuid, provider=provider)
|
||||
old_status = credit_request.status
|
||||
credit_request.status = new_status
|
||||
credit_request.save()
|
||||
|
||||
"""
|
||||
response, parameters = _validate_json_parameters(request.body, ["username", "course_key"])
|
||||
if response is not None:
|
||||
return response
|
||||
|
||||
try:
|
||||
course_key = CourseKey.from_string(parameters["course_key"])
|
||||
except InvalidKeyError:
|
||||
return HttpResponseBadRequest(
|
||||
u'Could not parse "{course_key}" as a course key'.format(
|
||||
course_key=parameters["course_key"]
|
||||
)
|
||||
log.info(
|
||||
'Updated [%s] CreditRequest [%s] from status [%s] to [%s].',
|
||||
provider_id, request_uuid, old_status, new_status
|
||||
)
|
||||
|
||||
# Check user authorization
|
||||
if not (request.user and request.user.username == parameters["username"]):
|
||||
log.warning(
|
||||
u'User with ID %s attempted to initiate a credit request for user with username "%s"',
|
||||
request.user.id if request.user else "[Anonymous]",
|
||||
parameters["username"]
|
||||
return Response()
|
||||
|
||||
|
||||
class CreditEligibilityView(generics.ListAPIView):
|
||||
""" Returns eligibility for a user-course combination. """
|
||||
|
||||
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
|
||||
pagination_class = None
|
||||
permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner)
|
||||
serializer_class = CreditEligibilitySerializer
|
||||
queryset = CreditEligibility.objects.all()
|
||||
|
||||
def filter_queryset(self, queryset):
|
||||
username = self.request.GET.get('username')
|
||||
course_key = self.request.GET.get('course_key')
|
||||
|
||||
if not (username and course_key):
|
||||
raise ValidationError(
|
||||
{'detail': 'Both the course_key and username querystring parameters must be supplied.'})
|
||||
|
||||
course_key = unicode(course_key)
|
||||
|
||||
try:
|
||||
course_key = CourseKey.from_string(course_key)
|
||||
except InvalidKeyError:
|
||||
raise ValidationError({'detail': '[{}] is not a valid course key.'.format(course_key)})
|
||||
return queryset.filter(
|
||||
username=username,
|
||||
course__course_key=course_key,
|
||||
deadline__gt=datetime.datetime.now(pytz.UTC)
|
||||
)
|
||||
return HttpResponseForbidden("Users are not allowed to initiate credit requests for other users.")
|
||||
|
||||
# Initiate the request
|
||||
try:
|
||||
credit_request = api.create_credit_request(course_key, provider_id, parameters["username"])
|
||||
except CreditApiBadRequest as ex:
|
||||
return HttpResponseBadRequest(ex)
|
||||
else:
|
||||
return JsonResponse(credit_request)
|
||||
|
||||
|
||||
@require_POST
|
||||
@csrf_exempt
|
||||
def credit_provider_callback(request, provider_id):
|
||||
"""
|
||||
Callback end-point used by credit providers to approve or reject
|
||||
a request for credit.
|
||||
|
||||
**Example Usage:**
|
||||
|
||||
POST /api/credit/v1/providers/{provider-id}/callback
|
||||
{
|
||||
"request_uuid": "557168d0f7664fe59097106c67c3f847",
|
||||
"status": "approved",
|
||||
"timestamp": 1434631630,
|
||||
"signature": "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
|
||||
}
|
||||
|
||||
Response: 200 OK
|
||||
|
||||
**Parameters:**
|
||||
|
||||
* request_uuid (string): The UUID of the request.
|
||||
|
||||
* status (string): Either "approved" or "rejected".
|
||||
|
||||
* timestamp (int or string): The datetime at which the POST request was made, represented
|
||||
as the number of seconds since January 1, 1970 00:00:00 UTC.
|
||||
If the timestamp is a string, it will be converted to an integer.
|
||||
|
||||
* signature (string): A digital signature of the request parameters,
|
||||
created using a secret key shared with the credit provider.
|
||||
|
||||
**Responses:**
|
||||
|
||||
* 200 OK: The user's status was updated successfully.
|
||||
|
||||
* 400 Bad request: The provided parameters were not valid.
|
||||
Response content will be a JSON-encoded string describing the error.
|
||||
|
||||
* 403 Forbidden: Signature was invalid or timestamp was too far in the past.
|
||||
|
||||
* 404 Not Found: Could not find a request with the specified UUID associated with this provider.
|
||||
|
||||
"""
|
||||
response, parameters = _validate_json_parameters(request.body, [
|
||||
"request_uuid", "status", "timestamp", "signature"
|
||||
])
|
||||
if response is not None:
|
||||
return response
|
||||
|
||||
# Validate the digital signature of the request.
|
||||
# This ensures that the message came from the credit provider
|
||||
# and hasn't been tampered with.
|
||||
response = _validate_signature(parameters, provider_id)
|
||||
if response is not None:
|
||||
return response
|
||||
|
||||
# Validate the timestamp to ensure that the request is timely.
|
||||
response = _validate_timestamp(parameters["timestamp"], provider_id)
|
||||
if response is not None:
|
||||
return response
|
||||
|
||||
# Update the credit request status
|
||||
try:
|
||||
api.update_credit_request_status(parameters["request_uuid"], provider_id, parameters["status"])
|
||||
except CreditRequestNotFound:
|
||||
raise Http404
|
||||
except CreditApiBadRequest as ex:
|
||||
return HttpResponseBadRequest(ex)
|
||||
else:
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@require_GET
|
||||
def get_eligibility_for_user(request):
|
||||
"""
|
||||
|
||||
**User Cases**
|
||||
|
||||
Retrieve user eligibility against course.
|
||||
|
||||
**Parameters:**
|
||||
|
||||
* course_key (unicode): Identifier of course.
|
||||
* username (unicode): Username of current User.
|
||||
|
||||
**Example Usage:**
|
||||
|
||||
GET /api/credit/v1/eligibility?username=user&course_key=edX/Demo_101/Fall
|
||||
"response": {
|
||||
"course_key": "edX/Demo_101/Fall",
|
||||
"deadline": "2015-10-23"
|
||||
}
|
||||
|
||||
**Responses:**
|
||||
|
||||
* 200 OK: The request was created successfully.
|
||||
|
||||
* 404 Not Found: The provider does not exist.
|
||||
|
||||
"""
|
||||
course_key = request.GET.get("course_key", None)
|
||||
username = request.GET.get("username", None)
|
||||
return JsonResponse(api.get_eligibilities_for_user(username=username, course_key=course_key))
|
||||
|
||||
|
||||
def _validate_json_parameters(params_string, expected_parameters):
|
||||
"""
|
||||
Load the request parameters as a JSON dictionary and check that
|
||||
all required paramters are present.
|
||||
|
||||
Arguments:
|
||||
params_string (unicode): The JSON-encoded parameter dictionary.
|
||||
expected_parameters (list): Required keys of the parameters dictionary.
|
||||
|
||||
Returns: tuple of (HttpResponse, dict)
|
||||
|
||||
"""
|
||||
try:
|
||||
parameters = json.loads(params_string)
|
||||
except (TypeError, ValueError):
|
||||
return HttpResponseBadRequest("Could not parse the request body as JSON."), None
|
||||
|
||||
if not isinstance(parameters, dict):
|
||||
return HttpResponseBadRequest("Request parameters must be a JSON-encoded dictionary."), None
|
||||
|
||||
missing_params = set(expected_parameters) - set(parameters.keys())
|
||||
if missing_params:
|
||||
msg = u"Required parameters are missing: {missing}".format(missing=u", ".join(missing_params))
|
||||
return HttpResponseBadRequest(msg), None
|
||||
|
||||
return None, parameters
|
||||
|
||||
|
||||
def _validate_signature(parameters, provider_id):
|
||||
"""
|
||||
Check that the signature from the credit provider is valid.
|
||||
|
||||
Arguments:
|
||||
parameters (dict): Parameters received from the credit provider.
|
||||
provider_id (unicode): Identifier for the credit provider.
|
||||
|
||||
Returns:
|
||||
HttpResponseForbidden or None
|
||||
|
||||
"""
|
||||
secret_key = get_shared_secret_key(provider_id)
|
||||
if secret_key is None:
|
||||
log.error(
|
||||
(
|
||||
u'Could not retrieve secret key for credit provider with ID "%s". '
|
||||
u'Since no key has been configured, we cannot validate requests from the credit provider.'
|
||||
), provider_id
|
||||
)
|
||||
return HttpResponseForbidden("Credit provider credentials have not been configured.")
|
||||
|
||||
if signature(parameters, secret_key) != parameters["signature"]:
|
||||
log.warning(u'Request from credit provider with ID "%s" had an invalid signature', parameters["signature"])
|
||||
return HttpResponseForbidden("Invalid signature.")
|
||||
|
||||
|
||||
def _validate_timestamp(timestamp_value, provider_id):
|
||||
"""
|
||||
Check that the timestamp of the request is recent.
|
||||
|
||||
Arguments:
|
||||
timestamp (int or string): Number of seconds since Jan. 1, 1970 UTC.
|
||||
If specified as a string, it will be converted to an integer.
|
||||
provider_id (unicode): Identifier for the credit provider.
|
||||
|
||||
Returns:
|
||||
HttpResponse or None
|
||||
|
||||
"""
|
||||
timestamp = from_timestamp(timestamp_value)
|
||||
if timestamp is None:
|
||||
msg = u'"{timestamp}" is not a valid timestamp'.format(timestamp=timestamp_value)
|
||||
log.warning(msg)
|
||||
return HttpResponseBadRequest(msg)
|
||||
|
||||
# Check that the timestamp is recent
|
||||
elapsed_seconds = (datetime.datetime.now(pytz.UTC) - timestamp).total_seconds()
|
||||
if elapsed_seconds > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
|
||||
log.warning(
|
||||
(
|
||||
u'Timestamp %s is too far in the past (%s seconds), '
|
||||
u'so we are rejecting the notification from the credit provider "%s".'
|
||||
),
|
||||
timestamp_value, elapsed_seconds, provider_id,
|
||||
)
|
||||
return HttpResponseForbidden(u"Timestamp is too far in the past.")
|
||||
|
||||
|
||||
class CreditCourseViewSet(PutAsCreateMixin, mixins.UpdateModelMixin, viewsets.ReadOnlyModelViewSet):
|
||||
|
||||
@@ -80,3 +80,16 @@ class IsStaffOrReadOnly(permissions.BasePermission):
|
||||
return (request.user.is_staff or
|
||||
CourseStaffRole(obj.course_id).has_user(request.user) or
|
||||
request.method in permissions.SAFE_METHODS)
|
||||
|
||||
|
||||
class IsStaffOrOwner(permissions.BasePermission):
|
||||
"""
|
||||
Permission that allows access to admin users or the owner of an object.
|
||||
The owner is considered the User object represented by obj.user.
|
||||
"""
|
||||
def has_object_permission(self, request, view, obj):
|
||||
return request.user.is_staff or obj.user == request.user
|
||||
|
||||
def has_permission(self, request, view):
|
||||
user = request.user
|
||||
return user.is_staff or (user.username == request.data.get('username'))
|
||||
|
||||
65
openedx/core/lib/api/tests/test_permissions.py
Normal file
65
openedx/core/lib/api/tests/test_permissions.py
Normal file
@@ -0,0 +1,65 @@
|
||||
""" Tests for API permissions classes. """
|
||||
from django.test import TestCase, RequestFactory
|
||||
|
||||
from openedx.core.lib.api.permissions import IsStaffOrOwner
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
|
||||
class TestObject(object):
|
||||
""" Fake class for object permission tests. """
|
||||
user = None
|
||||
|
||||
|
||||
class IsStaffOrOwnerTests(TestCase):
|
||||
""" Tests for IsStaffOrOwner permission class. """
|
||||
def setUp(self):
|
||||
super(IsStaffOrOwnerTests, self).setUp()
|
||||
self.permission = IsStaffOrOwner()
|
||||
self.request = RequestFactory().get('/')
|
||||
self.obj = TestObject()
|
||||
|
||||
def assert_user_has_object_permission(self, user, permitted):
|
||||
"""
|
||||
Asserts whether or not the user has permission to access an object.
|
||||
|
||||
Arguments
|
||||
user (User)
|
||||
permitted (boolean)
|
||||
"""
|
||||
self.request.user = user
|
||||
self.assertEqual(self.permission.has_object_permission(self.request, None, self.obj), permitted)
|
||||
|
||||
def test_staff_user(self):
|
||||
""" Staff users should be permitted. """
|
||||
user = UserFactory.create(is_staff=True)
|
||||
self.assert_user_has_object_permission(user, True)
|
||||
|
||||
def test_owner(self):
|
||||
""" Owners should be permitted. """
|
||||
user = UserFactory.create()
|
||||
self.obj.user = user
|
||||
self.assert_user_has_object_permission(user, True)
|
||||
|
||||
def test_non_staff_test_non_owner_or_staff_user(self):
|
||||
""" Non-staff and non-owner users should not be permitted. """
|
||||
user = UserFactory.create()
|
||||
self.assert_user_has_object_permission(user, False)
|
||||
|
||||
def test_has_permission_as_staff(self):
|
||||
""" Staff users always have permission. """
|
||||
self.request.user = UserFactory.create(is_staff=True)
|
||||
self.assertTrue(self.permission.has_permission(self.request, None))
|
||||
|
||||
def test_has_permission_as_owner(self):
|
||||
""" Owners always have permission. """
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory().get('/?username={}'.format(user.username))
|
||||
request.user = user
|
||||
self.assertTrue(self.permission.has_permission(request, None))
|
||||
|
||||
def test_has_permission_as_non_owner(self):
|
||||
""" Non-owners should not have permission. """
|
||||
user = UserFactory.create()
|
||||
request = RequestFactory().get('/?username={}'.format(user.username))
|
||||
request.user = UserFactory.create()
|
||||
self.assertFalse(self.permission.has_permission(request, None))
|
||||
Reference in New Issue
Block a user