BOM-1324: remove oauth2.enforce_jwt_scopes toggle

The oauth2.enforce_jwt_scopes waffle switch was added temporarily for
the rollout of jwt scopes. This removes the toggle and replacing code
with the equivalent of `oauth2.enforce_jwt_scopes` as True.
This commit is contained in:
Robert Raposa
2020-02-24 17:25:53 -05:00
parent 18e21f374e
commit 84686e81c3
11 changed files with 155 additions and 256 deletions

View File

@@ -19,7 +19,6 @@ from course_modes.api.v1.views import CourseModesView
from course_modes.models import CourseMode
from course_modes.tests.factories import CourseModeFactory
from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from openedx.core.djangoapps.user_authn.tests.utils import JWT_AUTH_TYPES, AuthAndScopesTestMixin, AuthType
from student.tests.factories import UserFactory
@@ -81,21 +80,19 @@ class CourseModesViewTestBase(AuthAndScopesTestMixin):
"""
pass
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced):
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_on_behalf_of_user(self, auth_type):
"""
We have to override this super method due to this API
being restricted to staff users only.
"""
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
# include_me_filter=True means a JWT filter will require the username
# of the requesting user to be in the requested URL
url = self.get_url(self.student) + '?username={}'.format(self.student.username)
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
# include_me_filter=True means a JWT filter will require the username
# of the requesting user to be in the requested URL
url = self.get_url(self.student) + '?username={}'.format(self.student.username)
resp = self.get_response(AuthType.jwt, token=jwt_token, url=url)
assert status.HTTP_200_OK == resp.status_code
resp = self.get_response(AuthType.jwt, token=jwt_token, url=url)
assert status.HTTP_200_OK == resp.status_code
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')

View File

@@ -113,32 +113,14 @@ class ThirdPartyAuthPermissionTest(TestCase):
self.assertEqual(response.status_code, 403)
@ddt.data(
# **** Unenforced ****
# unrestricted
dict(
is_enforced=False,
is_restricted=False,
expected_response=403,
),
# restricted
dict(
is_enforced=False,
is_restricted=True,
expected_response=403,
),
# **** Enforced ****
# unrestricted (for example, jwt cookies)
dict(
is_enforced=True,
is_restricted=False,
expected_response=403,
),
# restricted (note: further test cases for scopes and filters are in tests below)
dict(
is_enforced=True,
is_restricted=True,
expected_response=403,
),
@@ -146,25 +128,18 @@ class ThirdPartyAuthPermissionTest(TestCase):
@ddt.unpack
def test_jwt_without_scopes_and_filters(
self,
is_enforced,
is_restricted,
expected_response,
):
# pylint: disable=line-too-long
# Note: Unenforced tests can be retired when rollout waffle switch `oauth2.enforce_jwt_scopes` is retired.
# See https://github.com/edx/edx-drf-extensions/blob/609e1dbaa98f476b36e50143de97732f2f6a9b4f/edx_rest_framework_extensions/config.py#L5
# pylint: enable=line-too-long
with patch('edx_rest_framework_extensions.permissions.waffle.switch_is_active') as mock_toggle:
mock_toggle.return_value = is_enforced
user = self._create_user()
user = self._create_user()
auth_header = self._create_jwt_header(user, is_restricted=is_restricted)
request = self._create_request(
auth_header=auth_header,
)
auth_header = self._create_jwt_header(user, is_restricted=is_restricted)
request = self._create_request(
auth_header=auth_header,
)
response = self.SomeTpaClassView().dispatch(request)
self.assertEqual(response.status_code, expected_response)
response = self.SomeTpaClassView().dispatch(request)
self.assertEqual(response.status_code, expected_response)
@ddt.data(
# valid scopes
@@ -177,7 +152,7 @@ class ThirdPartyAuthPermissionTest(TestCase):
)
@ddt.unpack
def test_jwt_scopes(self, scopes, expected_response):
self._assert_jwt_enforced_restricted_case(
self._assert_jwt_restricted_case(
scopes=scopes,
filters=['tpa_provider:some_tpa_provider'],
expected_response=expected_response,
@@ -202,19 +177,17 @@ class ThirdPartyAuthPermissionTest(TestCase):
)
@ddt.unpack
def test_jwt_org_filters(self, filters, expected_response):
self._assert_jwt_enforced_restricted_case(
self._assert_jwt_restricted_case(
scopes=['tpa:read'],
filters=filters,
expected_response=expected_response,
)
def _assert_jwt_enforced_restricted_case(self, scopes, filters, expected_response):
with patch('edx_rest_framework_extensions.permissions.waffle.switch_is_active') as mock_toggle:
mock_toggle.return_value = True
user = self._create_user()
def _assert_jwt_restricted_case(self, scopes, filters, expected_response):
user = self._create_user()
auth_header = self._create_jwt_header(user, is_restricted=True, scopes=scopes, filters=filters)
request = self._create_request(auth_header=auth_header)
auth_header = self._create_jwt_header(user, is_restricted=True, scopes=scopes, filters=filters)
request = self._create_request(auth_header=auth_header)
response = self.SomeTpaClassView().dispatch(request, provider_id='some_tpa_provider')
self.assertEqual(response.status_code, expected_response)
response = self.SomeTpaClassView().dispatch(request, provider_id='some_tpa_provider')
self.assertEqual(response.status_code, expected_response)

View File

@@ -20,7 +20,6 @@ from lms.djangoapps.certificates.apis.v0.views import CertificatesDetailView, Ce
from lms.djangoapps.certificates.models import CertificateStatuses
from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory
from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from openedx.core.djangoapps.user_api.tests.factories import UserPreferenceFactory
from openedx.core.djangoapps.user_authn.tests.utils import JWT_AUTH_TYPES, AuthAndScopesTestMixin, AuthType
from student.tests.factories import UserFactory
@@ -202,22 +201,21 @@ class CertificatesListRestApiTest(AuthAndScopesTestMixin, SharedModuleStoreTestC
)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.data(*product(list(AuthType)))
@ddt.unpack
def test_another_user(self, auth_type, scopes_enforced, mock_log):
def test_another_user(self, auth_type, mock_log):
"""
Returns 200 with empty list for OAuth, Session, and JWT auth.
Returns 200 for jwt_restricted and user:me filter unset.
"""
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.other_student)
resp = self.get_response(auth_type, requesting_user=self.other_student)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 0)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 0)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.data(*product(list(AuthType)))
@ddt.unpack
def test_another_user_with_certs_shared_public(self, auth_type, scopes_enforced):
def test_another_user_with_certs_shared_public(self, auth_type):
"""
Returns 200 with cert list for OAuth, Session, and JWT auth.
Returns 200 for jwt_restricted and user:me filter unset.
@@ -230,15 +228,14 @@ class CertificatesListRestApiTest(AuthAndScopesTestMixin, SharedModuleStoreTestC
value='all_users',
).save()
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.other_student)
resp = self.get_response(auth_type, requesting_user=self.other_student)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 1)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 1)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.data(*product(list(AuthType)))
@ddt.unpack
def test_another_user_with_certs_shared_custom(self, auth_type, scopes_enforced):
def test_another_user_with_certs_shared_custom(self, auth_type):
"""
Returns 200 with cert list for OAuth, Session, and JWT auth.
Returns 200 for jwt_restricted and user:me filter unset.
@@ -256,33 +253,24 @@ class CertificatesListRestApiTest(AuthAndScopesTestMixin, SharedModuleStoreTestC
value='all_users',
).save()
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.other_student)
resp = self.get_response(auth_type, requesting_user=self.other_student)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 1)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 1)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log):
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_on_behalf_of_other_user(self, auth_type, mock_log):
""" Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
if scopes_enforced and auth_type == AuthType.jwt_restricted:
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 0)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log):
self.assertTrue(True) # pylint: disable=redundant-unittest-assert
if auth_type == AuthType.jwt_restricted:
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assertEqual(len(resp.data), 0)
def test_no_certificate(self):
student_no_cert = UserFactory.create(password=self.user_password)

