[FC-0099] feat: add openedx-authz to library apis user_can_create_library and require_permission_for_library_key (#37501)
* feat: add the authz check to the library api function feat: add the authz publish check in rest_api blocks and containers feat: add the authz checks in libraries and refactor feat: add collections checks feat: update enforcement in serializer file refactor: refactor the permission check functions fix: fix value error fix: calling the queries twice * test: add structure for test and apply feedback refactor: refactor the tests and apply feedback fix: apply feedback Revert "refactor: refactor the tests and apply feedback" This reverts commit aa0bd527dd7bc7dec4a7ad7adb41a3c932f4a587. refactor: use constants and avoid mapping test: fix the test to have them in order docs: about we rely on bridgekeeper and the old check for two cases docs: update openedx/core/djangoapps/content_libraries/api/libraries.py Co-authored-by: Maria Grimaldi (Majo) <maria.grimaldi@edunext.co> refactor: use global scope wildcard instead of * refactor: allow receiving PermissionData objects refactor: do not inherit from BaseRolesTestCase to favor CL setup methods If both BaseRolesTestCase and ContentLibrariesRestApiTest define a method with the same name (e.g., setUp()), Python will use the one found first in the MRO, which is the one in BaseRolesTestCase because it is listed first in the class definition leading to unexpected behavior. refactor: remove unnecessary imports and indent * chore: bump openedx-authz version
This commit is contained in:
committed by
GitHub
parent
6c6fc5d551
commit
f4f14a6987
@@ -53,13 +53,11 @@ from django.core.validators import validate_unicode_slug
|
||||
from django.db import IntegrityError, transaction
|
||||
from django.db.models import Q, QuerySet
|
||||
from django.utils.translation import gettext as _
|
||||
from opaque_keys.edx.locator import (
|
||||
LibraryLocatorV2,
|
||||
LibraryUsageLocatorV2,
|
||||
)
|
||||
from openedx_events.content_authoring.data import (
|
||||
ContentLibraryData,
|
||||
)
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
|
||||
from openedx_authz import api as authz_api
|
||||
from openedx_authz.api import assign_role_to_user_in_scope
|
||||
from openedx_authz.constants import permissions as authz_permissions
|
||||
from openedx_events.content_authoring.data import ContentLibraryData
|
||||
from openedx_events.content_authoring.signals import (
|
||||
CONTENT_LIBRARY_CREATED,
|
||||
CONTENT_LIBRARY_DELETED,
|
||||
@@ -70,7 +68,6 @@ from openedx_learning.api.authoring_models import Component, LearningPackage
|
||||
from organizations.models import Organization
|
||||
from user_tasks.models import UserTaskArtifact, UserTaskStatus
|
||||
from xblock.core import XBlock
|
||||
from openedx_authz.api import assign_role_to_user_in_scope
|
||||
|
||||
from openedx.core.types import User as UserType
|
||||
|
||||
@@ -78,6 +75,7 @@ from .. import permissions, tasks
|
||||
from ..constants import ALL_RIGHTS_RESERVED
|
||||
from ..models import ContentLibrary, ContentLibraryPermission
|
||||
from .exceptions import LibraryAlreadyExists, LibraryPermissionIntegrityError
|
||||
from .permissions import LEGACY_LIB_PERMISSIONS
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -109,6 +107,7 @@ __all__ = [
|
||||
"revert_changes",
|
||||
"get_backup_task_status",
|
||||
"assign_library_role_to_user",
|
||||
"user_has_permission_across_lib_authz_systems",
|
||||
]
|
||||
|
||||
|
||||
@@ -245,7 +244,18 @@ def user_can_create_library(user: AbstractUser) -> bool:
|
||||
"""
|
||||
Check if the user has permission to create a content library.
|
||||
"""
|
||||
return user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY)
|
||||
library_permission = permissions.CAN_CREATE_CONTENT_LIBRARY
|
||||
lib_permission_in_authz = _transform_legacy_lib_permission_to_authz_permission(library_permission)
|
||||
# The authz_api.is_user_allowed check only validates permissions within a specific library context. Since
|
||||
# creating a library is not tied to an existing one, we use user.has_perm (via Bridgekeeper) to check if the user
|
||||
# can create libraries, meaning they have the course creator role. In the future, this should rely on a global (*)
|
||||
# role defined in the Authorization Framework for instance-level resource creation.
|
||||
has_perms = user.has_perm(library_permission) or authz_api.is_user_allowed(
|
||||
user,
|
||||
lib_permission_in_authz,
|
||||
authz_api.data.GLOBAL_SCOPE_WILDCARD,
|
||||
)
|
||||
return has_perms
|
||||
|
||||
|
||||
def get_libraries_for_user(user, org=None, text_search=None, order=None) -> QuerySet[ContentLibrary]:
|
||||
@@ -336,7 +346,7 @@ def require_permission_for_library_key(library_key: LibraryLocatorV2, user: User
|
||||
library_obj = ContentLibrary.objects.get_by_key(library_key)
|
||||
# obj should be able to read any valid model object but mypy thinks it can only be
|
||||
# "User | AnonymousUser | None"
|
||||
if not user.has_perm(permission, obj=library_obj): # type:ignore[arg-type]
|
||||
if not user_has_permission_across_lib_authz_systems(user, permission, library_obj):
|
||||
raise PermissionDenied
|
||||
|
||||
return library_obj
|
||||
@@ -754,3 +764,90 @@ def get_backup_task_status(
|
||||
result['file'] = artifact.file
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _transform_legacy_lib_permission_to_authz_permission(permission: str) -> str:
|
||||
"""
|
||||
Transform a legacy content library permission to an openedx-authz permission.
|
||||
"""
|
||||
# There is no dedicated permission or role for can_create_content_library in openedx-authz yet,
|
||||
# so we reuse the same permission to rely on user.has_perm via Bridgekeeper.
|
||||
return {
|
||||
permissions.CAN_CREATE_CONTENT_LIBRARY: permissions.CAN_CREATE_CONTENT_LIBRARY,
|
||||
permissions.CAN_DELETE_THIS_CONTENT_LIBRARY: authz_permissions.DELETE_LIBRARY.identifier,
|
||||
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY: authz_permissions.EDIT_LIBRARY_CONTENT.identifier,
|
||||
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY_TEAM: authz_permissions.MANAGE_LIBRARY_TEAM.identifier,
|
||||
permissions.CAN_VIEW_THIS_CONTENT_LIBRARY: authz_permissions.VIEW_LIBRARY.identifier,
|
||||
permissions.CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM: authz_permissions.VIEW_LIBRARY_TEAM.identifier,
|
||||
}.get(permission, permission)
|
||||
|
||||
|
||||
def _transform_authz_permission_to_legacy_lib_permission(permission: str) -> str:
|
||||
"""
|
||||
Transform an openedx-authz permission to a legacy content library permission.
|
||||
"""
|
||||
return {
|
||||
authz_permissions.PUBLISH_LIBRARY_CONTENT.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
authz_permissions.CREATE_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
authz_permissions.EDIT_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
authz_permissions.DELETE_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
}.get(permission, permission)
|
||||
|
||||
|
||||
def user_has_permission_across_lib_authz_systems(
|
||||
user: UserType,
|
||||
permission: str | authz_api.data.PermissionData,
|
||||
library_obj: ContentLibrary,
|
||||
) -> bool:
|
||||
"""
|
||||
Check whether a user has a given permission on a content library across both the
|
||||
legacy edx-platform permission system and the newer openedx-authz system.
|
||||
|
||||
The provided permission name is normalized to both systems (legacy and authz), and
|
||||
authorization is granted if either:
|
||||
- the user holds the legacy object-level permission on the ContentLibrary instance, or
|
||||
- the openedx-authz API allows the user for the corresponding permission on the library.
|
||||
|
||||
**Note:**
|
||||
Temporary: this function uses Bridgekeeper-based logic for cases not yet modeled in openedx-authz.
|
||||
|
||||
Current gaps covered here:
|
||||
- CAN_CREATE_CONTENT_LIBRARY: we call user.has_perm via Bridgekeeper to verify the user is a course creator.
|
||||
- CAN_VIEW_THIS_CONTENT_LIBRARY: we respect the allow_public_read flag via Bridgekeeper.
|
||||
|
||||
Replace these with authz_api.is_user_allowed once openedx-authz supports
|
||||
these conditions natively (including global (*) roles).
|
||||
|
||||
Args:
|
||||
user: The Django user (or user-like object) to check.
|
||||
permission: The permission identifier (either a legacy codename or an openedx-authz name).
|
||||
library_obj: The ContentLibrary instance to check against.
|
||||
|
||||
Returns:
|
||||
bool: True if the user is authorized by either system; otherwise False.
|
||||
"""
|
||||
if isinstance(permission, authz_api.data.PermissionData):
|
||||
permission = permission.identifier
|
||||
if _is_legacy_permission(permission):
|
||||
legacy_permission = permission
|
||||
authz_permission = _transform_legacy_lib_permission_to_authz_permission(permission)
|
||||
else:
|
||||
authz_permission = permission
|
||||
legacy_permission = _transform_authz_permission_to_legacy_lib_permission(permission)
|
||||
return (
|
||||
# Check both the legacy and the new openedx-authz permissions
|
||||
user.has_perm(perm=legacy_permission, obj=library_obj)
|
||||
or authz_api.is_user_allowed(
|
||||
user,
|
||||
authz_permission,
|
||||
str(library_obj.library_key),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _is_legacy_permission(permission: str) -> bool:
|
||||
"""
|
||||
Determine if the specified library permission is part of the legacy
|
||||
or the new openedx-authz system.
|
||||
"""
|
||||
return permission in LEGACY_LIB_PERMISSIONS
|
||||
|
||||
@@ -12,3 +12,13 @@ from ..permissions import (
|
||||
CAN_VIEW_THIS_CONTENT_LIBRARY,
|
||||
CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM
|
||||
)
|
||||
|
||||
LEGACY_LIB_PERMISSIONS = frozenset({
|
||||
CAN_CREATE_CONTENT_LIBRARY,
|
||||
CAN_DELETE_THIS_CONTENT_LIBRARY,
|
||||
CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
CAN_EDIT_THIS_CONTENT_LIBRARY_TEAM,
|
||||
CAN_LEARN_FROM_THIS_CONTENT_LIBRARY,
|
||||
CAN_VIEW_THIS_CONTENT_LIBRARY,
|
||||
CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM,
|
||||
})
|
||||
|
||||
@@ -9,6 +9,7 @@ from django.urls import reverse
|
||||
from django.utils.decorators import method_decorator
|
||||
from drf_yasg.utils import swagger_auto_schema
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
|
||||
from openedx_authz.constants import permissions as authz_permissions
|
||||
from openedx_learning.api import authoring as authoring_api
|
||||
from rest_framework import status
|
||||
from rest_framework.exceptions import NotFound, ValidationError
|
||||
@@ -238,7 +239,7 @@ class LibraryBlockPublishView(APIView):
|
||||
api.require_permission_for_library_key(
|
||||
key.lib_key,
|
||||
request.user,
|
||||
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
|
||||
authz_permissions.PUBLISH_LIBRARY_CONTENT
|
||||
)
|
||||
api.publish_component_changes(key, request.user)
|
||||
return Response({})
|
||||
|
||||
@@ -13,6 +13,7 @@ from rest_framework.viewsets import ModelViewSet
|
||||
from rest_framework.status import HTTP_204_NO_CONTENT
|
||||
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2
|
||||
from openedx_authz.constants import permissions as authz_permissions
|
||||
from openedx_learning.api import authoring as authoring_api
|
||||
from openedx_learning.api.authoring_models import Collection
|
||||
|
||||
@@ -56,7 +57,6 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
if self.request.method in ['OPTIONS', 'GET']
|
||||
else permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
|
||||
)
|
||||
|
||||
self._content_library = api.require_permission_for_library_key(
|
||||
library_key,
|
||||
self.request.user,
|
||||
@@ -110,6 +110,11 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
Create a Collection that belongs to a Content Library
|
||||
"""
|
||||
content_library = self.get_content_library()
|
||||
api.require_permission_for_library_key(
|
||||
content_library.library_key,
|
||||
request.user,
|
||||
authz_permissions.CREATE_LIBRARY_COLLECTION
|
||||
)
|
||||
create_serializer = ContentLibraryCollectionUpdateSerializer(data=request.data)
|
||||
create_serializer.is_valid(raise_exception=True)
|
||||
|
||||
@@ -144,6 +149,11 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
Update a Collection that belongs to a Content Library
|
||||
"""
|
||||
content_library = self.get_content_library()
|
||||
api.require_permission_for_library_key(
|
||||
content_library.library_key,
|
||||
request.user,
|
||||
authz_permissions.EDIT_LIBRARY_COLLECTION
|
||||
)
|
||||
collection_key = kwargs["key"]
|
||||
|
||||
update_serializer = ContentLibraryCollectionUpdateSerializer(
|
||||
@@ -165,6 +175,12 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
"""
|
||||
Soft-deletes a Collection that belongs to a Content Library
|
||||
"""
|
||||
content_library = self.get_content_library()
|
||||
api.require_permission_for_library_key(
|
||||
content_library.library_key,
|
||||
request.user,
|
||||
authz_permissions.DELETE_LIBRARY_COLLECTION
|
||||
)
|
||||
collection = super().get_object()
|
||||
assert collection.learning_package_id
|
||||
authoring_api.delete_collection(
|
||||
@@ -181,6 +197,11 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
Restores a soft-deleted Collection that belongs to a Content Library
|
||||
"""
|
||||
content_library = self.get_content_library()
|
||||
api.require_permission_for_library_key(
|
||||
content_library.library_key,
|
||||
request.user,
|
||||
authz_permissions.EDIT_LIBRARY_COLLECTION
|
||||
)
|
||||
assert content_library.learning_package_id
|
||||
collection_key = kwargs["key"]
|
||||
authoring_api.restore_collection(
|
||||
@@ -198,6 +219,11 @@ class LibraryCollectionsView(ModelViewSet):
|
||||
Collection and items must all be part of the given library/learning package.
|
||||
"""
|
||||
content_library = self.get_content_library()
|
||||
api.require_permission_for_library_key(
|
||||
content_library.library_key,
|
||||
request.user,
|
||||
authz_permissions.EDIT_LIBRARY_COLLECTION
|
||||
)
|
||||
collection_key = kwargs["key"]
|
||||
|
||||
serializer = ContentLibraryItemKeysSerializer(data=request.data)
|
||||
|
||||
@@ -12,6 +12,7 @@ from drf_yasg.utils import swagger_auto_schema
|
||||
from drf_yasg import openapi
|
||||
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryContainerLocator
|
||||
from openedx_authz.constants import permissions as authz_permissions
|
||||
from openedx_learning.api import authoring as authoring_api
|
||||
from rest_framework.generics import GenericAPIView
|
||||
from rest_framework.response import Response
|
||||
@@ -379,7 +380,7 @@ class LibraryContainerPublishView(GenericAPIView):
|
||||
api.require_permission_for_library_key(
|
||||
container_key.lib_key,
|
||||
request.user,
|
||||
permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
|
||||
authz_permissions.PUBLISH_LIBRARY_CONTENT
|
||||
)
|
||||
api.publish_container_changes(container_key, request.user.id)
|
||||
# If we need to in the future, we could return a list of all the child containers/components that were
|
||||
|
||||
@@ -82,6 +82,7 @@ from drf_yasg.utils import swagger_auto_schema
|
||||
from user_tasks.models import UserTaskStatus
|
||||
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
|
||||
from openedx_authz.constants import permissions as authz_permissions
|
||||
from organizations.api import ensure_organization
|
||||
from organizations.exceptions import InvalidOrganizationException
|
||||
from organizations.models import Organization
|
||||
@@ -219,7 +220,7 @@ class LibraryRootView(GenericAPIView):
|
||||
"""
|
||||
Create a new content library.
|
||||
"""
|
||||
if not request.user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY):
|
||||
if not api.user_can_create_library(request.user):
|
||||
raise PermissionDenied
|
||||
serializer = ContentLibraryMetadataSerializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
@@ -479,7 +480,11 @@ class LibraryCommitView(APIView):
|
||||
descendants.
|
||||
"""
|
||||
key = LibraryLocatorV2.from_string(lib_key_str)
|
||||
api.require_permission_for_library_key(key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
|
||||
api.require_permission_for_library_key(
|
||||
key,
|
||||
request.user,
|
||||
authz_permissions.PUBLISH_LIBRARY_CONTENT
|
||||
)
|
||||
api.publish_changes(key, request.user.id)
|
||||
return Response({})
|
||||
|
||||
@@ -838,7 +843,7 @@ class LibraryRestoreView(APIView):
|
||||
"""
|
||||
Restore a library from a backup file.
|
||||
"""
|
||||
if not request.user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY):
|
||||
if not api.user_can_create_library(request.user):
|
||||
raise PermissionDenied
|
||||
|
||||
serializer = LibraryRestoreFileSerializer(data=request.data)
|
||||
|
||||
@@ -14,6 +14,7 @@ from rest_framework.exceptions import ValidationError
|
||||
from user_tasks.models import UserTaskStatus
|
||||
|
||||
from openedx.core.djangoapps.content_libraries.tasks import LibraryRestoreTask
|
||||
from openedx.core.djangoapps.content_libraries import api
|
||||
from openedx.core.djangoapps.content_libraries.api.containers import ContainerType
|
||||
from openedx.core.djangoapps.content_libraries.constants import ALL_RIGHTS_RESERVED, LICENSE_OPTIONS
|
||||
from openedx.core.djangoapps.content_libraries.models import (
|
||||
@@ -75,7 +76,8 @@ class ContentLibraryMetadataSerializer(serializers.Serializer):
|
||||
return False
|
||||
|
||||
library_obj = ContentLibrary.objects.get_by_key(obj.key)
|
||||
return user.has_perm(permissions.CAN_EDIT_THIS_CONTENT_LIBRARY, obj=library_obj)
|
||||
return api.user_has_permission_across_lib_authz_systems(
|
||||
user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY, library_obj)
|
||||
|
||||
|
||||
class ContentLibraryUpdateSerializer(serializers.Serializer):
|
||||
|
||||
@@ -19,9 +19,10 @@ from django.db.models import Q
|
||||
from django.test import override_settings
|
||||
from django.test.client import Client
|
||||
from freezegun import freeze_time
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
|
||||
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2, LibraryCollectionLocator
|
||||
from organizations.models import Organization
|
||||
from rest_framework.test import APITestCase
|
||||
from rest_framework import status
|
||||
from openedx_learning.api.authoring_models import LearningPackage
|
||||
from user_tasks.models import UserTaskStatus, UserTaskArtifact
|
||||
|
||||
@@ -35,6 +36,9 @@ from openedx.core.djangoapps.content_libraries.tests.base import (
|
||||
URL_BLOCK_XBLOCK_HANDLER,
|
||||
ContentLibrariesRestApiTest,
|
||||
)
|
||||
from openedx_authz import api as authz_api
|
||||
from openedx_authz.constants import roles
|
||||
from openedx_authz.engine.enforcer import AuthzEnforcer
|
||||
from openedx.core.djangoapps.xblock import api as xblock_api
|
||||
from openedx.core.djangolib.testing.utils import skip_unless_cms
|
||||
from openedx_authz.constants.permissions import VIEW_LIBRARY
|
||||
@@ -1704,3 +1708,282 @@ class ContentLibraryXBlockValidationTest(APITestCase):
|
||||
secure_token='random',
|
||||
)))
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
|
||||
@skip_unless_cms
|
||||
class ContentLibrariesRestAPIAuthzIntegrationTestCase(ContentLibrariesRestApiTest):
|
||||
"""
|
||||
Test that Content Libraries REST API endpoints respect AuthZ roles and permissions.
|
||||
|
||||
Roles tested:
|
||||
1. Library Admin: Full access to all library operations.
|
||||
2. Library Author: Can view and edit library content, but cannot delete the library.
|
||||
3. Library Contributor: Can view and edit library content, but cannot delete or publish the library.
|
||||
4. Library User: Can only view library content.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self._seed_database_with_policies()
|
||||
|
||||
self.library_admin = UserFactory.create(
|
||||
username="library_admin",
|
||||
email="libadmin@example.com")
|
||||
self.library_author = UserFactory.create(
|
||||
username="library_author",
|
||||
email="libauthor@example.com")
|
||||
self.library_contributor = UserFactory.create(
|
||||
username="library_contributor",
|
||||
email="libcontributor@example.com")
|
||||
self.library_user = UserFactory.create(
|
||||
username="library_user",
|
||||
email="libuser@example.com")
|
||||
self.random_user = UserFactory.create(
|
||||
username="random_user",
|
||||
email="random@example.com")
|
||||
|
||||
# Define user groups by permission level
|
||||
self.list_of_all_users = [
|
||||
self.library_admin,
|
||||
self.library_author,
|
||||
self.library_contributor,
|
||||
self.library_user,
|
||||
self.random_user,
|
||||
]
|
||||
self.library_viewers = [self.library_admin, self.library_author, self.library_contributor, self.library_user]
|
||||
self.library_editors = [self.library_admin, self.library_author, self.library_contributor]
|
||||
self.library_publishers = [self.library_admin, self.library_author]
|
||||
self.library_collection_editors = [self.library_admin, self.library_author, self.library_contributor]
|
||||
self.library_deleters = [self.library_admin]
|
||||
|
||||
# Create library and assign roles
|
||||
library = self._create_library(
|
||||
slug="authzlib",
|
||||
title="AuthZ Test Library",
|
||||
description="Testing AuthZ",
|
||||
)
|
||||
self.lib_id = library["id"]
|
||||
|
||||
authz_api.assign_role_to_user_in_scope(
|
||||
self.library_admin.username,
|
||||
roles.LIBRARY_ADMIN.external_key, self.lib_id)
|
||||
authz_api.assign_role_to_user_in_scope(
|
||||
self.library_author.username,
|
||||
roles.LIBRARY_AUTHOR.external_key, self.lib_id)
|
||||
authz_api.assign_role_to_user_in_scope(
|
||||
self.library_contributor.username,
|
||||
roles.LIBRARY_CONTRIBUTOR.external_key, self.lib_id)
|
||||
authz_api.assign_role_to_user_in_scope(
|
||||
self.library_user.username,
|
||||
roles.LIBRARY_USER.external_key, self.lib_id)
|
||||
AuthzEnforcer.get_enforcer().load_policy() # Load policies to simulate fresh start
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after each test to ensure isolation."""
|
||||
super().tearDown()
|
||||
AuthzEnforcer.get_enforcer().clear_policy() # Clear policies after each test to ensure isolation
|
||||
|
||||
@classmethod
|
||||
def _seed_database_with_policies(cls):
|
||||
"""Seed the database with policies from the policy file.
|
||||
|
||||
This simulates the one-time database seeding that would happen
|
||||
during application deployment, separate from the runtime policy loading.
|
||||
"""
|
||||
import pkg_resources
|
||||
from openedx_authz.engine.utils import migrate_policy_between_enforcers
|
||||
import casbin
|
||||
|
||||
global_enforcer = AuthzEnforcer.get_enforcer()
|
||||
global_enforcer.load_policy()
|
||||
model_path = pkg_resources.resource_filename("openedx_authz.engine", "config/model.conf")
|
||||
policy_path = pkg_resources.resource_filename("openedx_authz.engine", "config/authz.policy")
|
||||
|
||||
migrate_policy_between_enforcers(
|
||||
source_enforcer=casbin.Enforcer(model_path, policy_path),
|
||||
target_enforcer=global_enforcer,
|
||||
)
|
||||
global_enforcer.clear_policy() # Clear to simulate fresh start for each test
|
||||
|
||||
def _all_users_excluding(self, excluded_users):
|
||||
return set(self.list_of_all_users) - set(excluded_users)
|
||||
|
||||
def test_view_permissions(self):
|
||||
"""
|
||||
Verify that only users with view permissions can view.
|
||||
"""
|
||||
# Test library view access
|
||||
for user in self.library_viewers:
|
||||
with self.as_user(user):
|
||||
self._get_library(self.lib_id, expect_response=status.HTTP_200_OK)
|
||||
for user in self._all_users_excluding(self.library_viewers):
|
||||
with self.as_user(user):
|
||||
self._get_library(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_edit_permissions(self):
|
||||
"""
|
||||
Verify that only users with edit permissions can edit.
|
||||
"""
|
||||
# Test library edit access
|
||||
for user in self.library_editors:
|
||||
with self.as_user(user):
|
||||
self._update_library(
|
||||
self.lib_id,
|
||||
description=f"Description by {user.username}",
|
||||
expect_response=status.HTTP_200_OK,
|
||||
)
|
||||
#Verify the permitted changes were made
|
||||
data = self._get_library(self.lib_id)
|
||||
assert data['description'] == f"Description by {user.username}"
|
||||
|
||||
for user in self._all_users_excluding(self.library_editors):
|
||||
with self.as_user(user):
|
||||
self._update_library(
|
||||
self.lib_id,
|
||||
description="I can't edit this.", expect_response=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# Verify the no permitted changes weren't made:
|
||||
data = self._get_library(self.lib_id)
|
||||
assert data['description'] != "I can't edit this."
|
||||
|
||||
# Library XBlock editing
|
||||
for user in self.library_editors:
|
||||
with self.as_user(user):
|
||||
# They can create blocks
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}")
|
||||
# They can modify blocks
|
||||
self._set_library_block_olx(
|
||||
block_data["id"],
|
||||
"<problem/>",
|
||||
expect_response=status.HTTP_200_OK)
|
||||
self._set_library_block_fields(
|
||||
block_data["id"],
|
||||
{"data": "<problem />", "metadata": {}},
|
||||
expect_response=status.HTTP_200_OK)
|
||||
self._set_library_block_asset(
|
||||
block_data["id"],
|
||||
"static/test.txt",
|
||||
b"data",
|
||||
expect_response=status.HTTP_200_OK)
|
||||
# They can remove blocks
|
||||
self._delete_library_block(block_data["id"], expect_response=status.HTTP_200_OK)
|
||||
# Verify deletion
|
||||
self._get_library_block(block_data["id"], expect_response=404)
|
||||
|
||||
# Recreate blocks for further tests
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", "new_problem")
|
||||
|
||||
for user in self._all_users_excluding(self.library_editors):
|
||||
with self.as_user(user):
|
||||
self._add_block_to_library(
|
||||
self.lib_id,
|
||||
"problem",
|
||||
"problem1",
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
# They can't modify blocks
|
||||
self._set_library_block_olx(
|
||||
block_data["id"],
|
||||
"<problem/>",
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
self._set_library_block_fields(
|
||||
block_data["id"],
|
||||
{"data": "<problem />", "metadata": {}},
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
self._set_library_block_asset(
|
||||
block_data["id"],
|
||||
"static/test.txt",
|
||||
b"data",
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
# They can't remove blocks
|
||||
self._delete_library_block(block_data["id"], expect_response=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_publish_permissions(self):
|
||||
"""
|
||||
Verify that only users with publish permissions can publish.
|
||||
"""
|
||||
# Test publish access
|
||||
for user in self.library_publishers:
|
||||
with self.as_user(user):
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}_1")
|
||||
self._publish_library_block(block_data["id"], expect_response=status.HTTP_200_OK)
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}_2")
|
||||
assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
|
||||
self._commit_library_changes(self.lib_id, expect_response=status.HTTP_200_OK)
|
||||
assert self._get_library(self.lib_id)['has_unpublished_changes'] is False
|
||||
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", "draft_problem")
|
||||
assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
|
||||
|
||||
for user in self._all_users_excluding(self.library_publishers):
|
||||
with self.as_user(user):
|
||||
self._publish_library_block(block_data["id"], expect_response=status.HTTP_403_FORBIDDEN)
|
||||
self._commit_library_changes(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
|
||||
# Verify that no changes were published
|
||||
assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
|
||||
|
||||
def test_collection_permissions(self):
|
||||
"""
|
||||
Verify that only users with collection permissions can perform collection actions.
|
||||
"""
|
||||
library_key = LibraryLocatorV2.from_string(self.lib_id)
|
||||
block_data = self._add_block_to_library(self.lib_id, "problem", "collection_problem")
|
||||
# Test library collection access
|
||||
for user in self.library_collection_editors:
|
||||
with self.as_user(user):
|
||||
# Create collection
|
||||
collection_data = self._create_collection(
|
||||
self.lib_id,
|
||||
title=f"Temp Collection {user.username}",
|
||||
expect_response=status.HTTP_200_OK)
|
||||
collection_id = collection_data["key"]
|
||||
collection_key = LibraryCollectionLocator(lib_key=library_key, collection_id=collection_id)
|
||||
# Update collection
|
||||
self._update_collection(collection_key, title="Updated Collection", expect_response=status.HTTP_200_OK)
|
||||
self._add_items_to_collection(
|
||||
collection_key,
|
||||
item_keys=[block_data["id"]],
|
||||
expect_response=status.HTTP_200_OK)
|
||||
# Delete collection
|
||||
self._soft_delete_collection(collection_key, expect_response=status.HTTP_204_NO_CONTENT)
|
||||
|
||||
collection_data = self._create_collection(
|
||||
self.lib_id,
|
||||
title="New Temp Collection",
|
||||
expect_response=status.HTTP_200_OK)
|
||||
collection_id = collection_data["key"]
|
||||
collection_key = LibraryCollectionLocator(lib_key=library_key, collection_id=collection_id)
|
||||
|
||||
for user in self._all_users_excluding(self.library_collection_editors):
|
||||
with self.as_user(user):
|
||||
# Attempt to create collection
|
||||
self._create_collection(
|
||||
self.lib_id,
|
||||
title="Unauthorized Collection",
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
# Attempt to update collection
|
||||
self._update_collection(
|
||||
collection_key,
|
||||
title="Unauthorized Change",
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
self._add_items_to_collection(
|
||||
collection_key,
|
||||
item_keys=[block_data["id"]],
|
||||
expect_response=status.HTTP_403_FORBIDDEN)
|
||||
# Attempt to delete collection
|
||||
self._soft_delete_collection(collection_key, expect_response=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_delete_library_permissions(self):
|
||||
"""
|
||||
Verify that only users with delete permissions can delete a library.
|
||||
"""
|
||||
# Test library delete access
|
||||
for user in self._all_users_excluding(self.library_deleters):
|
||||
with self.as_user(user):
|
||||
result = self._delete_library(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
|
||||
assert 'detail' in result # Error message
|
||||
assert 'permission' in result['detail'].lower()
|
||||
|
||||
for user in self.library_deleters:
|
||||
with self.as_user(user):
|
||||
result = self._delete_library(self.lib_id, expect_response=status.HTTP_200_OK)
|
||||
assert result == {}
|
||||
|
||||
@@ -826,7 +826,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.13.0
|
||||
openedx-authz==0.15.0
|
||||
# via -r requirements/edx/kernel.in
|
||||
openedx-calc==4.0.2
|
||||
# via -r requirements/edx/kernel.in
|
||||
|
||||
@@ -1376,7 +1376,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.13.0
|
||||
openedx-authz==0.15.0
|
||||
# via
|
||||
# -r requirements/edx/doc.txt
|
||||
# -r requirements/edx/testing.txt
|
||||
|
||||
@@ -1003,7 +1003,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.13.0
|
||||
openedx-authz==0.15.0
|
||||
# via -r requirements/edx/base.txt
|
||||
openedx-calc==4.0.2
|
||||
# via -r requirements/edx/base.txt
|
||||
|
||||
@@ -1049,7 +1049,7 @@ openedx-atlas==0.7.0
|
||||
# enterprise-integrated-channels
|
||||
# openedx-authz
|
||||
# openedx-forum
|
||||
openedx-authz==0.13.0
|
||||
openedx-authz==0.15.0
|
||||
# via -r requirements/edx/base.txt
|
||||
openedx-calc==4.0.2
|
||||
# via -r requirements/edx/base.txt
|
||||
|
||||
Reference in New Issue
Block a user