Merge pull request #9842 from ubc/tpa-mapping-api
Implement third party auth ID mapping API
This commit is contained in:
@@ -2,12 +2,21 @@
|
||||
"""
|
||||
Admin site configuration for third party authentication
|
||||
"""
|
||||
from django import forms
|
||||
|
||||
from django.contrib import admin
|
||||
|
||||
from config_models.admin import ConfigurationModelAdmin, KeyedConfigurationModelAdmin
|
||||
from .models import OAuth2ProviderConfig, SAMLProviderConfig, SAMLConfiguration, SAMLProviderData, LTIProviderConfig
|
||||
from .models import (
|
||||
OAuth2ProviderConfig,
|
||||
SAMLProviderConfig,
|
||||
SAMLConfiguration,
|
||||
SAMLProviderData,
|
||||
LTIProviderConfig,
|
||||
ProviderApiPermissions
|
||||
)
|
||||
from .tasks import fetch_saml_metadata
|
||||
from third_party_auth.provider import Registry
|
||||
|
||||
|
||||
class OAuth2ProviderConfigAdmin(KeyedConfigurationModelAdmin):
|
||||
@@ -111,3 +120,26 @@ class LTIProviderConfigAdmin(KeyedConfigurationModelAdmin):
|
||||
)
|
||||
|
||||
admin.site.register(LTIProviderConfig, LTIProviderConfigAdmin)
|
||||
|
||||
|
||||
class ApiPermissionsAdminForm(forms.ModelForm):
|
||||
""" Django admin form for ApiPermissions model """
|
||||
class Meta(object): # pylint: disable=missing-docstring
|
||||
model = ProviderApiPermissions
|
||||
|
||||
provider_id = forms.ChoiceField(choices=[], required=True)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ApiPermissionsAdminForm, self).__init__(*args, **kwargs)
|
||||
self.fields['provider_id'].choices = (
|
||||
(provider.provider_id, "{} ({})".format(provider.name, provider.provider_id))
|
||||
for provider in Registry.enabled()
|
||||
)
|
||||
|
||||
|
||||
class ApiPermissionsAdmin(admin.ModelAdmin):
|
||||
""" Django Admin class for ApiPermissions """
|
||||
list_display = ('client', 'provider_id')
|
||||
form = ApiPermissionsAdminForm
|
||||
|
||||
admin.site.register(ProviderApiPermissions, ApiPermissionsAdmin)
|
||||
|
||||
31
common/djangoapps/third_party_auth/api/permissions.py
Normal file
31
common/djangoapps/third_party_auth/api/permissions.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
Third party auth API related permissions
|
||||
"""
|
||||
from rest_framework import permissions
|
||||
|
||||
from third_party_auth.models import ProviderApiPermissions
|
||||
|
||||
|
||||
class ThirdPartyAuthProviderApiPermission(permissions.BasePermission):
|
||||
"""
|
||||
Allow someone to access the view if they have valid OAuth client credential.
|
||||
"""
|
||||
def __init__(self, provider_id):
|
||||
""" Initialize the class with a provider_id """
|
||||
self.provider_id = provider_id
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""
|
||||
Check if the OAuth client associated with auth token in current request has permission to access
|
||||
the information for provider
|
||||
"""
|
||||
if not request.auth or not self.provider_id:
|
||||
# doesn't have access token or no provider_id specified
|
||||
return False
|
||||
|
||||
try:
|
||||
ProviderApiPermissions.objects.get(client__pk=request.auth.client_id, provider_id=self.provider_id)
|
||||
except ProviderApiPermissions.DoesNotExist:
|
||||
return False
|
||||
|
||||
return True
|
||||
22
common/djangoapps/third_party_auth/api/serializers.py
Normal file
22
common/djangoapps/third_party_auth/api/serializers.py
Normal file
@@ -0,0 +1,22 @@
|
||||
""" Django REST Framework Serializers """
|
||||
|
||||
from rest_framework import serializers
|
||||
|
||||
|
||||
class UserMappingSerializer(serializers.Serializer): # pylint: disable=abstract-method
|
||||
""" Serializer for User Mapping"""
|
||||
provider = None
|
||||
username = serializers.SerializerMethodField()
|
||||
remote_id = serializers.SerializerMethodField()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.provider = kwargs['context'].get('provider', None)
|
||||
super(UserMappingSerializer, self).__init__(*args, **kwargs)
|
||||
|
||||
def get_username(self, social_user):
|
||||
""" Gets the edx username from a social user """
|
||||
return social_user.user.username
|
||||
|
||||
def get_remote_id(self, social_user):
|
||||
""" Gets remote id from social user based on provider """
|
||||
return self.provider.get_remote_id_from_social_auth(social_user)
|
||||
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Tests for the Third Party Auth permissions
|
||||
"""
|
||||
import unittest
|
||||
import ddt
|
||||
from mock import Mock
|
||||
|
||||
from rest_framework.test import APITestCase
|
||||
from django.conf import settings
|
||||
from third_party_auth.api.permissions import ThirdPartyAuthProviderApiPermission
|
||||
|
||||
from third_party_auth.tests.testutil import ThirdPartyAuthTestMixin
|
||||
|
||||
IDP_SLUG_TESTSHIB = 'testshib'
|
||||
PROVIDER_ID_TESTSHIB = 'saml-' + IDP_SLUG_TESTSHIB
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class ThirdPartyAuthApiPermissionTest(ThirdPartyAuthTestMixin, APITestCase):
|
||||
""" Tests for third party auth API permission """
|
||||
def setUp(self):
|
||||
""" Create users and oauth client for use in the tests """
|
||||
super(ThirdPartyAuthApiPermissionTest, self).setUp()
|
||||
|
||||
client = self.configure_oauth_client()
|
||||
self.configure_api_permission(client, PROVIDER_ID_TESTSHIB)
|
||||
|
||||
@ddt.data(
|
||||
(1, PROVIDER_ID_TESTSHIB, True),
|
||||
(1, 'invalid-provider-id', False),
|
||||
(999, PROVIDER_ID_TESTSHIB, False),
|
||||
(999, 'invalid-provider-id', False),
|
||||
(1, None, False),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_api_permission(self, client_pk, provider_id, expect):
|
||||
request = Mock()
|
||||
request.auth = Mock()
|
||||
request.auth.client_id = client_pk
|
||||
|
||||
result = ThirdPartyAuthProviderApiPermission(provider_id).has_permission(request, None)
|
||||
self.assertEqual(result, expect)
|
||||
|
||||
def test_api_permission_unauthorized_client(self):
|
||||
client = self.configure_oauth_client()
|
||||
self.configure_api_permission(client, 'saml-anotherprovider')
|
||||
|
||||
request = Mock()
|
||||
request.auth = Mock()
|
||||
request.auth.client_id = client.pk
|
||||
|
||||
result = ThirdPartyAuthProviderApiPermission(PROVIDER_ID_TESTSHIB).has_permission(request, None)
|
||||
self.assertEqual(result, False)
|
||||
@@ -1,60 +1,65 @@
|
||||
# pylint: disable=no-member
|
||||
"""
|
||||
Tests for the Third Party Auth REST API
|
||||
"""
|
||||
import json
|
||||
import unittest
|
||||
|
||||
import ddt
|
||||
from mock import patch
|
||||
from django.test import Client
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import QueryDict
|
||||
from mock import patch
|
||||
from provider.constants import CONFIDENTIAL
|
||||
from provider.oauth2.models import Client, AccessToken
|
||||
from openedx.core.lib.api.permissions import ApiKeyHeaderPermission
|
||||
from rest_framework.test import APITestCase
|
||||
from rest_framework import status
|
||||
from django.conf import settings
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from util.testing import UrlResetMixin
|
||||
from openedx.core.lib.django_test_client_utils import get_absolute_url
|
||||
from social.apps.django_app.default.models import UserSocialAuth
|
||||
|
||||
from student.tests.factories import UserFactory
|
||||
from third_party_auth.api.permissions import ThirdPartyAuthProviderApiPermission
|
||||
from third_party_auth.models import ProviderApiPermissions
|
||||
from third_party_auth.tests.testutil import ThirdPartyAuthTestMixin
|
||||
|
||||
|
||||
VALID_API_KEY = "i am a key"
|
||||
IDP_SLUG_TESTSHIB = 'testshib'
|
||||
PROVIDER_ID_TESTSHIB = 'saml-' + IDP_SLUG_TESTSHIB
|
||||
|
||||
ALICE_USERNAME = "alice"
|
||||
CARL_USERNAME = "carl"
|
||||
STAFF_USERNAME = "staff"
|
||||
ADMIN_USERNAME = "admin"
|
||||
# These users will be created and linked to third party accounts:
|
||||
LINKED_USERS = (ALICE_USERNAME, STAFF_USERNAME, ADMIN_USERNAME)
|
||||
PASSWORD = "edx"
|
||||
|
||||
|
||||
@override_settings(EDX_API_KEY=VALID_API_KEY)
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class ThirdPartyAuthAPITests(ThirdPartyAuthTestMixin, APITestCase):
|
||||
"""
|
||||
Test the Third Party Auth REST API
|
||||
"""
|
||||
ALICE_USERNAME = "alice"
|
||||
CARL_USERNAME = "carl"
|
||||
STAFF_USERNAME = "staff"
|
||||
ADMIN_USERNAME = "admin"
|
||||
# These users will be created and linked to third party accounts:
|
||||
LINKED_USERS = (ALICE_USERNAME, STAFF_USERNAME, ADMIN_USERNAME)
|
||||
PASSWORD = "edx"
|
||||
def get_mapping_data_by_usernames(usernames):
|
||||
""" Generate mapping data used in response """
|
||||
return [{'username': username, 'remote_id': 'remote_' + username} for username in usernames]
|
||||
|
||||
|
||||
class TpaAPITestCase(ThirdPartyAuthTestMixin, APITestCase):
|
||||
""" Base test class """
|
||||
|
||||
def setUp(self):
|
||||
""" Create users for use in the tests """
|
||||
super(ThirdPartyAuthAPITests, self).setUp()
|
||||
super(TpaAPITestCase, self).setUp()
|
||||
|
||||
google = self.configure_google_provider(enabled=True)
|
||||
self.configure_facebook_provider(enabled=True)
|
||||
self.configure_linkedin_provider(enabled=False)
|
||||
self.enable_saml()
|
||||
testshib = self.configure_saml_provider(name='TestShib', enabled=True, idp_slug='testshib')
|
||||
testshib = self.configure_saml_provider(name='TestShib', enabled=True, idp_slug=IDP_SLUG_TESTSHIB)
|
||||
|
||||
# Create several users and link each user to Google and TestShib
|
||||
for username in self.LINKED_USERS:
|
||||
make_superuser = (username == self.ADMIN_USERNAME)
|
||||
make_staff = (username == self.STAFF_USERNAME) or make_superuser
|
||||
for username in LINKED_USERS:
|
||||
make_superuser = (username == ADMIN_USERNAME)
|
||||
make_staff = (username == STAFF_USERNAME) or make_superuser
|
||||
user = UserFactory.create(
|
||||
username=username,
|
||||
password=self.PASSWORD,
|
||||
password=PASSWORD,
|
||||
is_staff=make_staff,
|
||||
is_superuser=make_superuser
|
||||
)
|
||||
@@ -66,14 +71,23 @@ class ThirdPartyAuthAPITests(ThirdPartyAuthTestMixin, APITestCase):
|
||||
UserSocialAuth.objects.create(
|
||||
user=user,
|
||||
provider=testshib.backend_name,
|
||||
uid='{}:{}'.format(testshib.idp_slug, username),
|
||||
uid='{}:remote_{}'.format(testshib.idp_slug, username),
|
||||
)
|
||||
# Create another user not linked to any providers:
|
||||
UserFactory.create(username=self.CARL_USERNAME, password=self.PASSWORD)
|
||||
UserFactory.create(username=CARL_USERNAME, password=PASSWORD)
|
||||
|
||||
|
||||
@override_settings(EDX_API_KEY=VALID_API_KEY)
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class UserViewAPITests(TpaAPITestCase):
|
||||
"""
|
||||
Test the Third Party Auth User REST API
|
||||
"""
|
||||
|
||||
def expected_active(self, username):
|
||||
""" The JSON active providers list response expected for the given user """
|
||||
if username not in self.LINKED_USERS:
|
||||
if username not in LINKED_USERS:
|
||||
return []
|
||||
return [
|
||||
{
|
||||
@@ -82,11 +96,11 @@ class ThirdPartyAuthAPITests(ThirdPartyAuthTestMixin, APITestCase):
|
||||
"remote_id": "{}@gmail.com".format(username),
|
||||
},
|
||||
{
|
||||
"provider_id": "saml-testshib",
|
||||
"provider_id": PROVIDER_ID_TESTSHIB,
|
||||
"name": "TestShib",
|
||||
# The "testshib:" prefix is stored in the UserSocialAuth.uid field but should
|
||||
# not be present in the 'remote_id', since that's an implementation detail:
|
||||
"remote_id": username,
|
||||
"remote_id": 'remote_' + username,
|
||||
},
|
||||
]
|
||||
|
||||
@@ -106,7 +120,7 @@ class ThirdPartyAuthAPITests(ThirdPartyAuthTestMixin, APITestCase):
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_list_connected_providers(self, request_user, target_user, expect_result):
|
||||
self.client.login(username=request_user, password=self.PASSWORD)
|
||||
self.client.login(username=request_user, password=PASSWORD)
|
||||
url = reverse('third_party_auth_users_api', kwargs={'username': target_user})
|
||||
|
||||
response = self.client.get(url)
|
||||
@@ -130,3 +144,133 @@ class ThirdPartyAuthAPITests(ThirdPartyAuthTestMixin, APITestCase):
|
||||
if expect_result == 200:
|
||||
self.assertIn("active", response.data)
|
||||
self.assertItemsEqual(response.data["active"], self.expected_active(target_user))
|
||||
|
||||
|
||||
@override_settings(EDX_API_KEY=VALID_API_KEY)
|
||||
@ddt.ddt
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class UserMappingViewAPITests(TpaAPITestCase):
|
||||
"""
|
||||
Test the Third Party Auth User Mapping REST API
|
||||
"""
|
||||
@ddt.data(
|
||||
(VALID_API_KEY, PROVIDER_ID_TESTSHIB, 200, get_mapping_data_by_usernames(LINKED_USERS)),
|
||||
("i am an invalid key", PROVIDER_ID_TESTSHIB, 403, None),
|
||||
(None, PROVIDER_ID_TESTSHIB, 403, None),
|
||||
(VALID_API_KEY, 'non-existing-id', 404, []),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_list_all_user_mappings_withapi_key(self, api_key, provider_id, expect_code, expect_data):
|
||||
url = reverse('third_party_auth_user_mapping_api', kwargs={'provider_id': provider_id})
|
||||
response = self.client.get(url, HTTP_X_EDX_API_KEY=api_key)
|
||||
self._verify_response(response, expect_code, expect_data)
|
||||
|
||||
@ddt.data(
|
||||
(PROVIDER_ID_TESTSHIB, 'valid-token', 200, get_mapping_data_by_usernames(LINKED_USERS)),
|
||||
('non-existing-id', 'valid-token', 404, []),
|
||||
(PROVIDER_ID_TESTSHIB, 'invalid-token', 401, []),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_list_all_user_mappings_oauth2(self, provider_id, access_token, expect_code, expect_data):
|
||||
url = reverse('third_party_auth_user_mapping_api', kwargs={'provider_id': provider_id})
|
||||
# create oauth2 auth data
|
||||
user = UserFactory.create(username='api_user')
|
||||
client = Client.objects.create(name='oauth2_client', client_type=CONFIDENTIAL)
|
||||
token = AccessToken.objects.create(user=user, client=client)
|
||||
ProviderApiPermissions.objects.create(client=client, provider_id=provider_id)
|
||||
|
||||
if access_token == 'valid-token':
|
||||
access_token = token.token
|
||||
|
||||
response = self.client.get(url, HTTP_AUTHORIZATION='Bearer {}'.format(access_token))
|
||||
self._verify_response(response, expect_code, expect_data)
|
||||
|
||||
@ddt.data(
|
||||
({'username': [ALICE_USERNAME, STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'remote_id': ['remote_' + ALICE_USERNAME, 'remote_' + STAFF_USERNAME, 'remote_' + CARL_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'username': [ALICE_USERNAME, CARL_USERNAME, STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'username': [ALICE_USERNAME], 'remote_id': ['remote_' + STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_user_mappings_with_query_params_comma_separated(self, query_params, expect_code, expect_data):
|
||||
""" test queries like username=user1,user2,... """
|
||||
base_url = reverse(
|
||||
'third_party_auth_user_mapping_api', kwargs={'provider_id': PROVIDER_ID_TESTSHIB}
|
||||
)
|
||||
params = []
|
||||
for attr in ['username', 'remote_id']:
|
||||
if attr in query_params:
|
||||
params.append('{}={}'.format(attr, ','.join(query_params[attr])))
|
||||
url = "{}?{}".format(base_url, '&'.join(params))
|
||||
response = self.client.get(url, HTTP_X_EDX_API_KEY=VALID_API_KEY)
|
||||
self._verify_response(response, expect_code, expect_data)
|
||||
|
||||
@ddt.data(
|
||||
({'username': [ALICE_USERNAME, STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'remote_id': ['remote_' + ALICE_USERNAME, 'remote_' + STAFF_USERNAME, 'remote_' + CARL_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'username': [ALICE_USERNAME, CARL_USERNAME, STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
({'username': [ALICE_USERNAME], 'remote_id': ['remote_' + STAFF_USERNAME]}, 200,
|
||||
get_mapping_data_by_usernames([ALICE_USERNAME, STAFF_USERNAME])),
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_user_mappings_with_query_params_multi_value_key(self, query_params, expect_code, expect_data):
|
||||
""" test queries like username=user1&username=user2&... """
|
||||
base_url = reverse(
|
||||
'third_party_auth_user_mapping_api', kwargs={'provider_id': PROVIDER_ID_TESTSHIB}
|
||||
)
|
||||
params = QueryDict('', mutable=True)
|
||||
for attr in ['username', 'remote_id']:
|
||||
if attr in query_params:
|
||||
params.setlist(attr, query_params[attr])
|
||||
url = "{}?{}".format(base_url, params.urlencode())
|
||||
response = self.client.get(url, HTTP_X_EDX_API_KEY=VALID_API_KEY)
|
||||
self._verify_response(response, expect_code, expect_data)
|
||||
|
||||
def test_user_mappings_only_return_requested_idp_mapping_by_provider_id(self):
|
||||
testshib2 = self.configure_saml_provider(name='TestShib2', enabled=True, idp_slug='testshib2')
|
||||
username = 'testshib2user'
|
||||
user = UserFactory.create(
|
||||
username=username,
|
||||
password=PASSWORD,
|
||||
is_staff=False,
|
||||
is_superuser=False
|
||||
)
|
||||
UserSocialAuth.objects.create(
|
||||
user=user,
|
||||
provider=testshib2.backend_name,
|
||||
uid='{}:{}'.format(testshib2.idp_slug, username),
|
||||
)
|
||||
|
||||
url = reverse('third_party_auth_user_mapping_api', kwargs={'provider_id': PROVIDER_ID_TESTSHIB})
|
||||
response = self.client.get(url, HTTP_X_EDX_API_KEY=VALID_API_KEY)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self._verify_response(response, 200, get_mapping_data_by_usernames(LINKED_USERS))
|
||||
|
||||
@ddt.data(
|
||||
(True, True, 200),
|
||||
(False, True, 200),
|
||||
(True, False, 200),
|
||||
(False, False, 403)
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_user_mapping_permission_logic(self, api_key_permission, token_permission, expect):
|
||||
url = reverse('third_party_auth_user_mapping_api', kwargs={'provider_id': PROVIDER_ID_TESTSHIB})
|
||||
with patch.object(ApiKeyHeaderPermission, 'has_permission', return_value=api_key_permission):
|
||||
with patch.object(ThirdPartyAuthProviderApiPermission, 'has_permission', return_value=token_permission):
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, expect)
|
||||
|
||||
def _verify_response(self, response, expect_code, expect_result):
|
||||
""" verify the items in data_list exists in response and data_results matches results in response """
|
||||
self.assertEqual(response.status_code, expect_code)
|
||||
if expect_code == 200:
|
||||
for item in ['results', 'count', 'num_pages']:
|
||||
self.assertIn(item, response.data)
|
||||
self.assertItemsEqual(response.data['results'], expect_result)
|
||||
|
||||
@@ -2,11 +2,14 @@
|
||||
|
||||
from django.conf.urls import patterns, url
|
||||
|
||||
from .views import UserView
|
||||
from .views import UserView, UserMappingView
|
||||
|
||||
USERNAME_PATTERN = r'(?P<username>[\w.+-]+)'
|
||||
PROVIDER_PATTERN = r'(?P<provider_id>[\w.+-]+)(?:\:(?P<idp_slug>[\w.+-]+))?'
|
||||
|
||||
urlpatterns = patterns(
|
||||
'',
|
||||
url(r'^v0/users/' + USERNAME_PATTERN + '$', UserView.as_view(), name='third_party_auth_users_api'),
|
||||
url(r'^v0/providers/' + PROVIDER_PATTERN + '/users$', UserMappingView.as_view(),
|
||||
name='third_party_auth_user_mapping_api'),
|
||||
)
|
||||
|
||||
@@ -2,6 +2,11 @@
|
||||
Third Party Auth REST API views
|
||||
"""
|
||||
from django.contrib.auth.models import User
|
||||
from django.db.models import Q
|
||||
from django.http import Http404
|
||||
from rest_framework.generics import ListAPIView
|
||||
from rest_framework_oauth.authentication import OAuth2Authentication
|
||||
from social.apps.django_app.default.models import UserSocialAuth
|
||||
from openedx.core.lib.api.authentication import (
|
||||
OAuth2AuthenticationAllowInactiveUser,
|
||||
SessionAuthenticationAllowInactiveUser,
|
||||
@@ -9,10 +14,13 @@ from openedx.core.lib.api.authentication import (
|
||||
from openedx.core.lib.api.permissions import (
|
||||
ApiKeyHeaderPermission,
|
||||
)
|
||||
from rest_framework import status
|
||||
from rest_framework import status, exceptions
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from third_party_auth import pipeline
|
||||
from third_party_auth.api import serializers
|
||||
from third_party_auth.api.permissions import ThirdPartyAuthProviderApiPermission
|
||||
from third_party_auth.provider import Registry
|
||||
|
||||
|
||||
class UserView(APIView):
|
||||
@@ -89,3 +97,136 @@ class UserView(APIView):
|
||||
return Response({
|
||||
"active": active_providers
|
||||
})
|
||||
|
||||
|
||||
class UserMappingView(ListAPIView):
|
||||
"""
|
||||
Map between the third party auth account IDs (remote_id) and EdX username.
|
||||
|
||||
This API is intended to be a server-to-server endpoint. An on-campus middleware or system should consume this.
|
||||
|
||||
**Use Case**
|
||||
|
||||
Get a paginated list of mappings between edX users and remote user IDs for all users currently
|
||||
linked to the given backend.
|
||||
|
||||
The list can be filtered by edx username or third party ids. The filter is limited by the max length of URL.
|
||||
It is suggested to query no more than 50 usernames or remote_ids in each request to stay within above
|
||||
limitation
|
||||
|
||||
The page size can be changed by specifying `page_size` parameter in the request.
|
||||
|
||||
**Example Requests**
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users?username={username1},{username2}
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users?username={username1}&usernames={username2}
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users?remote_id={remote_id1},{remote_id2}
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users?remote_id={remote_id1}&remote_id={remote_id2}
|
||||
|
||||
GET /api/third_party_auth/v0/providers/{provider_id}/users?username={username1}&remote_id={remote_id1}
|
||||
|
||||
**URL Parameters**
|
||||
|
||||
* provider_id: The unique identifier of third_party_auth provider (e.g. "saml-ubc", "oa2-google", etc.
|
||||
This is not the same thing as the backend_name.). (Optional/future: We may also want to allow
|
||||
this to be an 'external domain' like 'ssl:MIT' so that this API can also search the legacy
|
||||
ExternalAuthMap table used by Standford/MIT)
|
||||
|
||||
**Query Parameters**
|
||||
|
||||
* remote_ids: Optional. List of comma separated remote (third party) user IDs to filter the result set.
|
||||
e.g. ?remote_ids=8721384623
|
||||
|
||||
* usernames: Optional. List of comma separated edX usernames to filter the result set.
|
||||
e.g. ?usernames=bob123,jane456
|
||||
|
||||
* page, page_size: Optional. Used for paging the result set, especially when getting
|
||||
an unfiltered list.
|
||||
|
||||
**Response Values**
|
||||
|
||||
If the request for information about the user is successful, an HTTP 200 "OK" response
|
||||
is returned.
|
||||
|
||||
The HTTP 200 response has the following values:
|
||||
|
||||
* count: The number of mappings for the backend.
|
||||
|
||||
* next: The URI to the next page of the mappings.
|
||||
|
||||
* previous: The URI to the previous page of the mappings.
|
||||
|
||||
* num_pages: The number of pages listing the mappings.
|
||||
|
||||
* results: A list of mappings returned. Each collection in the list
|
||||
contains these fields.
|
||||
|
||||
* username: The edx username
|
||||
|
||||
* remote_id: The Id from third party auth provider
|
||||
"""
|
||||
authentication_classes = (
|
||||
OAuth2Authentication,
|
||||
)
|
||||
|
||||
serializer_class = serializers.UserMappingSerializer
|
||||
provider = None
|
||||
|
||||
def get_queryset(self):
|
||||
provider_id = self.kwargs.get('provider_id')
|
||||
|
||||
# permission checking. We allow both API_KEY access and OAuth2 client credential access
|
||||
if not (
|
||||
self.request.user.is_superuser or ApiKeyHeaderPermission().has_permission(self.request, self) or
|
||||
ThirdPartyAuthProviderApiPermission(provider_id).has_permission(self.request, self)
|
||||
):
|
||||
raise exceptions.PermissionDenied()
|
||||
|
||||
# provider existence checking
|
||||
self.provider = Registry.get(provider_id)
|
||||
if not self.provider:
|
||||
raise Http404
|
||||
|
||||
query_set = UserSocialAuth.objects.select_related('user').filter(provider=self.provider.backend_name)
|
||||
|
||||
# build our query filters
|
||||
# When using multi-IdP backend, we only retrieve the ones that are for current IdP.
|
||||
# test if the current provider has a slug
|
||||
uid = self.provider.get_social_auth_uid('uid')
|
||||
if uid is not 'uid':
|
||||
# if yes, we add a filter for the slug on uid column
|
||||
query_set = query_set.filter(uid__startswith=uid[:-3])
|
||||
|
||||
query = Q()
|
||||
|
||||
usernames = self.request.QUERY_PARAMS.getlist('username', None)
|
||||
remote_ids = self.request.QUERY_PARAMS.getlist('remote_id', None)
|
||||
|
||||
if usernames:
|
||||
usernames = ','.join(usernames)
|
||||
usernames = set(usernames.split(',')) if usernames else set()
|
||||
if len(usernames):
|
||||
query = query | Q(user__username__in=usernames)
|
||||
|
||||
if remote_ids:
|
||||
remote_ids = ','.join(remote_ids)
|
||||
remote_ids = set(remote_ids.split(',')) if remote_ids else set()
|
||||
if len(remote_ids):
|
||||
query = query | Q(uid__in=[self.provider.get_social_auth_uid(remote_id) for remote_id in remote_ids])
|
||||
|
||||
return query_set.filter(query)
|
||||
|
||||
def get_serializer_context(self):
|
||||
"""
|
||||
Extra context provided to the serializer class with current provider. We need the provider to
|
||||
remove idp_slug from the remote_id if there is any
|
||||
"""
|
||||
context = super(UserMappingView, self).get_serializer_context()
|
||||
context['provider'] = self.provider
|
||||
|
||||
return context
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from south.utils import datetime_utils as datetime
|
||||
from south.db import db
|
||||
from south.v2 import SchemaMigration
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Migration(SchemaMigration):
|
||||
|
||||
def forwards(self, orm):
|
||||
# Adding model 'ProviderApiPermissions'
|
||||
db.create_table('third_party_auth_providerapipermissions', (
|
||||
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
|
||||
('client', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['oauth2.Client'])),
|
||||
('provider_id', self.gf('django.db.models.fields.CharField')(max_length=255)),
|
||||
))
|
||||
db.send_create_signal('third_party_auth', ['ProviderApiPermissions'])
|
||||
|
||||
|
||||
def backwards(self, orm):
|
||||
# Deleting model 'ProviderApiPermissions'
|
||||
db.delete_table('third_party_auth_providerapipermissions')
|
||||
|
||||
|
||||
models = {
|
||||
'auth.group': {
|
||||
'Meta': {'object_name': 'Group'},
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
|
||||
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
|
||||
},
|
||||
'auth.permission': {
|
||||
'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
|
||||
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
|
||||
},
|
||||
'auth.user': {
|
||||
'Meta': {'object_name': 'User'},
|
||||
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
|
||||
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
|
||||
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
|
||||
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
|
||||
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
|
||||
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
|
||||
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
|
||||
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
|
||||
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
|
||||
},
|
||||
'contenttypes.contenttype': {
|
||||
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
|
||||
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
|
||||
},
|
||||
'oauth2.client': {
|
||||
'Meta': {'object_name': 'Client'},
|
||||
'client_id': ('django.db.models.fields.CharField', [], {'default': "'d9843a249e3f607e3177'", 'max_length': '255'}),
|
||||
'client_secret': ('django.db.models.fields.CharField', [], {'default': "'4009d3b78cee9c0144da4a47a11e2186c6a46d4c'", 'max_length': '255'}),
|
||||
'client_type': ('django.db.models.fields.IntegerField', [], {}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '255', 'blank': 'True'}),
|
||||
'redirect_uri': ('django.db.models.fields.URLField', [], {'max_length': '200'}),
|
||||
'url': ('django.db.models.fields.URLField', [], {'max_length': '200'}),
|
||||
'user': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'oauth2_client'", 'null': 'True', 'to': "orm['auth.User']"})
|
||||
},
|
||||
'third_party_auth.providerapipermissions': {
|
||||
'Meta': {'object_name': 'ProviderApiPermissions'},
|
||||
'client': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['oauth2.Client']"}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'provider_id': ('django.db.models.fields.CharField', [], {'max_length': '255'})
|
||||
},
|
||||
'third_party_auth.ltiproviderconfig': {
|
||||
'Meta': {'object_name': 'LTIProviderConfig'},
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'icon_class': ('django.db.models.fields.CharField', [], {'default': "'fa-sign-in'", 'max_length': '50'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'lti_consumer_key': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
|
||||
'lti_consumer_secret': ('django.db.models.fields.CharField', [], {'default': "'011ecd0d33af228631f68d89b335cd6303c00508'", 'max_length': '255', 'blank': 'True'}),
|
||||
'lti_max_timestamp_age': ('django.db.models.fields.IntegerField', [], {'default': '10'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
|
||||
'secondary': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'skip_email_verification': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'skip_registration_form': ('django.db.models.fields.BooleanField', [], {'default': 'False'})
|
||||
},
|
||||
'third_party_auth.oauth2providerconfig': {
|
||||
'Meta': {'object_name': 'OAuth2ProviderConfig'},
|
||||
'backend_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'icon_class': ('django.db.models.fields.CharField', [], {'default': "'fa-sign-in'", 'max_length': '50'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'key': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
|
||||
'other_settings': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'secondary': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'secret': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'skip_email_verification': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'skip_registration_form': ('django.db.models.fields.BooleanField', [], {'default': 'False'})
|
||||
},
|
||||
'third_party_auth.samlconfiguration': {
|
||||
'Meta': {'object_name': 'SAMLConfiguration'},
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'entity_id': ('django.db.models.fields.CharField', [], {'default': "'http://saml.example.com'", 'max_length': '255'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'org_info_str': ('django.db.models.fields.TextField', [], {'default': '\'{"en-US": {"url": "http://www.example.com", "displayname": "Example Inc.", "name": "example"}}\''}),
|
||||
'other_config_str': ('django.db.models.fields.TextField', [], {'default': '\'{\\n"SECURITY_CONFIG": {"metadataCacheDuration": 604800, "signMetadata": false}\\n}\''}),
|
||||
'private_key': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'public_key': ('django.db.models.fields.TextField', [], {'blank': 'True'})
|
||||
},
|
||||
'third_party_auth.samlproviderconfig': {
|
||||
'Meta': {'object_name': 'SAMLProviderConfig'},
|
||||
'attr_email': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'attr_first_name': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'attr_full_name': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'attr_last_name': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'attr_user_permanent_id': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'attr_username': ('django.db.models.fields.CharField', [], {'max_length': '128', 'blank': 'True'}),
|
||||
'backend_name': ('django.db.models.fields.CharField', [], {'default': "'tpa-saml'", 'max_length': '50'}),
|
||||
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
|
||||
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
|
||||
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'entity_id': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
|
||||
'icon_class': ('django.db.models.fields.CharField', [], {'default': "'fa-sign-in'", 'max_length': '50'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'idp_slug': ('django.db.models.fields.SlugField', [], {'max_length': '30'}),
|
||||
'metadata_source': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
|
||||
'other_settings': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'secondary': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'skip_email_verification': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
|
||||
'skip_registration_form': ('django.db.models.fields.BooleanField', [], {'default': 'False'})
|
||||
},
|
||||
'third_party_auth.samlproviderdata': {
|
||||
'Meta': {'ordering': "('-fetched_at',)", 'object_name': 'SAMLProviderData'},
|
||||
'entity_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
|
||||
'expires_at': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'db_index': 'True'}),
|
||||
'fetched_at': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'public_key': ('django.db.models.fields.TextField', [], {}),
|
||||
'sso_url': ('django.db.models.fields.URLField', [], {'max_length': '200'})
|
||||
}
|
||||
}
|
||||
|
||||
complete_apps = ['third_party_auth']
|
||||
@@ -14,6 +14,7 @@ from django.utils.translation import ugettext_lazy as _
|
||||
import json
|
||||
import logging
|
||||
from provider.utils import long_token
|
||||
from provider.oauth2.models import Client
|
||||
from social.backends.base import BaseAuth
|
||||
from social.backends.oauth import OAuthAuth
|
||||
from social.backends.saml import SAMLAuth, SAMLIdentityProvider
|
||||
@@ -136,6 +137,14 @@ class ProviderConfig(ConfigurationModel):
|
||||
assert self.match_social_auth(social_auth)
|
||||
return social_auth.uid
|
||||
|
||||
def get_social_auth_uid(self, remote_id):
|
||||
"""
|
||||
Return the uid in social auth.
|
||||
|
||||
This is default implementation. Subclass may override with a different one.
|
||||
"""
|
||||
return remote_id
|
||||
|
||||
@classmethod
|
||||
def get_register_form_data(cls, pipeline_kwargs):
|
||||
"""Gets dict of data to display on the register form.
|
||||
@@ -305,6 +314,10 @@ class SAMLProviderConfig(ProviderConfig):
|
||||
# Remove the prefix from the UID
|
||||
return social_auth.uid[len(self.idp_slug) + 1:]
|
||||
|
||||
def get_social_auth_uid(self, remote_id):
|
||||
""" Get social auth uid from remote id by prepending idp_slug to the remote id """
|
||||
return '{}:{}'.format(self.idp_slug, remote_id)
|
||||
|
||||
def get_config(self):
|
||||
"""
|
||||
Return a SAMLIdentityProvider instance for use by SAMLAuthBackend.
|
||||
@@ -554,3 +567,22 @@ class LTIProviderConfig(ProviderConfig):
|
||||
class Meta(object):
|
||||
verbose_name = "Provider Configuration (LTI)"
|
||||
verbose_name_plural = verbose_name
|
||||
|
||||
|
||||
class ProviderApiPermissions(models.Model):
|
||||
"""
|
||||
This model links OAuth2 client with provider Id.
|
||||
|
||||
It gives permission for a OAuth2 client to access the information under certain IdPs.
|
||||
"""
|
||||
client = models.ForeignKey(Client)
|
||||
provider_id = models.CharField(
|
||||
max_length=255,
|
||||
help_text=(
|
||||
'Uniquely identify a provider. This is different from backend_name.'
|
||||
)
|
||||
)
|
||||
|
||||
class Meta(object): # pylint: disable=missing-docstring
|
||||
verbose_name = "Provider API Permission"
|
||||
verbose_name_plural = verbose_name + 's'
|
||||
|
||||
@@ -7,6 +7,8 @@ Used by Django and non-Django tests; must not have Django deps.
|
||||
from contextlib import contextmanager
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from provider.oauth2.models import Client as OAuth2Client
|
||||
from provider import constants
|
||||
import django.test
|
||||
import mock
|
||||
import os.path
|
||||
@@ -17,6 +19,7 @@ from third_party_auth.models import (
|
||||
SAMLConfiguration,
|
||||
LTIProviderConfig,
|
||||
cache as config_cache,
|
||||
ProviderApiPermissions,
|
||||
)
|
||||
|
||||
|
||||
@@ -113,6 +116,16 @@ class ThirdPartyAuthTestMixin(object):
|
||||
user.is_active = True
|
||||
user.save()
|
||||
|
||||
@staticmethod
|
||||
def configure_oauth_client():
|
||||
""" Configure a oauth client for testing """
|
||||
return OAuth2Client.objects.create(client_type=constants.CONFIDENTIAL)
|
||||
|
||||
@staticmethod
|
||||
def configure_api_permission(client, provider_id):
|
||||
""" Configure the client and provider_id pair. This will give the access to a client for that provider. """
|
||||
return ProviderApiPermissions.objects.create(client=client, provider_id=provider_id)
|
||||
|
||||
@staticmethod
|
||||
def read_data_file(filename):
|
||||
""" Read the contents of a file in the data folder """
|
||||
|
||||
Reference in New Issue
Block a user