diff --git a/lms/djangoapps/certificates/api.py b/lms/djangoapps/certificates/api.py index c76e34a95b..c93bc85a42 100644 --- a/lms/djangoapps/certificates/api.py +++ b/lms/djangoapps/certificates/api.py @@ -49,6 +49,8 @@ from lms.djangoapps.certificates.utils import ( certificate_status_for_student as _certificate_status_for_student, ) from lms.djangoapps.instructor import access +from lms.djangoapps.utils import _get_key +from opaque_keys.edx.keys import CourseKey from openedx.core.djangoapps.content.course_overviews.api import get_course_overview_or_none from openedx.core.djangoapps.content.course_overviews.models import CourseOverview from xmodule.data import CertificatesDisplayBehaviors # lint-amnesty, pylint: disable=wrong-import-order @@ -920,3 +922,34 @@ def _has_passed_or_is_allowlisted(course, student, course_grade): has_passed = course_grade and course_grade.passed return has_passed or is_allowlisted + + +def invalidate_certificate(user_id, course_key_or_id, source): + """ + Invalidate the user certificate in a given course if it exists and the user is not on the allowlist for this + course run. + + This function is called in services.py and handlers.py within the certificates folder. As of now, + The call in services.py occurs when an exam attempt is rejected in the legacy exams backend, edx-proctoring. + The call in handlers.py is occurs when an exam attempt is rejected in the newer exams backend, edx-exams. + """ + course_key = _get_key(course_key_or_id, CourseKey) + if _is_on_certificate_allowlist(user_id, course_key): + log.info(f'User {user_id} is on the allowlist for {course_key}. The certificate will not be invalidated.') + return False + + try: + generated_certificate = GeneratedCertificate.objects.get( + user=user_id, + course_id=course_key + ) + generated_certificate.invalidate(source=source) + except ObjectDoesNotExist: + log.warning( + 'Invalidation failed because a certificate for user %d in course %s does not exist.', + user_id, + course_key + ) + return False + + return True diff --git a/lms/djangoapps/certificates/handlers.py b/lms/djangoapps/certificates/handlers.py new file mode 100644 index 0000000000..8d45468497 --- /dev/null +++ b/lms/djangoapps/certificates/handlers.py @@ -0,0 +1,28 @@ +""" +Handlers for credits +""" +import logging + +from django.contrib.auth import get_user_model +from django.dispatch import receiver +from openedx_events.learning.signals import EXAM_ATTEMPT_REJECTED + +from lms.djangoapps.certificates.api import invalidate_certificate + +User = get_user_model() + +log = logging.getLogger(__name__) + + +@receiver(EXAM_ATTEMPT_REJECTED) +def handle_exam_attempt_rejected_event(sender, signal, **kwargs): + """ + Consume `EXAM_ATTEMPT_REJECTED` events from the event bus. + Pass the received data to invalidate_certificate in the services.py file in this folder. + """ + event_data = kwargs.get('exam_attempt') + user_data = event_data.student_user + course_key = event_data.course_key + + # Note that the course_key is the same as the course_key_or_id, and is being passed in as the course_key param + invalidate_certificate(user_data.id, course_key, source='exam_event') diff --git a/lms/djangoapps/certificates/services.py b/lms/djangoapps/certificates/services.py index 29ee5a05d3..508bb3ad6d 100644 --- a/lms/djangoapps/certificates/services.py +++ b/lms/djangoapps/certificates/services.py @@ -2,17 +2,7 @@ Certificate service """ - -import logging - -from django.core.exceptions import ObjectDoesNotExist -from opaque_keys.edx.keys import CourseKey - -from lms.djangoapps.certificates.generation_handler import is_on_certificate_allowlist -from lms.djangoapps.certificates.models import GeneratedCertificate -from lms.djangoapps.utils import _get_key - -log = logging.getLogger(__name__) +from lms.djangoapps.certificates.api import invalidate_certificate class CertificateService: @@ -21,27 +11,6 @@ class CertificateService: """ def invalidate_certificate(self, user_id, course_key_or_id): - """ - Invalidate the user certificate in a given course if it exists and the user is not on the allowlist for this - course run. - """ - course_key = _get_key(course_key_or_id, CourseKey) - if is_on_certificate_allowlist(user_id, course_key): - log.info(f'User {user_id} is on the allowlist for {course_key}. The certificate will not be invalidated.') - return False - - try: - generated_certificate = GeneratedCertificate.objects.get( - user=user_id, - course_id=course_key - ) - generated_certificate.invalidate(source='certificate_service') - except ObjectDoesNotExist: - log.warning( - 'Invalidation failed because a certificate for user %d in course %s does not exist.', - user_id, - course_key - ) - return False - - return True + # The original code for this function was moved to this helper function to be call-able + # By both the legacy and current exams backends (edx-proctoring and edx-exams). + return invalidate_certificate(user_id, course_key_or_id, source='certificate_service') diff --git a/lms/djangoapps/certificates/tests/test_handlers.py b/lms/djangoapps/certificates/tests/test_handlers.py new file mode 100644 index 0000000000..241d9500bd --- /dev/null +++ b/lms/djangoapps/certificates/tests/test_handlers.py @@ -0,0 +1,87 @@ +""" +Unit tests for certificates signals +""" +from datetime import datetime, timezone +from unittest import mock +from uuid import uuid4 + +from django.test import TestCase +from opaque_keys.edx.keys import CourseKey, UsageKey +from openedx_events.data import EventsMetadata +from openedx_events.learning.data import ExamAttemptData, UserData, UserPersonalData +from openedx_events.learning.signals import EXAM_ATTEMPT_REJECTED + +from common.djangoapps.student.tests.factories import UserFactory +from lms.djangoapps.certificates.handlers import handle_exam_attempt_rejected_event + + +class ExamCompletionEventBusTests(TestCase): + """ + Tests completion events from the event bus. + """ + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course_key = CourseKey.from_string('course-v1:edX+TestX+Test_Course') + cls.subsection_id = 'block-v1:edX+TestX+Test_Course+type@sequential+block@subsection' + cls.usage_key = UsageKey.from_string(cls.subsection_id) + cls.student_user = UserFactory( + username='student_user', + ) + + @staticmethod + def _get_exam_event_data(student_user, course_key, usage_key, exam_type, requesting_user=None): + """ create ExamAttemptData object for exam based event """ + if requesting_user: + requesting_user_data = UserData( + id=requesting_user.id, + is_active=True, + pii=None + ) + else: + requesting_user_data = None + + return ExamAttemptData( + student_user=UserData( + id=student_user.id, + is_active=True, + pii=UserPersonalData( + username=student_user.username, + email=student_user.email, + ), + ), + course_key=course_key, + usage_key=usage_key, + requesting_user=requesting_user_data, + exam_type=exam_type, + ) + + @staticmethod + def _get_exam_event_metadata(event_signal): + """ create metadata object for event """ + return EventsMetadata( + event_type=event_signal.event_type, + id=uuid4(), + minorversion=0, + source='openedx/lms/web', + sourcehost='lms.test', + time=datetime.now(timezone.utc) + ) + + @mock.patch('lms.djangoapps.certificates.handlers.invalidate_certificate') + def test_exam_attempt_rejected_event(self, mock_api_function): + """ + Assert that CertificateService api's invalidate_certificate is called upon consuming the event + """ + exam_event_data = self._get_exam_event_data(self.student_user, + self.course_key, + self.usage_key, + exam_type='proctored') + event_metadata = self._get_exam_event_metadata(EXAM_ATTEMPT_REJECTED) + + event_kwargs = { + 'exam_attempt': exam_event_data, + 'metadata': event_metadata + } + handle_exam_attempt_rejected_event(None, EXAM_ATTEMPT_REJECTED, **event_kwargs) + mock_api_function.assert_called_once_with(self.student_user.id, self.course_key, source='exam_event')