Although we are phasing out our support of OIDC, this particular feature will allow us to eliminate many of the settings we share across services. Instead of reading various endpoints and secret keys from settings or hardcoded values, services with the proper authentication backend can simply read (and cache) the information from this endpoint. ECOM-3629
119 lines
4.1 KiB
Python
119 lines
4.1 KiB
Python
"""Utilities for working with ID tokens."""
|
|
import json
|
|
from time import time
|
|
|
|
from Cryptodome.PublicKey import RSA
|
|
from django.conf import settings
|
|
from django.utils.functional import cached_property
|
|
from jwkest.jwk import KEYS, RSAKey
|
|
from jwkest.jws import JWS
|
|
|
|
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
|
|
from student.models import UserProfile, anonymous_id_for_user
|
|
|
|
|
|
class JwtBuilder(object):
|
|
"""Utility for building JWTs.
|
|
|
|
Unifies diverse approaches to JWT creation in a single class. This utility defaults to using the system's
|
|
JWT configuration.
|
|
|
|
NOTE: This utility class will allow you to override the signing key and audience claim to support those
|
|
clients which still require this. This approach to JWT creation is DEPRECATED. Avoid doing this for new clients.
|
|
|
|
Arguments:
|
|
user (User): User for which to generate the JWT.
|
|
|
|
Keyword Arguments:
|
|
asymmetric (Boolean): Whether the JWT should be signed with this app's private key.
|
|
secret (string): Overrides configured JWT secret (signing) key. Unused if an asymmetric signature is requested.
|
|
"""
|
|
|
|
def __init__(self, user, asymmetric=False, secret=None):
|
|
self.user = user
|
|
self.asymmetric = asymmetric
|
|
self.secret = secret
|
|
self.jwt_auth = configuration_helpers.get_value('JWT_AUTH', settings.JWT_AUTH)
|
|
|
|
def build_token(self, scopes, expires_in=None, aud=None, additional_claims=None):
|
|
"""Returns a JWT access token.
|
|
|
|
Arguments:
|
|
scopes (list): Scopes controlling which optional claims are included in the token.
|
|
|
|
Keyword Arguments:
|
|
expires_in (int): Time to token expiry, specified in seconds.
|
|
aud (string): Overrides configured JWT audience claim.
|
|
additional_claims (dict): Additional claims to include in the token.
|
|
|
|
Returns:
|
|
str: Encoded JWT
|
|
"""
|
|
now = int(time())
|
|
expires_in = expires_in or self.jwt_auth['JWT_EXPIRATION']
|
|
payload = {
|
|
# TODO Consider getting rid of this claim since we don't use it.
|
|
'aud': aud if aud else self.jwt_auth['JWT_AUDIENCE'],
|
|
'exp': now + expires_in,
|
|
'iat': now,
|
|
'iss': self.jwt_auth['JWT_ISSUER'],
|
|
'preferred_username': self.user.username,
|
|
'scopes': scopes,
|
|
'sub': anonymous_id_for_user(self.user, None),
|
|
}
|
|
|
|
if additional_claims:
|
|
payload.update(additional_claims)
|
|
|
|
for scope in scopes:
|
|
handler = self.claim_handlers.get(scope)
|
|
|
|
if handler:
|
|
handler(payload)
|
|
|
|
return self.encode(payload)
|
|
|
|
@cached_property
|
|
def claim_handlers(self):
|
|
"""Returns a dictionary mapping scopes to methods that will add claims to the JWT payload."""
|
|
|
|
return {
|
|
'email': self.attach_email_claim,
|
|
'profile': self.attach_profile_claim
|
|
}
|
|
|
|
def attach_email_claim(self, payload):
|
|
"""Add the email claim details to the JWT payload."""
|
|
payload['email'] = self.user.email
|
|
|
|
def attach_profile_claim(self, payload):
|
|
"""Add the profile claim details to the JWT payload."""
|
|
try:
|
|
# Some users (e.g., service users) may not have user profiles.
|
|
name = UserProfile.objects.get(user=self.user).name
|
|
except UserProfile.DoesNotExist:
|
|
name = None
|
|
|
|
payload.update({
|
|
'name': name,
|
|
'family_name': self.user.last_name,
|
|
'given_name': self.user.first_name,
|
|
'administrator': self.user.is_staff,
|
|
})
|
|
|
|
def encode(self, payload):
|
|
"""Encode the provided payload."""
|
|
keys = KEYS()
|
|
|
|
if self.asymmetric:
|
|
keys.add(RSAKey(key=RSA.importKey(settings.JWT_PRIVATE_SIGNING_KEY)))
|
|
algorithm = 'RS512'
|
|
else:
|
|
key = self.secret if self.secret else self.jwt_auth['JWT_SECRET_KEY']
|
|
keys.add({'key': key, 'kty': 'oct'})
|
|
algorithm = self.jwt_auth['JWT_ALGORITHM']
|
|
|
|
data = json.dumps(payload)
|
|
jws = JWS(data, alg=algorithm)
|
|
return jws.sign_compact(keys=keys)
|