Files
wgu-jesse-stewart 254dd2f689 Fix: CORS issues in third-party auth disconnect by adding JSON endpoint (#37100)
Add a json auth endpoint where previously there was only an HTML redirect version. This will make it easier to work with MFEs.

---------

Co-authored-by: Feanil Patel <feanil@axim.org>
2025-09-15 10:07:49 -04:00

271 lines
11 KiB
Python

"""
Extra views required for SSO
"""
import logging
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import ValidationError, PermissionDenied
from django.db import DatabaseError
from django.http import (
Http404, HttpResponse, HttpResponseNotAllowed, HttpResponseNotFound, HttpResponseServerError, JsonResponse
)
from django.shortcuts import redirect, render
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.views.generic.base import View
from edx_django_utils.monitoring import record_exception
from social_core.utils import setting_name
from social_django.models import UserSocialAuth
from social_django.utils import load_backend, load_strategy, psa
from social_django.views import complete
from common.djangoapps import third_party_auth
from common.djangoapps.student.helpers import get_next_url_for_login_page, is_safe_login_or_logout_redirect
from common.djangoapps.student.models import UserProfile
from common.djangoapps.student.views import compose_and_send_activation_email
from common.djangoapps.third_party_auth import pipeline, provider
from .models import SAMLConfiguration, SAMLProviderConfig
# Namespace prefix used when building social-auth URL names in this module
# (e.g. '<namespace>:complete'). Falls back to 'social' when the corresponding
# setting is missing or falsy.
URL_NAMESPACE = getattr(settings, setting_name('URL_NAMESPACE'), None) or 'social'
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def inactive_user_view(request):
    """
    Landing view for a user who finished the social auth pipeline while inactive.

    The account has not been activated yet, but the user is still allowed to log
    in because the third party auth provider is trusted to vouch for them (see
    pipeline.py for details). This view is meant to be configured as
    SOCIAL_AUTH_INACTIVE_USER_URL; without it, inactive users would be sent to
    LOGIN_ERROR_URL, which is not wanted.

    When the provider found in the running pipeline has skip_email_verification
    set, the user is activated here and no activation email is sent. Otherwise
    an activation email is composed and sent.

    The response is a redirect to the 'next' query parameter when it is present
    and passes the safe-redirect check, or to the dashboard otherwise. 'next'
    may be e.g. '/account/finish_auth/.../' when the user needs to be
    auto-enrolled in a course; the dashboard displays a message about
    activating the account.
    """
    current_user = request.user
    user_profile = UserProfile.objects.get(user=current_user)
    is_activated = current_user.is_active

    # When registering through third party auth, look up the provider in use so
    # that its skip_email_verification flag can be honoured.
    if third_party_auth.is_enabled() and pipeline.running(request):
        tpa_provider = provider.Registry.get_from_pipeline(pipeline.get(request))
        if tpa_provider and tpa_provider.skip_email_verification and not is_activated:
            current_user.is_active = True
            current_user.save()
            is_activated = True

    if not is_activated:
        compose_and_send_activation_email(current_user, user_profile)

    query = request.GET
    target = query.get('next')
    if not target:
        return redirect('dashboard')

    target_is_safe = is_safe_login_or_logout_redirect(
        redirect_to=target,
        request_host=request.get_host(),
        dot_client_id=query.get('client_id'),
        require_https=request.is_secure(),
    )
    return redirect(target if target_is_safe else 'dashboard')
def saml_metadata_view(request):
    """
    Return the SAML Service Provider metadata XML for this edx-platform instance.

    This XML must be sent to any Shibboleth Identity Provider that you wish to use.

    The optional 'tpa_hint' query parameter names an IdP; when that IdP has its
    own SAML configuration, that configuration's slug is used instead of
    'default'. Raises Http404 when the selected SAML configuration is not
    enabled for the request's site. Responds with the XML (text/xml) on success,
    or a server error listing the problems when metadata generation reports
    errors.
    """
    provider_slug = request.GET.get('tpa_hint', None)
    config_slug = 'default'
    if provider_slug:
        idp_config = SAMLProviderConfig.current(provider_slug)
        if idp_config.saml_configuration:
            config_slug = idp_config.saml_configuration.slug

    if not SAMLConfiguration.is_enabled(request.site, config_slug):
        raise Http404

    complete_url = reverse('social:complete', args=("tpa-saml", ))
    needs_trailing_slash = settings.APPEND_SLASH and not complete_url.endswith('/')
    if needs_trailing_slash:
        complete_url += '/'  # Required for consistency

    strategy = load_strategy(request)
    saml_backend = load_backend(strategy, "tpa-saml", redirect_uri=complete_url)
    metadata, errors = saml_backend.generate_metadata_xml(provider_slug)

    if errors:
        return HttpResponseServerError(content=', '.join(errors))
    return HttpResponse(content=metadata, content_type='text/xml')
@csrf_exempt
@psa(f'{URL_NAMESPACE}:complete')
def lti_login_and_complete_view(request, backend, *args, **kwargs):
    """
    Combined login/complete view, needed because LTI is a one step login.

    Only POST requests are accepted; any other method gets a 405 response whose
    Allow header advertises POST. For a POST, the backend attached to the
    request is started and the request is then handed to the social auth
    `complete` view, whose response is returned.

    Args:
        request (HttpRequest): the incoming request; `request.backend` is
            presumably attached by the `@psa` decorator -- confirm against
            social_django.
        backend (str): backend name, forwarded to `complete`.
        *args, **kwargs: forwarded unchanged to `complete`.
    """
    if request.method != 'POST':
        # HttpResponseNotAllowed expects an iterable of method names. A bare
        # string would be joined character by character, yielding the bogus
        # header "Allow: P, O, S, T".
        return HttpResponseNotAllowed(['POST'])
    request.backend.start()
    return complete(request, backend, *args, **kwargs)
def post_to_custom_auth_form(request):
    """
    Render the page that forwards SSO data to a custom login/register form.

    A redirect cannot be turned into a POST, so this view is used to pass SSO
    data from the third_party_auth pipeline to a custom login/register form
    (possibly on another server).

    The data is popped from the session, so it is consumed by this request.
    Raises Http404 when the session holds no such data.
    """
    entry_data = request.session.pop('tpa_custom_auth_entry_data', None)
    if not entry_data:
        raise Http404

    # Picking the keys explicitly doubles as a format check on the session
    # data (a missing key raises KeyError):
    #   post_url - where the form is posted to.
    #   data     - the provider info and user's name, email, etc. as base64
    #              encoded JSON. It is base64 encoded because it is signed
    #              cryptographically and whitespace or ordering issues must not
    #              affect the hash/signature.
    #   hmac     - the cryptographic hash of the user data.
    context = {field: entry_data[field] for field in ('post_url', 'data', 'hmac')}
    return render(request, 'third_party_auth/post_custom_auth_entry.html', context)
class IdPRedirectView(View):
    """
    Send the user to an identity provider's login page, or respond 404 when the
    identity provider does not exist.

    Example usage:
        GET auth/idp_redirect/saml-default
    """

    def get(self, request, *args, **kwargs):  # lint-amnesty, pylint: disable=unused-argument
        """
        Redirect to the login page of the identity provider named by the
        provider_slug keyword argument, or answer 404 when no identity provider
        matches that slug.

        Args:
            request (HttpRequest)

        Keyword Args:
            provider_slug (str): a slug corresponding to a configured identity provider

        Returns:
            HttpResponse: 302 to a provider's login url if the provider_slug kwarg matches an identity provider
            HttpResponse: 404 if the provider_slug kwarg does not match an identity provider
        """
        # Where to go after login/registration/third_party_auth. The helper also
        # vets the safety of the 'next' query parameter against
        # settings.LOGIN_REDIRECT_WHITELIST, so be sure to add the url to that
        # setting.
        post_login_url = get_next_url_for_login_page(request)

        try:
            login_url = pipeline.get_login_url(
                kwargs['provider_slug'],
                pipeline.AUTH_ENTRY_LOGIN,
                post_login_url,
            )
            response = redirect(login_url)
        except ValueError:
            response = HttpResponseNotFound()
        return response
def _disconnect_failure_response(backend, association_id, error, status):
    """Build the JSON body shared by every failed-disconnect response."""
    return JsonResponse({
        'success': False,
        'error': error,
        'backend': backend,
        'association_id': association_id
    }, status=status)


@login_required
@require_http_methods(["POST"])
def disconnect_json_view(request, backend, association_id=None):
    """
    Custom disconnect view that returns JSON response instead of redirecting.

    See https://github.com/python-social-auth/social-app-django/issues/774 for why this is needed.

    Args:
        request (HttpRequest): must be a POST from a logged-in user.
        backend (str): name of the social auth backend to disconnect.
        association_id: id of the association to remove. When not supplied in
            the URL, the 'association_id' POST parameter is used instead.

    Returns:
        JsonResponse: always JSON, carrying 'success', 'backend' and
        'association_id', plus 'message' on success or 'error' on failure.
        Status is 200 on success and otherwise:
            404 - the social auth association does not exist
            400 - ValueError / TypeError / ValidationError
            403 - PermissionDenied
            500 - DatabaseError, ImportError, AttributeError or RuntimeError
    """
    user = request.user

    # Check URL parameter first, then POST parameter
    if not association_id:
        association_id = request.POST.get('association_id')

    try:
        # Load the backend strategy and backend instance
        strategy = load_strategy(request)
        backend_instance = load_backend(strategy, backend, redirect_uri=request.build_absolute_uri())

        # Use backend.disconnect method - simplified approach without partial pipeline.
        # Its return value is deliberately ignored: this endpoint always answers with JSON.
        backend_instance.disconnect(user=user, association_id=association_id)

        return JsonResponse({
            'success': True,
            'message': 'Account successfully disconnected',
            'backend': backend,
            'association_id': association_id
        })

    except UserSocialAuth.DoesNotExist:
        log.warning(
            'Social auth association not found during disconnect: backend=%s, association_id=%s, user_id=%s',
            backend, association_id, user.id
        )
        return _disconnect_failure_response(
            backend, association_id, 'Account not found or already disconnected', 404
        )

    except (ValueError, TypeError) as e:
        log.error(
            'Invalid parameter during social auth disconnect: backend=%s, association_id=%s, user_id=%s, error=%s',
            backend, association_id, user.id, str(e)
        )
        record_exception()
        return _disconnect_failure_response(backend, association_id, 'Invalid request parameters', 400)

    except DatabaseError as e:
        log.error(
            'Database error during social auth disconnect: backend=%s, association_id=%s, user_id=%s, error=%s',
            backend, association_id, user.id, str(e)
        )
        record_exception()
        return _disconnect_failure_response(backend, association_id, 'Service temporarily unavailable', 500)

    except ValidationError as e:
        log.error(
            'Validation error during social auth disconnect: backend=%s, association_id=%s, user_id=%s, error=%s',
            backend, association_id, user.id, str(e)
        )
        record_exception()
        return _disconnect_failure_response(backend, association_id, 'Invalid request data', 400)

    except PermissionDenied as e:
        log.warning(
            'Permission denied during social auth disconnect: backend=%s, association_id=%s, user_id=%s, error=%s',
            backend, association_id, user.id, str(e)
        )
        record_exception()
        return _disconnect_failure_response(
            backend, association_id, 'You do not have permission to perform this action', 403
        )

    except (ImportError, AttributeError, RuntimeError) as e:
        log.error(
            'System error during social auth disconnect: backend=%s, association_id=%s, user_id=%s, error=%s',
            backend, association_id, user.id, str(e)
        )
        record_exception()
        return _disconnect_failure_response(backend, association_id, 'Service temporarily unavailable', 500)