Remove DOP dispatching from oauth_dispatch.

https://openedx.atlassian.net/browse/BOM-1330
This commit is contained in:
jinder1s
2020-02-26 11:40:28 -05:00
committed by Diana Huang
parent fe22e77072
commit 295da79fe5
15 changed files with 117 additions and 889 deletions

View File

@@ -137,9 +137,6 @@ REQUIRE_DEBUG = DEBUG
########################### OAUTH2 #################################
OAUTH_OIDC_ISSUER = 'http://127.0.0.1:8000/oauth2'
ENABLE_DOP_ADAPTER = False
# pylint: disable=unicode-format-string
JWT_AUTH.update({
'JWT_SECRET_KEY': 'lms-secret',

View File

@@ -1,50 +0,0 @@
"""Pages relevant for OAuth2 confirmation."""
from bok_choy.page_object import PageObject
from common.test.acceptance.pages.lms import BASE_URL
class OAuth2Confirmation(PageObject):
"""Page for OAuth2 confirmation view."""
def __init__(self, browser, client_id="test-id", scopes=("email",)):
super(OAuth2Confirmation, self).__init__(browser)
self.client_id = client_id
self.scopes = scopes
@property
def url(self):
return "{}/oauth2/authorize?client_id={}&response_type=code&scope={}".format(
BASE_URL, self.client_id, ' '.join(self.scopes))
def is_browser_on_page(self):
return self.q(css="body.oauth2").visible
def cancel(self):
"""
Cancel the request.
This redirects to an invalid URI, because we don't want actual network
connections being made.
"""
self.q(css="input[name=cancel]").click()
def confirm(self):
"""
Confirm OAuth access
This redirects to an invalid URI, because we don't want actual network
connections being made.
"""
self.q(css="input[name=authorize]").click()
@property
def has_error(self):
"""Boolean for if the page has an error or not."""
return self.q(css=".error").present
@property
def error_message(self):
"""Text of the page's error message."""
return self.q(css='.error').text[0]

View File

@@ -1,110 +0,0 @@
# -*- coding: utf-8 -*-
"""Tests for OAuth2 permission delegation."""
from six.moves.urllib.parse import parse_qsl, urlparse # pylint: disable=import-error
from common.test.acceptance.pages.common.auto_auth import AutoAuthPage
from common.test.acceptance.pages.lms.oauth2_confirmation import OAuth2Confirmation
from common.test.acceptance.tests.helpers import AcceptanceTest
class OAuth2PermissionDelegationTests(AcceptanceTest):
"""
Tests for acceptance/denial of permission delegation requests.
"""
shard = 16
def setUp(self):
super(OAuth2PermissionDelegationTests, self).setUp()
self.oauth_page = OAuth2Confirmation(self.browser)
def _auth(self):
"""Authenticate the user."""
AutoAuthPage(self.browser).visit()
def _qs(self, url):
"""Parse url's querystring into a dict."""
return dict(parse_qsl(urlparse(url).query))
def test_error_for_invalid_scopes(self):
"""Requests for invalid scopes throw errors."""
self._auth()
self.oauth_page.scopes = ('email', 'does-not-exist')
assert self.oauth_page.visit()
self.assertTrue(self.oauth_page.has_error)
self.assertIn('not a valid scope', self.oauth_page.error_message)
def test_cancelling_redirects(self):
"""
If you cancel the request, you're redirected to the redirect_url with a
denied query param.
"""
self._auth()
assert self.oauth_page.visit()
self.oauth_page.cancel()
def check_redirect():
"""
Checks that the page correctly redirects to a url with a
denied query param.
"""
query = self._qs(self.browser.current_url)
return 'access_denied' in query['error']
def check_redirect_chrome():
"""
Similar to `check_redirect`, but, due to a bug in ChromeDriver,
we use `self.browser.title` here instead of `self.browser.current_url`
"""
query = self._qs(self.browser.title)
return 'access_denied' in query['error']
# This redirects to an invalid URI. For chrome verify title, current_url otherwise
if self.browser.name == 'chrome':
self.oauth_page.wait_for(check_redirect_chrome, 'redirected to invalid URL (chrome)')
else:
self.oauth_page.wait_for(check_redirect, 'redirected to invalid URL')
def test_accepting_redirects(self):
"""
If you accept the request, you're redirected to the redirect_url with
the correct query params.
"""
self._auth()
assert self.oauth_page.visit()
# This redirects to an invalid URI.
self.oauth_page.confirm()
self.oauth_page.wait_for_element_absence(
'input[name=authorize]', 'Authorization button is not present'
)
def check_query_string():
"""
Checks that 'code' appears in the browser's current url.
"""
query = self._qs(self.browser.current_url)
return 'code' in query
def check_query_string_chrome():
"""
Similar to check_query_string, but, due to a bug in ChromeDriver,
when chrome is on an invalid URI, `self.browser.current_url` outputs
"data:text/html,chromewebdata" instead of the current URI.
However, since the query string is present in the `title`, we use
that for chrome.
"""
query = self._qs(self.browser.title)
return 'code' in query
if self.browser.name == 'chrome':
self.oauth_page.wait_for(
check_query_string_chrome, 'redirected with correct query parameters (chrome)'
)
else:
self.oauth_page.wait_for(
check_query_string, 'redirected with correct query parameters'
)

View File

@@ -21,8 +21,6 @@ from django.urls import Resolver404, resolve, reverse
from django.utils.timezone import now
from oauth2_provider import models as dot_models
from opaque_keys.edx.keys import CourseKey
from provider.constants import CONFIDENTIAL
from provider.oauth2.models import Client, Grant
from rest_framework import status
from rest_framework.test import APITestCase
from six.moves import range, zip
@@ -40,7 +38,6 @@ from student.roles import CourseCcxCoachRole, CourseInstructorRole, CourseStaffR
from student.tests.factories import AdminFactory, UserFactory
USER_PASSWORD = 'test'
AUTH_ATTRS = ('auth', 'auth_header_oauth2_provider')
class CcxRestApiTest(CcxTestCase, APITestCase):
@@ -74,51 +71,17 @@ class CcxRestApiTest(CcxTestCase, APITestCase):
instructor = UserFactory()
allow_access(self.course, instructor, 'instructor')
# FIXME: Testing for multiple authentication types in multiple test cases is overkill. Stop it!
self.auth, self.auth_header_oauth2_provider = self.prepare_auth_token(app_user)
self.auth = self.prepare_auth_token(app_user)
self.course.enable_ccx = True
self.mstore.update_item(self.course, self.coach.id)
# making the master course chapters easily available
self.master_course_chapters = courses.get_course_chapter_ids(self.master_course_key)
def get_auth_token(self, app_grant, app_client):
"""
Helper method to get the oauth token
"""
token_data = {
'grant_type': 'authorization_code',
'code': app_grant.code,
'client_id': app_client.client_id,
'client_secret': app_client.client_secret
}
token_resp = self.client.post(reverse('oauth2:access_token'), data=token_data, format='multipart')
self.assertEqual(token_resp.status_code, status.HTTP_200_OK)
token_resp_json = json.loads(token_resp.content.decode('utf-8'))
return u'{token_type} {token}'.format(
token_type=token_resp_json['token_type'],
token=token_resp_json['access_token']
)
def prepare_auth_token(self, user):
"""
creates auth token for users
"""
# create an oauth client app entry
app_client = Client.objects.create(
user=user,
name='test client',
url='http://localhost//',
redirect_uri='http://localhost//',
client_type=CONFIDENTIAL
)
# create an authorization code
app_grant = Grant.objects.create(
user=user,
client=app_client,
redirect_uri='http://localhost//'
)
# create an oauth2 provider client app entry
app_client_oauth2_provider = dot_models.Application.objects.create(
name='test client 2',
@@ -137,9 +100,8 @@ class CcxRestApiTest(CcxTestCase, APITestCase):
)
auth_header_oauth2_provider = u"Bearer {0}".format(auth_oauth2_provider)
auth = self.get_auth_token(app_grant, app_client)
return auth, auth_header_oauth2_provider
return auth_header_oauth2_provider
def expect_error(self, http_code, error_code_str, resp_obj):
"""
@@ -186,8 +148,7 @@ class CcxListTest(CcxRestApiTest):
'?master_course_id={0}'.format(six.moves.urllib.parse.quote_plus(self.master_course_key_str))
)
@ddt.data(*AUTH_ATTRS)
def test_authorization(self, auth_attr):
def test_authorization(self):
"""
Test that only the right token is authorized
"""
@@ -203,7 +164,7 @@ class CcxListTest(CcxRestApiTest):
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_authorization_no_oauth_staff(self):
@@ -281,8 +242,7 @@ class CcxListTest(CcxRestApiTest):
resp = self.client.post(self.list_url, data, format='json')
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
@ddt.data(*AUTH_ATTRS)
def test_get_list_wrong_master_course(self, auth_attr):
def test_get_list_wrong_master_course(self):
"""
Test for various get requests with wrong master course string
"""
@@ -291,31 +251,30 @@ class CcxListTest(CcxRestApiTest):
with mock.patch(mock_class_str, autospec=True) as mocked_perm_class:
mocked_perm_class.return_value = True
# case with no master_course_id provided
resp = self.client.get(self.list_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.list_url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'master_course_id_not_provided', resp)
base_url = six.moves.urllib.parse.urljoin(self.list_url, '?master_course_id=')
# case with empty master_course_id
resp = self.client.get(base_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(base_url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_id_not_valid', resp)
# case with invalid master_course_id
url = '{0}invalid_master_course_str'.format(base_url)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_id_not_valid', resp)
# case with inexistent master_course_id
url = '{0}course-v1%3Aorg_foo.0%2Bcourse_bar_0%2BRun_0'.format(base_url)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_404_NOT_FOUND, 'course_id_does_not_exist', resp)
@ddt.data(*AUTH_ATTRS)
def test_get_list(self, auth_attr):
def test_get_list(self):
"""
Tests the API to get a list of CCX Courses
"""
# there are no CCX courses
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=self.auth)
self.assertIn('count', resp.data)
self.assertEqual(resp.data['count'], 0)
@@ -323,15 +282,14 @@ class CcxListTest(CcxRestApiTest):
num_ccx = 10
for _ in range(num_ccx):
self.make_ccx()
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertIn('count', resp.data)
self.assertEqual(resp.data['count'], num_ccx)
self.assertIn('results', resp.data)
self.assertEqual(len(resp.data['results']), num_ccx)
@ddt.data(*AUTH_ATTRS)
def test_get_sorted_list(self, auth_attr):
def test_get_sorted_list(self):
"""
Tests the API to get a sorted list of CCX Courses
"""
@@ -350,7 +308,7 @@ class CcxListTest(CcxRestApiTest):
# sort by display name
url = '{0}&order_by=display_name'.format(self.list_url_master_course)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data['results']), num_ccx)
# the display_name should be sorted as "Title CCX x", "Title CCX y", "Title CCX z"
@@ -359,14 +317,13 @@ class CcxListTest(CcxRestApiTest):
# add sort order desc
url = '{0}&order_by=display_name&sort_order=desc'.format(self.list_url_master_course)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
# the only thing I can check is that the display name is in alphabetically reversed order
# in the same way when the field has been updated above, so with the id asc
for num, ccx in enumerate(resp.data['results']):
self.assertEqual(title_str.format(string.ascii_lowercase[-(num + 1)]), ccx['display_name'])
@ddt.data(*AUTH_ATTRS)
def test_get_paginated_list(self, auth_attr):
def test_get_paginated_list(self):
"""
Tests the API to get a paginated list of CCX Courses
"""
@@ -377,7 +334,7 @@ class CcxListTest(CcxRestApiTest):
page_size = settings.REST_FRAMEWORK.get('PAGE_SIZE', 10)
num_pages = int(math.ceil(num_ccx / float(page_size)))
# get first page
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.list_url_master_course, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data['count'], num_ccx)
self.assertEqual(resp.data['num_pages'], num_pages)
@@ -388,7 +345,7 @@ class CcxListTest(CcxRestApiTest):
# get a page in the middle
url = '{0}&page=24'.format(self.list_url_master_course)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data['count'], num_ccx)
self.assertEqual(resp.data['num_pages'], num_pages)
@@ -399,7 +356,7 @@ class CcxListTest(CcxRestApiTest):
# get last page
url = '{0}&page={1}'.format(self.list_url_master_course, num_pages)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data['count'], num_ccx)
self.assertEqual(resp.data['num_pages'], num_pages)
@@ -410,7 +367,7 @@ class CcxListTest(CcxRestApiTest):
# last page + 1
url = '{0}&page={1}'.format(self.list_url_master_course, num_pages + 1)
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
@ddt.data(
@@ -418,65 +375,30 @@ class CcxListTest(CcxRestApiTest):
{},
status.HTTP_400_BAD_REQUEST,
'master_course_id_not_provided',
'auth_header_oauth2_provider'
),
(
{},
status.HTTP_400_BAD_REQUEST,
'master_course_id_not_provided',
'auth'
),
(
{'master_course_id': None},
status.HTTP_400_BAD_REQUEST,
'master_course_id_not_provided',
'auth_header_oauth2_provider'
),
(
{'master_course_id': None},
status.HTTP_400_BAD_REQUEST,
'master_course_id_not_provided',
'auth'
),
(
{'master_course_id': ''},
status.HTTP_400_BAD_REQUEST,
'course_id_not_valid',
'auth_header_oauth2_provider'
),
(
{'master_course_id': ''},
status.HTTP_400_BAD_REQUEST,
'course_id_not_valid',
'auth'
),
(
{'master_course_id': 'invalid_master_course_str'},
status.HTTP_400_BAD_REQUEST,
'course_id_not_valid',
'auth'
),
(
{'master_course_id': 'invalid_master_course_str'},
status.HTTP_400_BAD_REQUEST,
'course_id_not_valid',
'auth_header_oauth2_provider'
),
(
{'master_course_id': 'course-v1:org_foo.0+course_bar_0+Run_0'},
status.HTTP_404_NOT_FOUND,
'course_id_does_not_exist',
'auth'
),
(
{'master_course_id': 'course-v1:org_foo.0+course_bar_0+Run_0'},
status.HTTP_404_NOT_FOUND,
'course_id_does_not_exist',
'auth_header_oauth2_provider'
),
)
@ddt.unpack
def test_post_list_wrong_master_course(self, data, expected_http_error, expected_error_string, auth_attr):
def test_post_list_wrong_master_course(self, data, expected_http_error, expected_error_string):
"""
Test for various post requests with wrong master course string
"""
@@ -485,11 +407,10 @@ class CcxListTest(CcxRestApiTest):
with mock.patch(mock_class_str, autospec=True) as mocked_perm_class:
mocked_perm_class.return_value = True
# case with no master_course_id provided
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(expected_http_error, expected_error_string, resp)
@ddt.data(*AUTH_ATTRS)
def test_post_list_wrong_master_course_special_cases(self, auth_attr):
def test_post_list_wrong_master_course_special_cases(self):
"""
Same as test_post_list_wrong_master_course,
but different ways to test the wrong master_course_id
@@ -499,7 +420,7 @@ class CcxListTest(CcxRestApiTest):
self.mstore.update_item(self.course, self.coach.id)
data = {'master_course_id': self.master_course_key_str}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_403_FORBIDDEN, 'ccx_not_enabled_for_master_course', resp)
self.course.enable_ccx = True
self.mstore.update_item(self.course, self.coach.id)
@@ -507,7 +428,7 @@ class CcxListTest(CcxRestApiTest):
# case with deprecated master_course_id
with mock.patch('lms.djangoapps.courseware.courses.get_course_by_id', autospec=True) as mocked:
mocked.return_value.id.deprecated = True
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'deprecated_master_course_id', resp)
@@ -519,16 +440,6 @@ class CcxListTest(CcxRestApiTest):
'display_name': 'missing_field_display_name',
'coach_email': 'missing_field_coach_email'
},
'auth'
),
(
{},
{
'max_students_allowed': 'missing_field_max_students_allowed',
'display_name': 'missing_field_display_name',
'coach_email': 'missing_field_coach_email'
},
'auth_header_oauth2_provider'
),
(
{
@@ -538,17 +449,6 @@ class CcxListTest(CcxRestApiTest):
{
'coach_email': 'missing_field_coach_email'
},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': 'CCX Title'
},
{
'coach_email': 'missing_field_coach_email'
},
'auth_header_oauth2_provider'
),
(
{
@@ -561,20 +461,6 @@ class CcxListTest(CcxRestApiTest):
'display_name': 'null_field_display_name',
'coach_email': 'null_field_coach_email'
},
'auth'
),
(
{
'max_students_allowed': None,
'display_name': None,
'coach_email': None
},
{
'max_students_allowed': 'null_field_max_students_allowed',
'display_name': 'null_field_display_name',
'coach_email': 'null_field_coach_email'
},
'auth_header_oauth2_provider'
),
(
{
@@ -583,16 +469,6 @@ class CcxListTest(CcxRestApiTest):
'coach_email': 'this is not an email@test.com'
},
{'coach_email': 'invalid_coach_email'},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': 'CCX Title',
'coach_email': 'this is not an email@test.com'
},
{'coach_email': 'invalid_coach_email'},
'auth_header_oauth2_provider'
),
(
{
@@ -601,16 +477,6 @@ class CcxListTest(CcxRestApiTest):
'coach_email': 'email@test.com'
},
{'display_name': 'invalid_display_name'},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': '',
'coach_email': 'email@test.com'
},
{'display_name': 'invalid_display_name'},
'auth_header_oauth2_provider'
),
(
{
@@ -619,16 +485,6 @@ class CcxListTest(CcxRestApiTest):
'coach_email': 'email@test.com'
},
{'max_students_allowed': 'invalid_max_students_allowed'},
'auth'
),
(
{
'max_students_allowed': 'a',
'display_name': 'CCX Title',
'coach_email': 'email@test.com'
},
{'max_students_allowed': 'invalid_max_students_allowed'},
'auth_header_oauth2_provider'
),
(
{
@@ -638,17 +494,6 @@ class CcxListTest(CcxRestApiTest):
'course_modules': {'foo': 'bar'}
},
{'course_modules': 'invalid_course_module_list'},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': 'CCX Title',
'coach_email': 'email@test.com',
'course_modules': {'foo': 'bar'}
},
{'course_modules': 'invalid_course_module_list'},
'auth_header_oauth2_provider'
),
(
{
@@ -658,17 +503,6 @@ class CcxListTest(CcxRestApiTest):
'course_modules': 'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_1'
},
{'course_modules': 'invalid_course_module_list'},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': 'CCX Title',
'coach_email': 'email@test.com',
'course_modules': 'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_1'
},
{'course_modules': 'invalid_course_module_list'},
'auth_header_oauth2_provider'
),
(
{
@@ -678,31 +512,19 @@ class CcxListTest(CcxRestApiTest):
'course_modules': ['foo', 'bar']
},
{'course_modules': 'invalid_course_module_keys'},
'auth'
),
(
{
'max_students_allowed': 10,
'display_name': 'CCX Title',
'coach_email': 'email@test.com',
'course_modules': ['foo', 'bar']
},
{'course_modules': 'invalid_course_module_keys'},
'auth_header_oauth2_provider'
),
)
)
@ddt.unpack
def test_post_list_wrong_input_data(self, data, expected_errors, auth_attr):
def test_post_list_wrong_input_data(self, data, expected_errors):
"""
Test for various post requests with wrong input data
"""
# add the master_course_key_str to the request data
data['master_course_id'] = self.master_course_key_str
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error_fields(expected_errors, resp)
@ddt.data(*AUTH_ATTRS)
def test_post_list_coach_does_not_exist(self, auth_attr):
def test_post_list_coach_does_not_exist(self):
"""
Specific test for the case when the input data is valid but the coach does not exist.
"""
@@ -712,11 +534,10 @@ class CcxListTest(CcxRestApiTest):
'display_name': 'CCX Title',
'coach_email': 'inexisting_email@test.com'
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_404_NOT_FOUND, 'coach_user_does_not_exist', resp)
@ddt.data(*AUTH_ATTRS)
def test_post_list_wrong_modules(self, auth_attr):
def test_post_list_wrong_modules(self):
"""
Specific test for the case when the input data is valid but the
course modules do not belong to the master course
@@ -731,11 +552,10 @@ class CcxListTest(CcxRestApiTest):
'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_bar'
]
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_module_list_not_belonging_to_master_course', resp)
@ddt.data(*AUTH_ATTRS)
def test_post_list_mixed_wrong_and_valid_modules(self, auth_attr):
def test_post_list_mixed_wrong_and_valid_modules(self):
"""
Specific test for the case when the input data is valid but some of
the course modules do not belong to the master course
@@ -748,11 +568,10 @@ class CcxListTest(CcxRestApiTest):
'coach_email': self.coach.email,
'course_modules': modules
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_module_list_not_belonging_to_master_course', resp)
@ddt.data(*AUTH_ATTRS)
def test_post_list(self, auth_attr):
def test_post_list(self):
"""
Test the creation of a CCX
"""
@@ -764,7 +583,7 @@ class CcxListTest(CcxRestApiTest):
'coach_email': self.coach.email,
'course_modules': self.master_course_chapters[0:1]
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
# check if the response has at least the same data of the request
for key, val in six.iteritems(data):
@@ -790,13 +609,10 @@ class CcxListTest(CcxRestApiTest):
self.assertIn(self.coach.email, outbox[0].recipients())
@ddt.data(
('auth', True),
('auth', False),
('auth_header_oauth2_provider', True),
('auth_header_oauth2_provider', False)
True,
False
)
@ddt.unpack
def test_post_list_on_active_state(self, auth_attr, user_is_active):
def test_post_list_on_active_state(self, user_is_active):
"""
Test the creation of a CCX on user's active states.
"""
@@ -810,15 +626,14 @@ class CcxListTest(CcxRestApiTest):
'coach_email': self.coach.email,
'course_modules': self.master_course_chapters[0:1]
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
if not user_is_active:
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
else:
self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
@ddt.data(*AUTH_ATTRS)
def test_post_list_duplicated_modules(self, auth_attr):
def test_post_list_duplicated_modules(self):
"""
Test the creation of a CCX, but with duplicated modules
"""
@@ -831,12 +646,11 @@ class CcxListTest(CcxRestApiTest):
'coach_email': self.coach.email,
'course_modules': duplicated_chapters
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
self.assertEqual(resp.data.get('course_modules'), chapters)
@ddt.data(*AUTH_ATTRS)
def test_post_list_staff_master_course_in_ccx(self, auth_attr):
def test_post_list_staff_master_course_in_ccx(self):
"""
Specific test to check that the staff and instructor of the master
course are assigned to the CCX.
@@ -848,7 +662,7 @@ class CcxListTest(CcxRestApiTest):
'display_name': 'CCX Test Title',
'coach_email': self.coach.email
}
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.post(self.list_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_201_CREATED)
# check that only one email has been sent and it is to to the coach
self.assertEqual(len(outbox), 1)
@@ -929,8 +743,7 @@ class CcxDetailTest(CcxRestApiTest):
)
return ccx
@ddt.data(*AUTH_ATTRS)
def test_authorization(self, auth_attr):
def test_authorization(self):
"""
Test that only the right token is authorized
"""
@@ -946,7 +759,7 @@ class CcxDetailTest(CcxRestApiTest):
resp = self.client.get(self.detail_url, {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
resp = self.client.get(self.detail_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.detail_url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_authorization_no_oauth_staff(self):
@@ -1037,15 +850,11 @@ class CcxDetailTest(CcxRestApiTest):
self.assertEqual(views.CCXDetailView.__module__, resolver.func.__module__)
@ddt.data(
('get', AUTH_ATTRS[0]),
('get', AUTH_ATTRS[1]),
('delete', AUTH_ATTRS[0]),
('delete', AUTH_ATTRS[1]),
('patch', AUTH_ATTRS[0]),
('patch', AUTH_ATTRS[1])
'get',
'delete',
'patch',
)
@ddt.unpack
def test_detail_wrong_ccx(self, http_method, auth_attr):
def test_detail_wrong_ccx(self, http_method):
"""
Test for different methods for detail of a ccx course.
All check the validity of the ccx course id
@@ -1056,46 +865,45 @@ class CcxDetailTest(CcxRestApiTest):
url = reverse('ccx_api:v0:ccx:detail', kwargs={'ccx_course_id': self.master_course_key_str})
# the permission class will give a 403 error because will not find the CCX
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
# bypassing the permission class we get another kind of error
with mock.patch(mock_class_str, autospec=True) as mocked_perm_class:
mocked_perm_class.return_value = True
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_id_not_valid_ccx_id', resp)
# use an non existing ccx id
url = reverse('ccx_api:v0:ccx:detail', kwargs={'ccx_course_id': 'ccx-v1:foo.0+course_bar_0+Run_0+ccx@1'})
# the permission class will give a 403 error because will not find the CCX
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
# bypassing the permission class we get another kind of error
with mock.patch(mock_class_str, autospec=True) as mocked_perm_class:
mocked_perm_class.return_value = True
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_404_NOT_FOUND, 'ccx_course_id_does_not_exist', resp)
# get a valid ccx key and add few 0s to get a non existing ccx for a valid course
ccx_key_str = '{0}000000'.format(self.ccx_key_str)
url = reverse('ccx_api:v0:ccx:detail', kwargs={'ccx_course_id': ccx_key_str})
# the permission class will give a 403 error because will not find the CCX
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
# bypassing the permission class we get another kind of error
with mock.patch(mock_class_str, autospec=True) as mocked_perm_class:
mocked_perm_class.return_value = True
resp = client_request(url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = client_request(url, {}, HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_404_NOT_FOUND, 'ccx_course_id_does_not_exist', resp)
@ddt.data(*AUTH_ATTRS)
def test_get_detail(self, auth_attr):
def test_get_detail(self):
"""
Test for getting detail of a ccx course
"""
resp = self.client.get(self.detail_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.get(self.detail_url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(resp.data.get('ccx_course_id'), self.ccx_key_str)
self.assertEqual(resp.data.get('display_name'), self.ccx.display_name)
@@ -1107,15 +915,14 @@ class CcxDetailTest(CcxRestApiTest):
self.assertEqual(resp.data.get('master_course_id'), six.text_type(self.ccx.course_id))
six.assertCountEqual(self, resp.data.get('course_modules'), self.master_course_chapters)
@ddt.data(*AUTH_ATTRS)
def test_delete_detail(self, auth_attr):
def test_delete_detail(self):
"""
Test for deleting a ccx course
"""
# check that there are overrides
self.assertGreater(CcxFieldOverride.objects.filter(ccx=self.ccx).count(), 0)
self.assertGreater(CourseEnrollment.objects.filter(course_id=self.ccx_key).count(), 0)
resp = self.client.delete(self.detail_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.delete(self.detail_url, {}, HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
self.assertIsNone(resp.data)
# the CCX does not exist any more
@@ -1125,15 +932,14 @@ class CcxDetailTest(CcxRestApiTest):
self.assertEqual(CcxFieldOverride.objects.filter(ccx=self.ccx).count(), 0)
self.assertEqual(CourseEnrollment.objects.filter(course_id=self.ccx_key).count(), 0)
@ddt.data(*AUTH_ATTRS)
def test_patch_detail_change_master_course(self, auth_attr):
def test_patch_detail_change_master_course(self):
"""
Test to patch a ccx course to change a master course
"""
data = {
'master_course_id': 'changed_course_id'
}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_403_FORBIDDEN, 'master_course_id_change_not_allowed', resp)
@ddt.data(
@@ -1148,94 +954,41 @@ class CcxDetailTest(CcxRestApiTest):
'display_name': 'null_field_display_name',
'coach_email': 'null_field_coach_email'
},
AUTH_ATTRS[0]
),
(
{
'max_students_allowed': None,
'display_name': None,
'coach_email': None
},
{
'max_students_allowed': 'null_field_max_students_allowed',
'display_name': 'null_field_display_name',
'coach_email': 'null_field_coach_email'
},
AUTH_ATTRS[1]
),
(
{'coach_email': 'this is not an email@test.com'},
{'coach_email': 'invalid_coach_email'},
AUTH_ATTRS[0]
),
(
{'coach_email': 'this is not an email@test.com'},
{'coach_email': 'invalid_coach_email'},
AUTH_ATTRS[1]
),
(
{'display_name': ''},
{'display_name': 'invalid_display_name'},
AUTH_ATTRS[0]
),
(
{'display_name': ''},
{'display_name': 'invalid_display_name'},
AUTH_ATTRS[1]
),
(
{'max_students_allowed': 'a'},
{'max_students_allowed': 'invalid_max_students_allowed'},
AUTH_ATTRS[0]
),
(
{'max_students_allowed': 'a'},
{'max_students_allowed': 'invalid_max_students_allowed'},
AUTH_ATTRS[1]
),
(
{'course_modules': {'foo': 'bar'}},
{'course_modules': 'invalid_course_module_list'},
AUTH_ATTRS[0]
),
(
{'course_modules': {'foo': 'bar'}},
{'course_modules': 'invalid_course_module_list'},
AUTH_ATTRS[1]
),
(
{'course_modules': 'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_1'},
{'course_modules': 'invalid_course_module_list'},
AUTH_ATTRS[0]
),
(
{'course_modules': 'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_1'},
{'course_modules': 'invalid_course_module_list'},
AUTH_ATTRS[1]
),
(
{'course_modules': ['foo', 'bar']},
{'course_modules': 'invalid_course_module_keys'},
AUTH_ATTRS[0]
),
(
{'course_modules': ['foo', 'bar']},
{'course_modules': 'invalid_course_module_keys'},
AUTH_ATTRS[1]
),
)
@ddt.unpack
def test_patch_detail_wrong_input_data(self, data, expected_errors, auth_attr):
def test_patch_detail_wrong_input_data(self, data, expected_errors):
"""
Test for different wrong inputs for the patch method
"""
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error_fields(expected_errors, resp)
@ddt.data(*AUTH_ATTRS)
def test_empty_patch(self, auth_attr):
def test_empty_patch(self):
"""
An empty patch does not modify anything
"""
@@ -1243,7 +996,7 @@ class CcxDetailTest(CcxRestApiTest):
max_students_allowed = self.ccx.max_student_enrollments_allowed
coach_email = self.ccx.coach.email # pylint: disable=no-member
ccx_structure = self.ccx.structure
resp = self.client.patch(self.detail_url, {}, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, {}, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx = CustomCourseForEdX.objects.get(id=self.ccx.id)
self.assertEqual(display_name, ccx.display_name)
@@ -1251,8 +1004,7 @@ class CcxDetailTest(CcxRestApiTest):
self.assertEqual(coach_email, ccx.coach.email)
self.assertEqual(ccx_structure, ccx.structure)
@ddt.data(*AUTH_ATTRS)
def test_patch_detail_coach_does_not_exist(self, auth_attr):
def test_patch_detail_coach_does_not_exist(self):
"""
Specific test for the case when the input data is valid but the coach does not exist.
"""
@@ -1261,11 +1013,10 @@ class CcxDetailTest(CcxRestApiTest):
'display_name': 'CCX Title',
'coach_email': 'inexisting_email@test.com'
}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_404_NOT_FOUND, 'coach_user_does_not_exist', resp)
@ddt.data(*AUTH_ATTRS)
def test_patch_detail_wrong_modules(self, auth_attr):
def test_patch_detail_wrong_modules(self):
"""
Specific test for the case when the input data is valid but the
course modules do not belong to the master course
@@ -1276,11 +1027,10 @@ class CcxDetailTest(CcxRestApiTest):
'block-v1:org.0+course_0+Run_0+type@chapter+block@chapter_bar'
]
}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_module_list_not_belonging_to_master_course', resp)
@ddt.data(*AUTH_ATTRS)
def test_patch_detail_mixed_wrong_and_valid_modules(self, auth_attr):
def test_patch_detail_mixed_wrong_and_valid_modules(self):
"""
Specific test for the case when the input data is valid but some of
the course modules do not belong to the master course
@@ -1289,11 +1039,10 @@ class CcxDetailTest(CcxRestApiTest):
data = {
'course_modules': modules
}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.expect_error(status.HTTP_400_BAD_REQUEST, 'course_module_list_not_belonging_to_master_course', resp)
@ddt.data(*AUTH_ATTRS)
def test_patch_detail(self, auth_attr):
def test_patch_detail(self):
"""
Test for successful patch
"""
@@ -1305,7 +1054,7 @@ class CcxDetailTest(CcxRestApiTest):
'display_name': 'CCX Title',
'coach_email': new_coach.email
}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
self.assertEqual(ccx_from_db.max_student_enrollments_allowed, data['max_students_allowed'])
@@ -1323,50 +1072,46 @@ class CcxDetailTest(CcxRestApiTest):
self.assertEqual(len(outbox), 1)
self.assertIn(new_coach.email, outbox[0].recipients())
@ddt.data(*AUTH_ATTRS)
def test_patch_detail_modules(self, auth_attr):
def test_patch_detail_modules(self):
"""
Specific test for successful patch of the course modules
"""
data = {'course_modules': self.master_course_chapters[0:1]}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
six.assertCountEqual(self, ccx_from_db.structure, data['course_modules'])
data = {'course_modules': []}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
six.assertCountEqual(self, ccx_from_db.structure, [])
data = {'course_modules': self.master_course_chapters}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
six.assertCountEqual(self, ccx_from_db.structure, self.master_course_chapters)
data = {'course_modules': None}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
self.assertEqual(ccx_from_db.structure, None)
chapters = self.master_course_chapters[0:1]
data = {'course_modules': chapters * 3}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
ccx_from_db = CustomCourseForEdX.objects.get(id=self.ccx.id)
six.assertCountEqual(self, ccx_from_db.structure, chapters)
@ddt.data(
('auth', True),
('auth', False),
('auth_header_oauth2_provider', True),
('auth_header_oauth2_provider', False)
True,
False
)
@ddt.unpack
def test_patch_user_on_active_state(self, auth_attr, user_is_active):
def test_patch_user_on_active_state(self, user_is_active):
"""
Test patch ccx course on user's active state.
"""
@@ -1375,7 +1120,7 @@ class CcxDetailTest(CcxRestApiTest):
chapters = self.master_course_chapters[0:1]
data = {'course_modules': chapters * 3}
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.patch(self.detail_url, data, format='json', HTTP_AUTHORIZATION=self.auth)
if not user_is_active:
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
else:
@@ -1384,13 +1129,10 @@ class CcxDetailTest(CcxRestApiTest):
six.assertCountEqual(self, ccx_from_db.structure, chapters)
@ddt.data(
('auth', True),
('auth', False),
('auth_header_oauth2_provider', True),
('auth_header_oauth2_provider', False)
True,
False
)
@ddt.unpack
def test_delete_detail_on_active_state(self, auth_attr, user_is_active):
def test_delete_detail_on_active_state(self, user_is_active):
"""
Test for deleting a ccx course on user's active state.
"""
@@ -1400,7 +1142,7 @@ class CcxDetailTest(CcxRestApiTest):
# check that there are overrides
self.assertGreater(CcxFieldOverride.objects.filter(ccx=self.ccx).count(), 0)
self.assertGreater(CourseEnrollment.objects.filter(course_id=self.ccx_key).count(), 0)
resp = self.client.delete(self.detail_url, {}, HTTP_AUTHORIZATION=getattr(self, auth_attr))
resp = self.client.delete(self.detail_url, {}, HTTP_AUTHORIZATION=self.auth)
if not user_is_active:
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)

