refactor: convert function based view to class based
This commit is contained in:
309
cms/djangoapps/contentstore/views/certificate_manager.py
Normal file
309
cms/djangoapps/contentstore/views/certificate_manager.py
Normal file
@@ -0,0 +1,309 @@
|
||||
"""
|
||||
Certificate Manager.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from common.djangoapps.course_modes.models import CourseMode
|
||||
from eventtracking import tracker
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import AssetKey
|
||||
from .assets import delete_asset
|
||||
|
||||
from common.djangoapps.util.db import MYSQL_MAX_INT, generate_int_id
|
||||
|
||||
from ..exceptions import AssetNotFoundException
|
||||
|
||||
CERTIFICATE_SCHEMA_VERSION = 1
|
||||
CERTIFICATE_MINIMUM_ID = 100
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CertificateException(Exception):
|
||||
"""
|
||||
Base exception for Certificates workflows
|
||||
"""
|
||||
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
||||
|
||||
|
||||
class CertificateValidationError(CertificateException):
|
||||
"""
|
||||
An exception raised when certificate information is invalid.
|
||||
"""
|
||||
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
||||
|
||||
|
||||
def _delete_asset(course_key, asset_key_string):
|
||||
"""
|
||||
Internal method used to create asset key from string and
|
||||
remove asset by calling delete_asset method of assets module.
|
||||
"""
|
||||
if asset_key_string:
|
||||
try:
|
||||
asset_key = AssetKey.from_string(asset_key_string)
|
||||
except InvalidKeyError:
|
||||
# remove first slash in asset path
|
||||
# otherwise it generates InvalidKeyError in case of split modulestore
|
||||
if '/' == asset_key_string[0]:
|
||||
asset_key_string = asset_key_string[1:]
|
||||
try:
|
||||
asset_key = AssetKey.from_string(asset_key_string)
|
||||
except InvalidKeyError:
|
||||
# Unable to parse the asset key, log and return
|
||||
LOGGER.info(
|
||||
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
|
||||
course_key,
|
||||
asset_key_string,
|
||||
)
|
||||
return
|
||||
else:
|
||||
# Unable to parse the asset key, log and return
|
||||
LOGGER.info(
|
||||
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
|
||||
course_key,
|
||||
asset_key_string,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
delete_asset(course_key, asset_key)
|
||||
# If the asset was not found, it doesn't have to be deleted...
|
||||
except AssetNotFoundException:
|
||||
pass
|
||||
|
||||
|
||||
class CertificateManager:
|
||||
"""
|
||||
The CertificateManager is responsible for storage, retrieval, and manipulation of Certificates
|
||||
Certificates are not stored in the Django ORM, they are a field/setting on the course block
|
||||
"""
|
||||
@staticmethod
|
||||
def parse(json_string):
|
||||
"""
|
||||
Deserialize the provided JSON data into a standard Python object
|
||||
"""
|
||||
try:
|
||||
certificate = json.loads(json_string)
|
||||
except ValueError:
|
||||
raise CertificateValidationError(_("invalid JSON")) # lint-amnesty, pylint: disable=raise-missing-from
|
||||
# Include the data contract version
|
||||
certificate["version"] = CERTIFICATE_SCHEMA_VERSION
|
||||
# Ensure a signatories list is always returned
|
||||
if certificate.get("signatories") is None:
|
||||
certificate["signatories"] = []
|
||||
certificate["editing"] = False
|
||||
return certificate
|
||||
|
||||
@staticmethod
|
||||
def validate(certificate_data):
|
||||
"""
|
||||
Ensure the certificate data contains all of the necessary fields and the values match our rules
|
||||
"""
|
||||
# Ensure the schema version meets our expectations
|
||||
if certificate_data.get("version") != CERTIFICATE_SCHEMA_VERSION:
|
||||
raise TypeError(
|
||||
"Unsupported certificate schema version: {}. Expected version: {}.".format(
|
||||
certificate_data.get("version"),
|
||||
CERTIFICATE_SCHEMA_VERSION
|
||||
)
|
||||
)
|
||||
if not certificate_data.get("name"):
|
||||
raise CertificateValidationError(_("must have name of the certificate"))
|
||||
|
||||
@staticmethod
|
||||
def is_activated(course):
|
||||
"""
|
||||
Returns whether certificates are activated for the given course,
|
||||
along with the certificates.
|
||||
"""
|
||||
is_active = False
|
||||
certificates = None
|
||||
if settings.FEATURES.get('CERTIFICATES_HTML_VIEW', False):
|
||||
certificates = CertificateManager.get_certificates(course)
|
||||
# we are assuming only one certificate in certificates collection.
|
||||
for certificate in certificates:
|
||||
is_active = certificate.get('is_active', False)
|
||||
break
|
||||
return is_active, certificates
|
||||
|
||||
@staticmethod
|
||||
def get_used_ids(course):
|
||||
"""
|
||||
Return a list of certificate identifiers that are already in use for this course
|
||||
"""
|
||||
if not course.certificates or not course.certificates.get('certificates'):
|
||||
return []
|
||||
return [cert['id'] for cert in course.certificates['certificates']]
|
||||
|
||||
@staticmethod
|
||||
def assign_id(course, certificate_data, certificate_id=None):
|
||||
"""
|
||||
Assign an identifier to the provided certificate data.
|
||||
If the caller did not provide an identifier, we autogenerate a unique one for them
|
||||
In addition, we check the certificate's signatories and ensure they also have unique ids
|
||||
"""
|
||||
used_ids = CertificateManager.get_used_ids(course)
|
||||
if certificate_id:
|
||||
certificate_data['id'] = int(certificate_id)
|
||||
else:
|
||||
certificate_data['id'] = generate_int_id(
|
||||
CERTIFICATE_MINIMUM_ID,
|
||||
MYSQL_MAX_INT,
|
||||
used_ids
|
||||
)
|
||||
|
||||
for index, signatory in enumerate(certificate_data['signatories']): # pylint: disable=unused-variable
|
||||
if signatory and not signatory.get('id', False):
|
||||
signatory['id'] = generate_int_id(used_ids=used_ids)
|
||||
used_ids.append(signatory['id'])
|
||||
|
||||
return certificate_data
|
||||
|
||||
@staticmethod
|
||||
def serialize_certificate(certificate):
|
||||
"""
|
||||
Serialize the Certificate object's locally-stored certificate data to a JSON representation
|
||||
We use direct access here for specific keys in order to enforce their presence
|
||||
"""
|
||||
certificate_data = certificate.certificate_data
|
||||
certificate_response = {
|
||||
"id": certificate_data['id'],
|
||||
"name": certificate_data['name'],
|
||||
"description": certificate_data['description'],
|
||||
"is_active": certificate_data['is_active'],
|
||||
"version": CERTIFICATE_SCHEMA_VERSION,
|
||||
"signatories": certificate_data['signatories']
|
||||
}
|
||||
|
||||
# Some keys are not required, such as the title override...
|
||||
if certificate_data.get('course_title'):
|
||||
certificate_response["course_title"] = certificate_data['course_title']
|
||||
|
||||
return certificate_response
|
||||
|
||||
@staticmethod
|
||||
def deserialize_certificate(course, value):
|
||||
"""
|
||||
Deserialize from a JSON representation into a Certificate object.
|
||||
'value' should be either a Certificate instance, or a valid JSON string
|
||||
"""
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode('utf-8')
|
||||
|
||||
# Ensure the schema fieldset meets our expectations
|
||||
for key in ("name", "description", "version"):
|
||||
if key not in value:
|
||||
raise CertificateValidationError(_("Certificate dict {0} missing value key '{1}'").format(value, key))
|
||||
|
||||
# Load up the Certificate data
|
||||
certificate_data = CertificateManager.parse(value)
|
||||
CertificateManager.validate(certificate_data)
|
||||
certificate_data = CertificateManager.assign_id(course, certificate_data, certificate_data.get('id', None))
|
||||
certificate = Certificate(course, certificate_data)
|
||||
|
||||
# Return a new Certificate object instance
|
||||
return certificate
|
||||
|
||||
@staticmethod
|
||||
def get_certificates(course, only_active=False):
|
||||
"""
|
||||
Retrieve the certificates list from the provided course,
|
||||
if `only_active` is True it would skip inactive certificates.
|
||||
"""
|
||||
# The top-level course field is 'certificates', which contains various properties,
|
||||
# including the actual 'certificates' list that we're working with in this context
|
||||
certificates = course.certificates.get('certificates', [])
|
||||
if only_active:
|
||||
certificates = [certificate for certificate in certificates if certificate.get('is_active', False)]
|
||||
return certificates
|
||||
|
||||
@staticmethod
|
||||
def get_course_modes(course):
|
||||
"""
|
||||
Retrieve certificate modes for the given course,
|
||||
including expired modes but excluding audit mode.
|
||||
"""
|
||||
course_modes = [
|
||||
mode.slug for mode in CourseMode.modes_for_course(
|
||||
course=course, include_expired=True
|
||||
) if mode.slug != CourseMode.AUDIT
|
||||
]
|
||||
return course_modes
|
||||
|
||||
@staticmethod
|
||||
def is_enabled(course):
|
||||
"""
|
||||
Is enabled when there is at least one course mode for the given course,
|
||||
including expired modes but excluding audit mode
|
||||
"""
|
||||
course_modes = CertificateManager.get_course_modes(course)
|
||||
return len(course_modes) > 0
|
||||
|
||||
@staticmethod
|
||||
def remove_certificate(request, store, course, certificate_id):
|
||||
"""
|
||||
Remove certificate from the course
|
||||
"""
|
||||
for index, cert in enumerate(course.certificates['certificates']):
|
||||
if int(cert['id']) == int(certificate_id):
|
||||
certificate = course.certificates['certificates'][index]
|
||||
# Remove any signatory assets prior to dropping the entire cert record from the course
|
||||
for sig_index, signatory in enumerate(certificate.get('signatories')): # pylint: disable=unused-variable
|
||||
_delete_asset(course.id, signatory['signature_image_path'])
|
||||
# Now drop the certificate record
|
||||
course.certificates['certificates'].pop(index)
|
||||
store.update_item(course, request.user.id)
|
||||
break
|
||||
|
||||
# pylint-disable: unused-variable
|
||||
@staticmethod
|
||||
def remove_signatory(request, store, course, certificate_id, signatory_id):
|
||||
"""
|
||||
Remove the specified signatory from the provided course certificate
|
||||
"""
|
||||
for cert_index, cert in enumerate(course.certificates['certificates']): # pylint: disable=unused-variable
|
||||
if int(cert['id']) == int(certificate_id):
|
||||
for sig_index, signatory in enumerate(cert.get('signatories')):
|
||||
if int(signatory_id) == int(signatory['id']):
|
||||
_delete_asset(course.id, signatory['signature_image_path'])
|
||||
del cert['signatories'][sig_index]
|
||||
store.update_item(course, request.user.id)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def track_event(event_name, event_data):
|
||||
"""Track certificate configuration event.
|
||||
|
||||
Arguments:
|
||||
event_name (str): Name of the event to be logged.
|
||||
event_data (dict): A Dictionary containing event data
|
||||
Returns:
|
||||
None
|
||||
|
||||
"""
|
||||
event_name = '.'.join(['edx', 'certificate', 'configuration', event_name])
|
||||
tracker.emit(event_name, event_data)
|
||||
|
||||
|
||||
class Certificate:
|
||||
"""
|
||||
The logical representation of an individual course certificate
|
||||
"""
|
||||
def __init__(self, course, certificate_data):
|
||||
"""
|
||||
Instantiate a Certificate object instance using the provided information.
|
||||
"""
|
||||
self.course = course
|
||||
self._certificate_data = certificate_data
|
||||
self.id = certificate_data['id'] # pylint: disable=invalid-name
|
||||
|
||||
@property
|
||||
def certificate_data(self):
|
||||
"""
|
||||
Retrieve the locally-stored certificate data from the Certificate object via a helper method
|
||||
"""
|
||||
return self._certificate_data
|
||||
@@ -23,10 +23,8 @@ course.certificates: {
|
||||
"""
|
||||
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.http import HttpResponse
|
||||
@@ -39,34 +37,28 @@ from rest_framework.views import APIView
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
from rest_framework import status
|
||||
from common.djangoapps.course_modes.models import CourseMode
|
||||
from eventtracking import tracker
|
||||
from opaque_keys import InvalidKeyError
|
||||
from opaque_keys.edx.keys import AssetKey, CourseKey
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
|
||||
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin
|
||||
from common.djangoapps.edxmako.shortcuts import render_to_response
|
||||
from common.djangoapps.student.auth import has_studio_write_access
|
||||
from common.djangoapps.student.roles import GlobalStaff
|
||||
from common.djangoapps.util.db import MYSQL_MAX_INT, generate_int_id
|
||||
|
||||
from common.djangoapps.util.json_request import JsonResponse
|
||||
from xmodule.modulestore import EdxJSONEncoder # lint-amnesty, pylint: disable=wrong-import-order
|
||||
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
|
||||
|
||||
from cms.djangoapps.contentstore.views.serializers import CertificateActivationSerializer
|
||||
from cms.djangoapps.contentstore.views.serializers import CertificateActivationSerializer, CertificateSerializer
|
||||
from cms.djangoapps.contentstore.views.permissions import HasStudioWriteAccess
|
||||
|
||||
|
||||
from ..exceptions import AssetNotFoundException
|
||||
from .certificate_manager import CertificateManager
|
||||
from ..toggles import use_new_certificates_page
|
||||
from ..utils import (
|
||||
get_certificates_context,
|
||||
get_certificates_url,
|
||||
reverse_course_url,
|
||||
)
|
||||
from .assets import delete_asset
|
||||
|
||||
CERTIFICATE_SCHEMA_VERSION = 1
|
||||
CERTIFICATE_MINIMUM_ID = 100
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
@@ -83,45 +75,6 @@ def _get_course_and_check_access(course_key, user, depth=0):
|
||||
return course_block
|
||||
|
||||
|
||||
def _delete_asset(course_key, asset_key_string):
|
||||
"""
|
||||
Internal method used to create asset key from string and
|
||||
remove asset by calling delete_asset method of assets module.
|
||||
"""
|
||||
if asset_key_string:
|
||||
try:
|
||||
asset_key = AssetKey.from_string(asset_key_string)
|
||||
except InvalidKeyError:
|
||||
# remove first slash in asset path
|
||||
# otherwise it generates InvalidKeyError in case of split modulestore
|
||||
if '/' == asset_key_string[0]:
|
||||
asset_key_string = asset_key_string[1:]
|
||||
try:
|
||||
asset_key = AssetKey.from_string(asset_key_string)
|
||||
except InvalidKeyError:
|
||||
# Unable to parse the asset key, log and return
|
||||
LOGGER.info(
|
||||
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
|
||||
course_key,
|
||||
asset_key_string,
|
||||
)
|
||||
return
|
||||
else:
|
||||
# Unable to parse the asset key, log and return
|
||||
LOGGER.info(
|
||||
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
|
||||
course_key,
|
||||
asset_key_string,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
delete_asset(course_key, asset_key)
|
||||
# If the asset was not found, it doesn't have to be deleted...
|
||||
except AssetNotFoundException:
|
||||
pass
|
||||
|
||||
|
||||
# Certificates Exceptions
|
||||
class CertificateException(Exception):
|
||||
"""
|
||||
@@ -137,239 +90,6 @@ class CertificateValidationError(CertificateException):
|
||||
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
||||
|
||||
|
||||
class CertificateManager:
|
||||
"""
|
||||
The CertificateManager is responsible for storage, retrieval, and manipulation of Certificates
|
||||
Certificates are not stored in the Django ORM, they are a field/setting on the course block
|
||||
"""
|
||||
@staticmethod
|
||||
def parse(json_string):
|
||||
"""
|
||||
Deserialize the provided JSON data into a standard Python object
|
||||
"""
|
||||
try:
|
||||
certificate = json.loads(json_string)
|
||||
except ValueError:
|
||||
raise CertificateValidationError(_("invalid JSON")) # lint-amnesty, pylint: disable=raise-missing-from
|
||||
# Include the data contract version
|
||||
certificate["version"] = CERTIFICATE_SCHEMA_VERSION
|
||||
# Ensure a signatories list is always returned
|
||||
if certificate.get("signatories") is None:
|
||||
certificate["signatories"] = []
|
||||
certificate["editing"] = False
|
||||
return certificate
|
||||
|
||||
@staticmethod
|
||||
def validate(certificate_data):
|
||||
"""
|
||||
Ensure the certificate data contains all of the necessary fields and the values match our rules
|
||||
"""
|
||||
# Ensure the schema version meets our expectations
|
||||
if certificate_data.get("version") != CERTIFICATE_SCHEMA_VERSION:
|
||||
raise TypeError(
|
||||
"Unsupported certificate schema version: {}. Expected version: {}.".format(
|
||||
certificate_data.get("version"),
|
||||
CERTIFICATE_SCHEMA_VERSION
|
||||
)
|
||||
)
|
||||
if not certificate_data.get("name"):
|
||||
raise CertificateValidationError(_("must have name of the certificate"))
|
||||
|
||||
@staticmethod
|
||||
def is_activated(course):
|
||||
"""
|
||||
Returns whether certificates are activated for the given course,
|
||||
along with the certificates.
|
||||
"""
|
||||
is_active = False
|
||||
certificates = None
|
||||
if settings.FEATURES.get('CERTIFICATES_HTML_VIEW', False):
|
||||
certificates = CertificateManager.get_certificates(course)
|
||||
# we are assuming only one certificate in certificates collection.
|
||||
for certificate in certificates:
|
||||
is_active = certificate.get('is_active', False)
|
||||
break
|
||||
return is_active, certificates
|
||||
|
||||
@staticmethod
|
||||
def get_used_ids(course):
|
||||
"""
|
||||
Return a list of certificate identifiers that are already in use for this course
|
||||
"""
|
||||
if not course.certificates or not course.certificates.get('certificates'):
|
||||
return []
|
||||
return [cert['id'] for cert in course.certificates['certificates']]
|
||||
|
||||
@staticmethod
|
||||
def assign_id(course, certificate_data, certificate_id=None):
|
||||
"""
|
||||
Assign an identifier to the provided certificate data.
|
||||
If the caller did not provide an identifier, we autogenerate a unique one for them
|
||||
In addition, we check the certificate's signatories and ensure they also have unique ids
|
||||
"""
|
||||
used_ids = CertificateManager.get_used_ids(course)
|
||||
if certificate_id:
|
||||
certificate_data['id'] = int(certificate_id)
|
||||
else:
|
||||
certificate_data['id'] = generate_int_id(
|
||||
CERTIFICATE_MINIMUM_ID,
|
||||
MYSQL_MAX_INT,
|
||||
used_ids
|
||||
)
|
||||
|
||||
for index, signatory in enumerate(certificate_data['signatories']): # pylint: disable=unused-variable
|
||||
if signatory and not signatory.get('id', False):
|
||||
signatory['id'] = generate_int_id(used_ids=used_ids)
|
||||
used_ids.append(signatory['id'])
|
||||
|
||||
return certificate_data
|
||||
|
||||
@staticmethod
|
||||
def serialize_certificate(certificate):
|
||||
"""
|
||||
Serialize the Certificate object's locally-stored certificate data to a JSON representation
|
||||
We use direct access here for specific keys in order to enforce their presence
|
||||
"""
|
||||
certificate_data = certificate.certificate_data
|
||||
certificate_response = {
|
||||
"id": certificate_data['id'],
|
||||
"name": certificate_data['name'],
|
||||
"description": certificate_data['description'],
|
||||
"is_active": certificate_data['is_active'],
|
||||
"version": CERTIFICATE_SCHEMA_VERSION,
|
||||
"signatories": certificate_data['signatories']
|
||||
}
|
||||
|
||||
# Some keys are not required, such as the title override...
|
||||
if certificate_data.get('course_title'):
|
||||
certificate_response["course_title"] = certificate_data['course_title']
|
||||
|
||||
return certificate_response
|
||||
|
||||
@staticmethod
|
||||
def deserialize_certificate(course, value):
|
||||
"""
|
||||
Deserialize from a JSON representation into a Certificate object.
|
||||
'value' should be either a Certificate instance, or a valid JSON string
|
||||
"""
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode('utf-8')
|
||||
|
||||
# Ensure the schema fieldset meets our expectations
|
||||
for key in ("name", "description", "version"):
|
||||
if key not in value:
|
||||
raise CertificateValidationError(_("Certificate dict {0} missing value key '{1}'").format(value, key))
|
||||
|
||||
# Load up the Certificate data
|
||||
certificate_data = CertificateManager.parse(value)
|
||||
CertificateManager.validate(certificate_data)
|
||||
certificate_data = CertificateManager.assign_id(course, certificate_data, certificate_data.get('id', None))
|
||||
certificate = Certificate(course, certificate_data)
|
||||
|
||||
# Return a new Certificate object instance
|
||||
return certificate
|
||||
|
||||
@staticmethod
|
||||
def get_certificates(course, only_active=False):
|
||||
"""
|
||||
Retrieve the certificates list from the provided course,
|
||||
if `only_active` is True it would skip inactive certificates.
|
||||
"""
|
||||
# The top-level course field is 'certificates', which contains various properties,
|
||||
# including the actual 'certificates' list that we're working with in this context
|
||||
certificates = course.certificates.get('certificates', [])
|
||||
if only_active:
|
||||
certificates = [certificate for certificate in certificates if certificate.get('is_active', False)]
|
||||
return certificates
|
||||
|
||||
@staticmethod
|
||||
def get_course_modes(course):
|
||||
"""
|
||||
Retrieve certificate modes for the given course,
|
||||
including expired modes but excluding audit mode.
|
||||
"""
|
||||
course_modes = [
|
||||
mode.slug for mode in CourseMode.modes_for_course(
|
||||
course=course, include_expired=True
|
||||
) if mode.slug != CourseMode.AUDIT
|
||||
]
|
||||
return course_modes
|
||||
|
||||
@staticmethod
|
||||
def is_enabled(course):
|
||||
"""
|
||||
Is enabled when there is at least one course mode for the given course,
|
||||
including expired modes but excluding audit mode
|
||||
"""
|
||||
course_modes = CertificateManager.get_course_modes(course)
|
||||
return len(course_modes) > 0
|
||||
|
||||
@staticmethod
|
||||
def remove_certificate(request, store, course, certificate_id):
|
||||
"""
|
||||
Remove certificate from the course
|
||||
"""
|
||||
for index, cert in enumerate(course.certificates['certificates']):
|
||||
if int(cert['id']) == int(certificate_id):
|
||||
certificate = course.certificates['certificates'][index]
|
||||
# Remove any signatory assets prior to dropping the entire cert record from the course
|
||||
for sig_index, signatory in enumerate(certificate.get('signatories')): # pylint: disable=unused-variable
|
||||
_delete_asset(course.id, signatory['signature_image_path'])
|
||||
# Now drop the certificate record
|
||||
course.certificates['certificates'].pop(index)
|
||||
store.update_item(course, request.user.id)
|
||||
break
|
||||
|
||||
# pylint-disable: unused-variable
|
||||
@staticmethod
|
||||
def remove_signatory(request, store, course, certificate_id, signatory_id):
|
||||
"""
|
||||
Remove the specified signatory from the provided course certificate
|
||||
"""
|
||||
for cert_index, cert in enumerate(course.certificates['certificates']): # pylint: disable=unused-variable
|
||||
if int(cert['id']) == int(certificate_id):
|
||||
for sig_index, signatory in enumerate(cert.get('signatories')):
|
||||
if int(signatory_id) == int(signatory['id']):
|
||||
_delete_asset(course.id, signatory['signature_image_path'])
|
||||
del cert['signatories'][sig_index]
|
||||
store.update_item(course, request.user.id)
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def track_event(event_name, event_data):
|
||||
"""Track certificate configuration event.
|
||||
|
||||
Arguments:
|
||||
event_name (str): Name of the event to be logged.
|
||||
event_data (dict): A Dictionary containing event data
|
||||
Returns:
|
||||
None
|
||||
|
||||
"""
|
||||
event_name = '.'.join(['edx', 'certificate', 'configuration', event_name])
|
||||
tracker.emit(event_name, event_data)
|
||||
|
||||
|
||||
class Certificate:
|
||||
"""
|
||||
The logical representation of an individual course certificate
|
||||
"""
|
||||
def __init__(self, course, certificate_data):
|
||||
"""
|
||||
Instantiate a Certificate object instance using the provided information.
|
||||
"""
|
||||
self.course = course
|
||||
self._certificate_data = certificate_data
|
||||
self.id = certificate_data['id'] # pylint: disable=invalid-name
|
||||
|
||||
@property
|
||||
def certificate_data(self):
|
||||
"""
|
||||
Retrieve the locally-stored certificate data from the Certificate object via a helper method
|
||||
"""
|
||||
return self._certificate_data
|
||||
|
||||
|
||||
class ModulestoreMixin:
|
||||
"""
|
||||
Mixin to provide a get_modulestore() method for views.
|
||||
@@ -479,69 +199,49 @@ def certificates_list_handler(request, course_key_string):
|
||||
return HttpResponse(status=406)
|
||||
|
||||
|
||||
@login_required
|
||||
@ensure_csrf_cookie
|
||||
@require_http_methods(("POST", "PUT", "DELETE"))
|
||||
def certificates_detail_handler(request, course_key_string, certificate_id):
|
||||
class CertificateDetailAPIView(ModulestoreMixin, APIView):
|
||||
"""
|
||||
JSON API endpoint for manipulating a course certificate via its internal identifier.
|
||||
Utilized by the Backbone.js 'certificates' application model
|
||||
|
||||
POST or PUT
|
||||
json: update the specified certificate based on provided information
|
||||
DELETE
|
||||
json: remove the specified certificate from the course
|
||||
"""
|
||||
course_key = CourseKey.from_string(course_key_string)
|
||||
course = _get_course_and_check_access(course_key, request.user)
|
||||
|
||||
certificates_list = course.certificates.get('certificates', [])
|
||||
match_index = None
|
||||
match_cert = None
|
||||
for index, cert in enumerate(certificates_list):
|
||||
if certificate_id is not None:
|
||||
if int(cert['id']) == int(certificate_id):
|
||||
match_index = index
|
||||
match_cert = cert
|
||||
permission_classes = [IsAuthenticated, HasStudioWriteAccess]
|
||||
serializer_class = CertificateSerializer
|
||||
|
||||
store = modulestore()
|
||||
if request.method in ('POST', 'PUT'):
|
||||
if certificate_id:
|
||||
active_certificates = CertificateManager.get_certificates(course, only_active=True)
|
||||
if int(certificate_id) in [int(certificate["id"]) for certificate in active_certificates]:
|
||||
# Only global staff (PMs) are able to edit active certificate configuration
|
||||
if not GlobalStaff().has_user(request.user):
|
||||
raise PermissionDenied()
|
||||
try:
|
||||
new_certificate = CertificateManager.deserialize_certificate(course, request.body)
|
||||
except CertificateValidationError as err:
|
||||
return JsonResponse({"error": str(err)}, status=400)
|
||||
@method_decorator(ensure_csrf_cookie)
|
||||
def post(self, request, course_key_string, certificate_id=None):
|
||||
"""
|
||||
Create a certificate for the specified course based on provided information.
|
||||
"""
|
||||
return self._handle_create_or_update(request, course_key_string, certificate_id)
|
||||
|
||||
serialized_certificate = CertificateManager.serialize_certificate(new_certificate)
|
||||
cert_event_type = 'created'
|
||||
if match_cert:
|
||||
cert_event_type = 'modified'
|
||||
certificates_list[match_index] = serialized_certificate
|
||||
else:
|
||||
certificates_list.append(serialized_certificate)
|
||||
@method_decorator(ensure_csrf_cookie)
|
||||
def put(self, request, course_key_string, certificate_id=None):
|
||||
"""
|
||||
Update a certificate for the specified course based on provided information.
|
||||
"""
|
||||
return self._handle_create_or_update(request, course_key_string, certificate_id)
|
||||
|
||||
store.update_item(course, request.user.id)
|
||||
CertificateManager.track_event(cert_event_type, {
|
||||
'course_id': str(course.id),
|
||||
'configuration_id': serialized_certificate["id"]
|
||||
})
|
||||
return JsonResponse(serialized_certificate, status=201)
|
||||
@method_decorator(ensure_csrf_cookie)
|
||||
def delete(self, request, course_key_string, certificate_id):
|
||||
"""
|
||||
Delete a certificate for the specified course.
|
||||
"""
|
||||
course_key = CourseKey.from_string(course_key_string)
|
||||
course = _get_course_and_check_access(course_key, request.user)
|
||||
|
||||
certificates_list = course.certificates.get('certificates', [])
|
||||
match_cert = next((cert for cert in certificates_list if int(cert['id']) == int(certificate_id)), None)
|
||||
|
||||
elif request.method == "DELETE":
|
||||
if not match_cert:
|
||||
return JsonResponse(status=404)
|
||||
|
||||
active_certificates = CertificateManager.get_certificates(course, only_active=True)
|
||||
if int(certificate_id) in [int(certificate["id"]) for certificate in active_certificates]:
|
||||
# Only global staff (PMs) are able to delete active certificate configuration
|
||||
if int(certificate_id) in [int(cert["id"]) for cert in active_certificates]:
|
||||
if not GlobalStaff().has_user(request.user):
|
||||
raise PermissionDenied()
|
||||
|
||||
store = self.get_modulestore()
|
||||
CertificateManager.remove_certificate(
|
||||
request=request,
|
||||
store=store,
|
||||
@@ -554,6 +254,45 @@ def certificates_detail_handler(request, course_key_string, certificate_id):
|
||||
})
|
||||
return JsonResponse(status=204)
|
||||
|
||||
def _handle_create_or_update(self, request, course_key_string, certificate_id):
|
||||
"""
|
||||
Handle the creation or update of a certificate for the specified course."""
|
||||
course_key = CourseKey.from_string(course_key_string)
|
||||
course = self.get_modulestore().get_course(course_key, depth=0)
|
||||
|
||||
certificates_list = course.certificates.get('certificates', [])
|
||||
match_index = None
|
||||
match_cert = None
|
||||
for index, cert in enumerate(certificates_list):
|
||||
if certificate_id is not None and int(cert['id']) == int(certificate_id):
|
||||
match_index = index
|
||||
match_cert = cert
|
||||
|
||||
serializer = CertificateSerializer(
|
||||
data=request.data,
|
||||
context={"request": request, "course": course, "certificate_id": certificate_id}
|
||||
)
|
||||
if not serializer.is_valid():
|
||||
return JsonResponse({"error": serializer.errors}, status=400)
|
||||
|
||||
new_certificate = serializer.save()
|
||||
serialized_certificate = CertificateSerializer(new_certificate).data
|
||||
|
||||
cert_event_type = 'created'
|
||||
if match_cert:
|
||||
cert_event_type = 'modified'
|
||||
certificates_list[match_index] = serialized_certificate
|
||||
else:
|
||||
certificates_list.append(serialized_certificate)
|
||||
|
||||
store = self.get_modulestore()
|
||||
store.update_item(course, request.user.id)
|
||||
CertificateManager.track_event(cert_event_type, {
|
||||
'course_id': str(course.id),
|
||||
'configuration_id': serialized_certificate["id"]
|
||||
})
|
||||
return JsonResponse(serialized_certificate, status=201)
|
||||
|
||||
|
||||
@login_required
|
||||
@ensure_csrf_cookie
|
||||
|
||||
@@ -6,6 +6,13 @@ Add new serializers here as needed for API endpoints in this module.
|
||||
"""
|
||||
|
||||
from rest_framework import serializers
|
||||
from django.core.exceptions import PermissionDenied
|
||||
|
||||
from cms.djangoapps.contentstore.views.certificate_manager import (
|
||||
CERTIFICATE_SCHEMA_VERSION,
|
||||
CertificateManager, Certificate,
|
||||
)
|
||||
from common.djangoapps.student.roles import GlobalStaff
|
||||
|
||||
|
||||
class CertificateActivationSerializer(serializers.Serializer):
|
||||
@@ -14,3 +21,81 @@ class CertificateActivationSerializer(serializers.Serializer):
|
||||
"""
|
||||
# This field indicates whether the certificate should be activated or deactivated.
|
||||
is_active = serializers.BooleanField(required=False, default=False)
|
||||
|
||||
|
||||
class SignatorySerializer(serializers.Serializer):
|
||||
"""
|
||||
Serializer for signatories in a course certificate.
|
||||
"""
|
||||
id = serializers.IntegerField(required=False)
|
||||
name = serializers.CharField(required=False, allow_blank=True)
|
||||
title = serializers.CharField(required=False, allow_blank=True)
|
||||
organization = serializers.CharField(required=False, allow_blank=True)
|
||||
signature_image_path = serializers.CharField(required=False, allow_blank=True)
|
||||
certificate = serializers.CharField(required=False, allow_blank=True, allow_null=True)
|
||||
|
||||
|
||||
class CertificateSerializer(serializers.Serializer):
|
||||
"""
|
||||
Serializer for course certificates.
|
||||
"""
|
||||
id = serializers.IntegerField(read_only=True)
|
||||
version = serializers.IntegerField(default=CERTIFICATE_SCHEMA_VERSION)
|
||||
name = serializers.CharField(required=True, allow_blank=False)
|
||||
description = serializers.CharField(required=True, allow_blank=False)
|
||||
is_active = serializers.BooleanField(default=False)
|
||||
course_title = serializers.CharField(required=False, allow_blank=True)
|
||||
|
||||
signatories = SignatorySerializer(many=True, required=False, default=list)
|
||||
|
||||
def validate(self, data):
|
||||
"""
|
||||
Validate the certificate data.
|
||||
"""
|
||||
certificate_id = self.context.get("certificate_id")
|
||||
course = self.context.get("course")
|
||||
request = self.context.get("request")
|
||||
|
||||
if certificate_id and course:
|
||||
active_certificates = CertificateManager.get_certificates(course, only_active=True)
|
||||
active_ids = [int(cert["id"]) for cert in active_certificates]
|
||||
|
||||
if int(certificate_id) in active_ids:
|
||||
if not GlobalStaff().has_user(request.user):
|
||||
raise PermissionDenied()
|
||||
|
||||
return data
|
||||
|
||||
def create(self, validated_data):
|
||||
"""
|
||||
Create a new Certificate instance with the provided validated data.
|
||||
"""
|
||||
course = self.context.get("course")
|
||||
certificate_id = self.context.get("certificate_id")
|
||||
|
||||
validated_data = CertificateManager.assign_id(course, validated_data, certificate_id)
|
||||
return Certificate(course=course, certificate_data=validated_data)
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
"""
|
||||
Update an existing Certificate instance with the provided validated data.
|
||||
"""
|
||||
instance.certificate_data.update(validated_data) # pylint: disable=protected-access
|
||||
return instance
|
||||
|
||||
def to_representation(self, instance):
|
||||
"""
|
||||
Convert the Certificate instance to a dictionary representation.
|
||||
"""
|
||||
data = instance.certificate_data
|
||||
result = {
|
||||
"id": data.get("id"),
|
||||
"name": data.get("name"),
|
||||
"description": data.get("description"),
|
||||
"is_active": data.get("is_active", False),
|
||||
"version": data.get("version", CERTIFICATE_SCHEMA_VERSION),
|
||||
"signatories": data.get("signatories", []),
|
||||
}
|
||||
if data.get("course_title"):
|
||||
result["course_title"] = data.get("course_title")
|
||||
return result
|
||||
|
||||
@@ -25,7 +25,7 @@ from xmodule.contentstore.content import StaticContent # lint-amnesty, pylint:
|
||||
from xmodule.contentstore.django import contentstore # lint-amnesty, pylint: disable=wrong-import-order
|
||||
from xmodule.exceptions import NotFoundError # lint-amnesty, pylint: disable=wrong-import-order
|
||||
|
||||
from ..certificates import CERTIFICATE_SCHEMA_VERSION, CertificateManager
|
||||
from ..certificate_manager import CERTIFICATE_SCHEMA_VERSION, CertificateManager
|
||||
|
||||
FEATURES_WITH_CERTS_ENABLED = settings.FEATURES.copy()
|
||||
FEATURES_WITH_CERTS_ENABLED['CERTIFICATES_HTML_VIEW'] = True
|
||||
@@ -167,7 +167,7 @@ class CertificatesBaseTestCase:
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertNotIn("Location", response)
|
||||
content = json.loads(response.content.decode('utf-8'))
|
||||
self.assertIn("error", content)
|
||||
self.assertTrue("error" in content or "detail" in content)
|
||||
|
||||
def test_certificate_data_validation(self):
|
||||
#Test certificate schema version
|
||||
@@ -200,7 +200,7 @@ class CertificatesBaseTestCase:
|
||||
@ddt.ddt
|
||||
@override_settings(FEATURES=FEATURES_WITH_CERTS_ENABLED)
|
||||
class CertificatesListHandlerTestCase(
|
||||
EventTestMixin, CourseTestCase, CertificatesBaseTestCase, HelperMethods, UrlResetMixin
|
||||
EventTestMixin, CourseTestCase, HelperMethods, UrlResetMixin
|
||||
):
|
||||
"""
|
||||
Test cases for certificates_list_handler.
|
||||
@@ -210,7 +210,7 @@ class CertificatesListHandlerTestCase(
|
||||
"""
|
||||
Set up CertificatesListHandlerTestCase.
|
||||
"""
|
||||
super().setUp('cms.djangoapps.contentstore.views.certificates.tracker')
|
||||
super().setUp('cms.djangoapps.contentstore.views.certificate_manager.tracker')
|
||||
self.reset_urls()
|
||||
|
||||
def _url(self):
|
||||
@@ -238,7 +238,7 @@ class CertificatesListHandlerTestCase(
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertIn("Location", response)
|
||||
content = json.loads(response.content.decode('utf-8'))
|
||||
certificate_id = self._remove_ids(content)
|
||||
certificate_id = content.pop("id")
|
||||
self.assertEqual(content, expected)
|
||||
self.assert_event_emitted(
|
||||
'edx.certificate.configuration.created',
|
||||
@@ -440,7 +440,7 @@ class CertificatesDetailHandlerTestCase(
|
||||
"""
|
||||
Set up CertificatesDetailHandlerTestCase.
|
||||
"""
|
||||
super().setUp('cms.djangoapps.contentstore.views.certificates.tracker')
|
||||
super().setUp('cms.djangoapps.contentstore.views.certificate_manager.tracker')
|
||||
self.reset_urls()
|
||||
|
||||
def _url(self, cid=-1):
|
||||
|
||||
@@ -265,9 +265,9 @@ if core_toggles.ENTRANCE_EXAMS.is_enabled():
|
||||
if settings.FEATURES.get('CERTIFICATES_HTML_VIEW'):
|
||||
from cms.djangoapps.contentstore.views.certificates import (
|
||||
CertificateActivationAPIView,
|
||||
CertificateDetailAPIView,
|
||||
certificates_list_handler,
|
||||
signatory_detail_handler,
|
||||
certificates_detail_handler,
|
||||
certificates_list_handler
|
||||
)
|
||||
|
||||
urlpatterns += [
|
||||
@@ -277,7 +277,7 @@ if settings.FEATURES.get('CERTIFICATES_HTML_VIEW'):
|
||||
re_path(r'^certificates/{}/(?P<certificate_id>\d+)/signatories/(?P<signatory_id>\d+)?$'.format(
|
||||
settings.COURSE_KEY_PATTERN), signatory_detail_handler, name='signatory_detail_handler'),
|
||||
re_path(fr'^certificates/{settings.COURSE_KEY_PATTERN}/(?P<certificate_id>\d+)?$',
|
||||
certificates_detail_handler, name='certificates_detail_handler'),
|
||||
CertificateDetailAPIView.as_view(), name='certificates_detail_handler'),
|
||||
re_path(fr'^certificates/{settings.COURSE_KEY_PATTERN}$',
|
||||
certificates_list_handler, name='certificates_list_handler')
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user