""" Utils for discussion API. """ import logging from datetime import datetime from typing import Callable, Dict, List import requests from crum import get_current_request from django.conf import settings from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user from django.core.paginator import Paginator from django.db import transaction from django.db.models.functions import Length from pytz import UTC from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole from common.djangoapps.student.models import CourseAccessRole from openedx.core.djangoapps.django_comment_common.comment_client.thread import Thread from lms.djangoapps.discussion.config.settings import ENABLE_CAPTCHA_IN_DISCUSSION from lms.djangoapps.discussion.django_comment_client.utils import has_discussion_privileges from openedx.core.djangoapps.discussions.models import DiscussionsConfiguration, PostingRestriction from openedx.core.djangoapps.django_comment_common.models import ( FORUM_ROLE_ADMINISTRATOR, FORUM_ROLE_COMMUNITY_TA, FORUM_ROLE_GROUP_MODERATOR, FORUM_ROLE_MODERATOR, FORUM_ROLE_STUDENT, Role ) from ..django_comment_client.utils import get_user_role_names log = logging.getLogger(__name__) class AttributeDict(dict): """ Converts Dict Keys into Attributes """ __getattr__ = dict.__getitem__ __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ def discussion_open_for_user(course, user): """ Check if the course discussion are open or not for user. Arguments: course: Course to check discussions for user: User to check for privileges in course """ discussions_posting_restrictions = DiscussionsConfiguration.get(course.id).posting_restrictions blackout_dates = course.get_discussion_blackout_datetimes() return ( is_posting_allowed(discussions_posting_restrictions, blackout_dates) or has_discussion_privileges(user, course.id) ) def set_attribute(threads, attribute, value): """ Iterates over the list of dicts and assigns the provided value to the given attribute Arguments: threads: List of threads (dict objects) attribute: the key for thread dict value: the value to assign to the thread attribute """ for thread in threads: thread[attribute] = value return threads def get_usernames_from_search_string(course_id, search_string, page_number, page_size): """ Gets usernames for all users in course that match string. Args: course_id (CourseKey): Course to check discussions for search_string (str): String to search matching page_number (int): Page number to fetch page_size (int): Number of items in each page Returns: page_matched_users (str): comma seperated usernames for the page matched_users_count (int): count of matched users in course matched_users_pages (int): pages of matched users in course """ matched_users_in_course = User.objects.filter( courseenrollment__course_id=course_id, username__icontains=search_string).order_by(Length('username').asc()).values_list('username', flat=True) if not matched_users_in_course: return '', 0, 0 matched_users_count = len(matched_users_in_course) paginator = Paginator(matched_users_in_course, page_size) page_matched_users = paginator.page(page_number) matched_users_pages = int(matched_users_count / page_size) return ','.join(page_matched_users), matched_users_count, matched_users_pages def get_usernames_for_course(course_id, page_number, page_size): """ Gets usernames for all users in course. Args: course_id (CourseKey): Course to check discussions for page_number (int): Page numbers to fetch page_size (int): Number of items in each page Returns: page_matched_users (str): comma seperated usernames for the page matched_users_count (int): count of matched users in course matched_users_pages (int): pages of matched users in course """ matched_users_in_course = User.objects.filter(courseenrollment__course_id=course_id, ) \ .order_by(Length('username').asc()).values_list('username', flat=True) if not matched_users_in_course: return '', 0, 0 matched_users_count = len(matched_users_in_course) paginator = Paginator(matched_users_in_course, page_size) page_matched_users = paginator.page(page_number) matched_users_pages = int(matched_users_count / page_size) return ','.join(page_matched_users), matched_users_count, matched_users_pages def add_stats_for_users_with_no_discussion_content(course_stats, users_in_course): """ Update users stats for users with no discussion stats available in course """ users_returned_from_api = [user['username'] for user in course_stats] user_list = users_in_course.split(',') users_with_no_discussion_content = set(user_list) ^ set(users_returned_from_api) updated_course_stats = course_stats for user in users_with_no_discussion_content: updated_course_stats.append({ 'username': user, 'threads': 0, 'replies': 0, 'responses': 0, 'active_flags': 0, 'inactive_flags': 0, }) updated_course_stats = sorted(updated_course_stats, key=lambda d: len(d['username'])) return updated_course_stats def get_course_staff_users_list(course_id): """ Gets user ids for Staff roles for course discussions. Roles Course Instructor and Course Staff. """ # TODO: cache course_staff_user_ids if we need to improve perf course_staff_user_ids = [] staff = list(CourseStaffRole(course_id).users_with_role().values_list('id', flat=True)) admins = list(CourseInstructorRole(course_id).users_with_role().values_list('id', flat=True)) course_staff_user_ids.extend(staff) course_staff_user_ids.extend(admins) return list(set(course_staff_user_ids)) def get_course_ta_users_list(course_id): """ Gets user ids for TA roles for course discussions. Roles include Community TA and Group Community TA. """ # TODO: cache ta_users_ids if we need to improve perf ta_users_ids = [ user.id for role in Role.objects.filter(name__in=[FORUM_ROLE_GROUP_MODERATOR, FORUM_ROLE_COMMUNITY_TA], course_id=course_id) for user in role.users.all() ] return ta_users_ids def get_moderator_users_list(course_id): """ Gets user ids for Moderator roles for course discussions. Roles include Discussion Administrator and Discussion Moderator. """ # TODO: cache moderator_user_ids if we need to improve perf moderator_user_ids = [ user.id for role in Role.objects.filter( name__in=[FORUM_ROLE_ADMINISTRATOR, FORUM_ROLE_MODERATOR], course_id=course_id ) for user in role.users.all() ] return moderator_user_ids def filter_topic_from_discussion_id(discussion_id, topics_list): """ Returns topic based on discussion id """ for topic in topics_list: if topic.get("id") == discussion_id: return topic return {} def create_discussion_children_from_ids(children_ids, blocks, topics): """ Takes ids of discussion and return discussion dictionary """ discussions = [] for child_id in children_ids: topic = blocks.get(child_id) if topic.get('type') == 'vertical': discussions_id = topic.get('discussions_id') topic = filter_topic_from_discussion_id(discussions_id, topics) if topic: discussions.append(topic) return discussions def create_blocks_params(course_usage_key, user): """ Returns param dict that is needed to get blocks """ return { 'usage_key': course_usage_key, 'user': user, 'depth': None, 'nav_depth': None, 'requested_fields': { 'display_name', 'student_view_data', 'children', 'discussions_id', 'type', 'block_types_filter' }, 'block_counts': set(), 'student_view_data': {'discussion'}, 'return_type': 'dict', 'block_types_filter': { 'discussion', 'chapter', 'vertical', 'sequential', 'course' } } def add_thread_stats_to_subsection(topics_list): """ Add topic stats at subsection by adding stats of all units in the topic """ for section in topics_list: for subsection in section.get('children', []): discussions = 0 questions = 0 for unit in subsection.get('children', []): thread_counts = unit.get('thread_counts', {}) discussions += thread_counts.get('discussion', 0) questions += thread_counts.get('question', 0) subsection['thread_counts'] = { 'discussion': discussions, 'question': questions, } def create_topics_v3_structure(blocks, topics): """ Create V3 topics structure from blocks and v2 topics """ non_courseware_topics = [ dict({**topic, 'courseware': False}) for topic in topics if topic.get('usage_key', '') is None ] courseware_topics = [] for key, value in blocks.items(): if value.get("type") == "chapter": value['courseware'] = True courseware_topics.append(value) value['children'] = create_discussion_children_from_ids( value.get('children', []), blocks, topics, ) subsections = value.get('children') for subsection in subsections: subsection['children'] = create_discussion_children_from_ids( subsection.get('children', []), blocks, topics, ) add_thread_stats_to_subsection(courseware_topics) structured_topics = non_courseware_topics + courseware_topics topic_ids = get_topic_ids_from_topics(topics) # Remove all topic ids that are contained in the structured topics for chapter in structured_topics: for sequential in chapter.get('children', []): for item in sequential['children']: topic_ids.remove(item['id']) archived_topics = { 'id': "archived", 'children': get_archived_topics(topic_ids, topics) } if archived_topics['children']: structured_topics.append(archived_topics) return remove_empty_sequentials(structured_topics) def remove_empty_sequentials(data): """ Removes all objects of type "sequential" from a nested list of objects if they have no children. After removing the empty sequentials, if the parent of the sequential is now empty, it will also be removed. Parameters: data (list): A list of nested objects to check and remove empty sequentials from. Returns: list: The modified list with empty sequentials removed. """ new_data = [] for obj in data: block_type = obj.get('type') if block_type != 'sequential' or (block_type == 'sequential' and obj.get('children')): if obj.get('children'): obj['children'] = remove_empty_sequentials(obj['children']) if obj['children'] or block_type != 'chapter': new_data.append(obj) else: if block_type != 'chapter': new_data.append(obj) return new_data def get_topic_ids_from_topics(topics: List[Dict[str, str]]) -> List[str]: """ This function takes a list of topics and returns a list of the topic ids. Args: - topics (List[Dict[str, str]]): A list of topic dictionaries. Each dictionary should have an 'id' field. Returns: - A list of topic ids, extracted from the input list of topics. """ return [topic['id'] for topic in topics] def get_archived_topics(filtered_topic_ids: List[str], topics: List[Dict[str, str]]) -> List[Dict[str, str]]: """ This function takes a list of topic ids and a list of topics, and returns the list of archived topics. A topic is considered archived if it has a non-null `usage_key` field. Args: - filtered_topic_ids (List[str]): A list of topic ids to filter on. - topics (List[Dict[str, str]]): A list of topic dictionaries. - Each dictionary should have a 'id' and a 'usage_key' field. Returns: - A list of archived topic dictionaries, with the same format as the input topics. """ archived_topics = [] for topic_id in filtered_topic_ids: for topic in topics: if topic['id'] == topic_id and topic['usage_key'] is not None: archived_topics.append(topic) return archived_topics def is_posting_allowed(posting_restrictions: str, blackout_schedules: List): """ Check if posting is allowed based on the given posting restrictions and blackout schedules. Args: posting_restrictions (str): Values would be "disabled", "scheduled" or "enabled". blackout_schedules (List[Dict[str, datetime]]): The list of blackout schedules Returns: bool: True if posting is allowed, False otherwise. """ now = datetime.now(UTC) if posting_restrictions == PostingRestriction.DISABLED: return True elif posting_restrictions == PostingRestriction.SCHEDULED: return not any(schedule["start"] <= now <= schedule["end"] for schedule in blackout_schedules) else: return False def can_user_notify_all_learners(user_roles, is_course_staff, is_course_admin): """ Check if user posting is allowed to notify all learners based on the given restrictions Args: user_roles (Dict): Roles of the posting user is_course_staff (Boolean): Whether the user has a course staff access. is_course_admin (Boolean): Whether the user has a course admin access. Returns: bool: True if posting for all learner is allowed to this user, False otherwise. """ is_staff_or_instructor = any([ user_roles.intersection({FORUM_ROLE_ADMINISTRATOR, FORUM_ROLE_MODERATOR}), is_course_staff, is_course_admin, ]) return is_staff_or_instructor def is_captcha_enabled(course_id) -> bool: """ Check if reCAPTCHA is enabled for discussion posts in the given course. """ return bool(ENABLE_CAPTCHA_IN_DISCUSSION.is_enabled(course_id) and settings.RECAPTCHA_PRIVATE_KEY) def get_course_id_from_thread_id(thread_id: str) -> str: """ Get course id from thread id. """ thread = Thread(id=thread_id).retrieve(**{ 'with_responses': False, 'mark_as_read': False }) return thread["course_id"] def is_only_student(course_key, user) -> bool: """ Check if the user is only a user and doesn't hold any other roles the given course. """ is_course_staff_or_admin = (CourseAccessRole.objects.filter (user=user, course_id__in=[course_key], role__in=["instructor", "staff", "limited_staff"] ).exists()) is_user_admin = user.is_staff user_roles = get_user_role_names(user, course_key) return user_roles == {FORUM_ROLE_STUDENT} and not (is_course_staff_or_admin or is_user_admin) def verify_recaptcha_token(token: str) -> bool: """ Assess the reCAPTCHA token using Google reCAPTCHA Enterprise API. Logs success or error and returns True if an error occurs, along with logging the error. """ try: site_key = get_captcha_site_key_by_platform(get_platform_from_request()) url = (f"https://recaptchaenterprise.googleapis.com/v1/projects/{settings.RECAPTCHA_PROJECT_ID}/assessments" f"?key={settings.RECAPTCHA_PRIVATE_KEY}") data = { "event": { "token": token, "siteKey": site_key, } } response = requests.post(url, json=data, timeout=10).json() if response.get('tokenProperties', {}).get('valid'): logging.info("reCAPTCHA token assessment successful. Token is valid.") return True elif response.get('error'): logging.error(f"reCAPTCHA token assessment failed: {response['error']}.") return True else: logging.error(f"reCAPTCHA token assessment failed: Invalid token.{response}.") return False except requests.exceptions.RequestException as e: logging.error(f"Network or API error during reCAPTCHA assessment: {e}") return True except KeyError as e: logging.error(f"Unexpected response format from reCAPTCHA API. Missing key: {e}. Full response: {response}") return True except Exception as e: # lint-amnesty, pylint: disable=broad-except logging.error(f"An unexpected error occurred during reCAPTCHA assessment: {e}", exc_info=True) return True def get_platform_from_request(): """ get Mobile-Platform-Identifier header value from request """ return get_current_request().headers.get('Mobile-Platform-Identifier', 'web') def get_captcha_site_key_by_platform(platform: str) -> str | None: """ Get reCAPTCHA site key based on the platform. """ return settings.RECAPTCHA_SITE_KEYS.get(platform, None) def send_signal_after_commit(signal_func: Callable): """ Schedule a signal to be sent after the current database transaction commits. This helper ensures that signals are only sent after the transaction commits, preventing race conditions where async tasks (like Celery workers) may try to access database records before they are visible (especially important for MySQL backend with transaction isolation). Args: signal_func: A callable that sends the signal. This will be executed after the transaction commits. Example: send_signal_after_commit( lambda: thread_created.send(sender=None, user=user, post=thread, notify_all_learners=False) ) """ transaction.on_commit(signal_func)