View File

@@ -616,20 +616,6 @@ OAUTH_ENFORCE_SECURE = True
OAUTH_EXPIRE_CONFIDENTIAL_CLIENT_DAYS = 365
OAUTH_EXPIRE_PUBLIC_CLIENT_DAYS = 30
# .. toggle_name: ENABLE_DOP_ADAPTER
# .. toggle_implementation: DjangoSetting
# .. toggle_default: True
# .. toggle_description: A switch toggle for controlling whether or not we allow usage of the DOP OAuth adapter with the goal of removing the DOP adapter once we're confident it won't be used.
# .. toggle_category: n/a
# .. toggle_use_cases: incremental_release
# .. toggle_creation_date: 2020-02-06
# .. toggle_expiration_date: 2020-02-29
# .. toggle_warnings: None
# .. toggle_tickets: BOM-1160
# .. toggle_status: supported
ENABLE_DOP_ADAPTER = True
################################## THIRD_PARTY_AUTH CONFIGURATION #############################
TPA_PROVIDER_BURST_THROTTLE = '10/min'
TPA_PROVIDER_SUSTAINED_THROTTLE = '50/hr'

View File

@@ -769,7 +769,6 @@ if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
OAUTH_ID_TOKEN_EXPIRATION = ENV_TOKENS.get('OAUTH_ID_TOKEN_EXPIRATION', OAUTH_ID_TOKEN_EXPIRATION)
OAUTH_DELETE_EXPIRED = ENV_TOKENS.get('OAUTH_DELETE_EXPIRED', OAUTH_DELETE_EXPIRED)
ENABLE_DOP_ADAPTER = ENV_TOKENS.get('ENABLE_DOP_ADAPTER', ENABLE_DOP_ADAPTER)
##### GOOGLE ANALYTICS IDS #####
GOOGLE_ANALYTICS_ACCOUNT = AUTH_TOKENS.get('GOOGLE_ANALYTICS_ACCOUNT')

