diff --git a/common/djangoapps/third_party_auth/management/commands/tests/test_saml.py b/common/djangoapps/third_party_auth/management/commands/tests/test_saml.py index a1134903a4..ea918d7b5b 100644 --- a/common/djangoapps/third_party_auth/management/commands/tests/test_saml.py +++ b/common/djangoapps/third_party_auth/management/commands/tests/test_saml.py @@ -51,6 +51,7 @@ class TestSAMLCommand(CacheIsolationTestCase): """ Test django management command for fetching saml metadata. """ + def setUp(self): """ Setup operations for saml configurations. these operations contain diff --git a/common/djangoapps/third_party_auth/tasks.py b/common/djangoapps/third_party_auth/tasks.py index 90c256e67b..a4f873f468 100644 --- a/common/djangoapps/third_party_auth/tasks.py +++ b/common/djangoapps/third_party_auth/tasks.py @@ -3,32 +3,23 @@ Code to manage fetching and storing the metadata of IdPs. """ -import datetime import logging -import dateutil.parser -import pytz import requests from celery import shared_task from django.utils.timezone import now from edx_django_utils.monitoring import set_code_owner_attribute from lxml import etree -from onelogin.saml2.utils import OneLogin_Saml2_Utils from requests import exceptions -from openedx.core.djangolib.markup import Text from common.djangoapps.third_party_auth.models import SAMLConfiguration, SAMLProviderConfig, SAMLProviderData +from common.djangoapps.third_party_auth.utils import MetadataParseError, parse_metadata_xml log = logging.getLogger(__name__) SAML_XML_NS = 'urn:oasis:names:tc:SAML:2.0:metadata' # The SAML Metadata XML namespace -class MetadataParseError(Exception): - """ An error occurred while parsing the SAML metadata from an IdP """ - pass # lint-amnesty, pylint: disable=unnecessary-pass - - @shared_task @set_code_owner_attribute def fetch_saml_metadata(): @@ -93,7 +84,7 @@ def fetch_saml_metadata(): for entity_id in entity_ids: log.info("Processing IdP with entityID %s", entity_id) - public_key, sso_url, expires_at = _parse_metadata_xml(xml, entity_id) + public_key, sso_url, expires_at = parse_metadata_xml(xml, entity_id) changed = _update_data(entity_id, public_key, sso_url, expires_at) if changed: log.info("→ Created new record for SAMLProviderData") @@ -135,56 +126,6 @@ def fetch_saml_metadata(): return num_total, num_skipped, num_attempted, num_updated, len(failure_messages), failure_messages -def _parse_metadata_xml(xml, entity_id): - """ - Given an XML document containing SAML 2.0 metadata, parse it and return a tuple of - (public_key, sso_url, expires_at) for the specified entityID. - - Raises MetadataParseError if anything is wrong. - """ - if xml.tag == etree.QName(SAML_XML_NS, 'EntityDescriptor'): - entity_desc = xml - else: - if xml.tag != etree.QName(SAML_XML_NS, 'EntitiesDescriptor'): - raise MetadataParseError(Text("Expected root element to be , not {}").format(xml.tag)) - entity_desc = xml.find( - ".//{}[@entityID='{}']".format(etree.QName(SAML_XML_NS, 'EntityDescriptor'), entity_id) - ) - if entity_desc is None: - raise MetadataParseError(f"Can't find EntityDescriptor for entityID {entity_id}") - - expires_at = None - if "validUntil" in xml.attrib: - expires_at = dateutil.parser.parse(xml.attrib["validUntil"]) - if "cacheDuration" in xml.attrib: - cache_expires = OneLogin_Saml2_Utils.parse_duration(xml.attrib["cacheDuration"]) - cache_expires = datetime.datetime.fromtimestamp(cache_expires, tz=pytz.utc) - if expires_at is None or cache_expires < expires_at: - expires_at = cache_expires - - sso_desc = entity_desc.find(etree.QName(SAML_XML_NS, "IDPSSODescriptor")) - if sso_desc is None: - raise MetadataParseError("IDPSSODescriptor missing") - if 'urn:oasis:names:tc:SAML:2.0:protocol' not in sso_desc.get("protocolSupportEnumeration"): - raise MetadataParseError("This IdP does not support SAML 2.0") - - # Now we just need to get the public_key and sso_url - public_key = sso_desc.findtext("./{}//{}".format( - etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate" - )) - if not public_key: - raise MetadataParseError("Public Key missing. Expected an ") - public_key = public_key.replace(" ", "") - binding_elements = sso_desc.iterfind("./{}".format(etree.QName(SAML_XML_NS, "SingleSignOnService"))) - sso_bindings = {element.get('Binding'): element.get('Location') for element in binding_elements} - try: - # The only binding supported by python-saml and python-social-auth is HTTP-Redirect: - sso_url = sso_bindings['urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect'] - except KeyError: - raise MetadataParseError("Unable to find SSO URL with HTTP-Redirect binding.") # lint-amnesty, pylint: disable=raise-missing-from - return public_key, sso_url, expires_at - - def _update_data(entity_id, public_key, sso_url, expires_at): """ Update/Create the SAMLProviderData for the given entity ID. diff --git a/common/djangoapps/third_party_auth/tests/test_utils.py b/common/djangoapps/third_party_auth/tests/test_utils.py index dc13bc8a30..2d971e69d0 100644 --- a/common/djangoapps/third_party_auth/tests/test_utils.py +++ b/common/djangoapps/third_party_auth/tests/test_utils.py @@ -9,6 +9,7 @@ from unittest.mock import MagicMock import ddt from django.conf import settings +from lxml import etree from common.djangoapps.student.tests.factories import UserFactory from common.djangoapps.third_party_auth.tests.testutil import TestCase @@ -17,6 +18,7 @@ from common.djangoapps.third_party_auth.utils import ( get_user_from_email, is_enterprise_customer_user, is_oauth_provider, + parse_metadata_xml, user_exists, convert_saml_slug_provider_id, ) @@ -32,6 +34,7 @@ class TestUtils(TestCase): """ Test the utility functions. """ + def test_user_exists(self): """ Verify that user_exists function returns correct response. @@ -128,3 +131,54 @@ class TestUtils(TestCase): else: self.assertIsNone(association_response) self.assertFalse(user_is_active_resonse) + + def test_parse_metadata_uses_signing_cert(self): + entity_id = 'http://testid' + parser = etree.XMLParser(remove_comments=True) + xml_text = ''' + + + + + + abc+hkIuUktxkg= + + + + + + + blachabc+hkIuUktxkg=blaal;skdjf;ksd + + + + + + + ''' + xml = etree.fromstring(xml_text, parser) + public_key, sso_url, _ = parse_metadata_xml(xml, entity_id) + assert public_key == 'abc+hkIuUktxkg=' + assert sso_url == 'https://idp/SSOService.php' + + def test_parse_metadata_with_use_attribute_missing(self): + entity_id = 'http://testid' + parser = etree.XMLParser(remove_comments=True) + xml_text = ''' + + + + + + abc+hkIuUktxkg= + + + + + + + ''' + xml = etree.fromstring(xml_text, parser) + public_key, sso_url, _ = parse_metadata_xml(xml, entity_id) + assert public_key == 'abc+hkIuUktxkg=' + assert sso_url == 'https://idp/SSOService.php' diff --git a/common/djangoapps/third_party_auth/tests/test_views.py b/common/djangoapps/third_party_auth/tests/test_views.py index 3508a9d941..4c89b5da45 100644 --- a/common/djangoapps/third_party_auth/tests/test_views.py +++ b/common/djangoapps/third_party_auth/tests/test_views.py @@ -14,7 +14,7 @@ from onelogin.saml2.errors import OneLogin_Saml2_Error from common.djangoapps.third_party_auth import pipeline # Define some XML namespaces: -from common.djangoapps.third_party_auth.tasks import SAML_XML_NS +from common.djangoapps.third_party_auth.utils import SAML_XML_NS from .testutil import AUTH_FEATURE_ENABLED, AUTH_FEATURES_KEY, SAMLTestCase @@ -164,6 +164,7 @@ class IdPRedirectViewTest(SAMLTestCase): """ Test IdPRedirectView. """ + def setUp(self): super().setUp() diff --git a/common/djangoapps/third_party_auth/utils.py b/common/djangoapps/third_party_auth/utils.py index 46da90794d..3d411bb63a 100644 --- a/common/djangoapps/third_party_auth/utils.py +++ b/common/djangoapps/third_party_auth/utils.py @@ -2,14 +2,87 @@ Utility functions for third_party_auth """ +import datetime from uuid import UUID + +import dateutil.parser +import pytz from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user -from enterprise.models import EnterpriseCustomerUser, EnterpriseCustomerIdentityProvider +from enterprise.models import EnterpriseCustomerIdentityProvider, EnterpriseCustomerUser +from lxml import etree +from onelogin.saml2.utils import OneLogin_Saml2_Utils from social_core.pipeline.social_auth import associate_by_email from common.djangoapps.third_party_auth.models import OAuth2ProviderConfig +from openedx.core.djangolib.markup import Text + from . import provider +SAML_XML_NS = 'urn:oasis:names:tc:SAML:2.0:metadata' # The SAML Metadata XML namespace + + +class MetadataParseError(Exception): + """ An error occurred while parsing the SAML metadata from an IdP """ + pass # lint-amnesty, pylint: disable=unnecessary-pass + + +def parse_metadata_xml(xml, entity_id): + """ + Given an XML document containing SAML 2.0 metadata, parse it and return a tuple of + (public_key, sso_url, expires_at) for the specified entityID. + + Raises MetadataParseError if anything is wrong. + """ + + if xml.tag == etree.QName(SAML_XML_NS, 'EntityDescriptor'): + entity_desc = xml + else: + if xml.tag != etree.QName(SAML_XML_NS, 'EntitiesDescriptor'): + raise MetadataParseError(Text("Expected root element to be , not {}").format(xml.tag)) + entity_desc = xml.find( + ".//{}[@entityID='{}']".format(etree.QName(SAML_XML_NS, 'EntityDescriptor'), entity_id) + ) + if entity_desc is None: + raise MetadataParseError(f"Can't find EntityDescriptor for entityID {entity_id}") + + expires_at = None + if "validUntil" in xml.attrib: + expires_at = dateutil.parser.parse(xml.attrib["validUntil"]) + if "cacheDuration" in xml.attrib: + cache_expires = OneLogin_Saml2_Utils.parse_duration(xml.attrib["cacheDuration"]) + cache_expires = datetime.datetime.fromtimestamp(cache_expires, tz=pytz.utc) + if expires_at is None or cache_expires < expires_at: + expires_at = cache_expires + + sso_desc = entity_desc.find(etree.QName(SAML_XML_NS, "IDPSSODescriptor")) + if sso_desc is None: + raise MetadataParseError("IDPSSODescriptor missing") + if 'urn:oasis:names:tc:SAML:2.0:protocol' not in sso_desc.get("protocolSupportEnumeration"): + raise MetadataParseError("This IdP does not support SAML 2.0") + + # Now we just need to get the public_key and sso_url + # We want the use='signing' cert, not the 'encryption' one + public_key = sso_desc.findtext("./{}[@use='signing']//{}".format( + etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate" + )) + if not public_key: + # it's possible that there is just one keyDescription with no use attribute + # that is a shortcut for both signing and encryption combined. So we can use that as fallback. + public_key = sso_desc.findtext("./{}//{}".format( + etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate" + )) + if not public_key: + raise MetadataParseError("Public Key missing. Expected an ") + public_key = public_key.replace(" ", "") + binding_elements = sso_desc.iterfind("./{}".format(etree.QName(SAML_XML_NS, "SingleSignOnService"))) + sso_bindings = {element.get('Binding'): element.get('Location') for element in binding_elements} + try: + # The only binding supported by python-saml and python-social-auth is HTTP-Redirect: + sso_url = sso_bindings['urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect'] + except KeyError: + raise MetadataParseError("Unable to find SSO URL with HTTP-Redirect binding.") # lint-amnesty, pylint: disable=raise-missing-from + return public_key, sso_url, expires_at + def user_exists(details): """