Merge pull request #12372 from edx/feature/catalog-admin
Merge catalog admin into master.
This commit is contained in:
@@ -15,5 +15,4 @@ class ApiAccessRequestAdmin(admin.ModelAdmin):
|
||||
readonly_fields = ('user', 'website', 'reason', 'company_name', 'company_address', 'contacted', )
|
||||
exclude = ('site',)
|
||||
|
||||
|
||||
admin.site.register(ApiAccessConfig, ConfigurationModelAdmin)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Forms for API management."""
|
||||
from django import forms
|
||||
from django.contrib.auth.models import User
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest, Catalog
|
||||
from openedx.core.djangoapps.api_admin.widgets import TermsOfServiceCheckboxInput
|
||||
|
||||
|
||||
@@ -32,3 +33,50 @@ class ApiAccessRequestForm(forms.ModelForm):
|
||||
# Get rid of the colons at the end of the field labels.
|
||||
kwargs.setdefault('label_suffix', '')
|
||||
super(ApiAccessRequestForm, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class ViewersWidget(forms.widgets.TextInput):
|
||||
"""Form widget to display a comma-separated list of usernames."""
|
||||
|
||||
def render(self, name, value, attrs=None):
|
||||
return super(ViewersWidget, self).render(name, ', '.join(value), attrs)
|
||||
|
||||
|
||||
class ViewersField(forms.Field):
|
||||
"""Custom form field for a comma-separated list of usernames."""
|
||||
|
||||
widget = ViewersWidget
|
||||
|
||||
default_error_messages = {
|
||||
'invalid': 'Enter a comma-separated list of usernames.',
|
||||
}
|
||||
|
||||
def to_python(self, value):
|
||||
"""Parse out a comma-separated list of usernames."""
|
||||
return [username.strip() for username in value.split(',')]
|
||||
|
||||
def validate(self, value):
|
||||
super(ViewersField, self).validate(value)
|
||||
nonexistent_users = []
|
||||
for username in value:
|
||||
try:
|
||||
User.objects.get(username=username)
|
||||
except User.DoesNotExist:
|
||||
nonexistent_users.append(username)
|
||||
if nonexistent_users:
|
||||
raise forms.ValidationError(
|
||||
_('The following users do not exist: {usernames}.').format(usernames=nonexistent_users)
|
||||
)
|
||||
|
||||
|
||||
class CatalogForm(forms.ModelForm):
|
||||
"""Form to create a catalog."""
|
||||
|
||||
viewers = ViewersField()
|
||||
|
||||
class Meta(object):
|
||||
model = Catalog
|
||||
fields = ('name', 'query', 'viewers')
|
||||
help_texts = {
|
||||
'viewers': _('Comma-separated list of usernames which will be able to view this catalog.'),
|
||||
}
|
||||
|
||||
26
openedx/core/djangoapps/api_admin/migrations/0006_catalog.py
Normal file
26
openedx/core/djangoapps/api_admin/migrations/0006_catalog.py
Normal file
@@ -0,0 +1,26 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('api_admin', '0005_auto_20160414_1232'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Catalog',
|
||||
fields=[
|
||||
('id', models.IntegerField(serialize=False, primary_key=True)),
|
||||
('name', models.CharField(max_length=255)),
|
||||
('query', models.TextField()),
|
||||
('viewers', models.TextField()),
|
||||
],
|
||||
options={
|
||||
'managed': False,
|
||||
},
|
||||
),
|
||||
]
|
||||
@@ -179,3 +179,45 @@ def _send_decision_email(instance):
|
||||
instance.contacted = True
|
||||
except SMTPException:
|
||||
log.exception('Error sending API user notification email for request [%s].', instance.id)
|
||||
|
||||
|
||||
class Catalog(models.Model):
|
||||
"""A (non-Django-managed) model for Catalogs in the course discovery service."""
|
||||
|
||||
id = models.IntegerField(primary_key=True) # pylint: disable=invalid-name
|
||||
name = models.CharField(max_length=255, null=False, blank=False)
|
||||
query = models.TextField(null=False, blank=False)
|
||||
viewers = models.TextField()
|
||||
|
||||
class Meta(object):
|
||||
# Catalogs live in course discovery, so we do not create any
|
||||
# tables in LMS. Instead we override the save method to not
|
||||
# touch the database, and use our API client to communicate
|
||||
# with discovery.
|
||||
managed = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
attributes = kwargs.get('attributes')
|
||||
if attributes:
|
||||
self.id = attributes['id'] # pylint: disable=invalid-name
|
||||
self.name = attributes['name']
|
||||
self.query = attributes['query']
|
||||
self.viewers = attributes['viewers']
|
||||
else:
|
||||
super(Catalog, self).__init__(*args, **kwargs)
|
||||
|
||||
def save(self, **kwargs): # pylint: disable=unused-argument
|
||||
return None
|
||||
|
||||
@property
|
||||
def attributes(self):
|
||||
"""Return a dictionary representation of this catalog."""
|
||||
return {
|
||||
'id': self.id,
|
||||
'name': self.name,
|
||||
'query': self.query,
|
||||
'viewers': self.viewers,
|
||||
}
|
||||
|
||||
def __unicode__(self):
|
||||
return u'Catalog {name} [{query}]'.format(name=self.name, query=self.query)
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
"""Factories for API management."""
|
||||
import factory
|
||||
from factory.fuzzy import FuzzyInteger, FuzzyText
|
||||
from factory.django import DjangoModelFactory
|
||||
from oauth2_provider.models import get_application_model
|
||||
|
||||
from microsite_configuration.tests.factories import SiteFactory
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest, Catalog
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
|
||||
@@ -27,3 +28,14 @@ class ApplicationFactory(DjangoModelFactory):
|
||||
|
||||
authorization_grant_type = Application.GRANT_CLIENT_CREDENTIALS
|
||||
client_type = Application.CLIENT_CONFIDENTIAL
|
||||
|
||||
|
||||
class CatalogFactory(DjangoModelFactory):
|
||||
"""Factory for Catalog objects."""
|
||||
|
||||
class Meta(object):
|
||||
model = Catalog
|
||||
|
||||
id = FuzzyInteger(0, 999) # pylint: disable=invalid-name
|
||||
query = '*'
|
||||
name = FuzzyText(prefix='test-catalog')
|
||||
|
||||
@@ -1,21 +1,29 @@
|
||||
#pylint: disable=missing-docstring
|
||||
import unittest
|
||||
import json
|
||||
from urlparse import urljoin
|
||||
|
||||
import ddt
|
||||
from django.conf import settings
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
from edx_oauth2_provider.tests.factories import ClientFactory
|
||||
import httpretty
|
||||
from oauth2_provider.models import get_application_model
|
||||
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest, ApiAccessConfig
|
||||
from openedx.core.djangoapps.api_admin.tests.factories import ApiAccessRequestFactory, ApplicationFactory
|
||||
from openedx.core.djangoapps.api_admin.tests.factories import (
|
||||
ApiAccessRequestFactory, ApplicationFactory, CatalogFactory
|
||||
)
|
||||
from openedx.core.djangoapps.api_admin.tests.utils import VALID_DATA
|
||||
from student.tests.factories import UserFactory
|
||||
|
||||
|
||||
Application = get_application_model() # pylint: disable=invalid-name
|
||||
|
||||
MOCK_CATALOG_API_URL_ROOT = 'https://api.example.com/'
|
||||
|
||||
|
||||
class ApiAdminTest(TestCase):
|
||||
|
||||
@@ -206,3 +214,169 @@ class ApiTosViewTest(ApiAdminTest):
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn('Terms of Service', response.content)
|
||||
|
||||
|
||||
class CatalogTest(ApiAdminTest):
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogTest, self).setUp()
|
||||
password = 'abc123'
|
||||
self.user = UserFactory(password=password, is_staff=True)
|
||||
self.client.login(username=self.user.username, password=password)
|
||||
ClientFactory(user=self.user, name='course-discovery', url=MOCK_CATALOG_API_URL_ROOT)
|
||||
|
||||
def mock_catalog_api(self, url, data, method=httpretty.GET, status_code=200):
|
||||
self.assertTrue(httpretty.is_enabled(), msg='httpretty must be enabled to mock Catalog API calls.')
|
||||
httpretty.reset()
|
||||
httpretty.register_uri(
|
||||
method,
|
||||
urljoin(MOCK_CATALOG_API_URL_ROOT, url),
|
||||
body=json.dumps(data),
|
||||
content_type='application/json',
|
||||
status=status_code
|
||||
)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CatalogSearchViewTest(CatalogTest):
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogSearchViewTest, self).setUp()
|
||||
self.url = reverse('api_admin:catalog-search')
|
||||
|
||||
def test_get(self):
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@httpretty.activate
|
||||
def test_post(self):
|
||||
catalog_user = UserFactory()
|
||||
self.mock_catalog_api('api/v1/catalogs/', {'results': []})
|
||||
response = self.client.post(self.url, {'username': catalog_user.username})
|
||||
self.assertRedirects(response, reverse('api_admin:catalog-list', kwargs={'username': catalog_user.username}))
|
||||
|
||||
def test_post_without_username(self):
|
||||
response = self.client.post(self.url, {'username': ''})
|
||||
self.assertRedirects(response, reverse('api_admin:catalog-search'))
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CatalogListViewTest(CatalogTest):
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogListViewTest, self).setUp()
|
||||
self.catalog_user = UserFactory()
|
||||
self.url = reverse('api_admin:catalog-list', kwargs={'username': self.catalog_user.username})
|
||||
|
||||
@httpretty.activate
|
||||
def test_get(self):
|
||||
catalog = CatalogFactory(viewers=[self.catalog_user.username])
|
||||
self.mock_catalog_api('api/v1/catalogs/', {
|
||||
'results': [catalog.attributes]
|
||||
})
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(catalog.name, response.content.decode('utf-8'))
|
||||
|
||||
@httpretty.activate
|
||||
def test_post(self):
|
||||
catalog_data = {
|
||||
'name': 'test-catalog',
|
||||
'query': '*',
|
||||
'viewers': [self.catalog_user.username]
|
||||
}
|
||||
catalog_id = 123
|
||||
self.mock_catalog_api('api/v1/catalogs/', dict(catalog_data, id=catalog_id), method=httpretty.POST)
|
||||
response = self.client.post(self.url, catalog_data)
|
||||
self.assertEqual(httpretty.last_request().method, 'POST')
|
||||
self.mock_catalog_api('api/v1/catalogs/{}/'.format(catalog_id), CatalogFactory().attributes)
|
||||
self.assertRedirects(response, reverse('api_admin:catalog-edit', kwargs={'catalog_id': catalog_id}))
|
||||
|
||||
@httpretty.activate
|
||||
def test_post_invalid(self):
|
||||
catalog = CatalogFactory(viewers=[self.catalog_user.username])
|
||||
self.mock_catalog_api('api/v1/catalogs/', {
|
||||
'results': [catalog.attributes]
|
||||
})
|
||||
response = self.client.post(self.url, {
|
||||
'name': '',
|
||||
'query': '*',
|
||||
'viewers': [self.catalog_user.username]
|
||||
})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
# Assert that no POST was made to the catalog API
|
||||
self.assertEqual(len([r for r in httpretty.httpretty.latest_requests if r.method == 'POST']), 0)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CatalogEditViewTest(CatalogTest):
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogEditViewTest, self).setUp()
|
||||
self.catalog_user = UserFactory()
|
||||
self.catalog = CatalogFactory(viewers=[self.catalog_user.username])
|
||||
self.url = reverse('api_admin:catalog-edit', kwargs={'catalog_id': self.catalog.id})
|
||||
|
||||
@httpretty.activate
|
||||
def test_get(self):
|
||||
self.mock_catalog_api('api/v1/catalogs/{}/'.format(self.catalog.id), self.catalog.attributes)
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(self.catalog.name, response.content.decode('utf-8'))
|
||||
|
||||
@httpretty.activate
|
||||
def test_delete(self):
|
||||
self.mock_catalog_api(
|
||||
'api/v1/catalogs/{}/'.format(self.catalog.id),
|
||||
self.catalog.attributes,
|
||||
method=httpretty.DELETE
|
||||
)
|
||||
response = self.client.post(self.url, {'delete-catalog': 'on'})
|
||||
self.assertRedirects(response, reverse('api_admin:catalog-search'))
|
||||
self.assertEqual(httpretty.last_request().method, 'DELETE')
|
||||
self.assertEqual(
|
||||
httpretty.last_request().path,
|
||||
'/api/v1/catalogs/{}/'.format(self.catalog.id)
|
||||
)
|
||||
self.assertEqual(len(httpretty.httpretty.latest_requests), 1)
|
||||
|
||||
@httpretty.activate
|
||||
def test_edit(self):
|
||||
self.mock_catalog_api(
|
||||
'api/v1/catalogs/{}/'.format(self.catalog.id),
|
||||
self.catalog.attributes, method=httpretty.PATCH
|
||||
)
|
||||
new_attributes = dict(self.catalog.attributes, **{'delete-catalog': 'off', 'name': 'changed'})
|
||||
response = self.client.post(self.url, new_attributes)
|
||||
self.mock_catalog_api('api/v1/catalogs/{}/'.format(self.catalog.id), new_attributes)
|
||||
self.assertRedirects(response, reverse('api_admin:catalog-edit', kwargs={'catalog_id': self.catalog.id}))
|
||||
|
||||
@httpretty.activate
|
||||
def test_edit_invalid(self):
|
||||
self.mock_catalog_api('api/v1/catalogs/{}/'.format(self.catalog.id), self.catalog.attributes)
|
||||
new_attributes = dict(self.catalog.attributes, **{'delete-catalog': 'off', 'name': ''})
|
||||
response = self.client.post(self.url, new_attributes)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
# Assert that no PATCH was made to the Catalog API
|
||||
self.assertEqual(len([r for r in httpretty.httpretty.latest_requests if r.method == 'PATCH']), 0)
|
||||
|
||||
|
||||
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
|
||||
class CatalogPreviewViewTest(CatalogTest):
|
||||
|
||||
def setUp(self):
|
||||
super(CatalogPreviewViewTest, self).setUp()
|
||||
self.url = reverse('api_admin:catalog-preview')
|
||||
|
||||
@httpretty.activate
|
||||
def test_get(self):
|
||||
data = {'count': 1, 'results': ['test data'], 'next': None, 'prev': None}
|
||||
self.mock_catalog_api('api/v1/courses/', data)
|
||||
response = self.client.get(self.url, {'q': '*'})
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(json.loads(response.content), data)
|
||||
|
||||
def test_get_without_query(self):
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(json.loads(response.content), {'count': 0, 'results': [], 'next': None, 'prev': None})
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
"""URLs for API access management."""
|
||||
|
||||
from django.conf.urls import url
|
||||
from django.contrib.admin.views.decorators import staff_member_required
|
||||
from django.contrib.auth.decorators import login_required
|
||||
|
||||
from openedx.core.djangoapps.api_admin.decorators import api_access_enabled_or_404
|
||||
from openedx.core.djangoapps.api_admin.views import ApiRequestView, ApiRequestStatusView, ApiTosView
|
||||
from openedx.core.djangoapps.api_admin.views import (
|
||||
ApiRequestView, ApiRequestStatusView, ApiTosView, CatalogListView, CatalogEditView,
|
||||
CatalogPreviewView, CatalogSearchView
|
||||
)
|
||||
|
||||
urlpatterns = (
|
||||
url(
|
||||
@@ -17,6 +21,42 @@ urlpatterns = (
|
||||
api_access_enabled_or_404(ApiTosView.as_view()),
|
||||
name="api-tos"
|
||||
),
|
||||
url(
|
||||
r'^catalogs/preview/$',
|
||||
staff_member_required(
|
||||
api_access_enabled_or_404(CatalogPreviewView.as_view()),
|
||||
login_url='dashboard',
|
||||
redirect_field_name=None
|
||||
),
|
||||
name='catalog-preview',
|
||||
),
|
||||
url(
|
||||
r'^catalogs/user/(?P<username>[\w.@+-]+)/$',
|
||||
staff_member_required(
|
||||
api_access_enabled_or_404(CatalogListView.as_view()),
|
||||
login_url='dashboard',
|
||||
redirect_field_name=None
|
||||
),
|
||||
name='catalog-list',
|
||||
),
|
||||
url(
|
||||
r'^catalogs/(?P<catalog_id>\d+)/$',
|
||||
staff_member_required(
|
||||
api_access_enabled_or_404(CatalogEditView.as_view()),
|
||||
login_url='dashboard',
|
||||
redirect_field_name=None
|
||||
),
|
||||
name='catalog-edit',
|
||||
),
|
||||
url(
|
||||
r'^catalogs/$',
|
||||
staff_member_required(
|
||||
api_access_enabled_or_404(CatalogSearchView.as_view()),
|
||||
login_url='dashboard',
|
||||
redirect_field_name=None
|
||||
),
|
||||
name='catalog-search',
|
||||
),
|
||||
url(
|
||||
r'^$',
|
||||
api_access_enabled_or_404(login_required(ApiRequestView.as_view())),
|
||||
|
||||
15
openedx/core/djangoapps/api_admin/utils.py
Normal file
15
openedx/core/djangoapps/api_admin/utils.py
Normal file
@@ -0,0 +1,15 @@
|
||||
""" Course Discovery API Service. """
|
||||
from edx_rest_api_client.client import EdxRestApiClient
|
||||
from openedx.core.lib.token_utils import get_id_token
|
||||
from provider.oauth2.models import Client
|
||||
|
||||
CLIENT_NAME = 'course-discovery'
|
||||
|
||||
|
||||
def course_discovery_api_client(user):
|
||||
""" Returns a Course Discovery API client setup with authentication for the specified user. """
|
||||
course_discovery_client = Client.objects.get(name=CLIENT_NAME)
|
||||
return EdxRestApiClient(
|
||||
course_discovery_client.url,
|
||||
jwt=get_id_token(user, CLIENT_NAME)
|
||||
)
|
||||
@@ -4,6 +4,7 @@ import logging
|
||||
from django.conf import settings
|
||||
from django.contrib.sites.shortcuts import get_current_site
|
||||
from django.core.urlresolvers import reverse_lazy, reverse
|
||||
from django.http.response import JsonResponse
|
||||
from django.shortcuts import redirect
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.views.generic import View
|
||||
@@ -15,8 +16,9 @@ from oauth2_provider.views import ApplicationRegistration
|
||||
|
||||
from edxmako.shortcuts import render_to_response
|
||||
from openedx.core.djangoapps.api_admin.decorators import require_api_access
|
||||
from openedx.core.djangoapps.api_admin.forms import ApiAccessRequestForm
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest
|
||||
from openedx.core.djangoapps.api_admin.forms import ApiAccessRequestForm, CatalogForm
|
||||
from openedx.core.djangoapps.api_admin.models import ApiAccessRequest, Catalog
|
||||
from openedx.core.djangoapps.api_admin.utils import course_discovery_api_client
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -115,3 +117,112 @@ class ApiTosView(TemplateView):
|
||||
"""View to show the API Terms of Service."""
|
||||
|
||||
template_name = 'api_admin/terms_of_service.html'
|
||||
|
||||
|
||||
class CatalogSearchView(View):
|
||||
"""View to search for catalogs belonging to a user."""
|
||||
|
||||
def get(self, request):
|
||||
"""Display a form to search for catalogs belonging to a user."""
|
||||
return render_to_response('api_admin/catalogs/search.html')
|
||||
|
||||
def post(self, request):
|
||||
"""Redirect to the list view for the given user."""
|
||||
username = request.POST.get('username')
|
||||
# If no username is provided, bounce back to this page.
|
||||
if not username:
|
||||
return redirect(reverse('api_admin:catalog-search'))
|
||||
return redirect(reverse('api_admin:catalog-list', kwargs={'username': username}))
|
||||
|
||||
|
||||
class CatalogListView(View):
|
||||
"""View to list existing catalogs and create new ones."""
|
||||
|
||||
template = 'api_admin/catalogs/list.html'
|
||||
|
||||
def get(self, request, username):
|
||||
"""Display a list of a user's catalogs."""
|
||||
client = course_discovery_api_client(request.user)
|
||||
response = client.api.v1.catalogs.get(username=username)
|
||||
catalogs = [Catalog(attributes=catalog) for catalog in response['results']]
|
||||
return render_to_response(self.template, {
|
||||
'username': username,
|
||||
'catalogs': catalogs,
|
||||
'form': CatalogForm(initial={'viewers': [username]}),
|
||||
'preview_url': reverse('api_admin:catalog-preview'),
|
||||
'catalog_api_url': client.api.v1.courses.url(),
|
||||
})
|
||||
|
||||
def post(self, request, username):
|
||||
"""Create a new catalog for a user."""
|
||||
form = CatalogForm(request.POST)
|
||||
client = course_discovery_api_client(request.user)
|
||||
|
||||
if not form.is_valid():
|
||||
response = client.api.v1.catalogs.get(username=username)
|
||||
catalogs = [Catalog(attributes=catalog) for catalog in response['results']]
|
||||
return render_to_response(self.template, {
|
||||
'form': form,
|
||||
'catalogs': catalogs,
|
||||
'username': username,
|
||||
'preview_url': reverse('api_admin:catalog-preview'),
|
||||
'catalog_api_url': client.api.v1.courses.url(),
|
||||
}, status=400)
|
||||
|
||||
attrs = form.instance.attributes
|
||||
catalog = client.api.v1.catalogs.post(attrs)
|
||||
return redirect(reverse('api_admin:catalog-edit', kwargs={'catalog_id': catalog['id']}))
|
||||
|
||||
|
||||
class CatalogEditView(View):
|
||||
"""View to edit an individual catalog."""
|
||||
|
||||
def get(self, request, catalog_id):
|
||||
"""Display a form to edit this catalog."""
|
||||
client = course_discovery_api_client(request.user)
|
||||
response = client.api.v1.catalogs(catalog_id).get()
|
||||
catalog = Catalog(attributes=response)
|
||||
form = CatalogForm(instance=catalog)
|
||||
return render_to_response('api_admin/catalogs/edit.html', {
|
||||
'catalog': catalog,
|
||||
'form': form,
|
||||
'preview_url': reverse('api_admin:catalog-preview'),
|
||||
'catalog_api_url': client.api.v1.courses.url(),
|
||||
})
|
||||
|
||||
def post(self, request, catalog_id):
|
||||
"""Update or delete this catalog."""
|
||||
client = course_discovery_api_client(request.user)
|
||||
if request.POST.get('delete-catalog') == 'on':
|
||||
client.api.v1.catalogs(catalog_id).delete()
|
||||
return redirect(reverse('api_admin:catalog-search'))
|
||||
form = CatalogForm(request.POST)
|
||||
if not form.is_valid():
|
||||
response = client.api.v1.catalogs(catalog_id).get()
|
||||
catalog = Catalog(attributes=response)
|
||||
return render_to_response('api_admin/catalogs/edit.html', {
|
||||
'catalog': catalog,
|
||||
'form': form,
|
||||
'preview_url': reverse('api_admin:catalog-preview'),
|
||||
'catalog_api_url': client.api.v1.courses.url(),
|
||||
}, status=400)
|
||||
catalog = client.api.v1.catalogs(catalog_id).patch(form.instance.attributes)
|
||||
return redirect(reverse('api_admin:catalog-edit', kwargs={'catalog_id': catalog['id']}))
|
||||
|
||||
|
||||
class CatalogPreviewView(View):
|
||||
"""Endpoint to preview courses for a query."""
|
||||
|
||||
def get(self, request):
|
||||
"""
|
||||
Return the results of a query against the course catalog API. If no
|
||||
query parameter is given, returns an empty result set.
|
||||
"""
|
||||
client = course_discovery_api_client(request.user)
|
||||
# Just pass along the request params including limit/offset pagination
|
||||
if 'q' in request.GET:
|
||||
results = client.api.v1.courses.get(**request.GET)
|
||||
# Ensure that we don't just return all the courses if no query is given
|
||||
else:
|
||||
results = {'count': 0, 'results': [], 'next': None, 'prev': None}
|
||||
return JsonResponse(results)
|
||||
|
||||
21
openedx/core/lib/rsa_key_utils.py
Normal file
21
openedx/core/lib/rsa_key_utils.py
Normal file
@@ -0,0 +1,21 @@
|
||||
""" Utils for RSA keys"""
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.serialization import(
|
||||
Encoding, PublicFormat, PrivateFormat, NoEncryption
|
||||
)
|
||||
|
||||
|
||||
def generate_rsa_key_pair(key_size=2048):
|
||||
""" Generates a public and private RSA PEM encoded key pair"""
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=key_size,
|
||||
backend=default_backend()
|
||||
)
|
||||
private_key_str = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
|
||||
public_key_str = private_key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
|
||||
|
||||
# Not intented for programmatic use, so we print the keys out
|
||||
print public_key_str
|
||||
print private_key_str
|
||||
@@ -1,6 +1,8 @@
|
||||
"""Utilities for working with ID tokens."""
|
||||
import datetime
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
import jwt
|
||||
@@ -63,3 +65,51 @@ def get_id_token(user, client_name):
|
||||
}
|
||||
|
||||
return jwt.encode(payload, client.client_secret)
|
||||
|
||||
|
||||
def get_asymmetric_token(user, client_id):
|
||||
"""Construct a JWT signed with this app's private key.
|
||||
|
||||
The JWT includes the following claims:
|
||||
|
||||
preferred_username (str): The user's username. The claim name is borrowed from edx-oauth2-provider.
|
||||
name (str): The user's full name.
|
||||
email (str): The user's email address.
|
||||
administrator (Boolean): Whether the user has staff permissions.
|
||||
iss (str): Registered claim. Identifies the principal that issued the JWT.
|
||||
exp (int): Registered claim. Identifies the expiration time on or after which
|
||||
the JWT must NOT be accepted for processing.
|
||||
iat (int): Registered claim. Identifies the time at which the JWT was issued.
|
||||
sub (int): Registered claim. Identifies the user. This implementation uses the raw user id.
|
||||
|
||||
Arguments:
|
||||
user (User): User for which to generate the JWT.
|
||||
|
||||
Returns:
|
||||
str: the JWT
|
||||
|
||||
"""
|
||||
private_key = load_pem_private_key(settings.PRIVATE_RSA_KEY, None, default_backend())
|
||||
|
||||
try:
|
||||
# Service users may not have user profiles.
|
||||
full_name = UserProfile.objects.get(user=user).name
|
||||
except UserProfile.DoesNotExist:
|
||||
full_name = None
|
||||
|
||||
now = datetime.datetime.utcnow()
|
||||
expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30)
|
||||
|
||||
payload = {
|
||||
'preferred_username': user.username,
|
||||
'name': full_name,
|
||||
'email': user.email,
|
||||
'administrator': user.is_staff,
|
||||
'iss': settings.OAUTH_OIDC_ISSUER,
|
||||
'exp': now + datetime.timedelta(seconds=expires_in),
|
||||
'iat': now,
|
||||
'aud': client_id,
|
||||
'sub': anonymous_id_for_user(user, None),
|
||||
}
|
||||
|
||||
return jwt.encode(payload, private_key, algorithm='RS512')
|
||||
|
||||
Reference in New Issue
Block a user