feat: API to manage course team roles across multiple courses (#36990)

* feat: API to fetch course-roles mapping by user and org

* fix: added docstring and resolve pylint issues

* feat: support bulk course team role updates via PUT API

* fix: refactor APIs based on user permissions

* chore: improve Swagger schema for course_team endpoints

* fix: refactor GET and PUT api code

* fix: apply pylint rules and optimize code

* fix: resolve test cases for supoort apis

* fix: change url path
This commit is contained in:
Pandi Ganesh
2025-07-29 11:33:09 +05:30
committed by GitHub
parent 0913b51170
commit b5bcb37c5a
9 changed files with 1003 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
"""
Serializers for use in the support app.
"""
from datetime import datetime
import pytz
from rest_framework import serializers
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
class CourseTeamManageSerializer(serializers.ModelSerializer):
"""Serializer for course team management context data"""
role = serializers.SerializerMethodField()
status = serializers.SerializerMethodField()
class Meta:
model = CourseOverview
fields = ("id", "display_name", "role", "status")
def get_role(self, obj):
course_role_map = self.context.get("course_role_map", {})
return course_role_map.get(str(obj.id))
def get_status(self, obj):
"""
Determine if the course is active or archived based on end date.
Returns 'active' if course end is null or in the future, 'archived' otherwise.
"""
if obj.end is None or obj.end >= datetime.now().replace(tzinfo=pytz.UTC):
return "active"
return "archived"
def to_representation(self, instance):
data = super().to_representation(instance)
course_key = instance.id
return {
"course_id": str(course_key),
"course_name": data["display_name"],
"role": data["role"],
"status": data["status"],
"org": course_key.org,
"run": course_key.run,
"number": course_key.course,
}

View File

@@ -0,0 +1,11 @@
"""
URL definitions for the support API.
"""
from django.urls import include, path
app_name = "lms.djangoapps.support.rest_api"
urlpatterns = [
path("v1/", include("lms.djangoapps.support.rest_api.v1.urls", namespace="v1")),
]

View File

@@ -0,0 +1,383 @@
"""
Tests for support views.
"""
import ddt
from django.urls import reverse
from rest_framework.test import APIClient
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.student.models.user import CourseAccessRole
from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole
from common.djangoapps.student.tests.factories import (
TEST_PASSWORD,
AdminFactory,
InstructorFactory,
StaffFactory,
SuperuserFactory,
UserFactory
)
from lms.djangoapps.support.tests.test_views import SupportViewTestCase
from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory
@ddt.ddt
class CourseTeamManageAPIViewTest(SupportViewTestCase):
"""
Tests for CourseTeamManagementAPIView.
Covers:
- GET: Viewing user's course team roles by authenticated users.
- PUT: Assigning/revoking user roles by authorized users.
"""
def setUp(self):
super().setUp()
self.staff = AdminFactory()
self.superuser = SuperuserFactory()
self.user = UserFactory()
self.org = "TestOrg"
self.client = APIClient()
self.url = reverse("lms.djangoapps.support.rest_api:v1:manage_course_team")
self.extra_courses = [
CourseOverviewFactory.create(
org=self.org, run=f"CS10{i}", display_name=f"Test-Course-{i}"
)
for i in range(3)
]
self.instructor_user = InstructorFactory.create(
course_key=self.extra_courses[0].id
)
self.staff_user = StaffFactory.create(course_key=self.extra_courses[1].id)
# --- GET API TEST CASES ---
def test_get_api_missing_query_params_returns_400(self):
"""GET API: Returns 400 if no query parameters are provided."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 400)
def test_get_api_with_username_parameter(self):
"""GET API: Can query by username parameter."""
self.client.login(username=self.staff.username, password=TEST_PASSWORD)
resp = self.client.get(self.url, {"username": self.user.username})
self.assertEqual(resp.status_code, 200)
self.assertIsInstance(resp.data, list)
def test_get_api_with_user_id_parameter(self):
"""GET API: Can query by user_id parameter."""
self.client.login(username=self.staff.username, password=TEST_PASSWORD)
resp = self.client.get(self.url, {"user_id": self.user.id})
self.assertEqual(resp.status_code, 200)
self.assertIsInstance(resp.data, list)
def test_get_api_nonexistent_user_returns_404(self):
"""GET API: Returns 404 for a nonexistent user email."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
resp = self.client.get(self.url, {"email": "notfound@example.com"})
self.assertEqual(resp.status_code, 404)
def test_get_api_unauthenticated_user_returns_401(self):
"""GET API: Unauthenticated users receive 401 Unauthorized."""
resp = self.client.get(self.url, {"email": self.user.email})
self.assertEqual(resp.status_code, 401)
@ddt.data(
("instructor", "instructor"),
("staff", "staff"),
)
@ddt.unpack
def test_get_api_admin_can_fetch_course_roles(self, assigned_role, expected_role):
"""GET API: Admin/staff user can view roles for any course for target user."""
self.client.login(username=self.staff.username, password=TEST_PASSWORD)
CourseAccessRole.objects.filter(user=self.user, org=self.org).delete()
if assigned_role == "instructor":
CourseInstructorRole(self.extra_courses[0].id).add_users(self.user)
elif assigned_role == "staff":
CourseStaffRole(self.extra_courses[0].id).add_users(self.user)
response = self.client.get(self.url, {"email": self.user.email})
self.assertEqual(response.status_code, 200)
self.assertIsInstance(response.data, list)
course_found = False
for course in response.data:
if course["course_id"] == str(self.extra_courses[0].id):
with self.subTest(role=assigned_role):
self.assertEqual(course["role"], expected_role)
# Verify new fields are present
self.assertIn("status", course)
self.assertIn("org", course)
self.assertIn("run", course)
self.assertIn("number", course)
self.assertIn("course_name", course)
course_found = True
self.assertTrue(course_found, "Expected course not found in response.")
def test_get_api_instructor_can_only_see_their_courses(self):
"""GET API: Course instructor sees only courses they have access to."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
resp = self.client.get(self.url, {"email": self.user.email})
self.assertEqual(resp.status_code, 200)
course_ids = [course["course_id"] for course in resp.data]
self.assertIn(str(self.extra_courses[0].id), course_ids)
for i in range(1, 3):
self.assertNotIn(str(self.extra_courses[i].id), course_ids)
def test_get_api_user_with_no_access_sees_no_courses(self):
"""GET API: Non-instructor users see no courses in the response."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
resp = self.client.get(self.url, {"email": self.user.email})
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data, [])
# --- PUT API TEST CASES ---
def test_put_api_missing_email_returns_400(self):
"""PUT API: Sending request without email returns 400 with an error message."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
data = {"bulk_role_operations": []}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.data["email"], "")
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("error", result)
self.assertIn("email", result["error"])
def test_put_api_empty_operations_returns_400(self):
"""PUT API: Sending empty bulk_role_operations returns 400 with an error message."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
data = {"email": "test@example.com", "bulk_role_operations": []}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.data["email"], "test@example.com")
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("error", result)
self.assertIn("bulk_role_operations", result["error"])
def test_put_api_non_dict_input_returns_400(self):
"""PUT API: Sending a non-dict input returns 400 with an error message."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
resp = self.client.put(self.url, ["invalid"], format="json")
self.assertEqual(resp.status_code, 400)
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("error", result)
self.assertIn("JSON object", result["error"])
def test_put_api_missing_fields_returns_error(self):
"""PUT API: Request with missing required fields returns field-level errors."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": self.user.email,
"bulk_role_operations": [{"course_id": "", "role": "", "action": ""}],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
for field in ("course_id", "role", "action"):
self.assertIn(field, result)
def test_put_api_invalid_user_email_returns_error(self):
"""PUT API: Invalid user email in request returns an error."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": "notfound@example.com",
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[0].id),
"role": "instructor",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 404)
self.assertEqual(resp.data["email"], "notfound@example.com")
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("User not found", result["error"])
def test_put_api_invalid_course_id_returns_error(self):
"""PUT API: Invalid course_id in request returns an error."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": "invalid-course-id",
"role": "instructor",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("Invalid course_id", result["error"])
def test_put_api_invalid_action_returns_error(self):
"""PUT API: Invalid action value in request returns an error."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[0].id),
"role": "instructor",
"action": "invalid_action",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
result = resp.data["results"][0]
self.assertEqual(result["status"], "failed")
self.assertIn("Invalid action", result["error"])
def test_put_api_assign_role_enrolls_user(self):
"""PUT API: Assigning a role enrolls user in the course."""
self.client.login(username=self.staff.username, password=TEST_PASSWORD)
course = self.extra_courses[2]
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(course.id),
"role": "instructor",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "success")
self.assertTrue(CourseInstructorRole(course.id).has_user(self.user))
self.assertTrue(CourseEnrollment.is_enrolled(self.user, course.id))
def test_put_api_revoke_role_removes_user(self):
"""PUT API: Revoking a role removes user from course team."""
self.client.login(username=self.staff.username, password=TEST_PASSWORD)
course = self.extra_courses[1]
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(course.id),
"role": "staff",
"action": "revoke",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "success")
self.assertFalse(CourseStaffRole(course.id).has_user(self.user))
def test_put_api_course_instructor_can_manage_own_courses(self):
"""PUT API: Course instructor can assign roles for courses they manage."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[0].id),
"role": "staff",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "success")
self.assertTrue(CourseStaffRole(self.extra_courses[0].id).has_user(self.user))
def test_put_api_non_instructor_user_forbidden(self):
"""PUT API: Non-instructor users receive 403 Forbidden when assigning roles."""
self.client.login(username=self.user.username, password=TEST_PASSWORD)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[0].id),
"role": "staff",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 403)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "failed")
self.assertIn("do not have permission", resp.data["results"][0]["error"])
def test_put_api_org_level_instructor_can_manage_all_org_courses(self):
"""PUT API: Org-level instructors can manage roles for all courses in their org."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
CourseAccessRole.objects.create(
user=self.instructor_user, role="instructor", org=self.org, course_id=None
)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[1].id),
"role": "staff",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "success")
self.assertTrue(CourseStaffRole(self.extra_courses[1].id).has_user(self.user))
def test_put_api_course_instructor_cannot_manage_other_courses(self):
"""PUT API: Course instructor cannot assign roles for courses they don't manage."""
self.client.login(
username=self.instructor_user.username, password=TEST_PASSWORD
)
data = {
"email": self.user.email,
"bulk_role_operations": [
{
"course_id": str(self.extra_courses[1].id),
"role": "staff",
"action": "assign",
}
],
}
resp = self.client.put(self.url, data, format="json")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.data["email"], self.user.email)
self.assertEqual(resp.data["results"][0]["status"], "failed")
self.assertIn("do not have instructor access", resp.data["results"][0]["error"])

