Files
Daniel Wong cd6faeb966 Follow-up to PR 36789 (#37751)
* refactor(certificates): replace direct model imports with data classes and APIs

* fix: use Certificates API to create certificates

* docs: update docstring for get_certificate_for_user

* fix: remove trailing whitespace

---------

Co-authored-by: coder1918 <ram.chandra@wgu.edu>
Co-authored-by: Deborah Kaplan <deborahgu@users.noreply.github.com>
2026-01-08 13:03:46 -05:00

793 lines
34 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
This file contains celery tasks and utility functions responsible for syncing course and program certificate metadata
between the monolith and the Credentials IDA.
"""
from typing import TYPE_CHECKING, Dict, List, Optional
from urllib.parse import urljoin
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.sites.models import Site
from django.core.exceptions import ObjectDoesNotExist
from edx_django_utils.monitoring import set_code_owner_attribute
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from requests.exceptions import HTTPError
from common.djangoapps.course_modes.models import CourseMode
from lms.djangoapps.certificates.data import GeneratedCertificateData
from lms.djangoapps.certificates.api import get_eligible_certificate
from openedx.core.djangoapps.content.course_overviews.api import get_course_overview_or_none
from openedx.core.djangoapps.credentials.api import is_credentials_enabled
from openedx.core.djangoapps.credentials.utils import (
get_credentials,
get_credentials_api_base_url,
get_credentials_api_client,
)
from openedx.core.djangoapps.programs.utils import ProgramProgressMeter
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from xmodule.data import CertificatesDisplayBehaviors
if TYPE_CHECKING:
from datetime import datetime
from django.contrib.auth.models import User as UserType # pylint: disable=imported-auth-user
from requests import Session
# Resolve the project's active user model once, at import time; every task below looks users up through it.
User = get_user_model()
# Celery task logger shared by all helpers and tasks in this module.
LOGGER = get_task_logger(__name__)
# Maximum number of retries before giving up on awarding credentials. For reference, 11 retries with exponential backoff
# yields a maximum waiting time of 2047 seconds (about 30 minutes). Setting this to None could yield unwanted behavior:
# infinite retries.
# NOTE(review): the tasks visible in this section pass a literal `max_retries=10` to `@shared_task` instead of using
# this constant -- confirm whether it is still referenced anywhere.
MAX_RETRIES = 11
# Values sent as the credential "type" in payloads POSTed to the Credentials IDA.
PROGRAM_CERTIFICATE = "program"
COURSE_CERTIFICATE = "course-run"
# strftime pattern used to serialize a certificate date override before it is POSTed to Credentials.
DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def get_completed_programs(site: Site, student: "UserType") -> Dict:
    """
    Look up the programs a student has completed on a single site.

    Args:
        site (Site): Site for which data should be retrieved.
        student (User): The student whose completed programs are being checked.

    Returns:
        Dict: the ``completed_programs_with_available_dates`` value of a ``ProgramProgressMeter`` built for the given
        site and student, i.e. program UUIDs mapped to availability dates.
    """
    return ProgramProgressMeter(site, student).completed_programs_with_available_dates
def get_inverted_programs(student: "UserType"):
    """
    Build a mapping of the student's programs keyed by course run ID, merged across every ``Site``.

    Args:
        student (User): The student whose programs are being checked.

    Returns:
        dict: programs keyed by course run ID. Results are merged with ``dict.update``, so when two sites report the
        same key the site iterated last wins.
    """
    programs_by_course_run = {}
    for site in Site.objects.all():
        programs_by_course_run.update(ProgramProgressMeter(site, student).invert_programs())
    return programs_by_course_run
def get_certified_programs(student: "UserType", raise_on_error: bool = False) -> List[str]:
    """
    Return the UUIDs of every program for which the student already holds a program certificate.

    Args:
        student: User object representing the student

    Keyword Arguments:
        raise_on_error (bool): Forwarded to ``get_credentials``; reraise errors back to the caller, instead of
            returning empty results.

    Returns:
        str[]: UUIDs of the programs for which the student has been awarded a certificate
    """
    program_credentials = get_credentials(
        student,
        credential_type=PROGRAM_CERTIFICATE,
        raise_on_error=raise_on_error,
    )
    return [entry["credential"]["program_uuid"] for entry in program_credentials]
def get_revokable_program_uuids(course_specific_programs: List[Dict], student: "UserType") -> List[str]:
    """
    Select the programs, among those tied to a course, whose certificate should be revoked from the student.

    A program qualifies only when the student currently holds a certificate for it.

    Args:
        course_specific_programs (dict[]): list of programs specific to a course; each entry must have a "uuid" key
        student (User): The student whose program certificates are being checked.

    Returns:
        list of program UUIDs for which certificates are to be revoked, in the order of ``course_specific_programs``

    Raises:
        HttpError, if the API call generated by get_certified_programs fails
    """
    # Ask for errors to be raised rather than swallowed: an empty result after a failed API call would dangerously
    # imply a false negative (i.e. "nothing to revoke").
    awarded_program_uuids = get_certified_programs(student, raise_on_error=True)
    return [
        program["uuid"]
        for program in course_specific_programs
        if program["uuid"] in awarded_program_uuids
    ]
def award_program_certificate(client: "Session", user: "UserType", program_uuid: "str") -> None:
    """
    POST a request to the Credentials IDA that issues a program certificate to the given student.

    Args:
        client:
            credentials API client (requests.Session)
        user:
            The student's user data; its ``username`` and ``id`` are sent in the payload
        program_uuid:
            uuid of the completed program

    Returns:
        None

    Raises:
        Whatever ``response.raise_for_status()`` raises when the Credentials API answers with an error status.
    """
    endpoint = urljoin(f"{get_credentials_api_base_url()}/", "credentials/")
    payload = {
        "username": user.username,
        "lms_user_id": user.id,
        "credential": {"type": PROGRAM_CERTIFICATE, "program_uuid": program_uuid},
    }
    client.post(endpoint, json=payload).raise_for_status()
def revoke_program_certificate(client, username, program_uuid):
    """
    POST a request to the Credentials IDA asking it to revoke a program certificate from the given user in the given
    program.

    Args:
        client: credentials API client (requests.Session)
        username: The username of the student
        program_uuid: uuid of the program

    Returns:
        None

    Raises:
        Whatever ``response.raise_for_status()`` raises when the Credentials API answers with an error status.
    """
    endpoint = urljoin(f"{get_credentials_api_base_url()}/", "credentials/")
    payload = {
        "username": username,
        "status": "revoked",
        "credential": {"type": PROGRAM_CERTIFICATE, "program_uuid": program_uuid},
    }
    client.post(endpoint, json=payload).raise_for_status()
def post_course_certificate(
    client: "Session",
    username: str,
    certificate: GeneratedCertificateData,
    date_override: Optional["datetime"] = None,
    org: Optional[str] = None,
):
    """
    POST a certificate that has been updated to Credentials.

    Args:
        client: credentials API client (requests.Session)
        username: The username of the learner who owns the certificate
        certificate: The course certificate to sync; a valid certificate is sent as "awarded", anything else as
            "revoked"
        date_override: Optional date to send along, serialized with ``DATE_FORMAT``
        org: Optional org passed to ``get_credentials_api_base_url`` when resolving the Credentials API base URL

    Raises:
        Whatever ``response.raise_for_status()`` raises when the Credentials API answers with an error status.
    """
    endpoint = urljoin(f"{get_credentials_api_base_url(org)}/", "credentials/")
    # Only need the two options at this time
    credential_status = "awarded" if certificate.is_valid() else "revoked"
    credential = {
        "course_run_key": str(certificate.course_id),
        "mode": certificate.mode,
        "type": COURSE_CERTIFICATE,
    }
    if date_override:
        serialized_override = {"date": date_override.strftime(DATE_FORMAT)}
    else:
        serialized_override = None
    response = client.post(
        endpoint,
        json={
            "username": username,
            "status": credential_status,
            "credential": credential,
            "date_override": serialized_override,
        },
    )
    response.raise_for_status()
def post_course_certificate_configuration(client, cert_config, certificate_available_date=None):
    """
    Make a POST request to the Credentials IDA's `course_certificates` endpoint (/api/v2/course_certificates/). This
    endpoint manages the course certificate configurations within the Credentials IDA.

    Args:
        client(Session): An authenticated Credentials API Client
        cert_config(Dict): A dictionary containing course metadata (course-run key under "course_id" and mode under
            "mode", both as Strings) important to the Course Certificate Configuration.
        certificate_available_date(Str): The desired Certificate Available Date for the Course Certificate
            Configuration in the form of an ISO 8601 DateTime String.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the Credentials API answers with an error status.
    """
    endpoint = urljoin(f"{get_credentials_api_base_url()}/", "course_certificates/")
    payload = {
        "course_id": cert_config["course_id"],
        "certificate_type": cert_config["mode"],
        "certificate_available_date": certificate_available_date,
        "is_active": True,
    }
    response = client.post(endpoint, json=payload)

    # `raise_for_status()` can swallow helpful error context, so log whatever extra detail the response carries first
    # in the hope that it saves someone time when debugging.
    #
    # Even though this endpoint does an `update_or_create()` on the Credentials side, a successful call always comes
    # back as a 201 -- anything else is worth logging.
    if response.status_code != 201:
        LOGGER.error(
            "Error creating or updating a course certificate configuration in the Credentials IDA.\n"
            f"config sent: {payload}\nAdditional details: {response.text}"
        )
    response.raise_for_status()
# pylint: disable=unused-argument
@shared_task(
    bind=True,
    ignore_result=True,
    autoretry_for=(Exception,),
    max_retries=10,
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
)
@set_code_owner_attribute
def award_program_certificates(self, username):  # lint-amnesty, pylint: disable=too-many-statements
    """
    Award any program certificates the given student has newly become eligible for.

    This task is designed to be called whenever a student's completion status changes with respect to one or more
    courses (primarily, when a course certificate is awarded). It consults a variety of APIs to determine whether the
    user should be awarded a program certificate in one or more programs, and uses the credentials service to create
    those certificates if so.

    The task may also be invoked independently of any course completion status change - for example, to backpopulate
    missing program credentials for a student.

    If this function is moved, make sure to update its entry in EXPLICIT_QUEUES in the settings files so it runs in
    the correct queue.

    Args:
        username (str): The username of the student

    Returns:
        None

    Raises:
        MaxRetriesExceededError: when use of Credentials is disabled, when the programs to award cannot be determined,
            when no API client can be created, when Credentials rate-limits the request (429), or when awarding at
            least one certificate failed unexpectedly.
    """
    # A disabled credentials config may just mean processing is temporarily switched off. That is recoverable, so
    # raise and let celery retry.
    if not is_credentials_enabled():
        disabled_msg = (
            "Task award_program_certificates cannot be executed, use of the Credentials service is disabled by config"
        )
        LOGGER.warning(disabled_msg)
        raise MaxRetriesExceededError(f"Failed to award a program certificate. Reason: {disabled_msg}")

    try:
        student = User.objects.get(username=username)
    except User.DoesNotExist:
        LOGGER.warning(
            "Task award_program_certificates was called with an invalid username. Could not retrieve a User instance "
            f"with username {username}"
        )
        return

    # Bail out early (and quietly) for partners that have opted every program out of certificates; this prevents
    # unnecessary logging.
    skipped_program_uuids = configuration_helpers.get_value("programs_without_certificates", [])
    if skipped_program_uuids and str(skipped_program_uuids[0]).lower() == "all":
        return

    LOGGER.info(f"Running task award_program_certificates for user {student.id}")
    try:
        completed_programs = {}
        for site in Site.objects.all():
            completed_programs.update(get_completed_programs(site, student))

        if not completed_programs:
            LOGGER.warning(
                f"Task award_program_certificates was called for user {student.id} with no completed programs"
            )
            return

        # Programs that must not be (re-)awarded: those the learner already holds a certificate for, plus those
        # listed under "programs without certificates" in the site configuration.
        already_awarded_uuids = get_certified_programs(student)
        excluded_program_uuids = set(already_awarded_uuids + list(skipped_program_uuids))
    except Exception as exc:
        lookup_failure_msg = f"Failed to determine program certificates to be awarded for user {student.id}: {exc}"
        LOGGER.exception(lookup_failure_msg)
        raise MaxRetriesExceededError(
            f"Failed to award a program certificate to user {student.id}. Reason: {lookup_failure_msg}"
        ) from exc

    # Only award certificates for completed programs the student doesn't already have one for. This matters because
    # the whole task is retried if awarding any particular program cert fails.
    #
    # N.B. the result is sorted to facilitate deterministic ordering, e.g. for tests.
    program_uuids_to_award = sorted(set(completed_programs) - excluded_program_uuids)

    if program_uuids_to_award:
        try:
            service_user = User.objects.get(username=settings.CREDENTIALS_SERVICE_USERNAME)
            credentials_client = get_credentials_api_client(service_user)
        except Exception as exc:
            client_failure_msg = "Failed to create a credentials API client to award program certificates"
            LOGGER.exception(client_failure_msg)
            # A misconfiguration could be fixed; let celery retry.
            raise MaxRetriesExceededError(
                f"Failed to award a program certificate to user {student.id}. Reason: {client_failure_msg}"
            ) from exc

        failed_program_uuids = []
        for program_uuid in program_uuids_to_award:
            try:
                award_program_certificate(credentials_client, student, program_uuid)
                LOGGER.info(f"Awarded program certificate to user {student.id} in program {program_uuid}")
            except HTTPError as exc:
                status_code = exc.response.status_code
                if status_code == 404:
                    LOGGER.warning(
                        f"Unable to award a program certificate to user {student.id} in program {program_uuid}. A "
                        f"certificate configuration for program {program_uuid} could not be found, the program might "
                        "not be configured correctly in Credentials"
                    )
                elif status_code == 429:
                    # Let celery handle retry attempts and backoff
                    rate_limit_msg = (
                        f"Rate limited. Attempting to award certificate to user {student.id} in program {program_uuid}."
                    )
                    LOGGER.warning(rate_limit_msg)
                    raise MaxRetriesExceededError(
                        f"Failed to award a program certificate to user {student.id}. Reason: {rate_limit_msg}"
                    ) from exc
                else:
                    # Other HTTP errors are only logged; they do not mark the program as failed.
                    LOGGER.warning(
                        f"Unable to award program certificate to user {student.id} in program {program_uuid}. The "
                        "program might not be configured correctly in Credentials"
                    )
            except Exception as exc:  # pylint: disable=broad-except
                # keep trying to award other certs, but let celery retry the whole task to fix any missing entries
                LOGGER.exception(
                    f"Failed to award program certificate to user {student.id} in program {program_uuid}: {exc}"
                )
                failed_program_uuids.append(program_uuid)

        if failed_program_uuids:
            # N.B. This logic assumes that this task is idempotent
            LOGGER.info(f"Retrying failed tasks to award program certificate(s) to user {student.id}")
            # The error message may change on each reattempt but will never be raised until the max number of retries
            # have been exceeded. It is unlikely that this list will change by the time it reaches its maximum number
            # of attempts.
            award_failure_msg = (
                f"Failed to award program certificate(s) for user {student.id} in programs "
                f"{failed_program_uuids}"
            )
            raise MaxRetriesExceededError(
                f"Failed to award a program certificate to user {student.id}. Reason: {award_failure_msg}"
            )
    else:
        LOGGER.warning(f"User {student.id} is not eligible for any new program certificates")

    LOGGER.info(f"Successfully completed the task award_program_certificates for user {student.id}")
# pylint: disable=W0613
@shared_task(
    bind=True,
    ignore_result=True,
    autoretry_for=(Exception,),
    max_retries=10,
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
)
@set_code_owner_attribute
def update_credentials_course_certificate_configuration_available_date(
    self, course_key, certificate_available_date=None
):
    """
    This task will update the CourseCertificate configuration's available date
    in Credentials.

    The task gives up (logs an error and returns without contacting Credentials) unless the course run has exactly
    one certificate-bearing course mode.

    Arguments:
        course_key (str): The course run key whose certificate configuration should be updated. It is coerced with
            ``str()`` before use.
        certificate_available_date (str): A string representation of the
            datetime for when to make the certificate available to the user. If
            not provided, it will be None.
    """
    LOGGER.info(
        f"Running task `update_credentials_course_certificate_configuration_available_date` for course {course_key} "
        f"with certificate_available_date {certificate_available_date}"
    )
    course_key = str(course_key)
    course_modes = CourseMode.objects.filter(course_id=course_key)
    # There should only ever be one certificate relevant mode per course run
    modes = [
        mode.slug
        for mode in course_modes
        if mode.slug in CourseMode.CERTIFICATE_RELEVANT_MODES or CourseMode.is_eligible_for_certificate(mode.slug)
    ]
    if len(modes) != 1:
        # No exception is being handled here, so log with `error` rather than `exception`: the latter would append a
        # meaningless "NoneType: None" traceback to the log record.
        LOGGER.error(f"Either course {course_key} has no certificate mode or multiple modes. Task failed.")
        return

    credentials_client = get_credentials_api_client(
        User.objects.get(username=settings.CREDENTIALS_SERVICE_USERNAME),
    )
    cert_config = {
        "course_id": course_key,
        "mode": modes[0],
    }
    post_course_certificate_configuration(
        client=credentials_client,
        cert_config=cert_config,
        certificate_available_date=certificate_available_date,
    )
@shared_task(
    bind=True,
    ignore_result=True,
    autoretry_for=(Exception,),
    max_retries=10,
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
)
@set_code_owner_attribute
def award_course_certificate(self, username, course_run_key):
    """
    Sync a learner's course certificate for one course run to the Credentials IDA.

    This task is designed to be called whenever a student GeneratedCertificate is updated, or when a course-run's
    `certificate_available_date` value is updated. It can be called independently for a username and a course_run,
    but is invoked on each GeneratedCertificate.save.

    If this function is moved, make sure to update its entry in EXPLICIT_QUEUES in the settings files so it runs in
    the correct queue.

    Arguments:
        username (str): The user to award the Credentials course cert to
        course_run_key (str): The course run key to award the certificate for

    Raises:
        MaxRetriesExceededError: when credentials issuance is disabled, when the certificate mode's eligibility cannot
            be determined, or when posting the certificate to Credentials fails.
    """
    # A disabled credentials config may just mean processing is temporarily switched off. That is recoverable, so
    # raise and let celery retry.
    if not is_credentials_enabled():
        disabled_msg = (
            "Task award_course_certificate cannot be executed when credentials issuance is disabled in API config"
        )
        LOGGER.warning(disabled_msg)
        raise MaxRetriesExceededError(f"Failed to award course certificate. Reason: {disabled_msg}")

    try:
        user = User.objects.get(username=username)
    except User.DoesNotExist:
        LOGGER.warning(
            "Task award_course_certificate was called with an invalid username. Could not retrieve a User instance "
            f"with username {username}"
        )
        return

    LOGGER.info(f"Running task award_course_certificate for user {user.id}")

    try:
        course_key = CourseKey.from_string(course_run_key)
    except InvalidKeyError:
        # A malformed key will never become valid, so give up instead of retrying.
        key_failure_msg = "Failed to determine course key"
        LOGGER.warning(
            f"Failed to award course certificate for user {user.id} for course {course_run_key}. "
            f"Reason: {key_failure_msg}"
        )
        return

    # Get the cert for the course key and username if it's both passing and available in professional/verified
    certificate = get_eligible_certificate(user=user, course_id=course_key)
    if certificate is None:
        LOGGER.warning(
            f"Task award_course_certificate was called for user {user.id} in course run {course_key} but this learner "
            "has not earned a course certificate in this course run"
        )
        return

    try:
        mode_is_certificate_eligible = (
            certificate.mode in CourseMode.CERTIFICATE_RELEVANT_MODES
            or CourseMode.is_eligible_for_certificate(certificate.mode)
        )
    except Exception as exc:
        mode_failure_msg = f"Failed to determine course mode certificate eligibility for {certificate}."
        LOGGER.error(mode_failure_msg)
        raise MaxRetriesExceededError(
            f"Failed to award course certificate for user {user.id} for course {course_run_key}. "
            f"Reason: {mode_failure_msg}"
        ) from exc

    if not mode_is_certificate_eligible:
        LOGGER.warning(
            f"Task award_course_certificate was called for user {user.id} in course run {course_key} but "
            f"this course has an ineligible mode of {certificate.mode} for a certificate on this instance."
        )
        return

    course_overview = get_course_overview_or_none(course_key)
    if not course_overview:
        LOGGER.warning(
            f"Task award_course_certificate was called for user {user.id} in course {course_key} but no course "
            "overview could be retrieved for the course run"
        )
        return

    # If the certificate has an associated CertificateDateOverride, send it along
    try:
        date_override = certificate.date_override.date
    except ObjectDoesNotExist:
        date_override = None
    else:
        LOGGER.info(
            f"Task award_course_certificate will award a course certificate to user {user.id} in course run "
            f"{course_key} with an override date of {date_override}"
        )

    try:
        service_user = User.objects.get(username=settings.CREDENTIALS_SERVICE_USERNAME)
        credentials_client = get_credentials_api_client(service_user)
        post_course_certificate(
            credentials_client,
            username,
            certificate,
            date_override,
            org=course_key.org,
        )
    except Exception as exc:
        post_failure_msg = f"Failed to post course certificate to be awarded for user {user.id}."
        raise MaxRetriesExceededError(
            f"Failed to award course certificate for user {user.id} for course {course_run_key}. "
            f"Reason: {post_failure_msg}"
        ) from exc

    # Successfully posted the cert to credentials
    LOGGER.info(f"Awarded a course certificate to user {user.id} in course run {course_key}")
@shared_task(
    bind=True,
    ignore_result=True,
    autoretry_for=(Exception,),
    max_retries=10,
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
)
@set_code_owner_attribute
def revoke_program_certificates(self, username, course_key):  # lint-amnesty, pylint: disable=too-many-statements
    """
    Revoke the program certificates a student should lose after one of their course certificates was revoked.

    This task is designed to be called whenever a student's course certificate is revoked. It consults a variety of
    APIs to determine whether the user's certificate should be revoked in one or more programs, and uses the
    credentials service to revoke those certificates if so.

    If this function is moved, make sure to update its entry in EXPLICIT_QUEUES in the settings files so it runs in
    the correct queue.

    Args:
        username (str): The username of the student
        course_key (str): The course identifier

    Returns:
        None

    Raises:
        MaxRetriesExceededError: when use of Credentials is disabled, when the programs to revoke cannot be
            determined, when no API client can be created, when Credentials rate-limits the request (429), or when
            revoking at least one certificate failed unexpectedly.
    """
    # A disabled credentials config may just mean processing is temporarily switched off. That is recoverable, so
    # raise and let celery retry.
    if not is_credentials_enabled():
        disabled_msg = (
            "Task revoke_program_certificates cannot be executed, use of the Credentials service is disabled by config"
        )
        LOGGER.warning(disabled_msg)
        raise MaxRetriesExceededError(f"Failed to revoke program certificate. Reason: {disabled_msg}")

    try:
        student = User.objects.get(username=username)
    except User.DoesNotExist:
        LOGGER.warning(
            "Task revoke_program_certificates was called with an invalid username. Could not retrieve a User instance "
            f"with username {username}"
        )
        return

    LOGGER.info(f"Running task revoke_program_certificates for user {student.id}")
    try:
        course_specific_programs = get_inverted_programs(student).get(course_key)
        if not course_specific_programs:
            LOGGER.warning(
                f"Task revoke_program_certificates was called for user {student.id} and course run {course_key} with "
                "no engaged programs"
            )
            return

        # Determine which program certificates the user has already been awarded, if any.
        program_uuids_to_revoke = get_revokable_program_uuids(course_specific_programs, student)
    except Exception as exc:
        lookup_failure_msg = (
            f"Failed to determine if any program certificates associated with course run {course_key} should be "
            f"revoked from user {student.id}"
        )
        LOGGER.exception(lookup_failure_msg)
        raise MaxRetriesExceededError(
            f"Failed to revoke program certificate for user {student.id} for course {course_key}. "
            f"Reason: {lookup_failure_msg}"
        ) from exc

    if program_uuids_to_revoke:
        try:
            service_user = User.objects.get(username=settings.CREDENTIALS_SERVICE_USERNAME)
            credentials_client = get_credentials_api_client(service_user)
        except Exception as exc:
            LOGGER.exception("Failed to create a credentials API client to revoke program certificates")
            # Still retryable because a misconfiguration could be fixed
            raise MaxRetriesExceededError(
                f"Failed to revoke program certificate for user {student.id} for course {course_key}. Reason: {exc}"
            ) from exc

        failed_program_uuids = []
        for program_uuid in program_uuids_to_revoke:
            try:
                revoke_program_certificate(credentials_client, username, program_uuid)
                LOGGER.info(f"Revoked program certificate from user {student.id} in program {program_uuid}")
            except HTTPError as exc:
                status_code = exc.response.status_code
                if status_code == 404:
                    LOGGER.warning(
                        f"Unable to revoke program certificate from user {student.id} in program {program_uuid}, a "
                        "program certificate could not be found"
                    )
                elif status_code == 429:
                    # Let celery handle retry attempts and backoff
                    rate_limit_msg = (
                        f"Rate limited. Attempting to revoke a program certificate from user {student.id} in program "
                        f"{program_uuid}."
                    )
                    LOGGER.warning(rate_limit_msg)
                    raise MaxRetriesExceededError(
                        f"Failed to revoke program certificate for user {student.id} Reason: {rate_limit_msg}"
                    ) from exc
                else:
                    # Other HTTP errors are only logged; they do not mark the program as failed.
                    LOGGER.warning(
                        f"Unable to revoke program certificate from user {student.id} in program {program_uuid}"
                    )
            except Exception as exc:  # pylint: disable=broad-except
                # keep trying to revoke other certs, but let celery retry the whole task to fix any missing entries
                LOGGER.exception(
                    f"Failed to revoke program certificate from user {student.id} in program {program_uuid}: {exc}"
                )
                failed_program_uuids.append(program_uuid)

        if failed_program_uuids:
            # N.B. This logic assumes that this task is idempotent
            LOGGER.info(f"Failed task to revoke program certificate(s) from user {student.id}")
            # The error message may change on each reattempt but will never be raised until the max number of retries
            # have been exceeded. It is unlikely that this list will change by the time it reaches its maximum number
            # of attempts.
            revoke_failure_msg = (
                f"Failed to revoke program certificate(s) from user {student.id} for programs "
                f"{failed_program_uuids}"
            )
            raise MaxRetriesExceededError(
                f"Failed to revoke program certificate for user {student.id} for course {course_key}. "
                f"Reason: {revoke_failure_msg}"
            )
    else:
        LOGGER.info(f"No program certificates to revoke from user {student.id}")

    LOGGER.info(f"Successfully completed the task revoke_program_certificates for user {student.id}")
@shared_task(
    bind=True,
    ignore_result=True,
    autoretry_for=(Exception,),
    max_retries=10,
    retry_backoff=30,
    retry_backoff_max=600,
    retry_jitter=True,
)
@set_code_owner_attribute
def update_certificate_available_date_on_course_update(self, course_key):
    """
    This task is designed to be enqueued whenever a course run's Certificate Display Behavior (CDB) or Certificate
    Available Date (CAD) has been updated in the CMS.

    When executed, this task is responsible for enqueuing an additional subtask responsible for syncing the updated CAD
    value in the Credentials IDA's internal records.

    Nothing is enqueued when no course overview exists for the course run, or when an instructor-paced course run has
    a certificates display behavior this task does not recognize.

    Args:
        course_key(str): The course run's identifier

    Raises:
        MaxRetriesExceededError: when issuing credentials through the Credentials IDA is disabled.
    """
    # If the CredentialsApiConfig configuration model is disabled for this feature, it may indicate a condition where
    # processing of such tasks has been temporarily disabled. Since this is a recoverable situation, mark this task for
    # retry instead of failing it.
    if not is_credentials_enabled():
        error_msg = (
            "Cannot execute the `update_certificate_available_date_on_course_update` task. Issuing user credentials "
            "through the Credentials IDA is disabled."
        )
        LOGGER.warning(error_msg)
        raise MaxRetriesExceededError(
            "Failed to update the `certificate_available_date` in the Credentials service for course-run "
            f"{course_key}. Reason: {error_msg}"
        )

    course_overview = get_course_overview_or_none(course_key)
    if not course_overview:
        LOGGER.warning(
            f"Unable to send the updated certificate available date of course run [{course_key}] to Credentials. A "
            "course overview for this course run could not be found"
        )
        return

    # When updating the certificate available date of instructor-paced course runs,
    #   - If the display behavior is set to "A date after the course end date" (END_WITH_DATE), we should send the
    #     certificate available date set by the course team in Studio (and stored as part of the course runs Course
    #     Overview)
    #   - If the display behavior is set to "End date of course" (END), we should send the end date of the course run
    #     as the certificate available date. We send the end date because the Credentials IDA doesn't understand the
    #     concept of course pacing and needs an explicit date in order to correctly gate the visibility of course and
    #     program certificates.
    #   - If the display behavior is set to "Immediately upon passing" (EARLY_NO_INFO), we should always send None for
    #     the course runs certificate available date. A course run configured with this display behavior must not have
    #     a certificate available date associated with or the Credentials system will incorrectly hide certificates
    #     from learners.
    #   - Any other display behavior is unexpected. We do not guess at a date: we log the problem and skip the sync.
    #     Previously this case left `new_certificate_available_date` unbound, so the task died with an
    #     UnboundLocalError and was retried to no effect.
    if course_overview.self_paced is False:
        display_behavior = course_overview.certificates_display_behavior
        if display_behavior == CertificatesDisplayBehaviors.END_WITH_DATE:
            new_certificate_available_date = str(course_overview.certificate_available_date)
        elif display_behavior == CertificatesDisplayBehaviors.END:
            new_certificate_available_date = str(course_overview.end)  # `end_date` is deprecated, use `end` instead
        elif display_behavior == CertificatesDisplayBehaviors.EARLY_NO_INFO:
            new_certificate_available_date = None
        else:
            LOGGER.error(
                f"Unable to send the updated certificate available date of course run [{course_key}] to Credentials. "
                f"Unrecognized certificates display behavior [{display_behavior}]"
            )
            return
    # Else, this course run is self-paced, and having a certificate available date associated with a self-paced course
    # run is not allowed. Course runs with this type of pacing should always award a certificate to learners immediately
    # upon passing. If the system detects that an update must be sent to Credentials, we *always* send a certificate
    # available date of `None`. We are aware of a defect that sometimes allows a certificate available date to be saved
    # for a self-paced course run. This is an attempt to prevent bad data from being synced to the Credentials service
    # too.
    else:
        new_certificate_available_date = None

    update_credentials_course_certificate_configuration_available_date.delay(
        str(course_key), new_certificate_available_date
    )