Merge pull request #27052 from edx/third-party-auth

Pyupgrade in common/djangoapps/third-party-modes/
This commit is contained in:
Awais Qureshi
2021-03-18 17:13:19 +05:00
committed by GitHub
12 changed files with 176 additions and 179 deletions

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Admin site configuration for third party authentication
"""
@@ -56,7 +55,7 @@ class SAMLProviderConfigAdmin(KeyedConfigurationModelAdmin):
"""
Filter the queryset to exclude the archived records.
"""
queryset = super(SAMLProviderConfigAdmin, self).get_queryset(request).exclude(archived=True) # lint-amnesty, pylint: disable=super-with-arguments
queryset = super().get_queryset(request).exclude(archived=True)
return queryset
def archive_provider_configuration(self, request, queryset):
@@ -81,7 +80,7 @@ class SAMLProviderConfigAdmin(KeyedConfigurationModelAdmin):
"""
Get the actions.
"""
actions = super(SAMLProviderConfigAdmin, self).get_actions(request) # lint-amnesty, pylint: disable=super-with-arguments
actions = super().get_actions(request)
action_delete = {
'archive_provider_configuration': (
SAMLProviderConfigAdmin.archive_provider_configuration,
@@ -99,11 +98,11 @@ class SAMLProviderConfigAdmin(KeyedConfigurationModelAdmin):
if not instance.is_active:
return instance.name
update_url = reverse('admin:{}_{}_add'.format(self.model._meta.app_label, self.model._meta.model_name))
update_url += '?source={}'.format(instance.pk)
return format_html(u'<a href="{}">{}</a>', update_url, instance.name)
update_url = reverse(f'admin:{self.model._meta.app_label}_{self.model._meta.model_name}_add')
update_url += f'?source={instance.pk}'
return format_html('<a href="{}">{}</a>', update_url, instance.name)
name_with_update_link.short_description = u'Name'
name_with_update_link.short_description = 'Name'
def has_data(self, inst):
""" Do we have cached metadata for this SAML provider? """
@@ -111,7 +110,7 @@ class SAMLProviderConfigAdmin(KeyedConfigurationModelAdmin):
return None # N/A
data = SAMLProviderData.current(inst.entity_id)
return bool(data and data.is_valid())
has_data.short_description = u'Metadata Ready'
has_data.short_description = 'Metadata Ready'
has_data.boolean = True
def mode(self, inst):
@@ -128,7 +127,7 @@ class SAMLProviderConfigAdmin(KeyedConfigurationModelAdmin):
Note: This only works if the celery worker and the app worker are using the
same 'configuration' cache.
"""
super(SAMLProviderConfigAdmin, self).save_model(request, obj, form, change) # lint-amnesty, pylint: disable=super-with-arguments
super().save_model(request, obj, form, change)
fetch_saml_metadata.apply_async((), countdown=2)
admin.site.register(SAMLProviderConfig, SAMLProviderConfigAdmin)
@@ -148,10 +147,10 @@ class SAMLConfigurationAdmin(KeyedConfigurationModelAdmin):
public_key = inst.get_setting('SP_PUBLIC_CERT')
private_key = inst.get_setting('SP_PRIVATE_KEY')
if not public_key or not private_key:
return format_html(u'<em>Key pair incomplete/missing</em>')
return format_html('<em>Key pair incomplete/missing</em>')
pub1, pub2 = public_key[0:10], public_key[-10:]
priv1, priv2 = private_key[0:10], private_key[-10:]
return format_html(u'Public: {}{}<br>Private: {}{}', pub1, pub2, priv1, priv2)
return format_html('Public: {}{}<br>Private: {}{}', pub1, pub2, priv1, priv2)
admin.site.register(SAMLConfiguration, SAMLConfigurationAdmin)

View File

@@ -108,7 +108,7 @@ class AppleIdAuth(BaseOAuth2):
Apple requires to set `response_mode` to `form_post` if `scope`
parameter is passed.
"""
params = super(AppleIdAuth, self).auth_params(*args, **kwargs)
params = super().auth_params(*args, **kwargs)
if self.RESPONSE_MODE:
params['response_mode'] = self.RESPONSE_MODE
elif self.get_scope():
@@ -213,7 +213,7 @@ class AppleIdAuth(BaseOAuth2):
raise AuthFailed(self, 'Missing id_token parameter')
decoded_data = self.decode_id_token(jwt_string)
return super(AppleIdAuth, self).do_auth(
return super().do_auth(
access_token,
response=decoded_data,
*args,

View File

@@ -36,7 +36,7 @@ class IdentityServer3(BaseOAuth2):
"""
url = self.get_config().get_setting('user_info_url')
# The access token returned from the service's token route.
header = {"Authorization": u"Bearer %s" % access_token}
header = {"Authorization": "Bearer %s" % access_token}
return self.get_json(url, headers=header)
def get_setting_if_exist(self, name, default):

View File

@@ -161,7 +161,7 @@ class LTIAuthBackend(BaseAuth):
parameters_string = normalize_parameters(parameters)
base_string = construct_base_string(request.http_method, base_uri, parameters_string)
computed_signature = sign_hmac_sha1(base_string, six.text_type(lti_consumer_secret), '')
computed_signature = sign_hmac_sha1(base_string, str(lti_consumer_secret), '')
submitted_signature = request.oauth_signature
data = {parameter_value_pair[0]: parameter_value_pair[1] for parameter_value_pair in parameters}
@@ -189,7 +189,7 @@ class LTIAuthBackend(BaseAuth):
if valid:
return data
except AttributeError as error:
log.error(u"'{}' not found.".format(text_type(error)))
log.error("'{}' not found.".format(str(error)))
return None
@classmethod

View File

