From b5bcb37c5a34fa8c8eb6a2b8b8ea2c5b65bd6e1e Mon Sep 17 00:00:00 2001 From: Pandi Ganesh Date: Tue, 29 Jul 2025 11:33:09 +0530 Subject: [PATCH] feat: API to manage course team roles across multiple courses (#36990) * feat: API to fetch course-roles mapping by user and org * fix: added docstring and resolve pylint issues * feat: support bulk course team role updates via PUT API * fix: refactor APIs based on user permissions * chore: improve Swagger schema for course_team endpoints * fix: refactor GET and PUT api code * fix: apply pylint rules and optimize code * fix: resolve test cases for supoort apis * fix: change url path --- lms/djangoapps/support/rest_api/__init__.py | 0 .../support/rest_api/serializers.py | 47 ++ lms/djangoapps/support/rest_api/urls.py | 11 + .../support/rest_api/v1/__init__.py | 0 .../support/rest_api/v1/tests/__init__.py | 0 .../support/rest_api/v1/tests/test_views.py | 383 +++++++++++++ lms/djangoapps/support/rest_api/v1/urls.py | 17 + lms/djangoapps/support/rest_api/v1/views.py | 537 ++++++++++++++++++ lms/urls.py | 8 + 9 files changed, 1003 insertions(+) create mode 100644 lms/djangoapps/support/rest_api/__init__.py create mode 100644 lms/djangoapps/support/rest_api/serializers.py create mode 100644 lms/djangoapps/support/rest_api/urls.py create mode 100644 lms/djangoapps/support/rest_api/v1/__init__.py create mode 100644 lms/djangoapps/support/rest_api/v1/tests/__init__.py create mode 100644 lms/djangoapps/support/rest_api/v1/tests/test_views.py create mode 100644 lms/djangoapps/support/rest_api/v1/urls.py create mode 100644 lms/djangoapps/support/rest_api/v1/views.py diff --git a/lms/djangoapps/support/rest_api/__init__.py b/lms/djangoapps/support/rest_api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lms/djangoapps/support/rest_api/serializers.py b/lms/djangoapps/support/rest_api/serializers.py new file mode 100644 index 0000000000..d3020ae07b --- /dev/null +++ b/lms/djangoapps/support/rest_api/serializers.py @@ -0,0 +1,47 @@ +""" +Serializers for use in the support app. +""" + +from datetime import datetime + +import pytz +from rest_framework import serializers + +from openedx.core.djangoapps.content.course_overviews.models import CourseOverview + + +class CourseTeamManageSerializer(serializers.ModelSerializer): + """Serializer for course team management context data""" + + role = serializers.SerializerMethodField() + status = serializers.SerializerMethodField() + + class Meta: + model = CourseOverview + fields = ("id", "display_name", "role", "status") + + def get_role(self, obj): + course_role_map = self.context.get("course_role_map", {}) + return course_role_map.get(str(obj.id)) + + def get_status(self, obj): + """ + Determine if the course is active or archived based on end date. + Returns 'active' if course end is null or in the future, 'archived' otherwise. + """ + if obj.end is None or obj.end >= datetime.now().replace(tzinfo=pytz.UTC): + return "active" + return "archived" + + def to_representation(self, instance): + data = super().to_representation(instance) + course_key = instance.id + return { + "course_id": str(course_key), + "course_name": data["display_name"], + "role": data["role"], + "status": data["status"], + "org": course_key.org, + "run": course_key.run, + "number": course_key.course, + } diff --git a/lms/djangoapps/support/rest_api/urls.py b/lms/djangoapps/support/rest_api/urls.py new file mode 100644 index 0000000000..7841fcb894 --- /dev/null +++ b/lms/djangoapps/support/rest_api/urls.py @@ -0,0 +1,11 @@ +""" +URL definitions for the support API. +""" + +from django.urls import include, path + +app_name = "lms.djangoapps.support.rest_api" + +urlpatterns = [ + path("v1/", include("lms.djangoapps.support.rest_api.v1.urls", namespace="v1")), +] diff --git a/lms/djangoapps/support/rest_api/v1/__init__.py b/lms/djangoapps/support/rest_api/v1/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lms/djangoapps/support/rest_api/v1/tests/__init__.py b/lms/djangoapps/support/rest_api/v1/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lms/djangoapps/support/rest_api/v1/tests/test_views.py b/lms/djangoapps/support/rest_api/v1/tests/test_views.py new file mode 100644 index 0000000000..f0e3265b7c --- /dev/null +++ b/lms/djangoapps/support/rest_api/v1/tests/test_views.py @@ -0,0 +1,383 @@ +""" +Tests for support views. +""" + +import ddt +from django.urls import reverse +from rest_framework.test import APIClient + +from common.djangoapps.student.models import CourseEnrollment +from common.djangoapps.student.models.user import CourseAccessRole +from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole +from common.djangoapps.student.tests.factories import ( + TEST_PASSWORD, + AdminFactory, + InstructorFactory, + StaffFactory, + SuperuserFactory, + UserFactory +) +from lms.djangoapps.support.tests.test_views import SupportViewTestCase +from openedx.core.djangoapps.content.course_overviews.tests.factories import CourseOverviewFactory + + +@ddt.ddt +class CourseTeamManageAPIViewTest(SupportViewTestCase): + """ + Tests for CourseTeamManagementAPIView. + + Covers: + - GET: Viewing user's course team roles by authenticated users. + - PUT: Assigning/revoking user roles by authorized users. + """ + + def setUp(self): + super().setUp() + self.staff = AdminFactory() + self.superuser = SuperuserFactory() + self.user = UserFactory() + self.org = "TestOrg" + self.client = APIClient() + self.url = reverse("lms.djangoapps.support.rest_api:v1:manage_course_team") + + self.extra_courses = [ + CourseOverviewFactory.create( + org=self.org, run=f"CS10{i}", display_name=f"Test-Course-{i}" + ) + for i in range(3) + ] + self.instructor_user = InstructorFactory.create( + course_key=self.extra_courses[0].id + ) + self.staff_user = StaffFactory.create(course_key=self.extra_courses[1].id) + + # --- GET API TEST CASES --- + + def test_get_api_missing_query_params_returns_400(self): + """GET API: Returns 400 if no query parameters are provided.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + resp = self.client.get(self.url) + self.assertEqual(resp.status_code, 400) + + def test_get_api_with_username_parameter(self): + """GET API: Can query by username parameter.""" + self.client.login(username=self.staff.username, password=TEST_PASSWORD) + resp = self.client.get(self.url, {"username": self.user.username}) + self.assertEqual(resp.status_code, 200) + self.assertIsInstance(resp.data, list) + + def test_get_api_with_user_id_parameter(self): + """GET API: Can query by user_id parameter.""" + self.client.login(username=self.staff.username, password=TEST_PASSWORD) + resp = self.client.get(self.url, {"user_id": self.user.id}) + self.assertEqual(resp.status_code, 200) + self.assertIsInstance(resp.data, list) + + def test_get_api_nonexistent_user_returns_404(self): + """GET API: Returns 404 for a nonexistent user email.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + resp = self.client.get(self.url, {"email": "notfound@example.com"}) + self.assertEqual(resp.status_code, 404) + + def test_get_api_unauthenticated_user_returns_401(self): + """GET API: Unauthenticated users receive 401 Unauthorized.""" + resp = self.client.get(self.url, {"email": self.user.email}) + self.assertEqual(resp.status_code, 401) + + @ddt.data( + ("instructor", "instructor"), + ("staff", "staff"), + ) + @ddt.unpack + def test_get_api_admin_can_fetch_course_roles(self, assigned_role, expected_role): + """GET API: Admin/staff user can view roles for any course for target user.""" + self.client.login(username=self.staff.username, password=TEST_PASSWORD) + CourseAccessRole.objects.filter(user=self.user, org=self.org).delete() + + if assigned_role == "instructor": + CourseInstructorRole(self.extra_courses[0].id).add_users(self.user) + elif assigned_role == "staff": + CourseStaffRole(self.extra_courses[0].id).add_users(self.user) + + response = self.client.get(self.url, {"email": self.user.email}) + self.assertEqual(response.status_code, 200) + self.assertIsInstance(response.data, list) + + course_found = False + for course in response.data: + if course["course_id"] == str(self.extra_courses[0].id): + with self.subTest(role=assigned_role): + self.assertEqual(course["role"], expected_role) + # Verify new fields are present + self.assertIn("status", course) + self.assertIn("org", course) + self.assertIn("run", course) + self.assertIn("number", course) + self.assertIn("course_name", course) + course_found = True + self.assertTrue(course_found, "Expected course not found in response.") + + def test_get_api_instructor_can_only_see_their_courses(self): + """GET API: Course instructor sees only courses they have access to.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + resp = self.client.get(self.url, {"email": self.user.email}) + self.assertEqual(resp.status_code, 200) + + course_ids = [course["course_id"] for course in resp.data] + self.assertIn(str(self.extra_courses[0].id), course_ids) + for i in range(1, 3): + self.assertNotIn(str(self.extra_courses[i].id), course_ids) + + def test_get_api_user_with_no_access_sees_no_courses(self): + """GET API: Non-instructor users see no courses in the response.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + resp = self.client.get(self.url, {"email": self.user.email}) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data, []) + + # --- PUT API TEST CASES --- + + def test_put_api_missing_email_returns_400(self): + """PUT API: Sending request without email returns 400 with an error message.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + data = {"bulk_role_operations": []} + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.data["email"], "") + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("error", result) + self.assertIn("email", result["error"]) + + def test_put_api_empty_operations_returns_400(self): + """PUT API: Sending empty bulk_role_operations returns 400 with an error message.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + data = {"email": "test@example.com", "bulk_role_operations": []} + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.data["email"], "test@example.com") + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("error", result) + self.assertIn("bulk_role_operations", result["error"]) + + def test_put_api_non_dict_input_returns_400(self): + """PUT API: Sending a non-dict input returns 400 with an error message.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + resp = self.client.put(self.url, ["invalid"], format="json") + self.assertEqual(resp.status_code, 400) + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("error", result) + self.assertIn("JSON object", result["error"]) + + def test_put_api_missing_fields_returns_error(self): + """PUT API: Request with missing required fields returns field-level errors.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": self.user.email, + "bulk_role_operations": [{"course_id": "", "role": "", "action": ""}], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + for field in ("course_id", "role", "action"): + self.assertIn(field, result) + + def test_put_api_invalid_user_email_returns_error(self): + """PUT API: Invalid user email in request returns an error.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": "notfound@example.com", + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[0].id), + "role": "instructor", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 404) + self.assertEqual(resp.data["email"], "notfound@example.com") + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("User not found", result["error"]) + + def test_put_api_invalid_course_id_returns_error(self): + """PUT API: Invalid course_id in request returns an error.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": "invalid-course-id", + "role": "instructor", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("Invalid course_id", result["error"]) + + def test_put_api_invalid_action_returns_error(self): + """PUT API: Invalid action value in request returns an error.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[0].id), + "role": "instructor", + "action": "invalid_action", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + result = resp.data["results"][0] + self.assertEqual(result["status"], "failed") + self.assertIn("Invalid action", result["error"]) + + def test_put_api_assign_role_enrolls_user(self): + """PUT API: Assigning a role enrolls user in the course.""" + self.client.login(username=self.staff.username, password=TEST_PASSWORD) + course = self.extra_courses[2] + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(course.id), + "role": "instructor", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "success") + self.assertTrue(CourseInstructorRole(course.id).has_user(self.user)) + self.assertTrue(CourseEnrollment.is_enrolled(self.user, course.id)) + + def test_put_api_revoke_role_removes_user(self): + """PUT API: Revoking a role removes user from course team.""" + self.client.login(username=self.staff.username, password=TEST_PASSWORD) + course = self.extra_courses[1] + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(course.id), + "role": "staff", + "action": "revoke", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "success") + self.assertFalse(CourseStaffRole(course.id).has_user(self.user)) + + def test_put_api_course_instructor_can_manage_own_courses(self): + """PUT API: Course instructor can assign roles for courses they manage.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[0].id), + "role": "staff", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "success") + self.assertTrue(CourseStaffRole(self.extra_courses[0].id).has_user(self.user)) + + def test_put_api_non_instructor_user_forbidden(self): + """PUT API: Non-instructor users receive 403 Forbidden when assigning roles.""" + self.client.login(username=self.user.username, password=TEST_PASSWORD) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[0].id), + "role": "staff", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 403) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "failed") + self.assertIn("do not have permission", resp.data["results"][0]["error"]) + + def test_put_api_org_level_instructor_can_manage_all_org_courses(self): + """PUT API: Org-level instructors can manage roles for all courses in their org.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + CourseAccessRole.objects.create( + user=self.instructor_user, role="instructor", org=self.org, course_id=None + ) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[1].id), + "role": "staff", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "success") + self.assertTrue(CourseStaffRole(self.extra_courses[1].id).has_user(self.user)) + + def test_put_api_course_instructor_cannot_manage_other_courses(self): + """PUT API: Course instructor cannot assign roles for courses they don't manage.""" + self.client.login( + username=self.instructor_user.username, password=TEST_PASSWORD + ) + data = { + "email": self.user.email, + "bulk_role_operations": [ + { + "course_id": str(self.extra_courses[1].id), + "role": "staff", + "action": "assign", + } + ], + } + resp = self.client.put(self.url, data, format="json") + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.data["email"], self.user.email) + self.assertEqual(resp.data["results"][0]["status"], "failed") + self.assertIn("do not have instructor access", resp.data["results"][0]["error"]) diff --git a/lms/djangoapps/support/rest_api/v1/urls.py b/lms/djangoapps/support/rest_api/v1/urls.py new file mode 100644 index 0000000000..8b6343e18b --- /dev/null +++ b/lms/djangoapps/support/rest_api/v1/urls.py @@ -0,0 +1,17 @@ +""" +URL definitions for the course_modes v1 API. +""" + +from django.urls import re_path + +from .views import CourseTeamManageAPIView + +app_name = "v1" + +urlpatterns = [ + re_path( + r"manage_course_team/?$", + CourseTeamManageAPIView.as_view(), + name="manage_course_team", + ), +] diff --git a/lms/djangoapps/support/rest_api/v1/views.py b/lms/djangoapps/support/rest_api/v1/views.py new file mode 100644 index 0000000000..6ca3e350a1 --- /dev/null +++ b/lms/djangoapps/support/rest_api/v1/views.py @@ -0,0 +1,537 @@ +""" +API Views for course team management in support app. +""" + +from django.contrib.auth import get_user_model +from django.core.exceptions import ObjectDoesNotExist +from django.db.models import Q +from opaque_keys.edx.keys import CourseKey +from rest_framework import status +from rest_framework.exceptions import NotFound, ValidationError +from rest_framework.generics import GenericAPIView +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from common.djangoapps.student.models import CourseEnrollment +from common.djangoapps.student.models.user import CourseAccessRole +from common.djangoapps.student.roles import CourseRole +from openedx.core.djangoapps.content.course_overviews.models import CourseOverview + +from ..serializers import CourseTeamManageSerializer + +User = get_user_model() + + +class CourseTeamManageAPIView(GenericAPIView): + """ + Use case: + - APIs for viewing and managing a user's course team roles. + - Lists courses where the authenticated user has permission to manage roles. + - Displays the given user's role in each accessible course. + - Allows assigning or revoking roles via PUT, limited to courses the user can manage. + """ + + permission_classes = (IsAuthenticated,) + serializer_class = CourseTeamManageSerializer + pagination_class = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._course_role_map = {} + self._access_roles = None + + def get_serializer_context(self): + """Provide extra context to the serializer.""" + context = super().get_serializer_context() + context["course_role_map"] = getattr(self, "_course_role_map", {}) + return context + + def get_course_role_map_for_user(self, user): + """Return a mapping of course_id to role for staff/instructor roles of given user.""" + access_roles = CourseAccessRole.objects.filter( + user=user, role__in=["staff", "instructor"] + ) + course_role_map = {} + for access_role in access_roles: + course_id = str(access_role.course_id) + # Prioritize instructor role if present + if access_role.role == "instructor" or course_id not in course_role_map: + course_role_map[course_id] = access_role.role + return course_role_map + + def get_accessible_courses_for_user(self, auth_user): + """Return queryset of courses accessible by the authenticated user.""" + if auth_user.is_superuser or auth_user.is_staff: + return CourseOverview.objects.all() + + access_roles = CourseAccessRole.objects.filter( + user=auth_user, role="instructor" + ).only("org", "course_id") + + # Collect org-level and course-level permissions + orgs = set() + course_keys = set() + + for access_role in access_roles: + if access_role.course_id: + course_keys.add(str(access_role.course_id)) + elif access_role.org: + orgs.add(access_role.org) + + # Build a filter to fetch all courses: + # - Courses in the orgs where the user has org-level access + # - Courses explicitly assigned to the user + course_filter = Q() + if orgs: + course_filter |= Q(org__in=orgs) + if course_keys: + course_filter |= Q(id__in=course_keys) + + return ( + CourseOverview.objects.filter(course_filter) + if course_filter + else CourseOverview.objects.none() + ) + + def get_queryset(self): + """Main entry to return courses filtered by authenticated user access and requested user roles.""" + email = self.request.query_params.get("email") + username = self.request.query_params.get("username") + user_id = self.request.query_params.get("user_id") + + if not any([email, username, user_id]): + raise ValidationError( + { + "detail": "Missing required query parameters. " + "At least one of 'email', 'username', or 'user_id' is required." + } + ) + + # Build user lookup query + user_query = Q(is_active=True) + if email: + user_query &= Q(email=email) + elif username: + user_query &= Q(username=username) + elif user_id: + user_query &= Q(id=user_id) + + try: + user = User.objects.get(user_query) + except ObjectDoesNotExist as exc: + identifier = email or username or user_id + raise NotFound( + f"User with identifier '{identifier}' not found or not active." + ) from exc + + self._access_roles = CourseAccessRole.objects.filter( + user=user, role__in=["staff", "instructor"] + ) + self._course_role_map = self.get_course_role_map_for_user(user) + + auth_user = self.request.user + return self.get_accessible_courses_for_user(auth_user) + + def list(self, request, *args, **kwargs): + """list of courses for the organization and user.""" + queryset = self.get_queryset() + serializer = self.get_serializer(queryset, many=True) + return Response(serializer.data) + + from drf_yasg import openapi + from drf_yasg.utils import swagger_auto_schema + + @swagger_auto_schema( + manual_parameters=[ + openapi.Parameter( + "email", + openapi.IN_QUERY, + description="User's email address", + type=openapi.TYPE_STRING, + ), + openapi.Parameter( + "username", + openapi.IN_QUERY, + description="User's username", + type=openapi.TYPE_STRING, + ), + openapi.Parameter( + "user_id", + openapi.IN_QUERY, + description="User's ID", + type=openapi.TYPE_INTEGER, + ), + ] + ) + def get(self, request, *args, **kwargs): + """ + Retrieve a list of courses accessible by the authenticated user + + **Use Case** + + GET API to retrieve a list of courses accessible by the authenticated user, + along with the specified user's role ("instructor", "staff", or null) in each course. + + **Endpoint** + + GET /api/support/v1/manage_course_team/ + + **Query Parameters** + + At least one of the following parameters is required: + - email: User's email address + - username: User's username + - user_id: User's ID + + **Returns** + + List of courses accessible to the authenticated user, each annotated with the + specified user's role in that course. Each course includes organizational + information and identifiers. + + **Example Response** + + ```json + { + "results": [ + { + "course_id": "course-v1:edX+DemoX+2025_T1", + "course_name": "edX Demonstration Course", + "role": "instructor", + "status": "active", + "org": "edX", + "run": "2025_T1", + "number": "DemoX" + }, + { + "course_id": "course-v1:MITx+6.00x+2024_Fall", + "course_name": "Introduction to Computer Science", + "role": "staff", + "status": "archived", + "org": "MITx", + "run": "2024_Fall", + "number": "6.00x" + } + ] + } + ``` + """ + return self.list(request, *args, **kwargs) + + def put(self, request, *args, **kwargs): + """ + Bulk assign or revoke course team roles for a user across multiple courses. + + **Endpoint** + + PUT /api/support/v1/manage_course_team/ + + **Permissions** + + - Admin/Staff users: Can manage roles for any course + - Instructor users: Can only manage roles for courses/orgs they have instructor access to + - Other users: Access denied + + **Request Data** + + A JSON object containing: + - email: User's email address + - bulk_role_operations: List of role operations, each containing: + - course_id: Course key string + - role: Role to assign or revoke ("instructor" or "staff") + - action: "assign" or "revoke" + + **Example Request** + + ```json + { + "email": "user1@example.com", + "bulk_role_operations": [ + { + "course_id": "course-v1:edX+DemoX+2025_T1", + "role": "instructor", + "action": "assign" + }, + { + "course_id": "course-v1:edX+DemoX+2025_T2", + "role": "staff", + "action": "revoke" + } + ] + } + ``` + + **Returns** + + - HTTP 200 with results for each operation (success or error details) + - HTTP 400 if the request data is invalid + + **Example Response** + + ```json + { + "email": "user1@example.com", + "results": [ + { + "course_id": "course-v1:edX+DemoX+2025_T1", + "role": "instructor", + "action": "assign", + "status": "success" + }, + { + "course_id": "course-v1:edX+DemoX+2025_T2", + "role": "staff", + "action": "revoke", + "status": "failed", + "error": "error_message" + } + ] + } + ``` + """ + results = [] + + # Validate request data structure and extract fields + if not isinstance(request.data, dict): + return self._error_response( + "", + "Request data must be a JSON object with 'email' and 'bulk_role_operations' fields.", + ) + + email = request.data.get("email") + bulk_role_operations = request.data.get("bulk_role_operations", []) + + # Combined validation for email and bulk_role_operations + if not email: + return self._error_response( + "", "Missing required field: 'email' is required." + ) + + if not isinstance(bulk_role_operations, list) or not bulk_role_operations: + return self._error_response( + email, + "Missing or empty 'bulk_role_operations' field. Must be a non-empty list.", + ) + + auth_user = request.user + + # Get accessible orgs/courses and user validation + accessible_orgs, accessible_courses, user, validation_error = ( + self._validate_permissions_and_user(auth_user, email) + ) + if validation_error: + return validation_error + + course_key_cache = {} + + for operation_data in bulk_role_operations: + result = self._handle_role_assignment_entry( + operation_data, + auth_user, + accessible_orgs, + accessible_courses, + user, + course_key_cache, + ) + results.append(result) + + return Response({"email": email, "results": results}, status=status.HTTP_200_OK) + + def _fetch_user_accessible_orgs_and_courses(self, auth_user): + """Return (orgs, courses, error_response) based on user permissions.""" + accessible_orgs = set() + accessible_courses = set() + permission_error = None + + # Admins and staff have full access by default + if not (auth_user.is_superuser or auth_user.is_staff): + roles = CourseAccessRole.objects.filter( + user=auth_user, role="instructor" + ).only("org", "course_id") + + if roles.exists(): + for role in roles: + if role.course_id: + accessible_courses.add(str(role.course_id)) + elif role.org: + # Lowercase course org for case-insensitive compare + accessible_orgs.add(role.org.lower()) + else: + permission_error = Response( + { + "results": [ + { + "status": "failed", + "error": "You do not have permission to perform bulk role operations.", + } + ] + }, + status=status.HTTP_403_FORBIDDEN, + ) + + return accessible_orgs, accessible_courses, permission_error + + def _error_response( + self, email, error_message, status_code=status.HTTP_400_BAD_REQUEST + ): + """Helper method to create standardized error responses.""" + return Response( + { + "email": email, + "results": [ + { + "status": "failed", + "error": error_message, + } + ], + }, + status=status_code, + ) + + def _validate_permissions_and_user(self, auth_user, email): + """Combined validation for user permissions and user lookup.""" + error_response = None + user = None + + # Get accessible orgs/courses + accessible_orgs, accessible_courses, permission_error = ( + self._fetch_user_accessible_orgs_and_courses(auth_user) + ) + + if permission_error: + error_response = self._error_response( + email, + "You do not have permission to perform bulk role operations.", + status.HTTP_403_FORBIDDEN, + ) + return accessible_orgs, accessible_courses, user, error_response + + # Get and validate user + user = self._get_user(email, {}) + if not user: + error_response = self._error_response( + email, "User not found.", status.HTTP_404_NOT_FOUND + ) + elif not user.is_active: + error_response = self._error_response(email, "User is not active.") + + return accessible_orgs, accessible_courses, user, error_response + + def _handle_role_assignment_entry( + self, + data, + auth_user, + accessible_orgs, + accessible_courses, + user, + course_key_cache, + ): + """Validate and perform the action for a single operation entry.""" + + course_id = data.get("course_id") + role = data.get("role") + action = data.get("action") + + # Validate required fields + if not all([course_id, role, action]): + return self._make_result( + data, + outcome="failed", + error="Missing required fields: 'course_id', 'role', and 'action' are all required.", + ) + + # Validate role field + if role not in {"instructor", "staff"}: + return self._make_result( + data, "failed", "Invalid role. Must be 'instructor' or 'staff'." + ) + + # Validate action field + if action not in {"assign", "revoke"}: + return self._make_result( + data, "failed", "Invalid action. Must be 'assign' or 'revoke'." + ) + + # Validate course + course_key = self._validate_course(course_id, course_key_cache) + if not course_key: + return self._make_result( + data, "failed", "Invalid course_id or course not found." + ) + + # Ensure only admin/staff or authorized instructors can manage roles for this course/org + if not (auth_user.is_staff or auth_user.is_superuser) and ( + course_id not in accessible_courses + # Lowercase course org for case-insensitive compare + and course_key.org.lower() not in accessible_orgs + ): + return self._make_result( + data, + "failed", + f"You do not have instructor access to course '{course_id}' or org '{course_key.org}'.", + ) + + # Perform role update based on action + return self._perform_role_action(data, role, action, user, course_key) + + def _validate_course(self, course_id, cache): + """ + Validate that course_id is well-formed and course exists. + Caches results to minimize DB queries. + """ + if course_id in cache: + return cache[course_id] + + try: + course_key = CourseKey.from_string(course_id) + if not CourseOverview.course_exists(course_key): + course_key = None + except Exception as exc: # pylint: disable=broad-except + course_key = None + + cache[course_id] = course_key + return course_key + + def _get_user(self, email, cache): + """Fetch or get cached user by email.""" + if email in cache: + return cache[email] + + try: + user = User.objects.get(email=email) + except User.DoesNotExist: + user = None + + cache[email] = user + return user + + def _perform_role_action(self, data, role, action, user, course_key): + """Assign or revoke role for the user on the course.""" + try: + course_role = CourseRole(role, course_key) + if action == "assign": + course_role.add_users(user) + CourseEnrollment.enroll(user, course_key) + elif action == "revoke": + course_role.remove_users(user) + else: + return self._make_result( + data, "failed", "Invalid action. Use 'assign' or 'revoke'." + ) + return self._make_result(data, "success") + except Exception as exc: # pylint: disable=broad-except + return self._make_result(data, "failed", str(exc)) + + def _make_result(self, data, outcome, error=None): + """ + Formats the response for each role assignment entry. + """ + result = { + "course_id": data.get("course_id"), + "role": data.get("role"), + "action": data.get("action"), + "status": outcome, + } + if error: + result["error"] = error + return result diff --git a/lms/urls.py b/lms/urls.py index d6884690d0..6cd51e94cf 100644 --- a/lms/urls.py +++ b/lms/urls.py @@ -227,6 +227,14 @@ urlpatterns += [ urlpatterns += [ path('support/', include('lms.djangoapps.support.urls')), + # Support API RESTful endpoints + path( + 'api/support/', + include( + ('lms.djangoapps.support.rest_api.urls', 'lms.djangoapps.support'), + namespace='support_api', + ) + ), ] # Favicon