View File

@@ -51,7 +51,7 @@ payload.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We will no longer return expired *JWTs as access tokens* to Restricted
Applications. We will sign them with a *new key* that is not shared with
Applications. We will sign them with a *new key* that is not shared with
unprotected microservices.
* API endpoints that are exposed by other microservices and that
@@ -59,7 +59,7 @@ unprotected microservices.
they are also updated to enforce scopes.
* We do not want a lock-step deployment across all of our microservices.
We want to enable these changes without blocking on updating all
We want to enable these changes without blocking on updating all
microservices.
* We do not want to issue unexpired *Bearer tokens* to Restricted
@@ -88,8 +88,8 @@ unprotected microservices.
JWT tokens for Restricted Applications, but ONLY if:
* the token_type in the request equals *"jwt"* and
* a `feature toggle (switch)`_ named "oauth2.enforce_jwt_scopes"
is enabled.
* a `feature toggle (switch)`_ named "oauth2.enforce_jwt_scopes" is enabled.
* **Note:** the toggle has since been retired with the equivalent of ``enforce_jwt_scopes`` value of True.
.. _edx-platform settings: https://github.com/edx/edx-platform/blob/master/lms/envs/docs/README.rst
.. _JwtBuilder: https://github.com/edx/edx-platform/blob/d3d64970c36f36a96d684571ec5b48ed645618d8/openedx/core/lib/token_utils.py#L15
@@ -182,7 +182,7 @@ See 0007-include-organizations-in-tokens_ for decisions on this.
`feature toggle (switch)`_ named "oauth2.enforce_token_scopes". When the
switch is disabled, the new Permission class fails verification of all
Restricted Application requests.
.. _custom Permission: http://www.django-rest-framework.org/api-guide/permissions/#custom-permissions
.. _TokenHasScope: https://github.com/evonove/django-oauth-toolkit/blob/50e4df7d97af90439d27a73c5923f2c06a4961f2/oauth2_provider/contrib/rest_framework/permissions.py#L13
.. _`REST_FRAMEWORK's DEFAULT_PERMISSION_CLASSES`: http://www.django-rest-framework.org/api-guide/permissions/#setting-the-permission-policy
@@ -195,15 +195,15 @@ See 0007-include-organizations-in-tokens_ for decisions on this.
Consequences
------------
* Putting these changes behind a feature toggle allows us to decouple
* Putting these changes behind a feature toggle allows us to decouple
release from deployment and disable these changes in the event of
unexpected issues.
unexpected issues.
* Minimizing the places that the feature toggle is checked (at the
time of returning unexpired tokens and at the time of validating
requests), minimizes the complexity of the code.
* By associating Scopes with DOT Applications and not Restricted
* By associating Scopes with DOT Applications and not Restricted
Applications, we can eventually eliminate Restricted Applications
altogether. Besides, they were introduced as a temporary concept
until Scopes were fully rolled out.