View File

@@ -14,45 +14,6 @@ from openedx.core.djangoapps.oauth_dispatch.tests.constants import DUMMY_REDIREC
from ..views import DOTAccessTokenExchangeView
class DOPAdapterMixin(object):
"""
Mixin to rewire existing tests to use django-oauth2-provider (DOP) backend
Overwrites self.client_id, self.access_token, self.oauth2_adapter
"""
client_id = 'dop_test_client_id'
access_token = 'dop_test_access_token'
oauth2_adapter = adapters.DOPAdapter()
def create_public_client(self, user, client_id=None):
"""
Create an oauth client application that is public.
"""
return self.oauth2_adapter.create_public_client(
name='Test Public Client',
user=user,
client_id=client_id,
redirect_uri=DUMMY_REDIRECT_URL,
)
def create_confidential_client(self, user, client_id=None):
"""
Create an oauth client application that is confidential.
"""
return self.oauth2_adapter.create_confidential_client(
name='Test Confidential Client',
user=user,
client_id=client_id,
redirect_uri=DUMMY_REDIRECT_URL,
)
def get_token_response_keys(self):
"""
Return the set of keys provided when requesting an access token
"""
return {'access_token', 'token_type', 'expires_in', 'scope'}
class DOTAdapterMixin(object):
"""
Mixin to rewire existing tests to use django-oauth-toolkit (DOT) backend

View File

@@ -3,5 +3,4 @@ Adapters to provide a common interface to django-oauth2-provider (DOP) and
django-oauth-toolkit (DOT).
"""
from .dop import DOPAdapter
from .dot import DOTAdapter

