Files
Taimoor Ahmed 56f7da908a fix: send thread_created signal after transaction commit (#37675)
Prevents notification failures with MySQL backend by ensuring signals
are only sent after database transactions commit. This fixes race
conditions where Celery workers couldn't see newly created threads.

- Added send_signal_after_commit() helper function
- Updated both thread creation paths to use the helper

Co-authored-by: Taimoor  Ahmed <taimoor.ahmed@A006-01711.local>
2025-11-26 11:43:45 +05:00

521 lines
18 KiB
Python

"""
Utils for discussion API.
"""
import logging
from datetime import datetime
from typing import Callable, Dict, List
import requests
from crum import get_current_request
from django.conf import settings
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models.functions import Length
from pytz import UTC
from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole
from common.djangoapps.student.models import CourseAccessRole
from openedx.core.djangoapps.django_comment_common.comment_client.thread import Thread
from lms.djangoapps.discussion.config.settings import ENABLE_CAPTCHA_IN_DISCUSSION
from lms.djangoapps.discussion.django_comment_client.utils import has_discussion_privileges
from openedx.core.djangoapps.discussions.models import DiscussionsConfiguration, PostingRestriction
from openedx.core.djangoapps.django_comment_common.models import (
FORUM_ROLE_ADMINISTRATOR,
FORUM_ROLE_COMMUNITY_TA,
FORUM_ROLE_GROUP_MODERATOR,
FORUM_ROLE_MODERATOR,
FORUM_ROLE_STUDENT,
Role
)
from ..django_comment_client.utils import get_user_role_names
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class AttributeDict(dict):
    """
    Dictionary whose keys can also be read, written and deleted as attributes.

    ``d.key`` is ``d['key']``, ``d.key = v`` is ``d['key'] = v`` and ``del d.key`` is ``del d['key']``.
    A missing key raises ``AttributeError`` (not ``KeyError``) on attribute access. This is the
    contract the Python data model requires of ``__getattr__``/``__delattr__``, and it is what
    makes ``hasattr``, ``getattr(obj, name, default)``, ``copy`` and ``pickle`` behave correctly.
    """

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so real dict methods still win.
        try:
            return self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc

    __setattr__ = dict.__setitem__

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc
def discussion_open_for_user(course, user):
    """
    Tell whether ``user`` may currently post in the discussions of ``course``.

    Posting is open when the course's posting restrictions and discussion blackout
    dates allow it. Failing that, it is still open for users holding discussion
    privileges in the course.

    Arguments:
        course: Course whose discussions are checked
        user: User whose privileges in the course are checked
    """
    posting_restrictions = DiscussionsConfiguration.get(course.id).posting_restrictions
    blackout_windows = course.get_discussion_blackout_datetimes()
    if is_posting_allowed(posting_restrictions, blackout_windows):
        return True
    return has_discussion_privileges(user, course.id)
def set_attribute(threads, attribute, value):
    """
    Assign ``value`` under the key ``attribute`` in every thread dict, in place.

    Arguments:
        threads: List of threads (dict objects); each one is modified
        attribute: the key to set on each thread dict
        value: the value stored under that key
    Returns:
        The same ``threads`` list object that was passed in.
    """
    for thread_dict in threads:
        thread_dict[attribute] = value
    return threads
def get_usernames_from_search_string(course_id, search_string, page_number, page_size):
    """
    Get the usernames of users enrolled in a course that contain a search string.

    Matching is case-insensitive (``icontains``). Results are ordered by username
    length, shortest first.

    Args:
        course_id (CourseKey): Course whose enrollments are searched
        search_string (str): Substring to look for in usernames
        page_number (int): Page number to fetch
        page_size (int): Number of items in each page
    Returns:
        page_matched_users (str): comma-separated usernames for the requested page
        matched_users_count (int): total number of matched users in the course
        matched_users_pages (int): total number of pages of matched users in the course
        ``('', 0, 0)`` is returned when no user matches.
    """
    matched_users_in_course = User.objects.filter(
        courseenrollment__course_id=course_id,
        username__icontains=search_string,
    ).order_by(Length('username').asc()).values_list('username', flat=True)
    if not matched_users_in_course:
        return '', 0, 0
    matched_users_count = len(matched_users_in_course)
    paginator = Paginator(matched_users_in_course, page_size)
    page_matched_users = paginator.page(page_number)
    # Paginator.num_pages rounds up, so a trailing partial page is counted. The previous
    # int(count / page_size) truncated it: 5 users at 2 per page gave 2 instead of 3,
    # and fewer users than page_size gave 0 pages.
    matched_users_pages = paginator.num_pages
    return ','.join(page_matched_users), matched_users_count, matched_users_pages
def get_usernames_for_course(course_id, page_number, page_size):
    """
    Get the usernames of all users enrolled in a course, one page at a time.

    Results are ordered by username length, shortest first.

    Args:
        course_id (CourseKey): Course whose enrollments are listed
        page_number (int): Page number to fetch
        page_size (int): Number of items in each page
    Returns:
        page_matched_users (str): comma-separated usernames for the requested page
        matched_users_count (int): total number of enrolled users in the course
        matched_users_pages (int): total number of pages of enrolled users in the course
        ``('', 0, 0)`` is returned when the course has no enrolled users.
    """
    matched_users_in_course = User.objects.filter(
        courseenrollment__course_id=course_id,
    ).order_by(Length('username').asc()).values_list('username', flat=True)
    if not matched_users_in_course:
        return '', 0, 0
    matched_users_count = len(matched_users_in_course)
    paginator = Paginator(matched_users_in_course, page_size)
    page_matched_users = paginator.page(page_number)
    # Paginator.num_pages rounds up, so a trailing partial page is counted.
    # The previous int(count / page_size) truncated it (e.g. 5 users / 2 per page -> 2 instead of 3).
    matched_users_pages = paginator.num_pages
    return ','.join(page_matched_users), matched_users_count, matched_users_pages
def add_stats_for_users_with_no_discussion_content(course_stats, users_in_course):
    """
    Add zero-valued stats entries for users that have no discussion stats in the course.

    Args:
        course_stats (list[dict]): per-user stats entries, each with a ``username`` key
        users_in_course (str): comma-separated usernames that should all appear in the result
    Returns:
        list[dict]: a new list holding ``course_stats`` plus one zero-valued entry for every
        username in ``users_in_course`` that has no entry in ``course_stats``. The list is
        sorted by username length, shortest first. Ties keep ``course_stats`` entries first,
        then the ``users_in_course`` order. ``course_stats`` itself is not modified.
    """
    usernames_with_stats = {user['username'] for user in course_stats}
    # dict.fromkeys de-duplicates while keeping input order. Blank names (e.g. from an
    # empty ``users_in_course``) are skipped.
    requested_usernames = dict.fromkeys(name for name in users_in_course.split(',') if name)
    # Set *difference*: the previous symmetric difference (^) also re-added users that already
    # had stats but were missing from ``users_in_course``, producing duplicate entries.
    zero_stats = [
        {
            'username': username,
            'threads': 0,
            'replies': 0,
            'responses': 0,
            'active_flags': 0,
            'inactive_flags': 0,
        }
        for username in requested_usernames
        if username not in usernames_with_stats
    ]
    return sorted(course_stats + zero_stats, key=lambda d: len(d['username']))
def get_course_staff_users_list(course_id):
    """
    Return the de-duplicated ids of users holding the Course Staff or Course Instructor role.

    Args:
        course_id: course whose staff/instructor roles are queried
    Returns:
        list: user ids, each appearing once
    """
    # TODO: cache course_staff_user_ids if we need to improve perf
    staff_ids = list(CourseStaffRole(course_id).users_with_role().values_list('id', flat=True))
    instructor_ids = list(CourseInstructorRole(course_id).users_with_role().values_list('id', flat=True))
    unique_ids = set(staff_ids + instructor_ids)
    return list(unique_ids)
def get_course_ta_users_list(course_id):
    """
    Return the ids of users holding a TA-like forum role in the course.

    The roles queried are Group Moderator and Community TA. A user holding both
    roles appears once per role.
    """
    # TODO: cache ta_users_ids if we need to improve perf
    ta_roles = Role.objects.filter(
        name__in=[FORUM_ROLE_GROUP_MODERATOR, FORUM_ROLE_COMMUNITY_TA],
        course_id=course_id,
    )
    collected_ids = []
    for ta_role in ta_roles:
        collected_ids.extend(member.id for member in ta_role.users.all())
    return collected_ids
def get_moderator_users_list(course_id):
    """
    Return the ids of users holding a moderator forum role in the course.

    The roles queried are Administrator and Moderator. A user holding both
    roles appears once per role.
    """
    # TODO: cache moderator_user_ids if we need to improve perf
    moderator_roles = Role.objects.filter(
        name__in=[FORUM_ROLE_ADMINISTRATOR, FORUM_ROLE_MODERATOR],
        course_id=course_id,
    )
    collected_ids = []
    for moderator_role in moderator_roles:
        collected_ids.extend(member.id for member in moderator_role.users.all())
    return collected_ids
def filter_topic_from_discussion_id(discussion_id, topics_list):
    """
    Return the first topic in ``topics_list`` whose ``id`` equals ``discussion_id``.

    Returns a new empty dict when no topic matches.
    """
    matches = (candidate for candidate in topics_list if candidate.get("id") == discussion_id)
    return next(matches, {})
def create_discussion_children_from_ids(children_ids, blocks, topics):
    """
    Resolve a list of child block ids into the entries shown under their parent.

    Each child id is looked up in ``blocks``:
    - a ``vertical`` block is replaced by the topic from ``topics`` whose ``id`` equals the
      block's ``discussions_id``, and is dropped when there is no such topic;
    - any other block is kept as is (the block dict itself);
    - ids not present in ``blocks`` (or mapping to an empty block) are skipped.

    Args:
        children_ids (list): ids of the child blocks, in display order
        blocks (dict): mapping of block id to block dict
        topics (list[dict]): topics the vertical blocks are matched against
    Returns:
        list[dict]: the resolved entries, in ``children_ids`` order.
    """
    discussions = []
    for child_id in children_ids:
        block = blocks.get(child_id)
        if not block:
            # Previously a missing id crashed here with AttributeError on None.
            continue
        if block.get('type') == 'vertical':
            block = filter_topic_from_discussion_id(block.get('discussions_id'), topics)
        if block:
            discussions.append(block)
    return discussions
def create_blocks_params(course_usage_key, user):
    """
    Build the parameter dict used to fetch the course blocks for ``user``.

    It requests the full tree (no depth limits), returned as a dict. The tree is limited
    to discussion, chapter, vertical, sequential and course blocks, and only the fields
    the topics structure needs are requested.
    """
    # NOTE(review): 'block_types_filter' is listed as a requested field as well as a
    # parameter; looks unintentional, confirm before removing.
    requested_fields = {
        'display_name', 'student_view_data', 'children', 'discussions_id', 'type', 'block_types_filter',
    }
    allowed_block_types = {'discussion', 'chapter', 'vertical', 'sequential', 'course'}
    return dict(
        usage_key=course_usage_key,
        user=user,
        depth=None,
        nav_depth=None,
        requested_fields=requested_fields,
        block_counts=set(),
        student_view_data={'discussion'},
        return_type='dict',
        block_types_filter=allowed_block_types,
    )
def add_thread_stats_to_subsection(topics_list):
    """
    Roll unit-level thread counts up to each subsection, in place.

    For every subsection (a child of a section in ``topics_list``), ``thread_counts`` is set
    to the sums of the ``discussion`` and ``question`` counts of its units. Missing counts
    are treated as 0. Returns None.
    """
    for section in topics_list:
        for subsection in section.get('children', []):
            totals = {'discussion': 0, 'question': 0}
            for unit in subsection.get('children', []):
                unit_counts = unit.get('thread_counts', {})
                for thread_type in totals:
                    totals[thread_type] += unit_counts.get(thread_type, 0)
            subsection['thread_counts'] = totals
def create_topics_v3_structure(blocks, topics):
    """
    Create the V3 topics structure from course blocks and V2 topics.

    The result contains, in order:
    - every V2 topic whose ``usage_key`` is ``None`` (non-courseware), copied with ``courseware: False``;
    - every ``chapter`` block, marked ``courseware: True``. Its ``children`` are resolved into
      subsection blocks, and their ``children`` into unit topics, with per-subsection thread counts;
    - an ``archived`` entry holding the topics not placed under any subsection that have a
      non-null ``usage_key``. It is only added when non-empty.
    Empty sequentials, and chapters left empty, are removed from the final result.

    Note: chapter and subsection dicts taken from ``blocks`` are modified in place.

    Args:
        blocks (dict): mapping of block id to block dict
        topics (list[dict]): V2 topics, each with an ``id``
    Returns:
        list[dict]: the structured topics.
    """
    non_courseware_topics = [
        dict({**topic, 'courseware': False})
        for topic in topics
        if topic.get('usage_key', '') is None
    ]
    courseware_topics = []
    for block in blocks.values():
        if block.get("type") != "chapter":
            continue
        block['courseware'] = True
        courseware_topics.append(block)
        block['children'] = create_discussion_children_from_ids(
            block.get('children', []),
            blocks,
            topics,
        )
        for subsection in block['children']:
            subsection['children'] = create_discussion_children_from_ids(
                subsection.get('children', []),
                blocks,
                topics,
            )
    add_thread_stats_to_subsection(courseware_topics)
    structured_topics = non_courseware_topics + courseware_topics

    # Collect the ids already placed under a subsection in one pass, then keep the remaining
    # topic ids in their original order. The previous per-item list.remove() was O(n^2) and
    # raised ValueError when a placed entry's id was not (or no longer) in the list, e.g. a
    # topic placed twice.
    placed_ids = {
        item.get('id')
        for chapter in structured_topics
        for sequential in chapter.get('children', [])
        for item in sequential.get('children', [])
    }
    unplaced_topic_ids = [
        topic_id for topic_id in get_topic_ids_from_topics(topics) if topic_id not in placed_ids
    ]
    archived_topics = {
        'id': "archived",
        'children': get_archived_topics(unplaced_topic_ids, topics),
    }
    if archived_topics['children']:
        structured_topics.append(archived_topics)
    return remove_empty_sequentials(structured_topics)
def remove_empty_sequentials(data):
    """
    Recursively drop childless sequentials, and chapters left without children.

    Each node's ``children`` list is filtered in place before the node itself is judged:
    - a ``sequential`` with no children is dropped;
    - a ``chapter`` with no children (originally, or after filtering) is dropped;
    - every other node is kept.

    Parameters:
        data (list): nested node dicts, each optionally carrying ``type`` and ``children``.
    Returns:
        list: a new list holding the kept nodes, in their original order.
    """
    kept_nodes = []
    for node in data:
        node_type = node.get('type')
        children = node.get('children')
        if node_type == 'sequential' and not children:
            continue
        if children:
            node['children'] = remove_empty_sequentials(children)
            children = node['children']
        if node_type == 'chapter' and not children:
            continue
        kept_nodes.append(node)
    return kept_nodes
def get_topic_ids_from_topics(topics: List[Dict[str, str]]) -> List[str]:
    """
    Return the ``id`` of every topic, in the order the topics are given.

    Args:
    - topics (List[Dict[str, str]]): topic dictionaries; each must have an 'id' key.
    Returns:
    - The list of topic ids.
    """
    topic_ids = []
    for topic_dict in topics:
        topic_ids.append(topic_dict['id'])
    return topic_ids
def get_archived_topics(filtered_topic_ids: List[str], topics: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Return the archived topics among ``filtered_topic_ids``.

    A topic counts as archived when its ``usage_key`` is not ``None``. Results follow the
    order of ``filtered_topic_ids``; for each id, every matching topic is included in
    ``topics`` order.

    Args:
    - filtered_topic_ids (List[str]): topic ids to look up.
    - topics (List[Dict[str, str]]): topic dictionaries, each with 'id' and 'usage_key' keys.
    Returns:
    - The matching archived topic dictionaries (the same objects found in ``topics``).
    """
    return [
        candidate
        for wanted_id in filtered_topic_ids
        for candidate in topics
        if candidate['id'] == wanted_id and candidate['usage_key'] is not None
    ]
def is_posting_allowed(posting_restrictions: str, blackout_schedules: List):
    """
    Decide whether posting is allowed from the posting restrictions and blackout schedules.

    - ``PostingRestriction.DISABLED``: restrictions are off, so posting is allowed.
    - ``PostingRestriction.SCHEDULED``: posting is allowed unless the current UTC time falls
      inside one of the blackout windows (bounds inclusive).
    - Any other value: posting is not allowed.

    Args:
        posting_restrictions (str): Values would be "disabled", "scheduled" or "enabled".
        blackout_schedules (List[Dict[str, datetime]]): windows with "start" and "end" datetimes
    Returns:
        bool: True if posting is allowed, False otherwise.
    """
    if posting_restrictions == PostingRestriction.DISABLED:
        return True
    if posting_restrictions != PostingRestriction.SCHEDULED:
        return False
    current_time = datetime.now(UTC)
    in_blackout = any(
        window["start"] <= current_time <= window["end"] for window in blackout_schedules
    )
    return not in_blackout
def can_user_notify_all_learners(user_roles, is_course_staff, is_course_admin):
    """
    Tell whether the posting user may notify all learners.

    Allowed when the user has the forum Administrator or Moderator role, or has
    course staff or course admin access.

    Args:
        user_roles (set): forum role names of the posting user
        is_course_staff (Boolean): Whether the user has course staff access.
        is_course_admin (Boolean): Whether the user has course admin access.
    Returns:
        bool: True if the user may post for all learners, False otherwise.
    """
    has_moderation_role = bool(user_roles.intersection({FORUM_ROLE_ADMINISTRATOR, FORUM_ROLE_MODERATOR}))
    return has_moderation_role or bool(is_course_staff) or bool(is_course_admin)
def is_captcha_enabled(course_id) -> bool:
    """
    Tell whether reCAPTCHA should be enforced for discussion posts in ``course_id``.

    Requires the course-level captcha toggle to be enabled and a reCAPTCHA private key
    to be configured.
    """
    if not ENABLE_CAPTCHA_IN_DISCUSSION.is_enabled(course_id):
        return False
    return bool(settings.RECAPTCHA_PRIVATE_KEY)
def get_course_id_from_thread_id(thread_id: str) -> str:
    """
    Look up a thread and return its ``course_id``.

    The thread is retrieved without responses and without marking it as read.
    """
    retrieved_thread = Thread(id=thread_id).retrieve(with_responses=False, mark_as_read=False)
    return retrieved_thread["course_id"]
def is_only_student(course_key, user) -> bool:
    """
    Tell whether ``user`` is nothing more than a student in the given course.

    True only when the user's forum roles are exactly {Student}, the user has no
    instructor/staff/limited_staff course access role, and is not global staff.
    """
    has_course_access_role = CourseAccessRole.objects.filter(
        user=user,
        course_id__in=[course_key],
        role__in=["instructor", "staff", "limited_staff"],
    ).exists()
    is_global_staff = user.is_staff
    forum_roles = get_user_role_names(user, course_key)
    if forum_roles != {FORUM_ROLE_STUDENT}:
        return False
    return not (has_course_access_role or is_global_staff)
def _redact_recaptcha_key(value) -> str:
    """
    Return ``str(value)`` with the configured reCAPTCHA API key masked.

    The assessment URL carries the key as a query parameter, and ``requests`` exception
    messages may include the request URL, so exception text is passed through here
    before it is logged.
    """
    text = str(value)
    api_key = getattr(settings, 'RECAPTCHA_PRIVATE_KEY', None)
    if api_key:
        text = text.replace(str(api_key), '********')
    return text


def verify_recaptcha_token(token: str) -> bool:
    """
    Assess a reCAPTCHA token using the Google reCAPTCHA Enterprise API.

    The check fails open. It returns False only when the API response carries no
    ``error`` and does not report the token as valid. Network/API errors, an ``error``
    in the response, and any unexpected exception are logged and yield True.

    Args:
        token (str): reCAPTCHA token supplied by the client.
    Returns:
        bool: False if the token was assessed as invalid, True otherwise.
    """
    # Bound before the request so the KeyError handler can always reference it
    # (previously it could be unbound there).
    response = None
    try:
        site_key = get_captcha_site_key_by_platform(get_platform_from_request())
        url = (f"https://recaptchaenterprise.googleapis.com/v1/projects/{settings.RECAPTCHA_PROJECT_ID}/assessments"
               f"?key={settings.RECAPTCHA_PRIVATE_KEY}")
        data = {
            "event": {
                "token": token,
                "siteKey": site_key,
            }
        }
        response = requests.post(url, json=data, timeout=10).json()
        if response.get('tokenProperties', {}).get('valid'):
            log.info("reCAPTCHA token assessment successful. Token is valid.")
            return True
        if response.get('error'):
            # API-reported errors fail open.
            log.error("reCAPTCHA token assessment failed: %s.", response['error'])
            return True
        log.error("reCAPTCHA token assessment failed: Invalid token.%s.", response)
        return False
    except requests.exceptions.RequestException as e:
        log.error("Network or API error during reCAPTCHA assessment: %s", _redact_recaptcha_key(e))
        return True
    except KeyError as e:
        log.error(
            "Unexpected response format from reCAPTCHA API. Missing key: %s. Full response: %s", e, response
        )
        return True
    except Exception as e:  # lint-amnesty, pylint: disable=broad-except
        log.error(
            "An unexpected error occurred during reCAPTCHA assessment: %s",
            _redact_recaptcha_key(e),
            exc_info=True,
        )
        return True
def get_platform_from_request():
    """
    Return the ``Mobile-Platform-Identifier`` header of the current request.

    Falls back to ``'web'`` when the header is absent. It also falls back to ``'web'``
    when ``get_current_request()`` returns None, i.e. when called outside a request;
    previously that case raised AttributeError.
    """
    request = get_current_request()
    if request is None:
        return 'web'
    return request.headers.get('Mobile-Platform-Identifier', 'web')
def get_captcha_site_key_by_platform(platform: str) -> str | None:
    """
    Look up the reCAPTCHA site key configured for ``platform``.

    Returns None when no key is configured for that platform in ``settings.RECAPTCHA_SITE_KEYS``.
    """
    site_keys = settings.RECAPTCHA_SITE_KEYS
    return site_keys.get(platform)
def send_signal_after_commit(signal_func: Callable):
    """
    Defer sending a signal until the current database transaction has committed.

    ``signal_func`` is registered with ``transaction.on_commit``, so receivers run only
    once the data they may read is committed. This avoids races where async workers
    (e.g. Celery tasks triggered by the signal) query rows that are not yet visible to
    other connections, as happens with MySQL transaction isolation.

    Args:
        signal_func: zero-argument callable that sends the signal; it is invoked after commit.

    Example:
        send_signal_after_commit(
            lambda: thread_created.send(sender=None, user=user, post=thread, notify_all_learners=False)
        )
    """
    transaction.on_commit(signal_func)