View File

@@ -38,10 +38,6 @@ to test other grant types if they are substituted in the appropriate places.
iii. Click Save.
iv. If the temporary waffle switch `oauth2.enforce_jwt_scopes`_ is still defined in your codebase, you will need to enable this switch in the LMS under http://localhost:18000/admin/waffle/switch/add/
.. _oauth2.enforce_jwt_scopes: https://github.com/edx/edx-drf-extensions/blob/609e1dbaa98f476b36e50143de97732f2f6a9b4f/edx_rest_framework_extensions/config.py#L5-L18
3. Create a publicly accessible URL to the LMS if you are testing on devstack. This step is needed to support the redirecting handshake in the Authorization Code protocol from Google's server back to localhost.
i. Install `localtunnel`_:

View File

@@ -10,7 +10,6 @@ from edx_rbac.utils import create_role_auth_claim_for_user
from jwkest import jwk
from jwkest.jws import JWS
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.models import UserProfile, anonymous_id_for_user
@@ -58,7 +57,7 @@ def create_jwt_from_token(token_dict, oauth_adapter, use_asymmetric_key=None):
provide the given token's information.
use_asymmetric_key (Boolean): Optional. Whether the JWT should be signed
with this app's private key. If not provided, defaults to whether
ENFORCE_JWT_SCOPES is enabled and the OAuth client is restricted.
the OAuth client is restricted.
"""
access_token = oauth_adapter.get_access_token(token_dict['access_token'])
client = oauth_adapter.get_client_for_token(access_token)
@@ -102,7 +101,7 @@ def _create_jwt(
additional_claims (dict): Optional. Additional claims to include in the token.
use_asymmetric_key (Boolean): Optional. Whether the JWT should be signed
with this app's private key. If not provided, defaults to whether
ENFORCE_JWT_SCOPES is enabled and the OAuth client is restricted.
the OAuth client is restricted.
secret (string): Overrides configured JWT secret (signing) key.
"""
use_asymmetric_key = _get_use_asymmetric_key_value(is_restricted, use_asymmetric_key)
@@ -138,18 +137,7 @@ def _get_use_asymmetric_key_value(is_restricted, use_asymmetric_key):
"""
Returns the value to use for use_asymmetric_key.
"""
# TODO: (ARCH-162)
# If JWT scope enforcement is enabled, we need to sign tokens
# given to restricted applications with a key that
# other IDAs do not have access to. This prevents restricted
# applications from getting access to API endpoints available
# on other IDAs which have not yet been protected with the
# scope-related DRF permission classes. Once all endpoints have
# been protected, we can enable all IDAs to use the same new
# (asymmetric) key.
if use_asymmetric_key is None:
use_asymmetric_key = ENFORCE_JWT_SCOPES.is_enabled() and is_restricted
return use_asymmetric_key
return use_asymmetric_key or is_restricted
def _compute_time_fields(expires_in):

