Files
2025-08-18 16:12:02 +05:00

572 lines
20 KiB
Python

"""
API Views for course team management in support app.
"""
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from opaque_keys.edx.keys import CourseKey
from rest_framework import status
from rest_framework.exceptions import NotFound, ValidationError
from rest_framework.generics import GenericAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.student.models.user import CourseAccessRole
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
from ..serializers import CourseTeamManageSerializer
User = get_user_model()
class CourseTeamManageAPIView(GenericAPIView):
"""
Use case:
- APIs for viewing and managing a user's course team roles.
- Lists courses where the authenticated user has permission to manage roles.
- Displays the given user's role in each accessible course.
- Allows assigning or revoking roles via PUT, limited to courses the user can manage.
"""
permission_classes = (IsAuthenticated,)
serializer_class = CourseTeamManageSerializer
pagination_class = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._course_role_map = {}
self._access_roles = None
def get_serializer_context(self):
"""Provide extra context to the serializer."""
context = super().get_serializer_context()
context["course_role_map"] = getattr(self, "_course_role_map", {})
return context
def get_course_role_map_for_user(self, user):
"""Return a mapping of course_id to role for staff/instructor roles of given user."""
access_roles = CourseAccessRole.objects.filter(
user=user, role__in=["staff", "instructor"]
)
course_role_map = {}
for access_role in access_roles:
course_id = str(access_role.course_id)
# Prioritize instructor role if present
if access_role.role == "instructor" or course_id not in course_role_map:
course_role_map[course_id] = access_role.role
return course_role_map
def get_accessible_courses_for_user(self, auth_user):
"""Return queryset of courses accessible by the authenticated user."""
if auth_user.is_superuser or auth_user.is_staff:
return CourseOverview.objects.all()
access_roles = CourseAccessRole.objects.filter(
user=auth_user, role="instructor"
).only("org", "course_id")
# Collect org-level and course-level permissions
orgs = set()
course_keys = set()
for access_role in access_roles:
if access_role.course_id:
course_keys.add(str(access_role.course_id))
elif access_role.org:
orgs.add(access_role.org)
# Build a filter to fetch all courses:
# - Courses in the orgs where the user has org-level access
# - Courses explicitly assigned to the user
course_filter = Q()
if orgs:
course_filter |= Q(org__in=orgs)
if course_keys:
course_filter |= Q(id__in=course_keys)
return (
CourseOverview.objects.filter(course_filter)
if course_filter
else CourseOverview.objects.none()
)
def get_queryset(self):
"""Main entry to return courses filtered by authenticated user access and requested user roles."""
email = self.request.query_params.get("email")
username = self.request.query_params.get("username")
user_id = self.request.query_params.get("user_id")
if not any([email, username, user_id]):
raise ValidationError(
{
"detail": "Missing required query parameters. "
"At least one of 'email', 'username', or 'user_id' is required."
}
)
# Build user lookup query
user_query = Q(is_active=True)
if email:
user_query &= Q(email=email)
elif username:
user_query &= Q(username=username)
elif user_id:
user_query &= Q(id=user_id)
try:
user = User.objects.get(user_query)
except ObjectDoesNotExist as exc:
identifier = email or username or user_id
raise NotFound(
f"User with identifier '{identifier}' not found or not active."
) from exc
self._access_roles = CourseAccessRole.objects.filter(
user=user, role__in=["staff", "instructor"]
)
self._course_role_map = self.get_course_role_map_for_user(user)
auth_user = self.request.user
return self.get_accessible_courses_for_user(auth_user)
def list(self, request, *args, **kwargs):
"""list of courses for the organization and user."""
queryset = self.get_queryset()
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
@swagger_auto_schema(
manual_parameters=[
openapi.Parameter(
"email",
openapi.IN_QUERY,
description="User's email address",
type=openapi.TYPE_STRING,
),
openapi.Parameter(
"username",
openapi.IN_QUERY,
description="User's username",
type=openapi.TYPE_STRING,
),
openapi.Parameter(
"user_id",
openapi.IN_QUERY,
description="User's ID",
type=openapi.TYPE_INTEGER,
),
]
)
def get(self, request, *args, **kwargs):
"""
Retrieve a list of courses accessible by the authenticated user
**Use Case**
GET API to retrieve a list of courses accessible by the authenticated user,
along with the specified user's role ("instructor", "staff", or null) in each course.
**Endpoint**
GET /api/support/v1/manage_course_team/
**Query Parameters**
At least one of the following parameters is required:
- email: User's email address
- username: User's username
- user_id: User's ID
**Returns**
List of courses accessible to the authenticated user, each annotated with the
specified user's role in that course. Each course includes organizational
information and identifiers.
**Example Response**
```json
{
"results": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"course_name": "edX Demonstration Course",
"course_url": "https://studio.example.com/course/course-v1:edX+DemoX+2025_T1",
"role": "instructor",
"status": "active",
"org": "edX",
"run": "2025_T1",
"number": "DemoX"
},
{
"course_id": "course-v1:MITx+6.00x+2024_Fall",
"course_name": "Introduction to Computer Science",
"course_url": "https://studio.example.com/course/course-v1:MITx+6.00x+2024_Fall",
"role": "staff",
"status": "archived",
"org": "MITx",
"run": "2024_Fall",
"number": "6.00x"
}
]
}
```
"""
return self.list(request, *args, **kwargs)
def put(self, request, *args, **kwargs):
"""
Bulk assign or revoke course team roles for a user across multiple courses.
**Endpoint**
PUT /api/support/v1/manage_course_team/
**Permissions**
- Admin/Staff users: Can manage roles for any course
- Instructor users: Can only manage roles for courses/orgs they have instructor access to
- Other users: Access denied
**Request Data**
A JSON object containing:
- email: User's email address
- bulk_role_operations: List of role operations, each containing:
- course_id: Course key string
- role: Role to assign or revoke ("instructor" or "staff")
- action: "assign" or "revoke"
**Example Request**
```json
{
"email": "user1@example.com",
"bulk_role_operations": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"role": "instructor",
"action": "assign"
},
{
"course_id": "course-v1:edX+DemoX+2025_T2",
"role": "staff",
"action": "revoke"
}
]
}
```
**Returns**
- HTTP 200 with results for each operation (success or error details)
- HTTP 400 if the request data is invalid
**Example Response**
```json
{
"email": "user1@example.com",
"results": [
{
"course_id": "course-v1:edX+DemoX+2025_T1",
"role": "instructor",
"action": "assign",
"status": "success"
},
{
"course_id": "course-v1:edX+DemoX+2025_T2",
"role": "staff",
"action": "revoke",
"status": "failed",
"error": "error_message"
}
]
}
```
"""
results = []
# Validate request data structure and extract fields
if not isinstance(request.data, dict):
return self._error_response(
"",
"Request data must be a JSON object with 'email' and 'bulk_role_operations' fields.",
)
email = request.data.get("email")
bulk_role_operations = request.data.get("bulk_role_operations", [])
# Combined validation for email and bulk_role_operations
if not email:
return self._error_response(
"", "Missing required field: 'email' is required."
)
if not isinstance(bulk_role_operations, list) or not bulk_role_operations:
return self._error_response(
email,
"Missing or empty 'bulk_role_operations' field. Must be a non-empty list.",
)
auth_user = request.user
# Get accessible orgs/courses and user validation
accessible_orgs, accessible_courses, user, validation_error = (
self._validate_permissions_and_user(auth_user, email)
)
if validation_error:
return validation_error
course_key_cache = {}
for operation_data in bulk_role_operations:
result = self._handle_role_assignment_entry(
operation_data,
auth_user,
accessible_orgs,
accessible_courses,
user,
course_key_cache,
)
results.append(result)
return Response({"email": email, "results": results}, status=status.HTTP_200_OK)
def _fetch_user_accessible_orgs_and_courses(self, auth_user):
"""Return (orgs, courses, error_response) based on user permissions."""
accessible_orgs = set()
accessible_courses = set()
permission_error = None
# Admins and staff have full access by default
if not (auth_user.is_superuser or auth_user.is_staff):
roles = CourseAccessRole.objects.filter(
user=auth_user, role="instructor"
).only("org", "course_id")
if roles.exists():
for role in roles:
if role.course_id:
accessible_courses.add(str(role.course_id))
elif role.org:
# Lowercase course org for case-insensitive compare
accessible_orgs.add(role.org.lower())
else:
permission_error = Response(
{
"results": [
{
"status": "failed",
"error": "You do not have permission to perform bulk role operations.",
}
]
},
status=status.HTTP_403_FORBIDDEN,
)
return accessible_orgs, accessible_courses, permission_error
def _error_response(
self, email, error_message, status_code=status.HTTP_400_BAD_REQUEST
):
"""Helper method to create standardized error responses."""
return Response(
{
"email": email,
"results": [
{
"status": "failed",
"error": error_message,
}
],
},
status=status_code,
)
def _validate_permissions_and_user(self, auth_user, email):
"""Combined validation for user permissions and user lookup."""
error_response = None
user = None
# Get accessible orgs/courses
accessible_orgs, accessible_courses, permission_error = (
self._fetch_user_accessible_orgs_and_courses(auth_user)
)
if permission_error:
error_response = self._error_response(
email,
"You do not have permission to perform bulk role operations.",
status.HTTP_403_FORBIDDEN,
)
return accessible_orgs, accessible_courses, user, error_response
# Get and validate user
user = self._get_user(email, {})
if not user:
error_response = self._error_response(
email, "User not found.", status.HTTP_404_NOT_FOUND
)
elif not user.is_active:
error_response = self._error_response(email, "User is not active.")
return accessible_orgs, accessible_courses, user, error_response
def _handle_role_assignment_entry(
self,
data,
auth_user,
accessible_orgs,
accessible_courses,
user,
course_key_cache,
):
"""Validate and perform the action for a single operation entry."""
course_id = data.get("course_id")
role = data.get("role")
action = data.get("action")
# Validate required fields
if not all([course_id, role, action]):
return self._make_result(
data,
outcome="failed",
error="Missing required fields: 'course_id', 'role', and 'action' are all required.",
)
# Validate role field
if role not in {"instructor", "staff"}:
return self._make_result(
data, "failed", "Invalid role. Must be 'instructor' or 'staff'."
)
# Validate action field
if action not in {"assign", "revoke"}:
return self._make_result(
data, "failed", "Invalid action. Must be 'assign' or 'revoke'."
)
# Validate course
course_key = self._validate_course(course_id, course_key_cache)
if not course_key:
return self._make_result(
data, "failed", "Invalid course_id or course not found."
)
# Ensure only admin/staff or authorized instructors can manage roles for this course/org
if not (auth_user.is_staff or auth_user.is_superuser) and (
course_id not in accessible_courses
# Lowercase course org for case-insensitive compare
and course_key.org.lower() not in accessible_orgs
):
return self._make_result(
data,
"failed",
f"You do not have instructor access to course '{course_id}' or org '{course_key.org}'.",
)
# Perform role update based on action
return self._perform_role_action(data, role, action, user, course_key)
def _validate_course(self, course_id, cache):
"""
Validate that course_id is well-formed and course exists.
Caches results to minimize DB queries.
"""
if course_id in cache:
return cache[course_id]
try:
course_key = CourseKey.from_string(course_id)
if not CourseOverview.course_exists(course_key):
course_key = None
except Exception as exc: # pylint: disable=broad-except
course_key = None
cache[course_id] = course_key
return course_key
def _get_user(self, email, cache):
"""Fetch or get cached user by email."""
if email in cache:
return cache[email]
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
user = None
cache[email] = user
return user
def _perform_role_action(self, data, role, action, user, course_key):
"""Assign or revoke role for the user on the course."""
try:
if action == "assign":
self._assign_role(user, role, course_key)
elif action == "revoke":
self._revoke_role(user, role, course_key)
else:
return self._make_result(
data, "failed", "Invalid action. Use 'assign' or 'revoke'."
)
# Clear user roles cache
if hasattr(user, '_roles'):
del user._roles
return self._make_result(data, "success")
except Exception as exc: # pylint: disable=broad-except
return self._make_result(data, "failed", str(exc))
def _assign_role(self, user, role, course_key):
"""
Assign role with single entry.
"""
existing_role = CourseAccessRole.objects.filter(
user=user,
course_id=course_key,
org=course_key.org,
role__in=["staff", "instructor"]
).first()
if existing_role:
existing_role.role = role
existing_role.save()
else:
CourseAccessRole.objects.create(
user=user,
role=role,
course_id=course_key,
org=course_key.org,
)
CourseEnrollment.enroll(user, course_key)
def _revoke_role(self, user, role, course_key):
"""
Revoke role with cleanup - instructor revoke removes all access.
"""
existing_roles = CourseAccessRole.objects.filter(
user=user, course_id=course_key, role__in=["staff", "instructor"]
)
existing_roles.delete()
def _make_result(self, data, outcome, error=None):
"""
Formats the response for each role assignment entry.
"""
result = {
"course_id": data.get("course_id"),
"role": data.get("role"),
"action": data.get("action"),
"status": outcome,
}
if error:
result["error"] = error
return result