View File

@@ -1,89 +0,0 @@
"""
Adapter to isolate django-oauth2-provider dependencies
"""
from provider import constants, scope
from provider.oauth2 import models
class DOPAdapter(object):
"""
Standard interface for working with django-oauth2-provider
"""
backend = object()
def create_confidential_client(self, name, user, redirect_uri, client_id=None):
"""
Create an oauth client application that is confidential.
"""
return models.Client.objects.create(
name=name,
user=user,
client_id=client_id,
redirect_uri=redirect_uri,
client_type=constants.CONFIDENTIAL,
)
def create_public_client(self, name, user, redirect_uri, client_id=None):
"""
Create an oauth client application that is public.
"""
return models.Client.objects.create(
name=name,
user=user,
client_id=client_id,
redirect_uri=redirect_uri,
client_type=constants.PUBLIC,
)
def get_client(self, **filters):
"""
Get the oauth client application with the specified filters.
Wraps django's queryset.get() method.
"""
return models.Client.objects.get(**filters)
def get_client_for_token(self, token):
"""
Given an AccessToken object, return the associated client application.
"""
return token.client
def get_access_token(self, token_string):
"""
Given a token string, return the matching AccessToken object.
"""
return models.AccessToken.objects.get(token=token_string)
def create_access_token_for_test(self, token_string, client, user, expires):
"""
Returns a new AccessToken object created from the given arguments.
This method is currently used only by tests.
"""
return models.AccessToken.objects.create(
token=token_string,
client=client,
user=user,
expires=expires,
)
def get_token_scope_names(self, token):
"""
Given an access token object, return its scopes.
"""
return scope.to_names(token.scope)
def is_client_restricted(self, client): # pylint: disable=unused-argument
"""
Returns true if the client is set up as a RestrictedApplication.
"""
return False
def get_authorization_filters(self, client): # pylint: disable=unused-argument
"""
Get the authorization filters for the given client application.
"""
return []