@@ -20,7 +20,7 @@ class ExceptionMiddleware(SocialAuthExceptionMiddleware, MiddlewareMixin):
def get_redirect_uri(self, request, exception):
# Fall back to django settings's SOCIAL_AUTH_LOGIN_ERROR_URL.
redirect_uri = super(ExceptionMiddleware, self).get_redirect_uri(request, exception) # lint-amnesty, pylint: disable=super-with-arguments
redirect_uri = super().get_redirect_uri(request, exception)
# Safe because it's already been validated by
# pipeline.parse_query_params. If that pipeline step ever moves later
@@ -48,4 +48,4 @@ class ExceptionMiddleware(SocialAuthExceptionMiddleware, MiddlewareMixin):
redirect_url = get_next_url_for_login_page(request)
return redirect('/login?next=' + redirect_url)
return super(ExceptionMiddleware, self).process_exception(request, exception) # lint-amnesty, pylint: disable=super-with-arguments
return super().process_exception(request, exception)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Models used to implement SAML SSO support in third_party_auth
(inlcuding Shibboleth support)
@@ -61,9 +60,9 @@ def clean_json(value, of_type):
try:
value_python = json.loads(value)
except ValueError as err:
raise ValidationError(u"Invalid JSON: {}".format(err)) # lint-amnesty, pylint: disable=raise-missing-from
raise ValidationError(f"Invalid JSON: {err}") # lint-amnesty, pylint: disable=raise-missing-from
if not isinstance(value_python, of_type):
raise ValidationError(u"Expected a JSON {}".format(of_type))
raise ValidationError(f"Expected a JSON {of_type}")
return json.dumps(value_python, indent=4)
@@ -75,7 +74,7 @@ def clean_username(username=''):
class AuthNotConfigured(SocialAuthBaseException):
""" Exception when SAMLProviderData or other required info is missing """
def __init__(self, provider_name):
super(AuthNotConfigured, self).__init__() # lint-amnesty, pylint: disable=super-with-arguments
super().__init__()
self.provider_name = provider_name
def __str__(self):
@@ -95,9 +94,9 @@ class ProviderConfig(ConfigurationModel):
icon_class = models.CharField(
max_length=50,
blank=True,
default=u'fa-sign-in',
default='fa-sign-in',
help_text=(
u'The Font Awesome (or custom) icon class to use on the login button for this provider. '
'The Font Awesome (or custom) icon class to use on the login button for this provider. '
'Examples: fa-google-plus, fa-facebook, fa-linkedin, fa-sign-in, fa-university'
),
)
@@ -108,15 +107,15 @@ class ProviderConfig(ConfigurationModel):
icon_image = models.FileField(
blank=True,
help_text=(
u'If there is no Font Awesome icon available for this provider, upload a custom image. '
'If there is no Font Awesome icon available for this provider, upload a custom image. '
'SVG images are recommended as they can scale to any size.'
),
)
name = models.CharField(max_length=50, blank=False, help_text=u"Name of this provider (shown to users)")
name = models.CharField(max_length=50, blank=False, help_text="Name of this provider (shown to users)")
slug = models.SlugField(
max_length=30, db_index=True, default=u'default',
max_length=30, db_index=True, default='default',
help_text=(
u'A short string uniquely identifying this provider. '
'A short string uniquely identifying this provider. '
'Cannot contain spaces and should be a usable as a CSS class. Examples: "ubc", "mit-staging"'
))
secondary = models.BooleanField(
@@ -186,7 +185,7 @@ class ProviderConfig(ConfigurationModel):
null=True,
blank=True,
default=None,
verbose_name=u'Max session length (seconds)',
verbose_name='Max session length (seconds)',
help_text=_(
"If this option is set, then users logging in using this SSO provider will have "
"their session length limited to no longer than this value. If set to 0 (zero), "
@@ -211,7 +210,7 @@ class ProviderConfig(ConfigurationModel):
)
enable_sso_id_verification = models.BooleanField(
default=False,
help_text=u"Use the presence of a profile from a trusted third party as proof of identity verification.",
help_text="Use the presence of a profile from a trusted third party as proof of identity verification.",
)
prefix = None # used for provider_id. Set to a string value in subclass
backend_name = None # Set to a field or fixed value in subclass
@@ -219,13 +218,13 @@ class ProviderConfig(ConfigurationModel):
# "enabled" field is inherited from ConfigurationModel
class Meta(object):
class Meta:
app_label = "third_party_auth"
abstract = True
def clean(self):
""" Ensure that at most `icon_class` or `icon_image` is set """
super(ProviderConfig, self).clean() # lint-amnesty, pylint: disable=super-with-arguments
super().clean()
if bool(self.icon_class) and bool(self.icon_image):
raise ValidationError('Either an icon class or an icon image must be given (but not both)')
@@ -243,7 +242,7 @@ class ProviderConfig(ConfigurationModel):
@property
def full_class_name(self):
""" Get the fully qualified class name of this provider. """
return '{}.{}'.format(self.__module__, self.__class__.__name__)
return f'{self.__module__}.{self.__class__.__name__}'
def get_url_params(self):
""" Get a dict of GET parameters to append to login links for this provider """
@@ -318,7 +317,7 @@ class ProviderConfig(ConfigurationModel):
def get_authentication_backend(self):
"""Gets associated Django settings.AUTHENTICATION_BACKEND string."""
return '{}.{}'.format(self.backend_class.__module__, self.backend_class.__name__)
return f'{self.backend_class.__module__}.{self.backend_class.__name__}'
@property
def display_for_login(self):
@@ -354,33 +353,33 @@ class OAuth2ProviderConfig(ProviderConfig):
backend_name = models.CharField(
max_length=50, blank=False, db_index=True,
help_text=(
u"Which python-social-auth OAuth2 provider backend to use. "
"Which python-social-auth OAuth2 provider backend to use. "
"The list of backend choices is determined by the THIRD_PARTY_AUTH_BACKENDS setting."
# To be precise, it's set by AUTHENTICATION_BACKENDS
# which production.py sets from THIRD_PARTY_AUTH_BACKENDS
)
)
key = models.TextField(blank=True, verbose_name=u"Client ID")
key = models.TextField(blank=True, verbose_name="Client ID")
secret = models.TextField(
blank=True,
verbose_name=u"Client Secret",
verbose_name="Client Secret",
help_text=(
u'For increased security, you can avoid storing this in your database by leaving '
'For increased security, you can avoid storing this in your database by leaving '
' this field blank and setting '
'SOCIAL_AUTH_OAUTH_SECRETS = {"(backend name)": "secret", ...} '
'in your instance\'s Django settings (or lms.yml)'
)
)
other_settings = models.TextField(blank=True, help_text=u"Optional JSON object with advanced settings, if any.")
other_settings = models.TextField(blank=True, help_text="Optional JSON object with advanced settings, if any.")
class Meta(object):
class Meta:
app_label = "third_party_auth"
verbose_name = u"Provider Configuration (OAuth)"
verbose_name = "Provider Configuration (OAuth)"
verbose_name_plural = verbose_name
def clean(self):
""" Standardize and validate fields """
super(OAuth2ProviderConfig, self).clean() # lint-amnesty, pylint: disable=super-with-arguments
super().clean()
self.other_settings = clean_json(self.other_settings, dict)
def get_setting(self, name):
@@ -419,15 +418,15 @@ class SAMLConfiguration(ConfigurationModel):
)
slug = models.SlugField(
max_length=30,
default=u'default',
default='default',
help_text=(
u'A short string uniquely identifying this configuration. '
'A short string uniquely identifying this configuration. '
'Cannot contain spaces. Examples: "ubc", "mit-staging"'
),
)
private_key = models.TextField(
help_text=(
u'To generate a key pair as two files, run '
'To generate a key pair as two files, run '
'"openssl req -new -x509 -days 3652 -nodes -out saml.crt -keyout saml.key". '
'Paste the contents of saml.key here. '
'For increased security, you can avoid storing this in your database by leaving '
@@ -438,45 +437,45 @@ class SAMLConfiguration(ConfigurationModel):
)
public_key = models.TextField(
help_text=(
u'Public key certificate. '
'Public key certificate. '
'For increased security, you can avoid storing this in your database by leaving '
'this field blank and setting it via the SOCIAL_AUTH_SAML_SP_PUBLIC_CERT setting '
'in your instance\'s Django settings (or lms.yml).'
),
blank=True,
)
entity_id = models.CharField(max_length=255, default="http://saml.example.com", verbose_name=u"Entity ID")
entity_id = models.CharField(max_length=255, default="http://saml.example.com", verbose_name="Entity ID")
org_info_str = models.TextField(
verbose_name=u"Organization Info",
default=u'{"en-US": {"url": "http://www.example.com", "displayname": "Example Inc.", "name": "example"}}',
help_text=u"JSON dictionary of 'url', 'displayname', and 'name' for each language",
verbose_name="Organization Info",
default='{"en-US": {"url": "http://www.example.com", "displayname": "Example Inc.", "name": "example"}}',
help_text="JSON dictionary of 'url', 'displayname', and 'name' for each language",
)
other_config_str = models.TextField(
default=u'{\n"SECURITY_CONFIG": {"metadataCacheDuration": 604800, "signMetadata": false}\n}',
default='{\n"SECURITY_CONFIG": {"metadataCacheDuration": 604800, "signMetadata": false}\n}',
help_text=(
u"JSON object defining advanced settings that are passed on to python-saml. "
"JSON object defining advanced settings that are passed on to python-saml. "
"Valid keys that can be set here include: SECURITY_CONFIG and SP_EXTRA"
),
)
is_public = models.BooleanField(
default=False,
verbose_name=u"Allow customers to see and use this SAML configuration",
verbose_name="Allow customers to see and use this SAML configuration",
help_text=(
u"When checked, customers will be able to choose this SAML Configuration "
"When checked, customers will be able to choose this SAML Configuration "
"in the admin portal."
),
)
class Meta(object):
class Meta:
app_label = "third_party_auth"
verbose_name = u"SAML Configuration"
verbose_name = "SAML Configuration"
verbose_name_plural = verbose_name
def __str__(self):
"""
Return human-readable string representation.
"""
return u"SAMLConfiguration {site}: {slug} on {date:%Y-%m-%d %H:%M:%S}".format(
return "SAMLConfiguration {site}: {slug} on {date:%Y-%m-%d %H:%M:%S}".format(
site=self.site.name,
slug=self.slug,
date=self.change_date,
@@ -484,7 +483,7 @@ class SAMLConfiguration(ConfigurationModel):
def clean(self):
""" Standardize and validate fields """
super(SAMLConfiguration, self).clean() # lint-amnesty, pylint: disable=super-with-arguments
super().clean()
self.org_info_str = clean_json(self.org_info_str, dict)
self.other_config_str = clean_json(self.other_config_str, dict)
@@ -507,7 +506,7 @@ class SAMLConfiguration(ConfigurationModel):
""" Get the value of a setting, or raise KeyError """
default_saml_contact = {
# Default contact information to put into the SAML metadata that gets generated by python-saml.
"givenName": _(u"{platform_name} Support").format(
"givenName": _("{platform_name} Support").format(
platform_name=configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME)
),
"emailAddress": configuration_helpers.get_value('TECH_SUPPORT_EMAIL', settings.TECH_SUPPORT_EMAIL),
@@ -561,76 +560,76 @@ class SAMLProviderConfig(ProviderConfig):
"""
prefix = 'saml'
backend_name = models.CharField(
max_length=50, default=u'tpa-saml', blank=False,
help_text=u"Which python-social-auth provider backend to use. 'tpa-saml' is the standard edX SAML backend.")
max_length=50, default='tpa-saml', blank=False,
help_text="Which python-social-auth provider backend to use. 'tpa-saml' is the standard edX SAML backend.")
entity_id = models.CharField(
max_length=255, verbose_name=u"Entity ID", help_text=u"Example: https://idp.testshib.org/idp/shibboleth")
max_length=255, verbose_name="Entity ID", help_text="Example: https://idp.testshib.org/idp/shibboleth")
metadata_source = models.CharField(
max_length=255,
help_text=(
u"URL to this provider's XML metadata. Should be an HTTPS URL. "
"URL to this provider's XML metadata. Should be an HTTPS URL. "
"Example: https://www.testshib.org/metadata/testshib-providers.xml"
))
attr_user_permanent_id = models.CharField(
max_length=128, blank=True, verbose_name=u"User ID Attribute",
max_length=128, blank=True, verbose_name="User ID Attribute",
help_text=(
u"URN of the SAML attribute that we can use as a unique, "
"URN of the SAML attribute that we can use as a unique, "
"persistent user ID. Leave blank for default."
))
attr_full_name = models.CharField(
max_length=128, blank=True, verbose_name=u"Full Name Attribute",
help_text=u"URN of SAML attribute containing the user's full name. Leave blank for default.")
max_length=128, blank=True, verbose_name="Full Name Attribute",
help_text="URN of SAML attribute containing the user's full name. Leave blank for default.")
default_full_name = models.CharField(
max_length=255, blank=True, verbose_name=u"Default Value for Full Name",
help_text=u"Default value for full name to be used if not present in SAML response.")
max_length=255, blank=True, verbose_name="Default Value for Full Name",
help_text="Default value for full name to be used if not present in SAML response.")
attr_first_name = models.CharField(
max_length=128, blank=True, verbose_name=u"First Name Attribute",
help_text=u"URN of SAML attribute containing the user's first name. Leave blank for default.")
max_length=128, blank=True, verbose_name="First Name Attribute",
help_text="URN of SAML attribute containing the user's first name. Leave blank for default.")
default_first_name = models.CharField(
max_length=255, blank=True, verbose_name=u"Default Value for First Name",
help_text=u"Default value for first name to be used if not present in SAML response.")
max_length=255, blank=True, verbose_name="Default Value for First Name",
help_text="Default value for first name to be used if not present in SAML response.")
attr_last_name = models.CharField(
max_length=128, blank=True, verbose_name=u"Last Name Attribute",
help_text=u"URN of SAML attribute containing the user's last name. Leave blank for default.")
max_length=128, blank=True, verbose_name="Last Name Attribute",
help_text="URN of SAML attribute containing the user's last name. Leave blank for default.")
default_last_name = models.CharField(
max_length=255, blank=True, verbose_name=u"Default Value for Last Name",
help_text=u"Default value for last name to be used if not present in SAML response.")
max_length=255, blank=True, verbose_name="Default Value for Last Name",
help_text="Default value for last name to be used if not present in SAML response.")
attr_username = models.CharField(
max_length=128, blank=True, verbose_name=u"Username Hint Attribute",
help_text=u"URN of SAML attribute to use as a suggested username for this user. Leave blank for default.")
max_length=128, blank=True, verbose_name="Username Hint Attribute",
help_text="URN of SAML attribute to use as a suggested username for this user. Leave blank for default.")
default_username = models.CharField(
max_length=255, blank=True, verbose_name=u"Default Value for Username",
help_text=u"Default value for username to be used if not present in SAML response.")
max_length=255, blank=True, verbose_name="Default Value for Username",
help_text="Default value for username to be used if not present in SAML response.")
attr_email = models.CharField(
max_length=128, blank=True, verbose_name=u"Email Attribute",
help_text=u"URN of SAML attribute containing the user's email address[es]. Leave blank for default.")
max_length=128, blank=True, verbose_name="Email Attribute",
help_text="URN of SAML attribute containing the user's email address[es]. Leave blank for default.")
default_email = models.CharField(
max_length=255, blank=True, verbose_name=u"Default Value for Email",
help_text=u"Default value for email to be used if not present in SAML response.")
max_length=255, blank=True, verbose_name="Default Value for Email",
help_text="Default value for email to be used if not present in SAML response.")
automatic_refresh_enabled = models.BooleanField(
default=True, verbose_name=u"Enable automatic metadata refresh",
help_text=u"When checked, the SAML provider's metadata will be included "
default=True, verbose_name="Enable automatic metadata refresh",
help_text="When checked, the SAML provider's metadata will be included "
"in the automatic refresh job, if configured."
)
identity_provider_type = models.CharField(
max_length=128, blank=False, verbose_name=u"Identity Provider Type", default=STANDARD_SAML_PROVIDER_KEY,
max_length=128, blank=False, verbose_name="Identity Provider Type", default=STANDARD_SAML_PROVIDER_KEY,
choices=get_saml_idp_choices(), help_text=(
u"Some SAML providers require special behavior. For example, SAP SuccessFactors SAML providers require an "
"Some SAML providers require special behavior. For example, SAP SuccessFactors SAML providers require an "
"additional API call to retrieve user metadata not provided in the SAML response. Select the provider type "
"which best matches your use case. If in doubt, choose the Standard SAML Provider type."
)
)
debug_mode = models.BooleanField(
default=False, verbose_name=u"Debug Mode",
default=False, verbose_name="Debug Mode",
help_text=(
u"In debug mode, all SAML XML requests and responses will be logged. "
"In debug mode, all SAML XML requests and responses will be logged. "
"This is helpful for testing/setup but should always be disabled before users start using this provider."
),
)
country = models.CharField(
max_length=128,
help_text=(
u'URN of SAML attribute containing the user`s country.',
'URN of SAML attribute containing the user`s country.',
),
blank=True,
)
@@ -665,9 +664,9 @@ class SAMLProviderConfig(ProviderConfig):
),
)
other_settings = models.TextField(
verbose_name=u"Advanced settings", blank=True,
verbose_name="Advanced settings", blank=True,
help_text=(
u'For advanced use cases, enter a JSON object with addtional configuration. '
'For advanced use cases, enter a JSON object with addtional configuration. '
'The tpa-saml backend supports {"requiredEntitlements": ["urn:..."]}, '
'which can be used to require the presence of a specific eduPersonEntitlement, '
'and {"extra_field_definitions": [{"name": "...", "urn": "..."},...]}, which can be '
@@ -687,12 +686,12 @@ class SAMLProviderConfig(ProviderConfig):
def clean(self):
""" Standardize and validate fields """
super(SAMLProviderConfig, self).clean() # lint-amnesty, pylint: disable=super-with-arguments
super().clean()
self.other_settings = clean_json(self.other_settings, dict)
class Meta(object):
class Meta:
app_label = "third_party_auth"
verbose_name = u"Provider Configuration (SAML IdP)"
verbose_name = "Provider Configuration (SAML IdP)"
verbose_name_plural = "Provider Configuration (SAML IdPs)"
def get_url_params(self):
@@ -716,7 +715,7 @@ class SAMLProviderConfig(ProviderConfig):
def get_social_auth_uid(self, remote_id):
""" Get social auth uid from remote id by prepending idp_slug to the remote id """
return '{}:{}'.format(self.slug, remote_id)
return f'{self.slug}:{remote_id}'
def get_setting(self, name):
""" Get the value of a setting, or raise KeyError """
@@ -794,12 +793,12 @@ class SAMLProviderData(models.Model):
expires_at = models.DateTimeField(db_index=True, null=True)
entity_id = models.CharField(max_length=255, db_index=True) # This is the key for lookups in this table
sso_url = models.URLField(verbose_name=u"SSO URL")
sso_url = models.URLField(verbose_name="SSO URL")
public_key = models.TextField()
class Meta(object):
class Meta:
app_label = "third_party_auth"
verbose_name = u"SAML Provider Data"
verbose_name = "SAML Provider Data"
verbose_name_plural = verbose_name
ordering = ('-fetched_at', )
@@ -813,7 +812,7 @@ class SAMLProviderData(models.Model):
@classmethod
def cache_key_name(cls, entity_id):
""" Return the name of the key to use to cache the current data """
return 'configuration/{}/current/{}'.format(cls.__name__, entity_id)
return f'configuration/{cls.__name__}/current/{entity_id}'
@classmethod
def current(cls, entity_id):
@@ -857,15 +856,15 @@ class LTIProviderConfig(ProviderConfig):
lti_consumer_key = models.CharField(
max_length=255,
help_text=(
u'The name that the LTI Tool Consumer will use to identify itself'
'The name that the LTI Tool Consumer will use to identify itself'
)
)
lti_hostname = models.CharField(
default=u'localhost',
default='localhost',
max_length=255,
help_text=(
u'The domain that will be acting as the LTI consumer.'
'The domain that will be acting as the LTI consumer.'
),
db_index=True
)
@@ -874,7 +873,7 @@ class LTIProviderConfig(ProviderConfig):
default=create_hash256,
max_length=255,
help_text=(
u'The shared secret that the LTI Tool Consumer will use to '
'The shared secret that the LTI Tool Consumer will use to '
'authenticate requests. Only this edX instance and this '
'tool consumer instance should know this value. '
'For increased security, you can avoid storing this in '
@@ -888,7 +887,7 @@ class LTIProviderConfig(ProviderConfig):
lti_max_timestamp_age = models.IntegerField(
default=10,
help_text=(
u'The maximum age of oauth_timestamp values, in seconds.'
'The maximum age of oauth_timestamp values, in seconds.'
)
)
@@ -919,7 +918,7 @@ class LTIProviderConfig(ProviderConfig):
return self.lti_consumer_secret
return getattr(settings, 'SOCIAL_AUTH_LTI_CONSUMER_SECRETS', {}).get(self.lti_consumer_key, '')
class Meta(object):
class Meta:
app_label = "third_party_auth"
verbose_name = u"Provider Configuration (LTI)"
verbose_name = "Provider Configuration (LTI)"
verbose_name_plural = verbose_name

View File

@@ -186,7 +186,7 @@ class AuthEntryError(AuthException):
"""
class ProviderUserState(object):
class ProviderUserState:
"""Object representing the provider state (attached or not) for a user.
This is intended only for use when rendering templates. See for example
@@ -243,7 +243,7 @@ def get_idp_logout_url_from_running_pipeline(request):
try:
return tpa_provider.get_setting('logout_url')
except KeyError:
logger.info(u'[THIRD_PARTY_AUTH] idP [%s] logout_url setting not defined', tpa_provider.name)
logger.info('[THIRD_PARTY_AUTH] idP [%s] logout_url setting not defined', tpa_provider.name)
def get_real_social_auth_object(request):
@@ -315,7 +315,7 @@ def _get_enabled_provider(provider_id):
enabled_provider = provider.Registry.get(provider_id)
if not enabled_provider:
raise ValueError(u'Provider %s not enabled' % provider_id)
raise ValueError('Provider %s not enabled' % provider_id)
return enabled_provider
@@ -337,7 +337,7 @@ def _get_url(view_name, backend_name, auth_entry=None, redirect_url=None,
if extra_params:
query_params.update(extra_params)
return u"{url}?{params}".format(
return "{url}?{params}".format(
url=url,
params=six.moves.urllib.parse.urlencode(query_params)
)
@@ -357,7 +357,7 @@ def get_complete_url(backend_name):
ValueError: if no provider is enabled with the given backend_name.
"""
if not any(provider.Registry.get_enabled_by_backend_name(backend_name)):
raise ValueError(u'Provider with backend %s not enabled' % backend_name)
raise ValueError('Provider with backend %s not enabled' % backend_name)
return _get_url('social:complete', backend_name)
@@ -533,7 +533,7 @@ def redirect_to_custom_form(request, auth_entry, details, kwargs):
provider_id = provider.Registry.get_from_pipeline({'backend': backend_name, 'kwargs': kwargs}).provider_id
form_info = AUTH_ENTRY_CUSTOM[auth_entry]
secret_key = form_info['secret_key']
if isinstance(secret_key, six.text_type):
if isinstance(secret_key, str):
secret_key = secret_key.encode('utf-8')
custom_form_url = form_info['url']
data_bytes = json.dumps({
@@ -652,8 +652,8 @@ def ensure_user_information(strategy, auth_entry, backend=None, user=None, socia
# register anew via SSO. See SOL-1324 in JIRA.
# However, we will log a warning for this case:
logger.warning(
u'[THIRD_PARTY_AUTH] User is using third_party_auth to login but has not yet activated their account. '
u'Username: {username}'.format(username=user.username)
'[THIRD_PARTY_AUTH] User is using third_party_auth to login but has not yet activated their account. '
'Username: {username}'.format(username=user.username)
)
@@ -777,9 +777,9 @@ def associate_by_email_if_saml(auth_entry, backend, details, user, strategy, *ar
try:
enterprise_customer_user = is_enterprise_customer_user(current_provider.provider_id, current_user)
logger.info(
u'[Multiple_SSO_SAML_Accounts_Association_to_User] Enterprise user verification:'
u'User Email: {email}, User ID: {user_id}, Provider ID: {provider_id},'
u' is_enterprise_customer_user: {enterprise_customer_user}'.format(
'[Multiple_SSO_SAML_Accounts_Association_to_User] Enterprise user verification:'
'User Email: {email}, User ID: {user_id}, Provider ID: {provider_id},'
' is_enterprise_customer_user: {enterprise_customer_user}'.format(
email=current_user.email,
user_id=current_user.id,
provider_id=current_provider.provider_id,
@@ -808,9 +808,9 @@ def associate_by_email_if_saml(auth_entry, backend, details, user, strategy, *ar
not association_response['user'].is_active
):
logger.info(
u'[Multiple_SSO_SAML_Accounts_Association_to_User] User association account is not'
u' active: User Email: {email}, User ID: {user_id}, Provider ID: {provider_id},'
u' is_enterprise_customer_user: {enterprise_customer_user}'.format(
'[Multiple_SSO_SAML_Accounts_Association_to_User] User association account is not'
' active: User Email: {email}, User ID: {user_id}, Provider ID: {provider_id},'
' is_enterprise_customer_user: {enterprise_customer_user}'.format(
email=current_user.email,
user_id=current_user.id,
provider_id=current_provider.provider_id,
@@ -880,9 +880,9 @@ def user_details_force_sync(auth_entry, strategy, details, user=None, *args, **k
current_value = getattr(model, field)
if provider_value is not None and current_value != provider_value:
if field in integrity_conflict_fields and User.objects.filter(**{field: provider_value}).exists():
logger.warning(u'[THIRD_PARTY_AUTH] Profile data synchronization conflict. '
u'UserId: {user_id}, Provider: {provider}, ConflictField: {conflict_field}, '
u'ConflictValue: {conflict_value}'.format(
logger.warning('[THIRD_PARTY_AUTH] Profile data synchronization conflict. '
'UserId: {user_id}, Provider: {provider}, ConflictField: {conflict_field}, '
'ConflictValue: {conflict_value}'.format(
user_id=user.id,
provider=current_provider.name,
conflict_field=field,
@@ -893,8 +893,8 @@ def user_details_force_sync(auth_entry, strategy, details, user=None, *args, **k
if changed:
logger.info(
u'[THIRD_PARTY_AUTH] User performed SSO and data was synchronized. '
u'Username: {username}, Provider: {provider}, UpdatedKeys: {updated_keys}'.format(
'[THIRD_PARTY_AUTH] User performed SSO and data was synchronized. '
'Username: {username}, Provider: {provider}, UpdatedKeys: {updated_keys}'.format(
username=user.username,
provider=current_provider.name,
updated_keys=list(changed.keys())
@@ -924,7 +924,7 @@ def user_details_force_sync(auth_entry, strategy, details, user=None, *args, **k
email.send()
except SMTPException:
logger.exception('[THIRD_PARTY_AUTH] Error sending IdP learner data sync-initiated email change '
u'notification email. Username: {username}'.format(username=user.username))
'notification email. Username: {username}'.format(username=user.username))
def set_id_verification_status(auth_entry, strategy, details, user=None, *args, **kwargs): # lint-amnesty, pylint: disable=keyword-arg-before-vararg
@@ -1012,7 +1012,7 @@ def get_username(strategy, details, backend, user=None, *args, **kwargs): # lin
while not final_username or len(final_username) < min_length or user_exists({'username': final_username}):
username = short_username + uuid4().hex[:uuid_length]
final_username = slug_func(clean_func(username[:max_length]))
logger.info(u'[THIRD_PARTY_AUTH] New username generated. Username: {username}'.format(
logger.info('[THIRD_PARTY_AUTH] New username generated. Username: {username}'.format(
username=final_username))
else:
final_username = storage.user.get_username(user)

View File

@@ -18,7 +18,7 @@ from common.djangoapps.third_party_auth.models import (
)
class Registry(object):
class Registry:
"""
API for querying third-party auth ProviderConfig objects.

View File

@@ -19,8 +19,8 @@ from social_core.exceptions import AuthForbidden
from openedx.core.djangoapps.theming.helpers import get_current_request
from common.djangoapps.third_party_auth.exceptions import IncorrectConfigurationException
STANDARD_SAML_PROVIDER_KEY = u'standard_saml_provider'
SAP_SUCCESSFACTORS_SAML_KEY = u'sap_success_factors'
STANDARD_SAML_PROVIDER_KEY = 'standard_saml_provider'
SAP_SUCCESSFACTORS_SAML_KEY = 'sap_success_factors'
log = logging.getLogger(__name__)
@@ -83,18 +83,18 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
config["sp"].update(self.get_idp_setting(idp, "SP_EXTRA", {}))
return config
else:
return super(SAMLAuthBackend, self).generate_saml_config() # lint-amnesty, pylint: disable=super-with-arguments
return super().generate_saml_config()
def get_user_id(self, details, response):
"""
Calling the parent function and handling the exception properly.
"""
try:
return super(SAMLAuthBackend, self).get_user_id(details, response) # lint-amnesty, pylint: disable=super-with-arguments
return super().get_user_id(details, response)
except KeyError as ex:
log.warning(
u'[THIRD_PARTY_AUTH] Error in SAML authentication flow. '
u'Provider: {idp_name}, Message: {message}'.format(
'[THIRD_PARTY_AUTH] Error in SAML authentication flow. '
'Provider: {idp_name}, Message: {message}'.format(
message=ex.message, # lint-amnesty, pylint: disable=no-member
idp_name=response.get('idp_name')
)
@@ -127,7 +127,7 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
log.error('[THIRD_PARTY_AUTH] SAML authentication is not enabled')
raise Http404
return super(SAMLAuthBackend, self).auth_url() # lint-amnesty, pylint: disable=super-with-arguments
return super().auth_url()
def disconnect(self, *args, **kwargs):
"""
@@ -136,7 +136,7 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
from openedx.features.enterprise_support.api import unlink_enterprise_user_from_idp
user = kwargs.get('user', None)
unlink_enterprise_user_from_idp(self.strategy.request, user, self.name)
return super(SAMLAuthBackend, self).disconnect(*args, **kwargs) # lint-amnesty, pylint: disable=super-with-arguments
return super().disconnect(*args, **kwargs)
def _check_entitlements(self, idp, attributes):
"""
@@ -150,8 +150,8 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
for expected in idp.conf['requiredEntitlements']:
if expected not in entitlements:
log.warning(
u'[THIRD_PARTY_AUTH] SAML user rejected due to missing eduPersonEntitlement. '
u'Provider: {provider}, Entitlement: {entitlement}'.format(
'[THIRD_PARTY_AUTH] SAML user rejected due to missing eduPersonEntitlement. '
'Provider: {provider}, Entitlement: {entitlement}'.format(
provider=idp.name,
entitlement=expected)
)
@@ -165,7 +165,7 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
"""
# We only override this method so that we can add extra debugging when debug_mode is True
# Note that auth_inst is instantiated just for the current HTTP request, then is destroyed
auth_inst = super(SAMLAuthBackend, self)._create_saml_auth(idp) # lint-amnesty, pylint: disable=super-with-arguments
auth_inst = super()._create_saml_auth(idp)
from .models import SAMLProviderConfig
if SAMLProviderConfig.current(idp.name).debug_mode:
@@ -177,7 +177,7 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
""" Wrapped login or process_response method """
result = method(*args, **kwargs)
log.info(
u"SAML login %s for IdP %s. Data: %s. Next url %s. XML is:\n%s",
"SAML login %s for IdP %s. Data: %s. Next url %s. XML is:\n%s",
action_description, idp.name, request_data, next_url, xml_getter()
)
return result
@@ -207,7 +207,7 @@ class EdXSAMLIdentityProvider(SAMLIdentityProvider):
Overrides `get_user_details` from the base class; retrieves those details,
then updates the dict with values from whatever additional fields are desired.
"""
details = super(EdXSAMLIdentityProvider, self).get_user_details(attributes) # lint-amnesty, pylint: disable=super-with-arguments
details = super().get_user_details(attributes)
extra_field_definitions = self.conf.get('extra_field_definitions', [])
details.update({
field['name']: attributes[field['urn']][0] if field['urn'] in attributes else None
@@ -227,8 +227,8 @@ class EdXSAMLIdentityProvider(SAMLIdentityProvider):
try:
return attributes[key][0]
except IndexError:
log.warning(u'[THIRD_PARTY_AUTH] SAML attribute value not found. '
u'SamlAttribute: {attribute}'.format(attribute=key))
log.warning('[THIRD_PARTY_AUTH] SAML attribute value not found. '
'SamlAttribute: {attribute}'.format(attribute=key))
return self.conf['attr_defaults'].get(conf_key) or None
@property
@@ -375,8 +375,8 @@ class SapSuccessFactorsIdentityProvider(EdXSAMLIdentityProvider):
if not all(var in self.conf for var in self.required_variables):
missing = [var for var in self.required_variables if var not in self.conf]
log.warning(
u'[THIRD_PARTY_AUTH] To retrieve rich user data for a SAP SuccessFactors identity provider, '
u'the following keys in other_settings are required, but were missing. MissingKeys: {keys}'.format(
'[THIRD_PARTY_AUTH] To retrieve rich user data for a SAP SuccessFactors identity provider, '
'the following keys in other_settings are required, but were missing. MissingKeys: {keys}'.format(
keys=missing
)
)
@@ -394,21 +394,21 @@ class SapSuccessFactorsIdentityProvider(EdXSAMLIdentityProvider):
token_data = transaction_data.get('token_data')
token_data = token_data if token_data else 'Not available'
log_msg_template = (
u'SAPSuccessFactors exception received for {operation_name} request. ' +
u'URL: {url} ' +
u'Company ID: {company_id}. ' +
u'User ID: {user_id}. ' +
u'Error message: {err_msg}. ' +
u'System message: {sys_msg}. ' +
u'Headers: {headers}. ' +
u'Token Data: {token_data}.'
'SAPSuccessFactors exception received for {operation_name} request. ' +
'URL: {url} ' +
'Company ID: {company_id}. ' +
'User ID: {user_id}. ' +
'Error message: {err_msg}. ' +
'System message: {sys_msg}. ' +
'Headers: {headers}. ' +
'Token Data: {token_data}.'
)
log_msg = log_msg_template.format(
operation_name=transaction_data['operation_name'],
url=transaction_data['endpoint_url'],
company_id=transaction_data['company_id'],
user_id=transaction_data['user_id'],
err_msg=text_type(err),
err_msg=str(err),
sys_msg=sys_msg,
headers=headers,
token_data=token_data,
@@ -482,7 +482,7 @@ class SapSuccessFactorsIdentityProvider(EdXSAMLIdentityProvider):
if not access_token_data:
return None
token_string = access_token_data['access_token']
session.headers.update({'Authorization': u'Bearer {}'.format(token_string), 'Accept': 'application/json'})
session.headers.update({'Authorization': f'Bearer {token_string}', 'Accept': 'application/json'})
session.token_data = access_token_data
return session
@@ -492,7 +492,7 @@ class SapSuccessFactorsIdentityProvider(EdXSAMLIdentityProvider):
of the info we need to do that, or if the request triggers an exception, then fail nicely by
returning the basic user details we're able to extract from just the SAML response.
"""
basic_details = super(SapSuccessFactorsIdentityProvider, self).get_user_details(attributes) # lint-amnesty, pylint: disable=super-with-arguments
basic_details = super().get_user_details(attributes)
if self.invalid_configuration():
return basic_details
user_id = basic_details['username']
@@ -526,7 +526,7 @@ class SapSuccessFactorsIdentityProvider(EdXSAMLIdentityProvider):
self.log_bizx_api_exception(transaction_data, err)
return basic_details
log.info(u'[THIRD_PARTY_AUTH] BizX Odata response for user [%s] %s', user_id, response)
log.info('[THIRD_PARTY_AUTH] BizX Odata response for user [%s] %s', user_id, response)
return self.get_registration_fields(response)
@@ -552,7 +552,7 @@ def get_saml_idp_class(idp_identifier_string):
}
if idp_identifier_string not in choices:
log.error(
u'[THIRD_PARTY_AUTH] Invalid EdXSAMLIdentityProvider subclass--'
u'using EdXSAMLIdentityProvider base class. Provider: {provider}'.format(provider=idp_identifier_string)
'[THIRD_PARTY_AUTH] Invalid EdXSAMLIdentityProvider subclass--'
'using EdXSAMLIdentityProvider base class. Provider: {provider}'.format(provider=idp_identifier_string)
)
return choices.get(idp_identifier_string, EdXSAMLIdentityProvider)

View File

@@ -57,4 +57,4 @@ class ConfigurationModelStrategy(DjangoStrategy):
# At this point, we know 'name' is not set in a [OAuth2|LTI|SAML]ProviderConfig row.
# It's probably a global Django setting like 'FIELDS_STORED_IN_SESSION':
return super(ConfigurationModelStrategy, self).setting(name, default, backend) # lint-amnesty, pylint: disable=super-with-arguments
return super().setting(name, default, backend)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Code to manage fetching and storing the metadata of IdPs.
"""
@@ -80,9 +79,9 @@ def fetch_saml_metadata():
failure_messages = [] # We return the length of this array for num_failed
for url, entity_ids in url_map.items():
try:
log.info(u"Fetching %s", url)
log.info("Fetching %s", url)
if not url.lower().startswith('https'):
log.warning(u"This SAML metadata URL is not secure! It should use HTTPS. (%s)", url)
log.warning("This SAML metadata URL is not secure! It should use HTTPS. (%s)", url)
response = requests.get(url, verify=True) # May raise HTTPError or SSLError or ConnectionError
response.raise_for_status() # May raise an HTTPError
@@ -94,14 +93,14 @@ def fetch_saml_metadata():
# TODO: Can use OneLogin_Saml2_Utils to validate signed XML if anyone is using that
for entity_id in entity_ids:
log.info(u"Processing IdP with entityID %s", entity_id)
log.info("Processing IdP with entityID %s", entity_id)
public_key, sso_url, expires_at = _parse_metadata_xml(xml, entity_id)
changed = _update_data(entity_id, public_key, sso_url, expires_at)
if changed:
log.info(u"→ Created new record for SAMLProviderData")
log.info("→ Created new record for SAMLProviderData")
num_updated += 1
else:
log.info(u"→ Updated existing SAMLProviderData. Nothing has changed.")
log.info("→ Updated existing SAMLProviderData. Nothing has changed.")
except (exceptions.SSLError, exceptions.HTTPError, exceptions.RequestException, MetadataParseError) as error:
# Catch and process exception in case of errors during fetching and processing saml metadata.
# Here is a description of each exception.
@@ -110,25 +109,25 @@ def fetch_saml_metadata():
# RequestException is the base exception for any request related error that "requests" lib raises.
# MetadataParseError is raised if there is error in the fetched meta data (e.g. missing @entityID etc.)
log.exception(text_type(error))
log.exception(str(error))
failure_messages.append(
u"{error_type}: {error_message}\nMetadata Source: {url}\nEntity IDs: \n{entity_ids}.".format(
"{error_type}: {error_message}\nMetadata Source: {url}\nEntity IDs: \n{entity_ids}.".format(
error_type=type(error).__name__,
error_message=text_type(error),
error_message=str(error),
url=url,
entity_ids="\n".join(
[u"\t{}: {}".format(count, item) for count, item in enumerate(entity_ids, start=1)],
[f"\t{count}: {item}" for count, item in enumerate(entity_ids, start=1)],
)
)
)
except etree.XMLSyntaxError as error:
log.exception(text_type(error))
log.exception(str(error))
failure_messages.append(
u"XMLSyntaxError: {error_message}\nMetadata Source: {url}\nEntity IDs: \n{entity_ids}.".format(
"XMLSyntaxError: {error_message}\nMetadata Source: {url}\nEntity IDs: \n{entity_ids}.".format(
error_message=str(error.error_log), # lint-amnesty, pylint: disable=no-member
url=url,
entity_ids="\n".join(
[u"\t{}: {}".format(count, item) for count, item in enumerate(entity_ids, start=1)],
[f"\t{count}: {item}" for count, item in enumerate(entity_ids, start=1)],
)
)
)
@@ -148,12 +147,12 @@ def _parse_metadata_xml(xml, entity_id):
entity_desc = xml
else:
if xml.tag != etree.QName(SAML_XML_NS, 'EntitiesDescriptor'):
raise MetadataParseError(Text(u"Expected root element to be <EntitiesDescriptor>, not {}").format(xml.tag))
raise MetadataParseError(Text("Expected root element to be <EntitiesDescriptor>, not {}").format(xml.tag))
entity_desc = xml.find(
".//{}[@entityID='{}']".format(etree.QName(SAML_XML_NS, 'EntityDescriptor'), entity_id)
)
if entity_desc is None:
raise MetadataParseError(u"Can't find EntityDescriptor for entityID {}".format(entity_id))
raise MetadataParseError(f"Can't find EntityDescriptor for entityID {entity_id}")
expires_at = None
if "validUntil" in xml.attrib:

View File

@@ -82,7 +82,7 @@ def saml_metadata_view(request):
@csrf_exempt
@psa('{0}:complete'.format(URL_NAMESPACE))
@psa(f'{URL_NAMESPACE}:complete')
def lti_login_and_complete_view(request, backend, *args, **kwargs):
"""This is a combination login/complete due to LTI being a one step login"""