* feat: renamed DeprecatedRestApiClient from EdxRestApiClient * chore: Updating Python Requirements (#33917) * fix: Put slumber in the proper alphabetical order. --------- Co-authored-by: Yagnesh <yagnesh.nayi@manprax.com> Co-authored-by: edX requirements bot <49161187+edx-requirements-bot@users.noreply.github.com>
339 lines
11 KiB
Python
339 lines
11 KiB
Python
""" Commerce API Service. """
|
|
|
|
|
|
import requests
|
|
import slumber
|
|
import datetime
|
|
import json
|
|
import os
|
|
import socket
|
|
from django.conf import settings
|
|
from edx_rest_api_client.auth import SuppliedJwtAuth, JwtAuth
|
|
from edx_django_utils.cache import TieredCache
|
|
from eventtracking import tracker
|
|
|
|
from openedx.core.djangoapps.oauth_dispatch.jwt import create_jwt_for_user
|
|
from requests.auth import AuthBase
|
|
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
|
|
from edx_django_utils.monitoring import set_custom_attribute
|
|
|
|
# When caching tokens, use this value to err on expiring tokens a little early so they are
|
|
# sure to be valid at the time they are used.
|
|
ACCESS_TOKEN_EXPIRED_THRESHOLD_SECONDS = 5
|
|
|
|
# How long should we wait to connect to the auth service.
|
|
# https://requests.readthedocs.io/en/master/user/advanced/#timeouts
|
|
REQUEST_CONNECT_TIMEOUT = 3.05
|
|
__version__ = '5.6.1'
|
|
REQUEST_READ_TIMEOUT = 5
|
|
|
|
ECOMMERCE_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
|
|
|
|
|
|
class BearerAuth(AuthBase):
|
|
"""
|
|
Attaches Bearer Authentication to the given Request object.
|
|
"""
|
|
|
|
def __init__(self, token):
|
|
"""
|
|
Instantiate the auth class.
|
|
"""
|
|
self.token = token
|
|
|
|
def __call__(self, r):
|
|
"""
|
|
Update the request headers.
|
|
"""
|
|
r.headers['Authorization'] = f'Bearer {self.token}'
|
|
return r
|
|
|
|
|
|
def user_agent():
|
|
"""
|
|
Return a User-Agent that identifies this client.
|
|
|
|
Example:
|
|
python-requests/2.9.1 edx-rest-api-client/1.7.2 ecommerce
|
|
|
|
The last item in the list will be the application name, taken from the
|
|
OS environment variable EDX_REST_API_CLIENT_NAME. If that environment
|
|
variable is not set, it will default to the hostname.
|
|
"""
|
|
client_name = 'unknown_client_name'
|
|
try:
|
|
client_name = os.environ.get("EDX_REST_API_CLIENT_NAME") or socket.gethostbyname(socket.gethostname())
|
|
except: # pylint: disable=bare-except
|
|
pass # using 'unknown_client_name' is good enough. no need to log.
|
|
return "{} edx-rest-api-client/{} {}".format(
|
|
requests.utils.default_user_agent(), # e.g. "python-requests/2.9.1"
|
|
__version__, # version of this client
|
|
client_name
|
|
)
|
|
|
|
|
|
USER_AGENT = user_agent()
|
|
|
|
|
|
def _get_oauth_url(url):
|
|
"""
|
|
Returns the complete url for the oauth2 endpoint.
|
|
|
|
Args:
|
|
url (str): base url of the LMS oauth endpoint, which can optionally include some or all of the path
|
|
``/oauth2/access_token``. Common example settings that would work for ``url`` would include:
|
|
LMS_BASE_URL = 'http://edx.devstack.lms:18000'
|
|
BACKEND_SERVICE_EDX_OAUTH2_PROVIDER_URL = 'http://edx.devstack.lms:18000/oauth2'
|
|
|
|
"""
|
|
stripped_url = url.rstrip('/')
|
|
if stripped_url.endswith('/access_token'):
|
|
return url
|
|
|
|
if stripped_url.endswith('/oauth2'):
|
|
return stripped_url + '/access_token'
|
|
|
|
return stripped_url + '/oauth2/access_token'
|
|
|
|
|
|
def get_oauth_access_token(url, client_id, client_secret, token_type='jwt', grant_type='client_credentials',
|
|
refresh_token=None,
|
|
timeout=(REQUEST_CONNECT_TIMEOUT, REQUEST_READ_TIMEOUT)):
|
|
"""
|
|
Retrieves OAuth 2.0 access token using the given grant type.
|
|
|
|
Args:
|
|
url (str): Oauth2 access token endpoint, optionally including part of the path.
|
|
client_id (str): client ID
|
|
client_secret (str): client secret
|
|
Kwargs:
|
|
token_type (str): Type of token to return. Options include bearer and jwt.
|
|
grant_type (str): One of 'client_credentials' or 'refresh_token'
|
|
refresh_token (str): The previous access token (for grant_type=refresh_token)
|
|
|
|
Raises:
|
|
requests.RequestException if there is a problem retrieving the access token.
|
|
|
|
Returns:
|
|
tuple: Tuple containing (access token string, expiration datetime).
|
|
|
|
"""
|
|
now = datetime.datetime.utcnow()
|
|
data = {
|
|
'grant_type': grant_type,
|
|
'client_id': client_id,
|
|
'client_secret': client_secret,
|
|
'token_type': token_type,
|
|
}
|
|
if refresh_token:
|
|
data['refresh_token'] = refresh_token
|
|
else:
|
|
assert grant_type != 'refresh_token', "refresh_token parameter required"
|
|
|
|
response = requests.post(
|
|
_get_oauth_url(url),
|
|
data=data,
|
|
headers={
|
|
'User-Agent': USER_AGENT,
|
|
},
|
|
timeout=timeout
|
|
)
|
|
|
|
response.raise_for_status() # Raise an exception for bad status codes.
|
|
try:
|
|
data = response.json()
|
|
access_token = data['access_token']
|
|
expires_in = data['expires_in']
|
|
except (KeyError, json.decoder.JSONDecodeError) as json_error:
|
|
raise requests.RequestException(response=response) from json_error
|
|
|
|
expires_at = now + datetime.timedelta(seconds=expires_in)
|
|
|
|
return access_token, expires_at
|
|
|
|
|
|
def get_and_cache_oauth_access_token(url, client_id, client_secret, token_type='jwt', grant_type='client_credentials',
|
|
refresh_token=None,
|
|
timeout=(REQUEST_CONNECT_TIMEOUT, REQUEST_READ_TIMEOUT)):
|
|
"""
|
|
Retrieves a possibly cached OAuth 2.0 access token using the given grant type.
|
|
|
|
See ``get_oauth_access_token`` for usage details.
|
|
|
|
First retrieves the access token from the cache and ensures it has not expired. If
|
|
the access token either wasn't found in the cache, or was expired, retrieves a new
|
|
access token and caches it for the lifetime of the token.
|
|
|
|
Note: Consider tokens to be expired ACCESS_TOKEN_EXPIRED_THRESHOLD_SECONDS early
|
|
to ensure the token won't expire while it is in use.
|
|
|
|
Returns:
|
|
tuple: Tuple containing (access token string, expiration datetime).
|
|
|
|
"""
|
|
oauth_url = _get_oauth_url(url)
|
|
cache_key = 'edx_rest_api_client.access_token.{}.{}.{}.{}'.format(
|
|
token_type,
|
|
grant_type,
|
|
client_id,
|
|
oauth_url,
|
|
)
|
|
cached_response = TieredCache.get_cached_response(cache_key)
|
|
|
|
# Attempt to get an unexpired cached access token
|
|
if cached_response.is_found:
|
|
_, expiration = cached_response.value
|
|
# Double-check the token hasn't already expired as a safety net.
|
|
adjusted_expiration = expiration - datetime.timedelta(seconds=ACCESS_TOKEN_EXPIRED_THRESHOLD_SECONDS)
|
|
if datetime.datetime.utcnow() < adjusted_expiration:
|
|
return cached_response.value
|
|
|
|
# Get a new access token if no unexpired access token was found in the cache.
|
|
oauth_access_token_response = get_oauth_access_token(
|
|
oauth_url,
|
|
client_id,
|
|
client_secret,
|
|
grant_type=grant_type,
|
|
refresh_token=refresh_token,
|
|
timeout=timeout,
|
|
)
|
|
|
|
# Cache the new access token with an expiration matching the lifetime of the token.
|
|
_, expiration = oauth_access_token_response
|
|
expires_in = (expiration - datetime.datetime.utcnow()).seconds - ACCESS_TOKEN_EXPIRED_THRESHOLD_SECONDS
|
|
TieredCache.set_all_tiers(cache_key, oauth_access_token_response, expires_in)
|
|
|
|
return oauth_access_token_response
|
|
|
|
|
|
def create_tracking_context(user):
|
|
""" Assembles attributes from user and request objects to be sent along
|
|
in E-Commerce API calls for tracking purposes. """
|
|
context_tracker = tracker.get_tracker().resolve_context()
|
|
|
|
return {
|
|
'lms_user_id': user.id,
|
|
'lms_ip': context_tracker.get('ip'),
|
|
}
|
|
|
|
|
|
def is_commerce_service_configured():
|
|
"""
|
|
Return a Boolean indicating whether or not configuration is present to use the external commerce service.
|
|
"""
|
|
ecommerce_api_url = configuration_helpers.get_value('ECOMMERCE_API_URL', settings.ECOMMERCE_API_URL)
|
|
return bool(ecommerce_api_url)
|
|
|
|
|
|
def get_ecommerce_api_base_url():
|
|
"""
|
|
Returns an E-Commerce API base URL.
|
|
"""
|
|
return configuration_helpers.get_value('ECOMMERCE_API_URL', settings.ECOMMERCE_API_URL)
|
|
|
|
|
|
def ecommerce_api_client(user, session=None):
|
|
"""
|
|
Returns an E-Commerce API client setup with authentication for the specified user.
|
|
|
|
DEPRECATED: To be replaced with get_ecommerce_api_client.
|
|
"""
|
|
claims = {'tracking_context': create_tracking_context(user)}
|
|
scopes = [
|
|
'user_id',
|
|
'email',
|
|
'profile'
|
|
]
|
|
jwt = create_jwt_for_user(user, additional_claims=claims, scopes=scopes)
|
|
|
|
return DeprecatedRestApiClient(
|
|
configuration_helpers.get_value('ECOMMERCE_API_URL', settings.ECOMMERCE_API_URL),
|
|
jwt=jwt,
|
|
session=session
|
|
)
|
|
|
|
|
|
def get_ecommerce_api_client(user):
|
|
"""
|
|
Returns an E-Commerce API client setup with authentication for the specified user.
|
|
"""
|
|
claims = {'tracking_context': create_tracking_context(user)}
|
|
scopes = [
|
|
'user_id',
|
|
'email',
|
|
'profile'
|
|
]
|
|
jwt = create_jwt_for_user(user, additional_claims=claims, scopes=scopes)
|
|
|
|
client = requests.Session()
|
|
client.auth = SuppliedJwtAuth(jwt)
|
|
|
|
return client
|
|
|
|
|
|
class DeprecatedRestApiClient(slumber.API):
|
|
"""
|
|
API client for edX REST API.
|
|
|
|
(deprecated) See docs/decisions/0002-oauth-api-client-replacement.rst.
|
|
"""
|
|
|
|
@classmethod
|
|
def user_agent(cls):
|
|
return USER_AGENT
|
|
|
|
@classmethod
|
|
def get_oauth_access_token(cls, url, client_id, client_secret, token_type='bearer',
|
|
timeout=(REQUEST_CONNECT_TIMEOUT, REQUEST_READ_TIMEOUT)):
|
|
"""
|
|
To help transition to OAuthAPIClient, use DeprecatedRestApiClient.
|
|
get_and_cache_jwt_oauth_access_token instead'
|
|
|
|
'of DeprecatedRestApiClient.get_oauth_access_token to share cached jwt token used by OAuthAPIClient.'
|
|
|
|
"""
|
|
return get_oauth_access_token(url, client_id, client_secret, token_type=token_type, timeout=timeout)
|
|
|
|
@classmethod
|
|
def get_and_cache_jwt_oauth_access_token(cls, url, client_id, client_secret,
|
|
timeout=(REQUEST_CONNECT_TIMEOUT, REQUEST_READ_TIMEOUT)):
|
|
return get_and_cache_oauth_access_token(url, client_id, client_secret, token_type="jwt", timeout=timeout)
|
|
|
|
def __init__(self, url, signing_key=None, username=None, full_name=None, email=None,
|
|
timeout=5, issuer=None, expires_in=30, tracking_context=None, oauth_access_token=None,
|
|
session=None, jwt=None, **kwargs):
|
|
"""
|
|
DeprecatedRestApiClient is deprecated. Use OAuthAPIClient instead.
|
|
|
|
Instantiate a new client. You can pass extra kwargs to Slumber like
|
|
'append_slash'.
|
|
|
|
Raises:
|
|
ValueError: If a URL is not provided.
|
|
|
|
"""
|
|
set_custom_attribute('api_client', 'DeprecatedRestApiClient')
|
|
if not url:
|
|
raise ValueError('An API url must be supplied!')
|
|
|
|
if jwt:
|
|
auth = SuppliedJwtAuth(jwt)
|
|
elif oauth_access_token:
|
|
auth = BearerAuth(oauth_access_token)
|
|
elif signing_key and username:
|
|
auth = JwtAuth(username, full_name, email, signing_key,
|
|
issuer=issuer, expires_in=expires_in, tracking_context=tracking_context)
|
|
else:
|
|
auth = None
|
|
|
|
session = session or requests.Session()
|
|
session.headers['User-Agent'] = self.user_agent()
|
|
|
|
session.timeout = timeout
|
|
super().__init__(
|
|
url,
|
|
session=session,
|
|
auth=auth,
|
|
**kwargs
|
|
)
|