Files

1100 lines
43 KiB
Python

"""
Instructor API v2 views.
This module contains the v2 API endpoints for instructor functionality.
These APIs are designed to be consumed by MFEs and other API clients.
"""
import csv
import io
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple
import edx_api_doc_tools as apidocs
from django.conf import settings
from django.db import transaction
from django.utils.decorators import method_decorator
from django.utils.html import strip_tags
from django.utils.translation import gettext as _
from django.views.decorators.cache import cache_control
from edx_when import api as edx_when_api
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey
from pytz import UTC
from rest_framework import status
from rest_framework.exceptions import NotFound
from rest_framework.generics import GenericAPIView, ListAPIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import ItemNotFoundError
from common.djangoapps.util.json_request import JsonResponseBadRequest
from openedx.core.djangoapps.course_groups.cohorts import is_course_cohorted
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from lms.djangoapps.courseware.tabs import get_course_tab_list
from lms.djangoapps.instructor import permissions
from lms.djangoapps.instructor.views.api import _display_unit, get_student_from_identifier
from lms.djangoapps.instructor.views.instructor_task_helpers import extract_task_features
from lms.djangoapps.instructor_task import api as task_api
from lms.djangoapps.instructor_task.api_helper import AlreadyRunningError, QueueConnectionError
from lms.djangoapps.instructor.constants import ReportType
from lms.djangoapps.instructor.ora import get_open_response_assessment_list, get_ora_summary
from lms.djangoapps.instructor_analytics import basic as instructor_analytics_basic
from lms.djangoapps.instructor_analytics import csvs as instructor_analytics_csvs
from lms.djangoapps.instructor_task.models import ReportStore
from lms.djangoapps.instructor_task.tasks_helper.utils import upload_csv_file_to_report_store
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin
from openedx.core.lib.courses import get_course_by_id
from .serializers_v2 import (
InstructorTaskListSerializer,
CourseInformationSerializerV2,
BlockDueDateSerializerV2,
UnitExtensionSerializer,
ORASerializer,
ORASummarySerializer,
)
from .tools import (
find_unit,
get_units_with_due_date,
keep_field_private,
set_due_date_extension,
title_or_url,
)
log = logging.getLogger(__name__)
class CourseMetadataView(DeveloperErrorViewMixin, APIView):
"""
**Use Cases**
Retrieve comprehensive course metadata including enrollment counts, dashboard configuration,
permissions, and navigation sections.
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.VIEW_DASHBOARD
@apidocs.schema(
parameters=[
apidocs.string_parameter(
'course_id',
apidocs.ParameterLocation.PATH,
description="Course key for the course.",
),
],
responses={
200: CourseInformationSerializerV2,
401: "The requesting user is not authenticated.",
403: "The requesting user lacks instructor access to the course.",
404: "The requested course does not exist.",
},
)
def get(self, request, course_id):
"""
Retrieve comprehensive course information including metadata, enrollment statistics,
dashboard configuration, and user permissions.
**Use Cases**
Retrieve comprehensive course metadata including enrollment counts, dashboard configuration,
permissions, and navigation sections.
**Example Requests**
GET /api/instructor/v2/courses/{course_id}
**Response Values**
{
"course_id": "course-v1:edX+DemoX+Demo_Course",
"display_name": "Demonstration Course",
"org": "edX",
"course_number": "DemoX",
"enrollment_start": "2013-02-05T00:00:00Z",
"enrollment_end": null,
"start": "2013-02-05T05:00:00Z",
"end": "2024-12-31T23:59:59Z",
"pacing": "instructor",
"has_started": true,
"has_ended": false,
"total_enrollment": 150,
"enrollment_counts": {
"total": 150,
"audit": 100,
"verified": 40,
"honor": 10
},
"num_sections": 12,
"grade_cutoffs": "A is 0.9, B is 0.8, C is 0.7, D is 0.6",
"course_errors": [],
"studio_url": "https://studio.example.com/course/course-v1:edX+DemoX+2024",
"permissions": {
"admin": false,
"instructor": true,
"finance_admin": false,
"sales_admin": false,
"staff": true,
"forum_admin": true,
"data_researcher": false
},
"tabs": [
{
"tab_id": "courseware",
"title": "Course",
"url": "INSTRUCTOR_MICROFRONTEND_URL/courses/course-v1:edX+DemoX+2024/courseware"
},
{
"tab_id": "progress",
"title": "Progress",
"url": "INSTRUCTOR_MICROFRONTEND_URL/courses/course-v1:edX+DemoX+2024/progress"
},
],
"disable_buttons": false,
"analytics_dashboard_message": "To gain insights into student enrollment and participation..."
}
**Parameters**
course_key: Course key for the course.
**Returns**
* 200: OK - Returns course metadata
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks instructor permissions
* 404: Not Found - Course does not exist
"""
course_key = CourseKey.from_string(course_id)
course = get_course_by_id(course_key)
tabs = get_course_tab_list(request.user, course)
context = {
'tabs': tabs,
'course': course,
'user': request.user,
'request': request
}
serializer = CourseInformationSerializerV2(context)
return Response(serializer.data, status=status.HTTP_200_OK)
class InstructorTaskListView(DeveloperErrorViewMixin, APIView):
"""
**Use Cases**
List instructor tasks for a course.
**Example Requests**
GET /api/instructor/v2/courses/{course_key}/instructor_tasks
GET /api/instructor/v2/courses/{course_key}/instructor_tasks?problem_location_str=block-v1:...
GET /api/instructor/v2/courses/{course_key}/instructor_tasks?
problem_location_str=block-v1:...&unique_student_identifier=student@example.com
**Response Values**
{
"tasks": [
{
"task_id": "2519ff31-22d9-4a62-91e2-55495895b355",
"task_type": "grade_problems",
"task_state": "PROGRESS",
"status": "Incomplete",
"created": "2019-01-15T18:00:15.902470+00:00",
"task_input": "{}",
"task_output": null,
"duration_sec": "unknown",
"task_message": "No status information available",
"requester": "staff"
}
]
}
**Parameters**
course_key: Course key for the course.
problem_location_str (optional): Filter tasks to a specific problem location.
unique_student_identifier (optional): Filter tasks to specific student (must be used with problem_location_str).
**Returns**
* 200: OK - Returns list of instructor tasks
* 400: Bad Request - Invalid parameters
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks instructor permissions
* 404: Not Found - Course does not exist
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.SHOW_TASKS
@apidocs.schema(
parameters=[
apidocs.string_parameter(
'course_id',
apidocs.ParameterLocation.PATH,
description="Course key for the course.",
),
apidocs.string_parameter(
'problem_location_str',
apidocs.ParameterLocation.QUERY,
description="Optional: Filter tasks to a specific problem location.",
),
apidocs.string_parameter(
'unique_student_identifier',
apidocs.ParameterLocation.QUERY,
description="Optional: Filter tasks to a specific student (requires problem_location_str).",
),
],
responses={
200: InstructorTaskListSerializer,
400: "Invalid parameters provided.",
401: "The requesting user is not authenticated.",
403: "The requesting user lacks instructor access to the course.",
404: "The requested course does not exist.",
},
)
def get(self, request, course_id):
"""
List instructor tasks for a course.
"""
course_key = CourseKey.from_string(course_id)
# Get query parameters
problem_location_str = request.query_params.get('problem_location_str', None)
unique_student_identifier = request.query_params.get('unique_student_identifier', None)
student = None
if unique_student_identifier:
try:
student = get_student_from_identifier(unique_student_identifier)
except Exception: # pylint: disable=broad-except
return Response(
{'error': 'Invalid student identifier'},
status=status.HTTP_400_BAD_REQUEST
)
# Validate parameters
if student and not problem_location_str:
return Response(
{'error': 'unique_student_identifier must be used with problem_location_str'},
status=status.HTTP_400_BAD_REQUEST
)
# Get tasks based on filters
if problem_location_str:
try:
module_state_key = UsageKey.from_string(problem_location_str).map_into_course(course_key)
except InvalidKeyError:
return Response(
{'error': 'Invalid problem location'},
status=status.HTTP_400_BAD_REQUEST
)
if student:
# Tasks for specific problem and student
tasks = task_api.get_instructor_task_history(course_key, module_state_key, student)
else:
# Tasks for specific problem
tasks = task_api.get_instructor_task_history(course_key, module_state_key)
else:
# All running tasks
tasks = task_api.get_running_instructor_tasks(course_key)
# Extract task features and serialize
tasks_data = [extract_task_features(task) for task in tasks]
serializer = InstructorTaskListSerializer({'tasks': tasks_data})
return Response(serializer.data, status=status.HTTP_200_OK)
@method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch')
class ChangeDueDateView(APIView):
"""
Grants a due date extension to a student for a particular unit.
this version works with a new payload that is JSON and more up to date.
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.GIVE_STUDENT_EXTENSION
serializer_class = BlockDueDateSerializerV2
def post(self, request, course_id):
"""
Grants a due date extension to a learner for a particular unit.
params:
blockId (str): The URL related to the block that needs the due date update.
due_datetime (str): The new due date and time for the block.
email_or_username (str): The email or username of the learner whose access is being modified.
"""
serializer_data = self.serializer_class(data=request.data)
if not serializer_data.is_valid():
return JsonResponseBadRequest({'error': serializer_data.errors})
learner = serializer_data.validated_data.get('email_or_username')
due_date = serializer_data.validated_data.get('due_datetime')
course = get_course_by_id(CourseKey.from_string(course_id))
unit = find_unit(course, serializer_data.validated_data.get('block_id'))
reason = strip_tags(serializer_data.validated_data.get('reason', ''))
try:
set_due_date_extension(course, unit, learner, due_date, request.user, reason=reason)
except Exception as error: # pylint: disable=broad-except
return JsonResponseBadRequest({'error': str(error)})
return Response(
{
'message': _(
'Successfully changed due date for learner {0} for {1} '
'to {2}').
format(learner.profile.name, _display_unit(unit), due_date.strftime('%Y-%m-%d %H:%M')
)})
class GradedSubsectionsView(APIView):
"""View to retrieve graded subsections with due dates"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.VIEW_DASHBOARD
def get(self, request, course_id):
"""
Retrieves a list of graded subsections (units with due dates) within a specified course.
"""
course_key = CourseKey.from_string(course_id)
course = get_course_by_id(course_key)
graded_subsections = get_units_with_due_date(course)
formated_subsections = {"items": [
{
"display_name": title_or_url(unit),
"subsection_id": str(unit.location)
} for unit in graded_subsections]}
return Response(formated_subsections, status=status.HTTP_200_OK)
@dataclass(frozen=True)
class UnitDueDateExtension:
"""Dataclass representing a unit due date extension for a student."""
username: str
full_name: str
email: str
unit_title: str
unit_location: str
extended_due_date: Optional[str]
@classmethod
def from_block_tuple(cls, row: Tuple, unit):
username, full_name, due_date, email, location = row
unit_title = title_or_url(unit)
return cls(
username=username,
full_name=full_name,
email=email,
unit_title=unit_title,
unit_location=location,
extended_due_date=due_date,
)
@classmethod
def from_course_tuple(cls, row: Tuple, units_dict: dict):
username, full_name, email, location, due_date = row
unit_title = title_or_url(units_dict[str(location)])
return cls(
username=username,
full_name=full_name,
email=email,
unit_title=unit_title,
unit_location=location,
extended_due_date=due_date,
)
class UnitExtensionsView(ListAPIView):
"""
Retrieve a paginated list of due date extensions for units in a course.
**Example Requests**
GET /api/instructor/v2/courses/{course_id}/unit_extensions
GET /api/instructor/v2/courses/{course_id}/unit_extensions?page=2
GET /api/instructor/v2/courses/{course_id}/unit_extensions?page_size=50
GET /api/instructor/v2/courses/{course_id}/unit_extensions?email_or_username=john
GET /api/instructor/v2/courses/{course_id}/unit_extensions?block_id=block-v1:org@problem+block@unit1
**Response Values**
{
"count": 150,
"next": "http://example.com/api/instructor/v2/courses/course-v1:org+course+run/unit_extensions?page=2",
"previous": null,
"results": [
{
"username": "student1",
"full_name": "John Doe",
"email": "john.doe@example.com",
"unit_title": "Unit 1: Introduction",
"unit_location": "block-v1:org+course+run+type@problem+block@unit1",
"extended_due_date": "2023-12-25T23:59:59Z"
},
...
]
}
**Parameters**
course_id: Course key for the course.
page (optional): Page number for pagination.
page_size (optional): Number of results per page.
**Returns**
* 200: OK - Returns paginated list of unit extensions
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks instructor permissions
* 404: Not Found - Course does not exist
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.VIEW_DASHBOARD
serializer_class = UnitExtensionSerializer
filter_backends = []
def _matches_email_or_username(self, unit_extension, filter_value):
"""
Check if the unit extension matches the email or username filter.
"""
return (
filter_value in unit_extension.username.lower()
or filter_value in unit_extension.email.lower()
)
def get_queryset(self):
"""
Returns the queryset of unit extensions for the specified course.
This method uses the core logic from get_overrides_for_course to retrieve
due date extension data and transforms it into a list of normalized objects
that can be paginated and serialized.
Supports filtering by:
- email_or_username: Filter by username or email address
- block_id: Filter by specific unit/subsection location
"""
course_id = self.kwargs["course_id"]
course_key = CourseKey.from_string(course_id)
course = get_course_by_id(course_key)
email_or_username_filter = self.request.query_params.get("email_or_username")
block_id_filter = self.request.query_params.get("block_id")
units = get_units_with_due_date(course)
units_dict = {str(u.location): u for u in units}
# Fetch and normalize overrides
if block_id_filter:
try:
unit = find_unit(course, block_id_filter)
query_data = edx_when_api.get_overrides_for_block(course.id, unit.location)
unit_due_date_extensions = [
UnitDueDateExtension.from_block_tuple(row, unit)
for row in query_data
]
except InvalidKeyError:
# If block_id is invalid, return empty list
unit_due_date_extensions = []
else:
query_data = edx_when_api.get_overrides_for_course(course.id)
unit_due_date_extensions = [
UnitDueDateExtension.from_course_tuple(row, units_dict)
for row in query_data
if str(row[3]) in units_dict # Ensure unit has due date
]
# Apply filters if any
filter_value = email_or_username_filter.lower() if email_or_username_filter else None
results = [
extension
for extension in unit_due_date_extensions
if self._matches_email_or_username(extension, filter_value)
] if filter_value else unit_due_date_extensions # if no filter, use all
# Sort for consistent ordering
results.sort(
key=lambda o: (
o.username,
o.unit_title,
)
)
return results
class ORAView(GenericAPIView):
"""
View to list all Open Response Assessments (ORAs) for a given course.
* Requires token authentication.
* Only instructors or staff for the course are able to access this view.
"""
permission_classes = [IsAuthenticated, permissions.InstructorPermission]
permission_name = permissions.VIEW_DASHBOARD
serializer_class = ORASerializer
def get_course(self):
"""
Retrieve the course object based on the course_id URL parameter.
Validates that the course exists and is not deprecated.
Raises NotFound if the course does not exist.
"""
course_id = self.kwargs.get("course_id")
try:
course_key = CourseKey.from_string(course_id)
except InvalidKeyError as exc:
log.error("Unable to find course with course key %s while loading the Instructor Dashboard.", course_id)
raise NotFound("Course not found") from exc
if course_key.deprecated:
raise NotFound("Course not found")
course = get_course_by_id(course_key, depth=None)
return course
def get(self, request, *args, **kwargs):
"""
Return a list of all ORAs for the specified course.
"""
course = self.get_course()
items = get_open_response_assessment_list(course)
page = self.paginate_queryset(items)
if page is None:
# Pagination is required for this endpoint
return Response(
{"detail": "Pagination is required for this endpoint."},
status=status.HTTP_400_BAD_REQUEST,
)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
class ReportDownloadsView(DeveloperErrorViewMixin, APIView):
"""
**Use Cases**
List all available report downloads for a course.
**Example Requests**
GET /api/instructor/v2/courses/{course_key}/reports
**Response Values**
{
"downloads": [
{
"report_name": "course-v1_edX_DemoX_Demo_Course_grade_report_2024-01-26-1030.csv",
"report_url":
"/grades/course-v1:edX+DemoX+Demo_Course/"
"course-v1_edX_DemoX_Demo_Course_grade_report_2024-01-26-1030.csv",
"date_generated": "2024-01-26T10:30:00Z",
"report_type": "grade" # Uses ReportType.GRADE.value
}
]
}
**Parameters**
course_key: Course key for the course.
**Returns**
* 200: OK - Returns list of available reports
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks staff access to the course
* 404: Not Found - Course does not exist
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
# Use ENROLLMENT_REPORT permission which allows course staff and data researchers
# to view generated reports, aligning with the intended audience of instructors/course staff
permission_name = permissions.ENROLLMENT_REPORT
@apidocs.schema(
parameters=[
apidocs.string_parameter(
'course_id',
apidocs.ParameterLocation.PATH,
description="Course key for the course.",
),
],
responses={
200: "Returns list of available report downloads.",
401: "The requesting user is not authenticated.",
403: "The requesting user lacks instructor access to the course.",
404: "The requested course does not exist.",
},
)
def get(self, request, course_id):
"""
List all available report downloads for a course.
"""
course_key = CourseKey.from_string(course_id)
# Validate that the course exists
get_course_by_id(course_key)
report_store = ReportStore.from_config(config_name='GRADES_DOWNLOAD')
downloads = []
for name, url in report_store.links_for(course_key):
# Determine report type from filename using helper method
report_type = self._detect_report_type_from_filename(name)
# Extract date from filename if possible (format: YYYY-MM-DD-HHMM)
date_generated = self._extract_date_from_filename(name)
downloads.append({
'report_name': name,
'report_url': url,
'date_generated': date_generated,
'report_type': report_type,
})
return Response({'downloads': downloads}, status=status.HTTP_200_OK)
def _detect_report_type_from_filename(self, filename):
"""
Detect report type from filename using pattern matching.
Check more specific patterns first to avoid false matches.
Args:
filename: The name of the report file
Returns:
str: The report type identifier
"""
name_lower = filename.lower()
# Check more specific patterns first to avoid false matches
# Match exact report names from the filename format: {course_prefix}_{csv_name}_{timestamp}.csv
if 'inactive_enrolled' in name_lower:
return ReportType.PENDING_ACTIVATIONS.value
elif 'problem_grade_report' in name_lower:
return ReportType.PROBLEM_GRADE.value
elif 'ora2_submission' in name_lower or 'submission_files' in name_lower or 'ora_submission' in name_lower:
return ReportType.ORA2_SUBMISSION_FILES.value
elif 'ora2_summary' in name_lower or 'ora_summary' in name_lower:
return ReportType.ORA2_SUMMARY.value
elif 'ora2_data' in name_lower or 'ora_data' in name_lower:
return ReportType.ORA2_DATA.value
elif 'may_enroll' in name_lower:
return ReportType.PENDING_ENROLLMENTS.value
elif 'student_state' in name_lower or 'problem_responses' in name_lower:
return ReportType.PROBLEM_RESPONSES.value
elif 'anonymized_ids' in name_lower or 'anon' in name_lower:
return ReportType.ANONYMIZED_STUDENT_IDS.value
elif 'issued_certificates' in name_lower or 'certificate' in name_lower:
return ReportType.ISSUED_CERTIFICATES.value
elif 'grade_report' in name_lower:
return ReportType.GRADE.value
elif 'enrolled_students' in name_lower or 'profile' in name_lower:
return ReportType.ENROLLED_STUDENTS.value
return ReportType.UNKNOWN.value
def _extract_date_from_filename(self, filename):
"""
Extract date from filename (format: YYYY-MM-DD-HHMM).
Args:
filename: The name of the report file
Returns:
str: ISO formatted date string or None
"""
date_match = re.search(r'_(\d{4}-\d{2}-\d{2}-\d{4})', filename)
if date_match:
date_str = date_match.group(1)
try:
# Parse the date string (YYYY-MM-DD-HHMM) directly
dt = datetime.strptime(date_str, '%Y-%m-%d-%H%M')
# Format as ISO 8601 with UTC timezone
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
except ValueError:
pass
return None
@method_decorator(transaction.non_atomic_requests, name='dispatch')
class GenerateReportView(DeveloperErrorViewMixin, APIView):
"""
**Use Cases**
Generate a specific type of report for a course.
**Example Requests**
POST /api/instructor/v2/courses/{course_key}/reports/enrolled_students/generate
POST /api/instructor/v2/courses/{course_key}/reports/grade/generate
POST /api/instructor/v2/courses/{course_key}/reports/problem_responses/generate
**Response Values**
{
"status": "The report is being created. Please check the data downloads section for the status."
}
**Parameters**
course_key: Course key for the course.
report_type: Type of report to generate. Valid values:
- enrolled_students: Enrolled Students Report
- pending_enrollments: Pending Enrollments Report
- pending_activations: Pending Activations Report (inactive users with enrollments)
- anonymized_student_ids: Anonymized Student IDs Report
- grade: Grade Report
- problem_grade: Problem Grade Report
- problem_responses: Problem Responses Report
- ora2_summary: ORA Summary Report
- ora2_data: ORA Data Report
- ora2_submission_files: ORA Submission Files Report
- issued_certificates: Issued Certificates Report
**Returns**
* 200: OK - Report generation task has been submitted
* 400: Bad Request - Task is already running or invalid report type
* 401: Unauthorized - User is not authenticated
* 403: Forbidden - User lacks instructor permissions
* 404: Not Found - Course does not exist
"""
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
@property
def permission_name(self):
"""
Return the appropriate permission name based on the requested report type.
For the issued certificates report, mirror the v1 behavior by using
VIEW_ISSUED_CERTIFICATES (course-level staff access). For all other reports,
require CAN_RESEARCH.
"""
report_type = self.kwargs.get('report_type')
if report_type == ReportType.ISSUED_CERTIFICATES.value:
return permissions.VIEW_ISSUED_CERTIFICATES
return permissions.CAN_RESEARCH
@apidocs.schema(
parameters=[
apidocs.string_parameter(
'course_id',
apidocs.ParameterLocation.PATH,
description="Course key for the course.",
),
apidocs.string_parameter(
'report_type',
apidocs.ParameterLocation.PATH,
description=(
"Type of report to generate. Valid values: "
"enrolled_students, pending_enrollments, pending_activations, "
"anonymized_student_ids, grade, problem_grade, problem_responses, "
"ora2_summary, ora2_data, ora2_submission_files, issued_certificates"
),
),
],
responses={
200: "Report generation task has been submitted successfully.",
400: "The requested task is already running or invalid report type.",
401: "The requesting user is not authenticated.",
403: "The requesting user lacks instructor access to the course.",
404: "The requested course does not exist.",
},
)
def post(self, request, course_id, report_type):
"""
Generate a specific type of report for a course.
"""
course_key = CourseKey.from_string(course_id)
# Map report types to their submission functions
report_handlers = {
ReportType.ENROLLED_STUDENTS.value: self._generate_enrolled_students_report,
ReportType.PENDING_ENROLLMENTS.value: self._generate_pending_enrollments_report,
ReportType.PENDING_ACTIVATIONS.value: self._generate_pending_activations_report,
ReportType.ANONYMIZED_STUDENT_IDS.value: self._generate_anonymized_ids_report,
ReportType.GRADE.value: self._generate_grade_report,
ReportType.PROBLEM_GRADE.value: self._generate_problem_grade_report,
ReportType.PROBLEM_RESPONSES.value: self._generate_problem_responses_report,
ReportType.ORA2_SUMMARY.value: self._generate_ora2_summary_report,
ReportType.ORA2_DATA.value: self._generate_ora2_data_report,
ReportType.ORA2_SUBMISSION_FILES.value: self._generate_ora2_submission_files_report,
ReportType.ISSUED_CERTIFICATES.value: self._generate_issued_certificates_report,
}
handler = report_handlers.get(report_type)
if not handler:
return Response(
{'error': f'Invalid report type: {report_type}'},
status=status.HTTP_400_BAD_REQUEST
)
try:
success_message = handler(request, course_key)
except AlreadyRunningError as error:
log.warning("Task already running for %s report: %s", report_type, error)
return Response(
{'error': _('A report generation task is already running. Please wait for it to complete.')},
status=status.HTTP_400_BAD_REQUEST
)
except QueueConnectionError as error:
log.error("Queue connection error for %s report task: %s", report_type, error)
return Response(
{'error': _('Unable to connect to the task queue. Please try again later.')},
status=status.HTTP_503_SERVICE_UNAVAILABLE
)
except ValueError as error:
log.error("Error submitting %s report task: %s", report_type, error)
return Response(
{'error': str(error)},
status=status.HTTP_400_BAD_REQUEST
)
return Response({'status': success_message}, status=status.HTTP_200_OK)
def _generate_enrolled_students_report(self, request, course_key):
"""Generate enrolled students report."""
course = get_course_by_id(course_key)
available_features = instructor_analytics_basic.get_available_features(course_key)
# Allow for sites to be able to define additional columns.
# Note that adding additional columns has the potential to break
# the student profile report due to a character limit on the
# asynchronous job input which in this case is a JSON string
# containing the list of columns to include in the report.
# TODO: Refactor the student profile report code to remove the list of columns
# that should be included in the report from the asynchronous job input.
# We need to clone the list because we modify it below
query_features = list(configuration_helpers.get_value('student_profile_download_fields', []))
if not query_features:
query_features = [
'id', 'username', 'name', 'email', 'language', 'location',
'year_of_birth', 'gender', 'level_of_education', 'mailing_address',
'goals', 'enrollment_mode', 'last_login', 'date_joined', 'external_user_key',
'enrollment_date',
]
additional_attributes = configuration_helpers.get_value_for_org(
course_key.org,
"additional_student_profile_attributes"
)
if additional_attributes:
# Fail fast: must be list/tuple of strings.
if not isinstance(additional_attributes, (list, tuple)):
raise ValueError(
_('Invalid additional student attribute configuration: expected list of strings, got {type}.')
.format(type=type(additional_attributes).__name__)
)
if not all(isinstance(v, str) for v in additional_attributes):
raise ValueError(
_('Invalid additional student attribute configuration: all entries must be strings.')
)
# Reject empty string entries explicitly.
if any(v == '' for v in additional_attributes):
raise ValueError(
_('Invalid additional student attribute configuration: empty attribute names are not allowed.')
)
# Validate each attribute is in available_features; allow duplicates as provided.
invalid = [v for v in additional_attributes if v not in available_features]
if invalid:
raise ValueError(
_('Invalid additional student attributes: {attrs}').format(
attrs=', '.join(invalid)
)
)
query_features.extend(additional_attributes)
for field in settings.PROFILE_INFORMATION_REPORT_PRIVATE_FIELDS:
keep_field_private(query_features, field)
if is_course_cohorted(course.id):
query_features.append('cohort')
if course.teams_enabled:
query_features.append('team')
# For compatibility reasons, city and country should always appear last.
query_features.append('city')
query_features.append('country')
task_api.submit_calculate_students_features_csv(request, course_key, query_features)
return _('The enrolled student report is being created.')
def _generate_pending_enrollments_report(self, request, course_key):
"""Generate pending enrollments report."""
query_features = ['email']
task_api.submit_calculate_may_enroll_csv(request, course_key, query_features)
return _('The pending enrollments report is being created.')
def _generate_pending_activations_report(self, request, course_key):
"""Generate pending activations report."""
query_features = ['email']
task_api.submit_calculate_inactive_enrolled_students_csv(request, course_key, query_features)
return _('The pending activations report is being created.')
def _generate_anonymized_ids_report(self, request, course_key):
"""Generate anonymized student IDs report."""
task_api.generate_anonymous_ids(request, course_key)
return _('The anonymized student IDs report is being created.')
def _generate_grade_report(self, request, course_key):
"""Generate grade report."""
task_api.submit_calculate_grades_csv(request, course_key)
return _('The grade report is being created.')
def _generate_problem_grade_report(self, request, course_key):
"""Generate problem grade report."""
task_api.submit_problem_grade_report(request, course_key)
return _('The problem grade report is being created.')
def _generate_problem_responses_report(self, request, course_key):
"""
Generate problem responses report.
Requires a problem_location (section or problem block id).
Supports optional filtering by problem types.
"""
problem_location = request.data.get('problem_location', '').strip()
problem_types_filter = request.data.get('problem_types_filter')
if not problem_location:
raise ValueError(_('Specify Section or Problem block id is required.'))
# Validate problem location
try:
usage_key = UsageKey.from_string(problem_location).map_into_course(course_key)
except InvalidKeyError as exc:
raise ValueError(_('Invalid problem location format.')) from exc
# Check if the problem actually exists in the modulestore
store = modulestore()
try:
store.get_item(usage_key)
except ItemNotFoundError as exc:
raise ValueError(_('The problem location does not exist in this course.')) from exc
problem_locations_str = problem_location
task_api.submit_calculate_problem_responses_csv(
request, course_key, problem_locations_str, problem_types_filter
)
return _('The problem responses report is being created.')
def _generate_ora2_summary_report(self, request, course_key):
"""Generate ORA2 summary report."""
task_api.submit_export_ora2_summary(request, course_key)
return _('The ORA2 summary report is being created.')
def _generate_ora2_data_report(self, request, course_key):
"""Generate ORA2 data report."""
task_api.submit_export_ora2_data(request, course_key)
return _('The ORA2 data report is being created.')
def _generate_ora2_submission_files_report(self, request, course_key):
"""Generate ORA2 submission files archive."""
task_api.submit_export_ora2_submission_files(request, course_key)
return _('The ORA2 submission files archive is being created.')
def _generate_issued_certificates_report(self, request, course_key):
"""Generate issued certificates report."""
# Query features for the report
query_features = ['course_id', 'mode', 'total_issued_certificate', 'report_run_date']
query_features_names = [
('course_id', _('CourseID')),
('mode', _('Certificate Type')),
('total_issued_certificate', _('Total Certificates Issued')),
('report_run_date', _('Date Report Run'))
]
# Get certificates data
certificates_data = instructor_analytics_basic.issued_certificates(course_key, query_features)
# Format the data for CSV
__, data_rows = instructor_analytics_csvs.format_dictlist(certificates_data, query_features)
# Generate CSV content as a file-like object
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow([col_header for __, col_header in query_features_names])
# Write data rows
for row in data_rows:
writer.writerow(row)
# Reset the buffer position to the beginning
output.seek(0)
# Store the report using the standard helper function with UTC timestamp
timestamp = datetime.now(UTC)
upload_csv_file_to_report_store(
output,
'issued_certificates',
course_key,
timestamp,
config_name='GRADES_DOWNLOAD'
)
return _('The issued certificates report has been created.')
class ORASummaryView(GenericAPIView):
"""
View to get a summary of Open Response Assessments (ORAs) for a given course.
* Requires token authentication.
* Only instructors or staff for the course are able to access this view.
"""
permission_classes = [IsAuthenticated, permissions.InstructorPermission]
permission_name = permissions.VIEW_DASHBOARD
serializer_class = ORASummarySerializer
def get_course(self):
"""
Retrieve the course object based on the course_id URL parameter.
Validates that the course exists and is not deprecated.
Raises NotFound if the course does not exist.
"""
course_id = self.kwargs.get("course_id")
try:
course_key = CourseKey.from_string(course_id)
except InvalidKeyError as exc:
log.error("Unable to find course with course key %s while loading the Instructor Dashboard.", course_id)
raise NotFound("Course not found") from exc
if course_key.deprecated:
raise NotFound("Course not found")
course = get_course_by_id(course_key, depth=None)
return course
def get(self, request, *args, **kwargs):
"""
Return a summary of ORAs for the specified course.
"""
course = self.get_course()
items = get_ora_summary(course)
serializer = self.get_serializer(items)
return Response(serializer.data)