From 54164a918cf0ff9c6b8a3f8f5676e5b69d18700c Mon Sep 17 00:00:00 2001 From: mubbsharanwar Date: Mon, 21 Jul 2025 17:42:04 +0500 Subject: [PATCH] refactor: convert function based view to class based --- .../contentstore/views/certificate_manager.py | 309 ++++++++++++++ .../contentstore/views/certificates.py | 401 +++--------------- .../contentstore/views/serializers.py | 85 ++++ .../views/tests/test_certificates.py | 12 +- cms/urls.py | 6 +- 5 files changed, 473 insertions(+), 340 deletions(-) create mode 100644 cms/djangoapps/contentstore/views/certificate_manager.py diff --git a/cms/djangoapps/contentstore/views/certificate_manager.py b/cms/djangoapps/contentstore/views/certificate_manager.py new file mode 100644 index 0000000000..429950477f --- /dev/null +++ b/cms/djangoapps/contentstore/views/certificate_manager.py @@ -0,0 +1,309 @@ +""" +Certificate Manager. +""" +import json +import logging + +from django.conf import settings +from django.utils.translation import gettext as _ + +from common.djangoapps.course_modes.models import CourseMode +from eventtracking import tracker +from opaque_keys import InvalidKeyError +from opaque_keys.edx.keys import AssetKey +from .assets import delete_asset + +from common.djangoapps.util.db import MYSQL_MAX_INT, generate_int_id + +from ..exceptions import AssetNotFoundException + +CERTIFICATE_SCHEMA_VERSION = 1 +CERTIFICATE_MINIMUM_ID = 100 + +LOGGER = logging.getLogger(__name__) + + +class CertificateException(Exception): + """ + Base exception for Certificates workflows + """ + pass # lint-amnesty, pylint: disable=unnecessary-pass + + +class CertificateValidationError(CertificateException): + """ + An exception raised when certificate information is invalid. + """ + pass # lint-amnesty, pylint: disable=unnecessary-pass + + +def _delete_asset(course_key, asset_key_string): + """ + Internal method used to create asset key from string and + remove asset by calling delete_asset method of assets module. + """ + if asset_key_string: + try: + asset_key = AssetKey.from_string(asset_key_string) + except InvalidKeyError: + # remove first slash in asset path + # otherwise it generates InvalidKeyError in case of split modulestore + if '/' == asset_key_string[0]: + asset_key_string = asset_key_string[1:] + try: + asset_key = AssetKey.from_string(asset_key_string) + except InvalidKeyError: + # Unable to parse the asset key, log and return + LOGGER.info( + "In course %r, unable to parse asset key %r, not attempting to delete signatory.", + course_key, + asset_key_string, + ) + return + else: + # Unable to parse the asset key, log and return + LOGGER.info( + "In course %r, unable to parse asset key %r, not attempting to delete signatory.", + course_key, + asset_key_string, + ) + return + + try: + delete_asset(course_key, asset_key) + # If the asset was not found, it doesn't have to be deleted... + except AssetNotFoundException: + pass + + +class CertificateManager: + """ + The CertificateManager is responsible for storage, retrieval, and manipulation of Certificates + Certificates are not stored in the Django ORM, they are a field/setting on the course block + """ + @staticmethod + def parse(json_string): + """ + Deserialize the provided JSON data into a standard Python object + """ + try: + certificate = json.loads(json_string) + except ValueError: + raise CertificateValidationError(_("invalid JSON")) # lint-amnesty, pylint: disable=raise-missing-from + # Include the data contract version + certificate["version"] = CERTIFICATE_SCHEMA_VERSION + # Ensure a signatories list is always returned + if certificate.get("signatories") is None: + certificate["signatories"] = [] + certificate["editing"] = False + return certificate + + @staticmethod + def validate(certificate_data): + """ + Ensure the certificate data contains all of the necessary fields and the values match our rules + """ + # Ensure the schema version meets our expectations + if certificate_data.get("version") != CERTIFICATE_SCHEMA_VERSION: + raise TypeError( + "Unsupported certificate schema version: {}. Expected version: {}.".format( + certificate_data.get("version"), + CERTIFICATE_SCHEMA_VERSION + ) + ) + if not certificate_data.get("name"): + raise CertificateValidationError(_("must have name of the certificate")) + + @staticmethod + def is_activated(course): + """ + Returns whether certificates are activated for the given course, + along with the certificates. + """ + is_active = False + certificates = None + if settings.FEATURES.get('CERTIFICATES_HTML_VIEW', False): + certificates = CertificateManager.get_certificates(course) + # we are assuming only one certificate in certificates collection. + for certificate in certificates: + is_active = certificate.get('is_active', False) + break + return is_active, certificates + + @staticmethod + def get_used_ids(course): + """ + Return a list of certificate identifiers that are already in use for this course + """ + if not course.certificates or not course.certificates.get('certificates'): + return [] + return [cert['id'] for cert in course.certificates['certificates']] + + @staticmethod + def assign_id(course, certificate_data, certificate_id=None): + """ + Assign an identifier to the provided certificate data. + If the caller did not provide an identifier, we autogenerate a unique one for them + In addition, we check the certificate's signatories and ensure they also have unique ids + """ + used_ids = CertificateManager.get_used_ids(course) + if certificate_id: + certificate_data['id'] = int(certificate_id) + else: + certificate_data['id'] = generate_int_id( + CERTIFICATE_MINIMUM_ID, + MYSQL_MAX_INT, + used_ids + ) + + for index, signatory in enumerate(certificate_data['signatories']): # pylint: disable=unused-variable + if signatory and not signatory.get('id', False): + signatory['id'] = generate_int_id(used_ids=used_ids) + used_ids.append(signatory['id']) + + return certificate_data + + @staticmethod + def serialize_certificate(certificate): + """ + Serialize the Certificate object's locally-stored certificate data to a JSON representation + We use direct access here for specific keys in order to enforce their presence + """ + certificate_data = certificate.certificate_data + certificate_response = { + "id": certificate_data['id'], + "name": certificate_data['name'], + "description": certificate_data['description'], + "is_active": certificate_data['is_active'], + "version": CERTIFICATE_SCHEMA_VERSION, + "signatories": certificate_data['signatories'] + } + + # Some keys are not required, such as the title override... + if certificate_data.get('course_title'): + certificate_response["course_title"] = certificate_data['course_title'] + + return certificate_response + + @staticmethod + def deserialize_certificate(course, value): + """ + Deserialize from a JSON representation into a Certificate object. + 'value' should be either a Certificate instance, or a valid JSON string + """ + if isinstance(value, bytes): + value = value.decode('utf-8') + + # Ensure the schema fieldset meets our expectations + for key in ("name", "description", "version"): + if key not in value: + raise CertificateValidationError(_("Certificate dict {0} missing value key '{1}'").format(value, key)) + + # Load up the Certificate data + certificate_data = CertificateManager.parse(value) + CertificateManager.validate(certificate_data) + certificate_data = CertificateManager.assign_id(course, certificate_data, certificate_data.get('id', None)) + certificate = Certificate(course, certificate_data) + + # Return a new Certificate object instance + return certificate + + @staticmethod + def get_certificates(course, only_active=False): + """ + Retrieve the certificates list from the provided course, + if `only_active` is True it would skip inactive certificates. + """ + # The top-level course field is 'certificates', which contains various properties, + # including the actual 'certificates' list that we're working with in this context + certificates = course.certificates.get('certificates', []) + if only_active: + certificates = [certificate for certificate in certificates if certificate.get('is_active', False)] + return certificates + + @staticmethod + def get_course_modes(course): + """ + Retrieve certificate modes for the given course, + including expired modes but excluding audit mode. + """ + course_modes = [ + mode.slug for mode in CourseMode.modes_for_course( + course=course, include_expired=True + ) if mode.slug != CourseMode.AUDIT + ] + return course_modes + + @staticmethod + def is_enabled(course): + """ + Is enabled when there is at least one course mode for the given course, + including expired modes but excluding audit mode + """ + course_modes = CertificateManager.get_course_modes(course) + return len(course_modes) > 0 + + @staticmethod + def remove_certificate(request, store, course, certificate_id): + """ + Remove certificate from the course + """ + for index, cert in enumerate(course.certificates['certificates']): + if int(cert['id']) == int(certificate_id): + certificate = course.certificates['certificates'][index] + # Remove any signatory assets prior to dropping the entire cert record from the course + for sig_index, signatory in enumerate(certificate.get('signatories')): # pylint: disable=unused-variable + _delete_asset(course.id, signatory['signature_image_path']) + # Now drop the certificate record + course.certificates['certificates'].pop(index) + store.update_item(course, request.user.id) + break + + # pylint-disable: unused-variable + @staticmethod + def remove_signatory(request, store, course, certificate_id, signatory_id): + """ + Remove the specified signatory from the provided course certificate + """ + for cert_index, cert in enumerate(course.certificates['certificates']): # pylint: disable=unused-variable + if int(cert['id']) == int(certificate_id): + for sig_index, signatory in enumerate(cert.get('signatories')): + if int(signatory_id) == int(signatory['id']): + _delete_asset(course.id, signatory['signature_image_path']) + del cert['signatories'][sig_index] + store.update_item(course, request.user.id) + break + + @staticmethod + def track_event(event_name, event_data): + """Track certificate configuration event. + + Arguments: + event_name (str): Name of the event to be logged. + event_data (dict): A Dictionary containing event data + Returns: + None + + """ + event_name = '.'.join(['edx', 'certificate', 'configuration', event_name]) + tracker.emit(event_name, event_data) + + +class Certificate: + """ + The logical representation of an individual course certificate + """ + def __init__(self, course, certificate_data): + """ + Instantiate a Certificate object instance using the provided information. + """ + self.course = course + self._certificate_data = certificate_data + self.id = certificate_data['id'] # pylint: disable=invalid-name + + @property + def certificate_data(self): + """ + Retrieve the locally-stored certificate data from the Certificate object via a helper method + """ + return self._certificate_data diff --git a/cms/djangoapps/contentstore/views/certificates.py b/cms/djangoapps/contentstore/views/certificates.py index c50ea54d9a..052bca72aa 100644 --- a/cms/djangoapps/contentstore/views/certificates.py +++ b/cms/djangoapps/contentstore/views/certificates.py @@ -23,10 +23,8 @@ course.certificates: { """ -import json import logging -from django.conf import settings from django.contrib.auth.decorators import login_required from django.core.exceptions import PermissionDenied from django.http import HttpResponse @@ -39,34 +37,28 @@ from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework import status -from common.djangoapps.course_modes.models import CourseMode -from eventtracking import tracker -from opaque_keys import InvalidKeyError -from opaque_keys.edx.keys import AssetKey, CourseKey +from opaque_keys.edx.keys import CourseKey from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin from common.djangoapps.edxmako.shortcuts import render_to_response from common.djangoapps.student.auth import has_studio_write_access from common.djangoapps.student.roles import GlobalStaff -from common.djangoapps.util.db import MYSQL_MAX_INT, generate_int_id + from common.djangoapps.util.json_request import JsonResponse from xmodule.modulestore import EdxJSONEncoder # lint-amnesty, pylint: disable=wrong-import-order from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order -from cms.djangoapps.contentstore.views.serializers import CertificateActivationSerializer +from cms.djangoapps.contentstore.views.serializers import CertificateActivationSerializer, CertificateSerializer from cms.djangoapps.contentstore.views.permissions import HasStudioWriteAccess - -from ..exceptions import AssetNotFoundException +from .certificate_manager import CertificateManager from ..toggles import use_new_certificates_page from ..utils import ( get_certificates_context, get_certificates_url, reverse_course_url, ) -from .assets import delete_asset -CERTIFICATE_SCHEMA_VERSION = 1 CERTIFICATE_MINIMUM_ID = 100 LOGGER = logging.getLogger(__name__) @@ -83,45 +75,6 @@ def _get_course_and_check_access(course_key, user, depth=0): return course_block -def _delete_asset(course_key, asset_key_string): - """ - Internal method used to create asset key from string and - remove asset by calling delete_asset method of assets module. - """ - if asset_key_string: - try: - asset_key = AssetKey.from_string(asset_key_string) - except InvalidKeyError: - # remove first slash in asset path - # otherwise it generates InvalidKeyError in case of split modulestore - if '/' == asset_key_string[0]: - asset_key_string = asset_key_string[1:] - try: - asset_key = AssetKey.from_string(asset_key_string) - except InvalidKeyError: - # Unable to parse the asset key, log and return - LOGGER.info( - "In course %r, unable to parse asset key %r, not attempting to delete signatory.", - course_key, - asset_key_string, - ) - return - else: - # Unable to parse the asset key, log and return - LOGGER.info( - "In course %r, unable to parse asset key %r, not attempting to delete signatory.", - course_key, - asset_key_string, - ) - return - - try: - delete_asset(course_key, asset_key) - # If the asset was not found, it doesn't have to be deleted... - except AssetNotFoundException: - pass - - # Certificates Exceptions class CertificateException(Exception): """ @@ -137,239 +90,6 @@ class CertificateValidationError(CertificateException): pass # lint-amnesty, pylint: disable=unnecessary-pass -class CertificateManager: - """ - The CertificateManager is responsible for storage, retrieval, and manipulation of Certificates - Certificates are not stored in the Django ORM, they are a field/setting on the course block - """ - @staticmethod - def parse(json_string): - """ - Deserialize the provided JSON data into a standard Python object - """ - try: - certificate = json.loads(json_string) - except ValueError: - raise CertificateValidationError(_("invalid JSON")) # lint-amnesty, pylint: disable=raise-missing-from - # Include the data contract version - certificate["version"] = CERTIFICATE_SCHEMA_VERSION - # Ensure a signatories list is always returned - if certificate.get("signatories") is None: - certificate["signatories"] = [] - certificate["editing"] = False - return certificate - - @staticmethod - def validate(certificate_data): - """ - Ensure the certificate data contains all of the necessary fields and the values match our rules - """ - # Ensure the schema version meets our expectations - if certificate_data.get("version") != CERTIFICATE_SCHEMA_VERSION: - raise TypeError( - "Unsupported certificate schema version: {}. Expected version: {}.".format( - certificate_data.get("version"), - CERTIFICATE_SCHEMA_VERSION - ) - ) - if not certificate_data.get("name"): - raise CertificateValidationError(_("must have name of the certificate")) - - @staticmethod - def is_activated(course): - """ - Returns whether certificates are activated for the given course, - along with the certificates. - """ - is_active = False - certificates = None - if settings.FEATURES.get('CERTIFICATES_HTML_VIEW', False): - certificates = CertificateManager.get_certificates(course) - # we are assuming only one certificate in certificates collection. - for certificate in certificates: - is_active = certificate.get('is_active', False) - break - return is_active, certificates - - @staticmethod - def get_used_ids(course): - """ - Return a list of certificate identifiers that are already in use for this course - """ - if not course.certificates or not course.certificates.get('certificates'): - return [] - return [cert['id'] for cert in course.certificates['certificates']] - - @staticmethod - def assign_id(course, certificate_data, certificate_id=None): - """ - Assign an identifier to the provided certificate data. - If the caller did not provide an identifier, we autogenerate a unique one for them - In addition, we check the certificate's signatories and ensure they also have unique ids - """ - used_ids = CertificateManager.get_used_ids(course) - if certificate_id: - certificate_data['id'] = int(certificate_id) - else: - certificate_data['id'] = generate_int_id( - CERTIFICATE_MINIMUM_ID, - MYSQL_MAX_INT, - used_ids - ) - - for index, signatory in enumerate(certificate_data['signatories']): # pylint: disable=unused-variable - if signatory and not signatory.get('id', False): - signatory['id'] = generate_int_id(used_ids=used_ids) - used_ids.append(signatory['id']) - - return certificate_data - - @staticmethod - def serialize_certificate(certificate): - """ - Serialize the Certificate object's locally-stored certificate data to a JSON representation - We use direct access here for specific keys in order to enforce their presence - """ - certificate_data = certificate.certificate_data - certificate_response = { - "id": certificate_data['id'], - "name": certificate_data['name'], - "description": certificate_data['description'], - "is_active": certificate_data['is_active'], - "version": CERTIFICATE_SCHEMA_VERSION, - "signatories": certificate_data['signatories'] - } - - # Some keys are not required, such as the title override... - if certificate_data.get('course_title'): - certificate_response["course_title"] = certificate_data['course_title'] - - return certificate_response - - @staticmethod - def deserialize_certificate(course, value): - """ - Deserialize from a JSON representation into a Certificate object. - 'value' should be either a Certificate instance, or a valid JSON string - """ - if isinstance(value, bytes): - value = value.decode('utf-8') - - # Ensure the schema fieldset meets our expectations - for key in ("name", "description", "version"): - if key not in value: - raise CertificateValidationError(_("Certificate dict {0} missing value key '{1}'").format(value, key)) - - # Load up the Certificate data - certificate_data = CertificateManager.parse(value) - CertificateManager.validate(certificate_data) - certificate_data = CertificateManager.assign_id(course, certificate_data, certificate_data.get('id', None)) - certificate = Certificate(course, certificate_data) - - # Return a new Certificate object instance - return certificate - - @staticmethod - def get_certificates(course, only_active=False): - """ - Retrieve the certificates list from the provided course, - if `only_active` is True it would skip inactive certificates. - """ - # The top-level course field is 'certificates', which contains various properties, - # including the actual 'certificates' list that we're working with in this context - certificates = course.certificates.get('certificates', []) - if only_active: - certificates = [certificate for certificate in certificates if certificate.get('is_active', False)] - return certificates - - @staticmethod - def get_course_modes(course): - """ - Retrieve certificate modes for the given course, - including expired modes but excluding audit mode. - """ - course_modes = [ - mode.slug for mode in CourseMode.modes_for_course( - course=course, include_expired=True - ) if mode.slug != CourseMode.AUDIT - ] - return course_modes - - @staticmethod - def is_enabled(course): - """ - Is enabled when there is at least one course mode for the given course, - including expired modes but excluding audit mode - """ - course_modes = CertificateManager.get_course_modes(course) - return len(course_modes) > 0 - - @staticmethod - def remove_certificate(request, store, course, certificate_id): - """ - Remove certificate from the course - """ - for index, cert in enumerate(course.certificates['certificates']): - if int(cert['id']) == int(certificate_id): - certificate = course.certificates['certificates'][index] - # Remove any signatory assets prior to dropping the entire cert record from the course - for sig_index, signatory in enumerate(certificate.get('signatories')): # pylint: disable=unused-variable - _delete_asset(course.id, signatory['signature_image_path']) - # Now drop the certificate record - course.certificates['certificates'].pop(index) - store.update_item(course, request.user.id) - break - - # pylint-disable: unused-variable - @staticmethod - def remove_signatory(request, store, course, certificate_id, signatory_id): - """ - Remove the specified signatory from the provided course certificate - """ - for cert_index, cert in enumerate(course.certificates['certificates']): # pylint: disable=unused-variable - if int(cert['id']) == int(certificate_id): - for sig_index, signatory in enumerate(cert.get('signatories')): - if int(signatory_id) == int(signatory['id']): - _delete_asset(course.id, signatory['signature_image_path']) - del cert['signatories'][sig_index] - store.update_item(course, request.user.id) - break - - @staticmethod - def track_event(event_name, event_data): - """Track certificate configuration event. - - Arguments: - event_name (str): Name of the event to be logged. - event_data (dict): A Dictionary containing event data - Returns: - None - - """ - event_name = '.'.join(['edx', 'certificate', 'configuration', event_name]) - tracker.emit(event_name, event_data) - - -class Certificate: - """ - The logical representation of an individual course certificate - """ - def __init__(self, course, certificate_data): - """ - Instantiate a Certificate object instance using the provided information. - """ - self.course = course - self._certificate_data = certificate_data - self.id = certificate_data['id'] # pylint: disable=invalid-name - - @property - def certificate_data(self): - """ - Retrieve the locally-stored certificate data from the Certificate object via a helper method - """ - return self._certificate_data - - class ModulestoreMixin: """ Mixin to provide a get_modulestore() method for views. @@ -479,69 +199,49 @@ def certificates_list_handler(request, course_key_string): return HttpResponse(status=406) -@login_required -@ensure_csrf_cookie -@require_http_methods(("POST", "PUT", "DELETE")) -def certificates_detail_handler(request, course_key_string, certificate_id): +class CertificateDetailAPIView(ModulestoreMixin, APIView): """ JSON API endpoint for manipulating a course certificate via its internal identifier. Utilized by the Backbone.js 'certificates' application model - - POST or PUT - json: update the specified certificate based on provided information - DELETE - json: remove the specified certificate from the course """ - course_key = CourseKey.from_string(course_key_string) - course = _get_course_and_check_access(course_key, request.user) - certificates_list = course.certificates.get('certificates', []) - match_index = None - match_cert = None - for index, cert in enumerate(certificates_list): - if certificate_id is not None: - if int(cert['id']) == int(certificate_id): - match_index = index - match_cert = cert + permission_classes = [IsAuthenticated, HasStudioWriteAccess] + serializer_class = CertificateSerializer - store = modulestore() - if request.method in ('POST', 'PUT'): - if certificate_id: - active_certificates = CertificateManager.get_certificates(course, only_active=True) - if int(certificate_id) in [int(certificate["id"]) for certificate in active_certificates]: - # Only global staff (PMs) are able to edit active certificate configuration - if not GlobalStaff().has_user(request.user): - raise PermissionDenied() - try: - new_certificate = CertificateManager.deserialize_certificate(course, request.body) - except CertificateValidationError as err: - return JsonResponse({"error": str(err)}, status=400) + @method_decorator(ensure_csrf_cookie) + def post(self, request, course_key_string, certificate_id=None): + """ + Create a certificate for the specified course based on provided information. + """ + return self._handle_create_or_update(request, course_key_string, certificate_id) - serialized_certificate = CertificateManager.serialize_certificate(new_certificate) - cert_event_type = 'created' - if match_cert: - cert_event_type = 'modified' - certificates_list[match_index] = serialized_certificate - else: - certificates_list.append(serialized_certificate) + @method_decorator(ensure_csrf_cookie) + def put(self, request, course_key_string, certificate_id=None): + """ + Update a certificate for the specified course based on provided information. + """ + return self._handle_create_or_update(request, course_key_string, certificate_id) - store.update_item(course, request.user.id) - CertificateManager.track_event(cert_event_type, { - 'course_id': str(course.id), - 'configuration_id': serialized_certificate["id"] - }) - return JsonResponse(serialized_certificate, status=201) + @method_decorator(ensure_csrf_cookie) + def delete(self, request, course_key_string, certificate_id): + """ + Delete a certificate for the specified course. + """ + course_key = CourseKey.from_string(course_key_string) + course = _get_course_and_check_access(course_key, request.user) + + certificates_list = course.certificates.get('certificates', []) + match_cert = next((cert for cert in certificates_list if int(cert['id']) == int(certificate_id)), None) - elif request.method == "DELETE": if not match_cert: return JsonResponse(status=404) active_certificates = CertificateManager.get_certificates(course, only_active=True) - if int(certificate_id) in [int(certificate["id"]) for certificate in active_certificates]: - # Only global staff (PMs) are able to delete active certificate configuration + if int(certificate_id) in [int(cert["id"]) for cert in active_certificates]: if not GlobalStaff().has_user(request.user): raise PermissionDenied() + store = self.get_modulestore() CertificateManager.remove_certificate( request=request, store=store, @@ -554,6 +254,45 @@ def certificates_detail_handler(request, course_key_string, certificate_id): }) return JsonResponse(status=204) + def _handle_create_or_update(self, request, course_key_string, certificate_id): + """ + Handle the creation or update of a certificate for the specified course.""" + course_key = CourseKey.from_string(course_key_string) + course = self.get_modulestore().get_course(course_key, depth=0) + + certificates_list = course.certificates.get('certificates', []) + match_index = None + match_cert = None + for index, cert in enumerate(certificates_list): + if certificate_id is not None and int(cert['id']) == int(certificate_id): + match_index = index + match_cert = cert + + serializer = CertificateSerializer( + data=request.data, + context={"request": request, "course": course, "certificate_id": certificate_id} + ) + if not serializer.is_valid(): + return JsonResponse({"error": serializer.errors}, status=400) + + new_certificate = serializer.save() + serialized_certificate = CertificateSerializer(new_certificate).data + + cert_event_type = 'created' + if match_cert: + cert_event_type = 'modified' + certificates_list[match_index] = serialized_certificate + else: + certificates_list.append(serialized_certificate) + + store = self.get_modulestore() + store.update_item(course, request.user.id) + CertificateManager.track_event(cert_event_type, { + 'course_id': str(course.id), + 'configuration_id': serialized_certificate["id"] + }) + return JsonResponse(serialized_certificate, status=201) + @login_required @ensure_csrf_cookie diff --git a/cms/djangoapps/contentstore/views/serializers.py b/cms/djangoapps/contentstore/views/serializers.py index c96e716410..71608fb0de 100644 --- a/cms/djangoapps/contentstore/views/serializers.py +++ b/cms/djangoapps/contentstore/views/serializers.py @@ -6,6 +6,13 @@ Add new serializers here as needed for API endpoints in this module. """ from rest_framework import serializers +from django.core.exceptions import PermissionDenied + +from cms.djangoapps.contentstore.views.certificate_manager import ( + CERTIFICATE_SCHEMA_VERSION, + CertificateManager, Certificate, +) +from common.djangoapps.student.roles import GlobalStaff class CertificateActivationSerializer(serializers.Serializer): @@ -14,3 +21,81 @@ class CertificateActivationSerializer(serializers.Serializer): """ # This field indicates whether the certificate should be activated or deactivated. is_active = serializers.BooleanField(required=False, default=False) + + +class SignatorySerializer(serializers.Serializer): + """ + Serializer for signatories in a course certificate. + """ + id = serializers.IntegerField(required=False) + name = serializers.CharField(required=False, allow_blank=True) + title = serializers.CharField(required=False, allow_blank=True) + organization = serializers.CharField(required=False, allow_blank=True) + signature_image_path = serializers.CharField(required=False, allow_blank=True) + certificate = serializers.CharField(required=False, allow_blank=True, allow_null=True) + + +class CertificateSerializer(serializers.Serializer): + """ + Serializer for course certificates. + """ + id = serializers.IntegerField(read_only=True) + version = serializers.IntegerField(default=CERTIFICATE_SCHEMA_VERSION) + name = serializers.CharField(required=True, allow_blank=False) + description = serializers.CharField(required=True, allow_blank=False) + is_active = serializers.BooleanField(default=False) + course_title = serializers.CharField(required=False, allow_blank=True) + + signatories = SignatorySerializer(many=True, required=False, default=list) + + def validate(self, data): + """ + Validate the certificate data. + """ + certificate_id = self.context.get("certificate_id") + course = self.context.get("course") + request = self.context.get("request") + + if certificate_id and course: + active_certificates = CertificateManager.get_certificates(course, only_active=True) + active_ids = [int(cert["id"]) for cert in active_certificates] + + if int(certificate_id) in active_ids: + if not GlobalStaff().has_user(request.user): + raise PermissionDenied() + + return data + + def create(self, validated_data): + """ + Create a new Certificate instance with the provided validated data. + """ + course = self.context.get("course") + certificate_id = self.context.get("certificate_id") + + validated_data = CertificateManager.assign_id(course, validated_data, certificate_id) + return Certificate(course=course, certificate_data=validated_data) + + def update(self, instance, validated_data): + """ + Update an existing Certificate instance with the provided validated data. + """ + instance.certificate_data.update(validated_data) # pylint: disable=protected-access + return instance + + def to_representation(self, instance): + """ + Convert the Certificate instance to a dictionary representation. + """ + data = instance.certificate_data + result = { + "id": data.get("id"), + "name": data.get("name"), + "description": data.get("description"), + "is_active": data.get("is_active", False), + "version": data.get("version", CERTIFICATE_SCHEMA_VERSION), + "signatories": data.get("signatories", []), + } + if data.get("course_title"): + result["course_title"] = data.get("course_title") + return result diff --git a/cms/djangoapps/contentstore/views/tests/test_certificates.py b/cms/djangoapps/contentstore/views/tests/test_certificates.py index f50c3d3f1b..dfbad798f6 100644 --- a/cms/djangoapps/contentstore/views/tests/test_certificates.py +++ b/cms/djangoapps/contentstore/views/tests/test_certificates.py @@ -25,7 +25,7 @@ from xmodule.contentstore.content import StaticContent # lint-amnesty, pylint: from xmodule.contentstore.django import contentstore # lint-amnesty, pylint: disable=wrong-import-order from xmodule.exceptions import NotFoundError # lint-amnesty, pylint: disable=wrong-import-order -from ..certificates import CERTIFICATE_SCHEMA_VERSION, CertificateManager +from ..certificate_manager import CERTIFICATE_SCHEMA_VERSION, CertificateManager FEATURES_WITH_CERTS_ENABLED = settings.FEATURES.copy() FEATURES_WITH_CERTS_ENABLED['CERTIFICATES_HTML_VIEW'] = True @@ -167,7 +167,7 @@ class CertificatesBaseTestCase: self.assertEqual(response.status_code, 400) self.assertNotIn("Location", response) content = json.loads(response.content.decode('utf-8')) - self.assertIn("error", content) + self.assertTrue("error" in content or "detail" in content) def test_certificate_data_validation(self): #Test certificate schema version @@ -200,7 +200,7 @@ class CertificatesBaseTestCase: @ddt.ddt @override_settings(FEATURES=FEATURES_WITH_CERTS_ENABLED) class CertificatesListHandlerTestCase( - EventTestMixin, CourseTestCase, CertificatesBaseTestCase, HelperMethods, UrlResetMixin + EventTestMixin, CourseTestCase, HelperMethods, UrlResetMixin ): """ Test cases for certificates_list_handler. @@ -210,7 +210,7 @@ class CertificatesListHandlerTestCase( """ Set up CertificatesListHandlerTestCase. """ - super().setUp('cms.djangoapps.contentstore.views.certificates.tracker') + super().setUp('cms.djangoapps.contentstore.views.certificate_manager.tracker') self.reset_urls() def _url(self): @@ -238,7 +238,7 @@ class CertificatesListHandlerTestCase( self.assertEqual(response.status_code, 201) self.assertIn("Location", response) content = json.loads(response.content.decode('utf-8')) - certificate_id = self._remove_ids(content) + certificate_id = content.pop("id") self.assertEqual(content, expected) self.assert_event_emitted( 'edx.certificate.configuration.created', @@ -440,7 +440,7 @@ class CertificatesDetailHandlerTestCase( """ Set up CertificatesDetailHandlerTestCase. """ - super().setUp('cms.djangoapps.contentstore.views.certificates.tracker') + super().setUp('cms.djangoapps.contentstore.views.certificate_manager.tracker') self.reset_urls() def _url(self, cid=-1): diff --git a/cms/urls.py b/cms/urls.py index af10c6b619..50ea7b3add 100644 --- a/cms/urls.py +++ b/cms/urls.py @@ -265,9 +265,9 @@ if core_toggles.ENTRANCE_EXAMS.is_enabled(): if settings.FEATURES.get('CERTIFICATES_HTML_VIEW'): from cms.djangoapps.contentstore.views.certificates import ( CertificateActivationAPIView, + CertificateDetailAPIView, + certificates_list_handler, signatory_detail_handler, - certificates_detail_handler, - certificates_list_handler ) urlpatterns += [ @@ -277,7 +277,7 @@ if settings.FEATURES.get('CERTIFICATES_HTML_VIEW'): re_path(r'^certificates/{}/(?P\d+)/signatories/(?P\d+)?$'.format( settings.COURSE_KEY_PATTERN), signatory_detail_handler, name='signatory_detail_handler'), re_path(fr'^certificates/{settings.COURSE_KEY_PATTERN}/(?P\d+)?$', - certificates_detail_handler, name='certificates_detail_handler'), + CertificateDetailAPIView.as_view(), name='certificates_detail_handler'), re_path(fr'^certificates/{settings.COURSE_KEY_PATTERN}$', certificates_list_handler, name='certificates_list_handler') ]