feat: authorize advanced settings endpoints via openedx-authz when fl… (#38009)
This commit is contained in:
@@ -2,13 +2,23 @@
|
||||
Tests for the course advanced settings API.
|
||||
"""
|
||||
import json
|
||||
import pkg_resources
|
||||
from unittest.mock import patch
|
||||
|
||||
import casbin
|
||||
import ddt
|
||||
from django.test import override_settings
|
||||
from django.urls import reverse
|
||||
from milestones.tests.utils import MilestonesTestCaseMixin
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from cms.djangoapps.contentstore.tests.utils import CourseTestCase
|
||||
from common.djangoapps.student.tests.factories import UserFactory
|
||||
from openedx.core import toggles as core_toggles
|
||||
from openedx_authz.api.users import assign_role_to_user_in_scope
|
||||
from openedx_authz.constants.roles import COURSE_STAFF
|
||||
from openedx_authz.engine.enforcer import AuthzEnforcer
|
||||
from openedx_authz.engine.utils import migrate_policy_between_enforcers
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@@ -91,3 +101,105 @@ class CourseAdvanceSettingViewTest(CourseTestCase, MilestonesTestCaseMixin):
|
||||
with override_settings(FEATURES={setting: False}):
|
||||
resp = self.client.get(self.url, {"fetch_all": 0})
|
||||
assert excluded_field not in resp.data
|
||||
|
||||
|
||||
@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True)
|
||||
class AdvancedSettingsAuthzTest(CourseTestCase):
|
||||
"""
|
||||
Tests for AdvancedCourseSettingsView authorization with openedx-authz.
|
||||
|
||||
These tests enable the AUTHZ_COURSE_AUTHORING_FLAG by default.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self._seed_database_with_policies()
|
||||
self.url = reverse(
|
||||
"cms.djangoapps.contentstore:v0:course_advanced_settings",
|
||||
kwargs={"course_id": self.course.id},
|
||||
)
|
||||
|
||||
# Create test users
|
||||
self.authorized_user = UserFactory()
|
||||
self.unauthorized_user = UserFactory()
|
||||
|
||||
# Assign role to authorized user
|
||||
assign_role_to_user_in_scope(
|
||||
self.authorized_user.username,
|
||||
COURSE_STAFF.external_key,
|
||||
str(self.course.id)
|
||||
)
|
||||
AuthzEnforcer.get_enforcer().load_policy()
|
||||
|
||||
# Create API clients and force_authenticate
|
||||
self.authorized_client = APIClient()
|
||||
self.authorized_client.force_authenticate(user=self.authorized_user)
|
||||
self.unauthorized_client = APIClient()
|
||||
self.unauthorized_client.force_authenticate(user=self.unauthorized_user)
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
AuthzEnforcer.get_enforcer().clear_policy()
|
||||
|
||||
@classmethod
|
||||
def _seed_database_with_policies(cls):
|
||||
"""Seed the database with policies from the policy file."""
|
||||
global_enforcer = AuthzEnforcer.get_enforcer()
|
||||
global_enforcer.load_policy()
|
||||
model_path = pkg_resources.resource_filename("openedx_authz.engine", "config/model.conf")
|
||||
policy_path = pkg_resources.resource_filename("openedx_authz.engine", "config/authz.policy")
|
||||
migrate_policy_between_enforcers(
|
||||
source_enforcer=casbin.Enforcer(model_path, policy_path),
|
||||
target_enforcer=global_enforcer,
|
||||
)
|
||||
|
||||
def test_authorized_for_specific_course(self, mock_flag):
|
||||
"""User authorized for specific course can access."""
|
||||
response = self.authorized_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_unauthorized_for_specific_course(self, mock_flag):
|
||||
"""User without authorization for specific course cannot access."""
|
||||
response = self.unauthorized_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_unauthorized_for_different_course(self, mock_flag):
|
||||
"""User authorized for one course cannot access another course."""
|
||||
other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.user.id)
|
||||
other_url = reverse(
|
||||
"cms.djangoapps.contentstore:v0:course_advanced_settings",
|
||||
kwargs={"course_id": other_course.id},
|
||||
)
|
||||
response = self.authorized_client.get(other_url)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
def test_staff_authorized_by_default(self, mock_flag):
|
||||
"""Staff users are authorized by default."""
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_superuser_authorized_by_default(self, mock_flag):
|
||||
"""Superusers are authorized by default."""
|
||||
superuser = UserFactory(is_superuser=True, is_staff=False)
|
||||
superuser_client = APIClient()
|
||||
superuser_client.force_authenticate(user=superuser)
|
||||
response = superuser_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_patch_authorized_for_specific_course(self, mock_flag):
|
||||
"""User authorized for specific course can PATCH."""
|
||||
response = self.authorized_client.patch(
|
||||
self.url,
|
||||
{"display_name": {"value": "Test"}},
|
||||
content_type="application/json"
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_patch_unauthorized_for_specific_course(self, mock_flag):
|
||||
"""User without authorization for specific course cannot PATCH."""
|
||||
response = self.unauthorized_client.patch(
|
||||
self.url,
|
||||
{"display_name": {"value": "Test"}},
|
||||
content_type="application/json"
|
||||
)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@@ -11,7 +11,7 @@ from xmodule.modulestore.django import modulestore
|
||||
|
||||
from cms.djangoapps.models.settings.course_metadata import CourseMetadata
|
||||
from cms.djangoapps.contentstore.api.views.utils import get_bool_param
|
||||
from common.djangoapps.student.auth import has_studio_read_access, has_studio_write_access
|
||||
from common.djangoapps.student.auth import check_course_advanced_settings_access
|
||||
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, verify_course_exists, view_auth_classes
|
||||
from ..serializers import CourseAdvancedSettingsSerializer
|
||||
from ....views.course import update_course_advanced_settings
|
||||
@@ -115,7 +115,7 @@ class AdvancedCourseSettingsView(DeveloperErrorViewMixin, APIView):
|
||||
if not filter_query_data.is_valid():
|
||||
raise ValidationError(filter_query_data.errors)
|
||||
course_key = CourseKey.from_string(course_id)
|
||||
if not has_studio_read_access(request.user, course_key):
|
||||
if not check_course_advanced_settings_access(request.user, course_key, access_type='read'):
|
||||
self.permission_denied(request)
|
||||
course_block = modulestore().get_course(course_key)
|
||||
fetch_all = get_bool_param(request, 'fetch_all', True)
|
||||
@@ -184,7 +184,7 @@ class AdvancedCourseSettingsView(DeveloperErrorViewMixin, APIView):
|
||||
along with all the course's settings similar to a ``GET`` request.
|
||||
"""
|
||||
course_key = CourseKey.from_string(course_id)
|
||||
if not has_studio_write_access(request.user, course_key):
|
||||
if not check_course_advanced_settings_access(request.user, course_key, access_type='write'):
|
||||
self.permission_denied(request)
|
||||
course_block = modulestore().get_course(course_key)
|
||||
updated_data = update_course_advanced_settings(course_block, request.data, request.user)
|
||||
|
||||
@@ -13,7 +13,7 @@ from rest_framework.views import APIView
|
||||
from cms.djangoapps.contentstore.views.course import get_course_and_check_access
|
||||
from cms.djangoapps.contentstore.utils import get_proctored_exam_settings_url
|
||||
from cms.djangoapps.models.settings.course_metadata import CourseMetadata
|
||||
from common.djangoapps.student.auth import has_studio_advanced_settings_access
|
||||
from common.djangoapps.student.auth import check_course_advanced_settings_access
|
||||
from xmodule.course_block import (
|
||||
get_available_providers,
|
||||
get_requires_escalation_email_providers,
|
||||
@@ -21,7 +21,6 @@ from xmodule.course_block import (
|
||||
from openedx.core.djangoapps.course_apps.toggles import exams_ida_enabled
|
||||
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, verify_course_exists, view_auth_classes
|
||||
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
|
||||
|
||||
from ..serializers import (
|
||||
LimitedProctoredExamSettingsSerializer,
|
||||
ProctoredExamConfigurationSerializer,
|
||||
@@ -260,7 +259,9 @@ class ProctoringErrorsView(DeveloperErrorViewMixin, APIView):
|
||||
```
|
||||
"""
|
||||
course_key = CourseKey.from_string(course_id)
|
||||
if not has_studio_advanced_settings_access(request.user):
|
||||
if not check_course_advanced_settings_access(
|
||||
request.user, course_key, access_type='feature_restricted'
|
||||
):
|
||||
self.permission_denied(request)
|
||||
|
||||
course_block = modulestore().get_course(course_key)
|
||||
|
||||
@@ -13,6 +13,7 @@ from rest_framework.test import APITestCase
|
||||
|
||||
from cms.djangoapps.contentstore.tests.test_utils import AuthorizeStaffTestCase
|
||||
from cms.djangoapps.contentstore.tests.utils import CourseTestCase
|
||||
from openedx.core import toggles as core_toggles
|
||||
from openedx.core.djangoapps.course_apps.toggles import EXAMS_IDA
|
||||
from xmodule.modulestore.django import (
|
||||
modulestore,
|
||||
@@ -453,3 +454,39 @@ class CourseProctoringErrorsViewTest(CourseTestCase, PermissionAccessMixin):
|
||||
self.assertEqual(
|
||||
response.status_code, 403 if disable_advanced_settings else 200
|
||||
)
|
||||
|
||||
@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True)
|
||||
@patch('common.djangoapps.student.auth.authz_api.is_user_allowed')
|
||||
def test_authz_user_allowed(self, mock_is_user_allowed, mock_flag):
|
||||
"""User with authz permission can access proctoring errors."""
|
||||
mock_is_user_allowed.return_value = True
|
||||
response = self.non_staff_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
mock_is_user_allowed.assert_called_once()
|
||||
|
||||
@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True)
|
||||
@patch('common.djangoapps.student.auth.authz_api.is_user_allowed')
|
||||
def test_authz_user_not_allowed(self, mock_is_user_allowed, mock_flag):
|
||||
"""User without authz permission cannot access proctoring errors."""
|
||||
mock_is_user_allowed.return_value = False
|
||||
response = self.non_staff_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
mock_is_user_allowed.assert_called_once()
|
||||
|
||||
@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True)
|
||||
@patch('common.djangoapps.student.auth.authz_api.is_user_allowed')
|
||||
def test_authz_with_disable_advanced_settings_staff_allowed(self, mock_is_user_allowed, mock_flag):
|
||||
"""Staff user can access when DISABLE_ADVANCED_SETTINGS is enabled, bypassing authz."""
|
||||
with override_settings(FEATURES={"DISABLE_ADVANCED_SETTINGS": True}):
|
||||
response = self.client.get(self.url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
mock_is_user_allowed.assert_not_called()
|
||||
|
||||
@patch.object(core_toggles.AUTHZ_COURSE_AUTHORING_FLAG, 'is_enabled', return_value=True)
|
||||
@patch('common.djangoapps.student.auth.authz_api.is_user_allowed')
|
||||
def test_authz_with_disable_advanced_settings_non_staff_denied(self, mock_is_user_allowed, mock_flag):
|
||||
"""Non-staff user is denied when DISABLE_ADVANCED_SETTINGS is enabled, bypassing authz."""
|
||||
with override_settings(FEATURES={"DISABLE_ADVANCED_SETTINGS": True}):
|
||||
response = self.non_staff_client.get(self.url)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
mock_is_user_allowed.assert_not_called()
|
||||
|
||||
@@ -10,7 +10,10 @@ from ccx_keys.locator import CCXBlockUsageLocator, CCXLocator
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from opaque_keys.edx.locator import LibraryLocator
|
||||
from openedx_authz import api as authz_api
|
||||
from openedx_authz.constants.permissions import COURSES_MANAGE_ADVANCED_SETTINGS
|
||||
|
||||
from openedx.core import toggles as core_toggles
|
||||
from common.djangoapps.student.roles import (
|
||||
CourseBetaTesterRole,
|
||||
CourseCreatorRole,
|
||||
@@ -177,6 +180,49 @@ def has_studio_read_access(user, course_key):
|
||||
return bool(STUDIO_VIEW_CONTENT & get_user_permissions(user, course_key))
|
||||
|
||||
|
||||
def check_course_advanced_settings_access(user, course_key, access_type='read'):
|
||||
"""
|
||||
Check if user has access to advanced settings for a course.
|
||||
|
||||
Uses openedx-authz when AUTHZ_COURSE_AUTHORING_FLAG is enabled,
|
||||
otherwise falls back to legacy permission checks.
|
||||
|
||||
If the DISABLE_ADVANCED_SETTINGS feature flag is on, then authz will not be used for the
|
||||
permission check.
|
||||
|
||||
Args:
|
||||
user: Django user object
|
||||
course_key: CourseKey for the course
|
||||
access_type: Type of access to check. Options:
|
||||
- 'read': Check studio read access (default)
|
||||
- 'write': Check studio write access
|
||||
- 'feature_restricted': Check access based on the DISABLE_ADVANCED_SETTINGS feature
|
||||
|
||||
Returns:
|
||||
bool: True if user has permission, False otherwise
|
||||
"""
|
||||
if core_toggles.AUTHZ_COURSE_AUTHORING_FLAG.is_enabled(course_key):
|
||||
# For feature_restricted access type, check DISABLE_ADVANCED_SETTINGS feature
|
||||
if (
|
||||
access_type == 'feature_restricted'
|
||||
and settings.FEATURES.get('DISABLE_ADVANCED_SETTINGS', False)
|
||||
):
|
||||
# When feature is disabled, only staff/superuser can access (bypass authz)
|
||||
return user.is_staff or user.is_superuser
|
||||
# Otherwise check authz permission
|
||||
return authz_api.is_user_allowed(user.username, COURSES_MANAGE_ADVANCED_SETTINGS.identifier, str(course_key))
|
||||
|
||||
# Legacy permission checks
|
||||
if access_type == 'read':
|
||||
return has_studio_read_access(user, course_key)
|
||||
if access_type == 'feature_restricted':
|
||||
return has_studio_advanced_settings_access(user)
|
||||
if access_type == 'write':
|
||||
return has_studio_write_access(user, course_key)
|
||||
|
||||
raise ValueError(f"Invalid access_type: {access_type}")
|
||||
|
||||
|
||||
def is_content_creator(user, org):
|
||||
"""
|
||||
Check if the user has the role to create content.
|
||||
|
||||
@@ -804,7 +804,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.20.0
|
||||
openedx-authz==0.22.0
|
||||
# via -r requirements/edx/kernel.in
|
||||
openedx-calc==4.0.3
|
||||
# via -r requirements/edx/kernel.in
|
||||
|
||||
@@ -1357,7 +1357,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.20.0
|
||||
openedx-authz==0.22.0
|
||||
# via
|
||||
# -r requirements/edx/doc.txt
|
||||
# -r requirements/edx/testing.txt
|
||||
|
||||
@@ -980,7 +980,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.20.0
|
||||
openedx-authz==0.22.0
|
||||
# via -r requirements/edx/base.txt
|
||||
openedx-calc==4.0.3
|
||||
# via -r requirements/edx/base.txt
|
||||
|
||||
@@ -1030,7 +1030,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.20.0
|
||||
openedx-authz==0.22.0
|
||||
# via -r requirements/edx/base.txt
|
||||
openedx-calc==4.0.3
|
||||
# via -r requirements/edx/base.txt
|
||||
|
||||
Reference in New Issue
Block a user