diff --git a/cms/djangoapps/contentstore/views/program.py b/cms/djangoapps/contentstore/views/program.py index 5772d6edaf..eceeb18f94 100644 --- a/cms/djangoapps/contentstore/views/program.py +++ b/cms/djangoapps/contentstore/views/program.py @@ -1,14 +1,16 @@ """Programs views for use with Studio.""" from django.conf import settings from django.contrib.auth.decorators import login_required +from django.core.exceptions import ImproperlyConfigured from django.core.urlresolvers import reverse from django.http import Http404, JsonResponse from django.utils.decorators import method_decorator from django.views.generic import View +from provider.oauth2.models import Client from edxmako.shortcuts import render_to_response from openedx.core.djangoapps.programs.models import ProgramsApiConfig -from openedx.core.lib.token_utils import get_id_token +from openedx.core.lib.token_utils import JwtBuilder class ProgramAuthoringView(View): @@ -44,7 +46,24 @@ class ProgramsIdTokenView(View): def get(self, request, *args, **kwargs): """Generate and return a token, if the integration is enabled.""" if ProgramsApiConfig.current().is_studio_tab_enabled: - id_token = get_id_token(request.user, 'programs') - return JsonResponse({'id_token': id_token}) + # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name. + client_name = 'programs' + + try: + client = Client.objects.get(name=client_name) + except Client.DoesNotExist: + raise ImproperlyConfigured( + 'OAuth2 Client with name [{}] does not exist.'.format(client_name) + ) + + scopes = ['email', 'profile'] + expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION + jwt = JwtBuilder(request.user, secret=client.client_secret).build_token( + scopes, + expires_in, + aud=client.client_id + ) + + return JsonResponse({'id_token': jwt}) else: raise Http404 diff --git a/cms/djangoapps/contentstore/views/tests/test_programs.py b/cms/djangoapps/contentstore/views/tests/test_programs.py index 1f3042170e..b2f4db0fb9 100644 --- a/cms/djangoapps/contentstore/views/tests/test_programs.py +++ b/cms/djangoapps/contentstore/views/tests/test_programs.py @@ -147,18 +147,17 @@ class TestProgramsIdTokenView(ProgramsApiConfigMixin, SharedModuleStoreTestCase) self.assertEqual(response.status_code, 302) self.assertIn(settings.LOGIN_URL, response['Location']) - @mock.patch('cms.djangoapps.contentstore.views.program.get_id_token', return_value='test-id-token') - def test_config_enabled(self, mock_get_id_token): + @mock.patch('cms.djangoapps.contentstore.views.program.JwtBuilder.build_token') + def test_config_enabled(self, mock_build_token): """ Ensure the endpoint responds with a valid JSON payload when authoring is enabled. """ + mock_build_token.return_value = 'test-id-token' + ClientFactory(name=ProgramsApiConfig.OAUTH2_CLIENT_NAME, client_type=CONFIDENTIAL) + self.create_programs_config() response = self.client.get(self.path) self.assertEqual(response.status_code, 200) payload = json.loads(response.content) - self.assertEqual(payload, {"id_token": "test-id-token"}) - # this comparison is a little long-handed because we need to compare user instances directly - user, client_name = mock_get_id_token.call_args[0] - self.assertEqual(user, self.user) - self.assertEqual(client_name, "programs") + self.assertEqual(payload, {'id_token': 'test-id-token'}) diff --git a/cms/envs/aws.py b/cms/envs/aws.py index 4664e47fb2..3a07499311 100644 --- a/cms/envs/aws.py +++ b/cms/envs/aws.py @@ -448,6 +448,9 @@ MICROSITE_DATABASE_TEMPLATE_CACHE_TTL = ENV_TOKENS.get( # OpenID Connect issuer ID. Normally the URL of the authentication endpoint. OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER'] +#### JWT configuration #### +JWT_AUTH.update(ENV_TOKENS.get('JWT_AUTH', {})) + ######################## CUSTOM COURSES for EDX CONNECTOR ###################### if FEATURES.get('CUSTOM_COURSES_EDX'): INSTALLED_APPS += ('openedx.core.djangoapps.ccxcon',) diff --git a/cms/envs/common.py b/cms/envs/common.py index dc5ee57fcb..326d2acf06 100644 --- a/cms/envs/common.py +++ b/cms/envs/common.py @@ -75,6 +75,8 @@ from lms.envs.common import ( # constants for redirects app REDIRECT_CACHE_TIMEOUT, REDIRECT_CACHE_KEY_PREFIX, + + JWT_AUTH, ) from path import Path as path from warnings import simplefilter diff --git a/cms/envs/test.py b/cms/envs/test.py index 05ca7a2143..bf3f5e2b0c 100644 --- a/cms/envs/test.py +++ b/cms/envs/test.py @@ -35,6 +35,7 @@ from lms.envs.test import ( MEDIA_ROOT, MEDIA_URL, COMPREHENSIVE_THEME_DIRS, + JWT_AUTH, ) # mongo connection settings diff --git a/lms/djangoapps/edxnotes/helpers.py b/lms/djangoapps/edxnotes/helpers.py index 048b0c1b42..2184844e88 100644 --- a/lms/djangoapps/edxnotes/helpers.py +++ b/lms/djangoapps/edxnotes/helpers.py @@ -18,12 +18,13 @@ from django.conf import settings from django.core.exceptions import ImproperlyConfigured from django.core.urlresolvers import reverse from django.utils.translation import ugettext as _ +from provider.oauth2.models import Client from edxnotes.exceptions import EdxNotesParseError, EdxNotesServiceUnavailable from edxnotes.plugins import EdxNotesTab from courseware.views.views import get_current_child from courseware.access import has_access -from openedx.core.lib.token_utils import get_id_token +from openedx.core.lib.token_utils import JwtBuilder from student.models import anonymous_id_for_user from util.date_utils import get_default_time_display from xmodule.modulestore.django import modulestore @@ -52,7 +53,19 @@ def get_edxnotes_id_token(user): """ Returns generated ID Token for edxnotes. """ - return get_id_token(user, CLIENT_NAME) + # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name. + try: + client = Client.objects.get(name=CLIENT_NAME) + except Client.DoesNotExist: + raise ImproperlyConfigured( + 'OAuth2 Client with name [{}] does not exist.'.format(CLIENT_NAME) + ) + + scopes = ['email', 'profile'] + expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION + jwt = JwtBuilder(user, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id) + + return jwt def get_token_url(course_id): diff --git a/lms/djangoapps/oauth_dispatch/tests/mixins.py b/lms/djangoapps/oauth_dispatch/tests/mixins.py index 900ee6a593..f51524c148 100644 --- a/lms/djangoapps/oauth_dispatch/tests/mixins.py +++ b/lms/djangoapps/oauth_dispatch/tests/mixins.py @@ -4,6 +4,8 @@ OAuth Dispatch test mixins import jwt from django.conf import settings +from student.models import UserProfile, anonymous_id_for_user + class AccessTokenMixin(object): """ Mixin for tests dealing with OAuth 2 access tokens. """ @@ -35,11 +37,21 @@ class AccessTokenMixin(object): 'iss': issuer, 'preferred_username': user.username, 'scopes': scopes, + 'sub': anonymous_id_for_user(user, None), } if 'email' in scopes: expected['email'] = user.email + if 'profile' in scopes: + try: + name = UserProfile.objects.get(user=user).name + except UserProfile.DoesNotExist: + name = None + + expected['name'] = name + expected['administrator'] = user.is_staff + self.assertDictContainsSubset(expected, payload) return payload diff --git a/lms/djangoapps/oauth_dispatch/views.py b/lms/djangoapps/oauth_dispatch/views.py index b78fab924a..64c7739554 100644 --- a/lms/djangoapps/oauth_dispatch/views.py +++ b/lms/djangoapps/oauth_dispatch/views.py @@ -17,6 +17,7 @@ from edx_oauth2_provider import views as dop_views # django-oauth2-provider vie from oauth2_provider import models as dot_models, views as dot_views # django-oauth-toolkit from openedx.core.djangoapps.theming import helpers +from openedx.core.lib.token_utils import JwtBuilder from . import adapters @@ -87,15 +88,6 @@ class AccessTokenView(_DispatchingView): dot_view = dot_views.TokenView dop_view = dop_views.AccessTokenView - @cached_property - def claim_handlers(self): - """ Returns a dictionary mapping scopes to methods that will add claims to the JWT payload. """ - - return { - 'email': self._attach_email_claim, - 'profile': self._attach_profile_claim - } - def dispatch(self, request, *args, **kwargs): response = super(AccessTokenView, self).dispatch(request, *args, **kwargs) @@ -103,7 +95,7 @@ class AccessTokenView(_DispatchingView): expires_in, scopes, user = self._decompose_access_token_response(request, response) content = { - 'access_token': self._generate_jwt(user, scopes, expires_in), + 'access_token': JwtBuilder(user).build_token(scopes, expires_in), 'expires_in': expires_in, 'token_type': 'JWT', 'scope': ' '.join(scopes), @@ -123,43 +115,6 @@ class AccessTokenView(_DispatchingView): expires_in = content['expires_in'] return expires_in, scopes, user - def _generate_jwt(self, user, scopes, expires_in): - """ Returns a JWT access token. """ - now = int(time()) - jwt_auth = helpers.get_value("JWT_AUTH", settings.JWT_AUTH) - payload = { - 'iss': jwt_auth['JWT_ISSUER'], - 'aud': jwt_auth['JWT_AUDIENCE'], - 'exp': now + expires_in, - 'iat': now, - 'preferred_username': user.username, - 'scopes': scopes, - } - - for scope in scopes: - handler = self.claim_handlers.get(scope) - - if handler: - handler(payload, user) - - secret = jwt_auth['JWT_SECRET_KEY'] - token = jwt.encode(payload, secret, algorithm=jwt_auth['JWT_ALGORITHM']) - - return token - - def _attach_email_claim(self, payload, user): - """ Add the email claim details to the JWT payload. """ - payload['email'] = user.email - - def _attach_profile_claim(self, payload, user): - """ Add the profile claim details to the JWT payload. """ - payload.update({ - 'family_name': user.last_name, - 'name': user.get_full_name(), - 'given_name': user.first_name, - 'administrator': user.is_staff, - }) - class AuthorizationView(_DispatchingView): """ diff --git a/openedx/core/djangoapps/api_admin/utils.py b/openedx/core/djangoapps/api_admin/utils.py index 07a7f2a0f1..850200a8ca 100644 --- a/openedx/core/djangoapps/api_admin/utils.py +++ b/openedx/core/djangoapps/api_admin/utils.py @@ -1,49 +1,14 @@ """ Course Discovery API Service. """ -import datetime - -import jwt from django.conf import settings from edx_rest_api_client.client import EdxRestApiClient -from openedx.core.djangoapps.theming import helpers -from student.models import UserProfile, anonymous_id_for_user - - -def get_id_token(user): - """ - Return a JWT for `user`, suitable for use with the course discovery service. - - Arguments: - user (User): User for whom to generate the JWT. - - Returns: - str: The JWT. - """ - try: - # Service users may not have user profiles. - full_name = UserProfile.objects.get(user=user).name - except UserProfile.DoesNotExist: - full_name = None - - now = datetime.datetime.utcnow() - expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30) - - payload = { - 'preferred_username': user.username, - 'name': full_name, - 'email': user.email, - 'administrator': user.is_staff, - 'iss': helpers.get_value('OAUTH_OIDC_ISSUER', settings.OAUTH_OIDC_ISSUER), - 'exp': now + datetime.timedelta(seconds=expires_in), - 'iat': now, - 'aud': helpers.get_value('JWT_AUTH', settings.JWT_AUTH)['JWT_AUDIENCE'], - 'sub': anonymous_id_for_user(user, None), - } - secret_key = helpers.get_value('JWT_AUTH', settings.JWT_AUTH)['JWT_SECRET_KEY'] - - return jwt.encode(payload, secret_key).decode('utf-8') +from openedx.core.lib.token_utils import JwtBuilder def course_discovery_api_client(user): """ Returns a Course Discovery API client setup with authentication for the specified user. """ - return EdxRestApiClient(settings.COURSE_CATALOG_API_URL, jwt=get_id_token(user)) + scopes = ['email', 'profile'] + expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION + jwt = JwtBuilder(user).build_token(scopes, expires_in) + + return EdxRestApiClient(settings.COURSE_CATALOG_API_URL, jwt=jwt) diff --git a/openedx/core/djangoapps/programs/tasks/v1/tasks.py b/openedx/core/djangoapps/programs/tasks/v1/tasks.py index b80a8111b5..1b4fa4dea9 100644 --- a/openedx/core/djangoapps/programs/tasks/v1/tasks.py +++ b/openedx/core/djangoapps/programs/tasks/v1/tasks.py @@ -5,13 +5,15 @@ from celery import task from celery.utils.log import get_task_logger # pylint: disable=no-name-in-module, import-error from django.conf import settings from django.contrib.auth.models import User +from django.core.exceptions import ImproperlyConfigured from edx_rest_api_client.client import EdxRestApiClient +from provider.oauth2.models import Client from openedx.core.djangoapps.credentials.models import CredentialsApiConfig from openedx.core.djangoapps.credentials.utils import get_user_credentials from openedx.core.djangoapps.programs.models import ProgramsApiConfig from openedx.core.djangoapps.programs.utils import ProgramProgressMeter -from openedx.core.lib.token_utils import get_id_token +from openedx.core.lib.token_utils import JwtBuilder LOGGER = get_task_logger(__name__) @@ -31,8 +33,21 @@ def get_api_client(api_config, student): EdxRestApiClient """ - id_token = get_id_token(student, api_config.OAUTH2_CLIENT_NAME) - return EdxRestApiClient(api_config.internal_api_url, jwt=id_token) + # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name. + client_name = api_config.OAUTH2_CLIENT_NAME + + try: + client = Client.objects.get(name=client_name) + except Client.DoesNotExist: + raise ImproperlyConfigured( + 'OAuth2 Client with name [{}] does not exist.'.format(client_name) + ) + + scopes = ['email', 'profile'] + expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION + jwt = JwtBuilder(student, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id) + + return EdxRestApiClient(api_config.internal_api_url, jwt=jwt) def get_completed_programs(student): diff --git a/openedx/core/djangoapps/programs/tasks/v1/tests/test_tasks.py b/openedx/core/djangoapps/programs/tasks/v1/tests/test_tasks.py index cbd75a13da..642b2d20dc 100644 --- a/openedx/core/djangoapps/programs/tasks/v1/tests/test_tasks.py +++ b/openedx/core/djangoapps/programs/tasks/v1/tests/test_tasks.py @@ -34,8 +34,8 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin): Test the get_api_client function """ - @mock.patch(TASKS_MODULE + '.get_id_token') - def test_get_api_client(self, mock_get_id_token): + @mock.patch(TASKS_MODULE + '.JwtBuilder.build_token') + def test_get_api_client(self, mock_build_token): """ Ensure the function is making the right API calls based on inputs """ @@ -45,10 +45,9 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin): internal_service_url='http://foo', api_version_number=99, ) - mock_get_id_token.return_value = 'test-token' + mock_build_token.return_value = 'test-token' api_client = tasks.get_api_client(api_config, student) - self.assertEqual(mock_get_id_token.call_args[0], (student, 'programs')) self.assertEqual(api_client._store['base_url'], 'http://foo/api/v99/') # pylint: disable=protected-access self.assertEqual(api_client._store['session'].auth.token, 'test-token') # pylint: disable=protected-access diff --git a/openedx/core/lib/edx_api_utils.py b/openedx/core/lib/edx_api_utils.py index f9aff9c410..0a561e4bf2 100644 --- a/openedx/core/lib/edx_api_utils.py +++ b/openedx/core/lib/edx_api_utils.py @@ -2,10 +2,13 @@ from __future__ import unicode_literals import logging +from django.conf import settings from django.core.cache import cache +from django.core.exceptions import ImproperlyConfigured from edx_rest_api_client.client import EdxRestApiClient +from provider.oauth2.models import Client -from openedx.core.lib.token_utils import get_id_token +from openedx.core.lib.token_utils import JwtBuilder log = logging.getLogger(__name__) @@ -48,7 +51,20 @@ def get_edx_api_data(api_config, user, resource, try: if not api: - jwt = get_id_token(user, api_config.OAUTH2_CLIENT_NAME) + # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name. + client_name = api_config.OAUTH2_CLIENT_NAME + + try: + client = Client.objects.get(name=client_name) + except Client.DoesNotExist: + raise ImproperlyConfigured( + 'OAuth2 Client with name [{}] does not exist.'.format(client_name) + ) + + scopes = ['email', 'profile'] + expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION + jwt = JwtBuilder(user, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id) + api = EdxRestApiClient(api_config.internal_api_url, jwt=jwt) except: # pylint: disable=bare-except log.exception('Failed to initialize the %s API client.', api_config.API_NAME) diff --git a/openedx/core/lib/tests/test_token_utils.py b/openedx/core/lib/tests/test_token_utils.py index 86ce51b280..e027e44354 100644 --- a/openedx/core/lib/tests/test_token_utils.py +++ b/openedx/core/lib/tests/test_token_utils.py @@ -1,71 +1,60 @@ -"""Tests covering utilities for working with ID tokens.""" -import calendar -import datetime - +"""Tests covering JWT construction utilities.""" import ddt -from django.conf import settings -from django.core.exceptions import ImproperlyConfigured from django.test import TestCase -from django.test.utils import override_settings -import freezegun import jwt from nose.plugins.attrib import attr -from edx_oauth2_provider.tests.factories import ClientFactory -from provider.constants import CONFIDENTIAL -from openedx.core.lib.token_utils import get_id_token -from student.models import anonymous_id_for_user +from lms.djangoapps.oauth_dispatch.tests import mixins +from openedx.core.lib.token_utils import JwtBuilder from student.tests.factories import UserFactory, UserProfileFactory @attr('shard_2') @ddt.ddt -class TestIdTokenGeneration(TestCase): - """Tests covering ID token generation.""" - client_name = 'edx-dummy-client' +class TestJwtBuilder(mixins.AccessTokenMixin, TestCase): + """ + Test class for JwtBuilder. + """ + + expires_in = 10 def setUp(self): - super(TestIdTokenGeneration, self).setUp() + super(TestJwtBuilder, self).setUp() - self.oauth2_client = ClientFactory(name=self.client_name, client_type=CONFIDENTIAL) + self.user = UserFactory() + self.profile = UserProfileFactory(user=self.user) - self.user = UserFactory.build() - self.user.save() + @ddt.data( + [], + ['email'], + ['profile'], + ['email', 'profile'], + ) + def test_jwt_construction(self, scopes): + """ + Verify that a valid JWT is built, including claims for the requested scopes. + """ + token = JwtBuilder(self.user).build_token(scopes, self.expires_in) + self.assert_valid_jwt_access_token(token, self.user, scopes) - @override_settings(OAUTH_OIDC_ISSUER='test-issuer', OAUTH_ID_TOKEN_EXPIRATION=1) - @freezegun.freeze_time('2015-01-01 12:00:00') - @ddt.data(True, False) - def test_get_id_token(self, has_profile): - """Verify that ID tokens are signed with the correct secret and generated with the correct claims.""" - full_name = UserProfileFactory(user=self.user).name if has_profile else None + def test_user_profile_missing(self): + """ + Verify that token construction succeeds if the UserProfile is missing. + """ + self.profile.delete() # pylint: disable=no-member - token = get_id_token(self.user, self.client_name) + scopes = ['profile'] + token = JwtBuilder(self.user).build_token(scopes, self.expires_in) + self.assert_valid_jwt_access_token(token, self.user, scopes) - payload = jwt.decode( - token, - self.oauth2_client.client_secret, - audience=self.oauth2_client.client_id, - issuer=settings.OAUTH_OIDC_ISSUER, - ) + def test_override_secret_and_audience(self): + """ + Verify that the signing key and audience can be overridden. + """ + secret = 'avoid-this' + audience = 'avoid-this-too' + scopes = [] - now = datetime.datetime.utcnow() - expiration = now + datetime.timedelta(seconds=settings.OAUTH_ID_TOKEN_EXPIRATION) + token = JwtBuilder(self.user, secret=secret).build_token(scopes, self.expires_in, aud=audience) - expected_payload = { - 'preferred_username': self.user.username, - 'name': full_name, - 'email': self.user.email, - 'administrator': self.user.is_staff, - 'iss': settings.OAUTH_OIDC_ISSUER, - 'exp': calendar.timegm(expiration.utctimetuple()), - 'iat': calendar.timegm(now.utctimetuple()), - 'aud': self.oauth2_client.client_id, - 'sub': anonymous_id_for_user(self.user, None), - } - - self.assertEqual(payload, expected_payload) - - def test_get_id_token_invalid_client(self): - """Verify that ImproperlyConfigured is raised when an invalid client name is provided.""" - with self.assertRaises(ImproperlyConfigured): - get_id_token(self.user, 'does-not-exist') + jwt.decode(token, secret, audience=audience) diff --git a/openedx/core/lib/token_utils.py b/openedx/core/lib/token_utils.py index 9172e4d956..d351a8d226 100644 --- a/openedx/core/lib/token_utils.py +++ b/openedx/core/lib/token_utils.py @@ -1,120 +1,100 @@ """Utilities for working with ID tokens.""" -import datetime +from time import time from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import load_pem_private_key from django.conf import settings -from django.core.exceptions import ImproperlyConfigured +from django.utils.functional import cached_property import jwt -from provider.oauth2.models import Client +from openedx.core.djangoapps.theming import helpers from student.models import UserProfile, anonymous_id_for_user -def get_id_token(user, client_name, secret_key=None): - """Construct a JWT for use with the named client. +class JwtBuilder(object): + """Utility for building JWTs. - The JWT is signed with the named client's secret, and includes the following claims: + Unifies diverse approaches to JWT creation in a single class. This utility defaults to using the system's + JWT configuration. - preferred_username (str): The user's username. The claim name is borrowed from edx-oauth2-provider. - name (str): The user's full name. - email (str): The user's email address. - administrator (Boolean): Whether the user has staff permissions. - iss (str): Registered claim. Identifies the principal that issued the JWT. - exp (int): Registered claim. Identifies the expiration time on or after which - the JWT must NOT be accepted for processing. - iat (int): Registered claim. Identifies the time at which the JWT was issued. - aud (str): Registered claim. Identifies the recipients that the JWT is intended for. This implementation - uses the named client's ID. - sub (int): Registered claim. Identifies the user. This implementation uses the raw user id. - - Arguments: - user (User): User for which to generate the JWT. - client_name (unicode): Name of the OAuth2 Client for which the token is intended. - secret_key (str): Optional secret key for signing the JWT. Defaults to the configured client secret - if not provided. - - Returns: - str: the JWT - - Raises: - ImproperlyConfigured: If no OAuth2 Client with the provided name exists. - """ - try: - client = Client.objects.get(name=client_name) - except Client.DoesNotExist: - raise ImproperlyConfigured('OAuth2 Client with name [%s] does not exist' % client_name) - - try: - # Service users may not have user profiles. - full_name = UserProfile.objects.get(user=user).name - except UserProfile.DoesNotExist: - full_name = None - - now = datetime.datetime.utcnow() - expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30) - - payload = { - 'preferred_username': user.username, - 'name': full_name, - 'email': user.email, - 'administrator': user.is_staff, - 'iss': settings.OAUTH_OIDC_ISSUER, - 'exp': now + datetime.timedelta(seconds=expires_in), - 'iat': now, - 'aud': client.client_id, - 'sub': anonymous_id_for_user(user, None), - } - - if secret_key is None: - secret_key = client.client_secret - - return jwt.encode(payload, secret_key) - - -def get_asymmetric_token(user, client_id): - """Construct a JWT signed with this app's private key. - - The JWT includes the following claims: - - preferred_username (str): The user's username. The claim name is borrowed from edx-oauth2-provider. - name (str): The user's full name. - email (str): The user's email address. - administrator (Boolean): Whether the user has staff permissions. - iss (str): Registered claim. Identifies the principal that issued the JWT. - exp (int): Registered claim. Identifies the expiration time on or after which - the JWT must NOT be accepted for processing. - iat (int): Registered claim. Identifies the time at which the JWT was issued. - sub (int): Registered claim. Identifies the user. This implementation uses the raw user id. + NOTE: This utility class will allow you to override the signing key and audience claim to support those + clients which still require this. This approach to JWT creation is DEPRECATED. Avoid doing this for new clients. Arguments: user (User): User for which to generate the JWT. - Returns: - str: the JWT - + Keyword Arguments: + asymmetric (Boolean): Whether the JWT should be signed with this app's private key. + secret (string): Overrides configured JWT secret (signing) key. Unused if an asymmetric signature is requested. """ - private_key = load_pem_private_key(settings.PRIVATE_RSA_KEY, None, default_backend()) + def __init__(self, user, asymmetric=False, secret=None): + self.user = user + self.asymmetric = asymmetric + self.secret = secret + self.jwt_auth = helpers.get_value('JWT_AUTH', settings.JWT_AUTH) - try: - # Service users may not have user profiles. - full_name = UserProfile.objects.get(user=user).name - except UserProfile.DoesNotExist: - full_name = None + def build_token(self, scopes, expires_in, aud=None): + """Returns a JWT access token. - now = datetime.datetime.utcnow() - expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30) + Arguments: + scopes (list): Scopes controlling which optional claims are included in the token. + expires_in (int): Time to token expiry, specified in seconds. - payload = { - 'preferred_username': user.username, - 'name': full_name, - 'email': user.email, - 'administrator': user.is_staff, - 'iss': settings.OAUTH_OIDC_ISSUER, - 'exp': now + datetime.timedelta(seconds=expires_in), - 'iat': now, - 'aud': client_id, - 'sub': anonymous_id_for_user(user, None), - } + Keyword Arguments: + aud (string): Overrides configured JWT audience claim. + """ + now = int(time()) + payload = { + 'aud': aud if aud else self.jwt_auth['JWT_AUDIENCE'], + 'exp': now + expires_in, + 'iat': now, + 'iss': self.jwt_auth['JWT_ISSUER'], + 'preferred_username': self.user.username, + 'scopes': scopes, + 'sub': anonymous_id_for_user(self.user, None), + } - return jwt.encode(payload, private_key, algorithm='RS512') + for scope in scopes: + handler = self.claim_handlers.get(scope) + + if handler: + handler(payload) + + return self.encode(payload) + + @cached_property + def claim_handlers(self): + """Returns a dictionary mapping scopes to methods that will add claims to the JWT payload.""" + + return { + 'email': self.attach_email_claim, + 'profile': self.attach_profile_claim + } + + def attach_email_claim(self, payload): + """Add the email claim details to the JWT payload.""" + payload['email'] = self.user.email + + def attach_profile_claim(self, payload): + """Add the profile claim details to the JWT payload.""" + try: + # Some users (e.g., service users) may not have user profiles. + name = UserProfile.objects.get(user=self.user).name + except UserProfile.DoesNotExist: + name = None + + payload.update({ + 'name': name, + 'administrator': self.user.is_staff, + }) + + def encode(self, payload): + """Encode the provided payload.""" + if self.asymmetric: + secret = load_pem_private_key(settings.PRIVATE_RSA_KEY, None, default_backend()) + algorithm = 'RS512' + else: + secret = self.secret if self.secret else self.jwt_auth['JWT_SECRET_KEY'] + algorithm = self.jwt_auth['JWT_ALGORITHM'] + + return jwt.encode(payload, secret, algorithm=algorithm)