View File

@@ -5,16 +5,12 @@ from oauth2_provider.models import AccessToken as dot_access_token
from oauth2_provider.models import RefreshToken as dot_refresh_token
from oauth2_provider.settings import oauth2_settings as dot_settings
from oauthlib.oauth2.rfc6749.tokens import BearerToken
from provider.oauth2.models import AccessToken as dop_access_token
from provider.oauth2.models import RefreshToken as dop_refresh_token
def destroy_oauth_tokens(user):
"""
Destroys ALL OAuth access and refresh tokens for the given user.
"""
dop_access_token.objects.filter(user=user.id).delete()
dop_refresh_token.objects.filter(user=user.id).delete()
dot_access_token.objects.filter(user=user.id).delete()
dot_refresh_token.objects.filter(user=user.id).delete()

View File

@@ -1,78 +0,0 @@
"""
Tests for DOP Adapter
"""
from datetime import timedelta
import ddt
from django.test import TestCase
from django.utils.timezone import now
from provider import constants
from provider.oauth2 import models
from student.tests.factories import UserFactory
from ..adapters import DOPAdapter
from .constants import DUMMY_REDIRECT_URL
@ddt.ddt
class DOPAdapterTestCase(TestCase):
"""
Test class for DOPAdapter.
"""
adapter = DOPAdapter()
def setUp(self):
super(DOPAdapterTestCase, self).setUp()
self.user = UserFactory()
self.public_client = self.adapter.create_public_client(
name='public client',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL,
client_id='public-client-id',
)
self.confidential_client = self.adapter.create_confidential_client(
name='confidential client',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL,
client_id='confidential-client-id',
)
@ddt.data(
('confidential', constants.CONFIDENTIAL),
('public', constants.PUBLIC),
)
@ddt.unpack
def test_create_client(self, client_name, client_type):
client = getattr(self, '{}_client'.format(client_name))
self.assertIsInstance(client, models.Client)
self.assertEqual(client.client_id, '{}-client-id'.format(client_name))
self.assertEqual(client.client_type, client_type)
def test_get_client(self):
client = self.adapter.get_client(client_type=constants.CONFIDENTIAL)
self.assertIsInstance(client, models.Client)
self.assertEqual(client.client_type, constants.CONFIDENTIAL)
def test_get_client_not_found(self):
with self.assertRaises(models.Client.DoesNotExist):
self.adapter.get_client(client_id='not-found')
def test_get_client_for_token(self):
token = models.AccessToken(
user=self.user,
client=self.public_client,
)
self.assertEqual(self.adapter.get_client_for_token(token), self.public_client)
def test_get_access_token(self):
token = self.adapter.create_access_token_for_test(
'token-id',
client=self.public_client,
user=self.user,
expires=now() + timedelta(days=30),
)
self.assertEqual(self.adapter.get_access_token(token_string='token-id'), token)