View File

@@ -0,0 +1,17 @@
"""
URL definitions for the course_modes v1 API.
"""
from django.urls import re_path
from .views import CourseTeamManageAPIView
app_name = "v1"
urlpatterns = [
re_path(
r"manage_course_team/?$",
CourseTeamManageAPIView.as_view(),
name="manage_course_team",
),
]

View File

@@ -0,0 +1,537 @@
"""
API Views for course team management in support app.
"""
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from opaque_keys.edx.keys import CourseKey
from rest_framework import status
from rest_framework.exceptions import NotFound, ValidationError
from rest_framework.generics import GenericAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.student.models.user import CourseAccessRole
from common.djangoapps.student.roles import CourseRole
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
from ..serializers import CourseTeamManageSerializer
User = get_user_model()
class CourseTeamManageAPIView(GenericAPIView):
"""
Use case:
- APIs for viewing and managing a user's course team roles.
- Lists courses where the authenticated user has permission to manage roles.
- Displays the given user's role in each accessible course.
- Allows assigning or revoking roles via PUT, limited to courses the user can manage.
"""
permission_classes = (IsAuthenticated,)
serializer_class = CourseTeamManageSerializer
pagination_class = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._course_role_map = {}
self._access_roles = None
def get_serializer_context(self):
"""Provide extra context to the serializer."""
context = super().get_serializer_context()
context["course_role_map"] = getattr(self, "_course_role_map", {})
return context
def get_course_role_map_for_user(self, user):
"""Return a mapping of course_id to role for staff/instructor roles of given user."""
access_roles = CourseAccessRole.objects.filter(
user=user, role__in=["staff", "instructor"]
)
course_role_map = {}
for access_role in access_roles:
course_id = str(access_role.course_id)
# Prioritize instructor role if present
if access_role.role == "instructor" or course_id not in course_role_map:
course_role_map[course_id] = access_role.role
return course_role_map
def get_accessible_courses_for_user(self, auth_user):
"""Return queryset of courses accessible by the authenticated user."""
if auth_user.is_superuser or auth_user.is_staff:
return CourseOverview.objects.all()
access_roles = CourseAccessRole.objects.filter(
user=auth_user, role="instructor"
).only("org", "course_id")
# Collect org-level and course-level permissions
orgs = set()
course_keys = set()
for access_role in access_roles:
if access_role.course_id:
course_keys.add(str(access_role.course_id))
elif access_role.org:
orgs.add(access_role.org)
# Build a filter to fetch all courses:
# - Courses in the orgs where the user has org-level access
# - Courses explicitly assigned to the user
course_filter = Q()
if orgs:
course_filter |= Q(org__in=orgs)
if course_keys:
course_filter |= Q(id__in=course_keys)
return (
CourseOverview.objects.filter(course_filter)
if course_filter
else CourseOverview.objects.none()
)
def get_queryset(self):
"""Main entry to return courses filtered by authenticated user access and requested user roles."""
email = self.request.query_params.get("email")
username = self.request.query_params.get("username")
user_id = self.request.query_params.get("user_id")
if not any([email, username, user_id]):
raise ValidationError(
{
"detail": "Missing required query parameters. "
"At least one of 'email', 'username', or 'user_id' is required."
}
)
# Build user lookup query
user_query = Q(is_active=True)
if email:
user_query &= Q(email=email)
elif username:
user_query &= Q(username=username)
elif user_id:
user_query &= Q(id=user_id)
try:
user = User.objects.get(user_query)
except ObjectDoesNotExist as exc:
identifier = email or username or user_id
raise NotFound(
f"User with identifier '{identifier}' not found or not active."
) from exc
self._access_roles = CourseAccessRole.objects.filter(
user=user, role__in=["staff", "instructor"]
)
self._course_role_map = self.get_course_role_map_for_user(user)
auth_user = self.request.user
return self.get_accessible_courses_for_user(auth_user)
def list(self, request, *args, **kwargs):
"""list of courses for the organization and user."""
queryset = self.get_queryset()
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
@swagger_auto_schema(
manual_parameters=[
openapi.Parameter(
"email",
openapi.IN_QUERY,
description="User's email address",
type=openapi.TYPE_STRING,
),
openapi.Parameter(
"username",
openapi.IN_QUERY,
description="User's username",
type=openapi.TYPE_STRING,
),
openapi.Parameter(
"user_id",
openapi.IN_QUERY,
description="User's ID",
type=openapi.TYPE_INTEGER,
),
]
)
def get(self, request, *args, **kwargs):
"""
Retrieve a list of courses accessible by the authenticated user
**Use Case**
GET API to retrieve a list of courses accessible by the authenticated user,
along with the specified user's role ("instructor", "staff", or null) in each course.
**Endpoint**
GET /api/support/v1/manage_course_team/
**Query Parameters**
At least one of the following parameters is required:
- email: User's email address
- username: User's username
- user_id: User's ID
**Returns**
List of courses accessible to the authenticated user, each annotated with the
specified user's role in that course. Each course includes organizational
information and identifiers.
**Example Response**
```json
{
"results": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"course_name": "edX Demonstration Course",
"role": "instructor",
"status": "active",
"org": "edX",
"run": "2025_T1",
"number": "DemoX"
},
{
"course_id": "course-v1:MITx+6.00x+2024_Fall",
"course_name": "Introduction to Computer Science",
"role": "staff",
"status": "archived",
"org": "MITx",
"run": "2024_Fall",
"number": "6.00x"
}
]
}
```
"""
return self.list(request, *args, **kwargs)
def put(self, request, *args, **kwargs):
"""
Bulk assign or revoke course team roles for a user across multiple courses.
**Endpoint**
PUT /api/support/v1/manage_course_team/
**Permissions**
- Admin/Staff users: Can manage roles for any course
- Instructor users: Can only manage roles for courses/orgs they have instructor access to
- Other users: Access denied
**Request Data**
A JSON object containing:
- email: User's email address
- bulk_role_operations: List of role operations, each containing:
- course_id: Course key string
- role: Role to assign or revoke ("instructor" or "staff")
- action: "assign" or "revoke"
**Example Request**
```json
{
"email": "user1@example.com",
"bulk_role_operations": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"role": "instructor",
"action": "assign"
},
{
"course_id": "course-v1:edX+DemoX+2025_T2",
"role": "staff",
"action": "revoke"
}
]
}
```
**Returns**
- HTTP 200 with results for each operation (success or error details)
- HTTP 400 if the request data is invalid
**Example Response**
```json
{
"email": "user1@example.com",
"results": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"role": "instructor",
"action": "assign",
"status": "success"
},
{
"course_id": "course-v1:edX+DemoX+2025_T2",
"role": "staff",
"action": "revoke",
"status": "failed",
"error": "error_message"
}
]
}
```
"""
results = []
# Validate request data structure and extract fields
if not isinstance(request.data, dict):
return self._error_response(
"",
"Request data must be a JSON object with 'email' and 'bulk_role_operations' fields.",
)
email = request.data.get("email")
bulk_role_operations = request.data.get("bulk_role_operations", [])
# Combined validation for email and bulk_role_operations
if not email:
return self._error_response(
"", "Missing required field: 'email' is required."
)
if not isinstance(bulk_role_operations, list) or not bulk_role_operations:
return self._error_response(
email,
"Missing or empty 'bulk_role_operations' field. Must be a non-empty list.",
)
auth_user = request.user
# Get accessible orgs/courses and user validation
accessible_orgs, accessible_courses, user, validation_error = (
self._validate_permissions_and_user(auth_user, email)
)
if validation_error:
return validation_error
course_key_cache = {}
for operation_data in bulk_role_operations:
result = self._handle_role_assignment_entry(
operation_data,
auth_user,
accessible_orgs,
accessible_courses,
user,
course_key_cache,
)
results.append(result)
return Response({"email": email, "results": results}, status=status.HTTP_200_OK)
def _fetch_user_accessible_orgs_and_courses(self, auth_user):
"""Return (orgs, courses, error_response) based on user permissions."""
accessible_orgs = set()
accessible_courses = set()
permission_error = None
# Admins and staff have full access by default
if not (auth_user.is_superuser or auth_user.is_staff):
roles = CourseAccessRole.objects.filter(
user=auth_user, role="instructor"
).only("org", "course_id")
if roles.exists():
for role in roles:
if role.course_id:
accessible_courses.add(str(role.course_id))
elif role.org:
# Lowercase course org for case-insensitive compare
accessible_orgs.add(role.org.lower())
else:
permission_error = Response(
{
"results": [
{
"status": "failed",
"error": "You do not have permission to perform bulk role operations.",
}
]
},
status=status.HTTP_403_FORBIDDEN,
)
return accessible_orgs, accessible_courses, permission_error
def _error_response(
self, email, error_message, status_code=status.HTTP_400_BAD_REQUEST
):
"""Helper method to create standardized error responses."""
return Response(
{
"email": email,
"results": [
{
"status": "failed",
"error": error_message,
}
],
},
status=status_code,
)
def _validate_permissions_and_user(self, auth_user, email):
"""Combined validation for user permissions and user lookup."""
error_response = None
user = None
# Get accessible orgs/courses
accessible_orgs, accessible_courses, permission_error = (
self._fetch_user_accessible_orgs_and_courses(auth_user)
)
if permission_error:
error_response = self._error_response(
email,
"You do not have permission to perform bulk role operations.",
status.HTTP_403_FORBIDDEN,
)
return accessible_orgs, accessible_courses, user, error_response
# Get and validate user
user = self._get_user(email, {})
if not user:
error_response = self._error_response(
email, "User not found.", status.HTTP_404_NOT_FOUND
)
elif not user.is_active:
error_response = self._error_response(email, "User is not active.")
return accessible_orgs, accessible_courses, user, error_response
def _handle_role_assignment_entry(
self,
data,
auth_user,
accessible_orgs,
accessible_courses,
user,
course_key_cache,
):
"""Validate and perform the action for a single operation entry."""
course_id = data.get("course_id")
role = data.get("role")
action = data.get("action")
# Validate required fields
if not all([course_id, role, action]):
return self._make_result(
data,
outcome="failed",
error="Missing required fields: 'course_id', 'role', and 'action' are all required.",
)
# Validate role field
if role not in {"instructor", "staff"}:
return self._make_result(
data, "failed", "Invalid role. Must be 'instructor' or 'staff'."
)
# Validate action field
if action not in {"assign", "revoke"}:
return self._make_result(
data, "failed", "Invalid action. Must be 'assign' or 'revoke'."
)
# Validate course
course_key = self._validate_course(course_id, course_key_cache)
if not course_key:
return self._make_result(
data, "failed", "Invalid course_id or course not found."
)
# Ensure only admin/staff or authorized instructors can manage roles for this course/org
if not (auth_user.is_staff or auth_user.is_superuser) and (
course_id not in accessible_courses
# Lowercase course org for case-insensitive compare
and course_key.org.lower() not in accessible_orgs
):
return self._make_result(
data,
"failed",
f"You do not have instructor access to course '{course_id}' or org '{course_key.org}'.",
)
# Perform role update based on action
return self._perform_role_action(data, role, action, user, course_key)
def _validate_course(self, course_id, cache):
"""
Validate that course_id is well-formed and course exists.
Caches results to minimize DB queries.
"""
if course_id in cache:
return cache[course_id]
try:
course_key = CourseKey.from_string(course_id)
if not CourseOverview.course_exists(course_key):
course_key = None
except Exception as exc: # pylint: disable=broad-except
course_key = None
cache[course_id] = course_key
return course_key
def _get_user(self, email, cache):
"""Fetch or get cached user by email."""
if email in cache:
return cache[email]
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
user = None
cache[email] = user
return user
def _perform_role_action(self, data, role, action, user, course_key):
"""Assign or revoke role for the user on the course."""
try:
course_role = CourseRole(role, course_key)
if action == "assign":
course_role.add_users(user)
CourseEnrollment.enroll(user, course_key)
elif action == "revoke":
course_role.remove_users(user)
else:
return self._make_result(
data, "failed", "Invalid action. Use 'assign' or 'revoke'."
)
return self._make_result(data, "success")
except Exception as exc: # pylint: disable=broad-except
return self._make_result(data, "failed", str(exc))
def _make_result(self, data, outcome, error=None):
"""
Formats the response for each role assignment entry.
"""
result = {
"course_id": data.get("course_id"),
"role": data.get("role"),
"action": data.get("action"),
"status": outcome,
}
if error:
result["error"] = error
return result

View File

@@ -227,6 +227,14 @@ urlpatterns += [
urlpatterns += [
path('support/', include('lms.djangoapps.support.urls')),
# Support API RESTful endpoints
path(
'api/support/',
include(
('lms.djangoapps.support.rest_api.urls', 'lms.djangoapps.support'),
namespace='support_api',
)
),
]
# Favicon