View File

@@ -14,7 +14,6 @@ from oauth2_provider.settings import oauth2_settings
from organizations.models import Organization
from pytz import utc
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from openedx.core.djangolib.markup import HTML
from openedx.core.lib.request_utils import get_request_or_stub
@@ -46,10 +45,9 @@ class RestrictedApplication(models.Model):
@classmethod
def should_expire_access_token(cls, application):
set_token_expired = not ENFORCE_JWT_SCOPES.is_enabled()
jwt_not_requested = get_request_or_stub().POST.get('token_type', '').lower() != 'jwt'
restricted_application = cls.objects.filter(application=application).exists()
return restricted_application and (jwt_not_requested or set_token_expired)
return restricted_application and jwt_not_requested
@classmethod
def verify_access_token_as_expired(cls, access_token):

View File

@@ -13,7 +13,6 @@ from openedx.core.djangoapps.oauth_dispatch import jwt as jwt_api
from openedx.core.djangoapps.oauth_dispatch.adapters import DOPAdapter, DOTAdapter
from openedx.core.djangoapps.oauth_dispatch.models import RestrictedApplication
from openedx.core.djangoapps.oauth_dispatch.tests.mixins import AccessTokenMixin
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.tests.factories import UserFactory
@@ -71,19 +70,14 @@ class TestCreateJWTs(AccessTokenMixin, TestCase):
jwt_token = self._create_jwt_for_token(DOTAdapter(), use_asymmetric_key=True)
self._assert_jwt_is_valid(jwt_token, should_be_asymmetric_key=True)
@ddt.data(*itertools.product(
(True, False),
(True, False),
))
@ddt.unpack
def test_dot_create_jwt_for_token(self, scopes_enforced, client_restricted):
with ENFORCE_JWT_SCOPES.override(scopes_enforced):
jwt_token = self._create_jwt_for_token(
DOTAdapter(),
use_asymmetric_key=None,
client_restricted=client_restricted,
)
self._assert_jwt_is_valid(jwt_token, should_be_asymmetric_key=scopes_enforced and client_restricted)
@ddt.data((True, False))
def test_dot_create_jwt_for_token(self, client_restricted):
jwt_token = self._create_jwt_for_token(
DOTAdapter(),
use_asymmetric_key=None,
client_restricted=client_restricted,
)
self._assert_jwt_is_valid(jwt_token, should_be_asymmetric_key=client_restricted)
@patch('openedx.core.djangoapps.oauth_dispatch.jwt.create_role_auth_claim_for_user')
@ddt.data(True, False)