View File

@@ -10,7 +10,7 @@ from django.utils.timezone import now
from mock import patch
from openedx.core.djangoapps.oauth_dispatch import jwt as jwt_api
from openedx.core.djangoapps.oauth_dispatch.adapters import DOPAdapter, DOTAdapter
from openedx.core.djangoapps.oauth_dispatch.adapters import DOTAdapter
from openedx.core.djangoapps.oauth_dispatch.models import RestrictedApplication
from openedx.core.djangoapps.oauth_dispatch.tests.mixins import AccessTokenMixin
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
@@ -61,9 +61,8 @@ class TestCreateJWTs(AccessTokenMixin, TestCase):
jwt_token, self.user, self.default_scopes, should_be_asymmetric_key=should_be_asymmetric_key,
)
@ddt.data(DOPAdapter, DOPAdapter)
def test_create_jwt_for_token(self, oauth_adapter_cls):
oauth_adapter = oauth_adapter_cls()
def test_create_jwt_for_token(self):
oauth_adapter = DOTAdapter()
jwt_token = self._create_jwt_for_token(oauth_adapter, use_asymmetric_key=False)
self._assert_jwt_is_valid(jwt_token, should_be_asymmetric_key=False)

View File

@@ -15,7 +15,6 @@ from django.urls import reverse
from jwkest import jwk
from mock import call, patch
from oauth2_provider import models as dot_models
from provider import constants
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.tests.factories import UserFactory
@@ -86,7 +85,6 @@ class _DispatchingViewTestCase(TestCase):
"""
def setUp(self):
super(_DispatchingViewTestCase, self).setUp()
self.dop_adapter = adapters.DOPAdapter()
self.dot_adapter = adapters.DOTAdapter()
self.user = UserFactory()
self.dot_app = self.dot_adapter.create_public_client(
@@ -95,12 +93,6 @@ class _DispatchingViewTestCase(TestCase):
redirect_uri=DUMMY_REDIRECT_URL,
client_id='dot-app-client-id',
)
self.dop_app = self.dop_adapter.create_public_client(
name='test dop client',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL,
client_id='dop-app-client-id',
)
self.dot_app_access = models.ApplicationAccess.objects.create(
application=self.dot_app,
@@ -197,7 +189,7 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
should_be_restricted=False,
)
@ddt.data('dop_app', 'dot_app')
@ddt.data('dot_app')
def test_access_token_fields(self, client_attr):
client = getattr(self, client_attr)
response = self._post_request(self.user, client)
@@ -227,15 +219,15 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
True
)
@ddt.data('dop_app', 'dot_app')
@ddt.data('dot_app')
def test_jwt_access_token_from_parameter(self, client_attr):
self._test_jwt_access_token(client_attr, token_type='jwt')
@ddt.data('dop_app', 'dot_app')
@ddt.data('dot_app')
def test_jwt_access_token_from_header(self, client_attr):
self._test_jwt_access_token(client_attr, headers={'HTTP_X_TOKEN_TYPE': 'jwt'})
@ddt.data('dop_app', 'dot_app')
@ddt.data('dot_app')
def test_jwt_access_token_from_parameter_not_header(self, client_attr):
self._test_jwt_access_token(client_attr, token_type='jwt', headers={'HTTP_X_TOKEN_TYPE': 'invalid'})
@@ -261,7 +253,7 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
'grant_type': grant_type.replace('-', '_'),
}
bad_response = self.client.post(self.url, invalid_body)
self.assertEqual(bad_response.status_code, 400)
self.assertEqual(bad_response.status_code, 401)
expected_calls = [
call('oauth_token_type', 'no_token_type_supplied'),
call('oauth_grant_type', 'password'),
@@ -322,12 +314,6 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
data = json.loads(response.content.decode('utf-8'))
self.assertIn('refresh_token', data)
def test_dop_public_client_access_token(self):
response = self._post_request(self.user, self.dop_app)
self.assertEqual(response.status_code, 200)
data = json.loads(response.content.decode('utf-8'))
self.assertNotIn('refresh_token', data)
@ddt.data(dot_models.Application.GRANT_CLIENT_CREDENTIALS, dot_models.Application.GRANT_PASSWORD)
def test_jwt_access_token_scopes_and_filters(self, grant_type):
"""
@@ -396,7 +382,6 @@ class TestAuthorizationView(_DispatchingViewTestCase):
def setUp(self):
super(TestAuthorizationView, self).setUp()
self.dop_adapter = adapters.DOPAdapter()
self.user = UserFactory()
self.dot_app = self.dot_adapter.create_confidential_client(
name='test dot application',
@@ -412,16 +397,10 @@ class TestAuthorizationView(_DispatchingViewTestCase):
'other_filter:filter_val',
]
)
self.dop_app = self.dop_adapter.create_confidential_client(
name='test dop client',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL,
client_id='confidential-dop-app-client-id',
)
@ddt.data(
('dop', 'authorize'),
('dot', 'allow')
('dot', 'allow'),
('dot', 'authorize')
)
@ddt.unpack
def test_post_authorization_view(self, client_type, allow_field):
@@ -506,23 +485,6 @@ class TestAuthorizationView(_DispatchingViewTestCase):
expected_redirect_prefix = u'{}?'.format(DUMMY_REDIRECT_URL)
self._assert_startswith(self._redirect_destination(response), expected_redirect_prefix)
def _check_dop_response(self, response):
"""
Check that django-oauth2-provider gives an appropriate authorization response.
"""
# django-oauth-provider redirects to a confirmation page
self.assertRedirects(response, u'/oauth2/authorize/confirm', target_status_code=200)
context = response.context_data
form = context['form']
self.assertIsNone(form['authorize'].value())
oauth_data = context['oauth_data']
self.assertEqual(oauth_data['redirect_uri'], DUMMY_REDIRECT_URL)
self.assertEqual(oauth_data['state'], 'random_state_string')
# TODO: figure out why it chooses this scope.
self.assertEqual(oauth_data['scope'], constants.READ_WRITE)
def _assert_startswith(self, string, prefix):
"""
Assert that the string starts with the specified prefix.
@@ -545,16 +507,9 @@ class TestViewDispatch(TestCase):
def setUp(self):
super(TestViewDispatch, self).setUp()
self.dop_adapter = adapters.DOPAdapter()
self.dot_adapter = adapters.DOTAdapter()
self.user = UserFactory()
self.view = views._DispatchingView() # pylint: disable=protected-access
self.dop_adapter.create_public_client(
name='',
user=self.user,
client_id='dop-id',
redirect_uri=DUMMY_REDIRECT_URL
)
self.dot_adapter.create_public_client(
name='',
user=self.user,
@@ -589,54 +544,26 @@ class TestViewDispatch(TestCase):
"""
return RequestFactory().get('/?client_id={}'.format(client_id))
def _verify_oauth_metrics_calls(self, mock_set_custom_metric, expected_oauth_adapter):
"""
Args:
mock_set_custom_metric: MagicMock of set_custom_metric
expected_oauth_adapter: Either 'dot' or 'dop'
"""
expected_calls = [
call('oauth_client_id', '{}-id'.format(expected_oauth_adapter)),
call('oauth_adapter', expected_oauth_adapter),
]
mock_set_custom_metric.assert_has_calls(expected_calls, any_order=True)
@patch('edx_django_utils.monitoring.set_custom_metric')
def test_dispatching_post_to_dot(self, mock_set_custom_metric):
def test_dispatching_post_to_dot(self):
request = self._post_request('dot-id')
self.assertEqual(self.view.select_backend(request), self.dot_adapter.backend)
self._verify_oauth_metrics_calls(mock_set_custom_metric, 'dot')
@patch('edx_django_utils.monitoring.set_custom_metric')
def test_dispatching_post_to_dop(self, mock_set_custom_metric):
request = self._post_request('dop-id')
self.assertEqual(self.view.select_backend(request), self.dop_adapter.backend)
self._verify_oauth_metrics_calls(mock_set_custom_metric, 'dop')
def test_dispatching_get_to_dot(self):
request = self._get_request('dot-id')
self.assertEqual(self.view.select_backend(request), self.dot_adapter.backend)
def test_dispatching_get_to_dop(self):
request = self._get_request('dop-id')
self.assertEqual(self.view.select_backend(request), self.dop_adapter.backend)
def test_dispatching_with_no_client(self):
request = self._post_request(None)
self.assertEqual(self.view.select_backend(request), self.dop_adapter.backend)
self.assertEqual(self.view.select_backend(request), self.dot_adapter.backend)
def test_dispatching_with_invalid_client(self):
request = self._post_request('abcesdfljh')
self.assertEqual(self.view.select_backend(request), self.dop_adapter.backend)
self.assertEqual(self.view.select_backend(request), self.dot_adapter.backend)
def test_get_view_for_dot(self):
view_object = views.AccessTokenView()
self.assert_is_view(view_object.get_view_for_backend(self.dot_adapter.backend))
def test_get_view_for_dop(self):
view_object = views.AccessTokenView()
self.assert_is_view(view_object.get_view_for_backend(self.dop_adapter.backend))
def test_get_view_for_no_backend(self):
view_object = views.AccessTokenView()
self.assertRaises(KeyError, view_object.get_view_for_backend, None)

