Update documentation, comments, and docstrings throughout the codebase to reflect the migration from setup.py to pyproject.toml: - Transformer class docstrings: changed to reference "entry point name in the package configuration" for better future-proofing - Block structure module docs: updated to reference pyproject.toml - Test file comments: updated entry point references - Config files (tox.ini, pytest.ini): updated references - Documentation (extension_points.rst, course apps ADRs): updated to reference pyproject.toml with inclusive language for external packages - Requirements documentation (github.in): updated with inclusive language - edxmako README: modernized install command to use pip install Historical ADRs and references to external packages that may still use setup.py were intentionally left unchanged or updated with inclusive language acknowledging both pyproject.toml and setup.py. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
305 lines
11 KiB
Python
305 lines
11 KiB
Python
"""Defines ``Group`` and ``UserPartition`` models for partitioning"""
|
|
|
|
|
|
from collections import namedtuple
|
|
|
|
from stevedore.extension import ExtensionManager
|
|
|
|
# We use ``id`` in this file as the IDs of our Groups and UserPartitions,
|
|
# which Pylint disapproves of.
|
|
# pylint: disable=redefined-builtin
|
|
|
|
|
|
# Each user partition has an ID that is unique within its learning context.
|
|
# The IDs must be valid MySQL primary keys, ie positive integers 1 -> 2^31-1.
|
|
# We must carefully manage these IDs, because once they are saved to OLX and the db, they cannot change.
|
|
# Here is how we delegate the ID range:
|
|
# * 1 -> 49: Unused/Reserved
|
|
# * 50: The enrollment track partition
|
|
# * 51: The content type gating partition (defined elsewhere)
|
|
# * 52-99: Available for other single user partitions, plugged in via entry points.
|
|
# Operators, beware of conflicting IDs between plugins!
|
|
# * 100 -> 2^31-1: General namespace for generating IDs at runtime.
|
|
# This includes, at least: content partitions, the cohort partition, and teamset partitions.
|
|
# When using this range, user partition implementations must check to see that they
|
|
# are not conflicting with an existing partition for the course.
|
|
ENROLLMENT_TRACK_PARTITION_ID = 50
|
|
MINIMUM_UNUSED_PARTITION_ID = 100
|
|
|
|
|
|
class UserPartitionError(Exception):
|
|
"""
|
|
Base Exception for when an error was found regarding user partitions.
|
|
"""
|
|
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
|
|
|
|
|
class NoSuchUserPartitionError(UserPartitionError):
|
|
"""
|
|
Exception to be raised when looking up a UserPartition by its ID fails.
|
|
"""
|
|
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
|
|
|
|
|
class NoSuchUserPartitionGroupError(UserPartitionError):
|
|
"""
|
|
Exception to be raised when looking up a UserPartition Group by its ID fails.
|
|
"""
|
|
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
|
|
|
|
|
class ReadOnlyUserPartitionError(UserPartitionError):
|
|
"""
|
|
Exception to be raised when attempting to modify a read only partition.
|
|
"""
|
|
pass # lint-amnesty, pylint: disable=unnecessary-pass
|
|
|
|
|
|
class Group(namedtuple("Group", "id name")):
|
|
"""
|
|
An id and name for a group of students. The id should be unique
|
|
within the UserPartition this group appears in.
|
|
"""
|
|
# in case we want to add to this class, a version will be handy
|
|
# for deserializing old versions. (This will be serialized in courses)
|
|
VERSION = 1
|
|
|
|
def __new__(cls, id, name):
|
|
return super().__new__(cls, int(id), name)
|
|
|
|
def to_json(self):
|
|
"""
|
|
'Serialize' to a json-serializable representation.
|
|
|
|
Returns:
|
|
a dictionary with keys for the properties of the group.
|
|
"""
|
|
return {
|
|
"id": self.id,
|
|
"name": self.name,
|
|
"version": Group.VERSION
|
|
}
|
|
|
|
@staticmethod
|
|
def from_json(value):
|
|
"""
|
|
Deserialize a Group from a json-like representation.
|
|
|
|
Args:
|
|
value: a dictionary with keys for the properties of the group.
|
|
|
|
Raises TypeError if the value doesn't have the right keys.
|
|
"""
|
|
if isinstance(value, Group):
|
|
return value
|
|
|
|
for key in ("id", "name", "version"):
|
|
if key not in value:
|
|
raise TypeError("Group dict {} missing value key '{}'".format(
|
|
value, key))
|
|
|
|
if value["version"] != Group.VERSION:
|
|
raise TypeError("Group dict {} has unexpected version".format(
|
|
value))
|
|
|
|
return Group(value["id"], value["name"])
|
|
|
|
|
|
# The Stevedore extension point namespace for user partition scheme plugins.
|
|
USER_PARTITION_SCHEME_NAMESPACE = 'openedx.user_partition_scheme'
|
|
|
|
|
|
class UserPartition(namedtuple("UserPartition", "id name description groups scheme parameters active")):
|
|
"""A named way to partition users into groups, primarily intended for
|
|
running experiments. It is expected that each user will be in at most one
|
|
group in a partition.
|
|
|
|
A Partition has an id, name, scheme, description, parameters, and a list
|
|
of groups. The id is intended to be unique within the context where these
|
|
are used. (e.g., for partitions of users within a course, the ids should
|
|
be unique per-course). The scheme is used to assign users into groups.
|
|
The parameters field is used to save extra parameters e.g., location of
|
|
the block in case of VerificationPartitionScheme.
|
|
|
|
Partitions can be marked as inactive by setting the "active" flag to False.
|
|
Any group access rule referencing inactive partitions will be ignored
|
|
when performing access checks.
|
|
"""
|
|
VERSION = 3
|
|
|
|
# The collection of user partition scheme extensions.
|
|
scheme_extensions = None
|
|
|
|
# The default scheme to be used when upgrading version 1 partitions.
|
|
VERSION_1_SCHEME = "random"
|
|
|
|
def __new__(cls, id, name, description, groups, scheme=None, parameters=None, active=True,
|
|
scheme_id=VERSION_1_SCHEME):
|
|
if not scheme:
|
|
scheme = UserPartition.get_scheme(scheme_id)
|
|
if parameters is None:
|
|
parameters = {}
|
|
|
|
return super().__new__(cls, int(id), name, description, groups, scheme, parameters, active)
|
|
|
|
@staticmethod
|
|
def get_scheme(name):
|
|
"""
|
|
Returns the user partition scheme with the given name.
|
|
"""
|
|
# Note: we're creating the extension manager lazily to ensure that the Python path
|
|
# has been correctly set up. Trying to create this statically will fail, unfortunately.
|
|
if not UserPartition.scheme_extensions:
|
|
UserPartition.scheme_extensions = ExtensionManager(namespace=USER_PARTITION_SCHEME_NAMESPACE)
|
|
try:
|
|
scheme = UserPartition.scheme_extensions[name].plugin # lint-amnesty, pylint: disable=unsubscriptable-object
|
|
except KeyError:
|
|
raise UserPartitionError(f"Unrecognized scheme '{name}'") # lint-amnesty, pylint: disable=raise-missing-from
|
|
scheme.name = name
|
|
return scheme
|
|
|
|
def to_json(self):
|
|
"""
|
|
'Serialize' to a json-serializable representation.
|
|
|
|
Returns:
|
|
a dictionary with keys for the properties of the partition.
|
|
"""
|
|
return {
|
|
"id": self.id,
|
|
"name": self.name,
|
|
"scheme": self.scheme.name,
|
|
"description": self.description,
|
|
"parameters": self.parameters,
|
|
"groups": [g.to_json() for g in self.groups],
|
|
"active": bool(self.active),
|
|
"version": UserPartition.VERSION
|
|
}
|
|
|
|
@staticmethod
|
|
def from_json(value):
|
|
"""
|
|
Deserialize a Group from a json-like representation.
|
|
|
|
Args:
|
|
value: a dictionary with keys for the properties of the group.
|
|
|
|
Raises TypeError if the value doesn't have the right keys.
|
|
"""
|
|
if isinstance(value, UserPartition):
|
|
return value
|
|
|
|
for key in ("id", "name", "description", "version", "groups"):
|
|
if key not in value:
|
|
raise TypeError(f"UserPartition dict {value} missing value key '{key}'")
|
|
|
|
if value["version"] == 1:
|
|
# If no scheme was provided, set it to the default ('random')
|
|
scheme_id = UserPartition.VERSION_1_SCHEME
|
|
|
|
# Version changes should be backwards compatible in case the code
|
|
# gets rolled back. If we see a version number greater than the current
|
|
# version, we should try to read it rather than raising an exception.
|
|
elif value["version"] >= 2:
|
|
if "scheme" not in value:
|
|
raise TypeError(f"UserPartition dict {value} missing value key 'scheme'")
|
|
|
|
scheme_id = value["scheme"]
|
|
else:
|
|
raise TypeError(f"UserPartition dict {value} has unexpected version")
|
|
|
|
parameters = value.get("parameters", {})
|
|
active = value.get("active", True)
|
|
groups = [Group.from_json(g) for g in value["groups"]]
|
|
scheme = UserPartition.get_scheme(scheme_id)
|
|
if not scheme:
|
|
raise TypeError(f"UserPartition dict {value} has unrecognized scheme {scheme_id}")
|
|
|
|
if getattr(scheme, 'read_only', False):
|
|
raise ReadOnlyUserPartitionError(f"UserPartition dict {value} uses scheme {scheme_id} which is read only") # lint-amnesty, pylint: disable=line-too-long
|
|
|
|
if hasattr(scheme, "create_user_partition"):
|
|
return scheme.create_user_partition(
|
|
value["id"],
|
|
value["name"],
|
|
value["description"],
|
|
groups,
|
|
parameters,
|
|
active,
|
|
)
|
|
else:
|
|
return UserPartition(
|
|
value["id"],
|
|
value["name"],
|
|
value["description"],
|
|
groups,
|
|
scheme,
|
|
parameters,
|
|
active,
|
|
)
|
|
|
|
def get_group(self, group_id):
|
|
"""
|
|
Returns the group with the specified id.
|
|
|
|
Arguments:
|
|
group_id (int): ID of the partition group.
|
|
|
|
Raises:
|
|
NoSuchUserPartitionGroupError: The specified group could not be found.
|
|
|
|
"""
|
|
for group in self.groups:
|
|
if group.id == group_id:
|
|
return group
|
|
|
|
raise NoSuchUserPartitionGroupError(
|
|
"Could not find a Group with ID [{group_id}] in UserPartition [{partition_id}].".format(
|
|
group_id=group_id, partition_id=self.id
|
|
)
|
|
)
|
|
|
|
def access_denied_message(self, block_key, user, user_group, allowed_groups): # lint-amnesty, pylint: disable=unused-argument
|
|
"""
|
|
Return a message that should be displayed to the user when they are not allowed to access
|
|
content managed by this partition, or None if there is no applicable message.
|
|
|
|
Arguments:
|
|
block_key (:class:`.BlockUsageLocator`): The content being managed
|
|
user (:class:`.User`): The user who was denied access
|
|
user_group (:class:`.Group`): The current Group the user is in
|
|
allowed_groups (list of :class:`.Group`): The groups who are allowed to see the content
|
|
|
|
Returns: str
|
|
"""
|
|
return None
|
|
|
|
def access_denied_fragment(self, block, user, user_group, allowed_groups): # lint-amnesty, pylint: disable=unused-argument
|
|
"""
|
|
Return an html fragment that should be displayed to the user when they are not allowed to access
|
|
content managed by this partition, or None if there is no applicable message.
|
|
|
|
Arguments:
|
|
block (:class:`.XBlock`): The content being managed
|
|
user (:class:`.User`): The user who was denied access
|
|
user_group (:class:`.Group`): The current Group the user is in
|
|
allowed_groups (list of :class:`.Group`): The groups who are allowed to see the content
|
|
|
|
Returns: :class:`.Fragment`
|
|
"""
|
|
return None
|
|
|
|
|
|
def get_partition_from_id(partitions, user_partition_id):
|
|
"""
|
|
Look for a user partition with a matching id in the provided list of partitions.
|
|
|
|
Returns:
|
|
A UserPartition, or None if not found.
|
|
"""
|
|
for partition in partitions:
|
|
if partition.id == user_partition_id:
|
|
return partition
|
|
|
|
return None
|