feat: add more authentication information to swagger (#35674)

* feat: add more authentication information to swagger
* updates the `docs-settings` to make the generated swagger
  `securityDefinitions` include both JWT and CSRF methods, as well as
  basic. A few linter fixes happened as a side effect.
* Put in wordier descriptions for all three, since we don't have great
  shared documentation about authn/authz.
* Added CSRF to `login_session`, which also serves as a proof of concept
  for other endpoits
* Also regenerated the swagger doc, which picked up some extra changes.

Generated swagger now has help and allows extra auth methods so some
preveiously unusable endpoints can be hit.

FIXES: APER-3554
This commit is contained in:
Deborah Kaplan
2024-10-28 16:34:15 -04:00
committed by GitHub
parent 949378f63f
commit 97449ef54f
3 changed files with 491 additions and 225 deletions

View File

@@ -4,7 +4,7 @@ Basically the LMS devstack settings plus a few items needed to successfully
import all the Studio code.
"""
from textwrap import dedent
import os
from openedx.core.lib.derived import derive_settings
@@ -27,18 +27,71 @@ for key, value in FEATURES.items():
FEATURES[key] = True
# Settings that will fail if we enable them, and we don't need them for docs anyway.
FEATURES['RUN_AS_ANALYTICS_SERVER_ENABLED'] = False
FEATURES['ENABLE_SOFTWARE_SECURE_FAKE'] = False
FEATURES['ENABLE_MKTG_SITE'] = False
FEATURES["RUN_AS_ANALYTICS_SERVER_ENABLED"] = False
FEATURES["ENABLE_SOFTWARE_SECURE_FAKE"] = False
FEATURES["ENABLE_MKTG_SITE"] = False
INSTALLED_APPS.extend([
'cms.djangoapps.contentstore.apps.ContentstoreConfig',
'cms.djangoapps.course_creators',
'cms.djangoapps.xblock_config.apps.XBlockConfig',
'lms.djangoapps.lti_provider',
])
INSTALLED_APPS.extend(
[
"cms.djangoapps.contentstore.apps.ContentstoreConfig",
"cms.djangoapps.course_creators",
"cms.djangoapps.xblock_config.apps.XBlockConfig",
"lms.djangoapps.lti_provider",
]
)
# Swagger generation details
openapi_security_info_basic = (
"Obtain with a `POST` request to `/user/v1/account/login_session/`. "
"If needed, copy the cookies from the response to your new call."
)
openapi_security_info_jwt = dedent(
"""
Obtain by making a `POST` request to `/oauth2/v1/access_token`.
You will need to be logged in and have a client ID and secret already created.
Your request should have the headers
```
'Content-Type': 'application/x-www-form-urlencoded'
```
Your request should have the data payload
```
'grant_type': 'client_credentials'
'client_id': [your client ID]
'client_secret': [your client secret]
'token_type': 'jwt'
```
Your JWT will be returned in the response as `access_token`. Prefix with `JWT ` in your header.
"""
)
openapi_security_info_csrf = (
"Obtain by making a `GET` request to `/csrf/api/v1/token`. The token will be in the response cookie `csrftoken`."
)
SWAGGER_SETTINGS["SECURITY_DEFINITIONS"] = {
"Basic": {
"type": "basic",
"description": openapi_security_info_basic,
},
"jwt": {
"type": "apiKey",
"name": "Authorization",
"in": "header",
"description": openapi_security_info_jwt,
},
"csrf": {
"type": "apiKey",
"name": "X-CSRFToken",
"in": "header",
"description": openapi_security_info_csrf,
},
}
COMMON_TEST_DATA_ROOT = ''
COMMON_TEST_DATA_ROOT = ""
derive_settings(__name__)

View File

@@ -13,8 +13,44 @@ produces:
securityDefinitions:
Basic:
type: basic
description: Obtain with a `POST` request to `/user/v1/account/login_session/`. If
needed, copy the cookies from the response to your new call.
jwt:
type: apiKey
name: Authorization
in: header
description: |2
Obtain by making a `POST` request to `/oauth2/v1/access_token`.
You will need to be logged in and have a client ID and secret already created.
Your request should have the headers
```
'Content-Type': 'application/x-www-form-urlencoded'
```
Your request should have the data payload
```
'grant_type': 'client_credentials'
'client_id': [your client ID]
'client_secret': [your client secret]
'token_type': 'jwt'
```
Your JWT will be returned in the response as `access_token`. Prefix with `JWT ` in your header.
csrf:
type: apiKey
name: X-CSRFToken
in: header
description: Obtain by making a `GET` request to `/csrf/api/v1/token`. The token
will be in the response cookie `csrftoken`.
security:
- Basic: []
- csrf: []
- jwt: []
paths:
/agreements/v1/integrity_signature/{course_id}:
get:
@@ -3975,6 +4011,7 @@ paths:
"profile_name": "Jon Doe"
"verification_attempt_id": (Optional)
"proctored_exam_attempt_id": (Optional)
"platform_verification_attempt_id": (Optional)
"status": (Optional)
}
parameters:
@@ -4130,6 +4167,7 @@ paths:
"profile_name": "Jon Doe"
"verification_attempt_id": (Optional)
"proctored_exam_attempt_id": (Optional)
"platform_verification_attempt_id": (Optional)
"status": (Optional)
}
parameters:
@@ -6788,6 +6826,59 @@ paths:
in: path
required: true
type: string
/mobile/{api_version}/notifications/create-token/:
post:
operationId: mobile_notifications_create-token_create
summary: |-
**Use Case**
This endpoint allows clients to register a device for push notifications.
description: |-
If the device is already registered, the existing registration will be updated.
If setting PUSH_NOTIFICATIONS_SETTINGS is not configured, the endpoint will return a 501 error.
**Example Request**
POST /api/mobile/{version}/notifications/create-token/
**POST Parameters**
The body of the POST request can include the following parameters.
* name (optional) - A name of the device.
* registration_id (required) - The device token of the device.
* device_id (optional) - ANDROID_ID / TelephonyManager.getDeviceId() (always as hex)
* active (optional) - Whether the device is active, default is True.
If False, the device will not receive notifications.
* cloud_message_type (required) - You should choose FCM or GCM. Currently, only FCM is supported.
* application_id (optional) - Opaque application identity, should be filled in for multiple
key/certificate access. Should be equal settings.FCM_APP_NAME.
**Example Response**
```json
{
"id": 1,
"name": "My Device",
"registration_id": "fj3j4",
"device_id": 1234,
"active": true,
"date_created": "2024-04-18T07:39:37.132787Z",
"cloud_message_type": "FCM",
"application_id": "my_app_id"
}
```
parameters:
- name: data
in: body
required: true
schema:
$ref: '#/definitions/GCMDevice'
responses:
'201':
description: ''
schema:
$ref: '#/definitions/GCMDevice'
tags:
- mobile
parameters:
- name: api_version
in: path
required: true
type: string
/mobile/{api_version}/users/{username}:
get:
operationId: mobile_users_read
@@ -8849,22 +8940,6 @@ paths:
tags:
- user
parameters: []
/user/v1/accounts/verifications/{attempt_id}/:
get:
operationId: user_v1_accounts_verifications_read
description: Get IDV attempt details by attempt_id. Only accessible by global
staff.
parameters: []
responses:
'200':
description: ''
tags:
- user
parameters:
- name: attempt_id
in: path
required: true
type: string
/user/v1/accounts/{username}:
get:
operationId: user_v1_accounts_read
@@ -9423,22 +9498,57 @@ paths:
- user
post:
operationId: user_account_login_session_create
summary: Log in a user.
description: |-
See `login_user` for details.
Example Usage:
POST /api/user/v1/login_session
with POST params `email`, `password`.
200 {'success': true}
parameters: []
summary: POST /user/{api_version}/account/login_session/
description: Returns 200 on success, and a detailed error message otherwise.
parameters:
- name: data
in: body
required: true
schema:
type: object
properties:
email:
type: string
password:
type: string
responses:
'201':
'200':
description: ''
schema:
type: object
properties:
success:
type: boolean
value:
type: string
error_code:
type: string
'400':
description: ''
schema:
type: object
properties:
success:
type: boolean
value:
type: string
error_code:
type: string
'403':
description: ''
schema:
type: object
properties:
success:
type: boolean
value:
type: string
error_code:
type: string
tags:
- user
security:
- csrf: []
parameters:
- name: api_version
in: path
@@ -10047,6 +10157,7 @@ definitions:
required:
- celebrations
- course_access
- studio_access
- course_id
- is_enrolled
- is_self_paced
@@ -10084,6 +10195,9 @@ definitions:
additionalProperties:
type: string
x-nullable: true
studio_access:
title: Studio access
type: boolean
course_id:
title: Course id
type: string
@@ -11237,10 +11351,24 @@ definitions:
title: Verification attempt id
type: integer
x-nullable: true
verification_attempt_status:
title: Verification attempt status
type: string
minLength: 1
x-nullable: true
proctored_exam_attempt_id:
title: Proctored exam attempt id
type: integer
x-nullable: true
platform_verification_attempt_id:
title: Platform verification attempt id
type: integer
x-nullable: true
platform_verification_attempt_status:
title: Platform verification attempt status
type: string
minLength: 1
x-nullable: true
status:
title: Status
type: string
@@ -11277,10 +11405,24 @@ definitions:
title: Verification attempt id
type: integer
x-nullable: true
verification_attempt_status:
title: Verification attempt status
type: string
minLength: 1
x-nullable: true
proctored_exam_attempt_id:
title: Proctored exam attempt id
type: integer
x-nullable: true
platform_verification_attempt_id:
title: Platform verification attempt id
type: integer
x-nullable: true
platform_verification_attempt_status:
title: Platform verification attempt status
type: string
minLength: 1
x-nullable: true
status:
title: Status
type: string
@@ -11710,6 +11852,52 @@ definitions:
title: Enddatetime
type: string
format: date-time
GCMDevice:
required:
- registration_id
type: object
properties:
id:
title: ID
type: integer
name:
title: Name
type: string
maxLength: 255
x-nullable: true
registration_id:
title: Registration ID
type: string
minLength: 1
device_id:
title: Device id
description: 'ANDROID_ID / TelephonyManager.getDeviceId() (e.g: 0x01)'
type: integer
x-nullable: true
active:
title: Is active
description: Inactive devices will not be sent notifications
type: boolean
date_created:
title: Creation date
type: string
format: date-time
readOnly: true
x-nullable: true
cloud_message_type:
title: Cloud Message Type
description: You should choose FCM, GCM is deprecated
type: string
enum:
- FCM
- GCM
application_id:
title: Application ID
description: Opaque application identity, should be filled in for multiple
key/certificate access
type: string
maxLength: 64
x-nullable: true
mobile_api.User:
required:
- username

View File

@@ -23,11 +23,14 @@ from django.views.decorators.csrf import csrf_exempt, csrf_protect, ensure_csrf_
from django.views.decorators.debug import sensitive_post_parameters
from django.views.decorators.http import require_http_methods
from django_ratelimit.decorators import ratelimit
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from edx_django_utils.monitoring import set_custom_attribute
from eventtracking import tracker
from openedx_events.learning.data import UserData, UserPersonalData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_filters.learning.filters import StudentLoginRequested
from rest_framework import status
from rest_framework.views import APIView
from common.djangoapps import third_party_auth
@@ -49,7 +52,7 @@ from openedx.core.djangoapps.user_authn.exceptions import AuthFailedError, Vulne
from openedx.core.djangoapps.user_authn.tasks import check_pwned_password_and_send_track_event
from openedx.core.djangoapps.user_authn.toggles import (
is_require_third_party_auth_enabled,
should_redirect_to_authn_microfrontend
should_redirect_to_authn_microfrontend,
)
from openedx.core.djangoapps.user_authn.views.login_form import get_login_session_form
from openedx.core.djangoapps.user_authn.views.password_reset import send_password_reset_email_for_user
@@ -62,7 +65,7 @@ from openedx.features.enterprise_support.api import activate_learner_enterprise,
log = logging.getLogger("edx.student")
AUDIT_LOG = logging.getLogger("audit")
USER_MODEL = get_user_model()
PASSWORD_RESET_INITIATED = 'edx.user.passwordreset.initiated'
PASSWORD_RESET_INITIATED = "edx.user.passwordreset.initiated"
def _do_third_party_auth(request):
@@ -70,9 +73,9 @@ def _do_third_party_auth(request):
User is already authenticated via 3rd party, now try to find and return their associated Django user.
"""
running_pipeline = pipeline.get(request)
username = running_pipeline['kwargs'].get('username')
backend_name = running_pipeline['backend']
third_party_uid = running_pipeline['kwargs']['uid']
username = running_pipeline["kwargs"].get("username")
backend_name = running_pipeline["backend"]
third_party_uid = running_pipeline["kwargs"]["uid"]
requested_provider = provider.Registry.get_from_pipeline(running_pipeline)
platform_name = configuration_helpers.get_value("platform_name", settings.PLATFORM_NAME)
@@ -81,26 +84,25 @@ def _do_third_party_auth(request):
except USER_MODEL.DoesNotExist:
AUDIT_LOG.info(
"Login failed - user with username {username} has no social auth "
"with backend_name {backend_name}".format(
username=username, backend_name=backend_name)
"with backend_name {backend_name}".format(username=username, backend_name=backend_name)
)
message = Text(_(
"You've successfully signed in to your {provider_name} account, "
"but this account isn't linked with your {platform_name} account yet. {blank_lines}"
"Use your {platform_name} username and password to sign in to {platform_name} below, "
"and then link your {platform_name} account with {provider_name} from your dashboard. {blank_lines}"
"If you don't have an account on {platform_name} yet, "
"click {register_label_strong} at the top of the page."
)).format(
blank_lines=HTML('<br/><br/>'),
message = Text(
_(
"You've successfully signed in to your {provider_name} account, "
"but this account isn't linked with your {platform_name} account yet. {blank_lines}"
"Use your {platform_name} username and password to sign in to {platform_name} below, "
"and then link your {platform_name} account with {provider_name} from your dashboard. {blank_lines}"
"If you don't have an account on {platform_name} yet, "
"click {register_label_strong} at the top of the page."
)
).format(
blank_lines=HTML("<br/><br/>"),
platform_name=platform_name,
provider_name=requested_provider.name,
register_label_strong=HTML('<strong>{register_text}</strong>').format(
register_text=_('Register')
)
register_label_strong=HTML("<strong>{register_text}</strong>").format(register_text=_("Register")),
)
raise AuthFailedError(message, error_code='third-party-auth-with-no-linked-account') # lint-amnesty, pylint: disable=raise-missing-from
raise AuthFailedError(message, error_code="third-party-auth-with-no-linked-account") # lint-amnesty, pylint: disable=raise-missing-from
def _get_user_by_email(email):
@@ -128,14 +130,14 @@ def _get_user_by_email_or_username(request, api_version):
Finds a user object in the database based on the given request, ignores all fields except for email and username.
"""
is_api_v2 = api_version != API_V1
login_fields = ['email', 'password']
login_fields = ["email", "password"]
if is_api_v2:
login_fields = ['email_or_username', 'password']
login_fields = ["email_or_username", "password"]
if any(f not in request.POST.keys() for f in login_fields):
raise AuthFailedError(_('There was an error receiving your login information. Please email us.'))
raise AuthFailedError(_("There was an error receiving your login information. Please email us."))
email_or_username = request.POST.get('email', None) or request.POST.get('email_or_username', None)
email_or_username = request.POST.get("email", None) or request.POST.get("email_or_username", None)
user = _get_user_by_email(email_or_username)
if not user and is_api_v2:
@@ -143,7 +145,7 @@ def _get_user_by_email_or_username(request, api_version):
user = _get_user_by_username(email_or_username)
if not user:
digest = hashlib.shake_128(email_or_username.encode('utf-8')).hexdigest(16)
digest = hashlib.shake_128(email_or_username.encode("utf-8")).hexdigest(16)
AUDIT_LOG.warning(f"Login failed - Unknown user email or username {digest}")
return user
@@ -165,27 +167,30 @@ def _generate_locked_out_error_message():
"""
locked_out_period_in_sec = settings.MAX_FAILED_LOGIN_ATTEMPTS_LOCKOUT_PERIOD_SECS
error_message = Text(_('To protect your account, its been temporarily '
'locked. Try again in {locked_out_period} minutes.'
'{li_start}To be on the safe side, you can reset your '
'password {link_start}here{link_end} before you try again.')).format(
link_start=HTML('<a http="#login" class="form-toggle" data-type="password-reset">'),
link_end=HTML('</a>'),
li_start=HTML('<li>'),
li_end=HTML('</li>'),
locked_out_period=int(locked_out_period_in_sec / 60))
error_message = Text(
_(
"To protect your account, its been temporarily "
"locked. Try again in {locked_out_period} minutes."
"{li_start}To be on the safe side, you can reset your "
"password {link_start}here{link_end} before you try again."
)
).format(
link_start=HTML('<a http="#login" class="form-toggle" data-type="password-reset">'),
link_end=HTML("</a>"),
li_start=HTML("<li>"),
li_end=HTML("</li>"),
locked_out_period=int(locked_out_period_in_sec / 60),
)
raise AuthFailedError(
error_message,
error_code='account-locked-out',
context={
'locked_out_period': int(locked_out_period_in_sec / 60)
}
error_code="account-locked-out",
context={"locked_out_period": int(locked_out_period_in_sec / 60)},
)
def _enforce_password_policy_compliance(request, user): # lint-amnesty, pylint: disable=missing-function-docstring
try:
password_policy_compliance.enforce_compliance_on_login(user, request.POST.get('password'))
password_policy_compliance.enforce_compliance_on_login(user, request.POST.get("password"))
except password_policy_compliance.NonCompliantPasswordWarning as e:
# Allow login, but warn the user that they will be required to reset their password soon.
PageLevelMessages.register_warning_message(request, HTML(str(e)))
@@ -201,7 +206,7 @@ def _enforce_password_policy_compliance(request, user): # lint-amnesty, pylint:
{
"user_id": user.id,
"source": "Policy Compliance",
}
},
)
send_password_reset_email_for_user(user, request)
@@ -214,19 +219,17 @@ def _log_and_raise_inactive_user_auth_error(unauthenticated_user):
Depending on Django version we can get here a couple of ways, but this takes care of logging an auth attempt
by an inactive user, re-sending the activation email, and raising an error with the correct message.
"""
AUDIT_LOG.warning(
f"Login failed - Account not active for user.id: {unauthenticated_user.id}, resending activation"
)
AUDIT_LOG.warning(f"Login failed - Account not active for user.id: {unauthenticated_user.id}, resending activation")
profile = UserProfile.objects.get(user=unauthenticated_user)
compose_and_send_activation_email(unauthenticated_user, profile)
raise AuthFailedError(
error_code='inactive-user',
error_code="inactive-user",
context={
'platformName': configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME),
'supportLink': configuration_helpers.get_value('SUPPORT_SITE_LINK', settings.SUPPORT_SITE_LINK)
}
"platformName": configuration_helpers.get_value("PLATFORM_NAME", settings.PLATFORM_NAME),
"supportLink": configuration_helpers.get_value("SUPPORT_SITE_LINK", settings.SUPPORT_SITE_LINK),
},
)
@@ -234,9 +237,11 @@ def _authenticate_first_party(request, unauthenticated_user, third_party_auth_re
"""
Use Django authentication on the given request, using rate limiting if configured
"""
should_be_rate_limited = getattr(request, 'limited', False)
should_be_rate_limited = getattr(request, "limited", False)
if should_be_rate_limited:
raise AuthFailedError(_('Too many failed login attempts. Try again later.')) # lint-amnesty, pylint: disable=raise-missing-from
raise AuthFailedError(
_("Too many failed login attempts. Try again later.")
) # lint-amnesty, pylint: disable=raise-missing-from
# If the user doesn't exist, we want to set the username to an invalid username so that authentication is guaranteed
# to fail and we can take advantage of the ratelimited backend
@@ -248,12 +253,8 @@ def _authenticate_first_party(request, unauthenticated_user, third_party_auth_re
if not third_party_auth_requested:
_check_user_auth_flow(request.site, unauthenticated_user)
password = normalize_password(request.POST['password'])
return authenticate(
username=username,
password=password,
request=request
)
password = normalize_password(request.POST["password"])
return authenticate(username=username, password=password, request=request)
def _handle_failed_authentication(user, authenticated_user):
@@ -279,34 +280,37 @@ def _handle_failed_authentication(user, authenticated_user):
if not LoginFailures.is_user_locked_out(user):
max_failures_allowed = settings.MAX_FAILED_LOGIN_ATTEMPTS_ALLOWED
remaining_attempts = max_failures_allowed - failure_count
error_message = Text(_('Email or password is incorrect.'
'{li_start}You have {remaining_attempts} more sign-in '
'attempts before your account is temporarily locked.{li_end}'
'{li_start}If you\'ve forgotten your password, click '
'{link_start}here{link_end} to reset.{li_end}')).format(
link_start=HTML(
'<a http="#login" class="form-toggle" data-type="password-reset">'
),
link_end=HTML('</a>'),
li_start=HTML('<li>'),
li_end=HTML('</li>'),
remaining_attempts=remaining_attempts)
error_message = Text(
_(
"Email or password is incorrect."
"{li_start}You have {remaining_attempts} more sign-in "
"attempts before your account is temporarily locked.{li_end}"
"{li_start}If you've forgotten your password, click "
"{link_start}here{link_end} to reset.{li_end}"
)
).format(
link_start=HTML('<a http="#login" class="form-toggle" data-type="password-reset">'),
link_end=HTML("</a>"),
li_start=HTML("<li>"),
li_end=HTML("</li>"),
remaining_attempts=remaining_attempts,
)
raise AuthFailedError(
error_message,
error_code='failed-login-attempt',
error_code="failed-login-attempt",
context={
'remaining_attempts': remaining_attempts,
'allowed_failure_attempts': max_failures_allowed,
'failure_count': failure_count,
}
"remaining_attempts": remaining_attempts,
"allowed_failure_attempts": max_failures_allowed,
"failure_count": failure_count,
},
)
_generate_locked_out_error_message()
raise AuthFailedError(
_('Email or password is incorrect.'),
error_code='incorrect-email-or-password',
context={'failure_count': failure_count},
_("Email or password is incorrect."),
error_code="incorrect-email-or-password",
context={"failure_count": failure_count},
)
@@ -352,25 +356,18 @@ def _track_user_login(user, request):
# .. pii_retirement: third_party
segment.identify(
user.id,
{
'email': user.email,
'username': user.username
},
{"email": user.email, "username": user.username},
{
# Disable MailChimp because we don't want to update the user's email
# and username in MailChimp on every page load. We only need to capture
# this data on registration/activation.
'MailChimp': False
}
"MailChimp": False
},
)
segment.track(
user.id,
"edx.bi.user.account.authenticated",
{
'category': "conversion",
'label': request.POST.get('course_id'),
'provider': None
},
{"category": "conversion", "label": request.POST.get("course_id"), "provider": None},
)
@@ -380,20 +377,22 @@ def _create_message(site, root_url, allowed_domain):
to an allowed domain and not whitelisted then ask such users to login
through allowed domain SSO provider.
"""
msg = Text(_(
'As {allowed_domain} user, You must login with your {allowed_domain} '
'{link_start}{provider} account{link_end}.'
)).format(
msg = Text(
_(
"As {allowed_domain} user, You must login with your {allowed_domain} "
"{link_start}{provider} account{link_end}."
)
).format(
allowed_domain=allowed_domain,
link_start=HTML("<a href='{root_url}{tpa_provider_link}'>").format(
root_url=root_url if root_url else '',
tpa_provider_link='{dashboard_url}?tpa_hint={tpa_hint}'.format(
dashboard_url=reverse('dashboard'),
tpa_hint=site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_HINT'),
)
root_url=root_url if root_url else "",
tpa_provider_link="{dashboard_url}?tpa_hint={tpa_hint}".format(
dashboard_url=reverse("dashboard"),
tpa_hint=site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_HINT"),
),
),
provider=site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_PROVIDER'),
link_end=HTML("</a>")
provider=site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_PROVIDER"),
link_end=HTML("</a>"),
)
return msg
@@ -404,13 +403,13 @@ def _check_user_auth_flow(site, user):
then ask user to login through allowed domain SSO provider.
"""
if user and ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY.is_enabled():
allowed_domain = site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_DOMAIN', '').lower()
email_parts = user.email.split('@')
allowed_domain = site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_DOMAIN", "").lower()
email_parts = user.email.split("@")
if len(email_parts) != 2:
# User has a nonstandard email so we record their id.
# we don't record their e-mail in case there is sensitive info accidentally
# in there.
set_custom_attribute('login_tpa_domain_shortcircuit_user_id', user.id)
set_custom_attribute("login_tpa_domain_shortcircuit_user_id", user.id)
log.warning("User %s has nonstandard e-mail. Shortcircuiting THIRD_PART_AUTH_ONLY_DOMAIN check.", user.id)
return
user_domain = email_parts[1].strip().lower()
@@ -422,19 +421,19 @@ def _check_user_auth_flow(site, user):
raise AuthFailedError(msg)
raise AuthFailedError(
error_code='allowed-domain-login-error',
error_code="allowed-domain-login-error",
context={
'allowed_domain': allowed_domain,
'provider': site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_PROVIDER'),
'tpa_hint': site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_HINT'),
}
"allowed_domain": allowed_domain,
"provider": site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_PROVIDER"),
"tpa_hint": site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_HINT"),
},
)
@login_required
@require_http_methods(['GET'])
@require_http_methods(["GET"])
def finish_auth(request):
""" Following logistration (1st or 3rd party), handle any special query string params.
"""Following logistration (1st or 3rd party), handle any special query string params.
See FinishAuthView.js for details on the query string params.
@@ -459,10 +458,13 @@ def finish_auth(request):
GET /account/finish_auth/?course_id=course-v1:blah&enrollment_action=enroll
"""
return render_to_response('student_account/finish_auth.html', {
'disable_courseware_js': True,
'disable_footer': True,
})
return render_to_response(
"student_account/finish_auth.html",
{
"disable_courseware_js": True,
"disable_footer": True,
},
)
def enterprise_selection_page(request, user, next_url):
@@ -478,14 +480,14 @@ def enterprise_selection_page(request, user, next_url):
response = get_enterprise_learner_data_from_api(user)
if response and len(response) > 1:
redirect_url = reverse('enterprise_select_active') + '/?success_url=' + urllib.parse.quote(next_url)
redirect_url = reverse("enterprise_select_active") + "/?success_url=" + urllib.parse.quote(next_url)
# Check to see if next url has an enterprise in it. In this case if user is associated with
# that enterprise, activate that enterprise and bypass the selection page.
if re.match(ENTERPRISE_ENROLLMENT_URL_REGEX, urllib.parse.unquote(next_url)):
enterprise_in_url = re.search(UUID4_REGEX, next_url).group(0)
for enterprise in response:
if enterprise_in_url == str(enterprise['enterprise_customer']['uuid']):
if enterprise_in_url == str(enterprise["enterprise_customer"]["uuid"]):
is_activated_successfully = activate_learner_enterprise(request, user, enterprise_in_url)
if is_activated_successfully:
redirect_url = next_url
@@ -495,20 +497,20 @@ def enterprise_selection_page(request, user, next_url):
@ensure_csrf_cookie
@require_http_methods(['POST'])
@require_http_methods(["POST"])
@ratelimit(
key='openedx.core.djangoapps.util.ratelimit.request_post_email_or_username',
key="openedx.core.djangoapps.util.ratelimit.request_post_email_or_username",
rate=settings.LOGISTRATION_PER_EMAIL_RATELIMIT_RATE,
method='POST',
method="POST",
block=False,
) # lint-amnesty, pylint: disable=too-many-statements
@ratelimit(
key='openedx.core.djangoapps.util.ratelimit.real_ip',
key="openedx.core.djangoapps.util.ratelimit.real_ip",
rate=settings.LOGISTRATION_RATELIMIT_RATE,
method='POST',
method="POST",
block=False,
) # lint-amnesty, pylint: disable=too-many-statements
def login_user(request, api_version='v1'): # pylint: disable=too-many-statements
def login_user(request, api_version="v1"): # pylint: disable=too-many-statements
"""
AJAX request to log in the user.
@@ -542,10 +544,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
_parse_analytics_param_for_course_id(request)
third_party_auth_requested = third_party_auth.is_enabled() and pipeline.running(request)
first_party_auth_requested = any(bool(request.POST.get(p)) for p in ['email', 'email_or_username', 'password'])
first_party_auth_requested = any(bool(request.POST.get(p)) for p in ["email", "email_or_username", "password"])
is_user_third_party_authenticated = False
set_custom_attribute('login_user_course_id', request.POST.get('course_id'))
set_custom_attribute("login_user_course_id", request.POST.get("course_id"))
if is_require_third_party_auth_enabled() and not third_party_auth_requested:
return HttpResponseForbidden(
@@ -564,12 +566,12 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
try:
user = _do_third_party_auth(request)
is_user_third_party_authenticated = True
set_custom_attribute('login_user_tpa_success', True)
set_custom_attribute("login_user_tpa_success", True)
except AuthFailedError as e:
set_custom_attribute('login_user_tpa_success', False)
set_custom_attribute('login_user_tpa_failure_msg', e.value)
set_custom_attribute("login_user_tpa_success", False)
set_custom_attribute("login_user_tpa_failure_msg", e.value)
if e.error_code:
set_custom_attribute('login_error_code', e.error_code)
set_custom_attribute("login_error_code", e.error_code)
# user successfully authenticated with a third party provider, but has no linked Open edX account
response_content = e.get_response()
@@ -585,7 +587,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
possibly_authenticated_user = StudentLoginRequested.run_filter(user=possibly_authenticated_user)
except StudentLoginRequested.PreventLogin as exc:
raise AuthFailedError(
str(exc), redirect_url=exc.redirect_to, error_code=exc.error_code, context=exc.context,
str(exc),
redirect_url=exc.redirect_to,
error_code=exc.error_code,
context=exc.context,
) from exc
if not is_user_third_party_authenticated:
@@ -599,82 +604,82 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
):
_handle_failed_authentication(user, possibly_authenticated_user)
pwned_properties = check_pwned_password_and_send_track_event(
user_id=user.id,
password=request.POST.get('password'),
internal_user=user.is_staff,
request_page='login'
) if not is_user_third_party_authenticated else {}
# Set default for third party login
password_frequency = pwned_properties.get('frequency', -1)
if (
settings.ENABLE_AUTHN_LOGIN_BLOCK_HIBP_POLICY and
password_frequency >= settings.HIBP_LOGIN_BLOCK_PASSWORD_FREQUENCY_THRESHOLD
):
raise VulnerablePasswordError(
accounts.AUTHN_LOGIN_BLOCK_HIBP_POLICY_MSG,
'require-password-change'
pwned_properties = (
check_pwned_password_and_send_track_event(
user_id=user.id,
password=request.POST.get("password"),
internal_user=user.is_staff,
request_page="login",
)
if not is_user_third_party_authenticated
else {}
)
# Set default for third party login
password_frequency = pwned_properties.get("frequency", -1)
if (
settings.ENABLE_AUTHN_LOGIN_BLOCK_HIBP_POLICY
and password_frequency >= settings.HIBP_LOGIN_BLOCK_PASSWORD_FREQUENCY_THRESHOLD
):
raise VulnerablePasswordError(accounts.AUTHN_LOGIN_BLOCK_HIBP_POLICY_MSG, "require-password-change")
_handle_successful_authentication_and_login(possibly_authenticated_user, request)
# The AJAX method calling should know the default destination upon success
redirect_url, finish_auth_url = None, ''
redirect_url, finish_auth_url = None, ""
if third_party_auth_requested:
running_pipeline = pipeline.get(request)
finish_auth_url = pipeline.get_complete_url(backend_name=running_pipeline['backend'])
finish_auth_url = pipeline.get_complete_url(backend_name=running_pipeline["backend"])
if is_user_third_party_authenticated:
redirect_url = finish_auth_url
elif should_redirect_to_authn_microfrontend():
next_url, root_url = get_next_url_for_login_page(request, include_host=True)
redirect_url = get_redirect_url_with_host(
root_url,
enterprise_selection_page(request, possibly_authenticated_user, finish_auth_url or next_url)
root_url, enterprise_selection_page(request, possibly_authenticated_user, finish_auth_url or next_url)
)
if (
settings.ENABLE_AUTHN_LOGIN_NUDGE_HIBP_POLICY and
0 <= password_frequency <= settings.HIBP_LOGIN_NUDGE_PASSWORD_FREQUENCY_THRESHOLD
settings.ENABLE_AUTHN_LOGIN_NUDGE_HIBP_POLICY
and 0 <= password_frequency <= settings.HIBP_LOGIN_NUDGE_PASSWORD_FREQUENCY_THRESHOLD
):
raise VulnerablePasswordError(
accounts.AUTHN_LOGIN_NUDGE_HIBP_POLICY_MSG,
'nudge-password-change',
redirect_url
accounts.AUTHN_LOGIN_NUDGE_HIBP_POLICY_MSG, "nudge-password-change", redirect_url
)
response = JsonResponse({
'success': True,
'redirect_url': redirect_url,
})
response = JsonResponse(
{
"success": True,
"redirect_url": redirect_url,
}
)
# Ensure that the external marketing site can
# detect that the user is logged in.
response = set_logged_in_cookies(request, response, possibly_authenticated_user)
set_custom_attribute('login_user_auth_failed_error', False)
set_custom_attribute('login_user_response_status', response.status_code)
set_custom_attribute('login_user_redirect_url', redirect_url)
set_custom_attribute("login_user_auth_failed_error", False)
set_custom_attribute("login_user_response_status", response.status_code)
set_custom_attribute("login_user_redirect_url", redirect_url)
mark_user_change_as_expected(user.id)
return response
except AuthFailedError as error:
response_content = error.get_response()
log.exception(response_content)
error_code = response_content.get('error_code')
error_code = response_content.get("error_code")
if error_code:
set_custom_attribute('login_error_code', error_code)
email_or_username_key = 'email' if api_version == API_V1 else 'email_or_username'
set_custom_attribute("login_error_code", error_code)
email_or_username_key = "email" if api_version == API_V1 else "email_or_username"
email_or_username = request.POST.get(email_or_username_key, None)
email_or_username = possibly_authenticated_user.email if possibly_authenticated_user else email_or_username
response_content['email'] = email_or_username
response_content["email"] = email_or_username
except VulnerablePasswordError as error:
response_content = error.get_response()
log.exception(response_content)
response = JsonResponse(response_content, status=400)
set_custom_attribute('login_user_auth_failed_error', True)
set_custom_attribute('login_user_response_status', response.status_code)
set_custom_attribute("login_user_auth_failed_error", True)
set_custom_attribute("login_user_response_status", response.status_code)
return response
@@ -683,10 +688,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
# to get a CSRF token before we need to refresh adds too much
# complexity.
@csrf_exempt
@require_http_methods(['POST'])
@require_http_methods(["POST"])
def login_refresh(request): # lint-amnesty, pylint: disable=missing-function-docstring
if not request.user.is_authenticated or request.user.is_anonymous:
return JsonResponse('Unauthorized', status=401)
return JsonResponse("Unauthorized", status=401)
try:
return get_response_with_refreshed_jwt_cookies(request, request.user)
@@ -700,33 +705,57 @@ def redirect_to_lms_login(request):
This view redirect the admin/login url to the site's login page if
waffle switch is on otherwise returns the admin site's login view.
"""
return redirect('/login?next=/admin')
return redirect("/login?next=/admin")
login_user_schema = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"email": openapi.Schema(type=openapi.TYPE_STRING),
"password": openapi.Schema(type=openapi.TYPE_STRING),
},
)
login_user_return_schema = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
"success": openapi.Schema(type=openapi.TYPE_BOOLEAN),
"value": openapi.Schema(type=openapi.TYPE_STRING),
"error_code": openapi.Schema(type=openapi.TYPE_STRING),
},
)
class LoginSessionView(APIView):
"""HTTP end-points for logging in users. """
"""HTTP end-points for logging in users."""
# This end-point is available to anonymous users,
# so do not require authentication.
authentication_classes = []
login_user_responses = {
status.HTTP_200_OK: login_user_return_schema,
status.HTTP_400_BAD_REQUEST: login_user_return_schema,
status.HTTP_403_FORBIDDEN: login_user_return_schema,
}
@method_decorator(ensure_csrf_cookie)
def get(self, request, *args, **kwargs):
return HttpResponse(get_login_session_form(request).to_json(), content_type="application/json") # lint-amnesty, pylint: disable=http-response-with-content-type-json
@swagger_auto_schema(
request_body=login_user_schema,
responses=login_user_responses,
security=[
{"csrf": []},
],
)
@method_decorator(csrf_protect)
def post(self, request, api_version):
"""Log in a user.
See `login_user` for details.
Example Usage:
POST /api/user/v1/login_session
with POST params `email`, `password`.
200 {'success': true}
"""
POST /user/{api_version}/account/login_session/
Returns 200 on success, and a detailed error message otherwise.
"""
return login_user(request, api_version)
@@ -736,19 +765,19 @@ class LoginSessionView(APIView):
def _parse_analytics_param_for_course_id(request):
""" If analytics request param is found, parse and add course id as a new request param. """
"""If analytics request param is found, parse and add course id as a new request param."""
# Make a copy of the current POST request to modify.
modified_request = request.POST.copy()
if isinstance(request, HttpRequest):
# Works for an HttpRequest but not a rest_framework.request.Request.
# Note: This case seems to be used for tests only.
request.POST = modified_request
set_custom_attribute('login_user_request_type', 'django')
set_custom_attribute("login_user_request_type", "django")
else:
# The request must be a rest_framework.request.Request.
# Note: Only DRF seems to be used in Production.
request._data = modified_request # pylint: disable=protected-access
set_custom_attribute('login_user_request_type', 'drf')
set_custom_attribute("login_user_request_type", "drf")
# Include the course ID if it's specified in the analytics info
# so it can be included in analytics events.
@@ -758,9 +787,5 @@ def _parse_analytics_param_for_course_id(request):
if "enroll_course_id" in analytics:
modified_request["course_id"] = analytics.get("enroll_course_id")
except (ValueError, TypeError):
set_custom_attribute('shim_analytics_course_id', 'parse-error')
log.error(
"Could not parse analytics object sent to user API: {analytics}".format(
analytics=analytics
)
)
set_custom_attribute("shim_analytics_course_id", "parse-error")
log.error("Could not parse analytics object sent to user API: {analytics}".format(analytics=analytics))