Merge pull request #13755 from Microsoft/cdodge/restricted-oauth2-applications
Cdodge/restricted oauth2 applications
This commit is contained in:
@@ -1201,3 +1201,10 @@ DOC_LINK_BASE_URL = None
|
||||
|
||||
# Theme directory locale paths
|
||||
COMPREHENSIVE_THEME_LOCALE_PATHS = []
|
||||
|
||||
# This is required for the migrations in oauth_dispatch.models
|
||||
# otherwise it fails saying this attribute is not present in Settings
|
||||
# Although Studio does not exable OAuth2 Provider capability, the new approach
|
||||
# to generating test databases will discover and try to create all tables
|
||||
# and this setting needs to be present
|
||||
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
|
||||
|
||||
@@ -331,7 +331,3 @@ FEATURES['CUSTOM_COURSES_EDX'] = True
|
||||
|
||||
# API access management -- needed for simple-history to run.
|
||||
INSTALLED_APPS += ('openedx.core.djangoapps.api_admin',)
|
||||
|
||||
# Set the default Oauth2 Provider Model so that migrations can run in
|
||||
# verbose mode
|
||||
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
|
||||
|
||||
@@ -451,9 +451,14 @@ OAUTH2_PROVIDER = {
|
||||
'read': 'Read scope',
|
||||
'write': 'Write scope',
|
||||
'email': 'Email scope',
|
||||
'profile': 'Profile scope',
|
||||
}
|
||||
# conform profile scope message that is presented to end-user
|
||||
# to lms/templates/provider/authorize.html. This may be revised later.
|
||||
'profile': 'Read your user profile',
|
||||
},
|
||||
}
|
||||
# This is required for the migrations in oauth_dispatch.models
|
||||
# otherwise it fails saying this attribute is not present in Settings
|
||||
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
|
||||
|
||||
################################## TEMPLATE CONFIGURATION #####################################
|
||||
# Mako templating
|
||||
|
||||
@@ -582,10 +582,6 @@ JWT_AUTH.update({
|
||||
'JWT_AUDIENCE': 'test-key',
|
||||
})
|
||||
|
||||
# Set the default Oauth2 Provider Model so that migrations can run in
|
||||
# verbose mode
|
||||
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
|
||||
|
||||
COURSE_CATALOG_API_URL = 'https://catalog.example.com/api/v1'
|
||||
|
||||
COMPREHENSIVE_THEME_DIRS = [REPO_ROOT / "themes", REPO_ROOT / "common/test"]
|
||||
|
||||
@@ -15,3 +15,41 @@
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// For the django-oauth-toolkit Authorization view
|
||||
.wrapper-authorize {
|
||||
background: $white;
|
||||
width: 60%;
|
||||
padding-right: 140px;
|
||||
padding-left: 140px;
|
||||
|
||||
font-family: $sans-serif;
|
||||
|
||||
h1 {
|
||||
@extend %t-title4;
|
||||
margin-bottom: 0;
|
||||
margin-left: 0;
|
||||
padding: $baseline;
|
||||
padding-left: 0px;
|
||||
@include text-align(left);
|
||||
}
|
||||
|
||||
p {
|
||||
@extend %t-copy-base;
|
||||
margin: $baseline/2 0;
|
||||
}
|
||||
|
||||
.control-group {
|
||||
float: right;
|
||||
}
|
||||
|
||||
.btn-authorization-allow {
|
||||
@extend %btn-primary-blue;
|
||||
margin-left: 20px;
|
||||
line-height: 0.7em;
|
||||
}
|
||||
|
||||
.btn-authorization-cancel {
|
||||
@extend %btn-secondary-blue-outline;
|
||||
}
|
||||
}
|
||||
|
||||
51
lms/templates/oauth2_provider/authorize.html
Normal file
51
lms/templates/oauth2_provider/authorize.html
Normal file
@@ -0,0 +1,51 @@
|
||||
{% extends "main_django.html" %}
|
||||
{% load i18n configuration %}
|
||||
|
||||
{% block title %}
|
||||
{% trans "Authorize" %} | {% platform_name %}
|
||||
{% endblock %}
|
||||
|
||||
{% block body %}
|
||||
<main id="main" aria-label="Content" tabindex="-1">
|
||||
<section class="container authorize {{ selected_tab }}" id="authorize-content">
|
||||
<div class="wrapper-authorize">
|
||||
<div class="block-center">
|
||||
{% if not error %}
|
||||
<form id="authorizationForm" method="post">
|
||||
<h1 class="block-center-heading">{% trans "Authorize" %} {{ application.name }}?</h1>
|
||||
{% csrf_token %}
|
||||
|
||||
{% for field in form %}
|
||||
{% if field.is_hidden %}
|
||||
{{ field }}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
<p>{% trans "The above application requests the following permissions from your account:" %}</p>
|
||||
<ul>
|
||||
{% for scope in scopes_descriptions %}
|
||||
<li>{{ scope }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
<p>{% trans "Please click the 'Allow' button to grant these permissions to the above application. Otherwise, to withhold these permissions, please click the 'Cancel' button." %}
|
||||
</p>
|
||||
|
||||
{{ form.errors }}
|
||||
{{ form.non_field_errors }}
|
||||
|
||||
<div class="control-group">
|
||||
<div class="controls">
|
||||
<button type="submit" class="btn btn-authorization-cancel" name="cancel"/>{% trans "Cancel" %}</button><button type="submit" class="btn btn-authorization-allow" name="allow" value="Authorize"/>{% trans "Allow" %}</button>
|
||||
</div>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
{% else %}
|
||||
<h2>{% trans "Error" %}: {{ error.error }}</h2>
|
||||
<p>{{ error.description }}</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
</section>
|
||||
</main>
|
||||
{% endblock %}
|
||||
@@ -5,6 +5,8 @@ Override admin configuration for django-oauth-toolkit
|
||||
from django.contrib.admin import ModelAdmin, site
|
||||
from oauth2_provider import models
|
||||
|
||||
from .models import RestrictedApplication
|
||||
|
||||
|
||||
def reregister(model_class):
|
||||
"""
|
||||
@@ -71,3 +73,13 @@ class DOTGrantAdmin(ModelAdmin):
|
||||
list_filter = [u'application']
|
||||
raw_id_fields = [u'user']
|
||||
search_fields = [u'code', u'user__username']
|
||||
|
||||
|
||||
class RestrictedApplicationAdmin(ModelAdmin):
|
||||
"""
|
||||
ModelAdmin for the Restricted Application
|
||||
"""
|
||||
list_display = [u'application']
|
||||
|
||||
|
||||
site.register(RestrictedApplication, RestrictedApplicationAdmin)
|
||||
|
||||
@@ -3,10 +3,32 @@ Classes that override default django-oauth-toolkit behavior
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from .models import RestrictedApplication
|
||||
|
||||
from datetime import datetime
|
||||
from django.contrib.auth import authenticate, get_user_model
|
||||
from django.db.models.signals import pre_save
|
||||
from django.dispatch import receiver
|
||||
from pytz import utc
|
||||
|
||||
from oauth2_provider.models import AccessToken
|
||||
from oauth2_provider.oauth2_validators import OAuth2Validator
|
||||
|
||||
|
||||
@receiver(pre_save, sender=AccessToken)
|
||||
def on_access_token_presave(sender, instance, *args, **kwargs): # pylint: disable=unused-argument
|
||||
"""
|
||||
A hook on the AccessToken. Since we do not have protected scopes, we must mark all
|
||||
AccessTokens as expired for 'restricted applications'.
|
||||
|
||||
We do this as a pre-save hook on the ORM
|
||||
"""
|
||||
|
||||
is_application_restricted = RestrictedApplication.objects.filter(application=instance.application).exists()
|
||||
if is_application_restricted:
|
||||
RestrictedApplication.set_access_token_as_expired(instance)
|
||||
|
||||
|
||||
class EdxOAuth2Validator(OAuth2Validator):
|
||||
"""
|
||||
Validator class that implements edX-specific custom behavior:
|
||||
@@ -61,6 +83,23 @@ class EdxOAuth2Validator(OAuth2Validator):
|
||||
|
||||
super(EdxOAuth2Validator, self).save_bearer_token(token, request, *args, **kwargs)
|
||||
|
||||
is_application_restricted = RestrictedApplication.objects.filter(application=request.client).exists()
|
||||
if is_application_restricted:
|
||||
# Since RestrictedApplications will override the DOT defined expiry, so that access_tokens
|
||||
# are always expired, we need to re-read the token from the database and then calculate the
|
||||
# expires_in (in seconds) from what we stored in the database. This value should be a negative
|
||||
#value, meaning that it is already expired
|
||||
|
||||
access_token = AccessToken.objects.get(token=token['access_token'])
|
||||
utc_now = datetime.utcnow().replace(tzinfo=utc)
|
||||
expires_in = (access_token.expires - utc_now).total_seconds()
|
||||
|
||||
# assert that RestriectedApplications only issue expired tokens
|
||||
# blow up processing if we see otherwise
|
||||
assert expires_in < 0
|
||||
|
||||
token['expires_in'] = expires_in
|
||||
|
||||
# Restore the original request attributes
|
||||
request.grant_type = grant_type
|
||||
request.user = user
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_APPLICATION_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='RestrictedApplication',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('application', models.ForeignKey(to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||
],
|
||||
),
|
||||
]
|
||||
45
openedx/core/djangoapps/oauth_dispatch/models.py
Normal file
45
openedx/core/djangoapps/oauth_dispatch/models.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Specialized models for oauth_dispatch djangoapp
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from django.db import models
|
||||
from pytz import utc
|
||||
|
||||
from oauth2_provider.settings import oauth2_settings
|
||||
|
||||
|
||||
class RestrictedApplication(models.Model):
|
||||
"""
|
||||
This model lists which django-oauth-toolkit Applications are considered 'restricted'
|
||||
and thus have a limited ability to use various APIs.
|
||||
|
||||
A restricted Application will only get expired token/JWT payloads
|
||||
so that they cannot be used to call into APIs.
|
||||
"""
|
||||
|
||||
application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, null=False)
|
||||
|
||||
def __unicode__(self):
|
||||
"""
|
||||
Return a unicode representation of this object
|
||||
"""
|
||||
return u"<RestrictedApplication '{name}'>".format(
|
||||
name=self.application.name
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def set_access_token_as_expired(cls, access_token):
|
||||
"""
|
||||
For access_tokens for RestrictedApplications, put the expire timestamp into the beginning of the epoch
|
||||
which is Jan. 1, 1970
|
||||
"""
|
||||
access_token.expires = datetime(1970, 1, 1, tzinfo=utc)
|
||||
|
||||
@classmethod
|
||||
def verify_access_token_as_expired(cls, access_token):
|
||||
"""
|
||||
For access_tokens for RestrictedApplications, make sure that the expiry date
|
||||
is set at the beginning of the epoch which is Jan. 1, 1970
|
||||
"""
|
||||
return access_token.expires == datetime(1970, 1, 1, tzinfo=utc)
|
||||
@@ -3,3 +3,4 @@ Constants for testing purposes
|
||||
"""
|
||||
|
||||
DUMMY_REDIRECT_URL = u'https://example.com/edx/redirect'
|
||||
DUMMY_REDIRECT_URL2 = u'https://example.com/edx/other-redirect'
|
||||
|
||||
@@ -1,22 +1,26 @@
|
||||
"""
|
||||
OAuth Dispatch test mixins
|
||||
"""
|
||||
import jwt
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
import jwt
|
||||
from jwt.exceptions import ExpiredSignatureError
|
||||
|
||||
from student.models import UserProfile, anonymous_id_for_user
|
||||
|
||||
|
||||
class AccessTokenMixin(object):
|
||||
""" Mixin for tests dealing with OAuth 2 access tokens. """
|
||||
|
||||
def assert_valid_jwt_access_token(self, access_token, user, scopes=None):
|
||||
def assert_valid_jwt_access_token(self, access_token, user, scopes=None, should_be_expired=False):
|
||||
"""
|
||||
Verify the specified JWT access token is valid, and belongs to the specified user.
|
||||
|
||||
Args:
|
||||
access_token (str): JWT
|
||||
user (User): User whose information is contained in the JWT payload.
|
||||
(optional) should_be_expired: indicates if the passed in JWT token is expected to be expired
|
||||
|
||||
Returns:
|
||||
dict: Decoded JWT payload
|
||||
@@ -24,13 +28,27 @@ class AccessTokenMixin(object):
|
||||
scopes = scopes or []
|
||||
audience = settings.JWT_AUTH['JWT_AUDIENCE']
|
||||
issuer = settings.JWT_AUTH['JWT_ISSUER']
|
||||
payload = jwt.decode(
|
||||
access_token,
|
||||
settings.JWT_AUTH['JWT_SECRET_KEY'],
|
||||
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']],
|
||||
audience=audience,
|
||||
issuer=issuer
|
||||
)
|
||||
|
||||
def _decode_jwt(verify_expiration):
|
||||
"""
|
||||
Helper method to decode a JWT with the ability to
|
||||
verify the expiration of said token
|
||||
"""
|
||||
return jwt.decode(
|
||||
access_token,
|
||||
settings.JWT_AUTH['JWT_SECRET_KEY'],
|
||||
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']],
|
||||
audience=audience,
|
||||
issuer=issuer,
|
||||
verify_expiration=verify_expiration
|
||||
)
|
||||
|
||||
# Note that if we expect the claims to have expired
|
||||
# then we ask the JWT library not to verify expiration
|
||||
# as that would throw a ExpiredSignatureError and
|
||||
# halt other verifications steps. We'll do a manual
|
||||
# expiry verification later on
|
||||
payload = _decode_jwt(verify_expiration=not should_be_expired)
|
||||
|
||||
expected = {
|
||||
'aud': audience,
|
||||
@@ -54,4 +72,13 @@ class AccessTokenMixin(object):
|
||||
|
||||
self.assertDictContainsSubset(expected, payload)
|
||||
|
||||
# Since we suppressed checking of expiry
|
||||
# in the claim in the above check, because we want
|
||||
# to fully examine the claims outside of the expiry,
|
||||
# now we should assert that the claim is indeed
|
||||
# expired
|
||||
if should_be_expired:
|
||||
with self.assertRaises(ExpiredSignatureError):
|
||||
_decode_jwt(verify_expiration=True)
|
||||
|
||||
return payload
|
||||
|
||||
@@ -5,17 +5,21 @@ Tests for DOT Adapter
|
||||
from datetime import timedelta
|
||||
|
||||
import ddt
|
||||
from django.conf import settings
|
||||
from django.test import TestCase
|
||||
from django.utils.timezone import now
|
||||
from oauth2_provider import models
|
||||
import unittest
|
||||
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
from ..adapters import DOTAdapter
|
||||
from .constants import DUMMY_REDIRECT_URL
|
||||
from .constants import DUMMY_REDIRECT_URL, DUMMY_REDIRECT_URL2
|
||||
from ..models import RestrictedApplication
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
class DOTAdapterTestCase(TestCase):
|
||||
"""
|
||||
Test class for DOTAdapter.
|
||||
@@ -38,6 +42,21 @@ class DOTAdapterTestCase(TestCase):
|
||||
redirect_uri=DUMMY_REDIRECT_URL,
|
||||
client_id='confidential-client-id',
|
||||
)
|
||||
self.restricted_client = self.adapter.create_confidential_client(
|
||||
name='restricted app',
|
||||
user=self.user,
|
||||
redirect_uri=DUMMY_REDIRECT_URL2,
|
||||
client_id='restricted-client-id',
|
||||
)
|
||||
self.restricted_app = RestrictedApplication.objects.create(application=self.restricted_client)
|
||||
|
||||
def test_restricted_app_unicode(self):
|
||||
"""
|
||||
Make sure unicode representation of RestrictedApplication is correct
|
||||
"""
|
||||
self.assertEqual(unicode(self.restricted_app), u"<RestrictedApplication '{name}'>".format(
|
||||
name=self.restricted_client.name
|
||||
))
|
||||
|
||||
@ddt.data(
|
||||
('confidential', models.Application.CLIENT_CONFIDENTIAL),
|
||||
@@ -51,7 +70,14 @@ class DOTAdapterTestCase(TestCase):
|
||||
self.assertEqual(client.client_type, client_type)
|
||||
|
||||
def test_get_client(self):
|
||||
client = self.adapter.get_client(client_type=models.Application.CLIENT_CONFIDENTIAL)
|
||||
"""
|
||||
Read back one of the confidential clients (there are two)
|
||||
and verify that we get back what we expected
|
||||
"""
|
||||
client = self.adapter.get_client(
|
||||
redirect_uris=DUMMY_REDIRECT_URL,
|
||||
client_type=models.Application.CLIENT_CONFIDENTIAL
|
||||
)
|
||||
self.assertIsInstance(client, models.Application)
|
||||
self.assertEqual(client.client_type, models.Application.CLIENT_CONFIDENTIAL)
|
||||
|
||||
@@ -74,3 +100,18 @@ class DOTAdapterTestCase(TestCase):
|
||||
expires=now() + timedelta(days=30),
|
||||
)
|
||||
self.assertEqual(self.adapter.get_access_token(token_string='token-id'), token)
|
||||
|
||||
def test_get_restricted_access_token(self):
|
||||
"""
|
||||
Make sure when generating an access_token for a restricted client
|
||||
that the token is immediately expired
|
||||
"""
|
||||
models.AccessToken.objects.create(
|
||||
token='expired-token-id',
|
||||
application=self.restricted_client,
|
||||
user=self.user,
|
||||
expires=now() + timedelta(days=30),
|
||||
)
|
||||
|
||||
readback_token = self.adapter.get_access_token(token_string='expired-token-id')
|
||||
self.assertTrue(RestrictedApplication.verify_access_token_as_expired(readback_token))
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
# pylint: disable=missing-docstring
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import TestCase
|
||||
from oauth2_provider.models import Application, AccessToken, RefreshToken
|
||||
import unittest
|
||||
|
||||
from openedx.core.djangoapps.oauth_dispatch.tests import factories
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
class TestClientFactory(TestCase):
|
||||
def setUp(self):
|
||||
super(TestClientFactory, self).setUp()
|
||||
@@ -18,6 +21,7 @@ class TestClientFactory(TestCase):
|
||||
self.assertEqual(actual_application, expected_application)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
class TestAccessTokenFactory(TestCase):
|
||||
def setUp(self):
|
||||
super(TestAccessTokenFactory, self).setUp()
|
||||
@@ -30,6 +34,7 @@ class TestAccessTokenFactory(TestCase):
|
||||
self.assertEqual(actual_access_token, expected_access_token)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
class TestRefreshTokenFactory(TestCase):
|
||||
def setUp(self):
|
||||
super(TestRefreshTokenFactory, self).setUp()
|
||||
|
||||
@@ -9,6 +9,7 @@ from django.conf import settings
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.core.urlresolvers import reverse
|
||||
import httpretty
|
||||
from oauth2_provider import models as dot_models
|
||||
from provider import constants
|
||||
import unittest
|
||||
|
||||
@@ -16,10 +17,49 @@ from student.tests.factories import UserFactory
|
||||
from third_party_auth.tests.utils import ThirdPartyOAuthTestMixin, ThirdPartyOAuthTestMixinGoogle
|
||||
|
||||
from .constants import DUMMY_REDIRECT_URL
|
||||
from . import mixins
|
||||
from .. import adapters
|
||||
from .. import models
|
||||
|
||||
if settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"):
|
||||
from .. import views
|
||||
from . import mixins
|
||||
|
||||
|
||||
class AccessTokenLoginMixin(object):
|
||||
"""
|
||||
Shared helper class to assert proper access levels when using access_tokens
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Initialize mixin
|
||||
"""
|
||||
super(AccessTokenLoginMixin, self).setUp()
|
||||
self.login_with_access_token_url = reverse("login_with_access_token")
|
||||
|
||||
def login_with_access_token(self, access_token=None):
|
||||
"""
|
||||
Login with access token and return response.
|
||||
You can optionally send in an accss_token to override
|
||||
the object's attribute
|
||||
"""
|
||||
|
||||
return self.client.post(
|
||||
self.login_with_access_token_url,
|
||||
HTTP_AUTHORIZATION="Bearer {0}".format(access_token if access_token else self.access_token)
|
||||
)
|
||||
|
||||
def _assert_access_token_is_valid(self, access_token=None):
|
||||
"""
|
||||
Asserts that oauth assigned access_token is valid and usable
|
||||
"""
|
||||
self.assertEqual(self.login_with_access_token(access_token=access_token).status_code, 204)
|
||||
|
||||
def _assert_access_token_invalidated(self, access_token=None):
|
||||
"""
|
||||
Asserts that oauth assigned access_token is not valid
|
||||
"""
|
||||
self.assertEqual(self.login_with_access_token(access_token=access_token).status_code, 401)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
@@ -48,6 +88,16 @@ class _DispatchingViewTestCase(TestCase):
|
||||
client_id='dop-app-client-id',
|
||||
)
|
||||
|
||||
# Create a "restricted" DOT Application which means any AccessToken/JWT
|
||||
# generated for this application will be immediately expired
|
||||
self.restricted_dot_app = self.dot_adapter.create_public_client(
|
||||
name='test restricted dot application',
|
||||
user=self.user,
|
||||
redirect_uri=DUMMY_REDIRECT_URL,
|
||||
client_id='dot-restricted-app-client-id',
|
||||
)
|
||||
models.RestrictedApplication.objects.create(application=self.restricted_dot_app)
|
||||
|
||||
def _post_request(self, user, client, token_type=None):
|
||||
"""
|
||||
Call the view with a POST request objectwith the appropriate format,
|
||||
@@ -63,14 +113,14 @@ class _DispatchingViewTestCase(TestCase):
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase):
|
||||
class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _DispatchingViewTestCase):
|
||||
"""
|
||||
Test class for AccessTokenView
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestAccessTokenView, self).setUp()
|
||||
self.url = reverse('access_token')
|
||||
self.view_class = views.AccessTokenView
|
||||
super(TestAccessTokenView, self).setUp()
|
||||
|
||||
def _post_body(self, user, client, token_type=None):
|
||||
"""
|
||||
@@ -99,6 +149,22 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase):
|
||||
self.assertIn('scope', data)
|
||||
self.assertIn('token_type', data)
|
||||
|
||||
def test_restricted_access_token_fields(self):
|
||||
response = self._post_request(self.user, self.restricted_dot_app)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.content)
|
||||
self.assertIn('access_token', data)
|
||||
self.assertIn('expires_in', data)
|
||||
self.assertIn('scope', data)
|
||||
self.assertIn('token_type', data)
|
||||
|
||||
# Restricted applications have immediately expired tokens
|
||||
self.assertLess(data['expires_in'], 0)
|
||||
|
||||
# double check that the token stored in the DB is marked as expired
|
||||
access_token = dot_models.AccessToken.objects.get(token=data['access_token'])
|
||||
self.assertTrue(models.RestrictedApplication.verify_access_token_as_expired(access_token))
|
||||
|
||||
@ddt.data('dop_app', 'dot_app')
|
||||
def test_jwt_access_token(self, client_attr):
|
||||
client = getattr(self, client_attr)
|
||||
@@ -109,6 +175,47 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase):
|
||||
self.assertEqual(data['token_type'], 'JWT')
|
||||
self.assert_valid_jwt_access_token(data['access_token'], self.user, data['scope'].split(' '))
|
||||
|
||||
def test_restricted_jwt_access_token(self):
|
||||
"""
|
||||
Verify that when requesting a JWT token from a restricted Application
|
||||
within the DOT subsystem, that our claims is marked as already expired
|
||||
(i.e. expiry set to Jan 1, 1970)
|
||||
"""
|
||||
response = self._post_request(self.user, self.restricted_dot_app, token_type='jwt')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.content)
|
||||
self.assertIn('expires_in', data)
|
||||
|
||||
# jwt must indicate that it is already expired
|
||||
self.assertLess(data['expires_in'], 0)
|
||||
self.assertEqual(data['token_type'], 'JWT')
|
||||
self.assert_valid_jwt_access_token(
|
||||
data['access_token'],
|
||||
self.user,
|
||||
data['scope'].split(' '),
|
||||
should_be_expired=True
|
||||
)
|
||||
|
||||
def test_restricted_access_token(self):
|
||||
"""
|
||||
Verify that an access_token generated for a RestrictedApplication fails when
|
||||
submitted to an API endpoint
|
||||
"""
|
||||
|
||||
response = self._post_request(self.user, self.restricted_dot_app)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = json.loads(response.content)
|
||||
|
||||
self.assertIn('expires_in', data)
|
||||
self.assertIn('access_token', data)
|
||||
|
||||
# the payload should indicate that the token is expired
|
||||
self.assertLess(data['expires_in'], 0)
|
||||
|
||||
# try submitting this expired access_token to an API,
|
||||
# and assert that it fails
|
||||
self._assert_access_token_invalidated(data['access_token'])
|
||||
|
||||
def test_dot_access_token_provides_refresh_token(self):
|
||||
response = self._post_request(self.user, self.dot_app)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -197,6 +304,47 @@ class TestAuthorizationView(_DispatchingViewTestCase):
|
||||
check_response = getattr(self, '_check_{}_response'.format(client_type))
|
||||
check_response(response)
|
||||
|
||||
def test_check_dot_authorization_page_get(self):
|
||||
"""
|
||||
Make sure we get the overridden Authorization page - not
|
||||
the default django-oauth-toolkit when we perform a page load
|
||||
"""
|
||||
self.client.login(username=self.user.username, password='test')
|
||||
response = self.client.get(
|
||||
'/oauth2/authorize/',
|
||||
{
|
||||
'client_id': self.dot_app.client_id,
|
||||
'response_type': 'code',
|
||||
'state': 'random_state_string',
|
||||
'redirect_uri': DUMMY_REDIRECT_URL,
|
||||
'scope': 'profile'
|
||||
},
|
||||
follow=True,
|
||||
)
|
||||
|
||||
# are the requested scopes on the page? We only requested 'profile', lets make
|
||||
# sure the page only lists that one
|
||||
self.assertContains(response, settings.OAUTH2_PROVIDER['SCOPES']['profile'])
|
||||
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['read'])
|
||||
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['write'])
|
||||
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['email'])
|
||||
|
||||
# is the application name specified?
|
||||
self.assertContains(
|
||||
response,
|
||||
"Authorize {name}".format(name=self.dot_app.name)
|
||||
)
|
||||
|
||||
# are the cancel and allow buttons on the page?
|
||||
self.assertContains(
|
||||
response,
|
||||
'<button type="submit" class="btn btn-authorization-cancel" name="cancel"/>Cancel</button>'
|
||||
)
|
||||
self.assertContains(
|
||||
response,
|
||||
'<button type="submit" class="btn btn-authorization-allow" name="allow" value="Authorize"/>Allow</button>'
|
||||
)
|
||||
|
||||
def _check_dot_response(self, response):
|
||||
"""
|
||||
Check that django-oauth-toolkit gives an appropriate authorization response.
|
||||
@@ -327,12 +475,11 @@ class TestViewDispatch(TestCase):
|
||||
self.assertRaises(KeyError, view_object.get_view_for_backend, None)
|
||||
|
||||
|
||||
class TestRevokeTokenView(_DispatchingViewTestCase): # pylint: disable=abstract-method
|
||||
class TestRevokeTokenView(AccessTokenLoginMixin, _DispatchingViewTestCase): # pylint: disable=abstract-method
|
||||
"""
|
||||
Test class for RevokeTokenView
|
||||
"""
|
||||
def setUp(self):
|
||||
self.login_with_access_token_url = reverse("login_with_access_token")
|
||||
self.revoke_token_url = reverse('revoke_token')
|
||||
self.access_token_url = reverse('access_token')
|
||||
|
||||
@@ -374,27 +521,6 @@ class TestRevokeTokenView(_DispatchingViewTestCase): # pylint: disable=abstract
|
||||
'token': token,
|
||||
}
|
||||
|
||||
def login_with_access_token(self):
|
||||
"""
|
||||
Login with access token and return response
|
||||
"""
|
||||
return self.client.post(
|
||||
self.login_with_access_token_url,
|
||||
HTTP_AUTHORIZATION="Bearer {0}".format(self.access_token)
|
||||
)
|
||||
|
||||
def _assert_access_token_is_valid(self):
|
||||
"""
|
||||
Asserts that oauth assigned access_token is valid and usable
|
||||
"""
|
||||
self.assertEqual(self.login_with_access_token().status_code, 204)
|
||||
|
||||
def _assert_access_token_invalidated(self):
|
||||
"""
|
||||
Asserts that oauth assigned access_token is not valid
|
||||
"""
|
||||
self.assertEqual(self.login_with_access_token().status_code, 401)
|
||||
|
||||
def _assert_refresh_token_invalidated(self):
|
||||
"""
|
||||
Asserts that oauth assigned refresh_token is not valid
|
||||
|
||||
@@ -11,6 +11,7 @@ from collections import namedtuple
|
||||
|
||||
import ddt
|
||||
from datetime import datetime, timedelta
|
||||
from django.conf import settings
|
||||
from django.conf.urls import patterns, url, include
|
||||
from django.contrib.auth.models import User
|
||||
from django.http import HttpResponse
|
||||
@@ -26,6 +27,7 @@ from rest_framework.test import APIRequestFactory, APIClient
|
||||
from rest_framework.views import APIView
|
||||
from rest_framework_oauth import permissions
|
||||
from rest_framework_oauth.compat import oauth2_provider, oauth2_provider_scope
|
||||
import unittest
|
||||
|
||||
from openedx.core.djangoapps.oauth_dispatch import adapters
|
||||
from openedx.core.lib.api import authentication
|
||||
@@ -73,6 +75,7 @@ urlpatterns = patterns(
|
||||
|
||||
@attr(shard=2)
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
|
||||
class OAuth2Tests(TestCase):
|
||||
"""OAuth 2.0 authentication"""
|
||||
urls = 'openedx.core.lib.api.tests.test_authentication'
|
||||
|
||||
Reference in New Issue
Block a user