View File

@@ -18,7 +18,6 @@ from oauth2_provider import models as dot_models
from organizations.tests.factories import OrganizationFactory
from provider import constants
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.tests.factories import UserFactory
from third_party_auth.tests.utils import ThirdPartyOAuthTestMixin, ThirdPartyOAuthTestMixinGoogle
@@ -213,24 +212,22 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
self.assertIn('scope', data)
self.assertIn('token_type', data)
@ddt.data(False, True)
def test_restricted_non_jwt_access_token_fields(self, enforce_jwt_scopes_enabled):
with ENFORCE_JWT_SCOPES.override(enforce_jwt_scopes_enabled):
response = self._post_request(self.user, self.restricted_dot_app)
self.assertEqual(response.status_code, 200)
data = json.loads(response.content.decode('utf-8'))
self.assertIn('access_token', data)
self.assertIn('expires_in', data)
self.assertIn('scope', data)
self.assertIn('token_type', data)
def test_restricted_non_jwt_access_token_fields(self):
response = self._post_request(self.user, self.restricted_dot_app)
self.assertEqual(response.status_code, 200)
data = json.loads(response.content.decode('utf-8'))
self.assertIn('access_token', data)
self.assertIn('expires_in', data)
self.assertIn('scope', data)
self.assertIn('token_type', data)
# Verify token expiration.
self.assertEqual(data['expires_in'] < 0, True)
access_token = dot_models.AccessToken.objects.get(token=data['access_token'])
self.assertEqual(
models.RestrictedApplication.verify_access_token_as_expired(access_token),
True
)
# Verify token expiration.
self.assertEqual(data['expires_in'] < 0, True)
access_token = dot_models.AccessToken.objects.get(token=data['access_token'])
self.assertEqual(
models.RestrictedApplication.verify_access_token_as_expired(access_token),
True
)
@ddt.data('dop_app', 'dot_app')
def test_jwt_access_token_from_parameter(self, client_attr):
@@ -273,33 +270,28 @@ class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _Dispa
]
mock_set_custom_metric.assert_has_calls(expected_calls, any_order=True)
@ddt.data(
(False, True),
(True, False),
)
@ddt.unpack
def test_restricted_jwt_access_token(self, enforce_jwt_scopes_enabled, expiration_expected):
@ddt.data((True, False))
def test_restricted_jwt_access_token(self, expiration_expected):
"""
Verify that when requesting a JWT token from a restricted Application
within the DOT subsystem, that our claims is marked as already expired
(i.e. expiry set to Jan 1, 1970)
"""
with ENFORCE_JWT_SCOPES.override(enforce_jwt_scopes_enabled):
response = self._post_request(self.user, self.restricted_dot_app, token_type='jwt')
self.assertEqual(response.status_code, 200)
data = json.loads(response.content.decode('utf-8'))
response = self._post_request(self.user, self.restricted_dot_app, token_type='jwt')
self.assertEqual(response.status_code, 200)
data = json.loads(response.content.decode('utf-8'))
self.assertIn('expires_in', data)
self.assertEqual(data['expires_in'] < 0, expiration_expected)
self.assertEqual(data['token_type'], 'JWT')
self.assert_valid_jwt_access_token(
data['access_token'],
self.user,
data['scope'].split(' '),
should_be_expired=expiration_expected,
should_be_asymmetric_key=enforce_jwt_scopes_enabled,
should_be_restricted=True,
)
self.assertIn('expires_in', data)
self.assertEqual(data['expires_in'] < 0, expiration_expected)
self.assertEqual(data['token_type'], 'JWT')
self.assert_valid_jwt_access_token(
data['access_token'],
self.user,
data['scope'].split(' '),
should_be_expired=expiration_expected,
should_be_asymmetric_key=True,
should_be_restricted=True,
)
def test_restricted_access_token(self):
"""

View File

@@ -1,11 +0,0 @@
"""
Feature toggle code for oauth_dispatch.
"""
from edx_rest_framework_extensions.config import OAUTH_TOGGLE_NAMESPACE, SWITCH_ENFORCE_JWT_SCOPES
from openedx.core.djangoapps.waffle_utils import WaffleSwitch, WaffleSwitchNamespace
ENFORCE_JWT_SCOPES = WaffleSwitch(WaffleSwitchNamespace(name=OAUTH_TOGGLE_NAMESPACE), SWITCH_ENFORCE_JWT_SCOPES)

View File

@@ -14,7 +14,6 @@ from rest_framework import status
from openedx.core.djangoapps.oauth_dispatch.adapters.dot import DOTAdapter
from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt
from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES
from student.tests.factories import UserFactory
@@ -143,106 +142,91 @@ class AuthAndScopesTestMixin(object):
resp = self.client.get(self.get_url(self.student.username))
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_self_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(JWT_AUTH_TYPES)
def test_self_user(self, auth_type):
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_staff_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.global_staff)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(list(AuthType))
def test_staff_user(self, auth_type):
resp = self.get_response(auth_type, requesting_user=self.global_staff)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
self.assert_success_response_for_student(resp)
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.unpack
def test_inactive_user(self, auth_type, scopes_enforced):
@ddt.data(list(AuthType))
def test_inactive_user(self, auth_type):
self.student.is_active = False
self.student.save()
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
resp = self.get_response(auth_type)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(list(AuthType), (True, False)))
@ddt.data(list(AuthType))
@ddt.unpack
def test_another_user(self, auth_type, scopes_enforced, mock_log):
def test_another_user(self, auth_type, mock_log):
"""
Returns 403 for OAuth, Session, and JWT auth with IsUserInUrl.
Returns 200 for jwt_restricted and user:me filter unset.
"""
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
resp = self.get_response(auth_type, requesting_user=self.other_student)
resp = self.get_response(auth_type, requesting_user=self.other_student)
# Restricted JWT tokens without the user:me filter have access to other users
expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted
# Restricted JWT tokens without the user:me filter have access to other users
expected_jwt_access_granted = auth_type == AuthType.jwt_restricted
self.assertEqual(
resp.status_code,
status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN,
)
if not expected_jwt_access_granted:
self._assert_in_log("IsUserInUrl", mock_log.info)
self.assertEqual(
resp.status_code,
status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN,
)
if not expected_jwt_access_granted:
self._assert_in_log("IsUserInUrl", mock_log.info)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log):
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_no_scopes(self, auth_type, mock_log):
""" Returns 403 when scopes are enforced with JwtHasScope. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[])
resp = self.get_response(AuthType.jwt, token=jwt_token)
jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[])
resp = self.get_response(AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
is_enforced = auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasScope", mock_log.warning)
if is_enforced:
self._assert_in_log("JwtHasScope", mock_log.warning)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log):
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_no_filter(self, auth_type, mock_log):
""" Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False)
resp = self.get_response(AuthType.jwt, token=jwt_token)
jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False)
resp = self.get_response(AuthType.jwt, token=jwt_token)
is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
is_enforced = auth_type == AuthType.jwt_restricted
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK)
if is_enforced:
self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning)
if is_enforced:
self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning)
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced):
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_on_behalf_of_user(self, auth_type):
jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
resp = self.get_response(AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
@patch('edx_rest_framework_extensions.permissions.log')
@ddt.data(*product(JWT_AUTH_TYPES, (True, False)))
@ddt.unpack
def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log):
@ddt.data(JWT_AUTH_TYPES)
def test_jwt_on_behalf_of_other_user(self, auth_type, mock_log):
""" Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """
with ENFORCE_JWT_SCOPES.override(active=scopes_enforced):
jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
jwt_token = self._create_jwt_token(self.other_student, auth_type, include_me_filter=True)
resp = self.get_response(AuthType.jwt, token=jwt_token)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
if scopes_enforced and auth_type == AuthType.jwt_restricted:
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self._assert_in_log("IsUserInUrl", mock_log.info)
if auth_type == AuthType.jwt_restricted:
self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning)
else:
self._assert_in_log("IsUserInUrl", mock_log.info)
def test_valid_oauth_token(self):
resp = self.get_response(AuthType.oauth)