feat!: upgrade generate_bulk_certificate_exceptions to drf ( 26 ) (#35577)

* feat!: upgrading api to DRF.
This commit is contained in:
Awais Qureshi
2025-03-14 15:48:06 +05:00
committed by GitHub
parent f90e59e52a
commit 62e5904286
2 changed files with 98 additions and 93 deletions

View File

@@ -3629,109 +3629,114 @@ class GenerateCertificateExceptions(DeveloperErrorViewMixin, APIView):
return JsonResponse(response_payload)
@cache_control(no_cache=True, no_store=True, must_revalidate=True)
@require_course_permission(permissions.GENERATE_BULK_CERTIFICATE_EXCEPTIONS)
@require_POST
def generate_bulk_certificate_exceptions(request, course_id):
@method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch')
class GenerateBulkCertificateExceptions(APIView):
"""
Adds students to the certificate allowlist using data from the uploaded CSV file.
Arguments:
request (WSGIRequest): Django HTTP request object.
course_id (string): Course-Run key
Returns:
dict:
{
general_errors: [errors related to csv file e.g. csv uploading, csv attachment, content reading, etc. ],
row_errors: {
data_format_error: [users/data in csv file that are not well formatted],
user_not_exist: [users that cannot be found in the LMS],
user_already_allowlisted: [users that already appear on the allowlist of this course-run],
user_not_enrolled: [users that are not currently enrolled in this course-run],
user_on_certificate_invalidation_list: [users that have an active certificate invalidation in this
course-run]
},
success: [list of users sucessfully added to the certificate allowlist]
}
"""
user_index = 0
notes_index = 1
row_errors_key = [
'data_format_error',
'user_not_exist',
'user_already_allowlisted',
'user_not_enrolled',
'user_on_certificate_invalidation_list'
]
course_key = CourseKey.from_string(course_id)
students, general_errors, success = [], [], []
row_errors = {key: [] for key in row_errors_key}
def build_row_errors(key, _user, row_count):
permission_classes = (IsAuthenticated, permissions.InstructorPermission)
permission_name = permissions.GENERATE_BULK_CERTIFICATE_EXCEPTIONS
@method_decorator(ensure_csrf_cookie)
def post(self, request, course_id):
"""
inner method to build dict of csv data as row errors.
Arguments:
request (WSGIRequest): Django HTTP request object.
course_id (string): Course-Run key
Returns:
dict:
{
general_errors: [errors related to csv file e.g. csv uploading, csv attachment, content reading, etc. ],
row_errors: {
data_format_error: [users/data in csv file that are not well formatted],
user_not_exist: [users that cannot be found in the LMS],
user_already_allowlisted: [users that already appear on the allowlist of this course-run],
user_not_enrolled: [users that are not currently enrolled in this course-run],
user_on_certificate_invalidation_list: [users that have an active certificate invalidation in this
course-run]
},
success: [list of users sucessfully added to the certificate allowlist]
}
"""
row_errors[key].append(_('user "{user}" in row# {row}').format(user=_user, row=row_count))
user_index = 0
notes_index = 1
row_errors_key = [
'data_format_error',
'user_not_exist',
'user_already_allowlisted',
'user_not_enrolled',
'user_on_certificate_invalidation_list'
]
course_key = CourseKey.from_string(course_id)
students, general_errors, success = [], [], []
row_errors = {key: [] for key in row_errors_key}
if 'students_list' in request.FILES:
try:
upload_file = request.FILES.get('students_list')
if upload_file.name.endswith('.csv'):
students = list(csv.reader(upload_file.read().decode('utf-8-sig').splitlines()))
else:
general_errors.append(_('Make sure that the file you upload is in CSV format with no '
'extraneous characters or rows.'))
except Exception: # pylint: disable=broad-except
general_errors.append(_('Could not read uploaded file.'))
finally:
upload_file.close()
def build_row_errors(key, _user, row_count):
"""
inner method to build dict of csv data as row errors.
"""
row_errors[key].append(_('user "{user}" in row# {row}').format(user=_user, row=row_count))
row_num = 0
for student in students:
row_num += 1
# verify that we have exactly two column in every row either email or username and notes but allow for
# blank lines
if len(student) != 2:
if student:
build_row_errors('data_format_error', student[user_index], row_num)
log.info(f'Invalid data/format in csv row# {row_num}')
continue
user = student[user_index]
if 'students_list' in request.FILES:
try:
user = get_user_by_username_or_email(user)
except ObjectDoesNotExist:
build_row_errors('user_not_exist', user, row_num)
log.info(f'Student {user} does not exist')
else:
# make sure learner doesn't have an active certificate invalidation
if certs_api.is_certificate_invalidated(user, course_key):
build_row_errors('user_on_certificate_invalidation_list', user, row_num)
log.warning(f'Student {user.id} is blocked from receiving a Certificate in Course {course_key}')
# make sure learner isn't already on the allowlist
elif certs_api.is_on_allowlist(user, course_key):
build_row_errors('user_already_allowlisted', user, row_num)
log.warning(f'Student {user.id} already appears on the allowlist in Course {course_key}.')
# make sure user is enrolled in course
elif not is_user_enrolled_in_course(user, course_key):
build_row_errors('user_not_enrolled', user, row_num)
log.warning(f'Student {user.id} is not enrolled in Course {course_key}')
upload_file = request.FILES.get('students_list')
if upload_file.name.endswith('.csv'):
students = list(csv.reader(upload_file.read().decode('utf-8-sig').splitlines()))
else:
certs_api.create_or_update_certificate_allowlist_entry(
user,
course_key,
notes=student[notes_index]
)
success.append(_('user "{username}" in row# {row}').format(username=user.username, row=row_num))
else:
general_errors.append(_('File is not attached.'))
general_errors.append(_('Make sure that the file you upload is in CSV format with no '
'extraneous characters or rows.'))
except Exception: # pylint: disable=broad-except
general_errors.append(_('Could not read uploaded file.'))
finally:
upload_file.close()
results = {
'general_errors': general_errors,
'row_errors': row_errors,
'success': success
}
return JsonResponse(results)
row_num = 0
for student in students:
row_num += 1
# verify that we have exactly two column in every row either email or username and notes but allow for
# blank lines
if len(student) != 2:
if student:
build_row_errors('data_format_error', student[user_index], row_num)
log.info(f'Invalid data/format in csv row# {row_num}')
continue
user = student[user_index]
try:
user = get_user_by_username_or_email(user)
except ObjectDoesNotExist:
build_row_errors('user_not_exist', user, row_num)
log.info(f'Student {user} does not exist')
else:
# make sure learner doesn't have an active certificate invalidation
if certs_api.is_certificate_invalidated(user, course_key):
build_row_errors('user_on_certificate_invalidation_list', user, row_num)
log.warning(f'Student {user.id} is blocked from receiving a Certificate in Course {course_key}')
# make sure learner isn't already on the allowlist
elif certs_api.is_on_allowlist(user, course_key):
build_row_errors('user_already_allowlisted', user, row_num)
log.warning(f'Student {user.id} already appears on the allowlist in Course {course_key}.')
# make sure user is enrolled in course
elif not is_user_enrolled_in_course(user, course_key):
build_row_errors('user_not_enrolled', user, row_num)
log.warning(f'Student {user.id} is not enrolled in Course {course_key}')
else:
certs_api.create_or_update_certificate_allowlist_entry(
user,
course_key,
notes=student[notes_index]
)
success.append(_('user "{username}" in row# {row}').format(username=user.username, row=row_num))
else:
general_errors.append(_('File is not attached.'))
results = {
'general_errors': general_errors,
'row_errors': row_errors,
'success': success
}
return JsonResponse(results)
@method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch')

View File

@@ -88,7 +88,7 @@ urlpatterns = [
path('certificate_exception_view/', api.CertificateExceptionView.as_view(), name='certificate_exception_view'),
re_path(r'^generate_certificate_exceptions/(?P<generate_for>[^/]*)', api.GenerateCertificateExceptions.as_view(),
name='generate_certificate_exceptions'),
path('generate_bulk_certificate_exceptions', api.generate_bulk_certificate_exceptions,
path('generate_bulk_certificate_exceptions', api.GenerateBulkCertificateExceptions.as_view(),
name='generate_bulk_certificate_exceptions'),
path(
'certificate_invalidation_view/',