View File

@@ -10,8 +10,6 @@ from django.conf import settings
from django.utils.decorators import method_decorator
from django.views.generic import View
from edx_django_utils import monitoring as monitoring_utils
from edx_oauth2_provider import views as dop_views # django-oauth2-provider views
from oauth2_provider import models as dot_models # django-oauth-toolkit
from oauth2_provider import views as dot_views
from ratelimit import ALL
from ratelimit.decorators import ratelimit
@@ -30,7 +28,6 @@ class _DispatchingView(View):
"""
dot_adapter = adapters.DOTAdapter()
dop_adapter = adapters.DOPAdapter()
def get_adapter(self, request):
"""
@@ -39,12 +36,7 @@ class _DispatchingView(View):
client_id = self._get_client_id(request)
monitoring_utils.set_custom_metric('oauth_client_id', client_id)
if dot_models.Application.objects.filter(client_id=client_id).exists() or not settings.ENABLE_DOP_ADAPTER:
monitoring_utils.set_custom_metric('oauth_adapter', 'dot')
return self.dot_adapter
else:
monitoring_utils.set_custom_metric('oauth_adapter', 'dop')
return self.dop_adapter
return self.dot_adapter
def dispatch(self, request, *args, **kwargs):
"""
@@ -69,11 +61,7 @@ class _DispatchingView(View):
Return the appropriate view from the requested backend.
"""
if backend == self.dot_adapter.backend:
monitoring_utils.set_custom_metric('oauth_view', 'dot')
return self.dot_view.as_view()
elif backend == self.dop_adapter.backend:
monitoring_utils.set_custom_metric('oauth_view', 'dop')
return self.dop_view.as_view()
else:
raise KeyError('Failed to dispatch view. Invalid backend {}'.format(backend))
@@ -98,7 +86,6 @@ class AccessTokenView(_DispatchingView):
Handle access token requests.
"""
dot_view = dot_views.TokenView
dop_view = dop_views.AccessTokenView
def dispatch(self, request, *args, **kwargs):
response = super(AccessTokenView, self).dispatch(request, *args, **kwargs)
@@ -128,7 +115,6 @@ class AuthorizationView(_DispatchingView):
"""
Part of the authorization flow.
"""
dop_view = dop_views.Capture
dot_view = dot_overrides_views.EdxOAuth2AuthorizationView
@@ -138,19 +124,6 @@ class AccessTokenExchangeView(_DispatchingView):
"""
dot_view = auth_exchange_views.DOTAccessTokenExchangeView
def get_view_for_backend(self, backend):
"""
Return the appropriate view from the requested backend.
Since AccessTokenExchangeView no longer supports dop, this function needed to
be overwritten from _DispatchingView, it was decided that the dop path should not be removed
from _DispatchingView due to it still being used in other views(AuthorizationView, AccessTokenView)
"""
if backend == self.dot_adapter.backend:
monitoring_utils.set_custom_metric('oauth_view', 'dot')
return self.dot_view.as_view()
else:
raise KeyError('Failed to dispatch view. Invalid backend {}'.format(backend))
class RevokeTokenView(_DispatchingView):
"""

View File

@@ -68,7 +68,6 @@ class OAuth2AllowInActiveUsersTests(TestCase):
def setUp(self):
super(OAuth2AllowInActiveUsersTests, self).setUp()
self.dop_adapter = adapters.DOPAdapter()
self.dot_adapter = adapters.DOTAdapter()
self.csrf_client = APIClient(enforce_csrf_checks=True)
self.username = 'john'
@@ -76,29 +75,6 @@ class OAuth2AllowInActiveUsersTests(TestCase):
self.password = 'password'
self.user = User.objects.create_user(self.username, self.email, self.password)
self.CLIENT_ID = 'client_key' # pylint: disable=invalid-name
self.CLIENT_SECRET = 'client_secret' # pylint: disable=invalid-name
self.ACCESS_TOKEN = 'access_token' # pylint: disable=invalid-name
self.REFRESH_TOKEN = 'refresh_token' # pylint: disable=invalid-name
self.dop_oauth2_client = self.dop_adapter.create_public_client(
name='example',
user=self.user,
client_id=self.CLIENT_ID,
redirect_uri='https://example.edx/redirect',
)
self.access_token = oauth2_provider.oauth2.models.AccessToken.objects.create(
token=self.ACCESS_TOKEN,
client=self.dop_oauth2_client,
user=self.user,
)
self.refresh_token = oauth2_provider.oauth2.models.RefreshToken.objects.create(
user=self.user,
access_token=self.access_token,
client=self.dop_oauth2_client,
)
self.dot_oauth2_client = self.dot_adapter.create_public_client(
name='example',
user=self.user,
@@ -111,6 +87,11 @@ class OAuth2AllowInActiveUsersTests(TestCase):
application=self.dot_oauth2_client,
expires=now() + timedelta(days=30),
)
self.dot_refresh_token = dot_models.RefreshToken.objects.create(
user=self.user,
token='dot-refresh-token',
application=self.dot_oauth2_client,
)
# This is the a change we've made from the django-rest-framework-oauth version
# of these tests.
@@ -125,6 +106,11 @@ class OAuth2AllowInActiveUsersTests(TestCase):
# edx-auth2-provider.
scope.SCOPE_NAME_DICT = {'read': constants.READ, 'write': constants.WRITE}
def _create_authorization_header(self, token=None):
if token is None:
token = self.dot_access_token.token
return "Bearer {0}".format(token)
def get_with_bearer_token(self, target_url, params=None, token=None):
"""
Make a GET request to the specified URL with an OAuth2 bearer token. If
@@ -151,11 +137,6 @@ class OAuth2AllowInActiveUsersTests(TestCase):
self.assertEqual(response.status_code, status_code)
self.assertEqual(response_dict['error_code'], error_code)
def _create_authorization_header(self, token=None):
if token is None:
token = self.access_token.token
return "Bearer {0}".format(token)
@ddt.data(None, {})
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_type_failing(self, params):
@@ -173,18 +154,13 @@ class OAuth2AllowInActiveUsersTests(TestCase):
# provided (yet).
self.assertNotIn('error_code', json.loads(response.content.decode('utf-8')))
def test_get_form_passing_auth(self):
"""Ensure GETing form over OAuth with correct client credentials succeed"""
response = self.get_with_bearer_token(self.OAUTH2_BASE_TESTING_URL)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_get_form_passing_auth_with_dot(self):
response = self.get_with_bearer_token(self.OAUTH2_BASE_TESTING_URL, token=self.dot_access_token.token)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_get_form_failing_auth_url_transport(self):
"""Ensure GETing form over OAuth with correct client credentials in query fails when DEBUG is False"""
query = urlencode({'access_token': self.access_token.token})
query = urlencode({'access_token': self.dot_access_token.token})
response = self.csrf_client.get(self.OAUTH2_BASE_TESTING_URL + '?%s' % query)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
@@ -198,7 +174,7 @@ class OAuth2AllowInActiveUsersTests(TestCase):
def test_post_form_token_removed_failing_auth(self):
"""Ensure POSTing when there is no OAuth access token in db fails"""
self.access_token.delete()
self.dot_access_token.delete()
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL)
self.check_error_codes(
response,
@@ -208,7 +184,7 @@ class OAuth2AllowInActiveUsersTests(TestCase):
def test_post_form_with_refresh_token_failing_auth(self):
"""Ensure POSTing with refresh token instead of access token fails"""
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL, token=self.refresh_token.token)
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL, token=self.dot_refresh_token.token)
self.check_error_codes(
response,
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -217,8 +193,8 @@ class OAuth2AllowInActiveUsersTests(TestCase):
def test_post_form_with_expired_access_token_failing_auth(self):
"""Ensure POSTing with expired access token fails with a 'token_expired' error"""
self.access_token.expires = now() - timedelta(seconds=10) # 10 seconds late
self.access_token.save()
self.dot_access_token.expires = now() - timedelta(seconds=10) # 10 seconds late
self.dot_access_token.save()
response = self.post_with_bearer_token(self.OAUTH2_BASE_TESTING_URL)
self.check_error_codes(
response,