feat: choose signing cert explicitly in saml metadata ENT-677 (#29704)

* feat: choose signing cert explicitly in saml metadata

when parsing, we want to explicitly choose the use='signing' cert instead of accidentally choosing the encryption one

ENT-677

* test: test for parse_metadata_xml

* feat: also address case when signing key is absent

in this case we consider the 'use'-less keydescriptior as if it's for both signing and encryption

* test: test case for missing use=signing attribute

* test: fix test failure
This commit is contained in:
Binod Pant
2022-01-05 16:18:01 -05:00
committed by GitHub
parent bbfdb3376e
commit 33437e7fef
5 changed files with 133 additions and 63 deletions

View File

@@ -51,6 +51,7 @@ class TestSAMLCommand(CacheIsolationTestCase):
"""
Test django management command for fetching saml metadata.
"""
def setUp(self):
"""
Setup operations for saml configurations. these operations contain

View File

@@ -3,32 +3,23 @@ Code to manage fetching and storing the metadata of IdPs.
"""
import datetime
import logging
import dateutil.parser
import pytz
import requests
from celery import shared_task
from django.utils.timezone import now
from edx_django_utils.monitoring import set_code_owner_attribute
from lxml import etree
from onelogin.saml2.utils import OneLogin_Saml2_Utils
from requests import exceptions
from openedx.core.djangolib.markup import Text
from common.djangoapps.third_party_auth.models import SAMLConfiguration, SAMLProviderConfig, SAMLProviderData
from common.djangoapps.third_party_auth.utils import MetadataParseError, parse_metadata_xml
log = logging.getLogger(__name__)
SAML_XML_NS = 'urn:oasis:names:tc:SAML:2.0:metadata' # The SAML Metadata XML namespace
class MetadataParseError(Exception):
""" An error occurred while parsing the SAML metadata from an IdP """
pass # lint-amnesty, pylint: disable=unnecessary-pass
@shared_task
@set_code_owner_attribute
def fetch_saml_metadata():
@@ -93,7 +84,7 @@ def fetch_saml_metadata():
for entity_id in entity_ids:
log.info("Processing IdP with entityID %s", entity_id)
public_key, sso_url, expires_at = _parse_metadata_xml(xml, entity_id)
public_key, sso_url, expires_at = parse_metadata_xml(xml, entity_id)
changed = _update_data(entity_id, public_key, sso_url, expires_at)
if changed:
log.info("→ Created new record for SAMLProviderData")
@@ -135,56 +126,6 @@ def fetch_saml_metadata():
return num_total, num_skipped, num_attempted, num_updated, len(failure_messages), failure_messages
def _parse_metadata_xml(xml, entity_id):
"""
Given an XML document containing SAML 2.0 metadata, parse it and return a tuple of
(public_key, sso_url, expires_at) for the specified entityID.
Raises MetadataParseError if anything is wrong.
"""
if xml.tag == etree.QName(SAML_XML_NS, 'EntityDescriptor'):
entity_desc = xml
else:
if xml.tag != etree.QName(SAML_XML_NS, 'EntitiesDescriptor'):
raise MetadataParseError(Text("Expected root element to be <EntitiesDescriptor>, not {}").format(xml.tag))
entity_desc = xml.find(
".//{}[@entityID='{}']".format(etree.QName(SAML_XML_NS, 'EntityDescriptor'), entity_id)
)
if entity_desc is None:
raise MetadataParseError(f"Can't find EntityDescriptor for entityID {entity_id}")
expires_at = None
if "validUntil" in xml.attrib:
expires_at = dateutil.parser.parse(xml.attrib["validUntil"])
if "cacheDuration" in xml.attrib:
cache_expires = OneLogin_Saml2_Utils.parse_duration(xml.attrib["cacheDuration"])
cache_expires = datetime.datetime.fromtimestamp(cache_expires, tz=pytz.utc)
if expires_at is None or cache_expires < expires_at:
expires_at = cache_expires
sso_desc = entity_desc.find(etree.QName(SAML_XML_NS, "IDPSSODescriptor"))
if sso_desc is None:
raise MetadataParseError("IDPSSODescriptor missing")
if 'urn:oasis:names:tc:SAML:2.0:protocol' not in sso_desc.get("protocolSupportEnumeration"):
raise MetadataParseError("This IdP does not support SAML 2.0")
# Now we just need to get the public_key and sso_url
public_key = sso_desc.findtext("./{}//{}".format(
etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate"
))
if not public_key:
raise MetadataParseError("Public Key missing. Expected an <X509Certificate>")
public_key = public_key.replace(" ", "")
binding_elements = sso_desc.iterfind("./{}".format(etree.QName(SAML_XML_NS, "SingleSignOnService")))
sso_bindings = {element.get('Binding'): element.get('Location') for element in binding_elements}
try:
# The only binding supported by python-saml and python-social-auth is HTTP-Redirect:
sso_url = sso_bindings['urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect']
except KeyError:
raise MetadataParseError("Unable to find SSO URL with HTTP-Redirect binding.") # lint-amnesty, pylint: disable=raise-missing-from
return public_key, sso_url, expires_at
def _update_data(entity_id, public_key, sso_url, expires_at):
"""
Update/Create the SAMLProviderData for the given entity ID.

View File

@@ -9,6 +9,7 @@ from unittest.mock import MagicMock
import ddt
from django.conf import settings
from lxml import etree
from common.djangoapps.student.tests.factories import UserFactory
from common.djangoapps.third_party_auth.tests.testutil import TestCase
@@ -17,6 +18,7 @@ from common.djangoapps.third_party_auth.utils import (
get_user_from_email,
is_enterprise_customer_user,
is_oauth_provider,
parse_metadata_xml,
user_exists,
convert_saml_slug_provider_id,
)
@@ -32,6 +34,7 @@ class TestUtils(TestCase):
"""
Test the utility functions.
"""
def test_user_exists(self):
"""
Verify that user_exists function returns correct response.
@@ -128,3 +131,54 @@ class TestUtils(TestCase):
else:
self.assertIsNone(association_response)
self.assertFalse(user_is_active_resonse)
def test_parse_metadata_uses_signing_cert(self):
entity_id = 'http://testid'
parser = etree.XMLParser(remove_comments=True)
xml_text = '''<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" entityID="http://testid">
<md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:KeyDescriptor use="signing">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>abc+hkIuUktxkg=</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:KeyDescriptor use="encryption">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>blachabc+hkIuUktxkg=blaal;skdjf;ksd</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://idp/SSOService.php"/>
</md:IDPSSODescriptor>
</md:EntityDescriptor>
'''
xml = etree.fromstring(xml_text, parser)
public_key, sso_url, _ = parse_metadata_xml(xml, entity_id)
assert public_key == 'abc+hkIuUktxkg='
assert sso_url == 'https://idp/SSOService.php'
def test_parse_metadata_with_use_attribute_missing(self):
entity_id = 'http://testid'
parser = etree.XMLParser(remove_comments=True)
xml_text = '''<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" entityID="http://testid">
<md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:KeyDescriptor>
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>abc+hkIuUktxkg=</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="https://idp/SSOService.php"/>
</md:IDPSSODescriptor>
</md:EntityDescriptor>
'''
xml = etree.fromstring(xml_text, parser)
public_key, sso_url, _ = parse_metadata_xml(xml, entity_id)
assert public_key == 'abc+hkIuUktxkg='
assert sso_url == 'https://idp/SSOService.php'

View File

@@ -14,7 +14,7 @@ from onelogin.saml2.errors import OneLogin_Saml2_Error
from common.djangoapps.third_party_auth import pipeline
# Define some XML namespaces:
from common.djangoapps.third_party_auth.tasks import SAML_XML_NS
from common.djangoapps.third_party_auth.utils import SAML_XML_NS
from .testutil import AUTH_FEATURE_ENABLED, AUTH_FEATURES_KEY, SAMLTestCase
@@ -164,6 +164,7 @@ class IdPRedirectViewTest(SAMLTestCase):
"""
Test IdPRedirectView.
"""
def setUp(self):
super().setUp()

View File

@@ -2,14 +2,87 @@
Utility functions for third_party_auth
"""
import datetime
from uuid import UUID
import dateutil.parser
import pytz
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from enterprise.models import EnterpriseCustomerUser, EnterpriseCustomerIdentityProvider
from enterprise.models import EnterpriseCustomerIdentityProvider, EnterpriseCustomerUser
from lxml import etree
from onelogin.saml2.utils import OneLogin_Saml2_Utils
from social_core.pipeline.social_auth import associate_by_email
from common.djangoapps.third_party_auth.models import OAuth2ProviderConfig
from openedx.core.djangolib.markup import Text
from . import provider
SAML_XML_NS = 'urn:oasis:names:tc:SAML:2.0:metadata' # The SAML Metadata XML namespace
class MetadataParseError(Exception):
""" An error occurred while parsing the SAML metadata from an IdP """
pass # lint-amnesty, pylint: disable=unnecessary-pass
def parse_metadata_xml(xml, entity_id):
"""
Given an XML document containing SAML 2.0 metadata, parse it and return a tuple of
(public_key, sso_url, expires_at) for the specified entityID.
Raises MetadataParseError if anything is wrong.
"""
if xml.tag == etree.QName(SAML_XML_NS, 'EntityDescriptor'):
entity_desc = xml
else:
if xml.tag != etree.QName(SAML_XML_NS, 'EntitiesDescriptor'):
raise MetadataParseError(Text("Expected root element to be <EntitiesDescriptor>, not {}").format(xml.tag))
entity_desc = xml.find(
".//{}[@entityID='{}']".format(etree.QName(SAML_XML_NS, 'EntityDescriptor'), entity_id)
)
if entity_desc is None:
raise MetadataParseError(f"Can't find EntityDescriptor for entityID {entity_id}")
expires_at = None
if "validUntil" in xml.attrib:
expires_at = dateutil.parser.parse(xml.attrib["validUntil"])
if "cacheDuration" in xml.attrib:
cache_expires = OneLogin_Saml2_Utils.parse_duration(xml.attrib["cacheDuration"])
cache_expires = datetime.datetime.fromtimestamp(cache_expires, tz=pytz.utc)
if expires_at is None or cache_expires < expires_at:
expires_at = cache_expires
sso_desc = entity_desc.find(etree.QName(SAML_XML_NS, "IDPSSODescriptor"))
if sso_desc is None:
raise MetadataParseError("IDPSSODescriptor missing")
if 'urn:oasis:names:tc:SAML:2.0:protocol' not in sso_desc.get("protocolSupportEnumeration"):
raise MetadataParseError("This IdP does not support SAML 2.0")
# Now we just need to get the public_key and sso_url
# We want the use='signing' cert, not the 'encryption' one
public_key = sso_desc.findtext("./{}[@use='signing']//{}".format(
etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate"
))
if not public_key:
# it's possible that there is just one keyDescription with no use attribute
# that is a shortcut for both signing and encryption combined. So we can use that as fallback.
public_key = sso_desc.findtext("./{}//{}".format(
etree.QName(SAML_XML_NS, "KeyDescriptor"), "{http://www.w3.org/2000/09/xmldsig#}X509Certificate"
))
if not public_key:
raise MetadataParseError("Public Key missing. Expected an <X509Certificate>")
public_key = public_key.replace(" ", "")
binding_elements = sso_desc.iterfind("./{}".format(etree.QName(SAML_XML_NS, "SingleSignOnService")))
sso_bindings = {element.get('Binding'): element.get('Location') for element in binding_elements}
try:
# The only binding supported by python-saml and python-social-auth is HTTP-Redirect:
sso_url = sso_bindings['urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect']
except KeyError:
raise MetadataParseError("Unable to find SSO URL with HTTP-Redirect binding.") # lint-amnesty, pylint: disable=raise-missing-from
return public_key, sso_url, expires_at
def user_exists(details):
"""